Skip to content

Commit b2d3695

Browse files
refactor(cluster): add explicit message types
1 parent b157e9e commit b2d3695

File tree

1 file changed

+113
-36
lines changed

1 file changed

+113
-36
lines changed

lib/cluster-adapter.ts

Lines changed: 113 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
import { Adapter, BroadcastOptions, Room } from "./index";
1+
import {
2+
Adapter,
3+
type BroadcastFlags,
4+
type BroadcastOptions,
5+
type Room,
6+
} from "./index";
27
import { debug as debugModule } from "debug";
38
import { randomBytes } from "crypto";
49

@@ -10,6 +15,10 @@ function randomId() {
1015
return randomBytes(8).toString("hex");
1116
}
1217

18+
type DistributiveOmit<T, K extends keyof any> = T extends any
19+
? Omit<T, K>
20+
: never;
21+
1322
export interface ClusterAdapterOptions {
1423
/**
1524
* The number of ms between two heartbeats.
@@ -38,11 +47,50 @@ export enum MessageType {
3847
BROADCAST_ACK,
3948
}
4049

41-
export interface ClusterMessage {
50+
export type ClusterMessage = {
4251
uid: string;
43-
type: MessageType;
44-
data?: Record<string, unknown>;
45-
}
52+
nsp: string;
53+
} & (
54+
| {
55+
type: MessageType.INITIAL_HEARTBEAT | MessageType.HEARTBEAT;
56+
}
57+
| {
58+
type: MessageType.BROADCAST;
59+
data: {
60+
opts: { rooms: string[]; except: string[]; flags: BroadcastFlags };
61+
packet: unknown;
62+
requestId?: string;
63+
};
64+
}
65+
| {
66+
type: MessageType.SOCKETS_JOIN | MessageType.SOCKETS_LEAVE;
67+
data: {
68+
opts: { rooms: string[]; except: string[]; flags: BroadcastFlags };
69+
rooms: string[];
70+
};
71+
}
72+
| {
73+
type: MessageType.DISCONNECT_SOCKETS;
74+
data: {
75+
opts: { rooms: string[]; except: string[]; flags: BroadcastFlags };
76+
close?: boolean;
77+
};
78+
}
79+
| {
80+
type: MessageType.FETCH_SOCKETS;
81+
data: {
82+
opts: { rooms: string[]; except: string[]; flags: BroadcastFlags };
83+
requestId: string;
84+
};
85+
}
86+
| {
87+
type: MessageType.SERVER_SIDE_EMIT;
88+
data: {
89+
requestId?: string;
90+
packet: unknown;
91+
};
92+
}
93+
);
4694

4795
interface ClusterRequest {
4896
type: MessageType;
@@ -53,13 +101,39 @@ interface ClusterRequest {
53101
responses: any[];
54102
}
55103

56-
interface ClusterResponse {
57-
type: MessageType;
58-
data: {
59-
requestId: string;
60-
[key: string]: unknown;
61-
};
62-
}
104+
export type ClusterResponse = {
105+
uid: string;
106+
nsp: string;
107+
} & (
108+
| {
109+
type: MessageType.FETCH_SOCKETS_RESPONSE;
110+
data: {
111+
requestId: string;
112+
sockets: unknown[];
113+
};
114+
}
115+
| {
116+
type: MessageType.SERVER_SIDE_EMIT_RESPONSE;
117+
data: {
118+
requestId: string;
119+
packet: unknown;
120+
};
121+
}
122+
| {
123+
type: MessageType.BROADCAST_CLIENT_COUNT;
124+
data: {
125+
requestId: string;
126+
clientCount: number;
127+
};
128+
}
129+
| {
130+
type: MessageType.BROADCAST_ACK;
131+
data: {
132+
requestId: string;
133+
packet: unknown;
134+
};
135+
}
136+
);
63137

64138
interface ClusterAckRequest {
65139
clientCountCallback: (clientCount: number) => void;
@@ -85,7 +159,7 @@ function decodeOptions(opts): BroadcastOptions {
85159
/**
86160
* A cluster-ready adapter. Any extending class must:
87161
*
88-
* - implement {@link ClusterAdapter#publishMessage} and {@link ClusterAdapter#publishResponse}
162+
* - implement {@link ClusterAdapter#doPublish} and {@link ClusterAdapter#doPublishResponse}
89163
* - call {@link ClusterAdapter#onMessage} and {@link ClusterAdapter#onResponse}
90164
*/
91165
export abstract class ClusterAdapter extends Adapter {
@@ -125,7 +199,7 @@ export abstract class ClusterAdapter extends Adapter {
125199
this.publishResponse(message.uid, {
126200
type: MessageType.BROADCAST_CLIENT_COUNT,
127201
data: {
128-
requestId: message.data.requestId as string,
202+
requestId: message.data.requestId,
129203
clientCount,
130204
},
131205
});
@@ -135,7 +209,7 @@ export abstract class ClusterAdapter extends Adapter {
135209
this.publishResponse(message.uid, {
136210
type: MessageType.BROADCAST_ACK,
137211
data: {
138-
requestId: message.data.requestId as string,
212+
requestId: message.data.requestId,
139213
packet: arg,
140214
},
141215
});
@@ -153,23 +227,17 @@ export abstract class ClusterAdapter extends Adapter {
153227
}
154228

155229
case MessageType.SOCKETS_JOIN:
156-
super.addSockets(
157-
decodeOptions(message.data.opts),
158-
message.data.rooms as string[]
159-
);
230+
super.addSockets(decodeOptions(message.data.opts), message.data.rooms);
160231
break;
161232

162233
case MessageType.SOCKETS_LEAVE:
163-
super.delSockets(
164-
decodeOptions(message.data.opts),
165-
message.data.rooms as string[]
166-
);
234+
super.delSockets(decodeOptions(message.data.opts), message.data.rooms);
167235
break;
168236

169237
case MessageType.DISCONNECT_SOCKETS:
170238
super.disconnectSockets(
171239
decodeOptions(message.data.opts),
172-
message.data.close as boolean
240+
message.data.close
173241
);
174242
break;
175243

@@ -182,7 +250,7 @@ export abstract class ClusterAdapter extends Adapter {
182250
this.publishResponse(message.uid, {
183251
type: MessageType.FETCH_SOCKETS_RESPONSE,
184252
data: {
185-
requestId: message.data.requestId as string,
253+
requestId: message.data.requestId,
186254
sockets: localSockets.map((socket) => {
187255
// remove sessionStore from handshake, as it may contain circular references
188256
const { sessionStore, ...handshake } = socket.handshake;
@@ -216,7 +284,7 @@ export abstract class ClusterAdapter extends Adapter {
216284
this.publishResponse(message.uid, {
217285
type: MessageType.SERVER_SIDE_EMIT_RESPONSE,
218286
data: {
219-
requestId: message.data.requestId as string,
287+
requestId: message.data.requestId,
220288
packet: arg,
221289
},
222290
});
@@ -247,7 +315,7 @@ export abstract class ClusterAdapter extends Adapter {
247315
case MessageType.BROADCAST_CLIENT_COUNT: {
248316
this.ackRequests
249317
.get(requestId)
250-
?.clientCountCallback(response.data.clientCount as number);
318+
?.clientCountCallback(response.data.clientCount);
251319
break;
252320
}
253321

@@ -264,7 +332,7 @@ export abstract class ClusterAdapter extends Adapter {
264332
}
265333

266334
request.current++;
267-
(response.data.sockets as any[]).forEach((socket) =>
335+
response.data.sockets.forEach((socket) =>
268336
request.responses.push(socket)
269337
);
270338

@@ -295,6 +363,7 @@ export abstract class ClusterAdapter extends Adapter {
295363
}
296364

297365
default:
366+
// @ts-ignore
298367
debug("unknown response type: %s", response.type);
299368
}
300369
}
@@ -537,11 +606,10 @@ export abstract class ClusterAdapter extends Adapter {
537606
});
538607
}
539608

540-
protected publish(message: Omit<ClusterMessage, "uid">) {
541-
return this.publishMessage({
542-
uid: this.uid,
543-
...message,
544-
});
609+
protected publish(message: DistributiveOmit<ClusterMessage, "nsp" | "uid">) {
610+
(message as ClusterMessage).uid = this.uid;
611+
(message as ClusterMessage).nsp = this.nsp.name;
612+
return this.doPublish(message as ClusterMessage);
545613
}
546614

547615
/**
@@ -551,7 +619,16 @@ export abstract class ClusterAdapter extends Adapter {
551619
* @protected
552620
* @return an offset, if applicable
553621
*/
554-
protected abstract publishMessage(message: ClusterMessage): Promise<string>;
622+
protected abstract doPublish(message: ClusterMessage): Promise<string>;
623+
624+
protected publishResponse(
625+
requesterUid: string,
626+
response: Omit<ClusterResponse, "nsp" | "uid">
627+
) {
628+
(response as ClusterResponse).uid = this.uid;
629+
(response as ClusterResponse).nsp = this.nsp.name;
630+
return this.doPublishResponse(requesterUid, response as ClusterResponse);
631+
}
555632

556633
/**
557634
* Send a response to the given member of the cluster.
@@ -560,7 +637,7 @@ export abstract class ClusterAdapter extends Adapter {
560637
* @param response
561638
* @protected
562639
*/
563-
protected abstract publishResponse(
640+
protected abstract doPublishResponse(
564641
requesterUid: string,
565642
response: ClusterResponse
566643
);
@@ -640,7 +717,7 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
640717
return Promise.resolve(1 + this.nodesMap.size);
641718
}
642719

643-
override publish(message: Omit<ClusterMessage, "uid">) {
720+
override publish(message: DistributiveOmit<ClusterMessage, "nsp" | "uid">) {
644721
this.scheduleHeartbeat();
645722

646723
return super.publish(message);

0 commit comments

Comments
 (0)