Skip to content

Commit 5f222c2

Browse files
authored
Merge pull request #210 from kevin-tritz/timestamp_ns
maintain time.monotonic precision by using ns integer timestamps
2 parents f871143 + 2d562af commit 5f222c2

File tree

5 files changed

+31
-57
lines changed

5 files changed

+31
-57
lines changed

adafruit_minimqtt/adafruit_minimqtt.py

Lines changed: 25 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from random import randint
3737

3838
from adafruit_connection_manager import get_connection_manager
39+
from adafruit_ticks import ticks_ms, ticks_diff
3940

4041
try:
4142
from typing import List, Optional, Tuple, Type, Union
@@ -133,8 +134,6 @@ class MQTT:
133134
This works with all callbacks but the "on_message" and those added via add_topic_callback();
134135
for those, to get access to the user_data use the 'user_data' member of the MQTT object
135136
passed as 1st argument.
136-
:param bool use_imprecise_time: on boards without time.monotonic_ns() one has to set
137-
this to True in order to operate correctly over more than 24 days or so
138137
139138
"""
140139

@@ -156,7 +155,6 @@ def __init__(
156155
socket_timeout: int = 1,
157156
connect_retries: int = 5,
158157
user_data=None,
159-
use_imprecise_time: Optional[bool] = None,
160158
) -> None:
161159
self._connection_manager = get_connection_manager(socket_pool)
162160
self._socket_pool = socket_pool
@@ -165,20 +163,6 @@ def __init__(
165163
self._backwards_compatible_sock = False
166164
self._use_binary_mode = use_binary_mode
167165

168-
self.use_monotonic_ns = False
169-
try:
170-
time.monotonic_ns()
171-
self.use_monotonic_ns = True
172-
except AttributeError:
173-
if use_imprecise_time:
174-
self.use_monotonic_ns = False
175-
else:
176-
raise MMQTTException( # pylint: disable=raise-missing-from
177-
"time.monotonic_ns() is not available. "
178-
"Will use imprecise time however only if the"
179-
"use_imprecise_time argument is set to True."
180-
)
181-
182166
if recv_timeout <= socket_timeout:
183167
raise MMQTTException(
184168
"recv_timeout must be strictly greater than socket_timeout"
@@ -191,7 +175,7 @@ def __init__(
191175
self._is_connected = False
192176
self._msg_size_lim = MQTT_MSG_SZ_LIM
193177
self._pid = 0
194-
self._last_msg_sent_timestamp: float = 0
178+
self._last_msg_sent_timestamp: int = 0
195179
self.logger = NullLogger()
196180
"""An optional logging attribute that can be set with with a Logger
197181
to enable debug logging."""
@@ -230,7 +214,7 @@ def __init__(
230214
self.client_id = client_id
231215
else:
232216
# assign a unique client_id
233-
time_int = int(self.get_monotonic_time() * 100) % 1000
217+
time_int = int(ticks_ms() / 10) % 1000
234218
self.client_id = f"cpy{randint(0, time_int)}{randint(0, 99)}"
235219
# generated client_id's enforce spec.'s length rules
236220
if len(self.client_id.encode("utf-8")) > 23 or not self.client_id:
@@ -254,17 +238,6 @@ def __init__(
254238
self.on_subscribe = None
255239
self.on_unsubscribe = None
256240

257-
def get_monotonic_time(self) -> float:
258-
"""
259-
Provide monotonic time in seconds. Based on underlying implementation
260-
this might result in imprecise time, that will result in the library
261-
not being able to operate if running contiguously for more than 24 days or so.
262-
"""
263-
if self.use_monotonic_ns:
264-
return time.monotonic_ns() / 1000000000
265-
266-
return time.monotonic()
267-
268241
def __enter__(self):
269242
return self
270243

@@ -546,9 +519,9 @@ def _connect(
546519
if self._username is not None:
547520
self._send_str(self._username)
548521
self._send_str(self._password)
549-
self._last_msg_sent_timestamp = self.get_monotonic_time()
522+
self._last_msg_sent_timestamp = ticks_ms()
550523
self.logger.debug("Receiving CONNACK packet from broker")
551-
stamp = self.get_monotonic_time()
524+
stamp = ticks_ms()
552525
while True:
553526
op = self._wait_for_msg()
554527
if op == 32:
@@ -564,7 +537,7 @@ def _connect(
564537
return result
565538

566539
if op is None:
567-
if self.get_monotonic_time() - stamp > self._recv_timeout:
540+
if ticks_diff(ticks_ms(), stamp) / 1000 > self._recv_timeout:
568541
raise MMQTTException(
569542
f"No data received from broker for {self._recv_timeout} seconds."
570543
)
@@ -618,15 +591,16 @@ def ping(self) -> list[int]:
618591
self._connected()
619592
self.logger.debug("Sending PINGREQ")
620593
self._sock.send(MQTT_PINGREQ)
621-
ping_timeout = self._recv_timeout
622-
stamp = self.get_monotonic_time()
594+
ping_timeout = self.keep_alive
595+
stamp = ticks_ms()
596+
623597
self._last_msg_sent_timestamp = stamp
624598
rc, rcs = None, []
625599
while rc != MQTT_PINGRESP:
626600
rc = self._wait_for_msg()
627601
if rc:
628602
rcs.append(rc)
629-
if self.get_monotonic_time() - stamp > ping_timeout:
603+
if ticks_diff(ticks_ms(), stamp) / 1000 > ping_timeout:
630604
raise MMQTTException(
631605
f"PINGRESP not returned from broker within {ping_timeout} seconds."
632606
)
@@ -697,11 +671,11 @@ def publish(
697671
self._sock.send(pub_hdr_fixed)
698672
self._sock.send(pub_hdr_var)
699673
self._sock.send(msg)
700-
self._last_msg_sent_timestamp = self.get_monotonic_time()
674+
self._last_msg_sent_timestamp = ticks_ms()
701675
if qos == 0 and self.on_publish is not None:
702676
self.on_publish(self, self.user_data, topic, self._pid)
703677
if qos == 1:
704-
stamp = self.get_monotonic_time()
678+
stamp = ticks_ms()
705679
while True:
706680
op = self._wait_for_msg()
707681
if op == 0x40:
@@ -715,7 +689,7 @@ def publish(
715689
return
716690

717691
if op is None:
718-
if self.get_monotonic_time() - stamp > self._recv_timeout:
692+
if ticks_diff(ticks_ms(), stamp) / 1000 > self._recv_timeout:
719693
raise MMQTTException(
720694
f"No data received from broker for {self._recv_timeout} seconds."
721695
)
@@ -774,12 +748,12 @@ def subscribe(self, topic: Optional[Union[tuple, str, list]], qos: int = 0) -> N
774748
self.logger.debug(f"SUBSCRIBING to topic {t} with QoS {q}")
775749
self.logger.debug(f"payload: {payload}")
776750
self._sock.send(payload)
777-
stamp = self.get_monotonic_time()
751+
stamp = ticks_ms()
778752
self._last_msg_sent_timestamp = stamp
779753
while True:
780754
op = self._wait_for_msg()
781755
if op is None:
782-
if self.get_monotonic_time() - stamp > self._recv_timeout:
756+
if ticks_diff(ticks_ms(), stamp) / 1000 > self._recv_timeout:
783757
raise MMQTTException(
784758
f"No data received from broker for {self._recv_timeout} seconds."
785759
)
@@ -851,13 +825,13 @@ def unsubscribe(self, topic: Optional[Union[str, list]]) -> None:
851825
for t in topics:
852826
self.logger.debug(f"UNSUBSCRIBING from topic {t}")
853827
self._sock.send(payload)
854-
self._last_msg_sent_timestamp = self.get_monotonic_time()
828+
self._last_msg_sent_timestamp = ticks_ms()
855829
self.logger.debug("Waiting for UNSUBACK...")
856830
while True:
857-
stamp = self.get_monotonic_time()
831+
stamp = ticks_ms()
858832
op = self._wait_for_msg()
859833
if op is None:
860-
if self.get_monotonic_time() - stamp > self._recv_timeout:
834+
if ticks_diff(ticks_ms(), stamp) / 1000 > self._recv_timeout:
861835
raise MMQTTException(
862836
f"No data received from broker for {self._recv_timeout} seconds."
863837
)
@@ -957,12 +931,12 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
957931
self._connected()
958932
self.logger.debug(f"waiting for messages for {timeout} seconds")
959933

960-
stamp = self.get_monotonic_time()
934+
stamp = ticks_ms()
961935
rcs = []
962936

963937
while True:
964938
if (
965-
self.get_monotonic_time() - self._last_msg_sent_timestamp
939+
ticks_diff(ticks_ms(), self._last_msg_sent_timestamp) / 1000
966940
>= self.keep_alive
967941
):
968942
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
@@ -972,22 +946,21 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
972946
rcs.extend(self.ping())
973947
# ping() itself contains a _wait_for_msg() loop which might have taken a while,
974948
# so check here as well.
975-
if self.get_monotonic_time() - stamp > timeout:
949+
if ticks_diff(ticks_ms(), stamp) / 1000 > timeout:
976950
self.logger.debug(f"Loop timed out after {timeout} seconds")
977951
break
978952

979953
rc = self._wait_for_msg()
980954
if rc is not None:
981955
rcs.append(rc)
982-
if self.get_monotonic_time() - stamp > timeout:
956+
if ticks_diff(ticks_ms(), stamp) / 1000 > timeout:
983957
self.logger.debug(f"Loop timed out after {timeout} seconds")
984958
break
985959

986960
return rcs if rcs else None
987961

988962
def _wait_for_msg(self, timeout: Optional[float] = None) -> Optional[int]:
989963
# pylint: disable = too-many-return-statements
990-
991964
"""Reads and processes network events.
992965
Return the packet type or None if there is nothing to be received.
993966
@@ -1086,7 +1059,7 @@ def _sock_exact_recv(
10861059
:param float timeout: timeout, in seconds. Defaults to keep_alive
10871060
:return: byte array
10881061
"""
1089-
stamp = self.get_monotonic_time()
1062+
stamp = ticks_ms()
10901063
if not self._backwards_compatible_sock:
10911064
# CPython, socketpool, esp32spi, wiznet5k
10921065
rc = bytearray(bufsize)
@@ -1101,7 +1074,7 @@ def _sock_exact_recv(
11011074
recv_len = self._sock.recv_into(mv, to_read)
11021075
to_read -= recv_len
11031076
mv = mv[recv_len:]
1104-
if self.get_monotonic_time() - stamp > read_timeout:
1077+
if ticks_diff(ticks_ms(), stamp) / 1000 > read_timeout:
11051078
raise MMQTTException(
11061079
f"Unable to receive {to_read} bytes within {read_timeout} seconds."
11071080
)
@@ -1121,7 +1094,7 @@ def _sock_exact_recv(
11211094
recv = self._sock.recv(to_read)
11221095
to_read -= len(recv)
11231096
rc += recv
1124-
if self.get_monotonic_time() - stamp > read_timeout:
1097+
if ticks_diff(ticks_ms(), stamp) / 1000 > read_timeout:
11251098
raise MMQTTException(
11261099
f"Unable to receive {to_read} bytes within {read_timeout} seconds."
11271100
)

docs/conf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
# Uncomment the below if you use native CircuitPython modules such as
2727
# digitalio, micropython and busio. List the modules you use. Without it, the
2828
# autodoc module docs will fail to generate with a warning.
29-
autodoc_mock_imports = ["micropython", "microcontroller", "random"]
29+
autodoc_mock_imports = ["microcontroller", "random"]
3030

3131

3232
intersphinx_mapping = {

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44

55
Adafruit-Blinka
66
Adafruit-Circuitpython-ConnectionManager
7+
adafruit-circuitpython-ticks

tests/test_loop.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ def test_loop_basic(self) -> None:
142142
time_before = time.monotonic()
143143
timeout = random.randint(3, 8)
144144
# pylint: disable=protected-access
145-
mqtt_client._last_msg_sent_timestamp = mqtt_client.get_monotonic_time()
145+
mqtt_client._last_msg_sent_timestamp = MQTT.ticks_ms()
146146
rcs = mqtt_client.loop(timeout=timeout)
147147
time_after = time.monotonic()
148148

@@ -220,10 +220,10 @@ def test_loop_ping_timeout(self):
220220
mqtt_client._sock = mocket
221221

222222
start = time.monotonic()
223-
res = mqtt_client.loop(timeout=2 * keep_alive_timeout)
223+
res = mqtt_client.loop(timeout=2 * keep_alive_timeout + recv_timeout)
224224
assert time.monotonic() - start >= 2 * keep_alive_timeout
225225
assert len(mocket.sent) > 0
226-
assert len(res) == 2
226+
assert len(res) == 3
227227
assert set(res) == {int(0xD0)}
228228

229229
# pylint: disable=no-self-use

tests/test_recv_timeout.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def test_recv_timeout_vs_keepalive(self) -> None:
4949
socket_mock.recv_into.assert_called()
5050

5151
now = time.monotonic()
52-
assert recv_timeout <= (now - start) < keep_alive
52+
assert recv_timeout <= (now - start) <= (keep_alive + 0.1)
5353

5454

5555
if __name__ == "__main__":

0 commit comments

Comments
 (0)