Skip to content

Commit d9fd2fb

Browse files
author
graebm
committed
Fix greengrass and discovery sample
1 parent de5ff50 commit d9fd2fb

File tree

3 files changed

+81
-68
lines changed

3 files changed

+81
-68
lines changed

awsiot/greengrass_discovery.py

Lines changed: 43 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
# permissions and limitations under the License.
1313

1414
from awscrt.http import HttpClientConnection, HttpRequest
15-
from awscrt import io
16-
from awscrt.io import ClientBootstrap, ClientTlsContext, TlsConnectionOptions, SocketOptions
15+
from awscrt.io import ClientBootstrap, ClientTlsContext, is_alpn_available, SocketOptions, TlsConnectionOptions
1716
import awsiot
1817
from concurrent.futures import Future
1918
import json
@@ -35,58 +34,59 @@ def __init__(self, bootstrap, socket_options, tls_context, region):
3534
self._tls_connection_options.set_server_name(self._gg_server_name)
3635
self.port = 8443
3736

38-
if io.is_alpn_available():
39-
self._tls_connection_options.set_alpn_list('x-amzn-http-ca')
37+
if is_alpn_available():
38+
self._tls_connection_options.set_alpn_list(['x-amzn-http-ca'])
4039
self.port = 443
4140

4241
def discover(self, thing_name):
43-
ret_future = Future()
44-
response_body = bytearray()
45-
request = None
4642

47-
def on_incoming_body(response_chunk):
48-
response_body.extend(response_chunk)
43+
discovery = dict(
44+
future=Future(),
45+
response_body=bytearray())
46+
47+
def on_incoming_body(http_stream, response_chunk):
48+
discovery['response_body'].extend(response_chunk)
4949

5050
def on_request_complete(completion_future):
51-
global request
5251
try:
53-
response_code = request.response_code
54-
# marking request as global prevents the GC from reclaiming it,
55-
# so force it to do it here.
56-
request = None
52+
response_code = completion_future.result()
5753
if response_code == 200:
58-
payload_str = response_body.decode('utf-8')
54+
payload_str = discovery['response_body'].decode('utf-8')
5955
discover_res = DiscoverResponse.from_payload(json.loads(payload_str))
60-
ret_future.set_result(discover_res)
61-
else:
62-
ret_future.set_exception(DiscoveryException('Error during discover call: response code ={}'.format(response_code), response_code))
56+
discovery['future'].set_result(discover_res)
57+
else:
58+
discovery['future'].set_exception(DiscoveryException('Error during discover call: response_code={}'.format(response_code), response_code))
6359

6460
except Exception as e:
65-
ret_future.set_exception(e)
61+
discovery['future'].set_exception(e)
6662

6763
def on_connection_completed(conn_future):
68-
global request
6964
try:
70-
connection = conn_future.result()
71-
request = connection.make_request(
72-
method='GET',
73-
uri_str='/greengrass/discover/thing/{}'.format(thing_name),
74-
outgoing_headers={'host':self._gg_server_name},
75-
on_outgoing_body=None,
76-
on_incoming_body=on_incoming_body)
65+
connection = conn_future.result()
66+
request = HttpRequest(
67+
method='GET',
68+
path='/greengrass/discover/thing/{}'.format(thing_name),
69+
headers=[('host', self._gg_server_name)])
70+
71+
http_stream = connection.request(
72+
request=request,
73+
on_body=on_incoming_body)
7774

78-
request.response_completed.add_done_callback(on_request_complete)
75+
http_stream.completion_future.add_done_callback(on_request_complete)
7976

8077
except Exception as e:
81-
# marking request as global prevents the GC from reclaiming it,
82-
# so force it to do it here.
83-
request = None
84-
ret_future.set_exception(e)
78+
discovery['future'].set_exception(e)
79+
80+
connect_future = HttpClientConnection.new(
81+
host_name=self._gg_server_name,
82+
port=self.port,
83+
socket_options=self._socket_options,
84+
tls_connection_options = self._tls_connection_options,
85+
bootstrap = self._bootstrap)
8586

86-
connect_future = HttpClientConnection.new_connection(self._bootstrap, self._gg_server_name, self.port, self._socket_options, None, self._tls_connection_options)
8787
connect_future.add_done_callback(on_connection_completed)
88-
89-
return ret_future
88+
89+
return discovery['future']
9090

9191
class DiscoveryException(Exception):
9292
_slots_ = ['http_response_code', 'message']
@@ -102,7 +102,7 @@ class ConnectivityInfo(awsiot.ModeledClass):
102102
def ___init___(self):
103103
for slot in self.__slots__:
104104
setattr(self, slot, None)
105-
105+
106106
@classmethod
107107
def from_payload(cls, payload):
108108
# type: (typing.Dict[str, typing.Any]) -> ConnectivityInfo
@@ -138,12 +138,12 @@ def from_payload(cls, payload):
138138
val = payload.get('Connectivity')
139139
if val is not None:
140140
new.connectivity = [ConnectivityInfo.from_payload(i) for i in val]
141-
141+
142142
return new
143143

144144
class GGGroup(awsiot.ModeledClass):
145145
__slots__ = ['gg_group_id', 'cores', 'certificate_authorities']
146-
146+
147147
def ___init___(self):
148148
for slot in self.__slots__:
149149
setattr(self, slot, None)
@@ -160,23 +160,23 @@ def from_payload(cls, payload):
160160
new.cores = [GGCore.from_payload(i) for i in val]
161161
val = payload.get('CAs')
162162
if val is not None:
163-
new.certificate_authorities = val
163+
new.certificate_authorities = val
164164

165-
return new
165+
return new
166166

167167
class DiscoverResponse(awsiot.ModeledClass):
168168
__slots__ = ['gg_groups']
169169

170170
def ___init___(self):
171171
for slot in self.__slots__:
172172
setattr(self, slot, None)
173-
173+
174174
@classmethod
175175
def from_payload(cls, payload):
176176
# type: (typing.Dict[str, typing.Any]) -> DiscoverResponse
177177
new = cls()
178178
val = payload.get('GGGroups')
179179
if val is not None:
180180
new.gg_groups = [GGGroup.from_payload(i) for i in val]
181-
182-
return new
181+
182+
return new

samples/basic_discovery.py

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -63,19 +63,15 @@
6363
socket_options = io.SocketOptions()
6464
socket_options.connect_timeout_ms = 3000
6565

66+
print('Performing greengrass discovery...')
6667
discovery_client = DiscoveryClient(client_bootstrap, socket_options, tls_context, args.region)
6768
resp_future = discovery_client.discover(args.thing_name)
68-
resp = resp_future.result()
69+
discover_response = resp_future.result()
6970

71+
print(discover_response)
7072
if args.print_discover_resp_only:
71-
print(resp)
7273
exit(0)
7374

74-
gg_core_tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(args.certificate_path, args.private_key_path)
75-
gg_core_tls_options.override_default_trust_store(bytes(resp.gg_groups[0].certificate_authorities[0], encoding='utf-8'))
76-
gg_core_tls_ctx = io.ClientTlsContext(gg_core_tls_options)
77-
mqtt_client = Client(client_bootstrap, gg_core_tls_ctx)
78-
7975

8076
def on_connection_interupted(connection, error_code):
8177
print('connection interupted with error {}'.format(error_code))
@@ -85,27 +81,44 @@ def on_connection_resumed(connection, error_code, session_present):
8581
print('connection resumed with error {}, session present {}'.format(error_code, session_present))
8682

8783

88-
mqtt_connection = Connection(mqtt_client, on_connection_interrupted=on_connection_interupted, on_connection_resumed=on_connection_resumed)
89-
90-
connection_succeeded = False
91-
for conectivity_info in resp.gg_groups[0].cores[0].connectivity:
92-
try:
93-
connect_future = mqtt_connection.connect(args.thing_name, resp.gg_groups[0].cores[0].connectivity[0].host_address, resp.gg_groups[0].cores[0].connectivity[0].port, clean_session=False)
94-
connect_future.result()
95-
connection_succeeded = True
96-
break
97-
except Exception as e:
98-
print('connection failed with exception {}'.format(e))
99-
continue
100-
101-
if connection_succeeded != True:
102-
print('All connection attempts for core {} failed'.format(resp.gg_groups[0].cores[0].thing_arn))
103-
exit(-1)
84+
# Try endpoints until we find one that works
85+
def connect_to_greengrass():
86+
for gg_group in discover_response.gg_groups:
87+
88+
gg_core_tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(args.certificate_path, args.private_key_path)
89+
gg_core_tls_options.override_default_trust_store(bytes(gg_group.certificate_authorities[0], encoding='utf-8'))
90+
gg_core_tls_ctx = io.ClientTlsContext(gg_core_tls_options)
91+
mqtt_client = Client(client_bootstrap, gg_core_tls_ctx)
92+
93+
for gg_core in gg_group.cores:
94+
for connectivity_info in gg_core.connectivity:
95+
try:
96+
print('Trying core {} at host {} port {}'.format(gg_core.thing_arn, connectivity_info.host_address, connectivity_info.port))
97+
mqtt_connection = Connection(
98+
mqtt_client,
99+
on_connection_interrupted=on_connection_interupted,
100+
on_connection_resumed=on_connection_resumed)
101+
connect_future = mqtt_connection.connect(
102+
client_id=args.thing_name,
103+
host_name=connectivity_info.host_address,
104+
port=connectivity_info.port,
105+
clean_session=False)
106+
connect_future.result()
107+
print('Connected!')
108+
return mqtt_connection
109+
110+
except Exception as e:
111+
print('Connection failed with exception {}'.format(e))
112+
continue
113+
114+
exit('All connection attempts failed')
115+
116+
mqtt_connection = connect_to_greengrass()
104117

105118
if args.mode == 'both' or args.mode == 'subscribe':
106119

107120
def on_publish(topic, message):
108-
print('publish recieved on topic {}'.format(topic))
121+
print('Publish received on topic {}'.format(topic))
109122
print(message)
110123

111124
subscribe_future = mqtt_connection.subscribe(args.topic, QoS.AT_MOST_ONCE, on_publish)

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
setup(
1919
name='awsiotsdk',
20-
version='0.2.9',
20+
version='0.3.0',
2121
description='AWS IoT SDK based on the AWS Common Runtime',
2222
author='AWS SDK Common Runtime Team',
2323
url='https://github.com/awslabs/aws-iot-device-sdk-python-v2',

0 commit comments

Comments
 (0)