|
| 1 | +--- |
| 2 | +title: AppSync Events |
| 3 | +description: Core utility |
| 4 | +--- |
| 5 | + |
| 6 | +Event Handler for AWS AppSync real-time events. |
| 7 | + |
| 8 | +```mermaid |
| 9 | +stateDiagram-v2 |
| 10 | + direction LR |
| 11 | + EventSource: AppSync Events |
| 12 | + EventHandlerResolvers: Publish & Subscribe events |
| 13 | + LambdaInit: Lambda invocation |
| 14 | + EventHandler: Event Handler |
| 15 | + EventHandlerResolver: Route event based on namespace/channel |
| 16 | + YourLogic: Run your registered handler function |
| 17 | + EventHandlerResolverBuilder: Adapts response to AppSync contract |
| 18 | + LambdaResponse: Lambda response |
| 19 | +
|
| 20 | + state EventSource { |
| 21 | + EventHandlerResolvers |
| 22 | + } |
| 23 | +
|
| 24 | + EventHandlerResolvers --> LambdaInit |
| 25 | +
|
| 26 | + LambdaInit --> EventHandler |
| 27 | + EventHandler --> EventHandlerResolver |
| 28 | +
|
| 29 | + state EventHandler { |
| 30 | + [*] --> EventHandlerResolver: app.resolve(event, context) |
| 31 | + EventHandlerResolver --> YourLogic |
| 32 | + YourLogic --> EventHandlerResolverBuilder |
| 33 | + } |
| 34 | +
|
| 35 | + EventHandler --> LambdaResponse |
| 36 | +``` |
| 37 | + |
| 38 | +## Key Features |
| 39 | + |
| 40 | +* Easily handle publish and subscribe events with dedicated handler methods |
| 41 | +* Automatic routing based on namespace and channel patterns |
| 42 | +* Support for wildcard patterns to create catch-all handlers |
| 43 | +* Process events in parallel or sequentially |
| 44 | +* Control over event aggregation for batch processing |
| 45 | +* Graceful error handling for individual events |
| 46 | + |
| 47 | +## Terminology |
| 48 | + |
| 49 | +**[AWS AppSync Events](https://docs.aws.amazon.com/appsync/latest/eventapi/event-api-welcome.html){target="_blank"}**. A service that enables you to quickly build secure, scalable real-time WebSocket APIs without managing infrastructure or writing API code. It handles connection management, message broadcasting, authentication, and monitoring, reducing time to market and operational costs. |
| 50 | + |
| 51 | +## Getting started |
| 52 | + |
| 53 | +???+ tip "Tip: New to AppSync Real-time API?" |
| 54 | + Visit [AWS AppSync Real-time documentation](https://docs.aws.amazon.com/appsync/latest/eventapi/event-api-getting-started.html){target="_blank"} to understand how to set up subscriptions and pub/sub messaging. |
| 55 | + |
| 56 | +### Required resources |
| 57 | + |
| 58 | +You must have an existing AppSync Events API with real-time capabilities enabled and IAM permissions to invoke your Lambda function. That said, there are no additional permissions required to use Event Handler as routing requires no dependency (_standard library_). |
| 59 | + |
| 60 | +### AppSync request and response format |
| 61 | + |
| 62 | +AppSync Events uses a specific event format for Lambda requests and responses. In most scenarios, Powertools simplifies this interaction by automatically formatting resolver returns to match the expected AppSync response structure. |
| 63 | + |
| 64 | +=== "appsync_payload_request.json" |
| 65 | + |
| 66 | + ```python hl_lines="5 10 12" |
| 67 | + --8<-- "examples/event_handler_appsync_events/src/appsync_payload_request.json" |
| 68 | + ``` |
| 69 | + |
| 70 | +=== "appsync_payload_response.json" |
| 71 | + |
| 72 | + ```python hl_lines="5 10 12" |
| 73 | + --8<-- "examples/event_handler_appsync_events/src/appsync_payload_response.json" |
| 74 | + ``` |
| 75 | + |
| 76 | +=== "appsync_payload_response_with_error.json" |
| 77 | + |
| 78 | + ```python hl_lines="5 10 12" |
| 79 | + --8<-- "examples/event_handler_appsync_events/src/appsync_payload_response_with_error.json" |
| 80 | + ``` |
| 81 | + |
| 82 | +#### Events Response with Error |
| 83 | + |
| 84 | +When processing events with Lambda, you can return errors to AppSync in two ways: |
| 85 | + |
| 86 | +* **Error per item:** Return an `error` key within each individual item's response. AppSync Events expects this format for item-specific errors. |
| 87 | +* **Fail entire request:** Return a JSON object with a top-level `error` key. This signals a general failure, and AppSync treats the entire request as unsuccessful. |
| 88 | + |
| 89 | +### Resolver decorator |
| 90 | + |
| 91 | +???+ important |
| 92 | + The event handler automatically parses the incoming event data and invokes the appropriate handler based on the namespace/channel pattern you register. |
| 93 | + |
| 94 | +You can define your handlers for different event types using the `app.on_publish()`, `app.async_on_publish()`, and `app.on_subscribe()` methods. |
| 95 | + |
| 96 | +=== "getting_started_with_publish_events.py" |
| 97 | + |
| 98 | + ```python hl_lines="5 10 12" |
| 99 | + --8<-- "examples/event_handler_appsync_events/src/getting_started_with_publish_events.py" |
| 100 | + ``` |
| 101 | + |
| 102 | +=== "getting_started_with_subscribe_events.py" |
| 103 | + |
| 104 | + ```python hl_lines="5 6 13" |
| 105 | + --8<-- "examples/event_handler_appsync_events/src/getting_started_with_subscribe_events.py" |
| 106 | + ``` |
| 107 | + |
| 108 | +## Advanced |
| 109 | + |
| 110 | +### Wildcard patterns and handler precedence |
| 111 | + |
| 112 | +You can use wildcard patterns to create catch-all handlers for multiple channels or namespaces. This is particularly useful for centralizing logic that applies to multiple channels. |
| 113 | + |
| 114 | +When multiple handlers could match the same event, the most specific pattern takes precedence. |
| 115 | + |
| 116 | +=== "working_with_wildcard_resolvers.py" |
| 117 | + |
| 118 | + ```python hl_lines="5 6 13" |
| 119 | + --8<-- "examples/event_handler_appsync_events/src/working_with_wildcard_resolvers.py" |
| 120 | + ``` |
| 121 | + |
| 122 | +???+ note "Supported wildcard patterns" |
| 123 | + Only the following patterns are supported: |
| 124 | + |
| 125 | + * `/namespace/*` - Matches all channels in the specified namespace |
| 126 | + * `/*` - Matches all channels in all namespaces |
| 127 | + |
| 128 | + Patterns like `/namespace/channel*` or `/namespace/*/subpath` are not supported. |
| 129 | + |
| 130 | + More specific routes will always take precedence over less specific ones. For example, `/default/channel1` will take precedence over `/default/*`, which will take precedence over `/*`. |
| 131 | + |
| 132 | +### Processing events with async resolvers |
| 133 | + |
| 134 | +Use the `@app.async_on_publish()` decorator to process events asynchronously. |
| 135 | + |
| 136 | +We use `asyncio` module to support async functions, and we ensure reliable execution by managing the event loop. |
| 137 | + |
| 138 | +???+ note "Events order and AppSync Events" |
| 139 | + AppSync does not rely on event order. As long as each event includes the original `id`, AppSync processes them correctly regardless of the order in which they are received. |
| 140 | + |
| 141 | +=== "working_with_async_resolvers.py" |
| 142 | + |
| 143 | + ```python hl_lines="5 6 13" |
| 144 | + --8<-- "examples/event_handler_appsync_events/src/working_with_async_resolvers.py" |
| 145 | + ``` |
| 146 | + |
| 147 | +### Aggregated processing |
| 148 | + |
| 149 | +???+ note "Aggregate Processing" |
| 150 | + When `aggregate=True`, your handler receives a list of all events, requiring you to manage the response format. Ensure your response includes results for each event in the expected [AppSync Request and Response Format](#appsync-request-and-response-format). |
| 151 | + |
| 152 | +In some scenarios, you might want to process all events for a channel as a batch rather than individually. This is useful when you need to: |
| 153 | + |
| 154 | +* Optimize database operations by making a single batch query |
| 155 | +* Ensure all events are processed together or not at all |
| 156 | +* Apply custom error handling logic for the entire batch |
| 157 | + |
| 158 | +You can enable this with the `aggregate` parameter: |
| 159 | + |
| 160 | +=== "working_with_aggregated_events.py" |
| 161 | + |
| 162 | + ```python hl_lines="5 6 13" |
| 163 | + --8<-- "examples/event_handler_appsync_events/src/working_with_aggregated_events.py" |
| 164 | + ``` |
| 165 | + |
| 166 | +### Handling errors |
| 167 | + |
| 168 | +You can filter or reject events by raising exceptions in your handlers. The event handler will catch these exceptions and include appropriate error information in the response. |
| 169 | + |
| 170 | +#### Error with individual itens |
| 171 | + |
| 172 | +**Note:** When using `aggregate=True`, you are responsible for formatting the error response according to the expected format. |
| 173 | + |
| 174 | +=== "working_with_error_handling.py" |
| 175 | + |
| 176 | + ```python hl_lines="5 6 13" |
| 177 | + --8<-- "examples/event_handler_appsync_events/src/working_with_error_handling.py" |
| 178 | + ``` |
| 179 | + |
| 180 | +=== "working_with_error_handling_response.json" |
| 181 | + |
| 182 | + ```python hl_lines="5 6 13" |
| 183 | + --8<-- "examples/event_handler_appsync_events/src/working_with_error_handling_response.json" |
| 184 | + ``` |
| 185 | + |
| 186 | +#### Rejecting the entire request |
| 187 | + |
| 188 | + |
| 189 | +### Accessing Lambda context and event |
| 190 | + |
| 191 | +You might need access to the original Lambda event or context for additional information. These are accessible via the app instance: |
| 192 | + |
| 193 | +=== "context_access.py" |
| 194 | + |
| 195 | + ```python |
| 196 | + from aws_lambda_powertools.event_handler.appsync_events import AppSyncEventsResolver |
| 197 | + |
| 198 | + app = AppSyncEventsResolver() |
| 199 | + |
| 200 | + @app.on_publish("/default/channel1") |
| 201 | + def handle_channel1_publish(payload): |
| 202 | + # Access the full event and context |
| 203 | + full_event = app.event |
| 204 | + lambda_context = app.context |
| 205 | + |
| 206 | + # Access request headers |
| 207 | + headers = full_event.get("request", {}).get("headers", {}) |
| 208 | + |
| 209 | + # Check remaining time |
| 210 | + remaining_time = lambda_context.get_remaining_time_in_millis() |
| 211 | + |
| 212 | + return { |
| 213 | + "payload": payload, |
| 214 | + "userAgent": headers.get("User-Agent"), |
| 215 | + "timeRemaining": remaining_time |
| 216 | + } |
| 217 | + |
| 218 | + def lambda_handler(event, context): |
| 219 | + return app.resolve(event, context) |
| 220 | + ``` |
| 221 | + |
| 222 | +## Testing your code |
| 223 | + |
| 224 | +You can test your event handlers by passing a mocked or actual AppSync Events Lambda event. |
| 225 | + |
| 226 | +=== "test_publish_handler.py" |
| 227 | + |
| 228 | + ```python |
| 229 | + import json |
| 230 | + from my_lambda_function import app |
| 231 | + |
| 232 | + def test_publish_handler(): |
| 233 | + # Load a sample event |
| 234 | + with open("sample_publish_event.json", "r") as f: |
| 235 | + event = json.load(f) |
| 236 | + |
| 237 | + # Test the handler directly |
| 238 | + result = app.resolve(event, {}) |
| 239 | + |
| 240 | + # Verify the result |
| 241 | + assert len(result["events"]) == 2 |
| 242 | + assert result["events"][0]["payload"]["processed"] is True |
| 243 | + ``` |
| 244 | + |
| 245 | +=== "sample_publish_event.json" |
| 246 | + |
| 247 | + ```json |
| 248 | + { |
| 249 | + "events": [ |
| 250 | + { |
| 251 | + "id": "event1", |
| 252 | + "payload": {"message": "Hello World"}, |
| 253 | + "channel": { |
| 254 | + "path": "/default/channel1" |
| 255 | + } |
| 256 | + }, |
| 257 | + { |
| 258 | + "id": "event2", |
| 259 | + "payload": {"message": "Test Message"}, |
| 260 | + "channel": { |
| 261 | + "path": "/default/channel2" |
| 262 | + } |
| 263 | + } |
| 264 | + ], |
| 265 | + "request": { |
| 266 | + "headers": { |
| 267 | + "x-api-key": "test-api-key" |
| 268 | + } |
| 269 | + } |
| 270 | + } |
| 271 | + ``` |
| 272 | + |
| 273 | +For subscription events: |
| 274 | + |
| 275 | +=== "test_subscribe_handler.py" |
| 276 | + |
| 277 | + ```python |
| 278 | + import json |
| 279 | + from my_lambda_function import app |
| 280 | + |
| 281 | + def test_subscribe_handler(): |
| 282 | + # Load a sample subscription event |
| 283 | + with open("sample_subscribe_event.json", "r") as f: |
| 284 | + event = json.load(f) |
| 285 | + |
| 286 | + # Test the handler directly |
| 287 | + result = app.resolve(event, {}) |
| 288 | + |
| 289 | + # Verify the result |
| 290 | + assert result is True |
| 291 | + ``` |
| 292 | + |
| 293 | +=== "sample_subscribe_event.json" |
| 294 | + |
| 295 | + ```json |
| 296 | + { |
| 297 | + "type": "SUBSCRIBE", |
| 298 | + "channel": { |
| 299 | + "path": "/default/channel1" |
| 300 | + }, |
| 301 | + "identity": { |
| 302 | + "username": "testuser" |
| 303 | + } |
| 304 | + } |
| 305 | + ``` |
0 commit comments