Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 829a1f5

Browse files
committedApr 28, 2022
feat: broadcast and expect multiple acks
This feature was added in `[email protected]`: ```js io.timeout(1000).emit("some-event", (err, responses) => {   // ... }); ``` Thanks to this change, it will now work with multiple Socket.IO servers. Related: socketio/socket.io@8b20457
1 parent e546e4d commit 829a1f5

File tree

4 files changed

+369
-95
lines changed

4 files changed

+369
-95
lines changed
 

‎lib/index.ts

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ enum EventType {
2121
FETCH_SOCKETS_RESPONSE,
2222
SERVER_SIDE_EMIT,
2323
SERVER_SIDE_EMIT_RESPONSE,
24+
BROADCAST_CLIENT_COUNT,
25+
BROADCAST_ACK,
2426
}
2527

2628
interface Request {
@@ -32,6 +34,12 @@ interface Request {
3234
responses: any[];
3335
}
3436

37+
interface AckRequest {
38+
type: EventType.BROADCAST;
39+
clientCountCallback: (clientCount: number) => void;
40+
ack: (...args: any[]) => void;
41+
}
42+
3543
/**
3644
* UID of an emitter using the `@socket.io/postgres-emitter` package
3745
*/
@@ -151,6 +159,7 @@ export class PostgresAdapter extends Adapter {
151159
private heartbeatTimer: NodeJS.Timeout | undefined;
152160
private cleanupTimer: NodeJS.Timeout | undefined;
153161
private requests: Map<string, Request> = new Map();
162+
private ackRequests: Map<string, AckRequest> = new Map();
154163

155164
/**
156165
* Adapter constructor.
@@ -271,12 +280,54 @@ export class PostgresAdapter extends Adapter {
271280
}
272281
case EventType.BROADCAST: {
273282
debug("broadcast with opts %j", document.data.opts);
274-
super.broadcast(
275-
document.data.packet,
276-
PostgresAdapter.deserializeOptions(document.data.opts)
277-
);
283+
284+
const withAck = document.data.requestId !== undefined;
285+
if (withAck) {
286+
super.broadcastWithAck(
287+
document.data.packet,
288+
PostgresAdapter.deserializeOptions(document.data.opts),
289+
(clientCount) => {
290+
debug("waiting for %d client acknowledgements", clientCount);
291+
this.publish({
292+
type: EventType.BROADCAST_CLIENT_COUNT,
293+
data: {
294+
requestId: document.data.requestId,
295+
clientCount,
296+
},
297+
});
298+
},
299+
(arg) => {
300+
debug("received acknowledgement with value %j", arg);
301+
this.publish({
302+
type: EventType.BROADCAST_ACK,
303+
data: {
304+
requestId: document.data.requestId,
305+
packet: arg,
306+
},
307+
});
308+
}
309+
);
310+
} else {
311+
super.broadcast(
312+
document.data.packet,
313+
PostgresAdapter.deserializeOptions(document.data.opts)
314+
);
315+
}
316+
break;
317+
}
318+
319+
case EventType.BROADCAST_CLIENT_COUNT: {
320+
const request = this.ackRequests.get(document.data.requestId);
321+
request?.clientCountCallback(document.data.clientCount);
322+
break;
323+
}
324+
325+
case EventType.BROADCAST_ACK: {
326+
const request = this.ackRequests.get(document.data.requestId);
327+
request?.ack(document.data.packet);
278328
break;
279329
}
330+
280331
case EventType.SOCKETS_JOIN: {
281332
debug("calling addSockets with opts %j", document.data.opts);
282333
super.addSockets(
@@ -285,6 +336,7 @@ export class PostgresAdapter extends Adapter {
285336
);
286337
break;
287338
}
339+
288340
case EventType.SOCKETS_LEAVE: {
289341
debug("calling delSockets with opts %j", document.data.opts);
290342
super.delSockets(
@@ -419,6 +471,7 @@ export class PostgresAdapter extends Adapter {
419471
if (
420472
[
421473
EventType.BROADCAST,
474+
EventType.BROADCAST_ACK,
422475
EventType.SERVER_SIDE_EMIT,
423476
EventType.SERVER_SIDE_EMIT_RESPONSE,
424477
].includes(document.type) &&
@@ -506,6 +559,48 @@ export class PostgresAdapter extends Adapter {
506559
});
507560
}
508561

562+
public broadcastWithAck(
563+
packet: any,
564+
opts: BroadcastOptions,
565+
clientCountCallback: (clientCount: number) => void,
566+
ack: (...args: any[]) => void
567+
) {
568+
const onlyLocal = opts?.flags?.local;
569+
if (!onlyLocal) {
570+
const requestId = randomId();
571+
572+
this.publish({
573+
type: EventType.BROADCAST,
574+
data: {
575+
packet,
576+
requestId,
577+
opts: PostgresAdapter.serializeOptions(opts),
578+
},
579+
});
580+
581+
this.ackRequests.set(requestId, {
582+
type: EventType.BROADCAST,
583+
clientCountCallback,
584+
ack,
585+
});
586+
587+
// we have no way to know at this level whether the server has received an acknowledgement from each client, so we
588+
// will simply clean up the ackRequests map after the given delay
589+
setTimeout(() => {
590+
this.ackRequests.delete(requestId);
591+
}, opts.flags!.timeout);
592+
}
593+
594+
// packets with binary contents are modified by the broadcast method, hence the nextTick()
595+
process.nextTick(() => {
596+
super.broadcastWithAck(packet, opts, clientCountCallback, ack);
597+
});
598+
}
599+
600+
public serverCount(): Promise<number> {
601+
return Promise.resolve(1 + this.nodesMap.size);
602+
}
603+
509604
addSockets(opts: BroadcastOptions, rooms: Room[]) {
510605
super.addSockets(opts, rooms);
511606

‎package-lock.json

Lines changed: 181 additions & 87 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
"@types/pg": "^8.6.1",
2424
"debug": "~4.3.1",
2525
"pg": "^8.0.0",
26-
"socket.io-adapter": "~2.3.0"
26+
"socket.io-adapter": "~2.4.0"
2727
},
2828
"devDependencies": {
2929
"@types/expect.js": "^0.3.29",
@@ -33,7 +33,7 @@
3333
"mocha": "^8.4.0",
3434
"nyc": "^15.1.0",
3535
"prettier": "^2.1.2",
36-
"socket.io": "^4.1.1",
36+
"socket.io": "^4.5.0",
3737
"socket.io-client": "^4.1.1",
3838
"ts-node": "^9.1.1",
3939
"typescript": "^4.0.5"

‎test/index.ts

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { createServer } from "http";
22
import { Server, Socket as ServerSocket } from "socket.io";
33
import { io as ioc, Socket as ClientSocket } from "socket.io-client";
44
import expect = require("expect.js");
5-
import { createAdapter, PostgresAdapter } from "..";
5+
import { createAdapter, PostgresAdapter } from "../lib";
66
import type { AddressInfo } from "net";
77
import { times, sleep } from "./util";
88
import { Pool } from "pg";
@@ -175,6 +175,91 @@ describe("@socket.io/postgres-adapter", () => {
175175

176176
servers[0].local.emit("test");
177177
});
178+
179+
it("broadcasts with multiple acknowledgements", (done) => {
180+
clientSockets[0].on("test", (cb) => {
181+
cb(1);
182+
});
183+
184+
clientSockets[1].on("test", (cb) => {
185+
cb(2);
186+
});
187+
188+
clientSockets[2].on("test", (cb) => {
189+
cb(3);
190+
});
191+
192+
servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => {
193+
expect(err).to.be(null);
194+
expect(responses).to.contain(1);
195+
expect(responses).to.contain(2);
196+
expect(responses).to.contain(3);
197+
198+
setTimeout(() => {
199+
// @ts-ignore
200+
expect(servers[0].of("/").adapter.ackRequests.size).to.eql(0);
201+
202+
done();
203+
}, 500);
204+
});
205+
});
206+
207+
it("broadcasts with multiple acknowledgements (binary content)", (done) => {
208+
clientSockets[0].on("test", (cb) => {
209+
cb(Buffer.from([1]));
210+
});
211+
212+
clientSockets[1].on("test", (cb) => {
213+
cb(Buffer.from([2]));
214+
});
215+
216+
clientSockets[2].on("test", (cb) => {
217+
cb(Buffer.from([3]));
218+
});
219+
220+
servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => {
221+
expect(err).to.be(null);
222+
responses.forEach((response) => {
223+
expect(Buffer.isBuffer(response)).to.be(true);
224+
});
225+
226+
done();
227+
});
228+
});
229+
230+
it("broadcasts with multiple acknowledgements (no client)", (done) => {
231+
servers[0]
232+
.to("abc")
233+
.timeout(500)
234+
.emit("test", (err: Error, responses: any[]) => {
235+
expect(err).to.be(null);
236+
expect(responses).to.eql([]);
237+
238+
done();
239+
});
240+
});
241+
242+
it("broadcasts with multiple acknowledgements (timeout)", (done) => {
243+
clientSockets[0].on("test", (cb) => {
244+
cb(1);
245+
});
246+
247+
clientSockets[1].on("test", (cb) => {
248+
cb(2);
249+
});
250+
251+
clientSockets[2].on("test", (cb) => {
252+
// do nothing
253+
});
254+
255+
servers[0].timeout(500).emit("test", (err: Error, responses: any[]) => {
256+
expect(err).to.be.an(Error);
257+
expect(responses).to.contain(1);
258+
expect(responses).to.contain(2);
259+
260+
done();
261+
});
262+
});
178263
});
179264

180265
describe("socketsJoin", () => {
@@ -281,7 +366,7 @@ describe("@socket.io/postgres-adapter", () => {
281366
});
282367

283368
it("returns a single socket instance", async () => {
284-
serverSockets[1].data = "test";
369+
serverSockets[1].data = "test" as any;
285370

286371
const [remoteSocket] = await servers[0]
287372
.in(serverSockets[1].id)

0 commit comments

Comments
 (0)
Please sign in to comment.