Rewrite the reactive saveAll(Publisher<T>) methods in template and repository #2496
Hello @sothawo, many thanks for this PR. We have several test scenarios ready that save a Flux sent at different rates (ranging from gentle to aggressive), and we would like to help test this PR. However, when we tried importing it, it seems we are not able to get your work. Will be glad to come back with some test results.
Milestone 5.1.0-M3 is not yet released. You'll need to use the snapshot version 5.1.0-SNAPSHOT from the snapshot repository; check the readme for its URL.
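For reference, a minimal Maven fragment for pulling a snapshot build might look like the following. The repository URL shown is Spring's commonly documented snapshot repository; verify it against the project readme mentioned above before relying on it:

```xml
<!-- repository for Spring snapshot artifacts (verify URL against the readme) -->
<repositories>
  <repository>
    <id>spring-snapshots</id>
    <url>https://repo.spring.io/snapshot</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

<!-- the snapshot version of the library under test -->
<dependencies>
  <dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-elasticsearch</artifactId>
    <version>5.1.0-SNAPSHOT</version>
  </dependency>
</dependencies>
```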
Hello @sothawo, with your advice I was able to download the latest code. Only when the rate is aggressive am I able to encounter this issue, most of the time after the program has been running for a while:
I am pasting the issue here first, since the only delta is this PR. (I tried Mongo: no problem; Cassandra: no problem; Elastic with the new code but a low rate: no problem; Elastic with the new code and a high rate: issue.) If you believe it is not due to this change, please let me know.
I don't know why you bring up Mongo and Cassandra again; these are different projects with different drivers and connections. Can you provide a runnable example that reproduces this? And no, a setup which requires Kafka is no reproducer for this. 5.1.0-M3 was released yesterday afternoon.
Sorry, but your zip file does not contain runnable code to reproduce the problem. Your code needs a Kafka setup with some data in it, which I don't have, plus setting up SSL certificates. I have a minimal Spring Boot application that connects to a local Elasticsearch running in Docker with this configuration:

```java
@Configuration
public class ReactiveRestClientConfig extends ReactiveElasticsearchConfiguration {

	@Override
	public ClientConfiguration clientConfiguration() {
		return ClientConfiguration.builder() //
			.connectedTo("localhost:9200") //
			.withBasicAuth("user", "password") //
			.build();
	}

	@Override
	protected RefreshPolicy refreshPolicy() {
		return RefreshPolicy.NONE;
	}
}
```

Note that I set the refresh policy to `NONE`. The method I use to run my test:

```java
public Mono<Void> test() {
	var start = Instant.now();
	var fooFlux = Flux.range(1, 1_000_000)
		.map(Foo::of);
	return repository.saveAll(fooFlux)
		.doOnComplete(() -> System.out.println(Duration.between(start, Instant.now())))
		.then();
}
```

On my iMac from 2015 (running the application from within IntelliJ IDEA and Elasticsearch in Docker), I can index these million entities in 1 minute 12 seconds without an error. That's 13,800 entities/second. So I see no issue here on the side of Spring Data Elasticsearch.
Currently the saveAll(Publisher<T>) methods collect the publisher into a list, send that to Elasticsearch as a single bulk request, and then return the data from the response as a Flux.
This needs to be changed to a more reactive approach, as it is not known how many items will be sent in by the publisher.
But we cannot simply send each element as a single index request either, as this would flood the network layer, so we need to batch the incoming elements into bulk requests of a bounded size.
I outlined the basic idea of how this can be done in #2492.
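The batching idea can be sketched with Reactor's `bufferTimeout` and a bounded-concurrency `flatMap`. This is a minimal illustration, not the actual implementation from the PR: the class name, the chunk size and timeout values, and the `bulkSave` placeholder are all hypothetical, standing in for the real bulk call to Elasticsearch.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical sketch: turn an unbounded publisher into bounded bulk requests.
public class ReactiveBatchingSketch {

	static <T> Flux<T> saveAll(Flux<T> entities) {
		return entities
			// emit a chunk when 500 items have arrived, or after 200 ms,
			// whichever comes first -- so neither huge lists nor per-item
			// requests are produced
			.bufferTimeout(500, Duration.ofMillis(200))
			// send each chunk as one bulk request, with at most two
			// requests in flight so the network layer is not flooded
			.flatMap(ReactiveBatchingSketch::bulkSave, 2);
	}

	// placeholder for the actual bulk request to Elasticsearch; here it
	// just replays the saved chunk as the response entities
	static <T> Flux<T> bulkSave(List<T> chunk) {
		return Flux.fromIterable(chunk);
	}
}
```

Backpressure from the Elasticsearch side then propagates naturally: `flatMap` only requests more chunks from `bufferTimeout` when an in-flight bulk request completes.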