@@ -1034,6 +1034,13 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
1034
1034
:param float timeout: return after this timeout, in seconds.
1035
1035
1036
1036
"""
1037
+ if timeout < self ._socket_timeout :
1038
+ raise MMQTTException (
1039
+ # pylint: disable=consider-using-f-string
1040
+ "loop timeout ({}) must be bigger " .format (timeout )
1041
+ + "than socket timeout ({}))" .format (self ._socket_timeout )
1042
+ )
1043
+
1037
1044
self ._connected ()
1038
1045
self .logger .debug (f"waiting for messages for { timeout } seconds" )
1039
1046
@@ -1065,11 +1072,13 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
1065
1072
1066
1073
return rcs if rcs else None
1067
1074
1068
- def _wait_for_msg (self ) -> Optional [int ]:
1075
+ def _wait_for_msg (self , timeout : Optional [ float ] = None ) -> Optional [int ]:
1069
1076
# pylint: disable = too-many-return-statements
1070
1077
1071
1078
"""Reads and processes network events.
1072
1079
Return the packet type or None if there is nothing to be received.
1080
+
1081
+ :param float timeout: return after this timeout, in seconds.
1073
1082
"""
1074
1083
# CPython socket module contains a timeout attribute
1075
1084
if hasattr (self ._socket_pool , "timeout" ):
@@ -1079,7 +1088,7 @@ def _wait_for_msg(self) -> Optional[int]:
1079
1088
return None
1080
1089
else : # socketpool, esp32spi
1081
1090
try :
1082
- res = self ._sock_exact_recv (1 )
1091
+ res = self ._sock_exact_recv (1 , timeout = timeout )
1083
1092
except OSError as error :
1084
1093
if error .errno in (errno .ETIMEDOUT , errno .EAGAIN ):
1085
1094
# raised by a socket timeout if 0 bytes were present
@@ -1148,7 +1157,9 @@ def _decode_remaining_length(self) -> int:
1148
1157
return n
1149
1158
sh += 7
1150
1159
1151
- def _sock_exact_recv (self , bufsize : int ) -> bytearray :
1160
+ def _sock_exact_recv (
1161
+ self , bufsize : int , timeout : Optional [float ] = None
1162
+ ) -> bytearray :
1152
1163
"""Reads _exact_ number of bytes from the connected socket. Will only return
1153
1164
bytearray with the exact number of bytes requested.
1154
1165
@@ -1159,6 +1170,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1159
1170
bytes is returned or trigger a timeout exception.
1160
1171
1161
1172
:param int bufsize: number of bytes to receive
1173
+ :param float timeout: timeout, in seconds. Defaults to keep_alive
1162
1174
:return: byte array
1163
1175
"""
1164
1176
stamp = self .get_monotonic_time ()
@@ -1170,7 +1182,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1170
1182
to_read = bufsize - recv_len
1171
1183
if to_read < 0 :
1172
1184
raise MMQTTException (f"negative number of bytes to read: { to_read } " )
1173
- read_timeout = self .keep_alive
1185
+ read_timeout = timeout if timeout is not None else self .keep_alive
1174
1186
mv = mv [recv_len :]
1175
1187
while to_read > 0 :
1176
1188
recv_len = self ._sock .recv_into (mv , to_read )
0 commit comments