feat(streamer): supervisor wiring all subprocesses

This commit is contained in:
devilreef 2026-04-30 15:30:45 +06:00
parent 7cdc8f0d44
commit 0ff5ecd9d0
Signed by: devilreef
SSH key fingerprint: SHA256:UZisRr4iuXx+IhkbZnR655L2RWAT6o2rgbGv5F/6m3Y

125
streamer/src/index.ts Normal file
View file

@ -0,0 +1,125 @@
import { createServer } from "node:http";
import { join } from "node:path";
import { loadConfig } from "./config.js";
import { NowPlayingWatcher } from "./nowplaying.js";
import { PageServer } from "./page-server.js";
import { PcmTap } from "./pcm-tap.js";
import { SpectrumAnalyzer } from "./fft.js";
import { ChromeRenderer } from "./chrome.js";
import { Ffmpeg } from "./ffmpeg.js";
import { IcecastPoller } from "./icecast.js";
const cfg = loadConfig();
const log = (level: string, msg: string, extra?: unknown) =>
console.log(JSON.stringify({ ts: new Date().toISOString(), level, msg, ...(extra ? { extra } : {}) }));
const PAGE_PORT = 8080;
const STATIC_DIR = join(import.meta.dirname ?? __dirname, "views");
const BARS = cfg.style === "denpa" ? 48 : 72;
let lastFrameAt = 0;
async function main() {
const np = new NowPlayingWatcher({
station: cfg.station,
nowPlayingDir: cfg.nowPlayingDir,
libraryDir: cfg.libraryDir,
});
const page = new PageServer({
port: PAGE_PORT,
staticDir: STATIC_DIR,
libraryDir: cfg.libraryDir,
style: cfg.style,
station: cfg.station,
tz: cfg.tz,
tuneInUrl: cfg.stationTuneInUrl,
});
const pcm = new PcmTap({ host: cfg.pcmHost, port: cfg.pcmPort });
const spec = new SpectrumAnalyzer({ bars: BARS, sampleRate: 48000 });
const ice = cfg.icecastStatusUrl
? new IcecastPoller({ statusUrl: cfg.icecastStatusUrl, mountName: `/${cfg.station}.mp3` })
: null;
const ffmpeg = new Ffmpeg({
rtmpUrl: cfg.rtmpUrl,
width: cfg.resolution.width,
height: cfg.resolution.height,
framerate: cfg.framerate,
videoBitrate: cfg.videoBitrate,
audioBitrate: cfg.audioBitrate,
});
const chrome = new ChromeRenderer({
pageUrl: `http://127.0.0.1:${PAGE_PORT}/?style=${cfg.style}`,
width: cfg.resolution.width,
height: cfg.resolution.height,
framerate: cfg.framerate,
});
await page.start();
await np.start();
np.on("change", (s) => page.pushNowPlaying(s));
if (np.current()) page.pushNowPlaying(np.current());
const pushTimer: NodeJS.Timeout = setInterval(() => page.pushSpectrum(spec.bars()), Math.round(1000 / 30));
ice?.on("listeners", (l) => page.pushListeners(l));
ice?.start();
const { videoIn, audioIn } = ffmpeg.start();
ffmpeg.on("log", (m: string) => log("info", "ffmpeg", m.trim()));
ffmpeg.on("exit", (code: number | null) => {
log("error", "ffmpeg exited", { code });
process.exit(1);
});
pcm.on("data", (chunk: Buffer) => {
spec.feed(chunk);
audioIn.write(chunk);
});
pcm.on("connecting", () => log("info", "pcm connecting"));
pcm.on("connected", () => log("info", "pcm connected"));
pcm.on("disconnected", () => log("warn", "pcm disconnected"));
pcm.start();
await chrome.start();
chrome.on("frame", (buf: Buffer) => {
lastFrameAt = Date.now();
videoIn.write(buf);
});
chrome.on("disconnected", () => {
log("error", "chrome disconnected");
process.exit(1);
});
// health endpoint
const health = createServer((req, res) => {
if (req.url !== "/health") { res.statusCode = 404; res.end(); return; }
const stale = Date.now() - lastFrameAt > 5000;
res.statusCode = stale ? 503 : 200;
res.setHeader("content-type", "application/json");
res.end(JSON.stringify({ ok: !stale, lastFrameAt }));
});
health.listen(cfg.healthPort, "0.0.0.0", () => log("info", `health :${cfg.healthPort}`));
const shutdown = async () => {
clearInterval(pushTimer);
pcm.stop();
await chrome.stop();
ffmpeg.stop();
np.stop();
ice?.stop();
await page.stop();
process.exit(0);
};
process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
}
main().catch((err) => {
log("error", "fatal", { err: err instanceof Error ? err.message : String(err) });
process.exit(1);
});