diff --git a/pom.xml b/pom.xml
index 9245270..7111582 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.amazonaws
amazon-sqs-java-extended-client-lib
- 2.1.0
+ 2.1.1
jar
Amazon SQS Extended Client Library for Java
An extension to the Amazon SQS client that enables sending and receiving messages up to 2GB via Amazon S3.
@@ -57,7 +57,7 @@
software.amazon.payloadoffloading
payloadoffloading-common
- 2.2.0
+ 2.2.1
diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClient.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClient.java
index 02404a2..49690af 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClient.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClient.java
@@ -298,10 +298,8 @@ public CompletableFuture deleteMessage(DeleteMessageReque
// Delete from SQS first, then S3.
final String messageToDeletePointer = messagePointer;
- return super.deleteMessage(deleteMessageRequestBuilder.build())
- .thenCompose(deleteMessageResponse ->
- payloadStore.deleteOriginalPayload(messageToDeletePointer)
- .thenApply(v -> deleteMessageResponse));
+ return payloadStore.deleteOriginalPayload(messageToDeletePointer)
+ .thenCompose(ignore -> super.deleteMessage(deleteMessageRequestBuilder.build()));
}
/**
@@ -395,6 +393,7 @@ public CompletableFuture deleteMessageBatch(
}
List entries = new ArrayList<>(deleteMessageBatchRequest.entries().size());
+ List s3ToCleanup = new ArrayList<>(deleteMessageBatchRequest.entries().size());
for (DeleteMessageBatchRequestEntry entry : deleteMessageBatchRequest.entries()) {
DeleteMessageBatchRequestEntry.Builder entryBuilder = entry.toBuilder();
String receiptHandle = entry.receiptHandle();
@@ -406,7 +405,7 @@ public CompletableFuture deleteMessageBatch(
// Delete s3 payload if needed
if (clientConfiguration.doesCleanupS3Payload()) {
String messagePointer = getMessagePointerFromModifiedReceiptHandle(receiptHandle);
- payloadStore.deleteOriginalPayload(messagePointer);
+ s3ToCleanup.add(messagePointer);
}
}
@@ -415,7 +414,16 @@ public CompletableFuture deleteMessageBatch(
}
deleteMessageBatchRequestBuilder.entries(entries);
- return super.deleteMessageBatch(deleteMessageBatchRequestBuilder.build());
+
+ // Check if message is in S3 or only in SQS.
+ if (s3ToCleanup.isEmpty()) {
+ // Delete only from SQS
+ return super.deleteMessageBatch(deleteMessageBatchRequestBuilder.build());
+ }
+
+ // Delete from S3 first, then SQS.
+ return payloadStore.deleteOriginalPayloads(s3ToCleanup)
+ .thenCompose(ignore -> super.deleteMessageBatch(deleteMessageBatchRequestBuilder.build()));
}
/**
diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java
index 68317d2..f9373b6 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java
@@ -698,6 +698,7 @@ public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest d
}
List entries = new ArrayList<>(deleteMessageBatchRequest.entries().size());
+ List s3ToCleanup = new ArrayList<>(deleteMessageBatchRequest.entries().size());
for (DeleteMessageBatchRequestEntry entry : deleteMessageBatchRequest.entries()) {
DeleteMessageBatchRequestEntry.Builder entryBuilder = entry.toBuilder();
String receiptHandle = entry.receiptHandle();
@@ -709,7 +710,7 @@ public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest d
// Delete s3 payload if needed
if (clientConfiguration.doesCleanupS3Payload()) {
String messagePointer = getMessagePointerFromModifiedReceiptHandle(receiptHandle);
- payloadStore.deleteOriginalPayload(messagePointer);
+ s3ToCleanup.add(messagePointer);
}
}
@@ -718,6 +719,11 @@ public DeleteMessageBatchResponse deleteMessageBatch(DeleteMessageBatchRequest d
}
deleteMessageBatchRequestBuilder.entries(entries);
+
+ if (!s3ToCleanup.isEmpty()) {
+ payloadStore.deleteOriginalPayloads(s3ToCleanup);
+ }
+
return super.deleteMessageBatch(deleteMessageBatchRequestBuilder.build());
}
diff --git a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java
index eb7de08..29d3794 100644
--- a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java
+++ b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java
@@ -38,6 +38,8 @@
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
@@ -91,6 +93,8 @@ public void setupClients() {
CompletableFuture.completedFuture(null));
when(mockS3.deleteObject(isA(DeleteObjectRequest.class))).thenReturn(
CompletableFuture.completedFuture(DeleteObjectResponse.builder().build()));
+ when(mockS3.deleteObjects(isA(DeleteObjectsRequest.class))).thenReturn(
+ CompletableFuture.completedFuture(DeleteObjectsResponse.builder().build()));
when(mockSqsBackend.sendMessage(isA(SendMessageRequest.class))).thenReturn(
CompletableFuture.completedFuture(SendMessageResponse.builder().build()));
when(mockSqsBackend.sendMessageBatch(isA(SendMessageBatchRequest.class))).thenReturn(
@@ -590,7 +594,7 @@ public void testDefaultExtendedClientDeletesObjectsFromS3UponDeleteBatch() {
IntStream.range(0, originalReceiptHandles.size()).forEach(i -> assertEquals(
originalReceiptHandles.get(i),
request.entries().get(i).receiptHandle()));
- verify(mockS3, times(batchSize)).deleteObject(any(DeleteObjectRequest.class));
+ verify(mockS3, times(1)).deleteObjects(any(DeleteObjectsRequest.class));
}
@Test
diff --git a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java
index fd58b0b..f29c5e7 100644
--- a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java
+++ b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java
@@ -29,6 +29,7 @@
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
@@ -621,7 +622,7 @@ public void testDefaultExtendedClientDeletesObjectsFromS3UponDeleteBatch() {
IntStream.range(0, originalReceiptHandles.size()).forEach(i -> assertEquals(
originalReceiptHandles.get(i),
request.entries().get(i).receiptHandle()));
- verify(mockS3, times(batchSize)).deleteObject(any(DeleteObjectRequest.class));
+ verify(mockS3, times(1)).deleteObjects(any(DeleteObjectsRequest.class));
}
@Test