20
20
from awscrt .io import LogLevel
21
21
from awscrt .mqtt import Connection , Client , QoS
22
22
from awsiot .greengrass_discovery import DiscoveryClient , DiscoverResponse
23
+ from awsiot import mqtt_connection_builder
23
24
24
25
allowed_actions = ['both' , 'publish' , 'subscribe' ]
25
26
26
27
parser = argparse .ArgumentParser ()
27
- parser .add_argument ('-r' , '--ca_file ' , action = 'store' , required = True , dest = 'root_ca_path' , help = 'Root CA file path' )
28
+ parser .add_argument ('-r' , '--root-ca ' , action = 'store' , dest = 'root_ca_path' , help = 'Root CA file path' )
28
29
parser .add_argument ('-c' , '--cert' , action = 'store' , required = True , dest = 'certificate_path' , help = 'Certificate file path' )
29
30
parser .add_argument ('-k' , '--key' , action = 'store' , required = True , dest = 'private_key_path' , help = 'Private key file path' )
30
- parser .add_argument ('-n' , '--thing_name ' , action = 'store' , required = True , dest = 'thing_name' , help = 'Targeted thing name' )
31
+ parser .add_argument ('-n' , '--thing-name ' , action = 'store' , required = True , dest = 'thing_name' , help = 'Targeted thing name' )
31
32
parser .add_argument ('-t' , '--topic' , action = 'store' , dest = 'topic' , default = 'sdk/test/Python' , help = 'Targeted topic' )
32
33
parser .add_argument ('-m' , '--mode' , action = 'store' , dest = 'mode' , default = 'both' ,
33
34
help = 'Operation modes: %s' % str (allowed_actions ))
34
35
parser .add_argument ('-M' , '--message' , action = 'store' , dest = 'message' , default = 'Hello World!' ,
35
36
help = 'Message to publish' )
36
37
parser .add_argument ('--region' , action = 'store' , dest = 'region' , default = 'us-east-1' )
37
- parser .add_argument ('--max_pub_ops ' , action = 'store' , dest = 'max_pub_ops' , default = 10 )
38
- parser .add_argument ('--print_discover_resp_only ' , action = 'store_true' , dest = 'print_discover_resp_only' , default = False )
38
+ parser .add_argument ('--max-pub-ops ' , action = 'store' , dest = 'max_pub_ops' , default = 10 )
39
+ parser .add_argument ('--print-discover-resp-only ' , action = 'store_true' , dest = 'print_discover_resp_only' , default = False )
39
40
parser .add_argument ('-v' , '--verbose' , action = 'store' , dest = 'verbosity' , default = 'NoLogs' )
40
41
41
42
args = parser .parse_args ()
54
55
io .init_logging (LogLevel .Trace , 'stderr' )
55
56
56
57
event_loop_group = io .EventLoopGroup (1 )
57
- client_bootstrap = io .ClientBootstrap (event_loop_group )
58
+ host_resolver = io .DefaultHostResolver (event_loop_group )
59
+ client_bootstrap = io .ClientBootstrap (event_loop_group , host_resolver )
58
60
59
61
tls_options = io .TlsContextOptions .create_client_with_mtls_from_path (args .certificate_path , args .private_key_path )
60
62
tls_options .override_default_trust_store_from_path (None , args .root_ca_path )
@@ -84,25 +86,17 @@ def on_connection_resumed(connection, error_code, session_present):
84
86
# Try IoT endpoints until we find one that works
85
87
def try_iot_endpoints ():
86
88
for gg_group in discover_response .gg_groups :
87
-
88
- gg_core_tls_options = io .TlsContextOptions .create_client_with_mtls_from_path (args .certificate_path , args .private_key_path )
89
- gg_core_tls_options .override_default_trust_store (bytes (gg_group .certificate_authorities [0 ], encoding = 'utf-8' ))
90
- gg_core_tls_ctx = io .ClientTlsContext (gg_core_tls_options )
91
- mqtt_client = Client (client_bootstrap , gg_core_tls_ctx )
92
-
93
89
for gg_core in gg_group .cores :
94
90
for connectivity_info in gg_core .connectivity :
95
91
try :
96
92
print ('Trying core {} at host {} port {}' .format (gg_core .thing_arn , connectivity_info .host_address , connectivity_info .port ))
97
- mqtt_connection = Connection (
98
- mqtt_client ,
99
- on_connection_interrupted = on_connection_interupted ,
100
- on_connection_resumed = on_connection_resumed )
101
- connect_future = mqtt_connection .connect (
102
- client_id = args .thing_name ,
103
- host_name = connectivity_info .host_address ,
104
- port = connectivity_info .port ,
105
- clean_session = False )
93
+ mqtt_connection = mqtt_connection_builder .mtls_from_path (endpoint = connectivity_info .host_address , port = connectivity_info .port ,
94
+ cert_filepath = args .certificate_path , pri_key_filepath = args .private_key_path , client_bootstrap = client_bootstrap ,
95
+ ca_bytes = bytes (gg_group .certificate_authorities [0 ], encoding = 'utf-8' ),
96
+ on_connection_interrupted = on_connection_interupted , on_connection_resumed = on_connection_resumed ,
97
+ client_id = args .thing_name , clean_session = False , keep_alive_secs = 6 )
98
+
99
+ connect_future = mqtt_connection .connect ()
106
100
connect_future .result ()
107
101
print ('Connected!' )
108
102
return mqtt_connection
@@ -117,9 +111,9 @@ def try_iot_endpoints():
117
111
118
112
if args .mode == 'both' or args .mode == 'subscribe' :
119
113
120
- def on_publish (topic , message ):
114
+ def on_publish (topic , payload ):
121
115
print ('Publish received on topic {}' .format (topic ))
122
- print (message )
116
+ print (payload )
123
117
124
118
subscribe_future , _ = mqtt_connection .subscribe (args .topic , QoS .AT_MOST_ONCE , on_publish )
125
119
subscribe_result = subscribe_future .result ()
0 commit comments