Skip to content

Restart discovery after re-initializing client. #1167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions arduino-ide-extension/src/node/arduino-ide-backend-module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ export default new ContainerModule((bind, unbind, isBound, rebind) => {

// Shared port/board discovery for the server
bind(BoardDiscovery).toSelf().inSingletonScope();
bind(BackendApplicationContribution).toService(BoardDiscovery);

// Core service -> `verify` and `upload`. Singleton per BE, each FE connection gets its proxy.
bind(ConnectionContainerModule).toConstantValue(
Expand Down Expand Up @@ -350,10 +351,10 @@ export default new ContainerModule((bind, unbind, isBound, rebind) => {
bind(ILogger)
.toDynamicValue((ctx) => {
const parentLogger = ctx.container.get<ILogger>(ILogger);
return parentLogger.child('discovery');
return parentLogger.child('discovery-log'); // TODO: revert
})
.inSingletonScope()
.whenTargetNamed('discovery');
.whenTargetNamed('discovery-log'); // TODO: revert

// Logger for the CLI config service. From the CLI config (FS path aware), we make a URI-aware app config.
bind(ILogger)
Expand Down
275 changes: 211 additions & 64 deletions arduino-ide-extension/src/node/board-discovery.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { injectable, inject, postConstruct, named } from '@theia/core/shared/inversify';
import { injectable, inject, named } from '@theia/core/shared/inversify';
import { ClientDuplexStream } from '@grpc/grpc-js';
import { ILogger } from '@theia/core/lib/common/logger';
import { deepClone } from '@theia/core/lib/common/objects';
import { CoreClientAware, CoreClientProvider } from './core-client-provider';
import { CoreClientAware } from './core-client-provider';
import {
BoardListWatchRequest,
BoardListWatchResponse,
Expand All @@ -14,31 +14,49 @@ import {
AvailablePorts,
AttachedBoardsChangeEvent,
} from '../common/protocol';
import { Emitter, Event } from '@theia/core/lib/common/event';
import { DisposableCollection } from '@theia/core/lib/common/disposable';
import { Disposable } from '@theia/core/shared/vscode-languageserver-protocol';
import { ArduinoCoreServiceClient } from './cli-protocol/cc/arduino/cli/commands/v1/commands_grpc_pb';
import { v4 } from 'uuid';
import { ServiceError } from './service-error';
import { BackendApplicationContribution } from '@theia/core/lib/node';
import { Deferred } from '@theia/core/lib/common/promise-util';

type Duplex = ClientDuplexStream<BoardListWatchRequest, BoardListWatchResponse>;
interface StreamWrapper extends Disposable {
readonly stream: Duplex;
readonly uuid: string; // For logging only
}

/**
* Singleton service for tracking the available ports and board and broadcasting the
* changes to all connected frontend instances. \
* changes to all connected frontend instances.
*
* Unlike other services, this is not connection scoped.
*/
@injectable()
export class BoardDiscovery extends CoreClientAware {
export class BoardDiscovery
extends CoreClientAware
implements BackendApplicationContribution
{
@inject(ILogger)
@named('discovery')
protected discoveryLogger: ILogger;
@named('discovery-log')
private readonly logger: ILogger;

@inject(NotificationServiceServer)
protected readonly notificationService: NotificationServiceServer;
private readonly notificationService: NotificationServiceServer;

// Used to know if the board watch process is already running to avoid
// starting it multiple times
private watching: boolean;

protected boardWatchDuplex:
| ClientDuplexStream<BoardListWatchRequest, BoardListWatchResponse>
| undefined;
private watching: Deferred<void> | undefined;
private stopping: Deferred<void> | undefined;
private wrapper: StreamWrapper | undefined;
private readonly onStreamDidEndEmitter = new Emitter<void>(); // sent from the CLI when the discovery process is killed for example after the indexes update and the core client re-initialization.
private readonly onStreamDidCancelEmitter = new Emitter<void>(); // when the watcher is canceled by the IDE2
private readonly toDisposeOnStopWatch = new DisposableCollection();

/**
* Keys are the `address` of the ports. \
* Keys are the `address` of the ports.
*
* The `protocol` is ignored because the board detach event does not carry the protocol information,
* just the address.
* ```json
Expand All @@ -48,57 +66,173 @@ export class BoardDiscovery extends CoreClientAware {
* }
* ```
*/
protected _state: AvailablePorts = {};
get state(): AvailablePorts {
return this._state;
private _availablePorts: AvailablePorts = {};
get availablePorts(): AvailablePorts {
return this._availablePorts;
}

@postConstruct()
protected async init(): Promise<void> {
this.coreClient.then((client) => this.startBoardListWatch(client));
onStart(): void {
this.start();
this.onClientDidRefresh(() => this.restart());
}

stopBoardListWatch(coreClient: CoreClientProvider.Client): Promise<void> {
return new Promise((resolve, reject) => {
if (!this.boardWatchDuplex) {
return resolve();
}
private async restart(): Promise<void> {
this.logger.info('restarting before stop');
await this.stop();
this.logger.info('restarting after stop');
return this.start();
}

const { instance } = coreClient;
const req = new BoardListWatchRequest();
req.setInstance(instance);
try {
this.boardWatchDuplex.write(req.setInterrupt(true), resolve);
} catch (e) {
this.discoveryLogger.error(e);
resolve();
}
});
onStop(): void {
this.stop();
}

startBoardListWatch(coreClient: CoreClientProvider.Client): void {
if (this.watching) {
// We want to avoid starting the board list watch process multiple
// times to meet unforseen consequences
async stop(restart = false): Promise<void> {
this.logger.info('stop');
if (this.stopping) {
this.logger.info('stop already stopping');
return this.stopping.promise;
}
if (!this.watching) {
return;
}
this.watching = true;
const { client, instance } = coreClient;
const req = new BoardListWatchRequest();
req.setInstance(instance);
this.boardWatchDuplex = client.boardListWatch();
this.boardWatchDuplex.on('end', () => {
this.watching = false;
console.info('board watch ended');
this.stopping = new Deferred();
this.logger.info('>>> Stopping boards watcher...');
return new Promise<void>((resolve, reject) => {
const timeout = this.createTimeout(10_000, reject);
const toDispose = new DisposableCollection();
const waitForEvent = (event: Event<unknown>) =>
event(() => {
this.logger.info('stop received event: either end or cancel');
toDispose.dispose();
this.stopping?.resolve();
this.stopping = undefined;
this.logger.info('stop stopped');
resolve();
if (restart) {
this.start();
}
});
toDispose.pushAll([
timeout,
waitForEvent(this.onStreamDidEndEmitter.event),
waitForEvent(this.onStreamDidCancelEmitter.event),
]);
this.logger.info('Canceling boards watcher...');
this.toDisposeOnStopWatch.dispose();
});
this.boardWatchDuplex.on('close', () => {
this.watching = false;
console.info('board watch ended');
}

private createTimeout(
after: number,
onTimeout: (error: Error) => void
): Disposable {
const timer = setTimeout(
() => onTimeout(new Error(`Timed out after ${after} ms.`)),
after
);
return Disposable.create(() => clearTimeout(timer));
}

private async requestStartWatch(
req: BoardListWatchRequest,
duplex: Duplex
): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (
!duplex.write(req, (err: Error | undefined) => {
if (err) {
reject(err);
return;
}
})
) {
duplex.once('drain', resolve);
} else {
process.nextTick(resolve);
}
});
this.boardWatchDuplex.on('data', (resp: BoardListWatchResponse) => {
}

private async createWrapper(
client: ArduinoCoreServiceClient
): Promise<StreamWrapper> {
if (this.wrapper) {
throw new Error(`Duplex was already set.`);
}
const stream = client
.boardListWatch()
.on('end', () => {
this.logger.info('received end');
this.onStreamDidEndEmitter.fire();
})
.on('error', (error) => {
this.logger.info('error received');
if (ServiceError.isCancel(error)) {
this.logger.info('cancel error received!');
this.onStreamDidCancelEmitter.fire();
} else {
this.logger.error(
'Unexpected error occurred during the boards discovery.',
error
);
// TODO: terminate? restart? reject?
}
});
const wrapper = {
stream,
uuid: v4(),
dispose: () => {
this.logger.info('disposing requesting cancel');
// Cancelling the stream will kill the discovery `builtin:mdns-discovery process`.
// The client (this class) will receive a `{"eventType":"quit","error":""}` response from the CLI.
stream.cancel();
this.logger.info('disposing canceled');
this.wrapper = undefined;
},
};
this.toDisposeOnStopWatch.pushAll([
wrapper,
Disposable.create(() => {
this.watching?.reject(new Error(`Stopping watcher.`));
this.watching = undefined;
}),
]);
return wrapper;
}

private toJson(arg: BoardListWatchRequest | BoardListWatchResponse): string {
let object: Record<string, unknown> | undefined = undefined;
if (arg instanceof BoardListWatchRequest) {
object = BoardListWatchRequest.toObject(false, arg);
} else if (arg instanceof BoardListWatchResponse) {
object = BoardListWatchResponse.toObject(false, arg);
} else {
throw new Error(`Unhandled object type: ${arg}`);
}
return JSON.stringify(object);
}

async start(): Promise<void> {
this.logger.info('start');
if (this.stopping) {
this.logger.info('start is stopping wait');
await this.stopping.promise;
this.logger.info('start stopped');
}
if (this.watching) {
this.logger.info('start already watching');
return this.watching.promise;
}
this.watching = new Deferred();
this.logger.info('start new deferred');
const { client, instance } = await this.coreClient;
const wrapper = await this.createWrapper(client);
wrapper.stream.on('data', async (resp: BoardListWatchResponse) => {
this.logger.info('onData', this.toJson(resp));
if (resp.getEventType() === 'quit') {
this.watching = false;
console.info('board watch ended');
this.logger.info('quit received');
this.stop();
return;
}

Expand All @@ -117,8 +251,8 @@ export class BoardDiscovery extends CoreClientAware {
throw new Error(`Unexpected event type: '${resp.getEventType()}'`);
}

const oldState = deepClone(this._state);
const newState = deepClone(this._state);
const oldState = deepClone(this._availablePorts);
const newState = deepClone(this._availablePorts);

const address = (detectedPort as any).getPort().getAddress();
const protocol = (detectedPort as any).getPort().getProtocol();
Expand All @@ -130,7 +264,9 @@ export class BoardDiscovery extends CoreClientAware {
// protocols.
const portID = `${address}|${protocol}`;
const label = (detectedPort as any).getPort().getLabel();
const protocolLabel = (detectedPort as any).getPort().getProtocolLabel();
const protocolLabel = (detectedPort as any)
.getPort()
.getProtocolLabel();
const port = {
id: portID,
address,
Expand All @@ -150,16 +286,20 @@ export class BoardDiscovery extends CoreClientAware {
if (eventType === 'add') {
if (newState[portID]) {
const [, knownBoards] = newState[portID];
console.warn(
`Port '${Port.toString(port)}' was already available. Known boards before override: ${JSON.stringify(
this.logger.warn(
`Port '${Port.toString(
port
)}' was already available. Known boards before override: ${JSON.stringify(
knownBoards
)}`
);
}
newState[portID] = [port, boards];
} else if (eventType === 'remove') {
if (!newState[portID]) {
console.warn(`Port '${Port.toString(port)}' was not available. Skipping`);
this.logger.warn(
`Port '${Port.toString(port)}' was not available. Skipping`
);
return;
}
delete newState[portID];
Expand All @@ -180,14 +320,21 @@ export class BoardDiscovery extends CoreClientAware {
},
};

this._state = newState;
this._availablePorts = newState;
this.notificationService.notifyAttachedBoardsDidChange(event);
}
});
this.boardWatchDuplex.write(req);
this.logger.info('start request start watch');
await this.requestStartWatch(
new BoardListWatchRequest().setInstance(instance),
wrapper.stream
);
this.logger.info('start requested start watch');
this.watching.resolve();
this.logger.info('start resolved watching');
}

getAttachedBoards(state: AvailablePorts = this.state): Board[] {
getAttachedBoards(state: AvailablePorts = this.availablePorts): Board[] {
const attachedBoards: Board[] = [];
for (const portID of Object.keys(state)) {
const [, boards] = state[portID];
Expand All @@ -196,7 +343,7 @@ export class BoardDiscovery extends CoreClientAware {
return attachedBoards;
}

getAvailablePorts(state: AvailablePorts = this.state): Port[] {
getAvailablePorts(state: AvailablePorts = this.availablePorts): Port[] {
const availablePorts: Port[] = [];
for (const portID of Object.keys(state)) {
const [port] = state[portID];
Expand Down
Loading