Skip to content

Commit 6093927

Browse files
committed
Inlining methods in ArduinoCoreServiceImpl (part 4: BoardListWatch)
The BoardListWatch RPC call has been converted into a method of the gRPC server implementation. This commit boasts an helper method to convert a gRPC streaming response into a channel.
1 parent e35c740 commit 6093927

File tree

14 files changed

+131
-67
lines changed

14 files changed

+131
-67
lines changed

Diff for: commands/grpc_streaming_helpers.go

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// This file is part of arduino-cli.
2+
//
3+
// Copyright 2024 ARDUINO SA (http://www.arduino.cc/)
4+
//
5+
// This software is released under the GNU General Public License version 3,
6+
// which covers the main part of arduino-cli.
7+
// The terms of this license can be found at:
8+
// https://www.gnu.org/licenses/gpl-3.0.en.html
9+
//
10+
// You can be released from the requirements of the above licenses by purchasing
11+
// a commercial license. Buying such a license is mandatory if you want to
12+
// modify or otherwise use the software for commercial activities involving the
13+
// Arduino software without disclosing the source code of your own applications.
14+
// To purchase a commercial license, send an email to [email protected].
15+
16+
package commands
17+
18+
import (
19+
"context"
20+
"errors"
21+
"sync"
22+
23+
"google.golang.org/grpc/metadata"
24+
)
25+
26+
type streamingResponseProxyToChan[T any] struct {
27+
ctx context.Context
28+
respChan chan<- *T
29+
respLock sync.Mutex
30+
}
31+
32+
func streamResponseToChan[T any](ctx context.Context) (*streamingResponseProxyToChan[T], <-chan *T) {
33+
respChan := make(chan *T, 1)
34+
w := &streamingResponseProxyToChan[T]{
35+
ctx: ctx,
36+
respChan: respChan,
37+
}
38+
go func() {
39+
<-ctx.Done()
40+
w.respLock.Lock()
41+
close(w.respChan)
42+
w.respChan = nil
43+
w.respLock.Unlock()
44+
}()
45+
return w, respChan
46+
}
47+
48+
func (w *streamingResponseProxyToChan[T]) Send(resp *T) error {
49+
w.respLock.Lock()
50+
if w.respChan != nil {
51+
w.respChan <- resp
52+
}
53+
w.respLock.Unlock()
54+
return nil
55+
}
56+
57+
func (w *streamingResponseProxyToChan[T]) Context() context.Context {
58+
return w.ctx
59+
}
60+
61+
func (w *streamingResponseProxyToChan[T]) RecvMsg(m any) error {
62+
return errors.New("RecvMsg not implemented")
63+
}
64+
65+
func (w *streamingResponseProxyToChan[T]) SendHeader(metadata.MD) error {
66+
return errors.New("SendHeader not implemented")
67+
}
68+
69+
func (w *streamingResponseProxyToChan[T]) SendMsg(m any) error {
70+
return errors.New("SendMsg not implemented")
71+
}
72+
73+
func (w *streamingResponseProxyToChan[T]) SetHeader(metadata.MD) error {
74+
return errors.New("SetHeader not implemented")
75+
}
76+
77+
func (w *streamingResponseProxyToChan[T]) SetTrailer(tr metadata.MD) {
78+
}

Diff for: commands/service.go

-32
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package commands
1818
import (
1919
"context"
2020
"errors"
21-
"fmt"
2221
"io"
2322
"sync/atomic"
2423

@@ -42,37 +41,6 @@ type arduinoCoreServerImpl struct {
4241
versionString string
4342
}
4443

45-
// BoardSearch exposes to the gRPC interface the board search command
46-
func (s *arduinoCoreServerImpl) BoardSearch(ctx context.Context, req *rpc.BoardSearchRequest) (*rpc.BoardSearchResponse, error) {
47-
return BoardSearch(ctx, req)
48-
}
49-
50-
// BoardListWatch FIXMEDOC
51-
func (s *arduinoCoreServerImpl) BoardListWatch(req *rpc.BoardListWatchRequest, stream rpc.ArduinoCoreService_BoardListWatchServer) error {
52-
syncSend := NewSynchronizedSend(stream.Send)
53-
if req.GetInstance() == nil {
54-
err := fmt.Errorf(tr("no instance specified"))
55-
syncSend.Send(&rpc.BoardListWatchResponse{
56-
EventType: "error",
57-
Error: err.Error(),
58-
})
59-
return err
60-
}
61-
62-
eventsChan, err := BoardListWatch(stream.Context(), req)
63-
if err != nil {
64-
return err
65-
}
66-
67-
for event := range eventsChan {
68-
if err := syncSend.Send(event); err != nil {
69-
logrus.Infof("sending board watch message: %v", err)
70-
}
71-
}
72-
73-
return nil
74-
}
75-
7644
// Destroy FIXMEDOC
7745
func (s *arduinoCoreServerImpl) Destroy(ctx context.Context, req *rpc.DestroyRequest) (*rpc.DestroyResponse, error) {
7846
return Destroy(ctx, req)

Diff for: commands/service_board_list.go

+24-10
Original file line numberDiff line numberDiff line change
@@ -262,29 +262,43 @@ func hasMatchingBoard(b *rpc.DetectedPort, fqbnFilter *cores.FQBN) bool {
262262
return false
263263
}
264264

265-
// BoardListWatch returns a channel that receives boards connection and disconnection events.
266-
func BoardListWatch(ctx context.Context, req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse, error) {
265+
// BoardListWatchProxyToChan return a stream, to be used in BoardListWatch method,
266+
// that proxies all the responses to a channel.
267+
func BoardListWatchProxyToChan(ctx context.Context) (rpc.ArduinoCoreService_BoardListWatchServer, <-chan *rpc.BoardListWatchResponse) {
268+
return streamResponseToChan[rpc.BoardListWatchResponse](ctx)
269+
}
270+
271+
// BoardListWatch FIXMEDOC
272+
func (s *arduinoCoreServerImpl) BoardListWatch(req *rpc.BoardListWatchRequest, stream rpc.ArduinoCoreService_BoardListWatchServer) error {
273+
syncSend := NewSynchronizedSend(stream.Send)
274+
if req.GetInstance() == nil {
275+
err := fmt.Errorf(tr("no instance specified"))
276+
syncSend.Send(&rpc.BoardListWatchResponse{
277+
EventType: "error",
278+
Error: err.Error(),
279+
})
280+
return err
281+
}
282+
267283
pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance())
268284
if err != nil {
269-
return nil, err
285+
return err
270286
}
271287
defer release()
272288
dm := pme.DiscoveryManager()
273289

274290
watcher, err := dm.Watch()
275291
if err != nil {
276-
return nil, err
292+
return err
277293
}
278294

279295
go func() {
280-
<-ctx.Done()
296+
<-stream.Context().Done()
281297
logrus.Trace("closed watch")
282298
watcher.Close()
283299
}()
284300

285-
outChan := make(chan *rpc.BoardListWatchResponse)
286301
go func() {
287-
defer close(outChan)
288302
for event := range watcher.Feed() {
289303
port := &rpc.DetectedPort{
290304
Port: rpc.DiscoveryPortToRPC(event.Port),
@@ -298,13 +312,13 @@ func BoardListWatch(ctx context.Context, req *rpc.BoardListWatchRequest) (<-chan
298312
}
299313
port.MatchingBoards = boards
300314
}
301-
outChan <- &rpc.BoardListWatchResponse{
315+
stream.Send(&rpc.BoardListWatchResponse{
302316
EventType: event.Type,
303317
Port: port,
304318
Error: boardsError,
305-
}
319+
})
306320
}
307321
}()
308322

309-
return outChan, nil
323+
return nil
310324
}

Diff for: commands/service_board_search.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
// Boards are searched in all platforms, including those in the index that are not yet
3030
// installed. Note that platforms that are not installed don't include boards' FQBNs.
3131
// If no search argument is used all boards are returned.
32-
func BoardSearch(ctx context.Context, req *rpc.BoardSearchRequest) (*rpc.BoardSearchResponse, error) {
32+
func (s *arduinoCoreServerImpl) BoardSearch(ctx context.Context, req *rpc.BoardSearchRequest) (*rpc.BoardSearchResponse, error) {
3333
pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance())
3434
if err != nil {
3535
return nil, err

Diff for: internal/cli/arguments/fqbn.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func CalculateFQBNAndPort(portArgs *Port, fqbnArg *Fqbn, instance *rpc.Instance,
8585
return fqbn, port
8686
}
8787

88-
port, err := portArgs.GetPort(instance, defaultAddress, defaultProtocol)
88+
port, err := portArgs.GetPort(instance, srv, defaultAddress, defaultProtocol)
8989
if err != nil {
9090
feedback.Fatal(tr("Error getting port metadata: %v", err), feedback.ErrGeneric)
9191
}

Diff for: internal/cli/arguments/port.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,12 @@ func (p *Port) AddToCommand(cmd *cobra.Command, srv rpc.ArduinoCoreServiceServer
5656
// This method allows will bypass the discoveries if:
5757
// - a nil instance is passed: in this case the plain port and protocol arguments are returned (even if empty)
5858
// - a protocol is specified: in this case the discoveries are not needed to autodetect the protocol.
59-
func (p *Port) GetPortAddressAndProtocol(instance *rpc.Instance, defaultAddress, defaultProtocol string) (string, string, error) {
59+
func (p *Port) GetPortAddressAndProtocol(instance *rpc.Instance, srv rpc.ArduinoCoreServiceServer, defaultAddress, defaultProtocol string) (string, string, error) {
6060
if p.protocol != "" || instance == nil {
6161
return p.address, p.protocol, nil
6262
}
6363

64-
port, err := p.GetPort(instance, defaultAddress, defaultProtocol)
64+
port, err := p.GetPort(instance, srv, defaultAddress, defaultProtocol)
6565
if err != nil {
6666
return "", "", err
6767
}
@@ -70,8 +70,7 @@ func (p *Port) GetPortAddressAndProtocol(instance *rpc.Instance, defaultAddress,
7070

7171
// GetPort returns the Port obtained by parsing command line arguments.
7272
// The extra metadata for the ports is obtained using the pluggable discoveries.
73-
func (p *Port) GetPort(instance *rpc.Instance, defaultAddress, defaultProtocol string) (*rpc.Port, error) {
74-
73+
func (p *Port) GetPort(instance *rpc.Instance, srv rpc.ArduinoCoreServiceServer, defaultAddress, defaultProtocol string) (*rpc.Port, error) {
7574
address := p.address
7675
protocol := p.protocol
7776
if address == "" && (defaultAddress != "" || defaultProtocol != "") {
@@ -91,7 +90,10 @@ func (p *Port) GetPort(instance *rpc.Instance, defaultAddress, defaultProtocol s
9190

9291
ctx, cancel := context.WithCancel(context.Background())
9392
defer cancel()
94-
watcher, err := commands.BoardListWatch(ctx, &rpc.BoardListWatchRequest{Instance: instance})
93+
94+
stream, watcher := commands.BoardListWatchProxyToChan(ctx)
95+
err := srv.BoardListWatch(&rpc.BoardListWatchRequest{Instance: instance}, stream)
96+
9597
if err != nil {
9698
return nil, err
9799
}

Diff for: internal/cli/board/attach.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func initAttachCommand(srv rpc.ArduinoCoreServiceServer) *cobra.Command {
4545
if len(args) > 0 {
4646
sketchPath = args[0]
4747
}
48-
runAttachCommand(sketchPath, &port, fqbn.String(), &programmer)
48+
runAttachCommand(srv, sketchPath, &port, fqbn.String(), &programmer)
4949
},
5050
}
5151
fqbn.AddToCommand(attachCommand, srv)
@@ -55,10 +55,10 @@ func initAttachCommand(srv rpc.ArduinoCoreServiceServer) *cobra.Command {
5555
return attachCommand
5656
}
5757

58-
func runAttachCommand(path string, port *arguments.Port, fqbn string, programmer *arguments.Programmer) {
58+
func runAttachCommand(srv rpc.ArduinoCoreServiceServer, path string, port *arguments.Port, fqbn string, programmer *arguments.Programmer) {
5959
sketchPath := arguments.InitSketchPath(path)
6060

61-
portAddress, portProtocol, _ := port.GetPortAddressAndProtocol(nil, "", "")
61+
portAddress, portProtocol, _ := port.GetPortAddressAndProtocol(nil, srv, "", "")
6262
newDefaults, err := commands.SetSketchDefaults(context.Background(), &rpc.SetSketchDefaultsRequest{
6363
SketchPath: sketchPath.String(),
6464
DefaultFqbn: fqbn,

Diff for: internal/cli/board/board.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func NewCommand(srv rpc.ArduinoCoreServiceServer) *cobra.Command {
3939
boardCommand.AddCommand(initDetailsCommand(srv))
4040
boardCommand.AddCommand(initListCommand(srv))
4141
boardCommand.AddCommand(initListAllCommand(srv))
42-
boardCommand.AddCommand(initSearchCommand())
42+
boardCommand.AddCommand(initSearchCommand(srv))
4343

4444
return boardCommand
4545
}

Diff for: internal/cli/board/list.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func runListCommand(srv rpc.ArduinoCoreServiceServer, watch bool, timeout int64,
6363
logrus.Info("Executing `arduino-cli board list`")
6464

6565
if watch {
66-
watchList(inst)
66+
watchList(inst, srv)
6767
return
6868
}
6969

@@ -88,8 +88,9 @@ func runListCommand(srv rpc.ArduinoCoreServiceServer, watch bool, timeout int64,
8888
feedback.PrintResult(listResult{result.NewDetectedPorts(ports)})
8989
}
9090

91-
func watchList(inst *rpc.Instance) {
92-
eventsChan, err := commands.BoardListWatch(context.Background(), &rpc.BoardListWatchRequest{Instance: inst})
91+
func watchList(inst *rpc.Instance, srv rpc.ArduinoCoreServiceServer) {
92+
stream, eventsChan := commands.BoardListWatchProxyToChan(context.Background())
93+
err := srv.BoardListWatch(&rpc.BoardListWatchRequest{Instance: inst}, stream)
9394
if err != nil {
9495
feedback.Fatal(tr("Error detecting boards: %v", err), feedback.ErrNetwork)
9596
}

Diff for: internal/cli/board/search.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"sort"
2323
"strings"
2424

25-
"github.com/arduino/arduino-cli/commands"
2625
"github.com/arduino/arduino-cli/internal/cli/feedback"
2726
"github.com/arduino/arduino-cli/internal/cli/feedback/result"
2827
"github.com/arduino/arduino-cli/internal/cli/feedback/table"
@@ -32,7 +31,7 @@ import (
3231
"github.com/spf13/cobra"
3332
)
3433

35-
func initSearchCommand() *cobra.Command {
34+
func initSearchCommand(srv rpc.ArduinoCoreServiceServer) *cobra.Command {
3635
var searchCommand = &cobra.Command{
3736
Use: fmt.Sprintf("search [%s]", tr("boardname")),
3837
Short: tr("Search for a board in the Boards Manager."),
@@ -41,18 +40,20 @@ func initSearchCommand() *cobra.Command {
4140
" " + os.Args[0] + " board search\n" +
4241
" " + os.Args[0] + " board search zero",
4342
Args: cobra.ArbitraryArgs,
44-
Run: runSearchCommand,
43+
Run: func(cmd *cobra.Command, args []string) {
44+
runSearchCommand(srv, args)
45+
},
4546
}
4647
searchCommand.Flags().BoolVarP(&showHiddenBoard, "show-hidden", "a", false, tr("Show also boards marked as 'hidden' in the platform"))
4748
return searchCommand
4849
}
4950

50-
func runSearchCommand(cmd *cobra.Command, args []string) {
51+
func runSearchCommand(srv rpc.ArduinoCoreServiceServer, args []string) {
5152
inst := instance.CreateAndInit()
5253

5354
logrus.Info("Executing `arduino-cli board search`")
5455

55-
res, err := commands.BoardSearch(context.Background(), &rpc.BoardSearchRequest{
56+
res, err := srv.BoardSearch(context.Background(), &rpc.BoardSearchRequest{
5657
Instance: inst,
5758
SearchArgs: strings.Join(args, " "),
5859
IncludeHiddenBoards: showHiddenBoard,

Diff for: internal/cli/burnbootloader/burnbootloader.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func runBootloaderCommand(srv rpc.ArduinoCoreServiceServer) {
7171
logrus.Info("Executing `arduino-cli burn-bootloader`")
7272

7373
// We don't need a Sketch to upload a board's bootloader
74-
discoveryPort, err := port.GetPort(instance, "", "")
74+
discoveryPort, err := port.GetPort(instance, srv, "", "")
7575
if err != nil {
7676
feedback.Fatal(tr("Error during Upload: %v", err), feedback.ErrGeneric)
7777
}

Diff for: internal/cli/debug/debug_check.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func runDebugCheckCommand(srv rpc.ArduinoCoreServiceServer, portArgs *arguments.
5555
instance := instance.CreateAndInit()
5656
logrus.Info("Executing `arduino-cli debug`")
5757

58-
port, err := portArgs.GetPort(instance, "", "")
58+
port, err := portArgs.GetPort(instance, srv, "", "")
5959
if err != nil {
6060
feedback.FatalError(err, feedback.ErrBadArgument)
6161
}

Diff for: internal/cli/monitor/monitor.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func runMonitorCmd(
140140
fqbn, _ = portArgs.DetectFQBN(inst, srv)
141141
}
142142

143-
portAddress, portProtocol, err := portArgs.GetPortAddressAndProtocol(inst, defaultPort, defaultProtocol)
143+
portAddress, portProtocol, err := portArgs.GetPortAddressAndProtocol(inst, srv, defaultPort, defaultProtocol)
144144
if err != nil {
145145
feedback.FatalError(err, feedback.ErrGeneric)
146146
}

Diff for: internal/integrationtest/daemon/daemon_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,15 @@ func TestArduinoCliDaemon(t *testing.T) {
6363
require.NoError(t, err)
6464
watcherCanceldCh := make(chan struct{})
6565
go func() {
66+
defer close(watcherCanceldCh)
6667
for {
6768
msg, err := watcher.Recv()
6869
if errors.Is(err, io.EOF) {
69-
fmt.Println("Watcher EOF")
70+
fmt.Println("Got EOF from watcher")
7071
return
7172
}
7273
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
73-
fmt.Println("Watcher canceled")
74-
watcherCanceldCh <- struct{}{}
74+
fmt.Println("Got Canceled error from watcher")
7575
return
7676
}
7777
require.NoError(t, err, "BoardListWatch grpc call returned an error")

0 commit comments

Comments
 (0)