@@ -79,7 +79,7 @@ class Anycast(Generic[_T]):
79
79
When the channel is not needed anymore, it should be closed with the
80
80
[`close()`][frequenz.channels.Anycast.close] method. This will prevent further
81
81
attempts to [`send()`][frequenz.channels.Sender.send] data. Receivers will still be
82
- able to drain the pending values on the channel, but after that, subsequent
82
+ able to drain the pending messages on the channel, but after that, subsequent
83
83
[`receive()`][frequenz.channels.Receiver.receive] calls will raise a
84
84
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception.
85
85
@@ -101,9 +101,9 @@ class Anycast(Generic[_T]):
101
101
102
102
103
103
async def send(sender: Sender[int]) -> None:
104
- for msg in range(3):
105
- print(f"sending {msg }")
106
- await sender.send(msg )
104
+ for message in range(3):
105
+ print(f"sending {message }")
106
+ await sender.send(message )
107
107
108
108
109
109
async def main() -> None:
@@ -115,8 +115,8 @@ async def main() -> None:
115
115
async with asyncio.TaskGroup() as task_group:
116
116
task_group.create_task(send(sender))
117
117
for _ in range(3):
118
- msg = await receiver.receive()
119
- print(f"received {msg }")
118
+ message = await receiver.receive()
119
+ print(f"received {message }")
120
120
await asyncio.sleep(0.1) # sleep (or work) with the data
121
121
122
122
@@ -146,15 +146,15 @@ async def main() -> None:
146
146
147
147
148
148
async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
149
- for msg in range(start, stop):
150
- print(f"{name} sending {msg }")
151
- await sender.send(msg )
149
+ for message in range(start, stop):
150
+ print(f"{name} sending {message }")
151
+ await sender.send(message )
152
152
153
153
154
154
async def recv(name: str, receiver: Receiver[int]) -> None:
155
155
try:
156
- async for msg in receiver:
157
- print(f"{name} received {msg }")
156
+ async for message in receiver:
157
+ print(f"{name} received {message }")
158
158
await asyncio.sleep(0.1) # sleep (or work) with the data
159
159
except ReceiverStoppedError:
160
160
pass
@@ -181,15 +181,15 @@ async def main() -> None:
181
181
sender_1 sending 11
182
182
sender_1 sending 12
183
183
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
184
- consumes a value
184
+ consumes a message
185
185
sender_2 sending 20
186
186
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
187
- consumes a value
187
+ consumes a message
188
188
receiver_1 received 10
189
189
receiver_1 received 11
190
190
sender_2 sending 21
191
191
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
192
- consumes a value
192
+ consumes a message
193
193
receiver_1 received 12
194
194
receiver_1 received 20
195
195
receiver_1 received 21
@@ -219,16 +219,16 @@ def __init__(self, *, name: str, limit: int = 10) -> None:
219
219
self ._send_cv : Condition = Condition ()
220
220
"""The condition to wait for free space in the channel's buffer.
221
221
222
- If the channel's buffer is full, then the sender waits for values to
222
+ If the channel's buffer is full, then the sender waits for messages to
223
223
get consumed using this condition until there's some free space
224
224
available in the channel's buffer.
225
225
"""
226
226
227
227
self ._recv_cv : Condition = Condition ()
228
- """The condition to wait for values in the channel's buffer.
228
+ """The condition to wait for messages in the channel's buffer.
229
229
230
- If the channel's buffer is empty, then the receiver waits for values
231
- using this condition until there's a value available in the channel's
230
+ If the channel's buffer is empty, then the receiver waits for messages
231
+ using this condition until there's a message available in the channel's
232
232
buffer.
233
233
"""
234
234
@@ -255,11 +255,11 @@ def is_closed(self) -> bool:
255
255
256
256
@property
257
257
def limit (self ) -> int :
258
- """The maximum number of values that can be stored in the channel's buffer.
258
+ """The maximum number of messages that can be stored in the channel's buffer.
259
259
260
260
If the length of channel's buffer reaches the limit, then the sender
261
261
blocks at the [send()][frequenz.channels.Sender.send] method until
262
- a value is consumed.
262
+ a message is consumed.
263
263
"""
264
264
maxlen = self ._deque .maxlen
265
265
assert maxlen is not None
@@ -271,7 +271,7 @@ async def close(self) -> None:
271
271
Any further attempts to [send()][frequenz.channels.Sender.send] data
272
272
will return `False`.
273
273
274
- Receivers will still be able to drain the pending values on the channel,
274
+ Receivers will still be able to drain the pending messages on the channel,
275
275
but after that, subsequent
276
276
[receive()][frequenz.channels.Receiver.receive] calls will return `None`
277
277
immediately.
@@ -309,16 +309,16 @@ class _Sender(Sender[_T]):
309
309
method.
310
310
"""
311
311
312
- def __init__ (self , chan : Anycast [_T ]) -> None :
312
+ def __init__ (self , channel : Anycast [_T ], / ) -> None :
313
313
"""Initialize this sender.
314
314
315
315
Args:
316
- chan : A reference to the channel that this sender belongs to.
316
+ channel : A reference to the channel that this sender belongs to.
317
317
"""
318
- self ._chan : Anycast [_T ] = chan
318
+ self ._channel : Anycast [_T ] = channel
319
319
"""The channel that this sender belongs to."""
320
320
321
- async def send (self , msg : _T ) -> None :
321
+ async def send (self , message : _T , / ) -> None :
322
322
"""Send a message across the channel.
323
323
324
324
To send, this method inserts the message into the Anycast channel's
@@ -327,47 +327,47 @@ async def send(self, msg: _T) -> None:
327
327
message will be received by exactly one receiver.
328
328
329
329
Args:
330
- msg : The message to be sent.
330
+ message : The message to be sent.
331
331
332
332
Raises:
333
333
SenderError: If the underlying channel was closed.
334
334
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
335
335
set as the cause.
336
336
"""
337
337
# pylint: disable=protected-access
338
- if self ._chan ._closed :
338
+ if self ._channel ._closed :
339
339
raise SenderError ("The channel was closed" , self ) from ChannelClosedError (
340
- self ._chan
340
+ self ._channel
341
341
)
342
- if len (self ._chan ._deque ) == self ._chan ._deque .maxlen :
342
+ if len (self ._channel ._deque ) == self ._channel ._deque .maxlen :
343
343
_logger .warning (
344
344
"Anycast channel [%s] is full, blocking sender until a receiver "
345
- "consumes a value " ,
345
+ "consumes a message " ,
346
346
self ,
347
347
)
348
- while len (self ._chan ._deque ) == self ._chan ._deque .maxlen :
349
- async with self ._chan ._send_cv :
350
- await self ._chan ._send_cv .wait ()
348
+ while len (self ._channel ._deque ) == self ._channel ._deque .maxlen :
349
+ async with self ._channel ._send_cv :
350
+ await self ._channel ._send_cv .wait ()
351
351
_logger .info (
352
352
"Anycast channel [%s] has space again, resuming the blocked sender" ,
353
353
self ,
354
354
)
355
- self ._chan ._deque .append (msg )
356
- async with self ._chan ._recv_cv :
357
- self ._chan ._recv_cv .notify (1 )
355
+ self ._channel ._deque .append (message )
356
+ async with self ._channel ._recv_cv :
357
+ self ._channel ._recv_cv .notify (1 )
358
358
# pylint: enable=protected-access
359
359
360
360
def __str__ (self ) -> str :
361
361
"""Return a string representation of this sender."""
362
- return f"{ self ._chan } :{ type (self ).__name__ } "
362
+ return f"{ self ._channel } :{ type (self ).__name__ } "
363
363
364
364
def __repr__ (self ) -> str :
365
365
"""Return a string representation of this sender."""
366
- return f"{ type (self ).__name__ } ({ self ._chan !r} )"
366
+ return f"{ type (self ).__name__ } ({ self ._channel !r} )"
367
367
368
368
369
369
class _Empty :
370
- """A sentinel value to indicate that a value has not been set."""
370
+ """A sentinel to indicate that a message has not been set."""
371
371
372
372
373
373
class _Receiver (Receiver [_T ]):
@@ -377,21 +377,21 @@ class _Receiver(Receiver[_T]):
377
377
method.
378
378
"""
379
379
380
- def __init__ (self , chan : Anycast [_T ]) -> None :
380
+ def __init__ (self , channel : Anycast [_T ], / ) -> None :
381
381
"""Initialize this receiver.
382
382
383
383
Args:
384
- chan : A reference to the channel that this receiver belongs to.
384
+ channel : A reference to the channel that this receiver belongs to.
385
385
"""
386
- self ._chan : Anycast [_T ] = chan
386
+ self ._channel : Anycast [_T ] = channel
387
387
"""The channel that this receiver belongs to."""
388
388
389
389
self ._next : _T | type [_Empty ] = _Empty
390
390
391
391
async def ready (self ) -> bool :
392
- """Wait until the receiver is ready with a value or an error.
392
+ """Wait until the receiver is ready with a message or an error.
393
393
394
- Once a call to `ready()` has finished, the value should be read with
394
+ Once a call to `ready()` has finished, the message should be read with
395
395
a call to `consume()` (`receive()` or iterated over). The receiver will
396
396
remain ready (this method will return immediately) until it is
397
397
consumed.
@@ -404,31 +404,31 @@ async def ready(self) -> bool:
404
404
return True
405
405
406
406
# pylint: disable=protected-access
407
- while len (self ._chan ._deque ) == 0 :
408
- if self ._chan ._closed :
407
+ while len (self ._channel ._deque ) == 0 :
408
+ if self ._channel ._closed :
409
409
return False
410
- async with self ._chan ._recv_cv :
411
- await self ._chan ._recv_cv .wait ()
412
- self ._next = self ._chan ._deque .popleft ()
413
- async with self ._chan ._send_cv :
414
- self ._chan ._send_cv .notify (1 )
410
+ async with self ._channel ._recv_cv :
411
+ await self ._channel ._recv_cv .wait ()
412
+ self ._next = self ._channel ._deque .popleft ()
413
+ async with self ._channel ._send_cv :
414
+ self ._channel ._send_cv .notify (1 )
415
415
# pylint: enable=protected-access
416
416
return True
417
417
418
418
def consume (self ) -> _T :
419
- """Return the latest value once `ready()` is complete.
419
+ """Return the latest message once `ready()` is complete.
420
420
421
421
Returns:
422
- The next value that was received.
422
+ The next message that was received.
423
423
424
424
Raises:
425
425
ReceiverStoppedError: If the receiver stopped producing messages.
426
426
ReceiverError: If there is some problem with the receiver.
427
427
"""
428
428
if ( # pylint: disable=protected-access
429
- self ._next is _Empty and self ._chan ._closed
429
+ self ._next is _Empty and self ._channel ._closed
430
430
):
431
- raise ReceiverStoppedError (self ) from ChannelClosedError (self ._chan )
431
+ raise ReceiverStoppedError (self ) from ChannelClosedError (self ._channel )
432
432
433
433
assert (
434
434
self ._next is not _Empty
@@ -442,8 +442,8 @@ def consume(self) -> _T:
442
442
443
443
def __str__ (self ) -> str :
444
444
"""Return a string representation of this receiver."""
445
- return f"{ self ._chan } :{ type (self ).__name__ } "
445
+ return f"{ self ._channel } :{ type (self ).__name__ } "
446
446
447
447
def __repr__ (self ) -> str :
448
448
"""Return a string representation of this receiver."""
449
- return f"{ type (self ).__name__ } ({ self ._chan !r} )"
449
+ return f"{ type (self ).__name__ } ({ self ._channel !r} )"
0 commit comments