Skip to content

Commit 0c75fc7

Browse files
spring-projectsGH-3542: Adds the ability to add record interceptors instead of override them
Fixes: spring-projects#3542 Issue link: spring-projects#3542 What? Change `RecordInterceptor` to `List<RecordInterceptor>` in `MessageListenerContainer`. Why? To allow adding multiple `RecordInterceptor` instances instead of overriding the existing one. Currently, only a single `RecordInterceptor` is supported. Users may want to register multiple `RecordInterceptors`. There are some workarounds, but they are not clean or ideal solutions. By supporting `List<RecordInterceptor`>, users can add their own interceptors via `setRecordInterceptor(...)`. Signed-off-by: Sanghyeok An <[email protected]>
1 parent 17fedf6 commit 0c75fc7

File tree

3 files changed

+53
-38
lines changed

3 files changed

+53
-38
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.util.ArrayList;
1920
import java.util.Arrays;
2021
import java.util.Collection;
2122
import java.util.List;
@@ -114,7 +115,8 @@ public abstract class AbstractMessageListenerContainer<K, V>
114115

115116
private int topicCheckTimeout = DEFAULT_TOPIC_CHECK_TIMEOUT;
116117

117-
private @Nullable RecordInterceptor<K, V> recordInterceptor;
118+
// TODO: HERE ASH
119+
private List<RecordInterceptor<K, V>> recordInterceptors = new ArrayList<>();
118120

119121
private @Nullable BatchInterceptor<K, V> batchInterceptor;
120122

@@ -460,8 +462,8 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
460462
this.kafkaAdmin = kafkaAdmin;
461463
}
462464

463-
protected @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
464-
return this.recordInterceptor;
465+
protected List<RecordInterceptor<K, V>> getRecordInterceptors() {
466+
return this.recordInterceptors;
465467
}
466468

467469
/**
@@ -472,7 +474,9 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
472474
* @see #setInterceptBeforeTx(boolean)
473475
*/
474476
public void setRecordInterceptor(@Nullable RecordInterceptor<K, V> recordInterceptor) {
475-
this.recordInterceptor = recordInterceptor;
477+
if (recordInterceptor != null) {
478+
this.recordInterceptors.add(recordInterceptor);
479+
}
476480
}
477481

478482
protected @Nullable BatchInterceptor<K, V> getBatchInterceptor() {

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
* @author Tomaz Fernandes
6161
* @author Wang Zhiyang
6262
* @author Lokesh Alamuri
63+
* @author Sanghyeok An
6364
*/
6465
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
6566

@@ -282,7 +283,9 @@ private void configureChildContainer(int index, KafkaMessageListenerContainer<K,
282283
container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + index : "");
283284
container.setCommonErrorHandler(getCommonErrorHandler());
284285
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
285-
container.setRecordInterceptor(getRecordInterceptor());
286+
for (RecordInterceptor<K, V> recordInterceptor : getRecordInterceptors()) {
287+
container.setRecordInterceptor(recordInterceptor);
288+
}
286289
container.setBatchInterceptor(getBatchInterceptor());
287290
container.setInterceptBeforeTx(isInterceptBeforeTx());
288291
container.setListenerInfo(getListenerInfo());

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -714,17 +714,17 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
714714

715715
private final @Nullable Duration syncCommitTimeout;
716716

717-
private final @Nullable RecordInterceptor<K, V> recordInterceptor =
717+
private final List<RecordInterceptor<K, V>> recordInterceptors =
718718
!isInterceptBeforeTx() || this.transactionManager == null
719-
? getRecordInterceptor()
720-
: null;
719+
? getRecordInterceptors()
720+
: new ArrayList<>();
721721

722-
private final @Nullable RecordInterceptor<K, V> earlyRecordInterceptor =
722+
private final List<RecordInterceptor<K, V>> earlyRecordInterceptors =
723723
isInterceptBeforeTx() && this.transactionManager != null
724-
? getRecordInterceptor()
725-
: null;
724+
? getRecordInterceptors()
725+
: new ArrayList<>();
726726

727-
private final @Nullable RecordInterceptor<K, V> commonRecordInterceptor = getRecordInterceptor();
727+
private final List<RecordInterceptor<K, V>> commonRecordInterceptors = getRecordInterceptors();
728728

729729
private final @Nullable BatchInterceptor<K, V> batchInterceptor =
730730
!isInterceptBeforeTx() || this.transactionManager == null
@@ -738,7 +738,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
738738

739739
private final @Nullable BatchInterceptor<K, V> commonBatchInterceptor = getBatchInterceptor();
740740

741-
private final @Nullable ThreadStateProcessor pollThreadStateProcessor;
741+
private final List<ThreadStateProcessor> pollThreadStateProcessor;
742742

743743
private final ConsumerSeekCallback seekCallback = new InitialOrIdleSeekCallback();
744744

@@ -1040,9 +1040,20 @@ private void obtainClusterId() {
10401040
}
10411041
}
10421042

1043-
@Nullable
1044-
private ThreadStateProcessor setUpPollProcessor(boolean batch) {
1045-
return batch ? this.commonBatchInterceptor : this.commonRecordInterceptor;
1043+
private List<ThreadStateProcessor> setUpPollProcessor(boolean batch) {
1044+
if (batch) {
1045+
if (this.commonBatchInterceptor != null) {
1046+
List<ThreadStateProcessor> threadStateProcessors = new ArrayList<>();
1047+
threadStateProcessors.add(this.commonBatchInterceptor);
1048+
return threadStateProcessors;
1049+
}
1050+
else {
1051+
return new ArrayList<>();
1052+
}
1053+
}
1054+
else {
1055+
return new ArrayList<>(this.commonRecordInterceptors);
1056+
}
10461057
}
10471058

10481059
@Nullable
@@ -1548,9 +1559,7 @@ private void invokeIfHaveRecords(@Nullable ConsumerRecords<K, V> records) {
15481559
}
15491560

15501561
private void clearThreadState() {
1551-
if (this.pollThreadStateProcessor != null) {
1552-
this.pollThreadStateProcessor.clearThreadState(this.consumer);
1553-
}
1562+
this.pollThreadStateProcessor.forEach(threadStateProcessor -> threadStateProcessor.clearThreadState(this.consumer));
15541563
}
15551564

15561565
private void checkIdlePartitions() {
@@ -1708,9 +1717,7 @@ private ConsumerRecords<K, V> pollConsumer() {
17081717
}
17091718

17101719
private void beforePoll() {
1711-
if (this.pollThreadStateProcessor != null) {
1712-
this.pollThreadStateProcessor.setupThreadState(this.consumer);
1713-
}
1720+
this.pollThreadStateProcessor.forEach(threadStateProcessor -> threadStateProcessor.setupThreadState(this.consumer));
17141721
}
17151722

17161723
private synchronized void captureOffsets(ConsumerRecords<K, V> records) {
@@ -2548,9 +2555,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
25482555
this.logger.error(ex, "Transaction rolled back");
25492556
recordAfterRollback(iterator, cRecord, ex);
25502557
}
2551-
if (this.commonRecordInterceptor != null) {
2552-
this.commonRecordInterceptor.afterRecord(cRecord, this.consumer);
2553-
}
2558+
this.commonRecordInterceptors.forEach(interceptor -> interceptor.afterRecord(cRecord, this.consumer));
25542559
if (this.nackSleepDurationMillis >= 0) {
25552560
handleNack(records, cRecord);
25562561
break;
@@ -2627,9 +2632,7 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
26272632
}
26282633
this.logger.trace(() -> "Processing " + KafkaUtils.format(cRecord));
26292634
doInvokeRecordListener(cRecord, iterator);
2630-
if (this.commonRecordInterceptor != null) {
2631-
this.commonRecordInterceptor.afterRecord(cRecord, this.consumer);
2632-
}
2635+
this.commonRecordInterceptors.forEach(interceptor -> interceptor.afterRecord(cRecord, this.consumer));
26332636
if (this.nackSleepDurationMillis >= 0) {
26342637
handleNack(records, cRecord);
26352638
break;
@@ -2680,14 +2683,16 @@ private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg)
26802683
private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> recordArg) {
26812684
internalHeaders(recordArg);
26822685
ConsumerRecord<K, V> cRecord = recordArg;
2683-
if (this.earlyRecordInterceptor != null) {
2684-
cRecord = this.earlyRecordInterceptor.intercept(cRecord, this.consumer);
2686+
2687+
for (RecordInterceptor<K, V> earlyRecordInterceptor : this.earlyRecordInterceptors) {
2688+
cRecord = earlyRecordInterceptor.intercept(cRecord, this.consumer);
26852689
if (cRecord == null) {
26862690
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
2687-
+ KafkaUtils.format(recordArg));
2691+
+ KafkaUtils.format(recordArg));
26882692
ackCurrent(recordArg);
2689-
this.earlyRecordInterceptor.success(recordArg, this.consumer);
2690-
this.earlyRecordInterceptor.afterRecord(recordArg, this.consumer);
2693+
earlyRecordInterceptor.success(recordArg, this.consumer);
2694+
earlyRecordInterceptor.afterRecord(recordArg, this.consumer);
2695+
break;
26912696
}
26922697
}
26932698
return cRecord;
@@ -2848,13 +2853,13 @@ private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V>
28482853
}
28492854

28502855
private void recordInterceptAfter(ConsumerRecord<K, V> records, @Nullable Exception exception) {
2851-
if (this.commonRecordInterceptor != null) {
2856+
if (!this.commonRecordInterceptors.isEmpty()) {
28522857
try {
28532858
if (exception == null) {
2854-
this.commonRecordInterceptor.success(records, this.consumer);
2859+
this.commonRecordInterceptors.forEach(interceptor -> interceptor.success(records, this.consumer));
28552860
}
28562861
else {
2857-
this.commonRecordInterceptor.failure(records, exception, this.consumer);
2862+
this.commonRecordInterceptors.forEach(interceptor -> interceptor.failure(records, exception, this.consumer));
28582863
}
28592864
}
28602865
catch (Exception e) {
@@ -2888,8 +2893,11 @@ private void invokeOnMessage(final ConsumerRecord<K, V> cRecord) {
28882893

28892894
private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
28902895
ConsumerRecord<K, V> cRecord = recordArg;
2891-
if (this.recordInterceptor != null) {
2892-
cRecord = this.recordInterceptor.intercept(cRecord, this.consumer);
2896+
for (RecordInterceptor<K, V> recordInterceptor : this.recordInterceptors) {
2897+
cRecord = recordInterceptor.intercept(cRecord, this.consumer);
2898+
if (cRecord == null) {
2899+
break;
2900+
}
28932901
}
28942902
if (cRecord == null) {
28952903
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "

0 commit comments

Comments
 (0)