From 18efbb095780b95922145b3fc626a0a1a7c6e035 Mon Sep 17 00:00:00 2001 From: Lucas Schwiderski Date: Tue, 24 Sep 2024 11:48:15 +0200 Subject: [PATCH] Catch failing worker threads Previously if one of the threads other than the server failed, it would log the error but continue to run in a broken state. So instead this makes sure than if any thread finishes the whole application stops. Since all worker threads should always wait indefinitely for more work, this should only happen if one of them bails on an error. --- src/main.rs | 91 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 57 insertions(+), 34 deletions(-) diff --git a/src/main.rs b/src/main.rs index 0485687..095627e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,8 @@ use std::fs; +use std::sync::mpsc::Sender; use std::thread::{self, JoinHandle}; -use color_eyre::eyre::{eyre, Context as _}; +use color_eyre::eyre::{bail, eyre, Context as _}; use color_eyre::Result; use tracing_error::ErrorLayer; use tracing_subscriber::layer::SubscriberExt as _; @@ -24,7 +25,11 @@ mod worker { pub(crate) static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); -fn spawn(name: &'static str, task: F) -> Result>> +fn spawn( + name: &'static str, + close_tx: Sender<&'static str>, + task: F, +) -> Result>> where F: FnOnce() -> Result + Send + 'static, T: Send + 'static, @@ -38,19 +43,13 @@ where tracing::error!("Thread '{}' errored: {:?}", name, err); } + let _ = close_tx.send(name); + res }) .wrap_err_with(|| format!("Failed to create thread '{}'", name)) } -// TODO: Don't put the entire app into a Tokio runtime. -// Instead, run single-threaded runtimes off of those threads where I need -// Tokio. -// - Axum server -// - API query scheduler -// - Ntfy sender -// TODO: Find a way to communicate between them, in a way that works for both the non-Tokio -// Lua thread and the various other threads. fn main() -> Result<()> { color_eyre::install()?; tracing_subscriber::registry() @@ -59,43 +58,67 @@ fn main() -> Result<()> { .with(ErrorLayer::new(fmt::format::Pretty::default())) .init(); - // TODO: Create a separate, fixed thread to run Lua on - // TODO: Create a channel to send events to Lua - // TODO: Create another thread that periodically checks - // service APIs. The Lua thread should be able to send - // configurations here - // TODO: Create another thread that handles sending data to - // Ntfy. Most importantly the Lua thread needs to send events here. - // Could be consolidated with the service API thread, since it's all HTTP requests. - let config_path = std::env::var("CONFIG_PATH").wrap_err("Missing variable 'CONFIG_PATH'")?; - let (ntfy_tx, ntfy_rx) = tokio::sync::mpsc::unbounded_channel(); + // A channel send to each thread to signal that any of them finished. + // Since all workers are supposed to be running indefinitely, waiting for + // events from the outside, they can only stop because of an error. + // But to be able + let (close_tx, close_rx) = std::sync::mpsc::channel(); + + let (sender_tx, sender_rx) = tokio::sync::mpsc::unbounded_channel(); let (event_tx, event_rx) = std::sync::mpsc::channel(); // A channel that lets other threads, mostly the Lua code, register // a task with the API fetcher. let (api_tx, api_rx) = tokio::sync::mpsc::unbounded_channel(); - { + let lua_thread = { let code = fs::read_to_string(&config_path) .wrap_err_with(|| format!("Failed to read config from '{}'", config_path))?; - spawn("lua", move || worker::lua(code, event_rx, api_tx, ntfy_tx))?; - } + spawn("lua", close_tx.clone(), move || { + worker::lua(code, event_rx, api_tx, sender_tx) + })? + }; - { + let api_thread = { let event_tx = event_tx.clone(); - spawn("api", move || worker::api(api_rx, event_tx))?; - } + spawn("api", close_tx.clone(), move || { + worker::api(api_rx, event_tx) + })? + }; - { + let sender_thread = { let event_tx = event_tx.clone(); - spawn("sender", move || worker::sender(event_tx, ntfy_rx))?; - } + spawn("sender", close_tx.clone(), move || { + worker::sender(event_tx, sender_rx) + })? + }; - let server_worker = spawn("server", move || worker::server(event_tx))?; - server_worker - .join() - .map_err(|err| eyre!("Thread 'server' panicked: {:?}", err)) - .and_then(|res| res) + let server_thread = spawn("server", close_tx, move || worker::server(event_tx))?; + + match close_rx.recv() { + Ok(name) => match name { + "api" => api_thread + .join() + .map_err(|err| eyre!("Thread 'api' panicked: {:?}", err)) + .and_then(|res| res), + "lua" => lua_thread + .join() + .map_err(|err| eyre!("Thread 'lua' panicked: {:?}", err)) + .and_then(|res| res), + "sender" => sender_thread + .join() + .map_err(|err| eyre!("Thread 'sender' panicked: {:?}", err)) + .and_then(|res| res), + "server" => server_thread + .join() + .map_err(|err| eyre!("Thread 'server' panicked: {:?}", err)) + .and_then(|res| res), + _ => bail!("Unknown thread '{}'", name), + }, + Err(_) => unreachable!( + "Any thread given this channel will send a closing notification before dropping it" + ), + } }