Skip to content

Commit 855c238

Browse files
authored
Fix Compile gRPC call hangs when there is a lot of output (#2171)
* Added integration test * Apply stream-send synchronization in Compile grpc call * Apply the same pattern to all daemon gRPC calls
1 parent ede5a78 commit 855c238

File tree

6 files changed

+2671
-50
lines changed

6 files changed

+2671
-50
lines changed

Diff for: commands/daemon/daemon.go

+64-45
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func (s *ArduinoCoreServerImpl) BoardSearch(ctx context.Context, req *rpc.BoardS
8787

8888
// BoardListWatch FIXMEDOC
8989
func (s *ArduinoCoreServerImpl) BoardListWatch(stream rpc.ArduinoCoreService_BoardListWatchServer) error {
90+
syncSend := NewSynchronizedSend(stream.Send)
9091
msg, err := stream.Recv()
9192
if err == io.EOF {
9293
return nil
@@ -97,7 +98,7 @@ func (s *ArduinoCoreServerImpl) BoardListWatch(stream rpc.ArduinoCoreService_Boa
9798

9899
if msg.Instance == nil {
99100
err = fmt.Errorf(tr("no instance specified"))
100-
stream.Send(&rpc.BoardListWatchResponse{
101+
syncSend.Send(&rpc.BoardListWatchResponse{
101102
EventType: "error",
102103
Error: err.Error(),
103104
})
@@ -132,7 +133,7 @@ func (s *ArduinoCoreServerImpl) BoardListWatch(stream rpc.ArduinoCoreService_Boa
132133
}()
133134

134135
for event := range eventsChan {
135-
if err := stream.Send(event); err != nil {
136+
if err := syncSend.Send(event); err != nil {
136137
logrus.Infof("sending board watch message: %v", err)
137138
}
138139
}
@@ -148,16 +149,18 @@ func (s *ArduinoCoreServerImpl) Destroy(ctx context.Context, req *rpc.DestroyReq
148149

149150
// UpdateIndex FIXMEDOC
150151
func (s *ArduinoCoreServerImpl) UpdateIndex(req *rpc.UpdateIndexRequest, stream rpc.ArduinoCoreService_UpdateIndexServer) error {
152+
syncSend := NewSynchronizedSend(stream.Send)
151153
err := commands.UpdateIndex(stream.Context(), req,
152-
func(p *rpc.DownloadProgress) { stream.Send(&rpc.UpdateIndexResponse{DownloadProgress: p}) },
154+
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.UpdateIndexResponse{DownloadProgress: p}) },
153155
)
154156
return convertErrorToRPCStatus(err)
155157
}
156158

157159
// UpdateLibrariesIndex FIXMEDOC
158160
func (s *ArduinoCoreServerImpl) UpdateLibrariesIndex(req *rpc.UpdateLibrariesIndexRequest, stream rpc.ArduinoCoreService_UpdateLibrariesIndexServer) error {
161+
syncSend := NewSynchronizedSend(stream.Send)
159162
err := commands.UpdateLibrariesIndex(stream.Context(), req,
160-
func(p *rpc.DownloadProgress) { stream.Send(&rpc.UpdateLibrariesIndexResponse{DownloadProgress: p}) },
163+
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.UpdateLibrariesIndexResponse{DownloadProgress: p}) },
161164
)
162165
return convertErrorToRPCStatus(err)
163166
}
@@ -177,9 +180,8 @@ func (s *ArduinoCoreServerImpl) Create(ctx context.Context, req *rpc.CreateReque
177180

178181
// Init FIXMEDOC
179182
func (s *ArduinoCoreServerImpl) Init(req *rpc.InitRequest, stream rpc.ArduinoCoreService_InitServer) error {
180-
err := commands.Init(req, func(message *rpc.InitResponse) {
181-
stream.Send(message)
182-
})
183+
syncSend := NewSynchronizedSend(stream.Send)
184+
err := commands.Init(req, func(message *rpc.InitResponse) { syncSend.Send(message) })
183185
return convertErrorToRPCStatus(err)
184186
}
185187

@@ -202,16 +204,17 @@ func (s *ArduinoCoreServerImpl) LoadSketch(ctx context.Context, req *rpc.LoadSke
202204

203205
// Compile FIXMEDOC
204206
func (s *ArduinoCoreServerImpl) Compile(req *rpc.CompileRequest, stream rpc.ArduinoCoreService_CompileServer) error {
205-
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{OutStream: data}) })
206-
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{ErrStream: data}) })
207+
syncSend := NewSynchronizedSend(stream.Send)
208+
outStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.CompileResponse{OutStream: data}) })
209+
errStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.CompileResponse{ErrStream: data}) })
207210
compileResp, compileErr := compile.Compile(
208211
stream.Context(), req, outStream, errStream,
209-
func(p *rpc.TaskProgress) { stream.Send(&rpc.CompileResponse{Progress: p}) })
212+
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.CompileResponse{Progress: p}) })
210213
outStream.Close()
211214
errStream.Close()
212215
var compileRespSendErr error
213216
if compileResp != nil {
214-
compileRespSendErr = stream.Send(compileResp)
217+
compileRespSendErr = syncSend.Send(compileResp)
215218
}
216219
if compileErr != nil {
217220
return convertErrorToRPCStatus(compileErr)
@@ -221,52 +224,56 @@ func (s *ArduinoCoreServerImpl) Compile(req *rpc.CompileRequest, stream rpc.Ardu
221224

222225
// PlatformInstall FIXMEDOC
223226
func (s *ArduinoCoreServerImpl) PlatformInstall(req *rpc.PlatformInstallRequest, stream rpc.ArduinoCoreService_PlatformInstallServer) error {
227+
syncSend := NewSynchronizedSend(stream.Send)
224228
resp, err := core.PlatformInstall(
225229
stream.Context(), req,
226-
func(p *rpc.DownloadProgress) { stream.Send(&rpc.PlatformInstallResponse{Progress: p}) },
227-
func(p *rpc.TaskProgress) { stream.Send(&rpc.PlatformInstallResponse{TaskProgress: p}) },
230+
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.PlatformInstallResponse{Progress: p}) },
231+
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.PlatformInstallResponse{TaskProgress: p}) },
228232
)
229233
if err != nil {
230234
return convertErrorToRPCStatus(err)
231235
}
232-
return stream.Send(resp)
236+
return syncSend.Send(resp)
233237
}
234238

235239
// PlatformDownload FIXMEDOC
236240
func (s *ArduinoCoreServerImpl) PlatformDownload(req *rpc.PlatformDownloadRequest, stream rpc.ArduinoCoreService_PlatformDownloadServer) error {
241+
syncSend := NewSynchronizedSend(stream.Send)
237242
resp, err := core.PlatformDownload(
238243
stream.Context(), req,
239-
func(p *rpc.DownloadProgress) { stream.Send(&rpc.PlatformDownloadResponse{Progress: p}) },
244+
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.PlatformDownloadResponse{Progress: p}) },
240245
)
241246
if err != nil {
242247
return convertErrorToRPCStatus(err)
243248
}
244-
return stream.Send(resp)
249+
return syncSend.Send(resp)
245250
}
246251

247252
// PlatformUninstall FIXMEDOC
248253
func (s *ArduinoCoreServerImpl) PlatformUninstall(req *rpc.PlatformUninstallRequest, stream rpc.ArduinoCoreService_PlatformUninstallServer) error {
254+
syncSend := NewSynchronizedSend(stream.Send)
249255
resp, err := core.PlatformUninstall(
250256
stream.Context(), req,
251-
func(p *rpc.TaskProgress) { stream.Send(&rpc.PlatformUninstallResponse{TaskProgress: p}) },
257+
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.PlatformUninstallResponse{TaskProgress: p}) },
252258
)
253259
if err != nil {
254260
return convertErrorToRPCStatus(err)
255261
}
256-
return stream.Send(resp)
262+
return syncSend.Send(resp)
257263
}
258264

259265
// PlatformUpgrade FIXMEDOC
260266
func (s *ArduinoCoreServerImpl) PlatformUpgrade(req *rpc.PlatformUpgradeRequest, stream rpc.ArduinoCoreService_PlatformUpgradeServer) error {
267+
syncSend := NewSynchronizedSend(stream.Send)
261268
resp, err := core.PlatformUpgrade(
262269
stream.Context(), req,
263-
func(p *rpc.DownloadProgress) { stream.Send(&rpc.PlatformUpgradeResponse{Progress: p}) },
264-
func(p *rpc.TaskProgress) { stream.Send(&rpc.PlatformUpgradeResponse{TaskProgress: p}) },
270+
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.PlatformUpgradeResponse{Progress: p}) },
271+
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.PlatformUpgradeResponse{TaskProgress: p}) },
265272
)
266273
if err != nil {
267274
return convertErrorToRPCStatus(err)
268275
}
269-
return stream.Send(resp)
276+
return syncSend.Send(resp)
270277
}
271278

272279
// PlatformSearch FIXMEDOC
@@ -286,8 +293,9 @@ func (s *ArduinoCoreServerImpl) PlatformList(ctx context.Context, req *rpc.Platf
286293

287294
// Upload FIXMEDOC
288295
func (s *ArduinoCoreServerImpl) Upload(req *rpc.UploadRequest, stream rpc.ArduinoCoreService_UploadServer) error {
289-
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{OutStream: data}) })
290-
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{ErrStream: data}) })
296+
syncSend := NewSynchronizedSend(stream.Send)
297+
outStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.UploadResponse{OutStream: data}) })
298+
errStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.UploadResponse{ErrStream: data}) })
291299
err := upload.Upload(stream.Context(), req, outStream, errStream)
292300
outStream.Close()
293301
errStream.Close()
@@ -299,8 +307,9 @@ func (s *ArduinoCoreServerImpl) Upload(req *rpc.UploadRequest, stream rpc.Arduin
299307

300308
// UploadUsingProgrammer FIXMEDOC
301309
func (s *ArduinoCoreServerImpl) UploadUsingProgrammer(req *rpc.UploadUsingProgrammerRequest, stream rpc.ArduinoCoreService_UploadUsingProgrammerServer) error {
302-
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) })
303-
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) })
310+
syncSend := NewSynchronizedSend(stream.Send)
311+
outStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) })
312+
errStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) })
304313
err := upload.UsingProgrammer(stream.Context(), req, outStream, errStream)
305314
outStream.Close()
306315
errStream.Close()
@@ -318,15 +327,16 @@ func (s *ArduinoCoreServerImpl) SupportedUserFields(ctx context.Context, req *rp
318327

319328
// BurnBootloader FIXMEDOC
320329
func (s *ArduinoCoreServerImpl) BurnBootloader(req *rpc.BurnBootloaderRequest, stream rpc.ArduinoCoreService_BurnBootloaderServer) error {
321-
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{OutStream: data}) })
322-
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) })
330+
syncSend := NewSynchronizedSend(stream.Send)
331+
outStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.BurnBootloaderResponse{OutStream: data}) })
332+
errStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) })
323333
resp, err := upload.BurnBootloader(stream.Context(), req, outStream, errStream)
324334
outStream.Close()
325335
errStream.Close()
326336
if err != nil {
327337
return convertErrorToRPCStatus(err)
328338
}
329-
return stream.Send(resp)
339+
return syncSend.Send(resp)
330340
}
331341

332342
// ListProgrammersAvailableForUpload FIXMEDOC
@@ -337,49 +347,54 @@ func (s *ArduinoCoreServerImpl) ListProgrammersAvailableForUpload(ctx context.Co
337347

338348
// LibraryDownload FIXMEDOC
339349
func (s *ArduinoCoreServerImpl) LibraryDownload(req *rpc.LibraryDownloadRequest, stream rpc.ArduinoCoreService_LibraryDownloadServer) error {
350+
syncSend := NewSynchronizedSend(stream.Send)
340351
resp, err := lib.LibraryDownload(
341352
stream.Context(), req,
342-
func(p *rpc.DownloadProgress) { stream.Send(&rpc.LibraryDownloadResponse{Progress: p}) },
353+
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.LibraryDownloadResponse{Progress: p}) },
343354
)
344355
if err != nil {
345356
return convertErrorToRPCStatus(err)
346357
}
347-
return stream.Send(resp)
358+
return syncSend.Send(resp)
348359
}
349360

350361
// LibraryInstall FIXMEDOC
351362
func (s *ArduinoCoreServerImpl) LibraryInstall(req *rpc.LibraryInstallRequest, stream rpc.ArduinoCoreService_LibraryInstallServer) error {
363+
syncSend := NewSynchronizedSend(stream.Send)
352364
err := lib.LibraryInstall(
353365
stream.Context(), req,
354-
func(p *rpc.DownloadProgress) { stream.Send(&rpc.LibraryInstallResponse{Progress: p}) },
355-
func(p *rpc.TaskProgress) { stream.Send(&rpc.LibraryInstallResponse{TaskProgress: p}) },
366+
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.LibraryInstallResponse{Progress: p}) },
367+
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.LibraryInstallResponse{TaskProgress: p}) },
356368
)
357369
return convertErrorToRPCStatus(err)
358370
}
359371

360372
// LibraryUpgrade FIXMEDOC
361373
func (s *ArduinoCoreServerImpl) LibraryUpgrade(req *rpc.LibraryUpgradeRequest, stream rpc.ArduinoCoreService_LibraryUpgradeServer) error {
374+
syncSend := NewSynchronizedSend(stream.Send)
362375
err := lib.LibraryUpgrade(
363376
stream.Context(), req,
364-
func(p *rpc.DownloadProgress) { stream.Send(&rpc.LibraryUpgradeResponse{Progress: p}) },
365-
func(p *rpc.TaskProgress) { stream.Send(&rpc.LibraryUpgradeResponse{TaskProgress: p}) },
377+
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.LibraryUpgradeResponse{Progress: p}) },
378+
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.LibraryUpgradeResponse{TaskProgress: p}) },
366379
)
367380
return convertErrorToRPCStatus(err)
368381
}
369382

370383
// LibraryUninstall FIXMEDOC
371384
func (s *ArduinoCoreServerImpl) LibraryUninstall(req *rpc.LibraryUninstallRequest, stream rpc.ArduinoCoreService_LibraryUninstallServer) error {
385+
syncSend := NewSynchronizedSend(stream.Send)
372386
err := lib.LibraryUninstall(stream.Context(), req,
373-
func(p *rpc.TaskProgress) { stream.Send(&rpc.LibraryUninstallResponse{TaskProgress: p}) },
387+
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.LibraryUninstallResponse{TaskProgress: p}) },
374388
)
375389
return convertErrorToRPCStatus(err)
376390
}
377391

378392
// LibraryUpgradeAll FIXMEDOC
379393
func (s *ArduinoCoreServerImpl) LibraryUpgradeAll(req *rpc.LibraryUpgradeAllRequest, stream rpc.ArduinoCoreService_LibraryUpgradeAllServer) error {
394+
syncSend := NewSynchronizedSend(stream.Send)
380395
err := lib.LibraryUpgradeAll(req,
381-
func(p *rpc.DownloadProgress) { stream.Send(&rpc.LibraryUpgradeAllResponse{Progress: p}) },
382-
func(p *rpc.TaskProgress) { stream.Send(&rpc.LibraryUpgradeAllResponse{TaskProgress: p}) },
396+
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.LibraryUpgradeAllResponse{Progress: p}) },
397+
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.LibraryUpgradeAllResponse{TaskProgress: p}) },
383398
)
384399
return convertErrorToRPCStatus(err)
385400
}
@@ -410,18 +425,20 @@ func (s *ArduinoCoreServerImpl) ArchiveSketch(ctx context.Context, req *rpc.Arch
410425

411426
// ZipLibraryInstall FIXMEDOC
412427
func (s *ArduinoCoreServerImpl) ZipLibraryInstall(req *rpc.ZipLibraryInstallRequest, stream rpc.ArduinoCoreService_ZipLibraryInstallServer) error {
428+
syncSend := NewSynchronizedSend(stream.Send)
413429
err := lib.ZipLibraryInstall(
414430
stream.Context(), req,
415-
func(p *rpc.TaskProgress) { stream.Send(&rpc.ZipLibraryInstallResponse{TaskProgress: p}) },
431+
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.ZipLibraryInstallResponse{TaskProgress: p}) },
416432
)
417433
return convertErrorToRPCStatus(err)
418434
}
419435

420436
// GitLibraryInstall FIXMEDOC
421437
func (s *ArduinoCoreServerImpl) GitLibraryInstall(req *rpc.GitLibraryInstallRequest, stream rpc.ArduinoCoreService_GitLibraryInstallServer) error {
438+
syncSend := NewSynchronizedSend(stream.Send)
422439
err := lib.GitLibraryInstall(
423440
stream.Context(), req,
424-
func(p *rpc.TaskProgress) { stream.Send(&rpc.GitLibraryInstallResponse{TaskProgress: p}) },
441+
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.GitLibraryInstallResponse{TaskProgress: p}) },
425442
)
426443
return convertErrorToRPCStatus(err)
427444
}
@@ -434,6 +451,8 @@ func (s *ArduinoCoreServerImpl) EnumerateMonitorPortSettings(ctx context.Context
434451

435452
// Monitor FIXMEDOC
436453
func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorServer) error {
454+
syncSend := NewSynchronizedSend(stream.Send)
455+
437456
// The configuration must be sent on the first message
438457
req, err := stream.Recv()
439458
if err != nil {
@@ -446,7 +465,7 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
446465
}
447466

448467
// Send a message with Success set to true to notify the caller of the port being now active
449-
_ = stream.Send(&rpc.MonitorResponse{Success: true})
468+
_ = syncSend.Send(&rpc.MonitorResponse{Success: true})
450469

451470
cancelCtx, cancel := context.WithCancel(stream.Context())
452471
go func() {
@@ -457,13 +476,13 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
457476
return
458477
}
459478
if err != nil {
460-
stream.Send(&rpc.MonitorResponse{Error: err.Error()})
479+
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
461480
return
462481
}
463482
if conf := msg.GetPortConfiguration(); conf != nil {
464483
for _, c := range conf.GetSettings() {
465484
if err := portProxy.Config(c.SettingId, c.Value); err != nil {
466-
stream.Send(&rpc.MonitorResponse{Error: err.Error()})
485+
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
467486
}
468487
}
469488
}
@@ -474,7 +493,7 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
474493
return
475494
}
476495
if err != nil {
477-
stream.Send(&rpc.MonitorResponse{Error: err.Error()})
496+
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
478497
return
479498
}
480499
tx = tx[n:]
@@ -491,10 +510,10 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
491510
break
492511
}
493512
if err != nil {
494-
stream.Send(&rpc.MonitorResponse{Error: err.Error()})
513+
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
495514
break
496515
}
497-
if err := stream.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil {
516+
if err := syncSend.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil {
498517
break
499518
}
500519
}

Diff for: commands/daemon/stream.go

+23
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,26 @@ func consumeStreamFrom(reader func() ([]byte, error)) io.Reader {
9898
}()
9999
return r
100100
}
101+
102+
// SynchronizedSender is a sender function with an extra protection for
103+
// concurrent writes, if multiple threads call the Send method they will
104+
// be blocked and serialized.
105+
type SynchronizedSender[T any] struct {
106+
lock sync.Mutex
107+
protectedSend func(T) error
108+
}
109+
110+
// Send the message using the underlyng stream.
111+
func (s *SynchronizedSender[T]) Send(value T) error {
112+
s.lock.Lock()
113+
err := s.protectedSend(value)
114+
s.lock.Unlock()
115+
return err
116+
}
117+
118+
// NewSynchronizedSend takes a Send function and wraps it in a SynchronizedSender
119+
func NewSynchronizedSend[T any](send func(T) error) *SynchronizedSender[T] {
120+
return &SynchronizedSender[T]{
121+
protectedSend: send,
122+
}
123+
}

Diff for: internal/integrationtest/arduino-cli.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -388,14 +388,15 @@ func (inst *ArduinoCLIInstance) PlatformInstall(ctx context.Context, packager, a
388388
}
389389

390390
// Compile calls the "Compile" gRPC method.
391-
func (inst *ArduinoCLIInstance) Compile(ctx context.Context, fqbn, sketchPath string) (commands.ArduinoCoreService_CompileClient, error) {
391+
func (inst *ArduinoCLIInstance) Compile(ctx context.Context, fqbn, sketchPath string, warnings string) (commands.ArduinoCoreService_CompileClient, error) {
392392
compileCl, err := inst.cli.daemonClient.Compile(ctx, &commands.CompileRequest{
393393
Instance: inst.instance,
394394
Fqbn: fqbn,
395395
SketchPath: sketchPath,
396396
Verbose: true,
397+
Warnings: warnings,
397398
})
398-
logCallf(">>> Compile(%v %v)\n", fqbn, sketchPath)
399+
logCallf(">>> Compile(%v %v warnings=%v)\n", fqbn, sketchPath, warnings)
399400
return compileCl, err
400401
}
401402

0 commit comments

Comments
 (0)