Skip to content

Fix bug in KafkaMessageListenerContainerTests.testInvokeRecordInterceptorAllSkipped() #3091

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged 1 commit on
Mar 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2769,7 +2769,6 @@ public void rePausePartitionAfterRebalance() throws Exception {
rebal.get().onPartitionsAssigned(Set.of(tp0, tp1));
return null;
}).given(consumer).subscribe(eq(foos), any(ConsumerRebalanceListener.class));
final CountDownLatch resumeLatch = new CountDownLatch(1);
ContainerProperties containerProps = new ContainerProperties("foo");
containerProps.setGroupId("grp");
containerProps.setAckMode(AckMode.RECORD);
Expand All @@ -2780,7 +2779,6 @@ public void rePausePartitionAfterRebalance() throws Exception {
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
InOrder inOrder = inOrder(consumer);
assertThat(firstPoll.await(10, TimeUnit.SECONDS)).isNotNull();
container.pausePartition(tp0);
container.pausePartition(tp1);
Expand Down Expand Up @@ -2811,7 +2809,6 @@ public void resumePartitionAfterRevokeAndReAssign() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
AtomicBoolean first = new AtomicBoolean(true);
TopicPartition tp0 = new TopicPartition("foo", 0);
TopicPartition tp1 = new TopicPartition("foo", 1);
given(consumer.assignment()).willReturn(Set.of(tp0, tp1));
Expand Down Expand Up @@ -3462,7 +3459,6 @@ public void testCooperativeRebalance() throws Exception {
containerProps.setGroupId("grp");
containerProps.setClientId("clientId");
containerProps.setMessageListener((MessageListener<?, ?>) msg -> { });
Properties consumerProps = new Properties();
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
Expand Down Expand Up @@ -3606,7 +3602,6 @@ else if (call == 1) {
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
List<Map<TopicPartition, OffsetAndMetadata>> commits = new ArrayList<>();
AtomicBoolean firstCommit = new AtomicBoolean(true);
AtomicInteger commitCount = new AtomicInteger();
willAnswer(invoc -> {
commits.add(invoc.getArgument(0, Map.class));
if (!firstCommit.getAndSet(false)) {
Expand Down Expand Up @@ -3888,6 +3883,11 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
latch.countDown();
return null;
}).given(consumer).commitSync(any(), any());
CountDownLatch closeLatch = new CountDownLatch(1);
willAnswer(inv -> {
closeLatch.countDown();
return null;
}).given(consumer).close();
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
new TopicPartitionOffset("foo", 0) };

Expand All @@ -3902,6 +3902,7 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
containerProps.setKafkaAwareTransactionManager(mock(KafkaAwareTransactionManager.class));
}

CountDownLatch afterRecordLatch = new CountDownLatch(2);
RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {

@Override
Expand All @@ -3912,6 +3913,10 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
return null;
}

public void afterRecord(ConsumerRecord<Integer, String> record, Consumer<Integer, String> consumer) {
afterRecordLatch.countDown();
}

});

KafkaMessageListenerContainer<Integer, String> container =
Expand All @@ -3920,6 +3925,9 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
container.setInterceptBeforeTx(early);
container.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(afterRecordLatch.await(10, TimeUnit.SECONDS)).isTrue();
container.stop();
assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();

InOrder inOrder = inOrder(recordInterceptor, consumer);
inOrder.verify(recordInterceptor).setupThreadState(eq(consumer));
Expand All @@ -3946,12 +3954,12 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
any(Duration.class));
}
container.stop();
inOrder.verify(consumer).close();
}

@ParameterizedTest(name = "{index} testInvokeBatchInterceptorAllSkipped early intercept {0}")
@ValueSource(booleans = { true, false })
@SuppressWarnings({ "unchecked", "deprecation" })
@SuppressWarnings("unchecked")
public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
Expand Down