diff --git a/streamer/src/index.ts b/streamer/src/index.ts new file mode 100644 index 0000000..28bbf65 --- /dev/null +++ b/streamer/src/index.ts @@ -0,0 +1,125 @@ +import { createServer } from "node:http"; +import { join } from "node:path"; +import { loadConfig } from "./config.js"; +import { NowPlayingWatcher } from "./nowplaying.js"; +import { PageServer } from "./page-server.js"; +import { PcmTap } from "./pcm-tap.js"; +import { SpectrumAnalyzer } from "./fft.js"; +import { ChromeRenderer } from "./chrome.js"; +import { Ffmpeg } from "./ffmpeg.js"; +import { IcecastPoller } from "./icecast.js"; + +const cfg = loadConfig(); +const log = (level: string, msg: string, extra?: unknown) => + console.log(JSON.stringify({ ts: new Date().toISOString(), level, msg, ...(extra ? { extra } : {}) })); + +const PAGE_PORT = 8080; +const STATIC_DIR = join(import.meta.dirname ?? __dirname, "views"); +const BARS = cfg.style === "denpa" ? 48 : 72; + +let lastFrameAt = 0; + +async function main() { + const np = new NowPlayingWatcher({ + station: cfg.station, + nowPlayingDir: cfg.nowPlayingDir, + libraryDir: cfg.libraryDir, + }); + + const page = new PageServer({ + port: PAGE_PORT, + staticDir: STATIC_DIR, + libraryDir: cfg.libraryDir, + style: cfg.style, + station: cfg.station, + tz: cfg.tz, + tuneInUrl: cfg.stationTuneInUrl, + }); + + const pcm = new PcmTap({ host: cfg.pcmHost, port: cfg.pcmPort }); + const spec = new SpectrumAnalyzer({ bars: BARS, sampleRate: 48000 }); + + const ice = cfg.icecastStatusUrl + ? new IcecastPoller({ statusUrl: cfg.icecastStatusUrl, mountName: `/${cfg.station}.mp3` }) + : null; + + const ffmpeg = new Ffmpeg({ + rtmpUrl: cfg.rtmpUrl, + width: cfg.resolution.width, + height: cfg.resolution.height, + framerate: cfg.framerate, + videoBitrate: cfg.videoBitrate, + audioBitrate: cfg.audioBitrate, + }); + + const chrome = new ChromeRenderer({ + pageUrl: `http://127.0.0.1:${PAGE_PORT}/?style=${cfg.style}`, + width: cfg.resolution.width, + height: cfg.resolution.height, + framerate: cfg.framerate, + }); + + await page.start(); + await np.start(); + np.on("change", (s) => page.pushNowPlaying(s)); + if (np.current()) page.pushNowPlaying(np.current()); + + const pushTimer: NodeJS.Timeout = setInterval(() => page.pushSpectrum(spec.bars()), Math.round(1000 / 30)); + + ice?.on("listeners", (l) => page.pushListeners(l)); + ice?.start(); + + const { videoIn, audioIn } = ffmpeg.start(); + ffmpeg.on("log", (m: string) => log("info", "ffmpeg", m.trim())); + ffmpeg.on("exit", (code: number | null) => { + log("error", "ffmpeg exited", { code }); + process.exit(1); + }); + + pcm.on("data", (chunk: Buffer) => { + spec.feed(chunk); + audioIn.write(chunk); + }); + pcm.on("connecting", () => log("info", "pcm connecting")); + pcm.on("connected", () => log("info", "pcm connected")); + pcm.on("disconnected", () => log("warn", "pcm disconnected")); + pcm.start(); + + await chrome.start(); + chrome.on("frame", (buf: Buffer) => { + lastFrameAt = Date.now(); + videoIn.write(buf); + }); + chrome.on("disconnected", () => { + log("error", "chrome disconnected"); + process.exit(1); + }); + + // health endpoint + const health = createServer((req, res) => { + if (req.url !== "/health") { res.statusCode = 404; res.end(); return; } + const stale = Date.now() - lastFrameAt > 5000; + res.statusCode = stale ? 503 : 200; + res.setHeader("content-type", "application/json"); + res.end(JSON.stringify({ ok: !stale, lastFrameAt })); + }); + health.listen(cfg.healthPort, "0.0.0.0", () => log("info", `health :${cfg.healthPort}`)); + + const shutdown = async () => { + clearInterval(pushTimer); + pcm.stop(); + await chrome.stop(); + ffmpeg.stop(); + np.stop(); + ice?.stop(); + await page.stop(); + process.exit(0); + }; + process.on("SIGTERM", shutdown); + process.on("SIGINT", shutdown); +} + +main().catch((err) => { + log("error", "fatal", { err: err instanceof Error ? err.message : String(err) }); + process.exit(1); +});