Skip to content

Commit e8d9a8b

Browse files
committed
exponential backoff for (re)connect
fixes adafruit#10
1 parent 729c77a commit e8d9a8b

File tree

1 file changed

+145
-34
lines changed

1 file changed

+145
-34
lines changed

adafruit_minimqtt/adafruit_minimqtt.py

+145-34
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ class MQTT:
142142
143143
"""
144144

145-
# pylint: disable=too-many-arguments,too-many-instance-attributes, not-callable, invalid-name, no-member
145+
# pylint: disable=too-many-arguments,too-many-instance-attributes,too-many-statements, not-callable, invalid-name, no-member
146146
def __init__(
147147
self,
148148
*,
@@ -184,6 +184,12 @@ def __init__(
184184
self._timestamp = 0
185185
self.logger = None
186186

187+
self._reconnect_timeout = 0
188+
self._reconnect_attempt = 0
189+
self._reconnect_time = None
190+
self._reconnect_maximum_backoff = 32
191+
self._reconnect_attempt_max = 8
192+
187193
self.broker = broker
188194
self._username = username
189195
self._password = password
@@ -268,39 +274,38 @@ def _get_connect_socket(self, host, port, *, timeout=1):
268274
host, port, 0, self._socket_pool.SOCK_STREAM
269275
)[0]
270276

271-
sock = None
272-
retry_count = 0
273-
last_exception = None
274-
while retry_count < self._connect_retries and sock is None:
275-
retry_count += 1
276-
277-
try:
278-
sock = self._socket_pool.socket(addr_info[0], addr_info[1])
279-
except OSError:
280-
continue
277+
try:
278+
sock = self._socket_pool.socket(addr_info[0], addr_info[1])
279+
except OSError:
280+
# Do not consider this for back-off.
281+
if self.logger is not None:
282+
self.logger.warning(
283+
f"Failed to created socket for host {addr_info[0]} and port {addr_info[1]}"
284+
)
285+
return None
281286

282-
connect_host = addr_info[-1][0]
283-
if port == MQTT_TLS_PORT:
284-
sock = self._ssl_context.wrap_socket(sock, server_hostname=host)
285-
connect_host = host
286-
sock.settimeout(timeout)
287+
connect_host = addr_info[-1][0]
288+
if port == MQTT_TLS_PORT:
289+
sock = self._ssl_context.wrap_socket(sock, server_hostname=host)
290+
connect_host = host
291+
sock.settimeout(timeout)
287292

288-
try:
289-
sock.connect((connect_host, port))
290-
except MemoryError as exc:
291-
sock.close()
292-
sock = None
293-
last_exception = exc
294-
except OSError as exc:
295-
sock.close()
296-
sock = None
297-
last_exception = exc
298-
299-
if sock is None:
300-
if last_exception:
301-
raise RuntimeError("Repeated socket failures") from last_exception
293+
last_exception = None
294+
try:
295+
sock.connect((connect_host, port))
296+
except MemoryError as exc:
297+
sock.close()
298+
sock = None
299+
# Do not consider this for back-off.
300+
if self.logger is not None:
301+
self.logger.warning(f"Failed to allocate memory for connect: {exc}")
302+
except OSError as exc:
303+
sock.close()
304+
sock = None
305+
last_exception = exc
302306

303-
raise RuntimeError("Repeated socket failures")
307+
if last_exception:
308+
raise last_exception
304309

305310
self._backwards_compatible_sock = not hasattr(sock, "recv_into")
306311
return sock
@@ -418,8 +423,59 @@ def username_pw_set(self, username, password=None):
418423
if password is not None:
419424
self._password = password
420425

421-
# pylint: disable=too-many-branches, too-many-statements, too-many-locals
422426
def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
427+
"""Initiates connection with the MQTT Broker. Will perform exponential back-off
428+
on connect failures.
429+
430+
:param bool clean_session: Establishes a persistent session.
431+
:param str host: Hostname or IP address of the remote broker.
432+
:param int port: Network port of the remote broker.
433+
:param int keep_alive: Maximum period allowed for communication, in seconds.
434+
435+
"""
436+
437+
ret = None
438+
last_exception = None
439+
for i in range(0, self._reconnect_attempt_max):
440+
# If the last call to self._connect() returned None,
441+
# this means no back-off should be done.
442+
if ret and i > 0:
443+
try:
444+
self._recompute_reconnect() # Will raise MMQTTException on max attempts.
445+
except MMQTTException:
446+
break
447+
if self.logger is not None:
448+
self.logger.debug(
449+
f"Attempting to connect to MQTT broker (attempt #{self._reconnect_attempt}"
450+
)
451+
452+
try:
453+
ret = self._connect(
454+
clean_session=clean_session,
455+
host=host,
456+
port=port,
457+
keep_alive=keep_alive,
458+
)
459+
except OSError as e:
460+
last_exception = e
461+
if self.logger is not None:
462+
self.logger.info(f"failed to connect: {e}")
463+
except MMQTTException as e:
464+
last_exception = e
465+
if self.logger is not None:
466+
self.logger.info(f"MMQT error: {e}")
467+
468+
# TODO: are these indeed repeated ?
469+
if not ret:
470+
if last_exception:
471+
raise MMQTTException("Repeated connect failures") from last_exception
472+
473+
raise MMQTTException("Repeated connect failures")
474+
475+
return ret
476+
477+
# pylint: disable=too-many-branches, too-many-statements, too-many-locals
478+
def _connect(self, clean_session=True, host=None, port=None, keep_alive=None):
423479
"""Initiates connection with the MQTT Broker.
424480
425481
:param bool clean_session: Establishes a persistent session.
@@ -438,10 +494,18 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
438494
if self.logger is not None:
439495
self.logger.debug("Attempting to establish MQTT connection...")
440496

497+
if self._reconnect_attempt > 0:
498+
self.logger.debug(
499+
f"Sleeping for {self._reconnect_timeout:.3} seconds due to connect back-off"
500+
)
501+
time.sleep(self._reconnect_timeout)
502+
441503
# Get a new socket
442504
self._sock = self._get_connect_socket(
443505
self.broker, self.port, timeout=self._socket_timeout
444506
)
507+
if self._sock is None:
508+
return None
445509

446510
# Fixed Header
447511
fixed_header = bytearray([0x10])
@@ -521,6 +585,8 @@ def connect(self, clean_session=True, host=None, port=None, keep_alive=None):
521585
result = rc[0] & 1
522586
if self.on_connect is not None:
523587
self.on_connect(self, self._user_data, result, rc[2])
588+
self._reconnect_attempt = 0
589+
self._reconnect_time = None
524590
return result
525591

526592
if op is None:
@@ -782,15 +848,58 @@ def unsubscribe(self, topic):
782848
f"No data received from broker for {self._recv_timeout} seconds."
783849
)
784850

851+
def _recompute_reconnect(self):
852+
"""
853+
Recompute the reconnection timeout. The self._reconnect_timeout will be used
854+
in self._connect() to perform the actual sleep. Also, the monotonic time
855+
of last reconnect will be stored in self._reconnect_time.
856+
857+
Raise MMQTTException on maximum number of reconnect attempts reached.
858+
859+
"""
860+
self._reconnect_attempt = self._reconnect_attempt + 1
861+
if self._reconnect_attempt > self._reconnect_attempt_max:
862+
raise MMQTTException(
863+
f"Maximum number of reconnect attempts ({self._reconnect_attempt_max}) reached"
864+
)
865+
866+
self._reconnect_timeout = self._reconnect_attempt**2 + randint(0, 1000) / 1000
867+
time_diff = time.monotonic() - self._reconnect_time
868+
if self._reconnect_time and time_diff > 0:
869+
if self.logger is not None:
870+
self.logger.debug(f"Reducing reconnect timeout by {time_diff} seconds")
871+
self._reconnect_timeout = self._reconnect_timeout - time_diff
872+
873+
if self._reconnect_timeout > self._reconnect_maximum_backoff:
874+
if self.logger is not None:
875+
self.logger.debug(
876+
f"Truncating reconnect timeout to {self._reconnect_maximum_backoff:.3} seconds"
877+
)
878+
self._reconnect_timeout = self._reconnect_maximum_backoff
879+
880+
self._reconnect_time = time.monotonic()
881+
785882
def reconnect(self, resub_topics=True):
786883
"""Attempts to reconnect to the MQTT broker.
884+
Return the value from connect() if successful.
885+
TODO: describe behavior if already connected
787886
788887
:param bool resub_topics: Resubscribe to previously subscribed topics.
789888
790889
"""
890+
891+
# TODO: disconnect() if connected ?
892+
893+
# TODO: subtract the time spent between the calls ? (or use for debugging)
894+
self._recompute_reconnect()
791895
if self.logger is not None:
792-
self.logger.debug("Attempting to reconnect with MQTT broker")
793-
self.connect()
896+
self.logger.debug(
897+
f"Attempting to reconnect with MQTT broker (attempt #{self._reconnect_attempt}"
898+
)
899+
ret = self._connect() # Will raise MMQTTException on failure.
900+
if ret is None:
901+
# TODO
902+
return None
794903
if self.logger is not None:
795904
self.logger.debug("Reconnected with broker")
796905
if resub_topics:
@@ -804,6 +913,8 @@ def reconnect(self, resub_topics=True):
804913
feed = subscribed_topics.pop()
805914
self.subscribe(feed)
806915

916+
return ret
917+
807918
def loop(self, timeout=0):
808919
# pylint: disable = too-many-return-statements
809920
"""Non-blocking message loop. Use this method to

0 commit comments

Comments
 (0)