Skip to content

Commit bb59714

Browse files
committed
Await ReadyForQuery before emitting errors from transactional control methods.
commitTransaction, rollbackTransaction and other methods now await completion of the exchange before emitting error signals to properly synchronize completion. Previously, error signals were emitted before updating the transaction state which could lead to invalid cleanup states if e.g. the commit failed. [resolves #541] Signed-off-by: Mark Paluch <[email protected]>
1 parent 03ca360 commit bb59714

File tree

3 files changed

+47
-10
lines changed

3 files changed

+47
-10
lines changed

src/main/java/io/r2dbc/postgresql/ExceptionFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ static ExceptionFactory withSql(String sql) {
6161
* @return the {@link R2dbcException}.
6262
* @see ErrorResponse
6363
*/
64-
private static R2dbcException createException(ErrorResponse response, String sql) {
64+
static R2dbcException createException(ErrorResponse response, String sql) {
6565
return createException(new ErrorDetails(response.getFields()), sql);
6666
}
6767

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+25-5
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@
3030
import io.r2dbc.postgresql.codec.Codecs;
3131
import io.r2dbc.postgresql.message.backend.BackendMessage;
3232
import io.r2dbc.postgresql.message.backend.CommandComplete;
33+
import io.r2dbc.postgresql.message.backend.ErrorResponse;
3334
import io.r2dbc.postgresql.message.backend.NotificationResponse;
3435
import io.r2dbc.postgresql.util.Assert;
3536
import io.r2dbc.postgresql.util.Operators;
3637
import io.r2dbc.spi.Connection;
3738
import io.r2dbc.spi.IsolationLevel;
3839
import io.r2dbc.spi.Option;
40+
import io.r2dbc.spi.R2dbcException;
3941
import io.r2dbc.spi.TransactionDefinition;
4042
import io.r2dbc.spi.ValidationDepth;
4143
import org.reactivestreams.Publisher;
@@ -189,6 +191,8 @@ public Mono<Void> cancelRequest() {
189191

190192
@Override
191193
public Mono<Void> commitTransaction() {
194+
195+
AtomicReference<R2dbcException> ref = new AtomicReference<>();
192196
return useTransactionStatus(transactionStatus -> {
193197
if (IDLE != transactionStatus) {
194198
return Flux.from(exchange("COMMIT"))
@@ -203,13 +207,17 @@ public Mono<Void> commitTransaction() {
203207
// See discussion in pgsql-hackers: https://www.postgresql.org/message-id/b9fb50dc-0f6e-15fb-6555-8ddb86f4aa71%40postgresfriends.org
204208

205209
if ("ROLLBACK".equalsIgnoreCase(message.getCommand())) {
206-
sink.error(new ExceptionFactory.PostgresqlRollbackException(ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction" +
207-
" " +
208-
"failure is not known (check server logs?)"), "COMMIT"));
210+
ErrorDetails details = ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction " +
211+
"failure is not known (check server logs?)");
212+
ref.set(new ExceptionFactory.PostgresqlRollbackException(details, "COMMIT"));
209213
return;
210214
}
211215

212216
sink.next(message);
217+
}).doOnComplete(() -> {
218+
if (ref.get() != null) {
219+
throw ref.get();
220+
}
213221
});
214222
} else {
215223
this.logger.debug(this.connectionContext.getMessage("Skipping commit transaction because status is {}"), transactionStatus);
@@ -452,9 +460,21 @@ private <T> Mono<T> withTransactionStatus(Function<TransactionStatus, T> f) {
452460

453461
@SuppressWarnings("unchecked")
454462
private <T> Flux<T> exchange(String sql) {
455-
ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
463+
AtomicReference<R2dbcException> ref = new AtomicReference<>();
456464
return (Flux<T>) SimpleQueryMessageFlow.exchange(this.client, sql)
457-
.handle(exceptionFactory::handleErrorResponse);
465+
.handle((backendMessage, synchronousSink) -> {
466+
467+
if (backendMessage instanceof ErrorResponse) {
468+
ref.set(ExceptionFactory.createException((ErrorResponse) backendMessage, sql));
469+
} else {
470+
synchronousSink.next(backendMessage);
471+
}
472+
})
473+
.doOnComplete(() -> {
474+
if (ref.get() != null) {
475+
throw ref.get();
476+
}
477+
});
458478
}
459479

460480
private void cleanupIsolationLevel() {

src/test/java/io/r2dbc/postgresql/PostgresqlConnectionErrorsIntegrationTests.java

+21-4
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818

1919
import io.r2dbc.postgresql.api.PostgresqlConnection;
2020
import io.r2dbc.postgresql.api.PostgresqlResult;
21+
import io.r2dbc.postgresql.client.Client;
22+
import io.r2dbc.postgresql.client.TransactionStatus;
2123
import io.r2dbc.spi.R2dbcBadGrammarException;
22-
import io.r2dbc.spi.R2dbcRollbackException;
23-
import org.awaitility.Awaitility;
24+
import io.r2dbc.spi.R2dbcException;
2425
import org.junit.jupiter.api.Test;
2526
import reactor.test.StepVerifier;
2627

28+
import java.lang.reflect.Field;
29+
2730
import static org.assertj.core.api.Assertions.assertThat;
2831

2932
/**
@@ -37,12 +40,26 @@ void commitShouldRecoverFromFailedTransaction() {
3740
this.connection.beginTransaction().as(StepVerifier::create).verifyComplete();
3841
this.connection.createStatement("error").execute().flatMap(PostgresqlResult::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcBadGrammarException.class);
3942

40-
this.connection.commitTransaction().as(StepVerifier::create).verifyError(R2dbcRollbackException.class);
43+
this.connection.commitTransaction().as(StepVerifier::create).verifyErrorSatisfies(throwable -> {
44+
assertThat(throwable).isInstanceOf(R2dbcException.class);
45+
46+
Client client = extractClient();
47+
assertThat(client.getTransactionStatus()).isEqualTo(TransactionStatus.IDLE);
48+
});
4149

42-
Awaitility.await().until(() -> this.connection.isAutoCommit());
4350
assertThat(this.connection.isAutoCommit()).isTrue();
4451
}
4552

53+
private Client extractClient() {
54+
try {
55+
Field field = io.r2dbc.postgresql.PostgresqlConnection.class.getDeclaredField("client");
56+
field.setAccessible(true);
57+
return (Client) field.get(this.connection);
58+
} catch (ReflectiveOperationException e) {
59+
throw new RuntimeException(e);
60+
}
61+
}
62+
4663
@Test
4764
void rollbackShouldRecoverFromFailedTransaction() {
4865

0 commit comments

Comments
 (0)