From d478ccabe2f9d72cc0d927982404d5f3646c43fe Mon Sep 17 00:00:00 2001 From: Vladimir Kotal Date: Wed, 17 Aug 2022 21:43:36 +0200 Subject: [PATCH 1/3] avoid endless loop when waiting for data from broker fixes #115 --- adafruit_minimqtt/adafruit_minimqtt.py | 31 ++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 56673aad..9b1d54a2 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -128,6 +128,7 @@ class MQTT: :param str client_id: Optional client identifier, defaults to a unique, generated string. :param bool is_ssl: Sets a secure or insecure connection with the broker. :param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client. + :param int recv_timeout: receive timeout, in seconds. :param socket socket_pool: A pool of socket resources available for the given radio. :param ssl_context: SSL context for long-lived SSL connections. :param bool use_binary_mode: Messages are passed as bytearray instead of string to callbacks. @@ -146,6 +147,7 @@ def __init__( client_id=None, is_ssl=True, keep_alive=60, + recv_timeout=10, socket_pool=None, ssl_context=None, use_binary_mode=False, @@ -160,6 +162,7 @@ def __init__( self._socket_timeout = socket_timeout self.keep_alive = keep_alive + self._recv_timeout = recv_timeout self._user_data = None self._is_connected = False self._msg_size_lim = MQTT_MSG_SZ_LIM @@ -522,6 +525,7 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None): self._send_str(self._password) if self.logger is not None: self.logger.debug("Receiving CONNACK packet from broker") + stamp = time.monotonic() while True: op = self._wait_for_msg() if op == 32: @@ -535,6 +539,12 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None): self.on_connect(self, self._user_data, result, rc[2]) return result + if op is None: + if time.monotonic() - stamp > self._recv_timeout: + raise MMQTTException( + f"No data received from broker for {self._recv_timeout} seconds." + ) + def disconnect(self): """Disconnects the MiniMQTT client from the MQTT broker.""" self.is_connected() @@ -645,6 +655,7 @@ def publish(self, topic, msg, retain=False, qos=0): if qos == 0 and self.on_publish is not None: self.on_publish(self, self._user_data, topic, self._pid) if qos == 1: + stamp = time.monotonic() while True: op = self._wait_for_msg() if op == 0x40: @@ -657,6 +668,12 @@ def publish(self, topic, msg, retain=False, qos=0): self.on_publish(self, self._user_data, topic, rcv_pid) return + if op is None: + if time.monotonic() - stamp > self._recv_timeout: + raise MMQTTException( + f"No data received from broker for {self._recv_timeout} seconds." + ) + def subscribe(self, topic, qos=0): """Subscribes to a topic on the MQTT Broker. This method can subscribe to one topics or multiple topics. @@ -705,6 +722,7 @@ def subscribe(self, topic, qos=0): for t, q in topics: self.logger.debug("SUBSCRIBING to topic %s with QoS %d", t, q) self._sock.send(packet) + stamp = time.monotonic() while True: op = self._wait_for_msg() if op == 0x90: @@ -718,6 +736,12 @@ def subscribe(self, topic, qos=0): self._subscribed_topics.append(t) return + if op is None: + if time.monotonic() - stamp > self._recv_timeout: + raise MMQTTException( + f"No data received from broker for {self._recv_timeout} seconds." + ) + def unsubscribe(self, topic): """Unsubscribes from a MQTT topic. @@ -755,6 +779,7 @@ def unsubscribe(self, topic): if self.logger is not None: self.logger.debug("Waiting for UNSUBACK...") while True: + stamp = time.monotonic() op = self._wait_for_msg() if op == 176: rc = self._sock_exact_recv(3) @@ -767,6 +792,12 @@ def unsubscribe(self, topic): self._subscribed_topics.remove(t) return + if op is None: + if time.monotonic() - stamp > self._recv_timeout: + raise MMQTTException( + f"No data received from broker for {self._recv_timeout} seconds." + ) + def reconnect(self, resub_topics=True): """Attempts to reconnect to the MQTT broker. From b00ab42e88477d46ce8f7fd84291cc94a6f889f4 Mon Sep 17 00:00:00 2001 From: Vladimir Kotal Date: Thu, 18 Aug 2022 18:09:59 +0200 Subject: [PATCH 2/3] check socket timeout against receive timeout --- adafruit_minimqtt/adafruit_minimqtt.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 9b1d54a2..9f96eb35 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -159,10 +159,13 @@ def __init__( self._sock = None self._backwards_compatible_sock = False self._use_binary_mode = use_binary_mode + + if recv_timeout <= socket_timeout: + raise MMQTTException("recv_timeout must be strictly greater than socket_timeout") self._socket_timeout = socket_timeout + self._recv_timeout = recv_timeout self.keep_alive = keep_alive - self._recv_timeout = recv_timeout self._user_data = None self._is_connected = False self._msg_size_lim = MQTT_MSG_SZ_LIM From c0085aa5c365e29e9984de9f0517d64b76acdc6d Mon Sep 17 00:00:00 2001 From: Vladimir Kotal Date: Thu, 18 Aug 2022 18:16:34 +0200 Subject: [PATCH 3/3] apply black --- adafruit_minimqtt/adafruit_minimqtt.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 9f96eb35..d1d8a56c 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -161,7 +161,9 @@ def __init__( self._use_binary_mode = use_binary_mode if recv_timeout <= socket_timeout: - raise MMQTTException("recv_timeout must be strictly greater than socket_timeout") + raise MMQTTException( + "recv_timeout must be strictly greater than socket_timeout" + ) self._socket_timeout = socket_timeout self._recv_timeout = recv_timeout