@@ -1428,7 +1428,7 @@ protected void pollAndInvoke() {
1428
1428
doProcessCommits ();
1429
1429
fixTxOffsetsIfNeeded ();
1430
1430
idleBetweenPollIfNecessary ();
1431
- if (this .seeks .size () > 0 ) {
1431
+ if (! this .seeks .isEmpty () ) {
1432
1432
processSeeks ();
1433
1433
}
1434
1434
pauseConsumerIfNecessary ();
@@ -1586,7 +1586,7 @@ private void fixTxOffsetsIfNeeded() {
1586
1586
toFix .put (tp , createOffsetAndMetadata (position ));
1587
1587
}
1588
1588
});
1589
- if (toFix .size () > 0 ) {
1589
+ if (! toFix .isEmpty () ) {
1590
1590
this .logger .debug (() -> "Fixing TX offsets: " + toFix );
1591
1591
if (this .kafkaTxManager == null ) {
1592
1592
if (this .syncCommits ) {
@@ -1689,7 +1689,7 @@ private synchronized void captureOffsets(ConsumerRecords<K, V> records) {
1689
1689
}
1690
1690
1691
1691
private void checkRebalanceCommits () {
1692
- if (this .commitsDuringRebalance .size () > 0 ) {
1692
+ if (! this .commitsDuringRebalance .isEmpty () ) {
1693
1693
// Attempt to recommit the offsets for partitions that we still own
1694
1694
Map <TopicPartition , OffsetAndMetadata > commits = this .commitsDuringRebalance .entrySet ()
1695
1695
.stream ()
@@ -1729,7 +1729,8 @@ private void debugRecords(@Nullable ConsumerRecords<K, V> records) {
1729
1729
.flatMap (p -> records .records (p ).stream ())
1730
1730
// map to same format as send metadata toString()
1731
1731
.map (r -> r .topic () + "-" + r .partition () + "@" + r .offset ())
1732
- .collect (Collectors .toList ()).toString ());
1732
+ .toList ()
1733
+ .toString ());
1733
1734
}
1734
1735
}
1735
1736
}
@@ -1755,11 +1756,11 @@ private void pauseConsumerIfNecessary() {
1755
1756
}
1756
1757
1757
1758
private void doPauseConsumerIfNecessary () {
1758
- if (this .pausedForNack .size () > 0 ) {
1759
+ if (! this .pausedForNack .isEmpty () ) {
1759
1760
this .logger .debug ("Still paused for nack sleep" );
1760
1761
return ;
1761
1762
}
1762
- if (this .offsetsInThisBatch != null && this .offsetsInThisBatch .size () > 0 && !this .pausedForAsyncAcks ) {
1763
+ if (this .offsetsInThisBatch != null && ! this .offsetsInThisBatch .isEmpty () && !this .pausedForAsyncAcks ) {
1763
1764
this .pausedForAsyncAcks = true ;
1764
1765
this .logger .debug (() -> "Pausing for incomplete async acks: " + this .offsetsInThisBatch );
1765
1766
}
@@ -1800,7 +1801,7 @@ else if (this.offsetsInThisBatch != null) {
1800
1801
}
1801
1802
1802
1803
private void doResumeConsumerIfNeccessary () {
1803
- if (this .pausedForAsyncAcks && this .offsetsInThisBatch .size () == 0 ) {
1804
+ if (this .pausedForAsyncAcks && this .offsetsInThisBatch .isEmpty () ) {
1804
1805
this .pausedForAsyncAcks = false ;
1805
1806
this .logger .debug ("Resuming after manual async acks cleared" );
1806
1807
}
@@ -1822,8 +1823,8 @@ private void pausePartitionsIfNecessary() {
1822
1823
.stream ()
1823
1824
.filter (tp -> isPartitionPauseRequested (tp )
1824
1825
&& !pausedConsumerPartitions .contains (tp ))
1825
- .collect ( Collectors . toList () );
1826
- if (partitionsToPause .size () > 0 ) {
1826
+ .toList ();
1827
+ if (! partitionsToPause .isEmpty () ) {
1827
1828
this .consumer .pause (partitionsToPause );
1828
1829
this .pausedPartitions .addAll (partitionsToPause );
1829
1830
this .logger .debug (() -> "Paused consumption from " + partitionsToPause );
@@ -1839,8 +1840,8 @@ private void resumePartitionsIfNecessary() {
1839
1840
.stream ()
1840
1841
.filter (tp -> !isPartitionPauseRequested (tp )
1841
1842
&& this .pausedPartitions .contains (tp ))
1842
- .collect ( Collectors . toList () );
1843
- if (partitionsToResume .size () > 0 ) {
1843
+ .toList ();
1844
+ if (! partitionsToResume .isEmpty () ) {
1844
1845
this .consumer .resume (partitionsToResume );
1845
1846
this .pausedPartitions .removeAll (partitionsToResume );
1846
1847
this .logger .debug (() -> "Resumed consumption from " + partitionsToResume );
@@ -1876,7 +1877,7 @@ private void checkIdle() {
1876
1877
private void idleBetweenPollIfNecessary () {
1877
1878
long idleBetweenPolls = this .containerProperties .getIdleBetweenPolls ();
1878
1879
Collection <TopicPartition > assigned = getAssignedPartitions ();
1879
- if (idleBetweenPolls > 0 && assigned != null && assigned .size () > 0 ) {
1880
+ if (idleBetweenPolls > 0 && assigned != null && ! assigned .isEmpty () ) {
1880
1881
idleBetweenPolls = Math .min (idleBetweenPolls ,
1881
1882
this .maxPollInterval - (System .currentTimeMillis () - this .lastPoll )
1882
1883
- 5000 ); // NOSONAR - less by five seconds to avoid race condition with rebalance
@@ -1959,7 +1960,7 @@ protected void handleConsumerException(Exception e) {
1959
1960
1960
1961
private void commitPendingAcks () {
1961
1962
processCommits ();
1962
- if (this .offsets .size () > 0 ) {
1963
+ if (! this .offsets .isEmpty () ) {
1963
1964
// we always commit after stopping the invoker
1964
1965
commitIfNecessary ();
1965
1966
}
@@ -2055,19 +2056,19 @@ private synchronized void ackInOrder(ConsumerRecord<K, V> record) {
2055
2056
TopicPartition part = new TopicPartition (record .topic (), record .partition ());
2056
2057
List <Long > offs = this .offsetsInThisBatch .get (part );
2057
2058
List <ConsumerRecord <K , V >> deferred = this .deferredOffsets .get (part );
2058
- if (offs .size () > 0 ) {
2059
+ if (! offs .isEmpty () ) {
2059
2060
if (offs .get (0 ) == record .offset ()) {
2060
2061
offs .remove (0 );
2061
2062
ConsumerRecord <K , V > recordToAck = record ;
2062
- if (deferred .size () > 0 ) {
2063
+ if (! deferred .isEmpty () ) {
2063
2064
Collections .sort (deferred , (a , b ) -> Long .compare (a .offset (), b .offset ()));
2064
- while (deferred .size () > 0 && deferred .get (0 ).offset () == recordToAck .offset () + 1 ) {
2065
+ while (! deferred .isEmpty () && deferred .get (0 ).offset () == recordToAck .offset () + 1 ) {
2065
2066
recordToAck = deferred .remove (0 );
2066
2067
offs .remove (0 );
2067
2068
}
2068
2069
}
2069
2070
processAck (recordToAck );
2070
- if (offs .size () == 0 ) {
2071
+ if (offs .isEmpty () ) {
2071
2072
this .deferredOffsets .remove (part );
2072
2073
this .offsetsInThisBatch .remove (part );
2073
2074
}
@@ -2147,7 +2148,7 @@ private void invokeBatchListener(final ConsumerRecords<K, V> recordsArg) {
2147
2148
if (!this .wantsFullRecords ) {
2148
2149
recordList = createRecordList (records );
2149
2150
}
2150
- if (this .wantsFullRecords || recordList .size () > 0 ) {
2151
+ if (this .wantsFullRecords || ! recordList .isEmpty () ) {
2151
2152
if (this .transactionTemplate != null ) {
2152
2153
invokeBatchListenerInTx (records , recordList ); // NOSONAR
2153
2154
}
@@ -2624,7 +2625,7 @@ private boolean checkImmediatePause(Iterator<ConsumerRecord<K, V>> iterator) {
2624
2625
remaining .computeIfAbsent (new TopicPartition (next .topic (), next .partition ()),
2625
2626
tp -> new ArrayList <ConsumerRecord <K , V >>()).add (next );
2626
2627
}
2627
- if (remaining .size () > 0 ) {
2628
+ if (! remaining .isEmpty () ) {
2628
2629
this .remainingRecords = new ConsumerRecords <>(remaining );
2629
2630
return true ;
2630
2631
}
@@ -2692,7 +2693,7 @@ private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecor
2692
2693
Iterator <ConsumerRecord <K , V >> iterator2 = records .iterator ();
2693
2694
while (iterator2 .hasNext ()) {
2694
2695
ConsumerRecord <K , V > next = iterator2 .next ();
2695
- if (list .size () > 0 || recordsEqual (record , next )) {
2696
+ if (! list .isEmpty () || recordsEqual (record , next )) {
2696
2697
list .add (next );
2697
2698
}
2698
2699
}
@@ -2907,7 +2908,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
2907
2908
records .computeIfAbsent (new TopicPartition (next .topic (), next .partition ()),
2908
2909
tp -> new ArrayList <ConsumerRecord <K , V >>()).add (next );
2909
2910
}
2910
- if (records .size () > 0 ) {
2911
+ if (! records .isEmpty () ) {
2911
2912
this .remainingRecords = new ConsumerRecords <>(records );
2912
2913
this .pauseForPending = true ;
2913
2914
}
@@ -3179,10 +3180,10 @@ private void initPartitionsIfNeeded() {
3179
3180
private void doInitialSeeks (Map <TopicPartition , OffsetMetadata > partitions , Set <TopicPartition > beginnings ,
3180
3181
Set <TopicPartition > ends ) {
3181
3182
3182
- if (beginnings .size () > 0 ) {
3183
+ if (! beginnings .isEmpty () ) {
3183
3184
this .consumer .seekToBeginning (beginnings );
3184
3185
}
3185
- if (ends .size () > 0 ) {
3186
+ if (! ends .isEmpty () ) {
3186
3187
this .consumer .seekToEnd (ends );
3187
3188
}
3188
3189
for (Entry <TopicPartition , OffsetMetadata > entry : partitions .entrySet ()) {
@@ -3311,7 +3312,7 @@ public void seekToBeginning(String topic, int partition) {
3311
3312
public void seekToBeginning (Collection <TopicPartition > partitions ) {
3312
3313
this .seeks .addAll (partitions .stream ()
3313
3314
.map (tp -> new TopicPartitionOffset (tp .topic (), tp .partition (), SeekPosition .BEGINNING ))
3314
- .collect ( Collectors . toList () ));
3315
+ .toList ());
3315
3316
}
3316
3317
3317
3318
@ Override
@@ -3323,7 +3324,7 @@ public void seekToEnd(String topic, int partition) {
3323
3324
public void seekToEnd (Collection <TopicPartition > partitions ) {
3324
3325
this .seeks .addAll (partitions .stream ()
3325
3326
.map (tp -> new TopicPartitionOffset (tp .topic (), tp .partition (), SeekPosition .END ))
3326
- .collect ( Collectors . toList () ));
3327
+ .toList ());
3327
3328
}
3328
3329
3329
3330
@ Override
@@ -3563,15 +3564,15 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
3563
3564
toRepause .add (tp );
3564
3565
}
3565
3566
});
3566
- if (!ListenerConsumer .this .consumerPaused && toRepause .size () > 0 ) {
3567
+ if (!ListenerConsumer .this .consumerPaused && ! toRepause .isEmpty () ) {
3567
3568
ListenerConsumer .this .consumer .pause (toRepause );
3568
3569
ListenerConsumer .this .logger .debug (() -> "Paused consumption from: " + toRepause );
3569
3570
publishConsumerPausedEvent (toRepause , "Re-paused after rebalance" );
3570
3571
}
3571
3572
this .revoked .removeAll (toRepause );
3572
3573
ListenerConsumer .this .pausedPartitions .removeAll (this .revoked );
3573
3574
this .revoked .clear ();
3574
- if (ListenerConsumer .this .pausedForNack .size () > 0 ) {
3575
+ if (! ListenerConsumer .this .pausedForNack .isEmpty () ) {
3575
3576
ListenerConsumer .this .consumer .pause (ListenerConsumer .this .pausedForNack );
3576
3577
}
3577
3578
}
@@ -3596,7 +3597,7 @@ private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partition
3596
3597
return false ;
3597
3598
}
3598
3599
}
3599
- if (offsetsToCommit .size () > 0 ) {
3600
+ if (! offsetsToCommit .isEmpty () ) {
3600
3601
commitCurrentOffsets (offsetsToCommit );
3601
3602
}
3602
3603
return true ;
0 commit comments