Skip to content

Commit bb58415

Browse files
committed
Fix commitTransaction() for failed transactions
Failed transactions are now properly cleaned up when calling commitTransaction(). Previously, failed transactions were left untouched on commit. This change aligns with PGJDBC's behavior. [resolves #274]
1 parent cde3c86 commit bb58415

File tree

4 files changed

+79
-4
lines changed

4 files changed

+79
-4
lines changed

pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
<properties>
3535
<assertj.version>3.16.1</assertj.version>
36+
<awaitility.version>4.0.3</awaitility.version>
3637
<blockhound.version>1.0.3.RELEASE</blockhound.version>
3738
<hikari-cp.version>3.4.5</hikari-cp.version>
3839
<java.version>1.8</java.version>
@@ -186,6 +187,12 @@
186187
<version>${assertj.version}</version>
187188
<scope>test</scope>
188189
</dependency>
190+
<dependency>
191+
<groupId>org.awaitility</groupId>
192+
<artifactId>awaitility</artifactId>
193+
<version>${awaitility.version}</version>
194+
<scope>test</scope>
195+
</dependency>
189196
<dependency>
190197
<groupId>org.junit.jupiter</groupId>
191198
<artifactId>junit-jupiter-api</artifactId>

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public Mono<Void> close() {
107107
@Override
108108
public Mono<Void> commitTransaction() {
109109
return useTransactionStatus(transactionStatus -> {
110-
if (OPEN == transactionStatus) {
110+
if (IDLE != transactionStatus) {
111111
return exchange("COMMIT");
112112
} else {
113113
this.logger.debug("Skipping commit transaction because status is {}", transactionStatus);
@@ -336,9 +336,10 @@ private <T> Mono<T> withTransactionStatus(Function<TransactionStatus, T> f) {
336336
return Mono.defer(() -> Mono.just(f.apply(this.client.getTransactionStatus())));
337337
}
338338

339-
private Publisher<?> exchange(String sql) {
339+
@SuppressWarnings("unchecked")
340+
private <T> Publisher<T> exchange(String sql) {
340341
ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
341-
return SimpleQueryMessageFlow.exchange(this.client, sql)
342+
return (Publisher<T>) SimpleQueryMessageFlow.exchange(this.client, sql)
342343
.handle(exceptionFactory::handleErrorResponse)
343344
.as(Operators::discardOnCancel);
344345
}

src/main/java/io/r2dbc/postgresql/api/ErrorDetails.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.r2dbc.postgresql.message.backend.Field.FieldType;
2222
import io.r2dbc.postgresql.util.Assert;
2323

24+
import java.util.Collections;
2425
import java.util.HashMap;
2526
import java.util.List;
2627
import java.util.Map;
@@ -89,7 +90,7 @@ public final class ErrorDetails {
8990

9091

9192
/**
92-
* Creates a new exception.
93+
* Create new {@link ErrorDetails} from {@link List} of {@link Field fields}.
9394
*
9495
* @param fields the fields to be used to populate the exception
9596
* @throws IllegalArgumentException if {@code fields} is {@code null}
@@ -120,6 +121,16 @@ private ErrorDetails(Map<FieldType, String> fields) {
120121
this.where = fields.get(WHERE);
121122
}
122123

124+
/**
125+
* Create a new {@link ErrorDetails}
126+
*
127+
* @param message the error message
128+
* @return the {@link ErrorDetails} object
129+
*/
130+
public static ErrorDetails fromMessage(String message) {
131+
return new ErrorDetails(Collections.singletonMap(MESSAGE, message));
132+
}
133+
123134
@Override
124135
public boolean equals(Object o) {
125136
if (this == o) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql;
18+
19+
import io.r2dbc.postgresql.api.PostgresqlConnection;
20+
import io.r2dbc.postgresql.api.PostgresqlResult;
21+
import io.r2dbc.spi.R2dbcBadGrammarException;
22+
import org.awaitility.Awaitility;
23+
import org.junit.jupiter.api.Test;
24+
import reactor.test.StepVerifier;
25+
26+
import static org.assertj.core.api.Assertions.assertThat;
27+
28+
/**
29+
* Integration tests for various error cases using {@link PostgresqlConnection}.
30+
*/
31+
final class PostgresqlConnectionErrorsIntegrationTests extends AbstractIntegrationTests {
32+
33+
@Test
34+
void commitShouldRecoverFromFailedTransaction() {
35+
36+
this.connection.beginTransaction().as(StepVerifier::create).verifyComplete();
37+
this.connection.createStatement("error").execute().flatMap(PostgresqlResult::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcBadGrammarException.class);
38+
39+
this.connection.commitTransaction().as(StepVerifier::create).verifyComplete();
40+
41+
Awaitility.await().until(() -> this.connection.isAutoCommit());
42+
assertThat(this.connection.isAutoCommit()).isTrue();
43+
}
44+
45+
@Test
46+
void rollbackShouldRecoverFromFailedTransaction() {
47+
48+
this.connection.beginTransaction().as(StepVerifier::create).verifyComplete();
49+
this.connection.createStatement("error").execute().flatMap(PostgresqlResult::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcBadGrammarException.class);
50+
51+
this.connection.rollbackTransaction().as(StepVerifier::create).verifyComplete();
52+
53+
assertThat(this.connection.isAutoCommit()).isTrue();
54+
}
55+
56+
}

0 commit comments

Comments
 (0)