Skip to content

Commit 54d5ee0

Browse files
feat: implement connection state recovery
Connection state recovery allows a client to reconnect after a temporary disconnection and restore its state: - id - rooms - data - missed packets Usage: ```js import { Server } from "socket.io"; const io = new Server({ connectionStateRecovery: { // default values maxDisconnectionDuration: 2 * 60 * 1000, skipMiddlewares: true, }, }); io.on("connection", (socket) => { console.log(socket.recovered); // whether the state was recovered or not }); ``` Here's how it works: - the server sends a session ID during the handshake (which is different from the current `id` attribute, which is public and can be freely shared) - the server also includes an offset in each packet (added at the end of the data array, for backward compatibility) - upon temporary disconnection, the server stores the client state for a given delay (implemented at the adapter level) - upon reconnection, the client sends both the session ID and the last offset it has processed, and the server tries to restore the state A few notes: - the base adapter exposes two additional methods, persistSession() and restoreSession(), that must be implemented by the other adapters in order to allow the feature to work within a cluster See: socketio/socket.io-adapter@f529412 - acknowledgements are not affected, because it won't work if the client reconnects on another server (as the ack id is local) - any disconnection that lasts longer than the `maxDisconnectionDuration` value will result in a new session, so users will still need to care for the state reconciliation between the server and the client Related: #4510
1 parent da2b542 commit 54d5ee0

File tree

8 files changed

+426
-81
lines changed

8 files changed

+426
-81
lines changed

lib/client.ts

+5-5
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ export class Client<
114114
* @param {Object} auth - the auth parameters
115115
* @private
116116
*/
117-
private connect(name: string, auth: object = {}): void {
117+
private connect(name: string, auth: Record<string, unknown> = {}): void {
118118
if (this.server._nsps.has(name)) {
119119
debug("connecting to namespace %s", name);
120120
return this.doConnect(name, auth);
@@ -152,10 +152,10 @@ export class Client<
152152
*
153153
* @private
154154
*/
155-
private doConnect(name: string, auth: object): void {
155+
private doConnect(name: string, auth: Record<string, unknown>): void {
156156
const nsp = this.server.of(name);
157157

158-
const socket = nsp._add(this, auth, () => {
158+
nsp._add(this, auth, (socket) => {
159159
this.sockets.set(socket.id, socket);
160160
this.nsps.set(nsp.name, socket);
161161

@@ -228,7 +228,7 @@ export class Client<
228228
}
229229

230230
private writeToEngine(
231-
encodedPackets: Array<String | Buffer>,
231+
encodedPackets: Array<string | Buffer>,
232232
opts: WriteOptions
233233
): void {
234234
if (opts.volatile && !this.conn.transport.writable) {
@@ -267,7 +267,7 @@ export class Client<
267267
*/
268268
private ondecoded(packet: Packet): void {
269269
let namespace: string;
270-
let authPayload;
270+
let authPayload: Record<string, unknown>;
271271
if (this.conn.protocol === 3) {
272272
const parsed = url.parse(packet.nsp, true);
273273
namespace = parsed.pathname!;

lib/index.ts

+39-4
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,12 @@ import { Client } from "./client";
1717
import { EventEmitter } from "events";
1818
import { ExtendedError, Namespace, ServerReservedEventsMap } from "./namespace";
1919
import { ParentNamespace } from "./parent-namespace";
20-
import { Adapter, Room, SocketId } from "socket.io-adapter";
20+
import {
21+
Adapter,
22+
SessionAwareAdapter,
23+
Room,
24+
SocketId,
25+
} from "socket.io-adapter";
2126
import * as parser from "socket.io-parser";
2227
import type { Encoder } from "socket.io-parser";
2328
import debugModule from "debug";
@@ -72,6 +77,25 @@ interface ServerOptions extends EngineOptions, AttachOptions {
7277
* @default 45000
7378
*/
7479
connectTimeout: number;
80+
/**
81+
* Whether to enable the recovery of connection state when a client temporarily disconnects.
82+
*
83+
* The connection state includes the missed packets, the rooms the socket was in and the `data` attribute.
84+
*/
85+
connectionStateRecovery: {
86+
/**
87+
* The backup duration of the sessions and the packets.
88+
*
89+
* @default 120000 (2 minutes)
90+
*/
91+
maxDisconnectionDuration?: number;
92+
/**
93+
* Whether to skip middlewares upon successful connection state recovery.
94+
*
95+
* @default true
96+
*/
97+
skipMiddlewares?: boolean;
98+
};
7599
}
76100

77101
/**
@@ -148,7 +172,7 @@ export class Server<
148172
> = new Map();
149173
private _adapter?: AdapterConstructor;
150174
private _serveClient: boolean;
151-
private opts: Partial<EngineOptions>;
175+
private readonly opts: Partial<ServerOptions>;
152176
private eio: Engine;
153177
private _path: string;
154178
private clientPathRegex: RegExp;
@@ -204,9 +228,20 @@ export class Server<
204228
this.serveClient(false !== opts.serveClient);
205229
this._parser = opts.parser || parser;
206230
this.encoder = new this._parser.Encoder();
207-
this.adapter(opts.adapter || Adapter);
208-
this.sockets = this.of("/");
209231
this.opts = opts;
232+
if (opts.connectionStateRecovery) {
233+
opts.connectionStateRecovery = Object.assign(
234+
{
235+
maxDisconnectionDuration: 2 * 60 * 1000,
236+
skipMiddlewares: true,
237+
},
238+
opts.connectionStateRecovery
239+
);
240+
this.adapter(opts.adapter || SessionAwareAdapter);
241+
} else {
242+
this.adapter(opts.adapter || Adapter);
243+
}
244+
this.sockets = this.of("/");
210245
if (srv || typeof srv == "number")
211246
this.attach(
212247
srv as http.Server | HTTPSServer | Http2SecureServer | number

lib/namespace.ts

+62-19
Original file line numberDiff line numberDiff line change
@@ -296,13 +296,25 @@ export class Namespace<
296296
* @return {Socket}
297297
* @private
298298
*/
299-
_add(
299+
async _add(
300300
client: Client<ListenEvents, EmitEvents, ServerSideEvents>,
301-
query,
302-
fn?: () => void
303-
): Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData> {
301+
auth: Record<string, unknown>,
302+
fn: (
303+
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
304+
) => void
305+
) {
304306
debug("adding socket to nsp %s", this.name);
305-
const socket = new Socket(this, client, query);
307+
const socket = await this._createSocket(client, auth);
308+
309+
if (
310+
// @ts-ignore
311+
this.server.opts.connectionStateRecovery?.skipMiddlewares &&
312+
socket.recovered &&
313+
client.conn.readyState === "open"
314+
) {
315+
return this._doConnect(socket, fn);
316+
}
317+
306318
this.run(socket, (err) => {
307319
process.nextTick(() => {
308320
if ("open" !== client.conn.readyState) {
@@ -324,22 +336,53 @@ export class Namespace<
324336
}
325337
}
326338

327-
// track socket
328-
this.sockets.set(socket.id, socket);
329-
330-
// it's paramount that the internal `onconnect` logic
331-
// fires before user-set events to prevent state order
332-
// violations (such as a disconnection before the connection
333-
// logic is complete)
334-
socket._onconnect();
335-
if (fn) fn();
336-
337-
// fire user-set events
338-
this.emitReserved("connect", socket);
339-
this.emitReserved("connection", socket);
339+
this._doConnect(socket, fn);
340340
});
341341
});
342-
return socket;
342+
}
343+
344+
private async _createSocket(
345+
client: Client<ListenEvents, EmitEvents, ServerSideEvents>,
346+
auth: Record<string, unknown>
347+
) {
348+
const sessionId = auth.pid;
349+
const offset = auth.offset;
350+
if (
351+
// @ts-ignore
352+
this.server.opts.connectionStateRecovery &&
353+
typeof sessionId === "string" &&
354+
typeof offset === "string"
355+
) {
356+
const session = await this.adapter.restoreSession(sessionId, offset);
357+
if (session) {
358+
debug("connection state recovered for sid %s", session.sid);
359+
return new Socket(this, client, auth, session);
360+
} else {
361+
debug("unable to restore session state");
362+
}
363+
}
364+
return new Socket(this, client, auth);
365+
}
366+
367+
private _doConnect(
368+
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>,
369+
fn: (
370+
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
371+
) => void
372+
) {
373+
// track socket
374+
this.sockets.set(socket.id, socket);
375+
376+
// it's paramount that the internal `onconnect` logic
377+
// fires before user-set events to prevent state order
378+
// violations (such as a disconnection before the connection
379+
// logic is complete)
380+
socket._onconnect();
381+
if (fn) fn(socket);
382+
383+
// fire user-set events
384+
this.emitReserved("connect", socket);
385+
this.emitReserved("connection", socket);
343386
}
344387

345388
/**

lib/socket.ts

+75-11
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,21 @@ import { Packet, PacketType } from "socket.io-parser";
22
import debugModule from "debug";
33
import type { Server } from "./index";
44
import {
5-
EventParams,
5+
DefaultEventsMap,
66
EventNames,
7+
EventParams,
78
EventsMap,
89
StrictEventEmitter,
9-
DefaultEventsMap,
1010
} from "./typed-events";
1111
import type { Client } from "./client";
1212
import type { Namespace, NamespaceReservedEventsMap } from "./namespace";
13-
import type { IncomingMessage, IncomingHttpHeaders } from "http";
13+
import type { IncomingHttpHeaders, IncomingMessage } from "http";
1414
import type {
1515
Adapter,
1616
BroadcastFlags,
17+
PrivateSessionId,
1718
Room,
19+
Session,
1820
SocketId,
1921
} from "socket.io-adapter";
2022
import base64id from "base64id";
@@ -39,6 +41,15 @@ export type DisconnectReason =
3941
| "client namespace disconnect"
4042
| "server namespace disconnect";
4143

44+
const RECOVERABLE_DISCONNECT_REASONS: ReadonlySet<DisconnectReason> = new Set([
45+
"transport error",
46+
"transport close",
47+
"forced close",
48+
"ping timeout",
49+
"server shutting down",
50+
"forced server close",
51+
]);
52+
4253
export interface SocketReservedEventsMap {
4354
disconnect: (reason: DisconnectReason) => void;
4455
disconnecting: (reason: DisconnectReason) => void;
@@ -173,6 +184,11 @@ export class Socket<
173184
* An unique identifier for the session.
174185
*/
175186
public readonly id: SocketId;
187+
/**
188+
* Whether the connection state was recovered after a temporary disconnection. In that case, any missed packets will
189+
* be transmitted to the client, the data attribute and the rooms will be restored.
190+
*/
191+
public readonly recovered: boolean = false;
176192
/**
177193
* The handshake details.
178194
*/
@@ -197,6 +213,14 @@ export class Socket<
197213
*/
198214
public connected: boolean = false;
199215

216+
/**
217+
* The session ID, which must not be shared (unlike {@link id}).
218+
*
219+
* @private
220+
*/
221+
private readonly pid: PrivateSessionId;
222+
223+
// TODO: remove this unused reference
200224
private readonly server: Server<
201225
ListenEvents,
202226
EmitEvents,
@@ -221,16 +245,32 @@ export class Socket<
221245
constructor(
222246
readonly nsp: Namespace<ListenEvents, EmitEvents, ServerSideEvents>,
223247
readonly client: Client<ListenEvents, EmitEvents, ServerSideEvents>,
224-
auth: object
248+
auth: Record<string, unknown>,
249+
previousSession?: Session
225250
) {
226251
super();
227252
this.server = nsp.server;
228253
this.adapter = this.nsp.adapter;
229-
if (client.conn.protocol === 3) {
230-
// @ts-ignore
231-
this.id = nsp.name !== "/" ? nsp.name + "#" + client.id : client.id;
254+
if (previousSession) {
255+
this.id = previousSession.sid;
256+
this.pid = previousSession.pid;
257+
previousSession.rooms.forEach((room) => this.join(room));
258+
this.data = previousSession.data as Partial<SocketData>;
259+
previousSession.missedPackets.forEach((packet) => {
260+
this.packet({
261+
type: PacketType.EVENT,
262+
data: packet,
263+
});
264+
});
265+
this.recovered = true;
232266
} else {
233-
this.id = base64id.generateId(); // don't reuse the Engine.IO id because it's sensitive information
267+
if (client.conn.protocol === 3) {
268+
// @ts-ignore
269+
this.id = nsp.name !== "/" ? nsp.name + "#" + client.id : client.id;
270+
} else {
271+
this.id = base64id.generateId(); // don't reuse the Engine.IO id because it's sensitive information
272+
}
273+
this.pid = base64id.generateId();
234274
}
235275
this.handshake = this.buildHandshake(auth);
236276
}
@@ -299,8 +339,18 @@ export class Socket<
299339
const flags = Object.assign({}, this.flags);
300340
this.flags = {};
301341

302-
this.notifyOutgoingListeners(packet);
303-
this.packet(packet, flags);
342+
// @ts-ignore
343+
if (this.nsp.server.opts.connectionStateRecovery) {
344+
// this ensures the packet is stored and can be transmitted upon reconnection
345+
this.adapter.broadcast(packet, {
346+
rooms: new Set([this.id]),
347+
except: new Set(),
348+
flags,
349+
});
350+
} else {
351+
this.notifyOutgoingListeners(packet);
352+
this.packet(packet, flags);
353+
}
304354

305355
return true;
306356
}
@@ -508,7 +558,10 @@ export class Socket<
508558
if (this.conn.protocol === 3) {
509559
this.packet({ type: PacketType.CONNECT });
510560
} else {
511-
this.packet({ type: PacketType.CONNECT, data: { sid: this.id } });
561+
this.packet({
562+
type: PacketType.CONNECT,
563+
data: { sid: this.id, pid: this.pid },
564+
});
512565
}
513566
}
514567

@@ -644,6 +697,17 @@ export class Socket<
644697
if (!this.connected) return this;
645698
debug("closing socket - reason %s", reason);
646699
this.emitReserved("disconnecting", reason);
700+
701+
if (RECOVERABLE_DISCONNECT_REASONS.has(reason)) {
702+
debug("connection state recovery is enabled for sid %s", this.id);
703+
this.adapter.persistSession({
704+
sid: this.id,
705+
pid: this.pid,
706+
rooms: [...this.rooms],
707+
data: this.data,
708+
});
709+
}
710+
647711
this._cleanup();
648712
this.nsp._remove(this);
649713
this.client._remove(this);

0 commit comments

Comments
 (0)