Skip to content

use latest awscrt #27

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Oct 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@ This SDK is built on the AWS Common Runtime, a collection of libraries
[4](https://github.com/awslabs/aws-c-http),
[5](https://github.com/awslabs/aws-c-cal) ...) written in C to be
cross-platform, high-performance, secure, and reliable. The libraries are bound
to Python by the [awscrt](https://github.com/awslabs/aws-crt-python) package.

The awscrt package can be installed via. pip
```
pip install awscrt
```
to Python by the `awscrt` package ([PyPI](https://pypi.org/project/awscrt/)) ([Github](https://github.com/awslabs/aws-crt-python)).

Integration with AWS IoT Services such as
[Device Shadow](https://docs.aws.amazon.com/iot/latest/developerguide/iot-device-shadows.html)
Expand All @@ -32,12 +27,12 @@ is provided by code that been generated from a model of the service.
## Minimum Requirements
* Python 3.5+ or Python 2.7+

## Install from pypi
## Install from PyPI
```
pip install awsiotsdk
```

## Build from source
## Install from source
```
pip install ./aws-iot-device-sdk-python-v2
```
Expand Down Expand Up @@ -275,7 +270,7 @@ and receive.

## basic discovery

This sample intended for use directly with the
This sample intended for use directly with the
[Getting Started with AWS IoT Greengrass](https://docs.aws.amazon.com/greengrass/latest/developerguide/gg-gs.html) guide.

# License
Expand Down
17 changes: 9 additions & 8 deletions awsiot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,20 +124,21 @@ def _subscribe_operation(self, topic, qos, callback, payload_to_class_fn):
`callback`. The dict comes from parsing the received
message as JSON.

Returns two values. The first is a `Future` which will contain a result
of `None` when the server has acknowledged the subscription, or an
exception if the subscription fails. The second value is a topic which
may be passed to `unsubscribe()` to stop receiving messages.
Returns two values. The first is a `Future` whose result will be the
`awscrt.mqtt.QoS` granted by the server, or an exception if the
subscription fails. The second value is a topic which may be passed to
`unsubscribe()` to stop receiving messages.
Note that messages may arrive before the subscription is acknowledged.
"""

future = Future() # type: Future
try:
def on_suback(suback_future):
if suback_future.exception():
future.set_exception(suback_future.exception())
else:
future.set_result(None)
try:
suback_result = suback_future.result()
future.set_result(suback_result['qos'])
except Exception as e:
future.set_exception(e)

def callback_wrapper(topic, payload_bytes):
try:
Expand Down
86 changes: 43 additions & 43 deletions awsiot/greengrass_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
# permissions and limitations under the License.

from awscrt.http import HttpClientConnection, HttpRequest
from awscrt import io
from awscrt.io import ClientBootstrap, ClientTlsContext, TlsConnectionOptions, SocketOptions
from awscrt.io import ClientBootstrap, ClientTlsContext, is_alpn_available, SocketOptions, TlsConnectionOptions
import awsiot
from concurrent.futures import Future
import json
Expand All @@ -35,58 +34,59 @@ def __init__(self, bootstrap, socket_options, tls_context, region):
self._tls_connection_options.set_server_name(self._gg_server_name)
self.port = 8443

if io.is_alpn_available():
self._tls_connection_options.set_alpn_list('x-amzn-http-ca')
if is_alpn_available():
self._tls_connection_options.set_alpn_list(['x-amzn-http-ca'])
self.port = 443

def discover(self, thing_name):
ret_future = Future()
response_body = bytearray()
request = None

def on_incoming_body(response_chunk):
response_body.extend(response_chunk)
discovery = dict(
future=Future(),
response_body=bytearray())

def on_incoming_body(http_stream, response_chunk):
discovery['response_body'].extend(response_chunk)

def on_request_complete(completion_future):
global request
try:
response_code = request.response_code
# marking request as global prevents the GC from reclaiming it,
# so force it to do it here.
request = None
response_code = completion_future.result()
if response_code == 200:
payload_str = response_body.decode('utf-8')
payload_str = discovery['response_body'].decode('utf-8')
discover_res = DiscoverResponse.from_payload(json.loads(payload_str))
ret_future.set_result(discover_res)
else:
ret_future.set_exception(DiscoveryException('Error during discover call: response code ={}'.format(response_code), response_code))
discovery['future'].set_result(discover_res)
else:
discovery['future'].set_exception(DiscoveryException('Error during discover call: response_code={}'.format(response_code), response_code))

except Exception as e:
ret_future.set_exception(e)
discovery['future'].set_exception(e)

def on_connection_completed(conn_future):
global request
try:
connection = conn_future.result()
request = connection.make_request(
method='GET',
uri_str='/greengrass/discover/thing/{}'.format(thing_name),
outgoing_headers={'host':self._gg_server_name},
on_outgoing_body=None,
on_incoming_body=on_incoming_body)
connection = conn_future.result()
request = HttpRequest(
method='GET',
path='/greengrass/discover/thing/{}'.format(thing_name),
headers=[('host', self._gg_server_name)])

http_stream = connection.request(
request=request,
on_body=on_incoming_body)

request.response_completed.add_done_callback(on_request_complete)
http_stream.completion_future.add_done_callback(on_request_complete)

except Exception as e:
# marking request as global prevents the GC from reclaiming it,
# so force it to do it here.
request = None
ret_future.set_exception(e)
discovery['future'].set_exception(e)

connect_future = HttpClientConnection.new(
host_name=self._gg_server_name,
port=self.port,
socket_options=self._socket_options,
tls_connection_options = self._tls_connection_options,
bootstrap = self._bootstrap)

connect_future = HttpClientConnection.new_connection(self._bootstrap, self._gg_server_name, self.port, self._socket_options, None, self._tls_connection_options)
connect_future.add_done_callback(on_connection_completed)
return ret_future

return discovery['future']

class DiscoveryException(Exception):
_slots_ = ['http_response_code', 'message']
Expand All @@ -102,7 +102,7 @@ class ConnectivityInfo(awsiot.ModeledClass):
def ___init___(self):
for slot in self.__slots__:
setattr(self, slot, None)

@classmethod
def from_payload(cls, payload):
# type: (typing.Dict[str, typing.Any]) -> ConnectivityInfo
Expand Down Expand Up @@ -138,12 +138,12 @@ def from_payload(cls, payload):
val = payload.get('Connectivity')
if val is not None:
new.connectivity = [ConnectivityInfo.from_payload(i) for i in val]

return new

class GGGroup(awsiot.ModeledClass):
__slots__ = ['gg_group_id', 'cores', 'certificate_authorities']

def ___init___(self):
for slot in self.__slots__:
setattr(self, slot, None)
Expand All @@ -160,23 +160,23 @@ def from_payload(cls, payload):
new.cores = [GGCore.from_payload(i) for i in val]
val = payload.get('CAs')
if val is not None:
new.certificate_authorities = val
new.certificate_authorities = val

return new
return new

class DiscoverResponse(awsiot.ModeledClass):
__slots__ = ['gg_groups']

def ___init___(self):
for slot in self.__slots__:
setattr(self, slot, None)

@classmethod
def from_payload(cls, payload):
# type: (typing.Dict[str, typing.Any]) -> DiscoverResponse
new = cls()
val = payload.get('GGGroups')
if val is not None:
new.gg_groups = [GGGroup.from_payload(i) for i in val]
return new

return new
73 changes: 43 additions & 30 deletions samples/basic_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
io.init_logging(LogLevel.Info, 'stderr')
elif args.verbosity.lower() == 'debug':
io.init_logging(LogLevel.Debug, 'stderr')
elif args.verbosity.lower() == 'trace':
elif args.verbosity.lower() == 'trace':
io.init_logging(LogLevel.Trace, 'stderr')

event_loop_group = io.EventLoopGroup(1)
Expand All @@ -63,53 +63,66 @@
socket_options = io.SocketOptions()
socket_options.connect_timeout_ms = 3000

print('Performing greengrass discovery...')
discovery_client = DiscoveryClient(client_bootstrap, socket_options, tls_context, args.region)
resp_future = discovery_client.discover(args.thing_name)
resp = resp_future.result()
discover_response = resp_future.result()

print(discover_response)
if args.print_discover_resp_only:
print(resp)
exit(0)

gg_core_tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(args.certificate_path, args.private_key_path)
gg_core_tls_options.override_default_trust_store(bytes(resp.gg_groups[0].certificate_authorities[0], encoding='utf-8'))
gg_core_tls_ctx = io.ClientTlsContext(gg_core_tls_options)
mqtt_client = Client(client_bootstrap, gg_core_tls_ctx)


def on_connection_interupted(error_code):
def on_connection_interupted(connection, error_code):
print('connection interupted with error {}'.format(error_code))


def on_connection_resumed(error_code, session_present):
def on_connection_resumed(connection, error_code, session_present):
print('connection resumed with error {}, session present {}'.format(error_code, session_present))


mqtt_connection = Connection(mqtt_client, on_connection_interrupted=on_connection_interupted, on_connection_resumed=on_connection_resumed)

connection_succeeded = False
for conectivity_info in resp.gg_groups[0].cores[0].connectivity:
try:
connect_future = mqtt_connection.connect(args.thing_name, resp.gg_groups[0].cores[0].connectivity[0].host_address, resp.gg_groups[0].cores[0].connectivity[0].port, clean_session=False)
connect_future.result()
connection_succeeded = True
break
except Exception as e:
print('connection failed with exception {}'.format(e))
continue

if connection_succeeded != True:
print('All connection attempts for core {} failed'.format(resp.gg_groups[0].cores[0].thing_arn))
exit(-1)
# Try IoT endpoints until we find one that works
def try_iot_endpoints():
for gg_group in discover_response.gg_groups:

gg_core_tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(args.certificate_path, args.private_key_path)
gg_core_tls_options.override_default_trust_store(bytes(gg_group.certificate_authorities[0], encoding='utf-8'))
gg_core_tls_ctx = io.ClientTlsContext(gg_core_tls_options)
mqtt_client = Client(client_bootstrap, gg_core_tls_ctx)

for gg_core in gg_group.cores:
for connectivity_info in gg_core.connectivity:
try:
print('Trying core {} at host {} port {}'.format(gg_core.thing_arn, connectivity_info.host_address, connectivity_info.port))
mqtt_connection = Connection(
mqtt_client,
on_connection_interrupted=on_connection_interupted,
on_connection_resumed=on_connection_resumed)
connect_future = mqtt_connection.connect(
client_id=args.thing_name,
host_name=connectivity_info.host_address,
port=connectivity_info.port,
clean_session=False)
connect_future.result()
print('Connected!')
return mqtt_connection

except Exception as e:
print('Connection failed with exception {}'.format(e))
continue

exit('All connection attempts failed')

mqtt_connection = try_iot_endpoints()

if args.mode == 'both' or args.mode == 'subscribe':

def on_publish(topic, message):
print('publish recieved on topic {}'.format(topic))
print('Publish received on topic {}'.format(topic))
print(message)

subscribe_future = mqtt_connection.subscribe(args.topic, QoS.AT_MOST_ONCE, on_publish)
subscribe_future[0].result()
subscribe_future, _ = mqtt_connection.subscribe(args.topic, QoS.AT_MOST_ONCE, on_publish)
subscribe_result = subscribe_future.result()

loop_count = 0
while loop_count < args.max_pub_ops:
Expand All @@ -121,6 +134,6 @@ def on_publish(topic, message):
pub_future = mqtt_connection.publish(args.topic, messageJson, QoS.AT_MOST_ONCE)
pub_future[0].result()
print('Published topic {}: {}\n'.format(args.topic, messageJson))

loop_count += 1
time.sleep(1)
2 changes: 1 addition & 1 deletion samples/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def on_update_job_execution_rejected(rejected):

tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(args.cert, args.key)
if args.root_ca:
tls_options.override_default_trust_store_from_path(ca_path=None, ca_file=args.root_ca)
tls_options.override_default_trust_store_from_path(ca_dirpath=None, ca_filepath=args.root_ca)
tls_context = io.ClientTlsContext(tls_options)

mqtt_client = mqtt.Client(client_bootstrap, tls_context)
Expand Down
Loading