Skip to content

Commit 9e61ef6

Browse files
remove client stream
1 parent 1b48af3 commit 9e61ef6

File tree

6 files changed

+32
-60
lines changed

6 files changed

+32
-60
lines changed

Diff for: client_example/main.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -657,16 +657,13 @@ func callBoardList(client rpc.ArduinoCoreServiceClient, instance *rpc.Instance)
657657
}
658658

659659
func callBoardListWatch(client rpc.ArduinoCoreServiceClient, instance *rpc.Instance) {
660-
watchClient, err := client.BoardListWatch(context.Background())
660+
req := &rpc.BoardListWatchRequest{Instance: instance}
661+
watchClient, err := client.BoardListWatch(context.Background(), req)
661662
if err != nil {
662663
log.Fatalf("Board list watch error: %s\n", err)
663664
}
664665

665666
// Start the watcher
666-
watchClient.Send(&rpc.BoardListWatchRequest{
667-
Instance: instance,
668-
})
669-
670667
go func() {
671668
for {
672669
res, err := watchClient.Recv()
@@ -693,9 +690,6 @@ func callBoardListWatch(client rpc.ArduinoCoreServiceClient, instance *rpc.Insta
693690
// Watch for 10 seconds and then interrupts
694691
timer := time.NewTicker(time.Duration(10 * time.Second))
695692
<-timer.C
696-
watchClient.Send(&rpc.BoardListWatchRequest{
697-
Interrupt: true,
698-
})
699693
}
700694

701695
func callPlatformUnInstall(client rpc.ArduinoCoreServiceClient, instance *rpc.Instance) {

Diff for: commands/board/list.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -258,22 +258,22 @@ func hasMatchingBoard(b *rpc.DetectedPort, fqbnFilter *cores.FQBN) bool {
258258

259259
// Watch returns a channel that receives boards connection and disconnection events.
260260
// It also returns a callback function that must be used to stop and dispose the watch.
261-
func Watch(req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse, func(), error) {
261+
func Watch(ctx context.Context, req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse, error) {
262262
pme, release := commands.GetPackageManagerExplorer(req)
263263
if pme == nil {
264-
return nil, nil, &arduino.InvalidInstanceError{}
264+
return nil, &arduino.InvalidInstanceError{}
265265
}
266266
defer release()
267267
dm := pme.DiscoveryManager()
268268

269269
watcher, err := dm.Watch()
270270
if err != nil {
271-
return nil, nil, err
271+
return nil, err
272272
}
273273

274-
ctx, cancel := context.WithCancel(context.Background())
275274
go func() {
276275
<-ctx.Done()
276+
logrus.Trace("closed watch")
277277
watcher.Close()
278278
}()
279279

@@ -301,5 +301,5 @@ func Watch(req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse,
301301
}
302302
}()
303303

304-
return outChan, cancel, nil
304+
return outChan, nil
305305
}

Diff for: commands/daemon/daemon.go

+4-34
Original file line numberDiff line numberDiff line change
@@ -86,52 +86,22 @@ func (s *ArduinoCoreServerImpl) BoardSearch(ctx context.Context, req *rpc.BoardS
8686
}
8787

8888
// BoardListWatch FIXMEDOC
89-
func (s *ArduinoCoreServerImpl) BoardListWatch(stream rpc.ArduinoCoreService_BoardListWatchServer) error {
89+
func (s *ArduinoCoreServerImpl) BoardListWatch(req *rpc.BoardListWatchRequest, stream rpc.ArduinoCoreService_BoardListWatchServer) error {
9090
syncSend := NewSynchronizedSend(stream.Send)
91-
msg, err := stream.Recv()
92-
if err == io.EOF {
93-
return nil
94-
}
95-
if err != nil {
96-
return err
97-
}
98-
99-
if msg.Instance == nil {
100-
err = fmt.Errorf(tr("no instance specified"))
91+
if req.Instance == nil {
92+
err := fmt.Errorf(tr("no instance specified"))
10193
syncSend.Send(&rpc.BoardListWatchResponse{
10294
EventType: "error",
10395
Error: err.Error(),
10496
})
10597
return err
10698
}
10799

108-
eventsChan, closeWatcher, err := board.Watch(msg)
100+
eventsChan, err := board.Watch(stream.Context(), req)
109101
if err != nil {
110102
return convertErrorToRPCStatus(err)
111103
}
112104

113-
go func() {
114-
defer closeWatcher()
115-
for {
116-
msg, err := stream.Recv()
117-
// Handle client closing the stream and eventual errors
118-
if err == io.EOF {
119-
logrus.Info("boards watcher stream closed")
120-
return
121-
}
122-
if err != nil {
123-
logrus.Infof("interrupting boards watcher: %v", err)
124-
return
125-
}
126-
127-
// Message received, does the client want to interrupt?
128-
if msg != nil && msg.Interrupt {
129-
logrus.Info("boards watcher interrupted by client")
130-
return
131-
}
132-
}
133-
}()
134-
135105
for event := range eventsChan {
136106
if err := syncSend.Send(event); err != nil {
137107
logrus.Infof("sending board watch message: %v", err)

Diff for: internal/cli/board/list.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package board
1717

1818
import (
19+
"context"
1920
"errors"
2021
"fmt"
2122
"os"
@@ -84,11 +85,10 @@ func runListCommand(watch bool, timeout int64, fqbn string) {
8485
}
8586

8687
func watchList(inst *rpc.Instance) {
87-
eventsChan, closeCB, err := board.Watch(&rpc.BoardListWatchRequest{Instance: inst})
88+
eventsChan, err := board.Watch(context.Background(), &rpc.BoardListWatchRequest{Instance: inst})
8889
if err != nil {
8990
feedback.Fatal(tr("Error detecting boards: %v", err), feedback.ErrNetwork)
9091
}
91-
defer closeCB()
9292

9393
// This is done to avoid printing the header each time a new event is received
9494
if feedback.GetFormat() == feedback.Text {

Diff for: internal/integrationtest/arduino-cli.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -362,16 +362,16 @@ func (inst *ArduinoCLIInstance) BoardList(timeout time.Duration) (*commands.Boar
362362
}
363363

364364
// BoardListWatch calls the "BoardListWatch" gRPC method.
365-
func (inst *ArduinoCLIInstance) BoardListWatch() (commands.ArduinoCoreService_BoardListWatchClient, error) {
365+
func (inst *ArduinoCLIInstance) BoardListWatch(ctx context.Context) (commands.ArduinoCoreService_BoardListWatchClient, error) {
366366
boardListWatchReq := &commands.BoardListWatchRequest{
367367
Instance: inst.instance,
368368
}
369369
logCallf(">>> BoardListWatch(%v)\n", boardListWatchReq)
370-
watcher, err := inst.cli.daemonClient.BoardListWatch(context.Background())
370+
watcher, err := inst.cli.daemonClient.BoardListWatch(ctx, boardListWatchReq)
371371
if err != nil {
372372
return watcher, err
373373
}
374-
return watcher, watcher.Send(boardListWatchReq)
374+
return watcher, nil
375375
}
376376

377377
// PlatformInstall calls the "PlatformInstall" gRPC method.

Diff for: internal/integrationtest/daemon/daemon_test.go

+16-8
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"github.com/arduino/arduino-cli/internal/integrationtest"
2828
"github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1"
2929
"github.com/arduino/go-paths-helper"
30+
"google.golang.org/grpc/codes"
31+
"google.golang.org/grpc/status"
3032

3133
"github.com/stretchr/testify/require"
3234
)
@@ -54,28 +56,34 @@ func TestArduinoCliDaemon(t *testing.T) {
5456

5557
testWatcher := func() {
5658
// Run watcher
57-
watcher, err := grpcInst.BoardListWatch()
59+
ctx, cancel := context.WithCancel(context.TODO())
60+
watcher, err := grpcInst.BoardListWatch(ctx)
5861
require.NoError(t, err)
59-
ctx, cancel := context.WithCancel(context.Background())
62+
watcherCanceldCh := make(chan struct{})
6063
go func() {
61-
defer cancel()
6264
for {
6365
msg, err := watcher.Recv()
6466
if err == io.EOF {
6567
fmt.Println("Watcher EOF")
6668
return
6769
}
68-
require.Empty(t, msg.Error, "Board list watcher returned an error")
70+
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
71+
fmt.Println("Watcher canceled")
72+
watcherCanceldCh <- struct{}{}
73+
return
74+
}
6975
require.NoError(t, err, "BoardListWatch grpc call returned an error")
70-
fmt.Printf("WATCH> %v\n", msg)
76+
require.Empty(t, msg.Error, "Board list watcher returned an error")
77+
fmt.Printf("WATCH> %v %v\n", msg, err)
7178
}
7279
}()
7380
time.Sleep(time.Second)
74-
require.NoError(t, watcher.CloseSend())
81+
cancel()
82+
time.Sleep(time.Second)
7583
select {
76-
case <-ctx.Done():
84+
case <-watcherCanceldCh:
7785
// all right!
78-
case <-time.After(time.Second):
86+
case <-time.After(2 * time.Second):
7987
require.Fail(t, "BoardListWatch didn't close")
8088
}
8189
}

0 commit comments

Comments
 (0)