-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Add and refactor integration tests in DefaultShareConsumerFactoryTests #3932
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,9 @@ | |
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
import org.apache.kafka.clients.admin.Admin; | ||
import org.apache.kafka.clients.admin.AdminClient; | ||
|
@@ -49,7 +51,7 @@ | |
* @since 4.0 | ||
*/ | ||
@EmbeddedKafka( | ||
topics = {"embedded-share-test"}, partitions = 1, | ||
topics = {"embedded-share-test", "embedded-share-multi-test", "embedded-share-distribution-test"}, partitions = 1, | ||
brokerProperties = { | ||
"unstable.api.versions.enable=true", | ||
"group.coordinator.rebalance.protocols=classic,share", | ||
|
@@ -144,7 +146,6 @@ void shouldReturnUnmodifiableListenersList() { | |
} | ||
|
||
@Test | ||
@SuppressWarnings("try") | ||
void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) throws Exception { | ||
final String topic = "embedded-share-test"; | ||
final String groupId = "testGroup"; | ||
|
@@ -165,16 +166,14 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) thro | |
// For this test: force new share groups to start from the beginning of the topic. | ||
// This is NOT the same as the usual consumer auto.offset.reset; it's a group config, | ||
// so use AdminClient to set share.auto.offset.reset = earliest for our test group. | ||
try (AdminClient ignored = AdminClient.create(adminProperties)) { | ||
ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest"); | ||
AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET); | ||
ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest"); | ||
AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET); | ||
|
||
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of( | ||
new ConfigResource(ConfigResource.Type.GROUP, "testGroup"), Arrays.asList(op)); | ||
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of( | ||
new ConfigResource(ConfigResource.Type.GROUP, "testGroup"), Arrays.asList(op)); | ||
|
||
try (Admin admin = AdminClient.create(adminProperties)) { | ||
admin.incrementalAlterConfigs(configs).all().get(); | ||
} | ||
try (Admin admin = AdminClient.create(adminProperties)) { | ||
admin.incrementalAlterConfigs(configs).all().get(); | ||
} | ||
|
||
var consumerProps = new HashMap<String, Object>(); | ||
|
@@ -197,4 +196,121 @@ void integrationTestDefaultShareConsumerFactory(EmbeddedKafkaBroker broker) thro | |
consumer.close(); | ||
} | ||
|
||
@Test | ||
void integrationTestMultipleSharedConsumers(EmbeddedKafkaBroker broker) throws Exception { | ||
final String topic = "embedded-share-multi-test"; | ||
final String groupId = "multiTestGroup"; | ||
int recordCount = 4; | ||
List<String> consumerIds = List.of("client-1", "client-2"); | ||
Map<String, Set<String>> consumerRecords = runSharedConsumerTest(topic, groupId, consumerIds, recordCount, broker); | ||
|
||
Set<String> allReceived = new java.util.HashSet<>(); | ||
for (Set<String> records : consumerRecords.values()) { | ||
allReceived.addAll(records); | ||
} | ||
for (int i = 0; i < recordCount; i++) { | ||
assertThat(allReceived) | ||
.as("Should have received value " + topic + "-value-" + i) | ||
.contains(topic + "-value-" + i); | ||
} | ||
assertThat(allReceived.size()).isEqualTo(recordCount); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is redundant since the previous loop includes the size manipulation. |
||
} | ||
|
||
@Test | ||
void integrationTestSharedConsumersDistribution(EmbeddedKafkaBroker broker) throws Exception { | ||
final String topic = "embedded-share-distribution-test"; | ||
final String groupId = "distributionTestGroup"; | ||
int recordCount = 8; | ||
List<String> consumerIds = List.of("client-dist-1", "client-dist-2"); | ||
Map<String, Set<String>> consumerRecords = runSharedConsumerTest(topic, groupId, consumerIds, recordCount, broker); | ||
|
||
// Assert each consumer received at least one record | ||
for (String id : consumerIds) { | ||
Set<String> records = consumerRecords.get(id); | ||
assertThat(records) | ||
.as("Consumer %s should have received at least one record", id) | ||
.isNotEmpty(); | ||
} | ||
// Assert all records were received (no loss) | ||
Set<String> allReceived = new java.util.HashSet<>(); | ||
consumerRecords.values().forEach(allReceived::addAll); | ||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for (int i = 0; i < recordCount; i++) { | ||
assertThat(allReceived) | ||
.as("Should have received value " + topic + "-value-" + i) | ||
.contains(topic + "-value-" + i); | ||
} | ||
assertThat(allReceived.size()).isEqualTo(recordCount); | ||
} | ||
|
||
private static Map<String, Set<String>> runSharedConsumerTest(String topic, String groupId, List<String> consumerIds, int recordCount, EmbeddedKafkaBroker broker) throws Exception { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The line length not more that |
||
var bootstrapServers = broker.getBrokersAsString(); | ||
|
||
var producerProps = new java.util.Properties(); | ||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); | ||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | ||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | ||
|
||
try (var producer = new KafkaProducer<String, String>(producerProps)) { | ||
for (int i = 0; i < recordCount; i++) { | ||
producer.send(new ProducerRecord<>(topic, "key" + i, topic + "-value-" + i)).get(); | ||
} | ||
} | ||
|
||
Map<String, Object> adminProperties = Map.of("bootstrap.servers", bootstrapServers); | ||
ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest"); | ||
AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET); | ||
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of( | ||
new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op)); | ||
try (Admin admin = AdminClient.create(adminProperties)) { | ||
admin.incrementalAlterConfigs(configs).all().get(); | ||
} | ||
|
||
var consumerProps = new HashMap<String, Object>(); | ||
consumerProps.put("bootstrap.servers", bootstrapServers); | ||
consumerProps.put("key.deserializer", StringDeserializer.class); | ||
consumerProps.put("value.deserializer", StringDeserializer.class); | ||
consumerProps.put("group.id", groupId); | ||
|
||
DefaultShareConsumerFactory<String, String> factory = new DefaultShareConsumerFactory<>(consumerProps); | ||
var consumers = consumerIds.stream() | ||
.map(id -> factory.createShareConsumer(groupId, id)) | ||
.toList(); | ||
consumers.forEach(c -> c.subscribe(Collections.singletonList(topic))); | ||
|
||
Map<String, Set<String>> consumerRecords = new java.util.concurrent.ConcurrentHashMap<>(); | ||
consumerIds.forEach(id -> consumerRecords.put(id, java.util.Collections.synchronizedSet(new java.util.HashSet<>()))); | ||
var latch = new java.util.concurrent.CountDownLatch(recordCount); | ||
var running = new java.util.concurrent.atomic.AtomicBoolean(true); | ||
List<Thread> threads = new java.util.ArrayList<>(); | ||
|
||
for (int i = 0; i < consumers.size(); i++) { | ||
final int idx = i; | ||
Thread t = new Thread(() -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't you use |
||
try (var consumer = consumers.get(idx)) { | ||
var id = consumerIds.get(idx); | ||
while (running.get() && latch.getCount() > 0) { | ||
var records = consumer.poll(Duration.ofMillis(200)); | ||
for (var r : records) { | ||
if (consumerRecords.get(id).add(r.value())) { | ||
latch.countDown(); | ||
} | ||
} | ||
} | ||
} | ||
}); | ||
threads.add(t); | ||
t.start(); | ||
} | ||
|
||
boolean completed = latch.await(5, java.util.concurrent.TimeUnit.SECONDS); | ||
running.set(false); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we instead of this flag really close those consumers after that latch is fulfilled? |
||
for (Thread t : threads) { | ||
t.join(); | ||
} | ||
if (!completed) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't you assert a The test code is too complicated to follow and maintain in the future. |
||
throw new AssertionError("All records should be received within timeout"); | ||
} | ||
return consumerRecords; | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After this manipulation, the test does not confirm that all consumers receive the same records: we are just going to have only one set of unique records and not clear from what consumer.
I would compare one value (
Set
in our case) with the rest.See respective AssertJ API:
And the rest of this test would not be necessary.