@@ -2,6 +2,7 @@ import { ClientDuplexStream } from '@grpc/grpc-js';
2
2
import { TextDecoder , TextEncoder } from 'util' ;
3
3
import { injectable , inject , named } from 'inversify' ;
4
4
import { Struct } from 'google-protobuf/google/protobuf/struct_pb' ;
5
+ import { Emitter } from '@theia/core/lib/common/event' ;
5
6
import { ILogger } from '@theia/core/lib/common/logger' ;
6
7
import { MonitorService , MonitorServiceClient , MonitorConfig , MonitorError , Status } from '../../common/protocol/monitor-service' ;
7
8
import { StreamingOpenReq , StreamingOpenResp , MonitorConfig as GrpcMonitorConfig } from '../cli-protocol/monitor/monitor_pb' ;
@@ -46,6 +47,8 @@ export class MonitorServiceImpl implements MonitorService {
46
47
47
48
protected client ?: MonitorServiceClient ;
48
49
protected connection ?: { duplex : ClientDuplexStream < StreamingOpenReq , StreamingOpenResp > , config : MonitorConfig } ;
50
+ protected messages : string [ ] = [ ] ;
51
+ protected onMessageDidReadEmitter = new Emitter < void > ( ) ;
49
52
50
53
setClient ( client : MonitorServiceClient | undefined ) : void {
51
54
this . client = client ;
@@ -86,11 +89,10 @@ export class MonitorServiceImpl implements MonitorService {
86
89
} ) . bind ( this ) ) ;
87
90
88
91
duplex . on ( 'data' , ( ( resp : StreamingOpenResp ) => {
89
- if ( this . client ) {
90
- const raw = resp . getData ( ) ;
91
- const data = typeof raw === 'string' ? raw : new TextDecoder ( 'utf8' ) . decode ( raw ) ;
92
- this . client . notifyRead ( { data } ) ;
93
- }
92
+ const raw = resp . getData ( ) ;
93
+ const message = typeof raw === 'string' ? raw : new TextDecoder ( 'utf8' ) . decode ( raw ) ;
94
+ this . messages . push ( message ) ;
95
+ this . onMessageDidReadEmitter . fire ( ) ;
94
96
} ) . bind ( this ) ) ;
95
97
96
98
const { type, port } = config ;
@@ -116,27 +118,31 @@ export class MonitorServiceImpl implements MonitorService {
116
118
}
117
119
118
120
async disconnect ( reason ?: MonitorError ) : Promise < Status > {
119
- if ( ! this . connection && reason && reason . code === MonitorError . ErrorCodes . CLIENT_CANCEL ) {
121
+ try {
122
+ if ( ! this . connection && reason && reason . code === MonitorError . ErrorCodes . CLIENT_CANCEL ) {
123
+ return Status . OK ;
124
+ }
125
+ this . logger . info ( `>>> Disposing monitor connection...` ) ;
126
+ if ( ! this . connection ) {
127
+ this . logger . warn ( `<<< Not connected. Nothing to dispose.` ) ;
128
+ return Status . NOT_CONNECTED ;
129
+ }
130
+ const { duplex, config } = this . connection ;
131
+ duplex . cancel ( ) ;
132
+ this . logger . info ( `<<< Disposed monitor connection for ${ Board . toString ( config . board , { useFqbn : false } ) } on port ${ Port . toString ( config . port ) } .` ) ;
133
+ this . connection = undefined ;
120
134
return Status . OK ;
135
+ } finally {
136
+ this . messages . length = 0 ;
121
137
}
122
- this . logger . info ( `>>> Disposing monitor connection...` ) ;
123
- if ( ! this . connection ) {
124
- this . logger . warn ( `<<< Not connected. Nothing to dispose.` ) ;
125
- return Status . NOT_CONNECTED ;
126
- }
127
- const { duplex, config } = this . connection ;
128
- duplex . cancel ( ) ;
129
- this . logger . info ( `<<< Disposed monitor connection for ${ Board . toString ( config . board , { useFqbn : false } ) } on port ${ Port . toString ( config . port ) } .` ) ;
130
- this . connection = undefined ;
131
- return Status . OK ;
132
138
}
133
139
134
- async send ( data : string ) : Promise < Status > {
140
+ async send ( message : string ) : Promise < Status > {
135
141
if ( ! this . connection ) {
136
142
return Status . NOT_CONNECTED ;
137
143
}
138
144
const req = new StreamingOpenReq ( ) ;
139
- req . setData ( new TextEncoder ( ) . encode ( data ) ) ;
145
+ req . setData ( new TextEncoder ( ) . encode ( message ) ) ;
140
146
return new Promise < Status > ( resolve => {
141
147
if ( this . connection ) {
142
148
this . connection . duplex . write ( req , ( ) => {
@@ -148,6 +154,19 @@ export class MonitorServiceImpl implements MonitorService {
148
154
} ) ;
149
155
}
150
156
157
+ async request ( ) : Promise < { message : string } > {
158
+ const message = this . messages . shift ( ) ;
159
+ if ( message ) {
160
+ return { message } ;
161
+ }
162
+ return new Promise < { message : string } > ( resolve => {
163
+ const toDispose = this . onMessageDidReadEmitter . event ( ( ) => {
164
+ toDispose . dispose ( ) ;
165
+ resolve ( this . request ( ) ) ;
166
+ } ) ;
167
+ } ) ;
168
+ }
169
+
151
170
protected mapType ( type ?: MonitorConfig . ConnectionType ) : GrpcMonitorConfig . TargetType {
152
171
switch ( type ) {
153
172
case MonitorConfig . ConnectionType . SERIAL : return GrpcMonitorConfig . TargetType . SERIAL ;
0 commit comments