Skip to content

Commit f529412

Browse files
feat: implement connection state recovery
More information about how this feature is supposed to work will be provided in the main repository.
1 parent d5c56d4 commit f529412

File tree

5 files changed

+458
-10
lines changed

5 files changed

+458
-10
lines changed

lib/contrib/yeast.ts

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// imported from https://github.com/unshiftio/yeast
2+
"use strict";
3+
4+
const alphabet =
5+
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-_".split(
6+
""
7+
),
8+
length = 64,
9+
map = {};
10+
let seed = 0,
11+
i = 0,
12+
prev;
13+
14+
/**
15+
* Return a string representing the specified number.
16+
*
17+
* @param {Number} num The number to convert.
18+
* @returns {String} The string representation of the number.
19+
* @api public
20+
*/
21+
export function encode(num) {
22+
let encoded = "";
23+
24+
do {
25+
encoded = alphabet[num % length] + encoded;
26+
num = Math.floor(num / length);
27+
} while (num > 0);
28+
29+
return encoded;
30+
}
31+
32+
/**
33+
* Return the integer value specified by the given string.
34+
*
35+
* @param {String} str The string to convert.
36+
* @returns {Number} The integer value represented by the string.
37+
* @api public
38+
*/
39+
export function decode(str) {
40+
let decoded = 0;
41+
42+
for (i = 0; i < str.length; i++) {
43+
decoded = decoded * length + map[str.charAt(i)];
44+
}
45+
46+
return decoded;
47+
}
48+
49+
/**
50+
* Yeast: A tiny growing id generator.
51+
*
52+
* @returns {String} A unique id.
53+
* @api public
54+
*/
55+
export function yeast() {
56+
const now = encode(+new Date());
57+
58+
if (now !== prev) return (seed = 0), (prev = now);
59+
return now + "." + encode(seed++);
60+
}
61+
62+
//
63+
// Map each character to its index.
64+
//
65+
for (; i < length; i++) map[alphabet[i]] = i;

lib/index.ts

+147
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
11
import { EventEmitter } from "events";
2+
import { yeast } from "./contrib/yeast";
23

4+
/**
5+
* A public ID, sent by the server at the beginning of the Socket.IO session and which can be used for private messaging
6+
*/
37
export type SocketId = string;
8+
/**
9+
* A private ID, sent by the server at the beginning of the Socket.IO session and used for connection state recovery
10+
* upon reconnection
11+
*/
12+
export type PrivateSessionId = string;
13+
414
// we could extend the Room type to "string | number", but that would be a breaking change
515
// related: https://github.com/socketio/socket.io-redis-adapter/issues/418
616
export type Room = string;
@@ -20,6 +30,15 @@ export interface BroadcastOptions {
2030
flags?: BroadcastFlags;
2131
}
2232

33+
interface SessionToPersist {
34+
sid: SocketId;
35+
pid: PrivateSessionId;
36+
rooms: Room[];
37+
data: unknown;
38+
}
39+
40+
export type Session = SessionToPersist & { missedPackets: unknown[][] };
41+
2342
export class Adapter extends EventEmitter {
2443
public rooms: Map<Room, Set<SocketId>> = new Map();
2544
public sids: Map<SocketId, Set<Room>> = new Map();
@@ -331,4 +350,132 @@ export class Adapter extends EventEmitter {
331350
"this adapter does not support the serverSideEmit() functionality"
332351
);
333352
}
353+
354+
/**
355+
* Save the client session in order to restore it upon reconnection.
356+
*/
357+
public persistSession(session: SessionToPersist) {}
358+
359+
/**
360+
* Restore the session and find the packets that were missed by the client.
361+
* @param pid
362+
* @param offset
363+
*/
364+
public restoreSession(
365+
pid: PrivateSessionId,
366+
offset: string
367+
): Promise<Session> {
368+
return null;
369+
}
370+
}
371+
372+
interface PersistedPacket {
373+
id: string;
374+
emittedAt: number;
375+
data: unknown[];
376+
opts: BroadcastOptions;
377+
}
378+
379+
type SessionWithTimestamp = SessionToPersist & { disconnectedAt: number };
380+
381+
export class SessionAwareAdapter extends Adapter {
382+
private readonly maxDisconnectionDuration: number;
383+
384+
private sessions: Map<PrivateSessionId, SessionWithTimestamp> = new Map();
385+
private packets: PersistedPacket[] = [];
386+
387+
constructor(readonly nsp: any) {
388+
super(nsp);
389+
this.maxDisconnectionDuration =
390+
nsp.server.opts.connectionStateRecovery.maxDisconnectionDuration;
391+
392+
const timer = setInterval(() => {
393+
const threshold = Date.now() - this.maxDisconnectionDuration;
394+
this.sessions.forEach((session, sessionId) => {
395+
const hasExpired = session.disconnectedAt < threshold;
396+
if (hasExpired) {
397+
this.sessions.delete(sessionId);
398+
}
399+
});
400+
for (let i = this.packets.length - 1; i >= 0; i--) {
401+
const hasExpired = this.packets[i].emittedAt < threshold;
402+
if (hasExpired) {
403+
this.packets.splice(0, i + 1);
404+
break;
405+
}
406+
}
407+
}, 60 * 1000);
408+
// prevents the timer from keeping the process alive
409+
timer.unref();
410+
}
411+
412+
override persistSession(session: SessionToPersist) {
413+
(session as SessionWithTimestamp).disconnectedAt = Date.now();
414+
this.sessions.set(session.pid, session as SessionWithTimestamp);
415+
}
416+
417+
override restoreSession(
418+
pid: PrivateSessionId,
419+
offset: string
420+
): Promise<Session> {
421+
const session = this.sessions.get(pid);
422+
if (!session) {
423+
// the session may have expired
424+
return null;
425+
}
426+
const hasExpired =
427+
session.disconnectedAt + this.maxDisconnectionDuration < Date.now();
428+
if (hasExpired) {
429+
// the session has expired
430+
this.sessions.delete(pid);
431+
return null;
432+
}
433+
const index = this.packets.findIndex((packet) => packet.id === offset);
434+
if (index === -1) {
435+
// the offset may be too old
436+
return null;
437+
}
438+
const missedPackets = [];
439+
for (let i = index + 1; i < this.packets.length; i++) {
440+
const packet = this.packets[i];
441+
if (shouldIncludePacket(session.rooms, packet.opts)) {
442+
missedPackets.push(packet.data);
443+
}
444+
}
445+
return Promise.resolve({
446+
...session,
447+
missedPackets,
448+
});
449+
}
450+
451+
override broadcast(packet: any, opts: BroadcastOptions) {
452+
const isEventPacket = packet.type === 2;
453+
// packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and
454+
// restored on another server upon reconnection
455+
const withoutAcknowledgement = packet.id === undefined;
456+
const notVolatile = opts.flags?.volatile === undefined;
457+
if (isEventPacket && withoutAcknowledgement && notVolatile) {
458+
const id = yeast();
459+
// the offset is stored at the end of the data array, so the client knows the ID of the last packet it has
460+
// processed (and the format is backward-compatible)
461+
packet.data.push(id);
462+
this.packets.push({
463+
id,
464+
opts,
465+
data: packet.data,
466+
emittedAt: Date.now(),
467+
});
468+
}
469+
super.broadcast(packet, opts);
470+
}
471+
}
472+
473+
function shouldIncludePacket(
474+
sessionRooms: Room[],
475+
opts: BroadcastOptions
476+
): boolean {
477+
const included =
478+
opts.rooms.size === 0 || sessionRooms.some((room) => opts.rooms.has(room));
479+
const notExcluded = sessionRooms.every((room) => !opts.except.has(room));
480+
return included && notExcluded;
334481
}

package-lock.json

+7-7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
"nyc": "^15.1.0",
2121
"prettier": "^2.8.1",
2222
"ts-node": "^10.9.1",
23-
"typescript": "^4.0.3"
23+
"typescript": "^4.9.4"
2424
},
2525
"scripts": {
2626
"test": "npm run format:check && tsc && nyc mocha --require ts-node/register test/index.ts",

0 commit comments

Comments
 (0)