Skip to content

docs(event-sources): extract examples #1115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,10 @@ changelog:

mypy:
poetry run mypy --pretty aws_lambda_powertools

format-examples:
poetry run isort docs/examples
poetry run black docs/examples/*/*/*.py

lint-examples:
poetry run python3 -m py_compile docs/examples/*/*/*.py
15 changes: 15 additions & 0 deletions docs/examples/utilities/data_classes/app_active_mq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Dict

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.active_mq_event import ActiveMQEvent

logger = Logger()


@event_source(data_class=ActiveMQEvent)
def lambda_handler(event: ActiveMQEvent, context):
for message in event.messages:
logger.debug(f"MessageID: {message.message_id}")
data: Dict = message.json_data
logger.debug("Process json in base64 encoded data str", data)
7 changes: 7 additions & 0 deletions docs/examples/utilities/data_classes/app_alb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from aws_lambda_powertools.utilities.data_classes import ALBEvent, event_source


@event_source(data_class=ALBEvent)
def lambda_handler(event: ALBEvent, context):
if "helloworld" in event.path and event.http_method == "POST":
do_something_with(event.json_body, event.query_string_parameters)
33 changes: 33 additions & 0 deletions docs/examples/utilities/data_classes/app_appsync_authorizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from typing import Dict

from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.logging.logger import Logger
from aws_lambda_powertools.utilities.data_classes.appsync_authorizer_event import (
AppSyncAuthorizerEvent,
AppSyncAuthorizerResponse,
)
from aws_lambda_powertools.utilities.data_classes.event_source import event_source

logger = Logger()


def get_user_by_token(token: str):
"""Look a user by token"""
...


@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_AUTHORIZER)
@event_source(data_class=AppSyncAuthorizerEvent)
def lambda_handler(event: AppSyncAuthorizerEvent, context) -> Dict:
user = get_user_by_token(event.authorization_token)

if not user:
# No user found, return not authorized
return AppSyncAuthorizerResponse().asdict()

return AppSyncAuthorizerResponse(
authorize=True,
resolver_context={"id": user.id},
# Only allow admins to delete events
deny_fields=None if user.is_admin else ["Mutation.deleteEvent"],
).asdict()
36 changes: 36 additions & 0 deletions docs/examples/utilities/data_classes/app_appsync_resolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from aws_lambda_powertools.logging import Logger, correlation_paths
from aws_lambda_powertools.utilities.data_classes.appsync_resolver_event import (
AppSyncIdentityCognito,
AppSyncResolverEvent,
)

logger = Logger()


def get_locations(name: str = None, size: int = 0, page: int = 0):
"""Your resolver logic here"""


@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
def lambda_handler(event, context):
event: AppSyncResolverEvent = AppSyncResolverEvent(event)

# Case insensitive look up of request headers
x_forwarded_for = event.get_header_value("x-forwarded-for")

# Support for AppSyncIdentityCognito or AppSyncIdentityIAM identity types
assert isinstance(event.identity, AppSyncIdentityCognito)
identity: AppSyncIdentityCognito = event.identity

# Logging with correlation_id
logger.debug(
{
"x-forwarded-for": x_forwarded_for,
"username": identity.username,
}
)

if event.type_name == "Merchant" and event.field_name == "locations":
return get_locations(**event.arguments)

raise ValueError(f"Unsupported field resolver: {event.field_name}")
10 changes: 10 additions & 0 deletions docs/examples/utilities/data_classes/app_cloudwatch_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from aws_lambda_powertools.utilities.data_classes import CloudWatchLogsEvent, event_source
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData


@event_source(data_class=CloudWatchLogsEvent)
def lambda_handler(event: CloudWatchLogsEvent, context):
decompressed_log: CloudWatchLogsDecodedData = event.parse_logs_data
log_events = decompressed_log.log_events
for event in log_events:
do_something_with(event.timestamp, event.message)
43 changes: 43 additions & 0 deletions docs/examples/utilities/data_classes/app_codepipeline_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import CodePipelineJobEvent, event_source

logger = Logger()


@event_source(data_class=CodePipelineJobEvent)
def lambda_handler(event, context):
"""The Lambda function handler

If a continuing job then checks the CloudFormation stack status
and updates the job accordingly.

If a new job then kick of an update or creation of the target
CloudFormation stack.
"""

# Extract the Job ID
job_id = event.get_id

# Extract the params
params: dict = event.decoded_user_parameters
stack = params["stack"]
artifact_name = params["artifact"]
template_file = params["file"]

try:
if event.data.continuation_token:
# If we're continuing then the create/update has already been triggered
# we just need to check if it has finished.
check_stack_update_status(job_id, stack)
else:
template = event.get_artifact(artifact_name, template_file)
# Kick off a stack update or create
start_update_or_create(job_id, stack, template)
except Exception as e:
# If any other exceptions which we didn't expect are raised
# then fail the job and log the exception message.
logger.exception("Function failed due to exception.")
put_job_failure(job_id, "Function exception: " + str(e))

logger.debug("Function complete.")
return "Complete."
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import CreateAuthChallengeTriggerEvent


@event_source(data_class=CreateAuthChallengeTriggerEvent)
def handler(event: CreateAuthChallengeTriggerEvent, context) -> dict:
if event.request.challenge_name == "CUSTOM_CHALLENGE":
event.response.public_challenge_parameters = {"captchaUrl": "url/123.jpg"}
event.response.private_challenge_parameters = {"answer": "5"}
event.response.challenge_metadata = "CAPTCHA_CHALLENGE"
return event.raw_event
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import DefineAuthChallengeTriggerEvent


def handler(event: dict, context) -> dict:
event: DefineAuthChallengeTriggerEvent = DefineAuthChallengeTriggerEvent(event)
if len(event.request.session) == 1 and event.request.session[0].challenge_name == "SRP_A":
event.response.issue_tokens = False
event.response.fail_authentication = False
event.response.challenge_name = "PASSWORD_VERIFIER"
elif (
len(event.request.session) == 2
and event.request.session[1].challenge_name == "PASSWORD_VERIFIER"
and event.request.session[1].challenge_result
):
event.response.issue_tokens = False
event.response.fail_authentication = False
event.response.challenge_name = "CUSTOM_CHALLENGE"
elif (
len(event.request.session) == 3
and event.request.session[2].challenge_name == "CUSTOM_CHALLENGE"
and event.request.session[2].challenge_result
):
event.response.issue_tokens = True
event.response.fail_authentication = False
else:
event.response.issue_tokens = False
event.response.fail_authentication = True

return event.raw_event
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import PostConfirmationTriggerEvent


def lambda_handler(event, context):
event: PostConfirmationTriggerEvent = PostConfirmationTriggerEvent(event)

user_attributes = event.request.user_attributes
do_something_with(user_attributes)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import VerifyAuthChallengeResponseTriggerEvent


@event_source(data_class=VerifyAuthChallengeResponseTriggerEvent)
def handler(event: VerifyAuthChallengeResponseTriggerEvent, context) -> dict:
event.response.answer_correct = (
event.request.private_challenge_parameters.get("answer") == event.request.challenge_answer
)
return event.raw_event
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from aws_lambda_powertools.utilities.data_classes.connect_contact_flow_event import (
ConnectContactFlowChannel,
ConnectContactFlowEndpointType,
ConnectContactFlowEvent,
ConnectContactFlowInitiationMethod,
)


def lambda_handler(event, context):
event: ConnectContactFlowEvent = ConnectContactFlowEvent(event)
assert event.contact_data.attributes == {"Language": "en-US"}
assert event.contact_data.channel == ConnectContactFlowChannel.VOICE
assert event.contact_data.customer_endpoint.endpoint_type == ConnectContactFlowEndpointType.TELEPHONE_NUMBER
assert event.contact_data.initiation_method == ConnectContactFlowInitiationMethod.API
14 changes: 14 additions & 0 deletions docs/examples/utilities/data_classes/app_dynamodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
DynamoDBRecordEventName,
DynamoDBStreamEvent,
)


def lambda_handler(event, context):
event: DynamoDBStreamEvent = DynamoDBStreamEvent(event)

# Multiple records can be delivered in a single event
for record in event.records:
if record.event_name == DynamoDBRecordEventName.MODIFY:
do_something_with(record.dynamodb.new_image)
do_something_with(record.dynamodb.old_image)
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent, event_source
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import AttributeValue, AttributeValueType
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
for record in event.records:
key: AttributeValue = record.dynamodb.keys["id"]
if key == AttributeValueType.Number:
# {"N": "123.45"} => "123.45"
assert key.get_value == key.n_value
print(key.get_value)
elif key == AttributeValueType.Map:
assert key.get_value == key.map_value
print(key.get_value)
6 changes: 6 additions & 0 deletions docs/examples/utilities/data_classes/app_event_bridge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from aws_lambda_powertools.utilities.data_classes import EventBridgeEvent, event_source


@event_source(data_class=EventBridgeEvent)
def lambda_handler(event: EventBridgeEvent, context):
do_something_with(event.detail)
7 changes: 7 additions & 0 deletions docs/examples/utilities/data_classes/app_http_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEventV2, event_source


@event_source(data_class=APIGatewayProxyEventV2)
def lambda_handler(event: APIGatewayProxyEventV2, context):
if "helloworld" in event.path and event.http_method == "POST":
do_something_with(event.json_body, event.query_string_parameters)
25 changes: 25 additions & 0 deletions docs/examples/utilities/data_classes/app_http_api_authorizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from secrets import compare_digest

from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
APIGatewayAuthorizerEventV2,
APIGatewayAuthorizerResponseV2,
)


def get_user_by_token(token):
if compare_digest(token, "Foo"):
return {"name": "Foo"}
return None


@event_source(data_class=APIGatewayAuthorizerEventV2)
def handler(event: APIGatewayAuthorizerEventV2, context):
user = get_user_by_token(event.get_header_value("x-token"))

if user is None:
# No user was found, so we return not authorized
return APIGatewayAuthorizerResponseV2().asdict()

# Found the user and setting the details in the context
return APIGatewayAuthorizerResponseV2(authorize=True, context=user).asdict()
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from aws_lambda_powertools.utilities.data_classes import KinesisStreamEvent, event_source


@event_source(data_class=KinesisStreamEvent)
def lambda_handler(event: KinesisStreamEvent, context):
kinesis_record = next(event.records).kinesis

# if data was delivered as text
data = kinesis_record.data_as_text()

# if data was delivered as json
data = kinesis_record.data_as_json()

do_something_with(data)
17 changes: 17 additions & 0 deletions docs/examples/utilities/data_classes/app_rabbit_mq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from typing import Dict

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.rabbit_mq_event import RabbitMQEvent

logger = Logger()


@event_source(data_class=RabbitMQEvent)
def lambda_handler(event: RabbitMQEvent, context):
for queue_name, messages in event.rmq_messages_by_queue.items():
logger.debug(f"Messages for queue: {queue_name}")
for message in messages:
logger.debug(f"MessageID: {message.basic_properties.message_id}")
data: Dict = message.json_data
logger.debug("Process json in base64 encoded data str", data)
10 changes: 10 additions & 0 deletions docs/examples/utilities/data_classes/app_rest_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent, event_source


@event_source(data_class=APIGatewayProxyEvent)
def lambda_handler(event: APIGatewayProxyEvent, context):
if "helloworld" in event.path and event.http_method == "GET":
request_context = event.request_context
identity = request_context.identity
user = identity.user
do_something_with(event.json_body, user)
Loading