Skip to content

Commit 0e23ff0

Browse files
fix(cluster): notify the other nodes when closing
The clustered adapter will now: - periodically check if a node has left the cluster - send an event when it leaves the cluster This should reduce the number of "timeout reached: only x responses received out of y" errors.
1 parent 80af4e9 commit 0e23ff0

File tree

1 file changed

+210
-3
lines changed

1 file changed

+210
-3
lines changed

lib/cluster-adapter.ts

Lines changed: 210 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,18 @@ export enum MessageType {
4545
SERVER_SIDE_EMIT_RESPONSE,
4646
BROADCAST_CLIENT_COUNT,
4747
BROADCAST_ACK,
48+
ADAPTER_CLOSE,
4849
}
4950

5051
export type ClusterMessage = {
5152
uid: string;
5253
nsp: string;
5354
} & (
5455
| {
55-
type: MessageType.INITIAL_HEARTBEAT | MessageType.HEARTBEAT;
56+
type:
57+
| MessageType.INITIAL_HEARTBEAT
58+
| MessageType.HEARTBEAT
59+
| MessageType.ADAPTER_CLOSE;
5660
}
5761
| {
5862
type: MessageType.BROADCAST;
@@ -643,11 +647,21 @@ export abstract class ClusterAdapter extends Adapter {
643647
);
644648
}
645649

650+
interface CustomClusterRequest {
651+
type: MessageType;
652+
resolve: Function;
653+
timeout: NodeJS.Timeout;
654+
missingUids: Set<string>;
655+
responses: any[];
656+
}
657+
646658
export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
647659
private readonly _opts: Required<ClusterAdapterOptions>;
648660

649661
private heartbeatTimer: NodeJS.Timeout;
650662
private nodesMap: Map<string, number> = new Map(); // uid => timestamp of last message
663+
private readonly cleanupTimer: NodeJS.Timeout | undefined;
664+
private customRequests: Map<string, CustomClusterRequest> = new Map();
651665

652666
protected constructor(nsp, opts: ClusterAdapterOptions) {
653667
super(nsp);
@@ -658,9 +672,19 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
658672
},
659673
opts
660674
);
675+
this.cleanupTimer = setInterval(() => {
676+
const now = Date.now();
677+
this.nodesMap.forEach((lastSeen, uid) => {
678+
const nodeSeemsDown = now - lastSeen > this._opts.heartbeatTimeout;
679+
if (nodeSeemsDown) {
680+
debug("node %s seems down", uid);
681+
this.removeNode(uid);
682+
}
683+
});
684+
}, 1_000);
661685
}
662686

663-
override init(): Promise<void> | void {
687+
override init() {
664688
this.publish({
665689
type: MessageType.INITIAL_HEARTBEAT,
666690
});
@@ -677,8 +701,14 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
677701
}, this._opts.heartbeatInterval);
678702
}
679703

680-
override close(): Promise<void> | void {
704+
override close() {
705+
this.publish({
706+
type: MessageType.ADAPTER_CLOSE,
707+
});
681708
clearTimeout(this.heartbeatTimer);
709+
if (this.cleanupTimer) {
710+
clearInterval(this.cleanupTimer);
711+
}
682712
}
683713

684714
override async onMessage(message: ClusterMessage, offset?: string) {
@@ -700,6 +730,9 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
700730
case MessageType.HEARTBEAT:
701731
// nothing to do
702732
break;
733+
case MessageType.ADAPTER_CLOSE:
734+
this.removeNode(message.uid);
735+
break;
703736
default:
704737
super.onMessage(message, offset);
705738
}
@@ -722,4 +755,178 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
722755

723756
return super.publish(message);
724757
}
758+
759+
override async serverSideEmit(packet: any[]) {
760+
const withAck = typeof packet[packet.length - 1] === "function";
761+
762+
if (!withAck) {
763+
return this.publish({
764+
type: MessageType.SERVER_SIDE_EMIT,
765+
data: {
766+
packet,
767+
},
768+
});
769+
}
770+
771+
const ack = packet.pop();
772+
const expectedResponseCount = this.nodesMap.size;
773+
774+
debug(
775+
'waiting for %d responses to "serverSideEmit" request',
776+
expectedResponseCount
777+
);
778+
779+
if (expectedResponseCount <= 0) {
780+
return ack(null, []);
781+
}
782+
783+
const requestId = randomId();
784+
785+
const timeout = setTimeout(() => {
786+
const storedRequest = this.customRequests.get(requestId);
787+
if (storedRequest) {
788+
ack(
789+
new Error(
790+
`timeout reached: missing ${storedRequest.missingUids.size} responses`
791+
),
792+
storedRequest.responses
793+
);
794+
this.customRequests.delete(requestId);
795+
}
796+
}, DEFAULT_TIMEOUT);
797+
798+
const storedRequest = {
799+
type: MessageType.SERVER_SIDE_EMIT,
800+
resolve: ack,
801+
timeout,
802+
missingUids: new Set([...this.nodesMap.keys()]),
803+
responses: [],
804+
};
805+
this.customRequests.set(requestId, storedRequest);
806+
807+
this.publish({
808+
type: MessageType.SERVER_SIDE_EMIT,
809+
data: {
810+
requestId, // the presence of this attribute defines whether an acknowledgement is needed
811+
packet,
812+
},
813+
});
814+
}
815+
816+
override async fetchSockets(opts: BroadcastOptions): Promise<any[]> {
817+
const [localSockets, serverCount] = await Promise.all([
818+
super.fetchSockets({
819+
rooms: opts.rooms,
820+
except: opts.except,
821+
flags: {
822+
local: true,
823+
},
824+
}),
825+
this.serverCount(),
826+
]);
827+
const expectedResponseCount = serverCount - 1;
828+
829+
if (opts.flags?.local || expectedResponseCount <= 0) {
830+
return localSockets as any[];
831+
}
832+
833+
const requestId = randomId();
834+
835+
return new Promise<any[]>((resolve, reject) => {
836+
const timeout = setTimeout(() => {
837+
const storedRequest = this.customRequests.get(requestId);
838+
if (storedRequest) {
839+
reject(
840+
new Error(
841+
`timeout reached: missing ${storedRequest.missingUids.size} responses`
842+
)
843+
);
844+
this.customRequests.delete(requestId);
845+
}
846+
}, opts.flags.timeout || DEFAULT_TIMEOUT);
847+
848+
const storedRequest = {
849+
type: MessageType.FETCH_SOCKETS,
850+
resolve,
851+
timeout,
852+
missingUids: new Set([...this.nodesMap.keys()]),
853+
responses: localSockets as any[],
854+
};
855+
this.customRequests.set(requestId, storedRequest);
856+
857+
this.publish({
858+
type: MessageType.FETCH_SOCKETS,
859+
data: {
860+
opts: encodeOptions(opts),
861+
requestId,
862+
},
863+
});
864+
});
865+
}
866+
867+
override onResponse(response: ClusterResponse) {
868+
const requestId = response.data.requestId;
869+
870+
debug("received response %s to request %s", response.type, requestId);
871+
872+
switch (response.type) {
873+
case MessageType.FETCH_SOCKETS_RESPONSE: {
874+
const request = this.customRequests.get(requestId);
875+
876+
if (!request) {
877+
return;
878+
}
879+
880+
(response.data.sockets as any[]).forEach((socket) =>
881+
request.responses.push(socket)
882+
);
883+
884+
request.missingUids.delete(response.uid);
885+
if (request.missingUids.size === 0) {
886+
clearTimeout(request.timeout);
887+
request.resolve(request.responses);
888+
this.customRequests.delete(requestId);
889+
}
890+
break;
891+
}
892+
893+
case MessageType.SERVER_SIDE_EMIT_RESPONSE: {
894+
const request = this.customRequests.get(requestId);
895+
896+
if (!request) {
897+
return;
898+
}
899+
900+
request.responses.push(response.data.packet);
901+
902+
request.missingUids.delete(response.uid);
903+
if (request.missingUids.size === 0) {
904+
clearTimeout(request.timeout);
905+
request.resolve(null, request.responses);
906+
this.customRequests.delete(requestId);
907+
}
908+
break;
909+
}
910+
911+
default:
912+
super.onResponse(response);
913+
}
914+
}
915+
916+
private removeNode(uid: string) {
917+
this.customRequests.forEach((request, requestId) => {
918+
request.missingUids.delete(uid);
919+
if (request.missingUids.size === 0) {
920+
clearTimeout(request.timeout);
921+
if (request.type === MessageType.FETCH_SOCKETS) {
922+
request.resolve(request.responses);
923+
} else if (request.type === MessageType.SERVER_SIDE_EMIT) {
924+
request.resolve(null, request.responses);
925+
}
926+
this.customRequests.delete(requestId);
927+
}
928+
});
929+
930+
this.nodesMap.delete(uid);
931+
}
725932
}

0 commit comments

Comments
 (0)