110
110
@ EmbeddedKafka (topics = { TransactionalContainerTests .topic1 , TransactionalContainerTests .topic2 ,
111
111
TransactionalContainerTests .topic3 , TransactionalContainerTests .topic3DLT , TransactionalContainerTests .topic4 ,
112
112
TransactionalContainerTests .topic5 , TransactionalContainerTests .topic6 , TransactionalContainerTests .topic7 ,
113
- TransactionalContainerTests .topic8 , TransactionalContainerTests .topic8DLT },
113
+ TransactionalContainerTests .topic8 , TransactionalContainerTests .topic8DLT , TransactionalContainerTests . topic9 },
114
114
brokerProperties = { "transaction.state.log.replication.factor=1" , "transaction.state.log.min.isr=1" })
115
115
public class TransactionalContainerTests {
116
116
@@ -936,7 +936,8 @@ public void testRollbackProcessorCrash() throws Exception {
936
936
937
937
@ Test
938
938
public void testBatchListenerRecoverAfterRollbackProcessorCrash () throws Exception {
939
- Map <String , Object > props = KafkaTestUtils .consumerProps ("testBatchListenerRollbackNoRetries" , "false" , embeddedKafka );
939
+ String group = "testBatchListenerRollbackNoRetries" ;
940
+ Map <String , Object > props = KafkaTestUtils .consumerProps (group , "false" , embeddedKafka );
940
941
props .put (ConsumerConfig .ISOLATION_LEVEL_CONFIG , "read_committed" );
941
942
props .put (ConsumerConfig .MAX_POLL_RECORDS_CONFIG , 2 );
942
943
DefaultKafkaConsumerFactory <Integer , String > cf = new DefaultKafkaConsumerFactory <>(props );
@@ -949,23 +950,23 @@ public void testBatchListenerRecoverAfterRollbackProcessorCrash() throws Excepti
949
950
pf .setTransactionIdPrefix ("batchListener.noRetries." );
950
951
final KafkaTemplate <Object , Object > template = new KafkaTemplate <>(pf );
951
952
final CountDownLatch latch = new CountDownLatch (1 );
952
- AtomicReference <String > data = new AtomicReference <>();
953
+ AtomicReference <List < ConsumerRecord < Integer , String >> > data = new AtomicReference <>();
953
954
containerProps .setMessageListener ((BatchMessageListener <Integer , String >) recordList -> {
954
955
for (ConsumerRecord <Integer , String > record : recordList ) {
955
- data .set (record .value ());
956
956
if (record .offset () == 0 ) {
957
957
throw new BatchListenerFailedException ("fail for no retry" , record );
958
958
}
959
- latch .countDown ();
960
959
}
960
+ data .set (recordList );
961
+ latch .countDown ();
961
962
});
962
963
963
964
KafkaTransactionManager <Object , Object > tm = new KafkaTransactionManager <>(pf );
964
965
containerProps .setKafkaAwareTransactionManager (tm );
965
966
KafkaMessageListenerContainer <Integer , String > container =
966
967
new KafkaMessageListenerContainer <>(cf , containerProps );
967
968
container .setBeanName ("testBatchListenerRollbackNoRetries" );
968
- final KafkaOperations <Object , Object > dlTemplate = spy ( new KafkaTemplate <>(pf ) );
969
+ final KafkaOperations <Object , Object > dlTemplate = new KafkaTemplate <>(pf );
969
970
AtomicBoolean recovererShouldFail = new AtomicBoolean (true );
970
971
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer (dlTemplate ) {
971
972
@ Override
@@ -977,7 +978,7 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
977
978
978
979
};
979
980
DefaultAfterRollbackProcessor <Object , Object > afterRollbackProcessor =
980
- spy ( new DefaultAfterRollbackProcessor <>(recoverer , new FixedBackOff (0L , 0L ), dlTemplate , true ) );
981
+ new DefaultAfterRollbackProcessor <>(recoverer , new FixedBackOff (0L , 0L ), dlTemplate , true );
981
982
container .setAfterRollbackProcessor (afterRollbackProcessor );
982
983
final CountDownLatch stopLatch = new CountDownLatch (1 );
983
984
container .setApplicationEventPublisher (e -> {
@@ -997,8 +998,16 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
997
998
template .sendDefault (0 , 0 , "qux" );
998
999
return null ;
999
1000
});
1001
+
1000
1002
assertThat (latch .await (60 , TimeUnit .SECONDS )).isTrue ();
1001
- assertThat (data .get ()).isEqualTo ("qux" );
1003
+ assertThat (data .get ()).isNotNull ();
1004
+ ConsumerRecord <Integer , String > crBaz = data .get ().get (0 );
1005
+ ConsumerRecord <Integer , String > crQux = data .get ().get (1 );
1006
+ assertThat (crBaz .offset ()).isEqualTo (2L );
1007
+ assertThat (crBaz .value ()).isEqualTo ("baz" );
1008
+ assertThat (crQux .offset ()).isEqualTo (3L );
1009
+ assertThat (crQux .value ()).isEqualTo ("qux" );
1010
+
1002
1011
container .stop ();
1003
1012
pf .destroy ();
1004
1013
assertThat (stopLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
0 commit comments