Skip to content

Commit a35a6c5

Browse files
committed
GH-1876: Fix new Sonar Issues
1 parent 3b06529 commit a35a6c5

File tree

2 files changed

+13
-7
lines changed

2 files changed

+13
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2712,6 +2712,18 @@ private void initPartitionsIfNeeded() {
27122712
partitions.put(tp, new OffsetMetadata(off.offset(), false, SeekPosition.TIMESTAMP));
27132713
}
27142714
});
2715+
doInitialSeeks(partitions, beginnings, ends);
2716+
if (this.consumerSeekAwareListener != null) {
2717+
this.consumerSeekAwareListener.onPartitionsAssigned(partitions.keySet().stream()
2718+
.map(tp -> new SimpleEntry<>(tp, this.consumer.position(tp)))
2719+
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())),
2720+
this.seekCallback);
2721+
}
2722+
}
2723+
2724+
private void doInitialSeeks(Map<TopicPartition, OffsetMetadata> partitions, Set<TopicPartition> beginnings,
2725+
Set<TopicPartition> ends) {
2726+
27152727
if (beginnings.size() > 0) {
27162728
this.consumer.seekToBeginning(beginnings);
27172729
}
@@ -2746,12 +2758,6 @@ else if (metadata.relativeToCurrent) {
27462758
}
27472759
}
27482760
}
2749-
if (this.consumerSeekAwareListener != null) {
2750-
this.consumerSeekAwareListener.onPartitionsAssigned(partitions.keySet().stream()
2751-
.map(tp -> new SimpleEntry<>(tp, this.consumer.position(tp)))
2752-
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())),
2753-
this.seekCallback);
2754-
}
27552761
}
27562762

27572763
private void logReset(TopicPartition topicPartition, long newOffset) {

spring-kafka/src/main/java/org/springframework/kafka/listener/RecoveringBatchErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
103103
}
104104

105105
@Override
106-
public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
106+
public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
107107
MessageListenerContainer container) {
108108

109109
doHandle(thrownException, data, consumer, container, () -> { });

0 commit comments

Comments
 (0)