Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit ab7d149

Browse files
committedJun 30, 2022
Correctly synchronize streams (Fix: some lines cut-off at the end of the output)
1 parent 9b6c9c4 commit ab7d149

File tree

3 files changed

+37
-23
lines changed

3 files changed

+37
-23
lines changed
 

‎arduino/utils/stream.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@
1616
package utils
1717

1818
import (
19+
"context"
1920
"io"
2021
"time"
2122
)
2223

2324
// FeedStreamTo creates a pipe to pass data to the writer function.
2425
// FeedStreamTo returns the io.Writer side of the pipe, on which the user can write data
25-
func FeedStreamTo(writer func(data []byte)) io.Writer {
26+
func FeedStreamTo(writer func(data []byte)) (io.WriteCloser, context.Context) {
27+
ctx, cancel := context.WithCancel(context.Background())
2628
r, w := io.Pipe()
2729
go func() {
30+
defer cancel()
2831
data := make([]byte, 16384)
2932
for {
3033
if n, err := r.Read(data); err == nil {
@@ -41,7 +44,7 @@ func FeedStreamTo(writer func(data []byte)) io.Writer {
4144
}
4245
}
4346
}()
44-
return w
47+
return w, ctx
4548
}
4649

4750
// ConsumeStreamFrom creates a pipe to consume data from the reader function.

‎commands/daemon/daemon.go

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -266,15 +266,19 @@ func (s *ArduinoCoreServerImpl) LoadSketch(ctx context.Context, req *rpc.LoadSke
266266

267267
// Compile FIXMEDOC
268268
func (s *ArduinoCoreServerImpl) Compile(req *rpc.CompileRequest, stream rpc.ArduinoCoreService_CompileServer) error {
269+
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{OutStream: data}) })
270+
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{ErrStream: data}) })
269271
resp, err := compile.Compile(
270-
stream.Context(), req,
271-
utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{OutStream: data}) }),
272-
utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{ErrStream: data}) }),
272+
stream.Context(), req, outStream, errStream,
273273
func(p *rpc.TaskProgress) { stream.Send(&rpc.CompileResponse{Progress: p}) },
274274
false) // Set debug to false
275+
outStream.Close()
276+
errStream.Close()
275277
if err != nil {
276278
return convertErrorToRPCStatus(err)
277279
}
280+
<-outCtx.Done()
281+
<-errCtx.Done()
278282
return stream.Send(resp)
279283
}
280284

@@ -345,27 +349,31 @@ func (s *ArduinoCoreServerImpl) PlatformList(ctx context.Context, req *rpc.Platf
345349

346350
// Upload FIXMEDOC
347351
func (s *ArduinoCoreServerImpl) Upload(req *rpc.UploadRequest, stream rpc.ArduinoCoreService_UploadServer) error {
348-
resp, err := upload.Upload(
349-
stream.Context(), req,
350-
utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{OutStream: data}) }),
351-
utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{ErrStream: data}) }),
352-
)
352+
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{OutStream: data}) })
353+
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{ErrStream: data}) })
354+
resp, err := upload.Upload(stream.Context(), req, outStream, errStream)
355+
outStream.Close()
356+
errStream.Close()
353357
if err != nil {
354358
return convertErrorToRPCStatus(err)
355359
}
360+
<-outCtx.Done()
361+
<-errCtx.Done()
356362
return stream.Send(resp)
357363
}
358364

359365
// UploadUsingProgrammer FIXMEDOC
360366
func (s *ArduinoCoreServerImpl) UploadUsingProgrammer(req *rpc.UploadUsingProgrammerRequest, stream rpc.ArduinoCoreService_UploadUsingProgrammerServer) error {
361-
resp, err := upload.UsingProgrammer(
362-
stream.Context(), req,
363-
utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) }),
364-
utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) }),
365-
)
367+
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) })
368+
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) })
369+
resp, err := upload.UsingProgrammer(stream.Context(), req, outStream, errStream)
370+
outStream.Close()
371+
errStream.Close()
366372
if err != nil {
367373
return convertErrorToRPCStatus(err)
368374
}
375+
<-outCtx.Done()
376+
<-errCtx.Done()
369377
return stream.Send(resp)
370378
}
371379

@@ -377,14 +385,16 @@ func (s *ArduinoCoreServerImpl) SupportedUserFields(ctx context.Context, req *rp
377385

378386
// BurnBootloader FIXMEDOC
379387
func (s *ArduinoCoreServerImpl) BurnBootloader(req *rpc.BurnBootloaderRequest, stream rpc.ArduinoCoreService_BurnBootloaderServer) error {
380-
resp, err := upload.BurnBootloader(
381-
stream.Context(), req,
382-
utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{OutStream: data}) }),
383-
utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) }),
384-
)
388+
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{OutStream: data}) })
389+
errStream, errCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) })
390+
resp, err := upload.BurnBootloader(stream.Context(), req, outStream, errStream)
391+
outStream.Close()
392+
errStream.Close()
385393
if err != nil {
386394
return convertErrorToRPCStatus(err)
387395
}
396+
<-outCtx.Done()
397+
<-errCtx.Done()
388398
return stream.Send(resp)
389399
}
390400

‎commands/daemon/debug.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ func (s *DebugService) Debug(stream dbg.DebugService_DebugServer) error {
5050
// Launch debug recipe attaching stdin and out to grpc streaming
5151
signalChan := make(chan os.Signal)
5252
defer close(signalChan)
53+
outStream, outCtx := utils.FeedStreamTo(func(data []byte) { stream.Send(&dbg.DebugResponse{Data: data}) })
5354
resp, debugErr := cmd.Debug(stream.Context(), req,
5455
utils.ConsumeStreamFrom(func() ([]byte, error) {
5556
command, err := stream.Recv()
@@ -58,13 +59,13 @@ func (s *DebugService) Debug(stream dbg.DebugService_DebugServer) error {
5859
}
5960
return command.GetData(), err
6061
}),
61-
utils.FeedStreamTo(func(data []byte) {
62-
stream.Send(&dbg.DebugResponse{Data: data})
63-
}),
62+
outStream,
6463
signalChan)
64+
outStream.Close()
6565
if debugErr != nil {
6666
return debugErr
6767
}
68+
<-outCtx.Done()
6869
return stream.Send(resp)
6970
}
7071

0 commit comments

Comments
 (0)
Please sign in to comment.