Skip to content

Commit 0c48574

Browse files
committed
improve ping handling
- do not send PINGREQ unnecessarily - send PINGREQ even on long loop timeouts fixes #198
1 parent 70faa4f commit 0c48574

File tree

2 files changed

+176
-13
lines changed

2 files changed

+176
-13
lines changed

adafruit_minimqtt/adafruit_minimqtt.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ def __init__(
226226
self._is_connected = False
227227
self._msg_size_lim = MQTT_MSG_SZ_LIM
228228
self._pid = 0
229-
self._timestamp: float = 0
229+
self._last_msg_sent_timestamp: float = 0
230230
self.logger = NullLogger()
231231
"""An optional logging attribute that can be set with with a Logger
232232
to enable debug logging."""
@@ -640,6 +640,7 @@ def _connect(
640640
if self._username is not None:
641641
self._send_str(self._username)
642642
self._send_str(self._password)
643+
self._last_msg_sent_timestamp = self.get_monotonic_time()
643644
self.logger.debug("Receiving CONNACK packet from broker")
644645
stamp = self.get_monotonic_time()
645646
while True:
@@ -694,6 +695,7 @@ def disconnect(self) -> None:
694695
self._sock.close()
695696
self._is_connected = False
696697
self._subscribed_topics = []
698+
self._last_msg_sent_timestamp = 0
697699
if self.on_disconnect is not None:
698700
self.on_disconnect(self, self.user_data, 0)
699701

@@ -707,6 +709,7 @@ def ping(self) -> list[int]:
707709
self._sock.send(MQTT_PINGREQ)
708710
ping_timeout = self.keep_alive
709711
stamp = self.get_monotonic_time()
712+
self._last_msg_sent_timestamp = stamp
710713
rc, rcs = None, []
711714
while rc != MQTT_PINGRESP:
712715
rc = self._wait_for_msg()
@@ -781,6 +784,7 @@ def publish(
781784
self._sock.send(pub_hdr_fixed)
782785
self._sock.send(pub_hdr_var)
783786
self._sock.send(msg)
787+
self._last_msg_sent_timestamp = self.get_monotonic_time()
784788
if qos == 0 and self.on_publish is not None:
785789
self.on_publish(self, self.user_data, topic, self._pid)
786790
if qos == 1:
@@ -858,6 +862,7 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N
858862
self.logger.debug(f"payload: {payload}")
859863
self._sock.send(payload)
860864
stamp = self.get_monotonic_time()
865+
self._last_msg_sent_timestamp = stamp
861866
while True:
862867
op = self._wait_for_msg()
863868
if op is None:
@@ -933,6 +938,7 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None:
933938
for t in topics:
934939
self.logger.debug(f"UNSUBSCRIBING from topic {t}")
935940
self._sock.send(payload)
941+
self._last_msg_sent_timestamp = self.get_monotonic_time()
936942
self.logger.debug("Waiting for UNSUBACK...")
937943
while True:
938944
stamp = self.get_monotonic_time()
@@ -1022,7 +1028,6 @@ def reconnect(self, resub_topics: bool = True) -> int:
10221028
return ret
10231029

10241030
def loop(self, timeout: float = 0) -> Optional[list[int]]:
1025-
# pylint: disable = too-many-return-statements
10261031
"""Non-blocking message loop. Use this method to check for incoming messages.
10271032
Returns list of packet types of any messages received or None.
10281033
@@ -1031,22 +1036,26 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
10311036
"""
10321037
self._connected()
10331038
self.logger.debug(f"waiting for messages for {timeout} seconds")
1034-
if self._timestamp == 0:
1035-
self._timestamp = self.get_monotonic_time()
1036-
current_time = self.get_monotonic_time()
1037-
if current_time - self._timestamp >= self.keep_alive:
1038-
self._timestamp = 0
1039-
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
1040-
self.logger.debug(
1041-
"KeepAlive period elapsed - requesting a PINGRESP from the server..."
1042-
)
1043-
rcs = self.ping()
1044-
return rcs
10451039

10461040
stamp = self.get_monotonic_time()
10471041
rcs = []
10481042

10491043
while True:
1044+
if (
1045+
self.get_monotonic_time() - self._last_msg_sent_timestamp
1046+
>= self.keep_alive
1047+
):
1048+
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
1049+
self.logger.debug(
1050+
"KeepAlive period elapsed - requesting a PINGRESP from the server..."
1051+
)
1052+
rcs.extend(self.ping())
1053+
# ping() itself contains a _wait_for_msg() loop which might have taken a while,
1054+
# so check here as well.
1055+
if self.get_monotonic_time() - stamp > timeout:
1056+
self.logger.debug(f"Loop timed out after {timeout} seconds")
1057+
break
1058+
10501059
rc = self._wait_for_msg()
10511060
if rc is not None:
10521061
rcs.append(rc)

tests/test_loop.py

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,99 @@
88
import socket
99
import ssl
1010
import time
11+
import errno
12+
1113
from unittest import TestCase, main
1214
from unittest.mock import patch
15+
from unittest import mock
1316

1417
import adafruit_minimqtt.adafruit_minimqtt as MQTT
1518

1619

20+
class Nulltet:
21+
"""
22+
Mock Socket that does nothing.
23+
24+
Inspired by the Mocket class from Adafruit_CircuitPython_Requests
25+
"""
26+
27+
def __init__(self):
28+
self.sent = bytearray()
29+
30+
self.timeout = mock.Mock()
31+
self.connect = mock.Mock()
32+
self.close = mock.Mock()
33+
34+
def send(self, bytes_to_send):
35+
"""
36+
Record the bytes. return the length of this bytearray.
37+
"""
38+
self.sent.extend(bytes_to_send)
39+
return len(bytes_to_send)
40+
41+
# MiniMQTT checks for the presence of "recv_into" and switches behavior based on that.
42+
# pylint: disable=unused-argument,no-self-use
43+
def recv_into(self, retbuf, bufsize):
44+
"""Always raise timeout exception."""
45+
exc = OSError()
46+
exc.errno = errno.ETIMEDOUT
47+
raise exc
48+
49+
50+
class Pingtet:
51+
"""
52+
Mock Socket tailored for PINGREQ testing.
53+
Records sent data, hands out PINGRESP for each PINGREQ received.
54+
55+
Inspired by the Mocket class from Adafruit_CircuitPython_Requests
56+
"""
57+
58+
PINGRESP = bytearray([0xD0, 0x00])
59+
60+
def __init__(self):
61+
self._to_send = self.PINGRESP
62+
63+
self.sent = bytearray()
64+
65+
self.timeout = mock.Mock()
66+
self.connect = mock.Mock()
67+
self.close = mock.Mock()
68+
69+
self._got_pingreq = False
70+
71+
def send(self, bytes_to_send):
72+
"""
73+
Recognize PINGREQ and record the indication that it was received.
74+
Assumes it was sent in one chunk (of 2 bytes).
75+
Also record the bytes. return the length of this bytearray.
76+
"""
77+
self.sent.extend(bytes_to_send)
78+
if bytes_to_send == b"\xc0\0":
79+
self._got_pingreq = True
80+
return len(bytes_to_send)
81+
82+
# MiniMQTT checks for the presence of "recv_into" and switches behavior based on that.
83+
def recv_into(self, retbuf, bufsize):
84+
"""
85+
If the PINGREQ indication is on, return PINGRESP, otherwise raise timeout exception.
86+
"""
87+
if self._got_pingreq:
88+
size = min(bufsize, len(self._to_send))
89+
if size == 0:
90+
return size
91+
chop = self._to_send[0:size]
92+
retbuf[0:] = chop
93+
self._to_send = self._to_send[size:]
94+
if len(self._to_send) == 0:
95+
self._got_pingreq = False
96+
self._to_send = self.PINGRESP
97+
return size
98+
99+
exc = OSError()
100+
exc.errno = errno.ETIMEDOUT
101+
raise exc
102+
103+
17104
class Loop(TestCase):
18105
"""basic loop() test"""
19106

@@ -54,6 +141,8 @@ def test_loop_basic(self) -> None:
54141

55142
time_before = time.monotonic()
56143
timeout = random.randint(3, 8)
144+
# pylint: disable=protected-access
145+
mqtt_client._last_msg_sent_timestamp = mqtt_client.get_monotonic_time()
57146
rcs = mqtt_client.loop(timeout=timeout)
58147
time_after = time.monotonic()
59148

@@ -64,6 +153,7 @@ def test_loop_basic(self) -> None:
64153
assert rcs is not None
65154
assert len(rcs) > 1
66155
expected_rc = self.INITIAL_RCS_VAL
156+
# pylint: disable=not-an-iterable
67157
for ret_code in rcs:
68158
assert ret_code == expected_rc
69159
expected_rc += 1
@@ -84,6 +174,70 @@ def test_loop_is_connected(self):
84174

85175
assert "not connected" in str(context.exception)
86176

177+
# pylint: disable=no-self-use
178+
def test_loop_ping_timeout(self):
179+
"""Verify that ping will be sent even with loop timeout bigger than keep alive timeout
180+
and no outgoing messages are sent."""
181+
182+
recv_timeout = 2
183+
keep_alive_timeout = recv_timeout * 2
184+
mqtt_client = MQTT.MQTT(
185+
broker="localhost",
186+
port=1883,
187+
ssl_context=ssl.create_default_context(),
188+
connect_retries=1,
189+
socket_timeout=1,
190+
recv_timeout=recv_timeout,
191+
keep_alive=keep_alive_timeout,
192+
)
193+
194+
# patch is_connected() to avoid CONNECT/CONNACK handling.
195+
mqtt_client.is_connected = lambda: True
196+
mocket = Pingtet()
197+
# pylint: disable=protected-access
198+
mqtt_client._sock = mocket
199+
200+
start = time.monotonic()
201+
res = mqtt_client.loop(timeout=2 * keep_alive_timeout)
202+
assert time.monotonic() - start >= 2 * keep_alive_timeout
203+
assert len(mocket.sent) > 0
204+
assert len(res) == 2
205+
assert set(res) == {int(0xD0)}
206+
207+
# pylint: disable=no-self-use
208+
def test_loop_ping_vs_msgs_sent(self):
209+
"""Verify that ping will not be sent unnecessarily."""
210+
211+
recv_timeout = 2
212+
keep_alive_timeout = recv_timeout * 2
213+
mqtt_client = MQTT.MQTT(
214+
broker="localhost",
215+
port=1883,
216+
ssl_context=ssl.create_default_context(),
217+
connect_retries=1,
218+
socket_timeout=1,
219+
recv_timeout=recv_timeout,
220+
keep_alive=keep_alive_timeout,
221+
)
222+
223+
# patch is_connected() to avoid CONNECT/CONNACK handling.
224+
mqtt_client.is_connected = lambda: True
225+
226+
mocket = Nulltet()
227+
# pylint: disable=protected-access
228+
mqtt_client._sock = mocket
229+
230+
i = 0
231+
topic = "foo"
232+
message = "bar"
233+
for _ in range(3 * keep_alive_timeout):
234+
mqtt_client.publish(topic, message)
235+
mqtt_client.loop(1)
236+
i += 1
237+
238+
# This means no other messages than the PUBLISH messages generated by the code above.
239+
assert len(mocket.sent) == i * (2 + 2 + len(topic) + len(message))
240+
87241

88242
if __name__ == "__main__":
89243
main()

0 commit comments

Comments
 (0)