@@ -27,28 +27,22 @@ import (
27
27
// DiscoveryManager is required to handle multiple pluggable-discovery that
28
28
// may be shared across platforms
29
29
type DiscoveryManager struct {
30
- discoveries map [string ]* discovery.PluggableDiscovery
31
- globalEventCh chan * discovery.Event
30
+ discoveries map [string ]* discovery.PluggableDiscovery
32
31
}
33
32
34
33
var tr = i18n .Tr
35
34
36
35
// New creates a new DiscoveryManager
37
36
func New () * DiscoveryManager {
38
37
return & DiscoveryManager {
39
- discoveries : map [string ]* discovery.PluggableDiscovery {},
40
- globalEventCh : nil ,
38
+ discoveries : map [string ]* discovery.PluggableDiscovery {},
41
39
}
42
40
}
43
41
44
42
// Clear resets the DiscoveryManager to its initial state
45
43
func (dm * DiscoveryManager ) Clear () {
46
44
dm .QuitAll ()
47
45
dm .discoveries = map [string ]* discovery.PluggableDiscovery {}
48
- if dm .globalEventCh != nil {
49
- close (dm .globalEventCh )
50
- dm .globalEventCh = nil
51
- }
52
46
}
53
47
54
48
// IDs returns the list of discoveries' ids in this DiscoveryManager
@@ -134,10 +128,10 @@ func (dm *DiscoveryManager) StartAll() []error {
134
128
// StartSyncAll the discoveries for this DiscoveryManager,
135
129
// returns an error for each discovery failing to start syncing
136
130
func (dm * DiscoveryManager ) StartSyncAll () (<- chan * discovery.Event , []error ) {
137
- if dm .globalEventCh == nil {
138
- dm .globalEventCh = make (chan * discovery.Event , 5 )
139
- }
131
+ eventSink := make (chan * discovery.Event , 5 )
132
+ var wg sync.WaitGroup
140
133
errs := dm .parallelize (func (d * discovery.PluggableDiscovery ) error {
134
+ wg .Add (1 )
141
135
state := d .State ()
142
136
if state != discovery .Idling || state == discovery .Syncing {
143
137
// Already syncing
@@ -150,12 +144,18 @@ func (dm *DiscoveryManager) StartSyncAll() (<-chan *discovery.Event, []error) {
150
144
}
151
145
go func () {
152
146
for ev := range eventCh {
153
- dm . globalEventCh <- ev
147
+ eventSink <- ev
154
148
}
149
+ wg .Done ()
155
150
}()
156
151
return nil
157
152
})
158
- return dm .globalEventCh , errs
153
+ go func () {
154
+ wg .Wait ()
155
+ eventSink <- & discovery.Event {Type : "quit" }
156
+ close (eventSink )
157
+ }()
158
+ return eventSink , errs
159
159
}
160
160
161
161
// StopAll the discoveries for this DiscoveryManager,
@@ -189,15 +189,6 @@ func (dm *DiscoveryManager) QuitAll() []error {
189
189
}
190
190
return nil
191
191
})
192
- // Close the global channel only if there were no errors
193
- // quitting all alive discoveries
194
- if len (errs ) == 0 && dm .globalEventCh != nil {
195
- // Let events consumers that discoveries are quitting no more events
196
- // will be sent on this channel
197
- dm .globalEventCh <- & discovery.Event {Type : "quit" }
198
- close (dm .globalEventCh )
199
- dm .globalEventCh = nil
200
- }
201
192
return errs
202
193
}
203
194
0 commit comments