diff --git a/README.md b/README.md index 1a21ef53..446e6ec0 100644 --- a/README.md +++ b/README.md @@ -276,6 +276,75 @@ and receive. +## fleet provisioning + +This sample uses the AWS IoT +[Fleet provisioning](https://docs.aws.amazon.com/iot/latest/developerguide/provision-wo-cert.html) +to provision devices using either a CSR or KeysAndcertificate and subsequently calls RegisterThing. + +On startup, the script subscribes to topics based on the request type of either CSR or Keys topics, +publishes the request to corresponding topic and calls RegisterThing. + +Source: `samples/fleetprovisioning.py` + +Run the sample using createKeysAndCertificate: +``` +python fleetprovisioning.py --endpoint --root-ca --cert --key --thing-name --templateName --templateParameters +``` + +Run the sample using createCertificateFromCsr: +``` +python fleetprovisioning.py --endpoint --root-ca --cert --key --thing-name --templateName --templateParameters --csr +``` + +Your Thing's +[Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) +must provide privileges for this sample to connect, subscribe, publish, +and receive. + +
+(see sample policy) +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Publish"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/certificates/create/json",
+        "arn:aws:iot:region:account:topic/$aws/certificates/create-from-csr/json",
+        "arn:aws:iot:region:account:topic/$aws/provisioning-templates/templatename/provision/json"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Receive",
+        "iot:Subscribe"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/certificates/create/json/accepted",
+        "arn:aws:iot:region:account:topic/$aws/certificates/create/json/rejected",
+        "arn:aws:iot:region:account:topic/$aws/certificates/create-from-csr/json/accepted",
+        "arn:aws:iot:region:account:topic/$aws/certificates/create-from-csr/json/rejected",
+        "arn:aws:iot:region:account:topic/$aws/provisioning-templates/templatename/provision/json/accepted",
+        "arn:aws:iot:region:account:topic/$aws/provisioning-templates/templatename/provision/json/rejected"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Connect",
+      "Resource": "arn:aws:iot:region:account:client/samples-client-id"
+    }
+  ]
+}
+
+
+ + ## basic discovery This sample intended for use directly with the diff --git a/awsiot/iotidentity.py b/awsiot/iotidentity.py new file mode 100644 index 00000000..661542e0 --- /dev/null +++ b/awsiot/iotidentity.py @@ -0,0 +1,414 @@ +# Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# A copy of the License is located at +# +# http://aws.amazon.com/apache2.0 +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. + +# This file is generated + +import awsiot +import concurrent.futures +import typing + +class IotIdentityClient(awsiot.MqttServiceClient): + + def publish_create_certificate_from_csr(self, request, qos): + # type: (CreateCertificateFromCsrRequest, int) -> concurrent.futures.Future + """ + API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/provision-wo-cert.html#fleet-provision-api + + Parameters: + request - `CreateCertificateFromCsrRequest` instance. + qos - The Quality of Service guarantee of this message + + Returns a concurrent.futures.Future, whose result will be None if the + request is successfully published. The Future's result will be an + exception if the request cannot be published. + """ + + return self._publish_operation( + topic='$aws/certificates/create-from-csr/json', + qos=qos, + payload=request.to_payload()) + + def publish_create_keys_and_certificate(self, request, qos): + # type: (CreateKeysAndCertificateRequest, int) -> concurrent.futures.Future + """ + API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/provision-wo-cert.html#fleet-provision-api + + Parameters: + request - `CreateKeysAndCertificateRequest` instance. + qos - The Quality of Service guarantee of this message + + Returns a concurrent.futures.Future, whose result will be None if the + request is successfully published. The Future's result will be an + exception if the request cannot be published. + """ + + return self._publish_operation( + topic='$aws/certificates/create/json', + qos=qos, + payload=None) + + def publish_register_thing(self, request, qos): + # type: (RegisterThingRequest, int) -> concurrent.futures.Future + """ + API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/provision-wo-cert.html#fleet-provision-api + + Parameters: + request - `RegisterThingRequest` instance. + qos - The Quality of Service guarantee of this message + + Returns a concurrent.futures.Future, whose result will be None if the + request is successfully published. The Future's result will be an + exception if the request cannot be published. + """ + if not request.template_name: + raise ValueError("request.template_name is required") + + return self._publish_operation( + topic='$aws/provisioning-templates/{0.template_name}/provision/json'.format(request), + qos=qos, + payload=request.to_payload()) + + def subscribe_to_create_certificate_from_csr_accepted(self, request, qos, callback): + # type: (CreateCertificateFromCsrSubscriptionRequest, int, typing.Callable[[CreateCertificateFromCsrResponse], None]) -> typing.Tuple[concurrent.futures.Future, str] + """ + API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/provision-wo-cert.html#fleet-provision-api + + Parameters: + request - `CreateCertificateFromCsrSubscriptionRequest` instance. + qos - The Quality of Service guarantee of this message + callback - Callback to invoke each time the event is received. + The callback should take 1 argument of type `CreateCertificateFromCsrResponse`. + The callback is not expected to return anything. + + Returns two values immediately. The first is a `concurrent.futures.Future` + which will contain a result of `None` when the server has acknowledged + the subscription, or an exception if the subscription fails. The second + value is a topic which may be passed to `unsubscribe()` to stop + receiving messages. Note that messages may arrive before the + subscription is acknowledged. + """ + + if not callable(callback): + raise ValueError("callback is required") + + return self._subscribe_operation( + topic='$aws/certificates/create-from-csr/json/accepted', + qos=qos, + callback=callback, + payload_to_class_fn=CreateCertificateFromCsrResponse.from_payload) + + def subscribe_to_create_certificate_from_csr_rejected(self, request, qos, callback): + # type: (CreateCertificateFromCsrSubscriptionRequest, int, typing.Callable[[ErrorResponse], None]) -> typing.Tuple[concurrent.futures.Future, str] + """ + API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/provision-wo-cert.html#fleet-provision-api + + Parameters: + request - `CreateCertificateFromCsrSubscriptionRequest` instance. + qos - The Quality of Service guarantee of this message + callback - Callback to invoke each time the event is received. + The callback should take 1 argument of type `ErrorResponse`. + The callback is not expected to return anything. + + Returns two values immediately. The first is a `concurrent.futures.Future` + which will contain a result of `None` when the server has acknowledged + the subscription, or an exception if the subscription fails. The second + value is a topic which may be passed to `unsubscribe()` to stop + receiving messages. Note that messages may arrive before the + subscription is acknowledged. + """ + + if not callable(callback): + raise ValueError("callback is required") + + return self._subscribe_operation( + topic='$aws/certificates/create-from-csr/json/rejected', + qos=qos, + callback=callback, + payload_to_class_fn=ErrorResponse.from_payload) + + def subscribe_to_create_keys_and_certificate_accepted(self, request, qos, callback): + # type: (CreateKeysAndCertificateSubscriptionRequest, int, typing.Callable[[CreateKeysAndCertificateResponse], None]) -> typing.Tuple[concurrent.futures.Future, str] + """ + API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/provision-wo-cert.html#fleet-provision-api + + Parameters: + request - `CreateKeysAndCertificateSubscriptionRequest` instance. + qos - The Quality of Service guarantee of this message + callback - Callback to invoke each time the event is received. + The callback should take 1 argument of type `CreateKeysAndCertificateResponse`. + The callback is not expected to return anything. + + Returns two values immediately. The first is a `concurrent.futures.Future` + which will contain a result of `None` when the server has acknowledged + the subscription, or an exception if the subscription fails. The second + value is a topic which may be passed to `unsubscribe()` to stop + receiving messages. Note that messages may arrive before the + subscription is acknowledged. + """ + + if not callable(callback): + raise ValueError("callback is required") + + return self._subscribe_operation( + topic='$aws/certificates/create/json/accepted', + qos=qos, + callback=callback, + payload_to_class_fn=CreateKeysAndCertificateResponse.from_payload) + + def subscribe_to_create_keys_and_certificate_rejected(self, request, qos, callback): + # type: (CreateKeysAndCertificateSubscriptionRequest, int, typing.Callable[[ErrorResponse], None]) -> typing.Tuple[concurrent.futures.Future, str] + """ + API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/provision-wo-cert.html#fleet-provision-api + + Parameters: + request - `CreateKeysAndCertificateSubscriptionRequest` instance. + qos - The Quality of Service guarantee of this message + callback - Callback to invoke each time the event is received. + The callback should take 1 argument of type `ErrorResponse`. + The callback is not expected to return anything. + + Returns two values immediately. The first is a `concurrent.futures.Future` + which will contain a result of `None` when the server has acknowledged + the subscription, or an exception if the subscription fails. The second + value is a topic which may be passed to `unsubscribe()` to stop + receiving messages. Note that messages may arrive before the + subscription is acknowledged. + """ + + if not callable(callback): + raise ValueError("callback is required") + + return self._subscribe_operation( + topic='$aws/certificates/create/json/rejected', + qos=qos, + callback=callback, + payload_to_class_fn=ErrorResponse.from_payload) + + def subscribe_to_register_thing_accepted(self, request, qos, callback): + # type: (RegisterThingSubscriptionRequest, int, typing.Callable[[RegisterThingResponse], None]) -> typing.Tuple[concurrent.futures.Future, str] + """ + API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/provision-wo-cert.html#fleet-provision-api + + Parameters: + request - `RegisterThingSubscriptionRequest` instance. + qos - The Quality of Service guarantee of this message + callback - Callback to invoke each time the event is received. + The callback should take 1 argument of type `RegisterThingResponse`. + The callback is not expected to return anything. + + Returns two values immediately. The first is a `concurrent.futures.Future` + which will contain a result of `None` when the server has acknowledged + the subscription, or an exception if the subscription fails. The second + value is a topic which may be passed to `unsubscribe()` to stop + receiving messages. Note that messages may arrive before the + subscription is acknowledged. + """ + if not request.template_name: + raise ValueError("request.template_name is required") + + if not callable(callback): + raise ValueError("callback is required") + + return self._subscribe_operation( + topic='$aws/provisioning-templates/{0.template_name}/provision/json/accepted'.format(request), + qos=qos, + callback=callback, + payload_to_class_fn=RegisterThingResponse.from_payload) + + def subscribe_to_register_thing_rejected(self, request, qos, callback): + # type: (RegisterThingSubscriptionRequest, int, typing.Callable[[ErrorResponse], None]) -> typing.Tuple[concurrent.futures.Future, str] + """ + API Docs: https://docs.aws.amazon.com/iot/latest/developerguide/provision-wo-cert.html#fleet-provision-api + + Parameters: + request - `RegisterThingSubscriptionRequest` instance. + qos - The Quality of Service guarantee of this message + callback - Callback to invoke each time the event is received. + The callback should take 1 argument of type `ErrorResponse`. + The callback is not expected to return anything. + + Returns two values immediately. The first is a `concurrent.futures.Future` + which will contain a result of `None` when the server has acknowledged + the subscription, or an exception if the subscription fails. The second + value is a topic which may be passed to `unsubscribe()` to stop + receiving messages. Note that messages may arrive before the + subscription is acknowledged. + """ + if not request.template_name: + raise ValueError("request.template_name is required") + + if not callable(callback): + raise ValueError("callback is required") + + return self._subscribe_operation( + topic='$aws/provisioning-templates/{0.template_name}/provision/json/rejected'.format(request), + qos=qos, + callback=callback, + payload_to_class_fn=ErrorResponse.from_payload) + +class CreateCertificateFromCsrRequest(awsiot.ModeledClass): + __slots__ = ['certificate_signing_request'] + + def __init__(self, certificate_signing_request=None): + # type: (typing.Optional[str]) -> None + self.certificate_signing_request = certificate_signing_request # type: typing.Optional[str] + + def to_payload(self): + # type: () -> typing.Dict[str, typing.Any] + payload = {} # type: typing.Dict[str, typing.Any] + if self.certificate_signing_request is not None: + payload['certificateSigningRequest'] = self.certificate_signing_request + return payload + +class CreateCertificateFromCsrResponse(awsiot.ModeledClass): + __slots__ = ['certificate_id', 'certificate_ownership_token', 'certificate_pem'] + + def __init__(self, certificate_id=None, certificate_ownership_token=None, certificate_pem=None): + # type: (typing.Optional[str], typing.Optional[str], typing.Optional[str]) -> None + self.certificate_id = certificate_id # type: typing.Optional[str] + self.certificate_ownership_token = certificate_ownership_token # type: typing.Optional[str] + self.certificate_pem = certificate_pem # type: typing.Optional[str] + + @classmethod + def from_payload(cls, payload): + # type: (typing.Dict[str, typing.Any]) -> CreateCertificateFromCsrResponse + new = cls() + val = payload.get('certificateId') + if val is not None: + new.certificate_id = val + val = payload.get('certificateOwnershipToken') + if val is not None: + new.certificate_ownership_token = val + val = payload.get('certificatePem') + if val is not None: + new.certificate_pem = val + return new + +class CreateCertificateFromCsrSubscriptionRequest(awsiot.ModeledClass): + __slots__ = [] + + def __init__(self): + # type: () -> None + pass + +class CreateKeysAndCertificateRequest(awsiot.ModeledClass): + __slots__ = [] + + def __init__(self): + # type: () -> None + pass + +class CreateKeysAndCertificateResponse(awsiot.ModeledClass): + __slots__ = ['certificate_id', 'certificate_ownership_token', 'certificate_pem', 'private_key'] + + def __init__(self, certificate_id=None, certificate_ownership_token=None, certificate_pem=None, private_key=None): + # type: (typing.Optional[str], typing.Optional[str], typing.Optional[str], typing.Optional[str]) -> None + self.certificate_id = certificate_id # type: typing.Optional[str] + self.certificate_ownership_token = certificate_ownership_token # type: typing.Optional[str] + self.certificate_pem = certificate_pem # type: typing.Optional[str] + self.private_key = private_key # type: typing.Optional[str] + + @classmethod + def from_payload(cls, payload): + # type: (typing.Dict[str, typing.Any]) -> CreateKeysAndCertificateResponse + new = cls() + val = payload.get('certificateId') + if val is not None: + new.certificate_id = val + val = payload.get('certificateOwnershipToken') + if val is not None: + new.certificate_ownership_token = val + val = payload.get('certificatePem') + if val is not None: + new.certificate_pem = val + val = payload.get('privateKey') + if val is not None: + new.private_key = val + return new + +class CreateKeysAndCertificateSubscriptionRequest(awsiot.ModeledClass): + __slots__ = [] + + def __init__(self): + # type: () -> None + pass + +class ErrorResponse(awsiot.ModeledClass): + __slots__ = ['error_code', 'error_message', 'status_code'] + + def __init__(self, error_code=None, error_message=None, status_code=None): + # type: (typing.Optional[str], typing.Optional[str], typing.Optional[int]) -> None + self.error_code = error_code # type: typing.Optional[str] + self.error_message = error_message # type: typing.Optional[str] + self.status_code = status_code # type: typing.Optional[int] + + @classmethod + def from_payload(cls, payload): + # type: (typing.Dict[str, typing.Any]) -> ErrorResponse + new = cls() + val = payload.get('errorCode') + if val is not None: + new.error_code = val + val = payload.get('errorMessage') + if val is not None: + new.error_message = val + val = payload.get('statusCode') + if val is not None: + new.status_code = val + return new + +class RegisterThingRequest(awsiot.ModeledClass): + __slots__ = ['certificate_ownership_token', 'parameters', 'template_name'] + + def __init__(self, certificate_ownership_token=None, parameters=None, template_name=None): + # type: (typing.Optional[str], typing.Optional[typing.Dict[str, str]], typing.Optional[str]) -> None + self.certificate_ownership_token = certificate_ownership_token # type: typing.Optional[str] + self.parameters = parameters # type: typing.Optional[typing.Dict[str, str]] + self.template_name = template_name # type: typing.Optional[str] + + def to_payload(self): + # type: () -> typing.Dict[str, typing.Any] + payload = {} # type: typing.Dict[str, typing.Any] + if self.certificate_ownership_token is not None: + payload['certificateOwnershipToken'] = self.certificate_ownership_token + if self.parameters is not None: + payload['parameters'] = self.parameters + return payload + +class RegisterThingResponse(awsiot.ModeledClass): + __slots__ = ['device_configuration', 'thing_name'] + + def __init__(self, device_configuration=None, thing_name=None): + # type: (typing.Optional[typing.Dict[str, str]], typing.Optional[str]) -> None + self.device_configuration = device_configuration # type: typing.Optional[typing.Dict[str, str]] + self.thing_name = thing_name # type: typing.Optional[str] + + @classmethod + def from_payload(cls, payload): + # type: (typing.Dict[str, typing.Any]) -> RegisterThingResponse + new = cls() + val = payload.get('deviceConfiguration') + if val is not None: + new.device_configuration = val + val = payload.get('thingName') + if val is not None: + new.thing_name = val + return new + +class RegisterThingSubscriptionRequest(awsiot.ModeledClass): + __slots__ = ['template_name'] + + def __init__(self, template_name=None): + # type: (typing.Optional[str]) -> None + self.template_name = template_name # type: typing.Optional[str] + diff --git a/samples/fleetprovisioning.py b/samples/fleetprovisioning.py new file mode 100644 index 00000000..44131d28 --- /dev/null +++ b/samples/fleetprovisioning.py @@ -0,0 +1,406 @@ +# Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# A copy of the License is located at +# +# http://aws.amazon.com/apache2.0 +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. + +from __future__ import absolute_import +from __future__ import print_function +import argparse +from awscrt import auth, http, io, mqtt +from awsiot import iotidentity +from awsiot import mqtt_connection_builder +from concurrent.futures import Future +import sys +import threading +import time +import traceback +import json + +# - Overview - +# This sample uses the AWS IoT Fleet Provisioning to provision device using either the keys +# or CSR +# +# +# - Instructions - +# This sample requires you to create a provisioning claim. See: +# https://docs.aws.amazon.com/iot/latest/developerguide/provision-wo-cert.html +# +# - Detail - +# On startup, the script subscribes to topics based on the request type of either CSR or Keys +# publishes the request to corresponding topic and calls RegisterThing. + +parser = argparse.ArgumentParser(description="Fleet Provisioning sample script.") +parser.add_argument('--endpoint', required=True, help="Your AWS IoT custom endpoint, not including a port. " + + "Ex: \"w6zbse3vjd5b4p-ats.iot.us-west-2.amazonaws.com\"") +parser.add_argument('--cert', help="File path to your client certificate, in PEM format") +parser.add_argument('--key', help="File path to your private key file, in PEM format") +parser.add_argument('--root-ca', help="File path to root certificate authority, in PEM format. " + + "Necessary if MQTT server uses a certificate that's not already in " + + "your trust store") +parser.add_argument('--client-id', default='samples-client-id', help="Client ID for MQTT connection.") +parser.add_argument('--use-websocket', default=False, action='store_true', + help="To use a websocket instead of raw mqtt. If you " + + "specify this option you must specify a region for signing, you can also enable proxy mode.") +parser.add_argument('--signing-region', default='us-east-1', help="If you specify --use-web-socket, this " + + "is the region that will be used for computing the Sigv4 signature") +parser.add_argument('--proxy-host', help="Hostname for proxy to connect to. Note: if you use this feature, " + + "you will likely need to set --root-ca to the ca for your proxy.") +parser.add_argument('--proxy-port', type=int, default=8080, help="Port for proxy to connect to.") +parser.add_argument('--verbosity', choices=[x.name for x in io.LogLevel], default=io.LogLevel.NoLogs.name, + help='Logging level') +parser.add_argument("--csr", help="File path to your client CSR in PEM format") +parser.add_argument("--templateName", help="Template name") +parser.add_argument("--templateParameters", help="Values for Template Parameters") + +# Using globals to simplify sample code +is_sample_done = threading.Event() +args = parser.parse_args() + +io.init_logging(getattr(io.LogLevel, args.verbosity), 'stderr') +mqtt_connection = None +identity_client = None + +createKeysAndCertificateResponse = None +createCertificateFromCsrResponse = None +registerThingResponse = None + +class LockedData(object): + def __init__(self): + self.lock = threading.Lock() + self.disconnect_called = False + +locked_data = LockedData() + +# Function for gracefully quitting this sample +def exit(msg_or_exception): + if isinstance(msg_or_exception, Exception): + print("Exiting Sample due to exception.") + traceback.print_exception(msg_or_exception.__class__, msg_or_exception, sys.exc_info()[2]) + else: + print("Exiting Sample:", msg_or_exception) + + with locked_data.lock: + if not locked_data.disconnect_called: + print("Disconnecting...") + locked_data.disconnect_called = True + future = mqtt_connection.disconnect() + future.add_done_callback(on_disconnected) + +def on_disconnected(disconnect_future): + # type: (Future) -> None + print("Disconnected.") + + # Signal that sample is finished + is_sample_done.set() + +def on_publish_register_thing(future): + # type: (Future) -> None + try: + future.result() # raises exception if publish failed + print("Published RegisterThing request..") + + except Exception as e: + print("Failed to publish RegisterThing request.") + exit(e) + +def on_publish_create_keys_and_certificate(future): + # type: (Future) -> None + try: + future.result() # raises exception if publish failed + print("Published CreateKeysAndCertificate request..") + + except Exception as e: + print("Failed to publish CreateKeysAndCertificate request.") + exit(e) + +def on_publish_create_certificate_from_csr(future): + # type: (Future) -> None + try: + future.result() # raises exception if publish failed + print("Published CreateCertificateFromCsr request..") + + except Exception as e: + print("Failed to publish CreateCertificateFromCsr request.") + exit(e) + +def createkeysandcertificate_execution_accepted(response): + # type: (iotidentity.CreateKeysAndCertificateResponse) -> None + try: + global createKeysAndCertificateResponse + createKeysAndCertificateResponse = response + print("Received a new message {}".format(createKeysAndCertificateResponse)) + + return + + except Exception as e: + exit(e) + +def createkeysandcertificate_execution_rejected(rejected): + # type: (iotidentity.RejectedError) -> None + exit("CreateKeysAndCertificate Request rejected with code:'{}' message:'{}' statuscode:'{}'".format( + rejected.error_code, rejected.error_message, rejected.status_code)) + +def createcertificatefromcsr_execution_accepted(response): + # type: (iotidentity.CreateCertificateFromCsrResponse) -> None + try: + global createCertificateFromCsrResponse + createCertificateFromCsrResponse = response + print("Received a new message {}".format(createCertificateFromCsrResponse)) + global certificateOwnershipToken + certificateOwnershipToken = response.certificate_ownership_token + + return + + except Exception as e: + exit(e) + +def createcertificatefromcsr_execution_rejected(rejected): + # type: (iotidentity.RejectedError) -> None + exit("CreateCertificateFromCsr Request rejected with code:'{}' message:'{}' statuscode:'{}'".format( + rejected.error_code, rejected.error_message, rejected.status_code)) + +def registerthing_execution_accepted(response): + # type: (iotidentity.RegisterThingResponse) -> None + try: + global registerThingResponse + registerThingResponse = response + print("Received a new message {} ".format(registerThingResponse)) + return + + except Exception as e: + exit(e) + +def registerthing_execution_rejected(rejected): + # type: (iotidentity.RejectedError) -> None + exit("RegisterThing Request rejected with code:'{}' message:'{}' statuscode:'{}'".format( + rejected.error_code, rejected.error_message, rejected.status_code)) + +# Callback when connection is accidentally lost. +def on_connection_interrupted(connection, error, **kwargs): + print("Connection interrupted. error: {}".format(error)) + + +# Callback when an interrupted connection is re-established. +def on_connection_resumed(connection, return_code, session_present, **kwargs): + print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present)) + + if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present: + print("Session did not persist. Resubscribing to existing topics...") + resubscribe_future, _ = connection.resubscribe_existing_topics() + + # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread, + # evaluate result with a callback instead. + resubscribe_future.add_done_callback(on_resubscribe_complete) + +def on_resubscribe_complete(resubscribe_future): + resubscribe_results = resubscribe_future.result() + print("Resubscribe results: {}".format(resubscribe_results)) + + for topic, qos in resubscribe_results['topics']: + if qos is None: + sys.exit("Server rejected resubscribe to topic: {}".format(topic)) + +def waitForCreateKeysAndCertificateResponse(): + # Wait for the response. + loopCount = 0 + while loopCount < 10 and createKeysAndCertificateResponse is None: + if createKeysAndCertificateResponse is not None: + break + print('Waiting... CreateKeysAndCertificateResponse: ' + json.dumps(createKeysAndCertificateResponse)) + loopCount += 1 + time.sleep(1) + +def waitForCreateCertificateFromCsrResponse(): + # Wait for the response. + loopCount = 0 + while loopCount < 10 and createCertificateFromCsrResponse is None: + if createCertificateFromCsrResponse is not None: + break + print('Waiting...CreateCertificateFromCsrResponse: ' + json.dumps(createCertificateFromCsrResponse)) + loopCount += 1 + time.sleep(1) + +def waitForRegisterThingResponse(): + # Wait for the response. + loopCount = 0 + while loopCount < 20 and registerThingResponse is None: + if registerThingResponse is not None: + break + loopCount += 1 + print('Waiting... RegisterThingResponse: ' + json.dumps(registerThingResponse)) + time.sleep(1) + +if __name__ == '__main__': + # Spin up resources + event_loop_group = io.EventLoopGroup(1) + host_resolver = io.DefaultHostResolver(event_loop_group) + client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver) + + if args.use_websocket == True: + proxy_options = None + if (args.proxy_host): + proxy_options = http.HttpProxyOptions(host_name=args.proxy_host, port=args.proxy_port) + + credentials_provider = auth.AwsCredentialsProvider.new_default_chain(client_bootstrap) + mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing( + endpoint=args.endpoint, + client_bootstrap=client_bootstrap, + region=args.signing_region, + credentials_provider=credentials_provider, + websocket_proxy_options=proxy_options, + on_connection_interrupted=on_connection_interrupted, + on_connection_resumed=on_connection_resumed, + ca_filepath=args.root_ca, + client_id=args.client_id, + clean_session=False, + keep_alive_secs=6) + + else: + mqtt_connection = mqtt_connection_builder.mtls_from_path( + endpoint=args.endpoint, + cert_filepath=args.cert, + pri_key_filepath=args.key, + client_bootstrap=client_bootstrap, + ca_filepath=args.root_ca, + client_id=args.client_id, + on_connection_interrupted=on_connection_interrupted, + on_connection_resumed=on_connection_resumed, + clean_session=False, + keep_alive_secs=6) + + print("Connecting to {} with client ID '{}'...".format( + args.endpoint, args.client_id)) + + connected_future = mqtt_connection.connect() + + identity_client = iotidentity.IotIdentityClient(mqtt_connection) + + # Wait for connection to be fully established. + # Note that it's not necessary to wait, commands issued to the + # mqtt_connection before its fully connected will simply be queued. + # But this sample waits here so it's obvious when a connection + # fails or succeeds. + connected_future.result() + print("Connected!") + + try: + # Subscribe to necessary topics. + # Note that is **is** important to wait for "accepted/rejected" subscriptions + # to succeed before publishing the corresponding "request". + + # Keys workflow if csr is not provided + if args.csr is None: + createkeysandcertificate_subscription_request = iotidentity.CreateKeysAndCertificateSubscriptionRequest() + + print("Subscribing to CreateKeysAndCertificate Accepted topic...") + createkeysandcertificate_subscribed_accepted_future, _ = identity_client.subscribe_to_create_keys_and_certificate_accepted( + request=createkeysandcertificate_subscription_request, + qos=mqtt.QoS.AT_LEAST_ONCE, + callback=createkeysandcertificate_execution_accepted) + + # Wait for subscription to succeed + createkeysandcertificate_subscribed_accepted_future.result() + + print("Subscribing to CreateKeysAndCertificate Rejected topic...") + createkeysandcertificate_subscribed_rejected_future, _ = identity_client.subscribe_to_create_keys_and_certificate_rejected( + request=createkeysandcertificate_subscription_request, + qos=mqtt.QoS.AT_LEAST_ONCE, + callback=createkeysandcertificate_execution_rejected) + + # Wait for subscription to succeed + createkeysandcertificate_subscribed_rejected_future.result() + else: + createcertificatefromcsr_subscription_request = iotidentity.CreateCertificateFromCsrSubscriptionRequest() + + print("Subscribing to CreateCertificateFromCsr Accepted topic...") + createcertificatefromcsr_subscribed_accepted_future, _ = identity_client.subscribe_to_create_certificate_from_csr_accepted( + request=createcertificatefromcsr_subscription_request, + qos=mqtt.QoS.AT_LEAST_ONCE, + callback=createcertificatefromcsr_execution_accepted) + + # Wait for subscription to succeed + createcertificatefromcsr_subscribed_accepted_future.result() + + print("Subscribing to CreateCertificateFromCsr Rejected topic...") + createcertificatefromcsr_subscribed_rejected_future, _ = identity_client.subscribe_to_create_certificate_from_csr_rejected( + request=createcertificatefromcsr_subscription_request, + qos=mqtt.QoS.AT_LEAST_ONCE, + callback=createcertificatefromcsr_execution_rejected) + + # Wait for subscription to succeed + createcertificatefromcsr_subscribed_rejected_future.result() + + + registerthing_subscription_request = iotidentity.RegisterThingSubscriptionRequest(args.templateName) + + print("Subscribing to RegisterThing Accepted topic...") + registerthing_subscribed_accepted_future, _ = identity_client.subscribe_to_register_thing_accepted( + request=registerthing_subscription_request, + qos=mqtt.QoS.AT_LEAST_ONCE, + callback=registerthing_execution_accepted) + + # Wait for subscription to succeed + registerthing_subscribed_accepted_future.result() + + print("Subscribing to RegisterThing Rejected topic...") + registerthing_subscribed_rejected_future, _ = identity_client.subscribe_to_register_thing_rejected( + request=registerthing_subscription_request, + qos=mqtt.QoS.AT_LEAST_ONCE, + callback=registerthing_execution_rejected) + # Wait for subscription to succeed + registerthing_subscribed_rejected_future.result() + + if args.csr is None: + print("Publishing to CreateKeysAndCertificate...") + publish_future = identity_client.publish_create_keys_and_certificate( + request=iotidentity.CreateKeysAndCertificateRequest(), qos=mqtt.QoS.AT_LEAST_ONCE) + publish_future.add_done_callback(on_publish_create_keys_and_certificate) + + waitForCreateKeysAndCertificateResponse() + + if createKeysAndCertificateResponse is None: + raise Exception('CreateKeysAndCertificate API did not succeed') + + registerThingRequest = iotidentity.RegisterThingRequest( + template_name=args.templateName, + certificate_ownership_token=createKeysAndCertificateResponse.certificate_ownership_token, + parameters=json.loads(args.templateParameters)) + else: + print("Publishing to CreateCertificateFromCsr...") + csrPath = open(args.csr, 'r').read() + publish_future = identity_client.publish_create_certificate_from_csr( + request=iotidentity.CreateCertificateFromCsrRequest(certificate_signing_request=csrPath), + qos=mqtt.QoS.AT_LEAST_ONCE) + publish_future.add_done_callback(on_publish_create_certificate_from_csr) + + waitForCreateCertificateFromCsrResponse() + + if createCertificateFromCsrResponse is None: + raise Exception('CreateCertificateFromCsr API did not succeed') + + registerThingRequest = iotidentity.RegisterThingRequest( + template_name=args.templateName, + certificate_ownership_token=createCertificateFromCsrResponse.certificate_ownership_token, + parameters=json.loads(args.templateParameters)) + + print("Publishing to RegisterThing topic...") + registerthing_publish_future = identity_client.publish_register_thing(registerThingRequest, mqtt.QoS.AT_LEAST_ONCE) + registerthing_publish_future.add_done_callback(on_publish_register_thing) + + waitForRegisterThingResponse() + exit("success") + + except Exception as e: + exit(e) + + # Wait for the sample to finish + is_sample_done.wait() +