Skip to content

Commit 7fd494e

Browse files
committed
Fix flaky KinesisStabilityTest
1 parent 248dd65 commit 7fd494e

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/kinesis/KinesisStabilityTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,10 @@ private List<CompletableFuture<?>> generateSubscribeToShardFutures() {
183183
.startingPosition(s -> s.type(ShardIteratorType.TRIM_HORIZON)),
184184
responseHandler)
185185
.thenAccept(b -> {
186-
// Only verify data if all events have been received.
187-
if (responseHandler.allEventsReceived) {
186+
// Only verify data if all events have been received and the received data is not empty.
187+
// It is possible the received data is empty because there is no record at the position
188+
// event with TRIM_HORIZON.
189+
if (responseHandler.allEventsReceived && !responseHandler.receivedData.isEmpty()) {
188190
assertThat(producedData).as(responseHandler.id + " has not received all events"
189191
+ ".").containsSequence(responseHandler.receivedData);
190192
}
@@ -218,7 +220,7 @@ private static class TestSubscribeToShardResponseHandler extends TestEventStream
218220
, SubscribeToShardEventStream> implements SubscribeToShardResponseHandler {
219221
private final List<SdkBytes> receivedData = new ArrayList<>();
220222
private final String id;
221-
private boolean allEventsReceived = false;
223+
private volatile boolean allEventsReceived = false;
222224

223225
TestSubscribeToShardResponseHandler(int consumerIndex, int shardIndex) {
224226
id = "consumer_" + consumerIndex + "_shard_" + shardIndex;

0 commit comments

Comments
 (0)