30
30
31
31
import java .time .Duration ;
32
32
import java .util .ArrayList ;
33
- import java .util .Arrays ;
34
33
import java .util .Collection ;
35
34
import java .util .Collections ;
36
35
import java .util .HashMap ;
@@ -132,7 +131,7 @@ else if (event instanceof ConsumerFailedToStartEvent) {
132
131
exec .destroy ();
133
132
}
134
133
135
- @ SuppressWarnings ({ "rawtypes" , "unchecked" , "deprecation" })
134
+ @ SuppressWarnings ({ "rawtypes" , "unchecked" })
136
135
@ Test
137
136
void testCorrectContainerForConsumerError () throws InterruptedException {
138
137
ConsumerFactory consumerFactory = mock (ConsumerFactory .class );
@@ -202,10 +201,8 @@ void delayedIdleEvent() throws InterruptedException {
202
201
containerProperties );
203
202
CountDownLatch latch1 = new CountDownLatch (1 );
204
203
CountDownLatch latch2 = new CountDownLatch (2 );
205
- AtomicReference <Long > eventTime = new AtomicReference <>();
206
204
container .setApplicationEventPublisher (event -> {
207
205
if (event instanceof ListenerContainerIdleEvent ) {
208
- eventTime .set (System .currentTimeMillis ());
209
206
latch1 .countDown ();
210
207
latch2 .countDown ();
211
208
}
@@ -265,7 +262,7 @@ void testSyncRelativeSeeks() throws InterruptedException {
265
262
TopicPartition tp1 = new TopicPartition ("foo" , 1 );
266
263
TopicPartition tp2 = new TopicPartition ("foo" , 2 );
267
264
TopicPartition tp3 = new TopicPartition ("foo" , 3 );
268
- List <TopicPartition > assignments = Arrays . asList (tp0 , tp1 , tp2 , tp3 );
265
+ List <TopicPartition > assignments = List . of (tp0 , tp1 , tp2 , tp3 );
269
266
willAnswer (invocation -> {
270
267
((ConsumerRebalanceListener ) invocation .getArgument (1 ))
271
268
.onPartitionsAssigned (assignments );
@@ -306,7 +303,7 @@ void testAsyncRelativeSeeks() throws InterruptedException {
306
303
TopicPartition tp1 = new TopicPartition ("foo" , 1 );
307
304
TopicPartition tp2 = new TopicPartition ("foo" , 2 );
308
305
TopicPartition tp3 = new TopicPartition ("foo" , 3 );
309
- List <TopicPartition > assignments = Arrays . asList (tp0 , tp1 , tp2 , tp3 );
306
+ List <TopicPartition > assignments = List . of (tp0 , tp1 , tp2 , tp3 );
310
307
Map <TopicPartition , List <ConsumerRecord <String , String >>> recordMap = new HashMap <>();
311
308
recordMap .put (tp0 , Collections .singletonList (new ConsumerRecord ("foo" , 0 , 0 , null , "bar" )));
312
309
recordMap .put (tp1 , Collections .singletonList (new ConsumerRecord ("foo" , 1 , 0 , null , "bar" )));
@@ -365,7 +362,7 @@ void testSyncTimestampSeeks() throws InterruptedException {
365
362
TopicPartition tp1 = new TopicPartition ("foo" , 1 );
366
363
TopicPartition tp2 = new TopicPartition ("foo" , 2 );
367
364
TopicPartition tp3 = new TopicPartition ("foo" , 3 );
368
- List <TopicPartition > assignments = Arrays . asList (tp0 , tp1 , tp2 , tp3 );
365
+ List <TopicPartition > assignments = List . of (tp0 , tp1 , tp2 , tp3 );
369
366
willAnswer (invocation -> {
370
367
((ConsumerRebalanceListener ) invocation .getArgument (1 ))
371
368
.onPartitionsAssigned (assignments );
@@ -412,7 +409,7 @@ void testAsyncTimestampSeeks() throws InterruptedException {
412
409
TopicPartition tp1 = new TopicPartition ("foo" , 1 );
413
410
TopicPartition tp2 = new TopicPartition ("foo" , 2 );
414
411
TopicPartition tp3 = new TopicPartition ("foo" , 3 );
415
- List <TopicPartition > assignments = Arrays . asList (tp0 , tp1 , tp2 , tp3 );
412
+ List <TopicPartition > assignments = List . of (tp0 , tp1 , tp2 , tp3 );
416
413
Map <TopicPartition , List <ConsumerRecord <String , String >>> recordMap = new HashMap <>();
417
414
recordMap .put (tp0 , Collections .singletonList (new ConsumerRecord ("foo" , 0 , 0 , null , "bar" )));
418
415
recordMap .put (tp1 , Collections .singletonList (new ConsumerRecord ("foo" , 1 , 0 , null , "bar" )));
@@ -510,7 +507,9 @@ void testBatchInterceptBeforeTx1() throws InterruptedException {
510
507
}
511
508
512
509
@ SuppressWarnings ({ "rawtypes" , "unchecked" })
513
- void testIntercept (boolean beforeTx , AssignmentCommitOption option , boolean batch ) throws InterruptedException {
510
+ void testIntercept (boolean beforeTx , @ Nullable AssignmentCommitOption option , boolean batch )
511
+ throws InterruptedException {
512
+
514
513
ConsumerFactory consumerFactory = mock (ConsumerFactory .class );
515
514
final Consumer consumer = mock (Consumer .class );
516
515
TopicPartition tp0 = new TopicPartition ("foo" , 0 );
@@ -525,7 +524,7 @@ void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batc
525
524
Thread .sleep (10 );
526
525
return firstOrSecondPoll .incrementAndGet () < 3 ? records : empty ;
527
526
}).given (consumer ).poll (any ());
528
- List <TopicPartition > assignments = Arrays . asList (tp0 );
527
+ List <TopicPartition > assignments = List . of (tp0 );
529
528
willAnswer (invocation -> {
530
529
((ConsumerRebalanceListener ) invocation .getArgument (1 ))
531
530
.onPartitionsAssigned (assignments );
@@ -678,7 +677,7 @@ void testInterceptInTxNonKafkaTM() throws InterruptedException {
678
677
Thread .sleep (10 );
679
678
return firstOrSecondPoll .incrementAndGet () < 2 ? records : empty ;
680
679
}).given (consumer ).poll (any ());
681
- List <TopicPartition > assignments = Arrays . asList (tp0 );
680
+ List <TopicPartition > assignments = List . of (tp0 );
682
681
willAnswer (invocation -> {
683
682
((ConsumerRebalanceListener ) invocation .getArgument (1 ))
684
683
.onPartitionsAssigned (assignments );
@@ -773,7 +772,7 @@ void testNoCommitOnAssignmentWithEarliest() throws InterruptedException {
773
772
return records ;
774
773
}).given (consumer ).poll (any ());
775
774
TopicPartition tp0 = new TopicPartition ("foo" , 0 );
776
- List <TopicPartition > assignments = Arrays . asList (tp0 );
775
+ List <TopicPartition > assignments = List . of (tp0 );
777
776
willAnswer (invocation -> {
778
777
((ConsumerRebalanceListener ) invocation .getArgument (1 ))
779
778
.onPartitionsAssigned (assignments );
@@ -816,7 +815,7 @@ private void testInitialCommitIBasedOnCommitted(boolean committed) throws Interr
816
815
return records ;
817
816
}).given (consumer ).poll (any ());
818
817
TopicPartition tp0 = new TopicPartition ("foo" , 0 );
819
- List <TopicPartition > assignments = Arrays . asList (tp0 );
818
+ List <TopicPartition > assignments = List . of (tp0 );
820
819
willAnswer (invocation -> {
821
820
((ConsumerRebalanceListener ) invocation .getArgument (1 ))
822
821
.onPartitionsAssigned (assignments );
@@ -867,7 +866,7 @@ void removeFromPartitionPauseRequestedWhenNotAssigned() throws InterruptedExcept
867
866
return null ;
868
867
}).given (consumer ).pause (any ());
869
868
TopicPartition tp0 = new TopicPartition ("foo" , 0 );
870
- List <TopicPartition > assignments = Arrays . asList (tp0 );
869
+ List <TopicPartition > assignments = List . of (tp0 );
871
870
AtomicReference <ConsumerRebalanceListener > rebal = new AtomicReference <>();
872
871
willAnswer (invocation -> {
873
872
rebal .set (invocation .getArgument (1 ));
@@ -913,14 +912,14 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseLegacyAssi
913
912
TopicPartition tp1 = new TopicPartition ("foo" , 1 );
914
913
TopicPartition tp2 = new TopicPartition ("foo" , 2 );
915
914
TopicPartition tp3 = new TopicPartition ("foo" , 3 );
916
- List <TopicPartition > allAssignments = Arrays . asList (tp0 , tp1 , tp2 , tp3 );
915
+ List <TopicPartition > allAssignments = List . of (tp0 , tp1 , tp2 , tp3 );
917
916
Map <TopicPartition , List <ConsumerRecord <String , String >>> allRecordMap = new HashMap <>();
918
917
allRecordMap .put (tp0 , Collections .singletonList (new ConsumerRecord ("foo" , 0 , 0 , null , "bar" )));
919
918
allRecordMap .put (tp1 , Collections .singletonList (new ConsumerRecord ("foo" , 1 , 0 , null , "bar" )));
920
919
allRecordMap .put (tp2 , Collections .singletonList (new ConsumerRecord ("foo" , 2 , 0 , null , "bar" )));
921
920
allRecordMap .put (tp3 , Collections .singletonList (new ConsumerRecord ("foo" , 3 , 0 , null , "bar" )));
922
921
ConsumerRecords allRecords = new ConsumerRecords <>(allRecordMap );
923
- List <TopicPartition > afterRevokeAssignments = Arrays . asList (tp1 , tp3 );
922
+ List <TopicPartition > afterRevokeAssignments = List . of (tp1 , tp3 );
924
923
Map <TopicPartition , List <ConsumerRecord <String , String >>> afterRevokeRecordMap = new HashMap <>();
925
924
afterRevokeRecordMap .put (tp1 , Collections .singletonList (new ConsumerRecord ("foo" , 1 , 0 , null , "bar" )));
926
925
afterRevokeRecordMap .put (tp3 , Collections .singletonList (new ConsumerRecord ("foo" , 3 , 0 , null , "bar" )));
@@ -981,10 +980,11 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
981
980
Thread .sleep (50 );
982
981
pollLatch .countDown ();
983
982
switch (pollPhase .getAndIncrement ()) {
984
- case 0 :
983
+ case 0 -> {
985
984
rebal .get ().onPartitionsAssigned (allAssignments );
986
985
return allRecords ;
987
- case 1 :
986
+ }
987
+ case 1 -> {
988
988
rebal .get ().onPartitionsRevoked (allAssignments );
989
989
rebal .get ().onPartitionsAssigned (afterRevokeAssignments );
990
990
rebalLatch .countDown ();
@@ -993,11 +993,13 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
993
993
return ConsumerRecords .empty ();
994
994
}
995
995
return afterRevokeRecords ;
996
- default :
996
+ }
997
+ default -> {
997
998
if (paused .get ()) {
998
999
return ConsumerRecords .empty ();
999
1000
}
1000
1001
return afterRevokeRecords ;
1002
+ }
1001
1003
}
1002
1004
}).given (consumer ).poll (any ());
1003
1005
container .start ();
@@ -1025,7 +1027,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign
1025
1027
TopicPartition tp1 = new TopicPartition ("foo" , 1 );
1026
1028
TopicPartition tp2 = new TopicPartition ("foo" , 2 );
1027
1029
TopicPartition tp3 = new TopicPartition ("foo" , 3 );
1028
- List <TopicPartition > allAssignments = Arrays . asList (tp0 , tp1 , tp2 , tp3 );
1030
+ List <TopicPartition > allAssignments = List . of (tp0 , tp1 , tp2 , tp3 );
1029
1031
Map <TopicPartition , List <ConsumerRecord <String , String >>> allRecordMap = new LinkedHashMap <>();
1030
1032
ConsumerRecord record0 = new ConsumerRecord ("foo" , 0 , 0 , null , "bar" );
1031
1033
ConsumerRecord record1 = new ConsumerRecord ("foo" , 1 , 0 , null , "bar" );
@@ -1034,7 +1036,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign
1034
1036
allRecordMap .put (tp2 , Collections .singletonList (new ConsumerRecord ("foo" , 2 , 0 , null , "bar" )));
1035
1037
allRecordMap .put (tp3 , Collections .singletonList (new ConsumerRecord ("foo" , 3 , 0 , null , "bar" )));
1036
1038
ConsumerRecords allRecords = new ConsumerRecords <>(allRecordMap );
1037
- List <TopicPartition > revokedAssignments = Arrays . asList (tp0 , tp2 );
1039
+ List <TopicPartition > revokedAssignments = List . of (tp0 , tp2 );
1038
1040
AtomicInteger pollPhase = new AtomicInteger ();
1039
1041
1040
1042
Consumer consumer = mock (Consumer .class );
@@ -1046,9 +1048,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign
1046
1048
return null ;
1047
1049
}).given (consumer ).subscribe (any (Collection .class ), any ());
1048
1050
CountDownLatch pauseLatch = new CountDownLatch (1 );
1049
- AtomicBoolean paused = new AtomicBoolean ();
1050
1051
willAnswer (inv -> {
1051
- paused .set (true );
1052
1052
pauseLatch .countDown ();
1053
1053
return null ;
1054
1054
}).given (consumer ).pause (any ());
@@ -1089,17 +1089,20 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
1089
1089
Thread .sleep (50 );
1090
1090
pollLatch .countDown ();
1091
1091
switch (pollPhase .getAndIncrement ()) {
1092
- case 0 :
1092
+ case 0 -> {
1093
1093
rebal .get ().onPartitionsAssigned (allAssignments );
1094
1094
return allRecords ;
1095
- case 1 :
1095
+ }
1096
+ case 1 -> {
1096
1097
rebal .get ().onPartitionsRevoked (revokedAssignments );
1097
1098
rebal .get ().onPartitionsAssigned (Collections .emptyList ());
1098
1099
rebalLatch .countDown ();
1099
1100
continueLatch .await (10 , TimeUnit .SECONDS );
1100
1101
return ConsumerRecords .empty ();
1101
- default :
1102
+ }
1103
+ default -> {
1102
1104
return ConsumerRecords .empty ();
1105
+ }
1103
1106
}
1104
1107
}).given (consumer ).poll (any ());
1105
1108
container .start ();
@@ -1130,14 +1133,14 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
1130
1133
void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor () throws InterruptedException {
1131
1134
TopicPartition tp0 = new TopicPartition ("foo" , 0 );
1132
1135
TopicPartition tp1 = new TopicPartition ("foo" , 1 );
1133
- List <TopicPartition > allAssignments = Arrays . asList (tp0 , tp1 );
1136
+ List <TopicPartition > allAssignments = List . of (tp0 , tp1 );
1134
1137
Map <TopicPartition , List <ConsumerRecord <String , String >>> allRecordMap = new HashMap <>();
1135
1138
allRecordMap .put (tp0 ,
1136
1139
List .of (new ConsumerRecord ("foo" , 0 , 0 , null , "bar" ), new ConsumerRecord ("foo" , 0 , 1 , null , "bar" )));
1137
1140
allRecordMap .put (tp1 ,
1138
1141
List .of (new ConsumerRecord ("foo" , 1 , 0 , null , "bar" ), new ConsumerRecord ("foo" , 1 , 1 , null , "bar" )));
1139
1142
ConsumerRecords allRecords = new ConsumerRecords <>(allRecordMap );
1140
- List <TopicPartition > afterRevokeAssignments = Arrays . asList (tp1 );
1143
+ List <TopicPartition > afterRevokeAssignments = List . of (tp1 );
1141
1144
AtomicInteger pollPhase = new AtomicInteger ();
1142
1145
1143
1146
Consumer consumer = mock (Consumer .class );
@@ -1149,9 +1152,7 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I
1149
1152
return null ;
1150
1153
}).given (consumer ).subscribe (any (Collection .class ), any ());
1151
1154
CountDownLatch pauseLatch = new CountDownLatch (1 );
1152
- AtomicBoolean paused = new AtomicBoolean ();
1153
1155
willAnswer (inv -> {
1154
- paused .set (true );
1155
1156
pauseLatch .countDown ();
1156
1157
return null ;
1157
1158
}).given (consumer ).pause (any ());
@@ -1173,17 +1174,20 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I
1173
1174
Thread .sleep (50 );
1174
1175
pollLatch .countDown ();
1175
1176
switch (pollPhase .getAndIncrement ()) {
1176
- case 0 :
1177
+ case 0 -> {
1177
1178
rebal .get ().onPartitionsAssigned (allAssignments );
1178
1179
return allRecords ;
1179
- case 1 :
1180
+ }
1181
+ case 1 -> {
1180
1182
rebal .get ().onPartitionsRevoked (allAssignments );
1181
1183
rebal .get ().onPartitionsAssigned (afterRevokeAssignments );
1182
1184
rebalLatch .countDown ();
1183
1185
continueLatch .await (10 , TimeUnit .SECONDS );
1184
1186
return ConsumerRecords .empty ();
1185
- default :
1187
+ }
1188
+ default -> {
1186
1189
return ConsumerRecords .empty ();
1190
+ }
1187
1191
}
1188
1192
}).given (consumer ).poll (any ());
1189
1193
container .start ();
@@ -1208,14 +1212,13 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I
1208
1212
void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor () throws InterruptedException {
1209
1213
TopicPartition tp0 = new TopicPartition ("foo" , 0 );
1210
1214
TopicPartition tp1 = new TopicPartition ("foo" , 1 );
1211
- List <TopicPartition > allAssignments = Arrays . asList (tp0 , tp1 );
1215
+ List <TopicPartition > allAssignments = List . of (tp0 , tp1 );
1212
1216
Map <TopicPartition , List <ConsumerRecord <String , String >>> allRecordMap = new HashMap <>();
1213
1217
allRecordMap .put (tp0 ,
1214
1218
List .of (new ConsumerRecord ("foo" , 0 , 0 , null , "bar" ), new ConsumerRecord ("foo" , 0 , 1 , null , "bar" )));
1215
1219
allRecordMap .put (tp1 ,
1216
1220
List .of (new ConsumerRecord ("foo" , 1 , 0 , null , "bar" ), new ConsumerRecord ("foo" , 1 , 1 , null , "bar" )));
1217
1221
ConsumerRecords allRecords = new ConsumerRecords <>(allRecordMap );
1218
- List <TopicPartition > afterRevokeAssignments = Arrays .asList (tp1 );
1219
1222
AtomicInteger pollPhase = new AtomicInteger ();
1220
1223
1221
1224
Consumer consumer = mock (Consumer .class );
@@ -1227,9 +1230,7 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws Int
1227
1230
return null ;
1228
1231
}).given (consumer ).subscribe (any (Collection .class ), any ());
1229
1232
CountDownLatch pauseLatch = new CountDownLatch (1 );
1230
- AtomicBoolean paused = new AtomicBoolean ();
1231
1233
willAnswer (inv -> {
1232
- paused .set (true );
1233
1234
pauseLatch .countDown ();
1234
1235
return null ;
1235
1236
}).given (consumer ).pause (any ());
@@ -1251,17 +1252,20 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws Int
1251
1252
Thread .sleep (50 );
1252
1253
pollLatch .countDown ();
1253
1254
switch (pollPhase .getAndIncrement ()) {
1254
- case 0 :
1255
+ case 0 -> {
1255
1256
rebal .get ().onPartitionsAssigned (allAssignments );
1256
1257
return allRecords ;
1257
- case 1 :
1258
+ }
1259
+ case 1 -> {
1258
1260
rebal .get ().onPartitionsRevoked (List .of (tp0 ));
1259
1261
rebal .get ().onPartitionsAssigned (List .of (new TopicPartition ("foo" , 2 )));
1260
1262
rebalLatch .countDown ();
1261
1263
continueLatch .await (10 , TimeUnit .SECONDS );
1262
1264
return ConsumerRecords .empty ();
1263
- default :
1265
+ }
1266
+ default -> {
1264
1267
return ConsumerRecords .empty ();
1268
+ }
1265
1269
}
1266
1270
}).given (consumer ).poll (any ());
1267
1271
container .start ();
@@ -1287,7 +1291,7 @@ private AcknowledgingMessageListener ackOffset1() {
1287
1291
1288
1292
@ Override
1289
1293
public void onMessage (ConsumerRecord rec , @ Nullable Acknowledgment ack ) {
1290
- if (rec .offset () == 1 ) {
1294
+ if (rec .offset () == 1 && ack != null ) {
1291
1295
ack .acknowledge ();
1292
1296
}
1293
1297
}
@@ -1301,7 +1305,7 @@ public void onMessage(Object data) {
1301
1305
1302
1306
public static class TestMessageListener1 implements MessageListener <String , String >, ConsumerSeekAware {
1303
1307
1304
- private static ThreadLocal <ConsumerSeekCallback > callbacks = new ThreadLocal <>();
1308
+ private static final ThreadLocal <ConsumerSeekCallback > callbacks = new ThreadLocal <>();
1305
1309
1306
1310
CountDownLatch latch = new CountDownLatch (1 );
1307
1311
@@ -1337,7 +1341,7 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC
1337
1341
1338
1342
public static class TestMessageListener2 implements MessageListener <String , String >, ConsumerSeekAware {
1339
1343
1340
- private static ThreadLocal <ConsumerSeekCallback > callbacks = new ThreadLocal <>();
1344
+ private static final ThreadLocal <ConsumerSeekCallback > callbacks = new ThreadLocal <>();
1341
1345
1342
1346
CountDownLatch latch = new CountDownLatch (1 );
1343
1347
0 commit comments