generated from lucas/rust-template
Implement initial boilerplate
The general architecture is a collection of four threads. Three threads that run their dedicated Tokio runtime and handle the axum server, send outgoing notifications and run scheduled API requests respectively. The fourth thread runs the Lua VM. Channels exist between the threads to send messages and allow the Lua script to orchestrate and configure everything.
This commit is contained in:
parent
4faa02d2af
commit
a02bf39f27
10 changed files with 1921 additions and 12 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -1 +1,3 @@
|
|||
/target
|
||||
.envrc
|
||||
test.lua
|
||||
|
|
1393
Cargo.lock
generated
1393
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -6,7 +6,15 @@ edition = "2021"
|
|||
license = "EUPL-1.2"
|
||||
|
||||
[dependencies]
|
||||
axum = "0.7.5"
|
||||
color-eyre = "0.6.3"
|
||||
mlua = { version = "0.9.9", features = ["luajit", "macros", "serialize"] }
|
||||
reqwest = { version = "0.12.7", features = ["json"] }
|
||||
serde = { version = "1.0.209", features = ["derive"] }
|
||||
serde_json = "1.0.128"
|
||||
serde_repr = "0.1.19"
|
||||
tokio = { version = "1.40.0", features = ["rt", "sync"] }
|
||||
tokio-stream = "0.1.16"
|
||||
tracing = "0.1.40"
|
||||
tracing-error = "0.2.0"
|
||||
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
# Rust Project Template
|
||||
# Ntfy Collector
|
||||
|
||||
A simple boilerplate for Rust projects
|
||||
A daemon to collect notifications from various places and forward them to Ntfy.
|
||||
|
|
88
src/main.rs
88
src/main.rs
|
@ -1,7 +1,53 @@
|
|||
use std::fs;
|
||||
use std::thread::{self, JoinHandle};
|
||||
|
||||
use color_eyre::eyre::{eyre, Context as _};
|
||||
use color_eyre::Result;
|
||||
use tracing_error::ErrorLayer;
|
||||
use tracing_subscriber::{fmt, layer::SubscriberExt as _, util::SubscriberInitExt as _, EnvFilter};
|
||||
use tracing_subscriber::layer::SubscriberExt as _;
|
||||
use tracing_subscriber::util::SubscriberInitExt as _;
|
||||
use tracing_subscriber::{fmt, EnvFilter};
|
||||
|
||||
pub(crate) mod types;
|
||||
mod worker {
|
||||
mod api;
|
||||
mod lua;
|
||||
mod sender;
|
||||
mod server;
|
||||
|
||||
pub use api::worker as api;
|
||||
pub use lua::worker as lua;
|
||||
pub use sender::worker as sender;
|
||||
pub use server::worker as server;
|
||||
}
|
||||
|
||||
fn spawn<T, F>(name: &'static str, task: F) -> Result<JoinHandle<Result<T>>>
|
||||
where
|
||||
F: FnOnce() -> Result<T> + Send + 'static,
|
||||
T: Send + 'static,
|
||||
{
|
||||
thread::Builder::new()
|
||||
.name(name.to_string())
|
||||
.spawn(move || {
|
||||
let res = task();
|
||||
|
||||
if let Err(err) = &res {
|
||||
tracing::error!("Thread '{}' errored: {:?}", name, err);
|
||||
}
|
||||
|
||||
res
|
||||
})
|
||||
.wrap_err_with(|| format!("Failed to create thread '{}'", name))
|
||||
}
|
||||
|
||||
// TODO: Don't put the entire app into a Tokio runtime.
|
||||
// Instead, run single-threaded runtimes off of those threads where I need
|
||||
// Tokio.
|
||||
// - Axum server
|
||||
// - API query scheduler
|
||||
// - Ntfy sender
|
||||
// TODO: Find a way to communicate between them, in a way that works for both the non-Tokio
|
||||
// Lua thread and the various other threads.
|
||||
fn main() -> Result<()> {
|
||||
color_eyre::install()?;
|
||||
tracing_subscriber::registry()
|
||||
|
@ -10,7 +56,43 @@ fn main() -> Result<()> {
|
|||
.with(ErrorLayer::new(fmt::format::Pretty::default()))
|
||||
.init();
|
||||
|
||||
println!("Hello, world!");
|
||||
// TODO: Create a separate, fixed thread to run Lua on
|
||||
// TODO: Create a channel to send events to Lua
|
||||
// TODO: Create another thread that periodically checks
|
||||
// service APIs. The Lua thread should be able to send
|
||||
// configurations here
|
||||
// TODO: Create another thread that handles sending data to
|
||||
// Ntfy. Most importantly the Lua thread needs to send events here.
|
||||
// Could be consolidated with the service API thread, since it's all HTTP requests.
|
||||
|
||||
Ok(())
|
||||
let config_path = std::env::var("CONFIG_PATH").wrap_err("Missing variable 'CONFIG_PATH'")?;
|
||||
|
||||
let (ntfy_tx, ntfy_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (event_tx, event_rx) = std::sync::mpsc::channel();
|
||||
// A channel that lets other threads, mostly the Lua code, register
|
||||
// a task with the API fetcher.
|
||||
let (api_tx, api_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
{
|
||||
let code = fs::read_to_string(&config_path)
|
||||
.wrap_err_with(|| format!("Failed to read config from '{}'", config_path))?;
|
||||
|
||||
spawn("lua", move || worker::lua(code, event_rx, api_tx, ntfy_tx))?;
|
||||
}
|
||||
|
||||
{
|
||||
let event_tx = event_tx.clone();
|
||||
spawn("api", move || worker::api(api_rx, event_tx))?;
|
||||
}
|
||||
|
||||
{
|
||||
let event_tx = event_tx.clone();
|
||||
spawn("sender", move || worker::sender(event_tx, ntfy_rx))?;
|
||||
}
|
||||
|
||||
let server_worker = spawn("server", move || worker::server(event_tx))?;
|
||||
server_worker
|
||||
.join()
|
||||
.map_err(|err| eyre!("Thread 'server' panicked: {:?}", err))
|
||||
.and_then(|res| res)
|
||||
}
|
||||
|
|
104
src/types.rs
Normal file
104
src/types.rs
Normal file
|
@ -0,0 +1,104 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub(crate) struct WebhookEvent {
|
||||
pub topic: String,
|
||||
pub query: HashMap<String, String>,
|
||||
pub body: serde_json::Value,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub(crate) struct ApiEvent {
|
||||
pub id: String,
|
||||
pub body: serde_json::Value,
|
||||
pub status: u16,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub(crate) struct ErrorEvent {
|
||||
pub id: String,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) enum Event {
|
||||
Webhook(WebhookEvent),
|
||||
Api(ApiEvent),
|
||||
Error(ErrorEvent),
|
||||
}
|
||||
|
||||
impl Event {
|
||||
pub fn error(id: String, message: impl ToString) -> Self {
|
||||
Self::Error(ErrorEvent {
|
||||
id,
|
||||
message: message.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde_repr::Deserialize_repr, serde_repr::Serialize_repr)]
|
||||
#[repr(u8)]
|
||||
pub(crate) enum NtfyPriority {
|
||||
Min = 1,
|
||||
Low = 2,
|
||||
Default = 3,
|
||||
High = 4,
|
||||
Max = 5,
|
||||
}
|
||||
|
||||
impl Default for NtfyPriority {
|
||||
fn default() -> Self {
|
||||
Self::Default
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub(crate) struct NtfyAction {
|
||||
pub action: String,
|
||||
pub label: String,
|
||||
pub url: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub(crate) struct NtfyMessage {
|
||||
pub topic: String,
|
||||
pub message: String,
|
||||
pub title: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub tags: Vec<String>,
|
||||
#[serde(default)]
|
||||
pub priority: NtfyPriority,
|
||||
#[serde(default)]
|
||||
pub click: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub actions: Vec<NtfyAction>,
|
||||
#[serde(default)]
|
||||
pub markdown: bool,
|
||||
#[serde(default)]
|
||||
pub icon: Option<String>,
|
||||
#[serde(default)]
|
||||
pub delay: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) enum Message {
|
||||
Ntfy(NtfyMessage),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub(crate) enum Method {
|
||||
Get,
|
||||
Post,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub(crate) struct ApiTask {
|
||||
pub id: String,
|
||||
pub interval: u64,
|
||||
pub method: Method,
|
||||
pub url: String,
|
||||
pub body: serde_json::Value,
|
||||
pub query: HashMap<String, String>,
|
||||
}
|
100
src/worker/api.rs
Normal file
100
src/worker/api.rs
Normal file
|
@ -0,0 +1,100 @@
|
|||
use std::sync::mpsc::Sender;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use color_eyre::eyre::Context as _;
|
||||
use color_eyre::Result;
|
||||
use reqwest::Client;
|
||||
use tokio::runtime;
|
||||
use tokio::sync::mpsc::UnboundedReceiver;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_stream::wrappers::IntervalStream;
|
||||
use tokio_stream::{StreamExt as _, StreamMap};
|
||||
|
||||
use crate::types::{ApiEvent, ApiTask, Event, Method};
|
||||
|
||||
async fn perform_request(client: &Client, task: &ApiTask) -> Result<Event> {
|
||||
let req = match task.method {
|
||||
Method::Get => client.get(&task.url),
|
||||
Method::Post => {
|
||||
let body = serde_json::to_vec(&task.body)
|
||||
.expect("Type `serde_json::Value` should always be serializable");
|
||||
client.post(&task.url).body(body)
|
||||
}
|
||||
};
|
||||
let res = req.query(&task.query).send().await?;
|
||||
let status = res.status().as_u16();
|
||||
|
||||
let body: serde_json::Value = res.json().await?;
|
||||
|
||||
let event = ApiEvent {
|
||||
id: task.id.clone(),
|
||||
body,
|
||||
status,
|
||||
};
|
||||
Ok(Event::Api(event))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn run(mut api_rx: UnboundedReceiver<ApiTask>, event_tx: Sender<Event>) -> Result<()> {
|
||||
let tasks = Arc::new(Mutex::new(StreamMap::new()));
|
||||
|
||||
let client = Client::builder()
|
||||
.build()
|
||||
.wrap_err("Failed to build HTTP client")?;
|
||||
|
||||
tokio::spawn({
|
||||
let tasks = tasks.clone();
|
||||
async move {
|
||||
while let Some(task) = api_rx.recv().await {
|
||||
tracing::trace!("Received new API task: {:?}", task);
|
||||
|
||||
let id = task.id.clone();
|
||||
let interval = tokio::time::interval(Duration::from_secs(task.interval));
|
||||
|
||||
let task = Arc::new(task);
|
||||
let mut tasks = tasks.lock().await;
|
||||
tasks.insert(id, IntervalStream::new(interval).map(move |_| task.clone()));
|
||||
}
|
||||
tracing::error!("API task channel closed");
|
||||
}
|
||||
});
|
||||
|
||||
loop {
|
||||
// We need to guarantee that the lock on `tasks` is released as soon as possible.
|
||||
// To ensure that it isn't held for the entirety of the `sleep`, the indirection
|
||||
// via the extra `bool` value is used, so that the lock can be dropped immediately,
|
||||
// and during the `sleep` new streams can be created.
|
||||
let wait = {
|
||||
let mut tasks = tasks.lock().await;
|
||||
match tasks.next().await {
|
||||
Some((_, task)) => {
|
||||
let event = perform_request(&client, &task)
|
||||
.await
|
||||
.wrap_err("Failed to perform API request")?;
|
||||
event_tx.send(event).wrap_err("Failed to send event")?;
|
||||
|
||||
false
|
||||
}
|
||||
None => true,
|
||||
}
|
||||
};
|
||||
|
||||
if wait {
|
||||
// Ideally we would be able to wait explicitly for the first stream
|
||||
// to be registered. But as a workaround, we have to idle wait.
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument("api::worker", skip_all)]
|
||||
pub fn worker(api_rx: UnboundedReceiver<ApiTask>, event_tx: Sender<Event>) -> Result<()> {
|
||||
let rt = runtime::Builder::new_current_thread()
|
||||
.thread_name("api")
|
||||
.enable_io()
|
||||
.enable_time()
|
||||
.build()?;
|
||||
|
||||
rt.block_on(run(api_rx, event_tx))
|
||||
}
|
87
src/worker/lua.rs
Normal file
87
src/worker/lua.rs
Normal file
|
@ -0,0 +1,87 @@
|
|||
use std::sync::mpsc::Receiver;
|
||||
|
||||
use color_eyre::Result;
|
||||
use mlua::{Function, Lua, LuaSerdeExt};
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
|
||||
use crate::types::{ApiTask, Event, Message};
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn worker(
|
||||
config: String,
|
||||
event_rx: Receiver<Event>,
|
||||
api_tx: UnboundedSender<ApiTask>,
|
||||
ntfy_tx: UnboundedSender<Message>,
|
||||
) -> Result<()> {
|
||||
let lua = Lua::new();
|
||||
let globals = lua.globals();
|
||||
|
||||
let config = lua.load(config).set_name("config");
|
||||
|
||||
lua.scope(|scope| {
|
||||
let ntfy_fn = scope.create_function_mut(|_, data: mlua::Value| {
|
||||
let data = lua.from_value(data)?;
|
||||
tracing::trace!("Sending Ntfy message: {:?}", data);
|
||||
|
||||
match ntfy_tx.send(Message::Ntfy(data)) {
|
||||
Ok(_) => Ok((true, mlua::Value::Nil)),
|
||||
Err(_) => {
|
||||
let msg = lua.create_string("Failed to send message")?;
|
||||
Ok((false, mlua::Value::String(msg)))
|
||||
}
|
||||
}
|
||||
})?;
|
||||
|
||||
let set_api_task_fn = scope.create_function_mut(|_, data: mlua::Value| {
|
||||
let task = lua.from_value(data)?;
|
||||
tracing::trace!("Sending task request: {:?}", task);
|
||||
|
||||
match api_tx.send(task) {
|
||||
Ok(_) => Ok((true, mlua::Value::Nil)),
|
||||
Err(_) => {
|
||||
let msg = lua.create_string("Failed to trigger task")?;
|
||||
Ok((false, mlua::Value::String(msg)))
|
||||
}
|
||||
}
|
||||
})?;
|
||||
|
||||
globals.set("ntfy", ntfy_fn)?;
|
||||
globals.set("api_task", set_api_task_fn)?;
|
||||
|
||||
config.exec()?;
|
||||
|
||||
let event_fn: Function = match globals.get("on_event") {
|
||||
Ok(f) => f,
|
||||
Err(err) => match err {
|
||||
mlua::Error::FromLuaConversionError { from, to: _, message: _ } => {
|
||||
let err = mlua::Error::runtime(format!("Global function 'on_event' not defined properly. Got value of type '{}'", from));
|
||||
return Err(err);
|
||||
}
|
||||
err => return Err(err),
|
||||
},
|
||||
};
|
||||
|
||||
// Main blocking loop. As long as we can receive events, this scope will stay active.
|
||||
while let Ok(event) = event_rx.recv() {
|
||||
tracing::trace!("Received event: {:?}", event);
|
||||
match event {
|
||||
Event::Webhook(data) => {
|
||||
let data = lua.to_value(&data)?;
|
||||
event_fn.call::<_, ()>(("webhook", data))?
|
||||
}
|
||||
Event::Api(data) => {
|
||||
let data = lua.to_value(&data)?;
|
||||
event_fn.call::<_, ()>(("api", data))?
|
||||
}
|
||||
Event::Error(data) => {
|
||||
let data = lua.to_value(&data)?;
|
||||
event_fn.call::<_, ()>(("error", data))?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
71
src/worker/sender.rs
Normal file
71
src/worker/sender.rs
Normal file
|
@ -0,0 +1,71 @@
|
|||
use std::sync::mpsc::Sender;
|
||||
|
||||
use color_eyre::eyre::{self, Context as _};
|
||||
use color_eyre::Result;
|
||||
use reqwest::{header, Client};
|
||||
use tokio::runtime;
|
||||
use tokio::sync::mpsc::UnboundedReceiver;
|
||||
|
||||
use crate::types::{Event, Message, NtfyMessage};
|
||||
|
||||
static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
|
||||
|
||||
#[tracing::instrument]
|
||||
async fn send_ntfy(client: &Client, url: &String, data: &NtfyMessage) -> Result<()> {
|
||||
let body = serde_json::to_string(data)?;
|
||||
tracing::trace!("JSON: {}", body);
|
||||
let res = client.post(url).body(body).send().await?;
|
||||
if res.status().as_u16() >= 400 {
|
||||
let body = res.text().await?;
|
||||
eyre::bail!("Ntfy server returned error: {}", body);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn run(event_tx: Sender<Event>, mut ntfy_rx: UnboundedReceiver<Message>) -> Result<()> {
|
||||
let ntfy_url = std::env::var("NTFY_URL").wrap_err("Missing env var 'NTFY_URL'")?;
|
||||
let ntfy_token = std::env::var("NTFY_TOKEN").wrap_err("Missing env var 'NTFY_TOKEN'")?;
|
||||
|
||||
let ntfy_client = Client::builder()
|
||||
.default_headers({
|
||||
let mut headers = header::HeaderMap::new();
|
||||
|
||||
let auth = format!("Bearer {}", ntfy_token);
|
||||
let mut auth = header::HeaderValue::from_str(&auth)?;
|
||||
auth.set_sensitive(true);
|
||||
|
||||
headers.insert(header::AUTHORIZATION, auth);
|
||||
|
||||
headers
|
||||
})
|
||||
.user_agent(APP_USER_AGENT)
|
||||
.build()?;
|
||||
|
||||
while let Some(message) = ntfy_rx.recv().await {
|
||||
tracing::trace!("Received notification: {:?}", message);
|
||||
|
||||
match message {
|
||||
Message::Ntfy(data) => {
|
||||
if let Err(err) = send_ntfy(&ntfy_client, &ntfy_url, &data).await {
|
||||
tracing::error!("Failed to send to Ntfy: {:?}", err);
|
||||
event_tx.send(Event::error(data.topic, err))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!("Stopped receiving messages");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub fn worker(event_tx: Sender<Event>, ntfy_rx: UnboundedReceiver<Message>) -> Result<()> {
|
||||
let rt = runtime::Builder::new_current_thread()
|
||||
.thread_name("sender")
|
||||
.enable_io()
|
||||
.build()?;
|
||||
|
||||
rt.block_on(run(event_tx, ntfy_rx))
|
||||
}
|
76
src/worker/server.rs
Normal file
76
src/worker/server.rs
Normal file
|
@ -0,0 +1,76 @@
|
|||
use std::collections::HashMap;
|
||||
use std::sync::mpsc::Sender;
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::extract::{Path, Query, State};
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::IntoResponse;
|
||||
use axum::routing::get;
|
||||
use axum::{Json, Router};
|
||||
use color_eyre::eyre::Context as _;
|
||||
use color_eyre::Result;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::runtime;
|
||||
|
||||
use crate::types::{Event, WebhookEvent};
|
||||
|
||||
struct AppState {
|
||||
event_tx: Sender<Event>,
|
||||
}
|
||||
|
||||
pub fn worker(event_tx: Sender<Event>) -> Result<()> {
|
||||
let rt = runtime::Builder::new_current_thread()
|
||||
.thread_name("server")
|
||||
.enable_io()
|
||||
.build()?;
|
||||
|
||||
rt.block_on(task(event_tx))
|
||||
}
|
||||
|
||||
async fn task(event_tx: Sender<Event>) -> Result<()> {
|
||||
let state = AppState { event_tx };
|
||||
let shared_state = Arc::new(state);
|
||||
|
||||
let app = Router::new()
|
||||
.route(
|
||||
"/webhook/:topic",
|
||||
get(webhook_handler).post(webhook_handler),
|
||||
)
|
||||
.with_state(shared_state);
|
||||
|
||||
let listener = TcpListener::bind("0.0.0.0:3000")
|
||||
.await
|
||||
.wrap_err("Failed to bind to TCP socket")?;
|
||||
tracing::info!("Listening on \"0.0.0.0:3000\"");
|
||||
|
||||
axum::serve(listener, app)
|
||||
.await
|
||||
.wrap_err("Failed to start server")
|
||||
}
|
||||
|
||||
async fn webhook_handler(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path(topic): Path<String>,
|
||||
Query(query): Query<HashMap<String, String>>,
|
||||
body: Option<Json<serde_json::Value>>,
|
||||
) -> impl IntoResponse {
|
||||
let event = Event::Webhook(WebhookEvent {
|
||||
topic,
|
||||
query,
|
||||
body: body
|
||||
.map(|Json(body)| body)
|
||||
.unwrap_or(serde_json::Value::Null),
|
||||
});
|
||||
tracing::debug!("Received webhook event: {:?}", &event);
|
||||
|
||||
if state.event_tx.send(event).is_err() {
|
||||
tracing::error!("Failed to trigger webhook event");
|
||||
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"Failed to trigger webhook event",
|
||||
)
|
||||
} else {
|
||||
(StatusCode::NO_CONTENT, "")
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue