@@ -2768,7 +2768,6 @@ public void rePausePartitionAfterRebalance() throws Exception {
2768
2768
rebal .get ().onPartitionsAssigned (Set .of (tp0 , tp1 ));
2769
2769
return null ;
2770
2770
}).given (consumer ).subscribe (eq (foos ), any (ConsumerRebalanceListener .class ));
2771
- final CountDownLatch resumeLatch = new CountDownLatch (1 );
2772
2771
ContainerProperties containerProps = new ContainerProperties ("foo" );
2773
2772
containerProps .setGroupId ("grp" );
2774
2773
containerProps .setAckMode (AckMode .RECORD );
@@ -2779,7 +2778,6 @@ public void rePausePartitionAfterRebalance() throws Exception {
2779
2778
KafkaMessageListenerContainer <Integer , String > container =
2780
2779
new KafkaMessageListenerContainer <>(cf , containerProps );
2781
2780
container .start ();
2782
- InOrder inOrder = inOrder (consumer );
2783
2781
assertThat (firstPoll .await (10 , TimeUnit .SECONDS )).isNotNull ();
2784
2782
container .pausePartition (tp0 );
2785
2783
container .pausePartition (tp1 );
@@ -2810,7 +2808,6 @@ public void resumePartitionAfterRevokeAndReAssign() throws Exception {
2810
2808
ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
2811
2809
Consumer <Integer , String > consumer = mock (Consumer .class );
2812
2810
given (cf .createConsumer (eq ("grp" ), eq ("clientId" ), isNull (), any ())).willReturn (consumer );
2813
- AtomicBoolean first = new AtomicBoolean (true );
2814
2811
TopicPartition tp0 = new TopicPartition ("foo" , 0 );
2815
2812
TopicPartition tp1 = new TopicPartition ("foo" , 1 );
2816
2813
given (consumer .assignment ()).willReturn (Set .of (tp0 , tp1 ));
@@ -3466,6 +3463,7 @@ public void testCooperativeRebalance() throws Exception {
3466
3463
containerProps .setClientId ("clientId" );
3467
3464
containerProps .setMessageListener ((MessageListener ) msg -> { });
3468
3465
Properties consumerProps = new Properties ();
3466
+ containerProps .setMessageListener ((MessageListener <?, ?>) msg -> { });
3469
3467
KafkaMessageListenerContainer <Integer , String > container =
3470
3468
new KafkaMessageListenerContainer <>(cf , containerProps );
3471
3469
container .start ();
@@ -3609,7 +3607,6 @@ else if (call == 1) {
3609
3607
}).given (consumer ).subscribe (any (Collection .class ), any (ConsumerRebalanceListener .class ));
3610
3608
List <Map <TopicPartition , OffsetAndMetadata >> commits = new ArrayList <>();
3611
3609
AtomicBoolean firstCommit = new AtomicBoolean (true );
3612
- AtomicInteger commitCount = new AtomicInteger ();
3613
3610
willAnswer (invoc -> {
3614
3611
commits .add (invoc .getArgument (0 , Map .class ));
3615
3612
if (!firstCommit .getAndSet (false )) {
@@ -3891,6 +3888,11 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
3891
3888
latch .countDown ();
3892
3889
return null ;
3893
3890
}).given (consumer ).commitSync (any (), any ());
3891
+ CountDownLatch closeLatch = new CountDownLatch (1 );
3892
+ willAnswer (inv -> {
3893
+ closeLatch .countDown ();
3894
+ return null ;
3895
+ }).given (consumer ).close ();
3894
3896
TopicPartitionOffset [] topicPartition = new TopicPartitionOffset [] {
3895
3897
new TopicPartitionOffset ("foo" , 0 ) };
3896
3898
@@ -3905,6 +3907,7 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
3905
3907
containerProps .setTransactionManager (mock (PlatformTransactionManager .class ));
3906
3908
}
3907
3909
3910
+ CountDownLatch afterRecordLatch = new CountDownLatch (2 );
3908
3911
RecordInterceptor <Integer , String > recordInterceptor = spy (new RecordInterceptor <Integer , String >() {
3909
3912
3910
3913
@ Override
@@ -3915,6 +3918,10 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
3915
3918
return null ;
3916
3919
}
3917
3920
3921
+ public void afterRecord (ConsumerRecord <Integer , String > record , Consumer <Integer , String > consumer ) {
3922
+ afterRecordLatch .countDown ();
3923
+ }
3924
+
3918
3925
});
3919
3926
3920
3927
KafkaMessageListenerContainer <Integer , String > container =
@@ -3923,6 +3930,9 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
3923
3930
container .setInterceptBeforeTx (early );
3924
3931
container .start ();
3925
3932
assertThat (latch .await (10 , TimeUnit .SECONDS )).isTrue ();
3933
+ assertThat (afterRecordLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
3934
+ container .stop ();
3935
+ assertThat (closeLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
3926
3936
3927
3937
InOrder inOrder = inOrder (recordInterceptor , consumer );
3928
3938
inOrder .verify (recordInterceptor ).setupThreadState (eq (consumer ));
@@ -3949,7 +3959,7 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
3949
3959
inOrder .verify (consumer ).commitSync (eq (Map .of (new TopicPartition ("foo" , 0 ), new OffsetAndMetadata (2L ))),
3950
3960
any (Duration .class ));
3951
3961
}
3952
- container . stop ();
3962
+ inOrder . verify ( consumer ). close ();
3953
3963
}
3954
3964
3955
3965
@ ParameterizedTest (name = "{index} testInvokeBatchInterceptorAllSkipped early intercept {0}" )
0 commit comments