diff --git a/arduino-ide-extension/src/browser/monitor/monitor-connection.ts b/arduino-ide-extension/src/browser/monitor/monitor-connection.ts index 1c39e95bf..df52954c9 100644 --- a/arduino-ide-extension/src/browser/monitor/monitor-connection.ts +++ b/arduino-ide-extension/src/browser/monitor/monitor-connection.ts @@ -3,7 +3,7 @@ import { deepClone } from '@theia/core/lib/common/objects'; import { Emitter, Event } from '@theia/core/lib/common/event'; import { MessageService } from '@theia/core/lib/common/message-service'; import { FrontendApplicationStateService } from '@theia/core/lib/browser/frontend-application-state'; -import { MonitorService, MonitorConfig, MonitorError, Status, MonitorReadEvent } from '../../common/protocol/monitor-service'; +import { MonitorService, MonitorConfig, MonitorError, Status } from '../../common/protocol/monitor-service'; import { BoardsServiceProvider } from '../boards/boards-service-provider'; import { Port, Board, BoardsService, AttachedBoardsChangeEvent } from '../../common/protocol/boards-service'; import { MonitorServiceClientImpl } from './monitor-service-client-impl'; @@ -48,7 +48,7 @@ export class MonitorConnection { /** * This emitter forwards all read events **iff** the connection is established. */ - protected readonly onReadEmitter = new Emitter(); + protected readonly onReadEmitter = new Emitter<{ message: string }>(); /** * Array for storing previous monitor errors received from the server, and based on the number of elements in this array, @@ -60,12 +60,6 @@ export class MonitorConnection { @postConstruct() protected init(): void { - // Forward the messages from the board **iff** connected. - this.monitorServiceClient.onRead(event => { - if (this.connected) { - this.onReadEmitter.fire(event); - } - }); this.monitorServiceClient.onError(async error => { let shouldReconnect = false; if (this.state) { @@ -179,6 +173,15 @@ export class MonitorConnection { console.info(`>>> Creating serial monitor connection for ${Board.toString(config.board)} on port ${Port.toString(config.port)}...`); const connectStatus = await this.monitorService.connect(config); if (Status.isOK(connectStatus)) { + const requestMessage = () => { + this.monitorService.request().then(({ message }) => { + if (this.connected) { + this.onReadEmitter.fire({ message }); + requestMessage(); + } + }); + } + requestMessage(); this.state = { config }; console.info(`<<< Serial monitor connection created for ${Board.toString(config.board, { useFqbn: false })} on port ${Port.toString(config.port)}.`); } @@ -225,7 +228,7 @@ export class MonitorConnection { return this.onConnectionChangedEmitter.event; } - get onRead(): Event { + get onRead(): Event<{ message: string }> { return this.onReadEmitter.event; } diff --git a/arduino-ide-extension/src/browser/monitor/monitor-service-client-impl.ts b/arduino-ide-extension/src/browser/monitor/monitor-service-client-impl.ts index 88c6cb8d2..1e8ea29e4 100644 --- a/arduino-ide-extension/src/browser/monitor/monitor-service-client-impl.ts +++ b/arduino-ide-extension/src/browser/monitor/monitor-service-client-impl.ts @@ -1,21 +1,13 @@ import { injectable } from 'inversify'; import { Emitter } from '@theia/core/lib/common/event'; -import { MonitorServiceClient, MonitorReadEvent, MonitorError } from '../../common/protocol/monitor-service'; +import { MonitorServiceClient, MonitorError } from '../../common/protocol/monitor-service'; @injectable() export class MonitorServiceClientImpl implements MonitorServiceClient { - protected readonly onReadEmitter = new Emitter(); protected readonly onErrorEmitter = new Emitter(); - readonly onRead = this.onReadEmitter.event; readonly onError = this.onErrorEmitter.event; - notifyRead(event: MonitorReadEvent): void { - this.onReadEmitter.fire(event); - const { data } = event; - console.debug(`Received data: ${data}`); - } - notifyError(error: MonitorError): void { this.onErrorEmitter.fire(error); } diff --git a/arduino-ide-extension/src/browser/monitor/monitor-widget.tsx b/arduino-ide-extension/src/browser/monitor/monitor-widget.tsx index d504e2c23..993c0efbb 100644 --- a/arduino-ide-extension/src/browser/monitor/monitor-widget.tsx +++ b/arduino-ide-extension/src/browser/monitor/monitor-widget.tsx @@ -294,8 +294,8 @@ export class SerialMonitorOutput extends React.Component { - const rawLines = data.split('\n'); + this.props.monitorConnection.onRead(({ message }) => { + const rawLines = message.split('\n'); const lines: string[] = [] const timestamp = () => this.state.timestamp ? `${dateFormat(new Date(), 'H:M:ss.l')} -> ` : ''; for (let i = 0; i < rawLines.length; i++) { diff --git a/arduino-ide-extension/src/common/protocol/monitor-service.ts b/arduino-ide-extension/src/common/protocol/monitor-service.ts index f7b11f1d5..83d2ce82c 100644 --- a/arduino-ide-extension/src/common/protocol/monitor-service.ts +++ b/arduino-ide-extension/src/common/protocol/monitor-service.ts @@ -20,7 +20,8 @@ export const MonitorService = Symbol('MonitorService'); export interface MonitorService extends JsonRpcServer { connect(config: MonitorConfig): Promise; disconnect(): Promise; - send(data: string | Uint8Array): Promise; + send(message: string): Promise; + request(): Promise<{ message: string }>; } export interface MonitorConfig { @@ -51,14 +52,9 @@ export namespace MonitorConfig { export const MonitorServiceClient = Symbol('MonitorServiceClient'); export interface MonitorServiceClient { - notifyRead(event: MonitorReadEvent): void; notifyError(event: MonitorError): void; } -export interface MonitorReadEvent { - readonly data: string; -} - export interface MonitorError { readonly message: string; /** diff --git a/arduino-ide-extension/src/node/monitor/monitor-service-impl.ts b/arduino-ide-extension/src/node/monitor/monitor-service-impl.ts index 659040fb7..a62089e38 100644 --- a/arduino-ide-extension/src/node/monitor/monitor-service-impl.ts +++ b/arduino-ide-extension/src/node/monitor/monitor-service-impl.ts @@ -2,6 +2,7 @@ import { ClientDuplexStream } from '@grpc/grpc-js'; import { TextDecoder, TextEncoder } from 'util'; import { injectable, inject, named } from 'inversify'; import { Struct } from 'google-protobuf/google/protobuf/struct_pb'; +import { Emitter } from '@theia/core/lib/common/event'; import { ILogger } from '@theia/core/lib/common/logger'; import { MonitorService, MonitorServiceClient, MonitorConfig, MonitorError, Status } from '../../common/protocol/monitor-service'; import { StreamingOpenReq, StreamingOpenResp, MonitorConfig as GrpcMonitorConfig } from '../cli-protocol/monitor/monitor_pb'; @@ -46,6 +47,8 @@ export class MonitorServiceImpl implements MonitorService { protected client?: MonitorServiceClient; protected connection?: { duplex: ClientDuplexStream, config: MonitorConfig }; + protected messages: string[] = []; + protected onMessageDidReadEmitter = new Emitter(); setClient(client: MonitorServiceClient | undefined): void { this.client = client; @@ -86,11 +89,10 @@ export class MonitorServiceImpl implements MonitorService { }).bind(this)); duplex.on('data', ((resp: StreamingOpenResp) => { - if (this.client) { - const raw = resp.getData(); - const data = typeof raw === 'string' ? raw : new TextDecoder('utf8').decode(raw); - this.client.notifyRead({ data }); - } + const raw = resp.getData(); + const message = typeof raw === 'string' ? raw : new TextDecoder('utf8').decode(raw); + this.messages.push(message); + this.onMessageDidReadEmitter.fire(); }).bind(this)); const { type, port } = config; @@ -116,27 +118,31 @@ export class MonitorServiceImpl implements MonitorService { } async disconnect(reason?: MonitorError): Promise { - if (!this.connection && reason && reason.code === MonitorError.ErrorCodes.CLIENT_CANCEL) { + try { + if (!this.connection && reason && reason.code === MonitorError.ErrorCodes.CLIENT_CANCEL) { + return Status.OK; + } + this.logger.info(`>>> Disposing monitor connection...`); + if (!this.connection) { + this.logger.warn(`<<< Not connected. Nothing to dispose.`); + return Status.NOT_CONNECTED; + } + const { duplex, config } = this.connection; + duplex.cancel(); + this.logger.info(`<<< Disposed monitor connection for ${Board.toString(config.board, { useFqbn: false })} on port ${Port.toString(config.port)}.`); + this.connection = undefined; return Status.OK; + } finally { + this.messages.length = 0; } - this.logger.info(`>>> Disposing monitor connection...`); - if (!this.connection) { - this.logger.warn(`<<< Not connected. Nothing to dispose.`); - return Status.NOT_CONNECTED; - } - const { duplex, config } = this.connection; - duplex.cancel(); - this.logger.info(`<<< Disposed monitor connection for ${Board.toString(config.board, { useFqbn: false })} on port ${Port.toString(config.port)}.`); - this.connection = undefined; - return Status.OK; } - async send(data: string): Promise { + async send(message: string): Promise { if (!this.connection) { return Status.NOT_CONNECTED; } const req = new StreamingOpenReq(); - req.setData(new TextEncoder().encode(data)); + req.setData(new TextEncoder().encode(message)); return new Promise(resolve => { if (this.connection) { this.connection.duplex.write(req, () => { @@ -148,6 +154,19 @@ export class MonitorServiceImpl implements MonitorService { }); } + async request(): Promise<{ message: string }> { + const message = this.messages.shift(); + if (message) { + return { message }; + } + return new Promise<{ message: string }>(resolve => { + const toDispose = this.onMessageDidReadEmitter.event(() => { + toDispose.dispose(); + resolve(this.request()); + }); + }); + } + protected mapType(type?: MonitorConfig.ConnectionType): GrpcMonitorConfig.TargetType { switch (type) { case MonitorConfig.ConnectionType.SERIAL: return GrpcMonitorConfig.TargetType.SERIAL;