-
Notifications
You must be signed in to change notification settings - Fork 155
RxSession.beginTransaction returning an "empty" Publisher #797
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
From what I can tell about that Scala code: You never close the session, thus keeping a connection around. The session is supposed to be a lightweight thing, don't keep it around. Also, I cannot judge if you ever subscribe to the publisher you'll receive on Note that an empty Publisher like you describe initially is completely valid as per Reactive Streams specification. A publisher is not allowed to publish literal Find one way of handling sessions and tx correctly here: https://github.com/michael-simons/neo4j-from-the-jvm-ecosystem/blob/master/spring-plain-reactive/src/main/java/org/neo4j/examples/jvm/spring/plain/reactive/movies/MovieRepository.java#L46-L75 I am using the transactional functions of the Same thing with RxJava2 here https://github.com/michael-simons/neo4j-from-the-jvm-ecosystem/blob/master/micronaut-reactive/src/main/java/org/neo4j/examples/jvm/micronaut/reactive/movies/MovieRepository.java#L47-L82 and we have also an example on how to integrate with Smallrye Mutiny: https://github.com/michael-simons/neo4j-from-the-jvm-ecosystem/blob/master/quarkus-reactive/src/main/java/org/neo4j/examples/jvm/quarkus/reactive/movies/MovieRepository.java#L46-L81 You see that in all examples the session is closed. Project Reactor provides the best support for "do something with a resource that must be closed" Please let us know if this helps. |
Yes in this example I do not close it just for the sake of simplicity, since
So I should be creating a So, I ask again, is that not the intended way of using a
I see it right there:
Yes, a
Those are hard to idiomatically use from Scala, that is the purpose of neotypes to provide a Scala-friendly API for using it.
Real code uses something called |
For clarity specifically on sessions.... As described in the manual:
They are not and have never been something intended to be kept around long term. Sessions should never be shared across threads, and should be opened and closed to contain connected transactions, never kept for longer. Also, closing a session will activate necessary cleanup and buffering of any unconsumed result data. So always close your sessions promptly. If your transactions are not causally chained, then simply create one for each transaction and close it afterwards. Yes, this does add ugly boilerplate, but see the note on that below. The naming of |
Thanks, @technige for the clarification! As well to @michael-simons for first mentioning it. That means we have a big change to do on neotypes... which should fix the error for us, since we will just skip Still, I believe it would be worth to give it a look to this (but of course, that is up to you all). |
No worries, glad that we are able to help! I sometimes explained it with the following JDBC analogy:
Both session and transaction (also including their reactive and asynchronous variants) implement Following things not available in JDBC analogy:
|
We followed your advice about no reusing However, after doing that and trying again to add support for the Rx module, we found the same error again. Let me know if I can do something to help. |
@michael-simons @technige |
Hi @BalmungSan and @geoffjohn11. |
Hi @injectives thank you very much for looking into this! Is there a simple way of testing this fix? Or the best would be to clone this repo and do a local publish and use the produced SNAPSHOT version? |
Happy to fix things 👍 As it has not been released yet, the easiest option is to build a local snapshot and just trying it on local environment (that is how it was tested). |
Hi @injectives I just tested the fix, I can confirm it works both with the little test repo as well as with the real code at neotypes! |
Thanks for providing a comprehensive description and reproduction steps. |
Uh oh!
There was an error while loading. Please reload this page.
4.2.1
4.2.0
(Scala)Actual behaviour
Calling
beginTransaction
on aRxSession
returns aPublisher
that was "empty".Meaning, that the
Subscriber
that wassubscribed
to thatPublisher
receive anonComplete
before anyonNext
oronError
.Expected behaviour
That
onNext
was called on theSubscription
passing an instance ofRxTransaction
, or a call toonError
.Background
First of all, this bug was hard to isolate and replicate so I will try to share all the relevant information here first; apologies beforehand for the long post.
The context of this error is neotypes a Scala wrapper over this Java driver. I am one of the maintainers of that project and I found this problem by trying to provide a Streaming abstraction over the new Rx module; just in case the PR is this one: neotypes/neotypes#221
The problem is that on Github Actions the tests are failing due to the problem described above.
(Starting a new
RxTransaction
"failed")After playing a with the code, looking at metrics and logs I was able to isolate the error to the
RxSession.beginTransaction
call.I decided to take a look at the source code and I believe the problem is that, for some mythical reason, the code is executing this line meaning that there was no
RxTransaction
but also no error.Steps to reproduce
Run the following code while running a neo4j server (Docker) in background.
Note that the code is written using Scala (I am sorry but it is hard to come back to Java). Also, note that using a single thread and executing the callbacks asynchronously (i.e. inside a
Future
) is essential to make the code fail. Which makes me believe that this is a concurrency problem.The code can be found here, the README includes instructions about how to run it.
Also, the repo contains one additional snippet (on its own branch) using fs2 which is a popular Streaming library in the Scala ecosystem, that one is closer to the original code that found the bug.
Extras
RxTransaction
after getting the "empty" one, then the following exception will be throw:With these (unsafe) ones:
(do not worry too much about the Scala details, the point is the use of Reflection to by-pass the
Publisher
)Which somewhat confirms that error is related to the code mentioned above.
Driver.metrics.connectionPoolMetrics
when finishing the execution:When there is no error they look like this:
When there is an error they look like this:
Note the "inUse=1" in the error case.
It seems that even if the
RxTransaction
was not created, its connection was!Also, printing the
metrics
before the failed transaction did not show anything abnormal.The logs, however, do no show anything relevant; no matter if the error happened or not the final logs are something similar to this:
Finally, thanks in advance for the help.
And, let me know if I can do anything to help.
The text was updated successfully, but these errors were encountered: