1
1
package software .amazon .lambda .powertools .sqs ;
2
2
3
3
import java .io .IOException ;
4
+ import java .util .HashMap ;
4
5
import java .util .List ;
6
+ import java .util .function .Consumer ;
5
7
6
8
import com .amazonaws .services .lambda .runtime .events .SQSEvent ;
7
9
import com .fasterxml .jackson .databind .ObjectMapper ;
10
+ import org .assertj .core .api .Assertions ;
8
11
import org .junit .jupiter .api .BeforeEach ;
9
12
import org .junit .jupiter .api .Test ;
10
13
import org .junit .jupiter .params .ParameterizedTest ;
11
14
import org .junit .jupiter .params .provider .ValueSource ;
12
15
import org .mockito .ArgumentCaptor ;
13
16
import software .amazon .awssdk .services .sqs .SqsClient ;
14
17
import software .amazon .awssdk .services .sqs .model .DeleteMessageBatchRequest ;
15
- import software .amazon .awssdk .services .sqs .model .GetQueueUrlRequest ;
16
- import software .amazon .awssdk .services .sqs .model .GetQueueUrlResponse ;
18
+ import software .amazon .awssdk .services .sqs .model .GetQueueAttributesRequest ;
19
+ import software .amazon .awssdk .services .sqs .model .GetQueueAttributesResponse ;
20
+ import software .amazon .awssdk .services .sqs .model .QueueAttributeName ;
17
21
18
22
import static com .amazonaws .services .lambda .runtime .events .SQSEvent .SQSMessage ;
19
23
import static org .assertj .core .api .Assertions .assertThat ;
@@ -39,11 +43,6 @@ class SqsUtilsBatchProcessorTest {
39
43
void setUp () throws IOException {
40
44
reset (sqsClient , interactionClient );
41
45
event = MAPPER .readValue (this .getClass ().getResource ("/sampleSqsBatchEvent.json" ), SQSEvent .class );
42
-
43
- when (sqsClient .getQueueUrl (any (GetQueueUrlRequest .class ))).thenReturn (GetQueueUrlResponse .builder ()
44
- .queueUrl ("test" )
45
- .build ());
46
-
47
46
overrideSqsClient (sqsClient );
48
47
}
49
48
@@ -107,14 +106,12 @@ void shouldBatchProcessAndDeleteSuccessMessageOnPartialFailures() {
107
106
});
108
107
109
108
verify (interactionClient ).listQueues ();
110
- verify (sqsClient ).deleteMessageBatch (any (DeleteMessageBatchRequest .class ));
111
109
112
- ArgumentCaptor <GetQueueUrlRequest > captor = ArgumentCaptor .forClass (GetQueueUrlRequest .class );
113
- verify (sqsClient ).getQueueUrl (captor .capture ());
110
+ ArgumentCaptor <DeleteMessageBatchRequest > captor = ArgumentCaptor .forClass (DeleteMessageBatchRequest .class );
111
+ verify (sqsClient ).deleteMessageBatch (captor .capture ());
114
112
115
113
assertThat (captor .getValue ())
116
- .hasFieldOrPropertyWithValue ("queueName" , "my-queue" )
117
- .hasFieldOrPropertyWithValue ("queueOwnerAWSAccountId" , "123456789012" );
114
+ .hasFieldOrPropertyWithValue ("queueUrl" , "https://sqs.us-east-2.amazonaws.com/123456789012/my-queue" );
118
115
}
119
116
120
117
@ Test
@@ -219,6 +216,16 @@ public String process(SQSMessage message) {
219
216
@ Test
220
217
void shouldBatchProcessAndMoveNonRetryableExceptionToDlq () {
221
218
String failedId = "2e1424d4-f796-459a-8184-9c92662be6da" ;
219
+ HashMap <QueueAttributeName , String > attributes = new HashMap <>();
220
+
221
+ attributes .put (QueueAttributeName .REDRIVE_POLICY , "{\n " +
222
+ " \" deadLetterTargetArn\" : \" arn:aws:sqs:us-east-2:123456789012:retry-queue\" ,\n " +
223
+ " \" maxReceiveCount\" : 2\n " +
224
+ "}" );
225
+
226
+ when (sqsClient .getQueueAttributes (any (GetQueueAttributesRequest .class ))).thenReturn (GetQueueAttributesResponse .builder ()
227
+ .attributes (attributes )
228
+ .build ());
222
229
223
230
List <String > batchProcessor = batchProcessor (event , (message ) -> {
224
231
if (failedId .equals (message .getMessageId ())) {
@@ -228,6 +235,11 @@ void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
228
235
interactionClient .listQueues ();
229
236
return "Success" ;
230
237
}, IllegalStateException .class , IllegalArgumentException .class );
238
+
239
+ Assertions .assertThat (batchProcessor )
240
+ .hasSize (1 );
241
+
242
+ verify (sqsClient ).sendMessageBatch (any (Consumer .class ));
231
243
}
232
244
233
245
public class FailureSampleInnerSqsHandler implements SqsMessageHandler <String > {
0 commit comments