I have an issue with the reactive implementation of the Neo4j repositories. The blocking one works just fine, but doing the same with the reactive version keeps me running around in circles, and every attempt ends with the following error.
2023-01-06 11:49:31.380 [Neo4jDriverIO-2-8] [WARN] Neo4jPersistenceExceptionTranslator - Don't know how to translate exception of type class java.lang.IndexOutOfBoundsException
2023-01-06 11:49:31.392 [Neo4jDriverIO-2-8] [ERROR] Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Assembly trace from producer [reactor.core.publisher.FluxPeek] :
reactor.core.publisher.Flux.checkpoint(Flux.java:3453)
org.springframework.data.neo4j.core.ReactiveNeo4jTemplate.processNestedRelations(ReactiveNeo4jTemplate.java:1041)
Error has been observed at the following site(s):
*__checkpoint() ⇢ at org.springframework.data.neo4j.core.ReactiveNeo4jTemplate.processNestedRelations(ReactiveNeo4jTemplate.java:1041)
*__checkpoint() ⇢ at org.springframework.data.neo4j.core.ReactiveNeo4jTemplate.processNestedRelations(ReactiveNeo4jTemplate.java:1041)
Original Stack Trace:
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:128)
at reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:639)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:161)
at org.neo4j.driver.internal.reactivestreams.InternalReactiveResult.lambda$createRecordConsumer$3(InternalReactiveResult.java:91)
at org.neo4j.driver.internal.handlers.pulln.BasicPullResponseHandler.onRecord(BasicPullResponseHandler.java:161)
at org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleRecordMessage(InboundMessageDispatcher.java:100)
at org.neo4j.driver.internal.messaging.common.CommonMessageReader.unpackRecordMessage(CommonMessageReader.java:84)
at org.neo4j.driver.internal.messaging.common.CommonMessageReader.read(CommonMessageReader.java:59)
at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:81)
at org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:37)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:42)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:454)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:1589)
Now the same code using a standard CrudRepository with @EnableNeo4jRepositories works just fine, and IIRC older versions of the reactive bits used to work as well. However, the most recent version appears to be broken.
I've attached a sample that uses Spring Boot for bootstrapping. The original code doesn't use Spring Boot, but the result is the same. If I remove the addition of friends and apprentice (the model is a small part of the Star Wars universe), the program runs. So the issue appears to be the relationship to another character that is also persisted. But my lack of experience with both reactive programming and Neo4j is leaving me confused.
The weird thing, as mentioned, is that the blocking/imperative version of the same code works flawlessly. Is it me, is it a bug, or something else?
Your code fans out multiple saves of an owner (character) together with multiple saves of the owned entity (planet) without an explicit transaction boundary. This has been problematic with the Neo4j driver before, see #2632. Paging @injectives (I cannot fix Marten's code just by upgrading to the 5.3.1 driver).
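For readers puzzling over the message in the trace: `MonoSingle` is the operator behind Reactor's `single()`, which expects its source to emit exactly one item. The following is only a plain-JDK analogy of that contract, not Reactor's or SDN's implementation; the class `SingleContractSketch` and its `single` helper are hypothetical names introduced for illustration.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class SingleContractSketch {

    // Hypothetical helper: returns the only element of the source and
    // fails if the source holds zero elements or more than one.
    static <T> T single(Iterable<T> source) {
        Iterator<T> it = source.iterator();
        if (!it.hasNext()) {
            throw new NoSuchElementException("Source was empty");
        }
        T value = it.next();
        if (it.hasNext()) {
            // Same exception type and message as in the stack trace above.
            throw new IndexOutOfBoundsException("Source emitted more than one item");
        }
        return value;
    }

    public static void main(String[] args) {
        // Exactly one element: the value is returned.
        System.out.println(single(List.of("only")));

        // Two elements: the "exactly one" expectation is violated.
        try {
            single(List.of("first", "second"));
        } catch (IndexOutOfBoundsException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the reported case, the step that expected a single result received more than one record, which matches the fan-out of saves described above.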
You can choose to flatMap the outer and linearize the inner like this:
var transactionalOperator = TransactionalOperator.create(ctx.getBean(ReactiveTransactionManager.class));
characters.flatMap(service::save)
    .transform(transactionalOperator::transactional)
    .then(service.printAll())
    .then(service.deleteAll())
    .doOnTerminate(countDownLatch::countDown)
    .subscribe();
This makes the flatMap and the saves happen in one transaction, including the nested queries.
Also, you don't need to store the planets separately. They are "owned" by the characters, so saving them is done for you.
Hope that helps; thanks for including SDN in your book!
neo4j-reactive-issue.zip