diff --git a/arduino-ide-extension/src/node/arduino-ide-backend-module.ts b/arduino-ide-extension/src/node/arduino-ide-backend-module.ts index 1cf4f39af..169a728a9 100644 --- a/arduino-ide-extension/src/node/arduino-ide-backend-module.ts +++ b/arduino-ide-extension/src/node/arduino-ide-backend-module.ts @@ -203,6 +203,7 @@ export default new ContainerModule((bind, unbind, isBound, rebind) => { // Shared port/board discovery for the server bind(BoardDiscovery).toSelf().inSingletonScope(); + bind(BackendApplicationContribution).toService(BoardDiscovery); // Core service -> `verify` and `upload`. Singleton per BE, each FE connection gets its proxy. bind(ConnectionContainerModule).toConstantValue( @@ -350,10 +351,10 @@ export default new ContainerModule((bind, unbind, isBound, rebind) => { bind(ILogger) .toDynamicValue((ctx) => { const parentLogger = ctx.container.get(ILogger); - return parentLogger.child('discovery'); + return parentLogger.child('discovery-log'); // TODO: revert }) .inSingletonScope() - .whenTargetNamed('discovery'); + .whenTargetNamed('discovery-log'); // TODO: revert // Logger for the CLI config service. From the CLI config (FS path aware), we make a URI-aware app config. bind(ILogger) diff --git a/arduino-ide-extension/src/node/board-discovery.ts b/arduino-ide-extension/src/node/board-discovery.ts index f2d2dcb41..448d48e42 100644 --- a/arduino-ide-extension/src/node/board-discovery.ts +++ b/arduino-ide-extension/src/node/board-discovery.ts @@ -1,8 +1,8 @@ -import { injectable, inject, postConstruct, named } from '@theia/core/shared/inversify'; +import { injectable, inject, named } from '@theia/core/shared/inversify'; import { ClientDuplexStream } from '@grpc/grpc-js'; import { ILogger } from '@theia/core/lib/common/logger'; import { deepClone } from '@theia/core/lib/common/objects'; -import { CoreClientAware, CoreClientProvider } from './core-client-provider'; +import { CoreClientAware } from './core-client-provider'; import { BoardListWatchRequest, BoardListWatchResponse, @@ -14,31 +14,49 @@ import { AvailablePorts, AttachedBoardsChangeEvent, } from '../common/protocol'; +import { Emitter, Event } from '@theia/core/lib/common/event'; +import { DisposableCollection } from '@theia/core/lib/common/disposable'; +import { Disposable } from '@theia/core/shared/vscode-languageserver-protocol'; +import { ArduinoCoreServiceClient } from './cli-protocol/cc/arduino/cli/commands/v1/commands_grpc_pb'; +import { v4 } from 'uuid'; +import { ServiceError } from './service-error'; +import { BackendApplicationContribution } from '@theia/core/lib/node'; +import { Deferred } from '@theia/core/lib/common/promise-util'; + +type Duplex = ClientDuplexStream; +interface StreamWrapper extends Disposable { + readonly stream: Duplex; + readonly uuid: string; // For logging only +} /** * Singleton service for tracking the available ports and board and broadcasting the - * changes to all connected frontend instances. \ + * changes to all connected frontend instances. + * * Unlike other services, this is not connection scoped. */ @injectable() -export class BoardDiscovery extends CoreClientAware { +export class BoardDiscovery + extends CoreClientAware + implements BackendApplicationContribution +{ @inject(ILogger) - @named('discovery') - protected discoveryLogger: ILogger; + @named('discovery-log') + private readonly logger: ILogger; @inject(NotificationServiceServer) - protected readonly notificationService: NotificationServiceServer; + private readonly notificationService: NotificationServiceServer; - // Used to know if the board watch process is already running to avoid - // starting it multiple times - private watching: boolean; - - protected boardWatchDuplex: - | ClientDuplexStream - | undefined; + private watching: Deferred | undefined; + private stopping: Deferred | undefined; + private wrapper: StreamWrapper | undefined; + private readonly onStreamDidEndEmitter = new Emitter(); // sent from the CLI when the discovery process is killed for example after the indexes update and the core client re-initialization. + private readonly onStreamDidCancelEmitter = new Emitter(); // when the watcher is canceled by the IDE2 + private readonly toDisposeOnStopWatch = new DisposableCollection(); /** - * Keys are the `address` of the ports. \ + * Keys are the `address` of the ports. + * * The `protocol` is ignored because the board detach event does not carry the protocol information, * just the address. * ```json @@ -48,57 +66,173 @@ export class BoardDiscovery extends CoreClientAware { * } * ``` */ - protected _state: AvailablePorts = {}; - get state(): AvailablePorts { - return this._state; + private _availablePorts: AvailablePorts = {}; + get availablePorts(): AvailablePorts { + return this._availablePorts; } - @postConstruct() - protected async init(): Promise { - this.coreClient.then((client) => this.startBoardListWatch(client)); + onStart(): void { + this.start(); + this.onClientDidRefresh(() => this.restart()); } - stopBoardListWatch(coreClient: CoreClientProvider.Client): Promise { - return new Promise((resolve, reject) => { - if (!this.boardWatchDuplex) { - return resolve(); - } + private async restart(): Promise { + this.logger.info('restarting before stop'); + await this.stop(); + this.logger.info('restarting after stop'); + return this.start(); + } - const { instance } = coreClient; - const req = new BoardListWatchRequest(); - req.setInstance(instance); - try { - this.boardWatchDuplex.write(req.setInterrupt(true), resolve); - } catch (e) { - this.discoveryLogger.error(e); - resolve(); - } - }); + onStop(): void { + this.stop(); } - startBoardListWatch(coreClient: CoreClientProvider.Client): void { - if (this.watching) { - // We want to avoid starting the board list watch process multiple - // times to meet unforseen consequences + async stop(restart = false): Promise { + this.logger.info('stop'); + if (this.stopping) { + this.logger.info('stop already stopping'); + return this.stopping.promise; + } + if (!this.watching) { return; } - this.watching = true; - const { client, instance } = coreClient; - const req = new BoardListWatchRequest(); - req.setInstance(instance); - this.boardWatchDuplex = client.boardListWatch(); - this.boardWatchDuplex.on('end', () => { - this.watching = false; - console.info('board watch ended'); + this.stopping = new Deferred(); + this.logger.info('>>> Stopping boards watcher...'); + return new Promise((resolve, reject) => { + const timeout = this.createTimeout(10_000, reject); + const toDispose = new DisposableCollection(); + const waitForEvent = (event: Event) => + event(() => { + this.logger.info('stop received event: either end or cancel'); + toDispose.dispose(); + this.stopping?.resolve(); + this.stopping = undefined; + this.logger.info('stop stopped'); + resolve(); + if (restart) { + this.start(); + } + }); + toDispose.pushAll([ + timeout, + waitForEvent(this.onStreamDidEndEmitter.event), + waitForEvent(this.onStreamDidCancelEmitter.event), + ]); + this.logger.info('Canceling boards watcher...'); + this.toDisposeOnStopWatch.dispose(); }); - this.boardWatchDuplex.on('close', () => { - this.watching = false; - console.info('board watch ended'); + } + + private createTimeout( + after: number, + onTimeout: (error: Error) => void + ): Disposable { + const timer = setTimeout( + () => onTimeout(new Error(`Timed out after ${after} ms.`)), + after + ); + return Disposable.create(() => clearTimeout(timer)); + } + + private async requestStartWatch( + req: BoardListWatchRequest, + duplex: Duplex + ): Promise { + return new Promise((resolve, reject) => { + if ( + !duplex.write(req, (err: Error | undefined) => { + if (err) { + reject(err); + return; + } + }) + ) { + duplex.once('drain', resolve); + } else { + process.nextTick(resolve); + } }); - this.boardWatchDuplex.on('data', (resp: BoardListWatchResponse) => { + } + + private async createWrapper( + client: ArduinoCoreServiceClient + ): Promise { + if (this.wrapper) { + throw new Error(`Duplex was already set.`); + } + const stream = client + .boardListWatch() + .on('end', () => { + this.logger.info('received end'); + this.onStreamDidEndEmitter.fire(); + }) + .on('error', (error) => { + this.logger.info('error received'); + if (ServiceError.isCancel(error)) { + this.logger.info('cancel error received!'); + this.onStreamDidCancelEmitter.fire(); + } else { + this.logger.error( + 'Unexpected error occurred during the boards discovery.', + error + ); + // TODO: terminate? restart? reject? + } + }); + const wrapper = { + stream, + uuid: v4(), + dispose: () => { + this.logger.info('disposing requesting cancel'); + // Cancelling the stream will kill the discovery `builtin:mdns-discovery process`. + // The client (this class) will receive a `{"eventType":"quit","error":""}` response from the CLI. + stream.cancel(); + this.logger.info('disposing canceled'); + this.wrapper = undefined; + }, + }; + this.toDisposeOnStopWatch.pushAll([ + wrapper, + Disposable.create(() => { + this.watching?.reject(new Error(`Stopping watcher.`)); + this.watching = undefined; + }), + ]); + return wrapper; + } + + private toJson(arg: BoardListWatchRequest | BoardListWatchResponse): string { + let object: Record | undefined = undefined; + if (arg instanceof BoardListWatchRequest) { + object = BoardListWatchRequest.toObject(false, arg); + } else if (arg instanceof BoardListWatchResponse) { + object = BoardListWatchResponse.toObject(false, arg); + } else { + throw new Error(`Unhandled object type: ${arg}`); + } + return JSON.stringify(object); + } + + async start(): Promise { + this.logger.info('start'); + if (this.stopping) { + this.logger.info('start is stopping wait'); + await this.stopping.promise; + this.logger.info('start stopped'); + } + if (this.watching) { + this.logger.info('start already watching'); + return this.watching.promise; + } + this.watching = new Deferred(); + this.logger.info('start new deferred'); + const { client, instance } = await this.coreClient; + const wrapper = await this.createWrapper(client); + wrapper.stream.on('data', async (resp: BoardListWatchResponse) => { + this.logger.info('onData', this.toJson(resp)); if (resp.getEventType() === 'quit') { - this.watching = false; - console.info('board watch ended'); + this.logger.info('quit received'); + this.stop(); return; } @@ -117,8 +251,8 @@ export class BoardDiscovery extends CoreClientAware { throw new Error(`Unexpected event type: '${resp.getEventType()}'`); } - const oldState = deepClone(this._state); - const newState = deepClone(this._state); + const oldState = deepClone(this._availablePorts); + const newState = deepClone(this._availablePorts); const address = (detectedPort as any).getPort().getAddress(); const protocol = (detectedPort as any).getPort().getProtocol(); @@ -130,7 +264,9 @@ export class BoardDiscovery extends CoreClientAware { // protocols. const portID = `${address}|${protocol}`; const label = (detectedPort as any).getPort().getLabel(); - const protocolLabel = (detectedPort as any).getPort().getProtocolLabel(); + const protocolLabel = (detectedPort as any) + .getPort() + .getProtocolLabel(); const port = { id: portID, address, @@ -150,8 +286,10 @@ export class BoardDiscovery extends CoreClientAware { if (eventType === 'add') { if (newState[portID]) { const [, knownBoards] = newState[portID]; - console.warn( - `Port '${Port.toString(port)}' was already available. Known boards before override: ${JSON.stringify( + this.logger.warn( + `Port '${Port.toString( + port + )}' was already available. Known boards before override: ${JSON.stringify( knownBoards )}` ); @@ -159,7 +297,9 @@ export class BoardDiscovery extends CoreClientAware { newState[portID] = [port, boards]; } else if (eventType === 'remove') { if (!newState[portID]) { - console.warn(`Port '${Port.toString(port)}' was not available. Skipping`); + this.logger.warn( + `Port '${Port.toString(port)}' was not available. Skipping` + ); return; } delete newState[portID]; @@ -180,14 +320,21 @@ export class BoardDiscovery extends CoreClientAware { }, }; - this._state = newState; + this._availablePorts = newState; this.notificationService.notifyAttachedBoardsDidChange(event); } }); - this.boardWatchDuplex.write(req); + this.logger.info('start request start watch'); + await this.requestStartWatch( + new BoardListWatchRequest().setInstance(instance), + wrapper.stream + ); + this.logger.info('start requested start watch'); + this.watching.resolve(); + this.logger.info('start resolved watching'); } - getAttachedBoards(state: AvailablePorts = this.state): Board[] { + getAttachedBoards(state: AvailablePorts = this.availablePorts): Board[] { const attachedBoards: Board[] = []; for (const portID of Object.keys(state)) { const [, boards] = state[portID]; @@ -196,7 +343,7 @@ export class BoardDiscovery extends CoreClientAware { return attachedBoards; } - getAvailablePorts(state: AvailablePorts = this.state): Port[] { + getAvailablePorts(state: AvailablePorts = this.availablePorts): Port[] { const availablePorts: Port[] = []; for (const portID of Object.keys(state)) { const [port] = state[portID]; diff --git a/arduino-ide-extension/src/node/boards-service-impl.ts b/arduino-ide-extension/src/node/boards-service-impl.ts index 98cdcb7fb..5f2e1e64e 100644 --- a/arduino-ide-extension/src/node/boards-service-impl.ts +++ b/arduino-ide-extension/src/node/boards-service-impl.ts @@ -1,4 +1,4 @@ -import { injectable, inject, named } from '@theia/core/shared/inversify'; +import { injectable, inject } from '@theia/core/shared/inversify'; import { ILogger } from '@theia/core/lib/common/logger'; import { notEmpty } from '@theia/core/lib/common/objects'; import { @@ -50,10 +50,6 @@ export class BoardsServiceImpl @inject(ILogger) protected logger: ILogger; - @inject(ILogger) - @named('discovery') - protected discoveryLogger: ILogger; - @inject(ResponseService) protected readonly responseService: ResponseService; @@ -64,7 +60,7 @@ export class BoardsServiceImpl protected readonly boardDiscovery: BoardDiscovery; async getState(): Promise { - return this.boardDiscovery.state; + return this.boardDiscovery.availablePorts; } async getAttachedBoards(): Promise { @@ -414,7 +410,7 @@ export class BoardsServiceImpl console.info('>>> Starting boards package installation...', item); // stop the board discovery - await this.boardDiscovery.stopBoardListWatch(coreClient); + await this.boardDiscovery.stop(); const resp = client.platformInstall(req); resp.on( @@ -426,7 +422,7 @@ export class BoardsServiceImpl ); await new Promise((resolve, reject) => { resp.on('end', () => { - this.boardDiscovery.startBoardListWatch(coreClient); + this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here. resolve(); }); resp.on('error', (error) => { @@ -465,7 +461,7 @@ export class BoardsServiceImpl console.info('>>> Starting boards package uninstallation...', item); // stop the board discovery - await this.boardDiscovery.stopBoardListWatch(coreClient); + await this.boardDiscovery.stop(); const resp = client.platformUninstall(req); resp.on( @@ -477,7 +473,7 @@ export class BoardsServiceImpl ); await new Promise((resolve, reject) => { resp.on('end', () => { - this.boardDiscovery.startBoardListWatch(coreClient); + this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here. resolve(); }); resp.on('error', reject); diff --git a/arduino-ide-extension/src/node/core-client-provider.ts b/arduino-ide-extension/src/node/core-client-provider.ts index 3258ef829..449f28ab5 100644 --- a/arduino-ide-extension/src/node/core-client-provider.ts +++ b/arduino-ide-extension/src/node/core-client-provider.ts @@ -5,7 +5,7 @@ import { injectable, postConstruct, } from '@theia/core/shared/inversify'; -import { Emitter } from '@theia/core/lib/common/event'; +import { Emitter, Event } from '@theia/core/lib/common/event'; import { ArduinoCoreServiceClient } from './cli-protocol/cc/arduino/cli/commands/v1/commands_grpc_pb'; import { Instance } from './cli-protocol/cc/arduino/cli/commands/v1/common_pb'; import { @@ -53,6 +53,8 @@ export class CoreClientProvider { private readonly onClientReadyEmitter = new Emitter(); private readonly onClientReady = this.onClientReadyEmitter.event; + private readonly onClientDidRefreshEmitter = + new Emitter(); @postConstruct() protected init(): void { @@ -88,6 +90,10 @@ export class CoreClientProvider { return this.pending.promise; } + get onClientDidRefresh(): Event { + return this.onClientDidRefreshEmitter.event; + } + /** * Encapsulates both the gRPC core client creation (`CreateRequest`) and initialization (`InitRequest`). */ @@ -253,6 +259,7 @@ export class CoreClientProvider { await this.initInstance(client); // notify clients about the index update only after the client has been "re-initialized" and the new content is available. progressHandler.reportEnd(); + this.onClientDidRefreshEmitter.fire(client); } catch (err) { console.error('Failed to update indexes', err); progressHandler.reportError( @@ -404,6 +411,10 @@ export abstract class CoreClientAware { protected get coreClient(): Promise { return this.coreClientProvider.client; } + + protected get onClientDidRefresh(): Event { + return this.coreClientProvider.onClientDidRefresh; + } } class IndexUpdateRequiredBeforeInitError extends Error { diff --git a/arduino-ide-extension/src/node/library-service-impl.ts b/arduino-ide-extension/src/node/library-service-impl.ts index 42ad456c4..e0d3be567 100644 --- a/arduino-ide-extension/src/node/library-service-impl.ts +++ b/arduino-ide-extension/src/node/library-service-impl.ts @@ -269,7 +269,7 @@ export class LibraryServiceImpl console.info('>>> Starting library package installation...', item); // stop the board discovery - await this.boardDiscovery.stopBoardListWatch(coreClient); + await this.boardDiscovery.stop(); const resp = client.libraryInstall(req); resp.on( @@ -281,7 +281,7 @@ export class LibraryServiceImpl ); await new Promise((resolve, reject) => { resp.on('end', () => { - this.boardDiscovery.startBoardListWatch(coreClient); + this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here. resolve(); }); resp.on('error', (error) => { @@ -323,7 +323,7 @@ export class LibraryServiceImpl } // stop the board discovery - await this.boardDiscovery.stopBoardListWatch(coreClient); + await this.boardDiscovery.stop(); const resp = client.zipLibraryInstall(req); resp.on( @@ -335,7 +335,7 @@ export class LibraryServiceImpl ); await new Promise((resolve, reject) => { resp.on('end', () => { - this.boardDiscovery.startBoardListWatch(coreClient); + this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here. resolve(); }); resp.on('error', reject); @@ -358,7 +358,7 @@ export class LibraryServiceImpl console.info('>>> Starting library package uninstallation...', item); // stop the board discovery - await this.boardDiscovery.stopBoardListWatch(coreClient); + await this.boardDiscovery.stop(); const resp = client.libraryUninstall(req); resp.on( @@ -370,7 +370,7 @@ export class LibraryServiceImpl ); await new Promise((resolve, reject) => { resp.on('end', () => { - this.boardDiscovery.startBoardListWatch(coreClient); + this.boardDiscovery.start(); // TODO: remove discovery dependency from boards service. See https://github.com/arduino/arduino-ide/pull/1107 why this is here. resolve(); }); resp.on('error', reject); diff --git a/arduino-ide-extension/src/node/service-error.ts b/arduino-ide-extension/src/node/service-error.ts index 3abbbc0b0..a56cf13ea 100644 --- a/arduino-ide-extension/src/node/service-error.ts +++ b/arduino-ide-extension/src/node/service-error.ts @@ -2,6 +2,9 @@ import { Metadata, StatusObject } from '@grpc/grpc-js'; export type ServiceError = StatusObject & Error; export namespace ServiceError { + export function isCancel(arg: unknown): arg is ServiceError & { code: 1 } { + return is(arg) && arg.code === 1; // https://grpc.github.io/grpc/core/md_doc_statuscodes.html + } export function is(arg: unknown): arg is ServiceError { return arg instanceof Error && isStatusObjet(arg); }