diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e349c504..36a252d6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,6 +32,8 @@ env: CI_DEVICE_ADVISOR: arn:aws:iam::180635532705:role/CI_DeviceAdvisor_Role CI_MQTT5_ROLE: arn:aws:iam::180635532705:role/CI_MQTT5_Role CI_BUILD_AND_TEST_ROLE: arn:aws:iam::180635532705:role/V2_SDK_Unit_Testing + CI_JOBS_SERVICE_CLIENT_ROLE: arn:aws:iam::180635532705:role/CI_JobsServiceClient_Role + CI_SERVICE_ROLE_CFG_FOLDER: "./aws-iot-device-sdk-python-v2/servicetests/test_cases" jobs: @@ -204,6 +206,23 @@ jobs: python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder')" chmod a+x builder ./builder build -p ${{ env.PACKAGE_NAME }} + + - name: configure AWS credentials (service tests Jobs) + uses: aws-actions/configure-aws-credentials@v2 + with: + role-to-assume: ${{ env.CI_JOBS_SERVICE_CLIENT_ROLE}} + aws-region: ${{ env.AWS_DEFAULT_REGION }} + - name: run MQTT3 Jobs servicetests + working-directory: ./aws-iot-device-sdk-python-v2/servicetests + run: | + export PYTHONPATH=${{ github.workspace }}/aws-iot-device-sdk-python-v2/utils:${{ github.workspace }}/aws-iot-device-sdk-python-v2/samples + python3 ./test_cases/test_jobs_execution.py --config-file test_cases/mqtt3_jobs_cfg.json + - name: run MQTT5 Jobs servicetests + working-directory: ./aws-iot-device-sdk-python-v2/servicetests + run: | + export PYTHONPATH=${{ github.workspace }}/aws-iot-device-sdk-python-v2/utils:${{ github.workspace }}/aws-iot-device-sdk-python-v2/samples + python3 ./test_cases/test_jobs_execution.py --config-file test_cases/mqtt5_jobs_cfg.json + - name: configure AWS credentials (Connect and PubSub) uses: aws-actions/configure-aws-credentials@v1 with: diff --git a/samples/utils/command_line_utils.py b/samples/utils/command_line_utils.py index 98f5cd0d..1a93ed4f 100644 --- a/samples/utils/command_line_utils.py +++ b/samples/utils/command_line_utils.py @@ -461,6 +461,7 @@ def parse_sample_input_jobs(): cmdUtils.register_command(CommandLineUtils.m_cmd_port, "", "Connection port. AWS IoT supports 443 and 8883 (optional, default=8883).", type=int) cmdUtils.register_command(CommandLineUtils.m_cmd_thing_name, "", "The name assigned to your IoT Thing", required=True) cmdUtils.register_command(CommandLineUtils.m_cmd_job_time, "", "Emulate working on a job by sleeping this many seconds (optional, default='5')", default=5, type=int) + cmdUtils.register_command(CommandLineUtils.m_cmd_mqtt_version, "", "mqtt version (optional, default='5')", default=5, type=int) cmdUtils.get_args() cmdData = CommandLineUtils.CmdData() @@ -475,6 +476,7 @@ def parse_sample_input_jobs(): cmdData.input_thing_name = cmdUtils.get_command_required(CommandLineUtils.m_cmd_thing_name) cmdData.input_job_time = int(cmdUtils.get_command(CommandLineUtils.m_cmd_job_time, 5)) cmdData.input_is_ci = cmdUtils.get_command(CommandLineUtils.m_cmd_is_ci, None) != None + cmdData.input_mqtt_version = int(cmdUtils.get_command(CommandLineUtils.m_cmd_mqtt_version, 5)) return cmdData def parse_sample_input_mqtt5_custom_authorizer_connect(): @@ -877,3 +879,4 @@ def parse_sample_input_pkcs12_connect(): m_cmd_pkcs12_file = "pkcs12_file" m_cmd_pkcs12_password = "pkcs12_password" m_cmd_region = "region" + m_cmd_mqtt_version = "mqtt_version" diff --git a/servicetests/test_cases/mqtt3_jobs_cfg.json b/servicetests/test_cases/mqtt3_jobs_cfg.json new file mode 100644 index 00000000..235aa7fd --- /dev/null +++ b/servicetests/test_cases/mqtt3_jobs_cfg.json @@ -0,0 +1,28 @@ +{ + "language": "Python", + "runnable_file": "./tests/JobsExecution/jobs.py", + "runnable_region": "us-east-1", + "runnable_main_class": "", + "arguments": [ + { + "name": "--mqtt_version", + "data": "3" + }, + { + "name": "--endpoint", + "secret": "ci/endpoint" + }, + { + "name": "--cert", + "data": "tests/JobsExecution/certificate.pem.crt" + }, + { + "name": "--key", + "data": "tests/JobsExecution/private.pem.key" + }, + { + "name": "--thing_name", + "data": "ServiceTest_Jobs_$INPUT_UUID" + } + ] +} diff --git a/servicetests/test_cases/mqtt5_jobs_cfg.json b/servicetests/test_cases/mqtt5_jobs_cfg.json new file mode 100644 index 00000000..eb6f3ff4 --- /dev/null +++ b/servicetests/test_cases/mqtt5_jobs_cfg.json @@ -0,0 +1,28 @@ +{ + "language": "Python", + "runnable_file": "./tests/JobsExecution/jobs.py", + "runnable_region": "us-east-1", + "runnable_main_class": "", + "arguments": [ + { + "name": "--mqtt_version", + "data": "5" + }, + { + "name": "--endpoint", + "secret": "ci/endpoint" + }, + { + "name": "--cert", + "data": "tests/JobsExecution/certificate.pem.crt" + }, + { + "name": "--key", + "data": "tests/JobsExecution/private.pem.key" + }, + { + "name": "--thing_name", + "data": "ServiceTest_Jobs_$INPUT_UUID" + } + ] +} diff --git a/servicetests/test_cases/test_jobs_execution.py b/servicetests/test_cases/test_jobs_execution.py new file mode 100644 index 00000000..1de7cbc2 --- /dev/null +++ b/servicetests/test_cases/test_jobs_execution.py @@ -0,0 +1,120 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +import argparse +import json +import os +import sys +import uuid +import time + +import boto3 + +import run_in_ci +import ci_iot_thing + + +def main(): + argument_parser = argparse.ArgumentParser( + description="Run Jobs test in CI") + argument_parser.add_argument( + "--config-file", required=True, help="JSON file providing command-line arguments for a test") + argument_parser.add_argument( + "--input-uuid", required=False, help="UUID for thing name. UUID will be generated if this option is omit") + argument_parser.add_argument( + "--region", required=False, default="us-east-1", help="The name of the region to use") + parsed_commands = argument_parser.parse_args() + + try: + iot_client = boto3.client('iot', region_name=parsed_commands.region) + secrets_client = boto3.client("secretsmanager", region_name=parsed_commands.region) + except Exception as e: + print(f"ERROR: Could not make Boto3 iot-data client. Credentials likely could not be sourced. Exception: {e}", + file=sys.stderr) + return -1 + + input_uuid = parsed_commands.input_uuid if parsed_commands.input_uuid else str(uuid.uuid4()) + + thing_name = "ServiceTest_Jobs_" + input_uuid + policy_name = secrets_client.get_secret_value( + SecretId="ci/JobsServiceClientTest/policy_name")["SecretString"] + + # Temporary certificate/key file path. + certificate_path = os.path.join(os.getcwd(), "tests/JobsExecution/certificate.pem.crt") + key_path = os.path.join(os.getcwd(), "tests/JobsExecution/private.pem.key") + + try: + ci_iot_thing.create_iot_thing( + thing_name=thing_name, + thing_group="CI_ServiceClient_Thing_Group", + region=parsed_commands.region, + policy_name=policy_name, + certificate_path=certificate_path, + key_path=key_path) + except Exception as e: + print(f"ERROR: Failed to create IoT thing: {e}") + sys.exit(-1) + + thing_job = 'ERROR' + i = 0; + while 'ERROR' in thing_job and i <= 4: + try: + job_id = secrets_client.get_secret_value(SecretId="ci/JobsServiceClientTest/job_id")["SecretString"] + thing_job = iot_client.describe_job_execution(jobId=job_id, thingName=thing_name) + print(f'thing job is {thing_job}'); + if 'ERROR' in thing_job: + i = i + 1; + else: + break; + except Exception as e: + print(f"Waiting for a newly created thing to be ready for the Job {e}") + i = i + 1; + time.sleep(1); + + # Perform Jobs test. If it's successful, the Job execution should be marked as SUCCEEDED for the thing. + try: + test_result = run_in_ci.setup_and_launch(parsed_commands.config_file, input_uuid) + except Exception as e: + print(f"ERROR: Failed to execute Jobs test: {e}") + test_result = -1 + + # Test reported success, verify that Job was indeed executed by the thing. + if test_result == 0: + print("Verifying that Job was executed") + try: + job_id = secrets_client.get_secret_value(SecretId="ci/JobsServiceClientTest/job_id")["SecretString"] + thing_job = iot_client.describe_job_execution(jobId=job_id, thingName=thing_name) + job_status = thing_job.get('execution', {}).get('status', {}) + if job_status != 'SUCCEEDED': + print(f"ERROR: Could not verify Job execution; Job info: {thing_job}") + test_result = -1 + except Exception as e: + print(f"ERROR: Could not verify Job execution: {e}") + test_result = -1 + + if test_result == 0: + print("Test succeeded") + + # Delete a thing created for this test run. + # NOTE We want to try to delete thing even if test was unsuccessful. + try: + ci_iot_thing.delete_iot_thing(thing_name, parsed_commands.region) + except Exception as e: + print(f"ERROR: Failed to delete thing: {e}") + # Fail the test if unable to delete thing, so this won't remain unnoticed. + test_result = -1 + + try: + if os.path.isfile(certificate_path): + os.remove(certificate_path) + if os.path.isfile(key_path): + os.remove(key_path) + except Exception as e: + print(f"WARNING: Failed to delete local files: {e}") + + if test_result != 0: + sys.exit(-1) + + +if __name__ == "__main__": + main() diff --git a/servicetests/tests/JobsExecution/jobs.py b/servicetests/tests/JobsExecution/jobs.py new file mode 100644 index 00000000..80101787 --- /dev/null +++ b/servicetests/tests/JobsExecution/jobs.py @@ -0,0 +1,433 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +from awscrt import mqtt, mqtt5, http +from awsiot import iotjobs, mqtt_connection_builder +from awsiot import iotjobs, mqtt5_client_builder +from concurrent.futures import Future +import sys +import threading +import time +import traceback +import time +from utils.command_line_utils import CommandLineUtils + +# - Overview - +# This sample uses the AWS IoT Jobs Service to get a list of pending jobs and +# then execution operations on these pending jobs until there are no more +# remaining on the device. Imagine periodic software updates that must be sent to and +# executed on devices in the wild. +# +# - Instructions - +# This sample requires you to create jobs for your device to execute. See: +# https://docs.aws.amazon.com/iot/latest/developerguide/create-manage-jobs.html +# +# - Detail - +# On startup, the sample tries to get a list of all the in-progress and queued +# jobs and display them in a list. Then it tries to start the next pending job execution. +# If such a job exists, the sample emulates "doing work" by spawning a thread +# that sleeps for several seconds before marking the job as SUCCEEDED. When no +# pending job executions exist, the sample sits in an idle state. +# +# The sample also subscribes to receive "Next Job Execution Changed" events. +# If the sample is idle, this event wakes it to start the job. If the sample is +# already working on a job, it remembers to try for another when it's done. +# This event is sent by the service when the current job completes, so the +# sample will be continually prompted to try another job until none remain. + +# Using globals to simplify sample code +is_sample_done = threading.Event() + +# cmdData is the arguments/input from the command line placed into a single struct for +# use in this sample. This handles all of the command line parsing, validating, etc. +# See the Utils/CommandLineUtils for more information. +cmdData = CommandLineUtils.parse_sample_input_jobs() + +mqtt_connection = None +jobs_client = None +jobs_thing_name = cmdData.input_thing_name +mqtt_qos = None + +# MQTT5 specific +mqtt5_client = None +future_connection_success = Future() + +class LockedData: + def __init__(self): + self.lock = threading.Lock() + self.disconnect_called = False + self.is_working_on_job = False + self.is_next_job_waiting = False + self.got_job_response = False + + +locked_data = LockedData() + +# Function for gracefully quitting this sample +def exit(msg_or_exception): + if isinstance(msg_or_exception, Exception): + print("Exiting Sample due to exception.") + traceback.print_exception(msg_or_exception.__class__, msg_or_exception, sys.exc_info()[2]) + else: + print("Exiting Sample:", msg_or_exception) + + with locked_data.lock: + if not locked_data.disconnect_called: + print("Disconnecting...") + locked_data.disconnect_called = True + if cmdData.input_mqtt_version == 5: + locked_data.disconnect_called = True + mqtt5_client.stop() + else: + future = mqtt_connection.disconnect() + future.add_done_callback(on_disconnected) + + +def try_start_next_job(): + print("Trying to start the next job...") + with locked_data.lock: + if locked_data.is_working_on_job: + print("Nevermind, already working on a job.") + return + + if locked_data.disconnect_called: + print("Nevermind, sample is disconnecting.") + return + + locked_data.is_working_on_job = True + locked_data.is_next_job_waiting = False + + print("Publishing request to start next job...") + request = iotjobs.StartNextPendingJobExecutionRequest(thing_name=jobs_thing_name) + publish_future = jobs_client.publish_start_next_pending_job_execution(request, mqtt_qos) + publish_future.add_done_callback(on_publish_start_next_pending_job_execution) + + +def done_working_on_job(): + with locked_data.lock: + locked_data.is_working_on_job = False + try_again = locked_data.is_next_job_waiting + exit(0) + + if try_again: + try_start_next_job() + + +def on_disconnected(disconnect_future): + # type: (Future) -> None + print("Disconnected.") + + # Signal that sample is finished + is_sample_done.set() + + +# A list to hold all the pending jobs +available_jobs = [] + + +def on_get_pending_job_executions_accepted(response): + # type: (iotjobs.GetPendingJobExecutionsResponse) -> None + with locked_data.lock: + if (len(response.queued_jobs) > 0 or len(response.in_progress_jobs) > 0): + print("Pending Jobs:") + for job in response.in_progress_jobs: + available_jobs.append(job) + print(f" In Progress: {job.job_id} @ {job.last_updated_at}") + for job in response.queued_jobs: + available_jobs.append(job) + print(f" {job.job_id} @ {job.last_updated_at}") + else: + print("No pending or queued jobs found!") + locked_data.got_job_response = True + + +def on_get_pending_job_executions_rejected(error): + # type: (iotjobs.RejectedError) -> None + print(f"Request rejected: {error.code}: {error.message}") + exit("Get pending jobs request rejected!") + + +def on_next_job_execution_changed(event): + # type: (iotjobs.NextJobExecutionChangedEvent) -> None + try: + execution = event.execution + if execution: + print("Received Next Job Execution Changed event. job_id:{} job_document:{}".format( + execution.job_id, execution.job_document)) + + # Start job now, or remember to start it when current job is done + start_job_now = False + with locked_data.lock: + if locked_data.is_working_on_job: + locked_data.is_next_job_waiting = True + else: + start_job_now = True + + if start_job_now: + try_start_next_job() + + else: + print("Received Next Job Execution Changed event: None. Waiting for further jobs...") + + except Exception as e: + exit(e) + + +def on_publish_start_next_pending_job_execution(future): + # type: (Future) -> None + try: + future.result() # raises exception if publish failed + + print("Published request to start the next job.") + + except Exception as e: + exit(e) + + +def on_start_next_pending_job_execution_accepted(response): + # type: (iotjobs.StartNextJobExecutionResponse) -> None + try: + if response.execution: + execution = response.execution + print("Request to start next job was accepted. job_id:{} job_document:{}".format( + execution.job_id, execution.job_document)) + + # To emulate working on a job, spawn a thread that sleeps for a few seconds + job_thread = threading.Thread( + target=lambda: job_thread_fn(execution.job_id, execution.job_document), + name='job_thread') + job_thread.start() + else: + print("Request to start next job was accepted, but there are no jobs to be done. Waiting for further jobs ...") + done_working_on_job() + + except Exception as e: + exit(e) + + +def on_start_next_pending_job_execution_rejected(rejected): + # type: (iotjobs.RejectedError) -> None + exit("Request to start next pending job rejected with code:'{}' message:'{}'".format( + rejected.code, rejected.message)) + + +def job_thread_fn(job_id, job_document): + try: + print("Starting local work on job...") + time.sleep(cmdData.input_job_time) + print("Done working on job.") + + print("Publishing request to update job status to SUCCEEDED...") + request = iotjobs.UpdateJobExecutionRequest( + thing_name=jobs_thing_name, + job_id=job_id, + status=iotjobs.JobStatus.SUCCEEDED) + publish_future = jobs_client.publish_update_job_execution(request, mqtt_qos) + publish_future.add_done_callback(on_publish_update_job_execution) + + except Exception as e: + exit(e) + + +def on_publish_update_job_execution(future): + # type: (Future) -> None + try: + future.result() # raises exception if publish failed + print("Published request to update job.") + + except Exception as e: + exit(e) + + +def on_update_job_execution_accepted(response): + # type: (iotjobs.UpdateJobExecutionResponse) -> None + try: + print("Request to update job was accepted.") + done_working_on_job() + except Exception as e: + exit(e) + + +def on_update_job_execution_rejected(rejected): + # type: (iotjobs.RejectedError) -> None + exit("Request to update job status was rejected. code:'{}' message:'{}'.".format( + rejected.code, rejected.message)) + +# MQTT5 specific functions +# Callback for the lifecycle event Connection Success +def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData): + print("Lifecycle Connection Success") + global future_connection_success + future_connection_success.set_result(lifecycle_connect_success_data) + +# Callback for the lifecycle event on Client Stopped +def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData): + print("Client Stopped.") + + # Signal that sample is finished + is_sample_done.set() + +# end MQTT5 specific functions + +if __name__ == '__main__': + # Create the proxy options if the data is present in cmdData + + proxy_options = None + if cmdData.input_proxy_host is not None and cmdData.input_proxy_port != 0: + proxy_options = http.HttpProxyOptions( + host_name=cmdData.input_proxy_host, + port=cmdData.input_proxy_port) + + if cmdData.input_mqtt_version == 5: + mqtt_qos = mqtt5.QoS.AT_LEAST_ONCE + # Create a mqtt5 connection from the command line data + mqtt5_client = mqtt5_client_builder.mtls_from_path( + endpoint=cmdData.input_endpoint, + port=cmdData.input_port, + cert_filepath=cmdData.input_cert, + pri_key_filepath=cmdData.input_key, + ca_filepath=cmdData.input_ca, + client_id=cmdData.input_clientId, + clean_session=False, + keep_alive_secs=30, + http_proxy_options=proxy_options, + on_lifecycle_connection_success=on_lifecycle_connection_success, + on_lifecycle_stopped=on_lifecycle_stopped) + print(f"Connecting to {cmdData.input_endpoint} with client ID '{cmdData.input_clientId}' with MQTT5...") + + mqtt5_client.start() + + jobs_client = iotjobs.IotJobsClient(mqtt5_client) + future_connection_success.result() + + # Wait for connection to be fully established. + # Note that it's not necessary to wait, commands issued to the + # mqtt5_client before its fully connected will simply be queued. + # But this sample waits here so it's obvious when a connection + # fails or succeeds. + elif cmdData.input_mqtt_version == 3: + mqtt_qos = mqtt.QoS.AT_LEAST_ONCE + # Create a MQTT connection from the command line data + mqtt_connection = mqtt_connection_builder.mtls_from_path( + endpoint=cmdData.input_endpoint, + port=cmdData.input_port, + cert_filepath=cmdData.input_cert, + pri_key_filepath=cmdData.input_key, + ca_filepath=cmdData.input_ca, + client_id=cmdData.input_clientId, + clean_session=False, + keep_alive_secs=30, + http_proxy_options=proxy_options) + + print(f"Connecting to {cmdData.input_endpoint} with client ID '{cmdData.input_clientId}' with MQTT3...") + + connected_future = mqtt_connection.connect() + + jobs_client = iotjobs.IotJobsClient(mqtt_connection) + connected_future.result() + + # Wait for connection to be fully established. + # Note that it's not necessary to wait, commands issued to the + # mqtt_connection before its fully connected will simply be queued. + # But this sample waits here so it's obvious when a connection + # fails or succeeds. + else: + print("Unsopported MQTT version number\n") + sys.exit(-1) + + + print("Connected!") + + try: + # List the jobs queued and pending + get_jobs_request = iotjobs.GetPendingJobExecutionsRequest(thing_name=jobs_thing_name) + jobs_request_future_accepted, _ = jobs_client.subscribe_to_get_pending_job_executions_accepted( + request=get_jobs_request, + qos=mqtt_qos, + callback=on_get_pending_job_executions_accepted + ) + # Wait for the subscription to succeed + jobs_request_future_accepted.result() + + jobs_request_future_rejected, _ = jobs_client.subscribe_to_get_pending_job_executions_rejected( + request=get_jobs_request, + qos=mqtt_qos, + callback=on_get_pending_job_executions_rejected + ) + # Wait for the subscription to succeed + jobs_request_future_rejected.result() + + # Get a list of all the jobs + get_jobs_request_future = jobs_client.publish_get_pending_job_executions( + request=get_jobs_request, + qos=mqtt_qos + ) + # Wait for the publish to succeed + get_jobs_request_future.result() + except Exception as e: + exit(e) + try: + # Subscribe to necessary topics. + # Note that is **is** important to wait for "accepted/rejected" subscriptions + # to succeed before publishing the corresponding "request". + print("Subscribing to Next Changed events...") + changed_subscription_request = iotjobs.NextJobExecutionChangedSubscriptionRequest( + thing_name=jobs_thing_name) + + subscribed_future, _ = jobs_client.subscribe_to_next_job_execution_changed_events( + request=changed_subscription_request, + qos=mqtt_qos, + callback=on_next_job_execution_changed) + + # Wait for subscription to succeed + subscribed_future.result() + + print("Subscribing to Start responses...") + start_subscription_request = iotjobs.StartNextPendingJobExecutionSubscriptionRequest( + thing_name=jobs_thing_name) + subscribed_accepted_future, _ = jobs_client.subscribe_to_start_next_pending_job_execution_accepted( + request=start_subscription_request, + qos=mqtt_qos, + callback=on_start_next_pending_job_execution_accepted) + + subscribed_rejected_future, _ = jobs_client.subscribe_to_start_next_pending_job_execution_rejected( + request=start_subscription_request, + qos=mqtt_qos, + callback=on_start_next_pending_job_execution_rejected) + + # Wait for subscriptions to succeed + subscribed_accepted_future.result() + subscribed_rejected_future.result() + + print("Subscribing to Update responses...") + # Note that we subscribe to "+", the MQTT wildcard, to receive + # responses about any job-ID. + update_subscription_request = iotjobs.UpdateJobExecutionSubscriptionRequest( + thing_name=jobs_thing_name, + job_id='+') + + subscribed_accepted_future, _ = jobs_client.subscribe_to_update_job_execution_accepted( + request=update_subscription_request, + qos=mqtt_qos, + callback=on_update_job_execution_accepted) + + subscribed_rejected_future, _ = jobs_client.subscribe_to_update_job_execution_rejected( + request=update_subscription_request, + qos=mqtt_qos, + callback=on_update_job_execution_rejected) + + # Wait for subscriptions to succeed + subscribed_accepted_future.result() + subscribed_rejected_future.result() + + # Make initial attempt to start next job. The service should reply with + # an "accepted" response, even if no jobs are pending. The response + # will contain data about the next job, if there is one. + # (Will do nothing if we are in CI) + try_start_next_job() + + except Exception as e: + exit(e) + + # Wait for the sample to finish + is_sample_done.wait() diff --git a/utils/ci_iot_thing.py b/utils/ci_iot_thing.py new file mode 100644 index 00000000..81d54739 --- /dev/null +++ b/utils/ci_iot_thing.py @@ -0,0 +1,82 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +import sys + +import boto3 + + +def create_iot_thing(thing_name, region, policy_name, certificate_path, key_path, thing_group=None): + """ Create IoT thing along with policy and credentials. """ + + iot_client = boto3.client('iot', region_name=region) + + print(f"Creating thing '{thing_name}'", file=sys.stderr) + + iot_client.create_thing(thingName=thing_name) + if thing_group: + iot_client.add_thing_to_thing_group(thingGroupName=thing_group, thingName=thing_name) + + try: + print("Creating certificate", file=sys.stderr) + create_cert_response = iot_client.create_keys_and_certificate( + setAsActive=True + ) + + f = open(certificate_path, "w") + f.write(create_cert_response['certificatePem']) + f.close() + + f = open(key_path, "w") + f.write(create_cert_response['keyPair']['PrivateKey']) + f.close() + + certificate_arn = create_cert_response['certificateArn'] + + print("Attaching policy to certificate", file=sys.stderr) + iot_client.attach_policy(policyName=policy_name, target=certificate_arn) + + print("Attaching certificate to thing", file=sys.stderr) + iot_client.attach_thing_principal(thingName=thing_name, principal=certificate_arn) + except Exception: + try: + iot_client.delete_thing(thingName=thing_name) + except Exception: + print("ERROR: Could not delete thing", file=sys.stderr) + raise + + print("IoT thing created successfully", file=sys.stderr) + + +def delete_iot_thing(thing_name, region): + """ Delete IoT thing and all its principals. """ + + try: + iot_client = boto3.client('iot', region_name=region) + except Exception as e: + print(f"ERROR: Could not make Boto3 client. Credentials likely could not be sourced", file=sys.stderr) + raise + + # Detach and delete thing's principals. + try: + thing_principals = iot_client.list_thing_principals(thingName=thing_name) + print(f"Detaching and deleting principals: {thing_principals}", file=sys.stderr) + for principal in thing_principals["principals"]: + certificate_id = principal.split("/")[1] + iot_client.detach_thing_principal(thingName=thing_name, principal=principal) + iot_client.update_certificate(certificateId=certificate_id, newStatus='INACTIVE') + iot_client.delete_certificate(certificateId=certificate_id, forceDelete=True) + except Exception: + print("ERROR: Could not delete certificate for IoT thing {thing_name}, probably thing does not exist", + file=sys.stderr) + raise + + # Delete thing. + try: + iot_client.delete_thing(thingName=thing_name) + except Exception: + raise + + print("IoT thing deleted successfully", file=sys.stderr) + + return 0 diff --git a/utils/run_in_ci.py b/utils/run_in_ci.py new file mode 100644 index 00000000..a999d2de --- /dev/null +++ b/utils/run_in_ci.py @@ -0,0 +1,358 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. + +# Built-in +import argparse +import os +import subprocess +import pathlib +import sys +import json +# Needs to be installed via pip +import boto3 + +current_folder = os.path.dirname(pathlib.Path(__file__).resolve()) +if sys.platform == "win32" or sys.platform == "cygwin": + current_folder += "\\" +else: + current_folder += "/" + +config_json = None +config_json_arguments_list = [] + +pfx_certificate_store_location = "CurrentUser\\My" +pfx_password = "" # Setting a password causes issues, but an empty string is valid so we use that + +def setup_json_arguments_list(file, input_uuid=None): + global config_json + global config_json_arguments_list + + print("Attempting to get credentials from secrets using Boto3...") + secrets_client = boto3.client("secretsmanager", region_name=config_json['runnable_region']) + print("Processing arguments...") + + for argument in config_json['arguments']: + # Add the name of the argument + config_json_arguments_list.append(argument['name']) + + # Based on the data present, we need to process and add the data differently + try: + + # Is there a secret? If so, decode it! + if 'secret' in argument: + secret_data = secrets_client.get_secret_value(SecretId=argument['secret'])["SecretString"] + + # Is this supposed to be stored in a file? + if 'filename' in argument: + with open(str(current_folder) + argument['filename'], "w") as file: + # lgtm [py/clear-text-storage-sensitive-data] + file.write(secret_data) + config_json_arguments_list.append(str(current_folder) + argument['filename']) + else: + config_json_arguments_list.append(secret_data) + + if 'pkcs11_key' in argument: + pkcs11_result = make_softhsm_key(str(current_folder) + argument['filename']) + if (pkcs11_result != 0): + print ("ERROR with PKCS11!") + return pkcs11_result + + # Windows 10 certificate store data? + elif 'windows_cert_certificate' in argument and 'windows_cert_certificate_path' in argument \ + and 'windows_cert_key' in argument and 'windows_cert_key_path' in argument != None \ + and 'windows_cert_pfx_key_path' in argument != None: + + windows_cert_data = secrets_client.get_secret_value(SecretId=argument['windows_cert_certificate'])["SecretString"] + with open(str(current_folder) + argument['windows_cert_certificate_path'], "w") as file: + # lgtm [py/clear-text-storage-sensitive-data] + file.write(windows_cert_data) + windows_key_data = secrets_client.get_secret_value(SecretId=argument['windows_cert_key'])["SecretString"] + with open(str(current_folder) + argument['windows_cert_key_path'], "w") as file: + # lgtm [py/clear-text-storage-sensitive-data] + file.write(windows_key_data) + + certificate_path = make_windows_pfx_file( + str(current_folder) + argument['windows_cert_certificate_path'], + str(current_folder) + argument['windows_cert_key_path'], + str(current_folder) + argument['windows_cert_pfx_key_path']) + config_json_arguments_list.append(certificate_path) + + # Raw data? just add it directly! + elif 'data' in argument: + tmp_value = argument['data'] + if isinstance(tmp_value, str) and input_uuid is not None: + if ("$INPUT_UUID" in tmp_value): + tmp_value = tmp_value.replace("$INPUT_UUID", input_uuid) + if (tmp_value != None and tmp_value != ""): + config_json_arguments_list.append(tmp_value) + + # None of the above? Just print an error + else: + print("ERROR - unknown or missing argument value!") + + except Exception as e: + print(f"Something went wrong processing {argument['name']}: {e}!") + return -1 + return 0 + +def make_softhsm_key(private_key_path): + print ("Setting up private key via SoftHSM") + softhsm_run = subprocess.run("softhsm2-util --init-token --free --label my-token --pin 0000 --so-pin 0000", shell=True) + if (softhsm_run.returncode != 0): + print ("ERROR: SoftHSM could not initialize a new token") + return softhsm_run.returncode + softhsm_run = subprocess.run(f"softhsm2-util --import {private_key_path} --token my-token --label my-key --id BEEFCAFE --pin 0000", shell=True) + if (softhsm_run.returncode != 0): + print ("ERROR: SoftHSM could not import token") + print ("Finished setting up private key in SoftHSM") + return 0 + + +def make_windows_pfx_file(certificate_file_path, private_key_path, pfx_file_path): + global pfx_certificate_store_location + global pfx_password + + if sys.platform == "win32" or sys.platform == "cygwin": + if os.path.isfile(certificate_file_path) != True: + print (certificate_file_path) + print("ERROR: Certificate file not found!") + return 1 + if os.path.isfile(private_key_path) != True: + print("ERROR: Private key file not found!") + return 1 + + # Delete old PFX file if it exists + if os.path.isfile(pfx_file_path): + os.remove(pfx_file_path) + + # Make a key copy + copy_path = os.path.splitext(certificate_file_path) + with open(copy_path[0] + ".key", 'w') as file: + key_file = open(private_key_path) + file.write(key_file.read()) + key_file.close() + + # Make a PFX file + arguments = ["certutil", "-mergePFX", certificate_file_path, pfx_file_path] + certutil_run = subprocess.run(args=arguments, shell=True, input=f"{pfx_password}\n{pfx_password}", encoding='ascii') + if (certutil_run.returncode != 0): + print ("ERROR: Could not make PFX file") + return 1 + else: + print ("PFX file created successfully") + + # Remove the temporary key copy + if os.path.isfile(copy_path[0] + ".key"): + os.remove(copy_path[0] + ".key") + + # Import the PFX into the Windows Certificate Store + # (Passing '$mypwd' is required even though it is empty and our certificate has no password. It fails CI otherwise) + import_pfx_arguments = ["powershell.exe", "Import-PfxCertificate", "-FilePath", pfx_file_path, "-CertStoreLocation", "Cert:\\" + pfx_certificate_store_location, "-Password", "$mypwd"] + import_pfx_run = subprocess.run(args=import_pfx_arguments, shell=True, stdout=subprocess.PIPE) + if (import_pfx_run.returncode != 0): + print ("ERROR: Could not import PFX certificate into Windows store!") + return 1 + else: + print ("Certificate imported to Windows Certificate Store successfully") + + # Get the certificate thumbprint from the output: + import_pfx_output = str(import_pfx_run.stdout) + # We know the Thumbprint will always be 40 characters long, so we can find it using that + # TODO: Extract this using a better method + thumbprint = "" + current_str = "" + # The input comes as a string with some special characters still included, so we need to remove them! + import_pfx_output = import_pfx_output.replace("\\r", " ") + import_pfx_output = import_pfx_output.replace("\\n", "\n") + for i in range(0, len(import_pfx_output)): + if (import_pfx_output[i] == " " or import_pfx_output[i] == "\n"): + if (len(current_str) == 40): + thumbprint = current_str + break + current_str = "" + else: + current_str += import_pfx_output[i] + + # Did we get a thumbprint? + if (thumbprint == ""): + print ("ERROR: Could not find certificate thumbprint") + return 1 + + # Construct the certificate path + print ("PFX certificate created and imported successfully!") + return pfx_certificate_store_location + "\\" + thumbprint + + else: + print("ERROR - Windows PFX file can only be created on a Windows platform!") + return 1 + +def setup_runnable(file, input_uuid=None): + global config_json + + file_absolute = pathlib.Path(file).resolve() + json_file_data = "" + with open(file_absolute, "r") as json_file: + json_file_data = json_file.read() + + # Load the JSON data + config_json = json.loads(json_file_data) + + # Make sure required parameters are all there + if not 'language' in config_json or not 'runnable_file' in config_json \ + or not 'runnable_region' in config_json or not 'runnable_main_class' in config_json: + return -1 + + # Preprocess runnable arguments (get secret data, etc) + setup_result = setup_json_arguments_list(file, input_uuid) + if setup_result != 0: + return setup_result + + print("JSON config file loaded!") + return 0 + + +def cleanup_runnable(): + global config_json + global config_json_arguments_list + + for argument in config_json['arguments']: + config_json_arguments_list.append(argument['name']) + + # Based on the data present, we need to process and add the data differently + try: + # Is there a file? If so, clean it! + if 'filename' in argument: + if (os.path.isfile(str(current_folder) + argument['filename'])): + os.remove(str(current_folder) + argument['filename']) + + # Windows 10 certificate store data? + if 'windows_cert_certificate' in argument and 'windows_cert_certificate_path' in argument \ + and 'windows_cert_key' in argument and 'windows_cert_key_path' in argument \ + and 'windows_cert_pfx_key_path' in argument: + + if (os.path.isfile(str(current_folder) + argument['windows_cert_certificate_path'])): + os.remove(str(current_folder) + argument['windows_cert_certificate_path']) + if (os.path.isfile(str(current_folder) + argument['windows_cert_key_path'])): + os.remove(str(current_folder) + argument['windows_cert_key_path']) + if (os.path.isfile(str(current_folder) + argument['windows_cert_pfx_key_path'])): + os.remove(str(current_folder) + argument['windows_cert_pfx_key_path']) + + except Exception as e: + print(f"Something went wrong cleaning {argument['name']}!") + return -1 + + +def launch_runnable(): + global config_json + global config_json_arguments_list + + if (config_json == None): + print("No configuration JSON file data found!") + return -1 + + exit_code = 0 + + print("Launching runnable...") + + # Java + if (config_json['language'] == "Java"): + + # Flatten arguments down into a single string + arguments_as_string = "" + for i in range(0, len(config_json_arguments_list)): + arguments_as_string += str(config_json_arguments_list[i]) + if (i+1 < len(config_json_arguments_list)): + arguments_as_string += " " + + arguments = ["mvn", "compile", "exec:java"] + arguments.append("-pl") + arguments.append(config_json['runnable_file']) + arguments.append("-Dexec.mainClass=" + config_json['runnable_main_class']) + arguments.append("-Daws.crt.ci=True") + + # We have to do this as a string, unfortunately, due to how -Dexec.args= works... + argument_string = subprocess.list2cmdline(arguments) + " -Dexec.args=\"" + arguments_as_string + "\"" + print(f"Running cmd: {argument_string}") + runnable_return = subprocess.run(argument_string, shell=True) + exit_code = runnable_return.returncode + + # C++ + elif (config_json['language'] == "CPP"): + runnable_return = subprocess.run( + args=config_json_arguments_list, executable=config_json['runnable_file']) + exit_code = runnable_return.returncode + + elif (config_json['language'] == "Python"): + config_json_arguments_list.append("--is_ci") + config_json_arguments_list.append("True") + + runnable_return = subprocess.run( + args=[sys.executable, config_json['runnable_file']] + config_json_arguments_list) + exit_code = runnable_return.returncode + + elif (config_json['language'] == "Javascript"): + os.chdir(config_json['runnable_file']) + + config_json_arguments_list.append("--is_ci") + config_json_arguments_list.append("true") + + runnable_return_one = None + if sys.platform == "win32" or sys.platform == "cygwin": + runnable_return_one = subprocess.run(args=["npm", "install"], shell=True) + else: + runnable_return_one = subprocess.run(args=["npm", "install"]) + + if (runnable_return_one == None or runnable_return_one.returncode != 0): + exit_code = runnable_return_one.returncode + else: + runnable_return_two = None + arguments = [] + if 'node_cmd' in config_json: + arguments = config_json['node_cmd'].split(" ") + else: + arguments = ["node", "dist/index.js"] + + if sys.platform == "win32" or sys.platform == "cygwin": + runnable_return_two = subprocess.run( + args=arguments + config_json_arguments_list, shell=True) + else: + runnable_return_two = subprocess.run( + args=arguments + config_json_arguments_list) + + if (runnable_return_two != None): + exit_code = runnable_return_two.returncode + else: + exit_code = 1 + + cleanup_runnable() + return exit_code + + +def setup_and_launch(file, input_uuid=None): + setup_result = setup_runnable(file, input_uuid) + if setup_result != 0: + print("Setting up runnable failed") + return setup_result + + print("About to launch runnable...") + return launch_runnable() + + +def main(): + argument_parser = argparse.ArgumentParser( + description="Run runnable in CI") + argument_parser.add_argument("--file", required=True, help="Configuration file to pull CI data from") + argument_parser.add_argument("--input_uuid", required=False, + help="UUID data to replace '$INPUT_UUID' with. Only works in Data field") + parsed_commands = argument_parser.parse_args() + + file = parsed_commands.file + input_uuid = parsed_commands.input_uuid + + print(f"Starting to launch runnable: config {file}; input UUID: {input_uuid}") + test_result = setup_and_launch(file, input_uuid) + sys.exit(test_result) + + +if __name__ == "__main__": + main()