Skip to content

Commit 0278465

Browse files
gurpiarbassigaryrussell
authored andcommitted
GH-2218 add ability to pass AdminClient
- Useful when you want to pass an AdminClient that has already been initialised with specific properties to connect to the cluster e.g. SSL properties.
1 parent 10905dc commit 0278465

File tree

2 files changed

+38
-2
lines changed

2 files changed

+38
-2
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -244,6 +244,23 @@ public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String
244244
}
245245
}
246246

247+
/**
248+
* Get the current offset and metadata for the provided group/topic/partition.
249+
* @param adminClient the AdminClient instance.
250+
* @param group the group.
251+
* @param topic the topic.
252+
* @param partition the partition.
253+
* @return the offset and metadata.
254+
* @throws Exception if an exception occurs.
255+
* @since 3.0
256+
*/
257+
public static OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition)
258+
throws Exception { // NOSONAR
259+
260+
return adminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get() // NOSONAR false positive
261+
.get(new TopicPartition(topic, partition));
262+
}
263+
247264
/**
248265
* Return the end offsets of the requested topic/partitions
249266
* @param consumer the consumer.

spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2020 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,8 @@
2121

2222
import java.util.Map;
2323

24+
import org.apache.kafka.clients.admin.AdminClient;
25+
import org.apache.kafka.clients.consumer.ConsumerConfig;
2426
import org.apache.kafka.clients.consumer.ConsumerRecord;
2527
import org.apache.kafka.clients.consumer.ConsumerRecords;
2628
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -128,4 +130,21 @@ public void testMultiMinRecords(EmbeddedKafkaBroker broker) throws Exception {
128130
consumer.close();
129131
}
130132

133+
@Test
134+
public void testGetCurrentOffsetWithAdminClient(EmbeddedKafkaBroker broker) throws Exception {
135+
Map<String, Object> adminClientProps = Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
136+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
137+
try (AdminClient adminClient = AdminClient.create(adminClientProps); KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps)) {
138+
producer.send(new ProducerRecord<>("singleTopic3", 0, 1, "foo"));
139+
140+
KafkaTestUtils.getOneRecord(broker.getBrokersAsString(), "testGetCurrentOffsetWithAdminClient",
141+
"singleTopic3", 0, false, true, 10_000L);
142+
assertThat(KafkaTestUtils.getCurrentOffset(adminClient, "testGetCurrentOffsetWithAdminClient", "singleTopic3", 0))
143+
.isNotNull()
144+
.extracting(omd -> omd.offset())
145+
.isEqualTo(1L);
146+
}
147+
148+
}
149+
131150
}

0 commit comments

Comments
 (0)