@@ -21,6 +21,17 @@ import {
21
21
import { Deferred } from '@theia/core/lib/common/promise-util' ;
22
22
23
23
export const MonitorServiceName = 'monitor-service' ;
24
+ type DuplexHandlerKeys =
25
+ | 'close'
26
+ | 'end'
27
+ | 'error'
28
+ | 'data'
29
+ | 'status'
30
+ | 'metadata' ;
31
+ interface DuplexHandler {
32
+ key : DuplexHandlerKeys ;
33
+ callback : ( ...args : any ) => void ;
34
+ }
24
35
25
36
export class MonitorService extends CoreClientAware implements Disposable {
26
37
// Bidirectional gRPC stream used to receive and send data from the running
@@ -121,6 +132,15 @@ export class MonitorService extends CoreClientAware implements Disposable {
121
132
return ! ! this . duplex ;
122
133
}
123
134
135
+ setDuplexHandlers (
136
+ duplex : ClientDuplexStream < MonitorRequest , MonitorResponse > ,
137
+ handlers : DuplexHandler [ ]
138
+ ) : void {
139
+ for ( const handler of handlers ) {
140
+ duplex . on ( handler . key , handler . callback ) ;
141
+ }
142
+ }
143
+
124
144
/**
125
145
* Start and connects a monitor using currently set board and port.
126
146
* If a monitor is already started or board fqbn, port address and/or protocol
@@ -169,45 +189,8 @@ export class MonitorService extends CoreClientAware implements Disposable {
169
189
170
190
await this . coreClientProvider . initialized ;
171
191
const coreClient = await this . coreClient ( ) ;
172
- const { client, instance } = coreClient ;
173
- this . duplex = client . monitor ( ) ;
174
- this . duplex
175
- . on ( 'close' , ( ) => {
176
- this . duplex = null ;
177
- this . updateClientsSettings ( { monitorUISettings : { connected : false } } ) ;
178
- this . logger . info (
179
- `monitor to ${ this . port ?. address } using ${ this . port ?. protocol } closed by client`
180
- ) ;
181
- } )
182
- . on ( 'end' , ( ) => {
183
- this . duplex = null ;
184
- this . updateClientsSettings ( { monitorUISettings : { connected : false } } ) ;
185
- this . logger . info (
186
- `monitor to ${ this . port ?. address } using ${ this . port ?. protocol } closed by server`
187
- ) ;
188
- } )
189
- . on ( 'error' , ( err : Error ) => {
190
- this . logger . error ( err ) ;
191
- // TODO
192
- // this.theiaFEClient?.notifyError()
193
- } )
194
- . on (
195
- 'data' ,
196
- ( ( res : MonitorResponse ) => {
197
- if ( res . getError ( ) ) {
198
- // TODO: Maybe disconnect
199
- this . logger . error ( res . getError ( ) ) ;
200
- return ;
201
- }
202
- const data = res . getRxData ( ) ;
203
- const message =
204
- typeof data === 'string'
205
- ? data
206
- : new TextDecoder ( 'utf8' ) . decode ( data ) ;
207
- this . messages . push ( ...splitLines ( message ) ) ;
208
- } ) . bind ( this )
209
- ) ;
210
192
193
+ const { instance } = coreClient ;
211
194
const req = new MonitorRequest ( ) ;
212
195
req . setInstance ( instance ) ;
213
196
if ( this . board ?. fqbn ) {
@@ -228,34 +211,94 @@ export class MonitorService extends CoreClientAware implements Disposable {
228
211
}
229
212
req . setPortConfiguration ( config ) ;
230
213
231
- const connect = new Promise < Status > ( ( resolve ) => {
232
- if ( this . duplex ?. write ( req ) ) {
233
- this . startMessagesHandlers ( ) ;
234
- this . logger . info (
235
- `started monitor to ${ this . port ?. address } using ${ this . port ?. protocol } `
236
- ) ;
237
- this . updateClientsSettings ( {
238
- monitorUISettings : { connected : true , serialPort : this . port . address } ,
239
- } ) ;
240
- resolve ( Status . OK ) ;
241
- return ;
242
- }
214
+ // Promise executor
215
+ const writeToStream = ( resolve : ( value : boolean ) => void ) => {
216
+ this . duplex = this . duplex || coreClient . client . monitor ( ) ;
217
+
218
+ const duplexHandlers : DuplexHandler [ ] = [
219
+ {
220
+ key : 'close' ,
221
+ callback : ( ) => {
222
+ this . duplex = null ;
223
+ this . updateClientsSettings ( {
224
+ monitorUISettings : { connected : false } ,
225
+ } ) ;
226
+ this . logger . info (
227
+ `monitor to ${ this . port ?. address } using ${ this . port ?. protocol } closed by client`
228
+ ) ;
229
+ } ,
230
+ } ,
231
+ {
232
+ key : 'end' ,
233
+ callback : ( ) => {
234
+ this . duplex = null ;
235
+ this . updateClientsSettings ( {
236
+ monitorUISettings : { connected : false } ,
237
+ } ) ;
238
+ this . logger . info (
239
+ `monitor to ${ this . port ?. address } using ${ this . port ?. protocol } closed by server`
240
+ ) ;
241
+ } ,
242
+ } ,
243
+ {
244
+ key : 'error' ,
245
+ callback : ( err : Error ) => {
246
+ this . logger . error ( err ) ;
247
+ resolve ( false ) ;
248
+ // TODO
249
+ // this.theiaFEClient?.notifyError()
250
+ } ,
251
+ } ,
252
+ {
253
+ key : 'data' ,
254
+ callback : ( res : MonitorResponse ) => {
255
+ if ( res . getError ( ) ) {
256
+ // TODO: Maybe disconnect
257
+ this . logger . error ( res . getError ( ) ) ;
258
+ return ;
259
+ }
260
+ const data = res . getRxData ( ) ;
261
+ const message =
262
+ typeof data === 'string'
263
+ ? data
264
+ : new TextDecoder ( 'utf8' ) . decode ( data ) ;
265
+ this . messages . push ( ...splitLines ( message ) ) ;
266
+
267
+ // if (res.getSuccess()) {
268
+ // resolve(true);
269
+ // return;
270
+ // }
271
+ } ,
272
+ } ,
273
+ ] ;
274
+
275
+ this . setDuplexHandlers ( this . duplex , duplexHandlers ) ;
276
+ this . duplex . write ( req ) ;
277
+ } ;
278
+
279
+ let attemptsRemaining = 10 ;
280
+ let wroteToStreamWithoutError = false ;
281
+ do {
282
+ await new Promise ( ( r ) => setTimeout ( r , 10000 ) ) ;
283
+ wroteToStreamWithoutError = await new Promise ( writeToStream ) ;
284
+ attemptsRemaining -= 1 ;
285
+ } while ( ! wroteToStreamWithoutError && attemptsRemaining > 0 ) ;
286
+
287
+ if ( wroteToStreamWithoutError ) {
288
+ this . startMessagesHandlers ( ) ;
289
+ this . logger . info (
290
+ `started monitor to ${ this . port ?. address } using ${ this . port ?. protocol } `
291
+ ) ;
292
+ this . updateClientsSettings ( {
293
+ monitorUISettings : { connected : true , serialPort : this . port . address } ,
294
+ } ) ;
295
+ return Status . OK ;
296
+ } else {
243
297
this . logger . warn (
244
298
`failed starting monitor to ${ this . port ?. address } using ${ this . port ?. protocol } `
245
299
) ;
246
- resolve ( Status . NOT_CONNECTED ) ;
247
- } ) ;
248
-
249
- const connectTimeout = new Promise < Status > ( ( resolve ) => {
250
- setTimeout ( async ( ) => {
251
- this . logger . warn (
252
- `timeout starting monitor to ${ this . port ?. address } using ${ this . port ?. protocol } `
253
- ) ;
254
- resolve ( Status . NOT_CONNECTED ) ;
255
- } , 1000 ) ;
256
- } ) ;
257
- // Try opening a monitor connection with a timeout
258
- return await Promise . race ( [ connect , connectTimeout ] ) ;
300
+ return Status . NOT_CONNECTED ;
301
+ }
259
302
}
260
303
261
304
/**
0 commit comments