Skip to content

Commit 1eca673

Browse files
garyrussellartembilan
authored andcommitted
GH-2329: Fix interceptBeforeTx for Non-Kafka TM
Resolves #2329 The flag should apply, regardless of PTM type.
1 parent 36665e9 commit 1eca673

File tree

2 files changed

+109
-8
lines changed

2 files changed

+109
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -667,23 +667,25 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
667667

668668
private final Duration syncCommitTimeout;
669669

670-
private final RecordInterceptor<K, V> recordInterceptor = !isInterceptBeforeTx() && this.kafkaTxManager != null
671-
? getRecordInterceptor()
672-
: null;
670+
private final RecordInterceptor<K, V> recordInterceptor =
671+
!isInterceptBeforeTx() && this.transactionManager != null
672+
? getRecordInterceptor()
673+
: null;
673674

674675
private final RecordInterceptor<K, V> earlyRecordInterceptor =
675-
isInterceptBeforeTx() || this.kafkaTxManager == null
676+
isInterceptBeforeTx() || this.transactionManager == null
676677
? getRecordInterceptor()
677678
: null;
678679

679680
private final RecordInterceptor<K, V> commonRecordInterceptor = getRecordInterceptor();
680681

681-
private final BatchInterceptor<K, V> batchInterceptor = !isInterceptBeforeTx() && this.kafkaTxManager != null
682-
? getBatchInterceptor()
683-
: null;
682+
private final BatchInterceptor<K, V> batchInterceptor =
683+
!isInterceptBeforeTx() && this.transactionManager != null
684+
? getBatchInterceptor()
685+
: null;
684686

685687
private final BatchInterceptor<K, V> earlyBatchInterceptor =
686-
isInterceptBeforeTx() || this.kafkaTxManager == null
688+
isInterceptBeforeTx() || this.transactionManager == null
687689
? getBatchInterceptor()
688690
: null;
689691

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
6969
import org.springframework.lang.Nullable;
7070
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
71+
import org.springframework.transaction.PlatformTransactionManager;
7172
import org.springframework.transaction.support.TransactionSynchronizationManager;
7273

7374
/**
@@ -655,6 +656,104 @@ public void failure(ConsumerRecords records, Exception exception, Consumer consu
655656
}
656657
}
657658

659+
@Test
660+
@SuppressWarnings({ "rawtypes", "unchecked" })
661+
void testInterceptInTxNonKafkaTM() throws InterruptedException {
662+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
663+
final Consumer consumer = mock(Consumer.class);
664+
TopicPartition tp0 = new TopicPartition("foo", 0);
665+
ConsumerRecord record1 = new ConsumerRecord("foo", 0, 0L, "bar", "baz");
666+
ConsumerRecords records = new ConsumerRecords(
667+
Collections.singletonMap(tp0, Collections.singletonList(record1)));
668+
ConsumerRecords empty = new ConsumerRecords(Collections.emptyMap());
669+
AtomicInteger firstOrSecondPoll = new AtomicInteger();
670+
willAnswer(invocation -> {
671+
Thread.sleep(10);
672+
return firstOrSecondPoll.incrementAndGet() < 2 ? records : empty;
673+
}).given(consumer).poll(any());
674+
List<TopicPartition> assignments = Arrays.asList(tp0);
675+
willAnswer(invocation -> {
676+
((ConsumerRebalanceListener) invocation.getArgument(1))
677+
.onPartitionsAssigned(assignments);
678+
return null;
679+
}).given(consumer).subscribe(any(Collection.class), any());
680+
given(consumer.position(any())).willReturn(0L);
681+
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
682+
.willReturn(consumer);
683+
ContainerProperties containerProperties = new ContainerProperties("foo");
684+
containerProperties.setGroupId("grp");
685+
AtomicReference<List<ConsumerRecord<String, String>>> received = new AtomicReference<>();
686+
containerProperties.setMessageListener((MessageListener) rec -> {
687+
});
688+
containerProperties.setMissingTopicsFatal(false);
689+
List<String> order = new ArrayList<>();
690+
AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(2));
691+
PlatformTransactionManager tm = mock(PlatformTransactionManager.class);
692+
willAnswer(inv -> {
693+
order.add("tx");
694+
latch.get().countDown();
695+
return null;
696+
}).given(tm).getTransaction(any());
697+
containerProperties.setTransactionManager(tm);
698+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(consumerFactory,
699+
containerProperties);
700+
AtomicReference<CountDownLatch> successCalled = new AtomicReference<>(new CountDownLatch(1));
701+
container.setRecordInterceptor(new RecordInterceptor() {
702+
703+
@Override
704+
@Nullable
705+
public ConsumerRecord intercept(ConsumerRecord rec, Consumer consumer) {
706+
order.add("interceptor");
707+
latch.get().countDown();
708+
return rec;
709+
}
710+
711+
@Override
712+
public void success(ConsumerRecord record, Consumer consumer) {
713+
order.add("success");
714+
successCalled.get().countDown();
715+
}
716+
717+
});
718+
container.setBatchInterceptor(new BatchInterceptor() {
719+
720+
@Override
721+
@Nullable
722+
public ConsumerRecords intercept(ConsumerRecords recs, Consumer consumer) {
723+
order.add("b.interceptor");
724+
latch.get().countDown();
725+
return new ConsumerRecords(Collections.singletonMap(tp0, Collections.singletonList(record1)));
726+
}
727+
728+
@Override
729+
public void success(ConsumerRecords records, Consumer consumer) {
730+
order.add("b.success");
731+
successCalled.get().countDown();
732+
}
733+
734+
});
735+
container.setInterceptBeforeTx(false);
736+
container.start();
737+
try {
738+
assertThat(latch.get().await(10, TimeUnit.SECONDS)).isTrue();
739+
assertThat(successCalled.get().await(10, TimeUnit.SECONDS)).isTrue();
740+
assertThat(order).containsExactly("tx", "interceptor", "success");
741+
container.stop();
742+
latch.set(new CountDownLatch(2));
743+
successCalled.set(new CountDownLatch(1));
744+
container.getContainerProperties().setMessageListener((BatchMessageListener) recs -> {
745+
});
746+
firstOrSecondPoll.set(0);
747+
container.start();
748+
assertThat(latch.get().await(10, TimeUnit.SECONDS)).isTrue();
749+
assertThat(successCalled.get().await(10, TimeUnit.SECONDS)).isTrue();
750+
assertThat(order).containsExactly("tx", "interceptor", "success", "tx", "b.interceptor", "b.success");
751+
}
752+
finally {
753+
container.stop();
754+
}
755+
}
756+
658757
@SuppressWarnings({ "rawtypes", "unchecked" })
659758
@Test
660759
void testNoCommitOnAssignmentWithEarliest() throws InterruptedException {

0 commit comments

Comments
 (0)