Skip to content

Commit 59e4592

Browse files
committed
Introduce compatibility mode
We now support a compatibility mode for extended query flows. Enabling compatibility mode fetches all rows in auto-commit mode regardless the configured fetch size. Cursors are only used with explicit transactions and using Sync instead of Flush to work with newer pgpool versions. Compatibility mode is disabled by default to retain semantics of the previous driver version. [fixes #341][#373]
1 parent 9fa16c3 commit 59e4592

11 files changed

+296
-39
lines changed

README.md

+18-8
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
7979
| `database` | Database to select. _(Optional)_
8080
| `applicationName` | The name of the application connecting to the database. Defaults to `r2dbc-postgresql`. _(Optional)_
8181
| `autodetectExtensions` | Whether to auto-detect and register `Extension`s from the class path. Defaults to `true`. _(Optional)_
82+
| `compatibilityMode` | Enable compatibility mode for cursored fetching. Required when using newer pgpool versions. Defaults to `false`. _(Optional)_
8283
| `errorResponseLogLevel` | Log level for error responses. Any of `OFF`, `DEBUG`, `INFO`, `WARN` or `ERROR` Defaults to `DEBUG`. _(Optional)_
8384
| `fetchSize` | The default number of rows to return when fetching results. Defaults to `0` for unlimited. _(Optional)_
8485
| `forceBinary` | Whether to force binary transfer. Defaults to `false`. _(Optional)_
@@ -156,21 +157,30 @@ If you'd rather like the latest snapshots of the upcoming major version, use our
156157
</dependency>
157158

158159
<repository>
159-
<id>sonatype-nexus-snapshots</id>
160-
<name>Sonatype OSS Snapshot Repository</name>
161-
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
160+
<id>sonatype-nexus-snapshots</id>
161+
<name>Sonatype OSS Snapshot Repository</name>
162+
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
162163
</repository>
163164
```
164165

166+
## Cursors
167+
168+
R2DBC Postgres supports both, the [simple](https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.4)
169+
and [extended](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) message flow.
170+
171+
Cursored fetching is activated by configuring a `fetchSize`. Postgres cursors are valid for the duration of a transaction. R2DBC can use cursors in auto-commit mode (`Execute` and `Flush`) to not
172+
require an explicit transaction (`BEGIN…COMMIT/ROLLBACK`). Newer pgpool versions don't support this feature. To work around this limitation, either use explicit transactions when configuring a fetch
173+
size or enable compatibility mode. Compatibility mode avoids cursors in auto-commit mode (`Execute` with no limit + `Sync`). Cursors in a transaction use `Execute` (with fetch size as limit) + `Sync`
174+
as message flow.
175+
165176
## Listen/Notify
166177

167-
Listen and Notify provide a simple form of signal or inter-process communication mechanism for processes accessing the same PostgreSQL database.
168-
For Listen/Notify, two actors are involved: The sender (notify) and the receiver (listen). The following example uses two connections
169-
to illustrate how they work together:
178+
Listen and Notify provide a simple form of signal or inter-process communication mechanism for processes accessing the same PostgreSQL database. For Listen/Notify, two actors are involved: The
179+
sender (notify) and the receiver (listen). The following example uses two connections to illustrate how they work together:
170180

171181
```java
172-
PostgresqlConnection sender = …;
173-
PostgresqlConnection receiver = …;
182+
PostgresqlConnection sender= …;
183+
PostgresqlConnection receiver= …;
174184

175185
Flux<Notification> listen = receiver.createStatement("LISTEN mymessage")
176186
.execute()

pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@
261261
<compilerArgs>
262262
<arg>-Werror</arg>
263263
<arg>-Xlint:all</arg>
264+
<arg>-Xlint:-deprecation</arg>
264265
<arg>-Xlint:-options</arg>
265266
<arg>-Xlint:-processing</arg>
266267
<arg>-Xlint:-serial</arg>

src/main/java/io/r2dbc/postgresql/ExtendedFlowDelegate.java

+81-4
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,16 @@
2222
import io.r2dbc.postgresql.client.Binding;
2323
import io.r2dbc.postgresql.client.Client;
2424
import io.r2dbc.postgresql.client.ExtendedQueryMessageFlow;
25+
import io.r2dbc.postgresql.client.TransactionStatus;
2526
import io.r2dbc.postgresql.message.backend.BackendMessage;
2627
import io.r2dbc.postgresql.message.backend.BindComplete;
28+
import io.r2dbc.postgresql.message.backend.CloseComplete;
2729
import io.r2dbc.postgresql.message.backend.CommandComplete;
2830
import io.r2dbc.postgresql.message.backend.ErrorResponse;
2931
import io.r2dbc.postgresql.message.backend.NoData;
3032
import io.r2dbc.postgresql.message.backend.ParseComplete;
3133
import io.r2dbc.postgresql.message.backend.PortalSuspended;
34+
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
3235
import io.r2dbc.postgresql.message.frontend.Bind;
3336
import io.r2dbc.postgresql.message.frontend.Close;
3437
import io.r2dbc.postgresql.message.frontend.CompositeFrontendMessage;
@@ -98,11 +101,23 @@ public static Flux<BackendMessage> runQuery(ConnectionResources resources, Excep
98101
messagesToSend.add(new Describe(portal, PORTAL));
99102

100103
Flux<BackendMessage> exchange;
104+
boolean compatibilityMode = resources.getConfiguration().isCompatibilityMode();
105+
boolean implicitTransactions = resources.getClient().getTransactionStatus() == TransactionStatus.IDLE;
101106

102-
if (fetchSize == NO_LIMIT) {
103-
exchange = fetchAll(messagesToSend, client, portal);
107+
if (compatibilityMode) {
108+
109+
if (fetchSize == NO_LIMIT || implicitTransactions) {
110+
exchange = fetchAll(messagesToSend, client, portal);
111+
} else {
112+
exchange = fetchCursoredWithSync(messagesToSend, client, portal, fetchSize);
113+
}
104114
} else {
105-
exchange = fetchOptimisticCursored(messagesToSend, client, portal, fetchSize);
115+
116+
if (fetchSize == NO_LIMIT) {
117+
exchange = fetchAll(messagesToSend, client, portal);
118+
} else {
119+
exchange = fetchCursoredWithFlush(messagesToSend, client, portal, fetchSize);
120+
}
106121
}
107122

108123
if (prepareRequired) {
@@ -136,6 +151,68 @@ private static Flux<BackendMessage> fetchAll(List<FrontendMessage.DirectEncoder>
136151
.as(Operators::discardOnCancel);
137152
}
138153

154+
/**
155+
* Execute a chunked query and indicate to fetch rows in chunks with the {@link Execute} message.
156+
*
157+
* @param messagesToSend the messages to send
158+
* @param client client to use
159+
* @param portal the portal
160+
* @param fetchSize fetch size per roundtrip
161+
* @return the resulting message stream
162+
*/
163+
private static Flux<BackendMessage> fetchCursoredWithSync(List<FrontendMessage.DirectEncoder> messagesToSend, Client client, String portal, int fetchSize) {
164+
165+
DirectProcessor<FrontendMessage> requestsProcessor = DirectProcessor.create();
166+
FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();
167+
AtomicBoolean isCanceled = new AtomicBoolean(false);
168+
AtomicBoolean done = new AtomicBoolean(false);
169+
170+
messagesToSend.add(new Execute(portal, fetchSize));
171+
messagesToSend.add(Sync.INSTANCE);
172+
173+
return client.exchange(it -> done.get() && it instanceof ReadyForQuery, Flux.<FrontendMessage>just(new CompositeFrontendMessage(messagesToSend)).concatWith(requestsProcessor))
174+
.handle((BackendMessage message, SynchronousSink<BackendMessage> sink) -> {
175+
176+
if (message instanceof CommandComplete) {
177+
requestsSink.next(new Close(portal, PORTAL));
178+
requestsSink.next(Sync.INSTANCE);
179+
requestsSink.complete();
180+
sink.next(message);
181+
} else if (message instanceof CloseComplete) {
182+
requestsSink.complete();
183+
done.set(true);
184+
sink.next(message);
185+
} else if (message instanceof ErrorResponse) {
186+
done.set(true);
187+
requestsSink.next(Sync.INSTANCE);
188+
requestsSink.complete();
189+
sink.next(message);
190+
} else if (message instanceof PortalSuspended) {
191+
192+
if (isCanceled.get()) {
193+
requestsSink.next(new Close(portal, PORTAL));
194+
requestsSink.next(Sync.INSTANCE);
195+
requestsSink.complete();
196+
} else {
197+
requestsSink.next(new Execute(portal, fetchSize));
198+
requestsSink.next(Sync.INSTANCE);
199+
}
200+
} else if (message instanceof NoData) {
201+
202+
if (isCanceled.get()) {
203+
requestsSink.next(new Close(portal, PORTAL));
204+
requestsSink.next(Sync.INSTANCE);
205+
requestsSink.complete();
206+
} else {
207+
done.set(true);
208+
}
209+
} else {
210+
sink.next(message);
211+
}
212+
}).doFinally(ignore -> requestsSink.complete())
213+
.as(flux -> Operators.discardOnCancel(flux, () -> isCanceled.set(true)));
214+
}
215+
139216
/**
140217
* Execute a contiguous query and indicate to fetch rows in chunks with the {@link Execute} message. Uses {@link Flush}-based synchronization that creates a cursor. Note that flushing keeps the
141218
* cursor open even with implicit transactions and this method may not work with newer pgpool implementations.
@@ -146,7 +223,7 @@ private static Flux<BackendMessage> fetchAll(List<FrontendMessage.DirectEncoder>
146223
* @param fetchSize fetch size per roundtrip
147224
* @return the resulting message stream
148225
*/
149-
private static Flux<BackendMessage> fetchOptimisticCursored(List<FrontendMessage.DirectEncoder> messagesToSend, Client client, String portal, int fetchSize) {
226+
private static Flux<BackendMessage> fetchCursoredWithFlush(List<FrontendMessage.DirectEncoder> messagesToSend, Client client, String portal, int fetchSize) {
150227

151228
DirectProcessor<FrontendMessage> requestsProcessor = DirectProcessor.create();
152229
FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionConfiguration.java

+28-3
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ public final class PostgresqlConnectionConfiguration {
6666

6767
private final boolean autodetectExtensions;
6868

69+
private final boolean compatibilityMode;
70+
6971
private final Duration connectTimeout;
7072

7173
private final String database;
@@ -105,13 +107,14 @@ public final class PostgresqlConnectionConfiguration {
105107

106108
private final String username;
107109

108-
private PostgresqlConnectionConfiguration(String applicationName, boolean autodetectExtensions, @Nullable Duration connectTimeout, @Nullable String database, LogLevel errorResponseLogLevel,
110+
private PostgresqlConnectionConfiguration(String applicationName, boolean autodetectExtensions, @Nullable boolean compatibilityMode,Duration connectTimeout, @Nullable String database, LogLevel errorResponseLogLevel,
109111
List<Extension> extensions, ToIntFunction<String> fetchSize, boolean forceBinary, @Nullable String host, @Nullable LoopResources loopResources,
110112
LogLevel noticeLogLevel, @Nullable Map<String, String> options, @Nullable CharSequence password, int port, boolean preferAttachedBuffers,
111113
int preparedStatementCacheQueries, @Nullable String schema, @Nullable String socket, SSLConfig sslConfig, boolean tcpKeepAlive, boolean tcpNoDelay,
112114
String username) {
113115
this.applicationName = Assert.requireNonNull(applicationName, "applicationName must not be null");
114116
this.autodetectExtensions = autodetectExtensions;
117+
this.compatibilityMode = compatibilityMode;
115118
this.connectTimeout = connectTimeout;
116119
this.errorResponseLogLevel = errorResponseLogLevel;
117120
this.extensions = Assert.requireNonNull(extensions, "extensions must not be null");
@@ -152,6 +155,7 @@ public String toString() {
152155
return "PostgresqlConnectionConfiguration{" +
153156
"applicationName='" + this.applicationName + '\'' +
154157
", autodetectExtensions='" + this.autodetectExtensions + '\'' +
158+
", compatibilityMode=" + this.compatibilityMode +
155159
", connectTimeout=" + this.connectTimeout +
156160
", errorResponseLogLevel=" + this.errorResponseLogLevel +
157161
", database='" + this.database + '\'' +
@@ -176,6 +180,10 @@ String getApplicationName() {
176180
return this.applicationName;
177181
}
178182

183+
boolean isCompatibilityMode() {
184+
return this.compatibilityMode;
185+
}
186+
179187
@Nullable
180188
Duration getConnectTimeout() {
181189
return this.connectTimeout;
@@ -313,6 +321,8 @@ public static final class Builder {
313321

314322
private boolean autodetectExtensions = true;
315323

324+
private boolean compatibilityMode = false;
325+
316326
@Nullable
317327
private Duration connectTimeout;
318328

@@ -422,11 +432,25 @@ public PostgresqlConnectionConfiguration build() {
422432
throw new IllegalArgumentException("username must not be null");
423433
}
424434

425-
return new PostgresqlConnectionConfiguration(this.applicationName, this.autodetectExtensions, this.connectTimeout, this.database, this.errorResponseLogLevel, this.extensions,
426-
this.fetchSize, this.forceBinary, this.host, this.loopResources, this.noticeLogLevel, this.options, this.password, this.port, this.preferAttachedBuffers,
435+
return new PostgresqlConnectionConfiguration(this.applicationName, this.autodetectExtensions, this.compatibilityMode, this.connectTimeout, this.database, this.errorResponseLogLevel, this.extensions,
436+
this.fetchSize
437+
, this.forceBinary, this.host, this.loopResources, this.noticeLogLevel, this.options, this.password, this.port, this.preferAttachedBuffers,
427438
this.preparedStatementCacheQueries, this.schema, this.socket, this.createSslConfig(), this.tcpKeepAlive, this.tcpNoDelay, this.username);
428439
}
429440

441+
/**
442+
* Enables protocol compatibility mode for cursored query execution. Cursored query execution applies when configuring a non-zero fetch size. Compatibility mode uses {@code Execute+Sync}
443+
* messages instead of {@code Execute+Flush}. The default mode uses optimized fetching which does not work with newer pgpool versions.
444+
*
445+
* @param compatibilityMode whether to enable compatibility mode
446+
* @return this {@link Builder}
447+
* @since 0.8.7
448+
*/
449+
public Builder compatibilityMode(boolean compatibilityMode) {
450+
this.compatibilityMode = compatibilityMode;
451+
return this;
452+
}
453+
430454
/**
431455
* Configure the connection timeout. Default unconfigured.
432456
*
@@ -786,6 +810,7 @@ public String toString() {
786810
return "Builder{" +
787811
"applicationName='" + this.applicationName + '\'' +
788812
", autodetectExtensions='" + this.autodetectExtensions + '\'' +
813+
", compatibilityMode='" + this.compatibilityMode + '\'' +
789814
", connectTimeout='" + this.connectTimeout + '\'' +
790815
", database='" + this.database + '\'' +
791816
", extensions='" + this.extensions + '\'' +

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactoryProvider.java

+8
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ public final class PostgresqlConnectionFactoryProvider implements ConnectionFact
5757
*/
5858
public static final Option<Boolean> AUTODETECT_EXTENSIONS = Option.valueOf("autodetectExtensions");
5959

60+
/**
61+
* Compatibility query mode for cursored query execution.
62+
*
63+
* @since 0.8.7
64+
*/
65+
public static final Option<Boolean> COMPATIBILITY_MODE = Option.valueOf("compatibilityMode");
66+
6067
/**
6168
* Error Response Log Level.
6269
*/
@@ -226,6 +233,7 @@ private static PostgresqlConnectionConfiguration.Builder fromConnectionFactoryOp
226233

227234
mapper.from(APPLICATION_NAME).to(builder::applicationName);
228235
mapper.from(AUTODETECT_EXTENSIONS).map(OptionMapper::toBoolean).to(builder::autodetectExtensions);
236+
mapper.from(COMPATIBILITY_MODE).map(OptionMapper::toBoolean).to(builder::compatibilityMode);
229237
mapper.from(CONNECT_TIMEOUT).map(OptionMapper::toDuration).to(builder::connectTimeout);
230238
mapper.from(CURRENT_SCHEMA).to(builder::schema).otherwise(() -> mapper.from(SCHEMA).to(builder::schema));
231239
mapper.from(DATABASE).to(builder::database);

src/test/java/io/r2dbc/postgresql/ExtendedQueryPostgresqlStatementIntegrationTests.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,31 @@
2121
import org.junit.jupiter.api.AfterEach;
2222
import org.junit.jupiter.api.BeforeEach;
2323
import org.junit.jupiter.api.Test;
24+
import org.springframework.jdbc.core.JdbcOperations;
2425
import reactor.core.publisher.Flux;
2526
import reactor.test.StepVerifier;
2627

2728
/**
28-
* Integration tests for {@link ExtendedQueryPostgresqlStatement}
29+
* Integration tests for {@link ExtendedQueryPostgresqlStatement}.
2930
*/
30-
final class ExtendedQueryPostgresqlStatementIntegrationTests extends AbstractIntegrationTests {
31+
class ExtendedQueryPostgresqlStatementIntegrationTests extends AbstractIntegrationTests {
3132

3233
@BeforeEach
3334
void setUp() {
3435
super.setUp();
35-
SERVER.getJdbcOperations().execute("DROP TABLE IF EXISTS test");
36-
SERVER.getJdbcOperations().execute("CREATE TABLE test (id SERIAL PRIMARY KEY, val VARCHAR(255))");
37-
SERVER.getJdbcOperations().execute("INSERT INTO test (val) VALUES ('a'), ('a'), ('b'), ('c'), ('c')");
36+
getJdbcOperations().execute("DROP TABLE IF EXISTS test");
37+
getJdbcOperations().execute("CREATE TABLE test (id SERIAL PRIMARY KEY, val VARCHAR(255))");
38+
getJdbcOperations().execute("INSERT INTO test (val) VALUES ('a'), ('a'), ('b'), ('c'), ('c')");
3839
}
3940

4041
@AfterEach
4142
void tearDown() {
4243
super.tearDown();
43-
SERVER.getJdbcOperations().execute("DROP TABLE IF EXISTS test");
44+
getJdbcOperations().execute("DROP TABLE IF EXISTS test");
45+
}
46+
47+
private JdbcOperations getJdbcOperations() {
48+
return SERVER.getJdbcOperations();
4449
}
4550

4651
@Override

0 commit comments

Comments
 (0)