Skip to content

Commit 3e5a14d

Browse files
Fix socket receive issues related to message buffer size
This change addresses a few issues in the handling of the MQTT messages that caused the library to become unstable: - Add wrapper for socket.recv() so that an exact number of bytes are read into the buffer before attempting to parse the MQTT message; - Fix handling of ping response packets via _wait_for_msg(); - Fix disconnect so it can gracefully handle cases when socket writes are not possible. Also re-init _subscribed_topics as an empty list instead of None. Related-to adafruit/Adafruit_CircuitPython_ESP32SPI#102 Fixes adafruit/Adafruit_CircuitPython_PyPortal#98 Fixes adafruit#54 Signed-off-by: Flavio Fernandes <[email protected]>
1 parent e02d658 commit 3e5a14d

File tree

1 file changed

+72
-28
lines changed

1 file changed

+72
-28
lines changed

adafruit_minimqtt/adafruit_minimqtt.py

+72-28
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,35 @@ def __enter__(self):
175175
def __exit__(self, exception_type, exception_value, traceback):
176176
self.deinit()
177177

178+
def _sock_exact_recv(self, bufsize):
179+
"""Reads _exact_ number of bytes from the connected socket. Will only return
180+
string with the exact number of bytes requested.
181+
182+
The semantics of native socket receive is that it returns no more than the
183+
specified number of bytes (i.e. max size). However, it makes no guarantees in
184+
terms of the minimum size of the buffer, which could be 1 byte. This is a
185+
wrapper for socket recv() to ensure that no less than the expected number of
186+
bytes is returned or trigger a timeout exception.
187+
188+
:param int bufsize: number of bytes to receive
189+
"""
190+
stamp = time.monotonic()
191+
rc = self._sock.recv(bufsize)
192+
to_read = bufsize - len(rc)
193+
assert to_read >= 0
194+
read_timeout = self.keep_alive
195+
while to_read > 0:
196+
recv = self._sock.recv(to_read)
197+
to_read -= len(recv)
198+
rc += recv
199+
if time.monotonic() - stamp > read_timeout:
200+
raise MMQTTException(
201+
"Unable to receive {} bytes within {} seconds.".format(
202+
to_read, read_timeout
203+
)
204+
)
205+
return rc
206+
178207
def deinit(self):
179208
"""De-initializes the MQTT client and disconnects from the mqtt broker."""
180209
self.disconnect()
@@ -351,7 +380,7 @@ def connect(self, clean_session=True):
351380
while True:
352381
op = self._wait_for_msg()
353382
if op == 32:
354-
rc = self._sock.recv(3)
383+
rc = self._sock_exact_recv(3)
355384
assert rc[0] == 0x02
356385
if rc[2] != 0x00:
357386
raise MMQTTException(CONNACK_ERRORS[rc[2]])
@@ -366,32 +395,38 @@ def disconnect(self):
366395
self.is_connected()
367396
if self.logger is not None:
368397
self.logger.debug("Sending DISCONNECT packet to broker")
369-
self._sock.send(MQTT_DISCONNECT)
398+
try:
399+
self._sock.send(MQTT_DISCONNECT)
400+
except RuntimeError as e:
401+
if self.logger:
402+
self.logger.warning("Unable to send DISCONNECT packet: {}".format(e))
370403
if self.logger is not None:
371404
self.logger.debug("Closing socket")
372405
self._sock.close()
373406
self._is_connected = False
374-
self._subscribed_topics = None
407+
self._subscribed_topics = []
375408
if self.on_disconnect is not None:
376409
self.on_disconnect(self, self.user_data, 0)
377410

378411
def ping(self):
379412
"""Pings the MQTT Broker to confirm if the broker is alive or if
380413
there is an active network connection.
414+
Returns response codes of any messages received while waiting for PINGRESP.
381415
"""
382416
self.is_connected()
383-
if self.logger is not None:
417+
if self.logger:
384418
self.logger.debug("Sending PINGREQ")
385419
self._sock.send(MQTT_PINGREQ)
386-
if self.logger is not None:
387-
self.logger.debug("Checking PINGRESP")
388-
while True:
389-
op = self._wait_for_msg(0.5)
390-
if op == 208:
391-
ping_resp = self._sock.recv(2)
392-
if ping_resp[0] != 0x00:
393-
raise MMQTTException("PINGRESP not returned from broker.")
394-
return
420+
ping_timeout = self.keep_alive
421+
stamp = time.monotonic()
422+
rc, rcs = None, []
423+
while rc != MQTT_PINGRESP:
424+
rc = self._wait_for_msg()
425+
if rc:
426+
rcs.append(rc)
427+
if time.monotonic() - stamp > ping_timeout:
428+
raise MMQTTException("PINGRESP not returned from broker.")
429+
return rcs
395430

396431
# pylint: disable=too-many-branches, too-many-statements
397432
def publish(self, topic, msg, retain=False, qos=0):
@@ -486,9 +521,9 @@ def publish(self, topic, msg, retain=False, qos=0):
486521
while True:
487522
op = self._wait_for_msg()
488523
if op == 0x40:
489-
sz = self._sock.recv(1)
524+
sz = self._sock_exact_recv(1)
490525
assert sz == b"\x02"
491-
rcv_pid = self._sock.recv(2)
526+
rcv_pid = self._sock_exact_recv(2)
492527
rcv_pid = rcv_pid[0] << 0x08 | rcv_pid[1]
493528
if pid == rcv_pid:
494529
if self.on_publish is not None:
@@ -571,7 +606,7 @@ def subscribe(self, topic, qos=0):
571606
while True:
572607
op = self._wait_for_msg()
573608
if op == 0x90:
574-
rc = self._sock.recv(4)
609+
rc = self._sock_exact_recv(4)
575610
assert rc[1] == packet[2] and rc[2] == packet[3]
576611
if rc[3] == 0x80:
577612
raise MMQTTException("SUBACK Failure!")
@@ -634,7 +669,7 @@ def unsubscribe(self, topic):
634669
while True:
635670
op = self._wait_for_msg()
636671
if op == 176:
637-
return_code = self._sock.recv(3)
672+
return_code = self._sock_exact_recv(3)
638673
assert return_code[0] == 0x02
639674
# [MQTT-3.32]
640675
assert (
@@ -671,6 +706,7 @@ def reconnect(self, resub_topics=True):
671706
def loop(self):
672707
"""Non-blocking message loop. Use this method to
673708
check incoming subscription messages.
709+
Returns response codes of any messages received.
674710
"""
675711
if self._timestamp == 0:
676712
self._timestamp = time.monotonic()
@@ -682,10 +718,12 @@ def loop(self):
682718
"KeepAlive period elapsed - \
683719
requesting a PINGRESP from the server..."
684720
)
685-
self.ping()
721+
rcs = self.ping()
686722
self._timestamp = 0
723+
return rcs
687724
self._sock.settimeout(0.1)
688-
return self._wait_for_msg()
725+
rc = self._wait_for_msg()
726+
return [rc] if rc else None
689727

690728
def _wait_for_msg(self, timeout=30):
691729
"""Reads and processes network events.
@@ -694,24 +732,30 @@ def _wait_for_msg(self, timeout=30):
694732
res = self._sock.recv(1)
695733
self._sock.settimeout(timeout)
696734
if res in [None, b""]:
735+
# If we get here, it means that there is nothing to be received
697736
return None
698-
if res == MQTT_PINGRESP:
699-
sz = self._sock.recv(1)[0]
700-
assert sz == 0
701-
return None
737+
if res[0] == MQTT_PINGRESP:
738+
if self.logger:
739+
self.logger.debug("Checking PINGRESP")
740+
sz = self._sock_exact_recv(1)[0]
741+
if sz != 0x00:
742+
raise MMQTTException(
743+
"Unexpected PINGRESP returned from broker: {}.".format(sz)
744+
)
745+
return MQTT_PINGRESP
702746
if res[0] & 0xF0 != 0x30:
703747
return res[0]
704748
sz = self._recv_len()
705-
topic_len = self._sock.recv(2)
749+
topic_len = self._sock_exact_recv(2)
706750
topic_len = (topic_len[0] << 8) | topic_len[1]
707-
topic = self._sock.recv(topic_len)
751+
topic = self._sock_exact_recv(topic_len)
708752
topic = str(topic, "utf-8")
709753
sz -= topic_len + 2
710754
if res[0] & 0x06:
711-
pid = self._sock.recv(2)
755+
pid = self._sock_exact_recv(2)
712756
pid = pid[0] << 0x08 | pid[1]
713757
sz -= 0x02
714-
msg = self._sock.recv(sz)
758+
msg = self._sock_exact_recv(sz)
715759
self._handle_on_message(self, topic, str(msg, "utf-8"))
716760
if res[0] & 0x06 == 0x02:
717761
pkt = bytearray(b"\x40\x02\0\0")
@@ -725,7 +769,7 @@ def _recv_len(self):
725769
n = 0
726770
sh = 0
727771
while True:
728-
b = self._sock.recv(1)[0]
772+
b = self._sock_exact_recv(1)[0]
729773
n |= (b & 0x7F) << sh
730774
if not b & 0x80:
731775
return n

0 commit comments

Comments
 (0)