Skip to content

Commit 6c32a9c

Browse files
remove client stream
1 parent 1b48af3 commit 6c32a9c

File tree

6 files changed

+31
-60
lines changed

6 files changed

+31
-60
lines changed

Diff for: client_example/main.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -657,16 +657,13 @@ func callBoardList(client rpc.ArduinoCoreServiceClient, instance *rpc.Instance)
657657
}
658658

659659
func callBoardListWatch(client rpc.ArduinoCoreServiceClient, instance *rpc.Instance) {
660-
watchClient, err := client.BoardListWatch(context.Background())
660+
req := &rpc.BoardListWatchRequest{Instance: instance}
661+
watchClient, err := client.BoardListWatch(context.Background(), req)
661662
if err != nil {
662663
log.Fatalf("Board list watch error: %s\n", err)
663664
}
664665

665666
// Start the watcher
666-
watchClient.Send(&rpc.BoardListWatchRequest{
667-
Instance: instance,
668-
})
669-
670667
go func() {
671668
for {
672669
res, err := watchClient.Recv()
@@ -693,9 +690,6 @@ func callBoardListWatch(client rpc.ArduinoCoreServiceClient, instance *rpc.Insta
693690
// Watch for 10 seconds and then interrupts
694691
timer := time.NewTicker(time.Duration(10 * time.Second))
695692
<-timer.C
696-
watchClient.Send(&rpc.BoardListWatchRequest{
697-
Interrupt: true,
698-
})
699693
}
700694

701695
func callPlatformUnInstall(client rpc.ArduinoCoreServiceClient, instance *rpc.Instance) {

Diff for: commands/board/list.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -257,23 +257,22 @@ func hasMatchingBoard(b *rpc.DetectedPort, fqbnFilter *cores.FQBN) bool {
257257
}
258258

259259
// Watch returns a channel that receives boards connection and disconnection events.
260-
// It also returns a callback function that must be used to stop and dispose the watch.
261-
func Watch(req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse, func(), error) {
260+
func Watch(ctx context.Context, req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse, error) {
262261
pme, release := commands.GetPackageManagerExplorer(req)
263262
if pme == nil {
264-
return nil, nil, &arduino.InvalidInstanceError{}
263+
return nil, &arduino.InvalidInstanceError{}
265264
}
266265
defer release()
267266
dm := pme.DiscoveryManager()
268267

269268
watcher, err := dm.Watch()
270269
if err != nil {
271-
return nil, nil, err
270+
return nil, err
272271
}
273272

274-
ctx, cancel := context.WithCancel(context.Background())
275273
go func() {
276274
<-ctx.Done()
275+
logrus.Trace("closed watch")
277276
watcher.Close()
278277
}()
279278

@@ -301,5 +300,5 @@ func Watch(req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse,
301300
}
302301
}()
303302

304-
return outChan, cancel, nil
303+
return outChan, nil
305304
}

Diff for: commands/daemon/daemon.go

+4-34
Original file line numberDiff line numberDiff line change
@@ -86,52 +86,22 @@ func (s *ArduinoCoreServerImpl) BoardSearch(ctx context.Context, req *rpc.BoardS
8686
}
8787

8888
// BoardListWatch FIXMEDOC
89-
func (s *ArduinoCoreServerImpl) BoardListWatch(stream rpc.ArduinoCoreService_BoardListWatchServer) error {
89+
func (s *ArduinoCoreServerImpl) BoardListWatch(req *rpc.BoardListWatchRequest, stream rpc.ArduinoCoreService_BoardListWatchServer) error {
9090
syncSend := NewSynchronizedSend(stream.Send)
91-
msg, err := stream.Recv()
92-
if err == io.EOF {
93-
return nil
94-
}
95-
if err != nil {
96-
return err
97-
}
98-
99-
if msg.Instance == nil {
100-
err = fmt.Errorf(tr("no instance specified"))
91+
if req.Instance == nil {
92+
err := fmt.Errorf(tr("no instance specified"))
10193
syncSend.Send(&rpc.BoardListWatchResponse{
10294
EventType: "error",
10395
Error: err.Error(),
10496
})
10597
return err
10698
}
10799

108-
eventsChan, closeWatcher, err := board.Watch(msg)
100+
eventsChan, err := board.Watch(stream.Context(), req)
109101
if err != nil {
110102
return convertErrorToRPCStatus(err)
111103
}
112104

113-
go func() {
114-
defer closeWatcher()
115-
for {
116-
msg, err := stream.Recv()
117-
// Handle client closing the stream and eventual errors
118-
if err == io.EOF {
119-
logrus.Info("boards watcher stream closed")
120-
return
121-
}
122-
if err != nil {
123-
logrus.Infof("interrupting boards watcher: %v", err)
124-
return
125-
}
126-
127-
// Message received, does the client want to interrupt?
128-
if msg != nil && msg.Interrupt {
129-
logrus.Info("boards watcher interrupted by client")
130-
return
131-
}
132-
}
133-
}()
134-
135105
for event := range eventsChan {
136106
if err := syncSend.Send(event); err != nil {
137107
logrus.Infof("sending board watch message: %v", err)

Diff for: internal/cli/board/list.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package board
1717

1818
import (
19+
"context"
1920
"errors"
2021
"fmt"
2122
"os"
@@ -84,11 +85,10 @@ func runListCommand(watch bool, timeout int64, fqbn string) {
8485
}
8586

8687
func watchList(inst *rpc.Instance) {
87-
eventsChan, closeCB, err := board.Watch(&rpc.BoardListWatchRequest{Instance: inst})
88+
eventsChan, err := board.Watch(context.Background(), &rpc.BoardListWatchRequest{Instance: inst})
8889
if err != nil {
8990
feedback.Fatal(tr("Error detecting boards: %v", err), feedback.ErrNetwork)
9091
}
91-
defer closeCB()
9292

9393
// This is done to avoid printing the header each time a new event is received
9494
if feedback.GetFormat() == feedback.Text {

Diff for: internal/integrationtest/arduino-cli.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -362,16 +362,16 @@ func (inst *ArduinoCLIInstance) BoardList(timeout time.Duration) (*commands.Boar
362362
}
363363

364364
// BoardListWatch calls the "BoardListWatch" gRPC method.
365-
func (inst *ArduinoCLIInstance) BoardListWatch() (commands.ArduinoCoreService_BoardListWatchClient, error) {
365+
func (inst *ArduinoCLIInstance) BoardListWatch(ctx context.Context) (commands.ArduinoCoreService_BoardListWatchClient, error) {
366366
boardListWatchReq := &commands.BoardListWatchRequest{
367367
Instance: inst.instance,
368368
}
369369
logCallf(">>> BoardListWatch(%v)\n", boardListWatchReq)
370-
watcher, err := inst.cli.daemonClient.BoardListWatch(context.Background())
370+
watcher, err := inst.cli.daemonClient.BoardListWatch(ctx, boardListWatchReq)
371371
if err != nil {
372372
return watcher, err
373373
}
374-
return watcher, watcher.Send(boardListWatchReq)
374+
return watcher, nil
375375
}
376376

377377
// PlatformInstall calls the "PlatformInstall" gRPC method.

Diff for: internal/integrationtest/daemon/daemon_test.go

+15-7
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"github.com/arduino/arduino-cli/internal/integrationtest"
2828
"github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1"
2929
"github.com/arduino/go-paths-helper"
30+
"google.golang.org/grpc/codes"
31+
"google.golang.org/grpc/status"
3032

3133
"github.com/stretchr/testify/require"
3234
)
@@ -52,28 +54,34 @@ func TestArduinoCliDaemon(t *testing.T) {
5254
require.NoError(t, err)
5355
fmt.Printf("Got boardlist response with %d ports\n", len(boardListResp.GetPorts()))
5456

57+
// When the client closes the connection we expect that the streaming from the server closes.
5558
testWatcher := func() {
5659
// Run watcher
57-
watcher, err := grpcInst.BoardListWatch()
58-
require.NoError(t, err)
5960
ctx, cancel := context.WithCancel(context.Background())
61+
watcher, err := grpcInst.BoardListWatch(ctx)
62+
require.NoError(t, err)
63+
watcherCanceldCh := make(chan struct{})
6064
go func() {
61-
defer cancel()
6265
for {
6366
msg, err := watcher.Recv()
6467
if err == io.EOF {
6568
fmt.Println("Watcher EOF")
6669
return
6770
}
68-
require.Empty(t, msg.Error, "Board list watcher returned an error")
71+
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
72+
fmt.Println("Watcher canceled")
73+
watcherCanceldCh <- struct{}{}
74+
return
75+
}
6976
require.NoError(t, err, "BoardListWatch grpc call returned an error")
70-
fmt.Printf("WATCH> %v\n", msg)
77+
require.Empty(t, msg.Error, "Board list watcher returned an error")
78+
fmt.Printf("WATCH> %v %v\n", msg, err)
7179
}
7280
}()
7381
time.Sleep(time.Second)
74-
require.NoError(t, watcher.CloseSend())
82+
cancel()
7583
select {
76-
case <-ctx.Done():
84+
case <-watcherCanceldCh:
7785
// all right!
7886
case <-time.After(time.Second):
7987
require.Fail(t, "BoardListWatch didn't close")

0 commit comments

Comments
 (0)