diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 19f6adcf..baad5074 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -279,6 +279,9 @@ jobs: - name: run Shadow sample run: | python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SAMPLES_CFG_FOLDER }}/ci_run_shadow_cfg.json + - name: run MQTT5 Shadow sample + run: | + python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SAMPLES_CFG_FOLDER }}/ci_run_mqtt5_shadow_cfg.json - name: configure AWS credentials (Jobs) uses: aws-actions/configure-aws-credentials@v1 with: @@ -287,6 +290,9 @@ jobs: - name: run Jobs sample run: | python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SAMPLES_CFG_FOLDER }}/ci_run_jobs_cfg.json + - name: run MQTT5 Jobs sample + run: | + python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SAMPLES_CFG_FOLDER }}/ci_run_mqtt5_jobs_cfg.json - name: configure AWS credentials (Fleet provisioning) uses: aws-actions/configure-aws-credentials@v1 with: @@ -298,6 +304,12 @@ jobs: Sample_UUID=$(python3 -c "import uuid; print (uuid.uuid4())") python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SAMPLES_CFG_FOLDER }}/ci_run_fleet_provisioning_cfg.json --input_uuid ${Sample_UUID} python3 ${{ env.CI_UTILS_FOLDER }}/delete_iot_thing_ci.py --thing_name "Fleet_Thing_${Sample_UUID}" --region "us-east-1" + - name: run MQTT5 Fleet Provisioning sample + run: | + echo "Generating UUID for IoT thing" + Sample_UUID=$(python3 -c "import uuid; print (uuid.uuid4())") + python3 ${{ env.CI_UTILS_FOLDER }}/run_sample_ci.py --file ${{ env.CI_SAMPLES_CFG_FOLDER }}/ci_run_mqtt5_fleet_provisioning_cfg.json --input_uuid ${Sample_UUID} + python3 ${{ env.CI_UTILS_FOLDER }}/delete_iot_thing_ci.py --thing_name "Fleet_Thing_${Sample_UUID}" --region "us-east-1" - name: configure AWS credentials (Greengrass) uses: aws-actions/configure-aws-credentials@v1 with: diff --git a/.github/workflows/ci_run_mqtt5_fleet_provisioning_cfg.json b/.github/workflows/ci_run_mqtt5_fleet_provisioning_cfg.json new file mode 100644 index 00000000..58b03e7f --- /dev/null +++ b/.github/workflows/ci_run_mqtt5_fleet_provisioning_cfg.json @@ -0,0 +1,30 @@ +{ + "language": "Python", + "sample_file": "./aws-iot-device-sdk-python-v2/samples/fleetprovisioning_mqtt5.py", + "sample_region": "us-east-1", + "sample_main_class": "", + "arguments": [ + { + "name": "--endpoint", + "secret": "ci/endpoint" + }, + { + "name": "--cert", + "secret": "ci/FleetProvisioning/cert", + "filename": "tmp_certificate.pem" + }, + { + "name": "--key", + "secret": "ci/FleetProvisioning/key", + "filename": "tmp_key.pem" + }, + { + "name": "--template_name", + "data": "CI_FleetProvisioning_Template" + }, + { + "name": "--template_parameters", + "data": "{\"SerialNumber\":\"$INPUT_UUID\"}" + } + ] +} diff --git a/.github/workflows/ci_run_mqtt5_jobs_cfg.json b/.github/workflows/ci_run_mqtt5_jobs_cfg.json new file mode 100644 index 00000000..118af85a --- /dev/null +++ b/.github/workflows/ci_run_mqtt5_jobs_cfg.json @@ -0,0 +1,26 @@ +{ + "language": "Python", + "sample_file": "./aws-iot-device-sdk-python-v2/samples/jobs_mqtt5.py", + "sample_region": "us-east-1", + "sample_main_class": "", + "arguments": [ + { + "name": "--endpoint", + "secret": "ci/endpoint" + }, + { + "name": "--cert", + "secret": "ci/Jobs/cert", + "filename": "tmp_certificate.pem" + }, + { + "name": "--key", + "secret": "ci/Jobs/key", + "filename": "tmp_key.pem" + }, + { + "name": "--thing_name", + "data": "CI_Jobs_Thing" + } + ] +} diff --git a/.github/workflows/ci_run_mqtt5_shadow_cfg.json b/.github/workflows/ci_run_mqtt5_shadow_cfg.json new file mode 100644 index 00000000..8342d464 --- /dev/null +++ b/.github/workflows/ci_run_mqtt5_shadow_cfg.json @@ -0,0 +1,30 @@ +{ + "language": "Python", + "sample_file": "./aws-iot-device-sdk-python-v2/samples/shadow_mqtt5.py", + "sample_region": "us-east-1", + "sample_main_class": "", + "arguments": [ + { + "name": "--endpoint", + "secret": "ci/endpoint" + }, + { + "name": "--cert", + "secret": "ci/Shadow/cert", + "filename": "tmp_certificate.pem" + }, + { + "name": "--key", + "secret": "ci/Shadow/key", + "filename": "tmp_key.pem" + }, + { + "name": "--thing_name", + "data": "CI_Shadow_Thing" + }, + { + "name": "--is_ci", + "data": "true" + } + ] +} diff --git a/awsiot/__init__.py b/awsiot/__init__.py index 6a65102c..fbd5a084 100644 --- a/awsiot/__init__.py +++ b/awsiot/__init__.py @@ -11,7 +11,7 @@ 'mqtt5_client_builder', ] -from awscrt import mqtt +from awscrt import mqtt, mqtt5 from concurrent.futures import Future import json from typing import Any, Callable, Dict, Optional, Tuple, TypeVar @@ -32,8 +32,14 @@ class MqttServiceClient: mqtt_connection: MQTT connection to use """ - def __init__(self, mqtt_connection: mqtt.Connection): - self._mqtt_connection = mqtt_connection # type: mqtt.Connection + def __init__(self, mqtt_connection: mqtt.Connection or mqtt5.Client): + if isinstance(mqtt_connection, mqtt.Connection): + self._mqtt_connection = mqtt_connection # type: mqtt.Connection + elif isinstance(mqtt_connection, mqtt5.Client): + self._mqtt_connection = mqtt_connection.new_connection() + self._mqtt5_client = mqtt_connection + else: + assert("The service client could only take mqtt.Connection and mqtt5.Client as argument") @property def mqtt_connection(self) -> mqtt.Connection: diff --git a/codebuild/samples/shadow-linux.sh b/codebuild/samples/shadow-linux.sh index 1fc1b54f..e9aad66c 100755 --- a/codebuild/samples/shadow-linux.sh +++ b/codebuild/samples/shadow-linux.sh @@ -11,5 +11,6 @@ ENDPOINT=$(aws secretsmanager get-secret-value --secret-id "ci/endpoint" --query echo "Shadow test" python3 shadow.py --endpoint $ENDPOINT --key /tmp/privatekey.pem --cert /tmp/certificate.pem --thing_name CI_CodeBuild_Thing --is_ci true +python3 shadow_mqtt5.py --endpoint $ENDPOINT --key /tmp/privatekey.pem --cert /tmp/certificate.pem --thing_name CI_CodeBuild_Thing --is_ci true popd diff --git a/samples/README.md b/samples/README.md index 690fe29e..747fe46b 100644 --- a/samples/README.md +++ b/samples/README.md @@ -14,8 +14,11 @@ * [Cognito Connect](./cognito_connect.md) * [X509 Connect](./x509_connect.md) * [Shadow](./shadow.md) +* [MQTT5 Shadow](./shadow_mqtt5.md) * [Jobs](./jobs.md) +* [MQTT5 Jobs](./jobs_mqtt5.md) * [Fleet Provisioning](./fleetprovisioning.md) +* [MQTT5 Fleet Provisioning](./fleetprovisioning_mqtt5.md) * [Greengrass Discovery](./basic_discovery.md) * [Greengrass IPC](./ipc_greengrass.md) diff --git a/samples/fleetprovisioning_mqtt5.md b/samples/fleetprovisioning_mqtt5.md new file mode 100644 index 00000000..a89688f5 --- /dev/null +++ b/samples/fleetprovisioning_mqtt5.md @@ -0,0 +1,371 @@ +# Fleet provisioning MQTT5 + +[**Return to main sample list**](./README.md) + +This sample uses the AWS IoT [Fleet provisioning](https://docs.aws.amazon.com/iot/latest/developerguide/provision-wo-cert.html) to provision devices using either a CSR or Keys-And-Certificate and subsequently calls RegisterThing. This allows you to create new AWS IoT Core things using a Fleet Provisioning Template. + +On startup, the script subscribes to topics based on the request type of either CSR or Keys topics, publishes the request to corresponding topic and calls RegisterThing. + +Your IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect, subscribe, publish, and receive. Below is a sample policy that can be used on your IoT Core Thing that will allow this sample to run as intended. + +
+(see sample policy) +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": "iot:Publish",
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/certificates/create/json",
+        "arn:aws:iot:region:account:topic/$aws/certificates/create-from-csr/json",
+        "arn:aws:iot:region:account:topic/$aws/provisioning-templates/templatename/provision/json"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Receive"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/certificates/create/json/accepted",
+        "arn:aws:iot:region:account:topic/$aws/certificates/create/json/rejected",
+        "arn:aws:iot:region:account:topic/$aws/certificates/create-from-csr/json/accepted",
+        "arn:aws:iot:region:account:topic/$aws/certificates/create-from-csr/json/rejected",
+        "arn:aws:iot:region:account:topic/$aws/provisioning-templates/templatename/provision/json/accepted",
+        "arn:aws:iot:region:account:topic/$aws/provisioning-templates/templatename/provision/json/rejected"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Subscribe"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topicfilter/$aws/certificates/create/json/accepted",
+        "arn:aws:iot:region:account:topicfilter/$aws/certificates/create/json/rejected",
+        "arn:aws:iot:region:account:topicfilter/$aws/certificates/create-from-csr/json/accepted",
+        "arn:aws:iot:region:account:topicfilter/$aws/certificates/create-from-csr/json/rejected",
+        "arn:aws:iot:region:account:topicfilter/$aws/provisioning-templates/templatename/provision/json/accepted",
+        "arn:aws:iot:region:account:topicfilter/$aws/provisioning-templates/templatename/provision/json/rejected"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Connect",
+      "Resource": "arn:aws:iot:region:account:client/test-*"
+    }
+  ]
+}
+
+ +Replace with the following with the data from your AWS account: +* ``: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`. +* ``: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website. +* ``: The name of your AWS Fleet Provisioning template you want to use to create new AWS IoT Core Things. + +Note that in a real application, you may want to avoid the use of wildcards in your ClientID or use them selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id ` to send the client ID your policy supports. + +
+ +## How to run + +There are many different ways to run the Fleet Provisioning sample because of how many different ways there are to setup a Fleet Provisioning template in AWS IoT Core. **The easiest and most common way is to run the sample with the following**: + +``` sh +# For Windows: replace 'python3' with 'python' and '/' with '\' +python3 fleetprovisioning_mqtt5.py --endpoint --cert --key --template_name --template_parameters +``` + +You can also pass a Certificate Authority file (CA) if your certificate and key combination requires it: + +``` sh +# For Windows: replace 'python3' with 'python' and '/' with '\' +python3 fleetprovisioning_mqtt5.py --endpoint --cert --key --template_name --template_parameters --ca_file +``` + +However, this is just one way using the `CreateKeysAndCertificate` workflow. Below are a detailed list of instructions with the different ways to connect. While the detailed instructions do not show it, you can pass `--ca_file` as needed no matter which way you connect via Fleet Provisioning. + +## Service Client Notes +### Difference relative to MQTT311 IoTIdentityClient +The IoTIdentityClient with mqtt5 client is almost identical to the mqtt3 one. The only difference is that you would need setup up a Mqtt5 Client and pass it to the IotIdentityClient. +For how to setup a Mqtt5 Client, please refer to [MQTT5 UserGuide](../documents/MQTT5_Userguide.md) and [MQTT5 PubSub Sample](./mqtt5_pubsub.py) + + + + + + + + + + +
Create a IoTIdentityClient with Mqtt5Create a IoTIdentityClient with Mqtt311
+ +```python + # Create a Mqtt5 Client + mqtt5_client = mqtt5_client_builder.mtls_from_path( + endpoint, + port, + cert_filepath, + pri_key_filepath, + ca_filepath, + client_id, + clean_session, + keep_alive_secs, + http_proxy_options, + on_lifecycle_connection_success, + on_lifecycle_stopped) + + # Create the Identity Client from Mqtt5 Client + identity_client = iotidentity.IotIdentityClient(mqtt5_client) +``` + + + +```python + # Create a Mqtt311 Connection from the command line data + mqtt_connection = mqtt_connection_builder.mtls_from_path( + endpoint, + port, + cert_filepath, + pri_key_filepath, + ca_filepath, + client_id, + clean_session, + keep_alive_secs, + http_proxy_options) + + # Create the Identity Client from Mqtt311 Connection + identity_client = iotidentity.IotIdentityClient(mqtt_connection) +``` + +
+ +### Mqtt5.QoS v.s. Mqtt3.QoS +As the service client interface is unchanged for both Mqtt3 Connection and Mqtt5 Client,the IotIdentityClient will use Mqtt3.QoS instead of Mqtt5.QoS even with a Mqtt5 Client. You could use mqtt3.QoS.to_mqtt5() and mqtt5.QoS.to_mqtt3() to convert the value. + + +### Fleet Provisioning Detailed Instructions + +#### Aws Resource Setup + +Fleet provisioning requires some additional AWS resources be set up first. These steps assume you have the [AWS CLI](https://aws.amazon.com/cli/) installed and have your AWS credentials for the AWS CLI setup and with sufficient permissions to perform all of the operations in this guide. For instructions on how to setup AWS CLI, see the following: [Configuring the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html). + +You will also need Python version 3 installed to be able to run the `parse_cert_set_result.py` file, which is a helper script to make running this sample easier. You can find Python3 installers for your platform on the [Python website](https://www.python.org/). + +These steps are based on the provisioning setup steps +that can be found at [Embedded C SDK Setup](https://docs.aws.amazon.com/freertos/latest/lib-ref/c-sdk/provisioning/provisioning_tests.html#provisioning_system_tests_setup). + + +First, create the IAM role that will be needed by the fleet provisioning template. Replace `` with a name of the role you want to create. + +``` sh +aws iam create-role \ + --role-name \ + --assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Action":"sts:AssumeRole","Effect":"Allow","Principal":{"Service":"iot.amazonaws.com"}}]}' +``` + +This is the IAM role the Fleet Provisioning template will use to create the new AWS IoT things. However, before it can do so, it will need to have a policy attached to it to give the new role permission to perform the operations it needs. To do this, run the following command and replace `` with the name of the role you created in the previous step. + +``` sh +aws iam attach-role-policy \ + --role-name \ + --policy-arn arn:aws:iam::aws:policy/service-role/AWSIoTThingsRegistration +``` + +The next step is to make a template resource that will be used for provisioning the new AWS IoT Core things. This template tells AWS IoT Core how to setup the new AWS IoT Core Things you create when your Fleet Provisioning role is invoked, setting up material such as the name and tags, for example. + +To create a new Fleet Provisioning template, you can use the following AWS CLI command, replacing `` with the name of the template you wish to create, `` with the name of the role you created two steps prior, and `` with your AWS IoT Core account number. Finally, make sure to replace `` with a valid JSON document as a single line. An example JSON document is provided further below. + +``` sh +aws iot create-provisioning-template \ + --template-name \ + --provisioning-role-arn arn:aws:iam:::role/ \ + --template-body "" \ + --enabled +``` + +For the purposes of this sample, the following template JSON document is presumed to be used: + +
+(see template body) + +```json +{ + "Parameters": { + "DeviceLocation": { + "Type": "String" + }, + "AWS::IoT::Certificate::Id": { + "Type": "String" + }, + "SerialNumber": { + "Type": "String" + } + }, + "Mappings": { + "LocationTable": { + "Seattle": { + "LocationUrl": "https://example.aws" + } + } + }, + "Resources": { + "thing": { + "Type": "AWS::IoT::Thing", + "Properties": { + "ThingName": { + "Fn::Join": [ + "", + [ + "ThingPrefix_", + { + "Ref": "SerialNumber" + } + ] + ] + }, + "AttributePayload": { + "version": "v1", + "serialNumber": "serialNumber" + } + }, + "OverrideSettings": { + "AttributePayload": "MERGE", + "ThingTypeName": "REPLACE", + "ThingGroups": "DO_NOTHING" + } + }, + "certificate": { + "Type": "AWS::IoT::Certificate", + "Properties": { + "CertificateId": { + "Ref": "AWS::IoT::Certificate::Id" + }, + "Status": "Active" + }, + "OverrideSettings": { + "Status": "REPLACE" + } + }, + "policy": { + "Type": "AWS::IoT::Policy", + "Properties": { + "PolicyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "iot:Connect", + "iot:Subscribe", + "iot:Publish", + "iot:Receive" + ], + "Resource": "*" + } + ] + } + } + } + }, + "DeviceConfiguration": { + "FallbackUrl": "https://www.example.com/test-site", + "LocationUrl": { + "Fn::FindInMap": [ + "LocationTable", + { + "Ref": "DeviceLocation" + }, + "LocationUrl" + ] + } + } +} +``` + +
+ +And here is the same JSON document, but as a single line for easier copy-pasting: + +
+(see template body) + +``` json +{"Parameters": {"DeviceLocation": {"Type": "String"},"AWS::IoT::Certificate::Id": {"Type": "String"},"SerialNumber": {"Type": "String"}},"Mappings": {"LocationTable": {"Seattle": {"LocationUrl": "https://example.aws"}}},"Resources": {"thing": {"Type": "AWS::IoT::Thing","Properties": {"ThingName": {"Fn::Join": ["",["ThingPrefix_",{"Ref": "SerialNumber"}]]},"AttributePayload": {"version": "v1","serialNumber": "serialNumber"}},"OverrideSettings": {"AttributePayload": "MERGE","ThingTypeName": "REPLACE","ThingGroups": "DO_NOTHING"}},"certificate": {"Type": "AWS::IoT::Certificate","Properties": {"CertificateId": {"Ref": "AWS::IoT::Certificate::Id"},"Status": "Active"},"OverrideSettings": {"Status": "REPLACE"}},"policy": {"Type": "AWS::IoT::Policy","Properties": {"PolicyDocument": {"Version": "2012-10-17","Statement": [{"Effect": "Allow","Action": ["iot:Connect","iot:Subscribe","iot:Publish","iot:Receive"],"Resource": "*"}]}}}},"DeviceConfiguration": {"FallbackUrl": "https://www.example.com/test-site","LocationUrl": {"Fn::FindInMap": ["LocationTable",{"Ref": "DeviceLocation"},"LocationUrl"]}}} +``` + +
+ +You can use this JSON document as the `` in the AWS CLI command. This sample will assume you have used the template JSON above, so you may need to adjust if you are using a different template JSON. Thankfully, all of these steps need to only be done and, now that they are complete, you will need not perform them again. + +#### Creating a certificate-key set from a provisioning claim + +To run the provisioning sample, you'll need a certificate and key set with sufficient permissions. Provisioning certificates are normally created ahead of time and placed on your device, but for this sample, we will just create them on the fly. This is primarily done for example purposes. + +You can also use any certificate set you've already created if it has sufficient IoT permissions. If you wish to do this, you can skip the step that calls `create-provisioning-claim` below and move right to the next step: [Running the sample using a certificate-key set](#running-the-sample-using-a-certificate-key-set) + +We've included a script in the utils folder that creates certificate and key files from the response of calling +`create-provisioning-claim`. These dynamically sourced certificates are **only valid for five minutes**. When running the command, +you'll need to substitute the name of the template you previously created. If on Windows, replace the paths with something appropriate. + +**Note**: The following assumes you are running this command from the `aws-iot-device-sdk-java-v2` folder, which is the main GitHub folder. If you are running this from another folder (like the `samples/Identity` folder), then you will need to adjust the filepaths accordingly. + +```sh +aws iot create-provisioning-claim \ + --template-name \ + | python3 ./utils/parse_cert_set_result.py \ + --path ./tmp \ + --filename provision +``` +* Replace `` with the name of the Fleet Provisioning template you created earlier. + +This will create a certificate and key in the `tmp` folder with file names starting with `provision`. You can now use these temporary keys +to perform the actual provisioning in the section below. + +#### Running the sample using a certificate-key set + +To run the sample with your certificate and private key, use the following command: + +``` sh +# For Windows: replace 'python3' with 'python' and '/' with '\' +python3 fleetprovisioning.py --endpoint --cert --key --template_name --template_parameters '{\"SerialNumber\":\"1\",\"DeviceLocation\":\"Seattle\"}' +``` + +As per normal, replace the `<>` parameters with the proper values. Notice that we provided substitution values for the two parameters in the template body, `DeviceLocation` and `SerialNumber`. + +With that, the sample should run and work as expected! You should then find your have a new AWS IoT Core thing! + +### Run the sample using the certificate signing request workflow + +To run the sample with this workflow, you'll need to create a certificate signing request in addition to the other steps above (creating the role, setting its policy, setting the template JSON, etc). + +First create a certificate-key pair: +``` sh +openssl genrsa -out /tmp/deviceCert.key 2048 +``` + +Next create a certificate signing request from it: +``` sh +openssl req -new -key /tmp/deviceCert.key -out /tmp/deviceCert.csr +``` + +As in the previous workflow, you can [create a temporary certificate set from a provisioning claim](#creating-a-certificate-key-set-from-a-provisioning-claim). Again, this can be skipped if you have a policy and certificate with the proper permissions. + +```sh +aws iot create-provisioning-claim \ + --template-name \ + | python3 ./utils/parse_cert_set_result.py \ + --path ./tmp \ + --filename provision +``` +* Replace `` with the name of the Fleet Provisioning template you created earlier. + +Finally, you can also pass the certificate signing request while invoking the Fleet Provisioning sample. + +``` sh +# For Windows: replace 'python3' with 'python' and '/' with '\' +python3 fleetprovisioning.py --endpoint --cert --key --template_name --template_parameters '{\"SerialNumber\":\"1\",\"DeviceLocation\":\"Seattle\"}' --csr +``` diff --git a/samples/fleetprovisioning_mqtt5.py b/samples/fleetprovisioning_mqtt5.py new file mode 100644 index 00000000..f98d67cc --- /dev/null +++ b/samples/fleetprovisioning_mqtt5.py @@ -0,0 +1,376 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +from awscrt import mqtt5, http +from awsiot import iotidentity, mqtt5_client_builder +from concurrent.futures import Future +import sys +import threading +import time +import traceback +import json +from utils.command_line_utils import CommandLineUtils + +# - Overview - +# This sample uses the AWS IoT Fleet Provisioning to provision device using either the keys +# or CSR +# +# +# - Instructions - +# This sample requires you to create a provisioning claim. See: +# https://docs.aws.amazon.com/iot/latest/developerguide/provision-wo-cert.html +# +# - Detail - +# On startup, the script subscribes to topics based on the request type of either CSR or Keys +# publishes the request to corresponding topic and calls RegisterThing. + +# cmdData is the arguments/input from the command line placed into a single struct for +# use in this sample. This handles all of the command line parsing, validating, etc. +# See the Utils/CommandLineUtils for more information. +cmdData = CommandLineUtils.parse_sample_input_fleet_provisioning() + +# Using globals to simplify sample code +is_sample_done = threading.Event() +mqtt5_client = None +identity_client = None +createKeysAndCertificateResponse = None +createCertificateFromCsrResponse = None +registerThingResponse = None +future_connection_success = Future() + +class LockedData: + def __init__(self): + self.lock = threading.Lock() + self.disconnect_called = False + + +locked_data = LockedData() + +# Function for gracefully quitting this sample +def exit(msg_or_exception): + if isinstance(msg_or_exception, Exception): + print("Exiting Sample due to exception.") + traceback.print_exception(msg_or_exception.__class__, msg_or_exception, sys.exc_info()[2]) + else: + print("Exiting Sample:", msg_or_exception) + + with locked_data.lock: + if not locked_data.disconnect_called: + print("Stop the Client...") + locked_data.disconnect_called = True + mqtt5_client.stop() + + +# Callback for the lifecycle event Connection Success +def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData): + print("Lifecycle Connection Success") + global future_connection_success + future_connection_success.set_result(lifecycle_connect_success_data) + + +# Callback for the lifecycle event on Client Stopped +def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData): + # type: (Future) -> None + print("Client Stopped.") + # Signal that sample is finished + is_sample_done.set() + + +def on_publish_register_thing(future): + # type: (Future) -> None + try: + future.result() # raises exception if publish failed + print("Published RegisterThing request..") + + except Exception as e: + print("Failed to publish RegisterThing request.") + exit(e) + + +def on_publish_create_keys_and_certificate(future): + # type: (Future) -> None + try: + future.result() # raises exception if publish failed + print("Published CreateKeysAndCertificate request..") + + except Exception as e: + print("Failed to publish CreateKeysAndCertificate request.") + exit(e) + + +def on_publish_create_certificate_from_csr(future): + # type: (Future) -> None + try: + future.result() # raises exception if publish failed + print("Published CreateCertificateFromCsr request..") + + except Exception as e: + print("Failed to publish CreateCertificateFromCsr request.") + exit(e) + + +def createkeysandcertificate_execution_accepted(response): + # type: (iotidentity.CreateKeysAndCertificateResponse) -> None + try: + global createKeysAndCertificateResponse + createKeysAndCertificateResponse = response + if (cmdData.input_is_ci == False): + print("Received a new message {}".format(createKeysAndCertificateResponse)) + + return + + except Exception as e: + exit(e) + + +def createkeysandcertificate_execution_rejected(rejected): + # type: (iotidentity.RejectedError) -> None + exit("CreateKeysAndCertificate Request rejected with code:'{}' message:'{}' status code:'{}'".format( + rejected.error_code, rejected.error_message, rejected.status_code)) + + +def createcertificatefromcsr_execution_accepted(response): + # type: (iotidentity.CreateCertificateFromCsrResponse) -> None + try: + global createCertificateFromCsrResponse + createCertificateFromCsrResponse = response + if (cmdData.input_is_ci == False): + print("Received a new message {}".format(createCertificateFromCsrResponse)) + global certificateOwnershipToken + certificateOwnershipToken = response.certificate_ownership_token + + return + + except Exception as e: + exit(e) + + +def createcertificatefromcsr_execution_rejected(rejected): + # type: (iotidentity.RejectedError) -> None + exit("CreateCertificateFromCsr Request rejected with code:'{}' message:'{}' status code:'{}'".format( + rejected.error_code, rejected.error_message, rejected.status_code)) + + +def registerthing_execution_accepted(response): + # type: (iotidentity.RegisterThingResponse) -> None + try: + global registerThingResponse + registerThingResponse = response + if (cmdData.input_is_ci == False): + print("Received a new message {} ".format(registerThingResponse)) + return + + except Exception as e: + exit(e) + + +def registerthing_execution_rejected(rejected): + # type: (iotidentity.RejectedError) -> None + exit("RegisterThing Request rejected with code:'{}' message:'{}' status code:'{}'".format( + rejected.error_code, rejected.error_message, rejected.status_code)) + +def on_resubscribe_complete(resubscribe_future): + resubscribe_results = resubscribe_future.result() + print("Resubscribe results: {}".format(resubscribe_results)) + + for topic, qos in resubscribe_results['topics']: + if qos is None: + sys.exit("Server rejected resubscribe to topic: {}".format(topic)) + + +def waitForCreateKeysAndCertificateResponse(): + # Wait for the response. + loopCount = 0 + while loopCount < 10 and createKeysAndCertificateResponse is None: + if createKeysAndCertificateResponse is not None: + break + if not cmdData.input_is_ci: + print('Waiting... CreateKeysAndCertificateResponse: ' + json.dumps(createKeysAndCertificateResponse)) + else: + print("Waiting... CreateKeysAndCertificateResponse: ...") + loopCount += 1 + time.sleep(1) + + +def waitForCreateCertificateFromCsrResponse(): + # Wait for the response. + loopCount = 0 + while loopCount < 10 and createCertificateFromCsrResponse is None: + if createCertificateFromCsrResponse is not None: + break + if not cmdData.input_is_ci: + print('Waiting...CreateCertificateFromCsrResponse: ' + json.dumps(createCertificateFromCsrResponse)) + else: + print("Waiting... CreateCertificateFromCsrResponse: ...") + loopCount += 1 + time.sleep(1) + + +def waitForRegisterThingResponse(): + # Wait for the response. + loopCount = 0 + while loopCount < 20 and registerThingResponse is None: + if registerThingResponse is not None: + break + loopCount += 1 + if not cmdData.input_is_ci: + print('Waiting... RegisterThingResponse: ' + json.dumps(registerThingResponse)) + else: + print('Waiting... RegisterThingResponse: ...') + time.sleep(1) + + +if __name__ == '__main__': + # Create the proxy options if the data is present in cmdData + proxy_options = None + if cmdData.input_proxy_host is not None and cmdData.input_proxy_port != 0: + proxy_options = http.HttpProxyOptions( + host_name=cmdData.input_proxy_host, + port=cmdData.input_proxy_port) + + # Create a MQTT connection from the command line data + mqtt5_client = mqtt5_client_builder.mtls_from_path( + endpoint=cmdData.input_endpoint, + port=cmdData.input_port, + cert_filepath=cmdData.input_cert, + pri_key_filepath=cmdData.input_key, + ca_filepath=cmdData.input_ca, + client_id=cmdData.input_clientId, + clean_session=False, + keep_alive_secs=30, + http_proxy_options=proxy_options, + on_lifecycle_connection_success=on_lifecycle_connection_success, + on_lifecycle_stopped=on_lifecycle_stopped) + + if not cmdData.input_is_ci: + print(f"Connecting to {cmdData.input_endpoint} with client ID '{cmdData.input_clientId}'...") + else: + print("Connecting to endpoint with client ID") + + mqtt5_client.start() + + identity_client = iotidentity.IotIdentityClient(mqtt5_client) + + # Wait for connection to be fully established. + # Note that it's not necessary to wait, commands issued to the + # mqtt5_client before its fully connected will simply be queued. + # But this sample waits here so it's obvious when a connection + # fails or succeeds. + future_connection_success.result() + print("Connected!") + + try: + # Subscribe to necessary topics. + # Note that is **is** important to wait for "accepted/rejected" subscriptions + # to succeed before publishing the corresponding "request". + + # Keys workflow if csr is not provided + if cmdData.input_csr_path is None: + createkeysandcertificate_subscription_request = iotidentity.CreateKeysAndCertificateSubscriptionRequest() + + print("Subscribing to CreateKeysAndCertificate Accepted topic...") + createkeysandcertificate_subscribed_accepted_future, _ = identity_client.subscribe_to_create_keys_and_certificate_accepted( + request=createkeysandcertificate_subscription_request, + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=createkeysandcertificate_execution_accepted) + + # Wait for subscription to succeed + createkeysandcertificate_subscribed_accepted_future.result() + + print("Subscribing to CreateKeysAndCertificate Rejected topic...") + createkeysandcertificate_subscribed_rejected_future, _ = identity_client.subscribe_to_create_keys_and_certificate_rejected( + request=createkeysandcertificate_subscription_request, + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=createkeysandcertificate_execution_rejected) + + # Wait for subscription to succeed + createkeysandcertificate_subscribed_rejected_future.result() + else: + createcertificatefromcsr_subscription_request = iotidentity.CreateCertificateFromCsrSubscriptionRequest() + + print("Subscribing to CreateCertificateFromCsr Accepted topic...") + createcertificatefromcsr_subscribed_accepted_future, _ = identity_client.subscribe_to_create_certificate_from_csr_accepted( + request=createcertificatefromcsr_subscription_request, + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=createcertificatefromcsr_execution_accepted) + + # Wait for subscription to succeed + createcertificatefromcsr_subscribed_accepted_future.result() + + print("Subscribing to CreateCertificateFromCsr Rejected topic...") + createcertificatefromcsr_subscribed_rejected_future, _ = identity_client.subscribe_to_create_certificate_from_csr_rejected( + request=createcertificatefromcsr_subscription_request, + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=createcertificatefromcsr_execution_rejected) + + # Wait for subscription to succeed + createcertificatefromcsr_subscribed_rejected_future.result() + + registerthing_subscription_request = iotidentity.RegisterThingSubscriptionRequest( + template_name=cmdData.input_template_name) + + print("Subscribing to RegisterThing Accepted topic...") + registerthing_subscribed_accepted_future, _ = identity_client.subscribe_to_register_thing_accepted( + request=registerthing_subscription_request, + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=registerthing_execution_accepted) + + # Wait for subscription to succeed + registerthing_subscribed_accepted_future.result() + + print("Subscribing to RegisterThing Rejected topic...") + registerthing_subscribed_rejected_future, _ = identity_client.subscribe_to_register_thing_rejected( + request=registerthing_subscription_request, + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=registerthing_execution_rejected) + # Wait for subscription to succeed + registerthing_subscribed_rejected_future.result() + + fleet_template_name = cmdData.input_template_name + fleet_template_parameters = cmdData.input_template_parameters + if cmdData.input_csr_path is None: + print("Publishing to CreateKeysAndCertificate...") + publish_future = identity_client.publish_create_keys_and_certificate( + request=iotidentity.CreateKeysAndCertificateRequest(), qos=mqtt5.QoS.AT_LEAST_ONCE) + publish_future.add_done_callback(on_publish_create_keys_and_certificate) + + waitForCreateKeysAndCertificateResponse() + + if createKeysAndCertificateResponse is None: + raise Exception('CreateKeysAndCertificate API did not succeed') + + registerThingRequest = iotidentity.RegisterThingRequest( + template_name=fleet_template_name, + certificate_ownership_token=createKeysAndCertificateResponse.certificate_ownership_token, + parameters=json.loads(fleet_template_parameters)) + else: + print("Publishing to CreateCertificateFromCsr...") + csrPath = open(cmdData.input_csr_path, 'r').read() + publish_future = identity_client.publish_create_certificate_from_csr( + request=iotidentity.CreateCertificateFromCsrRequest(certificate_signing_request=csrPath), + qos=mqtt5.QoS.AT_LEAST_ONCE) + publish_future.add_done_callback(on_publish_create_certificate_from_csr) + + waitForCreateCertificateFromCsrResponse() + + if createCertificateFromCsrResponse is None: + raise Exception('CreateCertificateFromCsr API did not succeed') + + registerThingRequest = iotidentity.RegisterThingRequest( + template_name=fleet_template_name, + certificate_ownership_token=createCertificateFromCsrResponse.certificate_ownership_token, + parameters=json.loads(fleet_template_parameters)) + + print("Publishing to RegisterThing topic...") + registerthing_publish_future = identity_client.publish_register_thing( + registerThingRequest, mqtt5.QoS.AT_LEAST_ONCE) + registerthing_publish_future.add_done_callback(on_publish_register_thing) + + waitForRegisterThingResponse() + exit("success") + + except Exception as e: + exit(e) + + # Wait for the sample to finish + is_sample_done.wait() diff --git a/samples/jobs_mqtt5.md b/samples/jobs_mqtt5.md new file mode 100644 index 00000000..e8b2e1ed --- /dev/null +++ b/samples/jobs_mqtt5.md @@ -0,0 +1,144 @@ +# Jobs MQTT5 + +[**Return to main sample list**](./README.md) + +This sample uses the AWS IoT [Jobs](https://docs.aws.amazon.com/iot/latest/developerguide/iot-jobs.html) Service to describe jobs to execute. [Jobs](https://docs.aws.amazon.com/iot/latest/developerguide/iot-jobs.html) is a service that allows you to define and respond to remote operation requests defined through the AWS IoT Core website or via any other device (or CLI command) that can access the [Jobs](https://docs.aws.amazon.com/iot/latest/developerguide/iot-jobs.html) service. + +Note: This sample requires you to create jobs for your device to execute. See +[instructions here](https://docs.aws.amazon.com/iot/latest/developerguide/create-manage-jobs.html) for how to make jobs. + +On startup, the sample describes the jobs that are pending execution and pretends to process them, marking each job as complete as it does so. + +Your IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect, subscribe, publish, and receive. Below is a sample policy that can be used on your IoT Core Thing that will allow this sample to run as intended. + +
+Sample Policy +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": "iot:Publish",
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/start-next",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/*/update",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/*/get",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/get"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Receive",
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/notify-next",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/start-next/*",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/*/update/*",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/get/*",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/jobs/*/get/*"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Subscribe",
+      "Resource": [
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/jobs/notify-next",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/jobs/start-next/*",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/jobs/*/update/*",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/jobs/get/*",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/jobs/*/get/*"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Connect",
+      "Resource": "arn:aws:iot:region:account:client/test-*"
+    }
+  ]
+}
+
+ +Replace with the following with the data from your AWS account: +* ``: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`. +* ``: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website. +* ``: The name of your AWS IoT Core thing you want the device connection to be associated with + +Note that in a real application, you may want to avoid the use of wildcards in your ClientID or use them selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id ` to send the client ID your policy supports. + +
+ +## How to run + +Use the following command to run the Jobs sample from the `samples` folder: + +``` sh +# For Windows: replace 'python3' with 'python' and '/' with '\' +python3 jobs_mqtt5.py --endpoint --cert --key --thing_name +``` + +You can also pass a Certificate Authority file (CA) if your certificate and key combination requires it: + +``` sh +# For Windows: replace 'python3' with 'python' and '/' with '\' +python3 jobs_mqtt5.py --endpoint --cert --key --thing_name --ca_file +``` + + +## Service Client Notes +### Difference relative to MQTT311 IotJobsClient +The IotJobsClient with mqtt5 client is almost identical to the mqtt3 one. The only difference is that you would need setup up a Mqtt5 Client and pass it to the IotJobsClient. +For how to setup a Mqtt5 Client, please refer to [MQTT5 UserGuide](../documents/MQTT5_Userguide.md) and [MQTT5 PubSub Sample](./mqtt5_pubsub.py) + + + + + + + + + + +
Create a IotJobsClient with Mqtt5Create a IotJobsClient with Mqtt311
+ +```python + # Create a Mqtt5 Client + mqtt5_client = mqtt5_client_builder.mtls_from_path( + endpoint, + port, + cert_filepath, + pri_key_filepath, + ca_filepath, + client_id, + clean_session, + keep_alive_secs, + http_proxy_options, + on_lifecycle_connection_success, + on_lifecycle_stopped) + + # Create the Jobs client from Mqtt5 Client + jobs_client = iotjobs.IotJobsClient(mqtt5_client) +``` + + + +```python + # Create a Mqtt311 Connection from the command line data + mqtt_connection = mqtt_connection_builder.mtls_from_path( + endpoint, + port, + cert_filepath, + pri_key_filepath, + ca_filepath, + client_id, + clean_session, + keep_alive_secs, + http_proxy_options) + + # Create the Jobs client from Mqtt311 Connection + jobs_client = iotjobs.IotJobsClient(mqtt_connection) +``` + +
+ +### Mqtt5.QoS v.s. Mqtt3.QoS +As the service client interface is unchanged for both Mqtt3 Connection and Mqtt5 Client,the IotJobsClient will use Mqtt3.QoS instead of Mqtt5.QoS even with a Mqtt5 Client. You could use mqtt3.QoS.to_mqtt5() and mqtt5.QoS.to_mqtt3() to convert the value. \ No newline at end of file diff --git a/samples/jobs_mqtt5.py b/samples/jobs_mqtt5.py new file mode 100644 index 00000000..ea0a4fb3 --- /dev/null +++ b/samples/jobs_mqtt5.py @@ -0,0 +1,405 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +from awscrt import mqtt5, http +from awsiot import iotjobs, mqtt5_client_builder +from concurrent.futures import Future +import sys +import threading +import time +import traceback +import time +from utils.command_line_utils import CommandLineUtils + +# - Overview - +# This sample uses the AWS IoT Jobs Service to get a list of pending jobs and +# then execution operations on these pending jobs until there are no more +# remaining on the device. Imagine periodic software updates that must be sent to and +# executed on devices in the wild. +# +# - Instructions - +# This sample requires you to create jobs for your device to execute. See: +# https://docs.aws.amazon.com/iot/latest/developerguide/create-manage-jobs.html +# +# - Detail - +# On startup, the sample tries to get a list of all the in-progress and queued +# jobs and display them in a list. Then it tries to start the next pending job execution. +# If such a job exists, the sample emulates "doing work" by spawning a thread +# that sleeps for several seconds before marking the job as SUCCEEDED. When no +# pending job executions exist, the sample sits in an idle state. +# +# The sample also subscribes to receive "Next Job Execution Changed" events. +# If the sample is idle, this event wakes it to start the job. If the sample is +# already working on a job, it remembers to try for another when it's done. +# This event is sent by the service when the current job completes, so the +# sample will be continually prompted to try another job until none remain. + +# Using globals to simplify sample code +is_sample_done = threading.Event() + +# cmdData is the arguments/input from the command line placed into a single struct for +# use in this sample. This handles all of the command line parsing, validating, etc. +# See the Utils/CommandLineUtils for more information. +cmdData = CommandLineUtils.parse_sample_input_jobs() + +mqtt5_client = None +future_connection_success = Future() +jobs_client = None +jobs_thing_name = cmdData.input_thing_name + + +class LockedData: + def __init__(self): + self.lock = threading.Lock() + self.disconnect_called = False + self.is_working_on_job = False + self.is_next_job_waiting = False + self.got_job_response = False + + +locked_data = LockedData() + +# Function for gracefully quitting this sample +def exit(msg_or_exception): + if isinstance(msg_or_exception, Exception): + print("Exiting Sample due to exception.") + traceback.print_exception(msg_or_exception.__class__, msg_or_exception, sys.exc_info()[2]) + else: + print("Exiting Sample:", msg_or_exception) + + with locked_data.lock: + if not locked_data.disconnect_called: + print("Disconnecting...") + locked_data.disconnect_called = True + mqtt5_client.stop() + + +def try_start_next_job(): + print("Trying to start the next job...") + with locked_data.lock: + if locked_data.is_working_on_job: + print("Nevermind, already working on a job.") + return + + if locked_data.disconnect_called: + print("Nevermind, sample is disconnecting.") + return + + locked_data.is_working_on_job = True + locked_data.is_next_job_waiting = False + + print("Publishing request to start next job...") + request = iotjobs.StartNextPendingJobExecutionRequest(thing_name=jobs_thing_name) + publish_future = jobs_client.publish_start_next_pending_job_execution(request, mqtt5.QoS.AT_LEAST_ONCE) + publish_future.add_done_callback(on_publish_start_next_pending_job_execution) + + +def done_working_on_job(): + with locked_data.lock: + locked_data.is_working_on_job = False + try_again = locked_data.is_next_job_waiting + + if try_again: + try_start_next_job() + + +# Callback for the lifecycle event Connection Success +def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData): + print("Lifecycle Connection Success") + global future_connection_success + future_connection_success.set_result(lifecycle_connect_success_data) + +# Callback for the lifecycle event on Client Stopped +def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData): + print("Client Stopped.") + + # Signal that sample is finished + is_sample_done.set() + + + +# A list to hold all the pending jobs +available_jobs = [] + + +def on_get_pending_job_executions_accepted(response): + # type: (iotjobs.GetPendingJobExecutionsResponse) -> None + with locked_data.lock: + if (len(response.queued_jobs) > 0 or len(response.in_progress_jobs) > 0): + print("Pending Jobs:") + for job in response.in_progress_jobs: + available_jobs.append(job) + print(f" In Progress: {job.job_id} @ {job.last_updated_at}") + for job in response.queued_jobs: + available_jobs.append(job) + print(f" {job.job_id} @ {job.last_updated_at}") + else: + print("No pending or queued jobs found!") + locked_data.got_job_response = True + + +def on_get_pending_job_executions_rejected(error): + # type: (iotjobs.RejectedError) -> None + print(f"Request rejected: {error.code}: {error.message}") + exit("Get pending jobs request rejected!") + + +def on_next_job_execution_changed(event): + # type: (iotjobs.NextJobExecutionChangedEvent) -> None + try: + execution = event.execution + if execution: + print("Received Next Job Execution Changed event. job_id:{} job_document:{}".format( + execution.job_id, execution.job_document)) + + # Start job now, or remember to start it when current job is done + start_job_now = False + with locked_data.lock: + if locked_data.is_working_on_job: + locked_data.is_next_job_waiting = True + else: + start_job_now = True + + if start_job_now: + try_start_next_job() + + else: + print("Received Next Job Execution Changed event: None. Waiting for further jobs...") + + except Exception as e: + exit(e) + + +def on_publish_start_next_pending_job_execution(future): + # type: (Future) -> None + try: + future.result() # raises exception if publish failed + + print("Published request to start the next job.") + + except Exception as e: + exit(e) + + +def on_start_next_pending_job_execution_accepted(response): + # type: (iotjobs.StartNextJobExecutionResponse) -> None + try: + if response.execution: + execution = response.execution + print("Request to start next job was accepted. job_id:{} job_document:{}".format( + execution.job_id, execution.job_document)) + + # To emulate working on a job, spawn a thread that sleeps for a few seconds + job_thread = threading.Thread( + target=lambda: job_thread_fn(execution.job_id, execution.job_document), + name='job_thread') + job_thread.start() + else: + print("Request to start next job was accepted, but there are no jobs to be done. Waiting for further jobs...") + done_working_on_job() + + except Exception as e: + exit(e) + + +def on_start_next_pending_job_execution_rejected(rejected): + # type: (iotjobs.RejectedError) -> None + exit("Request to start next pending job rejected with code:'{}' message:'{}'".format( + rejected.code, rejected.message)) + + +def job_thread_fn(job_id, job_document): + try: + print("Starting local work on job...") + time.sleep(cmdData.input_job_time) + print("Done working on job.") + + print("Publishing request to update job status to SUCCEEDED...") + request = iotjobs.UpdateJobExecutionRequest( + thing_name=jobs_thing_name, + job_id=job_id, + status=iotjobs.JobStatus.SUCCEEDED) + publish_future = jobs_client.publish_update_job_execution(request, mqtt5.QoS.AT_LEAST_ONCE) + publish_future.add_done_callback(on_publish_update_job_execution) + + except Exception as e: + exit(e) + + +def on_publish_update_job_execution(future): + # type: (Future) -> None + try: + future.result() # raises exception if publish failed + print("Published request to update job.") + + except Exception as e: + exit(e) + + +def on_update_job_execution_accepted(response): + # type: (iotjobs.UpdateJobExecutionResponse) -> None + try: + print("Request to update job was accepted.") + done_working_on_job() + except Exception as e: + exit(e) + + +def on_update_job_execution_rejected(rejected): + # type: (iotjobs.RejectedError) -> None + exit("Request to update job status was rejected. code:'{}' message:'{}'.".format( + rejected.code, rejected.message)) + + +if __name__ == '__main__': + + # Create the proxy options if the data is present in cmdData + proxy_options = None + if cmdData.input_proxy_host is not None and cmdData.input_proxy_port != 0: + proxy_options = http.HttpProxyOptions( + host_name=cmdData.input_proxy_host, + port=cmdData.input_proxy_port) + + # Create a mqtt5 connection from the command line data + mqtt5_client = mqtt5_client_builder.mtls_from_path( + endpoint=cmdData.input_endpoint, + port=cmdData.input_port, + cert_filepath=cmdData.input_cert, + pri_key_filepath=cmdData.input_key, + ca_filepath=cmdData.input_ca, + client_id=cmdData.input_clientId, + clean_session=False, + keep_alive_secs=30, + http_proxy_options=proxy_options, + on_lifecycle_connection_success=on_lifecycle_connection_success, + on_lifecycle_stopped=on_lifecycle_stopped) + + if not cmdData.input_is_ci: + print(f"Connecting to {cmdData.input_endpoint} with client ID '{cmdData.input_clientId}'...") + else: + print("Connecting to endpoint with client ID") + + mqtt5_client.start() + + jobs_client = iotjobs.IotJobsClient(mqtt5_client) + + # Wait for connection to be fully established. + # Note that it's not necessary to wait, commands issued to the + # mqtt5_client before its fully connected will simply be queued. + # But this sample waits here so it's obvious when a connection + # fails or succeeds. + future_connection_success.result() + print("Connected!") + + try: + # List the jobs queued and pending + get_jobs_request = iotjobs.GetPendingJobExecutionsRequest(thing_name=jobs_thing_name) + jobs_request_future_accepted, _ = jobs_client.subscribe_to_get_pending_job_executions_accepted( + request=get_jobs_request, + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=on_get_pending_job_executions_accepted + ) + # Wait for the subscription to succeed + jobs_request_future_accepted.result() + + jobs_request_future_rejected, _ = jobs_client.subscribe_to_get_pending_job_executions_rejected( + request=get_jobs_request, + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=on_get_pending_job_executions_rejected + ) + # Wait for the subscription to succeed + jobs_request_future_rejected.result() + + # Get a list of all the jobs + get_jobs_request_future = jobs_client.publish_get_pending_job_executions( + request=get_jobs_request, + qos=mqtt5.QoS.AT_LEAST_ONCE + ) + # Wait for the publish to succeed + get_jobs_request_future.result() + except Exception as e: + exit(e) + + # If we are running in CI, then we want to check how many jobs were reported and stop + if (cmdData.input_is_ci): + # Wait until we get a response. If we do not get a response after 50 tries, then abort + got_job_response_tries = 0 + while (locked_data.got_job_response == False): + got_job_response_tries += 1 + if (got_job_response_tries > 50): + exit("Got job response timeout exceeded") + sys.exit(-1) + time.sleep(0.2) + + if (len(available_jobs) > 0): + print("At least one job queued in CI! No further work to do. Exiting sample...") + sys.exit(0) + else: + print("ERROR: No jobs queued in CI! At least one job should be queued!") + sys.exit(-1) + + try: + # Subscribe to necessary topics. + # Note that is **is** important to wait for "accepted/rejected" subscriptions + # to succeed before publishing the corresponding "request". + print("Subscribing to Next Changed events...") + changed_subscription_request = iotjobs.NextJobExecutionChangedSubscriptionRequest( + thing_name=jobs_thing_name) + + subscribed_future, _ = jobs_client.subscribe_to_next_job_execution_changed_events( + request=changed_subscription_request, + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=on_next_job_execution_changed) + + # Wait for subscription to succeed + subscribed_future.result() + + print("Subscribing to Start responses...") + start_subscription_request = iotjobs.StartNextPendingJobExecutionSubscriptionRequest( + thing_name=jobs_thing_name) + subscribed_accepted_future, _ = jobs_client.subscribe_to_start_next_pending_job_execution_accepted( + request=start_subscription_request, + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=on_start_next_pending_job_execution_accepted) + + subscribed_rejected_future, _ = jobs_client.subscribe_to_start_next_pending_job_execution_rejected( + request=start_subscription_request, + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=on_start_next_pending_job_execution_rejected) + + # Wait for subscriptions to succeed + subscribed_accepted_future.result() + subscribed_rejected_future.result() + + print("Subscribing to Update responses...") + # Note that we subscribe to "+", the MQTT wildcard, to receive + # responses about any job-ID. + update_subscription_request = iotjobs.UpdateJobExecutionSubscriptionRequest( + thing_name=jobs_thing_name, + job_id='+') + + subscribed_accepted_future, _ = jobs_client.subscribe_to_update_job_execution_accepted( + request=update_subscription_request, + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=on_update_job_execution_accepted) + + subscribed_rejected_future, _ = jobs_client.subscribe_to_update_job_execution_rejected( + request=update_subscription_request, + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=on_update_job_execution_rejected) + + # Wait for subscriptions to succeed + subscribed_accepted_future.result() + subscribed_rejected_future.result() + + # Make initial attempt to start next job. The service should reply with + # an "accepted" response, even if no jobs are pending. The response + # will contain data about the next job, if there is one. + # (Will do nothing if we are in CI) + try_start_next_job() + + except Exception as e: + exit(e) + + # Wait for the sample to finish + is_sample_done.wait() diff --git a/samples/shadow_mqtt5.md b/samples/shadow_mqtt5.md new file mode 100644 index 00000000..cc2dfb5d --- /dev/null +++ b/samples/shadow_mqtt5.md @@ -0,0 +1,146 @@ +# Shadow MQTT5 + +[**Return to main sample list**](./README.md) + +This sample uses the AWS IoT [Device Shadow](https://docs.aws.amazon.com/iot/latest/developerguide/iot-device-shadows.html) Service to keep a property in sync between device and server. Imagine a light whose color may be changed through an app, or set by a local user. + +Once connected, type a value in the terminal and press Enter to update the property's "reported" value. The sample also responds when the "desired" value changes on the server. To observe this, edit the Shadow document in the AWS Console and set a new "desired" value. + +On startup, the sample requests the shadow document to learn the property's initial state. The sample also subscribes to "delta" events from the server, which are sent when a property's "desired" value differs from its "reported" value. When the sample learns of a new desired value, that value is changed on the device and an update is sent to the server with the new "reported" value. + +Your IoT Core Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect, subscribe, publish, and receive. Below is a sample policy that can be used on your IoT Core Thing that will allow this sample to run as intended. + +
+Sample Policy +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Publish"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/get",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/update"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Receive"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/get/accepted",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/get/rejected",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/update/accepted",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/update/rejected",
+        "arn:aws:iot:region:account:topic/$aws/things/thingname/shadow/update/delta"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Subscribe"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/get/accepted",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/get/rejected",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/update/accepted",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/update/rejected",
+        "arn:aws:iot:region:account:topicfilter/$aws/things/thingname/shadow/update/delta"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": "iot:Connect",
+      "Resource": "arn:aws:iot:region:account:client/test-*"
+    }
+  ]
+}
+
+ +Replace with the following with the data from your AWS account: +* ``: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`. +* ``: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website. +* ``: The name of your AWS IoT Core thing you want the device connection to be associated with + +Note that in a real application, you may want to avoid the use of wildcards in your ClientID or use them selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id ` to send the client ID your policy supports. + +
+ +## How to run + +To run the Shadow sample from the `samples` folder, use the following command: + +``` sh +# For Windows: replace 'python3' with 'python' and '/' with '\' +python3 shadow_mqtt5.py --endpoint --cert --key --thing_name +``` + +You can also pass a Certificate Authority file (CA) if your certificate and key combination requires it: + +``` sh +# For Windows: replace 'python3' with 'python' and '/' with '\' +python3 shadow_mqtt5.py --endpoint --cert --key --thing_name --ca_file +``` + +## Service Client Notes +### Difference relative to MQTT311 IotShadowClient +The IotShadowClient with mqtt5 client is almost identical to mqtt3 one. The only difference is that you would need setup up a Mqtt5 Client and pass it to the IoTShadowClient. +For how to setup a Mqtt5 Client, please refer to [MQTT5 UserGuide](../documents/MQTT5_Userguide.md) and [MQTT5 PubSub Sample](./mqtt5_pubsub.py) + + + + + + + + + + +
Create a IotShadowClient with Mqtt5Create a IotShadowClient with Mqtt311
+ +```python + # Create a Mqtt5 Client + mqtt5_client = mqtt5_client_builder.mtls_from_path( + endpoint, + port, + cert_filepath, + pri_key_filepath, + ca_filepath, + client_id, + clean_session, + keep_alive_secs, + http_proxy_options, + on_lifecycle_connection_success, + on_lifecycle_stopped) + + # Create the shadow client from Mqtt5 Client + shadow_client = iotshadow.IotShadowClient(mqtt5_client) +``` + + + +```python + # Create a Mqtt311 Connection from the command line data + mqtt_connection = mqtt_connection_builder.mtls_from_path( + endpoint, + port, + cert_filepath, + pri_key_filepath, + ca_filepath, + client_id, + clean_session, + keep_alive_secs, + http_proxy_options) + + # Create the shadow client from Mqtt311 Connection + shadow_client = iotshadow.IotShadowClient(mqtt_connection) +``` + +
+ +### Mqtt5.QoS v.s. Mqtt3.QoS +As the service client interface is unchanged for both Mqtt3 Connection and Mqtt5 Client,the IoTShadowClient will use Mqtt3.QoS instead of Mqtt5.QoS even with a Mqtt5 Client. You could use mqtt3.QoS.to_mqtt5() and mqtt5.QoS.to_mqtt3() to convert the value. \ No newline at end of file diff --git a/samples/shadow_mqtt5.py b/samples/shadow_mqtt5.py new file mode 100644 index 00000000..a71e8fd0 --- /dev/null +++ b/samples/shadow_mqtt5.py @@ -0,0 +1,433 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +from time import sleep +from awscrt import mqtt5, http +from awsiot import iotshadow, mqtt5_client_builder +from concurrent.futures import Future +import sys +import threading +import traceback +from uuid import uuid4 +from utils.command_line_utils import CommandLineUtils + +# - Overview - +# This sample uses the AWS IoT Device Shadow Service to keep a property in +# sync between device and server. Imagine a light whose color may be changed +# through an app, or set by a local user. +# +# - Instructions - +# Once connected, type a value in the terminal and press Enter to update +# the property's "reported" value. The sample also responds when the "desired" +# value changes on the server. To observe this, edit the Shadow document in +# the AWS Console and set a new "desired" value. +# +# - Detail - +# On startup, the sample requests the shadow document to learn the property's +# initial state. The sample also subscribes to "delta" events from the server, +# which are sent when a property's "desired" value differs from its "reported" +# value. When the sample learns of a new desired value, that value is changed +# on the device and an update is sent to the server with the new "reported" +# value. + +# cmdData is the arguments/input from the command line placed into a single struct for +# use in this sample. This handles all of the command line parsing, validating, etc. +# See the Utils/CommandLineUtils for more information. +cmdData = CommandLineUtils.parse_sample_input_shadow() + +# Using globals to simplify sample code +is_sample_done = threading.Event() +mqtt5_client = None +future_connection_success = Future() +shadow_thing_name = cmdData.input_thing_name +shadow_property = cmdData.input_shadow_property + +SHADOW_VALUE_DEFAULT = "off" + + +class LockedData: + def __init__(self): + self.lock = threading.Lock() + self.shadow_value = None + self.disconnect_called = False + self.request_tokens = set() + + +locked_data = LockedData() + +# Function for gracefully quitting this sample +def exit(msg_or_exception): + if isinstance(msg_or_exception, Exception): + print("Exiting sample due to exception.") + traceback.print_exception(msg_or_exception.__class__, msg_or_exception, sys.exc_info()[2]) + else: + print("Exiting sample:", msg_or_exception) + + with locked_data.lock: + if not locked_data.disconnect_called: + print("Stop the client...") + locked_data.disconnect_called = True + mqtt5_client.stop() + +# Callback for the lifecycle event Connection Success +def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData): + print("Lifecycle Connection Success") + global future_connection_success + future_connection_success.set_result(lifecycle_connect_success_data) + +# Callback for the lifecycle event on Client Stopped +def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData): + # type: (Future) -> None + print("Client Stopped.") + + # Signal that sample is finished + is_sample_done.set() + + +def on_get_shadow_accepted(response): + # type: (iotshadow.GetShadowResponse) -> None + try: + with locked_data.lock: + # check that this is a response to a request from this session + try: + locked_data.request_tokens.remove(response.client_token) + except KeyError: + print("Ignoring get_shadow_accepted message due to unexpected token.") + return + + print("Finished getting initial shadow state.") + if locked_data.shadow_value is not None: + print(" Ignoring initial query because a delta event has already been received.") + return + + if response.state: + if response.state.delta: + value = response.state.delta.get(shadow_property) + if value: + print(" Shadow contains delta value '{}'.".format(value)) + change_shadow_value(value) + return + + if response.state.reported: + value = response.state.reported.get(shadow_property) + if value: + print(" Shadow contains reported value '{}'.".format(value)) + set_local_value_due_to_initial_query(response.state.reported[shadow_property]) + return + + print(" Shadow document lacks '{}' property. Setting defaults...".format(shadow_property)) + change_shadow_value(SHADOW_VALUE_DEFAULT) + return + + except Exception as e: + exit(e) + + +def on_get_shadow_rejected(error): + # type: (iotshadow.ErrorResponse) -> None + try: + # check that this is a response to a request from this session + with locked_data.lock: + try: + locked_data.request_tokens.remove(error.client_token) + except KeyError: + print("Ignoring get_shadow_rejected message due to unexpected token.") + return + + if error.code == 404: + print("Thing has no shadow document. Creating with defaults...") + change_shadow_value(SHADOW_VALUE_DEFAULT) + else: + exit("Get request was rejected. code:{} message:'{}'".format( + error.code, error.message)) + + except Exception as e: + exit(e) + + +def on_shadow_delta_updated(delta): + # type: (iotshadow.ShadowDeltaUpdatedEvent) -> None + try: + print("Received shadow delta event.") + if delta.state and (shadow_property in delta.state): + value = delta.state[shadow_property] + if value is None: + print(" Delta reports that '{}' was deleted. Resetting defaults...".format(shadow_property)) + change_shadow_value(SHADOW_VALUE_DEFAULT) + return + else: + print(" Delta reports that desired value is '{}'. Changing local value...".format(value)) + if (delta.client_token is not None): + print(" ClientToken is: " + delta.client_token) + change_shadow_value(value) + else: + print(" Delta did not report a change in '{}'".format(shadow_property)) + + except Exception as e: + exit(e) + + +def on_publish_update_shadow(future): + # type: (Future) -> None + try: + future.result() + print("Update request published.") + except Exception as e: + print("Failed to publish update request.") + exit(e) + + +def on_update_shadow_accepted(response): + # type: (iotshadow.UpdateShadowResponse) -> None + try: + # check that this is a response to a request from this session + with locked_data.lock: + try: + locked_data.request_tokens.remove(response.client_token) + except KeyError: + print("Ignoring update_shadow_accepted message due to unexpected token.") + return + + try: + if response.state.reported is not None: + if shadow_property in response.state.reported: + print("Finished updating reported shadow value to '{}'.".format( + response.state.reported[shadow_property])) # type: ignore + else: + print("Could not find shadow property with name: '{}'.".format(shadow_property)) # type: ignore + else: + print("Shadow states cleared.") # when the shadow states are cleared, reported and desired are set to None + print("Enter desired value: ") # remind user they can input new values + except BaseException: + exit("Updated shadow is missing the target property") + + except Exception as e: + exit(e) + + +def on_update_shadow_rejected(error): + # type: (iotshadow.ErrorResponse) -> None + try: + # check that this is a response to a request from this session + with locked_data.lock: + try: + locked_data.request_tokens.remove(error.client_token) + except KeyError: + print("Ignoring update_shadow_rejected message due to unexpected token.") + return + + exit("Update request was rejected. code:{} message:'{}'".format( + error.code, error.message)) + + except Exception as e: + exit(e) + + +def set_local_value_due_to_initial_query(reported_value): + with locked_data.lock: + locked_data.shadow_value = reported_value + print("Enter desired value: ") # remind user they can input new values + + +def change_shadow_value(value): + with locked_data.lock: + if locked_data.shadow_value == value: + print("Local value is already '{}'.".format(value)) + print("Enter desired value: ") # remind user they can input new values + return + + print("Changed local shadow value to '{}'.".format(value)) + locked_data.shadow_value = value + + print("Updating reported shadow value to '{}'...".format(value)) + + # use a unique token so we can correlate this "request" message to + # any "response" messages received on the /accepted and /rejected topics + token = str(uuid4()) + + # if the value is "clear shadow" then send a UpdateShadowRequest with None + # for both reported and desired to clear the shadow document completely. + if value == "clear_shadow": + tmp_state = iotshadow.ShadowState( + reported=None, + desired=None, + reported_is_nullable=True, + desired_is_nullable=True) + request = iotshadow.UpdateShadowRequest( + thing_name=shadow_thing_name, + state=tmp_state, + client_token=token, + ) + # Otherwise, send a normal update request + else: + # if the value is "none" then set it to a Python none object to + # clear the individual shadow property + if value == "none": + value = None + + request = iotshadow.UpdateShadowRequest( + thing_name=shadow_thing_name, + state=iotshadow.ShadowState( + reported={shadow_property: value}, + desired={shadow_property: value}, + ), + client_token=token, + ) + + future = shadow_client.publish_update_shadow(request, mqtt5.QoS.AT_LEAST_ONCE) + + locked_data.request_tokens.add(token) + + future.add_done_callback(on_publish_update_shadow) + + +def user_input_thread_fn(): + # If we are not in CI, then take terminal input + if not cmdData.input_is_ci: + while True: + try: + # Read user input + new_value = input() + + # If user wants to quit sample, then quit. + # Otherwise change the shadow value. + if new_value in ['exit', 'quit']: + exit("User has quit") + break + else: + change_shadow_value(new_value) + + except Exception as e: + print("Exception on input thread.") + exit(e) + break + # Otherwise, send shadow updates automatically + else: + try: + messages_sent = 0 + while messages_sent < 5: + cli_input = "Shadow_Value_" + str(messages_sent) + change_shadow_value(cli_input) + sleep(1) + messages_sent += 1 + exit("CI has quit") + except Exception as e: + print("Exception on input thread (CI)") + exit(e) + + +if __name__ == '__main__': + # Create the proxy options if the data is present in cmdData + proxy_options = None + if cmdData.input_proxy_host is not None and cmdData.input_proxy_port != 0: + proxy_options = http.HttpProxyOptions( + host_name=cmdData.input_proxy_host, + port=cmdData.input_proxy_port) + + # Create a mqtt5 connection from the command line data + mqtt5_client = mqtt5_client_builder.mtls_from_path( + endpoint=cmdData.input_endpoint, + port=cmdData.input_port, + cert_filepath=cmdData.input_cert, + pri_key_filepath=cmdData.input_key, + ca_filepath=cmdData.input_ca, + client_id=cmdData.input_clientId, + clean_session=False, + keep_alive_secs=30, + http_proxy_options=proxy_options, + on_lifecycle_connection_success=on_lifecycle_connection_success, + on_lifecycle_stopped=on_lifecycle_stopped) + + if not cmdData.input_is_ci: + print(f"Connecting to {cmdData.input_endpoint} with client ID '{cmdData.input_clientId}'...") + else: + print("Connecting to endpoint with client ID") + + mqtt5_client.start() + + shadow_client = iotshadow.IotShadowClient(mqtt5_client) + + # Wait for connection to be fully established. + # Note that it's not necessary to wait, commands issued to the + # mqtt5_client before its fully connected will simply be queued. + # But this sample waits here so it's obvious when a connection + # fails or succeeds. + future_connection_success.result() + print("Connected!") + + try: + # Subscribe to necessary topics. + # Note that is **is** important to wait for "accepted/rejected" subscriptions + # to succeed before publishing the corresponding "request". + print("Subscribing to Update responses...") + update_accepted_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_accepted( + request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=shadow_thing_name), + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=on_update_shadow_accepted) + + + update_rejected_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_rejected( + request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=shadow_thing_name), + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=on_update_shadow_rejected) + + # Wait for subscriptions to succeed + result = update_accepted_subscribed_future.result() + update_rejected_subscribed_future.result() + + print("Subscribing to Get responses...") + get_accepted_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_accepted( + request=iotshadow.GetShadowSubscriptionRequest(thing_name=shadow_thing_name), + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=on_get_shadow_accepted) + + get_rejected_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_rejected( + request=iotshadow.GetShadowSubscriptionRequest(thing_name=shadow_thing_name), + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=on_get_shadow_rejected) + + # Wait for subscriptions to succeed + get_accepted_subscribed_future.result() + get_rejected_subscribed_future.result() + + print("Subscribing to Delta events...") + delta_subscribed_future, _ = shadow_client.subscribe_to_shadow_delta_updated_events( + request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(thing_name=shadow_thing_name), + qos=mqtt5.QoS.AT_LEAST_ONCE, + callback=on_shadow_delta_updated) + + # Wait for subscription to succeed + delta_subscribed_future.result() + + # The rest of the sample runs asynchronously. + + # Issue request for shadow's current state. + # The response will be received by the on_get_accepted() callback + print("Requesting current shadow state...") + + with locked_data.lock: + # use a unique token so we can correlate this "request" message to + # any "response" messages received on the /accepted and /rejected topics + token = str(uuid4()) + + publish_get_future = shadow_client.publish_get_shadow( + request=iotshadow.GetShadowRequest(thing_name=shadow_thing_name, client_token=token), + qos=mqtt5.QoS.AT_LEAST_ONCE) + + locked_data.request_tokens.add(token) + + # Ensure that publish succeeds + publish_get_future.result() + + # Launch thread to handle user input. + # A "daemon" thread won't prevent the program from shutting down. + print("Launching thread to read user input...") + user_input_thread = threading.Thread(target=user_input_thread_fn, name='user_input_thread') + user_input_thread.daemon = True + user_input_thread.start() + + except Exception as e: + exit(e) + + # Wait for the sample to finish (user types 'quit', or an error occurs) + is_sample_done.wait() diff --git a/setup.py b/setup.py index 26f12d81..0a26de6d 100644 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ def _load_version(): "Operating System :: OS Independent", ], install_requires=[ - 'awscrt==0.18.0', + 'awscrt==0.19.1', ], python_requires='>=3.7', )