Skip to content

Minor connections refactor #3178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/vscode/src/vs/base/parts/ipc/common/ipc.net.ts
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,11 @@ export class PersistentProtocol implements IMessagePassingProtocol {
}, Math.max(ProtocolConstants.KeepAliveTimeoutTime - timeSinceLastIncomingMsg, 0) + 5);
}

// NOTE@coder: Set the socket without initiating a reconnect.
public setSocket(socket: ISocket): void {
this._socket = socket;
}

public getSocket(): ISocket {
return this._socket;
}
Expand Down
4 changes: 3 additions & 1 deletion lib/vscode/src/vs/server/entry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import { logger } from 'vs/server/node/logger';
import { enableCustomMarketplace } from 'vs/server/node/marketplace';
import { Vscode } from 'vs/server/node/server';

setUnexpectedErrorHandler((error) => logger.warn(error instanceof Error ? error.message : error));
setUnexpectedErrorHandler((error) => {
logger.warn('Uncaught error', field('error', error instanceof Error ? error.message : error));
});
enableCustomMarketplace();
proxyAgent.monkeyPatch(true);

Expand Down
125 changes: 78 additions & 47 deletions lib/vscode/src/vs/server/node/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,49 @@ import * as cp from 'child_process';
import { VSBuffer } from 'vs/base/common/buffer';
import { Emitter } from 'vs/base/common/event';
import { FileAccess } from 'vs/base/common/network';
import { ISocket } from 'vs/base/parts/ipc/common/ipc.net';
import { WebSocketNodeSocket } from 'vs/base/parts/ipc/node/ipc.net';
import { INativeEnvironmentService } from 'vs/platform/environment/common/environment';
import { IRemoteExtensionHostStartParams } from 'vs/platform/remote/common/remoteAgentConnection';
import { getNlsConfiguration } from 'vs/server/node/nls';
import { Protocol } from 'vs/server/node/protocol';
import { IExtHostReadyMessage } from 'vs/workbench/services/extensions/common/extensionHostProtocol';

export abstract class Connection {
private readonly _onClose = new Emitter<void>();
/**
* Fire when the connection is closed (not just disconnected). This should
* only happen when the connection is offline and old or has an error.
*/
public readonly onClose = this._onClose.event;
private disposed = false;
private _offline: number | undefined;

public constructor(protected protocol: Protocol, public readonly token: string) {}
protected readonly logger: Logger;

public constructor(
protected readonly protocol: Protocol,
public readonly name: string,
) {
this.logger = logger.named(
this.name,
field('token', this.protocol.options.reconnectionToken),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the difference between " and ' in JavaScript/TypeScript?

Copy link
Member Author

@code-asher code-asher Apr 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None but VS Code's linting rules enforce ' for "unlocalized" strings (well technically externalized strings) so the only time they are allowed is in calls like:

localize('projectQuickPick.help', "TypeScript help")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah very interesting pattern, so they're using something that extracts all "-quoted strings into resource bundles?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they might be doing something like that, although if they can already detect and report to the dev when a string should be using " or ' I'm not sure why they can't just use that knowledge to extract it and don't bother the dev. 😛

);

this.logger.debug('Connecting...');
this.onClose(() => this.logger.debug('Closed'));
}

public get offline(): number | undefined {
return this._offline;
}

public reconnect(socket: ISocket, buffer: VSBuffer): void {
public reconnect(protocol: Protocol): void {
this.logger.debug('Reconnecting...');
this._offline = undefined;
this.doReconnect(socket, buffer);
this.doReconnect(protocol);
}

public dispose(): void {
public dispose(reason?: string): void {
this.logger.debug('Disposing...', field('reason', reason));
if (!this.disposed) {
this.disposed = true;
this.doDispose();
Expand All @@ -36,6 +54,7 @@ export abstract class Connection {
}

protected setOffline(): void {
this.logger.debug('Disconnected');
if (!this._offline) {
this._offline = Date.now();
}
Expand All @@ -44,29 +63,34 @@ export abstract class Connection {
/**
* Set up the connection on a new socket.
*/
protected abstract doReconnect(socket: ISocket, buffer: VSBuffer): void;
protected abstract doReconnect(protcol: Protocol): void;

/**
* Dispose/destroy everything permanently.
*/
protected abstract doDispose(): void;
}

/**
* Used for all the IPC channels.
*/
export class ManagementConnection extends Connection {
public constructor(protected protocol: Protocol, token: string) {
super(protocol, token);
public constructor(protocol: Protocol) {
super(protocol, 'management');
protocol.onDidDispose(() => this.dispose()); // Explicit close.
protocol.onSocketClose(() => this.setOffline()); // Might reconnect.
protocol.sendMessage({ type: 'ok' });
}

protected doDispose(): void {
this.protocol.sendDisconnect();
this.protocol.dispose();
this.protocol.getUnderlyingSocket().destroy();
this.protocol.destroy();
}

protected doReconnect(socket: ISocket, buffer: VSBuffer): void {
this.protocol.beginAcceptReconnection(socket, buffer);
protected doReconnect(protocol: Protocol): void {
protocol.sendMessage({ type: 'ok' });
this.protocol.beginAcceptReconnection(protocol.getSocket(), protocol.readEntireBuffer());
this.protocol.endAcceptReconnection();
protocol.dispose();
}
}

Expand All @@ -85,55 +109,62 @@ type ExtHostMessage = DisconnectedMessage | ConsoleMessage | IExtHostReadyMessag

export class ExtensionHostConnection extends Connection {
private process?: cp.ChildProcess;
private readonly logger: Logger;

public constructor(
locale: string, protocol: Protocol, buffer: VSBuffer, token: string,
protocol: Protocol,
private readonly params: IRemoteExtensionHostStartParams,
private readonly environment: INativeEnvironmentService,
) {
super(protocol, token);
this.logger = logger.named('exthost', field('token', token));
this.protocol.dispose();
this.spawn(locale, buffer).then((p) => this.process = p);
this.protocol.getUnderlyingSocket().pause();
super(protocol, 'exthost');

protocol.sendMessage({ debugPort: this.params.port });
const buffer = protocol.readEntireBuffer();
const inflateBytes = protocol.inflateBytes;
protocol.dispose();
protocol.getUnderlyingSocket().pause();

this.spawn(buffer, inflateBytes).then((p) => this.process = p);
}

protected doDispose(): void {
this.protocol.destroy();
if (this.process) {
this.process.kill();
}
this.protocol.getUnderlyingSocket().destroy();
}

protected doReconnect(socket: ISocket, buffer: VSBuffer): void {
// This is just to set the new socket.
this.protocol.beginAcceptReconnection(socket, null);
this.protocol.dispose();
this.sendInitMessage(buffer);
protected doReconnect(protocol: Protocol): void {
protocol.sendMessage({ debugPort: this.params.port });
const buffer = protocol.readEntireBuffer();
const inflateBytes = protocol.inflateBytes;
protocol.dispose();
protocol.getUnderlyingSocket().pause();
this.protocol.setSocket(protocol.getSocket());

this.sendInitMessage(buffer, inflateBytes);
}

private sendInitMessage(buffer: VSBuffer): void {
const socket = this.protocol.getUnderlyingSocket();
socket.pause();
private sendInitMessage(buffer: VSBuffer, inflateBytes: Uint8Array | undefined): void {
if (!this.process) {
throw new Error('Tried to initialize VS Code before spawning');
}

const wrapperSocket = this.protocol.getSocket();
this.logger.debug('Sending socket');

this.logger.trace('Sending socket');
this.process!.send({ // Process must be set at this point.
// TODO: Do something with the debug port.
this.process.send({
type: 'VSCODE_EXTHOST_IPC_SOCKET',
initialDataChunk: Buffer.from(buffer.buffer).toString('base64'),
skipWebSocketFrames: !(wrapperSocket instanceof WebSocketNodeSocket),
skipWebSocketFrames: this.protocol.options.skipWebSocketFrames,
permessageDeflate: this.protocol.options.permessageDeflate,
inflateBytes: wrapperSocket instanceof WebSocketNodeSocket
? Buffer.from(wrapperSocket.recordedInflateBytes.buffer).toString('base64')
: undefined,
}, socket);
inflateBytes: inflateBytes ? Buffer.from(inflateBytes).toString('base64') : undefined,
}, this.protocol.getUnderlyingSocket());
}

private async spawn(locale: string, buffer: VSBuffer): Promise<cp.ChildProcess> {
this.logger.trace('Getting NLS configuration...');
const config = await getNlsConfiguration(locale, this.environment.userDataPath);
this.logger.trace('Spawning extension host...');
private async spawn(buffer: VSBuffer, inflateBytes: Uint8Array | undefined): Promise<cp.ChildProcess> {
this.logger.debug('Getting NLS configuration...');
const config = await getNlsConfiguration(this.params.language, this.environment.userDataPath);
this.logger.debug('Spawning extension host...');
const proc = cp.fork(
FileAccess.asFileUri('bootstrap-fork', require).fsPath,
// While not technically necessary, makes it easier to tell which process
Expand Down Expand Up @@ -162,7 +193,7 @@ export class ExtensionHostConnection extends Connection {
this.dispose();
});
proc.on('exit', (code) => {
this.logger.trace('Exited', field('code', code));
this.logger.debug('Exited', field('code', code));
this.dispose();
});
if (proc.stdout && proc.stderr) {
Expand All @@ -181,20 +212,20 @@ export class ExtensionHostConnection extends Connection {
}
break;
case 'VSCODE_EXTHOST_DISCONNECTED':
this.logger.trace('Going offline');
this.logger.debug('Got disconnected message');
this.setOffline();
break;
case 'VSCODE_EXTHOST_IPC_READY':
this.logger.trace('Got ready message');
this.sendInitMessage(buffer);
this.logger.debug('Handshake completed');
this.sendInitMessage(buffer, inflateBytes);
break;
default:
this.logger.error('Unexpected message', field('event', event));
break;
}
});

this.logger.trace('Waiting for handshake...');
this.logger.debug('Waiting for handshake...');
return proc;
}
}
84 changes: 69 additions & 15 deletions lib/vscode/src/vs/server/node/protocol.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
import { field } from '@coder/logger';
import { field, logger, Logger } from '@coder/logger';
import * as net from 'net';
import { VSBuffer } from 'vs/base/common/buffer';
import { PersistentProtocol } from 'vs/base/parts/ipc/common/ipc.net';
import { NodeSocket, WebSocketNodeSocket } from 'vs/base/parts/ipc/node/ipc.net';
import { AuthRequest, ConnectionTypeRequest, HandshakeMessage } from 'vs/platform/remote/common/remoteAgentConnection';
import { logger } from 'vs/server/node/logger';

export interface SocketOptions {
/** The token is how we identify and connect to existing sessions. */
readonly reconnectionToken: string;
/** Specifies that the client is trying to reconnect. */
readonly reconnection: boolean;
/** If true assume this is not a web socket (always false for code-server). */
readonly skipWebSocketFrames: boolean;
/** Whether to support compression (web socket only). */
readonly permessageDeflate?: boolean;
/**
* Seed zlib with these bytes (web socket only). If parts of inflating was
* done in a different zlib instance we need to pass all those bytes into zlib
* otherwise the inflate might hit an inflated portion referencing a distance
* too far back.
*/
readonly inflateBytes?: VSBuffer;
readonly recordInflateBytes?: boolean;
}

export class Protocol extends PersistentProtocol {
private readonly logger: Logger;

public constructor(socket: net.Socket, public readonly options: SocketOptions) {
super(
options.skipWebSocketFrames
Expand All @@ -24,9 +34,12 @@ export class Protocol extends PersistentProtocol {
new NodeSocket(socket),
options.permessageDeflate || false,
options.inflateBytes || null,
options.recordInflateBytes || false,
// Always record inflate bytes if using permessage-deflate.
options.permessageDeflate || false,
),
);

this.logger = logger.named('protocol', field('token', this.options.reconnectionToken));
}

public getUnderlyingSocket(): net.Socket {
Expand All @@ -40,31 +53,44 @@ export class Protocol extends PersistentProtocol {
* Perform a handshake to get a connection request.
*/
public handshake(): Promise<ConnectionTypeRequest> {
logger.trace('Protocol handshake', field('token', this.options.reconnectionToken));
this.logger.debug('Initiating handshake...');

return new Promise((resolve, reject) => {
const cleanup = () => {
handler.dispose();
onClose.dispose();
clearTimeout(timeout);
};

const onClose = this.onSocketClose(() => {
cleanup();
this.logger.debug('Handshake failed');
reject(new Error('Protocol socket closed unexpectedly'));
});

const timeout = setTimeout(() => {
logger.error('Handshake timed out', field('token', this.options.reconnectionToken));
reject(new Error('timed out'));
cleanup();
this.logger.debug('Handshake timed out');
reject(new Error('Protocol handshake timed out'));
}, 10000); // Matches the client timeout.

const handler = this.onControlMessage((rawMessage) => {
try {
const raw = rawMessage.toString();
logger.trace('Protocol message', field('token', this.options.reconnectionToken), field('message', raw));
this.logger.trace('Got message', field('message', raw));
const message = JSON.parse(raw);
switch (message.type) {
case 'auth':
return this.authenticate(message);
case 'connectionType':
handler.dispose();
clearTimeout(timeout);
cleanup();
this.logger.debug('Handshake completed');
return resolve(message);
default:
throw new Error('Unrecognized message type');
}
} catch (error) {
handler.dispose();
clearTimeout(timeout);
cleanup();
reject(error);
}
});
Expand All @@ -90,10 +116,38 @@ export class Protocol extends PersistentProtocol {
}

/**
* Send a handshake message. In the case of the extension host, it just sends
* back a debug port.
* Send a handshake message. In the case of the extension host it should just
* send a debug port.
*/
public sendMessage(message: HandshakeMessage | { debugPort?: number } ): void {
public sendMessage(message: HandshakeMessage | { debugPort?: number | null } ): void {
this.sendControl(VSBuffer.fromString(JSON.stringify(message)));
}

/**
* Disconnect and dispose everything including the underlying socket.
*/
public destroy(reason?: string): void {
try {
if (reason) {
this.sendMessage({ type: 'error', reason });
}
// If still connected try notifying the client.
this.sendDisconnect();
} catch (error) {
// I think the write might fail if already disconnected.
this.logger.warn(error.message || error);
}
this.dispose(); // This disposes timers and socket event handlers.
this.getSocket().dispose(); // This will destroy() the socket.
}

/**
* Get inflateBytes from the current socket.
*/
public get inflateBytes(): Uint8Array | undefined {
const socket = this.getSocket();
return socket instanceof WebSocketNodeSocket
? socket.recordedInflateBytes.buffer
: undefined;
}
}
Loading