diff --git a/pom.xml b/pom.xml
index 0cea9a4..459415d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
software.amazon.payloadoffloading
payloadoffloading-common
- 2.2.0
+ 2.2.1
jar
Payload offloading common library for AWS
Common library between extended Amazon AWS clients to save payloads up to 2GB on Amazon S3.
diff --git a/src/main/java/software/amazon/payloadoffloading/PayloadStore.java b/src/main/java/software/amazon/payloadoffloading/PayloadStore.java
index ac5235f..01c8bdd 100644
--- a/src/main/java/software/amazon/payloadoffloading/PayloadStore.java
+++ b/src/main/java/software/amazon/payloadoffloading/PayloadStore.java
@@ -1,5 +1,6 @@
package software.amazon.payloadoffloading;
+import java.util.Collection;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.model.S3Exception;
@@ -60,4 +61,20 @@ public interface PayloadStore {
* a server side issue.
*/
void deleteOriginalPayload(String payloadPointer);
+
+ /**
+ * Deletes original payloads using the given payloadPointers. The pointers must
+ * have been obtained using {@link storeOriginalPayload}
+ *
+ * This call will be more efficient than deleting payloads one at a time if the payloads
+ * are in the same S3 bucket.
+ *
+ * @param payloadPointers
+ * @throws SdkClientException If any internal errors are encountered on the client side while
+ * attempting to make the request or handle the response to/from PayloadStore.
+ * For example, if payloadPointer is invalid or a network connection is not available.
+ * @throws S3Exception If an error response is returned by actual PayloadStore indicating
+ * a server side issue.
+ */
+ void deleteOriginalPayloads(Collection payloadPointers);
}
diff --git a/src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java b/src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java
index 43590b4..a924815 100644
--- a/src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java
+++ b/src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java
@@ -1,5 +1,6 @@
package software.amazon.payloadoffloading;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.model.S3Exception;
@@ -75,4 +76,24 @@ public interface PayloadStoreAsync {
* a server side issue.
*/
CompletableFuture deleteOriginalPayload(String payloadPointer);
+
+ /**
+ * Deletes the original payload using the given payloadPointer. The pointer must
+ * have been obtained using {@link #storeOriginalPayload(String)}
+ *
+ * This call will be more efficient than deleting payloads one at a time if the payloads
+ * are in the same S3 bucket.
+ *
+ * This call is asynchronous, and so documented return values and exceptions are propagated through
+ * the returned {@link CompletableFuture}.
+ *
+ * @param payloadPointers
+ * @return future value that completes when the delete operation finishes
+ * @throws SdkClientException If any internal errors are encountered on the client side while
+ * attempting to make the request or handle the response to/from PayloadStore.
+ * For example, if payloadPointer is invalid or a network connection is not available.
+ * @throws S3Exception If an error response is returned by actual PayloadStore indicating
+ * a server side issue.
+ */
+ CompletableFuture deleteOriginalPayloads(Collection payloadPointers);
}
diff --git a/src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java b/src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java
index a0dc868..f1b958b 100644
--- a/src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java
+++ b/src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java
@@ -1,8 +1,10 @@
package software.amazon.payloadoffloading;
import java.io.UncheckedIOException;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseBytes;
@@ -11,9 +13,12 @@
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
/**
@@ -115,4 +120,33 @@ public CompletableFuture deletePayloadFromS3(String s3BucketName, String s
return null;
});
}
+
+ public CompletableFuture deletePayloadsFromS3(String s3BucketName, Collection s3Keys) {
+ DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder()
+ .bucket(s3BucketName)
+ .delete(Delete.builder()
+ .objects(s3Keys.stream()
+ .map(s3Key -> ObjectIdentifier.builder()
+ .key(s3Key)
+ .build())
+ .collect(Collectors.toList()))
+ .build())
+ .build();
+
+ return s3Client.deleteObjects(deleteObjectsRequest)
+ .handle((v, tIn) -> {
+ if (tIn != null) {
+ Throwable t = Util.unwrapFutureException(tIn);
+ if (t instanceof SdkException) {
+ String errorMessage = "Failed to delete the S3 object which contains the payload";
+ LOG.error(errorMessage, t);
+ throw SdkException.create(errorMessage, t);
+ }
+ throw new CompletionException(t);
+ }
+
+ LOG.info("S3 object deleted, Bucket name: " + s3BucketName + ", Object keys: " + s3Keys + ".");
+ return null;
+ });
+ }
}
diff --git a/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStore.java b/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStore.java
index 096f8b2..7dedc8d 100644
--- a/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStore.java
+++ b/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStore.java
@@ -1,5 +1,9 @@
package software.amazon.payloadoffloading;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,4 +60,20 @@ public void deleteOriginalPayload(String payloadPointer) {
String s3Key = s3Pointer.getS3Key();
s3Dao.deletePayloadFromS3(s3BucketName, s3Key);
}
+
+ @Override
+ public void deleteOriginalPayloads(Collection payloadPointers) {
+ // Sort by S3 bucket.
+ Map> offloadedMessages = payloadPointers.stream()
+ .map(PayloadS3Pointer::fromJson)
+ .collect(Collectors.groupingBy(PayloadS3Pointer::getS3BucketName));
+
+ for (Map.Entry> bucket : offloadedMessages.entrySet()) {
+ String s3BucketName = bucket.getKey();
+ List s3Keys = bucket.getValue().stream()
+ .map(PayloadS3Pointer::getS3Key)
+ .collect(Collectors.toList());
+ s3Dao.deletePayloadsFromS3(s3BucketName, s3Keys);
+ }
+ }
}
diff --git a/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java b/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java
index 5b84ede..b4cb028 100644
--- a/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java
+++ b/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java
@@ -1,11 +1,15 @@
package software.amazon.payloadoffloading;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import software.amazon.payloadoffloading.PayloadS3Pointer;
/**
* S3 based implementation for PayloadStoreAsync.
@@ -74,4 +78,34 @@ public CompletableFuture deleteOriginalPayload(String payloadPointer) {
return futureEx;
}
}
+
+ @Override
+ public CompletableFuture deleteOriginalPayloads(Collection payloadPointers) {
+ // Skip the delete if there are no payloads to delete.
+ if (payloadPointers.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ try {
+ // Sort by S3 bucket.
+ Map> offloadedMessages = payloadPointers.stream()
+ .map(PayloadS3Pointer::fromJson)
+ .collect(Collectors.groupingBy(PayloadS3Pointer::getS3BucketName));
+
+ List> deleteFutures = new ArrayList<>(offloadedMessages.size());
+ for (Map.Entry> bucket : offloadedMessages.entrySet()) {
+ String s3BucketName = bucket.getKey();
+ List s3Keys = bucket.getValue().stream()
+ .map(PayloadS3Pointer::getS3Key)
+ .collect(Collectors.toList());
+ deleteFutures.add(s3Dao.deletePayloadsFromS3(s3BucketName, s3Keys));
+ }
+
+ return CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0]));
+ } catch (Exception e) {
+ CompletableFuture futureEx = new CompletableFuture<>();
+ futureEx.completeExceptionally((e instanceof RuntimeException) ? e : new CompletionException(e));
+ return futureEx;
+ }
+ }
}
diff --git a/src/main/java/software/amazon/payloadoffloading/S3Dao.java b/src/main/java/software/amazon/payloadoffloading/S3Dao.java
index 2b03dd5..5f18524 100644
--- a/src/main/java/software/amazon/payloadoffloading/S3Dao.java
+++ b/src/main/java/software/amazon/payloadoffloading/S3Dao.java
@@ -1,5 +1,7 @@
package software.amazon.payloadoffloading;
+import java.util.Collection;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseInputStream;
@@ -7,10 +9,13 @@
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.utils.IoUtils;
@@ -104,4 +109,26 @@ public void deletePayloadFromS3(String s3BucketName, String s3Key) {
LOG.info("S3 object deleted, Bucket name: " + s3BucketName + ", Object key: " + s3Key + ".");
}
+
+ public void deletePayloadsFromS3(String s3BucketName, Collection s3Keys) {
+ try {
+ DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder()
+ .bucket(s3BucketName)
+ .delete(Delete.builder()
+ .objects(s3Keys.stream()
+ .map(s3Key -> ObjectIdentifier.builder()
+ .key(s3Key)
+ .build())
+ .collect(Collectors.toList()))
+ .build())
+ .build();
+ s3Client.deleteObjects(deleteObjectsRequest);
+ } catch (SdkException e) {
+ String errorMessage = "Failed to delete the S3 object which contains the payload";
+ LOG.error(errorMessage, e);
+ throw SdkException.create(errorMessage, e);
+ }
+
+ LOG.info("S3 object deleted, Bucket name: " + s3BucketName + ", Object keys: " + s3Keys + ".");
+ }
}
diff --git a/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java b/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java
index 15d4c18..e68222b 100644
--- a/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java
+++ b/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java
@@ -12,6 +12,11 @@
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.junit.jupiter.api.BeforeEach;
@@ -22,8 +27,10 @@
public class S3BackedPayloadStoreAsyncTest {
private static final String S3_BUCKET_NAME = "test-bucket-name";
+ private static final String OTHER_S3_BUCKET_NAME = "other-bucket-name";
private static final String ANY_PAYLOAD = "AnyPayload";
private static final String ANY_S3_KEY = "AnyS3key";
+ private static final String ANY_OTHER_S3_KEY = "AnyOtherS3key";
private static final String INCORRECT_POINTER_EXCEPTION_MSG = "Failed to read the S3 object pointer from given string";
private PayloadStoreAsync payloadStore;
private S3AsyncDao s3AsyncDao;
@@ -175,4 +182,70 @@ public void testDeleteOriginalPayloadIncorrectPointer() {
verifyNoInteractions(s3AsyncDao);
}
+ @Test
+ public void testDeleteOriginalPayloadsOnSuccess() {
+ when(s3AsyncDao.deletePayloadsFromS3(any(), any())).thenReturn(CompletableFuture.completedFuture(null));
+
+ List payloadPointers = new ArrayList<>();
+ payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
+ payloadStore.deleteOriginalPayloads(payloadPointers).join();
+
+ ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(Collection.class);
+ verify(s3AsyncDao, times(1)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture());
+
+ assertEquals(Collections.singletonList(ANY_S3_KEY), keyCaptor.getValue());
+ assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue());
+ }
+
+ @Test
+ public void testDeleteOriginalPayloadsSameBucket() {
+ when(s3AsyncDao.deletePayloadsFromS3(any(), any())).thenReturn(CompletableFuture.completedFuture(null));
+
+ List payloadPointers = new ArrayList<>();
+ payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
+ payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_OTHER_S3_KEY).toJson());
+ payloadStore.deleteOriginalPayloads(payloadPointers).join();
+
+ ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(Collection.class);
+ verify(s3AsyncDao, times(1)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture());
+
+ assertEquals(Arrays.asList(ANY_S3_KEY, ANY_OTHER_S3_KEY), keyCaptor.getValue());
+ assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue());
+ }
+
+ @Test
+ public void testDeleteOriginalPayloadsDifferentBuckets() {
+ when(s3AsyncDao.deletePayloadsFromS3(any(), any())).thenReturn(CompletableFuture.completedFuture(null));
+
+ List payloadPointers = new ArrayList<>();
+ payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
+ payloadPointers.add(new PayloadS3Pointer(OTHER_S3_BUCKET_NAME, ANY_OTHER_S3_KEY).toJson());
+ payloadStore.deleteOriginalPayloads(payloadPointers).join();
+
+ ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(Collection.class);
+ verify(s3AsyncDao, times(2)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture());
+
+ assertEquals(Collections.singletonList(ANY_S3_KEY), keyCaptor.getAllValues().get(0));
+ assertEquals(Collections.singletonList(ANY_OTHER_S3_KEY), keyCaptor.getAllValues().get(1));
+ assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getAllValues().get(0));
+ assertEquals(OTHER_S3_BUCKET_NAME, bucketNameCaptor.getAllValues().get(1));
+ }
+
+ @Test
+ public void testDeleteOriginalPayloadsIncorrectPointer() {
+ List payloadPointers = new ArrayList<>();
+ payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
+ payloadPointers.add("IncorrectPointer");
+
+ CompletionException exception = assertThrows(CompletionException.class, () -> {
+ payloadStore.deleteOriginalPayloads(payloadPointers).join();
+ });
+
+ assertTrue(exception.getMessage().contains(INCORRECT_POINTER_EXCEPTION_MSG));
+ verifyNoInteractions(s3AsyncDao);
+ }
+
}
diff --git a/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreTest.java b/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreTest.java
index b40df6f..7e4b672 100644
--- a/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreTest.java
+++ b/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreTest.java
@@ -1,5 +1,10 @@
package software.amazon.payloadoffloading;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@@ -21,8 +26,10 @@
public class S3BackedPayloadStoreTest {
private static final String S3_BUCKET_NAME = "test-bucket-name";
+ private static final String OTHER_S3_BUCKET_NAME = "other-bucket-name";
private static final String ANY_PAYLOAD = "AnyPayload";
private static final String ANY_S3_KEY = "AnyS3key";
+ private static final String ANY_OTHER_S3_KEY = "AnyOtherS3key";
private static final String INCORRECT_POINTER_EXCEPTION_MSG = "Failed to read the S3 object pointer from given string";
private PayloadStore payloadStore;
private S3Dao s3Dao;
@@ -153,4 +160,58 @@ public void testDeleteOriginalPayloadIncorrectPointer() {
INCORRECT_POINTER_EXCEPTION_MSG);
verifyNoInteractions(s3Dao);
}
-}
\ No newline at end of file
+ @Test
+ public void testDeleteOriginalPayloadsOnSuccess() {
+ List payloadPointers = new ArrayList<>();
+ payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
+ payloadStore.deleteOriginalPayloads(payloadPointers);
+
+ ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(Collection.class);
+ verify(s3Dao, times(1)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture());
+
+ assertEquals(Collections.singletonList(ANY_S3_KEY), keyCaptor.getValue());
+ assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue());
+ }
+
+ @Test
+ public void testDeleteOriginalPayloadsSameBucket() {
+ List payloadPointers = new ArrayList<>();
+ payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
+ payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_OTHER_S3_KEY).toJson());
+ payloadStore.deleteOriginalPayloads(payloadPointers);
+
+ ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(Collection.class);
+ verify(s3Dao, times(1)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture());
+
+ assertEquals(Arrays.asList(ANY_S3_KEY, ANY_OTHER_S3_KEY), keyCaptor.getValue());
+ assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue());
+ }
+
+ @Test
+ public void testDeleteOriginalPayloadsDifferentBuckets() {
+ List payloadPointers = new ArrayList<>();
+ payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
+ payloadPointers.add(new PayloadS3Pointer(OTHER_S3_BUCKET_NAME, ANY_OTHER_S3_KEY).toJson());
+ payloadStore.deleteOriginalPayloads(payloadPointers);
+
+ ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(Collection.class);
+ verify(s3Dao, times(2)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture());
+
+ assertEquals(Collections.singletonList(ANY_S3_KEY), keyCaptor.getAllValues().get(0));
+ assertEquals(Collections.singletonList(ANY_OTHER_S3_KEY), keyCaptor.getAllValues().get(1));
+ assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getAllValues().get(0));
+ assertEquals(OTHER_S3_BUCKET_NAME, bucketNameCaptor.getAllValues().get(1));
+ }
+
+ @Test
+ public void testDeleteOriginalPayloadsIncorrectPointer() {
+ List payloadPointers = new ArrayList<>();
+ payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson());
+ payloadPointers.add("IncorrectPointer");
+ assertThrows(SdkClientException.class, () -> payloadStore.deleteOriginalPayloads(payloadPointers),
+ INCORRECT_POINTER_EXCEPTION_MSG);
+ }
+}