
Commit d2ed076

* Add method `consumerWakeIfNecessary()` to reuse the `consumer.wakeIfNecessary()` null-check-and-wake logic.
* Add method `getTxProducer()`.
* Optimize `determineListenerType()`.
* Replace iterator loops with for-each loops or `forEachRemaining()`.
* Change the `OffsetMetadata` type to a record.
1 parent cd4341c commit d2ed076
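
The first bullet replaces three identical "look up the consumer, wake it if it exists" blocks with one helper. A minimal stand-alone sketch of that extraction pattern, with simplified names rather than the actual container code:

```java
// Simplified illustration of the consumerWakeIfNecessary() extraction.
// ListenerConsumer here is a stub standing in for the real inner consumer thread.
public class Container {

	static class ListenerConsumer {

		void wakeIfNecessary() {
			System.out.println("consumer woken");
		}

	}

	private volatile ListenerConsumer listenerConsumer;

	// Before the change, pause(), resume() and enforceRebalance() each repeated the null check.
	public void pause() {
		// ... pause bookkeeping ...
		consumerWakeIfNecessary();
	}

	public void resume() {
		// ... resume bookkeeping ...
		consumerWakeIfNecessary();
	}

	// After the change, the volatile read and null check live in one place.
	private void consumerWakeIfNecessary() {
		ListenerConsumer consumer = this.listenerConsumer;
		if (consumer != null) {
			consumer.wakeIfNecessary();
		}
	}

	public static void main(String[] args) {
		Container c = new Container();
		c.pause();                               // no consumer yet, nothing happens
		c.listenerConsumer = new ListenerConsumer();
		c.resume();                              // prints "consumer woken"
	}

}
```

Copying the volatile field into a local before the null check keeps the check and the call operating on the same instance, which is why the helper preserves that shape.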

File tree

1 file changed: +46 −78 lines changed


spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 46 additions & 78 deletions
@@ -18,11 +18,11 @@
 
 import java.nio.ByteBuffer;
 import java.time.Duration;
-import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -305,8 +305,7 @@ public boolean isContainerPaused() {
 
 	@Override
 	public boolean isPartitionPaused(TopicPartition topicPartition) {
-		return this.listenerConsumer != null && this.listenerConsumer
-				.isPartitionPaused(topicPartition);
+		return this.listenerConsumer != null && this.listenerConsumer.isPartitionPaused(topicPartition);
 	}
 
 	@Override
@@ -317,33 +316,28 @@ public boolean isInExpectedState() {
 	@Override
 	public void enforceRebalance() {
 		this.thisOrParentContainer.enforceRebalanceRequested.set(true);
-		KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
-		if (consumer != null) {
-			consumer.wakeIfNecessary();
-		}
+		consumerWakeIfNecessary();
 	}
 
 	@Override
 	public void pause() {
 		super.pause();
-		KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
-		if (consumer != null) {
-			consumer.wakeIfNecessary();
-		}
+		consumerWakeIfNecessary();
 	}
 
 	@Override
 	public void resume() {
 		super.resume();
-		KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
-		if (consumer != null) {
-			consumer.wakeIfNecessary();
-		}
+		consumerWakeIfNecessary();
 	}
 
 	@Override
	public void resumePartition(TopicPartition topicPartition) {
 		super.resumePartition(topicPartition);
+		consumerWakeIfNecessary();
+	}
+
+	private void consumerWakeIfNecessary() {
 		KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
 		if (consumer != null) {
 			consumer.wakeIfNecessary();
@@ -422,15 +416,11 @@ private void checkAckMode(ContainerProperties containerProperties) {
 	}
 
 	private ListenerType determineListenerType(GenericMessageListener<?> listener) {
-		ListenerType listenerType = ListenerUtils.determineListenerType(listener);
-		if (listener instanceof DelegatingMessageListener) {
-			Object delegating = listener;
-			while (delegating instanceof DelegatingMessageListener<?> dml) {
-				delegating = dml.getDelegate();
-			}
-			listenerType = ListenerUtils.determineListenerType(delegating);
+		Object delegating = listener;
+		while (delegating instanceof DelegatingMessageListener<?> dml) {
+			delegating = dml.getDelegate();
 		}
-		return listenerType;
+		return ListenerUtils.determineListenerType(delegating);
 	}
 
 	@Override
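
The simplified `determineListenerType()` above always walks the delegate chain; when the listener is not a `DelegatingMessageListener` the loop body never runs, so the previous `instanceof` guard and the mutable local were redundant. A stand-alone sketch of the unwrap-then-classify idea, using hypothetical `Listener`/`Delegating` types rather than the spring-kafka ones:

```java
import java.util.function.Consumer;

public class DelegateUnwrapDemo {

	interface Listener extends Consumer<String> { }

	// A wrapper that forwards to another listener, mirroring DelegatingMessageListener.
	record Delegating(Listener delegate) implements Listener {
		@Override
		public void accept(String s) {
			this.delegate.accept(s);
		}
	}

	// Unwrap any number of delegating layers, then classify the innermost listener.
	static String determineType(Listener listener) {
		Object delegating = listener;
		while (delegating instanceof Delegating d) {
			delegating = d.delegate();
		}
		return delegating.getClass().getName();
	}

	public static void main(String[] args) {
		Listener plain = s -> { };
		System.out.println(determineType(plain));                                // the lambda class itself
		System.out.println(determineType(new Delegating(new Delegating(plain)))); // same class, both wrappers removed
	}

}
```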
@@ -1586,7 +1576,7 @@ private void fixTxOffsetsIfNeeded() {
 			this.lastCommits.forEach((tp, oamd) -> {
 				long position = this.consumer.position(tp);
 				Long saved = this.savedPositions.get(tp);
-				if (saved != null && saved.longValue() != position) {
+				if (saved != null && saved != position) {
 					this.logger.debug(() -> "Skipping TX offset correction - seek(s) have been performed; "
 							+ "saved: " + this.savedPositions + ", "
 							+ "committed: " + oamd + ", "
@@ -1609,9 +1599,7 @@ private void fixTxOffsetsIfNeeded() {
 				}
 				else {
 					this.transactionTemplate.executeWithoutResult(status -> {
-						doSendOffsets(((KafkaResourceHolder) TransactionSynchronizationManager
-								.getResource(this.kafkaTxManager.getProducerFactory()))
-										.getProducer(), toFix);
+						doSendOffsets(getTxProducer(), toFix);
 					});
 				}
 			}
@@ -2088,7 +2076,7 @@ private synchronized void ackInOrder(ConsumerRecord<K, V> cRecord) {
 				offs.remove(0);
 				ConsumerRecord<K, V> recordToAck = cRecord;
 				if (!deferred.isEmpty()) {
-					Collections.sort(deferred, (a, b) -> Long.compare(a.offset(), b.offset()));
+					deferred.sort(Comparator.comparingLong(ConsumerRecord::offset));
 					while (!ObjectUtils.isEmpty(deferred) && deferred.get(0).offset() == recordToAck.offset() + 1) {
 						recordToAck = deferred.remove(0);
 						offs.remove(0);
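
The `Collections.sort(...)` call with a hand-written lambda becomes `List.sort` with a key-extractor comparator; both sort ascending by offset. A minimal equivalent on a plain list, with a hypothetical `Rec` record standing in for `ConsumerRecord`:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class OffsetSortDemo {

	// Stand-in for ConsumerRecord: only the offset matters for the sort.
	record Rec(long offset) { }

	public static void main(String[] args) {
		List<Rec> deferred = new ArrayList<>(List.of(new Rec(7), new Rec(3), new Rec(5)));

		// Before: Collections.sort(deferred, (a, b) -> Long.compare(a.offset(), b.offset()));
		// After: same ordering, expressed with a key-extractor comparator.
		deferred.sort(Comparator.comparingLong(Rec::offset));

		System.out.println(deferred);   // [Rec[offset=3], Rec[offset=5], Rec[offset=7]]
	}

}
```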
@@ -2195,9 +2183,7 @@ private void invokeBatchListenerInTx(final ConsumerRecords<K, V> records,
 				@Override
 				public void doInTransactionWithoutResult(TransactionStatus s) {
 					if (ListenerConsumer.this.kafkaTxManager != null) {
-						ListenerConsumer.this.producer = ((KafkaResourceHolder) TransactionSynchronizationManager
-								.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
-										.getProducer(); // NOSONAR nullable
+						ListenerConsumer.this.producer = getTxProducer();
 					}
 					RuntimeException aborted = doInvokeBatchListener(records, recordList);
 					if (aborted != null) {
@@ -2254,10 +2240,9 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
 		}
 
 		private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V> records) {
-			Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
 			List<ConsumerRecord<K, V>> list = new LinkedList<>();
-			while (iterator.hasNext()) {
-				list.add(iterator.next());
+			for (ConsumerRecord<K, V> record : records) {
+				list.add(record);
 			}
 			return list;
 		}
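
The explicit `Iterator` plumbing becomes an enhanced for loop, which works because `ConsumerRecords` implements `Iterable<ConsumerRecord<K, V>>` (the same reason `records.forEach(...)` works in a later hunk). The same simplification on any `Iterable`, using a plain list here:

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

public class RecordListDemo {

	public static void main(String[] args) {
		Iterable<String> records = List.of("r0", "r1", "r2");

		// Before: manual Iterator plumbing.
		List<String> viaIterator = new LinkedList<>();
		Iterator<String> iterator = records.iterator();
		while (iterator.hasNext()) {
			viaIterator.add(iterator.next());
		}

		// After: enhanced for loop over the same Iterable.
		List<String> viaForEach = new LinkedList<>();
		for (String record : records) {
			viaForEach.add(record);
		}

		System.out.println(viaIterator.equals(viaForEach));   // true
	}

}
```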
@@ -2324,9 +2309,7 @@ private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecords<K, V>
 					|| this.producer != null) {
 				if (this.remainingRecords != null) {
 					ConsumerRecord<K, V> firstUncommitted = this.remainingRecords.iterator().next();
-					Iterator<ConsumerRecord<K, V>> it = records.iterator();
-					while (it.hasNext()) {
-						ConsumerRecord<K, V> next = it.next();
+					for (ConsumerRecord<K, V> next : records) {
 						if (!next.equals(firstUncommitted)) {
 							this.acks.add(next);
 						}
@@ -2433,7 +2416,7 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
 			ConsumerRecords<K, V> records = recordsArg;
 			List<ConsumerRecord<K, V>> recordList = recordListArg;
 			if (this.listenerinfo != null) {
-				records.iterator().forEachRemaining(this::listenerInfo);
+				records.forEach(this::listenerInfo);
 			}
 			if (this.batchInterceptor != null) {
 				records = this.batchInterceptor.intercept(recordsArg, this.consumer);
@@ -2516,7 +2499,6 @@ private void invokeRecordListener(final ConsumerRecords<K, V> records) {
 		 * Invoke the listener with each record in a separate transaction.
 		 * @param records the records.
 		 */
-		@SuppressWarnings(RAWTYPES) // NOSONAR complexity
 		private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
 			Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
 			while (iterator.hasNext()) {
@@ -2561,9 +2543,7 @@ private void invokeInTransaction(Iterator<ConsumerRecord<K, V>> iterator, final
 				@Override
 				public void doInTransactionWithoutResult(TransactionStatus s) {
 					if (ListenerConsumer.this.kafkaTxManager != null) {
-						ListenerConsumer.this.producer = ((KafkaResourceHolder) TransactionSynchronizationManager
-								.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
-										.getProducer(); // NOSONAR
+						ListenerConsumer.this.producer = getTxProducer();
 					}
 					RuntimeException aborted = doInvokeRecordListener(cRecord, iterator);
 					if (aborted != null) {
@@ -2579,9 +2559,7 @@ private void recordAfterRollback(Iterator<ConsumerRecord<K, V>> iterator, final
 
 			List<ConsumerRecord<K, V>> unprocessed = new ArrayList<>();
 			unprocessed.add(cRecord);
-			while (iterator.hasNext()) {
-				unprocessed.add(iterator.next());
-			}
+			iterator.forEachRemaining(unprocessed::add);
 			@SuppressWarnings(UNCHECKED)
 			AfterRollbackProcessor<K, V> afterRollbackProcessorToUse =
 					(AfterRollbackProcessor<K, V>) getAfterRollbackProcessor();
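
`Iterator.forEachRemaining` consumes only what the iterator has not yet returned, so it is a drop-in replacement for the `while (hasNext())` loop that collected the unprocessed records. A self-contained sketch of the "current record plus everything left on the iterator" pattern:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ForEachRemainingDemo {

	public static void main(String[] args) {
		Iterator<String> iterator = List.of("r0", "r1", "r2", "r3").iterator();

		String current = iterator.next();            // the record currently being handled

		List<String> unprocessed = new ArrayList<>();
		unprocessed.add(current);
		// Before: while (iterator.hasNext()) { unprocessed.add(iterator.next()); }
		iterator.forEachRemaining(unprocessed::add); // drains only what is left: r1, r2, r3

		System.out.println(unprocessed);             // [r0, r1, r2, r3]
	}

}
```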
@@ -2639,11 +2617,10 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
 		private boolean checkImmediatePause(Iterator<ConsumerRecord<K, V>> iterator) {
 			if (isPauseRequested() && this.pauseImmediate) {
 				Map<TopicPartition, List<ConsumerRecord<K, V>>> remaining = new LinkedHashMap<>();
-				while (iterator.hasNext()) {
-					ConsumerRecord<K, V> next = iterator.next();
+				iterator.forEachRemaining(next -> {
 					remaining.computeIfAbsent(new TopicPartition(next.topic(), next.partition()),
-							tp -> new ArrayList<ConsumerRecord<K, V>>()).add(next);
-				}
+							tp -> new ArrayList<>()).add(next);
+				});
 				if (!remaining.isEmpty()) {
 					this.remainingRecords = new ConsumerRecords<>(remaining);
 					return true;
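
The remaining records are grouped by partition with `Map.computeIfAbsent`; inside the lambda the explicit `ArrayList<ConsumerRecord<K, V>>` type argument can shrink to a diamond because the target type is already known. A small stand-alone version of the grouping, with String keys standing in for `TopicPartition` and a hypothetical `Rec` record for `ConsumerRecord`:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupRemainingDemo {

	// Stand-in for ConsumerRecord: just a partition key and an offset.
	record Rec(String partition, long offset) { }

	public static void main(String[] args) {
		Iterator<Rec> iterator = List.of(
				new Rec("topic-0", 10), new Rec("topic-1", 4), new Rec("topic-0", 11)).iterator();

		Map<String, List<Rec>> remaining = new LinkedHashMap<>();
		// Each record is appended to the list for its partition, creating the list on first use.
		iterator.forEachRemaining(next ->
				remaining.computeIfAbsent(next.partition(), tp -> new ArrayList<>()).add(next));

		System.out.println(remaining);
		// {topic-0=[Rec[partition=topic-0, offset=10], Rec[partition=topic-0, offset=11]],
		//  topic-1=[Rec[partition=topic-1, offset=4]]}
	}

}
```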
@@ -2712,9 +2689,7 @@ private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecor
 				processCommits();
 			}
 			List<ConsumerRecord<?, ?>> list = new ArrayList<>();
-			Iterator<ConsumerRecord<K, V>> iterator2 = records.iterator();
-			while (iterator2.hasNext()) {
-				ConsumerRecord<K, V> next = iterator2.next();
+			for (ConsumerRecord<K, V> next : records) {
 				if (!list.isEmpty() || recordsEqual(cRecord, next)) {
 					list.add(next);
 				}
@@ -2755,6 +2730,13 @@ private void pauseForNackSleep() {
 			this.nackSleepDurationMillis = -1;
 		}
 
+		@SuppressWarnings(RAWTYPES)
+		private Producer<?, ?> getTxProducer() {
+			return ((KafkaResourceHolder) TransactionSynchronizationManager
+					.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
+							.getProducer(); // NOSONAR
+		}
+
 		/**
 		 * Actually invoke the listener.
 		 * @param cRecord the record.
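
The new `getTxProducer()` centralizes the lookup of the transaction-scoped producer that the Kafka transaction manager binds for its producer factory; the three call sites earlier in the diff now share a single cast and NOSONAR suppression instead of repeating the multi-line expression. A rough sketch of that consolidation idea using hypothetical stand-in types (not the Spring transaction-synchronization API):

```java
import java.util.HashMap;
import java.util.Map;

public class TxProducerLookupDemo {

	// Hypothetical stand-ins for ProducerFactory, Producer and KafkaResourceHolder.
	record Factory(String name) { }
	record Producer(String id) { }
	record ResourceHolder(Producer producer) { }

	// Hypothetical stand-in for the untyped, transaction-bound resource map.
	static final Map<Object, Object> RESOURCES = new HashMap<>();

	static final Factory FACTORY = new Factory("tx-factory");

	// Before the change this cast-and-lookup expression was inlined at each call site;
	// after the change it lives in one accessor, like getTxProducer() in the diff.
	static Producer getTxProducer() {
		return ((ResourceHolder) RESOURCES.get(FACTORY)).producer();
	}

	public static void main(String[] args) {
		RESOURCES.put(FACTORY, new ResourceHolder(new Producer("producer-1")));
		System.out.println(getTxProducer().id());   // producer-1
	}

}
```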
@@ -2911,9 +2893,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> cRecord,
 			}
 			List<ConsumerRecord<?, ?>> records = new ArrayList<>();
 			records.add(cRecord);
-			while (iterator.hasNext()) {
-				records.add(iterator.next());
-			}
+			iterator.forEachRemaining(records::add);
 			this.commonErrorHandler.handleRemaining(rte, records, this.consumer,
 					KafkaMessageListenerContainer.this.thisOrParentContainer);
 		}
@@ -2929,12 +2909,9 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> cRecord,
 			Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
 			if (!handled) {
 				records.computeIfAbsent(new TopicPartition(cRecord.topic(), cRecord.partition()),
-						tp -> new ArrayList<ConsumerRecord<K, V>>()).add(cRecord);
-				while (iterator.hasNext()) {
-					ConsumerRecord<K, V> next = iterator.next();
-					records.computeIfAbsent(new TopicPartition(next.topic(), next.partition()),
-							tp -> new ArrayList<ConsumerRecord<K, V>>()).add(next);
-				}
+						tp -> new ArrayList<>()).add(cRecord);
+				iterator.forEachRemaining(next -> records.computeIfAbsent(
+						new TopicPartition(next.topic(), next.partition()), tp -> new ArrayList<>()).add(next));
 			}
 			if (!records.isEmpty()) {
 				this.remainingRecords = new ConsumerRecords<>(records);
@@ -3201,9 +3178,7 @@ private void initPartitionsIfNeeded() {
 			doInitialSeeks(partitions, beginnings, ends);
 			if (this.consumerSeekAwareListener != null) {
 				this.consumerSeekAwareListener.onPartitionsAssigned(this.definedPartitions.keySet().stream()
-						.map(tp -> new SimpleEntry<>(tp, this.consumer.position(tp)))
-						.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())),
-						this.seekCallback);
+						.collect(Collectors.toMap(tp -> tp, this.consumer::position)), this.seekCallback);
 			}
 		}
 
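The `SimpleEntry` detour is gone: streaming the assigned partitions straight into `Collectors.toMap(tp -> tp, this.consumer::position)` builds the same partition-to-position map in one step, which also allowed dropping the `AbstractMap.SimpleEntry` import at the top of the file. The same shape on plain data, with String keys and a length lookup standing in for `consumer.position`:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ToMapDemo {

	public static void main(String[] args) {
		List<String> partitions = List.of("topic-0", "topic-1", "other-2");

		// Before: wrap each element in a SimpleEntry, then unwrap it again in toMap.
		Map<String, Integer> viaEntries = partitions.stream()
				.map(tp -> new SimpleEntry<>(tp, tp.length()))
				.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));

		// After: collect directly, using the element as key and a value function.
		Map<String, Integer> direct = partitions.stream()
				.collect(Collectors.toMap(tp -> tp, String::length));

		System.out.println(viaEntries.equals(direct));   // true
	}

}
```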
@@ -3884,20 +3859,13 @@ private Long computeBackwardWhereTo(long offset, boolean toCurrent, TopicPartiti
 		}
 
 
-	private static final class OffsetMetadata {
-
-		final Long offset; // NOSONAR
-
-		final boolean relativeToCurrent; // NOSONAR
-
-		final SeekPosition seekPosition; // NOSONAR
-
-		OffsetMetadata(Long offset, boolean relativeToCurrent, SeekPosition seekPosition) {
-			this.offset = offset;
-			this.relativeToCurrent = relativeToCurrent;
-			this.seekPosition = seekPosition;
-		}
-
+	/**
+	 * Offset metadata record.
+	 * @param offset current offset.
+	 * @param relativeToCurrent relative to current.
+	 * @param seekPosition seek position strategy.
+	 */
+	private record OffsetMetadata(Long offset, boolean relativeToCurrent, SeekPosition seekPosition) {
 	}
 
 	private class StopCallback implements BiConsumer<Object, Throwable> {
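
Replacing the final-field holder class with a record removes the hand-written constructor and fields while keeping the same shape; callers would then read the components through the generated accessors (`offset()`, `relativeToCurrent()`, `seekPosition()`) rather than direct field access. A condensed before/after, assuming a simple `SeekPosition` enum for illustration:

```java
public class OffsetMetadataRecordDemo {

	enum SeekPosition { BEGINNING, END, TIMESTAMP }

	// Before: a final-field holder class with a hand-written constructor.
	static final class OffsetMetadataClass {

		final Long offset;

		final boolean relativeToCurrent;

		final SeekPosition seekPosition;

		OffsetMetadataClass(Long offset, boolean relativeToCurrent, SeekPosition seekPosition) {
			this.offset = offset;
			this.relativeToCurrent = relativeToCurrent;
			this.seekPosition = seekPosition;
		}

	}

	// After: a record generates the constructor, accessors, equals, hashCode and toString.
	record OffsetMetadata(Long offset, boolean relativeToCurrent, SeekPosition seekPosition) {
	}

	public static void main(String[] args) {
		OffsetMetadataClass asClass = new OffsetMetadataClass(42L, false, SeekPosition.END);
		OffsetMetadata asRecord = new OffsetMetadata(42L, false, SeekPosition.END);

		System.out.println(asClass.offset);      // field read on the old holder class
		System.out.println(asRecord.offset());   // generated accessor on the record
		System.out.println(asRecord);            // OffsetMetadata[offset=42, relativeToCurrent=false, seekPosition=END]
	}

}
```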
