RxSession.beginTransaction returning an "empty" Publisher #797

Closed
BalmungSan opened this issue Dec 4, 2020 · 12 comments · Fixed by #932 or neotypes/neotypes#364

@BalmungSan

BalmungSan commented Dec 4, 2020

  • Neo4j version: Community 4.2.1
  • Neo4j Mode: Single instance (Docker)
  • Driver version: Java driver 4.2.0 (Scala)
  • Operating system: Ubuntu 20.04 on WSL2 / Amazon Linux 2 on AWS (t3.medium) / Ubuntu 18.04 on AWS (t3.small) / Ubuntu 18.04 on Azure (Standard_DS2_v2)

Actual behaviour

Calling beginTransaction on an RxSession returns a Publisher that is "empty".
That is, the Subscriber subscribed to that Publisher receives onComplete before any onNext or onError.

Expected behaviour

Either onNext is called on the Subscriber with an instance of RxTransaction, or onError is called.
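
To make the "empty" case concrete, here is a minimal sketch of the signal sequence described above. It is not driver code: it uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for org.reactivestreams so that it needs no dependencies, and the names EmptyPublisherSketch, emptyPublisher and signalsOf are made up for illustration.

```scala
import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription}
import scala.collection.mutable.ListBuffer

object EmptyPublisherSketch {
  // Hypothetical publisher: on the first request it completes without emitting anything.
  def emptyPublisher[A]: Publisher[A] = new Publisher[A] {
    override def subscribe(subscriber: Subscriber[_ >: A]): Unit =
      subscriber.onSubscribe(new Subscription {
        private var done = false

        override def request(n: Long): Unit =
          if (!done) {
            done = true
            subscriber.onComplete()
          }

        override def cancel(): Unit = {
          done = true
        }
      })
  }

  // Subscribes, requests one element, and records every signal in arrival order.
  // Only meaningful for publishers that signal synchronously, like the one above.
  def signalsOf[A](publisher: Publisher[A]): List[String] = {
    val signals = ListBuffer.empty[String]
    publisher.subscribe(new Subscriber[A] {
      override def onSubscribe(s: Subscription): Unit = {
        signals += "onSubscribe"
        s.request(1)
      }
      override def onNext(a: A): Unit = signals += s"onNext($a)"
      override def onError(ex: Throwable): Unit = signals += s"onError(${ex.getMessage})"
      override def onComplete(): Unit = signals += "onComplete"
    })
    signals.toList
  }
}
```

For the publisher above, signalsOf records onSubscribe followed directly by onComplete, which is the sequence reported for beginTransaction.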

Background

First of all, this bug was hard to isolate and replicate so I will try to share all the relevant information here first; apologies beforehand for the long post.

The context of this error is neotypes, a Scala wrapper over this Java driver. I am one of the maintainers of that project, and I found this problem while trying to provide a streaming abstraction over the new Rx module; in case it is useful, the PR is neotypes/neotypes#221.

The problem is that on Github Actions the tests are failing due to the problem described above.
(Starting a new RxTransaction "failed")

After playing with the code and looking at metrics and logs, I was able to isolate the error to the RxSession.beginTransaction call.
I decided to take a look at the source code and I believe the problem is that, for some mythical reason, the code is executing this line meaning that there was no RxTransaction but also no error.

Steps to reproduce

Run the following code while running a neo4j server (Docker) in background.

Note that the code is written in Scala (I am sorry, but it is hard to come back to Java). Also, note that using a single thread and executing the callbacks asynchronously (i.e. inside a Future) is essential to make the code fail, which makes me believe that this is a concurrency problem.

package neotypes

import org.neo4j.{driver => neo4j}
import org.reactivestreams.{Publisher, Subscriber, Subscription}

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters._
import scala.util.control.NoStackTrace

object Main {
  implicit val ec =
    ExecutionContext.fromExecutorService(
      Executors.newSingleThreadExecutor()
    )

  def main(args: Array[String]): Unit = {
    val driver =
      neo4j.GraphDatabase.driver(
        "bolt://localhost:7687",
        neo4j.Config.builder
          .withoutEncryption
          .withDriverMetrics
          .withLogging(neo4j.Logging.slf4j)
          .build()
      )

    val neotypesDriver = new NeotypesDriver(driver)

    def loop(attempts: Int): Future[Unit] = {
      println()
      println("--------------------------------------------------")
      println(s"Remaining attempts ${attempts}")
      println(s"Metrics: ${driver.metrics.connectionPoolMetrics.asScala}")

      neotypesDriver.run("MATCH (p: Person { name: 'Charlize Theron' }) RETURN p.name").flatMap { r =>
        println(s"Results: ${r}")
        if (attempts > 0) loop(attempts - 1)
        else Future.unit
      }
    }

    def setup: Future[Unit] =
      for {
        _ <- neotypesDriver.run("MATCH (n) DETACH DELETE n")
        _ <- neotypesDriver.run("CREATE (Charlize: Person { name: 'Charlize Theron', born: 1975 })")
      } yield ()

    val app = setup.flatMap { _ =>
      loop(attempts = 1000)
    } recover {
      case NoTransactionError =>
        println(s"Transaction was not created!")

      case ex =>
        println(s"Unexpected error ${ex.getMessage}")
        ex.printStackTrace()
    }

    Await.ready(app, Duration.Inf)
    println()
    println("-------------------------------------------------")
    println(s"Final metrics: ${driver.metrics.connectionPoolMetrics.asScala}")
    driver.close()
    ec.shutdown()
  }
}

final class NeotypesDriver(driver: neo4j.Driver)
                          (implicit ec: ExecutionContext) {
  import Syntax._

  def run(query: String): Future[Option[Map[String, String]]] = {
    def runQuery(tx: neo4j.reactive.RxTransaction): Future[Option[Map[String, String]]] =
      tx
        .run(query)
        .records
        .toFuture
        .map { recordOption =>
          recordOption.map { record =>
            record
              .fields
              .asScala
              .iterator
              .map(p => p.key -> p.value.toString)
              .toMap
          }
        }

    val session = driver.rxSession

    for {
      tx <- session.beginTransaction.toFuture.transform(_.flatMap(_.toRight(left = NoTransactionError).toTry))
      result <- runQuery(tx)
      _ <- tx.commit[Unit].toFuture
      _ <- session.close[Unit].toFuture
    } yield result
  }
}

object Syntax {
  implicit final class PublisherOps[A] (private val publisher: Publisher[A]) extends AnyVal {
    def toFuture(implicit ec: ExecutionContext): Future[Option[A]] = {
      val promise = Promise[Option[A]]()

      val subscriber = new Subscriber[A] {
        var s: Subscription = _

        override def onSubscribe(subscription: Subscription): Unit = {
          s = subscription
          Future(s.request(1))
        }

        override def onNext(a: A): Unit = {
          promise.success(Some(a))
          Future(s.cancel())
        }

        override def onError(ex: Throwable): Unit = {
          promise.failure(ex)
        }

        override def onComplete(): Unit = {
          if (!promise.isCompleted) {
            promise.success(None)
          }
        }
      }
      publisher.subscribe(subscriber)

      promise.future
    }
  }
}

object NoTransactionError extends Throwable("Transaction was not created!") with NoStackTrace

The code can be found here; the README includes instructions on how to run it.
The repo also contains one additional snippet (on its own branch) using fs2, a popular streaming library in the Scala ecosystem; that one is closer to the original code that found the bug.

Extras

  1. If one attempts to begin a new RxTransaction after getting the "empty" one, then the following exception will be thrown:

org.neo4j.driver.exceptions.TransactionNestingException: You cannot begin a transaction on a session with an open transaction; either run from within the transaction or use a different session.

  2. I was able to "fix" the problem by replacing these lines in my implementation:
val txIO = session.beginTransaction.toStream.toIO.flatMap(opt => IO.fromOption(opt)(orElse = NoTransactionError))

With these (unsafe) ones:

val rxs = session.asInstanceOf[org.neo4j.driver.internal.reactive.InternalRxSession]
val f = rxs.getClass.getDeclaredField("session")
f.setAccessible(true)
val ns = f.get(rxs).asInstanceOf[org.neo4j.driver.internal.async.NetworkSession]
val txIO = IO.async[neo4j.reactive.RxTransaction] { cb =>
  ns.beginTransactionAsync(neo4j.TransactionConfig.empty).thenAccept { tx =>
    cb(Right(new org.neo4j.driver.internal.reactive.InternalRxTransaction(tx)))
  } exceptionally { ex =>
    cb(Left(ex))
    None.orNull
  }
}

(Do not worry too much about the Scala details; the point is the use of reflection to bypass the Publisher.)

This somewhat confirms that the error is related to the code mentioned above.

  3. I found something interesting by looking at the Driver.metrics.connectionPoolMetrics when finishing the execution:

When there is no error they look like this:

localhost:33245-1027665495=[created=1, closed=0, creating=0, failedToCreate=0, acquiring=0, acquired=4, timedOutToAcquire=0, inUse=0, idle=1, totalAcquisitionTime=462, totalConnectionTime=449, totalInUseTime=3356, totalInUseCount=4]

When there is an error they look like this:

localhost:33241-1862328262=[created=1, closed=0, creating=0, failedToCreate=0, acquiring=0, acquired=3, timedOutToAcquire=0, inUse=1, idle=0, totalAcquisitionTime=516, totalConnectionTime=498, totalInUseTime=3301, totalInUseCount=2]

Note the "inUse=1" in the error case.
It seems that even if the RxTransaction was not created, its connection was!
Also, printing the metrics before the failed transaction did not show anything abnormal.

The logs, however, do not show anything relevant; whether or not the error happened, the final logs look similar to this:

22:09:09.261 [pool-1-thread-1-ScalaTest-running-Fs2Suite] INFO Driver - Closing driver instance 317516020
22:09:09.264 [Neo4jDriverIO-2-1] DEBUG OutboundMessageHandler - [0xc0839333][localhost:33269][bolt-2] C: GOODBYE
22:09:09.271 [Neo4jDriverIO-2-1] DEBUG ChannelErrorHandler - [0xc0839333][localhost:33269][bolt-2] Channel is inactive
22:09:09.274 [pool-1-thread-1-ScalaTest-running-Fs2Suite] INFO ConnectionPool - Closing connection pool towards localhost:33269
22:09:09.275 [Neo4jDriverIO-2-1] DEBUG ChannelErrorHandler - [0xc0839333][localhost:33269][bolt-2] Closing channel because of a failure 'org.neo4j.driver.exceptions.ServiceUnavailableException: Connection to the database terminated. Please ensure that your database is listening on the correct host and port and that you have compatible encryption settings both on Neo4j server and driver. Note that the default encryption setting has changed in Neo4j 4.0.'
22:09:09.487 [Neo4jDriverIO-2-1] DEBUG org.neo4j.driver.internal.shaded.io.netty.buffer.PoolThreadCache - Freed 8 thread-local buffer(s) from thread: Neo4jDriverIO-2-1


Finally, thanks in advance for the help.
And, let me know if I can do anything to help.

@michael-simons
Contributor

From what I can tell about that Scala code: You never close the session, thus keeping a connection around. The session is supposed to be a lightweight thing, don't keep it around.
If you would like to have a notion of a long lived session, keep your NeotypesSession, but pass the instance of our Driver to it and give out new sessions and make sure you close them.

Also, I cannot judge if you ever subscribe to the publisher you'll receive on tx.commit[Unit]. If you don't, the transaction won't be committed and won't be closed.

Note that an empty Publisher like you describe initially is completely valid as per Reactive Streams specification. A publisher is not allowed to publish literal null values and Java does not have the concept of Unit.

Find one way of handling sessions and tx correctly here: https://github.com/michael-simons/neo4j-from-the-jvm-ecosystem/blob/master/spring-plain-reactive/src/main/java/org/neo4j/examples/jvm/spring/plain/reactive/movies/MovieRepository.java#L46-L75

I am using the transactional functions of the RxSession here, so that I automatically profit from the driver's retry management and don't have to deal with closing the tx myself.

Same thing with RxJava2 here https://github.com/michael-simons/neo4j-from-the-jvm-ecosystem/blob/master/micronaut-reactive/src/main/java/org/neo4j/examples/jvm/micronaut/reactive/movies/MovieRepository.java#L47-L82 and we also have an example of how to integrate with Smallrye Mutiny: https://github.com/michael-simons/neo4j-from-the-jvm-ecosystem/blob/master/quarkus-reactive/src/main/java/org/neo4j/examples/jvm/quarkus/reactive/movies/MovieRepository.java#L46-L81

You see that in all examples the session is closed. Project Reactor provides the best support for "do something with a resource that must be closed".

Please let us know if this helps.

@BalmungSan
Author

@michael-simons

You never close the session

Yes, in this example I do not close it, just for the sake of simplicity, since close returns another Publisher.
But the real code ensures it is closed at the end.

The session is supposed to be a lightweight thing, don't keep it around.

So I should be creating a Session for each query? Is that correct?
So what is even the purpose of Session in the first place? Why not just get a Transaction directly from the Driver?
From what I understood of the docs, a Session exists to ensure causal consistency of its Transactions and is not thread-safe. So what we did in neotypes was make our Session thread-safe; the idea was to create one instance at the beginning of the app and use it for all queries. If users want to run queries in parallel, they would need to create multiple Sessions and distribute the queries themselves.

So, I ask again, is that not the intended way of using a Session? Should I be creating one per query?
Because that implies we need to refactor how neotypes works.

Also, I cannot judge if you ever subscribe to the publisher you'll receive on tx.commit[Unit]

I see it right there: tx.commit[Unit].toFuture. Remember that toFuture subscribes to the Publisher.

Note that an empty Publisher like you describe initially is completely valid as per Reactive Streams specification. A publisher is not allowed to publish literal null values and Java does not have the concept of Unit.

Yes, a Publisher may be "empty", that is how the publisher of commit behaves. But in that case, it makes sense since commit doesn't return any value.
Now, IMHO, a Publisher that is supposed to produce one and only one value, but produces neither that value nor an error, is a bug; especially considering that if you then ask for a new Transaction, it fails saying the previous Transaction was not closed. So the Transaction was indeed created, just never published.
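
The "exactly one value or an error" expectation can be sketched on top of the Future[Option[A]] shape that toFuture returns in the reproduction code. This is only an illustration: RequireOneSketch, requireOne and NoValueEmitted are hypothetical names (the reproduction uses NoTransactionError for the same role), and nothing here is driver API.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NoStackTrace

object RequireOneSketch {
  // Hypothetical error for this sketch, raised when the publisher completed without a value.
  object NoValueEmitted
    extends Throwable("Publisher completed without emitting a value")
    with NoStackTrace

  // Turns "maybe a value" into "a value or a failed Future", so an empty
  // publisher surfaces as an explicit error instead of a silent None.
  def requireOne[A](result: Future[Option[A]])(implicit ec: ExecutionContext): Future[A] =
    result.flatMap {
      case Some(value) => Future.successful(value)
      case None        => Future.failed(NoValueEmitted)
    }
}
```

With this shape, an empty commit publisher can stay a plain Future[Option[Unit]], while beginTransaction goes through requireOne and fails loudly when nothing was emitted.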

I am using the transactional functions of the RxSession here

Those are hard to use idiomatically from Scala; the purpose of neotypes is to provide a Scala-friendly API on top of the driver.

Project Reactor provides the best support for "do something with a resource that must be closed"

The real code uses something called Resource, a functional construct that ensures this.
Again, this snippet is just a simplification for showing the error, that tried to be as Javaesque as possible.

@technige
Contributor

technige commented Dec 8, 2020

For clarity specifically on sessions....

As described in the manual:

Sessions are lightweight containers for causally chained sequences of transactions. They essentially provide context for storing transaction sequencing information in the form of bookmarks.

They are not and have never been something intended to be kept around long term. Sessions should never be shared across threads, and should be opened and closed to contain connected transactions, never kept for longer. Also, closing a session will activate necessary cleanup and buffering of any unconsumed result data. So always close your sessions promptly.

If your transactions are not causally chained, then simply create one for each transaction and close it afterwards. Yes, this does add ugly boilerplate, but see the note on that below.

The naming of Session is an unfortunate historical hangover. They would be better named TransactionChain or similar; we are looking at deprecating this naming ahead of 5.0, which should ultimately help to clarify their usage. As you also suggest, spawning transactions directly from the Driver would be handy; we are also looking at that as a future optimisation.
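
The "one session per transaction, closed afterwards" advice can be wrapped in a small helper so the boilerplate lives in one place. The sketch below is an assumption-laden illustration, not the driver's API: FakeSession is a hypothetical stand-in for a real session, and withSession is a made-up helper name.

```scala
import scala.concurrent.{ExecutionContext, Future}

object SessionPerQuerySketch {
  // Hypothetical stand-in for a driver session; the real Neo4j types are not used here.
  final class FakeSession(val id: Int) {
    @volatile var closed: Boolean = false

    def run(query: String): Future[String] =
      if (closed) Future.failed(new IllegalStateException("session is closed"))
      else Future.successful(s"session-$id ran: $query")

    def close(): Future[Unit] = {
      closed = true
      Future.unit
    }
  }

  // Opens a fresh session, runs `use`, and closes the session whether `use`
  // succeeded or failed. The outcome of `use` is what the caller sees;
  // a failure while closing is ignored in this sketch.
  def withSession[A](open: () => FakeSession)(use: FakeSession => Future[A])
                    (implicit ec: ExecutionContext): Future[A] = {
    val session = open()
    Future.unit
      .flatMap(_ => use(session)) // also captures exceptions thrown synchronously by `use`
      .transformWith(result => session.close().transform(_ => result))
  }
}
```

A caller would write something like withSession(() => new FakeSession(1))(_.run("RETURN 1")), and never hold on to the session beyond that call.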

@BalmungSan
Author

Thanks, @technige, for the clarification! And thanks as well to @michael-simons for first mentioning it.

That means we have a big change to make in neotypes... which should fix the error for us, since we will just skip Session for now and provide only Driver and Transaction.

Still, I believe it would be worth taking a look at this (but of course, that is up to you all).
Even though the code runs 1000 iterations just to be sure, most of the time the error happened within the first ten.

@michael-simons
Contributor

No worries, glad that we are able to help!

I sometimes explain it with the following JDBC analogy:

GraphDatabase is to Neo4j Driver what DriverManager is to JDBC.
Driver (instance) is to Neo4j Driver what Connection is to JDBC.
Session resembles a JDBC statement with auto commit set to true. You wouldn't want to have it dangling around, though.
Transaction is an unmanaged transaction, basically a statement with auto commit set to false.

Both session and transaction (also including their reactive and asynchronous variants) implement QueryRunner. Probably that is the interface you want to pass out (new on each request) to be used. This is what I am doing in Spring Data Neo4j reactive.

The following things have no equivalent in the JDBC analogy:

  • Our driver has a pool of physical connections. No need to create multiple instances of a Driver for pooling.
  • The sessions have a built-in transaction manager (when using readTransaction(Work) and writeTransaction(work)) that does automatic retries in some scenarios.
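
As a rough illustration of what "automatic retries" means for a unit of work, here is a generic retry loop over Futures. It does not mirror the driver's actual retry policy (which this thread does not describe); TransientFailure and retrying are hypothetical names, and there is no backoff between attempts.

```scala
import scala.concurrent.{ExecutionContext, Future}

object RetrySketch {
  // Hypothetical marker for failures that are worth retrying.
  final class TransientFailure(message: String) extends RuntimeException(message)

  // Runs `work`; if it fails with a TransientFailure and attempts remain,
  // runs it again from scratch. Any other failure is passed through unchanged.
  def retrying[A](maxAttempts: Int)(work: () => Future[A])
                 (implicit ec: ExecutionContext): Future[A] =
    Future.unit
      .flatMap(_ => work()) // also captures exceptions thrown synchronously by `work`
      .recoverWith {
        case _: TransientFailure if maxAttempts > 1 =>
          retrying(maxAttempts - 1)(work)
      }
}
```

The important property is that `work` is a function, so each attempt starts a fresh unit of work rather than replaying a half-finished one.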

@BalmungSan
Author

BalmungSan commented Dec 18, 2020

Hi @michael-simons @technige

We followed your advice about not reusing Session and, for now, decided to omit it from neotypes and provide only Driver and Transaction.

However, after doing that and trying again to add support for the Rx module, we found the same error again.
I updated the original post with all the details; you can see that we do close the Session after each query.
Note that to reproduce the error we need to run the code on a single thread, but this time we do not need to run on AWS or anything fancy; my local machine reproduces the error every time.

Let me know if I can do something to help.

@geoffjohn11

@michael-simons @technige
Any insight into the problem? We aren't reusing Session, and Session is closed after each query.

@injectives
Contributor

Hi @BalmungSan and @geoffjohn11.
This should be fixed with #932.

@BalmungSan
Author

Hi @injectives thank you very much for looking into this!

Is there a simple way of testing this fix? Or the best would be to clone this repo and do a local publish and use the produced SNAPSHOT version?

@injectives
Contributor

Hi @injectives thank you very much for looking into this!

Is there a simple way of testing this fix? Or the best would be to clone this repo and do a local publish and use the produced SNAPSHOT version?

Happy to fix things 👍

As it has not been released yet, the easiest option is to build a local snapshot and try it in a local environment (that is how it was tested).

@BalmungSan
Author

Hi @injectives I just tested the fix, I can confirm it works both with the little test repo as well as with the real code at neotypes!
Thanks a lot!

@injectives
Contributor

Hi @injectives I just tested the fix, I can confirm it works both with the little test repo as well as with the real code at neotypes!
Thanks a lot!

Thanks for providing a comprehensive description and reproduction steps.
A new version should be released next week.
