@@ -1004,7 +1004,7 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
1004
1004
rcs = []
1005
1005
1006
1006
while True :
1007
- rc = self ._wait_for_msg ()
1007
+ rc = self ._wait_for_msg (timeout = timeout )
1008
1008
if rc is not None :
1009
1009
rcs .append (rc )
1010
1010
if time .monotonic () - stamp > timeout :
@@ -1013,11 +1013,13 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
1013
1013
1014
1014
return rcs if rcs else None
1015
1015
1016
- def _wait_for_msg (self ) -> Optional [int ]:
1016
+ def _wait_for_msg (self , timeout : Optional [ float ] = None ) -> Optional [int ]:
1017
1017
# pylint: disable = too-many-return-statements
1018
1018
1019
1019
"""Reads and processes network events.
1020
1020
Return the packet type or None if there is nothing to be received.
1021
+
1022
+ :param float timeout: return after this timeout, in seconds.
1021
1023
"""
1022
1024
# CPython socket module contains a timeout attribute
1023
1025
if hasattr (self ._socket_pool , "timeout" ):
@@ -1027,7 +1029,7 @@ def _wait_for_msg(self) -> Optional[int]:
1027
1029
return None
1028
1030
else : # socketpool, esp32spi
1029
1031
try :
1030
- res = self ._sock_exact_recv (1 )
1032
+ res = self ._sock_exact_recv (1 , timeout = timeout )
1031
1033
except OSError as error :
1032
1034
if error .errno in (errno .ETIMEDOUT , errno .EAGAIN ):
1033
1035
# raised by a socket timeout if 0 bytes were present
@@ -1093,7 +1095,9 @@ def _recv_len(self) -> int:
1093
1095
return n
1094
1096
sh += 7
1095
1097
1096
- def _sock_exact_recv (self , bufsize : int ) -> bytearray :
1098
+ def _sock_exact_recv (
1099
+ self , bufsize : int , timeout : Optional [float ] = None
1100
+ ) -> bytearray :
1097
1101
"""Reads _exact_ number of bytes from the connected socket. Will only return
1098
1102
bytearray with the exact number of bytes requested.
1099
1103
@@ -1104,6 +1108,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1104
1108
bytes is returned or trigger a timeout exception.
1105
1109
1106
1110
:param int bufsize: number of bytes to receive
1111
+ :param float timeout: timeout, in seconds. Defaults to keep_alive
1107
1112
:return: byte array
1108
1113
"""
1109
1114
stamp = time .monotonic ()
@@ -1115,7 +1120,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1115
1120
to_read = bufsize - recv_len
1116
1121
if to_read < 0 :
1117
1122
raise MMQTTException (f"negative number of bytes to read: { to_read } " )
1118
- read_timeout = self .keep_alive
1123
+ read_timeout = timeout if timeout is not None else self .keep_alive
1119
1124
mv = mv [recv_len :]
1120
1125
while to_read > 0 :
1121
1126
recv_len = self ._sock .recv_into (mv , to_read )
0 commit comments