diff --git a/pom.xml b/pom.xml
index 06bc0f79..271fc394 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,11 +5,47 @@
     <artifactId>aws-s3-lambda-dynamodb-batch-loader</artifactId>
     <version>1.0.1</version>
    <name>aws-s3-lambda-dynamodb-csv-loader</name>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>software.amazon.awssdk</groupId>
+                <artifactId>bom</artifactId>
+                <version>2.15.67</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <dependencies>
         <dependency>
-            <groupId>com.amazonaws</groupId>
-            <artifactId>aws-lambda-java-core</artifactId>
-            <version>1.2.1</version>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>30.1-jre</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>url-connection-client</artifactId>
+            <version>2.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>dynamodb</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>regions</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>lambda</artifactId>
         </dependency>
         <dependency>
             <groupId>com.amazonaws</groupId>
@@ -18,21 +54,35 @@
             <groupId>com.amazonaws</groupId>
-            <artifactId>aws-java-sdk-dynamodb</artifactId>
-            <version>1.11.933</version>
+            <artifactId>aws-lambda-java-events-sdk-transformer</artifactId>
+            <version>3.0.2</version>
         </dependency>
         <dependency>
-            <groupId>net.sf.opencsv</groupId>
-            <artifactId>opencsv</artifactId>
-            <version>2.3</version>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-lambda-java-log4j2</artifactId>
+            <version>1.2.0</version>
         </dependency>
         <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>30.1-jre</version>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-lambda-java-runtime-interface-client</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-lambda-java-core</artifactId>
+            <version>1.2.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+            <version>2.5.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>1.1.1</version>
         </dependency>
-
     </dependencies>
@@ -75,8 +125,8 @@
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>3.8.1</version>
                 <configuration>
-                    <source>8</source>
-                    <target>8</target>
+                    <source>11</source>
+                    <target>11</target>
                 </configuration>
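Note on the dependency changes above: once the software.amazon.awssdk BOM (2.15.67) is imported, the explicit 2.1.0 version on url-connection-client overrides the managed version and mixes two SDK releases on one classpath; dropping that version element would let the BOM align it with s3 and dynamodb. The utils and regions artifacts are also pulled in transitively by the service clients, so listing them is harmless but not required.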
diff --git a/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java b/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java
index 18454d05..32fc2feb 100644
--- a/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java
+++ b/src/main/java/com/github/kernelpanicaus/ddb_loader/LambdaFunctionHandler.java
@@ -1,125 +1,232 @@
 package com.github.kernelpanicaus.ddb_loader;
 
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
-import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
-import com.amazonaws.services.dynamodbv2.document.DynamoDB;
-import com.amazonaws.services.dynamodbv2.document.Item;
-import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
-import com.amazonaws.services.dynamodbv2.model.WriteRequest;
 import com.amazonaws.services.lambda.runtime.Context;
 import com.amazonaws.services.lambda.runtime.LambdaLogger;
-import com.amazonaws.services.lambda.runtime.RequestHandler;
-import com.amazonaws.services.lambda.runtime.events.S3Event;
-import com.amazonaws.services.lambda.runtime.events.models.s3.S3EventNotification;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
-import com.google.common.collect.Lists;
-
+import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
+import com.jayway.jsonpath.JsonPath;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.utils.IoUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Scanner;
 import java.util.zip.GZIPInputStream;
-
 /**
  * The Class LambdaFunctionHandler.
  * This application loads GZIPped Dynamo JSON file to DynamoDB.
  */
-public class LambdaFunctionHandler implements RequestHandler<S3Event, Report> {
+public class LambdaFunctionHandler implements RequestStreamHandler {
 
     /**
      * Provide the AWS region which your DynamoDB table is hosted.
      */
-    static final String AWS_REGION = System.getenv("AWS_REGION");
+    private static final Region AWS_REGION = Region.of(System.getenv("AWS_REGION"));
 
     /**
      * The DynamoDB table name.
      */
-    static final String DYNAMO_TABLE_NAME = System.getenv("DYNAMO_TABLE_NAME");
+    // TODO: Make this dynamic, from the S3 event.
+    private static final String DYNAMO_TABLE_NAME = System.getenv("DYNAMO_TABLE_NAME");
 
     /**
      * Configurable batch size
      */
-    static final int BATCH_SIZE = Integer.parseInt(System.getenv().getOrDefault("BATCH_SIZE","25"));
-
-    static final ClientConfiguration config = new ClientConfiguration()
-            .withMaxConnections(ClientConfiguration.DEFAULT_MAX_CONNECTIONS * 2);
-
-    final AmazonS3 s3Client = AmazonS3ClientBuilder
-            .standard()
-            .withClientConfiguration(config)
-            .withRegion(AWS_REGION)
+    private static final int BATCH_SIZE = Integer.parseInt(System.getenv().getOrDefault("BATCH_SIZE", "25"));
+
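+    // The UrlConnection HTTP client has a small dependency footprint, which AWS
+    // recommends for Lambda functions to keep cold-start time down.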
logger.log("Key name: " + srcKey + "\n"); + logger.log("S3 Object: " + srcBucket + "/" + srcKey + "\n"); - GZIPInputStream gis = new GZIPInputStream(s3Object.getObjectContent()); - Scanner fileIn = new Scanner(gis); + logger.log("S3 Event Received: " + srcBucket + "/" + srcKey + "\n"); + + ResponseInputStream responseInputStream = getS3ClientObject(srcKey, srcBucket); - TableWriteItems energyDataTableWriteItems = new TableWriteItems(DYNAMO_TABLE_NAME); + logger.log("Reading input stream \n"); + + GZIPInputStream gis = new GZIPInputStream(responseInputStream); + Scanner fileIn = new Scanner(gis); + var parser = new JSONParser(); - List itemList = new ArrayList(); + int counter = 0; + int batchCounter = 0; + List itemList = new ArrayList<>(); while (fileIn.hasNext()) { - Item item = Item.fromJSON(fileIn.nextLine()); - itemList.add(item); - } - for (List partition : Lists.partition(itemList, BATCH_SIZE)) { - energyDataTableWriteItems.withItemsToPut(partition); - BatchWriteItemOutcome outcome = dynamoDB.batchWriteItem(energyDataTableWriteItems); + JSONObject jsonItem = getWriteItemRequest(fileIn, parser); + + itemList.add(getWriteItemRequest(jsonItem)); + + logger.log("[" + batchCounter + "/" + counter + "] Adding item to itemlist \n"); + counter++; - do { + if (counter == BATCH_SIZE) { - Map> unprocessedItems = outcome.getUnprocessedItems(); + logger.log("Sending Batch " + batchCounter + " \n"); + BatchWriteItemResponse outcome = getBatchWriteItemResponse(Map.of(DYNAMO_TABLE_NAME, itemList)); - if (outcome.getUnprocessedItems().size() > 0) { - logger.log("Retrieving the unprocessed " + outcome.getUnprocessedItems().size() + " items."); - outcome = dynamoDB.batchWriteItemUnprocessed(unprocessedItems); - } + do { + BatchWriteItemRequest unprocessedItemsRequest = getBatchWriteItemRequest(outcome.unprocessedItems()); - } while (outcome.getUnprocessedItems().size() > 0); + if (outcome.unprocessedItems().size() > 0) { + logger.log("Retrieving the unprocessed " + outcome.unprocessedItems().size() + " items, batch [" + batchCounter + "]."); + outcome = batchWrite(unprocessedItemsRequest); + } + + } while (outcome.unprocessedItems().size() > 0); + itemList.clear(); + batchCounter++; + counter = 0; + } } logger.log("Load finish in " + (System.currentTimeMillis() - startTime) + "ms"); fileIn.close(); gis.close(); - s3Object.close(); statusReport.setStatus(true); } catch (Exception ex) { logger.log(ex.getMessage()); } - statusReport.setExecutiongTime(System.currentTimeMillis() - startTime); - return statusReport; + } + + /** + *
+
+    /**
+     * Builds and returns a WriteRequest wrapping a PutRequest for the given item.
+     *
+     * @param jsonItem the item's attribute map
+     * @return WriteRequest object
+     */
+    private WriteRequest getWriteItemRequest(JSONObject jsonItem) {
+        return WriteRequest.builder()
+                .putRequest(PutRequest.builder().item(jsonItem).build())
+                .build();
+    }
+
+    /**
+     * Takes a map of DynamoDB table name to a list of write requests and executes a bulk write.
+     *
+     * @param items mapping of table name to collection of write requests
+     * @return BatchWriteItemResponse
+     */
+    private BatchWriteItemResponse getBatchWriteItemResponse(Map<String, List<WriteRequest>> items) {
+        return batchWrite(getBatchWriteItemRequest(items));
+    }
+
+    /**
+     * Builds and returns a BatchWriteItemRequest object.
+     *
+     * @param items mapping of table name to collection of write requests
+     * @return BatchWriteItemRequest
+     */
+    private BatchWriteItemRequest getBatchWriteItemRequest(Map<String, List<WriteRequest>> items) {
+        return BatchWriteItemRequest.builder()
+                .requestItems(items)
+                .build();
+    }
+
+    /**
+     * Executes a BatchWriteItem operation against DynamoDB. Unprocessed items come
+     * back in the response; AWS recommends retrying them with exponential backoff.
+     *
+     * @param batchItemRequest the batch write request
+     * @return BatchWriteItemResponse
+     */
+    private BatchWriteItemResponse batchWrite(BatchWriteItemRequest batchItemRequest) {
+        return dynamoDBClient.batchWriteItem(batchItemRequest);
+    }
+
+    /**
+     * Reads the next line from the scanner and returns the inner "Item" JSON object.
+     *
+     * @param fileIn scanner over the decompressed export file
+     * @param parser JSON parser
+     * @return the parsed "Item" object
+     * @throws ParseException if the line is not valid JSON
+     */
+    private JSONObject getJsonItem(Scanner fileIn, JSONParser parser) throws ParseException {
+        var line = fileIn.nextLine();
+        JSONObject jsonLine = (JSONObject) parser.parse(line);
+        return (JSONObject) jsonLine.get("Item");
+    }
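+
+    // Example input line (DynamoDB export format; attribute names here are
+    // illustrative): {"Item":{"id":{"S":"user-1"},"score":{"N":"42"}}}
+    // getJsonItem returns the value of "Item", i.e. the typed attribute map.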
+
+    /**
+     * Fetches the S3 object and returns its content as a stream.
+     *
+     * @param srcKey    the S3 object key
+     * @param srcBucket the source bucket name
+     * @return the object content as a ResponseInputStream
+     */
+    private ResponseInputStream<GetObjectResponse> getS3ClientObject(String srcKey, String srcBucket) {
+        GetObjectRequest objectRequest = GetObjectRequest.builder()
+                .bucket(srcBucket)
+                .key(srcKey)
+                .build();
+
+        return s3Client.getObject(objectRequest, ResponseTransformer.toInputStream());
+    }
 }
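For a quick local check of the event parsing, a small throwaway harness along these lines (the event body, bucket, and key below are made up for illustration) exercises the same JsonPath expressions the handler relies on:

    // Hypothetical smoke test for the JsonPath extraction used in handleRequest.
    import com.jayway.jsonpath.JsonPath;

    public class EventParsingSmokeTest {
        public static void main(String[] args) {
            // Minimal S3 put event with illustrative bucket and key values.
            String event = "{\"Records\":[{\"s3\":{"
                    + "\"bucket\":{\"name\":\"my-source-bucket\"},"
                    + "\"object\":{\"key\":\"exports/data.json.gz\"}}}]}";
            String key = JsonPath.read(event, "$.Records[0].s3.object.key");
            String bucket = JsonPath.read(event, "$.Records[0].s3.bucket.name");
            System.out.println(bucket + "/" + key); // my-source-bucket/exports/data.json.gz
        }
    }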