1
1
/*
2
- * Copyright 2016-2023 the original author or authors.
2
+ * Copyright 2016-2024 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
125
125
import org .springframework .kafka .test .context .EmbeddedKafka ;
126
126
import org .springframework .kafka .test .utils .ContainerTestUtils ;
127
127
import org .springframework .kafka .test .utils .KafkaTestUtils ;
128
+ import org .springframework .lang .NonNull ;
128
129
import org .springframework .lang .Nullable ;
129
130
import org .springframework .scheduling .concurrent .ThreadPoolTaskScheduler ;
130
131
import org .springframework .transaction .PlatformTransactionManager ;
140
141
* @author Lukasz Kaminski
141
142
* @author Ray Chuan Tay
142
143
* @author Daniel Gentes
144
+ * @author Soby Chacko
143
145
*/
144
146
@ EmbeddedKafka (topics = { KafkaMessageListenerContainerTests .topic1 , KafkaMessageListenerContainerTests .topic2 ,
145
147
KafkaMessageListenerContainerTests .topic3 , KafkaMessageListenerContainerTests .topic4 ,
@@ -235,8 +237,7 @@ public void testDelegateType() throws Exception {
235
237
container .setBeanName ("delegate" );
236
238
AtomicReference <List <TopicPartitionOffset >> offsets = new AtomicReference <>();
237
239
container .setApplicationEventPublisher (e -> {
238
- if (e instanceof ConsumerStoppingEvent ) {
239
- ConsumerStoppingEvent event = (ConsumerStoppingEvent ) e ;
240
+ if (e instanceof ConsumerStoppingEvent event ) {
240
241
offsets .set (event .getPartitions ().stream ()
241
242
.map (p -> new TopicPartitionOffset (p .topic (), p .partition (),
242
243
event .getConsumer ().position (p , Duration .ofMillis (10_000 ))))
@@ -929,7 +930,7 @@ public void testRecordAckAfterStop() throws Exception {
929
930
Consumer <Integer , String > consumer = mock (Consumer .class );
930
931
given (cf .createConsumer (eq ("grp" ), eq ("clientId" ), isNull (), any ())).willReturn (consumer );
931
932
final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records = new HashMap <>();
932
- records .put (new TopicPartition ("foo" , 0 ), Arrays . asList (
933
+ records .put (new TopicPartition ("foo" , 0 ), List . of (
933
934
new ConsumerRecord <>("foo" , 0 , 0L , 1 , "foo" )));
934
935
ConsumerRecords <Integer , String > consumerRecords = new ConsumerRecords <>(records );
935
936
given (consumer .poll (any (Duration .class ))).willAnswer (i -> {
@@ -1344,7 +1345,6 @@ else if (entry.getValue().offset() == 2) {
1344
1345
logger .info ("Stop batch listener manual" );
1345
1346
}
1346
1347
1347
- @ SuppressWarnings ("deprecation" )
1348
1348
@ Test
1349
1349
public void testBatchListenerErrors () throws Exception {
1350
1350
logger .info ("Start batch listener errors" );
@@ -1417,7 +1417,7 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
1417
1417
logger .info ("Stop batch listener errors" );
1418
1418
}
1419
1419
1420
- @ SuppressWarnings ({ "unchecked" , "deprecation" })
1420
+ @ SuppressWarnings ({ "unchecked" })
1421
1421
@ Test
1422
1422
public void testBatchListenerAckAfterRecoveryMock () throws Exception {
1423
1423
ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
@@ -1680,7 +1680,7 @@ public void testDefinedPartitions() throws Exception {
1680
1680
@ Override
1681
1681
protected KafkaConsumer <Integer , String > createKafkaConsumer (Map <String , Object > configs ) {
1682
1682
assertThat (configs ).containsKey (ConsumerConfig .MAX_POLL_RECORDS_CONFIG );
1683
- return new KafkaConsumer <Integer , String >(props ) {
1683
+ return new KafkaConsumer <>(props ) {
1684
1684
1685
1685
@ Override
1686
1686
public ConsumerRecords <Integer , String > poll (Duration timeout ) {
@@ -2281,10 +2281,8 @@ public void testStaticAssign() throws Exception {
2281
2281
Map <String , Object > props = KafkaTestUtils .consumerProps ("testStatic" , "false" , embeddedKafka );
2282
2282
2283
2283
DefaultKafkaConsumerFactory <Integer , String > cf = new DefaultKafkaConsumerFactory <>(props );
2284
- ContainerProperties containerProps = new ContainerProperties (new TopicPartitionOffset [] {
2285
- new TopicPartitionOffset (topic22 , 0 ),
2286
- new TopicPartitionOffset (topic22 , 1 )
2287
- });
2284
+ ContainerProperties containerProps = new ContainerProperties (new TopicPartitionOffset (topic22 , 0 ),
2285
+ new TopicPartitionOffset (topic22 , 1 ));
2288
2286
final CountDownLatch latch = new CountDownLatch (1 );
2289
2287
final List <ConsumerRecord <Integer , String >> received = new ArrayList <>();
2290
2288
containerProps .setMessageListener ((MessageListener <Integer , String >) record -> {
@@ -2362,15 +2360,15 @@ public void testBadListenerType() {
2362
2360
containerProps .setMissingTopicsFatal (false );
2363
2361
KafkaMessageListenerContainer <Integer , Foo1 > badContainer =
2364
2362
new KafkaMessageListenerContainer <>(cf , containerProps );
2365
- assertThatIllegalStateException ().isThrownBy (() -> badContainer . start () )
2363
+ assertThatIllegalStateException ().isThrownBy (badContainer :: start )
2366
2364
.withMessageContaining ("implementation must be provided" );
2367
2365
badContainer .setupMessageListener ((GenericMessageListener <String >) data -> {
2368
2366
});
2369
2367
assertThat (badContainer .getAssignedPartitions ()).isNull ();
2370
2368
badContainer .pause ();
2371
2369
assertThat (badContainer .isContainerPaused ()).isFalse ();
2372
2370
assertThat (badContainer .metrics ()).isEqualTo (Collections .emptyMap ());
2373
- assertThatIllegalArgumentException ().isThrownBy (() -> badContainer . start () )
2371
+ assertThatIllegalArgumentException ().isThrownBy (badContainer :: start )
2374
2372
.withMessageContaining ("Listener must be" );
2375
2373
assertThat (badContainer .toString ()).contains ("none assigned" );
2376
2374
@@ -2387,7 +2385,7 @@ public void testBadAckMode() {
2387
2385
new KafkaMessageListenerContainer <>(cf , containerProps );
2388
2386
badContainer .setupMessageListener ((MessageListener <String , String >) m -> {
2389
2387
});
2390
- assertThatIllegalStateException ().isThrownBy (() -> badContainer . start () )
2388
+ assertThatIllegalStateException ().isThrownBy (badContainer :: start )
2391
2389
.withMessageContaining ("Consumer cannot be configured for auto commit for ackMode" );
2392
2390
2393
2391
}
@@ -2566,14 +2564,16 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
2566
2564
public void onMessage (ConsumerRecord <String , String > data ) {
2567
2565
if (data .partition () == 0 && data .offset () == 0 ) {
2568
2566
TopicPartition topicPartition = new TopicPartition (data .topic (), data .partition ());
2569
- getSeekCallbackFor (topicPartition ).seekToBeginning (records .keySet ());
2567
+ final ConsumerSeekCallback seekCallbackFor = getSeekCallbackFor (topicPartition );
2568
+ assertThat (seekCallbackFor ).isNotNull ();
2569
+ seekCallbackFor .seekToBeginning (records .keySet ());
2570
2570
Iterator <TopicPartition > iterator = records .keySet ().iterator ();
2571
- getSeekCallbackFor ( topicPartition ) .seekToBeginning (Collections .singletonList (iterator .next ()));
2572
- getSeekCallbackFor ( topicPartition ) .seekToBeginning (Collections .singletonList (iterator .next ()));
2573
- getSeekCallbackFor ( topicPartition ) .seekToEnd (records .keySet ());
2571
+ seekCallbackFor .seekToBeginning (Collections .singletonList (iterator .next ()));
2572
+ seekCallbackFor .seekToBeginning (Collections .singletonList (iterator .next ()));
2573
+ seekCallbackFor .seekToEnd (records .keySet ());
2574
2574
iterator = records .keySet ().iterator ();
2575
- getSeekCallbackFor ( topicPartition ) .seekToEnd (Collections .singletonList (iterator .next ()));
2576
- getSeekCallbackFor ( topicPartition ) .seekToEnd (Collections .singletonList (iterator .next ()));
2575
+ seekCallbackFor .seekToEnd (Collections .singletonList (iterator .next ()));
2576
+ seekCallbackFor .seekToEnd (Collections .singletonList (iterator .next ()));
2577
2577
}
2578
2578
}
2579
2579
@@ -2679,7 +2679,7 @@ public void dontResumePausedPartition() throws Exception {
2679
2679
containerProps .setAckMode (AckMode .RECORD );
2680
2680
containerProps .setClientId ("clientId" );
2681
2681
containerProps .setIdleEventInterval (100L );
2682
- containerProps .setMessageListener ((MessageListener ) rec -> { });
2682
+ containerProps .setMessageListener ((MessageListener <?, ?> ) rec -> { });
2683
2683
containerProps .setMissingTopicsFatal (false );
2684
2684
KafkaMessageListenerContainer <Integer , String > container =
2685
2685
new KafkaMessageListenerContainer <>(cf , containerProps );
@@ -2746,7 +2746,7 @@ public void rePausePartitionAfterRebalance() throws Exception {
2746
2746
containerProps .setAckMode (AckMode .RECORD );
2747
2747
containerProps .setClientId ("clientId" );
2748
2748
containerProps .setIdleEventInterval (100L );
2749
- containerProps .setMessageListener ((MessageListener ) rec -> { });
2749
+ containerProps .setMessageListener ((MessageListener <?, ?> ) rec -> { });
2750
2750
containerProps .setMissingTopicsFatal (false );
2751
2751
KafkaMessageListenerContainer <Integer , String > container =
2752
2752
new KafkaMessageListenerContainer <>(cf , containerProps );
@@ -2828,7 +2828,7 @@ public void resumePartitionAfterRevokeAndReAssign() throws Exception {
2828
2828
containerProps .setAckMode (AckMode .RECORD );
2829
2829
containerProps .setClientId ("clientId" );
2830
2830
containerProps .setIdleEventInterval (100L );
2831
- containerProps .setMessageListener ((MessageListener ) rec -> { });
2831
+ containerProps .setMessageListener ((MessageListener <?, ?> ) rec -> { });
2832
2832
containerProps .setMissingTopicsFatal (false );
2833
2833
KafkaMessageListenerContainer <Integer , String > container =
2834
2834
new KafkaMessageListenerContainer <>(cf , containerProps );
@@ -2956,7 +2956,7 @@ public void testIdleEarlyExit() throws Exception {
2956
2956
container .start ();
2957
2957
assertThat (latch .await (10 , TimeUnit .SECONDS )).isTrue ();
2958
2958
new DirectFieldAccessor (container ).setPropertyValue ("listenerConsumer.assignedPartitions" ,
2959
- Arrays . asList (new TopicPartition ("foo" , 0 )));
2959
+ List . of (new TopicPartition ("foo" , 0 )));
2960
2960
Thread .sleep (500 );
2961
2961
long t1 = System .currentTimeMillis ();
2962
2962
container .stop ();
@@ -3061,16 +3061,12 @@ public void testAckModeCount() throws Exception {
3061
3061
given (consumer .poll (any (Duration .class ))).willAnswer (i -> {
3062
3062
Thread .sleep (50 );
3063
3063
int recordsToUse = which .incrementAndGet ();
3064
- switch (recordsToUse ) {
3065
- case 1 :
3066
- return consumerRecords1 ;
3067
- case 2 :
3068
- return consumerRecords2 ;
3069
- case 3 :
3070
- return consumerRecords3 ;
3071
- default :
3072
- return emptyRecords ;
3073
- }
3064
+ return switch (recordsToUse ) {
3065
+ case 1 -> consumerRecords1 ;
3066
+ case 2 -> consumerRecords2 ;
3067
+ case 3 -> consumerRecords3 ;
3068
+ default -> emptyRecords ;
3069
+ };
3074
3070
});
3075
3071
final CountDownLatch commitLatch = new CountDownLatch (3 );
3076
3072
willAnswer (i -> {
@@ -3108,7 +3104,7 @@ public void testAckModeCount() throws Exception {
3108
3104
container .stop ();
3109
3105
}
3110
3106
3111
- @ SuppressWarnings ({ "unchecked" , "rawtypes" , "deprecation" })
3107
+ @ SuppressWarnings ({ "unchecked" , "rawtypes" })
3112
3108
@ Test
3113
3109
public void testCommitErrorHandlerCalled () throws Exception {
3114
3110
ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
@@ -3436,7 +3432,7 @@ public void testCooperativeRebalance() throws Exception {
3436
3432
ContainerProperties containerProps = new ContainerProperties ("foo" );
3437
3433
containerProps .setGroupId ("grp" );
3438
3434
containerProps .setClientId ("clientId" );
3439
- containerProps .setMessageListener ((MessageListener ) msg -> { });
3435
+ containerProps .setMessageListener ((MessageListener <?, ?> ) msg -> { });
3440
3436
Properties consumerProps = new Properties ();
3441
3437
KafkaMessageListenerContainer <Integer , String > container =
3442
3438
new KafkaMessageListenerContainer <>(cf , containerProps );
@@ -3468,7 +3464,7 @@ void testCommitRebalanceInProgressRecord() throws Exception {
3468
3464
assertThat (commits .get (5 )).hasSize (2 ); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
3469
3465
assertThat (commits .get (5 ).get (new TopicPartition ("foo" , 1 )))
3470
3466
.isNotNull ()
3471
- .extracting (om -> om . offset () )
3467
+ .extracting (OffsetAndMetadata :: offset )
3472
3468
.isEqualTo (2L );
3473
3469
});
3474
3470
}
@@ -3528,7 +3524,7 @@ else if (call == 1) {
3528
3524
containerProps .setAckMode (ackMode );
3529
3525
containerProps .setClientId ("clientId" );
3530
3526
containerProps .setIdleEventInterval (100L );
3531
- containerProps .setMessageListener ((MessageListener ) msg -> { });
3527
+ containerProps .setMessageListener ((MessageListener <?, ?> ) msg -> { });
3532
3528
Properties consumerProps = new Properties ();
3533
3529
containerProps .setKafkaConsumerProperties (consumerProps );
3534
3530
KafkaMessageListenerContainer <Integer , String > container =
@@ -3539,7 +3535,7 @@ else if (call == 1) {
3539
3535
verifier .accept (commits );
3540
3536
}
3541
3537
3542
- @ SuppressWarnings ({ "unchecked" , "rawtypes" })
3538
+ @ SuppressWarnings ({ "unchecked" })
3543
3539
@ Test
3544
3540
void testCommitFailsOnRevoke () throws Exception {
3545
3541
ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
@@ -3672,7 +3668,7 @@ void commitAfterHandleManual() throws InterruptedException {
3672
3668
cfProps .put (ConsumerConfig .DEFAULT_API_TIMEOUT_MS_CONFIG , 45000 ); // wins
3673
3669
given (cf .getConfigurationProperties ()).willReturn (cfProps );
3674
3670
final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records = new HashMap <>();
3675
- records .put (new TopicPartition ("foo" , 0 ), Arrays . asList (
3671
+ records .put (new TopicPartition ("foo" , 0 ), List . of (
3676
3672
new ConsumerRecord <>("foo" , 0 , 0L , 1 , "foo" )));
3677
3673
ConsumerRecords <Integer , String > consumerRecords = new ConsumerRecords <>(records );
3678
3674
ConsumerRecords <Integer , String > emptyRecords = new ConsumerRecords <>(Collections .emptyMap ());
@@ -3755,7 +3751,7 @@ void stopImmediately() throws InterruptedException {
3755
3751
}
3756
3752
3757
3753
@ Test
3758
- @ SuppressWarnings ({"unchecked" , "deprecation" })
3754
+ @ SuppressWarnings ({"unchecked" })
3759
3755
public void testInvokeRecordInterceptorSuccess () throws Exception {
3760
3756
ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
3761
3757
Consumer <Integer , String > consumer = mock (Consumer .class );
@@ -3797,7 +3793,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
3797
3793
RecordInterceptor <Integer , String > recordInterceptor = spy (new RecordInterceptor <Integer , String >() {
3798
3794
3799
3795
@ Override
3800
- @ Nullable
3796
+ @ NonNull
3801
3797
public ConsumerRecord <Integer , String > intercept (ConsumerRecord <Integer , String > record ,
3802
3798
Consumer <Integer , String > consumer ) {
3803
3799
@@ -3843,7 +3839,7 @@ private static Stream<Arguments> paramsForRecordAllSkipped() {
3843
3839
3844
3840
@ ParameterizedTest (name = "{index} testInvokeRecordInterceptorAllSkipped AckMode.{0} early intercept {1}" )
3845
3841
@ MethodSource ("paramsForRecordAllSkipped" )
3846
- @ SuppressWarnings ({"unchecked" , "deprecation" })
3842
+ @ SuppressWarnings ({"unchecked" })
3847
3843
public void testInvokeRecordInterceptorAllSkipped (AckMode ackMode , boolean early ) throws Exception {
3848
3844
ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
3849
3845
Consumer <Integer , String > consumer = mock (Consumer .class );
@@ -3870,7 +3866,7 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
3870
3866
containerProps .setGroupId ("grp" );
3871
3867
containerProps .setAckMode (ackMode );
3872
3868
3873
- containerProps .setMessageListener ((MessageListener ) msg -> {
3869
+ containerProps .setMessageListener ((MessageListener <?, ?> ) msg -> {
3874
3870
});
3875
3871
containerProps .setClientId ("clientId" );
3876
3872
@@ -3913,7 +3909,7 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
3913
3909
3914
3910
@ ParameterizedTest (name = "{index} testInvokeBatchInterceptorAllSkipped early intercept {0}" )
3915
3911
@ ValueSource (booleans = { true , false })
3916
- @ SuppressWarnings ({"unchecked" , "deprecation" })
3912
+ @ SuppressWarnings ({"unchecked" })
3917
3913
public void testInvokeBatchInterceptorAllSkipped (boolean early ) throws Exception {
3918
3914
ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
3919
3915
Consumer <Integer , String > consumer = mock (Consumer .class );
@@ -3940,7 +3936,7 @@ public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception
3940
3936
containerProps .setGroupId ("grp" );
3941
3937
containerProps .setAckMode (AckMode .BATCH );
3942
3938
3943
- containerProps .setMessageListener ((BatchMessageListener ) msgs -> {
3939
+ containerProps .setMessageListener ((BatchMessageListener <?, ?> ) msgs -> {
3944
3940
});
3945
3941
containerProps .setClientId ("clientId" );
3946
3942
if (!early ) {
@@ -3976,7 +3972,7 @@ public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, Strin
3976
3972
}
3977
3973
3978
3974
@ Test
3979
- @ SuppressWarnings ({"unchecked" , "deprecation" })
3975
+ @ SuppressWarnings ({"unchecked" })
3980
3976
public void testInvokeRecordInterceptorFailure () throws Exception {
3981
3977
ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
3982
3978
Consumer <Integer , String > consumer = mock (Consumer .class );
@@ -4016,7 +4012,7 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
4016
4012
RecordInterceptor <Integer , String > recordInterceptor = spy (new RecordInterceptor <Integer , String >() {
4017
4013
4018
4014
@ Override
4019
- @ Nullable
4015
+ @ NonNull
4020
4016
public ConsumerRecord <Integer , String > intercept (ConsumerRecord <Integer , String > record ,
4021
4017
Consumer <Integer , String > consumer ) {
4022
4018
0 commit comments