Skip to content

Commit 7ca010a

Browse files
committed
Add createServer (#18)
1 parent 2dced39 commit 7ca010a

File tree

11 files changed

+1671
-50
lines changed

11 files changed

+1671
-50
lines changed

packages/protocol/src/browser/client.ts

+43-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { ReadWriteConnection, InitData, OperatingSystem, ISharedProcessData } from "../common/connection";
2-
import { NewEvalMessage, ServerMessage, EvalDoneMessage, EvalFailedMessage, TypedValue, ClientMessage, NewSessionMessage, TTYDimensions, SessionOutputMessage, CloseSessionInputMessage, WorkingInitMessage, NewConnectionMessage } from "../proto";
2+
import { NewEvalMessage, ServerMessage, EvalDoneMessage, EvalFailedMessage, TypedValue, ClientMessage, NewSessionMessage, TTYDimensions, SessionOutputMessage, CloseSessionInputMessage, WorkingInitMessage, NewConnectionMessage, NewServerMessage } from "../proto";
33
import { Emitter, Event } from "@coder/events";
44
import { logger, field } from "@coder/logger";
5-
import { ChildProcess, SpawnOptions, ServerProcess, ServerSocket, Socket } from "./command";
5+
import { ChildProcess, SpawnOptions, ServerProcess, ServerSocket, Socket, ServerListener, Server } from "./command";
66

77
/**
88
* Client accepts an arbitrary connection intended to communicate with the Server.
@@ -18,6 +18,9 @@ export class Client {
1818
private connectionId: number = 0;
1919
private readonly connections: Map<number, ServerSocket> = new Map();
2020

21+
private serverId: number = 0;
22+
private readonly servers: Map<number, ServerListener> = new Map();
23+
2124
private _initData: InitData | undefined;
2225
private initDataEmitter = new Emitter<InitData>();
2326
private initDataPromise: Promise<InitData>;
@@ -189,6 +192,14 @@ export class Client {
189192
return socket;
190193
}
191194

195+
public createServer(callback?: () => void): Server {
196+
const id = this.serverId++;
197+
const server = new ServerListener(this.connection, id, callback);
198+
this.servers.set(id, server);
199+
200+
return server;
201+
}
202+
192203
private doSpawn(command: string, args: string[] = [], options?: SpawnOptions, isFork: boolean = false, isBootstrapFork: boolean = true): ChildProcess {
193204
const id = this.sessionId++;
194205
const newSess = new NewSessionMessage();
@@ -333,6 +344,36 @@ export class Client {
333344
this.sharedProcessActiveEmitter.emit({
334345
socketPath: message.getSharedProcessActive()!.getSocketPath(),
335346
});
347+
} else if (message.hasServerEstablished()) {
348+
const s = this.servers.get(message.getServerEstablished()!.getId());
349+
if (!s) {
350+
return;
351+
}
352+
s.emit("connect");
353+
} else if (message.hasServerConnectionEstablished()) {
354+
const s = this.servers.get(message.getServerConnectionEstablished()!.getServerId());
355+
if (!s) {
356+
return;
357+
}
358+
const conId = message.getServerConnectionEstablished()!.getConnectionId();
359+
const serverSocket = new ServerSocket(this.connection, conId);
360+
serverSocket.emit("connect");
361+
this.connections.set(conId, serverSocket);
362+
s.emit("connection", serverSocket);
363+
} else if (message.getServerFailure()) {
364+
const s = this.servers.get(message.getServerFailure()!.getId());
365+
if (!s) {
366+
return;
367+
}
368+
s.emit("error", new Error(message.getNewSessionFailure()!.getReason().toString()));
369+
this.servers.delete(message.getNewSessionFailure()!.getId());
370+
} else if (message.hasServerClose()) {
371+
const s = this.servers.get(message.getServerClose()!.getId());
372+
if (!s) {
373+
return;
374+
}
375+
s.emit("close");
376+
this.servers.delete(message.getServerClose()!.getId());
336377
}
337378
}
338379
}

packages/protocol/src/browser/command.ts

+81-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import * as events from "events";
22
import * as stream from "stream";
33
import { ReadWriteConnection } from "../common/connection";
4-
import { ShutdownSessionMessage, ClientMessage, WriteToSessionMessage, ResizeSessionTTYMessage, TTYDimensions as ProtoTTYDimensions, ConnectionOutputMessage, ConnectionCloseMessage } from "../proto";
4+
import { ShutdownSessionMessage, ClientMessage, WriteToSessionMessage, ResizeSessionTTYMessage, TTYDimensions as ProtoTTYDimensions, ConnectionOutputMessage, ConnectionCloseMessage, ServerCloseMessage, NewServerMessage } from "../proto";
55

66
export interface TTYDimensions {
77
readonly columns: number;
@@ -237,3 +237,83 @@ export class ServerSocket extends events.EventEmitter implements Socket {
237237
throw new Error("Method not implemented.");
238238
}
239239
}
240+
241+
export interface Server {
242+
addListener(event: "close", listener: () => void): this;
243+
addListener(event: "connect", listener: (socket: Socket) => void): this;
244+
addListener(event: "error", listener: (err: Error) => void): this;
245+
246+
on(event: "close", listener: () => void): this;
247+
on(event: "connection", listener: (socket: Socket) => void): this;
248+
on(event: "error", listener: (err: Error) => void): this;
249+
250+
once(event: "close", listener: () => void): this;
251+
once(event: "connection", listener: (socket: Socket) => void): this;
252+
once(event: "error", listener: (err: Error) => void): this;
253+
254+
removeListener(event: "close", listener: () => void): this;
255+
removeListener(event: "connection", listener: (socket: Socket) => void): this;
256+
removeListener(event: "error", listener: (err: Error) => void): this;
257+
258+
emit(event: "close"): boolean;
259+
emit(event: "connection"): boolean;
260+
emit(event: "error"): boolean;
261+
262+
listen(path: string, listeningListener?: () => void): this;
263+
close(callback?: () => void): this;
264+
265+
readonly listening: boolean;
266+
}
267+
268+
export class ServerListener extends events.EventEmitter implements Server {
269+
private _listening: boolean = false;
270+
271+
public constructor(
272+
private readonly connection: ReadWriteConnection,
273+
private readonly id: number,
274+
connectCallback?: () => void,
275+
) {
276+
super();
277+
278+
this.on("connect", () => {
279+
this._listening = true;
280+
if (connectCallback) {
281+
connectCallback();
282+
}
283+
});
284+
}
285+
286+
public get listening(): boolean {
287+
return this._listening;
288+
}
289+
290+
public listen(path: string, listener?: () => void): this {
291+
const ns = new NewServerMessage();
292+
ns.setId(this.id);
293+
ns.setPath(path!);
294+
const cm = new ClientMessage();
295+
cm.setNewServer(ns);
296+
this.connection.send(cm.serializeBinary());
297+
298+
if (typeof listener !== "undefined") {
299+
this.once("connect", listener);
300+
}
301+
302+
return this;
303+
}
304+
305+
public close(callback?: Function | undefined): this {
306+
const closeMsg = new ServerCloseMessage();
307+
closeMsg.setId(this.id);
308+
closeMsg.setReason("Manually closed");
309+
const clientMsg = new ClientMessage();
310+
clientMsg.setServerClose(closeMsg);
311+
this.connection.send(clientMsg.serializeBinary());
312+
313+
if (callback) {
314+
callback();
315+
}
316+
317+
return this;
318+
}
319+
}

packages/protocol/src/node/command.ts

+46-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import * as net from "net";
33
import * as nodePty from "node-pty";
44
import * as stream from "stream";
55
import { TextEncoder } from "text-encoding";
6-
import { NewSessionMessage, ServerMessage, SessionDoneMessage, SessionOutputMessage, IdentifySessionMessage, NewConnectionMessage, ConnectionEstablishedMessage, NewConnectionFailureMessage, ConnectionCloseMessage, ConnectionOutputMessage } from "../proto";
6+
import { NewSessionMessage, ServerMessage, SessionDoneMessage, SessionOutputMessage, IdentifySessionMessage, NewConnectionMessage, ConnectionEstablishedMessage, NewConnectionFailureMessage, ConnectionCloseMessage, ConnectionOutputMessage, NewServerMessage, ServerEstablishedMessage, NewServerFailureMessage, ServerCloseMessage, ServerConnectionEstablishedMessage } from "../proto";
77
import { SendableConnection } from "../common/connection";
88
import { ServerOptions } from "./server";
99

@@ -180,3 +180,48 @@ export const handleNewConnection = (connection: SendableConnection, newConnectio
180180

181181
return socket;
182182
};
183+
184+
export const handleNewServer = (connection: SendableConnection, newServer: NewServerMessage, addSocket: (socket: net.Socket) => number, onExit: () => void): net.Server => {
185+
const s = net.createServer();
186+
187+
try {
188+
s.listen(newServer.getPath() ? newServer.getPath() : newServer.getPort(), () => {
189+
const se = new ServerEstablishedMessage();
190+
se.setId(newServer.getId());
191+
const sm = new ServerMessage();
192+
sm.setServerEstablished(se);
193+
connection.send(sm.serializeBinary());
194+
});
195+
} catch (ex) {
196+
const sf = new NewServerFailureMessage();
197+
sf.setId(newServer.getId());
198+
const sm = new ServerMessage();
199+
sm.setServerFailure(sf);
200+
connection.send(sm.serializeBinary());
201+
202+
onExit();
203+
}
204+
205+
s.on("close", () => {
206+
const sc = new ServerCloseMessage();
207+
sc.setId(newServer.getId());
208+
const sm = new ServerMessage();
209+
sm.setServerClose(sc);
210+
connection.send(sm.serializeBinary());
211+
212+
onExit();
213+
});
214+
215+
s.on("connection", (socket) => {
216+
const socketId = addSocket(socket);
217+
218+
const sock = new ServerConnectionEstablishedMessage();
219+
sock.setServerId(newServer.getId());
220+
sock.setConnectionId(socketId);
221+
const sm = new ServerMessage();
222+
sm.setServerConnectionEstablished(sock);
223+
connection.send(sm.serializeBinary());
224+
});
225+
226+
return s;
227+
};

packages/protocol/src/node/server.ts

+23-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { logger, field } from "@coder/logger";
88
import { ClientMessage, WorkingInitMessage, ServerMessage, NewSessionMessage, WriteToSessionMessage } from "../proto";
99
import { evaluate } from "./evaluate";
1010
import { ReadWriteConnection } from "../common/connection";
11-
import { Process, handleNewSession, handleNewConnection } from "./command";
11+
import { Process, handleNewSession, handleNewConnection, handleNewServer } from "./command";
1212
import * as net from "net";
1313

1414
export interface ServerOptions {
@@ -22,6 +22,9 @@ export class Server {
2222

2323
private readonly sessions: Map<number, Process> = new Map();
2424
private readonly connections: Map<number, net.Socket> = new Map();
25+
private readonly servers: Map<number, net.Server> = new Map();
26+
27+
private connectionId: number = Number.MAX_SAFE_INTEGER;
2528

2629
public constructor(
2730
private readonly connection: ReadWriteConnection,
@@ -147,9 +150,28 @@ export class Server {
147150
return;
148151
}
149152
c.end();
153+
} else if (message.hasNewServer()) {
154+
const s = handleNewServer(this.connection, message.getNewServer()!, (socket) => {
155+
const id = this.connectionId--;
156+
this.connections.set(id, socket);
157+
return id;
158+
}, () => {
159+
this.connections.delete(message.getNewServer()!.getId());
160+
});
161+
this.servers.set(message.getNewServer()!.getId(), s);
162+
} else if (message.hasServerClose()) {
163+
const s = this.getServer(message.getServerClose()!.getId());
164+
if (!s) {
165+
return;
166+
}
167+
s.close();
150168
}
151169
}
152170

171+
private getServer(id: number): net.Server | undefined {
172+
return this.servers.get(id);
173+
}
174+
153175
private getConnection(id: number): net.Socket | undefined {
154176
return this.connections.get(id);
155177
}

packages/protocol/src/proto/client.proto

+11-5
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ message ClientMessage {
1414
NewConnectionMessage new_connection = 6;
1515
ConnectionOutputMessage connection_output = 7;
1616
ConnectionCloseMessage connection_close = 8;
17+
NewServerMessage new_server = 9;
18+
ServerCloseMessage server_close = 10;
1719

1820
// node.proto
19-
NewEvalMessage new_eval = 9;
21+
NewEvalMessage new_eval = 11;
2022
}
2123
}
2224

@@ -31,15 +33,19 @@ message ServerMessage {
3133
ConnectionOutputMessage connection_output = 6;
3234
ConnectionCloseMessage connection_close = 7;
3335
ConnectionEstablishedMessage connection_established = 8;
36+
NewServerFailureMessage server_failure = 9;
37+
ServerEstablishedMessage server_established = 10;
38+
ServerCloseMessage server_close = 11;
39+
ServerConnectionEstablishedMessage server_connection_established = 12;
3440

3541
// node.proto
36-
EvalFailedMessage eval_failed = 9;
37-
EvalDoneMessage eval_done = 10;
42+
EvalFailedMessage eval_failed = 13;
43+
EvalDoneMessage eval_done = 14;
3844

39-
WorkingInitMessage init = 11;
45+
WorkingInitMessage init = 15;
4046

4147
// vscode.proto
42-
SharedProcessActiveMessage shared_process_active = 12;
48+
SharedProcessActiveMessage shared_process_active = 16;
4349
}
4450
}
4551

0 commit comments

Comments
 (0)