@@ -1428,7 +1428,7 @@ protected void pollAndInvoke() {
1428
1428
doProcessCommits ();
1429
1429
fixTxOffsetsIfNeeded ();
1430
1430
idleBetweenPollIfNecessary ();
1431
- if (this .seeks .size () > 0 ) {
1431
+ if (! this .seeks .isEmpty () ) {
1432
1432
processSeeks ();
1433
1433
}
1434
1434
pauseConsumerIfNecessary ();
@@ -1586,7 +1586,7 @@ private void fixTxOffsetsIfNeeded() {
1586
1586
toFix .put (tp , createOffsetAndMetadata (position ));
1587
1587
}
1588
1588
});
1589
- if (toFix .size () > 0 ) {
1589
+ if (! toFix .isEmpty () ) {
1590
1590
this .logger .debug (() -> "Fixing TX offsets: " + toFix );
1591
1591
if (this .kafkaTxManager == null ) {
1592
1592
if (this .syncCommits ) {
@@ -1689,7 +1689,7 @@ private synchronized void captureOffsets(ConsumerRecords<K, V> records) {
1689
1689
}
1690
1690
1691
1691
private void checkRebalanceCommits () {
1692
- if (this .commitsDuringRebalance .size () > 0 ) {
1692
+ if (! this .commitsDuringRebalance .isEmpty () ) {
1693
1693
// Attempt to recommit the offsets for partitions that we still own
1694
1694
Map <TopicPartition , OffsetAndMetadata > commits = this .commitsDuringRebalance .entrySet ()
1695
1695
.stream ()
@@ -1729,7 +1729,8 @@ private void debugRecords(@Nullable ConsumerRecords<K, V> records) {
1729
1729
.flatMap (p -> records .records (p ).stream ())
1730
1730
// map to same format as send metadata toString()
1731
1731
.map (r -> r .topic () + "-" + r .partition () + "@" + r .offset ())
1732
- .collect (Collectors .toList ()).toString ());
1732
+ .toList ()
1733
+ .toString ());
1733
1734
}
1734
1735
}
1735
1736
}
@@ -1756,11 +1757,11 @@ private void pauseConsumerIfNecessary() {
1756
1757
}
1757
1758
1758
1759
private void doPauseConsumerIfNecessary () {
1759
- if (this .pausedForNack .size () > 0 ) {
1760
+ if (! this .pausedForNack .isEmpty () ) {
1760
1761
this .logger .debug ("Still paused for nack sleep" );
1761
1762
return ;
1762
1763
}
1763
- if (this .offsetsInThisBatch != null && this .offsetsInThisBatch .size () > 0 && !this .pausedForAsyncAcks ) {
1764
+ if (this .offsetsInThisBatch != null && ! this .offsetsInThisBatch .isEmpty () && !this .pausedForAsyncAcks ) {
1764
1765
this .pausedForAsyncAcks = true ;
1765
1766
this .logger .debug (() -> "Pausing for incomplete async acks: " + this .offsetsInThisBatch );
1766
1767
}
@@ -1801,7 +1802,7 @@ else if (this.offsetsInThisBatch != null) {
1801
1802
}
1802
1803
1803
1804
private void doResumeConsumerIfNeccessary () {
1804
- if (this .pausedForAsyncAcks && this .offsetsInThisBatch .size () == 0 ) {
1805
+ if (this .pausedForAsyncAcks && this .offsetsInThisBatch .isEmpty () ) {
1805
1806
this .pausedForAsyncAcks = false ;
1806
1807
this .logger .debug ("Resuming after manual async acks cleared" );
1807
1808
}
@@ -1823,8 +1824,8 @@ private void pausePartitionsIfNecessary() {
1823
1824
.stream ()
1824
1825
.filter (tp -> isPartitionPauseRequested (tp )
1825
1826
&& !pausedConsumerPartitions .contains (tp ))
1826
- .collect ( Collectors . toList () );
1827
- if (partitionsToPause .size () > 0 ) {
1827
+ .toList ();
1828
+ if (! partitionsToPause .isEmpty () ) {
1828
1829
this .consumer .pause (partitionsToPause );
1829
1830
this .pausedPartitions .addAll (partitionsToPause );
1830
1831
this .logger .debug (() -> "Paused consumption from " + partitionsToPause );
@@ -1840,8 +1841,8 @@ private void resumePartitionsIfNecessary() {
1840
1841
.stream ()
1841
1842
.filter (tp -> !isPartitionPauseRequested (tp )
1842
1843
&& this .pausedPartitions .contains (tp ))
1843
- .collect ( Collectors . toList () );
1844
- if (partitionsToResume .size () > 0 ) {
1844
+ .toList ();
1845
+ if (! partitionsToResume .isEmpty () ) {
1845
1846
this .consumer .resume (partitionsToResume );
1846
1847
this .pausedPartitions .removeAll (partitionsToResume );
1847
1848
this .logger .debug (() -> "Resumed consumption from " + partitionsToResume );
@@ -1877,7 +1878,7 @@ private void checkIdle() {
1877
1878
private void idleBetweenPollIfNecessary () {
1878
1879
long idleBetweenPolls = this .containerProperties .getIdleBetweenPolls ();
1879
1880
Collection <TopicPartition > assigned = getAssignedPartitions ();
1880
- if (idleBetweenPolls > 0 && assigned != null && assigned .size () > 0 ) {
1881
+ if (idleBetweenPolls > 0 && assigned != null && ! assigned .isEmpty () ) {
1881
1882
idleBetweenPolls = Math .min (idleBetweenPolls ,
1882
1883
this .maxPollInterval - (System .currentTimeMillis () - this .lastPoll )
1883
1884
- 5000 ); // NOSONAR - less by five seconds to avoid race condition with rebalance
@@ -1960,7 +1961,7 @@ protected void handleConsumerException(Exception e) {
1960
1961
1961
1962
private void commitPendingAcks () {
1962
1963
processCommits ();
1963
- if (this .offsets .size () > 0 ) {
1964
+ if (! this .offsets .isEmpty () ) {
1964
1965
// we always commit after stopping the invoker
1965
1966
commitIfNecessary ();
1966
1967
}
@@ -2056,19 +2057,19 @@ private synchronized void ackInOrder(ConsumerRecord<K, V> record) {
2056
2057
TopicPartition part = new TopicPartition (record .topic (), record .partition ());
2057
2058
List <Long > offs = this .offsetsInThisBatch .get (part );
2058
2059
List <ConsumerRecord <K , V >> deferred = this .deferredOffsets .get (part );
2059
- if (offs .size () > 0 ) {
2060
+ if (! offs .isEmpty () ) {
2060
2061
if (offs .get (0 ) == record .offset ()) {
2061
2062
offs .remove (0 );
2062
2063
ConsumerRecord <K , V > recordToAck = record ;
2063
- if (deferred .size () > 0 ) {
2064
+ if (! deferred .isEmpty () ) {
2064
2065
Collections .sort (deferred , (a , b ) -> Long .compare (a .offset (), b .offset ()));
2065
- while (deferred .size () > 0 && deferred .get (0 ).offset () == recordToAck .offset () + 1 ) {
2066
+ while (! deferred .isEmpty () && deferred .get (0 ).offset () == recordToAck .offset () + 1 ) {
2066
2067
recordToAck = deferred .remove (0 );
2067
2068
offs .remove (0 );
2068
2069
}
2069
2070
}
2070
2071
processAck (recordToAck );
2071
- if (offs .size () == 0 ) {
2072
+ if (offs .isEmpty () ) {
2072
2073
this .deferredOffsets .remove (part );
2073
2074
this .offsetsInThisBatch .remove (part );
2074
2075
}
@@ -2148,7 +2149,7 @@ private void invokeBatchListener(final ConsumerRecords<K, V> recordsArg) {
2148
2149
if (!this .wantsFullRecords ) {
2149
2150
recordList = createRecordList (records );
2150
2151
}
2151
- if (this .wantsFullRecords || recordList .size () > 0 ) {
2152
+ if (this .wantsFullRecords || ! recordList .isEmpty () ) {
2152
2153
if (this .transactionTemplate != null ) {
2153
2154
invokeBatchListenerInTx (records , recordList ); // NOSONAR
2154
2155
}
@@ -2625,7 +2626,7 @@ private boolean checkImmediatePause(Iterator<ConsumerRecord<K, V>> iterator) {
2625
2626
remaining .computeIfAbsent (new TopicPartition (next .topic (), next .partition ()),
2626
2627
tp -> new ArrayList <ConsumerRecord <K , V >>()).add (next );
2627
2628
}
2628
- if (remaining .size () > 0 ) {
2629
+ if (! remaining .isEmpty () ) {
2629
2630
this .remainingRecords = new ConsumerRecords <>(remaining );
2630
2631
return true ;
2631
2632
}
@@ -2693,7 +2694,7 @@ private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecor
2693
2694
Iterator <ConsumerRecord <K , V >> iterator2 = records .iterator ();
2694
2695
while (iterator2 .hasNext ()) {
2695
2696
ConsumerRecord <K , V > next = iterator2 .next ();
2696
- if (list .size () > 0 || recordsEqual (record , next )) {
2697
+ if (! list .isEmpty () || recordsEqual (record , next )) {
2697
2698
list .add (next );
2698
2699
}
2699
2700
}
@@ -2908,7 +2909,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
2908
2909
records .computeIfAbsent (new TopicPartition (next .topic (), next .partition ()),
2909
2910
tp -> new ArrayList <ConsumerRecord <K , V >>()).add (next );
2910
2911
}
2911
- if (records .size () > 0 ) {
2912
+ if (! records .isEmpty () ) {
2912
2913
this .remainingRecords = new ConsumerRecords <>(records );
2913
2914
this .pauseForPending = true ;
2914
2915
}
@@ -3180,10 +3181,10 @@ private void initPartitionsIfNeeded() {
3180
3181
private void doInitialSeeks (Map <TopicPartition , OffsetMetadata > partitions , Set <TopicPartition > beginnings ,
3181
3182
Set <TopicPartition > ends ) {
3182
3183
3183
- if (beginnings .size () > 0 ) {
3184
+ if (! beginnings .isEmpty () ) {
3184
3185
this .consumer .seekToBeginning (beginnings );
3185
3186
}
3186
- if (ends .size () > 0 ) {
3187
+ if (! ends .isEmpty () ) {
3187
3188
this .consumer .seekToEnd (ends );
3188
3189
}
3189
3190
for (Entry <TopicPartition , OffsetMetadata > entry : partitions .entrySet ()) {
@@ -3312,7 +3313,7 @@ public void seekToBeginning(String topic, int partition) {
3312
3313
public void seekToBeginning (Collection <TopicPartition > partitions ) {
3313
3314
this .seeks .addAll (partitions .stream ()
3314
3315
.map (tp -> new TopicPartitionOffset (tp .topic (), tp .partition (), SeekPosition .BEGINNING ))
3315
- .collect ( Collectors . toList () ));
3316
+ .toList ());
3316
3317
}
3317
3318
3318
3319
@ Override
@@ -3324,7 +3325,7 @@ public void seekToEnd(String topic, int partition) {
3324
3325
public void seekToEnd (Collection <TopicPartition > partitions ) {
3325
3326
this .seeks .addAll (partitions .stream ()
3326
3327
.map (tp -> new TopicPartitionOffset (tp .topic (), tp .partition (), SeekPosition .END ))
3327
- .collect ( Collectors . toList () ));
3328
+ .toList ());
3328
3329
}
3329
3330
3330
3331
@ Override
@@ -3564,15 +3565,15 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
3564
3565
toRepause .add (tp );
3565
3566
}
3566
3567
});
3567
- if (!ListenerConsumer .this .consumerPaused && toRepause .size () > 0 ) {
3568
+ if (!ListenerConsumer .this .consumerPaused && ! toRepause .isEmpty () ) {
3568
3569
ListenerConsumer .this .consumer .pause (toRepause );
3569
3570
ListenerConsumer .this .logger .debug (() -> "Paused consumption from: " + toRepause );
3570
3571
publishConsumerPausedEvent (toRepause , "Re-paused after rebalance" );
3571
3572
}
3572
3573
this .revoked .removeAll (toRepause );
3573
3574
ListenerConsumer .this .pausedPartitions .removeAll (this .revoked );
3574
3575
this .revoked .clear ();
3575
- if (ListenerConsumer .this .pausedForNack .size () > 0 ) {
3576
+ if (! ListenerConsumer .this .pausedForNack .isEmpty () ) {
3576
3577
ListenerConsumer .this .consumer .pause (ListenerConsumer .this .pausedForNack );
3577
3578
}
3578
3579
}
@@ -3597,7 +3598,7 @@ private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partition
3597
3598
return false ;
3598
3599
}
3599
3600
}
3600
- if (offsetsToCommit .size () > 0 ) {
3601
+ if (! offsetsToCommit .isEmpty () ) {
3601
3602
commitCurrentOffsets (offsetsToCommit );
3602
3603
}
3603
3604
return true ;
0 commit comments