20
20
import com .amazonaws .services .lambda .runtime .events .KinesisEvent ;
21
21
import com .amazonaws .services .lambda .runtime .events .StreamsEventResponse ;
22
22
import com .amazonaws .services .lambda .runtime .tests .annotations .Event ;
23
+ import java .util .ArrayList ;
24
+ import java .util .Collections ;
25
+ import java .util .List ;
23
26
import java .util .concurrent .atomic .AtomicBoolean ;
27
+ import org .junit .jupiter .api .AfterEach ;
24
28
import org .junit .jupiter .params .ParameterizedTest ;
25
29
import org .mockito .Mock ;
26
30
import software .amazon .lambda .powertools .batch .handler .BatchMessageHandler ;
27
31
import software .amazon .lambda .powertools .batch .model .Product ;
28
32
29
- public class KinesisBatchProcessorTest {
33
+ class KinesisBatchProcessorTest {
30
34
31
35
@ Mock
32
36
private Context context ;
33
37
38
+ private final List <String > threadList = Collections .synchronizedList (new ArrayList <>());
39
+
40
+ @ AfterEach
41
+ public void clear () {
42
+ threadList .clear ();
43
+ }
44
+
34
45
private void processMessageSucceeds (KinesisEvent .KinesisEventRecord record , Context context ) {
35
46
// Great success
36
47
}
@@ -42,6 +53,34 @@ private void processMessageFailsForFixedMessage(KinesisEvent.KinesisEventRecord
42
53
}
43
54
}
44
55
56
+ private void processMessageInParallelSucceeds (KinesisEvent .KinesisEventRecord record , Context context ) {
57
+ String thread = Thread .currentThread ().getName ();
58
+ if (!threadList .contains (thread )) {
59
+ threadList .add (thread );
60
+ }
61
+ try {
62
+ Thread .sleep (500 ); // simulate some processing
63
+ } catch (InterruptedException e ) {
64
+ throw new RuntimeException (e );
65
+ }
66
+ }
67
+
68
+ private void processMessageInParallelFailsForFixedMessage (KinesisEvent .KinesisEventRecord record , Context context ) {
69
+ String thread = Thread .currentThread ().getName ();
70
+ if (!threadList .contains (thread )) {
71
+ threadList .add (thread );
72
+ }
73
+ try {
74
+ Thread .sleep (500 ); // simulate some processing
75
+ } catch (InterruptedException e ) {
76
+ throw new RuntimeException (e );
77
+ }
78
+ if (record .getKinesis ().getSequenceNumber ()
79
+ .equals ("49545115243490985018280067714973144582180062593244200961" )) {
80
+ throw new RuntimeException ("fake exception" );
81
+ }
82
+ }
83
+
45
84
// A handler that throws an exception for _one_ of the deserialized products in the same messages
46
85
public void processMessageFailsForFixedProduct (Product product , Context context ) {
47
86
if (product .getId () == 1234 ) {
@@ -51,7 +90,7 @@ public void processMessageFailsForFixedProduct(Product product, Context context)
51
90
52
91
@ ParameterizedTest
53
92
@ Event (value = "kinesis_event.json" , type = KinesisEvent .class )
54
- public void batchProcessingSucceedsAndReturns (KinesisEvent event ) {
93
+ void batchProcessingSucceedsAndReturns (KinesisEvent event ) {
55
94
// Arrange
56
95
BatchMessageHandler <KinesisEvent , StreamsEventResponse > handler = new BatchMessageHandlerBuilder ()
57
96
.withKinesisBatchHandler ()
@@ -61,12 +100,28 @@ public void batchProcessingSucceedsAndReturns(KinesisEvent event) {
61
100
StreamsEventResponse kinesisBatchResponse = handler .processBatch (event , context );
62
101
63
102
// Assert
64
- assertThat (kinesisBatchResponse .getBatchItemFailures ()).hasSize (0 );
103
+ assertThat (kinesisBatchResponse .getBatchItemFailures ()).isEmpty ();
104
+ }
105
+
106
+ @ ParameterizedTest
107
+ @ Event (value = "kinesis_event_big.json" , type = KinesisEvent .class )
108
+ void batchProcessingInParallelSucceedsAndReturns (KinesisEvent event ) {
109
+ // Arrange
110
+ BatchMessageHandler <KinesisEvent , StreamsEventResponse > handler = new BatchMessageHandlerBuilder ()
111
+ .withKinesisBatchHandler ()
112
+ .buildWithRawMessageHandler (this ::processMessageInParallelSucceeds );
113
+
114
+ // Act
115
+ StreamsEventResponse kinesisBatchResponse = handler .processBatchInParallel (event , context );
116
+
117
+ // Assert
118
+ assertThat (kinesisBatchResponse .getBatchItemFailures ()).isEmpty ();
119
+ assertThat (threadList ).hasSizeGreaterThan (1 );
65
120
}
66
121
67
122
@ ParameterizedTest
68
123
@ Event (value = "kinesis_event.json" , type = KinesisEvent .class )
69
- public void shouldAddMessageToBatchFailure_whenException_withMessage (KinesisEvent event ) {
124
+ void shouldAddMessageToBatchFailure_whenException_withMessage (KinesisEvent event ) {
70
125
// Arrange
71
126
BatchMessageHandler <KinesisEvent , StreamsEventResponse > handler = new BatchMessageHandlerBuilder ()
72
127
.withKinesisBatchHandler ()
@@ -82,9 +137,28 @@ public void shouldAddMessageToBatchFailure_whenException_withMessage(KinesisEven
82
137
"49545115243490985018280067714973144582180062593244200961" );
83
138
}
84
139
140
+ @ ParameterizedTest
141
+ @ Event (value = "kinesis_event_big.json" , type = KinesisEvent .class )
142
+ void batchProcessingInParallel_shouldAddMessageToBatchFailure_whenException_withMessage (KinesisEvent event ) {
143
+ // Arrange
144
+ BatchMessageHandler <KinesisEvent , StreamsEventResponse > handler = new BatchMessageHandlerBuilder ()
145
+ .withKinesisBatchHandler ()
146
+ .buildWithRawMessageHandler (this ::processMessageInParallelFailsForFixedMessage );
147
+
148
+ // Act
149
+ StreamsEventResponse kinesisBatchResponse = handler .processBatchInParallel (event , context );
150
+
151
+ // Assert
152
+ assertThat (kinesisBatchResponse .getBatchItemFailures ()).hasSize (1 );
153
+ StreamsEventResponse .BatchItemFailure batchItemFailure = kinesisBatchResponse .getBatchItemFailures ().get (0 );
154
+ assertThat (batchItemFailure .getItemIdentifier ()).isEqualTo (
155
+ "49545115243490985018280067714973144582180062593244200961" );
156
+ assertThat (threadList ).hasSizeGreaterThan (1 );
157
+ }
158
+
85
159
@ ParameterizedTest
86
160
@ Event (value = "kinesis_event.json" , type = KinesisEvent .class )
87
- public void shouldAddMessageToBatchFailure_whenException_withProduct (KinesisEvent event ) {
161
+ void shouldAddMessageToBatchFailure_whenException_withProduct (KinesisEvent event ) {
88
162
// Arrange
89
163
BatchMessageHandler <KinesisEvent , StreamsEventResponse > handler = new BatchMessageHandlerBuilder ()
90
164
.withKinesisBatchHandler ()
@@ -102,7 +176,7 @@ public void shouldAddMessageToBatchFailure_whenException_withProduct(KinesisEven
102
176
103
177
@ ParameterizedTest
104
178
@ Event (value = "kinesis_event.json" , type = KinesisEvent .class )
105
- public void failingFailureHandlerShouldntFailBatch (KinesisEvent event ) {
179
+ void failingFailureHandlerShouldntFailBatch (KinesisEvent event ) {
106
180
// Arrange
107
181
AtomicBoolean wasCalled = new AtomicBoolean (false );
108
182
BatchMessageHandler <KinesisEvent , StreamsEventResponse > handler = new BatchMessageHandlerBuilder ()
@@ -118,7 +192,7 @@ public void failingFailureHandlerShouldntFailBatch(KinesisEvent event) {
118
192
119
193
// Assert
120
194
assertThat (kinesisBatchResponse ).isNotNull ();
121
- assertThat (kinesisBatchResponse .getBatchItemFailures (). size ()). isEqualTo (1 );
195
+ assertThat (kinesisBatchResponse .getBatchItemFailures ()). hasSize (1 );
122
196
assertThat (wasCalled .get ()).isTrue ();
123
197
StreamsEventResponse .BatchItemFailure batchItemFailure = kinesisBatchResponse .getBatchItemFailures ().get (0 );
124
198
assertThat (batchItemFailure .getItemIdentifier ()).isEqualTo (
@@ -127,7 +201,7 @@ public void failingFailureHandlerShouldntFailBatch(KinesisEvent event) {
127
201
128
202
@ ParameterizedTest
129
203
@ Event (value = "kinesis_event.json" , type = KinesisEvent .class )
130
- public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage (KinesisEvent event ) {
204
+ void failingSuccessHandlerShouldntFailBatchButShouldFailMessage (KinesisEvent event ) {
131
205
// Arrange
132
206
AtomicBoolean wasCalledAndFailed = new AtomicBoolean (false );
133
207
BatchMessageHandler <KinesisEvent , StreamsEventResponse > handler = new BatchMessageHandlerBuilder ()
@@ -146,7 +220,7 @@ public void failingSuccessHandlerShouldntFailBatchButShouldFailMessage(KinesisEv
146
220
147
221
// Assert
148
222
assertThat (kinesisBatchResponse ).isNotNull ();
149
- assertThat (kinesisBatchResponse .getBatchItemFailures (). size ()). isEqualTo (1 );
223
+ assertThat (kinesisBatchResponse .getBatchItemFailures ()). hasSize (1 );
150
224
assertThat (wasCalledAndFailed .get ()).isTrue ();
151
225
StreamsEventResponse .BatchItemFailure batchItemFailure = kinesisBatchResponse .getBatchItemFailures ().get (0 );
152
226
assertThat (batchItemFailure .getItemIdentifier ()).isEqualTo (
0 commit comments