Skip to content

Commit 3340d49

Browse files
committed
enable push handler support to be tested / test it
1 parent 9b1ac4c commit 3340d49

File tree

3 files changed

+98
-6
lines changed

3 files changed

+98
-6
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ export default class RedisCommandsQueue {
5252
readonly decoder;
5353
readonly #pubSub = new PubSub();
5454
readonly #pushHandlers: Map<string, (pushMsg: Array<any>) => unknown> = new Map();
55-
readonly #builtInSet = new Set<string>;
55+
readonly #builtInSet: ReadonlySet<string>;
5656

5757
get isPubSubActive() {
5858
return this.#pubSub.isActive;
@@ -76,9 +76,11 @@ export default class RedisCommandsQueue {
7676
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].message.toString(), this.#pubSub.handleMessageReplySharded.bind(this.#pubSub));
7777
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.toString(), this.#handleStatusReply.bind(this));
7878
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.toString(), this.#handleShardedUnsubscribe.bind(this));
79-
79+
80+
const s = new Set<string>();
81+
this.#builtInSet = s;
8082
for (const str in this.#pushHandlers.keys) {
81-
this.#builtInSet.add(str);
83+
s.add(str);
8284
}
8385

8486
this.decoder = this.#initiateDecoder();
@@ -122,6 +124,14 @@ export default class RedisCommandsQueue {
122124
this.#pushHandlers.set(messageType, handler);
123125
}
124126

127+
removePushHandler(messageType: string) {
128+
if (this.#builtInSet.has(messageType)) {
129+
throw new Error("Cannot override built in push message handler");
130+
}
131+
132+
this.#pushHandlers.delete(messageType);
133+
}
134+
125135
#onPush(push: Array<any>) {
126136
const handler = this.#pushHandlers.get(push[0].toString());
127137
if (handler) {
@@ -141,9 +151,7 @@ export default class RedisCommandsQueue {
141151
onReply: reply => this.#onReply(reply),
142152
onErrorReply: err => this.#onErrorReply(err),
143153
onPush: push => {
144-
if (!this.#onPush(push)) {
145-
146-
}
154+
return this.#onPush(push);
147155
},
148156
getTypeMapping: () => this.#getTypeMapping()
149157
});

packages/client/lib/client/index.spec.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { MATH_FUNCTION, loadMathFunction } from '../commands/FUNCTION_LOAD.spec'
99
import { RESP_TYPES } from '../RESP/decoder';
1010
import { BlobStringReply, NumberReply } from '../RESP/types';
1111
import { SortedSetMember } from '../commands/generic-transformers';
12+
import { createClient } from '../..';
1213

1314
export const SQUARE_SCRIPT = defineScript({
1415
SCRIPT:
@@ -769,4 +770,79 @@ describe('Client', () => {
769770
}
770771
}, GLOBAL.SERVERS.OPEN);
771772
});
773+
774+
describe('Push Handlers', () => {
775+
testUtils.testWithClient('RESP2: add/remove invalidate handler, and validate its called', async client => {
776+
const key = 'x'
777+
778+
const duplicate = await client.duplicate().connect();
779+
try {
780+
const id = await duplicate.clientId();
781+
782+
let nodeResolve;
783+
784+
const promise = new Promise((res) => {
785+
nodeResolve = res;
786+
});
787+
788+
duplicate.addPushHandler("invalidate", (push: Array<any>) => {
789+
assert.equal(push[0].toString(), "invalidate");
790+
assert.notEqual(push[1], null);
791+
assert.equal(push[1].length, 1);
792+
assert.equal(push[1][0].toString(), key);
793+
// this test removing the handler,
794+
// as flushAll in cleanup of test will issue a full invalidate,
795+
// which would fail if this handler is called on it
796+
duplicate.removePushHandler("invalidate");
797+
nodeResolve();
798+
})
799+
800+
await client.sendCommand(['CLIENT', 'TRACKING', 'ON', 'REDIRECT', id.toString()]);
801+
await client.get(key);
802+
await client.set(key, '1');
803+
804+
// force an invalidate all
805+
await client.flushAll();
806+
807+
await nodeResolve;
808+
} finally {
809+
duplicate.destroy();
810+
}
811+
}, {
812+
...GLOBAL.SERVERS.OPEN
813+
});
814+
815+
testUtils.testWithClient('RESP3: add/remove invalidate handler, and validate its called', async client => {
816+
const key = 'x'
817+
818+
let nodeResolve;
819+
820+
const promise = new Promise((res) => {
821+
nodeResolve = res;
822+
});
823+
824+
client.addPushHandler("invalidate", (push: Array<any>) => {
825+
assert.equal(push[0].toString(), "invalidate");
826+
assert.equal(push[1].length, 1);
827+
assert.equal(push[1].length, 1);
828+
assert.equal(push[1][0].toString(), key);
829+
// this test removing the handler,
830+
// as flushAll in cleanup of test will issue a full invalidate,
831+
// which would fail if this handler is called on it
832+
client.removePushHandler("invalidate");
833+
nodeResolve();
834+
})
835+
836+
await client.sendCommand(['CLIENT', 'TRACKING', 'ON']);
837+
await client.get(key);
838+
await client.set(key, '1');
839+
840+
await nodeResolve;
841+
}, {
842+
...GLOBAL.SERVERS.OPEN,
843+
clientOptions: {
844+
RESP: 3
845+
}
846+
});
847+
});
772848
});

packages/client/lib/client/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,14 @@ export default class RedisClient<
573573
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
574574
}
575575

576+
addPushHandler(messageType: string, handler: (pushMsg: Array<any>) => unknown) {
577+
this._self.#queue.addPushHandler(messageType, handler);
578+
}
579+
580+
removePushHandler(messageType: string) {
581+
this._self.#queue.removePushHandler(messageType);
582+
}
583+
576584
sendCommand<T = ReplyUnion>(
577585
args: Array<RedisArgument>,
578586
options?: CommandOptions

0 commit comments

Comments
 (0)