forked from arduino/arduino-cli
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdiscoverymanager.go
302 lines (265 loc) · 8.41 KB
/
discoverymanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
// This file is part of arduino-cli.
//
// Copyright 2020 ARDUINO SA (http://www.arduino.cc/)
//
// This software is released under the GNU General Public License version 3,
// which covers the main part of arduino-cli.
// The terms of this license can be found at:
// https://www.gnu.org/licenses/gpl-3.0.en.html
//
// You can be released from the requirements of the above licenses by purchasing
// a commercial license. Buying such a license is mandatory if you want to
// modify or otherwise use the software for commercial activities involving the
// Arduino software without disclosing the source code of your own applications.
// To purchase a commercial license, send an email to license@arduino.cc.
package discoverymanager
import (
"errors"
"fmt"
"sync"
"time"
"github.com/arduino/arduino-cli/internal/i18n"
discovery "github.com/arduino/pluggable-discovery-protocol-handler/v2"
"github.com/sirupsen/logrus"
)
// DiscoveryManager manages the many-to-many communication between all pluggable
// discoveries and all watchers. Each PluggableDiscovery, once started, will
// produce a sequence of "events". These events are broadcast to every
// listening Watcher.
// The DiscoveryManager will not start the discoveries until the Start method
// is called (Watch and List call Start on their own).
type DiscoveryManager struct {
	discoveriesMutex   sync.Mutex                   // held by Clear, IDs, Start and add while they access discoveries/discoveriesRunning
	discoveries        map[string]*discovery.Client // all registered PluggableDiscovery, keyed by discovery ID
	discoveriesRunning bool                         // set to true once discoveries are started
	feed               chan *discovery.Event        // all events will pass through this channel
	watchersMutex      sync.Mutex                   // held by Watch, feedEvent and List while they access watchers/watchersCache
	watchers           map[*PortWatcher]bool                  // all registered Watcher
	watchersCache      map[string]map[string]*discovery.Event // this is a cache of all active ports: discovery ID -> "address|protocol" -> last "add" event
	userAgent          string                                 // passed to every discovery client created by Add
}
// tr is a package-local shorthand for i18n.Tr; it is used to build the error
// messages returned by add and startDiscovery.
var tr = i18n.Tr
// New builds an empty DiscoveryManager: no discoveries, no watchers and an
// empty cache of active ports. The given userAgent is stored and handed to
// every discovery client created later through Add.
func New(userAgent string) *DiscoveryManager {
	dm := new(DiscoveryManager)
	dm.userAgent = userAgent
	dm.discoveries = make(map[string]*discovery.Client)
	dm.watchers = make(map[*PortWatcher]bool)
	dm.watchersCache = make(map[string]map[string]*discovery.Event)
	// Every discovery pushes its events into this single buffered channel.
	dm.feed = make(chan *discovery.Event, 50)
	return dm
}
// Clear drops every registered discovery. If the discoveries have already
// been started, each one is asked to quit before being forgotten.
// The discoveriesRunning flag itself is left untouched.
func (dm *DiscoveryManager) Clear() {
	dm.discoveriesMutex.Lock()
	defer dm.discoveriesMutex.Unlock()

	// Detach the current set first, then shut it down if it was running.
	removed := dm.discoveries
	dm.discoveries = make(map[string]*discovery.Client)
	if !dm.discoveriesRunning {
		return
	}
	for _, disc := range removed {
		disc.Quit()
		logrus.Infof("Closed and removed discovery %s", disc.GetID())
	}
}
// IDs returns the ids of all the discoveries currently registered in this
// DiscoveryManager. The order is unspecified (map iteration order) and the
// result is never nil.
func (dm *DiscoveryManager) IDs() []string {
	dm.discoveriesMutex.Lock()
	defer dm.discoveriesMutex.Unlock()

	known := make([]string, 0, len(dm.discoveries))
	for discoveryID := range dm.discoveries {
		known = append(known, discoveryID)
	}
	return known
}
// Start launches every registered discovery and the goroutine that dispatches
// their events to the watchers.
// It returns nil when the discoveries are already running; otherwise it
// returns the (possibly empty) list of errors reported by the discoveries
// that failed to start.
func (dm *DiscoveryManager) Start() []error {
	dm.discoveriesMutex.Lock()
	defer dm.discoveriesMutex.Unlock()

	if dm.discoveriesRunning {
		return nil
	}

	// Dispatcher: forwards whatever arrives on the feed channel to all the
	// active watchers, until the feed channel is closed.
	go func() {
		for {
			ev, open := <-dm.feed
			if !open {
				return
			}
			dm.feedEvent(ev)
		}
	}()

	var (
		pending     sync.WaitGroup
		failuresMux sync.Mutex
		failures    = []error{}
	)
	// launch starts one discovery and records its failure, if any.
	launch := func(disc *discovery.Client) {
		defer pending.Done()
		err := dm.startDiscovery(disc)
		if err == nil {
			return
		}
		failuresMux.Lock()
		defer failuresMux.Unlock()
		failures = append(failures, err)
	}

	// All the discoveries are started in parallel; wait for all of them
	// before flagging the manager as running.
	for _, disc := range dm.discoveries {
		pending.Add(1)
		go launch(disc)
	}
	pending.Wait()

	dm.discoveriesRunning = true
	return failures
}
// Add creates a discovery client with the given id and command line args,
// wires it to a logger tagged with the discovery id and to the manager's
// user agent, and registers it. It fails if a discovery with the same id is
// already registered.
func (dm *DiscoveryManager) Add(id string, args ...string) error {
	client := discovery.NewClient(id, args...)

	taggedLogger := logrus.WithField("discovery", id)
	client.SetLogger(taggedLogger)
	client.SetUserAgent(dm.userAgent)

	err := dm.add(client)
	return err
}
// add registers an already built discovery client. It returns an error if a
// discovery with the same id is already registered. When the manager is
// already running the new discovery is started right away.
func (dm *DiscoveryManager) add(d *discovery.Client) error {
	id := d.GetID()

	dm.discoveriesMutex.Lock()
	defer dm.discoveriesMutex.Unlock()

	if _, duplicate := dm.discoveries[id]; duplicate {
		return errors.New(tr("pluggable discovery already added: %s", id))
	}
	dm.discoveries[id] = d

	if !dm.discoveriesRunning {
		return nil
	}
	// A start failure does not undo the registration: startDiscovery logs
	// the error by itself, so the returned value is discarded here.
	_ = dm.startDiscovery(d)
	return nil
}
// PortWatcher is a watcher for all discovery events (port connection/disconnection).
// Instances are created by DiscoveryManager.Watch.
type PortWatcher struct {
	closeCB func()                // invoked by Close; installed by DiscoveryManager.Watch
	feed    chan *discovery.Event // events delivered to this watcher; exposed read-only through Feed
}
// Feed exposes, as a receive-only channel, the stream of events coming from
// the discoveries for this watcher.
func (pw *PortWatcher) Feed() <-chan *discovery.Event {
	var events <-chan *discovery.Event = pw.feed
	return events
}
// Close shuts the PortWatcher down by running the close callback that was
// installed when the watcher was created.
func (pw *PortWatcher) Close() {
	shutdown := pw.closeCB
	shutdown()
}
// Watch starts a watcher for all discovery events (port connection/disconnection).
// The discoveries are started if they are not running yet. The new watcher
// first receives an event for every port currently in the cache and is then
// subscribed to all subsequent events.
// The watcher must be closed when it is no longer needed with the Close
// method; calling Close more than once is safe.
// The returned error is currently always nil.
func (dm *DiscoveryManager) Watch() (*PortWatcher, error) {
	// Start failures are logged by startDiscovery, so the returned errors
	// are not inspected here.
	dm.Start()

	watcher := &PortWatcher{
		feed: make(chan *discovery.Event, 10),
	}
	watcher.closeCB = func() {
		dm.watchersMutex.Lock()
		defer dm.watchersMutex.Unlock()
		if watcher.feed == nil {
			// Already closed: closing the nil channel again would panic.
			return
		}
		delete(dm.watchers, watcher)
		close(watcher.feed)
		// A nil feed marks the watcher as closed (checked here and by the
		// registration goroutine below).
		watcher.feed = nil
	}
	go func() {
		dm.watchersMutex.Lock()
		defer dm.watchersMutex.Unlock()

		// Check if the watcher is still alive (it could have been closed before the goroutine started...)
		if watcher.feed == nil {
			return
		}

		// When a watcher is started, send all the current active ports first...
		// NOTE(review): these sends block while watchersMutex is held; with
		// more cached ports than the feed buffer (10) and a consumer that is
		// not reading, feedEvent, List and Close will wait on the mutex.
		for _, cache := range dm.watchersCache {
			for _, ev := range cache {
				watcher.feed <- ev
			}
		}

		// ...and after that add the watcher to the list of watchers receiving events
		dm.watchers[watcher] = true
	}()
	return watcher, nil
}
// startDiscovery runs the process of the given discovery, puts it in "sync"
// mode and spawns a goroutine that forwards every event it produces to the
// manager's feed channel.
// Any returned error wraps the underlying cause (usable with errors.Is and
// errors.As) and is also logged before returning.
func (dm *DiscoveryManager) startDiscovery(d *discovery.Client) (discErr error) {
	defer func() {
		// If this function returns an error log it
		if discErr != nil {
			logrus.Errorf("Discovery %s failed to run: %s", d.GetID(), discErr)
		}
	}()

	if err := d.Run(); err != nil {
		return fmt.Errorf(tr("discovery %[1]s process not started: %[2]w"), d.GetID(), err)
	}

	// NOTE(review): the argument is presumably the size of the event buffer
	// of the returned channel; confirm against the discovery client API.
	eventCh, err := d.StartSync(5)
	if err != nil {
		// Wrap with %w, like the Run failure above, so the cause is not lost.
		return fmt.Errorf("%s: %w", tr("starting discovery %s", d.GetID()), err)
	}

	go func(d *discovery.Client) {
		// Transfer all incoming events from this discovery to the feed channel
		for ev := range eventCh {
			dm.feed <- ev
		}
		logrus.Infof("Discovery event channel closed %s. Exiting goroutine.", d.GetID())
	}(d)
	return nil
}
// feedEvent dispatches one discovery event to all the registered watchers and
// keeps the cache of active ports up to date:
//   - "stop": a "remove" event is sent for every cached port of that
//     discovery, then its cache is dropped (the "stop" event itself is not
//     forwarded);
//   - "add" / "remove": the event is forwarded, then the port is stored in /
//     deleted from the cache of its discovery;
//   - anything else: the event is forwarded and an error is logged.
//
// Events without a Port are never dereferenced: a malformed "add"/"remove"
// is logged and not cached instead of crashing the dispatcher goroutine.
func (dm *DiscoveryManager) feedEvent(ev *discovery.Event) {
	dm.watchersMutex.Lock()
	defer dm.watchersMutex.Unlock()

	sendToAllWatchers := func(ev *discovery.Event) {
		// Send the event to all watchers
		for watcher := range dm.watchers {
			select {
			case watcher.feed <- ev:
				// OK
			case <-time.After(time.Millisecond * 500):
				// If the watcher is not able to process event fast enough
				// remove the watcher from the list of watchers
				// (deleting the current key while ranging over a map is safe in Go).
				logrus.Error("Watcher is not able to process events fast enough, removing it from the list of watchers")
				delete(dm.watchers, watcher)
			}
		}
	}

	if ev.Type == "stop" {
		// Send remove events for all the cached ports of the terminating discovery
		cache := dm.watchersCache[ev.DiscoveryID]
		for _, addEv := range cache {
			removeEv := &discovery.Event{
				Type: "remove",
				Port: &discovery.Port{
					Address:       addEv.Port.Address,
					AddressLabel:  addEv.Port.AddressLabel,
					Protocol:      addEv.Port.Protocol,
					ProtocolLabel: addEv.Port.ProtocolLabel,
				},
				DiscoveryID: addEv.DiscoveryID,
			}
			sendToAllWatchers(removeEv)
		}

		// Remove the cache for the terminating discovery
		delete(dm.watchersCache, ev.DiscoveryID)
		return
	}

	sendToAllWatchers(ev)

	switch ev.Type {
	case "add", "remove":
		// The cache key is built from the port, so the port must be there.
		if ev.Port == nil {
			logrus.Errorf("Discovery %s sent a %q event without a port, ignoring it", ev.DiscoveryID, ev.Type)
			return
		}

		// Cache the event for the discovery
		cache := dm.watchersCache[ev.DiscoveryID]
		if cache == nil {
			cache = map[string]*discovery.Event{}
			dm.watchersCache[ev.DiscoveryID] = cache
		}
		eventID := ev.Port.Address + "|" + ev.Port.Protocol
		if ev.Type == "add" {
			cache[eventID] = ev
		} else {
			delete(cache, eventID)
		}
	default:
		logrus.Errorf("Unhandled event from discovery: %s", ev.Type)
	}
}
// List returns the ports currently known to the manager, collected from the
// cache of every discovery. The discoveries are started first if they are
// not running yet. The result is never nil and its order is unspecified.
func (dm *DiscoveryManager) List() []*discovery.Port {
	dm.Start()

	dm.watchersMutex.Lock()
	defer dm.watchersMutex.Unlock()

	ports := []*discovery.Port{}
	for discoveryID := range dm.watchersCache {
		for _, cached := range dm.watchersCache[discoveryID] {
			ports = append(ports, cached.Port)
		}
	}
	return ports
}
// AddAllDiscoveriesFrom registers in the receiver every discovery currently
// registered in src. The discoveries are not removed from src, so both
// managers end up referencing the same clients. Discoveries whose id is
// already registered in the receiver are skipped.
func (dm *DiscoveryManager) AddAllDiscoveriesFrom(src *DiscoveryManager) {
	// Take a snapshot under src's lock so the map is not read while another
	// goroutine modifies it. The lock is released before calling dm.add
	// (which takes dm.discoveriesMutex) so that src == dm cannot self-deadlock.
	src.discoveriesMutex.Lock()
	clients := make([]*discovery.Client, 0, len(src.discoveries))
	for _, d := range src.discoveries {
		clients = append(clients, d)
	}
	src.discoveriesMutex.Unlock()

	for _, d := range clients {
		// The only error add reports is a duplicate id, and duplicates are
		// deliberately skipped here.
		_ = dm.add(d)
	}
}