6
6
import com .jayway .jsonpath .JsonPath ;
7
7
import org .json .simple .JSONObject ;
8
8
import org .json .simple .parser .JSONParser ;
9
+ import org .json .simple .parser .ParseException ;
9
10
import software .amazon .awssdk .core .ResponseInputStream ;
10
11
import software .amazon .awssdk .core .sync .ResponseTransformer ;
12
+ import software .amazon .awssdk .http .urlconnection .UrlConnectionHttpClient ;
11
13
import software .amazon .awssdk .regions .Region ;
12
14
import software .amazon .awssdk .services .dynamodb .DynamoDbClient ;
13
15
import software .amazon .awssdk .services .dynamodb .model .BatchWriteItemRequest ;
16
+ import software .amazon .awssdk .services .dynamodb .model .BatchWriteItemResponse ;
14
17
import software .amazon .awssdk .services .dynamodb .model .PutRequest ;
15
18
import software .amazon .awssdk .services .dynamodb .model .WriteRequest ;
16
19
import software .amazon .awssdk .services .s3 .S3Client ;
22
25
import java .io .InputStream ;
23
26
import java .io .OutputStream ;
24
27
import java .util .ArrayList ;
25
- import java .util .Collection ;
28
+ import java .util .List ;
26
29
import java .util .Map ;
27
30
import java .util .Scanner ;
28
31
import java .util .zip .GZIPInputStream ;
@@ -36,28 +39,30 @@ public class LambdaFunctionHandler implements RequestStreamHandler {
36
39
/**
37
40
* Provide the AWS region in which your DynamoDB table is hosted.
38
41
*/
39
- static final Region AWS_REGION = Region .of (System .getenv ("AWS_REGION" ));
42
+ private static final Region AWS_REGION = Region .of (System .getenv ("AWS_REGION" ));
40
43
41
44
/**
42
45
* The DynamoDB table name.
43
46
*/
44
47
// TODO: Make this dynamic, from the S3 event.
45
- static final String DYNAMO_TABLE_NAME = System .getenv ("DYNAMO_TABLE_NAME" );
48
+ private static final String DYNAMO_TABLE_NAME = System .getenv ("DYNAMO_TABLE_NAME" );
46
49
47
50
/**
48
51
* Configurable batch size
49
52
*/
50
- static final int BATCH_SIZE = Integer .parseInt (System .getenv ().getOrDefault ("BATCH_SIZE" , "25" ));
53
+ private static final int BATCH_SIZE = Integer .parseInt (System .getenv ().getOrDefault ("BATCH_SIZE" , "25" ));
51
54
//
52
55
// static final ClientConfiguration config = new ClientConfiguration()
53
56
// .withMaxConnections(ClientConfiguration.DEFAULT_MAX_CONNECTIONS * 2);
54
57
55
- final S3Client s3Client = S3Client .builder ()
58
+ private static final S3Client s3Client = S3Client .builder ()
56
59
.region (AWS_REGION )
60
+ .httpClientBuilder (UrlConnectionHttpClient .builder ())
57
61
.build ();
58
62
59
- final DynamoDbClient dynamoDBClient = DynamoDbClient .builder ()
63
+ private static final DynamoDbClient dynamoDBClient = DynamoDbClient .builder ()
60
64
.region (AWS_REGION )
65
+ .httpClientBuilder (UrlConnectionHttpClient .builder ())
61
66
.build ();
62
67
63
68
public void handleRequest (InputStream inputStream , OutputStream outputStream , Context context ) {
@@ -81,73 +86,53 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co
81
86
String srcKey = JsonPath .read (event , "$.Records[0].s3.object.key" );
82
87
String srcBucket = JsonPath .read (event , "$.Records[0].s3.bucket.name" );
83
88
84
-
85
89
logger .log ("Bucket name: " + srcBucket + "\n " );
86
90
logger .log ("Key name: " + srcKey + "\n " );
87
91
logger .log ("S3 Object: " + srcBucket + "/" + srcKey + "\n " );
88
92
89
93
logger .log ("S3 Event Received: " + srcBucket + "/" + srcKey + "\n " );
90
94
91
- ResponseInputStream <GetObjectResponse > responseInputStream = s3Client .getObject (
92
- GetObjectRequest .builder ()
93
- .bucket (srcBucket )
94
- .key (srcKey )
95
- .build ()
96
- , ResponseTransformer .toInputStream ()
97
- );
95
+ ResponseInputStream <GetObjectResponse > responseInputStream = getS3ClientObject (srcKey , srcBucket );
98
96
99
97
logger .log ("Reading input stream \n " );
100
98
101
99
GZIPInputStream gis = new GZIPInputStream (responseInputStream );
102
100
Scanner fileIn = new Scanner (gis );
103
101
var parser = new JSONParser ();
104
102
105
- Collection <WriteRequest > itemList = new ArrayList <>();
106
-
107
103
int counter = 0 ;
108
104
int batchCounter = 0 ;
105
+ List <WriteRequest > itemList = new ArrayList <>();
106
+
109
107
while (fileIn .hasNext ()) {
110
- var line = fileIn .nextLine ();
111
- JSONObject jsonLine = (JSONObject ) parser .parse (line );
112
- JSONObject jsonItem = (JSONObject ) jsonLine .get ("Item" );
113
108
114
- WriteRequest item = WriteRequest .builder ()
115
- .putRequest (PutRequest .builder ().item (jsonItem ).build ())
116
- .build ();
109
+ JSONObject jsonItem = getWriteItemRequest (fileIn , parser );
117
110
118
- itemList .add (item );
111
+ itemList .add (getWriteItemRequest ( jsonItem ) );
119
112
120
113
logger .log ("[" + batchCounter + "/" + counter + "] Adding item to itemlist \n " );
121
114
counter ++;
122
115
123
116
if (counter == BATCH_SIZE ) {
124
- batchCounter ++;
125
-
126
- var batchItemRequest = BatchWriteItemRequest .builder ()
127
- .requestItems (Map .of (DYNAMO_TABLE_NAME , itemList ))
128
- .build ();
129
117
130
118
logger .log ("Sending Batch " + batchCounter + " \n " );
131
-
132
- var outcome = dynamoDBClient .batchWriteItem (batchItemRequest );
119
+ BatchWriteItemResponse outcome = getBatchWriteItemResponse (Map .of (DYNAMO_TABLE_NAME , itemList ));
133
120
134
121
do {
135
- var unprocessedItemsRequest = BatchWriteItemRequest .builder ()
136
- .requestItems (outcome .unprocessedItems ())
137
- .build ();
122
+ BatchWriteItemRequest unprocessedItemsRequest = getBatchWriteItemRequest (outcome .unprocessedItems ());
138
123
139
124
if (outcome .unprocessedItems ().size () > 0 ) {
140
125
logger .log ("Retrieving the unprocessed " + outcome .unprocessedItems ().size () + " items, batch [" + batchCounter + "]." );
141
- outcome = dynamoDBClient . batchWriteItem (unprocessedItemsRequest );
126
+ outcome = batchWrite (unprocessedItemsRequest );
142
127
}
143
128
144
129
} while (outcome .unprocessedItems ().size () > 0 );
145
130
itemList .clear ();
131
+ batchCounter ++;
146
132
counter = 0 ;
147
133
}
148
134
}
149
135
150
-
151
136
logger .log ("Load finish in " + (System .currentTimeMillis () - startTime ) + "ms" );
152
137
fileIn .close ();
153
138
gis .close ();
@@ -158,4 +143,90 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co
158
143
}
159
144
statusReport .setExecutiongTime (System .currentTimeMillis () - startTime );
160
145
}
146
+
147
+ /**
148
+ * <p>
149
+ * Builds and returns a <code>WriteRequest</code> for JSON object.
150
+ * </p>
151
+ *
152
+ * @param jsonItem
153
+ * @return WriteRequest object
154
+ */
155
+ private WriteRequest getWriteItemRequest (JSONObject jsonItem ) {
156
+ return WriteRequest .builder ()
157
+ .putRequest (PutRequest .builder ().item (jsonItem ).build ())
158
+ .build ();
159
+ }
160
+
161
+ /**
162
+ * <p>
163
+ * Takes a Map of Dynamo Table name to a List of Write requests and executes a bulk write.
164
+ * </p>
165
+ *
166
+ * @param items Mapping of table name to collection of write requests.
167
+ * @return BatchWriteItemResponse
168
+ */
169
+ private BatchWriteItemResponse getBatchWriteItemResponse (Map <String , List <WriteRequest >> items ) {
170
+ return batchWrite (getBatchWriteItemRequest (items ));
171
+ }
172
+
173
+ /**
174
+ * <p>
175
+ * Builds and return a BatchWriteItemRequest object.
176
+ * </p>
177
+ *
178
+ * @param items Mapping of table name to collection of write requests.
179
+ * @return BatchWriteItemRequest
180
+ */
181
+ private BatchWriteItemRequest getBatchWriteItemRequest (Map <String , List <WriteRequest >> items ) {
182
+ return BatchWriteItemRequest .builder ()
183
+ .requestItems (items )
184
+ .build ();
185
+ }
186
+
187
+ /**
188
+ * <p>
189
+ * Executes BatchWriteItem operation against a Dynamo Table.
190
+ * </p>
191
+ *
192
+ * @param `batchItemRequest`
193
+ * @return <code>BatchWriteItemResponse</code>
194
+ */
195
+ private BatchWriteItemResponse batchWrite (BatchWriteItemRequest batchItemRequest ) {
196
+ return dynamoDBClient .batchWriteItem (batchItemRequest );
197
+ }
198
+
199
+ /**
200
+ * <p>
201
+ * Returns inner JSON object.
202
+ * </p>
203
+ *
204
+ * @param fileIn
205
+ * @param parser
206
+ * @return <code>JSONObject</code>
207
+ * @throws ParseException
208
+ */
209
+ private JSONObject getWriteItemRequest (Scanner fileIn , JSONParser parser ) throws ParseException {
210
+ var line = fileIn .nextLine ();
211
+ JSONObject jsonLine = (JSONObject ) parser .parse (line );
212
+ return (JSONObject ) jsonLine .get ("Item" );
213
+ }
214
+
215
+ /**
216
+ * <p>
217
+ * getS3ClientObject and returns an S3 Object as a stream.
218
+ * </p>
219
+ *
220
+ * @param srcKey
221
+ * @param srcBucket
222
+ * @return
223
+ */
224
+ private ResponseInputStream <GetObjectResponse > getS3ClientObject (String srcKey , String srcBucket ) {
225
+ GetObjectRequest objectRequest = GetObjectRequest .builder ()
226
+ .bucket (srcBucket )
227
+ .key (srcKey )
228
+ .build ();
229
+
230
+ return s3Client .getObject (objectRequest , ResponseTransformer .toInputStream ());
231
+ }
161
232
}
0 commit comments