|
13 | 13 | }
|
14 | 14 |
|
15 | 15 |
|
| 16 | +def _emit_event(event): |
| 17 | + module = type_mapping.get(event["type"]) |
| 18 | + if module: |
| 19 | + asyncio.ensure_future(module.handle_event(event)) |
| 20 | + |
| 21 | + |
16 | 22 | async def monitor(crds, namespace):
|
17 | 23 | log.info(f"Monitoring charts.k8s.openttd.org in namespace '{namespace}' for changes ...")
|
18 | 24 |
|
19 |
| - stream = watch.Watch().stream(crds.list_namespaced_custom_object, |
20 |
| - "k8s.openttd.org", |
21 |
| - "v1", |
22 |
| - namespace, |
23 |
| - "charts", |
24 |
| - _request_timeout=30) |
25 |
| - async for event in stream: |
26 |
| - module = type_mapping.get(event["type"]) |
27 |
| - if module: |
28 |
| - asyncio.ensure_future(module.handle_event(event)) |
29 |
| - |
30 |
| - log.info(f"Monitoring in namespace '{namespace}' stopped") |
| 25 | + # Prepare what function we want to call with which parameters |
| 26 | + func = crds.list_namespaced_custom_object |
| 27 | + args = ["k8s.openttd.org", "v1", namespace, "charts"] |
| 28 | + |
| 29 | + # Start by listing all entries, and emit an 'ADDED' for each existing |
| 30 | + # entry. This allows us to get in a known-good-state, and monitor all |
| 31 | + # changes after. |
| 32 | + initial_list = await func(*args) |
| 33 | + for item in initial_list['items']: |
| 34 | + _emit_event({"type": "ADDED", "object": item}) |
| 35 | + |
| 36 | + # The list has the resource_version we should use as starting point of |
| 37 | + # our watch(). |
| 38 | + resource_version = initial_list['metadata']['resourceVersion'] |
| 39 | + |
| 40 | + my_watch = watch.Watch() |
| 41 | + # XXX - kubernetes-asyncio has not sync'd with upstream yet. |
| 42 | + # See https://github.com/kubernetes-client/python-base/commit/2d69e89dab7134186cbcdaf82381ab6295c6c394 |
| 43 | + # and https://github.com/tomplus/kubernetes_asyncio/issues/77 |
| 44 | + # If this gets fixed, the next line can be removed. |
| 45 | + my_watch.resource_version = resource_version |
| 46 | + |
| 47 | + async with my_watch.stream(func, *args, resource_version=resource_version, _request_timeout=30) as stream: |
| 48 | + async for event in stream: |
| 49 | + _emit_event(event) |
| 50 | + |
| 51 | + log.error(f"Monitoring in namespace '{namespace}' stopped unexpectedly") |
0 commit comments