use std::fs; use std::sync::mpsc::Sender; use std::thread::{self, JoinHandle}; use color_eyre::eyre::{bail, eyre, Context as _}; use color_eyre::Result; use tracing_error::ErrorLayer; use tracing_subscriber::layer::SubscriberExt as _; use tracing_subscriber::util::SubscriberInitExt as _; use tracing_subscriber::{fmt, EnvFilter}; pub(crate) mod types; mod worker { mod api; mod lua; mod sender; mod server; pub use api::worker as api; pub use lua::worker as lua; pub use sender::worker as sender; pub use server::worker as server; } pub(crate) static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); fn spawn( name: &'static str, close_tx: Sender<&'static str>, task: F, ) -> Result>> where F: FnOnce() -> Result + Send + 'static, T: Send + 'static, { thread::Builder::new() .name(name.to_string()) .spawn(move || { let res = task(); if let Err(err) = &res { tracing::error!("Thread '{}' errored: {:?}", name, err); } let _ = close_tx.send(name); res }) .wrap_err_with(|| format!("Failed to create thread '{}'", name)) } fn main() -> Result<()> { color_eyre::install()?; tracing_subscriber::registry() .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into())) .with(fmt::layer().compact()) .with(ErrorLayer::new(fmt::format::Pretty::default())) .init(); let config_path = std::env::var("CONFIG_PATH").wrap_err("Missing variable 'CONFIG_PATH'")?; // A channel send to each thread to signal that any of them finished. // Since all workers are supposed to be running indefinitely, waiting for // events from the outside, they can only stop because of an error. // But to be able let (close_tx, close_rx) = std::sync::mpsc::channel(); let (sender_tx, sender_rx) = tokio::sync::mpsc::unbounded_channel(); let (event_tx, event_rx) = std::sync::mpsc::channel(); // A channel that lets other threads, mostly the Lua code, register // a task with the API fetcher. let (api_tx, api_rx) = tokio::sync::mpsc::unbounded_channel(); let lua_thread = { let code = fs::read_to_string(&config_path) .wrap_err_with(|| format!("Failed to read config from '{}'", config_path))?; spawn("lua", close_tx.clone(), move || { worker::lua(code, event_rx, api_tx, sender_tx) })? }; let api_thread = { let event_tx = event_tx.clone(); spawn("api", close_tx.clone(), move || { worker::api(api_rx, event_tx) })? }; let sender_thread = { let event_tx = event_tx.clone(); spawn("sender", close_tx.clone(), move || { worker::sender(event_tx, sender_rx) })? }; let server_thread = spawn("server", close_tx, move || worker::server(event_tx))?; match close_rx.recv() { Ok(name) => match name { "api" => api_thread .join() .map_err(|err| eyre!("Thread 'api' panicked: {:?}", err)) .and_then(|res| res), "lua" => lua_thread .join() .map_err(|err| eyre!("Thread 'lua' panicked: {:?}", err)) .and_then(|res| res), "sender" => sender_thread .join() .map_err(|err| eyre!("Thread 'sender' panicked: {:?}", err)) .and_then(|res| res), "server" => server_thread .join() .map_err(|err| eyre!("Thread 'server' panicked: {:?}", err)) .and_then(|res| res), _ => bail!("Unknown thread '{}'", name), }, Err(_) => unreachable!( "Any thread given this channel will send a closing notification before dropping it" ), } }