diff --git a/src/main.rs b/src/main.rs index 0485687..095627e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,8 @@ use std::fs; +use std::sync::mpsc::Sender; use std::thread::{self, JoinHandle}; -use color_eyre::eyre::{eyre, Context as _}; +use color_eyre::eyre::{bail, eyre, Context as _}; use color_eyre::Result; use tracing_error::ErrorLayer; use tracing_subscriber::layer::SubscriberExt as _; @@ -24,7 +25,11 @@ mod worker { pub(crate) static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); -fn spawn(name: &'static str, task: F) -> Result>> +fn spawn( + name: &'static str, + close_tx: Sender<&'static str>, + task: F, +) -> Result>> where F: FnOnce() -> Result + Send + 'static, T: Send + 'static, @@ -38,19 +43,13 @@ where tracing::error!("Thread '{}' errored: {:?}", name, err); } + let _ = close_tx.send(name); + res }) .wrap_err_with(|| format!("Failed to create thread '{}'", name)) } -// TODO: Don't put the entire app into a Tokio runtime. -// Instead, run single-threaded runtimes off of those threads where I need -// Tokio. -// - Axum server -// - API query scheduler -// - Ntfy sender -// TODO: Find a way to communicate between them, in a way that works for both the non-Tokio -// Lua thread and the various other threads. fn main() -> Result<()> { color_eyre::install()?; tracing_subscriber::registry() @@ -59,43 +58,67 @@ fn main() -> Result<()> { .with(ErrorLayer::new(fmt::format::Pretty::default())) .init(); - // TODO: Create a separate, fixed thread to run Lua on - // TODO: Create a channel to send events to Lua - // TODO: Create another thread that periodically checks - // service APIs. The Lua thread should be able to send - // configurations here - // TODO: Create another thread that handles sending data to - // Ntfy. Most importantly the Lua thread needs to send events here. - // Could be consolidated with the service API thread, since it's all HTTP requests. - let config_path = std::env::var("CONFIG_PATH").wrap_err("Missing variable 'CONFIG_PATH'")?; - let (ntfy_tx, ntfy_rx) = tokio::sync::mpsc::unbounded_channel(); + // A channel send to each thread to signal that any of them finished. + // Since all workers are supposed to be running indefinitely, waiting for + // events from the outside, they can only stop because of an error. + // But to be able + let (close_tx, close_rx) = std::sync::mpsc::channel(); + + let (sender_tx, sender_rx) = tokio::sync::mpsc::unbounded_channel(); let (event_tx, event_rx) = std::sync::mpsc::channel(); // A channel that lets other threads, mostly the Lua code, register // a task with the API fetcher. let (api_tx, api_rx) = tokio::sync::mpsc::unbounded_channel(); - { + let lua_thread = { let code = fs::read_to_string(&config_path) .wrap_err_with(|| format!("Failed to read config from '{}'", config_path))?; - spawn("lua", move || worker::lua(code, event_rx, api_tx, ntfy_tx))?; - } + spawn("lua", close_tx.clone(), move || { + worker::lua(code, event_rx, api_tx, sender_tx) + })? + }; - { + let api_thread = { let event_tx = event_tx.clone(); - spawn("api", move || worker::api(api_rx, event_tx))?; - } + spawn("api", close_tx.clone(), move || { + worker::api(api_rx, event_tx) + })? + }; - { + let sender_thread = { let event_tx = event_tx.clone(); - spawn("sender", move || worker::sender(event_tx, ntfy_rx))?; - } + spawn("sender", close_tx.clone(), move || { + worker::sender(event_tx, sender_rx) + })? + }; - let server_worker = spawn("server", move || worker::server(event_tx))?; - server_worker - .join() - .map_err(|err| eyre!("Thread 'server' panicked: {:?}", err)) - .and_then(|res| res) + let server_thread = spawn("server", close_tx, move || worker::server(event_tx))?; + + match close_rx.recv() { + Ok(name) => match name { + "api" => api_thread + .join() + .map_err(|err| eyre!("Thread 'api' panicked: {:?}", err)) + .and_then(|res| res), + "lua" => lua_thread + .join() + .map_err(|err| eyre!("Thread 'lua' panicked: {:?}", err)) + .and_then(|res| res), + "sender" => sender_thread + .join() + .map_err(|err| eyre!("Thread 'sender' panicked: {:?}", err)) + .and_then(|res| res), + "server" => server_thread + .join() + .map_err(|err| eyre!("Thread 'server' panicked: {:?}", err)) + .and_then(|res| res), + _ => bail!("Unknown thread '{}'", name), + }, + Err(_) => unreachable!( + "Any thread given this channel will send a closing notification before dropping it" + ), + } }