Skip to content

Fix Compile gRPC call hangs when there is a lot of output #2171

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 64 additions & 45 deletions commands/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (s *ArduinoCoreServerImpl) BoardSearch(ctx context.Context, req *rpc.BoardS

// BoardListWatch FIXMEDOC
func (s *ArduinoCoreServerImpl) BoardListWatch(stream rpc.ArduinoCoreService_BoardListWatchServer) error {
syncSend := NewSynchronizedSend(stream.Send)
msg, err := stream.Recv()
if err == io.EOF {
return nil
Expand All @@ -97,7 +98,7 @@ func (s *ArduinoCoreServerImpl) BoardListWatch(stream rpc.ArduinoCoreService_Boa

if msg.Instance == nil {
err = fmt.Errorf(tr("no instance specified"))
stream.Send(&rpc.BoardListWatchResponse{
syncSend.Send(&rpc.BoardListWatchResponse{
EventType: "error",
Error: err.Error(),
})
Expand Down Expand Up @@ -132,7 +133,7 @@ func (s *ArduinoCoreServerImpl) BoardListWatch(stream rpc.ArduinoCoreService_Boa
}()

for event := range eventsChan {
if err := stream.Send(event); err != nil {
if err := syncSend.Send(event); err != nil {
logrus.Infof("sending board watch message: %v", err)
}
}
Expand All @@ -148,16 +149,18 @@ func (s *ArduinoCoreServerImpl) Destroy(ctx context.Context, req *rpc.DestroyReq

// UpdateIndex FIXMEDOC
func (s *ArduinoCoreServerImpl) UpdateIndex(req *rpc.UpdateIndexRequest, stream rpc.ArduinoCoreService_UpdateIndexServer) error {
syncSend := NewSynchronizedSend(stream.Send)
err := commands.UpdateIndex(stream.Context(), req,
func(p *rpc.DownloadProgress) { stream.Send(&rpc.UpdateIndexResponse{DownloadProgress: p}) },
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.UpdateIndexResponse{DownloadProgress: p}) },
)
return convertErrorToRPCStatus(err)
}

// UpdateLibrariesIndex FIXMEDOC
func (s *ArduinoCoreServerImpl) UpdateLibrariesIndex(req *rpc.UpdateLibrariesIndexRequest, stream rpc.ArduinoCoreService_UpdateLibrariesIndexServer) error {
syncSend := NewSynchronizedSend(stream.Send)
err := commands.UpdateLibrariesIndex(stream.Context(), req,
func(p *rpc.DownloadProgress) { stream.Send(&rpc.UpdateLibrariesIndexResponse{DownloadProgress: p}) },
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.UpdateLibrariesIndexResponse{DownloadProgress: p}) },
)
return convertErrorToRPCStatus(err)
}
Expand All @@ -177,9 +180,8 @@ func (s *ArduinoCoreServerImpl) Create(ctx context.Context, req *rpc.CreateReque

// Init FIXMEDOC
func (s *ArduinoCoreServerImpl) Init(req *rpc.InitRequest, stream rpc.ArduinoCoreService_InitServer) error {
err := commands.Init(req, func(message *rpc.InitResponse) {
stream.Send(message)
})
syncSend := NewSynchronizedSend(stream.Send)
err := commands.Init(req, func(message *rpc.InitResponse) { syncSend.Send(message) })
return convertErrorToRPCStatus(err)
}

Expand All @@ -202,16 +204,17 @@ func (s *ArduinoCoreServerImpl) LoadSketch(ctx context.Context, req *rpc.LoadSke

// Compile FIXMEDOC
func (s *ArduinoCoreServerImpl) Compile(req *rpc.CompileRequest, stream rpc.ArduinoCoreService_CompileServer) error {
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{OutStream: data}) })
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.CompileResponse{ErrStream: data}) })
syncSend := NewSynchronizedSend(stream.Send)
outStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.CompileResponse{OutStream: data}) })
errStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.CompileResponse{ErrStream: data}) })
compileResp, compileErr := compile.Compile(
stream.Context(), req, outStream, errStream,
func(p *rpc.TaskProgress) { stream.Send(&rpc.CompileResponse{Progress: p}) })
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.CompileResponse{Progress: p}) })
outStream.Close()
errStream.Close()
var compileRespSendErr error
if compileResp != nil {
compileRespSendErr = stream.Send(compileResp)
compileRespSendErr = syncSend.Send(compileResp)
}
if compileErr != nil {
return convertErrorToRPCStatus(compileErr)
Expand All @@ -221,52 +224,56 @@ func (s *ArduinoCoreServerImpl) Compile(req *rpc.CompileRequest, stream rpc.Ardu

// PlatformInstall FIXMEDOC
func (s *ArduinoCoreServerImpl) PlatformInstall(req *rpc.PlatformInstallRequest, stream rpc.ArduinoCoreService_PlatformInstallServer) error {
syncSend := NewSynchronizedSend(stream.Send)
resp, err := core.PlatformInstall(
stream.Context(), req,
func(p *rpc.DownloadProgress) { stream.Send(&rpc.PlatformInstallResponse{Progress: p}) },
func(p *rpc.TaskProgress) { stream.Send(&rpc.PlatformInstallResponse{TaskProgress: p}) },
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.PlatformInstallResponse{Progress: p}) },
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.PlatformInstallResponse{TaskProgress: p}) },
)
if err != nil {
return convertErrorToRPCStatus(err)
}
return stream.Send(resp)
return syncSend.Send(resp)
}

// PlatformDownload FIXMEDOC
func (s *ArduinoCoreServerImpl) PlatformDownload(req *rpc.PlatformDownloadRequest, stream rpc.ArduinoCoreService_PlatformDownloadServer) error {
syncSend := NewSynchronizedSend(stream.Send)
resp, err := core.PlatformDownload(
stream.Context(), req,
func(p *rpc.DownloadProgress) { stream.Send(&rpc.PlatformDownloadResponse{Progress: p}) },
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.PlatformDownloadResponse{Progress: p}) },
)
if err != nil {
return convertErrorToRPCStatus(err)
}
return stream.Send(resp)
return syncSend.Send(resp)
}

// PlatformUninstall FIXMEDOC
func (s *ArduinoCoreServerImpl) PlatformUninstall(req *rpc.PlatformUninstallRequest, stream rpc.ArduinoCoreService_PlatformUninstallServer) error {
syncSend := NewSynchronizedSend(stream.Send)
resp, err := core.PlatformUninstall(
stream.Context(), req,
func(p *rpc.TaskProgress) { stream.Send(&rpc.PlatformUninstallResponse{TaskProgress: p}) },
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.PlatformUninstallResponse{TaskProgress: p}) },
)
if err != nil {
return convertErrorToRPCStatus(err)
}
return stream.Send(resp)
return syncSend.Send(resp)
}

// PlatformUpgrade FIXMEDOC
func (s *ArduinoCoreServerImpl) PlatformUpgrade(req *rpc.PlatformUpgradeRequest, stream rpc.ArduinoCoreService_PlatformUpgradeServer) error {
syncSend := NewSynchronizedSend(stream.Send)
resp, err := core.PlatformUpgrade(
stream.Context(), req,
func(p *rpc.DownloadProgress) { stream.Send(&rpc.PlatformUpgradeResponse{Progress: p}) },
func(p *rpc.TaskProgress) { stream.Send(&rpc.PlatformUpgradeResponse{TaskProgress: p}) },
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.PlatformUpgradeResponse{Progress: p}) },
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.PlatformUpgradeResponse{TaskProgress: p}) },
)
if err != nil {
return convertErrorToRPCStatus(err)
}
return stream.Send(resp)
return syncSend.Send(resp)
}

// PlatformSearch FIXMEDOC
Expand All @@ -286,8 +293,9 @@ func (s *ArduinoCoreServerImpl) PlatformList(ctx context.Context, req *rpc.Platf

// Upload FIXMEDOC
func (s *ArduinoCoreServerImpl) Upload(req *rpc.UploadRequest, stream rpc.ArduinoCoreService_UploadServer) error {
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{OutStream: data}) })
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadResponse{ErrStream: data}) })
syncSend := NewSynchronizedSend(stream.Send)
outStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.UploadResponse{OutStream: data}) })
errStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.UploadResponse{ErrStream: data}) })
err := upload.Upload(stream.Context(), req, outStream, errStream)
outStream.Close()
errStream.Close()
Expand All @@ -299,8 +307,9 @@ func (s *ArduinoCoreServerImpl) Upload(req *rpc.UploadRequest, stream rpc.Arduin

// UploadUsingProgrammer FIXMEDOC
func (s *ArduinoCoreServerImpl) UploadUsingProgrammer(req *rpc.UploadUsingProgrammerRequest, stream rpc.ArduinoCoreService_UploadUsingProgrammerServer) error {
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) })
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) })
syncSend := NewSynchronizedSend(stream.Send)
outStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.UploadUsingProgrammerResponse{OutStream: data}) })
errStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.UploadUsingProgrammerResponse{ErrStream: data}) })
err := upload.UsingProgrammer(stream.Context(), req, outStream, errStream)
outStream.Close()
errStream.Close()
Expand All @@ -318,15 +327,16 @@ func (s *ArduinoCoreServerImpl) SupportedUserFields(ctx context.Context, req *rp

// BurnBootloader FIXMEDOC
func (s *ArduinoCoreServerImpl) BurnBootloader(req *rpc.BurnBootloaderRequest, stream rpc.ArduinoCoreService_BurnBootloaderServer) error {
outStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{OutStream: data}) })
errStream := feedStreamTo(func(data []byte) { stream.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) })
syncSend := NewSynchronizedSend(stream.Send)
outStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.BurnBootloaderResponse{OutStream: data}) })
errStream := feedStreamTo(func(data []byte) { syncSend.Send(&rpc.BurnBootloaderResponse{ErrStream: data}) })
resp, err := upload.BurnBootloader(stream.Context(), req, outStream, errStream)
outStream.Close()
errStream.Close()
if err != nil {
return convertErrorToRPCStatus(err)
}
return stream.Send(resp)
return syncSend.Send(resp)
}

// ListProgrammersAvailableForUpload FIXMEDOC
Expand All @@ -337,49 +347,54 @@ func (s *ArduinoCoreServerImpl) ListProgrammersAvailableForUpload(ctx context.Co

// LibraryDownload FIXMEDOC
func (s *ArduinoCoreServerImpl) LibraryDownload(req *rpc.LibraryDownloadRequest, stream rpc.ArduinoCoreService_LibraryDownloadServer) error {
syncSend := NewSynchronizedSend(stream.Send)
resp, err := lib.LibraryDownload(
stream.Context(), req,
func(p *rpc.DownloadProgress) { stream.Send(&rpc.LibraryDownloadResponse{Progress: p}) },
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.LibraryDownloadResponse{Progress: p}) },
)
if err != nil {
return convertErrorToRPCStatus(err)
}
return stream.Send(resp)
return syncSend.Send(resp)
}

// LibraryInstall FIXMEDOC
func (s *ArduinoCoreServerImpl) LibraryInstall(req *rpc.LibraryInstallRequest, stream rpc.ArduinoCoreService_LibraryInstallServer) error {
syncSend := NewSynchronizedSend(stream.Send)
err := lib.LibraryInstall(
stream.Context(), req,
func(p *rpc.DownloadProgress) { stream.Send(&rpc.LibraryInstallResponse{Progress: p}) },
func(p *rpc.TaskProgress) { stream.Send(&rpc.LibraryInstallResponse{TaskProgress: p}) },
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.LibraryInstallResponse{Progress: p}) },
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.LibraryInstallResponse{TaskProgress: p}) },
)
return convertErrorToRPCStatus(err)
}

// LibraryUpgrade FIXMEDOC
func (s *ArduinoCoreServerImpl) LibraryUpgrade(req *rpc.LibraryUpgradeRequest, stream rpc.ArduinoCoreService_LibraryUpgradeServer) error {
syncSend := NewSynchronizedSend(stream.Send)
err := lib.LibraryUpgrade(
stream.Context(), req,
func(p *rpc.DownloadProgress) { stream.Send(&rpc.LibraryUpgradeResponse{Progress: p}) },
func(p *rpc.TaskProgress) { stream.Send(&rpc.LibraryUpgradeResponse{TaskProgress: p}) },
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.LibraryUpgradeResponse{Progress: p}) },
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.LibraryUpgradeResponse{TaskProgress: p}) },
)
return convertErrorToRPCStatus(err)
}

// LibraryUninstall FIXMEDOC
func (s *ArduinoCoreServerImpl) LibraryUninstall(req *rpc.LibraryUninstallRequest, stream rpc.ArduinoCoreService_LibraryUninstallServer) error {
syncSend := NewSynchronizedSend(stream.Send)
err := lib.LibraryUninstall(stream.Context(), req,
func(p *rpc.TaskProgress) { stream.Send(&rpc.LibraryUninstallResponse{TaskProgress: p}) },
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.LibraryUninstallResponse{TaskProgress: p}) },
)
return convertErrorToRPCStatus(err)
}

// LibraryUpgradeAll FIXMEDOC
func (s *ArduinoCoreServerImpl) LibraryUpgradeAll(req *rpc.LibraryUpgradeAllRequest, stream rpc.ArduinoCoreService_LibraryUpgradeAllServer) error {
syncSend := NewSynchronizedSend(stream.Send)
err := lib.LibraryUpgradeAll(req,
func(p *rpc.DownloadProgress) { stream.Send(&rpc.LibraryUpgradeAllResponse{Progress: p}) },
func(p *rpc.TaskProgress) { stream.Send(&rpc.LibraryUpgradeAllResponse{TaskProgress: p}) },
func(p *rpc.DownloadProgress) { syncSend.Send(&rpc.LibraryUpgradeAllResponse{Progress: p}) },
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.LibraryUpgradeAllResponse{TaskProgress: p}) },
)
return convertErrorToRPCStatus(err)
}
Expand Down Expand Up @@ -410,18 +425,20 @@ func (s *ArduinoCoreServerImpl) ArchiveSketch(ctx context.Context, req *rpc.Arch

// ZipLibraryInstall FIXMEDOC
func (s *ArduinoCoreServerImpl) ZipLibraryInstall(req *rpc.ZipLibraryInstallRequest, stream rpc.ArduinoCoreService_ZipLibraryInstallServer) error {
syncSend := NewSynchronizedSend(stream.Send)
err := lib.ZipLibraryInstall(
stream.Context(), req,
func(p *rpc.TaskProgress) { stream.Send(&rpc.ZipLibraryInstallResponse{TaskProgress: p}) },
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.ZipLibraryInstallResponse{TaskProgress: p}) },
)
return convertErrorToRPCStatus(err)
}

// GitLibraryInstall FIXMEDOC
func (s *ArduinoCoreServerImpl) GitLibraryInstall(req *rpc.GitLibraryInstallRequest, stream rpc.ArduinoCoreService_GitLibraryInstallServer) error {
syncSend := NewSynchronizedSend(stream.Send)
err := lib.GitLibraryInstall(
stream.Context(), req,
func(p *rpc.TaskProgress) { stream.Send(&rpc.GitLibraryInstallResponse{TaskProgress: p}) },
func(p *rpc.TaskProgress) { syncSend.Send(&rpc.GitLibraryInstallResponse{TaskProgress: p}) },
)
return convertErrorToRPCStatus(err)
}
Expand All @@ -434,6 +451,8 @@ func (s *ArduinoCoreServerImpl) EnumerateMonitorPortSettings(ctx context.Context

// Monitor FIXMEDOC
func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorServer) error {
syncSend := NewSynchronizedSend(stream.Send)

// The configuration must be sent on the first message
req, err := stream.Recv()
if err != nil {
Expand All @@ -446,7 +465,7 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
}

// Send a message with Success set to true to notify the caller of the port being now active
_ = stream.Send(&rpc.MonitorResponse{Success: true})
_ = syncSend.Send(&rpc.MonitorResponse{Success: true})

cancelCtx, cancel := context.WithCancel(stream.Context())
go func() {
Expand All @@ -457,13 +476,13 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
return
}
if err != nil {
stream.Send(&rpc.MonitorResponse{Error: err.Error()})
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
return
}
if conf := msg.GetPortConfiguration(); conf != nil {
for _, c := range conf.GetSettings() {
if err := portProxy.Config(c.SettingId, c.Value); err != nil {
stream.Send(&rpc.MonitorResponse{Error: err.Error()})
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
}
}
}
Expand All @@ -474,7 +493,7 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
return
}
if err != nil {
stream.Send(&rpc.MonitorResponse{Error: err.Error()})
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
return
}
tx = tx[n:]
Expand All @@ -491,10 +510,10 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
break
}
if err != nil {
stream.Send(&rpc.MonitorResponse{Error: err.Error()})
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
break
}
if err := stream.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil {
if err := syncSend.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil {
break
}
}
Expand Down
23 changes: 23 additions & 0 deletions commands/daemon/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,26 @@ func consumeStreamFrom(reader func() ([]byte, error)) io.Reader {
}()
return r
}

// SynchronizedSender is a sender function with an extra protection for
// concurrent writes, if multiple threads call the Send method they will
// be blocked and serialized.
type SynchronizedSender[T any] struct {
lock sync.Mutex
protectedSend func(T) error
}

// Send the message using the underlyng stream.
func (s *SynchronizedSender[T]) Send(value T) error {
s.lock.Lock()
err := s.protectedSend(value)
s.lock.Unlock()
return err
}

// NewSynchronizedSend takes a Send function and wraps it in a SynchronizedSender
func NewSynchronizedSend[T any](send func(T) error) *SynchronizedSender[T] {
return &SynchronizedSender[T]{
protectedSend: send,
}
}
5 changes: 3 additions & 2 deletions internal/integrationtest/arduino-cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,14 +388,15 @@ func (inst *ArduinoCLIInstance) PlatformInstall(ctx context.Context, packager, a
}

// Compile calls the "Compile" gRPC method.
func (inst *ArduinoCLIInstance) Compile(ctx context.Context, fqbn, sketchPath string) (commands.ArduinoCoreService_CompileClient, error) {
func (inst *ArduinoCLIInstance) Compile(ctx context.Context, fqbn, sketchPath string, warnings string) (commands.ArduinoCoreService_CompileClient, error) {
compileCl, err := inst.cli.daemonClient.Compile(ctx, &commands.CompileRequest{
Instance: inst.instance,
Fqbn: fqbn,
SketchPath: sketchPath,
Verbose: true,
Warnings: warnings,
})
logCallf(">>> Compile(%v %v)\n", fqbn, sketchPath)
logCallf(">>> Compile(%v %v warnings=%v)\n", fqbn, sketchPath, warnings)
return compileCl, err
}

Expand Down
Loading