Skip to content

use time.monotonic_ns() when available #182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 48 additions & 21 deletions adafruit_minimqtt/adafruit_minimqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,12 @@ class MQTT:
This works with all callbacks but the "on_message" and those added via add_topic_callback();
for those, to get access to the user_data use the 'user_data' member of the MQTT object
passed as 1st argument.
:param bool use_imprecise_time: on boards without time.monotonic_ns() one has to set
this to True in order to operate correctly over more than 24 days or so

"""

# pylint: disable=too-many-arguments,too-many-instance-attributes,too-many-statements, not-callable, invalid-name, no-member
# pylint: disable=too-many-arguments,too-many-instance-attributes,too-many-statements,not-callable,invalid-name,no-member,too-many-locals
def __init__(
self,
*,
Expand All @@ -193,13 +195,28 @@ def __init__(
socket_timeout: int = 1,
connect_retries: int = 5,
user_data=None,
use_imprecise_time: Optional[bool] = None,
) -> None:
self._socket_pool = socket_pool
self._ssl_context = ssl_context
self._sock = None
self._backwards_compatible_sock = False
self._use_binary_mode = use_binary_mode

self.use_monotonic_ns = False
try:
time.monotonic_ns()
self.use_monotonic_ns = True
except AttributeError:
if use_imprecise_time:
self.use_monotonic_ns = False
else:
raise MMQTTException( # pylint: disable=raise-missing-from
"time.monotonic_ns() is not available. "
"Will use imprecise time however only if the"
"use_imprecise_time argument is set to True."
)

if recv_timeout <= socket_timeout:
raise MMQTTException(
"recv_timeout must be strictly greater than socket_timeout"
Expand Down Expand Up @@ -251,9 +268,8 @@ def __init__(
self.client_id = client_id
else:
# assign a unique client_id
self.client_id = (
f"cpy{randint(0, int(time.monotonic() * 100) % 1000)}{randint(0, 99)}"
)
time_int = int(self.get_monotonic_time() * 100) % 1000
self.client_id = f"cpy{randint(0, time_int)}{randint(0, 99)}"
# generated client_id's enforce spec.'s length rules
if len(self.client_id.encode("utf-8")) > 23 or not self.client_id:
raise ValueError("MQTT Client ID must be between 1 and 23 bytes")
Expand All @@ -276,6 +292,17 @@ def __init__(
self.on_subscribe = None
self.on_unsubscribe = None

def get_monotonic_time(self) -> float:
"""
Provide monotonic time in seconds. Based on underlying implementation
this might result in imprecise time, that will result in the library
not being able to operate if running contiguously for more than 24 days or so.
"""
if self.use_monotonic_ns:
return time.monotonic_ns() / 1000000000

return time.monotonic()

# pylint: disable=too-many-branches
def _get_connect_socket(self, host: str, port: int, *, timeout: int = 1):
"""Obtains a new socket and connects to a broker.
Expand Down Expand Up @@ -636,7 +663,7 @@ def _connect(
self._send_str(self._username)
self._send_str(self._password)
self.logger.debug("Receiving CONNACK packet from broker")
stamp = time.monotonic()
stamp = self.get_monotonic_time()
while True:
op = self._wait_for_msg()
if op == 32:
Expand All @@ -652,7 +679,7 @@ def _connect(
return result

if op is None:
if time.monotonic() - stamp > self._recv_timeout:
if self.get_monotonic_time() - stamp > self._recv_timeout:
raise MMQTTException(
f"No data received from broker for {self._recv_timeout} seconds."
)
Expand Down Expand Up @@ -681,13 +708,13 @@ def ping(self) -> list[int]:
self.logger.debug("Sending PINGREQ")
self._sock.send(MQTT_PINGREQ)
ping_timeout = self.keep_alive
stamp = time.monotonic()
stamp = self.get_monotonic_time()
rc, rcs = None, []
while rc != MQTT_PINGRESP:
rc = self._wait_for_msg()
if rc:
rcs.append(rc)
if time.monotonic() - stamp > ping_timeout:
if self.get_monotonic_time() - stamp > ping_timeout:
raise MMQTTException("PINGRESP not returned from broker.")
return rcs

Expand Down Expand Up @@ -768,7 +795,7 @@ def publish(
if qos == 0 and self.on_publish is not None:
self.on_publish(self, self.user_data, topic, self._pid)
if qos == 1:
stamp = time.monotonic()
stamp = self.get_monotonic_time()
while True:
op = self._wait_for_msg()
if op == 0x40:
Expand All @@ -782,7 +809,7 @@ def publish(
return

if op is None:
if time.monotonic() - stamp > self._recv_timeout:
if self.get_monotonic_time() - stamp > self._recv_timeout:
raise MMQTTException(
f"No data received from broker for {self._recv_timeout} seconds."
)
Expand Down Expand Up @@ -834,11 +861,11 @@ def subscribe(self, topic: str, qos: int = 0) -> None:
for t, q in topics:
self.logger.debug("SUBSCRIBING to topic %s with QoS %d", t, q)
self._sock.send(packet)
stamp = time.monotonic()
stamp = self.get_monotonic_time()
while True:
op = self._wait_for_msg()
if op is None:
if time.monotonic() - stamp > self._recv_timeout:
if self.get_monotonic_time() - stamp > self._recv_timeout:
raise MMQTTException(
f"No data received from broker for {self._recv_timeout} seconds."
)
Expand Down Expand Up @@ -901,10 +928,10 @@ def unsubscribe(self, topic: str) -> None:
self._sock.send(packet)
self.logger.debug("Waiting for UNSUBACK...")
while True:
stamp = time.monotonic()
stamp = self.get_monotonic_time()
op = self._wait_for_msg()
if op is None:
if time.monotonic() - stamp > self._recv_timeout:
if self.get_monotonic_time() - stamp > self._recv_timeout:
raise MMQTTException(
f"No data received from broker for {self._recv_timeout} seconds."
)
Expand Down Expand Up @@ -998,8 +1025,8 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
self._connected()
self.logger.debug(f"waiting for messages for {timeout} seconds")
if self._timestamp == 0:
self._timestamp = time.monotonic()
current_time = time.monotonic()
self._timestamp = self.get_monotonic_time()
current_time = self.get_monotonic_time()
if current_time - self._timestamp >= self.keep_alive:
self._timestamp = 0
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
Expand All @@ -1009,14 +1036,14 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
rcs = self.ping()
return rcs

stamp = time.monotonic()
stamp = self.get_monotonic_time()
rcs = []

while True:
rc = self._wait_for_msg()
if rc is not None:
rcs.append(rc)
if time.monotonic() - stamp > timeout:
if self.get_monotonic_time() - stamp > timeout:
self.logger.debug(f"Loop timed out after {timeout} seconds")
break

Expand Down Expand Up @@ -1115,7 +1142,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
:param int bufsize: number of bytes to receive
:return: byte array
"""
stamp = time.monotonic()
stamp = self.get_monotonic_time()
if not self._backwards_compatible_sock:
# CPython/Socketpool Impl.
rc = bytearray(bufsize)
Expand All @@ -1130,7 +1157,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
recv_len = self._sock.recv_into(mv, to_read)
to_read -= recv_len
mv = mv[recv_len:]
if time.monotonic() - stamp > read_timeout:
if self.get_monotonic_time() - stamp > read_timeout:
raise MMQTTException(
f"Unable to receive {to_read} bytes within {read_timeout} seconds."
)
Expand All @@ -1150,7 +1177,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
recv = self._sock.recv(to_read)
to_read -= len(recv)
rc += recv
if time.monotonic() - stamp > read_timeout:
if self.get_monotonic_time() - stamp > read_timeout:
raise MMQTTException(
f"Unable to receive {to_read} bytes within {read_timeout} seconds."
)
Expand Down