Skip to content

Commit ea2ed17

Browse files
authored
use latest awscrt (#27)
Update to `awscrt` 0.3.3 Adapt to API changes: - HTTP API: - had an extreme makeover, but that only affects the implementation of Greengrass Discovery, not its API. - MQTT API: - The Future returned by `subscribe()` now contains a `QoS`. It used to contain `None`. - `on_connection_interrupted` and `on_connection_resumed` callbacks have an extra `connection` argument. - IO API: - `override_default_trust_store_from_path()` arguments renamed
1 parent 496cec8 commit ea2ed17

File tree

8 files changed

+133
-105
lines changed

8 files changed

+133
-105
lines changed

README.md

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,7 @@ This SDK is built on the AWS Common Runtime, a collection of libraries
1616
[4](https://github.com/awslabs/aws-c-http),
1717
[5](https://github.com/awslabs/aws-c-cal) ...) written in C to be
1818
cross-platform, high-performance, secure, and reliable. The libraries are bound
19-
to Python by the [awscrt](https://github.com/awslabs/aws-crt-python) package.
20-
21-
The awscrt package can be installed via. pip
22-
```
23-
pip install awscrt
24-
```
19+
to Python by the `awscrt` package ([PyPI](https://pypi.org/project/awscrt/)) ([Github](https://github.com/awslabs/aws-crt-python)).
2520

2621
Integration with AWS IoT Services such as
2722
[Device Shadow](https://docs.aws.amazon.com/iot/latest/developerguide/iot-device-shadows.html)
@@ -32,12 +27,12 @@ is provided by code that been generated from a model of the service.
3227
## Minimum Requirements
3328
* Python 3.5+ or Python 2.7+
3429

35-
## Install from pypi
30+
## Install from PyPI
3631
```
3732
pip install awsiotsdk
3833
```
3934

40-
## Build from source
35+
## Install from source
4136
```
4237
pip install ./aws-iot-device-sdk-python-v2
4338
```
@@ -275,7 +270,7 @@ and receive.
275270

276271
## basic discovery
277272

278-
This sample intended for use directly with the
273+
This sample intended for use directly with the
279274
[Getting Started with AWS IoT Greengrass](https://docs.aws.amazon.com/greengrass/latest/developerguide/gg-gs.html) guide.
280275

281276
# License

awsiot/__init__.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -124,20 +124,21 @@ def _subscribe_operation(self, topic, qos, callback, payload_to_class_fn):
124124
`callback`. The dict comes from parsing the received
125125
message as JSON.
126126
127-
Returns two values. The first is a `Future` which will contain a result
128-
of `None` when the server has acknowledged the subscription, or an
129-
exception if the subscription fails. The second value is a topic which
130-
may be passed to `unsubscribe()` to stop receiving messages.
127+
Returns two values. The first is a `Future` whose result will be the
128+
`awscrt.mqtt.QoS` granted by the server, or an exception if the
129+
subscription fails. The second value is a topic which may be passed to
130+
`unsubscribe()` to stop receiving messages.
131131
Note that messages may arrive before the subscription is acknowledged.
132132
"""
133133

134134
future = Future() # type: Future
135135
try:
136136
def on_suback(suback_future):
137-
if suback_future.exception():
138-
future.set_exception(suback_future.exception())
139-
else:
140-
future.set_result(None)
137+
try:
138+
suback_result = suback_future.result()
139+
future.set_result(suback_result['qos'])
140+
except Exception as e:
141+
future.set_exception(e)
141142

142143
def callback_wrapper(topic, payload_bytes):
143144
try:

awsiot/greengrass_discovery.py

Lines changed: 43 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
# permissions and limitations under the License.
1313

1414
from awscrt.http import HttpClientConnection, HttpRequest
15-
from awscrt import io
16-
from awscrt.io import ClientBootstrap, ClientTlsContext, TlsConnectionOptions, SocketOptions
15+
from awscrt.io import ClientBootstrap, ClientTlsContext, is_alpn_available, SocketOptions, TlsConnectionOptions
1716
import awsiot
1817
from concurrent.futures import Future
1918
import json
@@ -35,58 +34,59 @@ def __init__(self, bootstrap, socket_options, tls_context, region):
3534
self._tls_connection_options.set_server_name(self._gg_server_name)
3635
self.port = 8443
3736

38-
if io.is_alpn_available():
39-
self._tls_connection_options.set_alpn_list('x-amzn-http-ca')
37+
if is_alpn_available():
38+
self._tls_connection_options.set_alpn_list(['x-amzn-http-ca'])
4039
self.port = 443
4140

4241
def discover(self, thing_name):
43-
ret_future = Future()
44-
response_body = bytearray()
45-
request = None
4642

47-
def on_incoming_body(response_chunk):
48-
response_body.extend(response_chunk)
43+
discovery = dict(
44+
future=Future(),
45+
response_body=bytearray())
46+
47+
def on_incoming_body(http_stream, response_chunk):
48+
discovery['response_body'].extend(response_chunk)
4949

5050
def on_request_complete(completion_future):
51-
global request
5251
try:
53-
response_code = request.response_code
54-
# marking request as global prevents the GC from reclaiming it,
55-
# so force it to do it here.
56-
request = None
52+
response_code = completion_future.result()
5753
if response_code == 200:
58-
payload_str = response_body.decode('utf-8')
54+
payload_str = discovery['response_body'].decode('utf-8')
5955
discover_res = DiscoverResponse.from_payload(json.loads(payload_str))
60-
ret_future.set_result(discover_res)
61-
else:
62-
ret_future.set_exception(DiscoveryException('Error during discover call: response code ={}'.format(response_code), response_code))
56+
discovery['future'].set_result(discover_res)
57+
else:
58+
discovery['future'].set_exception(DiscoveryException('Error during discover call: response_code={}'.format(response_code), response_code))
6359

6460
except Exception as e:
65-
ret_future.set_exception(e)
61+
discovery['future'].set_exception(e)
6662

6763
def on_connection_completed(conn_future):
68-
global request
6964
try:
70-
connection = conn_future.result()
71-
request = connection.make_request(
72-
method='GET',
73-
uri_str='/greengrass/discover/thing/{}'.format(thing_name),
74-
outgoing_headers={'host':self._gg_server_name},
75-
on_outgoing_body=None,
76-
on_incoming_body=on_incoming_body)
65+
connection = conn_future.result()
66+
request = HttpRequest(
67+
method='GET',
68+
path='/greengrass/discover/thing/{}'.format(thing_name),
69+
headers=[('host', self._gg_server_name)])
70+
71+
http_stream = connection.request(
72+
request=request,
73+
on_body=on_incoming_body)
7774

78-
request.response_completed.add_done_callback(on_request_complete)
75+
http_stream.completion_future.add_done_callback(on_request_complete)
7976

8077
except Exception as e:
81-
# marking request as global prevents the GC from reclaiming it,
82-
# so force it to do it here.
83-
request = None
84-
ret_future.set_exception(e)
78+
discovery['future'].set_exception(e)
79+
80+
connect_future = HttpClientConnection.new(
81+
host_name=self._gg_server_name,
82+
port=self.port,
83+
socket_options=self._socket_options,
84+
tls_connection_options = self._tls_connection_options,
85+
bootstrap = self._bootstrap)
8586

86-
connect_future = HttpClientConnection.new_connection(self._bootstrap, self._gg_server_name, self.port, self._socket_options, None, self._tls_connection_options)
8787
connect_future.add_done_callback(on_connection_completed)
88-
89-
return ret_future
88+
89+
return discovery['future']
9090

9191
class DiscoveryException(Exception):
9292
_slots_ = ['http_response_code', 'message']
@@ -102,7 +102,7 @@ class ConnectivityInfo(awsiot.ModeledClass):
102102
def ___init___(self):
103103
for slot in self.__slots__:
104104
setattr(self, slot, None)
105-
105+
106106
@classmethod
107107
def from_payload(cls, payload):
108108
# type: (typing.Dict[str, typing.Any]) -> ConnectivityInfo
@@ -138,12 +138,12 @@ def from_payload(cls, payload):
138138
val = payload.get('Connectivity')
139139
if val is not None:
140140
new.connectivity = [ConnectivityInfo.from_payload(i) for i in val]
141-
141+
142142
return new
143143

144144
class GGGroup(awsiot.ModeledClass):
145145
__slots__ = ['gg_group_id', 'cores', 'certificate_authorities']
146-
146+
147147
def ___init___(self):
148148
for slot in self.__slots__:
149149
setattr(self, slot, None)
@@ -160,23 +160,23 @@ def from_payload(cls, payload):
160160
new.cores = [GGCore.from_payload(i) for i in val]
161161
val = payload.get('CAs')
162162
if val is not None:
163-
new.certificate_authorities = val
163+
new.certificate_authorities = val
164164

165-
return new
165+
return new
166166

167167
class DiscoverResponse(awsiot.ModeledClass):
168168
__slots__ = ['gg_groups']
169169

170170
def ___init___(self):
171171
for slot in self.__slots__:
172172
setattr(self, slot, None)
173-
173+
174174
@classmethod
175175
def from_payload(cls, payload):
176176
# type: (typing.Dict[str, typing.Any]) -> DiscoverResponse
177177
new = cls()
178178
val = payload.get('GGGroups')
179179
if val is not None:
180180
new.gg_groups = [GGGroup.from_payload(i) for i in val]
181-
182-
return new
181+
182+
return new

samples/basic_discovery.py

Lines changed: 43 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
io.init_logging(LogLevel.Info, 'stderr')
5151
elif args.verbosity.lower() == 'debug':
5252
io.init_logging(LogLevel.Debug, 'stderr')
53-
elif args.verbosity.lower() == 'trace':
53+
elif args.verbosity.lower() == 'trace':
5454
io.init_logging(LogLevel.Trace, 'stderr')
5555

5656
event_loop_group = io.EventLoopGroup(1)
@@ -63,53 +63,66 @@
6363
socket_options = io.SocketOptions()
6464
socket_options.connect_timeout_ms = 3000
6565

66+
print('Performing greengrass discovery...')
6667
discovery_client = DiscoveryClient(client_bootstrap, socket_options, tls_context, args.region)
6768
resp_future = discovery_client.discover(args.thing_name)
68-
resp = resp_future.result()
69+
discover_response = resp_future.result()
6970

71+
print(discover_response)
7072
if args.print_discover_resp_only:
71-
print(resp)
7273
exit(0)
7374

74-
gg_core_tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(args.certificate_path, args.private_key_path)
75-
gg_core_tls_options.override_default_trust_store(bytes(resp.gg_groups[0].certificate_authorities[0], encoding='utf-8'))
76-
gg_core_tls_ctx = io.ClientTlsContext(gg_core_tls_options)
77-
mqtt_client = Client(client_bootstrap, gg_core_tls_ctx)
7875

79-
80-
def on_connection_interupted(error_code):
76+
def on_connection_interupted(connection, error_code):
8177
print('connection interupted with error {}'.format(error_code))
8278

8379

84-
def on_connection_resumed(error_code, session_present):
80+
def on_connection_resumed(connection, error_code, session_present):
8581
print('connection resumed with error {}, session present {}'.format(error_code, session_present))
8682

8783

88-
mqtt_connection = Connection(mqtt_client, on_connection_interrupted=on_connection_interupted, on_connection_resumed=on_connection_resumed)
89-
90-
connection_succeeded = False
91-
for conectivity_info in resp.gg_groups[0].cores[0].connectivity:
92-
try:
93-
connect_future = mqtt_connection.connect(args.thing_name, resp.gg_groups[0].cores[0].connectivity[0].host_address, resp.gg_groups[0].cores[0].connectivity[0].port, clean_session=False)
94-
connect_future.result()
95-
connection_succeeded = True
96-
break
97-
except Exception as e:
98-
print('connection failed with exception {}'.format(e))
99-
continue
100-
101-
if connection_succeeded != True:
102-
print('All connection attempts for core {} failed'.format(resp.gg_groups[0].cores[0].thing_arn))
103-
exit(-1)
84+
# Try IoT endpoints until we find one that works
85+
def try_iot_endpoints():
86+
for gg_group in discover_response.gg_groups:
87+
88+
gg_core_tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(args.certificate_path, args.private_key_path)
89+
gg_core_tls_options.override_default_trust_store(bytes(gg_group.certificate_authorities[0], encoding='utf-8'))
90+
gg_core_tls_ctx = io.ClientTlsContext(gg_core_tls_options)
91+
mqtt_client = Client(client_bootstrap, gg_core_tls_ctx)
92+
93+
for gg_core in gg_group.cores:
94+
for connectivity_info in gg_core.connectivity:
95+
try:
96+
print('Trying core {} at host {} port {}'.format(gg_core.thing_arn, connectivity_info.host_address, connectivity_info.port))
97+
mqtt_connection = Connection(
98+
mqtt_client,
99+
on_connection_interrupted=on_connection_interupted,
100+
on_connection_resumed=on_connection_resumed)
101+
connect_future = mqtt_connection.connect(
102+
client_id=args.thing_name,
103+
host_name=connectivity_info.host_address,
104+
port=connectivity_info.port,
105+
clean_session=False)
106+
connect_future.result()
107+
print('Connected!')
108+
return mqtt_connection
109+
110+
except Exception as e:
111+
print('Connection failed with exception {}'.format(e))
112+
continue
113+
114+
exit('All connection attempts failed')
115+
116+
mqtt_connection = try_iot_endpoints()
104117

105118
if args.mode == 'both' or args.mode == 'subscribe':
106119

107120
def on_publish(topic, message):
108-
print('publish recieved on topic {}'.format(topic))
121+
print('Publish received on topic {}'.format(topic))
109122
print(message)
110123

111-
subscribe_future = mqtt_connection.subscribe(args.topic, QoS.AT_MOST_ONCE, on_publish)
112-
subscribe_future[0].result()
124+
subscribe_future, _ = mqtt_connection.subscribe(args.topic, QoS.AT_MOST_ONCE, on_publish)
125+
subscribe_result = subscribe_future.result()
113126

114127
loop_count = 0
115128
while loop_count < args.max_pub_ops:
@@ -121,6 +134,6 @@ def on_publish(topic, message):
121134
pub_future = mqtt_connection.publish(args.topic, messageJson, QoS.AT_MOST_ONCE)
122135
pub_future[0].result()
123136
print('Published topic {}: {}\n'.format(args.topic, messageJson))
124-
137+
125138
loop_count += 1
126139
time.sleep(1)

samples/jobs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ def on_update_job_execution_rejected(rejected):
230230

231231
tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(args.cert, args.key)
232232
if args.root_ca:
233-
tls_options.override_default_trust_store_from_path(ca_path=None, ca_file=args.root_ca)
233+
tls_options.override_default_trust_store_from_path(ca_dirpath=None, ca_filepath=args.root_ca)
234234
tls_context = io.ClientTlsContext(tls_options)
235235

236236
mqtt_client = mqtt.Client(client_bootstrap, tls_context)

0 commit comments

Comments
 (0)