Skip to content

Commit dad52e3

Browse files
committed
Add commands (#2)
* Add remote command execution * Add tests for environment variables and resize * Fix tab spacing, add newlines * Remove extra newline * Add fork
1 parent 2abbefc commit dad52e3

16 files changed

+744
-109
lines changed

packages/server/package.json

+3
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22
"name": "server",
33
"dependencies": {
44
"express": "^4.16.4",
5+
"node-pty": "^0.8.0",
56
"ws": "^6.1.2"
67
},
78
"devDependencies": {
89
"@types/express": "^4.16.0",
10+
"@types/text-encoding": "^0.0.35",
911
"@types/ws": "^6.0.1",
12+
"text-encoding": "^0.7.0",
1013
"ts-protoc-gen": "^0.8.0"
1114
}
1215
}

packages/server/scripts/generate_proto.sh

100644100755
File mode changed.

packages/server/src/browser/client.ts

+97-7
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
import { ReadWriteConnection } from "../common/connection";
2-
import { NewEvalMessage, ServerMessage, EvalDoneMessage, EvalFailedMessage, TypedValue, ClientMessage } from "../proto";
2+
import { NewEvalMessage, ServerMessage, EvalDoneMessage, EvalFailedMessage, TypedValue, ClientMessage, NewSessionMessage, TTYDimensions, SessionOutputMessage, CloseSessionInputMessage } from "../proto";
33
import { Emitter } from "@coder/events";
44
import { logger, field } from "@coder/logger";
5-
5+
import { ChildProcess, SpawnOptions, ServerProcess } from "./command";
66

77
export class Client {
8-
98
private evalId: number = 0;
109
private evalDoneEmitter: Emitter<EvalDoneMessage> = new Emitter();
1110
private evalFailedEmitter: Emitter<EvalFailedMessage> = new Emitter();
1211

12+
private sessionId: number = 0;
13+
private sessions: Map<number, ServerProcess> = new Map();
14+
1315
public constructor(
1416
private readonly connection: ReadWriteConnection,
1517
) {
@@ -86,20 +88,108 @@ export class Client {
8688
if (failedMsg.getId() === id) {
8789
d1.dispose();
8890
d2.dispose();
89-
91+
9092
rej(failedMsg.getMessage());
9193
}
9294
});
93-
95+
9496
return prom;
9597
}
96-
98+
99+
/**
100+
* Spawns a process from a command. _Somewhat_ reflects the "child_process" API.
101+
* @param command
102+
* @param args Arguments
103+
* @param options Options to execute for the command
104+
*/
105+
public spawn(command: string, args: string[] = [], options?: SpawnOptions): ChildProcess {
106+
return this.doSpawn(command, args, options, false);
107+
}
108+
109+
/**
110+
* Fork a module.
111+
* @param modulePath Path of the module
112+
* @param args Args to add for the module
113+
* @param options Options to execute
114+
*/
115+
public fork(modulePath: string, args: string[] = [], options?: SpawnOptions): ChildProcess {
116+
return this.doSpawn(modulePath, args, options, true);
117+
}
118+
119+
private doSpawn(command: string, args: string[] = [], options?: SpawnOptions, isFork: boolean = false): ChildProcess {
120+
const id = this.sessionId++;
121+
const newSess = new NewSessionMessage();
122+
newSess.setId(id);
123+
newSess.setCommand(command);
124+
newSess.setArgsList(args);
125+
newSess.setIsFork(isFork);
126+
if (options) {
127+
if (options.cwd) {
128+
newSess.setCwd(options.cwd);
129+
}
130+
if (options.env) {
131+
Object.keys(options.env).forEach((envKey) => {
132+
newSess.getEnvMap().set(envKey, options.env![envKey]);
133+
});
134+
}
135+
if (options.tty) {
136+
const tty = new TTYDimensions();
137+
tty.setHeight(options.tty.rows);
138+
tty.setWidth(options.tty.columns);
139+
newSess.setTtyDimensions(tty);
140+
}
141+
}
142+
const clientMsg = new ClientMessage();
143+
clientMsg.setNewSession(newSess);
144+
this.connection.send(clientMsg.serializeBinary());
145+
146+
const serverProc = new ServerProcess(this.connection, id, options ? options.tty !== undefined : false);
147+
serverProc.stdin.on("close", () => {
148+
console.log("stdin closed");
149+
const c = new CloseSessionInputMessage();
150+
c.setId(id);
151+
const cm = new ClientMessage();
152+
cm.setCloseSessionInput(c);
153+
this.connection.send(cm.serializeBinary());
154+
});
155+
this.sessions.set(id, serverProc);
156+
return serverProc;
157+
}
158+
97159
private handleMessage(message: ServerMessage): void {
98160
if (message.hasEvalDone()) {
99161
this.evalDoneEmitter.emit(message.getEvalDone()!);
100162
} else if (message.hasEvalFailed()) {
101163
this.evalFailedEmitter.emit(message.getEvalFailed()!);
164+
} else if (message.hasNewSessionFailure()) {
165+
const s = this.sessions.get(message.getNewSessionFailure()!.getId());
166+
if (!s) {
167+
return;
168+
}
169+
s.emit("error", new Error(message.getNewSessionFailure()!.getMessage()));
170+
this.sessions.delete(message.getNewSessionFailure()!.getId());
171+
} else if (message.hasSessionDone()) {
172+
const s = this.sessions.get(message.getSessionDone()!.getId());
173+
if (!s) {
174+
return;
175+
}
176+
s.emit("exit", message.getSessionDone()!.getExitStatus());
177+
this.sessions.delete(message.getSessionDone()!.getId());
178+
} else if (message.hasSessionOutput()) {
179+
const output = message.getSessionOutput()!;
180+
const s = this.sessions.get(output.getId());
181+
if (!s) {
182+
return;
183+
}
184+
const data = new TextDecoder().decode(output.getData_asU8());
185+
const stream = output.getFd() === SessionOutputMessage.FD.STDOUT ? s.stdout : s.stderr;
186+
stream.emit("data", data);
187+
} else if (message.hasIdentifySession()) {
188+
const s = this.sessions.get(message.getIdentifySession()!.getId());
189+
if (!s) {
190+
return;
191+
}
192+
s.pid = message.getIdentifySession()!.getPid();
102193
}
103194
}
104-
105195
}
+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import * as events from "events";
2+
import * as stream from "stream";
3+
import { SendableConnection } from "../common/connection";
4+
import { ShutdownSessionMessage, ClientMessage, SessionOutputMessage, WriteToSessionMessage, ResizeSessionTTYMessage, TTYDimensions as ProtoTTYDimensions } from "../proto";
5+
6+
export interface TTYDimensions {
7+
readonly columns: number;
8+
readonly rows: number;
9+
}
10+
11+
export interface SpawnOptions {
12+
cwd?: string;
13+
env?: { readonly [key: string]: string };
14+
tty?: TTYDimensions;
15+
}
16+
17+
export interface ChildProcess {
18+
readonly stdin: stream.Writable;
19+
readonly stdout: stream.Readable;
20+
readonly stderr: stream.Readable;
21+
22+
readonly killed?: boolean;
23+
readonly pid: number | undefined;
24+
25+
kill(signal?: string): void;
26+
send(message: string | Uint8Array): void;
27+
28+
on(event: "error", listener: (err: Error) => void): void;
29+
on(event: "exit", listener: (code: number, signal: string) => void): void;
30+
31+
resize?(dimensions: TTYDimensions): void;
32+
}
33+
34+
export class ServerProcess extends events.EventEmitter implements ChildProcess {
35+
public readonly stdin = new stream.Writable();
36+
public readonly stdout = new stream.Readable({ read: () => true });
37+
public readonly stderr = new stream.Readable({ read: () => true });
38+
public pid: number | undefined;
39+
40+
private _killed: boolean = false;
41+
42+
public constructor(
43+
private readonly connection: SendableConnection,
44+
private readonly id: number,
45+
private readonly hasTty: boolean = false,
46+
) {
47+
super();
48+
49+
if (!this.hasTty) {
50+
delete this.resize;
51+
}
52+
}
53+
54+
public get killed(): boolean {
55+
return this._killed;
56+
}
57+
58+
public kill(signal?: string): void {
59+
const kill = new ShutdownSessionMessage();
60+
kill.setId(this.id);
61+
if (signal) {
62+
kill.setSignal(signal);
63+
}
64+
const client = new ClientMessage();
65+
client.setShutdownSession(kill);
66+
this.connection.send(client.serializeBinary());
67+
68+
this._killed = true;
69+
}
70+
71+
public send(message: string | Uint8Array): void {
72+
const send = new WriteToSessionMessage();
73+
send.setId(this.id);
74+
send.setData(typeof message === "string" ? new TextEncoder().encode(message) : message);
75+
const client = new ClientMessage();
76+
client.setWriteToSession(send);
77+
this.connection.send(client.serializeBinary());
78+
}
79+
80+
public resize(dimensions: TTYDimensions) {
81+
const resize = new ResizeSessionTTYMessage();
82+
resize.setId(this.id);
83+
const tty = new ProtoTTYDimensions();
84+
tty.setHeight(dimensions.rows);
85+
tty.setWidth(dimensions.columns);
86+
resize.setTtyDimensions(tty);
87+
const client = new ClientMessage();
88+
client.setResizeSessionTty(resize);
89+
this.connection.send(client.serializeBinary());
90+
}
91+
}

packages/server/src/node/command.ts

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import * as cp from "child_process";
2+
import * as nodePty from "node-pty";
3+
import * as stream from "stream";
4+
import { TextEncoder } from "text-encoding";
5+
import { NewSessionMessage, ServerMessage, SessionDoneMessage, SessionOutputMessage, ShutdownSessionMessage, IdentifySessionMessage, ClientMessage } from "../proto";
6+
import { SendableConnection } from "../common/connection";
7+
8+
export interface Process {
9+
stdin?: stream.Writable;
10+
stdout?: stream.Readable;
11+
stderr?: stream.Readable;
12+
13+
pid: number;
14+
killed?: boolean;
15+
16+
on(event: "data", cb: (data: string) => void): void;
17+
on(event: 'exit', listener: (exitCode: number, signal?: number) => void): void;
18+
write(data: string | Uint8Array): void;
19+
resize?(cols: number, rows: number): void;
20+
kill(signal?: string): void;
21+
title?: number;
22+
}
23+
24+
export const handleNewSession = (connection: SendableConnection, newSession: NewSessionMessage, onExit: () => void): Process => {
25+
let process: Process;
26+
27+
const env = {} as any;
28+
newSession.getEnvMap().forEach((value: any, key: any) => {
29+
env[key] = value;
30+
});
31+
if (newSession.getTtyDimensions()) {
32+
// Spawn with node-pty
33+
process = nodePty.spawn(newSession.getCommand(), newSession.getArgsList(), {
34+
cols: newSession.getTtyDimensions()!.getWidth(),
35+
rows: newSession.getTtyDimensions()!.getHeight(),
36+
cwd: newSession.getCwd(),
37+
env,
38+
});
39+
} else {
40+
const options = {
41+
cwd: newSession.getCwd(),
42+
env,
43+
};
44+
let proc: cp.ChildProcess;
45+
if (newSession.getIsFork()) {
46+
proc = cp.fork(newSession.getCommand(), newSession.getArgsList());
47+
} else {
48+
proc = cp.spawn(newSession.getCommand(), newSession.getArgsList(), options);
49+
}
50+
51+
process = {
52+
stdin: proc.stdin,
53+
stderr: proc.stderr,
54+
stdout: proc.stdout,
55+
on: (...args: any[]) => (<any>proc.on)(...args),
56+
write: (d) => proc.stdin.write(d),
57+
kill: (s) => proc.kill(s || "SIGTERM"),
58+
pid: proc.pid,
59+
};
60+
}
61+
62+
const sendOutput = (fd: SessionOutputMessage.FD, msg: string | Uint8Array): void => {
63+
const serverMsg = new ServerMessage();
64+
const d = new SessionOutputMessage();
65+
d.setId(newSession.getId());
66+
d.setData(typeof msg === "string" ? new TextEncoder().encode(msg) : msg);
67+
d.setFd(SessionOutputMessage.FD.STDOUT);
68+
serverMsg.setSessionOutput(d);
69+
connection.send(serverMsg.serializeBinary());
70+
};
71+
72+
if (process.stdout && process.stderr) {
73+
process.stdout.on("data", (data) => {
74+
sendOutput(SessionOutputMessage.FD.STDOUT, data);
75+
});
76+
77+
process.stderr.on("data", (data) => {
78+
sendOutput(SessionOutputMessage.FD.STDERR, data);
79+
});
80+
} else {
81+
process.on("data", (data) => {
82+
sendOutput(SessionOutputMessage.FD.STDOUT, Buffer.from(data));
83+
});
84+
}
85+
86+
const id = new IdentifySessionMessage();
87+
id.setId(newSession.getId());
88+
id.setPid(process.pid);
89+
const sm = new ServerMessage();
90+
sm.setIdentifySession(id);
91+
connection.send(sm.serializeBinary());
92+
93+
process.on("exit", (code, signal) => {
94+
const serverMsg = new ServerMessage();
95+
const exit = new SessionDoneMessage();
96+
exit.setId(newSession.getId());
97+
exit.setExitStatus(code);
98+
serverMsg.setSessionDone(exit);
99+
connection.send(serverMsg.serializeBinary());
100+
101+
onExit();
102+
});
103+
104+
return process;
105+
};

0 commit comments

Comments
 (0)