Add and refactor integration tests in DefaultShareConsumerFactoryTests #3932

Merged

Changes from 2 commits

@@ -17,16 +17,21 @@
package org.springframework.kafka.core;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -49,7 +54,7 @@
* @since 4.0
*/
@EmbeddedKafka(
topics = {"embedded-share-test"}, partitions = 1,
topics = {"embedded-share-test", "embedded-share-distribution-test"}, partitions = 1,
brokerProperties = {
"unstable.api.versions.enable=true",
"group.coordinator.rebalance.protocols=classic,share",
@@ -144,7 +149,6 @@ void shouldReturnUnmodifiableListenersList() {
}

@Test
@SuppressWarnings("try")
void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
final String topic = "embedded-share-test";
final String groupId = "testGroup";
@@ -159,23 +163,7 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
producer.send(new ProducerRecord<>(topic, "key", "integration-test-value")).get();
}

Map<String, Object> adminProperties = new HashMap<>();
adminProperties.put("bootstrap.servers", bootstrapServers);

// For this test: force new share groups to start from the beginning of the topic.
// This is NOT the same as the usual consumer auto.offset.reset; it's a group config,
// so use AdminClient to set share.auto.offset.reset = earliest for our test group.
try (AdminClient ignored = AdminClient.create(adminProperties)) {
ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);

Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
new ConfigResource(ConfigResource.Type.GROUP, "testGroup"), Arrays.asList(op));

try (Admin admin = AdminClient.create(adminProperties)) {
admin.incrementalAlterConfigs(configs).all().get();
}
}
setShareAutoOffsetResetEarliest(bootstrapServers, groupId);

var consumerProps = new HashMap<String, Object>();
consumerProps.put("bootstrap.servers", bootstrapServers);
@@ -197,4 +185,112 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
consumer.close();
}

@Test
void integrationTestSharedConsumersDistribution(EmbeddedKafkaBroker broker) throws Exception {
final String topic = "embedded-share-distribution-test";
final String groupId = "distributionTestGroup";
int recordCount = 8;
List<String> consumerIds = List.of("client-dist-1", "client-dist-2");
Map<String, Set<String>> consumerRecords = runSharedConsumerTest(topic, groupId, consumerIds,
artembilan (Member) commented:

Right. So, if we talk about all the data received (exclusively) independently of the number of consumers (queue semantics), then I wonder if a single shared List, filled by those consumers, would be enough for our test. Then we just verify the content of that list and check that there are no duplicates. At the same time, it might be hard to determine that those records indeed came from different consumers. Not sure how to handle that, but a Set is misleading here since it will eliminate duplicates.
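An editorial sketch (not from the PR) of the shared-List idea, with hypothetical names (deliveries, consumerId, record): a List preserves duplicates, so redeliveries stay visible, and tagging each value with the consumer id keeps some provenance.

List<String> deliveries = Collections.synchronizedList(new ArrayList<>());
// inside each consumer's poll loop:
deliveries.add(consumerId + ":" + record.value()); // duplicates are kept, not collapsed
// after all tasks finish, exactly recordCount entries means no loss and no duplicates:
assertThat(deliveries).hasSize(recordCount);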

recordCount, broker);

// Assert all records were received (no loss)
Set<String> allReceived = new java.util.HashSet<>();
consumerRecords.values().forEach(allReceived::addAll);
for (int i = 0; i < recordCount; i++) {
assertThat(allReceived)
.as("Should have received value " + topic + "-value-" + i)
.contains(topic + "-value-" + i);
}
}

/**
 * Runs multiple Kafka consumers in parallel using an ExecutorService, collects all records
 * received, and returns a map of consumerId to the set of record values received by that consumer.
 */
private static Map<String, Set<String>> runSharedConsumerTest(String topic, String groupId,
List<String> consumerIds, int recordCount, EmbeddedKafkaBroker broker) throws Exception {
var bootstrapServers = broker.getBrokersAsString();

var producerProps = new java.util.Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (var producer = new KafkaProducer<String, String>(producerProps)) {
for (int i = 0; i < recordCount; i++) {
producer.send(new ProducerRecord<>(topic, "key" + i, topic + "-value-" + i)).get();
}
producer.flush();
}

setShareAutoOffsetResetEarliest(bootstrapServers, groupId);

Map<String, Set<String>> consumerRecords = new java.util.concurrent.ConcurrentHashMap<>();
consumerIds.forEach(id -> consumerRecords.put(id,
java.util.Collections.synchronizedSet(new java.util.HashSet<>())));
var latch = new java.util.concurrent.CountDownLatch(recordCount);
var running = new java.util.concurrent.atomic.AtomicBoolean(true);
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<?>> futures = new java.util.ArrayList<>();

// Consumer task: poll, acknowledge, and count down latch for new records
for (int i = 0; i < consumerIds.size(); i++) {
final int idx = i;
Future<?> future = executor.submit(() -> {
DefaultShareConsumerFactory<String, String> shareConsumerFactory = new DefaultShareConsumerFactory<>(
Map.of(
"bootstrap.servers", bootstrapServers,
"key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class,
"value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class
));
try (var consumer = shareConsumerFactory
.createShareConsumer(groupId, consumerIds.get(idx))) {
artembilan (Member):

Any reason to create the consumers in those threads? Why can't we do that outside, before the executor.submit()?

sobychacko (Contributor, Author) commented on Jun 2, 2025:

@artembilan Don't you think that creating the shared consumer outside of the thread creates race conditions? For example, if we extract the consumer creation outside of the submit, the consumers may show unpredictable behavior, since both threads would have access to both instances of the shared consumers. Since Kafka consumers are inherently not thread-safe, we may want to create the consumer within the thread bounds. Or did I understand it wrong? I tried your suggestion and ran into issues.

artembilan (Member):

OK. But that is not what I see in the KafkaMessageListenerContainer. We create a ListenerConsumer from doStart() and immediately there we call consumerFactory.createConsumer(). However, the poll() we indeed call from another thread. Hence my confusion: why would we create this shared consumer instance exactly in the thread where we poll?

sobychacko (Contributor, Author):

In the framework code you mentioned, KafkaMessageListenerContainer, although we hand off the created consumer to another thread in the doStart() method, we ensure that the created consumer instance is exactly the one handed off to the polling thread, and therefore it is thread-safe. However, in our test scenario here, unless we do the same thing (i.e., create a dedicated consumer instance and then pass exactly that to the thread), we may run into thread-safety issues. IMO, it might be okay to do what we do right now in the test, but if you see any issues with that, I will try to refactor the test.

consumer.subscribe(Collections.singletonList(topic));
artembilan (Member):

My idea was to move the create and subscribe outside of the thread, and only poll in the submitted task. Doesn't that work? It might not be related to our test case, but that is how I feel consumers are supposed to be used.
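An editorial sketch (not from the PR) of that arrangement, assuming the factory is built once on the main thread: executor.submit() safely publishes each consumer to its worker, and only that one worker thread touches the instance afterwards, so the single-threaded access rule still holds.

for (String id : consumerIds) {
    var consumer = shareConsumerFactory.createShareConsumer(groupId, id);
    consumer.subscribe(Collections.singletonList(topic));
    futures.add(executor.submit(() -> {
        try (consumer) { // closed by the same thread that polls it
            // poll/acknowledge loop as below
        }
    }));
}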

while (running.get() && latch.getCount() > 0) {
var records = consumer.poll(Duration.ofMillis(200));
for (var r : records) {
if (consumerRecords.get(consumerIds.get(idx)).add(r.value())) {
consumer.acknowledge(r, AcknowledgeType.ACCEPT);
latch.countDown();
}
}
}
}
});
futures.add(future);
}

boolean completed = latch.await(10, java.util.concurrent.TimeUnit.SECONDS);
running.set(false);
artembilan (Member):

Instead of this flag, can we just close those consumers after the latch is fulfilled?
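A hedged editorial sketch of that alternative, assuming the consumers were created on the main thread (as suggested above) and collected in a hypothetical consumers list, and that ShareConsumer, like KafkaConsumer, permits wakeup() as its one thread-safe method:

latch.await(10, TimeUnit.SECONDS);
consumers.forEach(ShareConsumer::wakeup); // a blocked poll() exits with WakeupException
// each task then closes its own consumer, e.g. in try-with-resources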

for (Future<?> future : futures) {
try {
future.get();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
executor.shutdown();
assertThat(completed)
.as("All records should be received within timeout")
.isTrue();
artembilan (Member):

We don't need all of these extra variables, and we don't even need to deal with Future. executor.shutdown() and then an assertion on awaitTermination() should do the trick. See the shutdown() JavaDocs.
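An editorial sketch of that simplification; it relies on each task already leaving its loop once the latch reaches zero, so the pool drains on its own:

boolean completed = latch.await(10, TimeUnit.SECONDS);
executor.shutdown(); // no new tasks; running ones finish their loop and exit
assertThat(executor.awaitTermination(10, TimeUnit.SECONDS))
        .as("Consumer tasks should terminate once the latch is fulfilled")
        .isTrue();
assertThat(completed)
        .as("All records should be received within timeout")
        .isTrue();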

return consumerRecords;
}

/**
 * Sets the share.auto.offset.reset group config to earliest for the given groupId,
 * using the provided bootstrapServers.
 */
private static void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception {
Map<String, Object> adminProperties = new HashMap<>();
adminProperties.put("bootstrap.servers", bootstrapServers);
ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op));
try (Admin admin = AdminClient.create(adminProperties)) {
admin.incrementalAlterConfigs(configs).all().get();
}
}
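An editorial footnote (not part of the PR): if a test ever needs to verify this change took effect, the group config can be read back through Admin.describeConfigs() with the same GROUP ConfigResource (reusing adminProperties and groupId from the helper above):

try (Admin admin = AdminClient.create(adminProperties)) {
    ConfigResource resource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
    String value = admin.describeConfigs(List.of(resource))
            .all().get()
            .get(resource)
            .get("share.auto.offset.reset")
            .value();
    assertThat(value).isEqualTo("earliest");
}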

}