KAFKA-19160: Improve performance of fetching stable offsets #1

Open · wants to merge 1 commit into base: trunk
@@ -194,9 +194,16 @@ public OffsetMetadataManager build() {

/**
* The open transactions (producer ids) keyed by group.
* Tracks whether groups have any open transactions.
*/
private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;

/**
* The open transactions (producer ids) keyed by group id, topic name and partition id.
* Tracks whether partitions have any pending transactional offsets.
*/
private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroupTopicAndPartition;

private class Offsets {
/**
* The offsets keyed by group id, topic name and partition id.
@@ -281,6 +288,7 @@ private OffsetAndMetadata remove(
this.offsets = new Offsets();
this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
this.openTransactionsByGroupTopicAndPartition = new TimelineHashMap<>(snapshotRegistry, 0);
}

/**
@@ -650,24 +658,18 @@ public int deleteAllOffsets(
// Delete all the pending transactional offsets too. Here we only write a tombstone
// if the topic-partition was not in the main storage because we don't need to write
// two consecutive tombstones.
TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
if (openTransactions != null) {
openTransactions.forEach(producerId -> {
Offsets pendingOffsets = pendingTransactionalOffsets.get(producerId);
if (pendingOffsets != null) {
TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> pendingGroupOffsets =
pendingOffsets.offsetsByGroup.get(groupId);
if (pendingGroupOffsets != null) {
pendingGroupOffsets.forEach((topic, offsetsByPartition) -> {
offsetsByPartition.keySet().forEach(partition -> {
if (!hasCommittedOffset(groupId, topic, partition)) {
records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
numDeletedOffsets.getAndIncrement();
}
});
});
}
}
TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
openTransactionsByGroupTopicAndPartition.get(groupId);
if (openTransactionsByTopic != null) {
openTransactionsByTopic.forEach((topic, openTransactionsByPartition) -> {
openTransactionsByPartition.forEach((partition, producerIds) -> {
producerIds.forEach(producerId -> {
if (!hasCommittedOffset(groupId, topic, partition)) {
records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
numDeletedOffsets.getAndIncrement();
}
});
});
});
}
Comment on lines +661 to 674

⚠️ Potential issue

Duplicate tombstones may be emitted for the same partition

producerIds.forEach(...) iterates once per producer id, so the same tombstone can be added several times when multiple producers have pending offsets for the same <group, topic, partition>.
This inflates the records list and over-counts numDeletedOffsets, causing needless log traffic and skewed metrics.

- openTransactionsByPartition.forEach((partition, producerIds) -> {
-     producerIds.forEach(producerId -> {
-         if (!hasCommittedOffset(groupId, topic, partition)) {
-             records.add(GroupCoordinatorRecordHelpers
-                 .newOffsetCommitTombstoneRecord(groupId, topic, partition));
-             numDeletedOffsets.getAndIncrement();
-         }
-     });
- });
+ openTransactionsByPartition.keySet().forEach(partition -> {
+     if (!hasCommittedOffset(groupId, topic, partition)) {
+         records.add(GroupCoordinatorRecordHelpers
+             .newOffsetCommitTombstoneRecord(groupId, topic, partition));
+         numDeletedOffsets.getAndIncrement();
+     }
+ });

This preserves the original semantics while guaranteeing a single tombstone per partition.
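
To make the failure mode concrete, here is a minimal, self-contained Java sketch (an illustration only: plain JDK collections stand in for Kafka's TimelineHashMap/TimelineHashSet, and strings stand in for tombstone records). It shows that iterating producer ids emits one tombstone per producer, while iterating the partition key set emits exactly one per partition.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TombstoneDedupSketch {
    public static void main(String[] args) {
        // Hypothetical state: producers 1L and 2L both have pending transactional
        // offsets for partition 0 of the same group and topic.
        Map<Integer, Set<Long>> openTransactionsByPartition = new HashMap<>();
        openTransactionsByPartition.put(0, new HashSet<>(List.of(1L, 2L)));

        // Per-producer iteration (the pattern flagged above): one record per producer id.
        List<String> perProducer = new ArrayList<>();
        openTransactionsByPartition.forEach((partition, producerIds) ->
            producerIds.forEach(producerId -> perProducer.add("tombstone-" + partition)));

        // Per-partition iteration (the suggested fix): one record per partition.
        List<String> perPartition = new ArrayList<>();
        openTransactionsByPartition.keySet().forEach(partition ->
            perPartition.add("tombstone-" + partition));

        System.out.println(perProducer);  // [tombstone-0, tombstone-0]
        System.out.println(perPartition); // [tombstone-0]
    }
}

In the real code path the tombstone is also gated on hasCommittedOffset, but it is the per-partition loop that guarantees at most one tombstone (and one numDeletedOffsets increment) per <group, topic, partition>.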


@@ -685,17 +687,15 @@ boolean hasPendingTransactionalOffsets(
String topic,
int partition
) {
final TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
if (openTransactions == null) return false;
TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
openTransactionsByGroupTopicAndPartition.get(groupId);
if (openTransactionsByTopic == null) return false;

for (Long producerId : openTransactions) {
Offsets offsets = pendingTransactionalOffsets.get(producerId);
if (offsets != null && offsets.get(groupId, topic, partition) != null) {
return true;
}
}
TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
if (openTransactionsByPartition == null) return false;

return false;
TimelineHashSet<Long> openTransactions = openTransactionsByPartition.get(partition);
return openTransactions != null && !openTransactions.isEmpty();
}

/**
@@ -1005,21 +1005,41 @@ public void replay(
openTransactionsByGroup
.computeIfAbsent(groupId, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
.add(producerId);
openTransactionsByGroupTopicAndPartition
.computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
.computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
.computeIfAbsent(partition, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
.add(producerId);
}
} else {
if (offsets.remove(groupId, topic, partition) != null) {
metrics.decrementNumOffsets();
}

// Remove all the pending offset commits related to the tombstone.
TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
if (openTransactions != null) {
openTransactions.forEach(openProducerId -> {
Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
if (pendingOffsets != null) {
pendingOffsets.remove(groupId, topic, partition);
TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
openTransactionsByGroupTopicAndPartition.get(groupId);
if (openTransactionsByTopic != null) {
TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
if (openTransactionsByPartition != null) {
TimelineHashSet<Long> openTransactions = openTransactionsByPartition.get(partition);
if (openTransactions != null) {
openTransactions.forEach(openProducerId -> {
Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
if (pendingOffsets != null) {
pendingOffsets.remove(groupId, topic, partition);
}
});

openTransactionsByPartition.remove(partition);
if (openTransactionsByPartition.isEmpty()) {
openTransactionsByTopic.remove(topic);
}
if (openTransactionsByTopic.isEmpty()) {
openTransactionsByGroupTopicAndPartition.remove(groupId);
}
}
});
}
}
}
}
@@ -1031,6 +1051,7 @@ public void replay(
* @param result The result of the transaction.
* @throws RuntimeException if the transaction can not be completed.
*/
@SuppressWarnings("NPathComplexity")
public void replayEndTransactionMarker(
long producerId,
TransactionResult result
@@ -1043,14 +1064,39 @@ public void replayEndTransactionMarker(
return;
}

pendingOffsets.offsetsByGroup.keySet().forEach(groupId -> {
TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
if (openTransactions != null) {
openTransactions.remove(producerId);
if (openTransactions.isEmpty()) {
pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
TimelineHashSet<Long> groupTransactions = openTransactionsByGroup.get(groupId);
if (groupTransactions != null) {
groupTransactions.remove(producerId);
if (groupTransactions.isEmpty()) {
openTransactionsByGroup.remove(groupId);
}
}

TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
openTransactionsByGroupTopicAndPartition.get(groupId);
if (openTransactionsByTopic == null) return;

topicOffsets.forEach((topic, partitionOffsets) -> {
TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
if (openTransactionsByPartition == null) return;

partitionOffsets.keySet().forEach(partitionId -> {
TimelineHashSet<Long> partitionTransactions = openTransactionsByPartition.get(partitionId);
if (partitionTransactions != null) {
partitionTransactions.remove(producerId);
if (partitionTransactions.isEmpty()) {
openTransactionsByPartition.remove(partitionId);
}
if (openTransactionsByPartition.isEmpty()) {
openTransactionsByTopic.remove(topic);
}
if (openTransactionsByTopic.isEmpty()) {
openTransactionsByGroupTopicAndPartition.remove(groupId);
}
}
});
});
});

if (result == TransactionResult.COMMIT) {