I have been working on a p2p game using the bevy engine and the steamworks-rs bindings.
Connecting to the steam servers and joining lobbies works just fine, but I can't seem to get networked messages working. I am of course using two different computers and steam accounts for testing.
I am pretty sure that in theory, user A can send a message to user B that is in the same lobby, and then user B gets a SteamNetworkingMessagesSessionRequest_t
callback that they have to accept through a AcceptSessionWithUser
call. That should handle the handshake and then messages can be sent just fine between the two.
But in my case, whenever user A sends a message, user B receives a session request callback and sucessfully accepts it, but instead of initiating the connection, user A gets a SessionFailed callback with error 1001
From the steamworks docs
// 1xxx: Application ended the connection in a "usual" manner.
// E.g.: user intentionally disconnected from the server,
// gameplay ended normally, etc
I am not sure what could be causing this.
Edit: Newer developments, it seems that if I spam messages long enough between the two, at some point I get a sea of "Assertion Failed, Duplicate P2P connection" errors, and then messages work just fine.
So i'm still very much confused and looking for help.
Here is a minimum reproduction of my problem: https://github.com/Sigma-dev/min_reprod You will need two steam clients to run it, then press C to create a lobby on one of them, then join the lobby with the steam overlay on the other. Then press T to try to send messages (which will fail).
use bevy::prelude::*;
use bevy_steamworks::{Client, LobbyId, LobbyType, SteamworksEvent, SteamworksPlugin};
use flume::{Receiver, Sender};
use steamworks::networking_types::{NetworkingIdentity, SendFlags};
#[derive(Resource)]
pub struct NetworkClient {
lobby_id: Option<LobbyId>,
tx: Sender<LobbyId>,
rx: Receiver<LobbyId>
}
fn main() {
App::new()
.add_plugins(SteamworksPlugin::init_app(480).unwrap())
.add_plugins(DefaultPlugins)
.add_systems(Startup, setup)
.add_systems(Update, (update, receive))
.run();
}
fn setup(
steam_client: Res<Client>,
mut commands: Commands
) {
println!("Connected: {}", steam_client.user().steam_id().raw());
steam_client.networking_utils().init_relay_network_access();
steam_client.networking_messages().session_request_callback(
|res| {
match res.accept() {
true => println!("Succesfully accepted"),
false => println!("Failed to accept"),
}
}
);
steam_client.networking_messages().session_failed_callback(
|res| {
println!("Session Failed: {:?}", res.end_reason().unwrap());
}
);
let (tx, rx) = flume::unbounded();
commands.insert_resource(NetworkClient {
lobby_id: None,
tx,
rx,
});
}
fn update(
client: Res<NetworkClient>,
steam_client: Res<Client>,
keys: Res<ButtonInput<KeyCode>>,
) {
if keys.just_pressed(KeyCode::KeyC) {
let tx: Sender<LobbyId> = client.tx.clone();
steam_client.matchmaking().create_lobby(LobbyType::Public, 2,
move |res| {
if let Ok(lobby_id) = res {
tx.send(lobby_id);
}
});
}
else if keys.just_pressed(KeyCode::KeyT) {
let Some(lobby_id) = client.lobby_id else {return;};
for player in steam_client.matchmaking().lobby_members(lobby_id) {
if player == steam_client.user().steam_id() {
continue;
}
let res = steam_client.networking_messages().send_message_to_user( NetworkingIdentity::new_steam_id(player), SendFlags::RELIABLE, &[], 0);
match res {
Ok(_) => println!("Message sent succesfully"),
Err(err) => println!("Message error: {}", err.to_string()),
}
}
}
}
fn receive(
mut client: ResMut<NetworkClient>,
steam_client: Res<Client>,
mut evs: EventReader<SteamworksEvent>,
) {
let rx: Receiver<LobbyId> = client.rx.clone();
if let Ok(lobby_id) = rx.try_recv() {
client.lobby_id = Some(lobby_id);
println!("Joined Lobby: {}", lobby_id.raw());
}
let messages: Vec<steamworks::networking_types::NetworkingMessage<steamworks::ClientManager>> = steam_client.networking_messages().receive_messages_on_channel(0, 1);
for message in messages {
println!("Received message");
drop(message); //not sure about usefullness, mentionned in steam docs as release
}
for ev in evs.read() {
//println!("EV");
match ev {
SteamworksEvent::GameLobbyJoinRequested(info) => {
println!("Trying to join: {}", info.lobby_steam_id.raw());
let tx = client.tx.clone();
steam_client.matchmaking().join_lobby(info.lobby_steam_id,
move |res| {
if let Ok(lobby_id) = res {
match tx.send(lobby_id) {
Ok(_) => {}
Err(_) => {
}
}
}
}); },
_ => {}
}
}
}