diff --git a/docs/core/event_handler/appsync_events.md b/docs/core/event_handler/appsync_events.md new file mode 100644 index 00000000000..3eeee7459fd --- /dev/null +++ b/docs/core/event_handler/appsync_events.md @@ -0,0 +1,388 @@ +--- +title: AppSync Events +description: Core utility +status: new +--- + +Event Handler for AWS AppSync real-time events. + +```mermaid +stateDiagram-v2 + direction LR + EventSource: AppSync Events + EventHandlerResolvers: Publish & Subscribe events + LambdaInit: Lambda invocation + EventHandler: Event Handler + EventHandlerResolver: Route event based on namespace/channel + YourLogic: Run your registered handler function + EventHandlerResolverBuilder: Adapts response to AppSync contract + LambdaResponse: Lambda response + + state EventSource { + EventHandlerResolvers + } + + EventHandlerResolvers --> LambdaInit + + LambdaInit --> EventHandler + EventHandler --> EventHandlerResolver + + state EventHandler { + [*] --> EventHandlerResolver: app.resolve(event, context) + EventHandlerResolver --> YourLogic + YourLogic --> EventHandlerResolverBuilder + } + + EventHandler --> LambdaResponse +``` + +## Key Features + +* Easily handle publish and subscribe events with dedicated handler methods +* Automatic routing based on namespace and channel patterns +* Support for wildcard patterns to create catch-all handlers +* Process events in parallel corontrol aggregation for batch processing +* Graceful error handling for individual events + +## Terminology + +**[AWS AppSync Events](https://docs.aws.amazon.com/appsync/latest/eventapi/event-api-welcome.html){target="_blank"}**. A service that enables you to quickly build secure, scalable real-time WebSocket APIs without managing infrastructure or writing API code. + +It handles connection management, message broadcasting, authentication, and monitoring, reducing time to market and operational costs. + +## Getting started + +???+ tip "Tip: New to AppSync Real-time API?" + Visit [AWS AppSync Real-time documentation](https://docs.aws.amazon.com/appsync/latest/eventapi/event-api-getting-started.html){target="_blank"} to understand how to set up subscriptions and pub/sub messaging. + +### Required resources + +You must have an existing AppSync Events API with real-time capabilities enabled and IAM permissions to invoke your Lambda function. That said, there are no additional permissions required to use Event Handler as routing requires no dependency (_standard library_). + +### AppSync request and response format + +AppSync Events uses a specific event format for Lambda requests and responses. In most scenarios, Powertools for AWS simplifies this interaction by automatically formatting resolver returns to match the expected AppSync response structure. + +=== "payload_request.json" + + ```python hl_lines="5 10 12" + --8<-- "examples/event_handler_appsync_events/src/payload_request.json" + ``` + +=== "payload_response.json" + + ```python hl_lines="5 10 12" + --8<-- "examples/event_handler_appsync_events/src/payload_response.json" + ``` + +=== "payload_response_with_error.json" + + ```python hl_lines="5 10 12" + --8<-- "examples/event_handler_appsync_events/src/payload_response_with_error.json" + ``` + +=== "payload_response_fail_request.json" + + ```python hl_lines="5 10 12" + --8<-- "examples/event_handler_appsync_events/src/payload_response_fail_request.json" + ``` + +#### Events response with error + +When processing events with Lambda, you can return errors to AppSync in three ways: + +* **Item specific error:** Return an `error` key within each individual item's response. AppSync Events expects this format for item-specific errors. +* **Fail entire request:** Return a JSON object with a top-level `error` key. This signals a general failure, and AppSync treats the entire request as unsuccessful. +* **Unauthorized exception**: Raise the **UnauthorizedException** exception to reject a subscribe or publish request with HTTP 403. + +### Resolver decorator + +???+ important + The event handler automatically parses the incoming event data and invokes the appropriate handler based on the namespace/channel pattern you register. + +You can define your handlers for different event types using the `app.on_publish()`, `app.async_on_publish()`, and `app.on_subscribe()` methods. + +=== "getting_started_with_publish_events.py" + + ```python hl_lines="5 10 12" + --8<-- "examples/event_handler_appsync_events/src/getting_started_with_publish_events.py" + ``` + +=== "getting_started_with_subscribe_events.py" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/getting_started_with_subscribe_events.py" + ``` + +## Advanced + +### Wildcard patterns and handler precedence + +You can use wildcard patterns to create catch-all handlers for multiple channels or namespaces. This is particularly useful for centralizing logic that applies to multiple channels. + +When an event matches with multiple handlers, the most specific pattern takes precedence. + +???+ note "Supported wildcard patterns" + Only the following patterns are supported: + + * `/namespace/*` - Matches all channels in the specified namespace + * `/*` - Matches all channels in all namespaces + + Patterns like `/namespace/channel*` or `/namespace/*/subpath` are not supported. + + More specific routes will always take precedence over less specific ones. For example, `/default/channel1` will take precedence over `/default/*`, which will take precedence over `/*`. + +=== "working_with_wildcard_resolvers.py" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/working_with_wildcard_resolvers.py" + ``` + +If the event doesn't match any registered handler, the Event Handler will log a warning and skip processing the event. + +### Aggregated processing + +???+ note "Aggregate Processing" + When `aggregate=True`, your handler receives a list of all events, requiring you to manage the response format. Ensure your response includes results for each event in the expected [AppSync Request and Response Format](#appsync-request-and-response-format). + +In some scenarios, you might want to process all events for a channel as a batch rather than individually. This is useful when you need to: + +* Optimize database operations by making a single batch query +* Ensure all events are processed together or not at all +* Apply custom error handling logic for the entire batch + +You can enable this with the `aggregate` parameter: + +=== "working_with_aggregated_events.py" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/working_with_aggregated_events.py" + ``` + +### Handling errors + +You can filter or reject events by raising exceptions in your resolvers or by formatting the payload according to the expected response structure. This instructs AppSync not to propagate that specific message, so subscribers will not receive it. + +#### Handling errors with individual items + +When processing items individually with `aggregate=False`, you can raise an exception to fail a specific message. When this happens, the Event Handler will catch it and include the exception name and message in the response. + +=== "working_with_error_handling.py" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/working_with_error_handling.py" + ``` + +=== "working_with_error_handling_response.json" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/working_with_error_handling_response.json" + ``` + +#### Handling errors with batch of items + +When processing batch of items with `aggregate=True`, you must format the payload according the expected response. + +=== "working_with_error_handling_multiple.py" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/working_with_error_handling_multiple.py" + ``` + +=== "working_with_error_handling_response.json" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/working_with_error_handling_response.json" + ``` + +If instead you want to fail the entire batch, you can throw an exception. This will cause the Event Handler to return an error response to AppSync and fail the entire batch. + +=== "working_with_error_handling_multiple.py" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/working_with_error_handling_multiple.py" + ``` + +=== "working_with_error_handling_response.json" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/working_with_error_handling_response.json" + ``` + +#### Authorization control + +!!! warning "Raising `UnauthorizedException` will cause the Lambda invocation to fail." + +You can also do content based authorization for channel by raising the `UnauthorizedException` exception. This can cause two situations: + +* **When working with publish events** Powertools for AWS stop processing messages and subscribers will not receive any message. +* **When working with subscribe events** the subscription won't be established. + +=== "working_with_error_handling.py" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/working_with_error_handling.py" + ``` + +=== "working_with_error_handling_response.json" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/working_with_error_handling_response.json" + ``` + +### Processing events with async resolvers + +Use the `@app.async_on_publish()` decorator to process events asynchronously. + +We use `asyncio` module to support async functions, and we ensure reliable execution by managing the event loop. + +???+ note "Events order and AppSync Events" + AppSync does not rely on event order. As long as each event includes the original `id`, AppSync processes them correctly regardless of the order in which they are received. + +=== "working_with_async_resolvers.py" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/working_with_async_resolvers.py" + ``` + +### Accessing Lambda context and event + +You can access to the original Lambda event or context for additional information. These are accessible via the app instance: + +=== "accessing_event_and_context.py" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/accessing_event_and_context.py" + ``` + +## Event Handler workflow + +### Working with single items + +
+```mermaid +sequenceDiagram + participant Client + participant AppSync + participant Lambda + participant EventHandler + note over Client,EventHandler: Individual Event Processing (aggregate=False) + Client->>+AppSync: Send multiple events to channel + AppSync->>+Lambda: Invoke Lambda with batch of events + Lambda->>+EventHandler: Process events with aggregate=False + loop For each event in batch + EventHandler->>EventHandler: Process individual event + end + EventHandler-->>-Lambda: Return array of processed events + Lambda-->>-AppSync: Return event-by-event responses + AppSync-->>-Client: Report individual event statuses +``` +
+ +### Working with aggregated items + +
+```mermaid +sequenceDiagram + participant Client + participant AppSync + participant Lambda + participant EventHandler + note over Client,EventHandler: Aggregate Processing Workflow + Client->>+AppSync: Send multiple events to channel + AppSync->>+Lambda: Invoke Lambda with batch of events + Lambda->>+EventHandler: Process events with aggregate=True + EventHandler->>EventHandler: Batch of events + EventHandler->>EventHandler: Process entire batch at once + EventHandler->>EventHandler: Format response for each event + EventHandler-->>-Lambda: Return aggregated results + Lambda-->>-AppSync: Return success responses + AppSync-->>-Client: Confirm all events processed +``` +
+ +### Authorization fails for publish + +
+```mermaid +sequenceDiagram + participant Client + participant AppSync + participant Lambda + participant EventHandler + note over Client,EventHandler: Publish Event Authorization Flow + Client->>AppSync: Publish message to channel + AppSync->>Lambda: Invoke Lambda with publish event + Lambda->>EventHandler: Process publish event + alt Authorization Failed + EventHandler->>EventHandler: Authorization check fails + EventHandler->>Lambda: Raise UnauthorizedException + Lambda->>AppSync: Return error response + AppSync--xClient: Message not delivered + AppSync--xAppSync: No distribution to subscribers + else Authorization Passed + EventHandler->>Lambda: Return successful response + Lambda->>AppSync: Return processed event + AppSync->>Client: Acknowledge message + AppSync->>AppSync: Distribute to subscribers + end +``` +
+ +### Authorization fails for subscribe + +
+```mermaid +sequenceDiagram + participant Client + participant AppSync + participant Lambda + participant EventHandler + note over Client,EventHandler: Subscribe Event Authorization Flow + Client->>AppSync: Request subscription to channel + AppSync->>Lambda: Invoke Lambda with subscribe event + Lambda->>EventHandler: Process subscribe event + alt Authorization Failed + EventHandler->>EventHandler: Authorization check fails + EventHandler->>Lambda: Raise UnauthorizedException + Lambda->>AppSync: Return error response + AppSync--xClient: Subscription denied (HTTP 403) + else Authorization Passed + EventHandler->>Lambda: Return successful response + Lambda->>AppSync: Return authorization success + AppSync->>Client: Subscription established + end +``` +
+ +## Testing your code + +You can test your event handlers by passing a mocked or actual AppSync Events Lambda event. + +### Testing publish events + +=== "getting_started_with_testing_publish.py" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/getting_started_with_testing_publish.py" + ``` + +=== "getting_started_with_testing_publish_event.json" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/getting_started_with_testing_publish_event.json" + ``` + +### Testing subscribe events + +=== "getting_started_with_testing_subscribe.py" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/getting_started_with_testing_subscribe.py" + ``` + +=== "getting_started_with_testing_subscribe_event.json" + + ```python hl_lines="5 6 13" + --8<-- "examples/event_handler_appsync_events/src/getting_started_with_testing_subscribe_event.json" + ``` diff --git a/examples/event_handler_appsync_events/src/accessing_event_and_context.py b/examples/event_handler_appsync_events/src/accessing_event_and_context.py new file mode 100644 index 00000000000..db6f456e704 --- /dev/null +++ b/examples/event_handler_appsync_events/src/accessing_event_and_context.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from aws_lambda_powertools.event_handler import AppSyncEventsResolver # type: ignore[attr-defined] +from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEventsEvent # type: ignore[attr-defined] + +if TYPE_CHECKING: + from aws_lambda_powertools.utilities.typing import LambdaContext + +app = AppSyncEventsResolver() + + +class ValidationError(Exception): + pass + + +@app.on_publish("/default/channel1") +def handle_channel1_publish(payload: dict[str, Any]): + # Access the full event and context + lambda_event: AppSyncResolverEventsEvent = app.current_event + lambda_context: LambdaContext = app.context + + # Access request headers + headers = lambda_event.get("request", {}).get("headers", {}) + + # Check remaining time + remaining_time = lambda_context.get_remaining_time_in_millis() + + return { + "originalMessage": payload, + "userAgent": headers.get("User-Agent"), + "timeRemaining": remaining_time, + } + + +def lambda_handler(event: dict, context: LambdaContext): + return app.resolve(event, context) diff --git a/examples/event_handler_appsync_events/src/getting_started_with_publish_events.py b/examples/event_handler_appsync_events/src/getting_started_with_publish_events.py new file mode 100644 index 00000000000..10b0e73160e --- /dev/null +++ b/examples/event_handler_appsync_events/src/getting_started_with_publish_events.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from aws_lambda_powertools.event_handler import AppSyncEventsResolver # type: ignore[attr-defined] + +if TYPE_CHECKING: + from aws_lambda_powertools.utilities.typing import LambdaContext + +app = AppSyncEventsResolver() + + +@app.on_publish("/default/channel") +def handle_channel1_publish(payload: dict[str, Any]): + # Process the payload for this specific channel + return { + "processed": True, + "original_payload": payload, + } + + +def lambda_handler(event: dict, context: LambdaContext): + return app.resolve(event, context) diff --git a/examples/event_handler_appsync_events/src/getting_started_with_subscribe_events.py b/examples/event_handler_appsync_events/src/getting_started_with_subscribe_events.py new file mode 100644 index 00000000000..6626c36ab4a --- /dev/null +++ b/examples/event_handler_appsync_events/src/getting_started_with_subscribe_events.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from aws_lambda_powertools.event_handler import AppSyncEventsResolver # type: ignore[attr-defined] + +if TYPE_CHECKING: + from aws_lambda_powertools.utilities.typing import LambdaContext + +app = AppSyncEventsResolver() + + +@app.on_subscribe("/*") +def handle_all_subscriptions(): + path = app.current_event.info.channel_path + + # Perform access control checks + if not is_authorized(path): + raise Exception("You are not authorized to subscribe to this channel") + + return True + + +def is_authorized(path: str): + # Your authorization logic here + return path != "not_allowed_path_here" + + +def lambda_handler(event: dict, context: LambdaContext): + return app.resolve(event, context) diff --git a/examples/event_handler_appsync_events/src/getting_started_with_testing_publish.py b/examples/event_handler_appsync_events/src/getting_started_with_testing_publish.py new file mode 100644 index 00000000000..248447f5ff1 --- /dev/null +++ b/examples/event_handler_appsync_events/src/getting_started_with_testing_publish.py @@ -0,0 +1,42 @@ +import json +from pathlib import Path + +from aws_lambda_powertools.event_handler import AppSyncEventsResolver # type: ignore[attr-defined] + + +class LambdaContext: + def __init__(self): + self.function_name = "test-func" + self.memory_limit_in_mb = 128 + self.invoked_function_arn = "arn:aws:lambda:eu-west-1:809313241234:function:test-func" + self.aws_request_id = "52fdfc07-2182-154f-163f-5f0f9a621d72" + + def get_remaining_time_in_millis(self) -> int: + return 1000 + + +def test_publish_event_with_synchronous_resolver(): + """Test handling a publish event with a synchronous resolver.""" + # GIVEN a sample publish event + with Path.open("getting_started_with_testing_publish_event.json", "r") as f: + event = json.load(f) + + lambda_context = LambdaContext() + + # GIVEN an AppSyncEventsResolver with a synchronous resolver + app = AppSyncEventsResolver() + + @app.on_publish(path="/default/*") + def test_handler(payload): + return {"processed": True, "data": payload["data"]} + + # WHEN we resolve the event + result = app.resolve(event, lambda_context) + + # THEN we should get the correct response + expected_result = { + "events": [ + {"id": "123", "payload": {"processed": True, "data": "test data"}}, + ], + } + assert result == expected_result diff --git a/examples/event_handler_appsync_events/src/getting_started_with_testing_publish_event.json b/examples/event_handler_appsync_events/src/getting_started_with_testing_publish_event.json new file mode 100644 index 00000000000..d3b69ce3ac3 --- /dev/null +++ b/examples/event_handler_appsync_events/src/getting_started_with_testing_publish_event.json @@ -0,0 +1,64 @@ +{ + "identity":"None", + "result":"None", + "request":{ + "headers": { + "x-forwarded-for": "1.1.1.1, 2.2.2.2", + "cloudfront-viewer-country": "US", + "cloudfront-is-tablet-viewer": "false", + "via": "2.0 xxxxxxxxxxxxxxxx.cloudfront.net (CloudFront)", + "cloudfront-forwarded-proto": "https", + "origin": "https://us-west-1.console.aws.amazon.com", + "content-length": "217", + "accept-language": "en-US,en;q=0.9", + "host": "xxxxxxxxxxxxxxxx.appsync-api.us-west-1.amazonaws.com", + "x-forwarded-proto": "https", + "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36", + "accept": "*/*", + "cloudfront-is-mobile-viewer": "false", + "cloudfront-is-smarttv-viewer": "false", + "accept-encoding": "gzip, deflate, br", + "referer": "https://us-west-1.console.aws.amazon.com/appsync/home?region=us-west-1", + "content-type": "application/json", + "sec-fetch-mode": "cors", + "x-amz-cf-id": "3aykhqlUwQeANU-HGY7E_guV5EkNeMMtwyOgiA==", + "x-amzn-trace-id": "Root=1-5f512f51-fac632066c5e848ae714", + "authorization": "eyJraWQiOiJScWFCSlJqYVJlM0hrSnBTUFpIcVRXazNOW...", + "sec-fetch-dest": "empty", + "x-amz-user-agent": "AWS-Console-AppSync/", + "cloudfront-is-desktop-viewer": "true", + "sec-fetch-site": "cross-site", + "x-forwarded-port": "443" + }, + "domainName":"None" + }, + "info":{ + "channel":{ + "path":"/default/channel", + "segments":[ + "default", + "channel" + ] + }, + "channelNamespace":{ + "name":"default" + }, + "operation":"PUBLISH" + }, + "error":"None", + "prev":"None", + "stash":{ + + }, + "outErrors":[ + + ], + "events":[ + { + "payload":{ + "data": "test data" + }, + "id":"123" + } + ] + } diff --git a/examples/event_handler_appsync_events/src/getting_started_with_testing_subscribe.py b/examples/event_handler_appsync_events/src/getting_started_with_testing_subscribe.py new file mode 100644 index 00000000000..d91ff76b38b --- /dev/null +++ b/examples/event_handler_appsync_events/src/getting_started_with_testing_subscribe.py @@ -0,0 +1,37 @@ +import json +from pathlib import Path + +from aws_lambda_powertools.event_handler import AppSyncEventsResolver # type: ignore[attr-defined] + + +class LambdaContext: + def __init__(self): + self.function_name = "test-func" + self.memory_limit_in_mb = 128 + self.invoked_function_arn = "arn:aws:lambda:eu-west-1:809313241234:function:test-func" + self.aws_request_id = "52fdfc07-2182-154f-163f-5f0f9a621d72" + + def get_remaining_time_in_millis(self) -> int: + return 1000 + + +def test_subscribe_event_with_valid_return(): + """Test error handling during publish event processing.""" + # GIVEN a sample publish event + with Path.open("getting_started_with_testing_publish_event.json", "r") as f: + event = json.load(f) + + lambda_context = LambdaContext() + + # GIVEN an AppSyncEventsResolver with a resolver that returns ok + app = AppSyncEventsResolver() + + @app.on_subscribe(path="/default/*") + def test_handler(): + pass + + # WHEN we resolve the event + result = app.resolve(event, lambda_context) + + # THEN we should return None because subscribe always must return None + assert result is None diff --git a/examples/event_handler_appsync_events/src/getting_started_with_testing_subscribe_event.json b/examples/event_handler_appsync_events/src/getting_started_with_testing_subscribe_event.json new file mode 100644 index 00000000000..40ff4c32886 --- /dev/null +++ b/examples/event_handler_appsync_events/src/getting_started_with_testing_subscribe_event.json @@ -0,0 +1,57 @@ +{ + "identity":"None", + "result":"None", + "request":{ + "headers": { + "x-forwarded-for": "1.1.1.1, 2.2.2.2", + "cloudfront-viewer-country": "US", + "cloudfront-is-tablet-viewer": "false", + "via": "2.0 xxxxxxxxxxxxxxxx.cloudfront.net (CloudFront)", + "cloudfront-forwarded-proto": "https", + "origin": "https://us-west-1.console.aws.amazon.com", + "content-length": "217", + "accept-language": "en-US,en;q=0.9", + "host": "xxxxxxxxxxxxxxxx.appsync-api.us-west-1.amazonaws.com", + "x-forwarded-proto": "https", + "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36", + "accept": "*/*", + "cloudfront-is-mobile-viewer": "false", + "cloudfront-is-smarttv-viewer": "false", + "accept-encoding": "gzip, deflate, br", + "referer": "https://us-west-1.console.aws.amazon.com/appsync/home?region=us-west-1", + "content-type": "application/json", + "sec-fetch-mode": "cors", + "x-amz-cf-id": "3aykhqlUwQeANU-HGY7E_guV5EkNeMMtwyOgiA==", + "x-amzn-trace-id": "Root=1-5f512f51-fac632066c5e848ae714", + "authorization": "eyJraWQiOiJScWFCSlJqYVJlM0hrSnBTUFpIcVRXazNOW...", + "sec-fetch-dest": "empty", + "x-amz-user-agent": "AWS-Console-AppSync/", + "cloudfront-is-desktop-viewer": "true", + "sec-fetch-site": "cross-site", + "x-forwarded-port": "443" + }, + "domainName":"None" + }, + "info":{ + "channel":{ + "path":"/default/channel", + "segments":[ + "default", + "channel" + ] + }, + "channelNamespace":{ + "name":"default" + }, + "operation":"SUBSCRIBE" + }, + "error":"None", + "prev":"None", + "stash":{ + + }, + "outErrors":[ + + ], + "events":[] + } diff --git a/examples/event_handler_appsync_events/src/payload_request.json b/examples/event_handler_appsync_events/src/payload_request.json new file mode 100644 index 00000000000..e7335cc70c5 --- /dev/null +++ b/examples/event_handler_appsync_events/src/payload_request.json @@ -0,0 +1,46 @@ +{ + "identity":"None", + "result":"None", + "request":{ + "headers": { + "x-forwarded-for": "1.1.1.1, 2.2.2.2", + "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36", + }, + "domainName":"None" + }, + "info":{ + "channel":{ + "path":"/default/channel", + "segments":[ + "default", + "channel" + ] + }, + "channelNamespace":{ + "name":"default" + }, + "operation":"PUBLISH" + }, + "error":"None", + "prev":"None", + "stash":{ + + }, + "outErrors":[ + + ], + "events":[ + { + "payload":{ + "data":"data_1" + }, + "id":"1" + }, + { + "payload":{ + "data":"data_2" + }, + "id":"2" + } + ] +} diff --git a/examples/event_handler_appsync_events/src/payload_response.json b/examples/event_handler_appsync_events/src/payload_response.json new file mode 100644 index 00000000000..dc21bb3ac09 --- /dev/null +++ b/examples/event_handler_appsync_events/src/payload_response.json @@ -0,0 +1,16 @@ +{ + "events":[ + { + "payload":{ + "data":"data_1" + }, + "id":"1" + }, + { + "payload":{ + "data":"data_2" + }, + "id":"2" + } + ] +} diff --git a/examples/event_handler_appsync_events/src/payload_response_fail_request.json b/examples/event_handler_appsync_events/src/payload_response_fail_request.json new file mode 100644 index 00000000000..2db9bb23778 --- /dev/null +++ b/examples/event_handler_appsync_events/src/payload_response_fail_request.json @@ -0,0 +1,3 @@ +{ + "error": "Exception - An exception occurred" +} diff --git a/examples/event_handler_appsync_events/src/payload_response_with_error.json b/examples/event_handler_appsync_events/src/payload_response_with_error.json new file mode 100644 index 00000000000..2ffdc0cef70 --- /dev/null +++ b/examples/event_handler_appsync_events/src/payload_response_with_error.json @@ -0,0 +1,14 @@ +{ + "events":[ + { + "error": "Error message", + "id":"1" + }, + { + "payload":{ + "data":"data_2" + }, + "id":"2" + } + ] +} diff --git a/examples/event_handler_appsync_events/src/working_with_aggregated_events.py b/examples/event_handler_appsync_events/src/working_with_aggregated_events.py new file mode 100644 index 00000000000..1d238027797 --- /dev/null +++ b/examples/event_handler_appsync_events/src/working_with_aggregated_events.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from aws_lambda_powertools.event_handler import AppSyncEventsResolver # type: ignore[attr-defined] + +if TYPE_CHECKING: + from aws_lambda_powertools.utilities.typing import LambdaContext + +app = AppSyncEventsResolver() + + +@app.on_publish("/default/*", aggregate=True) +def handle_default_namespace_batch(payload_list: list[dict[str, Any]]): + results: list = [] + + # Process all events in the batch together + for event in payload_list: + # Process each event + results.append({"id": event.get("id"), "payload": {"processed": True, "originalEvent": event}}) + + return results + + +def lambda_handler(event: dict, context: LambdaContext): + return app.resolve(event, context) diff --git a/examples/event_handler_appsync_events/src/working_with_async_resolvers.py b/examples/event_handler_appsync_events/src/working_with_async_resolvers.py new file mode 100644 index 00000000000..b34645f1e74 --- /dev/null +++ b/examples/event_handler_appsync_events/src/working_with_async_resolvers.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING, Any + +from aws_lambda_powertools.event_handler import AppSyncEventsResolver # type: ignore[attr-defined] + +if TYPE_CHECKING: + from aws_lambda_powertools.utilities.typing import LambdaContext + +app = AppSyncEventsResolver() + + +@app.async_on_publish("/default/channel1") +async def handle_channel1_publish(payload: dict[str, Any]): + result = await async_process_data(payload) + return result + + +async def async_process_data(payload: dict[str, Any]): + await asyncio.sleep(0.1) + return {"processed": payload, "async": True} + + +def lambda_handler(event: dict, context: LambdaContext): + return app.resolve(event, context) diff --git a/examples/event_handler_appsync_events/src/working_with_error_handling.py b/examples/event_handler_appsync_events/src/working_with_error_handling.py new file mode 100644 index 00000000000..459cf07a819 --- /dev/null +++ b/examples/event_handler_appsync_events/src/working_with_error_handling.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from aws_lambda_powertools.event_handler import AppSyncEventsResolver # type: ignore[attr-defined] + +if TYPE_CHECKING: + from aws_lambda_powertools.utilities.typing import LambdaContext + +app = AppSyncEventsResolver() + + +class ValidationError(Exception): + pass + + +@app.on_publish("/default/channel") +def handle_channel1_publish(payload: dict[str, Any]): + if not is_valid_payload(payload): + raise ValidationError("Invalid payload format") + + return process_payload(payload) + + +def is_valid_payload(payload: dict[str, Any]): + return "data" in payload + + +def process_payload(payload: dict[str, Any]): + return {"processed": payload["data"]} + + +def lambda_handler(event: dict, context: LambdaContext): + return app.resolve(event, context) diff --git a/examples/event_handler_appsync_events/src/working_with_error_handling_multiple.py b/examples/event_handler_appsync_events/src/working_with_error_handling_multiple.py new file mode 100644 index 00000000000..73165b08029 --- /dev/null +++ b/examples/event_handler_appsync_events/src/working_with_error_handling_multiple.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from aws_lambda_powertools.event_handler import AppSyncEventsResolver # type: ignore[attr-defined] + +if TYPE_CHECKING: + from aws_lambda_powertools.utilities.typing import LambdaContext + +app = AppSyncEventsResolver() + + +@app.on_publish("/default/*", aggregate=True) +def handle_default_namespace_batch(payload_list: list[dict[str, Any]]): + results: list = [] + + # Process all events in the batch together + for event in payload_list: + try: + # Process each event + results.append({"id": event.get("id"), "payload": {"processed": True, "originalEvent": event}}) + except Exception as e: + # Handle errors for individual events + results.append( + { + "error": str(e), + "id": event.get("id"), + }, + ) + + return results + + +def lambda_handler(event: dict, context: LambdaContext): + return app.resolve(event, context) diff --git a/examples/event_handler_appsync_events/src/working_with_error_handling_response.json b/examples/event_handler_appsync_events/src/working_with_error_handling_response.json new file mode 100644 index 00000000000..fe35279468d --- /dev/null +++ b/examples/event_handler_appsync_events/src/working_with_error_handling_response.json @@ -0,0 +1,14 @@ +{ + "events":[ + { + "error": "Error message", + "id":"1" + }, + { + "payload":{ + "data":"data_2" + }, + "id":"2" + } + ] + } diff --git a/examples/event_handler_appsync_events/src/working_with_wildcard_resolvers.py b/examples/event_handler_appsync_events/src/working_with_wildcard_resolvers.py new file mode 100644 index 00000000000..3a53c0f480a --- /dev/null +++ b/examples/event_handler_appsync_events/src/working_with_wildcard_resolvers.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from aws_lambda_powertools.event_handler import AppSyncEventsResolver # type: ignore[attr-defined] + +if TYPE_CHECKING: + from aws_lambda_powertools.utilities.typing import LambdaContext + +app = AppSyncEventsResolver() + + +@app.on_publish("/default/channel1") +def handle_specific_channel(payload: dict[str, Any]): + # This handler will be called for events on /default/channel1 + return {"source": "specific_handler", "data": payload} + + +@app.on_publish("/default/*") +def handle_default_namespace(payload: dict[str, Any]): + # This handler will be called for all channels in the default namespace + # EXCEPT for /default/channel1 which has a more specific handler + return {"source": "namespace_handler", "data": payload} + + +@app.on_publish("/*") +def handle_all_channels(payload: dict[str, Any]): + # This handler will be called for all channels in all namespaces + # EXCEPT for those that have more specific handlers + return {"source": "catch_all_handler", "data": payload} + + +def lambda_handler(event: dict, context: LambdaContext): + return app.resolve(event, context) diff --git a/mkdocs.yml b/mkdocs.yml index 1bd460bf218..de566fb1f08 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -24,6 +24,7 @@ nav: - Event Handler: - core/event_handler/api_gateway.md - core/event_handler/appsync.md + - core/event_handler/appsync_events.md - core/event_handler/bedrock_agents.md - utilities/parameters.md - utilities/batch.md