Saving nodes with relationships results in constant error in a reactive environment -> Source emitted more than one item #2646

Closed
mdeinum opened this issue Jan 6, 2023 · 1 comment
Labels
status: waiting-for-triage An issue we've not yet triaged

Comments


mdeinum commented Jan 6, 2023

I have an issue with the reactive implementation of the Neo4j repositories. The blocking one works just fine; doing the same with the reactive version, however, keeps me running around in circles, and everything ends up with the following error:

2023-01-06 11:49:31.380 [Neo4jDriverIO-2-8] [WARN] Neo4jPersistenceExceptionTranslator - Don't know how to translate exception of type class java.lang.IndexOutOfBoundsException
2023-01-06 11:49:31.392 [Neo4jDriverIO-2-8] [ERROR] Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
	Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: 
Assembly trace from producer [reactor.core.publisher.FluxPeek] :
	reactor.core.publisher.Flux.checkpoint(Flux.java:3453)
	org.springframework.data.neo4j.core.ReactiveNeo4jTemplate.processNestedRelations(ReactiveNeo4jTemplate.java:1041)
Error has been observed at the following site(s):
	*__checkpoint() ⇢ at org.springframework.data.neo4j.core.ReactiveNeo4jTemplate.processNestedRelations(ReactiveNeo4jTemplate.java:1041)
	*__checkpoint() ⇢ at org.springframework.data.neo4j.core.ReactiveNeo4jTemplate.processNestedRelations(ReactiveNeo4jTemplate.java:1041)
Original Stack Trace:
		at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
		at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
		at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345)
		at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:128)
		at reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:639)
		at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:161)
		at org.neo4j.driver.internal.reactivestreams.InternalReactiveResult.lambda$createRecordConsumer$3(InternalReactiveResult.java:91)
		at org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler.onRecord(BasicPullResponseHandler.java:161)
		at org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleRecordMessage(InboundMessageDispatcher.java:100)
		at org.neo4j.driver.internal.messaging.common.CommonMessageReader.unpackRecordMessage(CommonMessageReader.java:84)
		at org.neo4j.driver.internal.messaging.common.CommonMessageReader.read(CommonMessageReader.java:59)
		at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:81)
		at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:37)
		at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
		at org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:42)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
		at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:454)
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		at java.base/java.lang.Thread.run(Thread.java:1589)

Now, the same code using a standard CrudRepository with @EnableNeo4jRepositories works just fine, and IIRC older versions of the reactive bits used to work as well. The most recent version, however, appears to be broken.

I've attached a sample that uses Spring Boot for bootstrapping; the original code doesn't use Spring Boot, but the result is the same. If I remove the addition of friends and apprentice (the model is a small part of the Star Wars universe), the program runs. So the issue appears to be the relationship to another character that is also persisted. But my lack of experience with both reactive programming and Neo4j leaves me confused.

The weird thing is that, as mentioned, the blocking/imperative version of the same code works flawlessly. Is it me, is it a bug, or is it something else?

neo4j-reactive-issue.zip

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Jan 6, 2023
@michael-simons (Collaborator) commented

Thanks Marten for bringing this here.

Short answer: It is you!

Long answer:

Your problematic code:

var planets = Flux.just(dagobah, alderaan, tatooine);
planets.flatMap(service::save)
	.thenMany(characters.flatMap(service::save))
	.then(service.printAll())
	.then(service.deleteAll())
	.doOnTerminate(countDownLatch::countDown).subscribe();

It fans out multiple saves of an owner (character) interleaved with multiple saves of the owned entity (planet), without an explicit transaction boundary. This has been problematic with the Neo4j driver before, see #2632 (paging @injectives); I cannot fix Marten's code by just upgrading to the 5.3.1 driver.

You can choose to flatMap the outer and linearize the inner like this:

planets.flatMap(service::save)
	.thenMany(characters.concatMap(service::save))
	.then(service.printAll())
	.then(service.deleteAll())
	.doOnTerminate(countDownLatch::countDown).subscribe();

concatMap keeps the proper order, because it subscribes to the inner publishers one at a time instead of all at once.
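The ordering difference between the two operators can be shown in isolation, without SDN or a database. A minimal Reactor sketch (the delay-based save stub and the names are made up for the demo): flatMap subscribes to all inner publishers eagerly, so results arrive in completion order, while concatMap subscribes to them one at a time and preserves source order.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OrderingDemo {

    // Stand-in for a repository save: completes after a delay that grows
    // with the name length, so later elements can finish before earlier ones.
    static Mono<String> save(String name) {
        return Mono.just(name).delayElement(Duration.ofMillis(name.length() * 10L));
    }

    public static void main(String[] args) {
        List<String> names = List.of("Chewbacca", "Rey", "Luke");

        // flatMap subscribes to all inner Monos eagerly; results arrive in
        // completion order, so the shorter names tend to come out first.
        List<String> interleaved = Flux.fromIterable(names)
                .flatMap(OrderingDemo::save)
                .collectList()
                .block();

        // concatMap subscribes to one inner Mono at a time; source order is kept.
        List<String> ordered = Flux.fromIterable(names)
                .concatMap(OrderingDemo::save)
                .collectList()
                .block();

        System.out.println(interleaved); // completion order, e.g. [Rey, Luke, Chewbacca]
        System.out.println(ordered);     // [Chewbacca, Rey, Luke]
    }
}
```

flatMap is still fine when the inner operations are fully independent; in the sample above the saves touch shared nodes (friends, home planets), which is why ordering, or better yet a single transaction, matters.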

IMHO, the better solution is this:

var transactionalOperator = TransactionalOperator.create(ctx.getBean(ReactiveTransactionManager.class));
characters.flatMap(service::save)
	.transform(transactionalOperator::transactional)
	.then(service.printAll())
	.then(service.deleteAll())
	.doOnTerminate(countDownLatch::countDown).subscribe();

This makes the flatMap and the save happen in one transaction, including the nested queries.
Also, you don't need to store the planets separately: they are "owned" by the characters, so this is done for you.
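For context on the "owned by the characters" cascade: with SDN 6+, related nodes reachable from a saved aggregate are persisted along with it. Below is a hypothetical sketch of what such a mapping might look like — not the actual entities from the attached sample; the class, field, and relationship-type names are assumptions:

```java
import java.util.List;
import org.springframework.data.neo4j.core.schema.GeneratedValue;
import org.springframework.data.neo4j.core.schema.Id;
import org.springframework.data.neo4j.core.schema.Node;
import org.springframework.data.neo4j.core.schema.Relationship;

@Node
public class Character {

    @Id @GeneratedValue
    private Long id;

    private String name;

    // Related nodes reachable from a Character are persisted as part of
    // saving the Character itself, so the Planet needs no separate save.
    @Relationship("HOMEWORLD")
    private Planet homeWorld;

    @Relationship("FRIEND_OF")
    private List<Character> friends;
}
```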

Hope that helps; thanks for including SDN in your book!
