forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinfrastructure.py
32 lines (24 loc) · 1.23 KB
/
infrastructure.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from aws_cdk import CfnOutput, RemovalPolicy
from aws_cdk import aws_dynamodb as dynamodb
from aws_cdk.aws_dynamodb import Table
from tests.e2e.utils.infrastructure import BaseInfrastructure
class IdempotencyDynamoDBStack(BaseInfrastructure):
def create_resources(self):
table = self._create_dynamodb_table()
env_vars = {"IdempotencyTable": table.table_name}
functions = self.create_lambda_functions(function_props={"environment": env_vars})
table.grant_read_write_data(functions["TtlCacheExpirationHandler"])
table.grant_read_write_data(functions["TtlCacheTimeoutHandler"])
table.grant_read_write_data(functions["ParallelExecutionHandler"])
table.grant_read_write_data(functions["ParallelFunctionsHandler"])
def _create_dynamodb_table(self) -> Table:
table = dynamodb.Table(
self.stack,
"Idempotency",
removal_policy=RemovalPolicy.DESTROY,
partition_key=dynamodb.Attribute(name="id", type=dynamodb.AttributeType.STRING),
time_to_live_attribute="expiration",
billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
)
CfnOutput(self.stack, "DynamoDBTable", value=table.table_name)
return table