diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 56673aad..d1d8a56c 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -128,6 +128,7 @@ class MQTT: :param str client_id: Optional client identifier, defaults to a unique, generated string. :param bool is_ssl: Sets a secure or insecure connection with the broker. :param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client. + :param int recv_timeout: receive timeout, in seconds. :param socket socket_pool: A pool of socket resources available for the given radio. :param ssl_context: SSL context for long-lived SSL connections. :param bool use_binary_mode: Messages are passed as bytearray instead of string to callbacks. @@ -146,6 +147,7 @@ def __init__( client_id=None, is_ssl=True, keep_alive=60, + recv_timeout=10, socket_pool=None, ssl_context=None, use_binary_mode=False, @@ -157,7 +159,13 @@ def __init__( self._sock = None self._backwards_compatible_sock = False self._use_binary_mode = use_binary_mode + + if recv_timeout <= socket_timeout: + raise MMQTTException( + "recv_timeout must be strictly greater than socket_timeout" + ) self._socket_timeout = socket_timeout + self._recv_timeout = recv_timeout self.keep_alive = keep_alive self._user_data = None @@ -522,6 +530,7 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None): self._send_str(self._password) if self.logger is not None: self.logger.debug("Receiving CONNACK packet from broker") + stamp = time.monotonic() while True: op = self._wait_for_msg() if op == 32: @@ -535,6 +544,12 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None): self.on_connect(self, self._user_data, result, rc[2]) return result + if op is None: + if time.monotonic() - stamp > self._recv_timeout: + raise MMQTTException( + f"No data received from broker for {self._recv_timeout} seconds." + ) + def disconnect(self): """Disconnects the MiniMQTT client from the MQTT broker.""" self.is_connected() @@ -645,6 +660,7 @@ def publish(self, topic, msg, retain=False, qos=0): if qos == 0 and self.on_publish is not None: self.on_publish(self, self._user_data, topic, self._pid) if qos == 1: + stamp = time.monotonic() while True: op = self._wait_for_msg() if op == 0x40: @@ -657,6 +673,12 @@ def publish(self, topic, msg, retain=False, qos=0): self.on_publish(self, self._user_data, topic, rcv_pid) return + if op is None: + if time.monotonic() - stamp > self._recv_timeout: + raise MMQTTException( + f"No data received from broker for {self._recv_timeout} seconds." + ) + def subscribe(self, topic, qos=0): """Subscribes to a topic on the MQTT Broker. This method can subscribe to one topics or multiple topics. @@ -705,6 +727,7 @@ def subscribe(self, topic, qos=0): for t, q in topics: self.logger.debug("SUBSCRIBING to topic %s with QoS %d", t, q) self._sock.send(packet) + stamp = time.monotonic() while True: op = self._wait_for_msg() if op == 0x90: @@ -718,6 +741,12 @@ def subscribe(self, topic, qos=0): self._subscribed_topics.append(t) return + if op is None: + if time.monotonic() - stamp > self._recv_timeout: + raise MMQTTException( + f"No data received from broker for {self._recv_timeout} seconds." + ) + def unsubscribe(self, topic): """Unsubscribes from a MQTT topic. @@ -755,6 +784,7 @@ def unsubscribe(self, topic): if self.logger is not None: self.logger.debug("Waiting for UNSUBACK...") while True: + stamp = time.monotonic() op = self._wait_for_msg() if op == 176: rc = self._sock_exact_recv(3) @@ -767,6 +797,12 @@ def unsubscribe(self, topic): self._subscribed_topics.remove(t) return + if op is None: + if time.monotonic() - stamp > self._recv_timeout: + raise MMQTTException( + f"No data received from broker for {self._recv_timeout} seconds." + ) + def reconnect(self, resub_topics=True): """Attempts to reconnect to the MQTT broker.