@@ -18,6 +18,7 @@ package discoverymanager
18
18
import (
19
19
"fmt"
20
20
"sync"
21
+ "time"
21
22
22
23
"github.com/arduino/arduino-cli/arduino/discovery"
23
24
"github.com/arduino/arduino-cli/i18n"
@@ -83,7 +84,12 @@ func (dm *DiscoveryManager) Start() {
83
84
return
84
85
}
85
86
86
- go dm .feeder ()
87
+ go func () {
88
+ // Feed all watchers with data coming from the discoveries
89
+ for ev := range dm .feed {
90
+ dm .feedEvent (ev )
91
+ }
92
+ }()
87
93
88
94
var wg sync.WaitGroup
89
95
for _ , d := range dm .discoveries {
@@ -136,13 +142,13 @@ func (dm *DiscoveryManager) Watch() (*PortWatcher, error) {
136
142
dm .Start ()
137
143
138
144
watcher := & PortWatcher {
139
- feed : make (chan * discovery.Event ),
145
+ feed : make (chan * discovery.Event , 10 ),
140
146
}
141
147
watcher .closeCB = func () {
142
148
dm .watchersMutex .Lock ()
143
149
delete (dm .watchers , watcher )
144
- dm .watchersMutex .Unlock ()
145
150
close (watcher .feed )
151
+ dm .watchersMutex .Unlock ()
146
152
}
147
153
go func () {
148
154
dm .watchersMutex .Lock ()
@@ -182,44 +188,43 @@ func (dm *DiscoveryManager) startDiscovery(d *discovery.PluggableDiscovery) (dis
182
188
return nil
183
189
}
184
190
185
- func (dm * DiscoveryManager ) feeder () {
186
- // Feed all watchers with data coming from the discoveries
187
- for ev := range dm .feed {
188
- dm .watchersMutex .Lock ()
189
- for watcher := range dm .watchers {
190
- select {
191
- case watcher .feed <- ev :
192
- // OK
193
- default :
194
- // If the watcher is not able to process event fast enough
195
- // remove the watcher from the list of watchers
196
- go watcher .Close ()
197
- }
191
+ func (dm * DiscoveryManager ) feedEvent (ev * discovery.Event ) {
192
+ dm .watchersMutex .Lock ()
193
+ defer dm .watchersMutex .Unlock ()
194
+
195
+ if ev .Type == "stop" {
196
+ // Remove all the cached events for the terminating discovery
197
+ delete (dm .watchersCache , ev .DiscoveryID )
198
+ return
199
+ }
200
+
201
+ // Send the event to all watchers
202
+ for watcher := range dm .watchers {
203
+ select {
204
+ case watcher .feed <- ev :
205
+ // OK
206
+ case <- time .After (time .Millisecond * 500 ):
207
+ // If the watcher is not able to process event fast enough
208
+ // remove the watcher from the list of watchers
209
+ logrus .Info ("Watcher is not able to process events fast enough, removing it from the list of watchers" )
210
+ delete (dm .watchers , watcher )
198
211
}
199
- dm .cacheEvent (ev )
200
- dm .watchersMutex .Unlock ()
201
212
}
202
- }
203
213
204
- func ( dm * DiscoveryManager ) cacheEvent ( ev * discovery. Event ) {
214
+ // Cache the event for the discovery
205
215
cache := dm .watchersCache [ev .DiscoveryID ]
206
216
if cache == nil {
207
217
cache = map [string ]* discovery.Event {}
208
218
dm .watchersCache [ev .DiscoveryID ] = cache
209
219
}
210
-
211
220
eventID := ev .Port .Address + "|" + ev .Port .Protocol
212
221
switch ev .Type {
213
222
case "add" :
214
223
cache [eventID ] = ev
215
224
case "remove" :
216
225
delete (cache , eventID )
217
- case "quit" :
218
- // Remove all the events for this discovery
219
- delete (dm .watchersCache , ev .DiscoveryID )
220
226
default :
221
227
logrus .Errorf ("Unhandled event from discovery: %s" , ev .Type )
222
- return
223
228
}
224
229
}
225
230
0 commit comments