Skip to content

Commit 5928a7b

Browse files
authored
de_net: Support custom task spawner (#561)
This makes the crate compatible with Bevy, needed because Bevy uses custom task executor.
1 parent b92b944 commit 5928a7b

File tree

3 files changed

+42
-17
lines changed

3 files changed

+42
-17
lines changed

crates/connector/src/game/mod.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@ mod state;
2121
/// automatically added to the game as if they sent [`de_net::ToGame::Join`].
2222
pub(crate) async fn startup(net: Network, owner: SocketAddr) {
2323
let port = net.port();
24-
let (outputs, inputs, errors) = de_net::startup(net);
24+
let (outputs, inputs, errors) = de_net::startup(
25+
|t| {
26+
task::spawn(t);
27+
},
28+
net,
29+
);
2530

2631
let (server_sender, server_receiver) = bounded(16);
2732
task::spawn(ereceiver::run(port, errors, server_sender.clone()));

crates/connector/src/server.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::net::SocketAddr;
22

33
use anyhow::Context;
4+
use async_std::task;
45
use de_net::{
56
self, FromServer, MessageDecoder, MessageReceiver, MessageSender, Network, OutMessage, Peers,
67
ToServer,
@@ -19,7 +20,12 @@ pub(crate) struct MainServer {
1920
impl MainServer {
2021
/// Setup the server & startup its network stack.
2122
pub(crate) fn start(net: Network) -> Self {
22-
let (outputs, inputs, _) = de_net::startup(net);
23+
let (outputs, inputs, _) = de_net::startup(
24+
|t| {
25+
task::spawn(t);
26+
},
27+
net,
28+
);
2329
Self { outputs, inputs }
2430
}
2531

crates/net/src/tasks/mod.rs

+29-15
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,13 @@
5555
//! data. The user communicates with these via [`MessageSender`] and
5656
//! [`MessageReceiver`] respectively.
5757
58-
use async_std::{channel::bounded, task};
58+
use async_std::channel::bounded;
5959
pub use communicator::{
6060
ConnErrorReceiver, ConnectionError, InMessage, MessageDecoder, MessageReceiver, MessageSender,
6161
OutMessage, OutMessageBuilder,
6262
};
6363
pub(crate) use dsender::OutDatagram;
64+
use futures::future::BoxFuture;
6465
use tracing::info;
6566

6667
use crate::{
@@ -89,69 +90,82 @@ const CHANNEL_CAPACITY: usize = 1024;
8990
/// closed. Once the [`MessageSender`], [`MessageReceiver`], and
9091
/// [`ConnErrorReceiver`] are all dropped, the networking stack will terminate
9192
/// completely.
92-
pub fn startup(network: Network) -> (MessageSender, MessageReceiver, ConnErrorReceiver) {
93+
///
94+
/// # Arguments
95+
///
96+
/// * `spawn` - async task spawner.
97+
///
98+
/// * `network` - network communication will happen over this socket.
99+
pub fn startup<S>(spawn: S, network: Network) -> (MessageSender, MessageReceiver, ConnErrorReceiver)
100+
where
101+
S: Fn(BoxFuture<'static, ()>),
102+
{
93103
let port = network.port();
94104
info!("Starting up network stack on port {port}...");
95105

96106
let messages = Messages::new(network);
97107

98108
let (out_datagrams_sender, out_datagrams_receiver) = bounded(16);
99-
task::spawn(dsender::run(port, out_datagrams_receiver, messages.clone()));
109+
spawn(Box::pin(dsender::run(
110+
port,
111+
out_datagrams_receiver,
112+
messages.clone(),
113+
)));
100114

101115
let (in_system_datagrams_sender, in_system_datagrams_receiver) = bounded(16);
102116
let (in_user_datagrams_sender, in_user_datagrams_receiver) = bounded(16);
103-
task::spawn(dreceiver::run(
117+
spawn(Box::pin(dreceiver::run(
104118
port,
105119
in_system_datagrams_sender,
106120
in_user_datagrams_sender,
107121
messages,
108-
));
122+
)));
109123

110124
let resends = Resends::new();
111125
let (sreceiver_cancellation_sender, sreceiver_cancellation_receiver) = cancellation();
112-
task::spawn(sreceiver::run(
126+
spawn(Box::pin(sreceiver::run(
113127
port,
114128
sreceiver_cancellation_receiver,
115129
in_system_datagrams_receiver,
116130
resends.clone(),
117-
));
131+
)));
118132

119133
let (inputs_sender, inputs_receiver) = bounded(CHANNEL_CAPACITY);
120134
let (confirmer_cancellation_sender, confirmer_cancellation_receiver) = cancellation();
121135
let confirms = Confirmations::new();
122-
task::spawn(ureceiver::run(
136+
spawn(Box::pin(ureceiver::run(
123137
port,
124138
confirmer_cancellation_sender,
125139
in_user_datagrams_receiver,
126140
inputs_sender,
127141
confirms.clone(),
128-
));
142+
)));
129143

130144
let (outputs_sender, outputs_receiver) = bounded(CHANNEL_CAPACITY);
131145
let (errors_sender, errors_receiver) = bounded(CHANNEL_CAPACITY);
132146
let (resender_cancellation_sender, resender_cancellation_receiver) = cancellation();
133-
task::spawn(resender::run(
147+
spawn(Box::pin(resender::run(
134148
port,
135149
resender_cancellation_receiver,
136150
sreceiver_cancellation_sender,
137151
out_datagrams_sender.clone(),
138152
errors_sender,
139153
resends.clone(),
140-
));
154+
)));
141155

142-
task::spawn(confirmer::run(
156+
spawn(Box::pin(confirmer::run(
143157
port,
144158
confirmer_cancellation_receiver,
145159
out_datagrams_sender.clone(),
146160
confirms,
147-
));
148-
task::spawn(usender::run(
161+
)));
162+
spawn(Box::pin(usender::run(
149163
port,
150164
resender_cancellation_sender,
151165
out_datagrams_sender,
152166
outputs_receiver,
153167
resends,
154-
));
168+
)));
155169

156170
(
157171
MessageSender(outputs_sender),

0 commit comments

Comments
 (0)