Skip to content

Commit fd150ad

Browse files
Wzy19930507sobychacko
authored andcommitted
Align Record/Batch Interceptor lifecycle (#3053)
Fixes: #2287 Resolves #2287 * Align lifecycle for earlyRecordInterceptor(intercept + failure/success + afterRecord) and earlyBatchInterceptor(intercept + failure/success). * Fix unit test KafkaMessageListenerContainerTests, see #2722 and #2287. (cherry picked from commit 0b321cf)
1 parent 7077f4e commit fd150ad

File tree

2 files changed

+21
-4
lines changed

2 files changed

+21
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2665,6 +2665,7 @@ private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg)
26652665
catch (InterruptedException e) {
26662666
Thread.currentThread().interrupt();
26672667
}
2668+
this.earlyBatchInterceptor.success(nextArg, this.consumer);
26682669
}
26692670
}
26702671
return next;
@@ -2680,6 +2681,8 @@ private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> recordArg)
26802681
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
26812682
+ KafkaUtils.format(recordArg));
26822683
ackCurrent(recordArg);
2684+
this.earlyRecordInterceptor.success(recordArg, this.consumer);
2685+
this.earlyRecordInterceptor.afterRecord(recordArg, this.consumer);
26832686
}
26842687
}
26852688
return cRecord;

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3901,6 +3901,9 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
39013901
containerProps.setMessageListener((MessageListener) msg -> {
39023902
});
39033903
containerProps.setClientId("clientId");
3904+
if (early) {
3905+
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
3906+
}
39043907

39053908
RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {
39063909

@@ -3925,17 +3928,27 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
39253928
inOrder.verify(recordInterceptor).setupThreadState(eq(consumer));
39263929
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
39273930
inOrder.verify(recordInterceptor).intercept(eq(firstRecord), eq(consumer));
3928-
if (ackMode.equals(AckMode.RECORD)) {
3931+
if (AckMode.RECORD.equals(ackMode)) {
39293932
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))),
39303933
any(Duration.class));
39313934
}
39323935
else {
39333936
verify(consumer, never()).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))),
39343937
any(Duration.class));
39353938
}
3939+
inOrder.verify(recordInterceptor).success(eq(firstRecord), eq(consumer));
3940+
inOrder.verify(recordInterceptor).afterRecord(eq(firstRecord), eq(consumer));
39363941
inOrder.verify(recordInterceptor).intercept(eq(secondRecord), eq(consumer));
3937-
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
3938-
any(Duration.class));
3942+
if (AckMode.RECORD.equals(ackMode)) {
3943+
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
3944+
any(Duration.class));
3945+
}
3946+
inOrder.verify(recordInterceptor).success(eq(secondRecord), eq(consumer));
3947+
inOrder.verify(recordInterceptor).afterRecord(eq(secondRecord), eq(consumer));
3948+
if (AckMode.BATCH.equals(ackMode)) {
3949+
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
3950+
any(Duration.class));
3951+
}
39393952
container.stop();
39403953
}
39413954

@@ -3971,7 +3984,7 @@ public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception
39713984
containerProps.setMessageListener((BatchMessageListener) msgs -> {
39723985
});
39733986
containerProps.setClientId("clientId");
3974-
if (!early) {
3987+
if (early) {
39753988
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
39763989
}
39773990

@@ -3998,6 +4011,7 @@ public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, Strin
39984011
inOrder.verify(interceptor).setupThreadState(eq(consumer));
39994012
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
40004013
inOrder.verify(interceptor).intercept(any(), eq(consumer));
4014+
inOrder.verify(interceptor).success(any(), eq(consumer));
40014015
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
40024016
any(Duration.class));
40034017
container.stop();

0 commit comments

Comments
 (0)