Skip to content

Commit 0b321cf

Browse files
authored
Align Record/Batch Interceptor lifecycle (spring-projects#3053)
Fixes: spring-projects#2287 Resolves spring-projects#2287 * Align lifecycle for earlyRecordInterceptor(intercept + failure/success + afterRecord) and earlyBatchInterceptor(intercept + failure/success). * Fix unit test KafkaMessageListenerContainerTests, see spring-projects#2722 and spring-projects#2287.
1 parent 6f32959 commit 0b321cf

File tree

2 files changed

+21
-4
lines changed

2 files changed

+21
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2662,6 +2662,7 @@ private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg)
26622662
catch (InterruptedException e) {
26632663
Thread.currentThread().interrupt();
26642664
}
2665+
this.earlyBatchInterceptor.success(nextArg, this.consumer);
26652666
}
26662667
}
26672668
return next;
@@ -2677,6 +2678,8 @@ private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> recordArg)
26772678
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
26782679
+ KafkaUtils.format(recordArg));
26792680
ackCurrent(recordArg);
2681+
this.earlyRecordInterceptor.success(recordArg, this.consumer);
2682+
this.earlyRecordInterceptor.afterRecord(recordArg, this.consumer);
26802683
}
26812684
}
26822685
return cRecord;

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3898,6 +3898,9 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
38983898
containerProps.setMessageListener((MessageListener<?, ?>) msg -> {
38993899
});
39003900
containerProps.setClientId("clientId");
3901+
if (early) {
3902+
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
3903+
}
39013904

39023905
RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {
39033906

@@ -3922,17 +3925,27 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
39223925
inOrder.verify(recordInterceptor).setupThreadState(eq(consumer));
39233926
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
39243927
inOrder.verify(recordInterceptor).intercept(eq(firstRecord), eq(consumer));
3925-
if (ackMode.equals(AckMode.RECORD)) {
3928+
if (AckMode.RECORD.equals(ackMode)) {
39263929
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))),
39273930
any(Duration.class));
39283931
}
39293932
else {
39303933
verify(consumer, never()).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))),
39313934
any(Duration.class));
39323935
}
3936+
inOrder.verify(recordInterceptor).success(eq(firstRecord), eq(consumer));
3937+
inOrder.verify(recordInterceptor).afterRecord(eq(firstRecord), eq(consumer));
39333938
inOrder.verify(recordInterceptor).intercept(eq(secondRecord), eq(consumer));
3934-
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
3935-
any(Duration.class));
3939+
if (AckMode.RECORD.equals(ackMode)) {
3940+
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
3941+
any(Duration.class));
3942+
}
3943+
inOrder.verify(recordInterceptor).success(eq(secondRecord), eq(consumer));
3944+
inOrder.verify(recordInterceptor).afterRecord(eq(secondRecord), eq(consumer));
3945+
if (AckMode.BATCH.equals(ackMode)) {
3946+
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
3947+
any(Duration.class));
3948+
}
39363949
container.stop();
39373950
}
39383951

@@ -3968,7 +3981,7 @@ public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception
39683981
containerProps.setMessageListener((BatchMessageListener<?, ?>) msgs -> {
39693982
});
39703983
containerProps.setClientId("clientId");
3971-
if (!early) {
3984+
if (early) {
39723985
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
39733986
}
39743987

@@ -3995,6 +4008,7 @@ public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, Strin
39954008
inOrder.verify(interceptor).setupThreadState(eq(consumer));
39964009
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
39974010
inOrder.verify(interceptor).intercept(any(), eq(consumer));
4011+
inOrder.verify(interceptor).success(any(), eq(consumer));
39984012
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
39994013
any(Duration.class));
40004014
container.stop();

0 commit comments

Comments
 (0)