denpa-radio/config/liquidsoap.liq

222 lines
6.3 KiB
Text

# denpa-radio liquidsoap script
# adding a station: copy a station block, rename, restart liquidsoap.
# logging: enable the stdout log target and set the log level to 3.
settings.log.stdout.set(true)
settings.log.level.set(3)
# source password handed to the output.icecast calls in the station blocks.
# read from the SOURCE_PW environment variable; "" when the variable is unset.
source_pw = environment.get(default="", "SOURCE_PW")
# deck-shuffle playlist: shuffle every audio file under `dir` once, hand the
# files out in that order, and reshuffle when the deck is exhausted.
# returns a record with:
#   .source - a request.dynamic source fed from the deck
#   .peek   - fun(n): up to n filenames still ahead of the cursor in the
#             current deck (never looks past the next reshuffle)
def make_deck(dir) =
  deck = ref([])
  pos = ref(0)
  previous = ref("")

  # true when the file's extension (compared in lower case) is a known audio type.
  def is_audio(f) =
    ext = string.case(lower=true, file.extension(leading_dot=false, f))
    list.mem(ext, ["flac", "mp3", "opus", "m4a", "wav", "ogg"])
  end

  # rescan `dir` recursively, shuffle the audio files and rewind the cursor.
  def reload_and_shuffle() =
    found = file.ls(dir, recursive=true, absolute=true)
    deck := list.shuffle(list.filter(is_audio, found))

    # avoid playing the same file twice in a row across a reshuffle: if the
    # new deck opens with the file handed out last, swap the first two entries.
    head = list.nth(default="", !deck, 0)
    if !previous != "" and list.length(!deck) > 1 and head == !previous then
      second = list.nth(default="", !deck, 1)
      deck := list.append([second, head], list.tl(list.tl(!deck)))
    end

    pos := 0
  end

  # next filename from the deck, or "" when no audio file was found.
  def next_file() =
    if !pos >= list.length(!deck) then reload_and_shuffle() end
    if list.length(!deck) == 0 then
      ""
    else
      picked = list.nth(default="", !deck, !pos)
      pos := !pos + 1
      previous := picked
      picked
    end
  end

  # request.dynamic callback: null when there is nothing to play.
  def next_request() =
    picked = next_file()
    if picked == "" then null() else request.create(picked) end
  end

  # NOTE(review): the cursor advances when a request is fetched, not when it
  # starts playing. If request.dynamic fetches ahead, peek may skip the track
  # that is about to play - confirm against the prefetch behaviour.
  def peek(n) =
    remaining = list.length(!deck) - !pos
    list.init(
      min(n, remaining),
      fun(i) -> list.nth(default="", !deck, !pos + i)
    )
  end

  reload_and_shuffle()
  { source = request.dynamic(next_request), peek = peek }
end
# filename-based fallbacks for tracks with empty embedded tags.
# title fallback: the file's base name without its extension and without a
# leading track number such as "08. " or "12 - ".
def filename_title(filename) =
  name = path.basename(filename)
  suffix = file.extension(leading_dot=true, name)
  keep = string.length(name) - string.length(suffix)
  stem = if suffix == "" then name else string.sub(name, start=0, length=keep) end

  # digits, then at least one of ".", " " or "-", then any further spaces.
  track_prefix = regexp("^[0-9]+[. \\-]+ *")
  track_prefix.replace(fun(_) -> "", stem)
end
# album fallback: the name of the directory that directly contains the file.
def filename_album(filename) =
  parent_dir = path.dirname(filename)
  path.basename(parent_dir)
end
# returns `s` unless it is the empty string, in which case `fallback`.
def or_else(s, fallback) =
  if s != "" then s else fallback end
end
# walk up from the audio file to find cover.{jpg,png,webp}.
# checks the file's own directory first, then each parent in turn, for at most
# three levels above it (or until the filesystem root). The nearest cover wins.
# With the usual <station>/tracks/<album>/<file> layout this reaches
# <station>/cover.*. It also covers files placed directly in tracks/ and
# albums split into per-disc subfolders.
# returns the cover path, or "" when none is found.
def cover_for(filename) =
  # first existing cover.* in `dir` (jpg, then png, then webp), or "".
  def find_in(dir) =
    candidates = ["#{dir}/cover.jpg", "#{dir}/cover.png", "#{dir}/cover.webp"]
    list.fold(
      fun(found, c) -> if found == "" and file.exists(c) then c else found end,
      "",
      candidates
    )
  end

  # look in `dir`, then climb while there are levels left. Stops when
  # path.dirname no longer changes the path (top of the tree reached).
  def rec walk_up(dir, levels_left) =
    hit = find_in(dir)
    parent = path.dirname(dir)
    if hit != "" then
      hit
    elsif levels_left <= 0 or parent == dir then
      ""
    else
      walk_up(parent, levels_left - 1)
    end
  end

  walk_up(path.dirname(filename), 3)
end
# build a json object for one upcoming track from its filename.
# tags are read through a short-lived request; the length comes from
# request.duration. Fields: title, artist, album, duration, cover_path.
# Empty title/album tags fall back to values derived from the filename, and
# duration is "" when no positive length is available.
def track_meta_json(filename) =
  # resolve a throwaway request only to read its metadata, then release it.
  req = request.create(filename)
  ignore(request.resolve(req))
  tags = request.metadata(req)
  request.destroy(req)

  seconds = null.get(default=-1., request.duration(filename))
  duration = if seconds > 0. then string(int_of_float(seconds)) else "" end

  track = json()
  track.add("title", or_else(tags["title"], filename_title(filename)))
  track.add("artist", tags["artist"])
  track.add("album", or_else(tags["album"], filename_album(filename)))
  track.add("duration", duration)
  track.add("cover_path", cover_for(filename))
  track
end
# write the now-playing document for `station` to /now-playing/<station>.json.
#   station  - station name, used in the payload and in the output file name
#   m        - metadata of the track that just started
#   upcoming - filenames of the next tracks, expanded via track_meta_json
# does nothing when the metadata carries no "filename".
def emit_now_playing(station, m, upcoming) =
  filename = m["filename"]
  if filename != "" then
    # length from request.duration as whole seconds, "" when not positive.
    def measured_duration() =
      seconds = null.get(default=-1., request.duration(filename))
      if seconds > 0. then string(int_of_float(seconds)) else "" end
    end

    # prefer the duration already present in the metadata.
    tagged = m["duration"]
    dur_str = if tagged == "" then measured_duration() else tagged end

    payload = json()
    payload.add("station", station)
    payload.add("artist", m["artist"])
    payload.add("title", or_else(m["title"], filename_title(filename)))
    payload.add("album", or_else(m["album"], filename_album(filename)))
    payload.add("filename", filename)
    payload.add("duration", dur_str)
    payload.add("started_at", string(int_of_float(time())))
    payload.add("cover_path", cover_for(filename))
    payload.add("up_next", list.map(track_meta_json, upcoming))

    # atomic write with the temp file in the same directory as the target.
    target = "/now-playing/#{station}.json"
    file.write(
      data=payload.stringify(),
      atomic=true,
      temp_dir="/now-playing",
      target
    )
  end
end
# prepend the track described by `m` to /now-playing/<station>.history.json,
# keeping the newest 50 entries (newest first).
#   station - station name, used in the history file name
#   m       - metadata of the track that just started
# does nothing when the metadata carries no "filename". A missing or
# unparsable history file is treated as an empty history.
def append_history(station, m) =
  filename = m["filename"]
  if filename != "" then
    history_path = "/now-playing/#{station}.history.json"

    # same filename-based fallbacks as the now-playing payload, so untagged
    # tracks do not show up in the history with a blank title or album.
    entry = {
      title = or_else(m["title"], filename_title(filename)),
      artist = m["artist"],
      album = or_else(m["album"], filename_album(filename)),
      filename = filename,
      started_at = string(int_of_float(time()))
    }

    arr =
      if file.exists(history_path) then
        existing = file.contents(history_path)
        try
          let json.parse (parsed : [{
            title: string,
            artist: string,
            album: string,
            filename: string,
            started_at: string
          }]) = existing
          parsed
        catch _ do
          # corrupt or incompatible file: start over rather than fail.
          []
        end
      else
        []
      end

    # 49 previous entries + the new one = 50 in total.
    new_arr = list.add(entry, list.prefix(49, arr))
    out_str = json.stringify(new_arr)
    file.write(data=out_str, atomic=true, temp_dir="/now-playing", history_path)
  end
end
# === station: minecraft ===
# shuffled deck over every audio file found under the station's tracks directory.
minecraft_deck = make_deck("/library/minecraft/tracks")
minecraft = minecraft_deck.source
# on every new track: rewrite the now-playing json (with up to 4 upcoming
# filenames from the deck) and prepend the track to the station history.
# the handler is registered on the deck source, before crossfade/mksafe wrap it.
minecraft.on_track(fun (m) -> begin
emit_now_playing("minecraft", m, minecraft_deck.peek(4))
append_history("minecraft", m)
end)
# from here on `minecraft` is rebound to the wrapped source; all outputs below use it.
minecraft = crossfade(minecraft)
minecraft = mksafe(minecraft)
# icecast mount 1: mp3, 192 kbps stereo, at /minecraft.mp3.
output.icecast(
%mp3(bitrate=192, stereo=true),
host="icecast", port=8000, password=source_pw,
mount="/minecraft.mp3",
name="Denpa — Minecraft",
description="Minecraft soundtracks",
genre="game-ost",
url="https://denpa.femboy.page",
minecraft
)
# icecast mount 2: opus, 96 kbps constrained vbr, 48 kHz stereo, at /minecraft.opus.
output.icecast(
%opus(bitrate=96, vbr="constrained", samplerate=48000, channels=2),
host="icecast", port=8000, password=source_pw,
mount="/minecraft.opus",
name="Denpa — Minecraft",
description="Minecraft soundtracks",
genre="game-ost",
url="https://denpa.femboy.page",
minecraft
)
# raw pcm feed: signed 16-bit little-endian, 2 channels, 48 kHz, sent through
# the ffmpeg encoder to a tcp url on port 9100.
# NOTE(review): "listen=1" presumably makes this end accept the connection
# rather than dial out - confirm who connects to it.
output.url(
%ffmpeg(format="s16le", %audio(codec="pcm_s16le", ac=2, ar=48000)),
url="tcp://0.0.0.0:9100?listen=1",
minecraft
)