@@ -34,16 +34,18 @@ type DiscoveryManager struct {
34
34
feed chan * discovery.Event
35
35
watchersMutex sync.Mutex
36
36
watchers map [* PortWatcher ]bool
37
+ watchersCache map [string ]map [string ]* discovery.Event
37
38
}
38
39
39
40
var tr = i18n .Tr
40
41
41
42
// New creates a new DiscoveryManager
42
43
func New () * DiscoveryManager {
43
44
return & DiscoveryManager {
44
- discoveries : map [string ]* discovery.PluggableDiscovery {},
45
- watchers : map [* PortWatcher ]bool {},
46
- feed : make (chan * discovery.Event , 50 ),
45
+ discoveries : map [string ]* discovery.PluggableDiscovery {},
46
+ watchers : map [* PortWatcher ]bool {},
47
+ feed : make (chan * discovery.Event , 50 ),
48
+ watchersCache : map [string ]map [string ]* discovery.Event {},
47
49
}
48
50
}
49
51
@@ -139,9 +141,16 @@ func (dm *DiscoveryManager) Watch() (*PortWatcher, error) {
139
141
dm .watchersMutex .Unlock ()
140
142
close (watcher .feed )
141
143
}
142
- dm .watchersMutex .Lock ()
143
- dm .watchers [watcher ] = true
144
- dm .watchersMutex .Unlock ()
144
+ go func () {
145
+ dm .watchersMutex .Lock ()
146
+ for _ , cache := range dm .watchersCache {
147
+ for _ , ev := range cache {
148
+ watcher .feed <- ev
149
+ }
150
+ }
151
+ dm .watchers [watcher ] = true
152
+ dm .watchersMutex .Unlock ()
153
+ }()
145
154
return watcher , nil
146
155
}
147
156
@@ -190,19 +199,35 @@ func (dm *DiscoveryManager) feeder() {
190
199
}
191
200
192
201
func (dm * DiscoveryManager ) cacheEvent (ev * discovery.Event ) {
193
- // XXX: TODO
202
+ cache := dm .watchersCache [ev .DiscoveryID ]
203
+ if cache == nil {
204
+ cache = map [string ]* discovery.Event {}
205
+ dm .watchersCache [ev .DiscoveryID ] = cache
206
+ }
207
+
208
+ eventID := ev .Port .Address + "|" + ev .Port .Protocol
209
+ switch ev .Type {
210
+ case "add" :
211
+ cache [eventID ] = ev
212
+ case "remove" :
213
+ delete (cache , eventID )
214
+ default :
215
+ logrus .Errorf ("Unhandled event from discovery: %s" , ev .Type )
216
+ return
217
+ }
194
218
}
195
219
196
220
// List return the current list of ports detected from all discoveries
197
221
func (dm * DiscoveryManager ) List () []* discovery.Port {
198
222
dm .Start ()
199
223
200
- // XXX: Cache ports and return them
201
- dm .discoveriesMutex .Lock ()
202
- defer dm .discoveriesMutex .Unlock ()
203
224
res := []* discovery.Port {}
204
- for _ , d := range dm .discoveries {
205
- res = append (res , d .ListCachedPorts ()... )
225
+ dm .watchersMutex .Lock ()
226
+ defer dm .watchersMutex .Unlock ()
227
+ for _ , cache := range dm .watchersCache {
228
+ for _ , ev := range cache {
229
+ res = append (res , ev .Port )
230
+ }
206
231
}
207
232
return res
208
233
}
0 commit comments