Skip to content

Commit 3bf6a42

Browse files
committed
Await ReadyForQuery before emitting errors from transactional control methods.
commitTransaction, rollbackTransaction and other methods now await completion of the exchange before emitting error signals to properly synchronize completion. Previously, error signals were emitted before updating the transaction state which could lead to invalid cleanup states if e.g. the commit failed. [resolves #541] Signed-off-by: Mark Paluch <[email protected]>
1 parent 67806b6 commit 3bf6a42

File tree

3 files changed

+47
-10
lines changed

3 files changed

+47
-10
lines changed

src/main/java/io/r2dbc/postgresql/ExceptionFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ static ExceptionFactory withSql(String sql) {
6161
* @return the {@link R2dbcException}.
6262
* @see ErrorResponse
6363
*/
64-
private static R2dbcException createException(ErrorResponse response, String sql) {
64+
static R2dbcException createException(ErrorResponse response, String sql) {
6565
return createException(new ErrorDetails(response.getFields()), sql);
6666
}
6767

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+25-5
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@
2929
import io.r2dbc.postgresql.codec.Codecs;
3030
import io.r2dbc.postgresql.message.backend.BackendMessage;
3131
import io.r2dbc.postgresql.message.backend.CommandComplete;
32+
import io.r2dbc.postgresql.message.backend.ErrorResponse;
3233
import io.r2dbc.postgresql.message.backend.NotificationResponse;
3334
import io.r2dbc.postgresql.util.Assert;
3435
import io.r2dbc.postgresql.util.Operators;
3536
import io.r2dbc.spi.Connection;
3637
import io.r2dbc.spi.IsolationLevel;
3738
import io.r2dbc.spi.Option;
39+
import io.r2dbc.spi.R2dbcException;
3840
import io.r2dbc.spi.TransactionDefinition;
3941
import io.r2dbc.spi.ValidationDepth;
4042
import org.reactivestreams.Publisher;
@@ -184,6 +186,8 @@ public Mono<Void> cancelRequest() {
184186

185187
@Override
186188
public Mono<Void> commitTransaction() {
189+
190+
AtomicReference<R2dbcException> ref = new AtomicReference<>();
187191
return useTransactionStatus(transactionStatus -> {
188192
if (IDLE != transactionStatus) {
189193
return Flux.from(exchange("COMMIT"))
@@ -198,13 +202,17 @@ public Mono<Void> commitTransaction() {
198202
// See discussion in pgsql-hackers: https://www.postgresql.org/message-id/b9fb50dc-0f6e-15fb-6555-8ddb86f4aa71%40postgresfriends.org
199203

200204
if ("ROLLBACK".equalsIgnoreCase(message.getCommand())) {
201-
sink.error(new ExceptionFactory.PostgresqlRollbackException(ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction" +
202-
" " +
203-
"failure is not known (check server logs?)"), "COMMIT"));
205+
ErrorDetails details = ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction " +
206+
"failure is not known (check server logs?)");
207+
ref.set(new ExceptionFactory.PostgresqlRollbackException(details, "COMMIT"));
204208
return;
205209
}
206210

207211
sink.next(message);
212+
}).doOnComplete(() -> {
213+
if (ref.get() != null) {
214+
throw ref.get();
215+
}
208216
});
209217
} else {
210218
this.logger.debug(this.connectionContext.getMessage("Skipping commit transaction because status is {}"), transactionStatus);
@@ -442,9 +450,21 @@ private <T> Mono<T> withTransactionStatus(Function<TransactionStatus, T> f) {
442450

443451
@SuppressWarnings("unchecked")
444452
private <T> Flux<T> exchange(String sql) {
445-
ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
453+
AtomicReference<R2dbcException> ref = new AtomicReference<>();
446454
return (Flux<T>) SimpleQueryMessageFlow.exchange(this.client, sql)
447-
.handle(exceptionFactory::handleErrorResponse);
455+
.handle((backendMessage, synchronousSink) -> {
456+
457+
if (backendMessage instanceof ErrorResponse) {
458+
ref.set(ExceptionFactory.createException((ErrorResponse) backendMessage, sql));
459+
} else {
460+
synchronousSink.next(backendMessage);
461+
}
462+
})
463+
.doOnComplete(() -> {
464+
if (ref.get() != null) {
465+
throw ref.get();
466+
}
467+
});
448468
}
449469

450470
private void cleanupIsolationLevel() {

src/test/java/io/r2dbc/postgresql/PostgresqlConnectionErrorsIntegrationTests.java

+21-4
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818

1919
import io.r2dbc.postgresql.api.PostgresqlConnection;
2020
import io.r2dbc.postgresql.api.PostgresqlResult;
21+
import io.r2dbc.postgresql.client.Client;
22+
import io.r2dbc.postgresql.client.TransactionStatus;
2123
import io.r2dbc.spi.R2dbcBadGrammarException;
22-
import io.r2dbc.spi.R2dbcRollbackException;
23-
import org.awaitility.Awaitility;
24+
import io.r2dbc.spi.R2dbcException;
2425
import org.junit.jupiter.api.Test;
2526
import reactor.test.StepVerifier;
2627

28+
import java.lang.reflect.Field;
29+
2730
import static org.assertj.core.api.Assertions.assertThat;
2831

2932
/**
@@ -37,12 +40,26 @@ void commitShouldRecoverFromFailedTransaction() {
3740
this.connection.beginTransaction().as(StepVerifier::create).verifyComplete();
3841
this.connection.createStatement("error").execute().flatMap(PostgresqlResult::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcBadGrammarException.class);
3942

40-
this.connection.commitTransaction().as(StepVerifier::create).verifyError(R2dbcRollbackException.class);
43+
this.connection.commitTransaction().as(StepVerifier::create).verifyErrorSatisfies(throwable -> {
44+
assertThat(throwable).isInstanceOf(R2dbcException.class);
45+
46+
Client client = extractClient();
47+
assertThat(client.getTransactionStatus()).isEqualTo(TransactionStatus.IDLE);
48+
});
4149

42-
Awaitility.await().until(() -> this.connection.isAutoCommit());
4350
assertThat(this.connection.isAutoCommit()).isTrue();
4451
}
4552

53+
private Client extractClient() {
54+
try {
55+
Field field = io.r2dbc.postgresql.PostgresqlConnection.class.getDeclaredField("client");
56+
field.setAccessible(true);
57+
return (Client) field.get(this.connection);
58+
} catch (ReflectiveOperationException e) {
59+
throw new RuntimeException(e);
60+
}
61+
}
62+
4663
@Test
4764
void rollbackShouldRecoverFromFailedTransaction() {
4865

0 commit comments

Comments
 (0)