Skip to content

Commit dfbeca5

Browse files
authored
ruff server no longer hangs after shutdown (#11222)
## Summary Fixes #11207. The server would hang after handling a shutdown request on `IoThreads::join()` because a global sender (`MESSENGER`, used to send `window/showMessage` notifications) would remain allocated even after the event loop finished, which kept the writer I/O thread channel open. To fix this, I've made a few structural changes to `ruff server`. I've wrapped the send/receive channels and thread join handle behind a new struct, `Connection`, which facilitates message sending and receiving, and also runs `IoThreads::join()` after the event loop finishes. To control the number of sender channels, the `Connection` wraps the sender channel in an `Arc` and only allows the creation of a wrapper type, `ClientSender`, which hold a weak reference to this `Arc` instead of direct channel access. The wrapper type implements the channel methods directly to prevent access to the inner channel (which would allow the channel to be cloned). ClientSender's function is analogous to [`WeakSender` in `tokio`](https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.WeakSender.html). Additionally, the receiver channel cannot be accessed directly - the `Connection` only exposes an iterator over it. These changes will guarantee that all channels are closed before the I/O threads are joined. ## Test Plan Repeatedly open and close an editor utilizing `ruff server` while observing the task monitor. The net total amount of open `ruff` instances should be zero once all editor windows have closed. The following logs should also appear after the server is shut down: <img width="835" alt="Screenshot 2024-04-30 at 3 56 22 PM" src="https://github.com/astral-sh/ruff/assets/19577865/404b74f5-ef08-4bb4-9fa2-72e72b946695"> This can be tested on VS Code by changing the settings and then checking `Output`.
1 parent 9e69cd6 commit dfbeca5

File tree

5 files changed

+189
-53
lines changed

5 files changed

+189
-53
lines changed

crates/ruff_server/src/message.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ use crate::server::ClientSender;
66

77
static MESSENGER: OnceLock<ClientSender> = OnceLock::new();
88

9-
pub(crate) fn init_messenger(client_sender: &ClientSender) {
9+
pub(crate) fn init_messenger(client_sender: ClientSender) {
1010
MESSENGER
11-
.set(client_sender.clone())
11+
.set(client_sender)
1212
.expect("messenger should only be initialized once");
1313

1414
// unregister any previously registered panic hook

crates/ruff_server/src/server.rs

Lines changed: 33 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
33
use std::num::NonZeroUsize;
44

5-
use lsp::Connection;
65
use lsp_server as lsp;
76
use lsp_types as types;
87
use types::ClientCapabilities;
@@ -18,6 +17,8 @@ use types::TextDocumentSyncOptions;
1817
use types::WorkDoneProgressOptions;
1918
use types::WorkspaceFoldersServerCapabilities;
2019

20+
use self::connection::Connection;
21+
use self::connection::ConnectionInitializer;
2122
use self::schedule::event_loop_thread;
2223
use self::schedule::Scheduler;
2324
use self::schedule::Task;
@@ -28,34 +29,39 @@ use crate::PositionEncoding;
2829

2930
mod api;
3031
mod client;
32+
mod connection;
3133
mod schedule;
3234

33-
pub(crate) use client::ClientSender;
35+
pub(crate) use connection::ClientSender;
3436

3537
pub(crate) type Result<T> = std::result::Result<T, api::Error>;
3638

3739
pub struct Server {
38-
conn: lsp::Connection,
40+
connection: Connection,
3941
client_capabilities: ClientCapabilities,
40-
threads: lsp::IoThreads,
4142
worker_threads: NonZeroUsize,
4243
session: Session,
4344
}
4445

4546
impl Server {
4647
pub fn new(worker_threads: NonZeroUsize) -> crate::Result<Self> {
47-
let (conn, threads) = lsp::Connection::stdio();
48+
let connection = ConnectionInitializer::stdio();
4849

49-
crate::message::init_messenger(&conn.sender);
50-
51-
let (id, params) = conn.initialize_start()?;
52-
53-
let init_params: types::InitializeParams = serde_json::from_value(params)?;
50+
let (id, init_params) = connection.initialize_start()?;
5451

5552
let client_capabilities = init_params.capabilities;
5653
let position_encoding = Self::find_best_position_encoding(&client_capabilities);
5754
let server_capabilities = Self::server_capabilities(position_encoding);
5855

56+
let connection = connection.initialize_finish(
57+
id,
58+
&server_capabilities,
59+
crate::SERVER_NAME,
60+
crate::version(),
61+
)?;
62+
63+
crate::message::init_messenger(connection.make_sender());
64+
5965
let AllSettings {
6066
global_settings,
6167
mut workspace_settings,
@@ -86,19 +92,8 @@ impl Server {
8692
anyhow::anyhow!("Failed to get the current working directory while creating a default workspace.")
8793
})?;
8894

89-
let initialize_data = serde_json::json!({
90-
"capabilities": server_capabilities,
91-
"serverInfo": {
92-
"name": crate::SERVER_NAME,
93-
"version": crate::version()
94-
}
95-
});
96-
97-
conn.initialize_finish(id, initialize_data)?;
98-
9995
Ok(Self {
100-
conn,
101-
threads,
96+
connection,
10297
worker_threads,
10398
session: Session::new(
10499
&client_capabilities,
@@ -111,17 +106,20 @@ impl Server {
111106
}
112107

113108
pub fn run(self) -> crate::Result<()> {
114-
let result = event_loop_thread(move || {
109+
event_loop_thread(move || {
115110
Self::event_loop(
116-
&self.conn,
111+
&self.connection,
117112
&self.client_capabilities,
118113
self.session,
119114
self.worker_threads,
120-
)
115+
)?;
116+
self.connection.close()?;
117+
// Note: when we start routing tracing through the LSP,
118+
// this should be replaced with a log directly to `stderr`.
119+
tracing::info!("Server has shut down successfully");
120+
Ok(())
121121
})?
122-
.join();
123-
self.threads.join()?;
124-
result
122+
.join()
125123
}
126124

127125
#[allow(clippy::needless_pass_by_value)] // this is because we aren't using `next_request_id` yet.
@@ -132,22 +130,21 @@ impl Server {
132130
worker_threads: NonZeroUsize,
133131
) -> crate::Result<()> {
134132
let mut scheduler =
135-
schedule::Scheduler::new(&mut session, worker_threads, &connection.sender);
133+
schedule::Scheduler::new(&mut session, worker_threads, connection.make_sender());
136134

137135
Self::try_register_capabilities(client_capabilities, &mut scheduler);
138-
for msg in &connection.receiver {
136+
for msg in connection.incoming() {
137+
if connection.handle_shutdown(&msg)? {
138+
break;
139+
}
139140
let task = match msg {
140-
lsp::Message::Request(req) => {
141-
if connection.handle_shutdown(&req)? {
142-
return Ok(());
143-
}
144-
api::request(req)
145-
}
141+
lsp::Message::Request(req) => api::request(req),
146142
lsp::Message::Notification(notification) => api::notification(notification),
147143
lsp::Message::Response(response) => scheduler.response(response),
148144
};
149145
scheduler.dispatch(task);
150146
}
147+
151148
Ok(())
152149
}
153150

crates/ruff_server/src/server/client.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ use lsp_server::{Notification, RequestId};
44
use rustc_hash::FxHashMap;
55
use serde_json::Value;
66

7-
use super::schedule::Task;
8-
9-
pub(crate) type ClientSender = crossbeam::channel::Sender<lsp_server::Message>;
7+
use super::{schedule::Task, ClientSender};
108

119
type ResponseBuilder<'s> = Box<dyn FnOnce(lsp_server::Response) -> Task<'s>>;
1210

@@ -29,12 +27,12 @@ pub(crate) struct Requester<'s> {
2927
}
3028

3129
impl<'s> Client<'s> {
32-
pub(super) fn new(sender: &ClientSender) -> Self {
30+
pub(super) fn new(sender: ClientSender) -> Self {
3331
Self {
3432
notifier: Notifier(sender.clone()),
3533
responder: Responder(sender.clone()),
3634
requester: Requester {
37-
sender: sender.clone(),
35+
sender,
3836
next_request_id: 1,
3937
response_handlers: FxHashMap::default(),
4038
},
@@ -60,16 +58,15 @@ impl Notifier {
6058

6159
let message = lsp_server::Message::Notification(Notification::new(method, params));
6260

63-
Ok(self.0.send(message)?)
61+
self.0.send(message)
6462
}
6563

6664
pub(crate) fn notify_method(&self, method: String) -> crate::Result<()> {
67-
Ok(self
68-
.0
65+
self.0
6966
.send(lsp_server::Message::Notification(Notification::new(
7067
method,
7168
Value::Null,
72-
)))?)
69+
)))
7370
}
7471
}
7572

@@ -82,15 +79,15 @@ impl Responder {
8279
where
8380
R: serde::Serialize,
8481
{
85-
Ok(self.0.send(
82+
self.0.send(
8683
match result {
8784
Ok(res) => lsp_server::Response::new_ok(id, res),
8885
Err(crate::server::api::Error { code, error }) => {
8986
lsp_server::Response::new_err(id, code as i32, format!("{error}"))
9087
}
9188
}
9289
.into(),
93-
)?)
90+
)
9491
}
9592
}
9693

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
use lsp_server as lsp;
2+
use lsp_types::{notification::Notification, request::Request};
3+
use std::sync::{Arc, Weak};
4+
5+
type ConnectionSender = crossbeam::channel::Sender<lsp::Message>;
6+
type ConnectionReceiver = crossbeam::channel::Receiver<lsp::Message>;
7+
8+
/// A builder for `Connection` that handles LSP initialization.
9+
pub(crate) struct ConnectionInitializer {
10+
connection: lsp::Connection,
11+
threads: lsp::IoThreads,
12+
}
13+
14+
/// Handles inbound and outbound messages with the client.
15+
pub(crate) struct Connection {
16+
sender: Arc<ConnectionSender>,
17+
receiver: ConnectionReceiver,
18+
threads: lsp::IoThreads,
19+
}
20+
21+
impl ConnectionInitializer {
22+
/// Create a new LSP server connection over stdin/stdout.
23+
pub(super) fn stdio() -> Self {
24+
let (connection, threads) = lsp::Connection::stdio();
25+
Self {
26+
connection,
27+
threads,
28+
}
29+
}
30+
31+
/// Starts the initialization process with the client by listening for an initialization request.
32+
/// Returns a request ID that should be passed into `initialize_finish` later,
33+
/// along with the initialization parameters that were provided.
34+
pub(super) fn initialize_start(
35+
&self,
36+
) -> crate::Result<(lsp::RequestId, lsp_types::InitializeParams)> {
37+
let (id, params) = self.connection.initialize_start()?;
38+
Ok((id, serde_json::from_value(params)?))
39+
}
40+
41+
/// Finishes the initialization process with the client,
42+
/// returning an initialized `Connection`.
43+
pub(super) fn initialize_finish(
44+
self,
45+
id: lsp::RequestId,
46+
server_capabilities: &lsp_types::ServerCapabilities,
47+
name: &str,
48+
version: &str,
49+
) -> crate::Result<Connection> {
50+
self.connection.initialize_finish(
51+
id,
52+
serde_json::json!({
53+
"capabilities": server_capabilities,
54+
"serverInfo": {
55+
"name": name,
56+
"version": version
57+
}
58+
}),
59+
)?;
60+
let Self {
61+
connection: lsp::Connection { sender, receiver },
62+
threads,
63+
} = self;
64+
Ok(Connection {
65+
sender: Arc::new(sender),
66+
receiver,
67+
threads,
68+
})
69+
}
70+
}
71+
72+
impl Connection {
73+
/// Make a new `ClientSender` for sending messages to the client.
74+
pub(super) fn make_sender(&self) -> ClientSender {
75+
ClientSender {
76+
weak_sender: Arc::downgrade(&self.sender),
77+
}
78+
}
79+
80+
/// An iterator over incoming messages from the client.
81+
pub(super) fn incoming(&self) -> crossbeam::channel::Iter<lsp::Message> {
82+
self.receiver.iter()
83+
}
84+
85+
/// Check and respond to any incoming shutdown requests; returns`true` if the server should be shutdown.
86+
pub(super) fn handle_shutdown(&self, message: &lsp::Message) -> crate::Result<bool> {
87+
match message {
88+
lsp::Message::Request(lsp::Request { id, method, .. })
89+
if method == lsp_types::request::Shutdown::METHOD =>
90+
{
91+
self.sender
92+
.send(lsp::Response::new_ok(id.clone(), ()).into())?;
93+
tracing::info!("Shutdown request received. Waiting for an exit notification...");
94+
match self.receiver.recv_timeout(std::time::Duration::from_secs(30))? {
95+
lsp::Message::Notification(lsp::Notification { method, .. }) if method == lsp_types::notification::Exit::METHOD => {
96+
tracing::info!("Exit notification received. Server shutting down...");
97+
Ok(true)
98+
},
99+
message => anyhow::bail!("Server received unexpected message {message:?} while waiting for exit notification")
100+
}
101+
}
102+
lsp::Message::Notification(lsp::Notification { method, .. })
103+
if method == lsp_types::notification::Exit::METHOD =>
104+
{
105+
tracing::error!("Server received an exit notification before a shutdown request was sent. Exiting...");
106+
Ok(true)
107+
}
108+
_ => Ok(false),
109+
}
110+
}
111+
112+
/// Join the I/O threads that underpin this connection.
113+
/// This is guaranteed to be nearly immediate since
114+
/// we close the only active channels to these threads prior
115+
/// to joining them.
116+
pub(super) fn close(self) -> crate::Result<()> {
117+
std::mem::drop(
118+
Arc::into_inner(self.sender)
119+
.expect("the client sender shouldn't have more than one strong reference"),
120+
);
121+
std::mem::drop(self.receiver);
122+
self.threads.join()?;
123+
Ok(())
124+
}
125+
}
126+
127+
/// A weak reference to an underlying sender channel, used for communication with the client.
128+
/// If the `Connection` that created this `ClientSender` is dropped, any `send` calls will throw
129+
/// an error.
130+
#[derive(Clone, Debug)]
131+
pub(crate) struct ClientSender {
132+
weak_sender: Weak<ConnectionSender>,
133+
}
134+
135+
// note: additional wrapper functions for senders may be implemented as needed.
136+
impl ClientSender {
137+
pub(crate) fn send(&self, msg: lsp::Message) -> crate::Result<()> {
138+
let Some(sender) = self.weak_sender.upgrade() else {
139+
anyhow::bail!("The connection with the client has been closed");
140+
};
141+
142+
Ok(sender.send(msg)?)
143+
}
144+
}

crates/ruff_server/src/server/schedule.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
use std::num::NonZeroUsize;
22

3-
use crossbeam::channel::Sender;
4-
53
use crate::session::Session;
64

75
mod task;
@@ -14,7 +12,7 @@ use self::{
1412
thread::ThreadPriority,
1513
};
1614

17-
use super::client::Client;
15+
use super::{client::Client, ClientSender};
1816

1917
/// The event loop thread is actually a secondary thread that we spawn from the
2018
/// _actual_ main thread. This secondary thread has a larger stack size
@@ -45,7 +43,7 @@ impl<'s> Scheduler<'s> {
4543
pub(super) fn new(
4644
session: &'s mut Session,
4745
worker_threads: NonZeroUsize,
48-
sender: &Sender<lsp_server::Message>,
46+
sender: ClientSender,
4947
) -> Self {
5048
const FMT_THREADS: usize = 1;
5149
Self {

0 commit comments

Comments
 (0)