Skip to content

Commit 42f13a8

Browse files
authored
Merge pull request adafruit#117 from vladak/recv_timeout
avoid endless loop when waiting for data from broker
2 parents ec8b60b + 26633b9 commit 42f13a8

File tree

1 file changed

+36
-0
lines changed

1 file changed

+36
-0
lines changed

adafruit_minimqtt/adafruit_minimqtt.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ class MQTT:
128128
:param str client_id: Optional client identifier, defaults to a unique, generated string.
129129
:param bool is_ssl: Sets a secure or insecure connection with the broker.
130130
:param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client.
131+
:param int recv_timeout: receive timeout, in seconds.
131132
:param socket socket_pool: A pool of socket resources available for the given radio.
132133
:param ssl_context: SSL context for long-lived SSL connections.
133134
:param bool use_binary_mode: Messages are passed as bytearray instead of string to callbacks.
@@ -146,6 +147,7 @@ def __init__(
146147
client_id=None,
147148
is_ssl=True,
148149
keep_alive=60,
150+
recv_timeout=10,
149151
socket_pool=None,
150152
ssl_context=None,
151153
use_binary_mode=False,
@@ -157,7 +159,13 @@ def __init__(
157159
self._sock = None
158160
self._backwards_compatible_sock = False
159161
self._use_binary_mode = use_binary_mode
162+
163+
if recv_timeout <= socket_timeout:
164+
raise MMQTTException(
165+
"recv_timeout must be strictly greater than socket_timeout"
166+
)
160167
self._socket_timeout = socket_timeout
168+
self._recv_timeout = recv_timeout
161169

162170
self.keep_alive = keep_alive
163171
self._user_data = None
@@ -522,6 +530,7 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
522530
self._send_str(self._password)
523531
if self.logger is not None:
524532
self.logger.debug("Receiving CONNACK packet from broker")
533+
stamp = time.monotonic()
525534
while True:
526535
op = self._wait_for_msg()
527536
if op == 32:
@@ -535,6 +544,12 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
535544
self.on_connect(self, self._user_data, result, rc[2])
536545
return result
537546

547+
if op is None:
548+
if time.monotonic() - stamp > self._recv_timeout:
549+
raise MMQTTException(
550+
f"No data received from broker for {self._recv_timeout} seconds."
551+
)
552+
538553
def disconnect(self):
539554
"""Disconnects the MiniMQTT client from the MQTT broker."""
540555
self.is_connected()
@@ -645,6 +660,7 @@ def publish(self, topic, msg, retain=False, qos=0):
645660
if qos == 0 and self.on_publish is not None:
646661
self.on_publish(self, self._user_data, topic, self._pid)
647662
if qos == 1:
663+
stamp = time.monotonic()
648664
while True:
649665
op = self._wait_for_msg()
650666
if op == 0x40:
@@ -657,6 +673,12 @@ def publish(self, topic, msg, retain=False, qos=0):
657673
self.on_publish(self, self._user_data, topic, rcv_pid)
658674
return
659675

676+
if op is None:
677+
if time.monotonic() - stamp > self._recv_timeout:
678+
raise MMQTTException(
679+
f"No data received from broker for {self._recv_timeout} seconds."
680+
)
681+
660682
def subscribe(self, topic, qos=0):
661683
"""Subscribes to a topic on the MQTT Broker.
662684
This method can subscribe to one topics or multiple topics.
@@ -705,6 +727,7 @@ def subscribe(self, topic, qos=0):
705727
for t, q in topics:
706728
self.logger.debug("SUBSCRIBING to topic %s with QoS %d", t, q)
707729
self._sock.send(packet)
730+
stamp = time.monotonic()
708731
while True:
709732
op = self._wait_for_msg()
710733
if op == 0x90:
@@ -718,6 +741,12 @@ def subscribe(self, topic, qos=0):
718741
self._subscribed_topics.append(t)
719742
return
720743

744+
if op is None:
745+
if time.monotonic() - stamp > self._recv_timeout:
746+
raise MMQTTException(
747+
f"No data received from broker for {self._recv_timeout} seconds."
748+
)
749+
721750
def unsubscribe(self, topic):
722751
"""Unsubscribes from a MQTT topic.
723752
@@ -755,6 +784,7 @@ def unsubscribe(self, topic):
755784
if self.logger is not None:
756785
self.logger.debug("Waiting for UNSUBACK...")
757786
while True:
787+
stamp = time.monotonic()
758788
op = self._wait_for_msg()
759789
if op == 176:
760790
rc = self._sock_exact_recv(3)
@@ -767,6 +797,12 @@ def unsubscribe(self, topic):
767797
self._subscribed_topics.remove(t)
768798
return
769799

800+
if op is None:
801+
if time.monotonic() - stamp > self._recv_timeout:
802+
raise MMQTTException(
803+
f"No data received from broker for {self._recv_timeout} seconds."
804+
)
805+
770806
def reconnect(self, resub_topics=True):
771807
"""Attempts to reconnect to the MQTT broker.
772808

0 commit comments

Comments
 (0)