1
- import boto3 , json
2
- from boto3 .dynamodb .conditions import Key
1
+ from __future__ import print_function # Python 2/3 compatibility
2
+ import boto3
3
+ from boto3 .dynamodb .conditions import Attr
4
+ from botocore .exceptions import ClientError
5
+
6
+ dynamodb = boto3 .resource ("dynamodb" , region_name = "us-west-2" )
7
+ table = dynamodb .Table ("RetailDatabase" )
8
+
9
+
10
+ def create_item_if_not_exist (created_at ):
11
+ try :
12
+ table .put_item (
13
+ Item = {
14
+
15
+ "sk" : "metadata" ,
16
+ "name" : "Jim Bob" ,
17
+ "first_name" : "Jim" ,
18
+ "last_name" : "Bob" ,
19
+ "created_at" : created_at ,
20
+ },
21
+ # If this item exists, then an item exists with this pk and/or sk so we should fail if
22
+ # we see a matching item with this pk.
23
+ ConditionExpression = Attr ("pk" ).not_exists (),
24
+ )
25
+ except dynamodb .meta .client .exceptions .ConditionalCheckFailedException :
26
+ print ("PutItem failed since the item already exists" )
27
+ return False
28
+
29
+ print ("PutItem succeeded" )
30
+ return True
31
+
32
+
33
+ def get_created_at (email ):
34
+ response = table .get_item (
35
+ Key = {"pk" : email , "sk" : "metadata" },
36
+ AttributesToGet = ["created_at" ],
37
+ )
38
+ return response .get ("Item" , {}).get ("created_at" )
39
+
40
+
41
+ print ("The first PutItem should succeed because the item does not exist" )
42
+ assert create_item_if_not_exist (100 )
43
+
44
+ print ("The same PutItem command should now fail because the item already exists" )
45
+ assert not create_item_if_not_exist (200 )
46
+
47
+ assert get_created_at (
"[email protected] " )
== 100
48
+ print ("As expected, The second PutItem command failed and the data was not overwritten" )
0 commit comments