Self-suppression exception when using preferSecondary #677

Open
mateusz-baluch-xtb opened this issue Mar 28, 2025 · 0 comments
Labels
status: waiting-for-triage An issue we've not yet triaged

Bug Report

I have not been able to find any information about this particular error on the Internet.

Versions

  • Driver: r2dbc-postgres v1.0.7 and r2dbc-pool v1.0.2
  • Database: Postgres 14
  • Java: 17
  • OS: Linux

Current Behavior

When using the failover protocol with primary and standby Postgres instances, the target server type can be set to "preferSecondary". In theory, when the standby instance is down, the driver should connect to the primary one. This works fine until a second parallel connection is added (for example, via r2dbc-pool). Then it throws a misleading error:

Exception in thread "main" io.r2dbc.postgresql.PostgresqlConnectionFactory$PostgresConnectionException: [08003] Cannot connect to a PREFER_SECONDARY node using [127.0.0.1:5432, 127.0.0.1:5433]. Known server states: {127.0.0.1/:5433=CONNECT_FAIL, 127.0.0.1/:5432=PRIMARY}
    at io.r2dbc.postgresql.PostgresqlConnectionFactory.cannotConnect(PostgresqlConnectionFactory.java:188)
    at io.r2dbc.postgresql.PostgresqlConnectionFactory.lambda$doCreateConnection$7(PostgresqlConnectionFactory.java:153)
    at reactor.core.publisher.Mono.lambda$onErrorMap$28(Mono.java:3783)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:180)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2210)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2210)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2210)
    at reactor.core.publisher.Operators.error(Operators.java:198)
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4496)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82)
    at reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
    at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerComplete(FluxConcatMapNoPrefetch.java:286)
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:887)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
    at reactor.core.publisher.Operators.complete(Operators.java:137)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4496)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:180)
    at reactor.core.publisher.MonoDelayUntil$DelayUntilTrigger.onError(MonoDelayUntil.java:514)
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:213)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:255)
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:213)
    at reactor.core.publisher.FluxCreate$BaseSink.error(FluxCreate.java:477)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:866)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.error(FluxCreate.java:811)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:237)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:213)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.error(FluxCreate.java:189)
    at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.onError(ReactorNettyClient.java:707)
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.close(ReactorNettyClient.java:980)
    at io.r2dbc.postgresql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:539)
    at io.r2dbc.postgresql.client.ReactorNettyClient.handleClose(ReactorNettyClient.java:522)
    at io.r2dbc.postgresql.client.ReactorNettyClient.access$200(ReactorNettyClient.java:94)
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onComplete(ReactorNettyClient.java:871)
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:223)
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)
    at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:483)
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:275)
    at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:419)
    at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:445)
    at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:499)
    at reactor.netty.channel.ChannelOperations.onInboundClose(ChannelOperations.java:460)
    at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:73)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:412)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:377)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at io.r2dbc.postgresql.client.ReactorNettyClient$EnsureSubscribersCompleteChannelHandler.channelInactive(ReactorNettyClient.java:554)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1402)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:900)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:811)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:405)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:1583)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:103)
        at reactor.core.publisher.Flux.blockLast(Flux.java:2753)
        at pl.baluch.r2dbc.failoverpool.Main.run(Main.java:42)
        at pl.baluch.r2dbc.failoverpool.Main.main(Main.java:18)
Caused by: java.lang.IllegalArgumentException: Self-suppression not permitted
    at java.base/java.lang.Throwable.addSuppressed(Throwable.java:1096)
    at io.r2dbc.postgresql.MultiHostConnectionStrategy.lambda$connect$0(MultiHostConnectionStrategy.java:91)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
    ... 68 more
    Suppressed: io.r2dbc.postgresql.client.ReactorNettyClient$PostgresConnectionClosedException: [08006] Connection unexpectedly closed
        at io.r2dbc.postgresql.client.ReactorNettyClient.lambda$static$0(ReactorNettyClient.java:102)
        ... 42 more
    Suppressed: io.r2dbc.postgresql.client.ReactorNettyClient$PostgresConnectionClosedException: [08006] Connection unexpectedly closed
        ... 43 more
    Suppressed: io.r2dbc.postgresql.client.ReactorNettyClient$PostgresConnectionClosedException: [08006] Connection unexpectedly closed
        ... 43 more
    Suppressed: io.r2dbc.postgresql.client.ReactorNettyClient$PostgresConnectionClosedException: [08006] Connection unexpectedly closed
        ... 43 more
    Caused by: [CIRCULAR REFERENCE: io.r2dbc.postgresql.client.ReactorNettyClient$PostgresConnectionClosedException: [08006] Connection unexpectedly closed]

Table schema

NA

Steps to reproduce

Minimal, Reproducible Example: https://github.com/mateusz-baluch-xtb/R2dbcPostgresFailoverPoolExample

Explanation:

r2dbc-pool calls MultiHostConnectionStrategy.connect() only once, when the datasource is created. To create each connection it then reuses the same Mono, which holds references to the same exceptionRef instances.
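This sharing is easy to reproduce outside the driver: any lazily-executed pipeline that creates mutable state at assembly time, rather than at execution time, shares that state across all executions. A rough JDK-only analogy (no Reactor; the class and method names here are illustrative, not from the driver):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class SharedStateDemo {

    // Assembled once, like the Mono that r2dbc-pool reuses: the
    // AtomicReference is created here, NOT once per execution.
    static Supplier<String> assembleOnce() {
        AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
        return () -> {
            Throwable failure = new RuntimeException("standby down");
            if (!exceptionRef.compareAndSet(null, failure)) {
                // Second and later executions land here with stale state
                // left over from the previous execution.
                return "stale error from earlier execution: "
                        + exceptionRef.get().getMessage();
            }
            return "recorded fresh error";
        };
    }

    public static void main(String[] args) {
        Supplier<String> pipeline = assembleOnce();
        System.out.println(pipeline.get()); // first "connection": clean state
        System.out.println(pipeline.get()); // second "connection": stale shared state
    }
}
```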

The code relevant to understanding the problem (copied from the io.r2dbc.postgresql.MultiHostConnectionStrategy class):

public Mono<Client> connect(TargetServerType targetServerType) {
   // created once at assembly time, shared by every subscription
   AtomicReference<Throwable> exceptionRef = new AtomicReference<>();

   return attemptConnection(targetServerType)
           .onErrorResume(e -> {
              if (!exceptionRef.compareAndSet(null, e)) {
                 // on reuse, exceptionRef may already hold this very same
                 // exception instance -> self-suppression
                 exceptionRef.get().addSuppressed(e);
              }
              return Mono.empty();
           })
           .switchIfEmpty(Mono.defer(() -> targetServerType == PREFER_SECONDARY ? attemptConnection(PRIMARY) : Mono.empty()))
           .switchIfEmpty(Mono.error(() -> {
              Throwable error = exceptionRef.get();
              if (error == null) {
                 return new PostgresqlConnectionFactory.PostgresConnectionException(String.format("No server matches target type '%s'", targetServerType), null);
              } else {
                 return new PostgresqlConnectionFactory.PostgresConnectionException(String.format("Cannot connect to a host of %s", this.addresses), error);
              }
           }));
}

private Mono<Client> attemptConnection(TargetServerType targetServerType) {
   // also created once at assembly time, shared by every subscription
   AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
   return getCandidates(targetServerType).concatMap(candidate -> this.attemptConnection(targetServerType, candidate)
                   .onErrorResume(e -> {
                      if (!exceptionRef.compareAndSet(null, e)) {
                         exceptionRef.get().addSuppressed(e);
                      }
                      this.statusMap.put(candidate, HostConnectOutcome.fail(candidate));
                      return Mono.empty();
                   }))
           .next()
           // on every later subscription this re-emits the SAME stored exception
           .switchIfEmpty(Mono.defer(() -> exceptionRef.get() != null
                   ? Mono.error(exceptionRef.get())
                   : Mono.empty()));
}

When MultiHostConnectionStrategy.connect() is called, it returns a Mono<Client> that holds references to the same two exceptionRef instances (one from connect(TargetServerType) and one from attemptConnection(TargetServerType)) for its entire lifetime. So when the target type is PREFER_SECONDARY and the first attemptConnection fails, its exceptionRef is filled with that failure. Then, in the connect(TargetServerType) method, the same exception is stored in its exceptionRef. The first connection always succeeds (connected to the primary instance), because exceptionRef in the connect method is still empty at that point. All subsequent connections fail, because the attemptConnection method keeps re-emitting the same stored exception, which then hits this line in the connect method:

exceptionRef.get().addSuppressed(e);

This throws java.lang.IllegalArgumentException: Self-suppression not permitted, because e is the very same exception instance already stored in the exceptionRef variable.
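The JDK enforces this restriction directly in Throwable.addSuppressed: passing the throwable itself raises IllegalArgumentException. A minimal standalone demonstration (the class and method names are mine, not from the driver):

```java
public class SelfSuppressionDemo {

    // Tries to make a throwable suppress itself and returns the message
    // of the resulting exception, or null if none was thrown.
    static String trySelfSuppress() {
        Throwable t = new RuntimeException("connection failed");
        try {
            t.addSuppressed(t); // same instance on both sides, as in the bug
            return null;
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // prints "Self-suppression not permitted"
        System.out.println(trySelfSuppress());
    }
}
```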

Expected behavior/code

When the standby instance is not available, connections should always be able to reach the primary server, even when using r2dbc-pool.

Possible Solution

The solution is to create a new AtomicReference<Throwable> instance in the attemptConnection and connect methods for each connection attempt. This can be done by wrapping the method calls in a defer (Mono.defer(() -> call())).

Example:

    @Override
    public Mono<Client> connect() {
        return Mono.defer(() -> connect(this.multiHostConfiguration.getTargetServerType()));
    }

    public Mono<Client> connect(TargetServerType targetServerType) {
        AtomicReference<Throwable> exceptionRef = new AtomicReference<>();

        return Mono.defer(() -> attemptConnection(targetServerType))
                ...

Solution explanation:

The Mono.defer wrapper re-invokes connect and attemptConnection for every subscription (i.e., for every connection attempt), not just once, so fresh AtomicReference<Throwable> instances are created each time. This prevents the java.lang.IllegalArgumentException: Self-suppression not permitted exception from being thrown.
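The eager-vs-deferred distinction can be shown without Reactor: deferring the assembly itself means every execution builds its own state. A JDK-only sketch of the same idea (all names illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class DeferDemo {

    // Eager: state is created once at assembly and shared by every run,
    // like the original connect()'s exceptionRef.
    static Supplier<Integer> eager() {
        AtomicInteger state = new AtomicInteger();
        return state::incrementAndGet;
    }

    // Deferred: assembly is postponed, so each run builds fresh state,
    // analogous to what Mono.defer(() -> connect(...)) achieves.
    static Supplier<Integer> deferred() {
        return () -> eager().get();
    }

    public static void main(String[] args) {
        Supplier<Integer> e = eager();
        System.out.println(e.get() + " " + e.get()); // 1 2  (shared state)
        Supplier<Integer> d = deferred();
        System.out.println(d.get() + " " + d.get()); // 1 1  (fresh state per run)
    }
}
```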

Additional context

NA

@mateusz-baluch-xtb mateusz-baluch-xtb added the status: waiting-for-triage An issue we've not yet triaged label Mar 28, 2025
mateusz-baluch-xtb pushed a commit to mateusz-baluch-xtb/r2dbc-postgresql that referenced this issue Mar 28, 2025
This commit fixes the self-suppression exception that occurs when using preferSecondary and all standby servers are unavailable.