Skip to content

Commit 21172de

Browse files
garyrussellartembilan
authored andcommitted
GH-2329: Fix interceptBeforeTx for Non-Kafka TM
Resolves #2329 The flag should apply, regardless of PTM type.
1 parent 72f23e7 commit 21172de

File tree

2 files changed

+109
-8
lines changed

2 files changed

+109
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -644,23 +644,25 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
644644

645645
private final Duration syncCommitTimeout;
646646

647-
private final RecordInterceptor<K, V> recordInterceptor = !isInterceptBeforeTx() && this.kafkaTxManager != null
648-
? getRecordInterceptor()
649-
: null;
647+
private final RecordInterceptor<K, V> recordInterceptor =
648+
!isInterceptBeforeTx() && this.transactionManager != null
649+
? getRecordInterceptor()
650+
: null;
650651

651652
private final RecordInterceptor<K, V> earlyRecordInterceptor =
652-
isInterceptBeforeTx() || this.kafkaTxManager == null
653+
isInterceptBeforeTx() || this.transactionManager == null
653654
? getRecordInterceptor()
654655
: null;
655656

656657
private final RecordInterceptor<K, V> commonRecordInterceptor = getRecordInterceptor();
657658

658-
private final BatchInterceptor<K, V> batchInterceptor = !isInterceptBeforeTx() && this.kafkaTxManager != null
659-
? getBatchInterceptor()
660-
: null;
659+
private final BatchInterceptor<K, V> batchInterceptor =
660+
!isInterceptBeforeTx() && this.transactionManager != null
661+
? getBatchInterceptor()
662+
: null;
661663

662664
private final BatchInterceptor<K, V> earlyBatchInterceptor =
663-
isInterceptBeforeTx() || this.kafkaTxManager == null
665+
isInterceptBeforeTx() || this.transactionManager == null
664666
? getBatchInterceptor()
665667
: null;
666668

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
6969
import org.springframework.lang.Nullable;
7070
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
71+
import org.springframework.transaction.PlatformTransactionManager;
7172
import org.springframework.transaction.support.TransactionSynchronizationManager;
7273

7374
/**
@@ -644,6 +645,104 @@ public void failure(ConsumerRecords records, Exception exception, Consumer consu
644645
}
645646
}
646647

648+
@Test
649+
@SuppressWarnings({ "rawtypes", "unchecked" })
650+
void testInterceptInTxNonKafkaTM() throws InterruptedException {
651+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
652+
final Consumer consumer = mock(Consumer.class);
653+
TopicPartition tp0 = new TopicPartition("foo", 0);
654+
ConsumerRecord record1 = new ConsumerRecord("foo", 0, 0L, "bar", "baz");
655+
ConsumerRecords records = new ConsumerRecords(
656+
Collections.singletonMap(tp0, Collections.singletonList(record1)));
657+
ConsumerRecords empty = new ConsumerRecords(Collections.emptyMap());
658+
AtomicInteger firstOrSecondPoll = new AtomicInteger();
659+
willAnswer(invocation -> {
660+
Thread.sleep(10);
661+
return firstOrSecondPoll.incrementAndGet() < 2 ? records : empty;
662+
}).given(consumer).poll(any());
663+
List<TopicPartition> assignments = Arrays.asList(tp0);
664+
willAnswer(invocation -> {
665+
((ConsumerRebalanceListener) invocation.getArgument(1))
666+
.onPartitionsAssigned(assignments);
667+
return null;
668+
}).given(consumer).subscribe(any(Collection.class), any());
669+
given(consumer.position(any())).willReturn(0L);
670+
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
671+
.willReturn(consumer);
672+
ContainerProperties containerProperties = new ContainerProperties("foo");
673+
containerProperties.setGroupId("grp");
674+
AtomicReference<List<ConsumerRecord<String, String>>> received = new AtomicReference<>();
675+
containerProperties.setMessageListener((MessageListener) rec -> {
676+
});
677+
containerProperties.setMissingTopicsFatal(false);
678+
List<String> order = new ArrayList<>();
679+
AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(2));
680+
PlatformTransactionManager tm = mock(PlatformTransactionManager.class);
681+
willAnswer(inv -> {
682+
order.add("tx");
683+
latch.get().countDown();
684+
return null;
685+
}).given(tm).getTransaction(any());
686+
containerProperties.setTransactionManager(tm);
687+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(consumerFactory,
688+
containerProperties);
689+
AtomicReference<CountDownLatch> successCalled = new AtomicReference<>(new CountDownLatch(1));
690+
container.setRecordInterceptor(new RecordInterceptor() {
691+
692+
@Override
693+
@Nullable
694+
public ConsumerRecord intercept(ConsumerRecord rec, Consumer consumer) {
695+
order.add("interceptor");
696+
latch.get().countDown();
697+
return rec;
698+
}
699+
700+
@Override
701+
public void success(ConsumerRecord record, Consumer consumer) {
702+
order.add("success");
703+
successCalled.get().countDown();
704+
}
705+
706+
});
707+
container.setBatchInterceptor(new BatchInterceptor() {
708+
709+
@Override
710+
@Nullable
711+
public ConsumerRecords intercept(ConsumerRecords recs, Consumer consumer) {
712+
order.add("b.interceptor");
713+
latch.get().countDown();
714+
return new ConsumerRecords(Collections.singletonMap(tp0, Collections.singletonList(record1)));
715+
}
716+
717+
@Override
718+
public void success(ConsumerRecords records, Consumer consumer) {
719+
order.add("b.success");
720+
successCalled.get().countDown();
721+
}
722+
723+
});
724+
container.setInterceptBeforeTx(false);
725+
container.start();
726+
try {
727+
assertThat(latch.get().await(10, TimeUnit.SECONDS)).isTrue();
728+
assertThat(successCalled.get().await(10, TimeUnit.SECONDS)).isTrue();
729+
assertThat(order).containsExactly("tx", "interceptor", "success");
730+
container.stop();
731+
latch.set(new CountDownLatch(2));
732+
successCalled.set(new CountDownLatch(1));
733+
container.getContainerProperties().setMessageListener((BatchMessageListener) recs -> {
734+
});
735+
firstOrSecondPoll.set(0);
736+
container.start();
737+
assertThat(latch.get().await(10, TimeUnit.SECONDS)).isTrue();
738+
assertThat(successCalled.get().await(10, TimeUnit.SECONDS)).isTrue();
739+
assertThat(order).containsExactly("tx", "interceptor", "success", "tx", "b.interceptor", "b.success");
740+
}
741+
finally {
742+
container.stop();
743+
}
744+
}
745+
647746
@SuppressWarnings({ "rawtypes", "unchecked" })
648747
@Test
649748
void testNoCommitOnAssignmentWithEarliest() throws InterruptedException {

0 commit comments

Comments
 (0)