From 3b9d652b7a7017e0feeab7d9bc5e6f332535ef85 Mon Sep 17 00:00:00 2001 From: Lucas Schwiderski Date: Fri, 30 May 2025 11:53:37 +0200 Subject: [PATCH] WIP --- src/lib.rs | 4 +- src/lua.rs | 207 ++++++++++++------- src/plugin.rs | 477 +++++++++++++++++++++++++++++++++++--------- src/rpc.rs | 19 -- src/stingray_sdk.rs | 21 +- 5 files changed, 541 insertions(+), 187 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 57ffed6..78ef962 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,7 +70,9 @@ pub extern "C" fn setup_game(get_engine_api: GetApiFunction) { }; plugin.setup_game(); - PLUGIN.set(RwLock::new(plugin)); + PLUGIN + .set(RwLock::new(plugin)) + .expect("Failed to set global plugin instance"); } #[unsafe(no_mangle)] diff --git a/src/lua.rs b/src/lua.rs index c045a51..7570734 100644 --- a/src/lua.rs +++ b/src/lua.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::ffi::CStr; use std::panic::catch_unwind; use bstr::BStr; @@ -8,6 +9,7 @@ use color_eyre::eyre; use color_eyre::eyre::Context as _; use crate::plugin::Identifier; +use crate::plugin::ModCallbacks; use crate::plugin::Plugin; use crate::stingray_sdk::LUA_REGISTRYINDEX; use crate::stingray_sdk::{LuaState, LuaType, lua_State}; @@ -15,25 +17,34 @@ use crate::{LUA, global, plugin_mut}; pub const NAMESPACE_SEPARATOR: char = '.'; -static RPC_CALLBACK_KEY: &str = "dt-p2p.rpc-callbacks"; - // As a minor optimization, only `catch_unwind` in debug mode. #[cfg(debug_assertions)] fn lua_wrapper(l: *mut lua_State, f: F) -> i32 where F: FnOnce(&mut Plugin, &LuaState) -> Result + std::panic::UnwindSafe, { - let lua = LuaState::new(l, global!(LUA)); + let lua = global!(LUA); - let res = catch_unwind(|| { - let mut plugin = plugin_mut!(); - f(&mut plugin, &lua) - }); - match res { - Ok(Ok(i)) => i, - Ok(Err(err)) => lua.lib_error(format!("{:?}", err)), - Err(err) => lua.lib_error(format!("{:?}", err)), + // We need to drop as many things as possible before the `lua_error` call + // will longjmp and thereby ignoring Rust's cleanup. + { + let lua = LuaState::new(l, lua); + + let res = catch_unwind(|| { + let mut plugin = plugin_mut!(); + f(&mut plugin, &lua) + }); + match res { + Ok(Ok(i)) => { + return i; + } + Ok(Err(err)) => lua.pushstring(format!("{:?}", err)), + Err(err) => lua.pushstring(format!("{:?}", err)), + } } + + lua.error(l); + 0 } #[cfg(not(debug_assertions))] @@ -41,14 +52,25 @@ fn lua_wrapper(l: *mut lua_State, f: F) -> i32 where F: FnOnce(&mut Plugin, &LuaState) -> Result, { - let lua = LuaState::new(l, global!(LUA)); - let plugin = plugin_mut!(); + let lua = global!(LUA); - let res = f(&mut plugin, &lua); - match res { - Ok(i) => i, - Err(err) => lua.lib_error(format!("{:?}", err)), + // We need to drop as many things as possible before the `lua_error` call + // will longjmp and thereby ignoring Rust's cleanup. + { + let lua = LuaState::new(l, lua); + + let mut plugin = plugin_mut!(); + match f(&mut plugin, &lua) { + Ok(Ok(i)) => { + return i; + } + Ok(Err(err)) => lua.pushstring(format!("{:?}", err)), + Err(err) => lua.pushstring(format!("{:?}", err)), + } } + + lua.error(l); + 0 } /// Returns the namespace and name for a RPC. @@ -72,6 +94,64 @@ fn get_identifier<'a>(lua: &'a LuaState) -> Result> { Ok(Identifier::new(namespace, name)) } +// Called with `(mod: table, callbacks: table)` +pub extern "C" fn register_mod(l: *mut lua_State) -> i32 { + lua_wrapper(l, |plugin, lua| { + if !lua.istable(1) { + eyre::bail!("bad argument #1, expected a mod object"); + } + + let name = { + lua.getfield(1, c"name"); + if !lua.isstring(-1) { + eyre::bail!("bad argument #1, not a mod object"); + } + + lua.tostring(-1) + }; + + let callbacks = match lua.r#type(2) { + LuaType::Nil => ModCallbacks::default(), + LuaType::Table => { + let get_callback = |name: &CStr| { + lua.getfield(2, name); + + match lua.r#type(-1) { + LuaType::Nil => Ok(None), + LuaType::Function => Ok(Some(lua.lib_ref(LUA_REGISTRYINDEX))), + x => { + eyre::bail!( + "bad argument #2, expected a function for field {}, got {}", + name.to_string_lossy(), + x + ); + } + } + }; + + ModCallbacks { + on_session_joined: get_callback(c"on_session_joined")?, + on_session_left: get_callback(c"on_session_left")?, + on_user_joined: get_callback(c"on_user_joined")?, + on_user_left: get_callback(c"on_user_left")?, + } + } + x => { + eyre::bail!("bad argument #2, expected a table got {}", x); + } + }; + + plugin.register_mod(name, callbacks); + + // TODO: Register unload and disable handlers? + // Will likely have to monkey-patch the mod object. + // DMF doesn't expose a good way to hook into it, and it wouldn't be feasible to expect a + // DMF update to allow it. + + Ok(0) + }) +} + // Called with `(mod: table, name: string, opts: table)` pub extern "C" fn create_rpc(l: *mut lua_State) -> i32 { lua_wrapper(l, |plugin, lua| { @@ -99,7 +179,10 @@ pub extern "C" fn create_rpc(l: *mut lua_State) -> i32 { version } x => { - eyre::bail!("invalid type {} for 'version' field", x) + eyre::bail!( + "invalid type {} for 'version' field, must be number or a version string", + x + ) } }; @@ -113,14 +196,14 @@ pub extern "C" fn create_rpc(l: *mut lua_State) -> i32 { type LuaMap<'a> = HashMap, LuaValue<'a>>; #[derive(bincode::BorrowDecode, bincode::Encode, PartialEq)] -enum LuaKey<'a> { +pub enum LuaKey<'a> { String(&'a [u8]), Number(f64), } -impl<'a> Eq for LuaKey<'a> {} +impl Eq for LuaKey<'_> {} -impl<'a> std::hash::Hash for LuaKey<'a> { +impl std::hash::Hash for LuaKey<'_> { fn hash(&self, state: &mut H) { match self { LuaKey::String(s) => s.hash(state), @@ -141,7 +224,7 @@ pub enum LuaValue<'a> { impl<'a> LuaValue<'a> { fn parse_table(lua: &'a LuaState, idx: i32) -> Result { let mut values = LuaMap::default(); - // + // `nil` as initial key tells `lua_next` to pick the first key. lua.pushnil(); @@ -156,7 +239,7 @@ impl<'a> LuaValue<'a> { t => eyre::bail!("Unsupported type {} for args table key", t), }; - let value = Self::parse(lua, -1)?; + let value = Self::get_from_stack(lua, -1)?; values.insert(key, value); @@ -170,7 +253,8 @@ impl<'a> LuaValue<'a> { Ok(LuaValue::Table(values)) } - pub fn parse(lua: &'a LuaState, idx: i32) -> Result { + /// Gets the value from the stack at `idx` + pub fn get_from_stack(lua: &'a LuaState, idx: i32) -> Result { match lua.r#type(idx) { LuaType::None | LuaType::Nil => Ok(LuaValue::Nil), LuaType::Boolean => Ok(LuaValue::Boolean(lua.toboolean(-1))), @@ -181,7 +265,8 @@ impl<'a> LuaValue<'a> { } } - pub fn push(&self, lua: &LuaState) { + /// Pushes the value onto a Lua stack + pub fn push_to_stack(&self, lua: &LuaState) { match self { LuaValue::Nil => lua.pushnil(), LuaValue::Boolean(bool) => lua.pushboolean(*bool), @@ -206,7 +291,7 @@ impl<'a> LuaValue<'a> { LuaKey::Number(num) => lua.pushnumber(*num), } - v.push(lua); + v.push_to_stack(lua); lua.settable(tbl_index); } @@ -261,73 +346,47 @@ pub extern "C" fn receive_rpc(l: *mut lua_State) -> i32 { lua.pushvalue(3); let fn_ref = lua.lib_ref(LUA_REGISTRYINDEX); - plugin.add_rpc_listener(id, fn_ref); + plugin + .add_rpc_listener(id, fn_ref) + .wrap_err("Failed to add RPC listener")?; Ok(0) }) } -// Called with `(mod: table, callback: Fn(peer_id))` -pub extern "C" fn on_peer_connected(l: *mut lua_State) -> i32 { +// Called with `(server_name: string, peer_id: string, player_id: string)` +pub extern "C" fn join_session(l: *mut lua_State) -> i32 { lua_wrapper(l, |plugin, lua| { - if !lua.istable(1) { - eyre::bail!("bad argument #1, expected a mod object"); + if !lua.isstring(1) { + eyre::bail!("bad argument #1, expected a string"); } - if !lua.isfunction(2) { - eyre::bail!("bad argument #2, expected a function"); + if !lua.isstring(2) { + eyre::bail!("bad argument #2, expected a string"); } - lua.getfield(1, c"name"); - if !lua.isstring(-1) { - eyre::bail!("bad argument #1, not a mod object"); + if !lua.isstring(3) { + eyre::bail!("bad argument #3, expected a string"); } - let mod_name = lua.tostring(-1); - lua.pushvalue(3); - let fn_ref = lua.lib_ref(LUA_REGISTRYINDEX); + let server_name = lua.tostring(1); + let peer_id = lua.tostring(2); + let player_id = lua.tostring(3); - plugin.add_connect_listener(mod_name, fn_ref); + plugin + .join_session(lua, server_name, peer_id, player_id) + .wrap_err("Failed to join session")?; Ok(0) }) } -// Called with `(mod: table, callback: Fn(peer_id))` -pub extern "C" fn on_peer_disconnected(l: *mut lua_State) -> i32 { +// Called with `()` +pub extern "C" fn leave_session(l: *mut lua_State) -> i32 { lua_wrapper(l, |plugin, lua| { - if !lua.istable(1) { - eyre::bail!("bad argument #1, expected a mod object"); - } - - if !lua.isfunction(2) { - eyre::bail!("bad argument #2, expected a function"); - } - - lua.getfield(1, c"name"); - if !lua.isstring(-1) { - eyre::bail!("bad argument #1, expected a mod object"); - } - - let mod_name = lua.tostring(-1); - lua.pushvalue(3); - let fn_ref = lua.lib_ref(LUA_REGISTRYINDEX); - - plugin.add_disconnect_listener(mod_name, fn_ref); - - Ok(0) - }) -} - -// Called with `(dt: float)` -pub extern "C" fn update(l: *mut lua_State) -> i32 { - lua_wrapper(l, |plugin, lua| { - if !lua.isnumber(1) { - eyre::bail!("bad argument #1, expected a number"); - } - - let dt = lua.tonumber(1); - plugin.lua_update(dt, lua)?; + plugin + .leave_session(lua) + .wrap_err("Failed to leave session")?; Ok(0) }) diff --git a/src/plugin.rs b/src/plugin.rs index 5f1e266..e7f4045 100644 --- a/src/plugin.rs +++ b/src/plugin.rs @@ -1,25 +1,38 @@ use std::collections::HashMap; -use std::hash::{DefaultHasher, Hash as _, Hasher as _}; +use std::hash::{DefaultHasher, Hash, Hasher as _}; +use std::mem; +use std::ops::Deref; use std::thread; use std::thread::JoinHandle; -use bstr::{BStr, BString, ByteSlice}; +use bstr::{BStr, BString}; +use color_eyre::Result; +use color_eyre::eyre; use color_eyre::eyre::Context as _; -use color_eyre::{Result, eyre}; +use libp2p::PeerId; use libp2p::futures::StreamExt; -use libp2p::gossipsub::{AllowAllSubscriptionFilter, Event, IdentityTransform, TopicHash}; +use libp2p::gossipsub; +use libp2p::gossipsub::{ + AllowAllSubscriptionFilter, Event, IdentTopic, IdentityTransform, Message, TopicHash, +}; +use libp2p::noise; use libp2p::swarm::SwarmEvent; -use libp2p::{gossipsub, noise, tcp, yamux}; +use libp2p::tcp; +use libp2p::yamux; +use tokio::sync::mpsc; use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::error::TryRecvError; -use tokio::sync::mpsc::{self, UnboundedSender}; +use crate::lua; use crate::lua::{LuaValue, NAMESPACE_SEPARATOR}; -use crate::rpc::{RPC, RPCMessage}; +use crate::rpc::RPC; use crate::stingray_sdk::{LUA_REGISTRYINDEX, LuaRef, LuaState}; -use crate::{CHANNEL_BUFFER_SIZE, EVENTS_PER_TICK, lua}; +use crate::{CHANNEL_BUFFER_SIZE, EVENTS_PER_TICK}; use crate::{LUA, MODULE_NAME, global}; +type ModName = BString; + #[derive(Clone, Copy, Debug)] pub struct Identifier<'a> { name: &'a BStr, @@ -30,9 +43,11 @@ impl<'a> Identifier<'a> { pub fn new(namespace: &'a BStr, name: &'a BStr) -> Self { Self { namespace, name } } +} - pub(crate) fn to_topic(self) -> TopicHash { - TopicHash::from_raw(format!("{}", self)) +impl From> for String { + fn from(value: Identifier<'_>) -> Self { + format!("{}", value) } } @@ -42,46 +57,109 @@ impl std::fmt::Display for Identifier<'_> { } } +#[derive(Debug, bincode::Encode, bincode::Decode)] +// To avoid having to implement `bincode` for `bstr` types, we use raw `Vec` here, +// which can be turned into `BString` cheaply. +pub(crate) enum SwarmMessage { + // To avoid extra cloning, or having to deal with lifetimes, + // this message type contains the Lua args already encoded. + Rpc(Vec), + SessionJoin { + /// The in-game player ID for this peer + player_id: Vec, + /// A list of mod names this peer has enabled + mods: Vec>, + }, + SessionLeave, +} + #[derive(Debug)] -struct NamespacedMap { - map: HashMap>, +pub(crate) enum SwarmTask { + Message { topic: Topic, msg: SwarmMessage }, + Subscribe(Topic), + Unsubscribe(Vec), } -impl NamespacedMap { - pub fn insert(&mut self, id: Identifier, val: T) { - if !self.map.contains_key(id.namespace) { - self.map.insert(id.namespace.to_owned(), Default::default()); - } +#[derive(Clone, Debug)] +pub(crate) struct Topic(IdentTopic); - let map = self - .map - .get_mut(id.namespace) - .expect("entry is verified to exist"); - map.insert(id.name.to_owned(), val); - } - - pub fn get(&self, id: &Identifier) -> Option<&T> { - self.map.get(id.namespace).and_then(|map| map.get(id.name)) +impl Eq for Topic {} +impl PartialEq for Topic { + fn eq(&self, other: &Self) -> bool { + self.0.hash() == other.0.hash() } } -impl Default for NamespacedMap { - fn default() -> Self { - Self { - map: Default::default(), - } +impl Hash for Topic { + fn hash(&self, state: &mut H) { + self.0.hash().hash(state); } } -enum Task {} +impl std::fmt::Display for Topic { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::ops::Deref for Topic { + type Target = IdentTopic; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From for TopicHash { + fn from(value: Topic) -> Self { + Self::from(value.0) + } +} + +#[derive(Debug)] +struct Player { + id: BString, + mods: Vec, +} + +#[derive(Default)] +pub(crate) struct ModCallbacks { + pub on_session_joined: Option, + pub on_session_left: Option, + pub on_user_joined: Option, + pub on_user_left: Option, +} + +struct Session { + online: bool, + address: String, + peer_id: BString, + topics: HashMap, + peers: HashMap, +} + +impl Session { + pub fn new_topic(&mut self, topic: impl Into) -> &Topic { + self.topics + .entry(TopicHash::from_raw(topic.into())) + .or_insert_with_key(|topic| { + Topic(IdentTopic::new(format!("{}/{}", self.address, topic))) + }) + } + + pub fn get_topic(&self, topic: impl Into) -> Option<&Topic> { + let hash = TopicHash::from_raw(topic.into()); + self.topics.get(&hash) + } +} pub(crate) struct Plugin { swarm_thread: JoinHandle>, event_rx: Receiver>, - send_tx: UnboundedSender, - rpcs: HashMap, - connect_listeners: HashMap, - disconnect_listeners: HashMap, + send_tx: UnboundedSender, + session: Session, + rpcs: HashMap, + mods: HashMap, } impl Plugin { @@ -138,7 +216,7 @@ impl Plugin { let (event_tx, event_rx) = mpsc::channel::>(CHANNEL_BUFFER_SIZE); // Since the `Sender` will be on the sync side, and we want to avoid blocking, we make this // unbounded to limit the chance for it to actually block. - let (send_tx, rpc_rx) = mpsc::unbounded_channel::(); + let (send_tx, send_rx) = mpsc::unbounded_channel::(); let swarm_thread = thread::Builder::new() .name("p2p-swarm".into()) @@ -148,7 +226,7 @@ impl Plugin { .wrap_err("Failed to create tokio runtime")?; let mut swarm = swarm; - let mut rpc_rx = rpc_rx; + let mut send_rx = send_rx; rt.block_on(async move { loop { @@ -156,11 +234,32 @@ impl Plugin { event = swarm.select_next_some() => { event_tx.send(event).await.wrap_err("Failed to queue event")?; } - msg = rpc_rx.recv() => { - let Some(msg) = msg else { - eyre::bail!("RPC queue closed prematurely"); + task = send_rx.recv() => { + let Some(task) = task else { + eyre::bail!("Task queue closed prematurely"); }; - swarm.behaviour_mut().publish(msg.topic, msg.data).wrap_err("Failed to send RPC")?; + + let behavior = swarm.behaviour_mut(); + + match task{ + SwarmTask::Message { topic, msg } => { + let data = bincode::encode_to_vec(msg, bincode::config::standard()) + .wrap_err("Failed to encode swarm message")?; + behavior + .publish(topic, data) + .wrap_err("Failed to send message")?; + }, + SwarmTask::Subscribe(topic) => { + behavior + .subscribe(&topic) + .wrap_err_with(|| format!("Failed to subscribe to topic {}", topic))?; + }, + SwarmTask::Unsubscribe(topics) => { + for topic in topics { + behavior.unsubscribe(&topic); + } + } + } } } } @@ -168,51 +267,64 @@ impl Plugin { }) .wrap_err("Failed to create p2p-swarm thread")?; + log::info!("Plugin initialized"); + Ok(Self { swarm_thread, event_rx, send_tx, + session: Session { + online: false, + address: Default::default(), + peer_id: Default::default(), + topics: Default::default(), + peers: Default::default(), + }, rpcs: Default::default(), - connect_listeners: Default::default(), - disconnect_listeners: Default::default(), + mods: Default::default(), }) } pub fn setup_game(&self) { let lua = global!(LUA); - lua.add_module_function(MODULE_NAME, c"update", lua::update); + // Internal API + // TODO: Maybe useful to move these to a separate module, e.g. `P2PInternal`? + lua.add_module_function(MODULE_NAME, c"join_session", lua::join_session); + lua.add_module_function(MODULE_NAME, c"leave_session", lua::leave_session); + // User-facing API + lua.add_module_function(MODULE_NAME, c"register_mod", lua::register_mod); lua.add_module_function(MODULE_NAME, c"create_rpc", lua::create_rpc); lua.add_module_function(MODULE_NAME, c"send_rpc", lua::send_rpc); lua.add_module_function(MODULE_NAME, c"receive_rpc", lua::receive_rpc); - // TODO: Handle mod reloads - lua.add_module_function(MODULE_NAME, c"on_peer_connected", lua::on_peer_connected); - lua.add_module_function( - MODULE_NAME, - c"on_peer_disconnected", - lua::on_peer_disconnected, - ); - - log::info!("Plugin initialized"); + log::debug!("Lua functions registered"); } pub fn shutdown_game(&self) { // TODO: Find a way to send a close command, and make it blocking. } - // WARN: Due to how the instance of `Plugin` is currently stored and - // accessed by Lua functions, things will likely break if we attempt to get a `lua_State` here - // and trigger out own Lua functions. - pub fn update_game(&mut self, _dt: f32) {} + pub fn update_game(&mut self, dt: f32) { + if let Err(err) = self.update(dt) { + log::error!("{:?}", err); + // TODO: Exit application or find a better way to report the error + } + } - pub fn lua_update(&mut self, _dt: f64, lua: &LuaState) -> Result<()> { + pub fn update(&mut self, _dt: f32) -> Result<()> { if self.swarm_thread.is_finished() { eyre::bail!("p2p-swarm thread terminated prematurely",); // TODO: Move `self.swarm_thread` into a data structure that I can move it out of here, // so that I can call `.join` and get the error it ended with. } + let lua = { + let lua = global!(LUA); + let l = lua.getscriptenvironmentstate(); + LuaState::new(l, lua) + }; + // Limit the amount of events we handle per tick to avoid potential large spikes in // frame time. // TODO: Add a system to monitor the amount of incoming vs processed events, and add @@ -233,34 +345,125 @@ impl Plugin { SwarmEvent::Behaviour(Event::Message { propagation_source, message_id, - message: msg, + message: + Message { + topic, + data, + source, + sequence_number: _, + }, }) => { - log::debug!("Received message {message_id} from {propagation_source}"); - let Some(rpc) = self.rpcs.get(&msg.topic) else { - log::error!("Receive message for unknown topic {}", msg.topic); + log::debug!( + "Received message {message_id} from {propagation_source} for topic {}", + topic + ); + + let Some(topic) = self.session.topics.get(&topic) else { + log::warn!("Received message for unknown topic {}", topic); continue; }; - let Ok((args, _)) = bincode::borrow_decode_from_slice::, _>( - &msg.data, + let Ok((msg, _)) = bincode::borrow_decode_from_slice::( + &data, bincode::config::standard(), ) else { log::error!( "Failed to decode data for message {message_id} on topic {}", - msg.topic + topic ); continue; }; - args.push(lua); - let args_index = lua.gettop(); + match msg { + SwarmMessage::Rpc(rpc_args) => { + let Some(rpc) = self.rpcs.get(topic) else { + log::warn!("Topic {} does not have an RPC", topic); + continue; + }; - for fn_ref in rpc.listeners() { - lua.rawgeti(LUA_REGISTRYINDEX, *fn_ref); - lua.pushvalue(args_index); - lua.pcall(1, 0).wrap_err_with(|| { - format!("Failed to call listener for RPC '{}'", msg.topic) - })?; + let Ok((args, _)) = bincode::borrow_decode_from_slice::, _>( + &rpc_args, + bincode::config::standard(), + ) else { + log::error!( + "Failed to decode data for message {message_id} on topic {}", + topic + ); + continue; + }; + + args.push_to_stack(&lua); + let args_index = lua.gettop(); + + for fn_ref in rpc.listeners() { + lua.rawgeti(LUA_REGISTRYINDEX, *fn_ref); + lua.pushvalue(args_index); + lua.pcall(1, 0).wrap_err_with(|| { + format!("Failed to call listener for RPC '{}'", topic) + })?; + } + } + SwarmMessage::SessionJoin { player_id, mods } => match source { + Some(peer_id) => { + let id = BString::new(player_id); + let mods = mods.into_iter().map(ModName::new).collect::>(); + let peer = Player { id, mods }; + + // TODO: Check if a corresponding player is in the lobby + + { + let callbacks = self + .mods + .iter() + .filter(|(name, _)| peer.mods.contains(name)) + .filter_map(|(_, cbs)| cbs.on_user_joined); + + lua.pushstring(peer.id.clone()); + let arg_index = lua.gettop(); + + for fn_ref in callbacks { + lua.pushnumber(fn_ref as f64); + lua.gettable(LUA_REGISTRYINDEX); + lua.pushvalue(arg_index); + lua.pcall(1, 0) + .wrap_err("on_user_joined handler failed")?; + } + } + + self.session.peers.insert(peer_id, peer); + } + None => { + log::warn!( + "Got SessionJoin event without a source peer_id. Don't know how to handle." + ); + } + }, + SwarmMessage::SessionLeave => match source { + Some(peer_id) => { + if let Some(peer) = self.session.peers.remove(&peer_id) { + let callbacks = self + .mods + .iter() + .filter(|(name, _)| peer.mods.contains(name)) + .filter_map(|(_, cbs)| cbs.on_user_left); + + lua.pushstring(peer.id); + let arg_index = lua.gettop(); + + for fn_ref in callbacks { + lua.pushnumber(fn_ref as f64); + lua.gettable(LUA_REGISTRYINDEX); + lua.pushvalue(arg_index); + lua.pcall(1, 0).wrap_err("on_user_left handler failed")?; + } + } + } + None => { + log::warn!( + "Got SessionLeave event without a source peer_id. Don't know how to handle." + ); + } + }, } } SwarmEvent::Behaviour(Event::GossipsubNotSupported { peer_id }) => { @@ -283,8 +486,9 @@ impl Plugin { established_in.as_millis(), num_established ); - // TODO: Establish mods shared with this peer - // TODO: Call `on_peer_connected` listeners for the corresponding mods + // TODO: Start a timeout to wait for a `SwarmMessage::SessionJoin` to show a + // warning when a client connected but didn't join the session. + // Not sure if that should ever happen. } SwarmEvent::ConnectionClosed { peer_id, @@ -300,10 +504,13 @@ impl Plugin { cause, num_established ); - // TODO: Call `on_peer_disconnected` listeners for active mods - // TODO: Re-establish active mods - // With a peer dropping out, the amount of active mods may change if there is - // now only a single peer left for a mod. + if self.session.peers.contains_key(&peer_id) { + log::warn!("Peer dropped connection without properly leaving session!"); + // TODO: Start a timeout and if the peer doesn't come back, remove the peer + // and trigger "user left" callbacks. + // TODO: Maybe also check if a corresponding player is still in the game + // lobby. + } } SwarmEvent::IncomingConnectionError { connection_id, @@ -337,20 +544,34 @@ impl Plugin { Ok(()) } + pub(crate) fn register_mod(&mut self, name: &BStr, callbacks: ModCallbacks) { + self.mods.insert(name.to_owned(), callbacks); + } + pub(crate) fn create_rpc(&mut self, id: Identifier) { let rpc = RPC::new(); - self.rpcs.insert(id.to_topic(), rpc); + let topic = self.session.new_topic(id); + self.rpcs.insert(topic.clone(), rpc); } pub(crate) fn send_rpc(&self, id: Identifier, args: LuaValue<'_>) -> Result<()> { - if !self.rpcs.contains_key(&id.to_topic()) { + let Some(topic) = self.session.get_topic(id) else { + eyre::bail!("Topic for this RPC does not exist"); + }; + + if !self.rpcs.contains_key(topic) { eyre::bail!("RPC '{}' does not exist", id); } let args = bincode::encode_to_vec(args, bincode::config::standard()) .wrap_err("Failed to encode RPC args")?; - let msg = RPCMessage::new(id, args); + // TODO: Add metrics/profiling for size of the encoded args + + let msg = SwarmTask::Message { + topic: topic.clone(), + msg: SwarmMessage::Rpc(args), + }; self.send_tx .send(msg) .wrap_err("Failed to queue RPC call")?; @@ -358,24 +579,92 @@ impl Plugin { Ok(()) } - pub(crate) fn add_rpc_listener(&mut self, id: Identifier, fn_ref: i32) -> Result<()> { - let Some(rpc) = self.rpcs.get_mut(&id.to_topic()) else { + pub(crate) fn add_rpc_listener(&mut self, id: Identifier, fn_ref: LuaRef) -> Result<()> { + let topic = self.session.new_topic(id); + let Some(rpc) = self.rpcs.get_mut(topic) else { eyre::bail!("RPC '{}' does not exist", id); }; rpc.add_listener(fn_ref); + self.send_tx + .send(SwarmTask::Subscribe(topic.clone())) + .wrap_err("Failed to subscribe to RPC topic")?; Ok(()) } - pub(crate) fn add_connect_listener(&mut self, mod_name: &BStr, fn_ref: i32) { - let key = TopicHash::from_raw(mod_name.to_str_lossy()); - self.connect_listeners.insert(key, fn_ref); + pub(crate) fn join_session( + &mut self, + lua: &LuaState, + server_name: &BStr, + peer_id: &BStr, + player_id: &BStr, + ) -> Result<()> { + self.session.peer_id = peer_id.to_owned(); + self.session.address = format!("/{:08X}", hash(server_name)); + + let session_topic = self.session.new_topic("session").clone(); + + self.send_tx + .send(SwarmTask::Subscribe(session_topic.clone())) + .wrap_err("Failed to subscribe to session topic")?; + + // TODO: Is there a way to do this without the intermediate `Vec`? + // Probably not, since we cannot keep an immutable reference on `self.rpcs`, while + // also trying to get a mutable reference on `self` for `self.subscribe`. + let topics = self.rpcs.keys().cloned().collect::>(); + for topic in topics { + self.send_tx + .send(SwarmTask::Subscribe(topic.clone())) + .wrap_err_with(|| format!("Failed to subscribe to {}", topic))?; + } + + self.session.online = true; + + let msg = SwarmTask::Message { + topic: session_topic, + msg: SwarmMessage::SessionJoin { + player_id: player_id.to_vec(), + mods: self.mods.keys().map(|name| name.to_vec()).collect(), + }, + }; + self.send_tx + .send(msg) + .wrap_err("Failed to queue RPC call")?; + + // TODO: Do I want to wait for a reply or some sort of confirmation? + // Currently, we just assume this works always. + + for fn_ref in self.mods.values().filter_map(|cbs| cbs.on_session_joined) { + lua.pushnumber(fn_ref as f64); + lua.gettable(LUA_REGISTRYINDEX); + lua.pcall(0, 0) + .wrap_err("on_session_joined handler failed")?; + } + + Ok(()) } - pub(crate) fn add_disconnect_listener(&mut self, mod_name: &BStr, fn_ref: i32) { - let key = TopicHash::from_raw(mod_name.to_str_lossy()); - self.disconnect_listeners.insert(key, fn_ref); + pub(crate) fn leave_session(&mut self, lua: &LuaState) -> Result<()> { + if !self.session.online { + log::warn!("There is no session to leave"); + return Ok(()); + } + + let topics = mem::take(&mut self.session.topics); + self.send_tx + .send(SwarmTask::Unsubscribe(topics.into_values().collect())) + .wrap_err("Failed to queue unsubscribe task")?; + + self.session.online = false; + + for fn_ref in self.mods.values().filter_map(|cbs| cbs.on_session_left) { + lua.pushnumber(fn_ref as f64); + lua.gettable(LUA_REGISTRYINDEX); + lua.pcall(0, 0).wrap_err("on_session_left handler failed")?; + } + + Ok(()) } } @@ -384,3 +673,9 @@ impl std::fmt::Debug for Plugin { f.write_str("PluginApi") } } + +fn hash(val: impl AsRef<[u8]>) -> u64 { + let mut hasher = DefaultHasher::new(); + hasher.write(val.as_ref()); + hasher.finish() +} diff --git a/src/rpc.rs b/src/rpc.rs index eaff863..27a0995 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,8 +1,5 @@ use std::collections::HashSet; -use libp2p::gossipsub::TopicHash; - -use crate::plugin::Identifier; use crate::stingray_sdk::LuaRef; #[derive(Debug)] @@ -25,19 +22,3 @@ impl RPC { self.listeners.iter() } } - -#[derive(Debug)] -pub(crate) struct RPCMessage { - pub topic: TopicHash, - pub data: Vec, -} - -impl RPCMessage { - pub(crate) fn new(id: Identifier, data: impl Into>) -> Self { - let topic = id.to_topic(); - Self { - topic, - data: data.into(), - } - } -} diff --git a/src/stingray_sdk.rs b/src/stingray_sdk.rs index c310280..1143a73 100644 --- a/src/stingray_sdk.rs +++ b/src/stingray_sdk.rs @@ -208,6 +208,8 @@ pub type LuaRef = i32; pub struct LuaApi { add_module_function: unsafe extern "C" fn(*const c_char, *const c_char, lua_CFunction), createtable: unsafe extern "C" fn(*mut lua_State, i32, i32), + error: unsafe extern "C" fn(*mut lua_State) -> i32, + getscriptenvironmentstate: unsafe extern "C" fn() -> *mut lua_State, getfield: unsafe extern "C" fn(*mut lua_State, i32, *const c_char), gettable: unsafe extern "C" fn(*mut lua_State, i32), gettop: unsafe extern "C" fn(*mut lua_State) -> i32, @@ -248,6 +250,8 @@ impl LuaApi { Self { add_module_function: (*api).add_module_function.unwrap_unchecked(), createtable: (*api).createtable.unwrap_unchecked(), + error: (*api).error.unwrap_unchecked(), + getscriptenvironmentstate: (*api).getscriptenvironmentstate.unwrap_unchecked(), getfield: (*api).getfield.unwrap_unchecked(), gettable: (*api).gettable.unwrap_unchecked(), gettop: (*api).gettop.unwrap_unchecked(), @@ -289,6 +293,14 @@ impl LuaApi { (self.add_module_function)(module.as_ref().as_ptr(), name.as_ref().as_ptr(), Some(cb)) } } + + pub fn getscriptenvironmentstate(&self) -> *mut lua_State { + unsafe { (self.getscriptenvironmentstate)() } + } + + pub fn error(&self, l: *mut lua_State) { + unsafe { (self.error)(l) }; + } } /// A wrapper that combines a lua_State and a LuaApi to make calling functions more convenient. @@ -326,8 +338,7 @@ impl<'a> LuaState<'a> { } pub fn isfunction(&self, idx: i32) -> bool { - let is_fn = unsafe { (self.api.isfunction)(self.l, idx) }; - is_fn > 0 + matches!(self.r#type(idx), LuaType::Function) } pub fn istable(&self, idx: i32) -> bool { @@ -420,6 +431,10 @@ impl<'a> LuaState<'a> { eyre::bail!("pcall failed: {}", err) } + pub fn error(&self) { + self.api.error(self.l) + } + pub fn next(&self, idx: i32) -> i32 { unsafe { (self.api.next)(self.l, idx) } } @@ -445,6 +460,8 @@ impl<'a> LuaState<'a> { unsafe { (self.api.lib_error)(self.l, s.as_ptr()) } } + /// Creates and returns a reference, in the table at index t, for the object at the top of + /// the stack (and pops the object). pub fn lib_ref(&self, t: i32) -> LuaRef { unsafe { (self.api.lib_ref)(self.l, t) } }