Skip to content

Commit f69a453

Browse files
committed
avoid endless loop when waiting for data from broker
fixes #115
1 parent 75ca1c5 commit f69a453

File tree

1 file changed

+33
-0
lines changed

1 file changed

+33
-0
lines changed

adafruit_minimqtt/adafruit_minimqtt.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
# Modified Work Copyright (c) 2019 Bradley Beach, esp32spi_mqtt
77
# Modified Work Copyright (c) 2012-2019 Roger Light and others, Paho MQTT Python
88

9+
# pylint: disable=too-many-lines
10+
911
"""
1012
`adafruit_minimqtt`
1113
================================================================================
@@ -126,6 +128,7 @@ class MQTT:
126128
:param str client_id: Optional client identifier, defaults to a unique, generated string.
127129
:param bool is_ssl: Sets a secure or insecure connection with the broker.
128130
:param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client.
131+
:param int recv_timeout: receive timeout, in seconds.
129132
:param socket socket_pool: A pool of socket resources available for the given radio.
130133
:param ssl_context: SSL context for long-lived SSL connections.
131134
:param bool use_binary_mode: Messages are passed as bytearray instead of string to callbacks.
@@ -142,6 +145,7 @@ def __init__(
142145
client_id=None,
143146
is_ssl=True,
144147
keep_alive=60,
148+
recv_timeout=10,
145149
socket_pool=None,
146150
ssl_context=None,
147151
use_binary_mode=False,
@@ -154,6 +158,7 @@ def __init__(
154158
self._use_binary_mode = use_binary_mode
155159

156160
self.keep_alive = keep_alive
161+
self._recv_timeout = recv_timeout
157162
self._user_data = None
158163
self._is_connected = False
159164
self._msg_size_lim = MQTT_MSG_SZ_LIM
@@ -514,6 +519,7 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
514519
self._send_str(self._password)
515520
if self.logger is not None:
516521
self.logger.debug("Receiving CONNACK packet from broker")
522+
stamp = time.monotonic()
517523
while True:
518524
op = self._wait_for_msg()
519525
if op == 32:
@@ -527,6 +533,12 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
527533
self.on_connect(self, self._user_data, result, rc[2])
528534
return result
529535

536+
if op is None:
537+
if time.monotonic() - stamp > self._recv_timeout:
538+
raise MMQTTException(
539+
f"No data received from broker for {self._recv_timeout} seconds."
540+
)
541+
530542
def disconnect(self):
531543
"""Disconnects the MiniMQTT client from the MQTT broker."""
532544
self.is_connected()
@@ -637,6 +649,7 @@ def publish(self, topic, msg, retain=False, qos=0):
637649
if qos == 0 and self.on_publish is not None:
638650
self.on_publish(self, self._user_data, topic, self._pid)
639651
if qos == 1:
652+
stamp = time.monotonic()
640653
while True:
641654
op = self._wait_for_msg()
642655
if op == 0x40:
@@ -649,6 +662,12 @@ def publish(self, topic, msg, retain=False, qos=0):
649662
self.on_publish(self, self._user_data, topic, rcv_pid)
650663
return
651664

665+
if op is None:
666+
if time.monotonic() - stamp > self._recv_timeout:
667+
raise MMQTTException(
668+
f"No data received from broker for {self._recv_timeout} seconds."
669+
)
670+
652671
def subscribe(self, topic, qos=0):
653672
"""Subscribes to a topic on the MQTT Broker.
654673
This method can subscribe to one topics or multiple topics.
@@ -697,6 +716,7 @@ def subscribe(self, topic, qos=0):
697716
for t, q in topics:
698717
self.logger.debug("SUBSCRIBING to topic %s with QoS %d", t, q)
699718
self._sock.send(packet)
719+
stamp = time.monotonic()
700720
while True:
701721
op = self._wait_for_msg()
702722
if op == 0x90:
@@ -710,6 +730,12 @@ def subscribe(self, topic, qos=0):
710730
self._subscribed_topics.append(t)
711731
return
712732

733+
if op is None:
734+
if time.monotonic() - stamp > self._recv_timeout:
735+
raise MMQTTException(
736+
f"No data received from broker for {self._recv_timeout} seconds."
737+
)
738+
713739
def unsubscribe(self, topic):
714740
"""Unsubscribes from a MQTT topic.
715741
@@ -747,6 +773,7 @@ def unsubscribe(self, topic):
747773
if self.logger is not None:
748774
self.logger.debug("Waiting for UNSUBACK...")
749775
while True:
776+
stamp = time.monotonic()
750777
op = self._wait_for_msg()
751778
if op == 176:
752779
rc = self._sock_exact_recv(3)
@@ -759,6 +786,12 @@ def unsubscribe(self, topic):
759786
self._subscribed_topics.remove(t)
760787
return
761788

789+
if op is None:
790+
if time.monotonic() - stamp > self._recv_timeout:
791+
raise MMQTTException(
792+
f"No data received from broker for {self._recv_timeout} seconds."
793+
)
794+
762795
def reconnect(self, resub_topics=True):
763796
"""Attempts to reconnect to the MQTT broker.
764797

0 commit comments

Comments
 (0)