-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Add and refactor integration tests in DefaultShareConsumerFactoryTests #3932
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Add and refactor integration tests in DefaultShareConsumerFactoryTests #3932
Conversation
sobychacko
commented
May 28, 2025
- Added tests to check that multiple shared consumers each get records and all data is consumed.
- Refactored shared test logic into a helper method.
- Other cleanup in the test
- Added tests to check that multiple shared consumers each get records and all data is consumed. - Refactored shared test logic into a helper method. - Other cleanup in the test Signed-off-by: Soby Chacko <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, but I'm afraid more work is needed for this code for easier support in the future.
But general idea of the test is good.
|
||
for (int i = 0; i < consumers.size(); i++) { | ||
final int idx = i; | ||
Thread t = new Thread(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't you use Executors.newCachedThreadPool()
?
It is really hard to read the code when custom Thread
is created.
.as("Should have received value " + topic + "-value-" + i) | ||
.contains(topic + "-value-" + i); | ||
} | ||
assertThat(allReceived.size()).isEqualTo(recordCount); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is redundant since the previous loop includes the size manipulation.
|
||
Set<String> allReceived = new java.util.HashSet<>(); | ||
for (Set<String> records : consumerRecords.values()) { | ||
allReceived.addAll(records); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After this manipulation, the test does not confirm that all consumers receive the same records: we are just going to have only one set of unique records and not clear from what consumer.
I would compare one value (Set
in our case) with the rest.
See respective AssertJ API:
/**
* Creates a new instance of <code>{@link CollectionAssert}</code>.
*
* @param <E> the type of elements.
* @param actual the actual value.
* @return the created assertion object.
* @since 3.21.0
*/
public static <E> AbstractCollectionAssert<?, Collection<? extends E>, E, ObjectAssert<E>> assertThat(Collection<? extends E> actual) {
And the rest of this test would not be necessary.
assertThat(allReceived.size()).isEqualTo(recordCount); | ||
} | ||
|
||
private static Map<String, Set<String>> runSharedConsumerTest(String topic, String groupId, List<String> consumerIds, int recordCount, EmbeddedKafkaBroker broker) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The line length not more that 120
symbols, please
for (Thread t : threads) { | ||
t.join(); | ||
} | ||
if (!completed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't you assert a latch.await()
result?
What is the point of this if
?
If we have a CountDownLatch
, why do we need to join()
those threads?
The test code is too complicated to follow and maintain in the future.
@artembilan I wanted to see if we can have multiple shared consumers in a group with the factory impl and wanted to start with something, but I will do the cleanup later today. Thanks! |
spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java
Show resolved
Hide resolved
executor.shutdown(); | ||
assertThat(completed) | ||
.as("All records should be received within timeout") | ||
.isTrue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need all of these extra variables and even don't need to deal with Future
.
The executor.shutdown();
and then assert on the awaitTermination()
should do the trick.
See that shutdown()
JavaDocs.
"value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class | ||
)); | ||
try (var consumer = shareConsumerFactory | ||
.createShareConsumer(groupId, consumerIds.get(idx))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason to create consumers in those threads?
Why cannot we do that outside for the executor.submit()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@artembilan Don't you think that creating the shared consumer outside of the thread creates some race conditions? For ex, if we extract the consumer creation outside of the submit
, then the consumers may show unpredictable behavior, since both threads have access to both instances of the shared consumers. Since kafka consumers are inherently not thread safe, we may want to create the consumer within the thread bounds. Or did I understand it wrong? I tried your suggestion and then ran into issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. But that is not what I see in the KafkaMessageListenerContainer
.
We create a ListenerConsumer
from the doStart()
and immediately there we call consumerFactory.createConsumer()
.
However the poll()
we indeed call from the other thread.
Therefore my confusion here why would we create this shared consumer instance exactly in a thread where we poll?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the framework code you mentioned in KafkaMessageListenerContainer
, although we hand off the created consumer to another thread, in the doStart()
method, we ensure that the created consumer instance is exactly what is handed off to the polling thread and therefore it is thread safe. However, in our test scenario here, unless we do the same thing, (i.e., create a dedicated consumer instance and then pass that exactly to the thread), we may run into thread safety issues. IMO, it might be okay to to do what we do right now in the test. But if you see any issues with that, I will try to refactor the test.