Skip to content

Commit 1a23d7b

Browse files
Fix socket receive issues related to message buffer size
This change addresses a few issues in the handling of the MQTT messages that caused the library to become unstable: - Add wapper for socket.recv() so that an exact number of bytes are read into the buffer before attempting to parse the MQTT message; - Fix handling of ping response packets as part of _wait_for_msg(), together with all other MQTT messages; - Fix disconnect so it can gracefully handle cases when socket writes are not possible. Also re-init _subscribed_topics as an empty list instead of None. Related-to adafruit/Adafruit_CircuitPython_ESP32SPI#102 Fixes adafruit/Adafruit_CircuitPython_PyPortal#98 Fixes adafruit#54 Signed-off-by: Flavio Fernandes <[email protected]>
1 parent 682de96 commit 1a23d7b

File tree

1 file changed

+61
-24
lines changed

1 file changed

+61
-24
lines changed

adafruit_minimqtt/adafruit_minimqtt.py

+61-24
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ def __init__(
166166
self.logger.setLevel(logging.INFO)
167167
self._sock = None
168168
self._is_connected = False
169+
self._pending_ping_response = False
169170
self._msg_size_lim = MQTT_MSG_SZ_LIM
170171
self._pid = 0
171172
self._timestamp = 0
@@ -192,6 +193,32 @@ def __enter__(self):
192193
def __exit__(self, exception_type, exception_value, traceback):
193194
self.deinit()
194195

196+
def _sock_exact_recv(self, bufsize):
197+
"""Reads _exact_ number of bytes from the connected socket. Will only return
198+
string with the exact number of bytes requested.
199+
200+
The semantics of native socket receive is that it returns no more than the
201+
specified number of bytes (i.e. max size). However, it makes no guarantees in
202+
terms of the minimum size of the buffer, which could be 1 byte. This is a
203+
wrapper for socket recv() to ensure that no less than the expected number of
204+
bytes is returned or trigger a timeout exception.
205+
206+
:param int bufsize: number of bytes to receive
207+
"""
208+
stamp = time.monotonic()
209+
rc = self._sock.recv(bufsize)
210+
to_read = bufsize - len(rc)
211+
assert to_read >= 0
212+
read_timeout = min(self.keep_alive, self._sock._timeout)
213+
while to_read > 0:
214+
recv = self._sock.recv(to_read)
215+
to_read -= len(recv)
216+
rc += recv
217+
if time.monotonic() - stamp > read_timeout:
218+
raise MMQTTException("Unable to receive {} bytes within {} seconds.".format(
219+
to_read, read_timeout))
220+
return rc
221+
195222
def deinit(self):
196223
"""De-initializes the MQTT client and disconnects from the mqtt broker."""
197224
self.disconnect()
@@ -368,7 +395,7 @@ def connect(self, clean_session=True):
368395
while True:
369396
op = self._wait_for_msg()
370397
if op == 32:
371-
rc = self._sock.recv(3)
398+
rc = self._sock_exact_recv(3)
372399
assert rc[0] == 0x02
373400
if rc[2] != 0x00:
374401
raise MMQTTException(CONNACK_ERRORS[rc[2]])
@@ -384,12 +411,17 @@ def disconnect(self):
384411
self.is_connected()
385412
if self.logger is not None:
386413
self.logger.debug("Sending DISCONNECT packet to broker")
387-
self._sock.send(MQTT_DISCONNECT)
414+
try:
415+
self._sock.send(MQTT_DISCONNECT)
416+
except RuntimeError as e:
417+
if self.logger:
418+
self.logger.warning(
419+
"Unable to send DISCONNECT packet: {}".format(e))
388420
if self.logger is not None:
389421
self.logger.debug("Closing socket")
390422
self._sock.close()
391423
self._is_connected = False
392-
self._subscribed_topics = None
424+
self._subscribed_topics = []
393425
if self.on_disconnect is not None:
394426
self.on_disconnect(self, self.user_data, 0)
395427

@@ -398,18 +430,15 @@ def ping(self):
398430
there is an active network connection.
399431
"""
400432
self.is_connected()
433+
if self._pending_ping_response:
434+
self._pending_ping_response = False
435+
raise MMQTTException("Ping response was pending from previous MQTT_PINGREQ")
401436
if self.logger is not None:
402437
self.logger.debug("Sending PINGREQ")
403438
self._sock.send(MQTT_PINGREQ)
404-
if self.logger is not None:
405-
self.logger.debug("Checking PINGRESP")
406-
while True:
407-
op = self._wait_for_msg(0.5)
408-
if op == 208:
409-
ping_resp = self._sock.recv(2)
410-
if ping_resp[0] != 0x00:
411-
raise MMQTTException("PINGRESP not returned from broker.")
412-
return
439+
# Set pending ping response. I will be checked upon next ping and
440+
# assumed to be cleared via _wait_for_msg()
441+
self._pending_ping_response = True
413442

414443
# pylint: disable=too-many-branches, too-many-statements
415444
def publish(self, topic, msg, retain=False, qos=0):
@@ -504,9 +533,9 @@ def publish(self, topic, msg, retain=False, qos=0):
504533
while True:
505534
op = self._wait_for_msg()
506535
if op == 0x40:
507-
sz = self._sock.recv(1)
536+
sz = self._sock_exact_recv(1)
508537
assert sz == b"\x02"
509-
rcv_pid = self._sock.recv(2)
538+
rcv_pid = self._sock_exact_recv(2)
510539
rcv_pid = rcv_pid[0] << 0x08 | rcv_pid[1]
511540
if pid == rcv_pid:
512541
if self.on_publish is not None:
@@ -589,7 +618,7 @@ def subscribe(self, topic, qos=0):
589618
while True:
590619
op = self._wait_for_msg()
591620
if op == 0x90:
592-
rc = self._sock.recv(4)
621+
rc = self._sock_exact_recv(4)
593622
assert rc[1] == packet[2] and rc[2] == packet[3]
594623
if rc[3] == 0x80:
595624
raise MMQTTException("SUBACK Failure!")
@@ -652,7 +681,7 @@ def unsubscribe(self, topic):
652681
while True:
653682
op = self._wait_for_msg()
654683
if op == 176:
655-
return_code = self._sock.recv(3)
684+
return_code = self._sock_exact_recv(3)
656685
assert return_code[0] == 0x02
657686
# [MQTT-3.32]
658687
assert (
@@ -709,27 +738,35 @@ def _wait_for_msg(self, timeout=30):
709738
"""Reads and processes network events.
710739
Returns response code if successful.
711740
"""
741+
# Check if there are any messages by reading 1 byte from socket
742+
# This is a place where it is okay to have 'less' than what we asked.
712743
res = self._sock.recv(1)
713744
self._sock.settimeout(timeout)
714745
if res in [None, b""]:
746+
# If we get here, it means that there is nothing to be received
715747
return None
716-
if res == MQTT_PINGRESP:
717-
sz = self._sock.recv(1)[0]
718-
assert sz == 0
748+
if res[0] == MQTT_PINGRESP:
749+
if self.logger is not None:
750+
self.logger.debug("Checking PINGRESP")
751+
sz = self._sock_exact_recv(1)[0]
752+
if sz != 0x00:
753+
raise MMQTTException("Unexpected PINGRESP returned from broker: {}.".format(sz))
754+
# Ping response is no longer pending
755+
self._pending_ping_response = False
719756
return None
720757
if res[0] & 0xF0 != 0x30:
721758
return res[0]
722759
sz = self._recv_len()
723-
topic_len = self._sock.recv(2)
760+
topic_len = self._sock_exact_recv(2)
724761
topic_len = (topic_len[0] << 8) | topic_len[1]
725-
topic = self._sock.recv(topic_len)
762+
topic = self._sock_exact_recv(topic_len)
726763
topic = str(topic, "utf-8")
727764
sz -= topic_len + 2
728765
if res[0] & 0x06:
729-
pid = self._sock.recv(2)
766+
pid = self._sock_exact_recv(2)
730767
pid = pid[0] << 0x08 | pid[1]
731768
sz -= 0x02
732-
msg = self._sock.recv(sz)
769+
msg = self._sock_exact_recv(sz)
733770
self._handle_on_message(self, topic, str(msg, "utf-8"))
734771
if res[0] & 0x06 == 0x02:
735772
pkt = bytearray(b"\x40\x02\0\0")
@@ -743,7 +780,7 @@ def _recv_len(self):
743780
n = 0
744781
sh = 0
745782
while True:
746-
b = self._sock.recv(1)[0]
783+
b = self._sock_exact_recv(1)[0]
747784
n |= (b & 0x7F) << sh
748785
if not b & 0x80:
749786
return n

0 commit comments

Comments
 (0)