Skip to content

Commit d22ad0c

Browse files
committed
validation of sqs / kinesis batches with partial failures
1 parent daec77d commit d22ad0c

File tree

9 files changed

+557
-144
lines changed

9 files changed

+557
-144
lines changed

powertools-validation/pom.xml

+15-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,21 @@
8383
<artifactId>junit-jupiter-engine</artifactId>
8484
<scope>test</scope>
8585
</dependency>
86-
86+
<dependency>
87+
<groupId>org.slf4j</groupId>
88+
<artifactId>slf4j-simple</artifactId>
89+
<scope>test</scope>
90+
</dependency>
91+
<dependency>
92+
<groupId>org.mockito</groupId>
93+
<artifactId>mockito-core</artifactId>
94+
<scope>test</scope>
95+
</dependency>
96+
<dependency>
97+
<groupId>com.amazonaws</groupId>
98+
<artifactId>aws-lambda-java-tests</artifactId>
99+
<scope>test</scope>
100+
</dependency>
87101
<dependency>
88102
<groupId>org.apache.commons</groupId>
89103
<artifactId>commons-lang3</artifactId>

powertools-validation/src/main/java/software/amazon/lambda/powertools/validation/internal/ValidationAspect.java

+184-92
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2024 Amazon.com, Inc. or its affiliates.
3+
* Licensed under the Apache License, Version 2.0 (the
4+
* "License"); you may not use this file except in compliance
5+
* with the License. You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*
13+
*/
14+
15+
package software.amazon.lambda.powertools.validation.handlers;
16+
17+
import com.amazonaws.services.lambda.runtime.Context;
18+
import com.amazonaws.services.lambda.runtime.RequestHandler;
19+
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
20+
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
21+
import java.util.ArrayList;
22+
import software.amazon.lambda.powertools.validation.Validation;
23+
24+
public class KinesisHandlerWithError implements RequestHandler<KinesisEvent, StreamsEventResponse> {
25+
26+
@Override
27+
@Validation(inboundSchema = "classpath:/schema_v7.json")
28+
public StreamsEventResponse handleRequest(KinesisEvent input, Context context) {
29+
StreamsEventResponse response = StreamsEventResponse.builder().withBatchItemFailures(new ArrayList<>()).build();
30+
assert input.getRecords().size() == 2; // invalid messages have been removed from the input
31+
response.getBatchItemFailures().add(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier("1234").build());
32+
return response;
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2024 Amazon.com, Inc. or its affiliates.
3+
* Licensed under the Apache License, Version 2.0 (the
4+
* "License"); you may not use this file except in compliance
5+
* with the License. You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*
13+
*/
14+
15+
package software.amazon.lambda.powertools.validation.handlers;
16+
17+
import com.amazonaws.services.lambda.runtime.Context;
18+
import com.amazonaws.services.lambda.runtime.RequestHandler;
19+
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
20+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
21+
import java.util.ArrayList;
22+
import software.amazon.lambda.powertools.validation.Validation;
23+
24+
public class SQSHandlerWithError implements RequestHandler<SQSEvent, SQSBatchResponse> {
25+
26+
@Override
27+
@Validation(inboundSchema = "classpath:/schema_v7.json")
28+
public SQSBatchResponse handleRequest(SQSEvent input, Context context) {
29+
SQSBatchResponse response = SQSBatchResponse.builder().withBatchItemFailures(new ArrayList<>()).build();
30+
assert input.getRecords().size() == 2; // invalid messages have been removed from the input
31+
response.getBatchItemFailures().add(SQSBatchResponse.BatchItemFailure.builder().withItemIdentifier("1234").build());
32+
return response;
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2024 Amazon.com, Inc. or its affiliates.
3+
* Licensed under the Apache License, Version 2.0 (the
4+
* "License"); you may not use this file except in compliance
5+
* with the License. You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*
13+
*/
14+
15+
package software.amazon.lambda.powertools.validation.handlers;
16+
17+
import com.amazonaws.services.lambda.runtime.Context;
18+
import com.amazonaws.services.lambda.runtime.RequestHandler;
19+
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
20+
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
21+
import software.amazon.lambda.powertools.validation.Validation;
22+
23+
public class StandardKinesisHandler implements RequestHandler<KinesisEvent, StreamsEventResponse> {
24+
25+
@Override
26+
@Validation(inboundSchema = "classpath:/schema_v7.json")
27+
public StreamsEventResponse handleRequest(KinesisEvent input, Context context) {
28+
StreamsEventResponse response = StreamsEventResponse.builder().build();
29+
assert input.getRecords().size() == 2; // invalid messages have been removed from the input
30+
return response;
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2024 Amazon.com, Inc. or its affiliates.
3+
* Licensed under the Apache License, Version 2.0 (the
4+
* "License"); you may not use this file except in compliance
5+
* with the License. You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*
13+
*/
14+
15+
package software.amazon.lambda.powertools.validation.handlers;
16+
17+
import com.amazonaws.services.lambda.runtime.Context;
18+
import com.amazonaws.services.lambda.runtime.RequestHandler;
19+
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
20+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
21+
import software.amazon.lambda.powertools.validation.Validation;
22+
23+
public class StandardSQSHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {
24+
25+
@Override
26+
@Validation(inboundSchema = "classpath:/schema_v7.json")
27+
public SQSBatchResponse handleRequest(SQSEvent input, Context context) {
28+
SQSBatchResponse response = SQSBatchResponse.builder().build();
29+
assert input.getRecords().size() == 2; // invalid messages have been removed from the input
30+
return response;
31+
}
32+
}

powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/internal/ValidationAspectTest.java

+82-51
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,6 @@
2020
import static org.junit.jupiter.api.Assertions.fail;
2121
import static org.mockito.Mockito.when;
2222

23-
import java.io.IOException;
24-
import java.util.ArrayList;
25-
import java.util.HashMap;
26-
import java.util.List;
27-
import java.util.Map;
28-
import java.util.stream.Stream;
29-
30-
import org.aspectj.lang.ProceedingJoinPoint;
31-
import org.aspectj.lang.Signature;
32-
import org.junit.jupiter.api.BeforeEach;
33-
import org.junit.jupiter.api.Test;
34-
import org.junit.jupiter.params.ParameterizedTest;
35-
import org.junit.jupiter.params.provider.Arguments;
36-
import org.junit.jupiter.params.provider.ArgumentsSource;
37-
import org.junit.jupiter.params.provider.MethodSource;
38-
import org.mockito.Mock;
39-
import org.mockito.MockitoAnnotations;
40-
4123
import com.amazonaws.services.lambda.runtime.Context;
4224
import com.amazonaws.services.lambda.runtime.RequestHandler;
4325
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
@@ -55,24 +37,45 @@
5537
import com.amazonaws.services.lambda.runtime.events.KinesisFirehoseEvent;
5638
import com.amazonaws.services.lambda.runtime.events.RabbitMQEvent;
5739
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
40+
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
5841
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
5942
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;
60-
import com.amazonaws.services.lambda.runtime.serialization.PojoSerializer;
61-
import com.amazonaws.services.lambda.runtime.serialization.events.LambdaEventSerializers;
43+
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
44+
import com.amazonaws.services.lambda.runtime.tests.annotations.Event;
6245
import com.networknt.schema.SpecVersion;
63-
46+
import java.io.IOException;
47+
import java.util.ArrayList;
48+
import java.util.HashMap;
49+
import java.util.List;
50+
import java.util.Map;
51+
import java.util.stream.Collectors;
52+
import java.util.stream.Stream;
53+
import org.aspectj.lang.ProceedingJoinPoint;
54+
import org.aspectj.lang.Signature;
55+
import org.junit.jupiter.api.BeforeEach;
56+
import org.junit.jupiter.api.Test;
57+
import org.junit.jupiter.params.ParameterizedTest;
58+
import org.junit.jupiter.params.provider.Arguments;
59+
import org.junit.jupiter.params.provider.ArgumentsSource;
60+
import org.junit.jupiter.params.provider.MethodSource;
61+
import org.mockito.Mock;
62+
import org.mockito.MockitoAnnotations;
6463
import software.amazon.lambda.powertools.validation.Validation;
6564
import software.amazon.lambda.powertools.validation.ValidationConfig;
6665
import software.amazon.lambda.powertools.validation.ValidationException;
6766
import software.amazon.lambda.powertools.validation.handlers.GenericSchemaV7APIGatewayProxyRequestEventHandler;
6867
import software.amazon.lambda.powertools.validation.handlers.GenericSchemaV7StringHandler;
68+
import software.amazon.lambda.powertools.validation.handlers.KinesisHandlerWithError;
69+
import software.amazon.lambda.powertools.validation.handlers.SQSHandlerWithError;
6970
import software.amazon.lambda.powertools.validation.handlers.SQSWithCustomEnvelopeHandler;
7071
import software.amazon.lambda.powertools.validation.handlers.SQSWithWrongEnvelopeHandler;
72+
import software.amazon.lambda.powertools.validation.handlers.StandardKinesisHandler;
73+
import software.amazon.lambda.powertools.validation.handlers.StandardSQSHandler;
7174
import software.amazon.lambda.powertools.validation.handlers.ValidationInboundAPIGatewayV2HTTPEventHandler;
7275
import software.amazon.lambda.powertools.validation.model.MyCustomEvent;
7376

7477

75-
public class ValidationAspectTest {
78+
class ValidationAspectTest {
7679

7780
@Mock
7881
Validation validation;
@@ -167,7 +170,7 @@ void testValidateOutboundJsonSchemaWithHandledExceptions(Object object) throws T
167170
}
168171

169172
@Test
170-
public void testValidateOutboundJsonSchema_APIGWV2() throws Throwable {
173+
void testValidateOutboundJsonSchema_APIGWV2() throws Throwable {
171174
when(validation.schemaVersion()).thenReturn(SpecVersion.VersionFlag.V7);
172175
when(pjp.getSignature()).thenReturn(signature);
173176
when(pjp.getSignature().getDeclaringType()).thenReturn(RequestHandler.class);
@@ -187,7 +190,7 @@ public void testValidateOutboundJsonSchema_APIGWV2() throws Throwable {
187190
}
188191

189192
@Test
190-
public void validate_inputOK_schemaInClasspath_shouldValidate() {
193+
void validate_inputOK_schemaInClasspath_shouldValidate() {
191194
GenericSchemaV7APIGatewayProxyRequestEventHandler handler = new GenericSchemaV7APIGatewayProxyRequestEventHandler();
192195
APIGatewayProxyRequestEvent event = new APIGatewayProxyRequestEvent();
193196
event.setBody("{" +
@@ -204,7 +207,7 @@ public void validate_inputOK_schemaInClasspath_shouldValidate() {
204207
}
205208

206209
@Test
207-
public void validate_inputKO_schemaInClasspath_shouldThrowValidationException() {
210+
void validate_inputKO_schemaInClasspath_shouldThrowValidationException() {
208211
GenericSchemaV7APIGatewayProxyRequestEventHandler handler = new GenericSchemaV7APIGatewayProxyRequestEventHandler();
209212

210213
Map<String, String> headers = new HashMap<>();
@@ -232,7 +235,7 @@ public void validate_inputKO_schemaInClasspath_shouldThrowValidationException()
232235
}
233236

234237
@Test
235-
public void validate_inputOK_schemaInString_shouldValidate() {
238+
void validate_inputOK_schemaInString_shouldValidate() {
236239
ValidationInboundAPIGatewayV2HTTPEventHandler handler = new ValidationInboundAPIGatewayV2HTTPEventHandler();
237240
APIGatewayV2HTTPEvent event = new APIGatewayV2HTTPEvent();
238241
event.setBody("{" +
@@ -248,7 +251,7 @@ public void validate_inputOK_schemaInString_shouldValidate() {
248251

249252

250253
@Test
251-
public void validate_inputKO_schemaInString_shouldThrowValidationException() {
254+
void validate_inputKO_schemaInString_shouldThrowValidationException() {
252255
ValidationInboundAPIGatewayV2HTTPEventHandler handler = new ValidationInboundAPIGatewayV2HTTPEventHandler();
253256

254257
Map<String, String> headers = new HashMap<>();
@@ -268,49 +271,77 @@ public void validate_inputKO_schemaInString_shouldThrowValidationException() {
268271
assertThat(response.getMultiValueHeaders()).isEmpty();
269272
}
270273

271-
@Test
272-
public void validate_SQS() {
273-
PojoSerializer<SQSEvent> pojoSerializer =
274-
LambdaEventSerializers.serializerFor(SQSEvent.class, ClassLoader.getSystemClassLoader());
275-
SQSEvent event = pojoSerializer.fromJson(this.getClass().getResourceAsStream("/sqs.json"));
276-
274+
@ParameterizedTest
275+
@Event(value = "sqs.json", type = SQSEvent.class)
276+
void validate_SQS(SQSEvent event) {
277277
GenericSchemaV7StringHandler<Object> handler = new GenericSchemaV7StringHandler<>();
278278
assertThat(handler.handleRequest(event, context)).isEqualTo("OK");
279279
}
280280

281-
@Test
282-
public void validate_SQS_CustomEnvelopeTakePrecedence() {
283-
PojoSerializer<SQSEvent> pojoSerializer =
284-
LambdaEventSerializers.serializerFor(SQSEvent.class, ClassLoader.getSystemClassLoader());
285-
SQSEvent event = pojoSerializer.fromJson(this.getClass().getResourceAsStream("/sqs_message.json"));
281+
@ParameterizedTest
282+
@Event(value = "sqs_invalid_messages.json", type = SQSEvent.class)
283+
void validate_SQS_with_validation_partial_failure(SQSEvent event) {
284+
StandardSQSHandler handler = new StandardSQSHandler();
285+
SQSBatchResponse response = handler.handleRequest(event, context);
286+
assertThat(response.getBatchItemFailures()).hasSize(2);
287+
assertThat(response.getBatchItemFailures().stream().map(SQSBatchResponse.BatchItemFailure::getItemIdentifier).collect(
288+
Collectors.toList())).contains("d9144555-9a4f-4ec3-99a0-fc4e625a8db3", "d9144555-9a4f-4ec3-99a0-fc4e625a8db5");
289+
}
290+
291+
@ParameterizedTest
292+
@Event(value = "sqs_invalid_messages.json", type = SQSEvent.class)
293+
void validate_SQS_with_partial_failure(SQSEvent event) {
294+
SQSHandlerWithError handler = new SQSHandlerWithError();
295+
SQSBatchResponse response = handler.handleRequest(event, context);
296+
assertThat(response.getBatchItemFailures()).hasSize(3);
297+
assertThat(response.getBatchItemFailures().stream().map(SQSBatchResponse.BatchItemFailure::getItemIdentifier).collect(
298+
Collectors.toList())).contains("d9144555-9a4f-4ec3-99a0-fc4e625a8db3", "d9144555-9a4f-4ec3-99a0-fc4e625a8db5", "1234");
299+
}
286300

301+
@ParameterizedTest
302+
@Event(value = "sqs_message.json", type = SQSEvent.class)
303+
void validate_SQS_CustomEnvelopeTakePrecedence(SQSEvent event) {
287304
SQSWithCustomEnvelopeHandler handler = new SQSWithCustomEnvelopeHandler();
288305
assertThat(handler.handleRequest(event, context)).isEqualTo("OK");
289306
}
290307

291-
@Test
292-
public void validate_SQS_WrongEnvelope_shouldThrowValidationException() {
293-
PojoSerializer<SQSEvent> pojoSerializer =
294-
LambdaEventSerializers.serializerFor(SQSEvent.class, ClassLoader.getSystemClassLoader());
295-
SQSEvent event = pojoSerializer.fromJson(this.getClass().getResourceAsStream("/sqs_message.json"));
296-
308+
@ParameterizedTest
309+
@Event(value = "sqs_message.json", type = SQSEvent.class)
310+
void validate_SQS_WrongEnvelope_shouldThrowValidationException(SQSEvent event) {
297311
SQSWithWrongEnvelopeHandler handler = new SQSWithWrongEnvelopeHandler();
298312
assertThatExceptionOfType(ValidationException.class).isThrownBy(() -> handler.handleRequest(event, context));
299313
}
300314

301-
@Test
302-
public void validate_Kinesis() {
303-
PojoSerializer<KinesisEvent> pojoSerializer =
304-
LambdaEventSerializers.serializerFor(KinesisEvent.class, ClassLoader.getSystemClassLoader());
305-
KinesisEvent event = pojoSerializer.fromJson(this.getClass().getResourceAsStream("/kinesis.json"));
306-
315+
@ParameterizedTest
316+
@Event(value = "kinesis.json", type = KinesisEvent.class)
317+
void validate_Kinesis(KinesisEvent event) {
307318
GenericSchemaV7StringHandler<Object> handler = new GenericSchemaV7StringHandler<>();
308319
assertThat(handler.handleRequest(event, context)).isEqualTo("OK");
309320
}
310321

322+
@ParameterizedTest
323+
@Event(value = "kinesis_invalid_messages.json", type = KinesisEvent.class)
324+
void validate_Kinesis_with_validation_partial_failure(KinesisEvent event) {
325+
StandardKinesisHandler handler = new StandardKinesisHandler();
326+
StreamsEventResponse response = handler.handleRequest(event, context);
327+
assertThat(response.getBatchItemFailures()).hasSize(2);
328+
assertThat(response.getBatchItemFailures().stream().map(StreamsEventResponse.BatchItemFailure::getItemIdentifier).collect(
329+
Collectors.toList())).contains("49545115243490985018280067714973144582180062593244200962", "49545115243490985018280067714973144582180062593244200964");
330+
}
331+
332+
@ParameterizedTest
333+
@Event(value = "kinesis_invalid_messages.json", type = KinesisEvent.class)
334+
void validate_Kinesis_with_partial_failure(KinesisEvent event) {
335+
KinesisHandlerWithError handler = new KinesisHandlerWithError();
336+
StreamsEventResponse response = handler.handleRequest(event, context);
337+
assertThat(response.getBatchItemFailures()).hasSize(3);
338+
assertThat(response.getBatchItemFailures().stream().map(StreamsEventResponse.BatchItemFailure::getItemIdentifier).collect(
339+
Collectors.toList())).contains("49545115243490985018280067714973144582180062593244200962", "49545115243490985018280067714973144582180062593244200964", "1234");
340+
}
341+
311342
@ParameterizedTest
312343
@MethodSource("provideEventAndEventType")
313-
public void validateEEvent(String jsonResource, Class eventClass) throws IOException {
344+
void validateEEvent(String jsonResource, Class eventClass) throws IOException {
314345
Object event = ValidationConfig.get().getObjectMapper()
315346
.readValue(this.getClass().getResourceAsStream(jsonResource), eventClass);
316347

0 commit comments

Comments
 (0)