|
7 | 7 | import json
|
8 | 8 | import logging
|
9 | 9 | from abc import ABC, abstractmethod
|
10 |
| -from typing import Any, Dict, Optional, Union |
| 10 | +from typing import Any, Dict |
11 | 11 |
|
12 |
| -import boto3 |
13 | 12 | import jmespath
|
14 |
| -from botocore.config import Config |
15 | 13 |
|
16 |
| -from .cache_dict import LRUDict |
17 |
| -from .exceptions import IdempotencyValidationerror, InvalidStatusError, ItemAlreadyExistsError, ItemNotFoundError |
| 14 | +from aws_lambda_powertools.utilities.idempotency.cache_dict import LRUDict |
| 15 | +from aws_lambda_powertools.utilities.idempotency.exceptions import ( |
| 16 | + IdempotencyValidationerror, |
| 17 | + InvalidStatusError, |
| 18 | + ItemAlreadyExistsError, |
| 19 | +) |
18 | 20 |
|
19 | 21 | logger = logging.getLogger(__name__)
|
20 | 22 |
|
@@ -297,8 +299,16 @@ def save_inprogress(self, event: Dict[str, Any]) -> None:
|
297 | 299 | )
|
298 | 300 |
|
299 | 301 | logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")
|
| 302 | + |
| 303 | + if self.use_local_cache: |
| 304 | + record = self._retrieve_from_cache(idempotency_key=data_record.idempotency_key) |
| 305 | + if record: |
| 306 | + raise ItemAlreadyExistsError |
| 307 | + |
300 | 308 | self._put_record(data_record)
|
301 | 309 |
|
| 310 | + # This has to come after _put_record. If _put_record call raises ItemAlreadyExists we shouldn't populate the |
| 311 | + # cache with an "INPROGRESS" record as we don't know the status in the data store at this point. |
302 | 312 | if self.use_local_cache:
|
303 | 313 | self._save_to_cache(data_record)
|
304 | 314 |
|
@@ -354,6 +364,10 @@ def get_record(self, lambda_event) -> DataRecord:
|
354 | 364 | return cached_record
|
355 | 365 |
|
356 | 366 | record = self._get_record(idempotency_key)
|
| 367 | + |
| 368 | + if self.use_local_cache: |
| 369 | + self._save_to_cache(data_record=record) |
| 370 | + |
357 | 371 | self._validate_payload(lambda_event, record)
|
358 | 372 | return record
|
359 | 373 |
|
@@ -416,145 +430,3 @@ def _delete_record(self, data_record: DataRecord) -> None:
|
416 | 430 | """
|
417 | 431 |
|
418 | 432 | raise NotImplementedError
|
419 |
| - |
420 |
| - |
421 |
| -class DynamoDBPersistenceLayer(BasePersistenceLayer): |
422 |
| - def __init__( |
423 |
| - self, |
424 |
| - table_name: str, # Can we use the lambda function name? |
425 |
| - key_attr: Optional[str] = "id", |
426 |
| - expiry_attr: Optional[str] = "expiration", |
427 |
| - status_attr: Optional[str] = "status", |
428 |
| - data_attr: Optional[str] = "data", |
429 |
| - validation_key_attr: Optional[str] = "validation", |
430 |
| - boto_config: Optional[Config] = None, |
431 |
| - *args, |
432 |
| - **kwargs, |
433 |
| - ): |
434 |
| - """ |
435 |
| - Initialize the DynamoDB client |
436 |
| -
|
437 |
| - Parameters |
438 |
| - ---------- |
439 |
| - table_name: str |
440 |
| - Name of the table to use for storing execution records |
441 |
| - key_attr: str, optional |
442 |
| - DynamoDB attribute name for key, by default "id" |
443 |
| - expiry_attr: str, optional |
444 |
| - DynamoDB attribute name for expiry timestamp, by default "expiration" |
445 |
| - status_attr: str, optional |
446 |
| - DynamoDB attribute name for status, by default "status" |
447 |
| - data_attr: str, optional |
448 |
| - DynamoDB attribute name for response data, by default "data" |
449 |
| - boto_config: botocore.config.Config, optional |
450 |
| - Botocore configuration to pass during client initialization |
451 |
| - args |
452 |
| - kwargs |
453 |
| -
|
454 |
| - Examples |
455 |
| - -------- |
456 |
| - **Create a DynamoDB persistence layer with custom settings** |
457 |
| - >>> from aws_lambda_powertools.utilities.idempotency import idempotent, DynamoDBPersistenceLayer |
458 |
| - >>> |
459 |
| - >>> persistence_store = DynamoDBPersistenceLayer(event_key="body", table_name="idempotency_store") |
460 |
| - >>> |
461 |
| - >>> @idempotent(persistence=persistence_store) |
462 |
| - >>> def handler(event, context): |
463 |
| - >>> return {"StatusCode": 200} |
464 |
| - """ |
465 |
| - |
466 |
| - boto_config = boto_config or Config() |
467 |
| - self._ddb_resource = boto3.resource("dynamodb", config=boto_config) |
468 |
| - self.table_name = table_name |
469 |
| - self.table = self._ddb_resource.Table(self.table_name) |
470 |
| - self.key_attr = key_attr |
471 |
| - self.expiry_attr = expiry_attr |
472 |
| - self.status_attr = status_attr |
473 |
| - self.data_attr = data_attr |
474 |
| - self.validation_key_attr = validation_key_attr |
475 |
| - super(DynamoDBPersistenceLayer, self).__init__(*args, **kwargs) |
476 |
| - |
477 |
| - def _item_to_data_record(self, item: Dict[str, Union[str, int]]) -> DataRecord: |
478 |
| - """ |
479 |
| - Translate raw item records from DynamoDB to DataRecord |
480 |
| -
|
481 |
| - Parameters |
482 |
| - ---------- |
483 |
| - item: Dict[str, Union[str, int]] |
484 |
| - Item format from dynamodb response |
485 |
| -
|
486 |
| - Returns |
487 |
| - ------- |
488 |
| - DataRecord |
489 |
| - representation of item |
490 |
| -
|
491 |
| - """ |
492 |
| - return DataRecord( |
493 |
| - idempotency_key=item[self.key_attr], |
494 |
| - status=item[self.status_attr], |
495 |
| - expiry_timestamp=item[self.expiry_attr], |
496 |
| - response_data=item.get(self.data_attr), |
497 |
| - payload_hash=item.get(self.validation_key_attr), |
498 |
| - ) |
499 |
| - |
500 |
| - def _get_record(self, idempotency_key) -> DataRecord: |
501 |
| - response = self.table.get_item(Key={self.key_attr: idempotency_key}, ConsistentRead=True) |
502 |
| - |
503 |
| - try: |
504 |
| - item = response["Item"] |
505 |
| - except KeyError: |
506 |
| - raise ItemNotFoundError |
507 |
| - return self._item_to_data_record(item) |
508 |
| - |
509 |
| - def _put_record(self, data_record: DataRecord) -> None: |
510 |
| - |
511 |
| - item = { |
512 |
| - self.key_attr: data_record.idempotency_key, |
513 |
| - self.expiry_attr: data_record.expiry_timestamp, |
514 |
| - self.status_attr: STATUS_CONSTANTS["INPROGRESS"], |
515 |
| - } |
516 |
| - |
517 |
| - if self.payload_validation_enabled: |
518 |
| - item[self.validation_key_attr] = data_record.payload_hash |
519 |
| - |
520 |
| - now = datetime.datetime.now() |
521 |
| - try: |
522 |
| - logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}") |
523 |
| - self.table.put_item( |
524 |
| - Item=item, |
525 |
| - ConditionExpression=f"attribute_not_exists({self.key_attr}) OR expiration < :now", |
526 |
| - ExpressionAttributeValues={":now": int(now.timestamp())}, |
527 |
| - ) |
528 |
| - except self._ddb_resource.meta.client.exceptions.ConditionalCheckFailedException: |
529 |
| - logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}") |
530 |
| - raise ItemAlreadyExistsError |
531 |
| - |
532 |
| - def _update_record(self, data_record: DataRecord): |
533 |
| - logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}") |
534 |
| - update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status" |
535 |
| - expression_attr_values = { |
536 |
| - ":expiry": data_record.expiry_timestamp, |
537 |
| - ":response_data": data_record.response_data, |
538 |
| - ":status": data_record.status, |
539 |
| - } |
540 |
| - expression_attr_names = { |
541 |
| - "#response_data": self.data_attr, |
542 |
| - "#expiry": self.expiry_attr, |
543 |
| - "#status": self.status_attr, |
544 |
| - } |
545 |
| - |
546 |
| - if self.payload_validation_enabled: |
547 |
| - update_expression += ", #validation_key = :validation_key" |
548 |
| - expression_attr_values[":validation_key"] = data_record.payload_hash |
549 |
| - expression_attr_names["#validation_key"] = self.validation_key_attr |
550 |
| - |
551 |
| - self.table.update_item( |
552 |
| - Key={self.key_attr: data_record.idempotency_key}, |
553 |
| - UpdateExpression=update_expression, |
554 |
| - ExpressionAttributeValues=expression_attr_values, |
555 |
| - ExpressionAttributeNames=expression_attr_names, |
556 |
| - ) |
557 |
| - |
558 |
| - def _delete_record(self, data_record: DataRecord) -> None: |
559 |
| - logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}") |
560 |
| - self.table.delete_item(Key={self.key_attr: data_record.idempotency_key},) |
0 commit comments