Skip to content

Commit 23a103e

Browse files
rhryngaryrussell
rhryn
authored andcommitted
Log Uncommitted After Rebalance
When commits a retryable, some partitions may have been assigned to another instance, in which case, those offsets can't be committed. Log the offsets that could not be committed at WARN level. Changed log level to improve troubleshooting Changed log level to improve troubleshooting Changed log level to improve troubleshooting Fixed formatting
1 parent 1e82ac9 commit 23a103e

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1551,7 +1551,15 @@ private void checkRebalanceCommits() {
15511551
Map<TopicPartition, OffsetAndMetadata> commits = this.commitsDuringRebalance.entrySet()
15521552
.stream()
15531553
.filter(entry -> this.assignedPartitions.contains(entry.getKey()))
1554-
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
1554+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
1555+
1556+
Map<TopicPartition, OffsetAndMetadata> uncommitted = this.commitsDuringRebalance.entrySet()
1557+
.stream()
1558+
.filter(entry -> !this.assignedPartitions.contains(entry.getKey()))
1559+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
1560+
this.logger.warn(() -> "These offsets could not be committed; partition(s) lost during rebalance: "
1561+
+ uncommitted);
1562+
15551563
this.commitsDuringRebalance.clear();
15561564
this.logger.debug(() -> "Commit list: " + commits);
15571565
commitSync(commits);

0 commit comments

Comments
 (0)