@@ -152,6 +152,12 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
152
152
153
153
private static final String UNUSED = "unused" ;
154
154
155
+ private static final String DEPRECATION = "deprecation" ;
156
+
157
+ private static final String UNCHECKED = "unchecked" ;
158
+
159
+ private static final String RAWTYPES = "rawtypes" ;
160
+
155
161
private static final int DEFAULT_ACK_TIME = 5000 ;
156
162
157
163
private static final Map <String , Object > CONSUMER_CONFIG_DEFAULTS = ConsumerConfig .configDef ().defaultValues ();
@@ -547,12 +553,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
547
553
548
554
private static final String ERROR_HANDLER_THREW_AN_EXCEPTION = "Error handler threw an exception" ;
549
555
550
- private static final String UNCHECKED = "unchecked" ;
551
-
552
- private static final String RAWTYPES = "rawtypes" ;
553
-
554
- private static final String RAW_TYPES = RAWTYPES ;
555
-
556
556
private final LogAccessor logger = KafkaMessageListenerContainer .this .logger ; // NOSONAR hide
557
557
558
558
private final ContainerProperties containerProperties = getContainerProperties ();
@@ -611,7 +611,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
611
611
612
612
private final PlatformTransactionManager transactionManager = this .containerProperties .getTransactionManager ();
613
613
614
- @ SuppressWarnings (RAW_TYPES )
614
+ @ SuppressWarnings (RAWTYPES )
615
615
private final KafkaAwareTransactionManager kafkaTxManager =
616
616
this .transactionManager instanceof KafkaAwareTransactionManager
617
617
? ((KafkaAwareTransactionManager ) this .transactionManager ) : null ;
@@ -858,7 +858,7 @@ else if (this.commonRecordInterceptor != null) {
858
858
@ Nullable
859
859
private CommonErrorHandler determineCommonErrorHandler () {
860
860
CommonErrorHandler common = getCommonErrorHandler ();
861
- @ SuppressWarnings ("deprecation" )
861
+ @ SuppressWarnings (DEPRECATION )
862
862
GenericErrorHandler <?> errHandler = getGenericErrorHandler ();
863
863
if (common != null ) {
864
864
if (errHandler != null ) {
@@ -935,7 +935,7 @@ private void checkGroupInstance(Properties properties, ConsumerFactory<K, V> con
935
935
}
936
936
}
937
937
938
- @ SuppressWarnings ("deprecation" )
938
+ @ SuppressWarnings (DEPRECATION )
939
939
private boolean setupSubBatchPerPartition () {
940
940
Boolean subBatching = this .containerProperties .getSubBatchPerPartition ();
941
941
if (subBatching != null ) {
@@ -1229,7 +1229,7 @@ public boolean isLongLived() {
1229
1229
return true ;
1230
1230
}
1231
1231
1232
- @ SuppressWarnings ("deprecation" )
1232
+ @ SuppressWarnings (DEPRECATION )
1233
1233
@ Override // NOSONAR complexity
1234
1234
public void run () {
1235
1235
ListenerUtils .setLogOnlyMetadata (this .containerProperties .isOnlyLogRecordMetadata ());
@@ -1671,16 +1671,20 @@ private void pausePartitionsIfNecessary() {
1671
1671
}
1672
1672
1673
1673
private void resumePartitionsIfNecessary () {
1674
- List <TopicPartition > partitionsToResume = getAssignedPartitions ()
1675
- .stream ()
1676
- .filter (tp -> !isPartitionPauseRequested (tp )
1677
- && this .pausedPartitions .contains (tp ))
1678
- .collect (Collectors .toList ());
1679
- if (partitionsToResume .size () > 0 ) {
1680
- this .consumer .resume (partitionsToResume );
1681
- this .pausedPartitions .removeAll (partitionsToResume );
1682
- this .logger .debug (() -> "Resumed consumption from " + partitionsToResume );
1683
- partitionsToResume .forEach (KafkaMessageListenerContainer .this ::publishConsumerPartitionResumedEvent );
1674
+ Collection <TopicPartition > assigned = getAssignedPartitions ();
1675
+ if (assigned != null ) {
1676
+ List <TopicPartition > partitionsToResume = assigned
1677
+ .stream ()
1678
+ .filter (tp -> !isPartitionPauseRequested (tp )
1679
+ && this .pausedPartitions .contains (tp ))
1680
+ .collect (Collectors .toList ());
1681
+ if (partitionsToResume .size () > 0 ) {
1682
+ this .consumer .resume (partitionsToResume );
1683
+ this .pausedPartitions .removeAll (partitionsToResume );
1684
+ this .logger .debug (() -> "Resumed consumption from " + partitionsToResume );
1685
+ partitionsToResume
1686
+ .forEach (KafkaMessageListenerContainer .this ::publishConsumerPartitionResumedEvent );
1687
+ }
1684
1688
}
1685
1689
}
1686
1690
@@ -1812,7 +1816,7 @@ record = this.acks.poll();
1812
1816
}
1813
1817
}
1814
1818
1815
- @ SuppressWarnings ("deprecation" )
1819
+ @ SuppressWarnings (DEPRECATION )
1816
1820
private void traceAck (ConsumerRecord <K , V > record ) {
1817
1821
this .logger .trace (() -> "Ack: " + ListenerUtils .recordToString (record , true ));
1818
1822
}
@@ -1887,7 +1891,7 @@ private void processAcks(ConsumerRecords<K, V> records) {
1887
1891
}
1888
1892
}
1889
1893
1890
- @ SuppressWarnings ("deprecation" )
1894
+ @ SuppressWarnings (DEPRECATION )
1891
1895
private synchronized void ackInOrder (ConsumerRecord <K , V > record ) {
1892
1896
TopicPartition part = new TopicPartition (record .topic (), record .partition ());
1893
1897
List <Long > offs = this .offsetsInThisBatch .get (part );
@@ -1995,7 +1999,7 @@ private void invokeBatchListener(final ConsumerRecords<K, V> recordsArg) {
1995
1999
}
1996
2000
}
1997
2001
1998
- @ SuppressWarnings (RAW_TYPES )
2002
+ @ SuppressWarnings (RAWTYPES )
1999
2003
private void invokeBatchListenerInTx (final ConsumerRecords <K , V > records ,
2000
2004
@ Nullable final List <ConsumerRecord <K , V >> recordList ) {
2001
2005
@@ -2306,7 +2310,7 @@ private void invokeRecordListener(final ConsumerRecords<K, V> records) {
2306
2310
* Invoke the listener with each record in a separate transaction.
2307
2311
* @param records the records.
2308
2312
*/
2309
- @ SuppressWarnings ("deprecation" ) // NOSONAR complexity
2313
+ @ SuppressWarnings (DEPRECATION ) // NOSONAR complexity
2310
2314
private void invokeRecordListenerInTx (final ConsumerRecords <K , V > records ) {
2311
2315
Iterator <ConsumerRecord <K , V >> iterator = records .iterator ();
2312
2316
while (iterator .hasNext ()) {
@@ -2408,7 +2412,7 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
2408
2412
}
2409
2413
}
2410
2414
2411
- @ SuppressWarnings ("deprecation" )
2415
+ @ SuppressWarnings (DEPRECATION )
2412
2416
private void doInvokeWithRecords (final ConsumerRecords <K , V > records ) {
2413
2417
Iterator <ConsumerRecord <K , V >> iterator = records .iterator ();
2414
2418
while (iterator .hasNext ()) {
@@ -2444,7 +2448,7 @@ private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg)
2444
2448
return next ;
2445
2449
}
2446
2450
2447
- @ SuppressWarnings ("deprecation" )
2451
+ @ SuppressWarnings (DEPRECATION )
2448
2452
@ Nullable
2449
2453
private ConsumerRecord <K , V > checkEarlyIntercept (ConsumerRecord <K , V > recordArg ) {
2450
2454
internalHeaders (recordArg );
@@ -2494,7 +2498,10 @@ private void pauseForNackSleep() {
2494
2498
this .nackWake = System .currentTimeMillis () + this .nackSleep ;
2495
2499
this .nackSleep = -1 ;
2496
2500
Set <TopicPartition > alreadyPaused = this .consumer .paused ();
2497
- this .pausedForNack .addAll (getAssignedPartitions ());
2501
+ Collection <TopicPartition > assigned = getAssignedPartitions ();
2502
+ if (assigned != null ) {
2503
+ this .pausedForNack .addAll (assigned );
2504
+ }
2498
2505
this .pausedForNack .removeAll (alreadyPaused );
2499
2506
this .logger .debug (() -> "Pausing for nack sleep: " + ListenerConsumer .this .pausedForNack );
2500
2507
try {
@@ -2605,7 +2612,7 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record) {
2605
2612
}
2606
2613
}
2607
2614
2608
- @ SuppressWarnings ("deprecation" )
2615
+ @ SuppressWarnings (DEPRECATION )
2609
2616
private void doInvokeOnMessage (final ConsumerRecord <K , V > recordArg ) {
2610
2617
ConsumerRecord <K , V > record = recordArg ;
2611
2618
if (this .recordInterceptor != null ) {
@@ -2759,7 +2766,7 @@ private void sendOffsetsToTransaction() {
2759
2766
doSendOffsets (this .producer , commits );
2760
2767
}
2761
2768
2762
- @ SuppressWarnings ("deprecation" )
2769
+ @ SuppressWarnings (DEPRECATION )
2763
2770
private void doSendOffsets (Producer <?, ?> prod , Map <TopicPartition , OffsetAndMetadata > commits ) {
2764
2771
if (this .eosMode .getMode ().equals (EOSMode .V1 )) {
2765
2772
prod .sendOffsetsToTransaction (commits , this .consumerGroupId );
@@ -3171,7 +3178,7 @@ public void nack(long sleep) {
3171
3178
}
3172
3179
3173
3180
@ Override
3174
- @ SuppressWarnings ("deprecation" )
3181
+ @ SuppressWarnings (DEPRECATION )
3175
3182
public String toString () {
3176
3183
return "Acknowledgment for " + ListenerUtils .recordToString (this .record , true );
3177
3184
}
0 commit comments