|
112 | 112 | @EmbeddedKafka(topics = { TransactionalContainerTests.topic1, TransactionalContainerTests.topic2,
|
113 | 113 | TransactionalContainerTests.topic3, TransactionalContainerTests.topic3DLT, TransactionalContainerTests.topic4,
|
114 | 114 | TransactionalContainerTests.topic5, TransactionalContainerTests.topic6, TransactionalContainerTests.topic7,
|
115 |
| - TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT }, |
| 115 | + TransactionalContainerTests.topic8, TransactionalContainerTests.topic8DLT, TransactionalContainerTests.topic9}, |
116 | 116 | brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
|
117 | 117 | public class TransactionalContainerTests {
|
118 | 118 |
|
@@ -957,7 +957,7 @@ public void testBatchListenerRecoverAfterRollbackProcessorCrash() throws Excepti
|
957 | 957 | DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
958 | 958 | pf.setTransactionIdPrefix("batchListener.noRetries.");
|
959 | 959 | final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
|
960 |
| - final CountDownLatch latch = new CountDownLatch(1); |
| 960 | + final CountDownLatch latch = new CountDownLatch(2); |
961 | 961 | AtomicReference<String> data = new AtomicReference<>();
|
962 | 962 | containerProps.setMessageListener((BatchMessageListener<Integer, String>) recordList -> {
|
963 | 963 | for (ConsumerRecord<Integer, String> record : recordList) {
|
|
0 commit comments