
Commit bdea97b

Author: Sergey Vasilyev

Merge pull request zalando-incubator#125 from nolar/background-events

Post k8s-events in the background

2 parents: b977f12 + aba10c7; commit bdea97b

16 files changed: +265 additions, -92 deletions
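For handler authors, the user-facing effect of this commit is that the event helpers no longer block the handling coroutine: kopf.event(), kopf.info(), and kopf.warn() only enqueue a k8s-event and return, while a background task does the actual posting. A minimal usage sketch (the resource, handler, and field names are illustrative, not part of this commit):

import kopf

@kopf.on.create('zalando.org', 'v1', 'kopfexamples')
def create_fn(body, spec, **kwargs):
    # Each call only enqueues a k8s-event; the posting happens in the background task.
    kopf.info(body, reason='Started', message='Creation is in progress.')
    if not spec.get('field'):
        kopf.warn(body, reason='MissingField', message='spec.field is not set.')
    kopf.event(body, type='Normal', reason='Finished', message='Creation is done.')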

kopf/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -22,7 +22,7 @@
     EventsConfig,
     WorkersConfig
 )
-from kopf.events import (
+from kopf.engines.posting import (
     event,
     info,
     warn,

kopf/clients/events.py

Lines changed: 15 additions & 11 deletions
@@ -6,20 +6,33 @@
 import kubernetes.client.rest

 from kopf import config
+from kopf.structs import hierarchies

 logger = logging.getLogger(__name__)

 MAX_MESSAGE_LENGTH = 1024
 CUT_MESSAGE_INFIX = '...'


-async def post_event(*, obj, type, reason, message=''):
+async def post_event(*, obj=None, ref=None, type, reason, message=''):
     """
     Issue an event for the object.
+
+    This is where they can also be accumulated, aggregated, grouped,
+    and where the rate-limits should be maintained. It can (and should)
+    be done by the client library, as it is done in the Go client.
     """

+    # Object reference - similar to the owner reference, but different.
+    if obj is not None and ref is not None:
+        raise TypeError("Only one of obj= and ref= is allowed for a posted event. Got both.")
+    if obj is None and ref is None:
+        raise TypeError("One of obj= and ref= is required for a posted event. Got none.")
+    if ref is None:
+        ref = hierarchies.build_object_reference(obj)
+
     now = datetime.datetime.utcnow()
-    namespace = obj['metadata']['namespace']
+    namespace = ref['namespace'] or 'default'

     # Prevent a common case of event posting errors by shortening the message.
     if len(message) > MAX_MESSAGE_LENGTH:
@@ -28,15 +41,6 @@ async def post_event(*, obj, type, reason, message=''):
         suffix = message[-MAX_MESSAGE_LENGTH // 2 + (len(infix) - len(infix) // 2):]
         message = f'{prefix}{infix}{suffix}'

-    # Object reference - similar to the owner reference, but different.
-    ref = dict(
-        apiVersion=obj['apiVersion'],
-        kind=obj['kind'],
-        name=obj['metadata']['name'],
-        uid=obj['metadata']['uid'],
-        namespace=obj['metadata']['namespace'],
-    )
-
     meta = kubernetes.client.V1ObjectMeta(
         namespace=namespace,
         generate_name='kopf-event-',
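The new helper hierarchies.build_object_reference() is referenced here, but its implementation is not part of the shown hunks. A minimal sketch of what it presumably does, inferred from the inlined ref = dict(...) block that it replaces (the exact code in kopf/structs/hierarchies.py is an assumption):

# Hypothetical sketch, not shown in this commit: kopf/structs/hierarchies.py
def build_object_reference(body: dict) -> dict:
    # Object reference - similar to the owner reference, but different.
    return dict(
        apiVersion=body['apiVersion'],
        kind=body['kind'],
        name=body['metadata']['name'],
        uid=body['metadata']['uid'],
        namespace=body['metadata'].get('namespace'),  # may be absent for cluster-scoped objects
    )

The .get('namespace') detail is a guess: the caller falls back with ref['namespace'] or 'default', which suggests the reference may carry an empty namespace.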

kopf/engines/posting.py

Lines changed: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
+"""
+All the functions to write the Kubernetes events for the Kubernetes objects.
+
+They are used internally in the handling routines to show the progress,
+and can be used directly from the handlers to add arbitrary custom events.
+
+The actual k8s-event posting runs in the background,
+and posts the k8s-events as soon as they are queued.
+"""
+import asyncio
+import sys
+from contextvars import ContextVar
+from typing import Mapping, Text, NamedTuple
+
+from kopf import config
+from kopf.clients import events
+from kopf.structs import dicts
+from kopf.structs import hierarchies
+
+event_queue_var: ContextVar[asyncio.Queue] = ContextVar('event_queue_var')
+
+
+class K8sEvent(NamedTuple):
+    """
+    A single k8s-event to be posted, with all ref-information preserved.
+    It can exist and be posted even after the object is garbage-collected.
+    """
+    ref: Mapping
+    type: Text
+    reason: Text
+    message: Text
+
+
+def event(objs, *, type, reason, message=''):
+    queue = event_queue_var.get()
+    for obj in dicts.walk(objs):
+        ref = hierarchies.build_object_reference(obj)
+        event = K8sEvent(ref=ref, type=type, reason=reason, message=message)
+        queue.put_nowait(event)
+
+
+def info(obj, *, reason, message=''):
+    if config.EventsConfig.events_loglevel > config.LOGLEVEL_INFO:
+        return
+    event(obj, type='Normal', reason=reason, message=message)
+
+
+def warn(obj, *, reason, message=''):
+    if config.EventsConfig.events_loglevel > config.LOGLEVEL_WARNING:
+        return
+    event(obj, type='Warning', reason=reason, message=message)
+
+
+def exception(obj, *, reason='', message='', exc=None):
+    if config.EventsConfig.events_loglevel > config.LOGLEVEL_ERROR:
+        return
+    if exc is None:
+        _, exc, _ = sys.exc_info()
+    reason = reason if reason else type(exc).__name__
+    message = f'{message} {exc}' if message and exc else f'{exc}' if exc else f'{message}'
+    event(obj, type='Error', reason=reason, message=message)
+
+
+async def poster(
+        event_queue: asyncio.Queue,
+):
+    """
+    Post events in the background as they are queued.
+
+    When the events come from the logging system, they have
+    their reason, type, and other fields adjusted to meet Kubernetes's concepts.
+
+    When the events are explicitly defined via `kopf.event` and similar calls,
+    they have these special fields defined already.
+
+    In either case, we pass the queued events directly to the K8s client
+    (or a client wrapper/adapter), with no extra processing.
+
+    This task is defined in this module only because all other tasks are here,
+    so we keep all forever-running tasks together.
+    """
+    while True:
+        posted_event = await event_queue.get()
+        await events.post_event(
+            ref=posted_event.ref,
+            type=posted_event.type,
+            reason=posted_event.reason,
+            message=posted_event.message)
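A standalone sketch of the producer/consumer pattern this module relies on (illustrative only, not kopf code; the names emit, poster_sketch, and main are made up): synchronous producers enqueue items with put_nowait(), and a single forever-running task drains the queue.

import asyncio

def emit(queue: asyncio.Queue, message: str) -> None:
    # Synchronous producer, like event()/info()/warn(): enqueue and return immediately.
    queue.put_nowait(message)

async def poster_sketch(queue: asyncio.Queue) -> None:
    # Forever-running consumer, like poster(): take one item at a time, in order.
    while True:
        item = await queue.get()
        print(f"posted: {item}")  # kopf awaits events.post_event(...) here instead

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    consumer = asyncio.create_task(poster_sketch(queue))
    emit(queue, "Handler 'create_fn' succeeded.")  # returns instantly, nothing is awaited
    await asyncio.sleep(0.1)                       # give the consumer a chance to run
    consumer.cancel()

asyncio.run(main())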

kopf/events.py

Lines changed: 14 additions & 66 deletions
@@ -1,72 +1,20 @@
 """
-All the functions to write the Kubernetes events on the Kubernetes objects.
-
-They are used internally in the handling routine to show the progress,
-and can be used directly from the handlers to add arbitrary custom events.
-
-The events look like this:
-
-    kubectl describe -f myres.yaml
-    ...
-    TODO
-
+**THIS MODULE IS DEPRECATED AND WILL BE REMOVED.**
 """
-import asyncio
-import sys
-
-from kopf import config
-from kopf.clients import events
-
-
-# TODO: rename it to kopf.log()? kopf.events.log()? kopf.events.warn()?
-async def event_async(obj, *, type, reason, message=''):
-    """
-    Issue an event for the object.
-    """
-    if isinstance(obj, (list, tuple)):
-        for item in obj:
-            await events.post_event(obj=item, type=type, reason=reason, message=message)
-    else:
-        await events.post_event(obj=obj, type=type, reason=reason, message=message)
-
-
-# Shortcuts for the only two officially documented event types as of now.
-# However, any arbitrary strings can be used as an event type to the base function.
-async def info_async(obj, *, reason, message=''):
-    if config.EventsConfig.events_loglevel > config.LOGLEVEL_INFO:
-        return
-    await event_async(obj, reason=reason, message=message, type='Normal')
-
-
-async def warn_async(obj, *, reason, message=''):
-    if config.EventsConfig.events_loglevel > config.LOGLEVEL_WARNING:
-        return
-    await event_async(obj, reason=reason, message=message, type='Warning')
-
-
-async def exception_async(obj, *, reason='', message='', exc=None):
-    if config.EventsConfig.events_loglevel > config.LOGLEVEL_ERROR:
-        return
-
-    if exc is None:
-        _, exc, _ = sys.exc_info()
-    reason = reason if reason else type(exc).__name__
-    message = f'{message} {exc}' if message else f'{exc}'
-    await event_async(obj, reason=reason, message=message, type='Error')
-
-
-# Next 4 funcs are just synchronous interface for async event functions.
-def event(obj, *, type, reason, message=''):
-    asyncio.wait_for(event_async(obj, type=type, reason=reason, message=message), timeout=None)
-
-
-def info(obj, *, reason, message=''):
-    asyncio.wait_for(info_async(obj, reason=reason, message=message), timeout=None)
+import warnings

+from kopf.engines.posting import (
+    event,
+    info,
+    warn,
+    exception,
+)

-def warn(obj, *, reason, message=''):
-    asyncio.wait_for(warn_async(obj, reason=reason, message=message), timeout=None)
+__all__ = ['event', 'info', 'warn', 'exception']


-def exception(obj, *, reason='', message='', exc=None):
-    asyncio.wait_for(exception_async(obj, reason=reason, message=message, exc=exc), timeout=None)
+# Triggered on explicit `import kopf.events` (not imported this way normally).
+warnings.warn(
+    "`kopf.events` is deprecated; "
+    "use `kopf` directly: e.g. `kopf.event(...)`.",
+    DeprecationWarning, stacklevel=0)
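For code that imported the old module directly, the migration is mechanical; an illustrative before/after (the handler names are made up, only the import path changes):

# Before this commit (module now deprecated; importing it emits a DeprecationWarning):
from kopf import events

def old_style_handler(body, **kwargs):
    events.info(body, reason='SomeReason', message='It happened.')

# After this commit: the same helpers are re-exported at the package top level.
import kopf

def new_style_handler(body, **kwargs):
    kopf.info(body, reason='SomeReason', message='It happened.')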

kopf/reactor/handling.py

Lines changed: 9 additions & 7 deletions
@@ -21,8 +21,8 @@
 from contextvars import ContextVar
 from typing import Optional, Callable, Iterable, Union, Collection

-from kopf import events
 from kopf.clients import patching
+from kopf.engines import posting
 from kopf.reactor import causation
 from kopf.reactor import invocation
 from kopf.reactor import registries
@@ -79,6 +79,7 @@ async def custom_object_handler(
         resource: registries.Resource,
         event: dict,
         freeze: asyncio.Event,
+        event_queue: asyncio.Queue,
 ) -> None:
     """
     Handle a single custom object low-level watch-event.
@@ -98,6 +99,7 @@ async def custom_object_handler(
         namespace=body.get('metadata', {}).get('namespace', 'default'),
        name=body.get('metadata', {}).get('name', body.get('metadata', {}).get('uid', None)),
     ))
+    posting.event_queue_var.set(event_queue)  # till the end of this object's task.

     # If the global freeze is set for the processing (i.e. other operator overrides), do nothing.
     if freeze.is_set():
@@ -205,7 +207,7 @@ async def handle_cause(
             done = False
         else:
             logger.info(f"All handlers succeeded for {title}.")
-            await events.info_async(cause.body, reason='Success', message=f"All handlers succeeded for {title}.")
+            posting.info(cause.body, reason='Success', message=f"All handlers succeeded for {title}.")
             done = True
     else:
         skip = True
@@ -383,34 +385,34 @@ async def _execute(
         # Definitely retriable error, no matter what is the error-reaction mode.
         except HandlerRetryError as e:
             logger.exception(f"Handler {handler.id!r} failed with a retry exception. Will retry.")
-            await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will retry.")
+            posting.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.")
             status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay)
             handlers_left.append(handler)

         # Definitely fatal error, no matter what is the error-reaction mode.
         except HandlerFatalError as e:
             logger.exception(f"Handler {handler.id!r} failed with a fatal exception. Will stop.")
-            await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will stop.")
+            posting.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.")
             status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e)
             # TODO: report the handling failure somehow (beside logs/events). persistent status?

         # Regular errors behave as either retriable or fatal depending on the error-reaction mode.
         except Exception as e:
             if retry_on_errors:
                 logger.exception(f"Handler {handler.id!r} failed with an exception. Will retry.")
-                await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will retry.")
+                posting.exception(cause.body, message=f"Handler {handler.id!r} failed. Will retry.")
                 status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=DEFAULT_RETRY_DELAY)
                 handlers_left.append(handler)
             else:
                 logger.exception(f"Handler {handler.id!r} failed with an exception. Will stop.")
-                await events.exception_async(cause.body, message=f"Handler {handler.id!r} failed. Will stop.")
+                posting.exception(cause.body, message=f"Handler {handler.id!r} failed. Will stop.")
                 status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e)
                 # TODO: report the handling failure somehow (beside logs/events). persistent status?

         # No errors means the handler should be excluded from future runs in this reaction cycle.
         else:
             logger.info(f"Handler {handler.id!r} succeeded.")
-            await events.info_async(cause.body, reason='Success', message=f"Handler {handler.id!r} succeeded.")
+            posting.info(cause.body, reason='Success', message=f"Handler {handler.id!r} succeeded.")
             status.store_success(body=cause.body, patch=cause.patch, handler=handler, result=result)

     # Provoke the retry of the handling cycle if there were any unfinished handlers,
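The posting.event_queue_var.set(event_queue) call above is scoped to the object-handling task: every asyncio task runs with its own copy of the context, so a value set in one task does not leak into sibling tasks or the caller. A standalone illustration of this contextvars behaviour (not kopf code):

import asyncio
from contextvars import ContextVar

var: ContextVar[str] = ContextVar('var', default='<unset>')

async def per_object_task(name: str) -> None:
    var.set(name)              # analogous to posting.event_queue_var.set(event_queue)
    await asyncio.sleep(0)     # let the sibling task run in between
    assert var.get() == name   # still this task's own value, not the sibling's

async def main() -> None:
    await asyncio.gather(per_object_task('obj-a'), per_object_task('obj-b'))
    print(var.get())           # '<unset>': the per-task values did not leak out

asyncio.run(main())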

kopf/reactor/queueing.py

Lines changed: 10 additions & 0 deletions
@@ -34,6 +34,7 @@
 from kopf import config
 from kopf.clients import watching
 from kopf.engines import peering
+from kopf.engines import posting
 from kopf.reactor import handling
 from kopf.reactor import lifecycles
 from kopf.reactor import registries
@@ -174,9 +175,17 @@ def create_tasks(
     # The freezer and the registry are scoped to this whole task-set, to sync them all.
     lifecycle = lifecycle if lifecycle is not None else lifecycles.get_default_lifecycle()
     registry = registry if registry is not None else registries.get_default_registry()
+    event_queue = asyncio.Queue()
     freeze = asyncio.Event()
     tasks = []

+    # K8s-event posting. Events are queued in-memory and posted in the background.
+    # NB: currently, it is a global task, but can be made per-resource or per-object.
+    tasks.extend([
+        loop.create_task(posting.poster(
+            event_queue=event_queue)),
+    ])
+
     # Monitor the peers, unless explicitly disabled.
     ourselves: Optional[peering.Peer] = peering.Peer.detect(
         id=peering.detect_own_id(), priority=priority,
@@ -204,6 +213,7 @@ def create_tasks(
                     lifecycle=lifecycle,
                     registry=registry,
                     resource=resource,
+                    event_queue=event_queue,
                     freeze=freeze))),  # freeze is only checked
         ])
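The poster is created as one global long-running task next to the per-resource handling tasks, all sharing the same in-memory queue. A hedged sketch of this wiring pattern (illustrative names; not kopf's actual create_tasks/run code):

import asyncio

async def poster(event_queue: asyncio.Queue) -> None:
    # Forever-running: drains the shared queue in the background.
    while True:
        print(f"posted: {await event_queue.get()}")

async def watcher(event_queue: asyncio.Queue, resource: str) -> None:
    # Stand-in for a per-resource handling task that produces events.
    event_queue.put_nowait(f"{resource}: all handlers succeeded")
    await asyncio.sleep(0.1)

async def main() -> None:
    loop = asyncio.get_running_loop()
    event_queue: asyncio.Queue = asyncio.Queue()   # shared by all tasks below
    tasks = [
        loop.create_task(poster(event_queue)),                    # global background task
        loop.create_task(watcher(event_queue, 'kopfexamples')),   # one per resource
    ]
    await asyncio.sleep(0.2)                       # let the demo tasks do their work
    for task in tasks:
        task.cancel()                              # shut the forever-running tasks down
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(main())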
