Skip to content

Commit 049aab9

Browse files
committed
doc and test listener transaction nested kafka transaction
* kafka Listener transaction nested kafka transactions does not support `@RetryableTopic`. * add adoc to notice see #2934
1 parent a96e08b commit 049aab9

File tree

3 files changed

+267
-0
lines changed

3 files changed

+267
-0
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,3 +209,67 @@ public static class Config {
209209
}
210210
----
211211

212+
[[kafka-listener-nested-kafka-transactions]]
213+
== Listeners Nested Kafka Transactions
214+
215+
IMPORTANT: xref:retrytopic.adoc[Non-Blocking Retries] are not supported Listeners Nested Kafka transactions.
216+
Listeners enable transaction and Non-Blocking Retries, when listener fails after invoke `KafkaTemplate` Transactions.
217+
`KafkaTemplate` transactions commit success multiple times because of exception catch by Non-Blocking Retries and not threw.
218+
219+
[source, java]
220+
----
221+
public static class TestListener {
222+
223+
@Autowired
224+
KafkaTemplate<String, String> kafkaTemplate;
225+
226+
@RetryableTopic
227+
@KafkaListener(topics = "tests")
228+
void listen(String in) {
229+
kafkaTemplate.send("nested-test-topic", "value");
230+
throw new RuntimeException("from FirstRetryableKafkaListener");
231+
}
232+
233+
}
234+
235+
@EnableKafka
236+
@Configuration
237+
static class Config {
238+
239+
@Bean
240+
public TestListener test() {
241+
return new TestListener();
242+
}
243+
244+
@Bean
245+
public ConsumerFactory<?, ?> consumerFactory() {
246+
return mock(ConsumerFactory.class);
247+
}
248+
249+
@Bean
250+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
251+
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
252+
factory.setConsumerFactory(consumerFactory());
253+
ContainerProperties props = factory.getContainerProperties();
254+
KafkaTransactionManager<String, String> tm = new KafkaTransactionManager<>(producerFactory());
255+
props.setTransactionManager(tm);
256+
return factory;
257+
}
258+
259+
@Bean
260+
ProducerFactory<String, String> producerFactory() {
261+
Map<String, Object> configProps = new HashMap<>();
262+
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configProps);
263+
pf.setTransactionIdPrefix("listener.noRetries.");
264+
return pf;
265+
}
266+
267+
@Bean
268+
KafkaTemplate<String, String> kafkaTemplate() {
269+
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
270+
kafkaTemplate.setConsumerFactory(consumerFactory());
271+
return kafkaTemplate;
272+
}
273+
274+
}
275+
----

spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ Since 2.7 Spring for Apache Kafka offers support for that via the `@RetryableTop
99

1010
IMPORTANT: Non-blocking retries are not supported with xref:kafka/receiving-messages/listener-annotation.adoc#batch-listeners[Batch Listeners].
1111

12+
IMPORTANT: Non-blocking retries are not supported xref:kafka/transactions.adoc#kafka-listener-nested-kafka-transactions[Listeners Nested Kafka Transactions].
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.retrytopic;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.fail;
21+
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
27+
import org.apache.kafka.clients.consumer.ConsumerConfig;
28+
import org.apache.kafka.clients.consumer.ConsumerRecord;
29+
import org.apache.kafka.clients.producer.ProducerConfig;
30+
import org.apache.kafka.common.serialization.StringDeserializer;
31+
import org.apache.kafka.common.serialization.StringSerializer;
32+
import org.junit.jupiter.api.DisplayName;
33+
import org.junit.jupiter.api.Test;
34+
35+
import org.springframework.beans.factory.annotation.Autowired;
36+
import org.springframework.context.annotation.Bean;
37+
import org.springframework.context.annotation.Configuration;
38+
import org.springframework.kafka.annotation.DltHandler;
39+
import org.springframework.kafka.annotation.EnableKafka;
40+
import org.springframework.kafka.annotation.KafkaListener;
41+
import org.springframework.kafka.annotation.RetryableTopic;
42+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
43+
import org.springframework.kafka.core.ConsumerFactory;
44+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
45+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
46+
import org.springframework.kafka.core.KafkaTemplate;
47+
import org.springframework.kafka.core.ProducerFactory;
48+
import org.springframework.kafka.listener.ContainerProperties;
49+
import org.springframework.kafka.support.KafkaHeaders;
50+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
51+
import org.springframework.kafka.test.context.EmbeddedKafka;
52+
import org.springframework.kafka.transaction.KafkaTransactionManager;
53+
import org.springframework.messaging.handler.annotation.Header;
54+
import org.springframework.stereotype.Component;
55+
import org.springframework.test.annotation.DirtiesContext;
56+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
57+
58+
/**
59+
* {@link org.springframework.kafka.annotation.RetryableTopic} with transaction.
60+
*
61+
* @author Wang Zhiyang
62+
*/
63+
@SpringJUnitConfig
64+
@DirtiesContext
65+
@EmbeddedKafka(topics = { RetryTopicTransactionIntegrationTests.RETRY_TRANSACTION_FIRST_TOPIC,
66+
RetryTopicTransactionIntegrationTests.RETRY_KAFKA_LISTENER_NESTED_TX_FIRST_TOPIC }, partitions = 1,
67+
brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
68+
public class RetryTopicTransactionIntegrationTests {
69+
70+
public final static String RETRY_TRANSACTION_FIRST_TOPIC = "retryTopic1";
71+
72+
public final static String RETRY_KAFKA_LISTENER_NESTED_TX_FIRST_TOPIC = "retryNestedTxTopic1";
73+
74+
@Autowired
75+
private KafkaTemplate<String, String> kafkaTemplate;
76+
77+
@Autowired
78+
private CountDownLatchContainer latchContainer;
79+
80+
@Test
81+
@DisplayName("retry topic not support kafka listener nested kafka transactions")
82+
void shouldRetryableTopicWithKafkaListenerNestedKafkaTransactions() {
83+
kafkaTemplate.executeInTransaction(t ->
84+
kafkaTemplate.send(RETRY_TRANSACTION_FIRST_TOPIC, "Testing topic 1")
85+
);
86+
assertThat(awaitLatch(latchContainer.countDownLatchFirstRetryable)).isTrue();
87+
assertThat(awaitLatch(latchContainer.countDownLatchDltOne)).isTrue();
88+
ConsumerRecord<String, String> consumerRecord =
89+
kafkaTemplate.receive(RETRY_KAFKA_LISTENER_NESTED_TX_FIRST_TOPIC, 0, 4);
90+
assertThat(consumerRecord).isNotNull();
91+
assertThat(consumerRecord.value()).isEqualTo("m-3");
92+
assertThat(consumerRecord.offset()).isEqualTo(4);
93+
}
94+
95+
private boolean awaitLatch(CountDownLatch latch) {
96+
try {
97+
return latch.await(150, TimeUnit.SECONDS);
98+
}
99+
catch (Exception e) {
100+
fail(e.getMessage());
101+
throw new RuntimeException(e);
102+
}
103+
}
104+
105+
@Component
106+
static class FirstRetryableKafkaListener {
107+
108+
@Autowired
109+
CountDownLatchContainer countDownLatchContainer;
110+
111+
@Autowired
112+
KafkaTemplate<String, String> kafkaTemplate;
113+
114+
static int SEND_MESSAGE_COUNT = 0;
115+
116+
@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
117+
@KafkaListener(topics = RetryTopicTransactionIntegrationTests.RETRY_TRANSACTION_FIRST_TOPIC)
118+
void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
119+
countDownLatchContainer.countDownLatchFirstRetryable.countDown();
120+
kafkaTemplate.send(RETRY_KAFKA_LISTENER_NESTED_TX_FIRST_TOPIC, "m-" + ++SEND_MESSAGE_COUNT);
121+
throw new RuntimeException("from FirstRetryableKafkaListener");
122+
}
123+
124+
@DltHandler
125+
void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
126+
countDownLatchContainer.countDownLatchDltOne.countDown();
127+
}
128+
}
129+
130+
@Component
131+
static class CountDownLatchContainer {
132+
133+
CountDownLatch countDownLatchFirstRetryable = new CountDownLatch(3);
134+
CountDownLatch countDownLatchDltOne = new CountDownLatch(1);
135+
136+
}
137+
138+
@EnableKafka
139+
@Configuration
140+
static class Config {
141+
142+
@Autowired
143+
EmbeddedKafkaBroker broker;
144+
145+
@Bean
146+
CountDownLatchContainer latchContainer() {
147+
return new CountDownLatchContainer();
148+
}
149+
150+
@Bean
151+
FirstRetryableKafkaListener firstRetryableKafkaListener() {
152+
return new FirstRetryableKafkaListener();
153+
}
154+
155+
@Bean
156+
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
157+
ConsumerFactory<String, String> consumerFactory) {
158+
159+
ConcurrentKafkaListenerContainerFactory<String, String> factory =
160+
new ConcurrentKafkaListenerContainerFactory<>();
161+
factory.setConsumerFactory(consumerFactory);
162+
ContainerProperties props = factory.getContainerProperties();
163+
props.setPollTimeout(50L);
164+
KafkaTransactionManager<String, String> tm = new KafkaTransactionManager<>(producerFactory());
165+
props.setTransactionManager(tm);
166+
factory.setConcurrency(1);
167+
return factory;
168+
}
169+
170+
@Bean
171+
ProducerFactory<String, String> producerFactory() {
172+
Map<String, Object> configProps = new HashMap<>();
173+
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString());
174+
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
175+
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
176+
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configProps);
177+
pf.setTransactionIdPrefix("listener.noRetries.");
178+
return pf;
179+
}
180+
181+
@Bean
182+
KafkaTemplate<String, String> kafkaTemplate() {
183+
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
184+
kafkaTemplate.setConsumerFactory(consumerFactory());
185+
return kafkaTemplate;
186+
}
187+
188+
@Bean
189+
ConsumerFactory<String, String> consumerFactory() {
190+
Map<String, Object> props = new HashMap<>();
191+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString());
192+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
193+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
194+
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
195+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
196+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
197+
return new DefaultKafkaConsumerFactory<>(props);
198+
}
199+
200+
}
201+
202+
}

0 commit comments

Comments
 (0)