Add and refactor integration tests in DefaultShareConsumerFactoryTests #3932

Changes from 2 commits
@@ -17,16 +17,21 @@
package org.springframework.kafka.core;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

@@ -49,7 +54,7 @@
 * @since 4.0
 */
@EmbeddedKafka(
-		topics = {"embedded-share-test"}, partitions = 1,
+		topics = {"embedded-share-test", "embedded-share-distribution-test"}, partitions = 1,
		brokerProperties = {
				"unstable.api.versions.enable=true",
				"group.coordinator.rebalance.protocols=classic,share",

@@ -144,7 +149,6 @@ void shouldReturnUnmodifiableListenersList() {
	}

	@Test
-	@SuppressWarnings("try")
	void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
		final String topic = "embedded-share-test";
		final String groupId = "testGroup";

@@ -159,23 +163,7 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
			producer.send(new ProducerRecord<>(topic, "key", "integration-test-value")).get();
		}

-		Map<String, Object> adminProperties = new HashMap<>();
-		adminProperties.put("bootstrap.servers", bootstrapServers);
-
-		// For this test: force new share groups to start from the beginning of the topic.
-		// This is NOT the same as the usual consumer auto.offset.reset; it's a group config,
-		// so use AdminClient to set share.auto.offset.reset = earliest for our test group.
-		try (AdminClient ignored = AdminClient.create(adminProperties)) {
-			ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
-			AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
-
-			Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
-					new ConfigResource(ConfigResource.Type.GROUP, "testGroup"), Arrays.asList(op));
-
-			try (Admin admin = AdminClient.create(adminProperties)) {
-				admin.incrementalAlterConfigs(configs).all().get();
-			}
-		}
+		setShareAutoOffsetResetEarliest(bootstrapServers, groupId);

		var consumerProps = new HashMap<String, Object>();
		consumerProps.put("bootstrap.servers", bootstrapServers);

@@ -197,4 +185,112 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
		consumer.close();
	}

||
@Test | ||
void integrationTestSharedConsumersDistribution(EmbeddedKafkaBroker broker) throws Exception { | ||
final String topic = "embedded-share-distribution-test"; | ||
final String groupId = "distributionTestGroup"; | ||
int recordCount = 8; | ||
List<String> consumerIds = List.of("client-dist-1", "client-dist-2"); | ||
Map<String, Set<String>> consumerRecords = runSharedConsumerTest(topic, groupId, consumerIds, | ||
recordCount, broker); | ||
|
||
// Assert all records were received (no loss) | ||
Set<String> allReceived = new java.util.HashSet<>(); | ||
consumerRecords.values().forEach(allReceived::addAll); | ||
[Review thread: artembilan marked this conversation as resolved.]

||
for (int i = 0; i < recordCount; i++) { | ||
assertThat(allReceived) | ||
.as("Should have received value " + topic + "-value-" + i) | ||
.contains(topic + "-value-" + i); | ||
} | ||
} | ||
|
||
/** | ||
* Runs multiple Kafka consumers in parallel using ExecutorService, collects all records received, | ||
* and returns a map of consumerId to the set of record values received by that consumer. | ||
*/ | ||
private static Map<String, Set<String>> runSharedConsumerTest(String topic, String groupId, | ||
List<String> consumerIds, int recordCount, EmbeddedKafkaBroker broker) throws Exception { | ||
var bootstrapServers = broker.getBrokersAsString(); | ||
|
||
var producerProps = new java.util.Properties(); | ||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); | ||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | ||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | ||
try (var producer = new KafkaProducer<String, String>(producerProps)) { | ||
for (int i = 0; i < recordCount; i++) { | ||
producer.send(new ProducerRecord<>(topic, "key" + i, topic + "-value-" + i)).get(); | ||
} | ||
producer.flush(); | ||
} | ||
|
||
setShareAutoOffsetResetEarliest(bootstrapServers, groupId); | ||
|
||
Map<String, Set<String>> consumerRecords = new java.util.concurrent.ConcurrentHashMap<>(); | ||
consumerIds.forEach(id -> consumerRecords.put(id, | ||
java.util.Collections.synchronizedSet(new java.util.HashSet<>()))); | ||
var latch = new java.util.concurrent.CountDownLatch(recordCount); | ||
var running = new java.util.concurrent.atomic.AtomicBoolean(true); | ||
ExecutorService executor = Executors.newCachedThreadPool(); | ||
List<Future<?>> futures = new java.util.ArrayList<>(); | ||
|
||
// Consumer task: poll, acknowledge, and count down latch for new records | ||
for (int i = 0; i < consumerIds.size(); i++) { | ||
final int idx = i; | ||
Future<?> future = executor.submit(() -> { | ||
DefaultShareConsumerFactory<String, String> shareConsumerFactory = new DefaultShareConsumerFactory<>( | ||
Map.of( | ||
"bootstrap.servers", bootstrapServers, | ||
"key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class, | ||
"value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class | ||
)); | ||
try (var consumer = shareConsumerFactory | ||
.createShareConsumer(groupId, consumerIds.get(idx))) { | ||

[Review thread]
Reviewer: Any reason to create consumers in those threads?
Author: @artembilan Don't you think that creating the shared consumer outside of the thread creates some race conditions? For example, if we extract the consumer creation outside of the …
Reviewer: OK. But that is not what I see in the …
Author: In the framework code you mentioned in …
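Some context on this thread: like KafkaConsumer, a ShareConsumer is not safe for concurrent use by multiple threads, so whichever thread polls it should effectively own it. A minimal sketch of the two options under discussion; the class and method names are illustrative, not from the PR:

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.springframework.kafka.core.DefaultShareConsumerFactory;

class ConsumerOwnershipSketch {

	// Option A (what the PR does): the worker thread creates, polls, and closes
	// its own consumer, so the consumer never crosses a thread boundary.
	static void optionA(DefaultShareConsumerFactory<String, String> factory,
			ExecutorService executor, String groupId, String topic) {
		executor.submit(() -> {
			try (var consumer = factory.createShareConsumer(groupId, "client-a")) {
				consumer.subscribe(List.of(topic));
				consumer.poll(Duration.ofMillis(200)); // poll/acknowledge loop elided
			}
		});
	}

	// Option B (one reading of the reviewer's question): create the consumer on
	// the test thread and hand it to the worker. ExecutorService.submit() gives a
	// happens-before edge, so this is also safe provided the test thread never
	// touches the consumer after submitting the task.
	static void optionB(DefaultShareConsumerFactory<String, String> factory,
			ExecutorService executor, String groupId, String topic) {
		var consumer = factory.createShareConsumer(groupId, "client-b");
		executor.submit(() -> {
			try (consumer) {
				consumer.subscribe(List.of(topic));
				consumer.poll(Duration.ofMillis(200)); // poll/acknowledge loop elided
			}
		});
	}
}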
					consumer.subscribe(Collections.singletonList(topic));

[Review thread]
Reviewer: My idea was to move …
					while (running.get() && latch.getCount() > 0) {
						var records = consumer.poll(Duration.ofMillis(200));
						for (var r : records) {
							if (consumerRecords.get(consumerIds.get(idx)).add(r.value())) {
								consumer.acknowledge(r, AcknowledgeType.ACCEPT);
								latch.countDown();
							}
						}
					}
				}
			});
			futures.add(future);
		}

		boolean completed = latch.await(10, java.util.concurrent.TimeUnit.SECONDS);
		running.set(false);

[Review thread]
Reviewer: Can we, instead of this flag, really close those consumers after that latch is fulfilled?
		for (Future<?> future : futures) {
			try {
				future.get();
			}
			catch (Exception e) {
				throw new RuntimeException(e);
			}
		}
		executor.shutdown();
		assertThat(completed)
				.as("All records should be received within timeout")
				.isTrue();

[Review thread]
Reviewer: We don't need all of these extra variables and even don't need to deal with …
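A sketch of where these two comments point, not the PR code: drop the running flag and the futures bookkeeping, await the latch on the test thread, then wake the consumers so their blocked polls exit and each worker closes its own consumer. It assumes the surrounding variables of runSharedConsumerTest (factory, consumerIds, latch, executor, topic, groupId, and received as the per-consumer record map), and that ShareConsumer.wakeup() is thread-safe and aborts a pending poll() with WakeupException, as KafkaConsumer.wakeup() does:

// Workers poll until woken; the test thread owns only the latch and the wakeup calls.
List<ShareConsumer<String, String>> consumers = new java.util.concurrent.CopyOnWriteArrayList<>();

for (String id : consumerIds) {
	executor.submit(() -> {
		try (var consumer = factory.createShareConsumer(groupId, id)) {
			consumers.add(consumer);
			consumer.subscribe(List.of(topic));
			try {
				while (true) {
					for (var r : consumer.poll(Duration.ofMillis(200))) {
						if (received.get(id).add(r.value())) {
							consumer.acknowledge(r, AcknowledgeType.ACCEPT);
							latch.countDown();
						}
					}
				}
			}
			catch (org.apache.kafka.common.errors.WakeupException ex) {
				// expected: the test thread fulfilled the latch and woke us up
			}
		}
	});
}

assertThat(latch.await(10, java.util.concurrent.TimeUnit.SECONDS))
		.as("All records should be received within timeout")
		.isTrue();
consumers.forEach(ShareConsumer::wakeup); // thread-safe; each worker then closes its own consumer
executor.shutdown();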
		return consumerRecords;
	}

	/**
	 * Sets the share.auto.offset.reset group config to earliest for the given groupId,
	 * using the provided bootstrapServers.
	 */
	private static void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception {
		Map<String, Object> adminProperties = new HashMap<>();
		adminProperties.put("bootstrap.servers", bootstrapServers);
		ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
		AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
		Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
				new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op));
		try (Admin admin = AdminClient.create(adminProperties)) {
			admin.incrementalAlterConfigs(configs).all().get();
		}
	}

}

[Review thread]
Reviewer: Right. So, if we talk about all the data received (exclusively) independently of the number of consumers (queue semantics), then I wonder if just a shared List to be fulfilled by those consumers would be enough for our test. Then we just verify the content of that list and check that there are no duplicates. At the same time, it might be hard to determine that those records indeed came from different consumers. Not sure how to be, but Set is misleading here since it will eliminate duplicates.