Skip to content

Commit eab8c80

Browse files
committed
Fixed format of 'callBroadcast' method
1 parent 0d595e8 commit eab8c80

File tree

2 files changed

+47
-5
lines changed

2 files changed

+47
-5
lines changed

amqp-rpc/amqp-rpc-tests.ts

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,42 @@ rpc.call<any>('withoutCB', {}, function (msg) {
3939
console.log('withoutCB results:', msg); //output: please run function without cb parameter
4040
});
4141

42-
rpc.call<any>('withoutCB', {}); //output message on server side console
42+
rpc.call<any>('withoutCB', {}); //output message on server side console
43+
44+
import os = require('os');
45+
interface State {
46+
type: string;
47+
}
48+
49+
var counter = 0;
50+
rpc.onBroadcast<State>('getWorkerStat', function (params, cb) {
51+
if (params && params.type == 'fullStat') {
52+
cb(null, {
53+
pid: process.pid,
54+
hostname: os.hostname(),
55+
uptime: process.uptime(),
56+
counter: counter++
57+
});
58+
}
59+
else {
60+
cb(null, { counter: counter++ })
61+
}
62+
});
63+
64+
var all_stats: any = {};
65+
rpc.callBroadcast<State>(
66+
'getWorkerStat',
67+
{ type: 'fullStat' }, //request parameters
68+
{ //call options
69+
ttl: 1000, //wait response time (1 seconds), after run onComplete
70+
onResponse: function (err: any, stat: any) { //callback on each worker response
71+
all_stats[stat.hostname + ':' + stat.pid] = stat;
72+
},
73+
onComplete: function () { //callback on ttl expired
74+
console.log('----------------------- WORKER STATISTICS ----------------------------------------');
75+
for (var worker in all_stats) {
76+
var s: any = all_stats[worker];
77+
console.log(worker, '\tuptime=', s.uptime.toFixed(2) + ' seconds', '\tcounter=', s.counter);
78+
}
79+
}
80+
});

amqp-rpc/amqp-rpc.d.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ declare module "amqp-rpc" {
3535
}
3636

3737
export interface BroadcastOptions {
38-
ttl?: boolean;
38+
ttl?: number;
3939
onResponse?: any;
4040
context?: any;
4141
onComplete?: any;
@@ -52,6 +52,10 @@ declare module "amqp-rpc" {
5252
(...args: any[]): void;
5353
}
5454

55+
export interface CallbackWithError {
56+
(err: any, ...args: any[]): void;
57+
}
58+
5559
export function factory(opt?: Options): amqpRPC;
5660

5761
export class amqpRPC {
@@ -61,8 +65,8 @@ declare module "amqp-rpc" {
6165
call<T>(cmd: string, params: T, cb?: Callback, context?: any, options?: CallOptions): string;
6266
on<T>(cmd: string, cb: (param?: T, cb?: Callback, info?: CommandInfo) => void, context?: any, options?: HandlerOptions): boolean;
6367
off(cmd: string): boolean;
64-
callBroadcast(cmd: string, params: any, options: BroadcastOptions): void;
65-
onBroadcast(cmd: string, cb: (err: any) => void, context: any, options?: any): boolean;
68+
callBroadcast<T>(cmd: string, params: T, options?: BroadcastOptions): void;
69+
onBroadcast<T>(cmd: string, cb?: (params?: T, cb?: CallbackWithError) => void, context?: any, options?: any): boolean;
6670
offBroadcast(cmd: string): boolean;
6771
}
68-
}
72+
}

0 commit comments

Comments
 (0)