@@ -169,10 +169,12 @@ class MQTT:
169
169
:param int connect_retries: How many times to try to connect to the broker before giving up
170
170
on connect or reconnect. Exponential backoff will be used for the retries.
171
171
:param class user_data: arbitrary data to pass as a second argument to the callbacks.
172
+ :param bool use_imprecise_time: on boards without time.monotonic_ns() one has to set
173
+ this to True in order to operate correctly over more than 24 days or so
172
174
173
175
"""
174
176
175
- # pylint: disable=too-many-arguments,too-many-instance-attributes,too-many-statements, not-callable, invalid-name, no-member
177
+ # pylint: disable=too-many-arguments,too-many-instance-attributes,too-many-statements,not-callable,invalid-name,no-member,too-many-locals
176
178
def __init__ (
177
179
self ,
178
180
* ,
@@ -190,13 +192,28 @@ def __init__(
190
192
socket_timeout : int = 1 ,
191
193
connect_retries : int = 5 ,
192
194
user_data = None ,
195
+ use_imprecise_time : Optional [bool ] = None ,
193
196
) -> None :
194
197
self ._socket_pool = socket_pool
195
198
self ._ssl_context = ssl_context
196
199
self ._sock = None
197
200
self ._backwards_compatible_sock = False
198
201
self ._use_binary_mode = use_binary_mode
199
202
203
+ self .use_monotonic_ns = False
204
+ try :
205
+ time .monotonic_ns ()
206
+ self .use_monotonic_ns = True
207
+ except AttributeError :
208
+ if use_imprecise_time :
209
+ self .use_monotonic_ns = False
210
+ else :
211
+ raise MMQTTException ( # pylint: disable=raise-missing-from
212
+ "time.monotonic_ns() is not available. "
213
+ "Will use imprecise time however only if the"
214
+ "use_imprecise_time argument is set to True."
215
+ )
216
+
200
217
if recv_timeout <= socket_timeout :
201
218
raise MMQTTException (
202
219
"recv_timeout must be strictly greater than socket_timeout"
@@ -248,9 +265,8 @@ def __init__(
248
265
self .client_id = client_id
249
266
else :
250
267
# assign a unique client_id
251
- self .client_id = (
252
- f"cpy{ randint (0 , int (time .monotonic () * 100 ) % 1000 )} { randint (0 , 99 )} "
253
- )
268
+ time_int = int (self .get_monotonic_time () * 100 ) % 1000
269
+ self .client_id = f"cpy{ randint (0 , time_int )} { randint (0 , 99 )} "
254
270
# generated client_id's enforce spec.'s length rules
255
271
if len (self .client_id .encode ("utf-8" )) > 23 or not self .client_id :
256
272
raise ValueError ("MQTT Client ID must be between 1 and 23 bytes" )
@@ -273,6 +289,17 @@ def __init__(
273
289
self .on_subscribe = None
274
290
self .on_unsubscribe = None
275
291
292
+ def get_monotonic_time (self ) -> float :
293
+ """
294
+ Provide monotonic time in seconds. Based on underlying implementation
295
+ this might result in imprecise time, that will result in the library
296
+ not being able to operate if running contiguously for more than 24 days or so.
297
+ """
298
+ if self .use_monotonic_ns :
299
+ return time .monotonic_ns () / 1000000000
300
+
301
+ return time .monotonic ()
302
+
276
303
# pylint: disable=too-many-branches
277
304
def _get_connect_socket (self , host : str , port : int , * , timeout : int = 1 ):
278
305
"""Obtains a new socket and connects to a broker.
@@ -627,7 +654,7 @@ def _connect(
627
654
self ._send_str (self ._username )
628
655
self ._send_str (self ._password )
629
656
self .logger .debug ("Receiving CONNACK packet from broker" )
630
- stamp = time . monotonic ()
657
+ stamp = self . get_monotonic_time ()
631
658
while True :
632
659
op = self ._wait_for_msg ()
633
660
if op == 32 :
@@ -643,7 +670,7 @@ def _connect(
643
670
return result
644
671
645
672
if op is None :
646
- if time . monotonic () - stamp > self ._recv_timeout :
673
+ if self . get_monotonic_time () - stamp > self ._recv_timeout :
647
674
raise MMQTTException (
648
675
f"No data received from broker for { self ._recv_timeout } seconds."
649
676
)
@@ -672,13 +699,13 @@ def ping(self) -> list[int]:
672
699
self .logger .debug ("Sending PINGREQ" )
673
700
self ._sock .send (MQTT_PINGREQ )
674
701
ping_timeout = self .keep_alive
675
- stamp = time . monotonic ()
702
+ stamp = self . get_monotonic_time ()
676
703
rc , rcs = None , []
677
704
while rc != MQTT_PINGRESP :
678
705
rc = self ._wait_for_msg ()
679
706
if rc :
680
707
rcs .append (rc )
681
- if time . monotonic () - stamp > ping_timeout :
708
+ if self . get_monotonic_time () - stamp > ping_timeout :
682
709
raise MMQTTException ("PINGRESP not returned from broker." )
683
710
return rcs
684
711
@@ -759,7 +786,7 @@ def publish(
759
786
if qos == 0 and self .on_publish is not None :
760
787
self .on_publish (self , self ._user_data , topic , self ._pid )
761
788
if qos == 1 :
762
- stamp = time . monotonic ()
789
+ stamp = self . get_monotonic_time ()
763
790
while True :
764
791
op = self ._wait_for_msg ()
765
792
if op == 0x40 :
@@ -773,7 +800,7 @@ def publish(
773
800
return
774
801
775
802
if op is None :
776
- if time . monotonic () - stamp > self ._recv_timeout :
803
+ if self . get_monotonic_time () - stamp > self ._recv_timeout :
777
804
raise MMQTTException (
778
805
f"No data received from broker for { self ._recv_timeout } seconds."
779
806
)
@@ -825,11 +852,11 @@ def subscribe(self, topic: str, qos: int = 0) -> None:
825
852
for t , q in topics :
826
853
self .logger .debug ("SUBSCRIBING to topic %s with QoS %d" , t , q )
827
854
self ._sock .send (packet )
828
- stamp = time . monotonic ()
855
+ stamp = self . get_monotonic_time ()
829
856
while True :
830
857
op = self ._wait_for_msg ()
831
858
if op is None :
832
- if time . monotonic () - stamp > self ._recv_timeout :
859
+ if self . get_monotonic_time () - stamp > self ._recv_timeout :
833
860
raise MMQTTException (
834
861
f"No data received from broker for { self ._recv_timeout } seconds."
835
862
)
@@ -892,10 +919,10 @@ def unsubscribe(self, topic: str) -> None:
892
919
self ._sock .send (packet )
893
920
self .logger .debug ("Waiting for UNSUBACK..." )
894
921
while True :
895
- stamp = time . monotonic ()
922
+ stamp = self . get_monotonic_time ()
896
923
op = self ._wait_for_msg ()
897
924
if op is None :
898
- if time . monotonic () - stamp > self ._recv_timeout :
925
+ if self . get_monotonic_time () - stamp > self ._recv_timeout :
899
926
raise MMQTTException (
900
927
f"No data received from broker for { self ._recv_timeout } seconds."
901
928
)
@@ -989,8 +1016,8 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
989
1016
990
1017
self .logger .debug (f"waiting for messages for { timeout } seconds" )
991
1018
if self ._timestamp == 0 :
992
- self ._timestamp = time . monotonic ()
993
- current_time = time . monotonic ()
1019
+ self ._timestamp = self . get_monotonic_time ()
1020
+ current_time = self . get_monotonic_time ()
994
1021
if current_time - self ._timestamp >= self .keep_alive :
995
1022
self ._timestamp = 0
996
1023
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
@@ -1000,14 +1027,14 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
1000
1027
rcs = self .ping ()
1001
1028
return rcs
1002
1029
1003
- stamp = time . monotonic ()
1030
+ stamp = self . get_monotonic_time ()
1004
1031
rcs = []
1005
1032
1006
1033
while True :
1007
1034
rc = self ._wait_for_msg ()
1008
1035
if rc is not None :
1009
1036
rcs .append (rc )
1010
- if time . monotonic () - stamp > timeout :
1037
+ if self . get_monotonic_time () - stamp > timeout :
1011
1038
self .logger .debug (f"Loop timed out after { timeout } seconds" )
1012
1039
break
1013
1040
@@ -1106,7 +1133,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1106
1133
:param int bufsize: number of bytes to receive
1107
1134
:return: byte array
1108
1135
"""
1109
- stamp = time . monotonic ()
1136
+ stamp = self . get_monotonic_time ()
1110
1137
if not self ._backwards_compatible_sock :
1111
1138
# CPython/Socketpool Impl.
1112
1139
rc = bytearray (bufsize )
@@ -1121,7 +1148,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1121
1148
recv_len = self ._sock .recv_into (mv , to_read )
1122
1149
to_read -= recv_len
1123
1150
mv = mv [recv_len :]
1124
- if time . monotonic () - stamp > read_timeout :
1151
+ if self . get_monotonic_time () - stamp > read_timeout :
1125
1152
raise MMQTTException (
1126
1153
f"Unable to receive { to_read } bytes within { read_timeout } seconds."
1127
1154
)
@@ -1141,7 +1168,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1141
1168
recv = self ._sock .recv (to_read )
1142
1169
to_read -= len (recv )
1143
1170
rc += recv
1144
- if time . monotonic () - stamp > read_timeout :
1171
+ if self . get_monotonic_time () - stamp > read_timeout :
1145
1172
raise MMQTTException (
1146
1173
f"Unable to receive { to_read } bytes within { read_timeout } seconds."
1147
1174
)
0 commit comments