From 690007e87e489a2bafd2b7875e341e253f0312fc Mon Sep 17 00:00:00 2001 From: Vladimir Kotal Date: Mon, 30 Oct 2023 22:40:19 +0100 Subject: [PATCH] use time.monotonic_ns() when available otherwise fall back to time.monotonic() however only if forced fixes #176 --- adafruit_minimqtt/adafruit_minimqtt.py | 69 ++++++++++++++++++-------- 1 file changed, 48 insertions(+), 21 deletions(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index cbeb8f6c..f15980d0 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -169,10 +169,12 @@ class MQTT: :param int connect_retries: How many times to try to connect to the broker before giving up on connect or reconnect. Exponential backoff will be used for the retries. :param class user_data: arbitrary data to pass as a second argument to the callbacks. + :param bool use_imprecise_time: on boards without time.monotonic_ns() one has to set + this to True in order to operate correctly over more than 24 days or so """ - # pylint: disable=too-many-arguments,too-many-instance-attributes,too-many-statements, not-callable, invalid-name, no-member + # pylint: disable=too-many-arguments,too-many-instance-attributes,too-many-statements,not-callable,invalid-name,no-member,too-many-locals def __init__( self, *, @@ -190,6 +192,7 @@ def __init__( socket_timeout: int = 1, connect_retries: int = 5, user_data=None, + use_imprecise_time: Optional[bool] = None, ) -> None: self._socket_pool = socket_pool self._ssl_context = ssl_context @@ -197,6 +200,20 @@ def __init__( self._backwards_compatible_sock = False self._use_binary_mode = use_binary_mode + self.use_monotonic_ns = False + try: + time.monotonic_ns() + self.use_monotonic_ns = True + except AttributeError: + if use_imprecise_time: + self.use_monotonic_ns = False + else: + raise MMQTTException( # pylint: disable=raise-missing-from + "time.monotonic_ns() is not available. " + "Will use imprecise time however only if the" + "use_imprecise_time argument is set to True." + ) + if recv_timeout <= socket_timeout: raise MMQTTException( "recv_timeout must be strictly greater than socket_timeout" @@ -248,9 +265,8 @@ def __init__( self.client_id = client_id else: # assign a unique client_id - self.client_id = ( - f"cpy{randint(0, int(time.monotonic() * 100) % 1000)}{randint(0, 99)}" - ) + time_int = int(self.get_monotonic_time() * 100) % 1000 + self.client_id = f"cpy{randint(0, time_int)}{randint(0, 99)}" # generated client_id's enforce spec.'s length rules if len(self.client_id.encode("utf-8")) > 23 or not self.client_id: raise ValueError("MQTT Client ID must be between 1 and 23 bytes") @@ -273,6 +289,17 @@ def __init__( self.on_subscribe = None self.on_unsubscribe = None + def get_monotonic_time(self) -> float: + """ + Provide monotonic time in seconds. Based on underlying implementation + this might result in imprecise time, that will result in the library + not being able to operate if running contiguously for more than 24 days or so. + """ + if self.use_monotonic_ns: + return time.monotonic_ns() / 1000000000 + + return time.monotonic() + # pylint: disable=too-many-branches def _get_connect_socket(self, host: str, port: int, *, timeout: int = 1): """Obtains a new socket and connects to a broker. @@ -627,7 +654,7 @@ def _connect( self._send_str(self._username) self._send_str(self._password) self.logger.debug("Receiving CONNACK packet from broker") - stamp = time.monotonic() + stamp = self.get_monotonic_time() while True: op = self._wait_for_msg() if op == 32: @@ -643,7 +670,7 @@ def _connect( return result if op is None: - if time.monotonic() - stamp > self._recv_timeout: + if self.get_monotonic_time() - stamp > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -672,13 +699,13 @@ def ping(self) -> list[int]: self.logger.debug("Sending PINGREQ") self._sock.send(MQTT_PINGREQ) ping_timeout = self.keep_alive - stamp = time.monotonic() + stamp = self.get_monotonic_time() rc, rcs = None, [] while rc != MQTT_PINGRESP: rc = self._wait_for_msg() if rc: rcs.append(rc) - if time.monotonic() - stamp > ping_timeout: + if self.get_monotonic_time() - stamp > ping_timeout: raise MMQTTException("PINGRESP not returned from broker.") return rcs @@ -759,7 +786,7 @@ def publish( if qos == 0 and self.on_publish is not None: self.on_publish(self, self._user_data, topic, self._pid) if qos == 1: - stamp = time.monotonic() + stamp = self.get_monotonic_time() while True: op = self._wait_for_msg() if op == 0x40: @@ -773,7 +800,7 @@ def publish( return if op is None: - if time.monotonic() - stamp > self._recv_timeout: + if self.get_monotonic_time() - stamp > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -825,11 +852,11 @@ def subscribe(self, topic: str, qos: int = 0) -> None: for t, q in topics: self.logger.debug("SUBSCRIBING to topic %s with QoS %d", t, q) self._sock.send(packet) - stamp = time.monotonic() + stamp = self.get_monotonic_time() while True: op = self._wait_for_msg() if op is None: - if time.monotonic() - stamp > self._recv_timeout: + if self.get_monotonic_time() - stamp > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -892,10 +919,10 @@ def unsubscribe(self, topic: str) -> None: self._sock.send(packet) self.logger.debug("Waiting for UNSUBACK...") while True: - stamp = time.monotonic() + stamp = self.get_monotonic_time() op = self._wait_for_msg() if op is None: - if time.monotonic() - stamp > self._recv_timeout: + if self.get_monotonic_time() - stamp > self._recv_timeout: raise MMQTTException( f"No data received from broker for {self._recv_timeout} seconds." ) @@ -989,8 +1016,8 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: self.logger.debug(f"waiting for messages for {timeout} seconds") if self._timestamp == 0: - self._timestamp = time.monotonic() - current_time = time.monotonic() + self._timestamp = self.get_monotonic_time() + current_time = self.get_monotonic_time() if current_time - self._timestamp >= self.keep_alive: self._timestamp = 0 # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server @@ -1000,14 +1027,14 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]: rcs = self.ping() return rcs - stamp = time.monotonic() + stamp = self.get_monotonic_time() rcs = [] while True: rc = self._wait_for_msg() if rc is not None: rcs.append(rc) - if time.monotonic() - stamp > timeout: + if self.get_monotonic_time() - stamp > timeout: self.logger.debug(f"Loop timed out after {timeout} seconds") break @@ -1106,7 +1133,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray: :param int bufsize: number of bytes to receive :return: byte array """ - stamp = time.monotonic() + stamp = self.get_monotonic_time() if not self._backwards_compatible_sock: # CPython/Socketpool Impl. rc = bytearray(bufsize) @@ -1121,7 +1148,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray: recv_len = self._sock.recv_into(mv, to_read) to_read -= recv_len mv = mv[recv_len:] - if time.monotonic() - stamp > read_timeout: + if self.get_monotonic_time() - stamp > read_timeout: raise MMQTTException( f"Unable to receive {to_read} bytes within {read_timeout} seconds." ) @@ -1141,7 +1168,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray: recv = self._sock.recv(to_read) to_read -= len(recv) rc += recv - if time.monotonic() - stamp > read_timeout: + if self.get_monotonic_time() - stamp > read_timeout: raise MMQTTException( f"Unable to receive {to_read} bytes within {read_timeout} seconds." )