Skip to content

Commit 7847cb9

Browse files
committed
Inlining methods in ArduinoCoreServiceImpl (part 8: Monitor)
This change is quite challenging because it implements a bidirectional streaming service. The gRPC implementation is slightly simpler, BTW the command-line requires a bit of streams fiddling to get the same behaviour as before because now: * the Monitor call do not return anymore a clean io.ReadWriteCloser. * the call to srv.Monitor is blocking until the port is closed or the context is canceled.
1 parent 620b4e5 commit 7847cb9

File tree

3 files changed

+195
-168
lines changed

3 files changed

+195
-168
lines changed

commands/service.go

-101
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,10 @@ package commands
1717

1818
import (
1919
"context"
20-
"errors"
21-
"io"
22-
"sync/atomic"
2320

2421
"github.com/arduino/arduino-cli/commands/cache"
25-
"github.com/arduino/arduino-cli/commands/cmderrors"
2622
"github.com/arduino/arduino-cli/commands/updatecheck"
2723
rpc "github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1"
28-
"github.com/sirupsen/logrus"
2924
)
3025

3126
// NewArduinoCoreServer returns an implementation of the ArduinoCoreService gRPC service
@@ -356,102 +351,6 @@ func (s *arduinoCoreServerImpl) EnumerateMonitorPortSettings(ctx context.Context
356351
return EnumerateMonitorPortSettings(ctx, req)
357352
}
358353

359-
// Monitor FIXMEDOC
360-
func (s *arduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorServer) error {
361-
syncSend := NewSynchronizedSend(stream.Send)
362-
363-
// The configuration must be sent on the first message
364-
req, err := stream.Recv()
365-
if err != nil {
366-
return err
367-
}
368-
369-
openReq := req.GetOpenRequest()
370-
if openReq == nil {
371-
return &cmderrors.InvalidInstanceError{}
372-
}
373-
portProxy, _, err := Monitor(stream.Context(), openReq)
374-
if err != nil {
375-
return err
376-
}
377-
378-
// Send a message with Success set to true to notify the caller of the port being now active
379-
_ = syncSend.Send(&rpc.MonitorResponse{Success: true})
380-
381-
cancelCtx, cancel := context.WithCancel(stream.Context())
382-
gracefulCloseInitiated := &atomic.Bool{}
383-
gracefuleCloseCtx, gracefulCloseCancel := context.WithCancel(context.Background())
384-
385-
// gRPC stream receiver (gRPC data -> monitor, config, close)
386-
go func() {
387-
defer cancel()
388-
for {
389-
msg, err := stream.Recv()
390-
if errors.Is(err, io.EOF) {
391-
return
392-
}
393-
if err != nil {
394-
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
395-
return
396-
}
397-
if conf := msg.GetUpdatedConfiguration(); conf != nil {
398-
for _, c := range conf.GetSettings() {
399-
if err := portProxy.Config(c.GetSettingId(), c.GetValue()); err != nil {
400-
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
401-
}
402-
}
403-
}
404-
if closeMsg := msg.GetClose(); closeMsg {
405-
gracefulCloseInitiated.Store(true)
406-
if err := portProxy.Close(); err != nil {
407-
logrus.WithError(err).Debug("Error closing monitor port")
408-
}
409-
gracefulCloseCancel()
410-
}
411-
tx := msg.GetTxData()
412-
for len(tx) > 0 {
413-
n, err := portProxy.Write(tx)
414-
if errors.Is(err, io.EOF) {
415-
return
416-
}
417-
if err != nil {
418-
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
419-
return
420-
}
421-
tx = tx[n:]
422-
}
423-
}
424-
}()
425-
426-
// gRPC stream sender (monitor -> gRPC)
427-
go func() {
428-
defer cancel() // unlock the receiver
429-
buff := make([]byte, 4096)
430-
for {
431-
n, err := portProxy.Read(buff)
432-
if errors.Is(err, io.EOF) {
433-
break
434-
}
435-
if err != nil {
436-
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
437-
break
438-
}
439-
if err := syncSend.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil {
440-
break
441-
}
442-
}
443-
}()
444-
445-
<-cancelCtx.Done()
446-
if gracefulCloseInitiated.Load() {
447-
// Port closing has been initiated in the receiver
448-
<-gracefuleCloseCtx.Done()
449-
} else {
450-
portProxy.Close()
451-
}
452-
return nil
453-
}
454-
455354
// CheckForArduinoCLIUpdates FIXMEDOC
456355
func (s *arduinoCoreServerImpl) CheckForArduinoCLIUpdates(ctx context.Context, req *rpc.CheckForArduinoCLIUpdatesRequest) (*rpc.CheckForArduinoCLIUpdatesResponse, error) {
457356
return updatecheck.CheckForArduinoCLIUpdates(ctx, req)

commands/service_monitor.go

+178-52
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ package commands
1717

1818
import (
1919
"context"
20+
"errors"
2021
"fmt"
2122
"io"
23+
"sync/atomic"
2224

2325
"github.com/arduino/arduino-cli/commands/cmderrors"
2426
"github.com/arduino/arduino-cli/commands/internal/instances"
@@ -27,87 +29,211 @@ import (
2729
pluggableMonitor "github.com/arduino/arduino-cli/internal/arduino/monitor"
2830
rpc "github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1"
2931
"github.com/arduino/go-properties-orderedmap"
32+
"github.com/djherbis/buffer"
33+
"github.com/djherbis/nio/v3"
3034
"github.com/sirupsen/logrus"
35+
"google.golang.org/grpc/metadata"
3136
)
3237

33-
// portProxy is an io.ReadWriteCloser that maps into the monitor port of the board
34-
type portProxy struct {
35-
rw io.ReadWriter
36-
changeSettingsCB func(setting, value string) error
37-
closeCB func() error
38+
type monitorPipeServer struct {
39+
ctx context.Context
40+
req atomic.Pointer[rpc.MonitorPortOpenRequest]
41+
in *nio.PipeReader
42+
out *nio.PipeWriter
3843
}
3944

40-
func (p *portProxy) Read(buff []byte) (int, error) {
41-
return p.rw.Read(buff)
45+
func (s *monitorPipeServer) Send(resp *rpc.MonitorResponse) error {
46+
if len(resp.GetRxData()) > 0 {
47+
if _, err := s.out.Write(resp.GetRxData()); err != nil {
48+
return err
49+
}
50+
}
51+
return nil
52+
}
53+
54+
func (s *monitorPipeServer) Recv() (r *rpc.MonitorRequest, e error) {
55+
if conf := s.req.Swap(nil); conf != nil {
56+
return &rpc.MonitorRequest{Message: &rpc.MonitorRequest_OpenRequest{OpenRequest: conf}}, nil
57+
}
58+
buff := make([]byte, 4096)
59+
n, err := s.in.Read(buff)
60+
if err != nil {
61+
return nil, err
62+
}
63+
return &rpc.MonitorRequest{Message: &rpc.MonitorRequest_TxData{TxData: buff[:n]}}, nil
4264
}
4365

44-
func (p *portProxy) Write(buff []byte) (int, error) {
45-
return p.rw.Write(buff)
66+
func (s *monitorPipeServer) Context() context.Context {
67+
return s.ctx
4668
}
4769

48-
// Config sets the port configuration setting to the specified value
49-
func (p *portProxy) Config(setting, value string) error {
50-
return p.changeSettingsCB(setting, value)
70+
func (s *monitorPipeServer) RecvMsg(m any) error { return nil }
71+
func (s *monitorPipeServer) SendHeader(metadata.MD) error { return nil }
72+
func (s *monitorPipeServer) SendMsg(m any) error { return nil }
73+
func (s *monitorPipeServer) SetHeader(metadata.MD) error { return nil }
74+
func (s *monitorPipeServer) SetTrailer(metadata.MD) {}
75+
76+
type monitorPipeClient struct {
77+
in *nio.PipeReader
78+
out *nio.PipeWriter
79+
close func()
5180
}
5281

53-
// Close the port
54-
func (p *portProxy) Close() error {
55-
return p.closeCB()
82+
func (s *monitorPipeClient) Read(buff []byte) (n int, err error) {
83+
return s.in.Read(buff)
5684
}
5785

58-
// Monitor opens a communication port. It returns a PortProxy to communicate with the port and a PortDescriptor
59-
// that describes the available configuration settings.
60-
func Monitor(ctx context.Context, req *rpc.MonitorPortOpenRequest) (*portProxy, *pluggableMonitor.PortDescriptor, error) {
61-
pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance())
62-
if err != nil {
63-
return nil, nil, err
64-
}
65-
defer release()
86+
func (s *monitorPipeClient) Write(buff []byte) (n int, err error) {
87+
return s.out.Write(buff)
88+
}
6689

67-
m, boardSettings, err := findMonitorAndSettingsForProtocolAndBoard(pme, req.GetPort().GetProtocol(), req.GetFqbn())
90+
func (s *monitorPipeClient) Close() error {
91+
s.in.Close()
92+
s.out.Close()
93+
s.close()
94+
return nil
95+
}
96+
97+
// MonitorServerToReadWriteCloser creates a monitor server that proxies the data to a ReadWriteCloser.
98+
// The server is returned along with the ReadWriteCloser that can be used to send and receive data
99+
// to the server. The MonitorPortOpenRequest is used to configure the monitor.
100+
func MonitorServerToReadWriteCloser(ctx context.Context, req *rpc.MonitorPortOpenRequest) (rpc.ArduinoCoreService_MonitorServer, io.ReadWriteCloser) {
101+
server := &monitorPipeServer{}
102+
client := &monitorPipeClient{}
103+
server.req.Store(req)
104+
server.ctx, client.close = context.WithCancel(ctx)
105+
client.in, server.out = nio.Pipe(buffer.New(32 * 1024))
106+
server.in, client.out = nio.Pipe(buffer.New(32 * 1024))
107+
return server, client
108+
}
109+
110+
// Monitor opens a port monitor and streams data back and forth until the request is kept alive.
111+
func (s *arduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorServer) error {
112+
// The configuration must be sent on the first message
113+
req, err := stream.Recv()
68114
if err != nil {
69-
return nil, nil, err
115+
return err
70116
}
71117

72-
if err := m.Run(); err != nil {
73-
return nil, nil, &cmderrors.FailedMonitorError{Cause: err}
118+
openReq := req.GetOpenRequest()
119+
if openReq == nil {
120+
return &cmderrors.InvalidInstanceError{}
74121
}
75122

76-
descriptor, err := m.Describe()
123+
pme, release, err := instances.GetPackageManagerExplorer(openReq.GetInstance())
77124
if err != nil {
78-
m.Quit()
79-
return nil, nil, &cmderrors.FailedMonitorError{Cause: err}
125+
return err
80126
}
81-
82-
// Apply user-requested settings
83-
if portConfig := req.GetPortConfiguration(); portConfig != nil {
127+
defer release()
128+
monitor, boardSettings, err := findMonitorAndSettingsForProtocolAndBoard(pme, openReq.GetPort().GetProtocol(), openReq.GetFqbn())
129+
if err != nil {
130+
return err
131+
}
132+
if err := monitor.Run(); err != nil {
133+
return &cmderrors.FailedMonitorError{Cause: err}
134+
}
135+
if _, err := monitor.Describe(); err != nil {
136+
monitor.Quit()
137+
return &cmderrors.FailedMonitorError{Cause: err}
138+
}
139+
if portConfig := openReq.GetPortConfiguration(); portConfig != nil {
84140
for _, setting := range portConfig.GetSettings() {
85-
boardSettings.Remove(setting.GetSettingId()) // Remove board settings overridden by the user
86-
if err := m.Configure(setting.GetSettingId(), setting.GetValue()); err != nil {
141+
boardSettings.Remove(setting.GetSettingId())
142+
if err := monitor.Configure(setting.GetSettingId(), setting.GetValue()); err != nil {
87143
logrus.Errorf("Could not set configuration %s=%s: %s", setting.GetSettingId(), setting.GetValue(), err)
88144
}
89145
}
90146
}
91-
// Apply specific board settings
92147
for setting, value := range boardSettings.AsMap() {
93-
m.Configure(setting, value)
148+
monitor.Configure(setting, value)
94149
}
95-
96-
monIO, err := m.Open(req.GetPort().GetAddress(), req.GetPort().GetProtocol())
150+
monitorIO, err := monitor.Open(openReq.GetPort().GetAddress(), openReq.GetPort().GetProtocol())
97151
if err != nil {
98-
m.Quit()
99-
return nil, nil, &cmderrors.FailedMonitorError{Cause: err}
100-
}
101-
102-
logrus.Infof("Port %s successfully opened", req.GetPort().GetAddress())
103-
return &portProxy{
104-
rw: monIO,
105-
changeSettingsCB: m.Configure,
106-
closeCB: func() error {
107-
m.Close()
108-
return m.Quit()
109-
},
110-
}, descriptor, nil
152+
monitor.Quit()
153+
return &cmderrors.FailedMonitorError{Cause: err}
154+
}
155+
logrus.Infof("Port %s successfully opened", openReq.GetPort().GetAddress())
156+
monitorClose := func() error {
157+
monitor.Close()
158+
return monitor.Quit()
159+
}
160+
161+
// Send a message with Success set to true to notify the caller of the port being now active
162+
syncSend := NewSynchronizedSend(stream.Send)
163+
_ = syncSend.Send(&rpc.MonitorResponse{Success: true})
164+
165+
ctx, cancel := context.WithCancel(stream.Context())
166+
gracefulCloseInitiated := &atomic.Bool{}
167+
gracefuleCloseCtx, gracefulCloseCancel := context.WithCancel(context.Background())
168+
169+
// gRPC stream receiver (gRPC data -> monitor, config, close)
170+
go func() {
171+
defer cancel()
172+
for {
173+
msg, err := stream.Recv()
174+
if errors.Is(err, io.EOF) {
175+
return
176+
}
177+
if err != nil {
178+
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
179+
return
180+
}
181+
if conf := msg.GetUpdatedConfiguration(); conf != nil {
182+
for _, c := range conf.GetSettings() {
183+
if err := monitor.Configure(c.GetSettingId(), c.GetValue()); err != nil {
184+
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
185+
}
186+
}
187+
}
188+
if closeMsg := msg.GetClose(); closeMsg {
189+
gracefulCloseInitiated.Store(true)
190+
if err := monitorClose(); err != nil {
191+
logrus.WithError(err).Debug("Error closing monitor port")
192+
}
193+
gracefulCloseCancel()
194+
}
195+
tx := msg.GetTxData()
196+
for len(tx) > 0 {
197+
n, err := monitorIO.Write(tx)
198+
if errors.Is(err, io.EOF) {
199+
return
200+
}
201+
if err != nil {
202+
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
203+
return
204+
}
205+
tx = tx[n:]
206+
}
207+
}
208+
}()
209+
210+
// gRPC stream sender (monitor -> gRPC)
211+
go func() {
212+
defer cancel() // unlock the receiver
213+
buff := make([]byte, 4096)
214+
for {
215+
n, err := monitorIO.Read(buff)
216+
if errors.Is(err, io.EOF) {
217+
break
218+
}
219+
if err != nil {
220+
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
221+
break
222+
}
223+
if err := syncSend.Send(&rpc.MonitorResponse{RxData: buff[:n]}); err != nil {
224+
break
225+
}
226+
}
227+
}()
228+
229+
<-ctx.Done()
230+
if gracefulCloseInitiated.Load() {
231+
// Port closing has been initiated in the receiver
232+
<-gracefuleCloseCtx.Done()
233+
} else {
234+
monitorClose()
235+
}
236+
return nil
111237
}
112238

113239
func findMonitorAndSettingsForProtocolAndBoard(pme *packagemanager.Explorer, protocol, fqbn string) (*pluggableMonitor.PluggableMonitor, *properties.Map, error) {

0 commit comments

Comments
 (0)