Skip to content

Commit 894c1ac

Browse files
committed
Add trio connection.
1 parent 48f9f43 commit 894c1ac

File tree

10 files changed

+2778
-221
lines changed

10 files changed

+2778
-221
lines changed

docs/topics/keepalive.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,8 @@ measured during the last exchange of Ping and Pong frames::
136136
Alternatively, you can measure the latency at any time by calling
137137
:attr:`~asyncio.connection.Connection.ping` and awaiting its result::
138138

139-
pong_waiter = await websocket.ping()
140-
latency = await pong_waiter
139+
pong_received = await websocket.ping()
140+
latency = await pong_received
141141

142142
Latency between a client and a server may increase for two reasons:
143143

src/websockets/asyncio/connection.py

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,10 @@ def __init__(
101101
self.close_deadline: float | None = None
102102

103103
# Protect sending fragmented messages.
104-
self.fragmented_send_waiter: asyncio.Future[None] | None = None
104+
self.send_in_progress: asyncio.Future[None] | None = None
105105

106106
# Mapping of ping IDs to pong waiters, in chronological order.
107-
self.pong_waiters: dict[bytes, tuple[asyncio.Future[float], float]] = {}
107+
self.pending_pings: dict[bytes, tuple[asyncio.Future[float], float]] = {}
108108

109109
self.latency: float = 0
110110
"""
@@ -468,8 +468,8 @@ async def send(
468468
"""
469469
# While sending a fragmented message, prevent sending other messages
470470
# until all fragments are sent.
471-
while self.fragmented_send_waiter is not None:
472-
await asyncio.shield(self.fragmented_send_waiter)
471+
while self.send_in_progress is not None:
472+
await asyncio.shield(self.send_in_progress)
473473

474474
# Unfragmented message -- this case must be handled first because
475475
# strings and bytes-like objects are iterable.
@@ -502,8 +502,8 @@ async def send(
502502
except StopIteration:
503503
return
504504

505-
assert self.fragmented_send_waiter is None
506-
self.fragmented_send_waiter = self.loop.create_future()
505+
assert self.send_in_progress is None
506+
self.send_in_progress = self.loop.create_future()
507507
try:
508508
# First fragment.
509509
if isinstance(chunk, str):
@@ -549,8 +549,8 @@ async def send(
549549
raise
550550

551551
finally:
552-
self.fragmented_send_waiter.set_result(None)
553-
self.fragmented_send_waiter = None
552+
self.send_in_progress.set_result(None)
553+
self.send_in_progress = None
554554

555555
# Fragmented message -- async iterator.
556556

@@ -561,8 +561,8 @@ async def send(
561561
except StopAsyncIteration:
562562
return
563563

564-
assert self.fragmented_send_waiter is None
565-
self.fragmented_send_waiter = self.loop.create_future()
564+
assert self.send_in_progress is None
565+
self.send_in_progress = self.loop.create_future()
566566
try:
567567
# First fragment.
568568
if isinstance(chunk, str):
@@ -610,8 +610,8 @@ async def send(
610610
raise
611611

612612
finally:
613-
self.fragmented_send_waiter.set_result(None)
614-
self.fragmented_send_waiter = None
613+
self.send_in_progress.set_result(None)
614+
self.send_in_progress = None
615615

616616
else:
617617
raise TypeError("data must be str, bytes, iterable, or async iterable")
@@ -635,7 +635,7 @@ async def close(self, code: int = 1000, reason: str = "") -> None:
635635
# The context manager takes care of waiting for the TCP connection
636636
# to terminate after calling a method that sends a close frame.
637637
async with self.send_context():
638-
if self.fragmented_send_waiter is not None:
638+
if self.send_in_progress is not None:
639639
self.protocol.fail(
640640
CloseCode.INTERNAL_ERROR,
641641
"close during fragmented message",
@@ -677,9 +677,9 @@ async def ping(self, data: Data | None = None) -> Awaitable[float]:
677677
678678
::
679679
680-
pong_waiter = await ws.ping()
680+
pong_received = await ws.ping()
681681
# only if you want to wait for the corresponding pong
682-
latency = await pong_waiter
682+
latency = await pong_received
683683
684684
Raises:
685685
ConnectionClosed: When the connection is closed.
@@ -696,19 +696,19 @@ async def ping(self, data: Data | None = None) -> Awaitable[float]:
696696

697697
async with self.send_context():
698698
# Protect against duplicates if a payload is explicitly set.
699-
if data in self.pong_waiters:
699+
if data in self.pending_pings:
700700
raise ConcurrencyError("already waiting for a pong with the same data")
701701

702702
# Generate a unique random payload otherwise.
703-
while data is None or data in self.pong_waiters:
703+
while data is None or data in self.pending_pings:
704704
data = struct.pack("!I", random.getrandbits(32))
705705

706-
pong_waiter = self.loop.create_future()
706+
pong_received = self.loop.create_future()
707707
# The event loop's default clock is time.monotonic(). Its resolution
708708
# is a bit low on Windows (~16ms). This is improved in Python 3.13.
709-
self.pong_waiters[data] = (pong_waiter, self.loop.time())
709+
self.pending_pings[data] = (pong_received, self.loop.time())
710710
self.protocol.send_ping(data)
711-
return pong_waiter
711+
return pong_received
712712

713713
async def pong(self, data: Data = b"") -> None:
714714
"""
@@ -757,7 +757,7 @@ def acknowledge_pings(self, data: bytes) -> None:
757757
758758
"""
759759
# Ignore unsolicited pong.
760-
if data not in self.pong_waiters:
760+
if data not in self.pending_pings:
761761
return
762762

763763
pong_timestamp = self.loop.time()
@@ -766,20 +766,20 @@ def acknowledge_pings(self, data: bytes) -> None:
766766
# Acknowledge all previous pings too in that case.
767767
ping_id = None
768768
ping_ids = []
769-
for ping_id, (pong_waiter, ping_timestamp) in self.pong_waiters.items():
769+
for ping_id, (pong_received, ping_timestamp) in self.pending_pings.items():
770770
ping_ids.append(ping_id)
771771
latency = pong_timestamp - ping_timestamp
772-
if not pong_waiter.done():
773-
pong_waiter.set_result(latency)
772+
if not pong_received.done():
773+
pong_received.set_result(latency)
774774
if ping_id == data:
775775
self.latency = latency
776776
break
777777
else:
778778
raise AssertionError("solicited pong not found in pings")
779779

780-
# Remove acknowledged pings from self.pong_waiters.
780+
# Remove acknowledged pings from self.pending_pings.
781781
for ping_id in ping_ids:
782-
del self.pong_waiters[ping_id]
782+
del self.pending_pings[ping_id]
783783

784784
def abort_pings(self) -> None:
785785
"""
@@ -791,16 +791,16 @@ def abort_pings(self) -> None:
791791
assert self.protocol.state is CLOSED
792792
exc = self.protocol.close_exc
793793

794-
for pong_waiter, _ping_timestamp in self.pong_waiters.values():
795-
if not pong_waiter.done():
796-
pong_waiter.set_exception(exc)
794+
for pong_received, _ping_timestamp in self.pending_pings.values():
795+
if not pong_received.done():
796+
pong_received.set_exception(exc)
797797
# If the exception is never retrieved, it will be logged when ping
798798
# is garbage-collected. This is confusing for users.
799799
# Given that ping is done (with an exception), canceling it does
800800
# nothing, but it prevents logging the exception.
801-
pong_waiter.cancel()
801+
pong_received.cancel()
802802

803-
self.pong_waiters.clear()
803+
self.pending_pings.clear()
804804

805805
async def keepalive(self) -> None:
806806
"""
@@ -821,7 +821,7 @@ async def keepalive(self) -> None:
821821
# connection to be closed before raising ConnectionClosed.
822822
# However, connection_lost() cancels keepalive_task before
823823
# it gets a chance to resume excuting.
824-
pong_waiter = await self.ping()
824+
pong_received = await self.ping()
825825
if self.debug:
826826
self.logger.debug("% sent keepalive ping")
827827

@@ -830,9 +830,9 @@ async def keepalive(self) -> None:
830830
async with asyncio_timeout(self.ping_timeout):
831831
# connection_lost cancels keepalive immediately
832832
# after setting a ConnectionClosed exception on
833-
# pong_waiter. A CancelledError is raised here,
833+
# pong_received. A CancelledError is raised here,
834834
# not a ConnectionClosed exception.
835-
latency = await pong_waiter
835+
latency = await pong_received
836836
self.logger.debug("% received keepalive pong")
837837
except asyncio.TimeoutError:
838838
if self.debug:
@@ -1201,7 +1201,7 @@ def broadcast(
12011201
if connection.protocol.state is not OPEN:
12021202
continue
12031203

1204-
if connection.fragmented_send_waiter is not None:
1204+
if connection.send_in_progress is not None:
12051205
if raise_exceptions:
12061206
exception = ConcurrencyError("sending a fragmented message")
12071207
exceptions.append(exception)

src/websockets/sync/connection.py

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def __init__(
104104
self.send_in_progress = False
105105

106106
# Mapping of ping IDs to pong waiters, in chronological order.
107-
self.pong_waiters: dict[bytes, tuple[threading.Event, float, bool]] = {}
107+
self.pending_pings: dict[bytes, tuple[threading.Event, float, bool]] = {}
108108

109109
self.latency: float = 0
110110
"""
@@ -629,8 +629,9 @@ def ping(
629629
630630
::
631631
632-
pong_event = ws.ping()
633-
pong_event.wait() # only if you want to wait for the pong
632+
pong_received = ws.ping()
633+
# only if you want to wait for the corresponding pong
634+
pong_received.wait()
634635
635636
Raises:
636637
ConnectionClosed: When the connection is closed.
@@ -647,17 +648,17 @@ def ping(
647648

648649
with self.send_context():
649650
# Protect against duplicates if a payload is explicitly set.
650-
if data in self.pong_waiters:
651+
if data in self.pending_pings:
651652
raise ConcurrencyError("already waiting for a pong with the same data")
652653

653654
# Generate a unique random payload otherwise.
654-
while data is None or data in self.pong_waiters:
655+
while data is None or data in self.pending_pings:
655656
data = struct.pack("!I", random.getrandbits(32))
656657

657-
pong_waiter = threading.Event()
658-
self.pong_waiters[data] = (pong_waiter, time.monotonic(), ack_on_close)
658+
pong_received = threading.Event()
659+
self.pending_pings[data] = (pong_received, time.monotonic(), ack_on_close)
659660
self.protocol.send_ping(data)
660-
return pong_waiter
661+
return pong_received
661662

662663
def pong(self, data: Data = b"") -> None:
663664
"""
@@ -707,7 +708,7 @@ def acknowledge_pings(self, data: bytes) -> None:
707708
"""
708709
with self.protocol_mutex:
709710
# Ignore unsolicited pong.
710-
if data not in self.pong_waiters:
711+
if data not in self.pending_pings:
711712
return
712713

713714
pong_timestamp = time.monotonic()
@@ -717,21 +718,21 @@ def acknowledge_pings(self, data: bytes) -> None:
717718
ping_id = None
718719
ping_ids = []
719720
for ping_id, (
720-
pong_waiter,
721+
pong_received,
721722
ping_timestamp,
722723
_ack_on_close,
723-
) in self.pong_waiters.items():
724+
) in self.pending_pings.items():
724725
ping_ids.append(ping_id)
725-
pong_waiter.set()
726+
pong_received.set()
726727
if ping_id == data:
727728
self.latency = pong_timestamp - ping_timestamp
728729
break
729730
else:
730731
raise AssertionError("solicited pong not found in pings")
731732

732-
# Remove acknowledged pings from self.pong_waiters.
733+
# Remove acknowledged pings from self.pending_pings.
733734
for ping_id in ping_ids:
734-
del self.pong_waiters[ping_id]
735+
del self.pending_pings[ping_id]
735736

736737
def acknowledge_pending_pings(self) -> None:
737738
"""
@@ -740,11 +741,11 @@ def acknowledge_pending_pings(self) -> None:
740741
"""
741742
assert self.protocol.state is CLOSED
742743

743-
for pong_waiter, _ping_timestamp, ack_on_close in self.pong_waiters.values():
744+
for pong_received, _ping_timestamp, ack_on_close in self.pending_pings.values():
744745
if ack_on_close:
745-
pong_waiter.set()
746+
pong_received.set()
746747

747-
self.pong_waiters.clear()
748+
self.pending_pings.clear()
748749

749750
def keepalive(self) -> None:
750751
"""
@@ -762,15 +763,14 @@ def keepalive(self) -> None:
762763
break
763764

764765
try:
765-
pong_waiter = self.ping(ack_on_close=True)
766+
pong_received = self.ping(ack_on_close=True)
766767
except ConnectionClosed:
767768
break
768769
if self.debug:
769770
self.logger.debug("% sent keepalive ping")
770771

771772
if self.ping_timeout is not None:
772-
#
773-
if pong_waiter.wait(self.ping_timeout):
773+
if pong_received.wait(self.ping_timeout):
774774
if self.debug:
775775
self.logger.debug("% received keepalive pong")
776776
else:
@@ -804,7 +804,7 @@ def recv_events(self) -> None:
804804
805805
Run this method in a thread as long as the connection is alive.
806806
807-
``recv_events()`` exits immediately when the ``self.socket`` is closed.
807+
``recv_events()`` exits immediately when ``self.socket`` is closed.
808808
809809
"""
810810
try:
@@ -979,6 +979,7 @@ def send_context(
979979
# Minor layering violation: we assume that the connection
980980
# will be closing soon if it isn't in the expected state.
981981
wait_for_close = True
982+
# TODO: calculate close deadline if not set?
982983
raise_close_exc = True
983984

984985
# To avoid a deadlock, release the connection lock by exiting the

0 commit comments

Comments
 (0)