forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasync_resolvers.py
36 lines (26 loc) · 1.12 KB
/
async_resolvers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import asyncio
from typing import List, TypedDict
import aiohttp
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.event_handler import AppSyncResolver
from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.tracing import aiohttp_trace_config
from aws_lambda_powertools.utilities.typing import LambdaContext
# Module-level Powertools objects, instantiated once when the module is imported
# and shared by the decorators and the handler defined further down.
tracer = Tracer()  # supplies the capture_lambda_handler decorator wrapped around lambda_handler
logger = Logger()  # supplies the inject_lambda_context decorator wrapped around lambda_handler
app = AppSyncResolver()  # resolvers register through app.resolver; lambda_handler dispatches through app.resolve
class Todo(TypedDict, total=False):
    """Shape of a single todo item as returned by the ``listTodos`` resolver.

    Declared with ``total=False``, so every key is optional and a partial
    item is still a valid ``Todo``.
    """

    # ``id`` shadows the builtin on purpose: the field name is required due to
    # the GraphQL schema. Only the builtin-shadowing checks are suppressed; the
    # previous colon-less "noqa AA03 VNE003" acted as a blanket suppression.
    id: str  # noqa: A003,VNE003
    userId: str
    title: str
    completed: bool
@app.resolver(type_name="Query", field_name="listTodos")
async def list_todos() -> List[Todo]:
    """Resolve the ``Query.listTodos`` field by fetching todos over HTTP.

    Returns:
        The JSON-decoded body of the GET response from
        ``https://jsonplaceholder.typicode.com/todos``.

    Raises:
        aiohttp.ClientResponseError: If the endpoint answers with an HTTP
            status of 400 or above.
    """
    # The Powertools aiohttp trace config is attached to the session that
    # issues the request.
    async with aiohttp.ClientSession(trace_configs=[aiohttp_trace_config()]) as session:
        async with session.get("https://jsonplaceholder.typicode.com/todos") as resp:
            # Fail loudly on 4xx/5xx instead of decoding an error payload and
            # handing it back to AppSync as if it were the list of todos.
            resp.raise_for_status()
            return await resp.json()
@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    """Lambda entry point: dispatch the event through ``app`` and run the result.

    Args:
        event: Incoming Lambda event, forwarded unchanged to ``app.resolve``.
        context: Lambda context object, forwarded unchanged to ``app.resolve``.

    Returns:
        The value produced by running what ``app.resolve`` returns to
        completion with ``asyncio.run``.
        NOTE(review): annotated as ``dict``, yet the only resolver visible in
        this module is annotated to return a list - confirm the intended type.
    """
    # The resolver registered in this module is an ``async def``, so the object
    # coming back from app.resolve still has to be driven by an event loop.
    return asyncio.run(app.resolve(event, context))