Skip to content

Commit b90bf88

Browse files
Fix socket receive issues related to message buffer size
This change addresses a few issues in the handling of the MQTT messages that caused the library to become unstable: - Add wapper for socket.recv() so that an exact number of bytes are read into the buffer before attempting to parse the MQTT message; - Fix handling of ping response packets as part of _wait_for_msg(), together with all other MQTT messages; - Fix disconnect so it can gracefully handle cases when socket writes are not possible. Also re-init _subscribed_topics as an empty list instead of None. Related-to adafruit/Adafruit_CircuitPython_ESP32SPI#102 Fixes adafruit/Adafruit_CircuitPython_PyPortal#98 Fixes #54 Signed-off-by: Flavio Fernandes <[email protected]>
1 parent e02d658 commit b90bf88

File tree

1 file changed

+63
-24
lines changed

1 file changed

+63
-24
lines changed

adafruit_minimqtt/adafruit_minimqtt.py

+63-24
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ def __init__(
149149
self.logger.setLevel(logging.INFO)
150150
self._sock = None
151151
self._is_connected = False
152+
self._pending_ping_response = False
152153
self._msg_size_lim = MQTT_MSG_SZ_LIM
153154
self._pid = 0
154155
self._timestamp = 0
@@ -175,6 +176,35 @@ def __enter__(self):
175176
def __exit__(self, exception_type, exception_value, traceback):
176177
self.deinit()
177178

179+
def _sock_exact_recv(self, bufsize):
180+
"""Reads _exact_ number of bytes from the connected socket. Will only return
181+
string with the exact number of bytes requested.
182+
183+
The semantics of native socket receive is that it returns no more than the
184+
specified number of bytes (i.e. max size). However, it makes no guarantees in
185+
terms of the minimum size of the buffer, which could be 1 byte. This is a
186+
wrapper for socket recv() to ensure that no less than the expected number of
187+
bytes is returned or trigger a timeout exception.
188+
189+
:param int bufsize: number of bytes to receive
190+
"""
191+
stamp = time.monotonic()
192+
rc = self._sock.recv(bufsize)
193+
to_read = bufsize - len(rc)
194+
assert to_read >= 0
195+
read_timeout = self.keep_alive
196+
while to_read > 0:
197+
recv = self._sock.recv(to_read)
198+
to_read -= len(recv)
199+
rc += recv
200+
if time.monotonic() - stamp > read_timeout:
201+
raise MMQTTException(
202+
"Unable to receive {} bytes within {} seconds.".format(
203+
to_read, read_timeout
204+
)
205+
)
206+
return rc
207+
178208
def deinit(self):
179209
"""De-initializes the MQTT client and disconnects from the mqtt broker."""
180210
self.disconnect()
@@ -351,7 +381,7 @@ def connect(self, clean_session=True):
351381
while True:
352382
op = self._wait_for_msg()
353383
if op == 32:
354-
rc = self._sock.recv(3)
384+
rc = self._sock_exact_recv(3)
355385
assert rc[0] == 0x02
356386
if rc[2] != 0x00:
357387
raise MMQTTException(CONNACK_ERRORS[rc[2]])
@@ -366,12 +396,16 @@ def disconnect(self):
366396
self.is_connected()
367397
if self.logger is not None:
368398
self.logger.debug("Sending DISCONNECT packet to broker")
369-
self._sock.send(MQTT_DISCONNECT)
399+
try:
400+
self._sock.send(MQTT_DISCONNECT)
401+
except RuntimeError as e:
402+
if self.logger:
403+
self.logger.warning("Unable to send DISCONNECT packet: {}".format(e))
370404
if self.logger is not None:
371405
self.logger.debug("Closing socket")
372406
self._sock.close()
373407
self._is_connected = False
374-
self._subscribed_topics = None
408+
self._subscribed_topics = []
375409
if self.on_disconnect is not None:
376410
self.on_disconnect(self, self.user_data, 0)
377411

@@ -380,18 +414,15 @@ def ping(self):
380414
there is an active network connection.
381415
"""
382416
self.is_connected()
417+
if self._pending_ping_response:
418+
self._pending_ping_response = False
419+
raise MMQTTException("Ping response was pending from previous MQTT_PINGREQ")
383420
if self.logger is not None:
384421
self.logger.debug("Sending PINGREQ")
385422
self._sock.send(MQTT_PINGREQ)
386-
if self.logger is not None:
387-
self.logger.debug("Checking PINGRESP")
388-
while True:
389-
op = self._wait_for_msg(0.5)
390-
if op == 208:
391-
ping_resp = self._sock.recv(2)
392-
if ping_resp[0] != 0x00:
393-
raise MMQTTException("PINGRESP not returned from broker.")
394-
return
423+
# Set pending ping response. It will be checked upon next ping and
424+
# assumed to be cleared via _wait_for_msg()
425+
self._pending_ping_response = True
395426

396427
# pylint: disable=too-many-branches, too-many-statements
397428
def publish(self, topic, msg, retain=False, qos=0):
@@ -486,9 +517,9 @@ def publish(self, topic, msg, retain=False, qos=0):
486517
while True:
487518
op = self._wait_for_msg()
488519
if op == 0x40:
489-
sz = self._sock.recv(1)
520+
sz = self._sock_exact_recv(1)
490521
assert sz == b"\x02"
491-
rcv_pid = self._sock.recv(2)
522+
rcv_pid = self._sock_exact_recv(2)
492523
rcv_pid = rcv_pid[0] << 0x08 | rcv_pid[1]
493524
if pid == rcv_pid:
494525
if self.on_publish is not None:
@@ -571,7 +602,7 @@ def subscribe(self, topic, qos=0):
571602
while True:
572603
op = self._wait_for_msg()
573604
if op == 0x90:
574-
rc = self._sock.recv(4)
605+
rc = self._sock_exact_recv(4)
575606
assert rc[1] == packet[2] and rc[2] == packet[3]
576607
if rc[3] == 0x80:
577608
raise MMQTTException("SUBACK Failure!")
@@ -634,7 +665,7 @@ def unsubscribe(self, topic):
634665
while True:
635666
op = self._wait_for_msg()
636667
if op == 176:
637-
return_code = self._sock.recv(3)
668+
return_code = self._sock_exact_recv(3)
638669
assert return_code[0] == 0x02
639670
# [MQTT-3.32]
640671
assert (
@@ -694,24 +725,32 @@ def _wait_for_msg(self, timeout=30):
694725
res = self._sock.recv(1)
695726
self._sock.settimeout(timeout)
696727
if res in [None, b""]:
728+
# If we get here, it means that there is nothing to be received
697729
return None
698-
if res == MQTT_PINGRESP:
699-
sz = self._sock.recv(1)[0]
700-
assert sz == 0
730+
if res[0] == MQTT_PINGRESP:
731+
if self.logger:
732+
self.logger.debug("Checking PINGRESP")
733+
sz = self._sock_exact_recv(1)[0]
734+
if sz != 0x00:
735+
raise MMQTTException(
736+
"Unexpected PINGRESP returned from broker: {}.".format(sz)
737+
)
738+
# Ping response is no longer pending
739+
self._pending_ping_response = False
701740
return None
702741
if res[0] & 0xF0 != 0x30:
703742
return res[0]
704743
sz = self._recv_len()
705-
topic_len = self._sock.recv(2)
744+
topic_len = self._sock_exact_recv(2)
706745
topic_len = (topic_len[0] << 8) | topic_len[1]
707-
topic = self._sock.recv(topic_len)
746+
topic = self._sock_exact_recv(topic_len)
708747
topic = str(topic, "utf-8")
709748
sz -= topic_len + 2
710749
if res[0] & 0x06:
711-
pid = self._sock.recv(2)
750+
pid = self._sock_exact_recv(2)
712751
pid = pid[0] << 0x08 | pid[1]
713752
sz -= 0x02
714-
msg = self._sock.recv(sz)
753+
msg = self._sock_exact_recv(sz)
715754
self._handle_on_message(self, topic, str(msg, "utf-8"))
716755
if res[0] & 0x06 == 0x02:
717756
pkt = bytearray(b"\x40\x02\0\0")
@@ -725,7 +764,7 @@ def _recv_len(self):
725764
n = 0
726765
sh = 0
727766
while True:
728-
b = self._sock.recv(1)[0]
767+
b = self._sock_exact_recv(1)[0]
729768
n |= (b & 0x7F) << sh
730769
if not b & 0x80:
731770
return n

0 commit comments

Comments
 (0)