Add and refactor integration tests in DefaultShareConsumerFactoryTests #3932

Open
wants to merge 3 commits into main

Conversation

sobychacko
Contributor

- Added tests to check that multiple shared consumers each get records and all data is consumed.
- Refactored shared test logic into a helper method.
- Other cleanup in the test

Signed-off-by: Soby Chacko <[email protected]>
@sobychacko added this to the 4.0.0-M3 milestone on May 28, 2025
Member

@artembilan left a comment

Sorry, but I'm afraid more work is needed on this code to make it easier to support in the future.
But the general idea of the test is good.


for (int i = 0; i < consumers.size(); i++) {
    final int idx = i;
    Thread t = new Thread(() -> {
Member

Why don't you use Executors.newCachedThreadPool()?
It is really hard to read the code when a custom Thread is created.
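
For illustration, a minimal sketch of what that suggestion could look like, assuming a hypothetical pollRecords helper and a consumerIds list rather than the PR's actual code:

ExecutorService executor = Executors.newCachedThreadPool();
for (String consumerId : consumerIds) {
    // each task polls with its own share consumer; pollRecords is a hypothetical helper
    executor.submit(() -> pollRecords(consumerId));
}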

.as("Should have received value " + topic + "-value-" + i)
.contains(topic + "-value-" + i);
}
assertThat(allReceived.size()).isEqualTo(recordCount);
Member

This is redundant since the previous loop includes the size manipulation.


Set<String> allReceived = new java.util.HashSet<>();
for (Set<String> records : consumerRecords.values()) {
    allReceived.addAll(records);
Member

After this manipulation, the test does not confirm that all consumers receive the same records: we end up with only one set of unique records, and it is not clear which consumer each record came from.
I would compare one value (a Set in our case) with the rest.
See the respective AssertJ API:

  /**
   * Creates a new instance of <code>{@link CollectionAssert}</code>.
   *
   * @param <E> the type of elements.
   * @param actual the actual value.
   * @return the created assertion object.
   * @since 3.21.0
   */
  public static <E> AbstractCollectionAssert<?, Collection<? extends E>, E, ObjectAssert<E>> assertThat(Collection<? extends E> actual) {

And the rest of this test would not be necessary.
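
A sketch of the comparison being suggested, assuming consumerRecords is the Map<String, Set<String>> returned by the helper (variable names are illustrative):

Iterator<Set<String>> values = consumerRecords.values().iterator();
Set<String> first = values.next();
// compare every other consumer's records against the first consumer's set
values.forEachRemaining(other -> assertThat(other).containsExactlyInAnyOrderElementsOf(first));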

assertThat(allReceived.size()).isEqualTo(recordCount);
}

private static Map<String, Set<String>> runSharedConsumerTest(String topic, String groupId, List<String> consumerIds, int recordCount, EmbeddedKafkaBroker broker) throws Exception {
Member

The line length should be no more than 120 characters, please.

for (Thread t : threads) {
    t.join();
}
if (!completed) {
Member

Why don't you assert the latch.await() result?
What is the point of this if statement?
If we have a CountDownLatch, why do we need to join() those threads?

The test code is too complicated to follow and maintain in the future.
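
A sketch of the simplification being suggested, assuming a CountDownLatch that is counted down once per received record (the timeout value is illustrative):

// fails the test if not all records arrive within the timeout
assertThat(latch.await(30, TimeUnit.SECONDS))
        .as("All records should be received within the timeout")
        .isTrue();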

@sobychacko
Contributor Author

@artembilan I wanted to see if we can have multiple shared consumers in a group with the factory impl and wanted to start with something, but I will do the cleanup later today. Thanks!

executor.shutdown();
assertThat(completed)
        .as("All records should be received within timeout")
        .isTrue();
Member

We don't need all of these extra variables, and we don't even need to deal with a Future.
Calling executor.shutdown() and then asserting on awaitTermination() should do the trick.
See the shutdown() JavaDocs.
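
A sketch of that approach, assuming executor runs the polling tasks (the timeout value is illustrative):

executor.shutdown();
// shutdown() lets the already-submitted tasks finish; awaitTermination() reports whether they did in time
assertThat(executor.awaitTermination(30, TimeUnit.SECONDS))
        .as("All polling tasks should complete within the timeout")
        .isTrue();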

"value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class
));
try (var consumer = shareConsumerFactory
.createShareConsumer(groupId, consumerIds.get(idx))) {
Member

Any reason to create the consumers in those threads?
Why can't we do that outside of the executor.submit()?

Contributor Author

@sobychacko Jun 2, 2025

@artembilan Don't you think that creating the shared consumer outside of the thread creates some race conditions? For example, if we extract the consumer creation outside of the submit, the consumers may show unpredictable behavior, since both threads have access to both instances of the shared consumers. Since Kafka consumers are inherently not thread safe, we may want to create the consumer within the thread bounds. Or did I understand it wrong? I tried your suggestion and ran into issues.

Member

OK. But that is not what I see in the KafkaMessageListenerContainer.
We create a ListenerConsumer from doStart(), and immediately there we call consumerFactory.createConsumer().
However, we do indeed call poll() from the other thread.
Hence my confusion: why would we create this shared consumer instance exactly in the thread where we poll?

Contributor Author

In the framework code you mentioned in KafkaMessageListenerContainer, although we hand off the created consumer to another thread in the doStart() method, we ensure that the created consumer instance is exactly what is handed off to the polling thread, and therefore it is thread safe. However, in our test scenario here, unless we do the same thing (i.e., create a dedicated consumer instance and then pass exactly that to the thread), we may run into thread safety issues. IMO, it might be okay to do what we do right now in the test. But if you see any issues with that, I will try to refactor the test.
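
For reference, a rough sketch of the hand-off pattern described above: each consumer is created up front, but the single instance is passed to exactly one polling task, so no two threads ever touch the same consumer. The pollUntilDone helper is hypothetical, not code from this PR:

for (String consumerId : consumerIds) {
    var consumer = shareConsumerFactory.createShareConsumer(groupId, consumerId);
    executor.submit(() -> {
        // the consumer is confined to this task's thread after the hand-off
        try (consumer) {
            pollUntilDone(consumer);
        }
    });
}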
