Skip to content

feat(data-classes): support for code pipeline job event #416

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 17, 2021
Merged
2 changes: 2 additions & 0 deletions aws_lambda_powertools/utilities/data_classes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .api_gateway_proxy_event import APIGatewayProxyEvent, APIGatewayProxyEventV2
from .appsync_resolver_event import AppSyncResolverEvent
from .cloud_watch_logs_event import CloudWatchLogsEvent
from .code_pipeline_job_event import CodePipelineJobEvent
from .connect_contact_flow_event import ConnectContactFlowEvent
from .dynamo_db_stream_event import DynamoDBStreamEvent
from .event_bridge_event import EventBridgeEvent
Expand All @@ -21,6 +22,7 @@
"AppSyncResolverEvent",
"ALBEvent",
"CloudWatchLogsEvent",
"CodePipelineJobEvent",
"ConnectContactFlowEvent",
"DynamoDBStreamEvent",
"EventBridgeEvent",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
import json
import tempfile
import zipfile
from typing import Any, Dict, List, Optional
from urllib.parse import unquote_plus

import boto3

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


class CodePipelineConfiguration(DictWrapper):
@property
def function_name(self) -> str:
"""Function name"""
return self["FunctionName"]

@property
def user_parameters(self) -> str:
"""User parameters"""
return self["UserParameters"]

@property
def decoded_user_parameters(self) -> Dict[str, Any]:
"""Json Decoded user parameters"""
return json.loads(self.user_parameters)


class CodePipelineActionConfiguration(DictWrapper):
"""CodePipeline Action Configuration"""

@property
def configuration(self) -> CodePipelineConfiguration:
return CodePipelineConfiguration(self["configuration"])


class CodePipelineS3Location(DictWrapper):
@property
def bucket_name(self) -> str:
return self["bucketName"]

@property
def key(self) -> str:
"""Raw S3 object key"""
return self["objectKey"]

@property
def object_key(self) -> str:
"""Unquote plus of the S3 object key"""
return unquote_plus(self["objectKey"])


class CodePipelineLocation(DictWrapper):
@property
def get_type(self) -> str:
"""Location type eg: S3"""
return self["type"]

@property
def s3_location(self) -> CodePipelineS3Location:
"""S3 location"""
return CodePipelineS3Location(self["s3Location"])


class CodePipelineArtifact(DictWrapper):
@property
def name(self) -> str:
"""Name"""
return self["name"]

@property
def revision(self) -> Optional[str]:
return self.get("revision")

@property
def location(self) -> CodePipelineLocation:
return CodePipelineLocation(self["location"])


class CodePipelineArtifactCredentials(DictWrapper):
@property
def access_key_id(self) -> str:
return self["accessKeyId"]

@property
def secret_access_key(self) -> str:
return self["secretAccessKey"]

@property
def session_token(self) -> str:
return self["sessionToken"]

@property
def expiration_time(self) -> Optional[int]:
return self.get("expirationTime")


class CodePipelineData(DictWrapper):
"""CodePipeline Job Data"""

@property
def action_configuration(self) -> CodePipelineActionConfiguration:
"""CodePipeline action configuration"""
return CodePipelineActionConfiguration(self["actionConfiguration"])

@property
def input_artifacts(self) -> List[CodePipelineArtifact]:
"""Represents a CodePipeline input artifact"""
return [CodePipelineArtifact(item) for item in self["inputArtifacts"]]

@property
def output_artifacts(self) -> List[CodePipelineArtifact]:
"""Represents a CodePipeline output artifact"""
return [CodePipelineArtifact(item) for item in self["outputArtifacts"]]

@property
def artifact_credentials(self) -> CodePipelineArtifactCredentials:
"""Represents a CodePipeline artifact credentials"""
return CodePipelineArtifactCredentials(self["artifactCredentials"])

@property
def continuation_token(self) -> Optional[str]:
"""A continuation token if continuing job"""
return self.get("continuationToken")


class CodePipelineJobEvent(DictWrapper):
"""AWS CodePipeline Job Event

Documentation:
-------------
- https://docs.aws.amazon.com/codepipeline/latest/userguide/actions-invoke-lambda-function.html
- https://docs.aws.amazon.com/lambda/latest/dg/services-codepipeline.html
"""

def __init__(self, data: Dict[str, Any]):
super().__init__(data)
self._job = self["CodePipeline.job"]

@property
def get_id(self) -> str:
"""Job id"""
return self._job["id"]

@property
def account_id(self) -> str:
"""Account id"""
return self._job["accountId"]

@property
def data(self) -> CodePipelineData:
"""Code pipeline jab data"""
return CodePipelineData(self._job["data"])

@property
def user_parameters(self) -> str:
"""Action configuration user parameters"""
return self.data.action_configuration.configuration.user_parameters

@property
def decoded_user_parameters(self) -> Dict[str, Any]:
"""Json Decoded action configuration user parameters"""
return self.data.action_configuration.configuration.decoded_user_parameters

@property
def input_bucket_name(self) -> str:
"""Get the first input artifact bucket name"""
return self.data.input_artifacts[0].location.s3_location.bucket_name

@property
def input_object_key(self) -> str:
"""Get the first input artifact order key unquote plus"""
return self.data.input_artifacts[0].location.s3_location.object_key

def setup_s3_client(self):
"""Creates an S3 client

Uses the credentials passed in the event by CodePipeline. These
credentials can be used to access the artifact bucket.

Returns
-------
BaseClient
An S3 client with the appropriate credentials
"""
return boto3.client(
"s3",
aws_access_key_id=self.data.artifact_credentials.access_key_id,
aws_secret_access_key=self.data.artifact_credentials.secret_access_key,
aws_session_token=self.data.artifact_credentials.session_token,
)

def find_input_artifact(self, artifact_name: str) -> Optional[CodePipelineArtifact]:
"""Find an input artifact by artifact name

Parameters
----------
artifact_name : str
The name of the input artifact to look for

Returns
-------
CodePipelineArtifact, None
Matching CodePipelineArtifact if found
"""
for artifact in self.data.input_artifacts:
if artifact.name == artifact_name:
return artifact
return None

def get_artifact(self, artifact_name: str, filename: str) -> Optional[str]:
"""Get a file within an artifact zip on s3

Parameters
----------
artifact_name : str
Name of the S3 artifact to download
filename : str
The file name within the artifact zip to extract as a string

Returns
-------
str, None
Returns the contents file contents as a string
"""
artifact = self.find_input_artifact(artifact_name)
if artifact is None:
return None

with tempfile.NamedTemporaryFile() as tmp_file:
s3 = self.setup_s3_client()
bucket = artifact.location.s3_location.bucket_name
key = artifact.location.s3_location.key
s3.download_file(bucket, key, tmp_file.name)
with zipfile.ZipFile(tmp_file.name, "r") as zip_file:
return zip_file.read(filename).decode("UTF-8")
53 changes: 53 additions & 0 deletions docs/utilities/data_classes.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Event Source | Data_class
[API Gateway Proxy event v2](#api-gateway-proxy-v2) | `APIGatewayProxyEventV2`
[AppSync Resolver](#appsync-resolver) | `AppSyncResolverEvent`
[CloudWatch Logs](#cloudwatch-logs) | `CloudWatchLogsEvent`
[CodePipeline Job Event](#codepipeline-job) | `CodePipelineJobEvent`
[Cognito User Pool](#cognito-user-pool) | Multiple available under `cognito_user_pool_event`
[Connect Contact Flow](#connect-contact-flow) | `ConnectContactFlowEvent`
[DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName`
Expand Down Expand Up @@ -222,6 +223,58 @@ decompress and parse json data from the event.
do_something_with(event.timestamp, event.message)
```

### CodePipeline Job

Data classes and utility functions to help create continuous delivery pipelines tasks with AWS Lambda

=== "app.py"

```python
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import CodePipelineJobEvent

logger = Logger()


def lambda_handler(event, context):
"""The Lambda function handler

If a continuing job then checks the CloudFormation stack status
and updates the job accordingly.

If a new job then kick of an update or creation of the target
CloudFormation stack.
"""
event: CodePipelineJobEvent = CodePipelineJobEvent(event)

# Extract the Job ID
job_id = event.get_id

# Extract the params
params: dict = event.decoded_user_parameters
stack = params["stack"]
artifact_name = params["artifact"]
template_file = params["file"]

try:
if event.data.continuation_token:
# If we're continuing then the create/update has already been triggered
# we just need to check if it has finished.
check_stack_update_status(job_id, stack)
else:
template = event.get_artifact(artifact_name, template_file)
# Kick off a stack update or create
start_update_or_create(job_id, stack, template)
except Exception as e:
# If any other exceptions which we didn't expect are raised
# then fail the job and log the exception message.
logger.exception("Function failed due to exception.")
put_job_failure(job_id, "Function exception: " + str(e))

logger.debug("Function complete.")
return "Complete."
```

### Cognito User Pool

Cognito User Pools have several [different Lambda trigger sources](https://docs.aws.amazon.com/cognito/latest/developerguide/cognito-user-identity-pools-working-with-aws-lambda-triggers.html#cognito-user-identity-pools-working-with-aws-lambda-trigger-sources), all of which map to a different data class, which
Expand Down
34 changes: 34 additions & 0 deletions tests/events/codePipelineEvent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"CodePipeline.job": {
"id": "11111111-abcd-1111-abcd-111111abcdef",
"accountId": "111111111111",
"data": {
"actionConfiguration": {
"configuration": {
"FunctionName": "MyLambdaFunctionForAWSCodePipeline",
"UserParameters": "some-input-such-as-a-URL"
}
},
"inputArtifacts": [
{
"name": "ArtifactName",
"revision": null,
"location": {
"type": "S3",
"s3Location": {
"bucketName": "the name of the bucket configured as the pipeline artifact store in Amazon S3, for example codepipeline-us-east-2-1234567890",
"objectKey": "the name of the application, for example CodePipelineDemoApplication.zip"
}
}
}
],
"outputArtifacts": [],
"artifactCredentials": {
"accessKeyId": "AKIAIOSFODNN7EXAMPLE",
"secretAccessKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"sessionToken": "MIICiTCCAfICCQD6m7oRw0uXOjANBgkqhkiG9w0BAQUFADCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wHhcNMTEwNDI1MjA0NTIxWhcNMTIwNDI0MjA0NTIxWjCBiDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAldBMRAwDgYDVQQHEwdTZWF0dGxlMQ8wDQYDVQQKEwZBbWF6b24xFDASBgNVBAsTC0lBTSBDb25zb2xlMRIwEAYDVQQDEwlUZXN0Q2lsYWMxHzAdBgkqhkiG9w0BCQEWEG5vb25lQGFtYXpvbi5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAMaK0dn+a4GmWIWJ21uUSfwfEvySWtC2XADZ4nB+BLYgVIk60CpiwsZ3G93vUEIO3IyNoH/f0wYK8m9TrDHudUZg3qX4waLG5M43q7Wgc/MbQITxOUSQv7c7ugFFDzQGBzZswY6786m86gpEIbb3OhjZnzcvQAaRHhdlQWIMm2nrAgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAtCu4nUhVVxYUntneD9+h8Mg9q6q+auNKyExzyLwaxlAoo7TJHidbtS4J5iNmZgXL0FkbFFBjvSfpJIlJ00zbhNYS5f6GuoEDmFJl0ZxBHjJnyp378OD8uTs7fLvjx79LjSTbNYiytVbZPQUQ5Yaxu2jXnimvw3rrszlaEXAMPLE="
},
"continuationToken": "A continuation token if continuing job"
}
}
}
Loading