Skip to content

Convert fully to protobuf (was partially JSON) #402

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 75 additions & 58 deletions packages/protocol/src/browser/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import { Emitter } from "@coder/events";
import { logger, field } from "@coder/logger";
import { ReadWriteConnection, InitData, SharedProcessData } from "../common/connection";
import { Module, ServerProxy } from "../common/proxy";
import { stringify, parse, moduleToProto, protoToModule, protoToOperatingSystem } from "../common/util";
import { Ping, ServerMessage, ClientMessage, MethodMessage, NamedProxyMessage, NumberedProxyMessage, SuccessMessage, FailMessage, EventMessage, CallbackMessage } from "../proto";
import { argumentToProto, protoToArgument, moduleToProto, protoToModule, protoToOperatingSystem } from "../common/util";
import { Argument, Ping, ServerMessage, ClientMessage, Method, Event, Callback } from "../proto";
import { FsModule, ChildProcessModule, NetModule, NodePtyModule, SpdlogModule, TrashModule } from "./modules";

// tslint:disable no-any
Expand All @@ -24,8 +24,8 @@ export class Client {
private messageId = 0;
private callbackId = 0;
private readonly proxies = new Map<number | Module, ProxyData>();
private readonly successEmitter = new Emitter<SuccessMessage>();
private readonly failEmitter = new Emitter<FailMessage>();
private readonly successEmitter = new Emitter<Method.Success>();
private readonly failEmitter = new Emitter<Method.Fail>();
private readonly eventEmitter = new Emitter<{ event: string; args: any[]; }>();

private _initData: InitData | undefined;
Expand Down Expand Up @@ -129,9 +129,9 @@ export class Client {
field("event listeners", this.eventEmitter.counts),
]);

const message = new FailMessage();
const message = new Method.Fail();
const error = new Error("disconnected");
message.setResponse(stringify(error));
message.setResponse(argumentToProto(error));
this.failEmitter.emit(message);

this.eventEmitter.emit({ event: "disconnected", args: [error] });
Expand Down Expand Up @@ -182,20 +182,21 @@ export class Client {
case "kill":
return Promise.resolve();
}

return Promise.reject(
new Error(`Unable to call "${method}" on proxy ${proxyId}: disconnected`),
);
}

const message = new MethodMessage();
const message = new Method();
const id = this.messageId++;
let proxyMessage: NamedProxyMessage | NumberedProxyMessage;
let proxyMessage: Method.Named | Method.Numbered;
if (typeof proxyId === "string") {
proxyMessage = new NamedProxyMessage();
proxyMessage = new Method.Named();
proxyMessage.setModule(moduleToProto(proxyId));
message.setNamedProxy(proxyMessage);
} else {
proxyMessage = new NumberedProxyMessage();
proxyMessage = new Method.Numbered();
proxyMessage.setProxyId(proxyId);
message.setNumberedProxy(proxyMessage);
}
Expand All @@ -215,16 +216,14 @@ export class Client {
return callbackId;
};

const stringifiedArgs = args.map((a) => stringify(a, storeCallback));
logger.trace(() => [
"sending",
field("id", id),
field("proxyId", proxyId),
field("method", method),
field("args", stringifiedArgs),
]);

proxyMessage.setArgsList(stringifiedArgs);
proxyMessage.setArgsList(args.map((a) => argumentToProto(a, storeCallback)));

const clientMessage = new ClientMessage();
clientMessage.setMethod(message);
Expand All @@ -246,12 +245,12 @@ export class Client {

const d1 = this.successEmitter.event(id, (message) => {
dispose();
resolve(this.parse(message.getResponse(), promise));
resolve(this.protoToArgument(message.getResponse(), promise));
});

const d2 = this.failEmitter.event(id, (message) => {
dispose();
reject(parse(message.getResponse()));
reject(protoToArgument(message.getResponse()));
});
});

Expand All @@ -262,42 +261,53 @@ export class Client {
* Handle all messages from the server.
*/
private async handleMessage(message: ServerMessage): Promise<void> {
if (message.hasInit()) {
const init = message.getInit()!;
this._initData = {
dataDirectory: init.getDataDirectory(),
homeDirectory: init.getHomeDirectory(),
tmpDirectory: init.getTmpDirectory(),
workingDirectory: init.getWorkingDirectory(),
os: protoToOperatingSystem(init.getOperatingSystem()),
shell: init.getShell(),
builtInExtensionsDirectory: init.getBuiltinExtensionsDir(),
};
this.initDataEmitter.emit(this._initData);
} else if (message.hasSuccess()) {
this.emitSuccess(message.getSuccess()!);
} else if (message.hasFail()) {
this.emitFail(message.getFail()!);
} else if (message.hasEvent()) {
await this.emitEvent(message.getEvent()!);
} else if (message.hasCallback()) {
await this.runCallback(message.getCallback()!);
} else if (message.hasSharedProcessActive()) {
const sharedProcessActiveMessage = message.getSharedProcessActive()!;
this.sharedProcessActiveEmitter.emit({
socketPath: sharedProcessActiveMessage.getSocketPath(),
logPath: sharedProcessActiveMessage.getLogPath(),
});
} else if (message.hasPong()) {
// Nothing to do since pings are on a timer rather than waiting for the
// next pong in case a message from either the client or server is dropped
// which would break the ping cycle.
} else {
throw new Error("unknown message type");
switch (message.getMsgCase()) {
case ServerMessage.MsgCase.INIT:
const init = message.getInit()!;
this._initData = {
dataDirectory: init.getDataDirectory(),
homeDirectory: init.getHomeDirectory(),
tmpDirectory: init.getTmpDirectory(),
workingDirectory: init.getWorkingDirectory(),
os: protoToOperatingSystem(init.getOperatingSystem()),
shell: init.getShell(),
builtInExtensionsDirectory: init.getBuiltinExtensionsDir(),
};
this.initDataEmitter.emit(this._initData);
break;
case ServerMessage.MsgCase.SUCCESS:
this.emitSuccess(message.getSuccess()!);
break;
case ServerMessage.MsgCase.FAIL:
this.emitFail(message.getFail()!);
break;
case ServerMessage.MsgCase.EVENT:
await this.emitEvent(message.getEvent()!);
break;
case ServerMessage.MsgCase.CALLBACK:
await this.runCallback(message.getCallback()!);
break;
case ServerMessage.MsgCase.SHARED_PROCESS_ACTIVE:
const sharedProcessActiveMessage = message.getSharedProcessActive()!;
this.sharedProcessActiveEmitter.emit({
socketPath: sharedProcessActiveMessage.getSocketPath(),
logPath: sharedProcessActiveMessage.getLogPath(),
});
break;
case ServerMessage.MsgCase.PONG:
// Nothing to do since pings are on a timer rather than waiting for the
// next pong in case a message from either the client or server is dropped
// which would break the ping cycle.
break;
default:
throw new Error("unknown message type");
}
}

private emitSuccess(message: SuccessMessage): void {
/**
* Convert message to a success event.
*/
private emitSuccess(message: Method.Success): void {
logger.trace(() => [
"received resolve",
field("id", message.getId()),
Expand All @@ -306,7 +316,10 @@ export class Client {
this.successEmitter.emit(message.getId(), message);
}

private emitFail(message: FailMessage): void {
/**
* Convert message to a fail event.
*/
private emitFail(message: Method.Fail): void {
logger.trace(() => [
"received reject",
field("id", message.getId()),
Expand All @@ -322,7 +335,7 @@ export class Client {
* request before it emits. Instead, emit all events from the server so all
* events are always caught on the client.
*/
private async emitEvent(message: EventMessage): Promise<void> {
private async emitEvent(message: Event): Promise<void> {
const eventMessage = message.getNamedEvent()! || message.getNumberedEvent()!;
const proxyId = message.getNamedEvent()
? protoToModule(message.getNamedEvent()!.getModule())
Expand All @@ -333,10 +346,9 @@ export class Client {
"received event",
field("proxyId", proxyId),
field("event", event),
field("args", eventMessage.getArgsList()),
]);

const args = eventMessage.getArgsList().map((a) => this.parse(a));
const args = eventMessage.getArgsList().map((a) => this.protoToArgument(a));
this.eventEmitter.emit(proxyId, { event, args });
}

Expand All @@ -348,7 +360,7 @@ export class Client {
* also only be used when passed together with the method. If they are sent
* afterward, they may never be called due to timing issues.
*/
private async runCallback(message: CallbackMessage): Promise<void> {
private async runCallback(message: Callback): Promise<void> {
const callbackMessage = message.getNamedCallback()! || message.getNumberedCallback()!;
const proxyId = message.getNamedCallback()
? protoToModule(message.getNamedCallback()!.getModule())
Expand All @@ -359,16 +371,15 @@ export class Client {
"running callback",
field("proxyId", proxyId),
field("callbackId", callbackId),
field("args", callbackMessage.getArgsList()),
]);
const args = callbackMessage.getArgsList().map((a) => this.parse(a));
const args = callbackMessage.getArgsList().map((a) => this.protoToArgument(a));
this.getProxy(proxyId).callbacks.get(callbackId)!(...args);
}

/**
* Start the ping loop. Does nothing if already pinging.
*/
private startPinging = (): void => {
private readonly startPinging = (): void => {
if (typeof this.pingTimeout !== "undefined") {
return;
}
Expand Down Expand Up @@ -505,10 +516,16 @@ export class Client {
await this.getProxy(proxyId).promise;
}

private parse(value?: string, promise?: Promise<any>): any {
return parse(value, undefined, (id) => this.createProxy(id, promise));
/**
* Same as protoToArgument except provides createProxy.
*/
private protoToArgument(value?: Argument, promise?: Promise<any>): any {
return protoToArgument(value, undefined, (id) => this.createProxy(id, promise));
}

/**
* Get a proxy. Error if it doesn't exist.
*/
private getProxy(proxyId: number | Module): ProxyData {
if (!this.proxies.has(proxyId)) {
throw new Error(`proxy ${proxyId} disposed too early`);
Expand Down
17 changes: 11 additions & 6 deletions packages/protocol/src/browser/modules/child_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { ClientProxy } from "../../common/proxy";
import { ChildProcessModuleProxy, ChildProcessProxy, ChildProcessProxies } from "../../node/modules/child_process";
import { Readable, Writable } from "./stream";

// tslint:disable completed-docs

export class ChildProcess extends ClientProxy<ChildProcessProxy> implements cp.ChildProcess {
public readonly stdin: stream.Writable;
public readonly stdout: stream.Readable;
Expand All @@ -23,10 +25,10 @@ export class ChildProcess extends ClientProxy<ChildProcessProxy> implements cp.C
this.stderr = new Readable(proxyPromises.then((p) => p.stderr!));
this.stdio = [this.stdin, this.stdout, this.stderr];

this.proxy.getPid().then((pid) => {
this.catch(this.proxy.getPid().then((pid) => {
this._pid = pid;
this._connected = true;
});
}));
this.on("disconnect", () => this._connected = false);
this.on("exit", () => {
this._connected = false;
Expand All @@ -48,19 +50,19 @@ export class ChildProcess extends ClientProxy<ChildProcessProxy> implements cp.C

public kill(): void {
this._killed = true;
this.proxy.kill();
this.catch(this.proxy.kill());
}

public disconnect(): void {
this.proxy.disconnect();
this.catch(this.proxy.disconnect());
}

public ref(): void {
this.proxy.ref();
this.catch(this.proxy.ref());
}

public unref(): void {
this.proxy.unref();
this.catch(this.proxy.unref());
}

public send(
Expand Down Expand Up @@ -88,6 +90,9 @@ export class ChildProcess extends ClientProxy<ChildProcessProxy> implements cp.C
return true; // Always true since we can't get this synchronously.
}

/**
* Exit and close the process when disconnected.
*/
protected handleDisconnect(): void {
this.emit("exit", 1);
this.emit("close");
Expand Down
5 changes: 3 additions & 2 deletions packages/protocol/src/browser/modules/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { FsModuleProxy, Stats as IStats, WatcherProxy, WriteStreamProxy } from "
import { Writable } from "./stream";

// tslint:disable no-any
// tslint:disable completed-docs

class StatBatch extends Batch<IStats, { path: fs.PathLike }> {
public constructor(private readonly proxy: FsModuleProxy) {
Expand Down Expand Up @@ -39,7 +40,7 @@ class ReaddirBatch extends Batch<Buffer[] | fs.Dirent[] | string[], { path: fs.P

class Watcher extends ClientProxy<WatcherProxy> implements fs.FSWatcher {
public close(): void {
this.proxy.close();
this.catch(this.proxy.close());
}

protected handleDisconnect(): void {
Expand All @@ -57,7 +58,7 @@ class WriteStream extends Writable<WriteStreamProxy> implements fs.WriteStream {
}

public close(): void {
this.proxy.close();
this.catch(this.proxy.close());
}
}

Expand Down
Loading