Skip to content

Commit 645ae31

Browse files
authored
Merge pull request adafruit#133 from vladak/exact_recv_is_not
make sure to read the whole buffer in _sock_exact_recv()
2 parents 10d7045 + be9c198 commit 645ae31

File tree

1 file changed

+18
-36
lines changed

1 file changed

+18
-36
lines changed

adafruit_minimqtt/adafruit_minimqtt.py

+18-36
Original file line numberDiff line numberDiff line change
@@ -316,35 +316,6 @@ def __enter__(self):
316316
def __exit__(self, exception_type, exception_value, traceback):
317317
self.deinit()
318318

319-
def _sock_exact_recv(self, bufsize):
320-
"""Reads _exact_ number of bytes from the connected socket. Will only return
321-
string with the exact number of bytes requested.
322-
323-
The semantics of native socket receive is that it returns no more than the
324-
specified number of bytes (i.e. max size). However, it makes no guarantees in
325-
terms of the minimum size of the buffer, which could be 1 byte. This is a
326-
wrapper for socket recv() to ensure that no less than the expected number of
327-
bytes is returned or trigger a timeout exception.
328-
329-
:param int bufsize: number of bytes to receive
330-
"""
331-
stamp = time.monotonic()
332-
rc = self._sock.recv(bufsize)
333-
to_read = bufsize - len(rc)
334-
assert to_read >= 0
335-
read_timeout = self.keep_alive
336-
while to_read > 0:
337-
recv = self._sock.recv(to_read)
338-
to_read -= len(recv)
339-
rc += recv
340-
if time.monotonic() - stamp > read_timeout:
341-
raise MMQTTException(
342-
"Unable to receive {} bytes within {} seconds.".format(
343-
to_read, read_timeout
344-
)
345-
)
346-
return rc
347-
348319
def deinit(self):
349320
"""De-initializes the MQTT client and disconnects from the mqtt broker."""
350321
self.disconnect()
@@ -988,15 +959,28 @@ def _sock_exact_recv(self, bufsize):
988959
bytes is returned or trigger a timeout exception.
989960
990961
:param int bufsize: number of bytes to receive
991-
962+
:return: byte array
992963
"""
964+
stamp = time.monotonic()
993965
if not self._backwards_compatible_sock:
994966
# CPython/Socketpool Impl.
995967
rc = bytearray(bufsize)
996-
self._sock.recv_into(rc, bufsize)
997-
else: # ESP32SPI Impl.
998-
stamp = time.monotonic()
968+
mv = memoryview(rc)
969+
recv_len = self._sock.recv_into(rc, bufsize)
970+
to_read = bufsize - recv_len
971+
if to_read < 0:
972+
raise MMQTTException(f"negative number of bytes to read: {to_read}")
999973
read_timeout = self.keep_alive
974+
mv = mv[recv_len:]
975+
while to_read > 0:
976+
recv_len = self._sock.recv_into(mv, to_read)
977+
to_read -= recv_len
978+
mv = mv[recv_len:]
979+
if time.monotonic() - stamp > read_timeout:
980+
raise MMQTTException(
981+
f"Unable to receive {to_read} bytes within {read_timeout} seconds."
982+
)
983+
else: # ESP32SPI Impl.
1000984
# This will timeout with socket timeout (not keepalive timeout)
1001985
rc = self._sock.recv(bufsize)
1002986
if not rc:
@@ -1015,9 +999,7 @@ def _sock_exact_recv(self, bufsize):
1015999
rc += recv
10161000
if time.monotonic() - stamp > read_timeout:
10171001
raise MMQTTException(
1018-
"Unable to receive {} bytes within {} seconds.".format(
1019-
to_read, read_timeout
1020-
)
1002+
f"Unable to receive {to_read} bytes within {read_timeout} seconds."
10211003
)
10221004
return rc
10231005

0 commit comments

Comments
 (0)