This commit is contained in:
Lucas Schwiderski 2025-05-30 11:53:37 +02:00
parent 59ee7fa31e
commit 3b9d652b7a
Signed by: lucas
GPG key ID: AA12679AAA6DF4D8
5 changed files with 541 additions and 187 deletions

View file

@ -70,7 +70,9 @@ pub extern "C" fn setup_game(get_engine_api: GetApiFunction) {
}; };
plugin.setup_game(); plugin.setup_game();
PLUGIN.set(RwLock::new(plugin)); PLUGIN
.set(RwLock::new(plugin))
.expect("Failed to set global plugin instance");
} }
#[unsafe(no_mangle)] #[unsafe(no_mangle)]

View file

@ -1,4 +1,5 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::ffi::CStr;
use std::panic::catch_unwind; use std::panic::catch_unwind;
use bstr::BStr; use bstr::BStr;
@ -8,6 +9,7 @@ use color_eyre::eyre;
use color_eyre::eyre::Context as _; use color_eyre::eyre::Context as _;
use crate::plugin::Identifier; use crate::plugin::Identifier;
use crate::plugin::ModCallbacks;
use crate::plugin::Plugin; use crate::plugin::Plugin;
use crate::stingray_sdk::LUA_REGISTRYINDEX; use crate::stingray_sdk::LUA_REGISTRYINDEX;
use crate::stingray_sdk::{LuaState, LuaType, lua_State}; use crate::stingray_sdk::{LuaState, LuaType, lua_State};
@ -15,25 +17,34 @@ use crate::{LUA, global, plugin_mut};
pub const NAMESPACE_SEPARATOR: char = '.'; pub const NAMESPACE_SEPARATOR: char = '.';
static RPC_CALLBACK_KEY: &str = "dt-p2p.rpc-callbacks";
// As a minor optimization, only `catch_unwind` in debug mode. // As a minor optimization, only `catch_unwind` in debug mode.
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
fn lua_wrapper<F>(l: *mut lua_State, f: F) -> i32 fn lua_wrapper<F>(l: *mut lua_State, f: F) -> i32
where where
F: FnOnce(&mut Plugin, &LuaState) -> Result<i32> + std::panic::UnwindSafe, F: FnOnce(&mut Plugin, &LuaState) -> Result<i32> + std::panic::UnwindSafe,
{ {
let lua = LuaState::new(l, global!(LUA)); let lua = global!(LUA);
// We need to drop as many things as possible before the `lua_error` call
// will longjmp and thereby ignoring Rust's cleanup.
{
let lua = LuaState::new(l, lua);
let res = catch_unwind(|| { let res = catch_unwind(|| {
let mut plugin = plugin_mut!(); let mut plugin = plugin_mut!();
f(&mut plugin, &lua) f(&mut plugin, &lua)
}); });
match res { match res {
Ok(Ok(i)) => i, Ok(Ok(i)) => {
Ok(Err(err)) => lua.lib_error(format!("{:?}", err)), return i;
Err(err) => lua.lib_error(format!("{:?}", err)),
} }
Ok(Err(err)) => lua.pushstring(format!("{:?}", err)),
Err(err) => lua.pushstring(format!("{:?}", err)),
}
}
lua.error(l);
0
} }
#[cfg(not(debug_assertions))] #[cfg(not(debug_assertions))]
@ -41,14 +52,25 @@ fn lua_wrapper<F>(l: *mut lua_State, f: F) -> i32
where where
F: FnOnce(&mut Plugin, &LuaState) -> Result<i32>, F: FnOnce(&mut Plugin, &LuaState) -> Result<i32>,
{ {
let lua = LuaState::new(l, global!(LUA)); let lua = global!(LUA);
let plugin = plugin_mut!();
let res = f(&mut plugin, &lua); // We need to drop as many things as possible before the `lua_error` call
match res { // will longjmp and thereby ignoring Rust's cleanup.
Ok(i) => i, {
Err(err) => lua.lib_error(format!("{:?}", err)), let lua = LuaState::new(l, lua);
let mut plugin = plugin_mut!();
match f(&mut plugin, &lua) {
Ok(Ok(i)) => {
return i;
} }
Ok(Err(err)) => lua.pushstring(format!("{:?}", err)),
Err(err) => lua.pushstring(format!("{:?}", err)),
}
}
lua.error(l);
0
} }
/// Returns the namespace and name for a RPC. /// Returns the namespace and name for a RPC.
@ -72,6 +94,64 @@ fn get_identifier<'a>(lua: &'a LuaState) -> Result<Identifier<'a>> {
Ok(Identifier::new(namespace, name)) Ok(Identifier::new(namespace, name))
} }
// Called with `(mod: table, callbacks: table)`
pub extern "C" fn register_mod(l: *mut lua_State) -> i32 {
lua_wrapper(l, |plugin, lua| {
if !lua.istable(1) {
eyre::bail!("bad argument #1, expected a mod object");
}
let name = {
lua.getfield(1, c"name");
if !lua.isstring(-1) {
eyre::bail!("bad argument #1, not a mod object");
}
lua.tostring(-1)
};
let callbacks = match lua.r#type(2) {
LuaType::Nil => ModCallbacks::default(),
LuaType::Table => {
let get_callback = |name: &CStr| {
lua.getfield(2, name);
match lua.r#type(-1) {
LuaType::Nil => Ok(None),
LuaType::Function => Ok(Some(lua.lib_ref(LUA_REGISTRYINDEX))),
x => {
eyre::bail!(
"bad argument #2, expected a function for field {}, got {}",
name.to_string_lossy(),
x
);
}
}
};
ModCallbacks {
on_session_joined: get_callback(c"on_session_joined")?,
on_session_left: get_callback(c"on_session_left")?,
on_user_joined: get_callback(c"on_user_joined")?,
on_user_left: get_callback(c"on_user_left")?,
}
}
x => {
eyre::bail!("bad argument #2, expected a table got {}", x);
}
};
plugin.register_mod(name, callbacks);
// TODO: Register unload and disable handlers?
// Will likely have to monkey-patch the mod object.
// DMF doesn't expose a good way to hook into it, and it wouldn't be feasible to expect a
// DMF update to allow it.
Ok(0)
})
}
// Called with `(mod: table, name: string, opts: table)` // Called with `(mod: table, name: string, opts: table)`
pub extern "C" fn create_rpc(l: *mut lua_State) -> i32 { pub extern "C" fn create_rpc(l: *mut lua_State) -> i32 {
lua_wrapper(l, |plugin, lua| { lua_wrapper(l, |plugin, lua| {
@ -99,7 +179,10 @@ pub extern "C" fn create_rpc(l: *mut lua_State) -> i32 {
version version
} }
x => { x => {
eyre::bail!("invalid type {} for 'version' field", x) eyre::bail!(
"invalid type {} for 'version' field, must be number or a version string",
x
)
} }
}; };
@ -113,14 +196,14 @@ pub extern "C" fn create_rpc(l: *mut lua_State) -> i32 {
type LuaMap<'a> = HashMap<LuaKey<'a>, LuaValue<'a>>; type LuaMap<'a> = HashMap<LuaKey<'a>, LuaValue<'a>>;
#[derive(bincode::BorrowDecode, bincode::Encode, PartialEq)] #[derive(bincode::BorrowDecode, bincode::Encode, PartialEq)]
enum LuaKey<'a> { pub enum LuaKey<'a> {
String(&'a [u8]), String(&'a [u8]),
Number(f64), Number(f64),
} }
impl<'a> Eq for LuaKey<'a> {} impl Eq for LuaKey<'_> {}
impl<'a> std::hash::Hash for LuaKey<'a> { impl std::hash::Hash for LuaKey<'_> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) { fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
match self { match self {
LuaKey::String(s) => s.hash(state), LuaKey::String(s) => s.hash(state),
@ -141,7 +224,7 @@ pub enum LuaValue<'a> {
impl<'a> LuaValue<'a> { impl<'a> LuaValue<'a> {
fn parse_table(lua: &'a LuaState, idx: i32) -> Result<Self> { fn parse_table(lua: &'a LuaState, idx: i32) -> Result<Self> {
let mut values = LuaMap::default(); let mut values = LuaMap::default();
//
// `nil` as initial key tells `lua_next` to pick the first key. // `nil` as initial key tells `lua_next` to pick the first key.
lua.pushnil(); lua.pushnil();
@ -156,7 +239,7 @@ impl<'a> LuaValue<'a> {
t => eyre::bail!("Unsupported type {} for args table key", t), t => eyre::bail!("Unsupported type {} for args table key", t),
}; };
let value = Self::parse(lua, -1)?; let value = Self::get_from_stack(lua, -1)?;
values.insert(key, value); values.insert(key, value);
@ -170,7 +253,8 @@ impl<'a> LuaValue<'a> {
Ok(LuaValue::Table(values)) Ok(LuaValue::Table(values))
} }
pub fn parse(lua: &'a LuaState, idx: i32) -> Result<Self> { /// Gets the value from the stack at `idx`
pub fn get_from_stack(lua: &'a LuaState, idx: i32) -> Result<Self> {
match lua.r#type(idx) { match lua.r#type(idx) {
LuaType::None | LuaType::Nil => Ok(LuaValue::Nil), LuaType::None | LuaType::Nil => Ok(LuaValue::Nil),
LuaType::Boolean => Ok(LuaValue::Boolean(lua.toboolean(-1))), LuaType::Boolean => Ok(LuaValue::Boolean(lua.toboolean(-1))),
@ -181,7 +265,8 @@ impl<'a> LuaValue<'a> {
} }
} }
pub fn push(&self, lua: &LuaState) { /// Pushes the value onto a Lua stack
pub fn push_to_stack(&self, lua: &LuaState) {
match self { match self {
LuaValue::Nil => lua.pushnil(), LuaValue::Nil => lua.pushnil(),
LuaValue::Boolean(bool) => lua.pushboolean(*bool), LuaValue::Boolean(bool) => lua.pushboolean(*bool),
@ -206,7 +291,7 @@ impl<'a> LuaValue<'a> {
LuaKey::Number(num) => lua.pushnumber(*num), LuaKey::Number(num) => lua.pushnumber(*num),
} }
v.push(lua); v.push_to_stack(lua);
lua.settable(tbl_index); lua.settable(tbl_index);
} }
@ -261,73 +346,47 @@ pub extern "C" fn receive_rpc(l: *mut lua_State) -> i32 {
lua.pushvalue(3); lua.pushvalue(3);
let fn_ref = lua.lib_ref(LUA_REGISTRYINDEX); let fn_ref = lua.lib_ref(LUA_REGISTRYINDEX);
plugin.add_rpc_listener(id, fn_ref); plugin
.add_rpc_listener(id, fn_ref)
.wrap_err("Failed to add RPC listener")?;
Ok(0) Ok(0)
}) })
} }
// Called with `(mod: table, callback: Fn(peer_id))` // Called with `(server_name: string, peer_id: string, player_id: string)`
pub extern "C" fn on_peer_connected(l: *mut lua_State) -> i32 { pub extern "C" fn join_session(l: *mut lua_State) -> i32 {
lua_wrapper(l, |plugin, lua| { lua_wrapper(l, |plugin, lua| {
if !lua.istable(1) { if !lua.isstring(1) {
eyre::bail!("bad argument #1, expected a mod object"); eyre::bail!("bad argument #1, expected a string");
} }
if !lua.isfunction(2) { if !lua.isstring(2) {
eyre::bail!("bad argument #2, expected a function"); eyre::bail!("bad argument #2, expected a string");
} }
lua.getfield(1, c"name"); if !lua.isstring(3) {
if !lua.isstring(-1) { eyre::bail!("bad argument #3, expected a string");
eyre::bail!("bad argument #1, not a mod object");
} }
let mod_name = lua.tostring(-1); let server_name = lua.tostring(1);
lua.pushvalue(3); let peer_id = lua.tostring(2);
let fn_ref = lua.lib_ref(LUA_REGISTRYINDEX); let player_id = lua.tostring(3);
plugin.add_connect_listener(mod_name, fn_ref); plugin
.join_session(lua, server_name, peer_id, player_id)
.wrap_err("Failed to join session")?;
Ok(0) Ok(0)
}) })
} }
// Called with `(mod: table, callback: Fn(peer_id))` // Called with `()`
pub extern "C" fn on_peer_disconnected(l: *mut lua_State) -> i32 { pub extern "C" fn leave_session(l: *mut lua_State) -> i32 {
lua_wrapper(l, |plugin, lua| { lua_wrapper(l, |plugin, lua| {
if !lua.istable(1) { plugin
eyre::bail!("bad argument #1, expected a mod object"); .leave_session(lua)
} .wrap_err("Failed to leave session")?;
if !lua.isfunction(2) {
eyre::bail!("bad argument #2, expected a function");
}
lua.getfield(1, c"name");
if !lua.isstring(-1) {
eyre::bail!("bad argument #1, expected a mod object");
}
let mod_name = lua.tostring(-1);
lua.pushvalue(3);
let fn_ref = lua.lib_ref(LUA_REGISTRYINDEX);
plugin.add_disconnect_listener(mod_name, fn_ref);
Ok(0)
})
}
// Called with `(dt: float)`
pub extern "C" fn update(l: *mut lua_State) -> i32 {
lua_wrapper(l, |plugin, lua| {
if !lua.isnumber(1) {
eyre::bail!("bad argument #1, expected a number");
}
let dt = lua.tonumber(1);
plugin.lua_update(dt, lua)?;
Ok(0) Ok(0)
}) })

View file

@ -1,25 +1,38 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::hash::{DefaultHasher, Hash as _, Hasher as _}; use std::hash::{DefaultHasher, Hash, Hasher as _};
use std::mem;
use std::ops::Deref;
use std::thread; use std::thread;
use std::thread::JoinHandle; use std::thread::JoinHandle;
use bstr::{BStr, BString, ByteSlice}; use bstr::{BStr, BString};
use color_eyre::Result;
use color_eyre::eyre;
use color_eyre::eyre::Context as _; use color_eyre::eyre::Context as _;
use color_eyre::{Result, eyre}; use libp2p::PeerId;
use libp2p::futures::StreamExt; use libp2p::futures::StreamExt;
use libp2p::gossipsub::{AllowAllSubscriptionFilter, Event, IdentityTransform, TopicHash}; use libp2p::gossipsub;
use libp2p::gossipsub::{
AllowAllSubscriptionFilter, Event, IdentTopic, IdentityTransform, Message, TopicHash,
};
use libp2p::noise;
use libp2p::swarm::SwarmEvent; use libp2p::swarm::SwarmEvent;
use libp2p::{gossipsub, noise, tcp, yamux}; use libp2p::tcp;
use libp2p::yamux;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{self, UnboundedSender};
use crate::lua;
use crate::lua::{LuaValue, NAMESPACE_SEPARATOR}; use crate::lua::{LuaValue, NAMESPACE_SEPARATOR};
use crate::rpc::{RPC, RPCMessage}; use crate::rpc::RPC;
use crate::stingray_sdk::{LUA_REGISTRYINDEX, LuaRef, LuaState}; use crate::stingray_sdk::{LUA_REGISTRYINDEX, LuaRef, LuaState};
use crate::{CHANNEL_BUFFER_SIZE, EVENTS_PER_TICK, lua}; use crate::{CHANNEL_BUFFER_SIZE, EVENTS_PER_TICK};
use crate::{LUA, MODULE_NAME, global}; use crate::{LUA, MODULE_NAME, global};
type ModName = BString;
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
pub struct Identifier<'a> { pub struct Identifier<'a> {
name: &'a BStr, name: &'a BStr,
@ -30,9 +43,11 @@ impl<'a> Identifier<'a> {
pub fn new(namespace: &'a BStr, name: &'a BStr) -> Self { pub fn new(namespace: &'a BStr, name: &'a BStr) -> Self {
Self { namespace, name } Self { namespace, name }
} }
}
pub(crate) fn to_topic(self) -> TopicHash { impl From<Identifier<'_>> for String {
TopicHash::from_raw(format!("{}", self)) fn from(value: Identifier<'_>) -> Self {
format!("{}", value)
} }
} }
@ -42,46 +57,109 @@ impl std::fmt::Display for Identifier<'_> {
} }
} }
#[derive(Debug, bincode::Encode, bincode::Decode)]
// To avoid having to implement `bincode` for `bstr` types, we use raw `Vec<u8>` here,
// which can be turned into `BString` cheaply.
pub(crate) enum SwarmMessage {
// To avoid extra cloning, or having to deal with lifetimes,
// this message type contains the Lua args already encoded.
Rpc(Vec<u8>),
SessionJoin {
/// The in-game player ID for this peer
player_id: Vec<u8>,
/// A list of mod names this peer has enabled
mods: Vec<Vec<u8>>,
},
SessionLeave,
}
#[derive(Debug)] #[derive(Debug)]
struct NamespacedMap<T> { pub(crate) enum SwarmTask {
map: HashMap<BString, HashMap<BString, T>>, Message { topic: Topic, msg: SwarmMessage },
Subscribe(Topic),
Unsubscribe(Vec<Topic>),
} }
impl<T> NamespacedMap<T> { #[derive(Clone, Debug)]
pub fn insert(&mut self, id: Identifier, val: T) { pub(crate) struct Topic(IdentTopic);
if !self.map.contains_key(id.namespace) {
self.map.insert(id.namespace.to_owned(), Default::default());
}
let map = self impl Eq for Topic {}
.map impl PartialEq for Topic {
.get_mut(id.namespace) fn eq(&self, other: &Self) -> bool {
.expect("entry is verified to exist"); self.0.hash() == other.0.hash()
map.insert(id.name.to_owned(), val);
}
pub fn get(&self, id: &Identifier) -> Option<&T> {
self.map.get(id.namespace).and_then(|map| map.get(id.name))
} }
} }
impl<T> Default for NamespacedMap<T> { impl Hash for Topic {
fn default() -> Self { fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
Self { self.0.hash().hash(state);
map: Default::default(),
}
} }
} }
enum Task {} impl std::fmt::Display for Topic {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl std::ops::Deref for Topic {
type Target = IdentTopic;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl From<Topic> for TopicHash {
fn from(value: Topic) -> Self {
Self::from(value.0)
}
}
#[derive(Debug)]
struct Player {
id: BString,
mods: Vec<ModName>,
}
#[derive(Default)]
pub(crate) struct ModCallbacks {
pub on_session_joined: Option<LuaRef>,
pub on_session_left: Option<LuaRef>,
pub on_user_joined: Option<LuaRef>,
pub on_user_left: Option<LuaRef>,
}
struct Session {
online: bool,
address: String,
peer_id: BString,
topics: HashMap<TopicHash, Topic>,
peers: HashMap<PeerId, Player>,
}
impl Session {
pub fn new_topic(&mut self, topic: impl Into<String>) -> &Topic {
self.topics
.entry(TopicHash::from_raw(topic.into()))
.or_insert_with_key(|topic| {
Topic(IdentTopic::new(format!("{}/{}", self.address, topic)))
})
}
pub fn get_topic(&self, topic: impl Into<String>) -> Option<&Topic> {
let hash = TopicHash::from_raw(topic.into());
self.topics.get(&hash)
}
}
pub(crate) struct Plugin { pub(crate) struct Plugin {
swarm_thread: JoinHandle<Result<()>>, swarm_thread: JoinHandle<Result<()>>,
event_rx: Receiver<SwarmEvent<Event>>, event_rx: Receiver<SwarmEvent<Event>>,
send_tx: UnboundedSender<RPCMessage>, send_tx: UnboundedSender<SwarmTask>,
rpcs: HashMap<TopicHash, RPC>, session: Session,
connect_listeners: HashMap<TopicHash, LuaRef>, rpcs: HashMap<Topic, RPC>,
disconnect_listeners: HashMap<TopicHash, LuaRef>, mods: HashMap<ModName, ModCallbacks>,
} }
impl Plugin { impl Plugin {
@ -138,7 +216,7 @@ impl Plugin {
let (event_tx, event_rx) = mpsc::channel::<SwarmEvent<Event>>(CHANNEL_BUFFER_SIZE); let (event_tx, event_rx) = mpsc::channel::<SwarmEvent<Event>>(CHANNEL_BUFFER_SIZE);
// Since the `Sender` will be on the sync side, and we want to avoid blocking, we make this // Since the `Sender` will be on the sync side, and we want to avoid blocking, we make this
// unbounded to limit the chance for it to actually block. // unbounded to limit the chance for it to actually block.
let (send_tx, rpc_rx) = mpsc::unbounded_channel::<RPCMessage>(); let (send_tx, send_rx) = mpsc::unbounded_channel::<SwarmTask>();
let swarm_thread = thread::Builder::new() let swarm_thread = thread::Builder::new()
.name("p2p-swarm".into()) .name("p2p-swarm".into())
@ -148,7 +226,7 @@ impl Plugin {
.wrap_err("Failed to create tokio runtime")?; .wrap_err("Failed to create tokio runtime")?;
let mut swarm = swarm; let mut swarm = swarm;
let mut rpc_rx = rpc_rx; let mut send_rx = send_rx;
rt.block_on(async move { rt.block_on(async move {
loop { loop {
@ -156,11 +234,32 @@ impl Plugin {
event = swarm.select_next_some() => { event = swarm.select_next_some() => {
event_tx.send(event).await.wrap_err("Failed to queue event")?; event_tx.send(event).await.wrap_err("Failed to queue event")?;
} }
msg = rpc_rx.recv() => { task = send_rx.recv() => {
let Some(msg) = msg else { let Some(task) = task else {
eyre::bail!("RPC queue closed prematurely"); eyre::bail!("Task queue closed prematurely");
}; };
swarm.behaviour_mut().publish(msg.topic, msg.data).wrap_err("Failed to send RPC")?;
let behavior = swarm.behaviour_mut();
match task{
SwarmTask::Message { topic, msg } => {
let data = bincode::encode_to_vec(msg, bincode::config::standard())
.wrap_err("Failed to encode swarm message")?;
behavior
.publish(topic, data)
.wrap_err("Failed to send message")?;
},
SwarmTask::Subscribe(topic) => {
behavior
.subscribe(&topic)
.wrap_err_with(|| format!("Failed to subscribe to topic {}", topic))?;
},
SwarmTask::Unsubscribe(topics) => {
for topic in topics {
behavior.unsubscribe(&topic);
}
}
}
} }
} }
} }
@ -168,51 +267,64 @@ impl Plugin {
}) })
.wrap_err("Failed to create p2p-swarm thread")?; .wrap_err("Failed to create p2p-swarm thread")?;
log::info!("Plugin initialized");
Ok(Self { Ok(Self {
swarm_thread, swarm_thread,
event_rx, event_rx,
send_tx, send_tx,
session: Session {
online: false,
address: Default::default(),
peer_id: Default::default(),
topics: Default::default(),
peers: Default::default(),
},
rpcs: Default::default(), rpcs: Default::default(),
connect_listeners: Default::default(), mods: Default::default(),
disconnect_listeners: Default::default(),
}) })
} }
pub fn setup_game(&self) { pub fn setup_game(&self) {
let lua = global!(LUA); let lua = global!(LUA);
lua.add_module_function(MODULE_NAME, c"update", lua::update); // Internal API
// TODO: Maybe useful to move these to a separate module, e.g. `P2PInternal`?
lua.add_module_function(MODULE_NAME, c"join_session", lua::join_session);
lua.add_module_function(MODULE_NAME, c"leave_session", lua::leave_session);
// User-facing API
lua.add_module_function(MODULE_NAME, c"register_mod", lua::register_mod);
lua.add_module_function(MODULE_NAME, c"create_rpc", lua::create_rpc); lua.add_module_function(MODULE_NAME, c"create_rpc", lua::create_rpc);
lua.add_module_function(MODULE_NAME, c"send_rpc", lua::send_rpc); lua.add_module_function(MODULE_NAME, c"send_rpc", lua::send_rpc);
lua.add_module_function(MODULE_NAME, c"receive_rpc", lua::receive_rpc); lua.add_module_function(MODULE_NAME, c"receive_rpc", lua::receive_rpc);
// TODO: Handle mod reloads log::debug!("Lua functions registered");
lua.add_module_function(MODULE_NAME, c"on_peer_connected", lua::on_peer_connected);
lua.add_module_function(
MODULE_NAME,
c"on_peer_disconnected",
lua::on_peer_disconnected,
);
log::info!("Plugin initialized");
} }
pub fn shutdown_game(&self) { pub fn shutdown_game(&self) {
// TODO: Find a way to send a close command, and make it blocking. // TODO: Find a way to send a close command, and make it blocking.
} }
// WARN: Due to how the instance of `Plugin` is currently stored and pub fn update_game(&mut self, dt: f32) {
// accessed by Lua functions, things will likely break if we attempt to get a `lua_State` here if let Err(err) = self.update(dt) {
// and trigger out own Lua functions. log::error!("{:?}", err);
pub fn update_game(&mut self, _dt: f32) {} // TODO: Exit application or find a better way to report the error
}
}
pub fn lua_update(&mut self, _dt: f64, lua: &LuaState) -> Result<()> { pub fn update(&mut self, _dt: f32) -> Result<()> {
if self.swarm_thread.is_finished() { if self.swarm_thread.is_finished() {
eyre::bail!("p2p-swarm thread terminated prematurely",); eyre::bail!("p2p-swarm thread terminated prematurely",);
// TODO: Move `self.swarm_thread` into a data structure that I can move it out of here, // TODO: Move `self.swarm_thread` into a data structure that I can move it out of here,
// so that I can call `.join` and get the error it ended with. // so that I can call `.join` and get the error it ended with.
} }
let lua = {
let lua = global!(LUA);
let l = lua.getscriptenvironmentstate();
LuaState::new(l, lua)
};
// Limit the amount of events we handle per tick to avoid potential large spikes in // Limit the amount of events we handle per tick to avoid potential large spikes in
// frame time. // frame time.
// TODO: Add a system to monitor the amount of incoming vs processed events, and add // TODO: Add a system to monitor the amount of incoming vs processed events, and add
@ -233,36 +345,127 @@ impl Plugin {
SwarmEvent::Behaviour(Event::Message { SwarmEvent::Behaviour(Event::Message {
propagation_source, propagation_source,
message_id, message_id,
message: msg, message:
Message {
topic,
data,
source,
sequence_number: _,
},
}) => { }) => {
log::debug!("Received message {message_id} from {propagation_source}"); log::debug!(
let Some(rpc) = self.rpcs.get(&msg.topic) else { "Received message {message_id} from {propagation_source} for topic {}",
log::error!("Receive message for unknown topic {}", msg.topic); topic
);
let Some(topic) = self.session.topics.get(&topic) else {
log::warn!("Received message for unknown topic {}", topic);
continue; continue;
}; };
let Ok((args, _)) = bincode::borrow_decode_from_slice::<LuaValue<'_>, _>( let Ok((msg, _)) = bincode::borrow_decode_from_slice::<SwarmMessage, _>(
&msg.data, &data,
bincode::config::standard(), bincode::config::standard(),
) else { ) else {
log::error!( log::error!(
"Failed to decode data for message {message_id} on topic {}", "Failed to decode data for message {message_id} on topic {}",
msg.topic topic
); );
continue; continue;
}; };
args.push(lua); match msg {
SwarmMessage::Rpc(rpc_args) => {
let Some(rpc) = self.rpcs.get(topic) else {
log::warn!("Topic {} does not have an RPC", topic);
continue;
};
let Ok((args, _)) = bincode::borrow_decode_from_slice::<LuaValue<'_>, _>(
&rpc_args,
bincode::config::standard(),
) else {
log::error!(
"Failed to decode data for message {message_id} on topic {}",
topic
);
continue;
};
args.push_to_stack(&lua);
let args_index = lua.gettop(); let args_index = lua.gettop();
for fn_ref in rpc.listeners() { for fn_ref in rpc.listeners() {
lua.rawgeti(LUA_REGISTRYINDEX, *fn_ref); lua.rawgeti(LUA_REGISTRYINDEX, *fn_ref);
lua.pushvalue(args_index); lua.pushvalue(args_index);
lua.pcall(1, 0).wrap_err_with(|| { lua.pcall(1, 0).wrap_err_with(|| {
format!("Failed to call listener for RPC '{}'", msg.topic) format!("Failed to call listener for RPC '{}'", topic)
})?; })?;
} }
} }
SwarmMessage::SessionJoin { player_id, mods } => match source {
Some(peer_id) => {
let id = BString::new(player_id);
let mods = mods.into_iter().map(ModName::new).collect::<Vec<_>>();
let peer = Player { id, mods };
// TODO: Check if a corresponding player is in the lobby
{
let callbacks = self
.mods
.iter()
.filter(|(name, _)| peer.mods.contains(name))
.filter_map(|(_, cbs)| cbs.on_user_joined);
lua.pushstring(peer.id.clone());
let arg_index = lua.gettop();
for fn_ref in callbacks {
lua.pushnumber(fn_ref as f64);
lua.gettable(LUA_REGISTRYINDEX);
lua.pushvalue(arg_index);
lua.pcall(1, 0)
.wrap_err("on_user_joined handler failed")?;
}
}
self.session.peers.insert(peer_id, peer);
}
None => {
log::warn!(
"Got SessionJoin event without a source peer_id. Don't know how to handle."
);
}
},
SwarmMessage::SessionLeave => match source {
Some(peer_id) => {
if let Some(peer) = self.session.peers.remove(&peer_id) {
let callbacks = self
.mods
.iter()
.filter(|(name, _)| peer.mods.contains(name))
.filter_map(|(_, cbs)| cbs.on_user_left);
lua.pushstring(peer.id);
let arg_index = lua.gettop();
for fn_ref in callbacks {
lua.pushnumber(fn_ref as f64);
lua.gettable(LUA_REGISTRYINDEX);
lua.pushvalue(arg_index);
lua.pcall(1, 0).wrap_err("on_user_left handler failed")?;
}
}
}
None => {
log::warn!(
"Got SessionLeave event without a source peer_id. Don't know how to handle."
);
}
},
}
}
SwarmEvent::Behaviour(Event::GossipsubNotSupported { peer_id }) => { SwarmEvent::Behaviour(Event::GossipsubNotSupported { peer_id }) => {
log::warn!( log::warn!(
"Peer {peer_id} does not support Gossipsub. Are they really a dt-p2p user?" "Peer {peer_id} does not support Gossipsub. Are they really a dt-p2p user?"
@ -283,8 +486,9 @@ impl Plugin {
established_in.as_millis(), established_in.as_millis(),
num_established num_established
); );
// TODO: Establish mods shared with this peer // TODO: Start a timeout to wait for a `SwarmMessage::SessionJoin` to show a
// TODO: Call `on_peer_connected` listeners for the corresponding mods // warning when a client connected but didn't join the session.
// Not sure if that should ever happen.
} }
SwarmEvent::ConnectionClosed { SwarmEvent::ConnectionClosed {
peer_id, peer_id,
@ -300,10 +504,13 @@ impl Plugin {
cause, cause,
num_established num_established
); );
// TODO: Call `on_peer_disconnected` listeners for active mods if self.session.peers.contains_key(&peer_id) {
// TODO: Re-establish active mods log::warn!("Peer dropped connection without properly leaving session!");
// With a peer dropping out, the amount of active mods may change if there is // TODO: Start a timeout and if the peer doesn't come back, remove the peer
// now only a single peer left for a mod. // and trigger "user left" callbacks.
// TODO: Maybe also check if a corresponding player is still in the game
// lobby.
}
} }
SwarmEvent::IncomingConnectionError { SwarmEvent::IncomingConnectionError {
connection_id, connection_id,
@ -337,20 +544,34 @@ impl Plugin {
Ok(()) Ok(())
} }
pub(crate) fn register_mod(&mut self, name: &BStr, callbacks: ModCallbacks) {
self.mods.insert(name.to_owned(), callbacks);
}
pub(crate) fn create_rpc(&mut self, id: Identifier) { pub(crate) fn create_rpc(&mut self, id: Identifier) {
let rpc = RPC::new(); let rpc = RPC::new();
self.rpcs.insert(id.to_topic(), rpc); let topic = self.session.new_topic(id);
self.rpcs.insert(topic.clone(), rpc);
} }
pub(crate) fn send_rpc(&self, id: Identifier, args: LuaValue<'_>) -> Result<()> { pub(crate) fn send_rpc(&self, id: Identifier, args: LuaValue<'_>) -> Result<()> {
if !self.rpcs.contains_key(&id.to_topic()) { let Some(topic) = self.session.get_topic(id) else {
eyre::bail!("Topic for this RPC does not exist");
};
if !self.rpcs.contains_key(topic) {
eyre::bail!("RPC '{}' does not exist", id); eyre::bail!("RPC '{}' does not exist", id);
} }
let args = bincode::encode_to_vec(args, bincode::config::standard()) let args = bincode::encode_to_vec(args, bincode::config::standard())
.wrap_err("Failed to encode RPC args")?; .wrap_err("Failed to encode RPC args")?;
let msg = RPCMessage::new(id, args); // TODO: Add metrics/profiling for size of the encoded args
let msg = SwarmTask::Message {
topic: topic.clone(),
msg: SwarmMessage::Rpc(args),
};
self.send_tx self.send_tx
.send(msg) .send(msg)
.wrap_err("Failed to queue RPC call")?; .wrap_err("Failed to queue RPC call")?;
@ -358,24 +579,92 @@ impl Plugin {
Ok(()) Ok(())
} }
pub(crate) fn add_rpc_listener(&mut self, id: Identifier, fn_ref: i32) -> Result<()> { pub(crate) fn add_rpc_listener(&mut self, id: Identifier, fn_ref: LuaRef) -> Result<()> {
let Some(rpc) = self.rpcs.get_mut(&id.to_topic()) else { let topic = self.session.new_topic(id);
let Some(rpc) = self.rpcs.get_mut(topic) else {
eyre::bail!("RPC '{}' does not exist", id); eyre::bail!("RPC '{}' does not exist", id);
}; };
rpc.add_listener(fn_ref); rpc.add_listener(fn_ref);
self.send_tx
.send(SwarmTask::Subscribe(topic.clone()))
.wrap_err("Failed to subscribe to RPC topic")?;
Ok(()) Ok(())
} }
pub(crate) fn add_connect_listener(&mut self, mod_name: &BStr, fn_ref: i32) { pub(crate) fn join_session(
let key = TopicHash::from_raw(mod_name.to_str_lossy()); &mut self,
self.connect_listeners.insert(key, fn_ref); lua: &LuaState,
server_name: &BStr,
peer_id: &BStr,
player_id: &BStr,
) -> Result<()> {
self.session.peer_id = peer_id.to_owned();
self.session.address = format!("/{:08X}", hash(server_name));
let session_topic = self.session.new_topic("session").clone();
self.send_tx
.send(SwarmTask::Subscribe(session_topic.clone()))
.wrap_err("Failed to subscribe to session topic")?;
// TODO: Is there a way to do this without the intermediate `Vec`?
// Probably not, since we cannot keep an immutable reference on `self.rpcs`, while
// also trying to get a mutable reference on `self` for `self.subscribe`.
let topics = self.rpcs.keys().cloned().collect::<Vec<_>>();
for topic in topics {
self.send_tx
.send(SwarmTask::Subscribe(topic.clone()))
.wrap_err_with(|| format!("Failed to subscribe to {}", topic))?;
} }
pub(crate) fn add_disconnect_listener(&mut self, mod_name: &BStr, fn_ref: i32) { self.session.online = true;
let key = TopicHash::from_raw(mod_name.to_str_lossy());
self.disconnect_listeners.insert(key, fn_ref); let msg = SwarmTask::Message {
topic: session_topic,
msg: SwarmMessage::SessionJoin {
player_id: player_id.to_vec(),
mods: self.mods.keys().map(|name| name.to_vec()).collect(),
},
};
self.send_tx
.send(msg)
.wrap_err("Failed to queue RPC call")?;
// TODO: Do I want to wait for a reply or some sort of confirmation?
// Currently, we just assume this works always.
for fn_ref in self.mods.values().filter_map(|cbs| cbs.on_session_joined) {
lua.pushnumber(fn_ref as f64);
lua.gettable(LUA_REGISTRYINDEX);
lua.pcall(0, 0)
.wrap_err("on_session_joined handler failed")?;
}
Ok(())
}
pub(crate) fn leave_session(&mut self, lua: &LuaState) -> Result<()> {
if !self.session.online {
log::warn!("There is no session to leave");
return Ok(());
}
let topics = mem::take(&mut self.session.topics);
self.send_tx
.send(SwarmTask::Unsubscribe(topics.into_values().collect()))
.wrap_err("Failed to queue unsubscribe task")?;
self.session.online = false;
for fn_ref in self.mods.values().filter_map(|cbs| cbs.on_session_left) {
lua.pushnumber(fn_ref as f64);
lua.gettable(LUA_REGISTRYINDEX);
lua.pcall(0, 0).wrap_err("on_session_left handler failed")?;
}
Ok(())
} }
} }
@ -384,3 +673,9 @@ impl std::fmt::Debug for Plugin {
f.write_str("PluginApi") f.write_str("PluginApi")
} }
} }
fn hash(val: impl AsRef<[u8]>) -> u64 {
let mut hasher = DefaultHasher::new();
hasher.write(val.as_ref());
hasher.finish()
}

View file

@ -1,8 +1,5 @@
use std::collections::HashSet; use std::collections::HashSet;
use libp2p::gossipsub::TopicHash;
use crate::plugin::Identifier;
use crate::stingray_sdk::LuaRef; use crate::stingray_sdk::LuaRef;
#[derive(Debug)] #[derive(Debug)]
@ -25,19 +22,3 @@ impl RPC {
self.listeners.iter() self.listeners.iter()
} }
} }
#[derive(Debug)]
pub(crate) struct RPCMessage {
pub topic: TopicHash,
pub data: Vec<u8>,
}
impl RPCMessage {
pub(crate) fn new(id: Identifier, data: impl Into<Vec<u8>>) -> Self {
let topic = id.to_topic();
Self {
topic,
data: data.into(),
}
}
}

View file

@ -208,6 +208,8 @@ pub type LuaRef = i32;
pub struct LuaApi { pub struct LuaApi {
add_module_function: unsafe extern "C" fn(*const c_char, *const c_char, lua_CFunction), add_module_function: unsafe extern "C" fn(*const c_char, *const c_char, lua_CFunction),
createtable: unsafe extern "C" fn(*mut lua_State, i32, i32), createtable: unsafe extern "C" fn(*mut lua_State, i32, i32),
error: unsafe extern "C" fn(*mut lua_State) -> i32,
getscriptenvironmentstate: unsafe extern "C" fn() -> *mut lua_State,
getfield: unsafe extern "C" fn(*mut lua_State, i32, *const c_char), getfield: unsafe extern "C" fn(*mut lua_State, i32, *const c_char),
gettable: unsafe extern "C" fn(*mut lua_State, i32), gettable: unsafe extern "C" fn(*mut lua_State, i32),
gettop: unsafe extern "C" fn(*mut lua_State) -> i32, gettop: unsafe extern "C" fn(*mut lua_State) -> i32,
@ -248,6 +250,8 @@ impl LuaApi {
Self { Self {
add_module_function: (*api).add_module_function.unwrap_unchecked(), add_module_function: (*api).add_module_function.unwrap_unchecked(),
createtable: (*api).createtable.unwrap_unchecked(), createtable: (*api).createtable.unwrap_unchecked(),
error: (*api).error.unwrap_unchecked(),
getscriptenvironmentstate: (*api).getscriptenvironmentstate.unwrap_unchecked(),
getfield: (*api).getfield.unwrap_unchecked(), getfield: (*api).getfield.unwrap_unchecked(),
gettable: (*api).gettable.unwrap_unchecked(), gettable: (*api).gettable.unwrap_unchecked(),
gettop: (*api).gettop.unwrap_unchecked(), gettop: (*api).gettop.unwrap_unchecked(),
@ -289,6 +293,14 @@ impl LuaApi {
(self.add_module_function)(module.as_ref().as_ptr(), name.as_ref().as_ptr(), Some(cb)) (self.add_module_function)(module.as_ref().as_ptr(), name.as_ref().as_ptr(), Some(cb))
} }
} }
pub fn getscriptenvironmentstate(&self) -> *mut lua_State {
unsafe { (self.getscriptenvironmentstate)() }
}
pub fn error(&self, l: *mut lua_State) {
unsafe { (self.error)(l) };
}
} }
/// A wrapper that combines a lua_State and a LuaApi to make calling functions more convenient. /// A wrapper that combines a lua_State and a LuaApi to make calling functions more convenient.
@ -326,8 +338,7 @@ impl<'a> LuaState<'a> {
} }
pub fn isfunction(&self, idx: i32) -> bool { pub fn isfunction(&self, idx: i32) -> bool {
let is_fn = unsafe { (self.api.isfunction)(self.l, idx) }; matches!(self.r#type(idx), LuaType::Function)
is_fn > 0
} }
pub fn istable(&self, idx: i32) -> bool { pub fn istable(&self, idx: i32) -> bool {
@ -420,6 +431,10 @@ impl<'a> LuaState<'a> {
eyre::bail!("pcall failed: {}", err) eyre::bail!("pcall failed: {}", err)
} }
pub fn error(&self) {
self.api.error(self.l)
}
pub fn next(&self, idx: i32) -> i32 { pub fn next(&self, idx: i32) -> i32 {
unsafe { (self.api.next)(self.l, idx) } unsafe { (self.api.next)(self.l, idx) }
} }
@ -445,6 +460,8 @@ impl<'a> LuaState<'a> {
unsafe { (self.api.lib_error)(self.l, s.as_ptr()) } unsafe { (self.api.lib_error)(self.l, s.as_ptr()) }
} }
/// Creates and returns a reference, in the table at index t, for the object at the top of
/// the stack (and pops the object).
pub fn lib_ref(&self, t: i32) -> LuaRef { pub fn lib_ref(&self, t: i32) -> LuaRef {
unsafe { (self.api.lib_ref)(self.l, t) } unsafe { (self.api.lib_ref)(self.l, t) }
} }