Skip to content

Commit cdeee83

Browse files
committed
Make it possible to bind some events on demand
1 parent f4b17b6 commit cdeee83

File tree

14 files changed

+243
-227
lines changed

14 files changed

+243
-227
lines changed

packages/protocol/src/browser/modules/child_process.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { ClientWritableProxy, ClientReadableProxy, Readable, Writable } from "./
88

99
// tslint:disable completed-docs
1010

11-
export interface ClientChildProcessProxy extends ChildProcessProxy, ClientServerProxy {}
11+
export interface ClientChildProcessProxy extends ChildProcessProxy, ClientServerProxy<cp.ChildProcess> {}
1212

1313
export interface ClientChildProcessProxies {
1414
childProcess: ClientChildProcessProxy;

packages/protocol/src/browser/modules/fs.ts

+4-5
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ import { IEncodingOptions, IEncodingOptionsCallback } from "../../common/util";
55
import { FsModuleProxy, ReadStreamProxy, Stats as IStats, WatcherProxy, WriteStreamProxy } from "../../node/modules/fs";
66
import { Readable, Writable } from "./stream";
77

8-
// tslint:disable no-any
9-
// tslint:disable completed-docs
8+
// tslint:disable completed-docs no-any
109

1110
class StatBatch extends Batch<IStats, { path: fs.PathLike }> {
1211
public constructor(private readonly proxy: FsModuleProxy) {
@@ -38,7 +37,7 @@ class ReaddirBatch extends Batch<Buffer[] | fs.Dirent[] | string[], { path: fs.P
3837
}
3938
}
4039

41-
interface ClientWatcherProxy extends WatcherProxy, ClientServerProxy {}
40+
interface ClientWatcherProxy extends WatcherProxy, ClientServerProxy<fs.FSWatcher> {}
4241

4342
class Watcher extends ClientProxy<ClientWatcherProxy> implements fs.FSWatcher {
4443
public close(): void {
@@ -50,7 +49,7 @@ class Watcher extends ClientProxy<ClientWatcherProxy> implements fs.FSWatcher {
5049
}
5150
}
5251

53-
interface ClientReadStreamProxy extends ReadStreamProxy, ClientServerProxy {}
52+
interface ClientReadStreamProxy extends ReadStreamProxy, ClientServerProxy<fs.ReadStream> {}
5453

5554
class ReadStream extends Readable<ClientReadStreamProxy> implements fs.ReadStream {
5655
public get bytesRead(): number {
@@ -66,7 +65,7 @@ class ReadStream extends Readable<ClientReadStreamProxy> implements fs.ReadStrea
6665
}
6766
}
6867

69-
interface ClientWriteStreamProxy extends WriteStreamProxy, ClientServerProxy {}
68+
interface ClientWriteStreamProxy extends WriteStreamProxy, ClientServerProxy<fs.WriteStream> {}
7069

7170
class WriteStream extends Writable<ClientWriteStreamProxy> implements fs.WriteStream {
7271
public get bytesWritten(): number {

packages/protocol/src/browser/modules/net.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { Duplex } from "./stream";
66

77
// tslint:disable completed-docs
88

9-
interface ClientNetSocketProxy extends NetSocketProxy, ClientServerProxy {}
9+
interface ClientNetSocketProxy extends NetSocketProxy, ClientServerProxy<net.Socket> {}
1010

1111
export class Socket extends Duplex<ClientNetSocketProxy> implements net.Socket {
1212
private _connecting: boolean = false;
@@ -128,7 +128,7 @@ export class Socket extends Duplex<ClientNetSocketProxy> implements net.Socket {
128128
}
129129
}
130130

131-
interface ClientNetServerProxy extends NetServerProxy, ClientServerProxy {
131+
interface ClientNetServerProxy extends NetServerProxy, ClientServerProxy<net.Server> {
132132
onConnection(cb: (proxy: ClientNetSocketProxy) => void): Promise<void>;
133133
}
134134

packages/protocol/src/browser/modules/stream.ts

+4-9
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import { ClientProxy, ClientServerProxy } from "../../common/proxy";
44
import { isPromise } from "../../common/util";
55
import { DuplexProxy, ReadableProxy, WritableProxy } from "../../node/modules/stream";
66

7-
// tslint:disable completed-docs
7+
// tslint:disable completed-docs no-any
88

9-
export interface ClientWritableProxy extends WritableProxy, ClientServerProxy {}
9+
export interface ClientWritableProxy extends WritableProxy, ClientServerProxy<stream.Writable> {}
1010

1111
export class Writable<T extends ClientWritableProxy = ClientWritableProxy> extends ClientProxy<T> implements stream.Writable {
1212
public get writable(): boolean {
@@ -53,7 +53,6 @@ export class Writable<T extends ClientWritableProxy = ClientWritableProxy> exten
5353
return this.catch(this.proxy.setDefaultEncoding(encoding));
5454
}
5555

56-
// tslint:disable-next-line no-any
5756
public write(chunk: any, encoding?: string | ((error?: Error | null) => void), callback?: (error?: Error | null) => void): boolean {
5857
if (typeof encoding === "function") {
5958
callback = encoding;
@@ -68,7 +67,6 @@ export class Writable<T extends ClientWritableProxy = ClientWritableProxy> exten
6867
return true; // Always true since we can't get this synchronously.
6968
}
7069

71-
// tslint:disable-next-line no-any
7270
public end(data?: any | (() => void), encoding?: string | (() => void), callback?: (() => void)): void {
7371
if (typeof data === "function") {
7472
callback = data;
@@ -91,7 +89,7 @@ export class Writable<T extends ClientWritableProxy = ClientWritableProxy> exten
9189
}
9290
}
9391

94-
export interface ClientReadableProxy extends ReadableProxy, ClientServerProxy {}
92+
export interface ClientReadableProxy extends ReadableProxy, ClientServerProxy<stream.Readable> {}
9593

9694
export class Readable<T extends ClientReadableProxy = ClientReadableProxy> extends ClientProxy<T> implements stream.Readable {
9795
public get readable(): boolean {
@@ -147,7 +145,6 @@ export class Readable<T extends ClientReadableProxy = ClientReadableProxy> exten
147145
}
148146

149147
public pipe<P extends NodeJS.WritableStream>(destination: P, options?: { end?: boolean }): P {
150-
// tslint:disable-next-line no-any this will be a Writable instance.
151148
const writableProxy = (destination as any as Writable).proxyPromise;
152149
if (!writableProxy) {
153150
throw new Error("can only pipe stream proxies");
@@ -161,7 +158,6 @@ export class Readable<T extends ClientReadableProxy = ClientReadableProxy> exten
161158
return destination;
162159
}
163160

164-
// tslint:disable-next-line no-any
165161
public [Symbol.asyncIterator](): AsyncIterableIterator<any> {
166162
throw new Error("not implemented");
167163
}
@@ -180,7 +176,7 @@ export class Readable<T extends ClientReadableProxy = ClientReadableProxy> exten
180176
}
181177
}
182178

183-
export interface ClientDuplexProxy extends DuplexProxy, ClientServerProxy {}
179+
export interface ClientDuplexProxy extends DuplexProxy, ClientServerProxy<stream.Duplex> {}
184180

185181
export class Duplex<T extends ClientDuplexProxy = ClientDuplexProxy> extends Writable<T> implements stream.Duplex, stream.Readable {
186182
private readonly _readable: Readable;
@@ -246,7 +242,6 @@ export class Duplex<T extends ClientDuplexProxy = ClientDuplexProxy> extends Wri
246242
this._readable.unshift();
247243
}
248244

249-
// tslint:disable-next-line no-any
250245
public [Symbol.asyncIterator](): AsyncIterableIterator<any> {
251246
return this._readable[Symbol.asyncIterator]();
252247
}

packages/protocol/src/common/proxy.ts

+85-11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { EventEmitter } from "events";
2-
import { isPromise } from "./util";
2+
import { isPromise, EventCallback } from "./util";
33

44
// tslint:disable no-any
55

@@ -64,6 +64,22 @@ export abstract class ClientProxy<T extends ClientServerProxy> extends EventEmit
6464
return this;
6565
}
6666

67+
/**
68+
* Bind the event locally and ensure the event is bound on the server.
69+
*/
70+
public addListener(event: string, listener: (...args: any[]) => void): this {
71+
this.catch(this.proxy.bindDelayedEvent(event));
72+
73+
return super.on(event, listener);
74+
}
75+
76+
/**
77+
* Alias for `addListener`.
78+
*/
79+
public on(event: string, listener: (...args: any[]) => void): this {
80+
return this.addListener(event, listener);
81+
}
82+
6783
/**
6884
* Original promise for the server proxy. Can be used to be passed as an
6985
* argument.
@@ -89,9 +105,9 @@ export abstract class ClientProxy<T extends ClientServerProxy> extends EventEmit
89105
? unpromisify(this._proxyPromise)
90106
: this._proxyPromise;
91107
if (this.bindEvents) {
92-
this.catch(this.proxy.onEvent((event, ...args): void => {
108+
this.proxy.onEvent((event, ...args): void => {
93109
this.emit(event, ...args);
94-
}));
110+
});
95111
}
96112

97113
return this._proxy;
@@ -114,8 +130,27 @@ export abstract class ClientProxy<T extends ClientServerProxy> extends EventEmit
114130
}
115131
}
116132

133+
export interface ServerProxyOptions<T> {
134+
/**
135+
* The events to bind immediately.
136+
*/
137+
bindEvents: string[];
138+
/**
139+
* Events that signal the proxy is done.
140+
*/
141+
doneEvents: string[];
142+
/**
143+
* Events that should only be bound when asked
144+
*/
145+
delayedEvents?: string[];
146+
/**
147+
* Whatever is emitting events (stream, child process, etc).
148+
*/
149+
instance: T;
150+
}
151+
117152
/**
118-
* Proxy to the actual instance on the server. Every method must only accept
153+
* The actual proxy instance on the server. Every method must only accept
119154
* serializable arguments and must return promises with serializable values.
120155
*
121156
* If a proxy itself has proxies on creation (like how ChildProcess has stdin),
@@ -125,17 +160,51 @@ export abstract class ClientProxy<T extends ClientServerProxy> extends EventEmit
125160
* Events listeners are added client-side (since all events automatically
126161
* forward to the client), so onDone and onEvent do not need to be asynchronous.
127162
*/
128-
export interface ServerProxy {
163+
export abstract class ServerProxy<T extends EventEmitter = EventEmitter> {
164+
public readonly instance: T;
165+
166+
private readonly callbacks = <EventCallback[]>[];
167+
168+
public constructor(private readonly options: ServerProxyOptions<T>) {
169+
this.instance = options.instance;
170+
}
171+
129172
/**
130173
* Dispose the proxy.
131174
*/
132-
dispose(): Promise<void>;
175+
public async dispose(): Promise<void> {
176+
this.instance.removeAllListeners();
177+
}
133178

134179
/**
135180
* This is used instead of an event to force it to be implemented since there
136181
* would be no guarantee the implementation would remember to emit the event.
137182
*/
138-
onDone(cb: () => void): void;
183+
public onDone(cb: () => void): void {
184+
this.options.doneEvents.forEach((event) => {
185+
this.instance.on(event, cb);
186+
});
187+
}
188+
189+
/**
190+
* Bind an event that will not fire without first binding it and shouldn't be
191+
* bound immediately.
192+
193+
* For example, binding to `data` switches a stream to flowing mode, so we
194+
* don't want to do it until we're asked. Otherwise something like `pipe`
195+
* won't work because potentially some or all of the data will already have
196+
* been flushed out.
197+
*/
198+
public async bindDelayedEvent(event: string): Promise<void> {
199+
if (this.options.delayedEvents
200+
&& this.options.delayedEvents.includes(event)
201+
&& !this.options.bindEvents.includes(event)) {
202+
this.options.bindEvents.push(event);
203+
this.callbacks.forEach((cb) => {
204+
this.instance.on(event, (...args: any[]) => cb(event, ...args));
205+
});
206+
}
207+
}
139208

140209
/**
141210
* Listen to all possible events. On the client, this is to reduce boilerplate
@@ -147,15 +216,20 @@ export interface ServerProxy {
147216
*
148217
* This cannot be async because then we can bind to the events too late.
149218
*/
150-
// tslint:disable-next-line no-any
151-
onEvent(cb: (event: string, ...args: any[]) => void): void;
219+
public onEvent(cb: EventCallback): void {
220+
this.callbacks.push(cb);
221+
this.options.bindEvents.forEach((event) => {
222+
this.instance.on(event, (...args: any[]) => cb(event, ...args));
223+
});
224+
}
152225
}
153226

154227
/**
155228
* A server-side proxy stored on the client. The proxy ID only exists on the
156-
* client-side version of the server proxy.
229+
* client-side version of the server proxy. The event listeners are handled by
230+
* the client and the remaining methods are proxied to the server.
157231
*/
158-
export interface ClientServerProxy extends ServerProxy {
232+
export interface ClientServerProxy<T extends EventEmitter = EventEmitter> extends ServerProxy<T> {
159233
proxyId: number | Module;
160234
}
161235

packages/protocol/src/common/util.ts

+2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ export const escapePath = (path: string): string => {
1919
return `'${path.replace(/'/g, "'\\''")}'`;
2020
};
2121

22+
export type EventCallback = (event: string, ...args: any[]) => void;
23+
2224
export type IEncodingOptions = {
2325
encoding?: BufferEncoding | null;
2426
flag?: string;

packages/protocol/src/node/modules/child_process.ts

+21-25
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,35 @@ import { WritableProxy, ReadableProxy } from "./stream";
77

88
export type ForkProvider = (modulePath: string, args?: string[], options?: cp.ForkOptions) => cp.ChildProcess;
99

10-
export class ChildProcessProxy implements ServerProxy {
11-
public constructor(private readonly process: cp.ChildProcess) {}
10+
export class ChildProcessProxy extends ServerProxy<cp.ChildProcess> {
11+
public constructor(instance: cp.ChildProcess) {
12+
super({
13+
bindEvents: ["close", "disconnect", "error", "exit", "message"],
14+
doneEvents: ["close"],
15+
instance,
16+
});
17+
}
1218

1319
public async kill(signal?: string): Promise<void> {
14-
this.process.kill(signal);
20+
this.instance.kill(signal);
1521
}
1622

1723
public async disconnect(): Promise<void> {
18-
this.process.disconnect();
24+
this.instance.disconnect();
1925
}
2026

2127
public async ref(): Promise<void> {
22-
this.process.ref();
28+
this.instance.ref();
2329
}
2430

2531
public async unref(): Promise<void> {
26-
this.process.unref();
32+
this.instance.unref();
2733
}
2834

2935
// tslint:disable-next-line no-any
3036
public async send(message: any): Promise<void> {
3137
return new Promise((resolve, reject): void => {
32-
this.process.send(message, (error) => {
38+
this.instance.send(message, (error) => {
3339
if (error) {
3440
reject(error);
3541
} else {
@@ -40,25 +46,13 @@ export class ChildProcessProxy implements ServerProxy {
4046
}
4147

4248
public async getPid(): Promise<number> {
43-
return this.process.pid;
44-
}
45-
46-
public onDone(cb: () => void): void {
47-
this.process.on("close", cb);
49+
return this.instance.pid;
4850
}
4951

5052
public async dispose(): Promise<void> {
51-
this.process.kill();
52-
setTimeout(() => this.process.kill("SIGKILL"), 5000); // Double tap.
53-
}
54-
55-
// tslint:disable-next-line no-any
56-
public onEvent(cb: (event: string, ...args: any[]) => void): void {
57-
this.process.on("close", (code, signal) => cb("close", code, signal));
58-
this.process.on("disconnect", () => cb("disconnect"));
59-
this.process.on("error", (error) => cb("error", error));
60-
this.process.on("exit", (exitCode, signal) => cb("exit", exitCode, signal));
61-
this.process.on("message", (message) => cb("message", message));
53+
this.instance.kill();
54+
setTimeout(() => this.instance.kill("SIGKILL"), 5000); // Double tap.
55+
await super.dispose();
6256
}
6357
}
6458

@@ -98,8 +92,10 @@ export class ChildProcessModuleProxy {
9892
return {
9993
childProcess: new ChildProcessProxy(process),
10094
stdin: process.stdin && new WritableProxy(process.stdin),
101-
stdout: process.stdout && new ReadableProxy(process.stdout),
102-
stderr: process.stderr && new ReadableProxy(process.stderr),
95+
// Child processes streams appear to immediately flow so we need to bind
96+
// to the data event right away.
97+
stdout: process.stdout && new ReadableProxy(process.stdout, ["data"]),
98+
stderr: process.stderr && new ReadableProxy(process.stderr, ["data"]),
10399
};
104100
}
105101
}

0 commit comments

Comments
 (0)