From 9cb346fa9839c61034abcdef994ad41902ae747f Mon Sep 17 00:00:00 2001
From: Thomas Khalil
Date: Thu, 21 Jan 2021 21:29:38 +0100
Subject: [PATCH 1/4] it works

---
 pom.xml                                            | 107 ++++++++--
 .../ddb_loader/LambdaFunctionHandler.java          | 191 ++++++++++++------
 2 files changed, 223 insertions(+), 75 deletions(-)

diff --git a/pom.xml b/pom.xml
index 06bc0f79..3d445922 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,11 +5,70 @@
     <artifactId>aws-s3-lambda-dynamodb-batch-loader</artifactId>
     <version>1.0.1</version>
     <name>aws-s3-lambda-dynamodb-csv-loader</name>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>software.amazon.awssdk</groupId>
+                <artifactId>bom</artifactId>
+                <version>2.15.67</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <dependencies>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
         <dependency>
-            <groupId>com.amazonaws</groupId>
-            <artifactId>aws-lambda-java-core</artifactId>
-            <version>1.2.1</version>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>30.1-jre</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>url-connection-client</artifactId>
+            <version>2.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>kms</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>dynamodb</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3control</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>regions</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>lambda</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.amazonaws</groupId>
@@ -18,21 +77,41 @@
-            <artifactId>aws-java-sdk-dynamodb</artifactId>
-            <version>1.11.933</version>
+            <artifactId>aws-lambda-java-events-sdk-transformer</artifactId>
+            <version>3.0.2</version>
         </dependency>
         <dependency>
-            <groupId>net.sf.opencsv</groupId>
-            <artifactId>opencsv</artifactId>
-            <version>2.3</version>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-lambda-java-log4j2</artifactId>
+            <version>1.2.0</version>
         </dependency>
         <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>30.1-jre</version>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-lambda-java-runtime-interface-client</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-lambda-java-core</artifactId>
+            <version>1.2.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+            <version>2.5.0</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>dynamodb-enhanced</artifactId>
+            <version>2.15.66</version>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>1.1.1</version>
         </dependency>
     </dependencies>
@@ -75,8 +154,8 @@
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>3.8.1</version>
                 <configuration>
-                    <source>8</source>
-                    <target>8</target>
+                    <source>11</source>
+                    <target>11</target>
                 </configuration>
diff --git a/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java b/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java
index 18454d05..78c66baa 100644
--- a/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java
+++ b/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java
@@ -1,118 +1,188 @@
 package com.github.kernelpanicaus.ddb_loader;
 
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
-import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
-import com.amazonaws.services.dynamodbv2.document.DynamoDB;
-import com.amazonaws.services.dynamodbv2.document.Item;
-import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
-import com.amazonaws.services.dynamodbv2.model.WriteRequest;
 import com.amazonaws.services.lambda.runtime.Context;
 import com.amazonaws.services.lambda.runtime.LambdaLogger;
-import com.amazonaws.services.lambda.runtime.RequestHandler;
-import com.amazonaws.services.lambda.runtime.events.S3Event;
-import com.amazonaws.services.lambda.runtime.events.models.s3.S3EventNotification;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
-import com.google.common.collect.Lists;
-
+import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
+import com.jayway.jsonpath.JsonPath;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.utils.IoUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Scanner;
+import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
-
 /**
  * The Class LambdaFunctionHandler.
  * This application loads GZIPped Dynamo JSON file to DynamoDB.
  */
-public class LambdaFunctionHandler implements RequestHandler<S3Event, Report> {
+public class LambdaFunctionHandler implements RequestStreamHandler {
 
     /**
      * Provide the AWS region which your DynamoDB table is hosted.
      */
-    static final String AWS_REGION = System.getenv("AWS_REGION");
+    static final Region AWS_REGION = Region.of(System.getenv("AWS_REGION"));
 
     /**
      * The DynamoDB table name.
      */
+    // TODO: Make this dynamic, from the S3 event.
     static final String DYNAMO_TABLE_NAME = System.getenv("DYNAMO_TABLE_NAME");
 
     /**
      * Configurable batch size
      */
-    static final int BATCH_SIZE = Integer.parseInt(System.getenv().getOrDefault("BATCH_SIZE","25"));
-
-    static final ClientConfiguration config = new ClientConfiguration()
-            .withMaxConnections(ClientConfiguration.DEFAULT_MAX_CONNECTIONS * 2);
+    static final int BATCH_SIZE = Integer.parseInt(System.getenv().getOrDefault("BATCH_SIZE", "25"));
+//
+//    static final ClientConfiguration config = new ClientConfiguration()
+//            .withMaxConnections(ClientConfiguration.DEFAULT_MAX_CONNECTIONS * 2);
 
-    final AmazonS3 s3Client = AmazonS3ClientBuilder
-            .standard()
-            .withClientConfiguration(config)
-            .withRegion(AWS_REGION)
+    final S3Client s3Client = S3Client.builder()
+            .region(AWS_REGION)
             .build();
 
-    final AmazonDynamoDB dynamoDBClient = AmazonDynamoDBClientBuilder
-            .standard()
-            .withClientConfiguration(config)
-            .withRegion(AWS_REGION)
+    final DynamoDbClient dynamoDBClient = DynamoDbClient.builder()
+            .region(AWS_REGION)
             .build();
 
-    final DynamoDB dynamoDB = new DynamoDB(dynamoDBClient);
+    static AttributeValue toAttributeValue(Object value) {
+        if (value == null) return AttributeValue.builder().nul(true).build();
+        if (value instanceof AttributeValue) return (AttributeValue) value;
+        if (value instanceof String) return AttributeValue.builder().s((String) value).build();
+        if (value instanceof Number) return AttributeValue.builder().n(value.toString()).build();
+
+        if (value instanceof Map) return AttributeValue.builder().m(
+                ((Map<String, Object>) value).entrySet().stream().collect(Collectors.toMap(
+                        Map.Entry::getKey,
+                        e -> toAttributeValue(e.getValue())
+                ))).build();
+
+        throw new UnsupportedOperationException("Time to impl new path for " + value);
+    }
+
+    public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) {
+        // TODO: Add sanity check for dynamo table
+        // TODO: Add dead letter queue for failures
 
-    public Report handleRequest(S3Event s3event, Context context) {
         long startTime = System.currentTimeMillis();
         Report statusReport = new Report();
 
         LambdaLogger logger = context.getLogger();
-
-        logger.log("Lambda Function Started");
+        String event = null;
+        try {
+            event = IoUtils.toUtf8String(inputStream);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        logger.log(event + "\n");
+        logger.log("Lambda Function Started \n");
 
         try {
-            S3EventNotification.S3EventNotificationRecord record = s3event.getRecords().get(0);
-            String srcBucket = record.getS3().getBucket().getName();
-            String srcKey = record.getS3().getObject().getUrlDecodedKey();
-            S3Object s3Object = s3Client.getObject(new GetObjectRequest(srcBucket, srcKey));
-            statusReport.setFileSize(s3Object.getObjectMetadata().getContentLength());
+            String srcKey = JsonPath.read(event, "$.Records[0].s3.object.key");
+            String srcBucket = JsonPath.read(event, "$.Records[0].s3.bucket.name");
 
-            logger.log("S3 Event Received: " + srcBucket + "/" + srcKey);
-            GZIPInputStream gis = new GZIPInputStream(s3Object.getObjectContent());
-            Scanner fileIn = new Scanner(gis);
+            logger.log("Bucket name: " + srcBucket + "\n");
+            logger.log("Key name: " + srcKey + "\n");
+            logger.log("S3 Object: " + srcBucket + "/" + srcKey + "\n");
 
-            TableWriteItems energyDataTableWriteItems = new TableWriteItems(DYNAMO_TABLE_NAME);
+            logger.log("S3 Event Received: " + srcBucket + "/" + srcKey + "\n");
 
-            List<Item> itemList = new ArrayList<Item>();
+            ResponseInputStream<GetObjectResponse> responseInputStream = s3Client.getObject(
+                    GetObjectRequest.builder()
+                            .bucket(srcBucket)
+                            .key(srcKey)
+                            .build()
+                    , ResponseTransformer.toInputStream()
+            );
 
+            logger.log("Reading input stream \n");
+
+            GZIPInputStream gis = new GZIPInputStream(responseInputStream);
+            Scanner fileIn = new Scanner(gis);
+            var parser = new JSONParser();
+
+            Collection<WriteRequest> itemList = new ArrayList<>();
+
+            int counter = 0;
+            int batchCounter = 0;
             while (fileIn.hasNext()) {
-                Item item = Item.fromJSON(fileIn.nextLine());
+                var line = fileIn.nextLine();
+//                logger.log(line + "\n");
+                JSONObject jsonLine = (JSONObject) parser.parse(line);
+                JSONObject jsonItem = (JSONObject) jsonLine.get("Item");
+//                var item = jsonLine.getJSONObject("Item");
+
+//                jsonLine.get("Item")
+//                HashMap result = (HashMap) JacksonUtils.fromJsonString(line, HashMap.class).get("Item")
+//                        .entrySet()
+//                        .stream()
+//                        .collect(Collectors.toMap(
+//                                entry -> entry.toString(),
+//                                entry -> entry
+//                        ));
+
+//                logger.log(jsonItem.toString() + "\n");
+//                logger.log(jsonItem.getClass().toString() + "\n");
+
+                WriteRequest item = WriteRequest.builder()
+                        .putRequest(PutRequest.builder().item(jsonItem).build())
+                        .build();
+
                 itemList.add(item);
-            }
 
-            for (List<Item> partition : Lists.partition(itemList, BATCH_SIZE)) {
-                energyDataTableWriteItems.withItemsToPut(partition);
-                BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(energyDataTableWriteItems);
+                logger.log("[" + batchCounter + "/" + counter + "] Adding item to itemlist \n");
+                counter++;
+
+                if (counter == BATCH_SIZE) {
+                    batchCounter++;
+
+                    var batchItemRequest = BatchWriteItemRequest.builder()
+                            .requestItems(Map.of(DYNAMO_TABLE_NAME, itemList))
+                            .build();
 
-                do {
+                    logger.log("Sending Batch "+ batchCounter +" \n");
+//                    logger.log(batchItemRequest.toString() + "\n");
 
-                    Map<String, List<WriteRequest>> unprocessedItems = outcome.getUnprocessedItems();
+                    var outcome = dynamoDBClient.batchWriteItem(batchItemRequest);
 
-                    if (outcome.getUnprocessedItems().size() > 0) {
-                        logger.log("Retrieving the unprocessed " + outcome.getUnprocessedItems().size() + " items.");
-                        outcome = dynamoDB.batchWriteItemUnprocessed(unprocessedItems);
-                    }
+                    do {
+                        var unprocessedItemsRequest = BatchWriteItemRequest.builder()
+                                .requestItems(outcome.unprocessedItems())
+                                .build();
 
-                } while (outcome.getUnprocessedItems().size() > 0);
+                        if (outcome.unprocessedItems().size() > 0) {
+                            logger.log("Retrieving the unprocessed " + outcome.unprocessedItems().size() + " items, batch ["+ batchCounter+"].");
+                            outcome = dynamoDBClient.batchWriteItem(unprocessedItemsRequest);
+                        }
+
+                    } while (outcome.unprocessedItems().size() > 0);
+                    itemList.clear();
+                    counter = 0;
+                }
             }
+
             logger.log("Load finish in " + (System.currentTimeMillis() - startTime) + "ms");
             fileIn.close();
             gis.close();
-            s3Object.close();
 
             statusReport.setStatus(true);
         } catch (Exception ex) {
@@ -120,6 +190,5 @@ public Report handleRequest(S3Event s3event, Context context)
             logger.log(ex.getMessage());
         }
 
         statusReport.setExecutiongTime(System.currentTimeMillis() - startTime);
-        return statusReport;
     }
 }

From fe600dd54be2156bf9936b8fb38a853ffdde3e25 Mon Sep 17 00:00:00 2001
From: Thomas Khalil
Date: Fri, 22 Jan 2021 10:13:24 +0100
Subject: [PATCH 2/4] cleanup comments

---
 .../ddb_loader/LambdaFunctionHandler.java | 37 +------------------
 1 file changed, 2 insertions(+), 35 deletions(-)

diff --git a/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java b/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java
index 78c66baa..3835d4cf 100644
--- a/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java
+++ b/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java
@@ -10,7 +10,6 @@
 import software.amazon.awssdk.core.sync.ResponseTransformer;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
-import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
 import software.amazon.awssdk.services.dynamodb.model.PutRequest;
 import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
@@ -26,7 +25,6 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.Scanner;
-import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
 
 /**
@@ -62,21 +60,6 @@
             .region(AWS_REGION)
             .build();
 
-    static AttributeValue toAttributeValue(Object value) {
-        if (value == null) return AttributeValue.builder().nul(true).build();
-        if (value instanceof AttributeValue) return (AttributeValue) value;
-        if (value instanceof String) return AttributeValue.builder().s((String) value).build();
-        if (value instanceof Number) return AttributeValue.builder().n(value.toString()).build();
-
-        if (value instanceof Map) return AttributeValue.builder().m(
-                ((Map<String, Object>) value).entrySet().stream().collect(Collectors.toMap(
-                        Map.Entry::getKey,
-                        e -> toAttributeValue(e.getValue())
-                ))).build();
-
-        throw new UnsupportedOperationException("Time to impl new path for " + value);
-    }
-
     public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) {
         // TODO: Add sanity check for dynamo table
         // TODO: Add dead letter queue for failures
@@ -125,22 +108,8 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co
             while (fileIn.hasNext()) {
                 var line = fileIn.nextLine();
-//                logger.log(line + "\n");
                 JSONObject jsonLine = (JSONObject) parser.parse(line);
                 JSONObject jsonItem = (JSONObject) jsonLine.get("Item");
-//                var item = jsonLine.getJSONObject("Item");
-
-//                jsonLine.get("Item")
-//                HashMap result = (HashMap) JacksonUtils.fromJsonString(line, HashMap.class).get("Item")
-//                        .entrySet()
-//                        .stream()
-//                        .collect(Collectors.toMap(
-//                                entry -> entry.toString(),
-//                                entry -> entry
-//                        ));
-
-//                logger.log(jsonItem.toString() + "\n");
-//                logger.log(jsonItem.getClass().toString() + "\n");
 
                 WriteRequest item = WriteRequest.builder()
                         .putRequest(PutRequest.builder().item(jsonItem).build())
                         .build();
@@ -158,8 +127,7 @@
                             .requestItems(Map.of(DYNAMO_TABLE_NAME, itemList))
                             .build();
 
-                    logger.log("Sending Batch "+ batchCounter +" \n");
-//                    logger.log(batchItemRequest.toString() + "\n");
+                    logger.log("Sending Batch " + batchCounter + " \n");
 
                     var outcome = dynamoDBClient.batchWriteItem(batchItemRequest);
@@ -169,7 +137,7 @@
                                 .build();
 
                         if (outcome.unprocessedItems().size() > 0) {
-                            logger.log("Retrieving the unprocessed " + outcome.unprocessedItems().size() + " items, batch ["+ batchCounter+"].");
+                            logger.log("Retrieving the unprocessed " + outcome.unprocessedItems().size() + " items, batch [" + batchCounter + "].");
                             outcome = dynamoDBClient.batchWriteItem(unprocessedItemsRequest);
                         }
 
@@ -188,7 +156,6 @@
         } catch (Exception ex) {
             logger.log(ex.getMessage());
         }
-
         statusReport.setExecutiongTime(System.currentTimeMillis() - startTime);
     }
 }

From 3c9f7196ed95aa7f27079bd3f09e372aeea80daa Mon Sep 17 00:00:00 2001
From: Thomas Khalil
Date: Fri, 22 Jan 2021 12:55:53 +0100
Subject: [PATCH 3/4] Refactor handler into smaller methods

---
 .../ddb_loader/LambdaFunctionHandler.java | 141 +++++++++++++-----
 1 file changed, 106 insertions(+), 35 deletions(-)

diff --git a/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java b/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java
index 3835d4cf..32fc2feb 100644
--- a/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java
+++ b/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java
@@ -6,11 +6,14 @@
 import com.jayway.jsonpath.JsonPath;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
 import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
 import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
 import software.amazon.awssdk.services.dynamodb.model.PutRequest;
 import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
 import software.amazon.awssdk.services.s3.S3Client;
@@ -22,7 +25,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Scanner;
 import java.util.zip.GZIPInputStream;
@@ -36,28 +39,30 @@
 public class LambdaFunctionHandler implements RequestStreamHandler {
 
     /**
      * Provide the AWS region which your DynamoDB table is hosted.
      */
-    static final Region AWS_REGION = Region.of(System.getenv("AWS_REGION"));
+    private static final Region AWS_REGION = Region.of(System.getenv("AWS_REGION"));
 
     /**
      * The DynamoDB table name.
      */
     // TODO: Make this dynamic, from the S3 event.
-    static final String DYNAMO_TABLE_NAME = System.getenv("DYNAMO_TABLE_NAME");
+    private static final String DYNAMO_TABLE_NAME = System.getenv("DYNAMO_TABLE_NAME");
 
     /**
      * Configurable batch size
      */
-    static final int BATCH_SIZE = Integer.parseInt(System.getenv().getOrDefault("BATCH_SIZE", "25"));
+    private static final int BATCH_SIZE = Integer.parseInt(System.getenv().getOrDefault("BATCH_SIZE", "25"));
 //
 //    static final ClientConfiguration config = new ClientConfiguration()
 //            .withMaxConnections(ClientConfiguration.DEFAULT_MAX_CONNECTIONS * 2);
 
-    final S3Client s3Client = S3Client.builder()
+    private static final S3Client s3Client = S3Client.builder()
             .region(AWS_REGION)
+            .httpClientBuilder(UrlConnectionHttpClient.builder())
             .build();
 
-    final DynamoDbClient dynamoDBClient = DynamoDbClient.builder()
+    private static final DynamoDbClient dynamoDBClient = DynamoDbClient.builder()
             .region(AWS_REGION)
+            .httpClientBuilder(UrlConnectionHttpClient.builder())
             .build();
 
     public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) {
@@ -81,20 +86,13 @@
             String srcKey = JsonPath.read(event, "$.Records[0].s3.object.key");
             String srcBucket = JsonPath.read(event, "$.Records[0].s3.bucket.name");
 
-            logger.log("Bucket name: " + srcBucket + "\n");
             logger.log("Key name: " + srcKey + "\n");
             logger.log("S3 Object: " + srcBucket + "/" + srcKey + "\n");
 
             logger.log("S3 Event Received: " + srcBucket + "/" + srcKey + "\n");
 
-            ResponseInputStream<GetObjectResponse> responseInputStream = s3Client.getObject(
-                    GetObjectRequest.builder()
-                            .bucket(srcBucket)
-                            .key(srcKey)
-                            .build()
-                    , ResponseTransformer.toInputStream()
-            );
+            ResponseInputStream<GetObjectResponse> responseInputStream = getS3ClientObject(srcKey, srcBucket);
 
             logger.log("Reading input stream \n");
@@ -102,52 +100,39 @@
             GZIPInputStream gis = new GZIPInputStream(responseInputStream);
             Scanner fileIn = new Scanner(gis);
             var parser = new JSONParser();
 
-            Collection<WriteRequest> itemList = new ArrayList<>();
-
             int counter = 0;
             int batchCounter = 0;
 
+            List<WriteRequest> itemList = new ArrayList<>();
+
             while (fileIn.hasNext()) {
-                var line = fileIn.nextLine();
-                JSONObject jsonLine = (JSONObject) parser.parse(line);
-                JSONObject jsonItem = (JSONObject) jsonLine.get("Item");
 
-                WriteRequest item = WriteRequest.builder()
-                        .putRequest(PutRequest.builder().item(jsonItem).build())
-                        .build();
+                JSONObject jsonItem = getWriteItemRequest(fileIn, parser);
 
-                itemList.add(item);
+                itemList.add(getWriteItemRequest(jsonItem));
 
                 logger.log("[" + batchCounter + "/" + counter + "] Adding item to itemlist \n");
                 counter++;
 
                 if (counter == BATCH_SIZE) {
-                    batchCounter++;
-
-                    var batchItemRequest = BatchWriteItemRequest.builder()
-                            .requestItems(Map.of(DYNAMO_TABLE_NAME, itemList))
-                            .build();
 
                     logger.log("Sending Batch " + batchCounter + " \n");
-
-                    var outcome = dynamoDBClient.batchWriteItem(batchItemRequest);
+                    BatchWriteItemResponse outcome = getBatchWriteItemResponse(Map.of(DYNAMO_TABLE_NAME, itemList));
 
                     do {
-                        var unprocessedItemsRequest = BatchWriteItemRequest.builder()
-                                .requestItems(outcome.unprocessedItems())
-                                .build();
+                        BatchWriteItemRequest unprocessedItemsRequest = getBatchWriteItemRequest(outcome.unprocessedItems());
 
                         if (outcome.unprocessedItems().size() > 0) {
                             logger.log("Retrieving the unprocessed " + outcome.unprocessedItems().size() + " items, batch [" + batchCounter + "].");
-                            outcome = dynamoDBClient.batchWriteItem(unprocessedItemsRequest);
+                            outcome = batchWrite(unprocessedItemsRequest);
                         }
 
                     } while (outcome.unprocessedItems().size() > 0);
                     itemList.clear();
+                    batchCounter++;
                     counter = 0;
                 }
             }
-
             logger.log("Load finish in " + (System.currentTimeMillis() - startTime) + "ms");
             fileIn.close();
             gis.close();
@@ -158,4 +143,90 @@
         statusReport.setExecutiongTime(System.currentTimeMillis() - startTime);
     }
+
+    /**
+     * <p>
+     * Builds and returns a WriteRequest for a JSON object.
+     * </p>
+     *
+     * @param jsonItem
+     * @return WriteRequest object
+     */
+    private WriteRequest getWriteItemRequest(JSONObject jsonItem) {
+        return WriteRequest.builder()
+                .putRequest(PutRequest.builder().item(jsonItem).build())
+                .build();
+    }
+
+    /**
+     * <p>
+     * Takes a Map of Dynamo Table name to a List of Write requests and executes a bulk write.
+     * </p>
+     *
+     * @param items Mapping of table name to collection of write requests.
+     * @return BatchWriteItemResponse
+     */
+    private BatchWriteItemResponse getBatchWriteItemResponse(Map<String, List<WriteRequest>> items) {
+        return batchWrite(getBatchWriteItemRequest(items));
+    }
+
+    /**
+     * <p>
+     * Builds and returns a BatchWriteItemRequest object.
+     * </p>
+     *
+     * @param items Mapping of table name to collection of write requests.
+     * @return BatchWriteItemRequest
+     */
+    private BatchWriteItemRequest getBatchWriteItemRequest(Map<String, List<WriteRequest>> items) {
+        return BatchWriteItemRequest.builder()
+                .requestItems(items)
+                .build();
+    }
+
+    /**
+     * <p>
+     * Executes a BatchWriteItem operation against a Dynamo table.
+     * </p>
+     *
+     * @param batchItemRequest
+     * @return BatchWriteItemResponse
+     */
+    private BatchWriteItemResponse batchWrite(BatchWriteItemRequest batchItemRequest) {
+        return dynamoDBClient.batchWriteItem(batchItemRequest);
+    }
+
+    /**
+     * <p>
+     * Returns the inner JSON object.
+     * </p>
+     *
+     * @param fileIn
+     * @param parser
+     * @return JSONObject
+     * @throws ParseException
+     */
+    private JSONObject getWriteItemRequest(Scanner fileIn, JSONParser parser) throws ParseException {
+        var line = fileIn.nextLine();
+        JSONObject jsonLine = (JSONObject) parser.parse(line);
+        return (JSONObject) jsonLine.get("Item");
+    }
+
+    /**
+     * <p>
+     * Fetches an S3 object and returns it as a stream.
+     * </p>
+     *
+     * @param srcKey
+     * @param srcBucket
+     * @return
+     */
+    private ResponseInputStream<GetObjectResponse> getS3ClientObject(String srcKey, String srcBucket) {
+        GetObjectRequest objectRequest = GetObjectRequest.builder()
+                .bucket(srcBucket)
+                .key(srcKey)
+                .build();
+
+        return s3Client.getObject(objectRequest, ResponseTransformer.toInputStream());
+    }
 }

From 5e1c196df68ca30054eda592d082407ad2cd6a13 Mon Sep 17 00:00:00 2001
From: Thomas Khalil
Date: Fri, 22 Jan 2021 13:02:27 +0100
Subject: [PATCH 4/4] cleanup dependencies

---
 pom.xml | 29 -----------------------------
 1 file changed, 29 deletions(-)

diff --git a/pom.xml b/pom.xml
index 3d445922..271fc394 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,21 +17,6 @@
         </dependencies>
     </dependencyManagement>
 
     <dependencies>
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>30.1-jre</version>
@@ -46,18 +31,10 @@
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>url-connection-client</artifactId>
             <version>2.1.0</version>
         </dependency>
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>s3</artifactId>
         </dependency>
-        <dependency>
-            <groupId>software.amazon.awssdk</groupId>
-            <artifactId>kms</artifactId>
-        </dependency>
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>dynamodb</artifactId>
         </dependency>
-        <dependency>
-            <groupId>software.amazon.awssdk</groupId>
-            <artifactId>s3control</artifactId>
-        </dependency>
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>utils</artifactId>
         </dependency>
@@ -100,12 +77,6 @@
         <dependency>
             <groupId>com.jayway.jsonpath</groupId>
             <artifactId>json-path</artifactId>
             <version>2.5.0</version>
         </dependency>
-        <dependency>
-            <groupId>software.amazon.awssdk</groupId>
-            <artifactId>dynamodb-enhanced</artifactId>
-            <version>2.15.66</version>
-        </dependency>
         <dependency>
             <groupId>com.googlecode.json-simple</groupId>
             <artifactId>json-simple</artifactId>