Skip to content

Commit c9fdc40

Browse files
Fix socket receive issues related to message buffer size
This change addresses a few issues in the handling of the MQTT messages that caused the library to become unstable: - Add wapper for socket.recv() so that an exact number of bytes are read into the buffer before attempting to parse the MQTT message; - Fix handling of ping response packets as part of _wait_for_msg(), together with all other MQTT messages; - Fix disconnect so it can gracefully handle cases when socket writes are not possible. Also re-init _subscribed_topics as an empty list instead of None. Related-to adafruit/Adafruit_CircuitPython_ESP32SPI#102 Fixes adafruit/Adafruit_CircuitPython_PyPortal#98 Fixes adafruit#54 Signed-off-by: Flavio Fernandes <[email protected]>
1 parent 682de96 commit c9fdc40

File tree

1 file changed

+66
-26
lines changed

1 file changed

+66
-26
lines changed

adafruit_minimqtt/adafruit_minimqtt.py

+66-26
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ def __init__(
166166
self.logger.setLevel(logging.INFO)
167167
self._sock = None
168168
self._is_connected = False
169+
self._pending_ping_response = False
169170
self._msg_size_lim = MQTT_MSG_SZ_LIM
170171
self._pid = 0
171172
self._timestamp = 0
@@ -192,6 +193,35 @@ def __enter__(self):
192193
def __exit__(self, exception_type, exception_value, traceback):
193194
self.deinit()
194195

196+
def _sock_exact_recv(self, bufsize):
197+
"""Reads _exact_ number of bytes from the connected socket. Will only return
198+
string with the exact number of bytes requested.
199+
200+
The semantics of native socket receive is that it returns no more than the
201+
specified number of bytes (i.e. max size). However, it makes no guarantees in
202+
terms of the minimum size of the buffer, which could be 1 byte. This is a
203+
wrapper for socket recv() to ensure that no less than the expected number of
204+
bytes is returned or trigger a timeout exception.
205+
206+
:param int bufsize: number of bytes to receive
207+
"""
208+
stamp = time.monotonic()
209+
rc = self._sock.recv(bufsize)
210+
to_read = bufsize - len(rc)
211+
assert to_read >= 0
212+
read_timeout = min(self.keep_alive, self._sock._timeout)
213+
while to_read > 0:
214+
recv = self._sock.recv(to_read)
215+
to_read -= len(recv)
216+
rc += recv
217+
if time.monotonic() - stamp > read_timeout:
218+
raise MMQTTException(
219+
"Unable to receive {} bytes within {} seconds.".format(
220+
to_read, read_timeout
221+
)
222+
)
223+
return rc
224+
195225
def deinit(self):
196226
"""De-initializes the MQTT client and disconnects from the mqtt broker."""
197227
self.disconnect()
@@ -368,7 +398,7 @@ def connect(self, clean_session=True):
368398
while True:
369399
op = self._wait_for_msg()
370400
if op == 32:
371-
rc = self._sock.recv(3)
401+
rc = self._sock_exact_recv(3)
372402
assert rc[0] == 0x02
373403
if rc[2] != 0x00:
374404
raise MMQTTException(CONNACK_ERRORS[rc[2]])
@@ -379,17 +409,20 @@ def connect(self, clean_session=True):
379409
return result
380410

381411
def disconnect(self):
382-
"""Disconnects the MiniMQTT client from the MQTT broker.
383-
"""
412+
"""Disconnects the MiniMQTT client from the MQTT broker."""
384413
self.is_connected()
385414
if self.logger is not None:
386415
self.logger.debug("Sending DISCONNECT packet to broker")
387-
self._sock.send(MQTT_DISCONNECT)
416+
try:
417+
self._sock.send(MQTT_DISCONNECT)
418+
except RuntimeError as e:
419+
if self.logger:
420+
self.logger.warning("Unable to send DISCONNECT packet: {}".format(e))
388421
if self.logger is not None:
389422
self.logger.debug("Closing socket")
390423
self._sock.close()
391424
self._is_connected = False
392-
self._subscribed_topics = None
425+
self._subscribed_topics = []
393426
if self.on_disconnect is not None:
394427
self.on_disconnect(self, self.user_data, 0)
395428

@@ -398,18 +431,15 @@ def ping(self):
398431
there is an active network connection.
399432
"""
400433
self.is_connected()
434+
if self._pending_ping_response:
435+
self._pending_ping_response = False
436+
raise MMQTTException("Ping response was pending from previous MQTT_PINGREQ")
401437
if self.logger is not None:
402438
self.logger.debug("Sending PINGREQ")
403439
self._sock.send(MQTT_PINGREQ)
404-
if self.logger is not None:
405-
self.logger.debug("Checking PINGRESP")
406-
while True:
407-
op = self._wait_for_msg(0.5)
408-
if op == 208:
409-
ping_resp = self._sock.recv(2)
410-
if ping_resp[0] != 0x00:
411-
raise MMQTTException("PINGRESP not returned from broker.")
412-
return
440+
# Set pending ping response. It will be checked upon next ping and
441+
# assumed to be cleared via _wait_for_msg()
442+
self._pending_ping_response = True
413443

414444
# pylint: disable=too-many-branches, too-many-statements
415445
def publish(self, topic, msg, retain=False, qos=0):
@@ -504,9 +534,9 @@ def publish(self, topic, msg, retain=False, qos=0):
504534
while True:
505535
op = self._wait_for_msg()
506536
if op == 0x40:
507-
sz = self._sock.recv(1)
537+
sz = self._sock_exact_recv(1)
508538
assert sz == b"\x02"
509-
rcv_pid = self._sock.recv(2)
539+
rcv_pid = self._sock_exact_recv(2)
510540
rcv_pid = rcv_pid[0] << 0x08 | rcv_pid[1]
511541
if pid == rcv_pid:
512542
if self.on_publish is not None:
@@ -589,7 +619,7 @@ def subscribe(self, topic, qos=0):
589619
while True:
590620
op = self._wait_for_msg()
591621
if op == 0x90:
592-
rc = self._sock.recv(4)
622+
rc = self._sock_exact_recv(4)
593623
assert rc[1] == packet[2] and rc[2] == packet[3]
594624
if rc[3] == 0x80:
595625
raise MMQTTException("SUBACK Failure!")
@@ -652,7 +682,7 @@ def unsubscribe(self, topic):
652682
while True:
653683
op = self._wait_for_msg()
654684
if op == 176:
655-
return_code = self._sock.recv(3)
685+
return_code = self._sock_exact_recv(3)
656686
assert return_code[0] == 0x02
657687
# [MQTT-3.32]
658688
assert (
@@ -709,27 +739,37 @@ def _wait_for_msg(self, timeout=30):
709739
"""Reads and processes network events.
710740
Returns response code if successful.
711741
"""
742+
# Check if there are any messages by reading 1 byte from socket
743+
# This is a place where it is okay to have 'less' than what we asked.
712744
res = self._sock.recv(1)
713745
self._sock.settimeout(timeout)
714746
if res in [None, b""]:
747+
# If we get here, it means that there is nothing to be received
715748
return None
716-
if res == MQTT_PINGRESP:
717-
sz = self._sock.recv(1)[0]
718-
assert sz == 0
749+
if res[0] == MQTT_PINGRESP:
750+
if self.logger:
751+
self.logger.debug("Checking PINGRESP")
752+
sz = self._sock_exact_recv(1)[0]
753+
if sz != 0x00:
754+
raise MMQTTException(
755+
"Unexpected PINGRESP returned from broker: {}.".format(sz)
756+
)
757+
# Ping response is no longer pending
758+
self._pending_ping_response = False
719759
return None
720760
if res[0] & 0xF0 != 0x30:
721761
return res[0]
722762
sz = self._recv_len()
723-
topic_len = self._sock.recv(2)
763+
topic_len = self._sock_exact_recv(2)
724764
topic_len = (topic_len[0] << 8) | topic_len[1]
725-
topic = self._sock.recv(topic_len)
765+
topic = self._sock_exact_recv(topic_len)
726766
topic = str(topic, "utf-8")
727767
sz -= topic_len + 2
728768
if res[0] & 0x06:
729-
pid = self._sock.recv(2)
769+
pid = self._sock_exact_recv(2)
730770
pid = pid[0] << 0x08 | pid[1]
731771
sz -= 0x02
732-
msg = self._sock.recv(sz)
772+
msg = self._sock_exact_recv(sz)
733773
self._handle_on_message(self, topic, str(msg, "utf-8"))
734774
if res[0] & 0x06 == 0x02:
735775
pkt = bytearray(b"\x40\x02\0\0")
@@ -743,7 +783,7 @@ def _recv_len(self):
743783
n = 0
744784
sh = 0
745785
while True:
746-
b = self._sock.recv(1)[0]
786+
b = self._sock_exact_recv(1)[0]
747787
n |= (b & 0x7F) << sh
748788
if not b & 0x80:
749789
return n

0 commit comments

Comments
 (0)