diff --git a/pom.xml b/pom.xml index 0cea9a4..459415d 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ software.amazon.payloadoffloading payloadoffloading-common - 2.2.0 + 2.2.1 jar Payload offloading common library for AWS Common library between extended Amazon AWS clients to save payloads up to 2GB on Amazon S3. diff --git a/src/main/java/software/amazon/payloadoffloading/PayloadStore.java b/src/main/java/software/amazon/payloadoffloading/PayloadStore.java index ac5235f..01c8bdd 100644 --- a/src/main/java/software/amazon/payloadoffloading/PayloadStore.java +++ b/src/main/java/software/amazon/payloadoffloading/PayloadStore.java @@ -1,5 +1,6 @@ package software.amazon.payloadoffloading; +import java.util.Collection; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.s3.model.S3Exception; @@ -60,4 +61,20 @@ public interface PayloadStore { * a server side issue. */ void deleteOriginalPayload(String payloadPointer); + + /** + * Deletes original payloads using the given payloadPointers. The pointers must + * have been obtained using {@link storeOriginalPayload} + *

+ * This call will be more efficient than deleting payloads one at a time if the payloads + * are in the same S3 bucket. + * + * @param payloadPointers + * @throws SdkClientException If any internal errors are encountered on the client side while + * attempting to make the request or handle the response to/from PayloadStore. + * For example, if payloadPointer is invalid or a network connection is not available. + * @throws S3Exception If an error response is returned by actual PayloadStore indicating + * a server side issue. + */ + void deleteOriginalPayloads(Collection payloadPointers); } diff --git a/src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java b/src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java index 43590b4..a924815 100644 --- a/src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java +++ b/src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java @@ -1,5 +1,6 @@ package software.amazon.payloadoffloading; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.s3.model.S3Exception; @@ -75,4 +76,24 @@ public interface PayloadStoreAsync { * a server side issue. */ CompletableFuture deleteOriginalPayload(String payloadPointer); + + /** + * Deletes the original payload using the given payloadPointer. The pointer must + * have been obtained using {@link #storeOriginalPayload(String)} + *

+ * This call will be more efficient than deleting payloads one at a time if the payloads + * are in the same S3 bucket. + *

+ * This call is asynchronous, and so documented return values and exceptions are propagated through + * the returned {@link CompletableFuture}. + * + * @param payloadPointers + * @return future value that completes when the delete operation finishes + * @throws SdkClientException If any internal errors are encountered on the client side while + * attempting to make the request or handle the response to/from PayloadStore. + * For example, if payloadPointer is invalid or a network connection is not available. + * @throws S3Exception If an error response is returned by actual PayloadStore indicating + * a server side issue. + */ + CompletableFuture deleteOriginalPayloads(Collection payloadPointers); } diff --git a/src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java b/src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java index a0dc868..f1b958b 100644 --- a/src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java +++ b/src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java @@ -1,8 +1,10 @@ package software.amazon.payloadoffloading; import java.io.UncheckedIOException; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.ResponseBytes; @@ -11,9 +13,12 @@ import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.Delete; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.ObjectCannedACL; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; /** @@ -115,4 +120,33 @@ public CompletableFuture deletePayloadFromS3(String s3BucketName, String s return null; }); } + + public CompletableFuture deletePayloadsFromS3(String s3BucketName, Collection s3Keys) { + DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder() + .bucket(s3BucketName) + .delete(Delete.builder() + .objects(s3Keys.stream() + .map(s3Key -> ObjectIdentifier.builder() + .key(s3Key) + .build()) + .collect(Collectors.toList())) + .build()) + .build(); + + return s3Client.deleteObjects(deleteObjectsRequest) + .handle((v, tIn) -> { + if (tIn != null) { + Throwable t = Util.unwrapFutureException(tIn); + if (t instanceof SdkException) { + String errorMessage = "Failed to delete the S3 object which contains the payload"; + LOG.error(errorMessage, t); + throw SdkException.create(errorMessage, t); + } + throw new CompletionException(t); + } + + LOG.info("S3 object deleted, Bucket name: " + s3BucketName + ", Object keys: " + s3Keys + "."); + return null; + }); + } } diff --git a/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStore.java b/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStore.java index 096f8b2..7dedc8d 100644 --- a/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStore.java +++ b/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStore.java @@ -1,5 +1,9 @@ package software.amazon.payloadoffloading; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,4 +60,20 @@ public void deleteOriginalPayload(String payloadPointer) { String s3Key = s3Pointer.getS3Key(); s3Dao.deletePayloadFromS3(s3BucketName, s3Key); } + + @Override + public void deleteOriginalPayloads(Collection payloadPointers) { + // Sort by S3 bucket. + Map> offloadedMessages = payloadPointers.stream() + .map(PayloadS3Pointer::fromJson) + .collect(Collectors.groupingBy(PayloadS3Pointer::getS3BucketName)); + + for (Map.Entry> bucket : offloadedMessages.entrySet()) { + String s3BucketName = bucket.getKey(); + List s3Keys = bucket.getValue().stream() + .map(PayloadS3Pointer::getS3Key) + .collect(Collectors.toList()); + s3Dao.deletePayloadsFromS3(s3BucketName, s3Keys); + } + } } diff --git a/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java b/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java index 5b84ede..b4cb028 100644 --- a/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java +++ b/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java @@ -1,11 +1,15 @@ package software.amazon.payloadoffloading; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.payloadoffloading.PayloadS3Pointer; /** * S3 based implementation for PayloadStoreAsync. @@ -74,4 +78,34 @@ public CompletableFuture deleteOriginalPayload(String payloadPointer) { return futureEx; } } + + @Override + public CompletableFuture deleteOriginalPayloads(Collection payloadPointers) { + // Skip the delete if there are no payloads to delete. + if (payloadPointers.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + try { + // Sort by S3 bucket. + Map> offloadedMessages = payloadPointers.stream() + .map(PayloadS3Pointer::fromJson) + .collect(Collectors.groupingBy(PayloadS3Pointer::getS3BucketName)); + + List> deleteFutures = new ArrayList<>(offloadedMessages.size()); + for (Map.Entry> bucket : offloadedMessages.entrySet()) { + String s3BucketName = bucket.getKey(); + List s3Keys = bucket.getValue().stream() + .map(PayloadS3Pointer::getS3Key) + .collect(Collectors.toList()); + deleteFutures.add(s3Dao.deletePayloadsFromS3(s3BucketName, s3Keys)); + } + + return CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])); + } catch (Exception e) { + CompletableFuture futureEx = new CompletableFuture<>(); + futureEx.completeExceptionally((e instanceof RuntimeException) ? e : new CompletionException(e)); + return futureEx; + } + } } diff --git a/src/main/java/software/amazon/payloadoffloading/S3Dao.java b/src/main/java/software/amazon/payloadoffloading/S3Dao.java index 2b03dd5..5f18524 100644 --- a/src/main/java/software/amazon/payloadoffloading/S3Dao.java +++ b/src/main/java/software/amazon/payloadoffloading/S3Dao.java @@ -1,5 +1,7 @@ package software.amazon.payloadoffloading; +import java.util.Collection; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.ResponseInputStream; @@ -7,10 +9,13 @@ import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.Delete; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.ObjectCannedACL; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.utils.IoUtils; @@ -104,4 +109,26 @@ public void deletePayloadFromS3(String s3BucketName, String s3Key) { LOG.info("S3 object deleted, Bucket name: " + s3BucketName + ", Object key: " + s3Key + "."); } + + public void deletePayloadsFromS3(String s3BucketName, Collection s3Keys) { + try { + DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder() + .bucket(s3BucketName) + .delete(Delete.builder() + .objects(s3Keys.stream() + .map(s3Key -> ObjectIdentifier.builder() + .key(s3Key) + .build()) + .collect(Collectors.toList())) + .build()) + .build(); + s3Client.deleteObjects(deleteObjectsRequest); + } catch (SdkException e) { + String errorMessage = "Failed to delete the S3 object which contains the payload"; + LOG.error(errorMessage, e); + throw SdkException.create(errorMessage, e); + } + + LOG.info("S3 object deleted, Bucket name: " + s3BucketName + ", Object keys: " + s3Keys + "."); + } } diff --git a/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java b/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java index 15d4c18..e68222b 100644 --- a/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java +++ b/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java @@ -12,6 +12,11 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import org.junit.jupiter.api.BeforeEach; @@ -22,8 +27,10 @@ public class S3BackedPayloadStoreAsyncTest { private static final String S3_BUCKET_NAME = "test-bucket-name"; + private static final String OTHER_S3_BUCKET_NAME = "other-bucket-name"; private static final String ANY_PAYLOAD = "AnyPayload"; private static final String ANY_S3_KEY = "AnyS3key"; + private static final String ANY_OTHER_S3_KEY = "AnyOtherS3key"; private static final String INCORRECT_POINTER_EXCEPTION_MSG = "Failed to read the S3 object pointer from given string"; private PayloadStoreAsync payloadStore; private S3AsyncDao s3AsyncDao; @@ -175,4 +182,70 @@ public void testDeleteOriginalPayloadIncorrectPointer() { verifyNoInteractions(s3AsyncDao); } + @Test + public void testDeleteOriginalPayloadsOnSuccess() { + when(s3AsyncDao.deletePayloadsFromS3(any(), any())).thenReturn(CompletableFuture.completedFuture(null)); + + List payloadPointers = new ArrayList<>(); + payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson()); + payloadStore.deleteOriginalPayloads(payloadPointers).join(); + + ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(Collection.class); + verify(s3AsyncDao, times(1)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture()); + + assertEquals(Collections.singletonList(ANY_S3_KEY), keyCaptor.getValue()); + assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue()); + } + + @Test + public void testDeleteOriginalPayloadsSameBucket() { + when(s3AsyncDao.deletePayloadsFromS3(any(), any())).thenReturn(CompletableFuture.completedFuture(null)); + + List payloadPointers = new ArrayList<>(); + payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson()); + payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_OTHER_S3_KEY).toJson()); + payloadStore.deleteOriginalPayloads(payloadPointers).join(); + + ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(Collection.class); + verify(s3AsyncDao, times(1)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture()); + + assertEquals(Arrays.asList(ANY_S3_KEY, ANY_OTHER_S3_KEY), keyCaptor.getValue()); + assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue()); + } + + @Test + public void testDeleteOriginalPayloadsDifferentBuckets() { + when(s3AsyncDao.deletePayloadsFromS3(any(), any())).thenReturn(CompletableFuture.completedFuture(null)); + + List payloadPointers = new ArrayList<>(); + payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson()); + payloadPointers.add(new PayloadS3Pointer(OTHER_S3_BUCKET_NAME, ANY_OTHER_S3_KEY).toJson()); + payloadStore.deleteOriginalPayloads(payloadPointers).join(); + + ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(Collection.class); + verify(s3AsyncDao, times(2)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture()); + + assertEquals(Collections.singletonList(ANY_S3_KEY), keyCaptor.getAllValues().get(0)); + assertEquals(Collections.singletonList(ANY_OTHER_S3_KEY), keyCaptor.getAllValues().get(1)); + assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getAllValues().get(0)); + assertEquals(OTHER_S3_BUCKET_NAME, bucketNameCaptor.getAllValues().get(1)); + } + + @Test + public void testDeleteOriginalPayloadsIncorrectPointer() { + List payloadPointers = new ArrayList<>(); + payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson()); + payloadPointers.add("IncorrectPointer"); + + CompletionException exception = assertThrows(CompletionException.class, () -> { + payloadStore.deleteOriginalPayloads(payloadPointers).join(); + }); + + assertTrue(exception.getMessage().contains(INCORRECT_POINTER_EXCEPTION_MSG)); + verifyNoInteractions(s3AsyncDao); + } + } diff --git a/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreTest.java b/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreTest.java index b40df6f..7e4b672 100644 --- a/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreTest.java +++ b/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreTest.java @@ -1,5 +1,10 @@ package software.amazon.payloadoffloading; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -21,8 +26,10 @@ public class S3BackedPayloadStoreTest { private static final String S3_BUCKET_NAME = "test-bucket-name"; + private static final String OTHER_S3_BUCKET_NAME = "other-bucket-name"; private static final String ANY_PAYLOAD = "AnyPayload"; private static final String ANY_S3_KEY = "AnyS3key"; + private static final String ANY_OTHER_S3_KEY = "AnyOtherS3key"; private static final String INCORRECT_POINTER_EXCEPTION_MSG = "Failed to read the S3 object pointer from given string"; private PayloadStore payloadStore; private S3Dao s3Dao; @@ -153,4 +160,58 @@ public void testDeleteOriginalPayloadIncorrectPointer() { INCORRECT_POINTER_EXCEPTION_MSG); verifyNoInteractions(s3Dao); } -} \ No newline at end of file + @Test + public void testDeleteOriginalPayloadsOnSuccess() { + List payloadPointers = new ArrayList<>(); + payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson()); + payloadStore.deleteOriginalPayloads(payloadPointers); + + ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(Collection.class); + verify(s3Dao, times(1)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture()); + + assertEquals(Collections.singletonList(ANY_S3_KEY), keyCaptor.getValue()); + assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue()); + } + + @Test + public void testDeleteOriginalPayloadsSameBucket() { + List payloadPointers = new ArrayList<>(); + payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson()); + payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_OTHER_S3_KEY).toJson()); + payloadStore.deleteOriginalPayloads(payloadPointers); + + ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(Collection.class); + verify(s3Dao, times(1)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture()); + + assertEquals(Arrays.asList(ANY_S3_KEY, ANY_OTHER_S3_KEY), keyCaptor.getValue()); + assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue()); + } + + @Test + public void testDeleteOriginalPayloadsDifferentBuckets() { + List payloadPointers = new ArrayList<>(); + payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson()); + payloadPointers.add(new PayloadS3Pointer(OTHER_S3_BUCKET_NAME, ANY_OTHER_S3_KEY).toJson()); + payloadStore.deleteOriginalPayloads(payloadPointers); + + ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(Collection.class); + verify(s3Dao, times(2)).deletePayloadsFromS3(bucketNameCaptor.capture(), keyCaptor.capture()); + + assertEquals(Collections.singletonList(ANY_S3_KEY), keyCaptor.getAllValues().get(0)); + assertEquals(Collections.singletonList(ANY_OTHER_S3_KEY), keyCaptor.getAllValues().get(1)); + assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getAllValues().get(0)); + assertEquals(OTHER_S3_BUCKET_NAME, bucketNameCaptor.getAllValues().get(1)); + } + + @Test + public void testDeleteOriginalPayloadsIncorrectPointer() { + List payloadPointers = new ArrayList<>(); + payloadPointers.add(new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY).toJson()); + payloadPointers.add("IncorrectPointer"); + assertThrows(SdkClientException.class, () -> payloadStore.deleteOriginalPayloads(payloadPointers), + INCORRECT_POINTER_EXCEPTION_MSG); + } +}