@@ -1047,7 +1047,7 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
1047
1047
rcs = []
1048
1048
1049
1049
while True :
1050
- rc = self ._wait_for_msg ()
1050
+ rc = self ._wait_for_msg (timeout = timeout )
1051
1051
if rc is not None :
1052
1052
rcs .append (rc )
1053
1053
if self .get_monotonic_time () - stamp > timeout :
@@ -1056,11 +1056,13 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
1056
1056
1057
1057
return rcs if rcs else None
1058
1058
1059
- def _wait_for_msg (self ) -> Optional [int ]:
1059
+ def _wait_for_msg (self , timeout : Optional [ float ] = None ) -> Optional [int ]:
1060
1060
# pylint: disable = too-many-return-statements
1061
1061
1062
1062
"""Reads and processes network events.
1063
1063
Return the packet type or None if there is nothing to be received.
1064
+
1065
+ :param float timeout: return after this timeout, in seconds.
1064
1066
"""
1065
1067
# CPython socket module contains a timeout attribute
1066
1068
if hasattr (self ._socket_pool , "timeout" ):
@@ -1070,7 +1072,7 @@ def _wait_for_msg(self) -> Optional[int]:
1070
1072
return None
1071
1073
else : # socketpool, esp32spi
1072
1074
try :
1073
- res = self ._sock_exact_recv (1 )
1075
+ res = self ._sock_exact_recv (1 , timeout = timeout )
1074
1076
except OSError as error :
1075
1077
if error .errno in (errno .ETIMEDOUT , errno .EAGAIN ):
1076
1078
# raised by a socket timeout if 0 bytes were present
@@ -1139,7 +1141,9 @@ def _decode_remaining_length(self) -> int:
1139
1141
return n
1140
1142
sh += 7
1141
1143
1142
- def _sock_exact_recv (self , bufsize : int ) -> bytearray :
1144
+ def _sock_exact_recv (
1145
+ self , bufsize : int , timeout : Optional [float ] = None
1146
+ ) -> bytearray :
1143
1147
"""Reads _exact_ number of bytes from the connected socket. Will only return
1144
1148
bytearray with the exact number of bytes requested.
1145
1149
@@ -1150,6 +1154,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1150
1154
bytes is returned or trigger a timeout exception.
1151
1155
1152
1156
:param int bufsize: number of bytes to receive
1157
+ :param float timeout: timeout, in seconds. Defaults to keep_alive
1153
1158
:return: byte array
1154
1159
"""
1155
1160
stamp = self .get_monotonic_time ()
@@ -1161,7 +1166,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1161
1166
to_read = bufsize - recv_len
1162
1167
if to_read < 0 :
1163
1168
raise MMQTTException (f"negative number of bytes to read: { to_read } " )
1164
- read_timeout = self .keep_alive
1169
+ read_timeout = timeout if timeout is not None else self .keep_alive
1165
1170
mv = mv [recv_len :]
1166
1171
while to_read > 0 :
1167
1172
recv_len = self ._sock .recv_into (mv , to_read )
0 commit comments