Skip to content

Commit 60e3f23

Browse files
authored
Deprecation Code Cleanup
* Because `transactionManager` property is deprecated in ContainerProperties via a previous commit, there are some test variants that we may not need any longer. Cleaning them up and updating the tests where we were still using the deprecated transaction manager.
1 parent 83d9a31 commit 60e3f23

File tree

4 files changed

+7
-136
lines changed

4 files changed

+7
-136
lines changed

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 1 addition & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,12 @@
7272
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
7373
import org.springframework.lang.Nullable;
7474
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
75-
import org.springframework.transaction.PlatformTransactionManager;
7675
import org.springframework.transaction.support.TransactionSynchronizationManager;
7776

7877
/**
7978
* @author Gary Russell
8079
* @author Wang Zhiyang
80+
* @author Soby Chacko
8181
*
8282
* @since 2.2.4
8383
*
@@ -662,103 +662,6 @@ public void failure(ConsumerRecords records, Exception exception, Consumer consu
662662
}
663663
}
664664

665-
@Test
666-
@SuppressWarnings({ "rawtypes", "unchecked" })
667-
void testInterceptInTxNonKafkaTM() throws InterruptedException {
668-
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
669-
final Consumer consumer = mock(Consumer.class);
670-
TopicPartition tp0 = new TopicPartition("foo", 0);
671-
ConsumerRecord record1 = new ConsumerRecord("foo", 0, 0L, "bar", "baz");
672-
ConsumerRecords records = new ConsumerRecords(
673-
Collections.singletonMap(tp0, Collections.singletonList(record1)));
674-
ConsumerRecords empty = new ConsumerRecords(Collections.emptyMap());
675-
AtomicInteger firstOrSecondPoll = new AtomicInteger();
676-
willAnswer(invocation -> {
677-
Thread.sleep(10);
678-
return firstOrSecondPoll.incrementAndGet() < 2 ? records : empty;
679-
}).given(consumer).poll(any());
680-
List<TopicPartition> assignments = List.of(tp0);
681-
willAnswer(invocation -> {
682-
((ConsumerRebalanceListener) invocation.getArgument(1))
683-
.onPartitionsAssigned(assignments);
684-
return null;
685-
}).given(consumer).subscribe(any(Collection.class), any());
686-
given(consumer.position(any())).willReturn(0L);
687-
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
688-
.willReturn(consumer);
689-
ContainerProperties containerProperties = new ContainerProperties("foo");
690-
containerProperties.setGroupId("grp");
691-
containerProperties.setMessageListener((MessageListener) rec -> {
692-
});
693-
containerProperties.setMissingTopicsFatal(false);
694-
List<String> order = new ArrayList<>();
695-
AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(2));
696-
PlatformTransactionManager tm = mock(PlatformTransactionManager.class);
697-
willAnswer(inv -> {
698-
order.add("tx");
699-
latch.get().countDown();
700-
return null;
701-
}).given(tm).getTransaction(any());
702-
containerProperties.setTransactionManager(tm);
703-
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(consumerFactory,
704-
containerProperties);
705-
AtomicReference<CountDownLatch> successCalled = new AtomicReference<>(new CountDownLatch(1));
706-
container.setRecordInterceptor(new RecordInterceptor() {
707-
708-
@Override
709-
@Nullable
710-
public ConsumerRecord intercept(ConsumerRecord rec, Consumer consumer) {
711-
order.add("interceptor");
712-
latch.get().countDown();
713-
return rec;
714-
}
715-
716-
@Override
717-
public void success(ConsumerRecord record, Consumer consumer) {
718-
order.add("success");
719-
successCalled.get().countDown();
720-
}
721-
722-
});
723-
container.setBatchInterceptor(new BatchInterceptor() {
724-
725-
@Override
726-
@Nullable
727-
public ConsumerRecords intercept(ConsumerRecords recs, Consumer consumer) {
728-
order.add("b.interceptor");
729-
latch.get().countDown();
730-
return new ConsumerRecords(Collections.singletonMap(tp0, Collections.singletonList(record1)));
731-
}
732-
733-
@Override
734-
public void success(ConsumerRecords records, Consumer consumer) {
735-
order.add("b.success");
736-
successCalled.get().countDown();
737-
}
738-
739-
});
740-
container.setInterceptBeforeTx(false);
741-
container.start();
742-
try {
743-
assertThat(latch.get().await(10, TimeUnit.SECONDS)).isTrue();
744-
assertThat(successCalled.get().await(10, TimeUnit.SECONDS)).isTrue();
745-
assertThat(order).containsExactly("tx", "interceptor", "success");
746-
container.stop();
747-
latch.set(new CountDownLatch(2));
748-
successCalled.set(new CountDownLatch(1));
749-
container.getContainerProperties().setMessageListener((BatchMessageListener) recs -> {
750-
});
751-
firstOrSecondPoll.set(0);
752-
container.start();
753-
assertThat(latch.get().await(10, TimeUnit.SECONDS)).isTrue();
754-
assertThat(successCalled.get().await(10, TimeUnit.SECONDS)).isTrue();
755-
assertThat(order).containsExactly("tx", "interceptor", "success", "tx", "b.interceptor", "b.success");
756-
}
757-
finally {
758-
container.stop();
759-
}
760-
}
761-
762665
@SuppressWarnings({ "rawtypes", "unchecked" })
763666
@Test
764667
void testNoCommitOnAssignmentWithEarliest() throws InterruptedException {

spring-kafka/src/test/java/org/springframework/kafka/listener/FailedBatchProcessorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ records, mock(Consumer.class), mock(MessageListenerContainer.class), mock(Runnab
102102
assertThat(output).contains("Record not found in batch: topic-42@123;");
103103
}
104104

105+
@SuppressWarnings({ "rawtypes", "unchecked" })
105106
@Test
106107
void testExceptionDuringCommit() {
107108
CommonErrorHandler mockEH = mock(CommonErrorHandler.class);

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,10 @@
125125
import org.springframework.kafka.test.context.EmbeddedKafka;
126126
import org.springframework.kafka.test.utils.ContainerTestUtils;
127127
import org.springframework.kafka.test.utils.KafkaTestUtils;
128+
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
128129
import org.springframework.lang.NonNull;
129130
import org.springframework.lang.Nullable;
130131
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
131-
import org.springframework.transaction.PlatformTransactionManager;
132132
import org.springframework.util.backoff.FixedBackOff;
133133

134134
/**
@@ -3899,7 +3899,7 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
38993899
});
39003900
containerProps.setClientId("clientId");
39013901
if (early) {
3902-
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
3902+
containerProps.setKafkaAwareTransactionManager(mock(KafkaAwareTransactionManager.class));
39033903
}
39043904

39053905
RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {
@@ -3982,7 +3982,7 @@ public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception
39823982
});
39833983
containerProps.setClientId("clientId");
39843984
if (early) {
3985-
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
3985+
containerProps.setKafkaAwareTransactionManager(mock(KafkaAwareTransactionManager.class));
39863986
}
39873987

39883988
BatchInterceptor<Integer, String> interceptor = spy(new BatchInterceptor<Integer, String>() {

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -95,16 +95,14 @@
9595
import org.springframework.kafka.transaction.KafkaTransactionManager;
9696
import org.springframework.messaging.MessageHeaders;
9797
import org.springframework.transaction.TransactionDefinition;
98-
import org.springframework.transaction.TransactionException;
99-
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
10098
import org.springframework.transaction.support.DefaultTransactionDefinition;
101-
import org.springframework.transaction.support.DefaultTransactionStatus;
10299
import org.springframework.util.backoff.FixedBackOff;
103100

104101
/**
105102
* @author Gary Russell
106103
* @author Artem Bilan
107104
* @author Wang Zhiyang
105+
* @author Soby Chacko
108106
*
109107
* @since 1.3
110108
*
@@ -462,7 +460,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception {
462460
given(pf.createProducer(isNull())).willReturn(producer);
463461
ContainerProperties props = new ContainerProperties(new TopicPartitionOffset("foo", 0));
464462
props.setGroupId("group");
465-
props.setTransactionManager(new SomeOtherTransactionManager());
463+
props.setKafkaAwareTransactionManager(new KafkaTransactionManager<>(pf));
466464
final KafkaTemplate template = new KafkaTemplate(pf);
467465
ConsumerGroupMetadata meta = mock(ConsumerGroupMetadata.class);
468466
props.setMessageListener((MessageListener<String, String>) m -> {
@@ -595,11 +593,6 @@ public void testFixLagKTM() throws InterruptedException {
595593
testFixLagGuts(topic6, 1);
596594
}
597595

598-
@Test
599-
public void testFixLagOtherTM() throws InterruptedException {
600-
testFixLagGuts(topic7, 2);
601-
}
602-
603596
@SuppressWarnings({"unchecked"})
604597
private void testFixLagGuts(String topic, int whichTm) throws InterruptedException {
605598
Map<String, Object> props = KafkaTestUtils.consumerProps("txTest2", "false", embeddedKafka);
@@ -619,8 +612,6 @@ private void testFixLagGuts(String topic, int whichTm) throws InterruptedExcepti
619612
case 1:
620613
containerProps.setKafkaAwareTransactionManager(new KafkaTransactionManager<>(pf));
621614
break;
622-
case 2:
623-
containerProps.setTransactionManager(new SomeOtherTransactionManager());
624615
}
625616

626617
final KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
@@ -1082,28 +1073,4 @@ void testNoAfterRollbackWhenFenced() throws Exception {
10821073
assertThatIllegalStateException().isThrownBy(container::start);
10831074
}
10841075

1085-
public static class SomeOtherTransactionManager extends AbstractPlatformTransactionManager {
1086-
1087-
@Override
1088-
protected Object doGetTransaction() throws TransactionException {
1089-
return new Object();
1090-
}
1091-
1092-
@Override
1093-
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
1094-
//noop
1095-
}
1096-
1097-
@Override
1098-
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
1099-
//noop
1100-
}
1101-
1102-
@Override
1103-
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
1104-
//noop
1105-
}
1106-
1107-
}
1108-
11091076
}

0 commit comments

Comments
 (0)