Skip to content

Commit 3222297

Browse files
committed
Add stdio sources (#17)
1 parent 1fec316 commit 3222297

File tree

10 files changed

+157
-47
lines changed

10 files changed

+157
-47
lines changed

packages/protocol/src/browser/client.ts

+12-2
Original file line numberDiff line numberDiff line change
@@ -284,8 +284,18 @@ export class Client {
284284
return;
285285
}
286286
const data = new TextDecoder().decode(output.getData_asU8());
287-
const stream = output.getFd() === SessionOutputMessage.FD.STDOUT ? s.stdout : s.stderr;
288-
stream.emit("data", data);
287+
const source = output.getSource();
288+
switch (source) {
289+
case SessionOutputMessage.Source.STDOUT:
290+
case SessionOutputMessage.Source.STDERR:
291+
(source === SessionOutputMessage.Source.STDOUT ? s.stdout : s.stderr).emit("data", data);
292+
break;
293+
case SessionOutputMessage.Source.IPC:
294+
s.emit("message", JSON.parse(data));
295+
break;
296+
default:
297+
throw new Error(`Unknown source ${source}`);
298+
}
289299
} else if (message.hasIdentifySession()) {
290300
const s = this.sessions.get(message.getIdentifySession()!.getId());
291301
if (!s) {

packages/protocol/src/browser/command.ts

+11-7
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@ export interface ChildProcess {
2323
readonly pid: number | undefined;
2424

2525
kill(signal?: string): void;
26-
send(message: string | Uint8Array): void;
2726

27+
send(message: string | Uint8Array, ipc?: false): void;
28+
send(message: any, ipc: true): void;
29+
30+
on(event: "message", listener: (data: any) => void): void;
2831
on(event: "error", listener: (err: Error) => void): void;
2932
on(event: "exit", listener: (code: number, signal: string) => void): void;
3033

@@ -45,10 +48,6 @@ export class ServerProcess extends events.EventEmitter implements ChildProcess {
4548
private readonly hasTty: boolean = false,
4649
) {
4750
super();
48-
this.connection.onMessage((message) => {
49-
this.emit("message", message);
50-
});
51-
5251
if (!this.hasTty) {
5352
delete this.resize;
5453
}
@@ -71,10 +70,15 @@ export class ServerProcess extends events.EventEmitter implements ChildProcess {
7170
this._killed = true;
7271
}
7372

74-
public send(message: string | Uint8Array): void {
73+
public send(message: string | Uint8Array | any, ipc: boolean = false): void {
7574
const send = new WriteToSessionMessage();
7675
send.setId(this.id);
77-
send.setData(typeof message === "string" ? new TextEncoder().encode(message) : message);
76+
send.setSource(ipc ? WriteToSessionMessage.Source.IPC : WriteToSessionMessage.Source.STDIN);
77+
if (ipc) {
78+
send.setData(new TextEncoder().encode(JSON.stringify(message)));
79+
} else {
80+
send.setData(typeof message === "string" ? new TextEncoder().encode(message) : message);
81+
}
7882
const client = new ClientMessage();
7983
client.setWriteToSession(send);
8084
this.connection.send(client.serializeBinary());

packages/protocol/src/node/command.ts

+13-5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { SendableConnection } from "../common/connection";
88
import { ServerOptions } from "./server";
99

1010
export interface Process {
11+
stdio?: Array<stream.Readable | stream.Writable>;
1112
stdin?: stream.Writable;
1213
stdout?: stream.Readable;
1314
stderr?: stream.Readable;
@@ -69,27 +70,34 @@ export const handleNewSession = (connection: SendableConnection, newSession: New
6970
};
7071
}
7172

72-
const sendOutput = (_fd: SessionOutputMessage.FD, msg: string | Uint8Array): void => {
73+
const sendOutput = (_source: SessionOutputMessage.Source, msg: string | Uint8Array): void => {
7374
const serverMsg = new ServerMessage();
7475
const d = new SessionOutputMessage();
7576
d.setId(newSession.getId());
7677
d.setData(typeof msg === "string" ? new TextEncoder().encode(msg) : msg);
77-
d.setFd(SessionOutputMessage.FD.STDOUT);
78+
d.setSource(_source);
7879
serverMsg.setSessionOutput(d);
7980
connection.send(serverMsg.serializeBinary());
8081
};
8182

8283
if (process.stdout && process.stderr) {
8384
process.stdout.on("data", (data) => {
84-
sendOutput(SessionOutputMessage.FD.STDOUT, data);
85+
sendOutput(SessionOutputMessage.Source.STDOUT, data);
8586
});
8687

8788
process.stderr.on("data", (data) => {
88-
sendOutput(SessionOutputMessage.FD.STDERR, data);
89+
sendOutput(SessionOutputMessage.Source.STDERR, data);
8990
});
9091
} else {
9192
process.on("data", (data) => {
92-
sendOutput(SessionOutputMessage.FD.STDOUT, Buffer.from(data));
93+
sendOutput(SessionOutputMessage.Source.STDOUT, Buffer.from(data));
94+
});
95+
}
96+
97+
if (process.stdio && process.stdio[3]) {
98+
// We have ipc fd
99+
process.stdio[3].on("data", (data) => {
100+
sendOutput(SessionOutputMessage.Source.IPC, data);
93101
});
94102
}
95103

packages/protocol/src/node/server.ts

+12-3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import * as os from "os";
22
import * as cp from "child_process";
33
import * as path from "path";
4-
import { mkdir } from "fs";
4+
import { mkdir, WriteStream } from "fs";
55
import { promisify } from "util";
66
import { TextDecoder } from "text-encoding";
77
import { logger, field } from "@coder/logger";
8-
import { ClientMessage, WorkingInitMessage, ServerMessage, NewSessionMessage } from "../proto";
8+
import { ClientMessage, WorkingInitMessage, ServerMessage, NewSessionMessage, WriteToSessionMessage } from "../proto";
99
import { evaluate } from "./evaluate";
1010
import { ReadWriteConnection } from "../common/connection";
1111
import { Process, handleNewSession, handleNewConnection } from "./command";
@@ -120,7 +120,16 @@ export class Server {
120120
if (!s) {
121121
return;
122122
}
123-
s.write(new TextDecoder().decode(message.getWriteToSession()!.getData_asU8()));
123+
const data = new TextDecoder().decode(message.getWriteToSession()!.getData_asU8());
124+
const source = message.getWriteToSession()!.getSource();
125+
if (source === WriteToSessionMessage.Source.IPC) {
126+
if (!s.stdio || !s.stdio[3]) {
127+
throw new Error("Cannot send message via IPC to process without IPC");
128+
}
129+
(s.stdio[3] as WriteStream).write(data);
130+
} else {
131+
s.write(data);
132+
}
124133
} else if (message.hasNewConnection()) {
125134
const socket = handleNewConnection(this.connection, message.getNewConnection()!, () => {
126135
this.connections.delete(message.getNewConnection()!.getId());

packages/protocol/src/proto/command.proto

+8-2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ message IdentifySessionMessage {
4545
message WriteToSessionMessage {
4646
uint64 id = 1;
4747
bytes data = 2;
48+
enum Source {
49+
Stdin = 0;
50+
Ipc = 1;
51+
}
52+
Source source = 3;
4853
}
4954

5055
// Resizes the TTY of the session identified by the id.
@@ -67,11 +72,12 @@ message ShutdownSessionMessage {
6772
// SessionOutputMessage carries data read from the stdout or stderr of the session identified by the id.
6873
message SessionOutputMessage {
6974
uint64 id = 1;
70-
enum FD {
75+
enum Source {
7176
Stdout = 0;
7277
Stderr = 1;
78+
Ipc = 2;
7379
}
74-
FD fd = 2;
80+
Source source = 2;
7581
bytes data = 3;
7682
}
7783

packages/protocol/src/proto/command_pb.d.ts

+14-4
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ export class WriteToSessionMessage extends jspb.Message {
144144
getData_asB64(): string;
145145
setData(value: Uint8Array | string): void;
146146

147+
getSource(): WriteToSessionMessage.Source;
148+
setSource(value: WriteToSessionMessage.Source): void;
149+
147150
serializeBinary(): Uint8Array;
148151
toObject(includeInstance?: boolean): WriteToSessionMessage.AsObject;
149152
static toObject(includeInstance: boolean, msg: WriteToSessionMessage): WriteToSessionMessage.AsObject;
@@ -158,6 +161,12 @@ export namespace WriteToSessionMessage {
158161
export type AsObject = {
159162
id: number,
160163
data: Uint8Array | string,
164+
source: WriteToSessionMessage.Source,
165+
}
166+
167+
export enum Source {
168+
STDIN = 0,
169+
IPC = 1,
161170
}
162171
}
163172

@@ -235,8 +244,8 @@ export class SessionOutputMessage extends jspb.Message {
235244
getId(): number;
236245
setId(value: number): void;
237246

238-
getFd(): SessionOutputMessage.FD;
239-
setFd(value: SessionOutputMessage.FD): void;
247+
getSource(): SessionOutputMessage.Source;
248+
setSource(value: SessionOutputMessage.Source): void;
240249

241250
getData(): Uint8Array | string;
242251
getData_asU8(): Uint8Array;
@@ -256,13 +265,14 @@ export class SessionOutputMessage extends jspb.Message {
256265
export namespace SessionOutputMessage {
257266
export type AsObject = {
258267
id: number,
259-
fd: SessionOutputMessage.FD,
268+
source: SessionOutputMessage.Source,
260269
data: Uint8Array | string,
261270
}
262271

263-
export enum FD {
272+
export enum Source {
264273
STDOUT = 0,
265274
STDERR = 1,
275+
IPC = 2,
266276
}
267277
}
268278

packages/protocol/src/proto/command_pb.js

+51-14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)