Add and refactor integration tests in DefaultShareConsumerFactoryTests #3932

Merged
@@ -21,7 +21,9 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;

 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -49,7 +51,7 @@
  * @since 4.0
  */
 @EmbeddedKafka(
-		topics = {"embedded-share-test"}, partitions = 1,
+		topics = {"embedded-share-test", "embedded-share-multi-test", "embedded-share-distribution-test"}, partitions = 1,
 		brokerProperties = {
 				"unstable.api.versions.enable=true",
 				"group.coordinator.rebalance.protocols=classic,share",
@@ -144,7 +146,6 @@ void shouldReturnUnmodifiableListenersList() {
 	}

 	@Test
-	@SuppressWarnings("try")
 	void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
 		final String topic = "embedded-share-test";
 		final String groupId = "testGroup";
@@ -165,16 +166,14 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
 		// For this test: force new share groups to start from the beginning of the topic.
 		// This is NOT the same as the usual consumer auto.offset.reset; it's a group config,
 		// so use AdminClient to set share.auto.offset.reset = earliest for our test group.
-		try (AdminClient ignored = AdminClient.create(adminProperties)) {
-			ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
-			AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
+		ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
+		AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);

-			Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
-					new ConfigResource(ConfigResource.Type.GROUP, "testGroup"), Arrays.asList(op));
+		Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
+				new ConfigResource(ConfigResource.Type.GROUP, "testGroup"), Arrays.asList(op));

-			try (Admin admin = AdminClient.create(adminProperties)) {
-				admin.incrementalAlterConfigs(configs).all().get();
-			}
-		}
+		try (Admin admin = AdminClient.create(adminProperties)) {
+			admin.incrementalAlterConfigs(configs).all().get();
+		}

 		var consumerProps = new HashMap<String, Object>();
@@ -197,4 +196,121 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception {
 		consumer.close();
 	}

	@Test
	void integrationTestMultipleSharedConsumers(EmbeddedKafkaBroker broker) throws Exception {
		final String topic = "embedded-share-multi-test";
		final String groupId = "multiTestGroup";
		int recordCount = 4;
		List<String> consumerIds = List.of("client-1", "client-2");
		Map<String, Set<String>> consumerRecords = runSharedConsumerTest(topic, groupId, consumerIds, recordCount, broker);

		Set<String> allReceived = new java.util.HashSet<>();
		for (Set<String> records : consumerRecords.values()) {
			allReceived.addAll(records);
Review comment (Member):
After this manipulation, the test does not confirm that all consumers receive the same records: we just end up with one set of unique records, and it is not clear which consumer each came from.
I would compare one value (the Set, in our case) with the rest.
See the respective AssertJ API:

  /**
   * Creates a new instance of <code>{@link CollectionAssert}</code>.
   *
   * @param <E> the type of elements.
   * @param actual the actual value.
   * @return the created assertion object.
   * @since 3.21.0
   */
  public static <E> AbstractCollectionAssert<?, Collection<? extends E>, E, ObjectAssert<E>> assertThat(Collection<? extends E> actual) {

And the rest of this test would not be necessary.
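A minimal sketch of that suggestion (hypothetical; `consumerRecords` is the map built by the test above):

	// Pick one consumer's set as the reference and compare the rest against it.
	List<Set<String>> sets = List.copyOf(consumerRecords.values());
	for (Set<String> other : sets.subList(1, sets.size())) {
		assertThat(other).containsExactlyInAnyOrderElementsOf(sets.get(0));
	}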

		}
		for (int i = 0; i < recordCount; i++) {
			assertThat(allReceived)
					.as("Should have received value " + topic + "-value-" + i)
					.contains(topic + "-value-" + i);
		}
		assertThat(allReceived.size()).isEqualTo(recordCount);
Review comment (Member):
This is redundant, since the previous loop already covers the size check.
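A sketch of how the loop and the size check might collapse into a single assertion (assumes `allReceived`, `topic` and `recordCount` from the test, plus an import of java.util.stream.IntStream):

	// One assertion covers both presence and cardinality of the expected values.
	assertThat(allReceived).containsExactlyInAnyOrderElementsOf(
			IntStream.range(0, recordCount)
					.mapToObj(i -> topic + "-value-" + i)
					.toList());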

	}

	@Test
	void integrationTestSharedConsumersDistribution(EmbeddedKafkaBroker broker) throws Exception {
		final String topic = "embedded-share-distribution-test";
		final String groupId = "distributionTestGroup";
		int recordCount = 8;
		List<String> consumerIds = List.of("client-dist-1", "client-dist-2");
		Map<String, Set<String>> consumerRecords = runSharedConsumerTest(topic, groupId, consumerIds, recordCount, broker);

		// Assert each consumer received at least one record
		for (String id : consumerIds) {
			Set<String> records = consumerRecords.get(id);
			assertThat(records)
					.as("Consumer %s should have received at least one record", id)
					.isNotEmpty();
		}
		// Assert all records were received (no loss)
		Set<String> allReceived = new java.util.HashSet<>();
		consumerRecords.values().forEach(allReceived::addAll);
		for (int i = 0; i < recordCount; i++) {
			assertThat(allReceived)
					.as("Should have received value " + topic + "-value-" + i)
					.contains(topic + "-value-" + i);
		}
		assertThat(allReceived.size()).isEqualTo(recordCount);
	}

	private static Map<String, Set<String>> runSharedConsumerTest(String topic, String groupId, List<String> consumerIds, int recordCount, EmbeddedKafkaBroker broker) throws Exception {
Review comment (Member):
Please keep the line length to no more than 120 characters.

		var bootstrapServers = broker.getBrokersAsString();

		var producerProps = new java.util.Properties();
		producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

		try (var producer = new KafkaProducer<String, String>(producerProps)) {
			for (int i = 0; i < recordCount; i++) {
				producer.send(new ProducerRecord<>(topic, "key" + i, topic + "-value-" + i)).get();
			}
		}

		Map<String, Object> adminProperties = Map.of("bootstrap.servers", bootstrapServers);
		ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
		AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
		Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
				new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op));
		try (Admin admin = AdminClient.create(adminProperties)) {
			admin.incrementalAlterConfigs(configs).all().get();
		}

		var consumerProps = new HashMap<String, Object>();
		consumerProps.put("bootstrap.servers", bootstrapServers);
		consumerProps.put("key.deserializer", StringDeserializer.class);
		consumerProps.put("value.deserializer", StringDeserializer.class);
		consumerProps.put("group.id", groupId);

		DefaultShareConsumerFactory<String, String> factory = new DefaultShareConsumerFactory<>(consumerProps);
		var consumers = consumerIds.stream()
				.map(id -> factory.createShareConsumer(groupId, id))
				.toList();
		consumers.forEach(c -> c.subscribe(Collections.singletonList(topic)));

		Map<String, Set<String>> consumerRecords = new java.util.concurrent.ConcurrentHashMap<>();
		consumerIds.forEach(id -> consumerRecords.put(id, java.util.Collections.synchronizedSet(new java.util.HashSet<>())));
		var latch = new java.util.concurrent.CountDownLatch(recordCount);
		var running = new java.util.concurrent.atomic.AtomicBoolean(true);
		List<Thread> threads = new java.util.ArrayList<>();

		for (int i = 0; i < consumers.size(); i++) {
			final int idx = i;
			Thread t = new Thread(() -> {
Review comment (Member):
Why don't you use Executors.newCachedThreadPool()?
It is really hard to read the code when a custom Thread is created.
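A sketch of that refactor (hypothetical; same variables as in the test, plus imports of java.util.concurrent.ExecutorService and Executors):

	// Submit one poll loop per consumer to a pool instead of hand-rolling Threads.
	ExecutorService executor = Executors.newCachedThreadPool();
	for (int i = 0; i < consumers.size(); i++) {
		final int idx = i;
		executor.submit(() -> {
			try (var consumer = consumers.get(idx)) {
				var id = consumerIds.get(idx);
				while (running.get() && latch.getCount() > 0) {
					for (var r : consumer.poll(Duration.ofMillis(200))) {
						if (consumerRecords.get(id).add(r.value())) {
							latch.countDown();
						}
					}
				}
			}
		});
	}
	// After awaiting the latch:
	executor.shutdown();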

				try (var consumer = consumers.get(idx)) {
					var id = consumerIds.get(idx);
					while (running.get() && latch.getCount() > 0) {
						var records = consumer.poll(Duration.ofMillis(200));
						for (var r : records) {
							if (consumerRecords.get(id).add(r.value())) {
								latch.countDown();
							}
						}
					}
				}
			});
			threads.add(t);
			t.start();
		}

		boolean completed = latch.await(5, java.util.concurrent.TimeUnit.SECONDS);
		running.set(false);
Review comment (Member):
Instead of this flag, can we just close those consumers once the latch is fulfilled?
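A sketch of that idea (hypothetical; whether a cross-thread close() is safe for a share consumer is not verified here, so treat this as the shape of the suggestion rather than a drop-in change):

	// Await the latch, then close the consumers so the poll loops terminate,
	// making the 'running' flag unnecessary.
	boolean completed = latch.await(5, java.util.concurrent.TimeUnit.SECONDS);
	consumers.forEach(consumer -> consumer.close());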

		for (Thread t : threads) {
			t.join();
		}
		if (!completed) {
Review comment (Member):
Why don't you assert the latch.await() result? What is the point of this if? And if we have a CountDownLatch, why do we need to join() those threads?

The test code is too complicated to follow and maintain in the future.
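A sketch of the suggested simplification (assumes `latch` from the test):

	// Assert the await result directly; no flag, no if/throw.
	assertThat(latch.await(5, java.util.concurrent.TimeUnit.SECONDS))
			.as("All records should be received within timeout")
			.isTrue();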

			throw new AssertionError("All records should be received within timeout");
		}
		return consumerRecords;
	}

}