Skip to content

Commit f5cca8e

Browse files
authored
Merge pull request #229 from r2dbc/refcursor
Add support for RefCursor
2 parents 2748759 + 1e525fb commit f5cca8e

23 files changed

+809
-198
lines changed

README.md

+23-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ This driver provides the following features:
1515
* Execution of prepared statements with bindings
1616
* Execution of batch statements without bindings
1717
* Read and write support for all data types except LOB types (e.g. `BLOB`, `CLOB`)
18+
* Fetching of `REFCURSOR` using `io.r2dbc.postgresql.api.RefCursor`
1819
* Extension points to register `Codec`s to handle additional PostgreSQL data types
1920

2021
Next steps:
@@ -200,7 +201,8 @@ CREATE TABLE my_table (my_json JSON);
200201
```java
201202
connection.createStatement("INSERT INTO my_table (my_json) VALUES($1)")
202203
.bind("$1", Json.of("{\"hello\": \"world\"}")).execute();
203-
```
204+
```
205+
204206
**Consume JSON**
205207

206208
```java
@@ -209,12 +211,13 @@ connection.createStatement("SELECT my_json FROM my_table")
209211
.flatMap(it -> it.map((row, rowMetadata) -> row.get("my_json", Json.class)))
210212
.map(Json::asString);
211213
```
214+
212215
**Write JSON using casting**
213216

214217
```java
215218
connection.createStatement("INSERT INTO my_table (my_json) VALUES($1::JSON)")
216219
.bind("$1", "{\"hello\": \"world\"}").execute();
217-
```
220+
```
218221

219222
**Consume JSON as scalar type**
220223

@@ -233,6 +236,24 @@ The following types are supported for JSON exchange:
233236
* `String`
234237
* `InputStream` (must be closed after usage to avoid memory leaks)
235238

239+
## Cursors
240+
241+
The driver can consume cursors that were created by PL/pgSQL as `refcursor`.
242+
Cursors are represented as `RefCursor` objects. Cursors obtained from `Result` can be used to fetch the cursor directly.
243+
Since cursors are stateful, they must be closed once they are no longer in use.
244+
245+
```java
246+
connection.createStatement("SELECT show_cities_multiple()").execute()
247+
.flatMap(result -> result.map((row, rowMetadata) -> row.get(0, RefCursor.class)))
248+
.flatMap(cursor -> {
249+
Mono<PostgresResult> data = cursor.fetch()
250+
.flatMap(…)
251+
.then(rc.close());
252+
return data;
253+
});
254+
255+
```
256+
236257
## Logical Decode
237258

238259
PostgreSQL allows replication streaming and decoding persistent changes to a database's tables into useful chunks of data.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql;
18+
19+
import io.r2dbc.postgresql.api.PostgresqlConnection;
20+
import io.r2dbc.postgresql.client.Client;
21+
import io.r2dbc.postgresql.codec.Codecs;
22+
23+
/**
24+
* Value object capturing contextual connection resources such as {@link Client}, {@link Codecs} and the {@link PostgresqlConnection connection facade}.
25+
*/
26+
final class ConnectionContext {
27+
28+
private final Client client;
29+
30+
private final Codecs codecs;
31+
32+
private final PostgresqlConnection connection;
33+
34+
ConnectionContext(Client client, Codecs codecs, PostgresqlConnection connection) {
35+
this.client = client;
36+
this.codecs = codecs;
37+
this.connection = connection;
38+
}
39+
40+
public Client getClient() {
41+
return this.client;
42+
}
43+
44+
public Codecs getCodecs() {
45+
return this.codecs;
46+
}
47+
48+
public PostgresqlConnection getConnection() {
49+
return this.connection;
50+
}
51+
52+
@Override
53+
public String toString() {
54+
return "ConnectionContext{" +
55+
"client=" + this.client +
56+
", codecs=" + this.codecs +
57+
", connection=" + this.connection +
58+
'}';
59+
}
60+
}

src/main/java/io/r2dbc/postgresql/ExtendedQueryPostgresqlStatement.java

+10-15
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,10 +18,8 @@
1818

1919
import io.r2dbc.postgresql.api.PostgresqlStatement;
2020
import io.r2dbc.postgresql.client.Binding;
21-
import io.r2dbc.postgresql.client.Client;
2221
import io.r2dbc.postgresql.client.ExtendedQueryMessageFlow;
2322
import io.r2dbc.postgresql.client.PortalNameSupplier;
24-
import io.r2dbc.postgresql.codec.Codecs;
2523
import io.r2dbc.postgresql.message.backend.BackendMessage;
2624
import io.r2dbc.postgresql.message.backend.BindComplete;
2725
import io.r2dbc.postgresql.message.backend.CloseComplete;
@@ -48,9 +46,7 @@ final class ExtendedQueryPostgresqlStatement implements PostgresqlStatement {
4846

4947
private final Bindings bindings;
5048

51-
private final Client client;
52-
53-
private final Codecs codecs;
49+
private final ConnectionContext context;
5450

5551
private final boolean forceBinary;
5652

@@ -62,9 +58,9 @@ final class ExtendedQueryPostgresqlStatement implements PostgresqlStatement {
6258

6359
private String[] generatedColumns;
6460

65-
ExtendedQueryPostgresqlStatement(Client client, Codecs codecs, PortalNameSupplier portalNameSupplier, String sql, StatementCache statementCache, boolean forceBinary) {
66-
this.client = Assert.requireNonNull(client, "client must not be null");
67-
this.codecs = Assert.requireNonNull(codecs, "codecs must not be null");
61+
ExtendedQueryPostgresqlStatement(ConnectionContext context, PortalNameSupplier portalNameSupplier, String sql, StatementCache statementCache,
62+
boolean forceBinary) {
63+
this.context = Assert.requireNonNull(context, "context must not be null");
6864
this.portalNameSupplier = Assert.requireNonNull(portalNameSupplier, "portalNameSupplier must not be null");
6965
this.sql = Assert.requireNonNull(sql, "sql must not be null");
7066
this.statementCache = Assert.requireNonNull(statementCache, "statementCache must not be null");
@@ -91,7 +87,7 @@ public ExtendedQueryPostgresqlStatement bind(String identifier, Object value) {
9187
public ExtendedQueryPostgresqlStatement bind(int index, Object value) {
9288
Assert.requireNonNull(value, "value must not be null");
9389

94-
this.bindings.getCurrent().add(index, this.codecs.encode(value));
90+
this.bindings.getCurrent().add(index, this.context.getCodecs().encode(value));
9591

9692
return this;
9793
}
@@ -110,7 +106,7 @@ public ExtendedQueryPostgresqlStatement bindNull(String identifier, Class<?> typ
110106
public ExtendedQueryPostgresqlStatement bindNull(int index, Class<?> type) {
111107
Assert.requireNonNull(type, "type must not be null");
112108

113-
this.bindings.getCurrent().add(index, this.codecs.encodeNull(type));
109+
this.bindings.getCurrent().add(index, this.context.getCodecs().encodeNull(type));
114110
return this;
115111
}
116112

@@ -143,8 +139,7 @@ public ExtendedQueryPostgresqlStatement returnGeneratedValues(String... columns)
143139
public String toString() {
144140
return "ExtendedQueryPostgresqlStatement{" +
145141
"bindings=" + this.bindings +
146-
", client=" + this.client +
147-
", codecs=" + this.codecs +
142+
", context=" + this.context +
148143
", forceBinary=" + this.forceBinary +
149144
", portalNameSupplier=" + this.portalNameSupplier +
150145
", sql='" + this.sql + '\'' +
@@ -183,10 +178,10 @@ private Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute(String sql) {
183178
ExceptionFactory factory = ExceptionFactory.withSql(sql);
184179
return this.statementCache.getName(this.bindings.first(), sql)
185180
.flatMapMany(name -> ExtendedQueryMessageFlow
186-
.execute(Flux.fromIterable(this.bindings.bindings), this.client, this.portalNameSupplier, name, sql, this.forceBinary))
181+
.execute(Flux.fromIterable(this.bindings.bindings), this.context.getClient(), this.portalNameSupplier, name, sql, this.forceBinary))
187182
.filter(RESULT_FRAME_FILTER)
188183
.windowUntil(CloseComplete.class::isInstance)
189-
.map(messages -> PostgresqlResult.toResult(this.codecs, messages, factory));
184+
.map(messages -> PostgresqlResult.toResult(this.context, messages, factory));
190185
}
191186

192187
private int getIndex(String identifier) {

src/main/java/io/r2dbc/postgresql/PostgresqlBatch.java

+6-12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,8 +16,6 @@
1616

1717
package io.r2dbc.postgresql;
1818

19-
import io.r2dbc.postgresql.client.Client;
20-
import io.r2dbc.postgresql.codec.Codecs;
2119
import io.r2dbc.postgresql.util.Assert;
2220
import io.r2dbc.spi.Batch;
2321
import reactor.core.publisher.Flux;
@@ -30,15 +28,12 @@
3028
*/
3129
final class PostgresqlBatch implements io.r2dbc.postgresql.api.PostgresqlBatch {
3230

33-
private final Client client;
34-
35-
private final Codecs codecs;
31+
private final ConnectionContext context;
3632

3733
private final List<String> statements = new ArrayList<>();
3834

39-
PostgresqlBatch(Client client, Codecs codecs) {
40-
this.client = Assert.requireNonNull(client, "client must not be null");
41-
this.codecs = Assert.requireNonNull(codecs, "codecs must not be null");
35+
PostgresqlBatch(ConnectionContext context) {
36+
this.context = Assert.requireNonNull(context, "context must not be null");
4237
}
4338

4439
@Override
@@ -55,15 +50,14 @@ public PostgresqlBatch add(String sql) {
5550

5651
@Override
5752
public Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute() {
58-
return new SimpleQueryPostgresqlStatement(this.client, this.codecs, String.join("; ", this.statements))
53+
return new SimpleQueryPostgresqlStatement(this.context, String.join("; ", this.statements))
5954
.execute();
6055
}
6156

6257
@Override
6358
public String toString() {
6459
return "PostgresqlBatch{" +
65-
"client=" + this.client +
66-
", codecs=" + this.codecs +
60+
"context=" + this.context +
6761
", statements=" + this.statements +
6862
'}';
6963
}

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -53,6 +53,8 @@ final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlCo
5353

5454
private final Logger logger = Loggers.getLogger(this.getClass());
5555

56+
private final ConnectionContext context;
57+
5658
private final Client client;
5759

5860
private final Codecs codecs;
@@ -70,13 +72,14 @@ final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlCo
7072
private volatile IsolationLevel isolationLevel;
7173

7274
PostgresqlConnection(Client client, Codecs codecs, PortalNameSupplier portalNameSupplier, StatementCache statementCache, IsolationLevel isolationLevel, boolean forceBinary) {
75+
this.context = new ConnectionContext(client, codecs, this);
7376
this.client = Assert.requireNonNull(client, "client must not be null");
7477
this.codecs = Assert.requireNonNull(codecs, "codecs must not be null");
7578
this.portalNameSupplier = Assert.requireNonNull(portalNameSupplier, "portalNameSupplier must not be null");
7679
this.statementCache = Assert.requireNonNull(statementCache, "statementCache must not be null");
7780
this.forceBinary = forceBinary;
7881
this.isolationLevel = Assert.requireNonNull(isolationLevel, "isolationLevel must not be null");
79-
this.validationQuery = new SimpleQueryPostgresqlStatement(this.client, this.codecs, "SELECT 1").fetchSize(0).execute().flatMap(PostgresqlResult::getRowsUpdated);
82+
this.validationQuery = new SimpleQueryPostgresqlStatement(this.context, "SELECT 1").fetchSize(0).execute().flatMap(PostgresqlResult::getRowsUpdated);
8083
}
8184

8285
Client getClient() {
@@ -121,7 +124,7 @@ public Mono<Void> commitTransaction() {
121124

122125
@Override
123126
public PostgresqlBatch createBatch() {
124-
return new PostgresqlBatch(this.client, this.codecs);
127+
return new PostgresqlBatch(this.context);
125128
}
126129

127130
@Override
@@ -144,9 +147,9 @@ public PostgresqlStatement createStatement(String sql) {
144147
Assert.requireNonNull(sql, "sql must not be null");
145148

146149
if (SimpleQueryPostgresqlStatement.supports(sql)) {
147-
return new SimpleQueryPostgresqlStatement(this.client, this.codecs, sql);
150+
return new SimpleQueryPostgresqlStatement(this.context, sql);
148151
} else if (ExtendedQueryPostgresqlStatement.supports(sql)) {
149-
return new ExtendedQueryPostgresqlStatement(this.client, this.codecs, this.portalNameSupplier, sql, this.statementCache, this.forceBinary);
152+
return new ExtendedQueryPostgresqlStatement(this.context, this.portalNameSupplier, sql, this.statementCache, this.forceBinary);
150153
} else {
151154
throw new IllegalArgumentException(String.format("Statement '%s' cannot be created. This is often due to the presence of both multiple statements and parameters at the same time.", sql));
152155
}

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactory.java

+12-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -162,17 +162,21 @@ private Mono<PostgresqlConnection> doCreateConnection(boolean forReplication, @N
162162
})
163163
)
164164
.flatMap(client -> {
165+
165166
DefaultCodecs codecs = new DefaultCodecs(client.getByteBufAllocator());
167+
StatementCache statementCache = StatementCache.fromPreparedStatementCacheQueries(client, this.configuration.getPreparedStatementCacheQueries());
168+
169+
// early connection object to retrieve initialization details
170+
PostgresqlConnection earlyConnection = new PostgresqlConnection(client, codecs, DefaultPortalNameSupplier.INSTANCE, statementCache, IsolationLevel.READ_COMMITTED,
171+
this.configuration.isForceBinary());
166172

167173
Mono<IsolationLevel> isolationLevelMono = Mono.just(IsolationLevel.READ_COMMITTED);
168174
if (!forReplication) {
169-
isolationLevelMono = getIsolationLevel(client, codecs);
175+
isolationLevelMono = getIsolationLevel(earlyConnection);
170176
}
171-
172-
StatementCache statementCache = StatementCache.fromPreparedStatementCacheQueries(client, this.configuration.getPreparedStatementCacheQueries());
173-
174177
return isolationLevelMono
175-
.map(it -> new PostgresqlConnection(client, codecs, DefaultPortalNameSupplier.INSTANCE, statementCache, it, this.configuration.isForceBinary()))
178+
// actual connection to be used
179+
.map(isolationLevel -> new PostgresqlConnection(client, codecs, DefaultPortalNameSupplier.INSTANCE, statementCache, isolationLevel, this.configuration.isForceBinary()))
176180
.delayUntil(connection -> {
177181
return prepareConnection(connection, client.getByteBufAllocator(), codecs);
178182
})
@@ -251,8 +255,8 @@ private AuthenticationHandler getAuthenticationHandler(AuthenticationMessage mes
251255
}
252256
}
253257

254-
private Mono<IsolationLevel> getIsolationLevel(Client client, DefaultCodecs codecs) {
255-
return new SimpleQueryPostgresqlStatement(client, codecs, "SHOW TRANSACTION ISOLATION LEVEL")
258+
private Mono<IsolationLevel> getIsolationLevel(io.r2dbc.postgresql.api.PostgresqlConnection connection) {
259+
return connection.createStatement("SHOW TRANSACTION ISOLATION LEVEL")
256260
.execute()
257261
.flatMap(it -> it.map((row, rowMetadata) -> {
258262
String level = row.get(0, String.class);

0 commit comments

Comments
 (0)