Rewrite the reactive saveAll(Publisher<T>) methods in template and repository #2496

Closed
sothawo opened this issue Mar 18, 2023 · 6 comments · Fixed by #2497
Assignee: sothawo
Labels: type: enhancement (A general enhancement)

Comments

@sothawo
Collaborator

sothawo commented Mar 18, 2023

Currently the saveAll(Publisher<T>) methods collect the publisher into a list, send that to Elasticsearch as a single batch request, and then return the data from the response as a Flux.
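
In essence, the current behaviour corresponds to a sketch like the following (simplified, not the actual source; doBulkOperation is a hypothetical stand-in for the template's bulk call):

import java.util.List;

import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;

class CurrentSaveAllSketch {

	<T> Flux<T> saveAll(Publisher<T> entities) {
		return Flux.from(entities)
			.collectList()                                 // waits for the whole publisher to complete
			.flatMapMany(batch -> doBulkOperation(batch)); // one bulk request containing every entity
	}

	// hypothetical stand-in for the bulk request against Elasticsearch
	<T> Flux<T> doBulkOperation(List<T> batch) {
		return Flux.fromIterable(batch);
	}
}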

This needs to be changed to a more reactive approach, as it is not known how many items the publisher will emit.

But we cannot simply send each element as a single index request; this would flood the network layer. So we need to

  • buffer the incoming data into batches
  • send each batch as a bulk request to Elasticsearch
  • use backpressure handling with the upstream publisher to prevent flooding the transport client
  • provide the returned data in a continuous Flux.

I outlined the basic idea of how this can be done in #2492.
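
As a rough sketch of that idea (batch size, timeout, operator choices and the doBulkOperation helper below are illustrative assumptions, not the implementation that ended up in #2497):

import java.time.Duration;
import java.util.List;

import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;

class BufferedSaveAllSketch {

	<T> Flux<T> saveAll(Publisher<T> entities) {
		return Flux.from(entities)
			.bufferTimeout(500, Duration.ofMillis(200))  // collect incoming entities into bounded batches
			.concatMap(batch -> doBulkOperation(batch)); // one bulk request per buffered batch
	}

	// hypothetical stand-in for the bulk request against Elasticsearch
	<T> Flux<T> doBulkOperation(List<T> batch) {
		return Flux.fromIterable(batch);
	}
}

With this shape, backpressure towards the upstream publisher comes from concatMap requesting only a limited number of batches at a time, and the response data is emitted as one continuous Flux.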

@sothawo sothawo added the type: enhancement and status: worked on (a contributor is working on this issue) labels Mar 18, 2023
@sothawo sothawo self-assigned this Mar 18, 2023
sothawo added a commit to sothawo/spring-data-elasticsearch that referenced this issue Mar 19, 2023
sothawo added a commit that referenced this issue Mar 19, 2023
Original Pull Request #2497
Closes #2496
Closes #2492
@sothawo sothawo removed the status: worked on label Mar 19, 2023
@sothawo sothawo added this to the 5.1 M3 (2023.0.0) milestone Mar 19, 2023
@patpatpat123

Hello @sothawo ,

Many thanks for this PR.

We have several different test scenarios ready that save a Flux sent at different rates (ranging from gentle to aggressive).

We would like to help test this PR.

However, we tried importing

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
            <version>5.1.0-M3</version>
        </dependency>

And it seems we are not able to get your changes.
Could you please suggest how we can get this merged code?

Will be glad to come back with some test results.

@sothawo
Collaborator Author

sothawo commented Mar 20, 2023

Milestone 5.1.0-M3 is not yet released. You'll need to use the snapshot version 5.1.0-SNAPSHOT from the snapshot repository. Check the readme for the URL of this.
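
For illustration, pulling in the snapshot could look roughly like this in the pom (the snapshot repository URL and the use of the spring-data-elasticsearch artifact directly are assumptions here; the readme has the authoritative details):

<!-- illustrative sketch only; see the project readme for the snapshot repository details -->
<repositories>
    <repository>
        <id>spring-snapshot</id>
        <url>https://repo.spring.io/snapshot</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-elasticsearch</artifactId>
        <version>5.1.0-SNAPSHOT</version>
    </dependency>
</dependencies>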

@patpatpat123

Hello @sothawo,

With your advice, I was able to download the latest code.
I then ran a bunch of tests, ranging from gentle (10 messages per second) to something more aggressive (5000-8000 messages per second).

Only when it is aggressive am I able to encounter this issue most of the time, after the program has been running for a while:

reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
Caused by: reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:249)
	at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.flushCallback(FluxBufferTimeout.java:227)
	at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.lambda$new$0(FluxBufferTimeout.java:158)
	at io.micrometer.context.ContextSnapshot.lambda$wrap$0(ContextSnapshot.java:78)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

I am pasting the issue here first, since the only delta is this PR.

(I tried MongoDB: no problem; Cassandra: no problem; Elasticsearch with the new code at a low rate: no problem; Elasticsearch with the new code at a high rate => issue.)

If you believe it is not due to this change, please let me know.
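
For context, a contrived sketch of how this particular OverflowException can arise in Reactor, independent of Spring Data Elasticsearch: bufferTimeout signals "Could not emit buffer due to lack of requests" when its timeout flushes a non-empty buffer while the downstream currently has no outstanding demand, for example because a slow consumer has not requested the next buffer yet. The rates, scheduler and operators below are assumptions chosen only to illustrate that mechanism, not taken from the reported application:

import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class BufferTimeoutOverflowSketch {

	public static void main(String[] args) throws InterruptedException {
		Flux.interval(Duration.ofMillis(10))             // ~100 elements per second
			.bufferTimeout(100, Duration.ofMillis(50))   // flushes small partial buffers every 50 ms
			.publishOn(Schedulers.single(), 1)           // downstream requests only one buffer at a time
			.doOnNext(batch -> sleepQuietly(500))        // simulate a slow consumer of each batch
			.doOnError(Throwable::printStackTrace)       // errors signalled by the pipeline are printed here
			.subscribe();

		Thread.sleep(5_000);                             // keep the demo JVM alive for a while
	}

	private static void sleepQuietly(long millis) {
		try {
			Thread.sleep(millis);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}
}

Whether this is actually what happens in the reported setup cannot be told from the log alone.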

@patpatpat123

timeout.zip

@sothawo
Collaborator Author

sothawo commented Mar 21, 2023

I don't know why you again bring up Mongo and Cassandra; these are different projects with different drivers and connections.

Can you provide a runnable example that reproduces this? And no, a setup which requires Kafka is not a reproducer for this.

5.1.0-M3 was released yesterday afternoon.

@sothawo
Collaborator Author

sothawo commented Mar 21, 2023

Sorry, but your zip file does not contain runnable code to reproduce the problem. Your code needs a Kafka setup with data in it, which I don't have, plus SSL certificates to be set up.

I have a minimal Spring Boot application that connects to a local Elasticsearch running in Docker with this configuration:

@Configuration
public class ReactiveRestClientConfig extends ReactiveElasticsearchConfiguration {

	@Override
	public ClientConfiguration clientConfiguration() {
		return ClientConfiguration.builder() //
			.connectedTo("localhost:9200") //
			.withBasicAuth("user", "password") //
			.build();
	}

	@Override
	protected RefreshPolicy refreshPolicy() {
		return RefreshPolicy.NONE;
	}
}

Note that I set the refresh policy to NONE to speed up indexing.

The method I use to run my test:

public Mono<Void> test() {
	var start = Instant.now();

	var fooFlux = Flux.range(1, 1_000_000)
		.map(Foo::of);

	return repository.saveAll(fooFlux)
		.doOnComplete(() -> {
			System.out.println(Duration.between(start, Instant.now()));
		})
		.then();
}

On my iMac from 2015 (running the application from within IntelliJ IDEA and Elasticsearch in Docker), I can index these one million entities in 1 minute 12 seconds without an error. That's about 13,800 entities/second.

So I see no issue here on the side of Spring Data Elasticsearch.
