Skip to content

Cleanup ConcurrentMessageListenerContainerMockTests #3044

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,7 +30,6 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -130,7 +129,7 @@ else if (event instanceof ConsumerFailedToStartEvent) {
exec.destroy();
}

@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void testCorrectContainerForConsumerError() throws InterruptedException {
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
Expand Down Expand Up @@ -200,10 +199,8 @@ void delayedIdleEvent() throws InterruptedException {
containerProperties);
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(2);
AtomicReference<Long> eventTime = new AtomicReference<>();
container.setApplicationEventPublisher(event -> {
if (event instanceof ListenerContainerIdleEvent) {
eventTime.set(System.currentTimeMillis());
latch1.countDown();
latch2.countDown();
}
Expand Down Expand Up @@ -263,7 +260,7 @@ void testSyncRelativeSeeks() throws InterruptedException {
TopicPartition tp1 = new TopicPartition("foo", 1);
TopicPartition tp2 = new TopicPartition("foo", 2);
TopicPartition tp3 = new TopicPartition("foo", 3);
List<TopicPartition> assignments = Arrays.asList(tp0, tp1, tp2, tp3);
List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
willAnswer(invocation -> {
((ConsumerRebalanceListener) invocation.getArgument(1))
.onPartitionsAssigned(assignments);
Expand Down Expand Up @@ -304,7 +301,7 @@ void testAsyncRelativeSeeks() throws InterruptedException {
TopicPartition tp1 = new TopicPartition("foo", 1);
TopicPartition tp2 = new TopicPartition("foo", 2);
TopicPartition tp3 = new TopicPartition("foo", 3);
List<TopicPartition> assignments = Arrays.asList(tp0, tp1, tp2, tp3);
List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
recordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar")));
recordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
Expand Down Expand Up @@ -363,7 +360,7 @@ void testSyncTimestampSeeks() throws InterruptedException {
TopicPartition tp1 = new TopicPartition("foo", 1);
TopicPartition tp2 = new TopicPartition("foo", 2);
TopicPartition tp3 = new TopicPartition("foo", 3);
List<TopicPartition> assignments = Arrays.asList(tp0, tp1, tp2, tp3);
List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
willAnswer(invocation -> {
((ConsumerRebalanceListener) invocation.getArgument(1))
.onPartitionsAssigned(assignments);
Expand Down Expand Up @@ -410,7 +407,7 @@ void testAsyncTimestampSeeks() throws InterruptedException {
TopicPartition tp1 = new TopicPartition("foo", 1);
TopicPartition tp2 = new TopicPartition("foo", 2);
TopicPartition tp3 = new TopicPartition("foo", 3);
List<TopicPartition> assignments = Arrays.asList(tp0, tp1, tp2, tp3);
List<TopicPartition> assignments = List.of(tp0, tp1, tp2, tp3);
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
recordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar")));
recordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
Expand Down Expand Up @@ -508,7 +505,9 @@ void testBatchInterceptBeforeTx1() throws InterruptedException {
}

@SuppressWarnings({ "rawtypes", "unchecked" })
void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batch) throws InterruptedException {
void testIntercept(boolean beforeTx, @Nullable AssignmentCommitOption option, boolean batch)
throws InterruptedException {

ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
final Consumer consumer = mock(Consumer.class);
TopicPartition tp0 = new TopicPartition("foo", 0);
Expand All @@ -523,7 +522,7 @@ void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batc
Thread.sleep(10);
return firstOrSecondPoll.incrementAndGet() < 3 ? records : empty;
}).given(consumer).poll(any());
List<TopicPartition> assignments = Arrays.asList(tp0);
List<TopicPartition> assignments = List.of(tp0);
willAnswer(invocation -> {
((ConsumerRebalanceListener) invocation.getArgument(1))
.onPartitionsAssigned(assignments);
Expand Down Expand Up @@ -676,7 +675,7 @@ void testInterceptInTxNonKafkaTM() throws InterruptedException {
Thread.sleep(10);
return firstOrSecondPoll.incrementAndGet() < 2 ? records : empty;
}).given(consumer).poll(any());
List<TopicPartition> assignments = Arrays.asList(tp0);
List<TopicPartition> assignments = List.of(tp0);
willAnswer(invocation -> {
((ConsumerRebalanceListener) invocation.getArgument(1))
.onPartitionsAssigned(assignments);
Expand Down Expand Up @@ -771,7 +770,7 @@ void testNoCommitOnAssignmentWithEarliest() throws InterruptedException {
return records;
}).given(consumer).poll(any());
TopicPartition tp0 = new TopicPartition("foo", 0);
List<TopicPartition> assignments = Arrays.asList(tp0);
List<TopicPartition> assignments = List.of(tp0);
willAnswer(invocation -> {
((ConsumerRebalanceListener) invocation.getArgument(1))
.onPartitionsAssigned(assignments);
Expand Down Expand Up @@ -814,7 +813,7 @@ private void testInitialCommitIBasedOnCommitted(boolean committed) throws Interr
return records;
}).given(consumer).poll(any());
TopicPartition tp0 = new TopicPartition("foo", 0);
List<TopicPartition> assignments = Arrays.asList(tp0);
List<TopicPartition> assignments = List.of(tp0);
willAnswer(invocation -> {
((ConsumerRebalanceListener) invocation.getArgument(1))
.onPartitionsAssigned(assignments);
Expand Down Expand Up @@ -865,7 +864,7 @@ void removeFromPartitionPauseRequestedWhenNotAssigned() throws InterruptedExcept
return null;
}).given(consumer).pause(any());
TopicPartition tp0 = new TopicPartition("foo", 0);
List<TopicPartition> assignments = Arrays.asList(tp0);
List<TopicPartition> assignments = List.of(tp0);
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
willAnswer(invocation -> {
rebal.set(invocation.getArgument(1));
Expand Down Expand Up @@ -911,14 +910,14 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseLegacyAssi
TopicPartition tp1 = new TopicPartition("foo", 1);
TopicPartition tp2 = new TopicPartition("foo", 2);
TopicPartition tp3 = new TopicPartition("foo", 3);
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1, tp2, tp3);
List<TopicPartition> allAssignments = List.of(tp0, tp1, tp2, tp3);
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new HashMap<>();
allRecordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar")));
allRecordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
allRecordMap.put(tp2, Collections.singletonList(new ConsumerRecord("foo", 2, 0, null, "bar")));
allRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar")));
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1, tp3);
List<TopicPartition> afterRevokeAssignments = List.of(tp1, tp3);
Map<TopicPartition, List<ConsumerRecord<String, String>>> afterRevokeRecordMap = new HashMap<>();
afterRevokeRecordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
afterRevokeRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar")));
Expand Down Expand Up @@ -979,10 +978,11 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
Thread.sleep(50);
pollLatch.countDown();
switch (pollPhase.getAndIncrement()) {
case 0:
case 0 -> {
rebal.get().onPartitionsAssigned(allAssignments);
return allRecords;
case 1:
}
case 1 -> {
rebal.get().onPartitionsRevoked(allAssignments);
rebal.get().onPartitionsAssigned(afterRevokeAssignments);
rebalLatch.countDown();
Expand All @@ -991,11 +991,13 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
return ConsumerRecords.empty();
}
return afterRevokeRecords;
default:
}
default -> {
if (paused.get()) {
return ConsumerRecords.empty();
}
return afterRevokeRecords;
}
}
}).given(consumer).poll(any());
container.start();
Expand Down Expand Up @@ -1023,7 +1025,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign
TopicPartition tp1 = new TopicPartition("foo", 1);
TopicPartition tp2 = new TopicPartition("foo", 2);
TopicPartition tp3 = new TopicPartition("foo", 3);
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1, tp2, tp3);
List<TopicPartition> allAssignments = List.of(tp0, tp1, tp2, tp3);
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new LinkedHashMap<>();
ConsumerRecord record0 = new ConsumerRecord("foo", 0, 0, null, "bar");
ConsumerRecord record1 = new ConsumerRecord("foo", 1, 0, null, "bar");
Expand All @@ -1032,7 +1034,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign
allRecordMap.put(tp2, Collections.singletonList(new ConsumerRecord("foo", 2, 0, null, "bar")));
allRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar")));
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
List<TopicPartition> revokedAssignments = Arrays.asList(tp0, tp2);
List<TopicPartition> revokedAssignments = List.of(tp0, tp2);
AtomicInteger pollPhase = new AtomicInteger();

Consumer consumer = mock(Consumer.class);
Expand All @@ -1044,9 +1046,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign
return null;
}).given(consumer).subscribe(any(Collection.class), any());
CountDownLatch pauseLatch = new CountDownLatch(1);
AtomicBoolean paused = new AtomicBoolean();
willAnswer(inv -> {
paused.set(true);
pauseLatch.countDown();
return null;
}).given(consumer).pause(any());
Expand Down Expand Up @@ -1087,17 +1087,20 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
Thread.sleep(50);
pollLatch.countDown();
switch (pollPhase.getAndIncrement()) {
case 0:
case 0 -> {
rebal.get().onPartitionsAssigned(allAssignments);
return allRecords;
case 1:
}
case 1 -> {
rebal.get().onPartitionsRevoked(revokedAssignments);
rebal.get().onPartitionsAssigned(Collections.emptyList());
rebalLatch.countDown();
continueLatch.await(10, TimeUnit.SECONDS);
return ConsumerRecords.empty();
default:
}
default -> {
return ConsumerRecords.empty();
}
}
}).given(consumer).poll(any());
container.start();
Expand Down Expand Up @@ -1128,14 +1131,14 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws InterruptedException {
TopicPartition tp0 = new TopicPartition("foo", 0);
TopicPartition tp1 = new TopicPartition("foo", 1);
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1);
List<TopicPartition> allAssignments = List.of(tp0, tp1);
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new HashMap<>();
allRecordMap.put(tp0,
List.of(new ConsumerRecord("foo", 0, 0, null, "bar"), new ConsumerRecord("foo", 0, 1, null, "bar")));
allRecordMap.put(tp1,
List.of(new ConsumerRecord("foo", 1, 0, null, "bar"), new ConsumerRecord("foo", 1, 1, null, "bar")));
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1);
List<TopicPartition> afterRevokeAssignments = List.of(tp1);
AtomicInteger pollPhase = new AtomicInteger();

Consumer consumer = mock(Consumer.class);
Expand All @@ -1147,9 +1150,7 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I
return null;
}).given(consumer).subscribe(any(Collection.class), any());
CountDownLatch pauseLatch = new CountDownLatch(1);
AtomicBoolean paused = new AtomicBoolean();
willAnswer(inv -> {
paused.set(true);
pauseLatch.countDown();
return null;
}).given(consumer).pause(any());
Expand All @@ -1171,17 +1172,20 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I
Thread.sleep(50);
pollLatch.countDown();
switch (pollPhase.getAndIncrement()) {
case 0:
case 0 -> {
rebal.get().onPartitionsAssigned(allAssignments);
return allRecords;
case 1:
}
case 1 -> {
rebal.get().onPartitionsRevoked(allAssignments);
rebal.get().onPartitionsAssigned(afterRevokeAssignments);
rebalLatch.countDown();
continueLatch.await(10, TimeUnit.SECONDS);
return ConsumerRecords.empty();
default:
}
default -> {
return ConsumerRecords.empty();
}
}
}).given(consumer).poll(any());
container.start();
Expand All @@ -1206,14 +1210,13 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I
void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws InterruptedException {
TopicPartition tp0 = new TopicPartition("foo", 0);
TopicPartition tp1 = new TopicPartition("foo", 1);
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1);
List<TopicPartition> allAssignments = List.of(tp0, tp1);
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new HashMap<>();
allRecordMap.put(tp0,
List.of(new ConsumerRecord("foo", 0, 0, null, "bar"), new ConsumerRecord("foo", 0, 1, null, "bar")));
allRecordMap.put(tp1,
List.of(new ConsumerRecord("foo", 1, 0, null, "bar"), new ConsumerRecord("foo", 1, 1, null, "bar")));
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1);
AtomicInteger pollPhase = new AtomicInteger();

Consumer consumer = mock(Consumer.class);
Expand All @@ -1225,9 +1228,7 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws Int
return null;
}).given(consumer).subscribe(any(Collection.class), any());
CountDownLatch pauseLatch = new CountDownLatch(1);
AtomicBoolean paused = new AtomicBoolean();
willAnswer(inv -> {
paused.set(true);
pauseLatch.countDown();
return null;
}).given(consumer).pause(any());
Expand All @@ -1249,17 +1250,20 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws Int
Thread.sleep(50);
pollLatch.countDown();
switch (pollPhase.getAndIncrement()) {
case 0:
case 0 -> {
rebal.get().onPartitionsAssigned(allAssignments);
return allRecords;
case 1:
}
case 1 -> {
rebal.get().onPartitionsRevoked(List.of(tp0));
rebal.get().onPartitionsAssigned(List.of(new TopicPartition("foo", 2)));
rebalLatch.countDown();
continueLatch.await(10, TimeUnit.SECONDS);
return ConsumerRecords.empty();
default:
}
default -> {
return ConsumerRecords.empty();
}
}
}).given(consumer).poll(any());
container.start();
Expand All @@ -1285,7 +1289,7 @@ private AcknowledgingMessageListener ackOffset1() {

@Override
public void onMessage(ConsumerRecord rec, @Nullable Acknowledgment ack) {
if (rec.offset() == 1) {
if (rec.offset() == 1 && ack != null) {
ack.acknowledge();
}
}
Expand All @@ -1299,7 +1303,7 @@ public void onMessage(Object data) {

public static class TestMessageListener1 implements MessageListener<String, String>, ConsumerSeekAware {

private static ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();
private static final ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();

CountDownLatch latch = new CountDownLatch(1);

Expand Down Expand Up @@ -1335,7 +1339,7 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC

public static class TestMessageListener2 implements MessageListener<String, String>, ConsumerSeekAware {

private static ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();
private static final ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();

CountDownLatch latch = new CountDownLatch(1);

Expand Down