-
Notifications
You must be signed in to change notification settings - Fork 421
/
Copy pathpowertools_json_idempotency_jmespath.py
37 lines (26 loc) · 1.17 KB
/
powertools_json_idempotency_jmespath.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import json
from uuid import uuid4
import requests
from aws_lambda_powertools.utilities.idempotency import (
DynamoDBPersistenceLayer,
IdempotencyConfig,
idempotent,
)
# Persistence layer handed to the @idempotent decorator below; it is
# pointed at the DynamoDB table named "IdempotencyTable".
persistence_layer = DynamoDBPersistenceLayer(
    table_name="IdempotencyTable",
)

# The JMESPath expression selects the "body" key of the incoming event and
# passes it through powertools_json(), so everything under "body" is treated
# as the payload.
config = IdempotencyConfig(
    event_key_jmespath="powertools_json(body)",
)
class PaymentError(Exception):
    """Raised when a subscription payment could not be created."""
@idempotent(config=config, persistence_store=persistence_layer)
def handler(event, context) -> dict:
    """Create a subscription payment from the JSON document in ``event["body"]``.

    The body is decoded with ``json.loads`` and must contain the ``"user"``
    and ``"product_id"`` keys, which are forwarded to
    ``create_subscription_payment``.

    Returns:
        A dict with ``"payment_id"`` (the ``"id"`` of the created payment),
        ``"message": "success"`` and ``"statusCode": 200``.

    Raises:
        PaymentError: if ``create_subscription_payment`` raises
            ``requests.HTTPError``; the original error is chained as the cause.
    """
    request_body = json.loads(event["body"])

    try:
        payment: dict = create_subscription_payment(
            user=request_body["user"],
            product_id=request_body["product_id"],
        )
    except requests.HTTPError as http_error:
        raise PaymentError("Unable to create payment subscription") from http_error

    # Building the response cannot raise HTTPError, so it lives outside the try.
    return {"payment_id": payment.get("id"), "message": "success", "statusCode": 200}
def create_subscription_payment(user: str, product_id: str, *, timeout: float = 10.0) -> dict:
    """POST the user and product to the payment endpoint and return a payment record.

    Args:
        user: Value sent as the ``user`` form field.
        product_id: Value sent as the ``product_id`` form field.
        timeout: Seconds to wait for the endpoint before giving up. Keyword-only,
            defaults to 10 seconds.

    Returns:
        A dict with a freshly generated UUID string under ``"id"`` and
        ``"message": "paid"``.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status
            (raised by ``raise_for_status``).
        requests.Timeout: if the endpoint does not respond within ``timeout``.
    """
    payload = {"user": user, "product_id": product_id}

    # requests applies no timeout by default, so a stalled endpoint would
    # block this call indefinitely; always pass an explicit limit.
    response: requests.Response = requests.post(
        url="https://httpbin.org/anything",
        data=payload,
        timeout=timeout,
    )
    response.raise_for_status()

    return {"id": str(uuid4()), "message": "paid"}