Skip to content

Commit 8c11aee

Browse files
committed
unit test
1 parent 5ebe55e commit 8c11aee

File tree

2 files changed

+47
-2
lines changed

2 files changed

+47
-2
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.coordinator.group;
1818

1919
import org.apache.kafka.common.TopicPartition;
20+
import org.apache.kafka.common.Uuid;
2021
import org.apache.kafka.common.errors.ApiException;
2122
import org.apache.kafka.common.errors.GroupIdNotFoundException;
2223
import org.apache.kafka.common.errors.StaleMemberEpochException;
@@ -739,7 +740,9 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchOffsets(
739740

740741
request.topics().forEach(topic -> {
741742
final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse =
742-
new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name());
743+
new OffsetFetchResponseData.OffsetFetchResponseTopics()
744+
.setTopicId(topic.topicId())
745+
.setName(topic.name());
743746
topicResponses.add(topicResponse);
744747

745748
final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets = groupOffsets == null ?
@@ -809,7 +812,11 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
809812
final TimelineHashMap<Integer, OffsetAndMetadata> topicOffsets = topicEntry.getValue();
810813

811814
final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse =
812-
new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic);
815+
new OffsetFetchResponseData.OffsetFetchResponseTopics()
816+
// It is set to zero for now but it will be set to the persisted
817+
// topic id along the committed offset, if present.
818+
.setTopicId(Uuid.ZERO_UUID)
819+
.setName(topic);
813820
topicResponses.add(topicResponse);
814821

815822
topicOffsets.entrySet(lastCommittedOffset).forEach(partitionEntry -> {

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java

+38
Original file line numberDiff line numberDiff line change
@@ -1765,6 +1765,44 @@ public void testFetchOffsetsWithUnknownGroup() {
17651765
assertEquals(expectedResponse, context.fetchOffsets("group", request, Long.MAX_VALUE));
17661766
}
17671767

1768+
@Test
1769+
public void testFetchOffsetsWithTopicIds() {
1770+
Uuid fooId = Uuid.randomUuid();
1771+
Uuid barId = Uuid.randomUuid();
1772+
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
1773+
1774+
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);
1775+
1776+
context.commitOffset("group", "foo", 0, 100L, 1);
1777+
context.commitOffset("group", "bar", 0, 200L, 1);
1778+
1779+
List<OffsetFetchRequestData.OffsetFetchRequestTopics> request = Arrays.asList(
1780+
new OffsetFetchRequestData.OffsetFetchRequestTopics()
1781+
.setName("foo")
1782+
.setTopicId(fooId)
1783+
.setPartitionIndexes(List.of(0)),
1784+
new OffsetFetchRequestData.OffsetFetchRequestTopics()
1785+
.setName("bar")
1786+
.setTopicId(barId)
1787+
.setPartitionIndexes(List.of(0))
1788+
);
1789+
1790+
assertEquals(List.of(
1791+
new OffsetFetchResponseData.OffsetFetchResponseTopics()
1792+
.setName("foo")
1793+
.setTopicId(fooId)
1794+
.setPartitions(List.of(
1795+
mkOffsetPartitionResponse(0, 100L, 1, "metadata")
1796+
)),
1797+
new OffsetFetchResponseData.OffsetFetchResponseTopics()
1798+
.setName("bar")
1799+
.setTopicId(barId)
1800+
.setPartitions(List.of(
1801+
mkOffsetPartitionResponse(0, 200L, 1, "metadata")
1802+
))
1803+
), context.fetchOffsets("group", request, Long.MAX_VALUE));
1804+
}
1805+
17681806
@Test
17691807
public void testFetchOffsetsAtDifferentCommittedOffset() {
17701808
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

0 commit comments

Comments
 (0)