diff --git a/README.md b/README.md index fda60246..1c931fc9 100644 --- a/README.md +++ b/README.md @@ -16,12 +16,7 @@ This SDK is built on the AWS Common Runtime, a collection of libraries [4](https://github.com/awslabs/aws-c-http), [5](https://github.com/awslabs/aws-c-cal) ...) written in C to be cross-platform, high-performance, secure, and reliable. The libraries are bound -to Python by the [awscrt](https://github.com/awslabs/aws-crt-python) package. - -The awscrt package can be installed via. pip -``` -pip install awscrt -``` +to Python by the `awscrt` package ([PyPI](https://pypi.org/project/awscrt/)) ([Github](https://github.com/awslabs/aws-crt-python)). Integration with AWS IoT Services such as [Device Shadow](https://docs.aws.amazon.com/iot/latest/developerguide/iot-device-shadows.html) @@ -32,12 +27,12 @@ is provided by code that been generated from a model of the service. ## Minimum Requirements * Python 3.5+ or Python 2.7+ -## Install from pypi +## Install from PyPI ``` pip install awsiotsdk ``` -## Build from source +## Install from source ``` pip install ./aws-iot-device-sdk-python-v2 ``` @@ -275,7 +270,7 @@ and receive. ## basic discovery -This sample intended for use directly with the +This sample intended for use directly with the [Getting Started with AWS IoT Greengrass](https://docs.aws.amazon.com/greengrass/latest/developerguide/gg-gs.html) guide. # License diff --git a/awsiot/__init__.py b/awsiot/__init__.py index b6e40737..975c6704 100644 --- a/awsiot/__init__.py +++ b/awsiot/__init__.py @@ -124,20 +124,21 @@ def _subscribe_operation(self, topic, qos, callback, payload_to_class_fn): `callback`. The dict comes from parsing the received message as JSON. - Returns two values. The first is a `Future` which will contain a result - of `None` when the server has acknowledged the subscription, or an - exception if the subscription fails. The second value is a topic which - may be passed to `unsubscribe()` to stop receiving messages. + Returns two values. The first is a `Future` whose result will be the + `awscrt.mqtt.QoS` granted by the server, or an exception if the + subscription fails. The second value is a topic which may be passed to + `unsubscribe()` to stop receiving messages. Note that messages may arrive before the subscription is acknowledged. """ future = Future() # type: Future try: def on_suback(suback_future): - if suback_future.exception(): - future.set_exception(suback_future.exception()) - else: - future.set_result(None) + try: + suback_result = suback_future.result() + future.set_result(suback_result['qos']) + except Exception as e: + future.set_exception(e) def callback_wrapper(topic, payload_bytes): try: diff --git a/awsiot/greengrass_discovery.py b/awsiot/greengrass_discovery.py index e2254f65..244f80af 100644 --- a/awsiot/greengrass_discovery.py +++ b/awsiot/greengrass_discovery.py @@ -12,8 +12,7 @@ # permissions and limitations under the License. from awscrt.http import HttpClientConnection, HttpRequest -from awscrt import io -from awscrt.io import ClientBootstrap, ClientTlsContext, TlsConnectionOptions, SocketOptions +from awscrt.io import ClientBootstrap, ClientTlsContext, is_alpn_available, SocketOptions, TlsConnectionOptions import awsiot from concurrent.futures import Future import json @@ -35,58 +34,59 @@ def __init__(self, bootstrap, socket_options, tls_context, region): self._tls_connection_options.set_server_name(self._gg_server_name) self.port = 8443 - if io.is_alpn_available(): - self._tls_connection_options.set_alpn_list('x-amzn-http-ca') + if is_alpn_available(): + self._tls_connection_options.set_alpn_list(['x-amzn-http-ca']) self.port = 443 def discover(self, thing_name): - ret_future = Future() - response_body = bytearray() - request = None - def on_incoming_body(response_chunk): - response_body.extend(response_chunk) + discovery = dict( + future=Future(), + response_body=bytearray()) + + def on_incoming_body(http_stream, response_chunk): + discovery['response_body'].extend(response_chunk) def on_request_complete(completion_future): - global request try: - response_code = request.response_code - # marking request as global prevents the GC from reclaiming it, - # so force it to do it here. - request = None + response_code = completion_future.result() if response_code == 200: - payload_str = response_body.decode('utf-8') + payload_str = discovery['response_body'].decode('utf-8') discover_res = DiscoverResponse.from_payload(json.loads(payload_str)) - ret_future.set_result(discover_res) - else: - ret_future.set_exception(DiscoveryException('Error during discover call: response code ={}'.format(response_code), response_code)) + discovery['future'].set_result(discover_res) + else: + discovery['future'].set_exception(DiscoveryException('Error during discover call: response_code={}'.format(response_code), response_code)) except Exception as e: - ret_future.set_exception(e) + discovery['future'].set_exception(e) def on_connection_completed(conn_future): - global request try: - connection = conn_future.result() - request = connection.make_request( - method='GET', - uri_str='/greengrass/discover/thing/{}'.format(thing_name), - outgoing_headers={'host':self._gg_server_name}, - on_outgoing_body=None, - on_incoming_body=on_incoming_body) + connection = conn_future.result() + request = HttpRequest( + method='GET', + path='/greengrass/discover/thing/{}'.format(thing_name), + headers=[('host', self._gg_server_name)]) + + http_stream = connection.request( + request=request, + on_body=on_incoming_body) - request.response_completed.add_done_callback(on_request_complete) + http_stream.completion_future.add_done_callback(on_request_complete) except Exception as e: - # marking request as global prevents the GC from reclaiming it, - # so force it to do it here. - request = None - ret_future.set_exception(e) + discovery['future'].set_exception(e) + + connect_future = HttpClientConnection.new( + host_name=self._gg_server_name, + port=self.port, + socket_options=self._socket_options, + tls_connection_options = self._tls_connection_options, + bootstrap = self._bootstrap) - connect_future = HttpClientConnection.new_connection(self._bootstrap, self._gg_server_name, self.port, self._socket_options, None, self._tls_connection_options) connect_future.add_done_callback(on_connection_completed) - - return ret_future + + return discovery['future'] class DiscoveryException(Exception): _slots_ = ['http_response_code', 'message'] @@ -102,7 +102,7 @@ class ConnectivityInfo(awsiot.ModeledClass): def ___init___(self): for slot in self.__slots__: setattr(self, slot, None) - + @classmethod def from_payload(cls, payload): # type: (typing.Dict[str, typing.Any]) -> ConnectivityInfo @@ -138,12 +138,12 @@ def from_payload(cls, payload): val = payload.get('Connectivity') if val is not None: new.connectivity = [ConnectivityInfo.from_payload(i) for i in val] - + return new class GGGroup(awsiot.ModeledClass): __slots__ = ['gg_group_id', 'cores', 'certificate_authorities'] - + def ___init___(self): for slot in self.__slots__: setattr(self, slot, None) @@ -160,9 +160,9 @@ def from_payload(cls, payload): new.cores = [GGCore.from_payload(i) for i in val] val = payload.get('CAs') if val is not None: - new.certificate_authorities = val + new.certificate_authorities = val - return new + return new class DiscoverResponse(awsiot.ModeledClass): __slots__ = ['gg_groups'] @@ -170,7 +170,7 @@ class DiscoverResponse(awsiot.ModeledClass): def ___init___(self): for slot in self.__slots__: setattr(self, slot, None) - + @classmethod def from_payload(cls, payload): # type: (typing.Dict[str, typing.Any]) -> DiscoverResponse @@ -178,5 +178,5 @@ def from_payload(cls, payload): val = payload.get('GGGroups') if val is not None: new.gg_groups = [GGGroup.from_payload(i) for i in val] - - return new + + return new diff --git a/samples/basic_discovery.py b/samples/basic_discovery.py index e1fe17e6..aa2eb3ee 100644 --- a/samples/basic_discovery.py +++ b/samples/basic_discovery.py @@ -50,7 +50,7 @@ io.init_logging(LogLevel.Info, 'stderr') elif args.verbosity.lower() == 'debug': io.init_logging(LogLevel.Debug, 'stderr') -elif args.verbosity.lower() == 'trace': +elif args.verbosity.lower() == 'trace': io.init_logging(LogLevel.Trace, 'stderr') event_loop_group = io.EventLoopGroup(1) @@ -63,53 +63,66 @@ socket_options = io.SocketOptions() socket_options.connect_timeout_ms = 3000 +print('Performing greengrass discovery...') discovery_client = DiscoveryClient(client_bootstrap, socket_options, tls_context, args.region) resp_future = discovery_client.discover(args.thing_name) -resp = resp_future.result() +discover_response = resp_future.result() +print(discover_response) if args.print_discover_resp_only: - print(resp) exit(0) -gg_core_tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(args.certificate_path, args.private_key_path) -gg_core_tls_options.override_default_trust_store(bytes(resp.gg_groups[0].certificate_authorities[0], encoding='utf-8')) -gg_core_tls_ctx = io.ClientTlsContext(gg_core_tls_options) -mqtt_client = Client(client_bootstrap, gg_core_tls_ctx) - -def on_connection_interupted(error_code): +def on_connection_interupted(connection, error_code): print('connection interupted with error {}'.format(error_code)) -def on_connection_resumed(error_code, session_present): +def on_connection_resumed(connection, error_code, session_present): print('connection resumed with error {}, session present {}'.format(error_code, session_present)) -mqtt_connection = Connection(mqtt_client, on_connection_interrupted=on_connection_interupted, on_connection_resumed=on_connection_resumed) - -connection_succeeded = False -for conectivity_info in resp.gg_groups[0].cores[0].connectivity: - try: - connect_future = mqtt_connection.connect(args.thing_name, resp.gg_groups[0].cores[0].connectivity[0].host_address, resp.gg_groups[0].cores[0].connectivity[0].port, clean_session=False) - connect_future.result() - connection_succeeded = True - break - except Exception as e: - print('connection failed with exception {}'.format(e)) - continue - -if connection_succeeded != True: - print('All connection attempts for core {} failed'.format(resp.gg_groups[0].cores[0].thing_arn)) - exit(-1) +# Try IoT endpoints until we find one that works +def try_iot_endpoints(): + for gg_group in discover_response.gg_groups: + + gg_core_tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(args.certificate_path, args.private_key_path) + gg_core_tls_options.override_default_trust_store(bytes(gg_group.certificate_authorities[0], encoding='utf-8')) + gg_core_tls_ctx = io.ClientTlsContext(gg_core_tls_options) + mqtt_client = Client(client_bootstrap, gg_core_tls_ctx) + + for gg_core in gg_group.cores: + for connectivity_info in gg_core.connectivity: + try: + print('Trying core {} at host {} port {}'.format(gg_core.thing_arn, connectivity_info.host_address, connectivity_info.port)) + mqtt_connection = Connection( + mqtt_client, + on_connection_interrupted=on_connection_interupted, + on_connection_resumed=on_connection_resumed) + connect_future = mqtt_connection.connect( + client_id=args.thing_name, + host_name=connectivity_info.host_address, + port=connectivity_info.port, + clean_session=False) + connect_future.result() + print('Connected!') + return mqtt_connection + + except Exception as e: + print('Connection failed with exception {}'.format(e)) + continue + + exit('All connection attempts failed') + +mqtt_connection = try_iot_endpoints() if args.mode == 'both' or args.mode == 'subscribe': def on_publish(topic, message): - print('publish recieved on topic {}'.format(topic)) + print('Publish received on topic {}'.format(topic)) print(message) - subscribe_future = mqtt_connection.subscribe(args.topic, QoS.AT_MOST_ONCE, on_publish) - subscribe_future[0].result() + subscribe_future, _ = mqtt_connection.subscribe(args.topic, QoS.AT_MOST_ONCE, on_publish) + subscribe_result = subscribe_future.result() loop_count = 0 while loop_count < args.max_pub_ops: @@ -121,6 +134,6 @@ def on_publish(topic, message): pub_future = mqtt_connection.publish(args.topic, messageJson, QoS.AT_MOST_ONCE) pub_future[0].result() print('Published topic {}: {}\n'.format(args.topic, messageJson)) - + loop_count += 1 time.sleep(1) diff --git a/samples/jobs.py b/samples/jobs.py index 41870aca..eff5c89d 100644 --- a/samples/jobs.py +++ b/samples/jobs.py @@ -230,7 +230,7 @@ def on_update_job_execution_rejected(rejected): tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(args.cert, args.key) if args.root_ca: - tls_options.override_default_trust_store_from_path(ca_path=None, ca_file=args.root_ca) + tls_options.override_default_trust_store_from_path(ca_dirpath=None, ca_filepath=args.root_ca) tls_context = io.ClientTlsContext(tls_options) mqtt_client = mqtt.Client(client_bootstrap, tls_context) diff --git a/samples/pubsub.py b/samples/pubsub.py index ccda702a..f6017fc5 100644 --- a/samples/pubsub.py +++ b/samples/pubsub.py @@ -45,12 +45,31 @@ received_all_event = threading.Event() # Callback when connection is accidentally lost. -def on_connection_interrupted(error_code): - print("Connection interrupted. error_code:{}".format(error_code)) +def on_connection_interrupted(connection, error_code): + print("Connection interrupted. error_code: {}".format(error_code)) + # Callback when an interrupted connection is re-established. -def on_connection_resumed(error_code, session_present): - print("Connection resumed. error_code:{} session_present:{}".format(error_code, session_present)) +def on_connection_resumed(connection, error_code, session_present): + print("Connection resumed. error_code: {} session_present: {}".format(error_code, session_present)) + + if not error_code and not session_present: + print("Session did not persist. Resubscribing to existing topics...") + resubscribe_future, _ = connection.resubscribe_existing_topics() + + # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread, + # evaluate result with a callback instead. + resubscribe_future.add_done_callback(on_resubscribe_complete) + + +def on_resubscribe_complete(resubscribe_future): + resubscribe_results = resubscribe_future.result() + print("Resubscribe results: {}".format(resubscribe_results)) + + for topic, qos in resubscribe_results['topics']: + if qos is None: + exit("Server rejected resubscribe to topic: {}".format(topic)) + # Callback when the subscribed topic receives a message def on_message_received(topic, message): @@ -67,7 +86,7 @@ def on_message_received(topic, message): tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(args.cert, args.key) if args.root_ca: - tls_options.override_default_trust_store_from_path(ca_path=None, ca_file=args.root_ca) + tls_options.override_default_trust_store_from_path(ca_dirpath=None, ca_filepath=args.root_ca) tls_context = io.ClientTlsContext(tls_options) mqtt_client = mqtt.Client(client_bootstrap, tls_context) @@ -102,8 +121,8 @@ def on_message_received(topic, message): qos=mqtt.QoS.AT_LEAST_ONCE, callback=on_message_received) - subscribe_future.result() - print("Subscribed!") + subscribe_result = subscribe_future.result() + print("Subscribed with {}".format(str(subscribe_result['qos']))) # Publish message to server desired number of times. # This step is skipped if message is blank. diff --git a/samples/shadow.py b/samples/shadow.py index b6a83218..2f19a457 100644 --- a/samples/shadow.py +++ b/samples/shadow.py @@ -235,7 +235,7 @@ def user_input_thread_fn(): tls_options = io.TlsContextOptions.create_client_with_mtls_from_path(args.cert, args.key) if args.root_ca: - tls_options.override_default_trust_store_from_path(ca_path=None, ca_file=args.root_ca) + tls_options.override_default_trust_store_from_path(ca_dirpath=None, ca_filepath=args.root_ca) tls_context = io.ClientTlsContext(tls_options) mqtt_client = mqtt.Client(client_bootstrap, tls_context) diff --git a/setup.py b/setup.py index 484501fd..0a122f86 100644 --- a/setup.py +++ b/setup.py @@ -13,19 +13,19 @@ # express or implied. See the License for the specific language governing # permissions and limitations under the License. -from setuptools import setup, find_packages +from setuptools import setup setup( name='awsiotsdk', - version='0.2.9', + version='0.3.0', description='AWS IoT SDK based on the AWS Common Runtime', author='AWS SDK Common Runtime Team', url='https://github.com/awslabs/aws-iot-device-sdk-python-v2', - packages = find_packages(), + packages = ['awsiot'], install_requires=[ - 'awscrt==0.2.16', - 'futures; python_version == "2.7"', - 'typing; python_version <= "3.4"', + 'awscrt==0.3.3', + 'futures;python_version<"3.2"', + 'typing;python_version<"3.5"', ], python_requires='>=2.7', )