@@ -169,10 +169,12 @@ class MQTT:
169
169
:param int connect_retries: How many times to try to connect to the broker before giving up
170
170
on connect or reconnect. Exponential backoff will be used for the retries.
171
171
:param class user_data: arbitrary data to pass as a second argument to the callbacks.
172
+ :param bool use_imprecise_time: on boards without time.monotonic_ns() one has to set
173
+ this to True in order to operate
172
174
173
175
"""
174
176
175
- # pylint: disable=too-many-arguments,too-many-instance-attributes,too-many-statements, not-callable, invalid-name, no-member
177
+ # pylint: disable=too-many-arguments,too-many-instance-attributes,too-many-statements,not-callable,invalid-name,no-member,too-many-locals
176
178
def __init__ (
177
179
self ,
178
180
* ,
@@ -190,13 +192,26 @@ def __init__(
190
192
socket_timeout : int = 1 ,
191
193
connect_retries : int = 5 ,
192
194
user_data = None ,
195
+ use_imprecise_time : Optional [bool ] = None ,
193
196
) -> None :
194
197
self ._socket_pool = socket_pool
195
198
self ._ssl_context = ssl_context
196
199
self ._sock = None
197
200
self ._backwards_compatible_sock = False
198
201
self ._use_binary_mode = use_binary_mode
199
202
203
+ self .monotonic_ns = False
204
+ try :
205
+ time .monotonic_ns ()
206
+ self .monotonic_ns = True
207
+ except AttributeError :
208
+ if use_imprecise_time :
209
+ self .monotonic_ns = False
210
+ else :
211
+ raise MMQTTException ("time.monotonic_ns() is not available. " # pylint: disable=raise-missing-from
212
+ "Will use imprecise time however only if the"
213
+ "use_imprecise_time argument is set to True." )
214
+
200
215
if recv_timeout <= socket_timeout :
201
216
raise MMQTTException (
202
217
"recv_timeout must be strictly greater than socket_timeout"
@@ -249,7 +264,7 @@ def __init__(
249
264
else :
250
265
# assign a unique client_id
251
266
self .client_id = (
252
- f"cpy{ randint (0 , int (time . monotonic () * 100 ) % 1000 )} { randint (0 , 99 )} "
267
+ f"cpy{ randint (0 , int (self . get_monotonic_time () * 100 ) % 1000 )} { randint (0 , 99 )} "
253
268
)
254
269
# generated client_id's enforce spec.'s length rules
255
270
if len (self .client_id .encode ("utf-8" )) > 23 or not self .client_id :
@@ -273,6 +288,17 @@ def __init__(
273
288
self .on_subscribe = None
274
289
self .on_unsubscribe = None
275
290
291
+ def get_monotonic_time (self ) -> float :
292
+ """
293
+ Provide monotonic time in seconds. Based on underlying implementation
294
+ this might result in imprecise time, that will result in the library
295
+ not being able to operate if running contiguously for more than 24 days or so.
296
+ """
297
+ if self .monotonic_ns :
298
+ return time .monotonic_ns () / 1000000
299
+
300
+ return time .monotonic ()
301
+
276
302
# pylint: disable=too-many-branches
277
303
def _get_connect_socket (self , host : str , port : int , * , timeout : int = 1 ):
278
304
"""Obtains a new socket and connects to a broker.
@@ -627,7 +653,7 @@ def _connect(
627
653
self ._send_str (self ._username )
628
654
self ._send_str (self ._password )
629
655
self .logger .debug ("Receiving CONNACK packet from broker" )
630
- stamp = time . monotonic ()
656
+ stamp = self . get_monotonic_time ()
631
657
while True :
632
658
op = self ._wait_for_msg ()
633
659
if op == 32 :
@@ -643,7 +669,7 @@ def _connect(
643
669
return result
644
670
645
671
if op is None :
646
- if time . monotonic () - stamp > self ._recv_timeout :
672
+ if self . get_monotonic_time () - stamp > self ._recv_timeout :
647
673
raise MMQTTException (
648
674
f"No data received from broker for { self ._recv_timeout } seconds."
649
675
)
@@ -672,13 +698,13 @@ def ping(self) -> list[int]:
672
698
self .logger .debug ("Sending PINGREQ" )
673
699
self ._sock .send (MQTT_PINGREQ )
674
700
ping_timeout = self .keep_alive
675
- stamp = time . monotonic ()
701
+ stamp = self . get_monotonic_time ()
676
702
rc , rcs = None , []
677
703
while rc != MQTT_PINGRESP :
678
704
rc = self ._wait_for_msg ()
679
705
if rc :
680
706
rcs .append (rc )
681
- if time . monotonic () - stamp > ping_timeout :
707
+ if self . get_monotonic_time () - stamp > ping_timeout :
682
708
raise MMQTTException ("PINGRESP not returned from broker." )
683
709
return rcs
684
710
@@ -759,7 +785,7 @@ def publish(
759
785
if qos == 0 and self .on_publish is not None :
760
786
self .on_publish (self , self ._user_data , topic , self ._pid )
761
787
if qos == 1 :
762
- stamp = time . monotonic ()
788
+ stamp = self . get_monotonic_time ()
763
789
while True :
764
790
op = self ._wait_for_msg ()
765
791
if op == 0x40 :
@@ -773,7 +799,7 @@ def publish(
773
799
return
774
800
775
801
if op is None :
776
- if time . monotonic () - stamp > self ._recv_timeout :
802
+ if self . get_monotonic_time () - stamp > self ._recv_timeout :
777
803
raise MMQTTException (
778
804
f"No data received from broker for { self ._recv_timeout } seconds."
779
805
)
@@ -825,11 +851,11 @@ def subscribe(self, topic: str, qos: int = 0) -> None:
825
851
for t , q in topics :
826
852
self .logger .debug ("SUBSCRIBING to topic %s with QoS %d" , t , q )
827
853
self ._sock .send (packet )
828
- stamp = time . monotonic ()
854
+ stamp = self . get_monotonic_time ()
829
855
while True :
830
856
op = self ._wait_for_msg ()
831
857
if op is None :
832
- if time . monotonic () - stamp > self ._recv_timeout :
858
+ if self . get_monotonic_time () - stamp > self ._recv_timeout :
833
859
raise MMQTTException (
834
860
f"No data received from broker for { self ._recv_timeout } seconds."
835
861
)
@@ -892,10 +918,10 @@ def unsubscribe(self, topic: str) -> None:
892
918
self ._sock .send (packet )
893
919
self .logger .debug ("Waiting for UNSUBACK..." )
894
920
while True :
895
- stamp = time . monotonic ()
921
+ stamp = self . get_monotonic_time ()
896
922
op = self ._wait_for_msg ()
897
923
if op is None :
898
- if time . monotonic () - stamp > self ._recv_timeout :
924
+ if self . get_monotonic_time () - stamp > self ._recv_timeout :
899
925
raise MMQTTException (
900
926
f"No data received from broker for { self ._recv_timeout } seconds."
901
927
)
@@ -989,8 +1015,8 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
989
1015
990
1016
self .logger .debug (f"waiting for messages for { timeout } seconds" )
991
1017
if self ._timestamp == 0 :
992
- self ._timestamp = time . monotonic ()
993
- current_time = time . monotonic ()
1018
+ self ._timestamp = self . get_monotonic_time ()
1019
+ current_time = self . get_monotonic_time ()
994
1020
if current_time - self ._timestamp >= self .keep_alive :
995
1021
self ._timestamp = 0
996
1022
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
@@ -1000,14 +1026,14 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
1000
1026
rcs = self .ping ()
1001
1027
return rcs
1002
1028
1003
- stamp = time . monotonic ()
1029
+ stamp = self . get_monotonic_time ()
1004
1030
rcs = []
1005
1031
1006
1032
while True :
1007
1033
rc = self ._wait_for_msg ()
1008
1034
if rc is not None :
1009
1035
rcs .append (rc )
1010
- if time . monotonic () - stamp > timeout :
1036
+ if self . get_monotonic_time () - stamp > timeout :
1011
1037
self .logger .debug (f"Loop timed out after { timeout } seconds" )
1012
1038
break
1013
1039
@@ -1106,7 +1132,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1106
1132
:param int bufsize: number of bytes to receive
1107
1133
:return: byte array
1108
1134
"""
1109
- stamp = time . monotonic ()
1135
+ stamp = self . get_monotonic_time ()
1110
1136
if not self ._backwards_compatible_sock :
1111
1137
# CPython/Socketpool Impl.
1112
1138
rc = bytearray (bufsize )
@@ -1121,7 +1147,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1121
1147
recv_len = self ._sock .recv_into (mv , to_read )
1122
1148
to_read -= recv_len
1123
1149
mv = mv [recv_len :]
1124
- if time . monotonic () - stamp > read_timeout :
1150
+ if self . get_monotonic_time () - stamp > read_timeout :
1125
1151
raise MMQTTException (
1126
1152
f"Unable to receive { to_read } bytes within { read_timeout } seconds."
1127
1153
)
@@ -1141,7 +1167,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
1141
1167
recv = self ._sock .recv (to_read )
1142
1168
to_read -= len (recv )
1143
1169
rc += recv
1144
- if time . monotonic () - stamp > read_timeout :
1170
+ if self . get_monotonic_time () - stamp > read_timeout :
1145
1171
raise MMQTTException (
1146
1172
f"Unable to receive { to_read } bytes within { read_timeout } seconds."
1147
1173
)
0 commit comments