45
45
received_all_event = threading .Event ()
46
46
47
47
# Callback when connection is accidentally lost.
48
- def on_connection_interrupted (error_code ):
48
+ def on_connection_interrupted (connection , error_code ):
49
49
print ("Connection interrupted. error_code:{}" .format (error_code ))
50
50
51
51
# Callback when an interrupted connection is re-established.
52
- def on_connection_resumed (error_code , session_present ):
52
+ def on_connection_resumed (connection , error_code , session_present ):
53
53
print ("Connection resumed. error_code:{} session_present:{}" .format (error_code , session_present ))
54
54
55
+ if not session_present :
56
+ print ("Server did not save state. Resubscribing to existing topics..." )
57
+ resubscribe_future , _ = connection .resubscribe_existing_topics ()
58
+
59
+ # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
60
+ # evaluate result with a callback instead.
61
+ def on_resubscribed (resubscribe_future ):
62
+ try :
63
+ resubscribe_results = resubscribe_future .result ()
64
+ print ("Resubscribe complete. results:{}" .format (resubscribe_results ))
65
+ for topic , qos in resubscribe_results ['topics' ]:
66
+ assert (qos )
67
+ except Exception as e :
68
+ print ("Resubscribe failed. Exiting" , e )
69
+ exit (- 1 )
70
+
71
+ resubscribe_future .add_done_callback (on_resubscribed )
72
+
73
+
55
74
# Callback when the subscribed topic receives a message
56
75
def on_message_received (topic , message ):
57
76
print ("Received message from topic '{}': {}" .format (topic , message ))
@@ -67,7 +86,7 @@ def on_message_received(topic, message):
67
86
68
87
tls_options = io .TlsContextOptions .create_client_with_mtls_from_path (args .cert , args .key )
69
88
if args .root_ca :
70
- tls_options .override_default_trust_store_from_path (ca_path = None , ca_file = args .root_ca )
89
+ tls_options .override_default_trust_store_from_path (ca_dirpath = None , ca_filepath = args .root_ca )
71
90
tls_context = io .ClientTlsContext (tls_options )
72
91
73
92
mqtt_client = mqtt .Client (client_bootstrap , tls_context )
@@ -89,7 +108,7 @@ def on_message_received(topic, message):
89
108
port = port ,
90
109
use_websocket = False ,
91
110
clean_session = True ,
92
- keep_alive = 6000 )
111
+ keep_alive = 5 ) # DO NOT COMMIT THIS CHANGE
93
112
94
113
# Future.result() waits until a result is available
95
114
connect_future .result ()
0 commit comments