This commit is contained in:
Lucas Schwiderski 2025-05-12 17:13:26 +02:00
parent 4de17290d0
commit 59ee7fa31e
Signed by: lucas
GPG key ID: AA12679AAA6DF4D8
10 changed files with 4592 additions and 73 deletions

1
.gitignore vendored
View file

@ -1,3 +1,4 @@
.wineprefix
.envrc .envrc
/target /target
/msvc /msvc

3575
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -6,8 +6,13 @@ edition = "2024"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
bincode = "2.0.1"
bstr = "1.12.0"
color-eyre = "0.6.4"
libc = "0.2.144" libc = "0.2.144"
libp2p = { version = "0.55.0", features = ["tokio", "tcp", "noise", "yamux", "ping", "gossipsub"] }
log = { version = "0.4.27", features = ["release_max_level_info"] } log = { version = "0.4.27", features = ["release_max_level_info"] }
tokio = { version = "1.45.0", features = ["macros", "rt", "sync"] }
[build-dependencies] [build-dependencies]
bindgen = "0.71.0" bindgen = "0.71.0"

View file

@ -7,7 +7,7 @@ image_name := "dt-p2p-builder"
steam_library := env("steam_library") steam_library := env("steam_library")
game_dir := steam_library / "steamapps" / "common" / "Warhammer 40,000 DARKTIDE" game_dir := steam_library / "steamapps" / "common" / "Warhammer 40,000 DARKTIDE"
log_dir := env("appdata") / "Fatshark" / "Darktide" / "console_logs" log_dir := env("APPDATA") / "Fatshark" / "Darktide" / "console_logs"
proton_dir := env("HOME") / ".steam" / "steam" proton_dir := env("HOME") / ".steam" / "steam"

View file

@ -1,20 +1,25 @@
use std::ffi::{CString, c_char}; use std::ffi::{CStr, c_char};
use std::sync::OnceLock; use std::sync::{OnceLock, RwLock};
mod lua; mod lua;
mod plugin; mod plugin;
mod rpc;
mod stingray_sdk; mod stingray_sdk;
use log::error;
use plugin::Plugin; use plugin::Plugin;
use stingray_sdk::{GetApiFunction, LoggingApi, LuaApi, PluginApi, PluginApiID}; use stingray_sdk::{GetApiFunction, LoggingApi, LuaApi, PluginApi, PluginApiID};
/// The name that the plugin is registered to the engine as. /// The name that the plugin is registered to the engine as.
/// Must be unique across all plugins. /// Must be unique across all plugins.
pub const PLUGIN_NAME: &str = "dt_p2p"; pub const PLUGIN_NAME: &CStr = c"dt_p2p";
/// The module that Lua functions are assigned to. /// The module that Lua functions are assigned to.
pub const MODULE_NAME: &str = "dt_p2p"; pub const MODULE_NAME: &CStr = c"dt_p2p";
static PLUGIN: OnceLock<Plugin> = OnceLock::new(); const CHANNEL_BUFFER_SIZE: usize = 100;
const EVENTS_PER_TICK: usize = 25;
static PLUGIN: OnceLock<RwLock<Plugin>> = OnceLock::new();
static LOGGER: OnceLock<LoggingApi> = OnceLock::new(); static LOGGER: OnceLock<LoggingApi> = OnceLock::new();
static LUA: OnceLock<LuaApi> = OnceLock::new(); static LUA: OnceLock<LuaApi> = OnceLock::new();
@ -30,10 +35,22 @@ macro_rules! global {
}}; }};
} }
/// A macro to make accessing the plugin instance as mutable a little more convenient.
#[macro_export]
macro_rules! plugin_mut {
() => {{
let lock = $crate::PLUGIN.get().and_then(|l| l.write().ok());
if cfg!(debug_assertions) {
lock.expect("failed to acquire lock on plugin global")
} else {
unsafe { lock.unwrap_unchecked() }
}
}};
}
#[unsafe(no_mangle)] #[unsafe(no_mangle)]
pub extern "C" fn get_name() -> *const c_char { pub extern "C" fn get_name() -> *const c_char {
let s = CString::new(PLUGIN_NAME).expect("Failed to create CString from plugin name"); PLUGIN_NAME.as_ptr()
s.as_ptr()
} }
#[unsafe(no_mangle)] #[unsafe(no_mangle)]
@ -44,19 +61,27 @@ pub extern "C" fn setup_game(get_engine_api: GetApiFunction) {
let _ = LUA.get_or_init(|| LuaApi::get(get_engine_api)); let _ = LUA.get_or_init(|| LuaApi::get(get_engine_api));
let plugin = PLUGIN.get_or_init(Plugin::new); let plugin = match Plugin::new() {
Ok(plugin) => plugin,
Err(err) => {
error!("Failed to initialize plugin:\n{:?}", err);
return;
}
};
plugin.setup_game(); plugin.setup_game();
PLUGIN.set(RwLock::new(plugin));
} }
#[unsafe(no_mangle)] #[unsafe(no_mangle)]
pub extern "C" fn shutdown_game() { pub extern "C" fn shutdown_game() {
let plugin = global!(PLUGIN); let plugin = plugin_mut!();
plugin.shutdown_game(); plugin.shutdown_game();
} }
#[unsafe(no_mangle)] #[unsafe(no_mangle)]
pub extern "C" fn update_game(dt: f32) { pub extern "C" fn update_game(dt: f32) {
let plugin = global!(PLUGIN); let mut plugin = plugin_mut!();
plugin.update_game(dt); plugin.update_game(dt);
} }

View file

@ -1,16 +1,334 @@
use crate::stingray_sdk::lua_State; use std::collections::HashMap;
use crate::{LUA, global}; use std::panic::catch_unwind;
pub extern "C" fn do_something(l: *mut lua_State) -> i32 { use bstr::BStr;
let lua = global!(LUA); use bstr::ByteSlice as _;
use color_eyre::Result;
use color_eyre::eyre;
use color_eyre::eyre::Context as _;
if let Some(name) = lua.tolstring(l, 1) { use crate::plugin::Identifier;
lua.pushstring( use crate::plugin::Plugin;
l, use crate::stingray_sdk::LUA_REGISTRYINDEX;
format!("[do_something] Hello from Rust, {}", name.to_string_lossy()), use crate::stingray_sdk::{LuaState, LuaType, lua_State};
); use crate::{LUA, global, plugin_mut};
1
} else { pub const NAMESPACE_SEPARATOR: char = '.';
0
static RPC_CALLBACK_KEY: &str = "dt-p2p.rpc-callbacks";
// As a minor optimization, only `catch_unwind` in debug mode.
#[cfg(debug_assertions)]
fn lua_wrapper<F>(l: *mut lua_State, f: F) -> i32
where
F: FnOnce(&mut Plugin, &LuaState) -> Result<i32> + std::panic::UnwindSafe,
{
let lua = LuaState::new(l, global!(LUA));
let res = catch_unwind(|| {
let mut plugin = plugin_mut!();
f(&mut plugin, &lua)
});
match res {
Ok(Ok(i)) => i,
Ok(Err(err)) => lua.lib_error(format!("{:?}", err)),
Err(err) => lua.lib_error(format!("{:?}", err)),
} }
} }
#[cfg(not(debug_assertions))]
fn lua_wrapper<F>(l: *mut lua_State, f: F) -> i32
where
F: FnOnce(&mut Plugin, &LuaState) -> Result<i32>,
{
let lua = LuaState::new(l, global!(LUA));
let plugin = plugin_mut!();
let res = f(&mut plugin, &lua);
match res {
Ok(i) => i,
Err(err) => lua.lib_error(format!("{:?}", err)),
}
}
/// Returns the namespace and name for a RPC.
///
/// The namespace may either be part of the name, separated by `NAMESPACE_SEPARATOR`
/// or omitted, in which case the mod name is used as fallback.
fn get_identifier<'a>(lua: &'a LuaState) -> Result<Identifier<'a>> {
lua.getfield(1, c"name");
if !lua.isstring(-1) {
eyre::bail!("bad argument #1, not a mod object");
}
let mod_name = lua.tostring(-1);
let name = lua.tostring(2);
let (namespace, name) = name
.split_once_str(&[NAMESPACE_SEPARATOR as u8])
.map(|(namespace, name)| (BStr::new(namespace), BStr::new(name)))
.unwrap_or((mod_name, name));
Ok(Identifier::new(namespace, name))
}
// Called with `(mod: table, name: string, opts: table)`
pub extern "C" fn create_rpc(l: *mut lua_State) -> i32 {
lua_wrapper(l, |plugin, lua| {
lua.getfield(1, c"version");
if !lua.istable(1) {
eyre::bail!("bad argument #1, expected a mod object");
}
if !lua.isstring(2) {
eyre::bail!("bad argument #2, expected a string");
}
if !lua.istable(3) {
eyre::bail!("bad argument #3, expected a table");
}
let _version = match lua.r#type(-1) {
LuaType::Nil => {
eyre::bail!("bad argument #1, mod object doesn't provide a 'version' field");
}
LuaType::Number => lua.tostring(-1),
LuaType::String => {
let version = lua.tostring(-1);
// TODO: parse as date or semver-like
version
}
x => {
eyre::bail!("invalid type {} for 'version' field", x)
}
};
let id = get_identifier(lua).wrap_err("failed to determine RPC name")?;
plugin.create_rpc(id);
Ok(0)
})
}
type LuaMap<'a> = HashMap<LuaKey<'a>, LuaValue<'a>>;
#[derive(bincode::BorrowDecode, bincode::Encode, PartialEq)]
enum LuaKey<'a> {
String(&'a [u8]),
Number(f64),
}
impl<'a> Eq for LuaKey<'a> {}
impl<'a> std::hash::Hash for LuaKey<'a> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
match self {
LuaKey::String(s) => s.hash(state),
LuaKey::Number(n) => (*n as u128).hash(state),
}
}
}
#[derive(bincode::BorrowDecode, bincode::Encode)]
pub enum LuaValue<'a> {
Nil,
Boolean(bool),
Number(f64),
String(&'a [u8]),
Table(LuaMap<'a>),
}
impl<'a> LuaValue<'a> {
fn parse_table(lua: &'a LuaState, idx: i32) -> Result<Self> {
let mut values = LuaMap::default();
//
// `nil` as initial key tells `lua_next` to pick the first key.
lua.pushnil();
loop {
if lua.next(idx) <= 0 {
break;
}
let key = match lua.r#type(-2) {
LuaType::Number => LuaKey::Number(lua.tonumber(-2)),
LuaType::String => LuaKey::String(lua.tostring(-2)),
t => eyre::bail!("Unsupported type {} for args table key", t),
};
let value = Self::parse(lua, -1)?;
values.insert(key, value);
// Pop the value, but keep the key so `lua_next` knows where to continue
lua.pop();
}
// Pop the last key
lua.pop();
Ok(LuaValue::Table(values))
}
pub fn parse(lua: &'a LuaState, idx: i32) -> Result<Self> {
match lua.r#type(idx) {
LuaType::None | LuaType::Nil => Ok(LuaValue::Nil),
LuaType::Boolean => Ok(LuaValue::Boolean(lua.toboolean(-1))),
LuaType::Number => Ok(LuaValue::Number(lua.tonumber(-1))),
LuaType::String => Ok(LuaValue::String(lua.tostring(-1))),
LuaType::Table => Self::parse_table(lua, -1),
x => eyre::bail!("Unknown or unsupported Lua type {}", x),
}
}
pub fn push(&self, lua: &LuaState) {
match self {
LuaValue::Nil => lua.pushnil(),
LuaValue::Boolean(bool) => lua.pushboolean(*bool),
LuaValue::Number(num) => lua.pushnumber(*num),
LuaValue::String(s) => lua.pushstring(*s),
LuaValue::Table(table) => Self::push_table(lua, table),
}
}
fn push_table(lua: &LuaState, table: &LuaMap) {
let (narr, nrec) = table.iter().fold((0, 0), |(narr, nrec), (k, _)| match k {
LuaKey::String(_) => (narr, nrec + 1),
LuaKey::Number(_) => (narr + 1, nrec),
});
lua.createtable(narr, nrec);
let tbl_index = lua.gettop();
for (k, v) in table.iter() {
match k {
LuaKey::String(s) => lua.pushstring(*s),
LuaKey::Number(num) => lua.pushnumber(*num),
}
v.push(lua);
lua.settable(tbl_index);
}
}
}
// Called with `(mod: table, name: string, args: table)`
pub extern "C" fn send_rpc(l: *mut lua_State) -> i32 {
lua_wrapper(l, |plugin, lua| {
if !lua.istable(1) {
eyre::bail!("bad argument #1, expected a mod object");
}
if !lua.isstring(2) {
eyre::bail!("bad argument #2, expected a string");
}
if !lua.istable(3) {
eyre::bail!("bad argument #3, expected a table");
}
let id = get_identifier(lua).wrap_err("failed to determine RPC name")?;
let args = LuaValue::parse_table(lua, 3).wrap_err("Failed to parse args table")?;
plugin.send_rpc(id, args).wrap_err("Failed to send RPC")?;
Ok(0)
})
}
// Called with `(mod: table, name: string, callback: Fn(args))`
pub extern "C" fn receive_rpc(l: *mut lua_State) -> i32 {
lua_wrapper(l, |plugin, lua| {
if !lua.istable(1) {
eyre::bail!("bad argument #1, expected a mod object");
}
if !lua.isstring(2) {
eyre::bail!("bad argument #2, expected a string");
}
if !lua.isfunction(3) {
eyre::bail!("bad argument #3, expected a function");
}
let id = get_identifier(lua).wrap_err("failed to determine RPC name")?;
// We will utilize Lua's _references_ to store the callback functions. Since we're doing
// the bookkeeping on the Rust side, we don't need to worry about any structured layout on
// the Lua side.
lua.pushvalue(3);
let fn_ref = lua.lib_ref(LUA_REGISTRYINDEX);
plugin.add_rpc_listener(id, fn_ref);
Ok(0)
})
}
// Called with `(mod: table, callback: Fn(peer_id))`
pub extern "C" fn on_peer_connected(l: *mut lua_State) -> i32 {
lua_wrapper(l, |plugin, lua| {
if !lua.istable(1) {
eyre::bail!("bad argument #1, expected a mod object");
}
if !lua.isfunction(2) {
eyre::bail!("bad argument #2, expected a function");
}
lua.getfield(1, c"name");
if !lua.isstring(-1) {
eyre::bail!("bad argument #1, not a mod object");
}
let mod_name = lua.tostring(-1);
lua.pushvalue(3);
let fn_ref = lua.lib_ref(LUA_REGISTRYINDEX);
plugin.add_connect_listener(mod_name, fn_ref);
Ok(0)
})
}
// Called with `(mod: table, callback: Fn(peer_id))`
pub extern "C" fn on_peer_disconnected(l: *mut lua_State) -> i32 {
lua_wrapper(l, |plugin, lua| {
if !lua.istable(1) {
eyre::bail!("bad argument #1, expected a mod object");
}
if !lua.isfunction(2) {
eyre::bail!("bad argument #2, expected a function");
}
lua.getfield(1, c"name");
if !lua.isstring(-1) {
eyre::bail!("bad argument #1, expected a mod object");
}
let mod_name = lua.tostring(-1);
lua.pushvalue(3);
let fn_ref = lua.lib_ref(LUA_REGISTRYINDEX);
plugin.add_disconnect_listener(mod_name, fn_ref);
Ok(0)
})
}
// Called with `(dt: float)`
pub extern "C" fn update(l: *mut lua_State) -> i32 {
lua_wrapper(l, |plugin, lua| {
if !lua.isnumber(1) {
eyre::bail!("bad argument #1, expected a number");
}
let dt = lua.tonumber(1);
plugin.lua_update(dt, lua)?;
Ok(0)
})
}

View file

@ -1,26 +1,382 @@
use log::info; use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash as _, Hasher as _};
use std::thread;
use std::thread::JoinHandle;
use crate::{LUA, MODULE_NAME, PLUGIN_NAME, global, lua}; use bstr::{BStr, BString, ByteSlice};
use color_eyre::eyre::Context as _;
use color_eyre::{Result, eyre};
use libp2p::futures::StreamExt;
use libp2p::gossipsub::{AllowAllSubscriptionFilter, Event, IdentityTransform, TopicHash};
use libp2p::swarm::SwarmEvent;
use libp2p::{gossipsub, noise, tcp, yamux};
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{self, UnboundedSender};
pub(crate) struct Plugin {} use crate::lua::{LuaValue, NAMESPACE_SEPARATOR};
use crate::rpc::{RPC, RPCMessage};
use crate::stingray_sdk::{LUA_REGISTRYINDEX, LuaRef, LuaState};
use crate::{CHANNEL_BUFFER_SIZE, EVENTS_PER_TICK, lua};
use crate::{LUA, MODULE_NAME, global};
#[derive(Clone, Copy, Debug)]
pub struct Identifier<'a> {
name: &'a BStr,
namespace: &'a BStr,
}
impl<'a> Identifier<'a> {
pub fn new(namespace: &'a BStr, name: &'a BStr) -> Self {
Self { namespace, name }
}
pub(crate) fn to_topic(self) -> TopicHash {
TopicHash::from_raw(format!("{}", self))
}
}
impl std::fmt::Display for Identifier<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}{}{}", self.namespace, NAMESPACE_SEPARATOR, self.name)
}
}
#[derive(Debug)]
struct NamespacedMap<T> {
map: HashMap<BString, HashMap<BString, T>>,
}
impl<T> NamespacedMap<T> {
pub fn insert(&mut self, id: Identifier, val: T) {
if !self.map.contains_key(id.namespace) {
self.map.insert(id.namespace.to_owned(), Default::default());
}
let map = self
.map
.get_mut(id.namespace)
.expect("entry is verified to exist");
map.insert(id.name.to_owned(), val);
}
pub fn get(&self, id: &Identifier) -> Option<&T> {
self.map.get(id.namespace).and_then(|map| map.get(id.name))
}
}
impl<T> Default for NamespacedMap<T> {
fn default() -> Self {
Self {
map: Default::default(),
}
}
}
enum Task {}
pub(crate) struct Plugin {
swarm_thread: JoinHandle<Result<()>>,
event_rx: Receiver<SwarmEvent<Event>>,
send_tx: UnboundedSender<RPCMessage>,
rpcs: HashMap<TopicHash, RPC>,
connect_listeners: HashMap<TopicHash, LuaRef>,
disconnect_listeners: HashMap<TopicHash, LuaRef>,
}
impl Plugin { impl Plugin {
pub fn new() -> Self { pub fn new() -> Result<Self> {
Self {} let mut swarm = libp2p::SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)
.wrap_err("Failed to configure transport")?
.with_behaviour(|key| {
let message_id_fn = |msg: &gossipsub::Message| {
let mut s = DefaultHasher::new();
msg.data.hash(&mut s);
gossipsub::MessageId::from(s.finish().to_string())
};
let config = gossipsub::ConfigBuilder::default()
.validation_mode(gossipsub::ValidationMode::Strict)
.message_id_fn(message_id_fn)
.build()
.wrap_err("Failed to create gossipsub config")?;
// TODO: Benchmark if compression would be beneficial.
let behavior: gossipsub::Behaviour<IdentityTransform, AllowAllSubscriptionFilter> =
gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(key.clone()),
config,
)?;
Ok(behavior)
})
.wrap_err("Failed to configure behavior")?
.build();
swarm
.listen_on(
"/ipv4/0.0.0.0/tcp/0"
.parse()
.expect("listen address invalid"),
)
.wrap_err("failed to listen on IPv4 socket")?;
// TODO: Connect to first hard coded relay server(s), which should then
// fan out to online peers.
// Maybe a cheap server (e.g. AWS free tier instance) is already enough to handle all
// session negotiations. If not, then a system is needed where each mod user also becomes
// available for that, and the hosted server merely acts as load balancer between them.
//
// swarm.dial(RELAY_SERVER)
let (event_tx, event_rx) = mpsc::channel::<SwarmEvent<Event>>(CHANNEL_BUFFER_SIZE);
// Since the `Sender` will be on the sync side, and we want to avoid blocking, we make this
// unbounded to limit the chance for it to actually block.
let (send_tx, rpc_rx) = mpsc::unbounded_channel::<RPCMessage>();
let swarm_thread = thread::Builder::new()
.name("p2p-swarm".into())
.spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.wrap_err("Failed to create tokio runtime")?;
let mut swarm = swarm;
let mut rpc_rx = rpc_rx;
rt.block_on(async move {
loop {
tokio::select! {
event = swarm.select_next_some() => {
event_tx.send(event).await.wrap_err("Failed to queue event")?;
}
msg = rpc_rx.recv() => {
let Some(msg) = msg else {
eyre::bail!("RPC queue closed prematurely");
};
swarm.behaviour_mut().publish(msg.topic, msg.data).wrap_err("Failed to send RPC")?;
}
}
}
})
})
.wrap_err("Failed to create p2p-swarm thread")?;
Ok(Self {
swarm_thread,
event_rx,
send_tx,
rpcs: Default::default(),
connect_listeners: Default::default(),
disconnect_listeners: Default::default(),
})
} }
pub fn setup_game(&self) { pub fn setup_game(&self) {
info!("[setup_game] Hello, world! This is {}!", PLUGIN_NAME);
let lua = global!(LUA); let lua = global!(LUA);
lua.add_module_function(MODULE_NAME, "do_something", lua::do_something); lua.add_module_function(MODULE_NAME, c"update", lua::update);
lua.add_module_function(MODULE_NAME, c"create_rpc", lua::create_rpc);
lua.add_module_function(MODULE_NAME, c"send_rpc", lua::send_rpc);
lua.add_module_function(MODULE_NAME, c"receive_rpc", lua::receive_rpc);
// TODO: Handle mod reloads
lua.add_module_function(MODULE_NAME, c"on_peer_connected", lua::on_peer_connected);
lua.add_module_function(
MODULE_NAME,
c"on_peer_disconnected",
lua::on_peer_disconnected,
);
log::info!("Plugin initialized");
} }
pub fn shutdown_game(&self) { pub fn shutdown_game(&self) {
info!("[shutdown_game] Goodbye, world!"); // TODO: Find a way to send a close command, and make it blocking.
} }
pub fn update_game(&self, _dt: f32) {} // WARN: Due to how the instance of `Plugin` is currently stored and
// accessed by Lua functions, things will likely break if we attempt to get a `lua_State` here
// and trigger out own Lua functions.
pub fn update_game(&mut self, _dt: f32) {}
pub fn lua_update(&mut self, _dt: f64, lua: &LuaState) -> Result<()> {
if self.swarm_thread.is_finished() {
eyre::bail!("p2p-swarm thread terminated prematurely",);
// TODO: Move `self.swarm_thread` into a data structure that I can move it out of here,
// so that I can call `.join` and get the error it ended with.
}
// Limit the amount of events we handle per tick to avoid potential large spikes in
// frame time.
// TODO: Add a system to monitor the amount of incoming vs processed events, and add
// warnings when this cannot catch up.
// TODO: Check if it might become necessary to create a more elaborate system that can
// change this limit dynamically based on the amount of incoming messages.
// This must be able to eventually catch up.
for _ in 0..EVENTS_PER_TICK {
let event = match self.event_rx.try_recv() {
Ok(event) => event,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
eyre::bail!("p2p-swarm channel disconnected prematurely");
}
};
match event {
SwarmEvent::Behaviour(Event::Message {
propagation_source,
message_id,
message: msg,
}) => {
log::debug!("Received message {message_id} from {propagation_source}");
let Some(rpc) = self.rpcs.get(&msg.topic) else {
log::error!("Receive message for unknown topic {}", msg.topic);
continue;
};
let Ok((args, _)) = bincode::borrow_decode_from_slice::<LuaValue<'_>, _>(
&msg.data,
bincode::config::standard(),
) else {
log::error!(
"Failed to decode data for message {message_id} on topic {}",
msg.topic
);
continue;
};
args.push(lua);
let args_index = lua.gettop();
for fn_ref in rpc.listeners() {
lua.rawgeti(LUA_REGISTRYINDEX, *fn_ref);
lua.pushvalue(args_index);
lua.pcall(1, 0).wrap_err_with(|| {
format!("Failed to call listener for RPC '{}'", msg.topic)
})?;
}
}
SwarmEvent::Behaviour(Event::GossipsubNotSupported { peer_id }) => {
log::warn!(
"Peer {peer_id} does not support Gossipsub. Are they really a dt-p2p user?"
);
}
SwarmEvent::ConnectionEstablished {
peer_id,
connection_id,
endpoint: _,
num_established,
concurrent_dial_errors: _,
established_in,
} => {
log::info!(
"Connection {} established with {} in {} ms. Total: {}",
connection_id,
peer_id,
established_in.as_millis(),
num_established
);
// TODO: Establish mods shared with this peer
// TODO: Call `on_peer_connected` listeners for the corresponding mods
}
SwarmEvent::ConnectionClosed {
peer_id,
connection_id,
endpoint: _,
num_established,
cause,
} => {
log::info!(
"Connection {} with {} closed for reason {:?}. Total: {}",
connection_id,
peer_id,
cause,
num_established
);
// TODO: Call `on_peer_disconnected` listeners for active mods
// TODO: Re-establish active mods
// With a peer dropping out, the amount of active mods may change if there is
// now only a single peer left for a mod.
}
SwarmEvent::IncomingConnectionError {
connection_id,
local_addr: _,
send_back_addr: _,
error,
} => {
log::error!("Error on incoming connection {connection_id}: {error}");
}
SwarmEvent::OutgoingConnectionError {
connection_id,
peer_id: _,
error,
} => {
log::error!("Error on outgoing connection {connection_id}: {error}");
}
SwarmEvent::ListenerError { listener_id, error } => {
log::error!("Listener {listener_id} failed: {error}");
}
SwarmEvent::NewListenAddr {
listener_id,
address,
} => {
log::debug!("Listening on {address} with ID {listener_id}");
}
// TODO: Maybe add tracing information for the events we don't handle
_ => {}
}
}
Ok(())
}
pub(crate) fn create_rpc(&mut self, id: Identifier) {
let rpc = RPC::new();
self.rpcs.insert(id.to_topic(), rpc);
}
pub(crate) fn send_rpc(&self, id: Identifier, args: LuaValue<'_>) -> Result<()> {
if !self.rpcs.contains_key(&id.to_topic()) {
eyre::bail!("RPC '{}' does not exist", id);
}
let args = bincode::encode_to_vec(args, bincode::config::standard())
.wrap_err("Failed to encode RPC args")?;
let msg = RPCMessage::new(id, args);
self.send_tx
.send(msg)
.wrap_err("Failed to queue RPC call")?;
Ok(())
}
pub(crate) fn add_rpc_listener(&mut self, id: Identifier, fn_ref: i32) -> Result<()> {
let Some(rpc) = self.rpcs.get_mut(&id.to_topic()) else {
eyre::bail!("RPC '{}' does not exist", id);
};
rpc.add_listener(fn_ref);
Ok(())
}
pub(crate) fn add_connect_listener(&mut self, mod_name: &BStr, fn_ref: i32) {
let key = TopicHash::from_raw(mod_name.to_str_lossy());
self.connect_listeners.insert(key, fn_ref);
}
pub(crate) fn add_disconnect_listener(&mut self, mod_name: &BStr, fn_ref: i32) {
let key = TopicHash::from_raw(mod_name.to_str_lossy());
self.disconnect_listeners.insert(key, fn_ref);
}
} }
impl std::fmt::Debug for Plugin { impl std::fmt::Debug for Plugin {

View file

@ -155,6 +155,17 @@ typedef int (*lua_Writer)(lua_State *L, const void *p, size_t sz, void *ud);
typedef void *(*lua_Alloc)(void *ud, void *ptr, size_t osize, size_t nsize); typedef void *(*lua_Alloc)(void *ud, void *ptr, size_t osize, size_t nsize);
typedef struct luaL_Reg luaL_Reg; typedef struct luaL_Reg luaL_Reg;
#define LUA_REGISTRYINDEX (-10000)
#define LUA_ENVIRONINDEX (-10001)
#define LUA_GLOBALSINDEX (-10002)
#define LUA_OK 0
#define LUA_YIELD 1
#define LUA_ERRRUN 2
#define LUA_ERRSYNTAX 3
#define LUA_ERRMEM 4
#define LUA_ERRERR 5
/* /*
Interface to access Lua services. Interface to access Lua services.
*/ */

43
src/rpc.rs Normal file
View file

@ -0,0 +1,43 @@
use std::collections::HashSet;
use libp2p::gossipsub::TopicHash;
use crate::plugin::Identifier;
use crate::stingray_sdk::LuaRef;
#[derive(Debug)]
pub(crate) struct RPC {
listeners: HashSet<LuaRef>,
}
impl RPC {
pub fn new() -> Self {
Self {
listeners: Default::default(),
}
}
pub(crate) fn add_listener(&mut self, fn_ref: LuaRef) {
self.listeners.insert(fn_ref);
}
pub(crate) fn listeners(&self) -> impl Iterator<Item = &LuaRef> {
self.listeners.iter()
}
}
#[derive(Debug)]
pub(crate) struct RPCMessage {
pub topic: TopicHash,
pub data: Vec<u8>,
}
impl RPCMessage {
pub(crate) fn new(id: Identifier, data: impl Into<Vec<u8>>) -> Self {
let topic = id.to_topic();
Self {
topic,
data: data.into(),
}
}
}

View file

@ -11,13 +11,22 @@ mod bindings {
use std::ffi::CStr; use std::ffi::CStr;
use std::ffi::CString; use std::ffi::CString;
use std::os::raw::c_char; use std::os::raw::c_char;
use std::os::raw::c_int;
use std::os::raw::c_void; use std::os::raw::c_void;
pub use bindings::GetApiFunction; pub use bindings::GetApiFunction;
use bindings::LUA_OK;
pub use bindings::PluginApi; pub use bindings::PluginApi;
pub use bindings::PluginApiID; pub use bindings::PluginApiID;
use bindings::lua_CFunction; use bindings::lua_CFunction;
use bindings::lua_Integer;
use bindings::lua_Number;
pub use bindings::lua_State; pub use bindings::lua_State;
pub use bindings::{LUA_ENVIRONINDEX, LUA_GLOBALSINDEX, LUA_REGISTRYINDEX};
use bstr::BStr;
use color_eyre::Result;
use color_eyre::eyre;
use log::Level; use log::Level;
impl std::default::Default for PluginApi { impl std::default::Default for PluginApi {
@ -131,6 +140,17 @@ impl log::Log for LoggingApi {
fn flush(&self) {} fn flush(&self) {}
} }
#[repr(i32)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LuaStatus {
Ok = bindings::LUA_OK as i32,
Yield = bindings::LUA_YIELD as i32,
ErrRun = bindings::LUA_ERRRUN as i32,
ErrSyntax = bindings::LUA_ERRSYNTAX as i32,
ErrMem = bindings::LUA_ERRMEM as i32,
ErrErr = bindings::LUA_ERRERR as i32,
}
#[repr(i32)] #[repr(i32)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)] #[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LuaType { pub enum LuaType {
@ -183,10 +203,38 @@ impl std::fmt::Display for LuaType {
} }
} }
pub type LuaRef = i32;
pub struct LuaApi { pub struct LuaApi {
add_module_function: unsafe extern "C" fn(*const c_char, *const c_char, lua_CFunction), add_module_function: unsafe extern "C" fn(*const c_char, *const c_char, lua_CFunction),
tolstring: unsafe extern "C" fn(*mut lua_State, i32, *mut usize) -> *const c_char, createtable: unsafe extern "C" fn(*mut lua_State, i32, i32),
getfield: unsafe extern "C" fn(*mut lua_State, i32, *const c_char),
gettable: unsafe extern "C" fn(*mut lua_State, i32),
gettop: unsafe extern "C" fn(*mut lua_State) -> i32,
isnumber: unsafe extern "C" fn(*mut lua_State, i32) -> c_int,
isnil: unsafe extern "C" fn(*mut lua_State, i32) -> c_int,
isstring: unsafe extern "C" fn(*mut lua_State, i32) -> c_int,
istable: unsafe extern "C" fn(*mut lua_State, i32) -> c_int,
lib_argerror: unsafe extern "C" fn(*mut lua_State, i32, *const c_char) -> i32,
lib_checklstring: unsafe extern "C" fn(*mut lua_State, i32, *mut usize) -> *const c_char,
lib_error: unsafe extern "C" fn(*mut lua_State, *const c_char, ...) -> i32,
lib_ref: unsafe extern "C" fn(*mut lua_State, i32) -> i32,
next: unsafe extern "C" fn(*mut lua_State, i32) -> i32,
objlen: unsafe extern "C" fn(*mut lua_State, i32) -> usize,
pcall: unsafe extern "C" fn(*mut lua_State, i32, i32, i32) -> i32,
pop: unsafe extern "C" fn(*mut lua_State),
pushboolean: unsafe extern "C" fn(*mut lua_State, i32),
pushinteger: unsafe extern "C" fn(*mut lua_State, lua_Integer),
pushnil: unsafe extern "C" fn(*mut lua_State),
pushnumber: unsafe extern "C" fn(*mut lua_State, lua_Number),
pushstring: unsafe extern "C" fn(*mut lua_State, *const c_char), pushstring: unsafe extern "C" fn(*mut lua_State, *const c_char),
pushvalue: unsafe extern "C" fn(*mut lua_State, i32),
rawgeti: unsafe extern "C" fn(*mut lua_State, i32, i32),
r#type: unsafe extern "C" fn(*mut lua_State, i32) -> i32,
settable: unsafe extern "C" fn(*mut lua_State, i32),
toboolean: unsafe extern "C" fn(*mut lua_State, i32) -> i32,
tolstring: unsafe extern "C" fn(*mut lua_State, i32, *mut usize) -> *const c_char,
tonumber: unsafe extern "C" fn(*mut lua_State, i32) -> f64,
} }
impl LuaApi { impl LuaApi {
@ -199,40 +247,211 @@ impl LuaApi {
unsafe { unsafe {
Self { Self {
add_module_function: (*api).add_module_function.unwrap_unchecked(), add_module_function: (*api).add_module_function.unwrap_unchecked(),
tolstring: (*api).tolstring.unwrap_unchecked(), createtable: (*api).createtable.unwrap_unchecked(),
getfield: (*api).getfield.unwrap_unchecked(),
gettable: (*api).gettable.unwrap_unchecked(),
gettop: (*api).gettop.unwrap_unchecked(),
isnumber: (*api).isnumber.unwrap_unchecked(),
isnil: (*api).isnil.unwrap_unchecked(),
isstring: (*api).isstring.unwrap_unchecked(),
istable: (*api).istable.unwrap_unchecked(),
lib_argerror: (*api).lib_argerror.unwrap_unchecked(),
lib_checklstring: (*api).lib_checklstring.unwrap_unchecked(),
lib_error: (*api).lib_error.unwrap_unchecked(),
lib_ref: (*api).lib_ref.unwrap_unchecked(),
next: (*api).next.unwrap_unchecked(),
objlen: (*api).objlen.unwrap_unchecked(),
pcall: (*api).pcall.unwrap_unchecked(),
pop: (*api).pop.unwrap_unchecked(),
pushboolean: (*api).pushboolean.unwrap_unchecked(),
pushinteger: (*api).pushinteger.unwrap_unchecked(),
pushnil: (*api).pushnil.unwrap_unchecked(),
pushnumber: (*api).pushnumber.unwrap_unchecked(),
pushstring: (*api).pushstring.unwrap_unchecked(), pushstring: (*api).pushstring.unwrap_unchecked(),
pushvalue: (*api).pushvalue.unwrap_unchecked(),
rawgeti: (*api).rawgeti.unwrap_unchecked(),
r#type: (*api).type_.unwrap_unchecked(),
settable: (*api).settable.unwrap_unchecked(),
toboolean: (*api).toboolean.unwrap_unchecked(),
tolstring: (*api).tolstring.unwrap_unchecked(),
tonumber: (*api).tonumber.unwrap_unchecked(),
} }
} }
} }
pub fn add_module_function( pub fn add_module_function(
&self, &self,
module: impl Into<Vec<u8>>, module: impl AsRef<CStr>,
name: impl Into<Vec<u8>>, name: impl AsRef<CStr>,
cb: extern "C" fn(*mut lua_State) -> i32, cb: extern "C" fn(*mut lua_State) -> i32,
) { ) {
let module = CString::new(module).expect("Invalid CString"); unsafe {
let name = CString::new(name).expect("Invalid CString"); (self.add_module_function)(module.as_ref().as_ptr(), name.as_ref().as_ptr(), Some(cb))
unsafe { (self.add_module_function)(module.as_ptr(), name.as_ptr(), Some(cb)) }
}
pub fn tolstring(&self, L: *mut lua_State, idx: i32) -> Option<&CStr> {
let mut len: usize = 0;
let c = unsafe { (self.tolstring)(L, idx, &mut len as *mut _) };
if len == 0 {
None
} else {
// Safety: As long as `len > 0`, Lua guarantees the constraints that `CStr::from_ptr`
// requires.
Some(unsafe { CStr::from_ptr(c) })
} }
} }
}
pub fn pushstring(&self, L: *mut lua_State, s: impl Into<Vec<u8>>) { /// A wrapper that combines a lua_State and a LuaApi to make calling functions more convenient.
pub struct LuaState<'a> {
l: *mut lua_State,
api: &'a LuaApi,
}
impl<'a> LuaState<'a> {
pub fn new(l: *mut lua_State, api: &'a LuaApi) -> Self {
Self { l, api }
}
pub fn gettop(&self) -> i32 {
unsafe { (self.api.gettop)(self.l) }
}
pub fn pushvalue(&self, idx: i32) {
unsafe { (self.api.pushvalue)(self.l, idx) }
}
pub fn isnil(&self, idx: i32) -> bool {
let is_nil = unsafe { (self.api.isnil)(self.l, idx) };
is_nil > 0
}
pub fn isnumber(&self, idx: i32) -> bool {
let is_number = unsafe { (self.api.isnumber)(self.l, idx) };
is_number > 0
}
pub fn isstring(&self, idx: i32) -> bool {
let is_string = unsafe { (self.api.isstring)(self.l, idx) };
is_string > 0
}
pub fn isfunction(&self, idx: i32) -> bool {
let is_fn = unsafe { (self.api.isfunction)(self.l, idx) };
is_fn > 0
}
pub fn istable(&self, idx: i32) -> bool {
let is_table = unsafe { (self.api.istable)(self.l, idx) };
is_table > 0
}
pub fn r#type(&self, idx: i32) -> LuaType {
let t = unsafe { (self.api.r#type)(self.l, idx) };
t.into()
}
pub fn tonumber(&self, idx: i32) -> f64 {
unsafe { (self.api.tonumber)(self.l, idx) }
}
pub fn toboolean(&self, idx: i32) -> bool {
let n = unsafe { (self.api.toboolean)(self.l, idx) };
n != 0
}
pub fn tostring(&self, idx: i32) -> &BStr {
let bytes = unsafe {
let mut len = 0usize;
// `c_char` is `i8` by default, but printable characters are all > 0, so we don't care.
let c = (self.api.tolstring)(self.l, idx, &mut len) as *const u8;
std::slice::from_raw_parts(c, len)
};
BStr::new(bytes)
}
pub fn objlen(&self, idx: i32) -> usize {
unsafe { (self.api.objlen)(self.l, idx) }
}
pub fn pushnil(&self) {
unsafe { (self.api.pushnil)(self.l) };
}
pub fn pushnumber(&self, n: f64) {
unsafe { (self.api.pushnumber)(self.l, n) };
}
pub fn pushinteger(&self, n: isize) {
unsafe { (self.api.pushinteger)(self.l, n) };
}
pub fn pushstring(&self, s: impl Into<Vec<u8>>) {
let s = CString::new(s).expect("Invalid CString"); let s = CString::new(s).expect("Invalid CString");
unsafe { (self.pushstring)(L, s.as_ptr()) } unsafe { (self.api.pushstring)(self.l, s.as_ptr()) };
}
pub fn pushboolean(&self, b: bool) {
unsafe { (self.api.pushboolean)(self.l, b as i32) }
}
pub fn gettable(&self, idx: i32) {
unsafe { (self.api.gettable)(self.l, idx) }
}
pub fn getfield(&self, idx: i32, k: impl AsRef<CStr>) {
unsafe { (self.api.getfield)(self.l, idx, k.as_ref().as_ptr()) }
}
pub fn createtable(&self, narr: i32, nrec: i32) {
unsafe { (self.api.createtable)(self.l, narr, nrec) }
}
pub fn settable(&self, idx: i32) {
unsafe { (self.api.settable)(self.l, idx) }
}
pub fn rawgeti(&self, idx: i32, n: i32) {
unsafe { (self.api.rawgeti)(self.l, idx, n) }
}
pub fn call(&self, nargs: i32, nresults: i32) {
self.pcall(nargs, nresults).unwrap()
}
pub fn pcall(&self, nargs: i32, nresults: i32) -> Result<()> {
// TODO: Re-create the engine's error handler function to populate the stack trace and
// local variables.
let res = unsafe { (self.api.pcall)(self.l, nargs, nresults, 0) };
if res as u32 == LUA_OK {
return Ok(());
}
let err = self.tostring(-1);
eyre::bail!("pcall failed: {}", err)
}
pub fn next(&self, idx: i32) -> i32 {
unsafe { (self.api.next)(self.l, idx) }
}
pub fn lib_argerror(&self, narg: i32, msg: impl AsRef<CStr>) -> i32 {
unsafe { (self.api.lib_argerror)(self.l, narg, msg.as_ref().as_ptr()) }
}
pub fn lib_checklstring(&self, idx: i32) -> &BStr {
let bytes = unsafe {
let mut len = 0usize;
// `c_char` is `i8` by default, but printable characters are all > 0, so we don't care.
let c = (self.api.lib_checklstring)(self.l, idx, &mut len) as *const u8;
std::slice::from_raw_parts(c, len)
};
BStr::new(bytes)
}
// Lua's `luaL_error` does have printf-like formatting capabilities,
// but since we can just use `format!()`, we don't need to bother handling the varargs here.
pub fn lib_error(&self, msg: impl Into<Vec<u8>>) -> i32 {
let s = CString::new(msg).expect("Invalid CString");
unsafe { (self.api.lib_error)(self.l, s.as_ptr()) }
}
pub fn lib_ref(&self, t: i32) -> LuaRef {
unsafe { (self.api.lib_ref)(self.l, t) }
}
// NOTE: This is not like the standard `lua_pop`, it only pops one value at a time.
// If more values need to be popped, use `settop(-x)`.
pub fn pop(&self) {
unsafe { (self.api.pop)(self.l) };
} }
} }