diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 2b50071a7f771..1b3060f6d2495 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -198,12 +198,19 @@ public OffsetMetadataManager build() { private final TimelineHashMap> openTransactionsByGroup; private class Offsets { + /** + * Whether to preserve empty entries for groups when removing offsets. + * We use this to keep track of the groups associated with pending transactions. + */ + private final boolean preserveGroups; + /** * The offsets keyed by group id, topic name and partition id. */ private final TimelineHashMap>> offsetsByGroup; - private Offsets() { + private Offsets(boolean preserveGroups) { + this.preserveGroups = preserveGroups; this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); } @@ -256,7 +263,7 @@ private OffsetAndMetadata remove( if (partitionOffsets.isEmpty()) topicOffsets.remove(topic); - if (topicOffsets.isEmpty()) + if (!preserveGroups && topicOffsets.isEmpty()) offsetsByGroup.remove(groupId); return removedValue; @@ -278,7 +285,7 @@ private OffsetAndMetadata remove( this.groupMetadataManager = groupMetadataManager; this.config = config; this.metrics = metrics; - this.offsets = new Offsets(); + this.offsets = new Offsets(false); this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0); this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); } @@ -851,7 +858,7 @@ public boolean cleanupExpiredOffsets(String groupId, List rec TimelineHashMap> offsetsByTopic = offsets.offsetsByGroup.get(groupId); if (offsetsByTopic == null) { - return true; + return !openTransactionsByGroup.containsKey(groupId); } // We expect the group to exist. 
@@ -995,7 +1002,7 @@ public void replay( // offsets store. Pending offsets there are moved to the main store when // the transaction is committed; or removed when the transaction is aborted. pendingTransactionalOffsets - .computeIfAbsent(producerId, __ -> new Offsets()) + .computeIfAbsent(producerId, __ -> new Offsets(true)) .put( groupId, topic, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index 6f788d84fd009..be4063bc1b012 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -2593,6 +2593,103 @@ public void testCleanupExpiredOffsetsWithPendingTransactionalOffsets() { assertEquals(List.of(), records); } + @Test + public void testCleanupExpiredOffsetsWithDeletedPendingTransactionalOffsets() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + Group group = mock(Group.class); + + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() + .withGroupMetadataManager(groupMetadataManager) + .withOffsetsRetentionMinutes(1) + .build(); + + long commitTimestamp = context.time.milliseconds(); + + context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp); + context.commitOffset(10L, "group-id", "foo", 1, 101L, 0, commitTimestamp + 500); + + when(groupMetadataManager.group("group-id")).thenReturn(group); + when(group.offsetExpirationCondition()).thenReturn(Optional.of( + new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs))); + when(group.isSubscribedToTopic("foo")).thenReturn(false); + + // Delete the pending transactional offset. 
+ OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = + new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(List.of( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName("foo") + .setPartitions(List.of( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(1) + )) + ).iterator()); + CoordinatorResult result = context.deleteOffsets( + new OffsetDeleteRequestData() + .setGroupId("group-id") + .setTopics(requestTopicCollection) + ); + List expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1) + ); + assertEquals(expectedRecords, result.records()); + + context.time.sleep(Duration.ofMinutes(1).toMillis()); + + // The group should not be deleted because it has a pending transaction. + expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 0) + ); + List records = new ArrayList<>(); + assertFalse(context.cleanupExpiredOffsets("group-id", records)); + assertEquals(expectedRecords, records); + + // Commit the ongoing transaction. + context.replayEndTransactionMarker(10L, TransactionResult.COMMIT); + + // The group should be deletable now. 
+ context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp); + context.time.sleep(Duration.ofMinutes(1).toMillis()); + + records = new ArrayList<>(); + assertTrue(context.cleanupExpiredOffsets("group-id", records)); + assertEquals(expectedRecords, records); + } + + @Test + public void testCleanupExpiredOffsetsWithPendingTransactionalOffsetsOnly() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + Group group = mock(Group.class); + + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() + .withGroupMetadataManager(groupMetadataManager) + .withOffsetsRetentionMinutes(1) + .build(); + + long commitTimestamp = context.time.milliseconds(); + + context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp); + context.commitOffset(10L, "group-id", "foo", 1, 101L, 0, commitTimestamp + 500); + + context.time.sleep(Duration.ofMinutes(1).toMillis()); + + when(groupMetadataManager.group("group-id")).thenReturn(group); + when(group.offsetExpirationCondition()).thenReturn(Optional.of( + new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs))); + when(group.isSubscribedToTopic("foo")).thenReturn(false); + + // foo-0 is expired, but the group is not deleted because it has pending transactional offset commits. + List expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 0) + ); + List records = new ArrayList<>(); + assertFalse(context.cleanupExpiredOffsets("group-id", records)); + assertEquals(expectedRecords, records); + + // No offsets are expired, and the group is still not deleted because it has pending transactional offset commits. + records = new ArrayList<>(); + assertFalse(context.cleanupExpiredOffsets("group-id", records)); + assertEquals(List.of(), records); + } + private static OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse( int partition, long offset,