generated from lucas/rust-template
Compare commits
7 commits
4faa02d2af
...
18efbb0957
Author | SHA1 | Date | |
---|---|---|---|
18efbb0957 | |||
c105ac80cd | |||
f5c64b788f | |||
e9796333bb | |||
23d27389d3 | |||
d2cb39f9a2 | |||
a02bf39f27 |
12 changed files with 2017 additions and 13 deletions
1
.dockerignore
Normal file
1
.dockerignore
Normal file
|
@ -0,0 +1 @@
|
||||||
|
target/
|
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -1 +1,3 @@
|
||||||
/target
|
/target
|
||||||
|
.envrc
|
||||||
|
lua/
|
||||||
|
|
1301
Cargo.lock
generated
1301
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
11
Cargo.toml
11
Cargo.toml
|
@ -6,7 +6,18 @@ edition = "2021"
|
||||||
license = "EUPL-1.2"
|
license = "EUPL-1.2"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
axum = "0.7.5"
|
||||||
color-eyre = "0.6.3"
|
color-eyre = "0.6.3"
|
||||||
|
mlua = { version = "0.9.9", features = ["luajit", "macros", "serialize", "vendored"] }
|
||||||
|
reqwest = { version = "0.12.7", default-features = false, features = ["json", "rustls-tls"] }
|
||||||
|
serde = { version = "1.0.209", features = ["derive"] }
|
||||||
|
serde_json = "1.0.128"
|
||||||
|
serde_repr = "0.1.19"
|
||||||
|
tokio = { version = "1.40.0", features = ["rt", "sync"] }
|
||||||
tracing = "0.1.40"
|
tracing = "0.1.40"
|
||||||
tracing-error = "0.2.0"
|
tracing-error = "0.2.0"
|
||||||
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
|
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
|
||||||
|
|
||||||
|
[profile.release]
|
||||||
|
strip = true
|
||||||
|
lto = true
|
||||||
|
|
27
Dockerfile
Normal file
27
Dockerfile
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
FROM rust:1.81.0-slim AS build
|
||||||
|
|
||||||
|
WORKDIR /src
|
||||||
|
|
||||||
|
RUN set -ex; \
|
||||||
|
apt-get update; \
|
||||||
|
apt-get install -y --no-install-recommends \
|
||||||
|
make \
|
||||||
|
libluajit-5.1-dev \
|
||||||
|
; \
|
||||||
|
rm -rf /var/lib/apt/lists/*;
|
||||||
|
|
||||||
|
COPY . /src
|
||||||
|
RUN --mount=type=cache,id=cargo-registry,target=/cargo/registry \
|
||||||
|
--mount=type=cache,id=cargo-target,target=/src/target \
|
||||||
|
cargo build --release --locked && cp /src/target/release/ntfy-collector /src/;
|
||||||
|
|
||||||
|
FROM gcr.io/distroless/cc-debian12 AS final
|
||||||
|
|
||||||
|
WORKDIR /ntfy-collector
|
||||||
|
ENV CONFIG_PATH=/ntfy-collector/lua/config.lua
|
||||||
|
ENV LUA_PATH=/ntfy-collector/lua/?.lua;/ntfy-collector/lua/?/init.lua
|
||||||
|
|
||||||
|
COPY --from=build /src/ntfy-collector /usr/bin/ntfy-collector
|
||||||
|
COPY ./lua ./lua
|
||||||
|
|
||||||
|
CMD ["/usr/bin/ntfy-collector"]
|
|
@ -1,3 +1,3 @@
|
||||||
# Rust Project Template
|
# Ntfy Collector
|
||||||
|
|
||||||
A simple boilerplate for Rust projects
|
A daemon to collect notifications from various places and forward them to Ntfy.
|
||||||
|
|
116
src/main.rs
116
src/main.rs
|
@ -1,16 +1,124 @@
|
||||||
|
use std::fs;
|
||||||
|
use std::sync::mpsc::Sender;
|
||||||
|
use std::thread::{self, JoinHandle};
|
||||||
|
|
||||||
|
use color_eyre::eyre::{bail, eyre, Context as _};
|
||||||
use color_eyre::Result;
|
use color_eyre::Result;
|
||||||
use tracing_error::ErrorLayer;
|
use tracing_error::ErrorLayer;
|
||||||
use tracing_subscriber::{fmt, layer::SubscriberExt as _, util::SubscriberInitExt as _, EnvFilter};
|
use tracing_subscriber::layer::SubscriberExt as _;
|
||||||
|
use tracing_subscriber::util::SubscriberInitExt as _;
|
||||||
|
use tracing_subscriber::{fmt, EnvFilter};
|
||||||
|
|
||||||
|
pub(crate) mod types;
|
||||||
|
mod worker {
|
||||||
|
mod api;
|
||||||
|
mod lua;
|
||||||
|
mod sender;
|
||||||
|
mod server;
|
||||||
|
|
||||||
|
pub use api::worker as api;
|
||||||
|
pub use lua::worker as lua;
|
||||||
|
pub use sender::worker as sender;
|
||||||
|
pub use server::worker as server;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) static APP_USER_AGENT: &str =
|
||||||
|
concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
|
||||||
|
|
||||||
|
fn spawn<T, F>(
|
||||||
|
name: &'static str,
|
||||||
|
close_tx: Sender<&'static str>,
|
||||||
|
task: F,
|
||||||
|
) -> Result<JoinHandle<Result<T>>>
|
||||||
|
where
|
||||||
|
F: FnOnce() -> Result<T> + Send + 'static,
|
||||||
|
T: Send + 'static,
|
||||||
|
{
|
||||||
|
thread::Builder::new()
|
||||||
|
.name(name.to_string())
|
||||||
|
.spawn(move || {
|
||||||
|
let res = task();
|
||||||
|
|
||||||
|
if let Err(err) = &res {
|
||||||
|
tracing::error!("Thread '{}' errored: {:?}", name, err);
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = close_tx.send(name);
|
||||||
|
|
||||||
|
res
|
||||||
|
})
|
||||||
|
.wrap_err_with(|| format!("Failed to create thread '{}'", name))
|
||||||
|
}
|
||||||
|
|
||||||
fn main() -> Result<()> {
|
fn main() -> Result<()> {
|
||||||
color_eyre::install()?;
|
color_eyre::install()?;
|
||||||
tracing_subscriber::registry()
|
tracing_subscriber::registry()
|
||||||
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()))
|
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()))
|
||||||
.with(fmt::layer().pretty())
|
.with(fmt::layer().compact())
|
||||||
.with(ErrorLayer::new(fmt::format::Pretty::default()))
|
.with(ErrorLayer::new(fmt::format::Pretty::default()))
|
||||||
.init();
|
.init();
|
||||||
|
|
||||||
println!("Hello, world!");
|
let config_path = std::env::var("CONFIG_PATH").wrap_err("Missing variable 'CONFIG_PATH'")?;
|
||||||
|
|
||||||
Ok(())
|
// A channel send to each thread to signal that any of them finished.
|
||||||
|
// Since all workers are supposed to be running indefinitely, waiting for
|
||||||
|
// events from the outside, they can only stop because of an error.
|
||||||
|
// But to be able
|
||||||
|
let (close_tx, close_rx) = std::sync::mpsc::channel();
|
||||||
|
|
||||||
|
let (sender_tx, sender_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
let (event_tx, event_rx) = std::sync::mpsc::channel();
|
||||||
|
// A channel that lets other threads, mostly the Lua code, register
|
||||||
|
// a task with the API fetcher.
|
||||||
|
let (api_tx, api_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
let lua_thread = {
|
||||||
|
let code = fs::read_to_string(&config_path)
|
||||||
|
.wrap_err_with(|| format!("Failed to read config from '{}'", config_path))?;
|
||||||
|
|
||||||
|
spawn("lua", close_tx.clone(), move || {
|
||||||
|
worker::lua(code, event_rx, api_tx, sender_tx)
|
||||||
|
})?
|
||||||
|
};
|
||||||
|
|
||||||
|
let api_thread = {
|
||||||
|
let event_tx = event_tx.clone();
|
||||||
|
spawn("api", close_tx.clone(), move || {
|
||||||
|
worker::api(api_rx, event_tx)
|
||||||
|
})?
|
||||||
|
};
|
||||||
|
|
||||||
|
let sender_thread = {
|
||||||
|
let event_tx = event_tx.clone();
|
||||||
|
spawn("sender", close_tx.clone(), move || {
|
||||||
|
worker::sender(event_tx, sender_rx)
|
||||||
|
})?
|
||||||
|
};
|
||||||
|
|
||||||
|
let server_thread = spawn("server", close_tx, move || worker::server(event_tx))?;
|
||||||
|
|
||||||
|
match close_rx.recv() {
|
||||||
|
Ok(name) => match name {
|
||||||
|
"api" => api_thread
|
||||||
|
.join()
|
||||||
|
.map_err(|err| eyre!("Thread 'api' panicked: {:?}", err))
|
||||||
|
.and_then(|res| res),
|
||||||
|
"lua" => lua_thread
|
||||||
|
.join()
|
||||||
|
.map_err(|err| eyre!("Thread 'lua' panicked: {:?}", err))
|
||||||
|
.and_then(|res| res),
|
||||||
|
"sender" => sender_thread
|
||||||
|
.join()
|
||||||
|
.map_err(|err| eyre!("Thread 'sender' panicked: {:?}", err))
|
||||||
|
.and_then(|res| res),
|
||||||
|
"server" => server_thread
|
||||||
|
.join()
|
||||||
|
.map_err(|err| eyre!("Thread 'server' panicked: {:?}", err))
|
||||||
|
.and_then(|res| res),
|
||||||
|
_ => bail!("Unknown thread '{}'", name),
|
||||||
|
},
|
||||||
|
Err(_) => unreachable!(
|
||||||
|
"Any thread given this channel will send a closing notification before dropping it"
|
||||||
|
),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
183
src/types.rs
Normal file
183
src/types.rs
Normal file
|
@ -0,0 +1,183 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use mlua::IntoLua;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// Converts a `serde_json::Value` to a `mlua::Value`.
|
||||||
|
fn json_to_lua(value: serde_json::Value, lua: &mlua::Lua) -> mlua::Result<mlua::Value<'_>> {
|
||||||
|
match value {
|
||||||
|
serde_json::Value::Null => Ok(mlua::Value::Nil),
|
||||||
|
serde_json::Value::Bool(value) => Ok(mlua::Value::Boolean(value)),
|
||||||
|
serde_json::Value::Number(value) => match value.as_f64() {
|
||||||
|
Some(number) => Ok(mlua::Value::Number(number)),
|
||||||
|
None => Err(mlua::Error::ToLuaConversionError {
|
||||||
|
from: "serde_json::Value::Number",
|
||||||
|
to: "Number",
|
||||||
|
message: Some("Number cannot be represented by a floating-point type".into()),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
serde_json::Value::String(value) => lua.create_string(value).map(mlua::Value::String),
|
||||||
|
serde_json::Value::Array(value) => {
|
||||||
|
let tbl = lua.create_table_with_capacity(value.len(), 0)?;
|
||||||
|
for v in value {
|
||||||
|
let v = json_to_lua(v, lua)?;
|
||||||
|
tbl.push(v)?;
|
||||||
|
}
|
||||||
|
Ok(mlua::Value::Table(tbl))
|
||||||
|
}
|
||||||
|
serde_json::Value::Object(value) => {
|
||||||
|
let tbl = lua.create_table_with_capacity(0, value.len())?;
|
||||||
|
for (k, v) in value {
|
||||||
|
let k = lua.create_string(k)?;
|
||||||
|
let v = json_to_lua(v, lua)?;
|
||||||
|
tbl.set(k, v)?;
|
||||||
|
}
|
||||||
|
Ok(mlua::Value::Table(tbl))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize)]
|
||||||
|
pub(crate) struct WebhookEvent {
|
||||||
|
pub topic: String,
|
||||||
|
pub query: HashMap<String, String>,
|
||||||
|
pub body: serde_json::Value,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize)]
|
||||||
|
pub(crate) struct ApiEvent {
|
||||||
|
pub id: String,
|
||||||
|
pub body: serde_json::Value,
|
||||||
|
pub headers: HashMap<String, Vec<u8>>,
|
||||||
|
pub status: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'lua> IntoLua<'lua> for ApiEvent {
|
||||||
|
fn into_lua(self, lua: &'lua mlua::Lua) -> mlua::Result<mlua::Value<'lua>> {
|
||||||
|
let headers = lua.create_table_with_capacity(0, self.headers.len())?;
|
||||||
|
for (k, v) in self.headers {
|
||||||
|
let k = lua.create_string(k)?;
|
||||||
|
let v = lua.create_string(v)?;
|
||||||
|
headers.set(k, v)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let body = json_to_lua(self.body, lua)?;
|
||||||
|
let tbl = lua.create_table_with_capacity(0, 4)?;
|
||||||
|
|
||||||
|
lua.create_string(self.id)
|
||||||
|
.and_then(|id| tbl.set("id", id))?;
|
||||||
|
tbl.set("status", self.status)?;
|
||||||
|
tbl.set("headers", headers)?;
|
||||||
|
tbl.set("body", body)?;
|
||||||
|
|
||||||
|
Ok(mlua::Value::Table(tbl))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize)]
|
||||||
|
pub(crate) struct ErrorEvent {
|
||||||
|
pub id: String,
|
||||||
|
pub message: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub(crate) enum Event {
|
||||||
|
Webhook(WebhookEvent),
|
||||||
|
Api(ApiEvent),
|
||||||
|
Error(ErrorEvent),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Event {
|
||||||
|
pub fn error(id: String, message: impl ToString) -> Self {
|
||||||
|
Self::Error(ErrorEvent {
|
||||||
|
id,
|
||||||
|
message: message.to_string(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, serde_repr::Deserialize_repr, serde_repr::Serialize_repr)]
|
||||||
|
#[repr(u8)]
|
||||||
|
pub(crate) enum NtfyPriority {
|
||||||
|
Min = 1,
|
||||||
|
Low = 2,
|
||||||
|
Default = 3,
|
||||||
|
High = 4,
|
||||||
|
Max = 5,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for NtfyPriority {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::Default
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||||
|
pub(crate) struct NtfyAction {
|
||||||
|
pub action: String,
|
||||||
|
pub label: String,
|
||||||
|
pub url: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||||
|
pub(crate) struct NtfyMessage {
|
||||||
|
// Needs to be deserialized from Lua, but should not be sent to Ntfy
|
||||||
|
#[serde(skip_serializing)]
|
||||||
|
pub url: String,
|
||||||
|
// Needs to be deserialized from Lua, but should not be sent to Ntfy
|
||||||
|
#[serde(skip_serializing)]
|
||||||
|
pub token: String,
|
||||||
|
pub topic: String,
|
||||||
|
pub message: String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub title: Option<String>,
|
||||||
|
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||||
|
pub tags: Vec<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub priority: NtfyPriority,
|
||||||
|
#[serde(default)]
|
||||||
|
pub click: Option<String>,
|
||||||
|
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||||
|
pub actions: Vec<NtfyAction>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub markdown: bool,
|
||||||
|
#[serde(default)]
|
||||||
|
pub icon: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub delay: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub(crate) enum Message {
|
||||||
|
Ntfy(NtfyMessage),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
|
pub(crate) enum Method {
|
||||||
|
#[serde(alias = "GET", alias = "get")]
|
||||||
|
Get,
|
||||||
|
#[serde(alias = "POST", alias = "post")]
|
||||||
|
Post,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Method {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::Get
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
|
pub(crate) struct ApiTask {
|
||||||
|
pub id: String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub delay: u64,
|
||||||
|
#[serde(default)]
|
||||||
|
pub method: Method,
|
||||||
|
pub url: String,
|
||||||
|
#[serde(default)]
|
||||||
|
pub body: serde_json::Value,
|
||||||
|
#[serde(default)]
|
||||||
|
pub query: HashMap<String, String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub headers: HashMap<String, String>,
|
||||||
|
}
|
121
src/worker/api.rs
Normal file
121
src/worker/api.rs
Normal file
|
@ -0,0 +1,121 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::mpsc::Sender;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use color_eyre::eyre::Context as _;
|
||||||
|
use color_eyre::Result;
|
||||||
|
use reqwest::{header, Client};
|
||||||
|
use tokio::runtime;
|
||||||
|
use tokio::sync::mpsc::UnboundedReceiver;
|
||||||
|
use tracing::Instrument as _;
|
||||||
|
|
||||||
|
use crate::types::{ApiEvent, ApiTask, Event, Method};
|
||||||
|
use crate::APP_USER_AGENT;
|
||||||
|
|
||||||
|
fn header_map_to_hashmap(headers: &header::HeaderMap) -> HashMap<String, Vec<u8>> {
|
||||||
|
let mut map = HashMap::with_capacity(headers.len());
|
||||||
|
|
||||||
|
for (k, v) in headers {
|
||||||
|
map.insert(k.to_string(), v.as_bytes().to_vec());
|
||||||
|
}
|
||||||
|
|
||||||
|
map
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument]
|
||||||
|
async fn perform_request(client: &Client, task: &ApiTask) -> Result<Event> {
|
||||||
|
let req = match task.method {
|
||||||
|
Method::Get => client.get(&task.url),
|
||||||
|
Method::Post => {
|
||||||
|
let body = serde_json::to_vec(&task.body)
|
||||||
|
.expect("Type `serde_json::Value` should always be serializable");
|
||||||
|
client.post(&task.url).body(body)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let res = req
|
||||||
|
.query(&task.query)
|
||||||
|
.headers({
|
||||||
|
let mut headers = header::HeaderMap::new();
|
||||||
|
|
||||||
|
for (k, v) in &task.headers {
|
||||||
|
// Non-ASCII characters aren't supported anyways for header name, so no need to map
|
||||||
|
// those.
|
||||||
|
let k = header::HeaderName::from_lowercase(k.to_ascii_lowercase().as_bytes())
|
||||||
|
.wrap_err_with(|| format!("Invalid header name '{}'", k))?;
|
||||||
|
let v = header::HeaderValue::from_str(v)
|
||||||
|
.wrap_err_with(|| format!("Invalid header value '{}'", v))?;
|
||||||
|
|
||||||
|
headers.insert(k, v);
|
||||||
|
}
|
||||||
|
|
||||||
|
headers
|
||||||
|
})
|
||||||
|
.send()
|
||||||
|
.await?;
|
||||||
|
let status = res.status();
|
||||||
|
let headers = header_map_to_hashmap(res.headers());
|
||||||
|
|
||||||
|
let body: serde_json::Value = if let Ok(text) = res.text().await {
|
||||||
|
serde_json::from_str(&text).unwrap_or(serde_json::Value::String(text))
|
||||||
|
} else {
|
||||||
|
serde_json::Value::Null
|
||||||
|
};
|
||||||
|
|
||||||
|
let event = ApiEvent {
|
||||||
|
id: task.id.clone(),
|
||||||
|
headers,
|
||||||
|
body,
|
||||||
|
status: status.as_u16(),
|
||||||
|
};
|
||||||
|
Ok(Event::Api(event))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
pub async fn run(mut api_rx: UnboundedReceiver<ApiTask>, event_tx: Sender<Event>) -> Result<()> {
|
||||||
|
let (task_tx, mut task_rx) = tokio::sync::mpsc::channel(64);
|
||||||
|
|
||||||
|
let client = Client::builder()
|
||||||
|
.user_agent(APP_USER_AGENT)
|
||||||
|
.build()
|
||||||
|
.wrap_err("Failed to build HTTP client")?;
|
||||||
|
|
||||||
|
tokio::spawn({
|
||||||
|
let span = tracing::info_span!("api receiver");
|
||||||
|
async move {
|
||||||
|
while let Some(task) = api_rx.recv().await {
|
||||||
|
tracing::trace!(id = task.id, "Received new API task");
|
||||||
|
|
||||||
|
let _ = task_tx
|
||||||
|
.send(async move {
|
||||||
|
tokio::time::sleep(Duration::from_secs(task.delay)).await;
|
||||||
|
task
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
tracing::error!("API task channel closed");
|
||||||
|
}
|
||||||
|
.instrument(span)
|
||||||
|
});
|
||||||
|
|
||||||
|
while let Some(task) = task_rx.recv().await {
|
||||||
|
let task = task.await;
|
||||||
|
let event = perform_request(&client, &task)
|
||||||
|
.await
|
||||||
|
.wrap_err("Failed to perform API request")?;
|
||||||
|
event_tx.send(event).wrap_err("Failed to send event")?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument("api::worker", skip_all)]
|
||||||
|
pub fn worker(api_rx: UnboundedReceiver<ApiTask>, event_tx: Sender<Event>) -> Result<()> {
|
||||||
|
let rt = runtime::Builder::new_current_thread()
|
||||||
|
.thread_name("api")
|
||||||
|
.enable_io()
|
||||||
|
.enable_time()
|
||||||
|
.build()?;
|
||||||
|
|
||||||
|
rt.block_on(run(api_rx, event_tx))
|
||||||
|
}
|
118
src/worker/lua.rs
Normal file
118
src/worker/lua.rs
Normal file
|
@ -0,0 +1,118 @@
|
||||||
|
use std::sync::mpsc::Receiver;
|
||||||
|
|
||||||
|
use color_eyre::Result;
|
||||||
|
use mlua::{Function, IntoLua as _, Lua, LuaSerdeExt};
|
||||||
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
|
|
||||||
|
use crate::types::{ApiTask, Event, Message, NtfyMessage};
|
||||||
|
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
pub fn worker(
|
||||||
|
config: String,
|
||||||
|
event_rx: Receiver<Event>,
|
||||||
|
api_tx: UnboundedSender<ApiTask>,
|
||||||
|
ntfy_tx: UnboundedSender<Message>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let lua = Lua::new();
|
||||||
|
let globals = lua.globals();
|
||||||
|
|
||||||
|
let config = lua.load(config).set_name("config");
|
||||||
|
|
||||||
|
lua.scope(|scope| {
|
||||||
|
let log_trace_fn = scope.create_function(|_, s: String| {
|
||||||
|
tracing::trace!(name: "lua", "{}", s);
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
|
let log_debug_fn = scope.create_function(|_, s: String| {
|
||||||
|
tracing::debug!(name: "lua", "{}", s);
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
|
let log_info_fn = scope.create_function(|_, s: String| {
|
||||||
|
tracing::info!(name: "lua", "{}", s);
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
|
let log_warn_fn = scope.create_function(|_, s: String| {
|
||||||
|
tracing::warn!(name: "lua", "{}", s);
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
|
let log_error_fn = scope.create_function(|_, s: String| {
|
||||||
|
tracing::error!(name: "lua", "{}", s);
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let log_tbl = lua.create_table_with_capacity(0, 5)?;
|
||||||
|
log_tbl.set("trace", log_trace_fn)?;
|
||||||
|
log_tbl.set("debug", log_debug_fn)?;
|
||||||
|
log_tbl.set("info", log_info_fn)?;
|
||||||
|
log_tbl.set("warn", log_warn_fn)?;
|
||||||
|
log_tbl.set("error", log_error_fn)?;
|
||||||
|
|
||||||
|
let ntfy_fn = scope.create_function_mut(|_, data: mlua::Value| {
|
||||||
|
let data: NtfyMessage = lua.from_value(data)?;
|
||||||
|
tracing::trace!(topic = data.topic, title = data.title, msg = data.message,"Sending Ntfy message");
|
||||||
|
|
||||||
|
match ntfy_tx.send(Message::Ntfy(data)) {
|
||||||
|
Ok(_) => Ok((true, mlua::Value::Nil)),
|
||||||
|
Err(_) => {
|
||||||
|
let msg = lua.create_string("Failed to send message")?;
|
||||||
|
Ok((false, mlua::Value::String(msg)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let set_api_task_fn = scope.create_function_mut(|_, data: mlua::Value| {
|
||||||
|
let task: ApiTask = lua.from_value(data)?;
|
||||||
|
tracing::trace!("Sending task request: id = {}, delay = {}", task.id, task.delay);
|
||||||
|
|
||||||
|
match api_tx.send(task) {
|
||||||
|
Ok(_) => Ok((true, mlua::Value::Nil)),
|
||||||
|
Err(_) => {
|
||||||
|
let msg = lua.create_string("Failed to trigger task")?;
|
||||||
|
Ok((false, mlua::Value::String(msg)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
|
||||||
|
globals.set("log", log_tbl)?;
|
||||||
|
globals.set("ntfy", ntfy_fn)?;
|
||||||
|
globals.set("api_task", set_api_task_fn)?;
|
||||||
|
|
||||||
|
config.exec()?;
|
||||||
|
|
||||||
|
let event_fn: Function = match globals.get("on_event") {
|
||||||
|
Ok(f) => f,
|
||||||
|
Err(err) => match err {
|
||||||
|
mlua::Error::FromLuaConversionError { from, to: _, message: _ } => {
|
||||||
|
let err = mlua::Error::runtime(format!("Global function 'on_event' not defined properly. Got value of type '{}'", from));
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
|
err => return Err(err),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
// Main blocking loop. As long as we can receive events, this scope will stay active.
|
||||||
|
while let Ok(event) = event_rx.recv() {
|
||||||
|
match event {
|
||||||
|
Event::Webhook(data) => {
|
||||||
|
tracing::trace!(id = data.topic, "Received webhook event");
|
||||||
|
let data = lua.to_value(&data)?;
|
||||||
|
event_fn.call::<_, ()>(("webhook", data))?
|
||||||
|
}
|
||||||
|
Event::Api(data) => {
|
||||||
|
tracing::trace!(id = data.id, status = data.status, "Received api event");
|
||||||
|
let data = data.into_lua(&lua)?;
|
||||||
|
event_fn.call::<_, ()>(("api", data))?
|
||||||
|
}
|
||||||
|
Event::Error(data) => {
|
||||||
|
tracing::trace!(id = data.id, message = data.message, "Received error event");
|
||||||
|
let data = lua.to_value(&data)?;
|
||||||
|
event_fn.call::<_, ()>(("error", data))?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
70
src/worker/sender.rs
Normal file
70
src/worker/sender.rs
Normal file
|
@ -0,0 +1,70 @@
|
||||||
|
use std::sync::mpsc::Sender;
|
||||||
|
|
||||||
|
use color_eyre::eyre;
|
||||||
|
use color_eyre::Result;
|
||||||
|
use reqwest::header;
|
||||||
|
use reqwest::Client;
|
||||||
|
use tokio::runtime;
|
||||||
|
use tokio::sync::mpsc::UnboundedReceiver;
|
||||||
|
|
||||||
|
use crate::types::{Event, Message, NtfyMessage};
|
||||||
|
use crate::APP_USER_AGENT;
|
||||||
|
|
||||||
|
#[tracing::instrument]
|
||||||
|
async fn send_ntfy(client: &Client, data: &NtfyMessage) -> Result<()> {
|
||||||
|
let body = serde_json::to_string(data)?;
|
||||||
|
|
||||||
|
let res = client
|
||||||
|
.post(&data.url)
|
||||||
|
.headers({
|
||||||
|
let mut headers = header::HeaderMap::new();
|
||||||
|
|
||||||
|
let auth = format!("Bearer {}", data.token);
|
||||||
|
let mut auth = header::HeaderValue::from_str(&auth)?;
|
||||||
|
auth.set_sensitive(true);
|
||||||
|
|
||||||
|
headers.insert(header::AUTHORIZATION, auth);
|
||||||
|
|
||||||
|
headers
|
||||||
|
})
|
||||||
|
.body(body)
|
||||||
|
.send()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if res.status().as_u16() >= 400 {
|
||||||
|
let body = res.text().await.unwrap_or_default();
|
||||||
|
eyre::bail!("Ntfy server returned error: {}", body);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
pub async fn run(event_tx: Sender<Event>, mut ntfy_rx: UnboundedReceiver<Message>) -> Result<()> {
|
||||||
|
let ntfy_client = Client::builder().user_agent(APP_USER_AGENT).build()?;
|
||||||
|
|
||||||
|
while let Some(message) = ntfy_rx.recv().await {
|
||||||
|
match message {
|
||||||
|
Message::Ntfy(data) => {
|
||||||
|
tracing::trace!(topic = data.topic, "Sending Ntfy notification");
|
||||||
|
if let Err(err) = send_ntfy(&ntfy_client, &data).await {
|
||||||
|
tracing::error!("Failed to send to Ntfy: {:?}", err);
|
||||||
|
event_tx.send(Event::error(data.topic, err))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::debug!("Stopped receiving messages");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
pub fn worker(event_tx: Sender<Event>, ntfy_rx: UnboundedReceiver<Message>) -> Result<()> {
|
||||||
|
let rt = runtime::Builder::new_current_thread()
|
||||||
|
.thread_name("sender")
|
||||||
|
.enable_io()
|
||||||
|
.build()?;
|
||||||
|
|
||||||
|
rt.block_on(run(event_tx, ntfy_rx))
|
||||||
|
}
|
76
src/worker/server.rs
Normal file
76
src/worker/server.rs
Normal file
|
@ -0,0 +1,76 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::mpsc::Sender;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::extract::{Path, Query, State};
|
||||||
|
use axum::http::StatusCode;
|
||||||
|
use axum::response::IntoResponse;
|
||||||
|
use axum::routing::get;
|
||||||
|
use axum::{Json, Router};
|
||||||
|
use color_eyre::eyre::Context as _;
|
||||||
|
use color_eyre::Result;
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
use tokio::runtime;
|
||||||
|
|
||||||
|
use crate::types::{Event, WebhookEvent};
|
||||||
|
|
||||||
|
struct AppState {
|
||||||
|
event_tx: Sender<Event>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn worker(event_tx: Sender<Event>) -> Result<()> {
|
||||||
|
let rt = runtime::Builder::new_current_thread()
|
||||||
|
.thread_name("server")
|
||||||
|
.enable_io()
|
||||||
|
.build()?;
|
||||||
|
|
||||||
|
rt.block_on(task(event_tx))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn task(event_tx: Sender<Event>) -> Result<()> {
|
||||||
|
let state = AppState { event_tx };
|
||||||
|
let shared_state = Arc::new(state);
|
||||||
|
|
||||||
|
let app = Router::new()
|
||||||
|
.route(
|
||||||
|
"/webhook/:topic",
|
||||||
|
get(webhook_handler).post(webhook_handler),
|
||||||
|
)
|
||||||
|
.with_state(shared_state);
|
||||||
|
|
||||||
|
let listener = TcpListener::bind("0.0.0.0:3000")
|
||||||
|
.await
|
||||||
|
.wrap_err("Failed to bind to TCP socket")?;
|
||||||
|
tracing::info!("Listening on \"0.0.0.0:3000\"");
|
||||||
|
|
||||||
|
axum::serve(listener, app)
|
||||||
|
.await
|
||||||
|
.wrap_err("Failed to start server")
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn webhook_handler(
|
||||||
|
State(state): State<Arc<AppState>>,
|
||||||
|
Path(topic): Path<String>,
|
||||||
|
Query(query): Query<HashMap<String, String>>,
|
||||||
|
body: Option<Json<serde_json::Value>>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
let event = Event::Webhook(WebhookEvent {
|
||||||
|
topic,
|
||||||
|
query,
|
||||||
|
body: body
|
||||||
|
.map(|Json(body)| body)
|
||||||
|
.unwrap_or(serde_json::Value::Null),
|
||||||
|
});
|
||||||
|
tracing::debug!("Received webhook event: {:?}", &event);
|
||||||
|
|
||||||
|
if state.event_tx.send(event).is_err() {
|
||||||
|
tracing::error!("Failed to trigger webhook event");
|
||||||
|
|
||||||
|
(
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
"Failed to trigger webhook event",
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
(StatusCode::NO_CONTENT, "")
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue