Skip to content

Commit 1e525fb

Browse files
committed
Add support for RefCursor
We now can decode a RefCursor from a Row and fetch the data that the cursor holds. RefCursor cursor = row.get("my_store_procedure", RefCursor.class); // later cursor.fetch().then(cursor.close()); [closes #173]
1 parent 5d9dad7 commit 1e525fb

11 files changed

+539
-7
lines changed

README.md

+23-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ This driver provides the following features:
1515
* Execution of prepared statements with bindings
1616
* Execution of batch statements without bindings
1717
* Read and write support for all data types except LOB types (e.g. `BLOB`, `CLOB`)
18+
* Fetching of `REFCURSOR` using `io.r2dbc.postgresql.api.RefCursor`
1819
* Extension points to register `Codec`s to handle additional PostgreSQL data types
1920

2021
Next steps:
@@ -200,7 +201,8 @@ CREATE TABLE my_table (my_json JSON);
200201
```java
201202
connection.createStatement("INSERT INTO my_table (my_json) VALUES($1)")
202203
.bind("$1", Json.of("{\"hello\": \"world\"}")).execute();
203-
```
204+
```
205+
204206
**Consume JSON**
205207

206208
```java
@@ -209,12 +211,13 @@ connection.createStatement("SELECT my_json FROM my_table")
209211
.flatMap(it -> it.map((row, rowMetadata) -> row.get("my_json", Json.class)))
210212
.map(Json::asString);
211213
```
214+
212215
**Write JSON using casting**
213216

214217
```java
215218
connection.createStatement("INSERT INTO my_table (my_json) VALUES($1::JSON)")
216219
.bind("$1", "{\"hello\": \"world\"}").execute();
217-
```
220+
```
218221

219222
**Consume JSON as scalar type**
220223

@@ -233,6 +236,24 @@ The following types are supported for JSON exchange:
233236
* `String`
234237
* `InputStream` (must be closed after usage to avoid memory leaks)
235238

239+
## Cursors
240+
241+
The driver can consume cursors that were created by PL/pgSQL as `refcursor`.
242+
Cursors are represented as `RefCursor` objects. Cursors obtained from `Result` can be used to fetch the cursor directly.
243+
Since cursors are stateful, they must be closed once they are no longer in use.
244+
245+
```java
246+
connection.createStatement("SELECT show_cities_multiple()").execute()
247+
.flatMap(result -> result.map((row, rowMetadata) -> row.get(0, RefCursor.class)))
248+
.flatMap(cursor -> {
249+
Mono<PostgresResult> data = cursor.fetch()
250+
.flatMap(…)
251+
.then(rc.close());
252+
return data;
253+
});
254+
255+
```
256+
236257
## Logical Decode
237258

238259
PostgreSQL allows replication streaming and decoding persistent changes to a database's tables into useful chunks of data.

src/main/java/io/r2dbc/postgresql/ConnectionContext.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.r2dbc.postgresql.codec.Codecs;
2222

2323
/**
24-
* @author Mark Paluch
24+
* Value object capturing contextual connection resources such as {@link Client}, {@link Codecs} and the {@link PostgresqlConnection connection facade}.
2525
*/
2626
final class ConnectionContext {
2727

src/main/java/io/r2dbc/postgresql/PostgresqlRow.java

+57-1
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,30 @@ private <T> T decode(int index, Class<T> type) {
9090
int readerIndex = data.readerIndex();
9191
try {
9292
RowDescription.Field field = this.fields.get(index);
93-
return this.codecs.decode(data, field.getDataType(), field.getFormat(), type);
93+
94+
T decoded = this.context.getCodecs().decode(data, field.getDataType(), field.getFormat(), type);
95+
96+
return type.cast(postProcessResult(decoded));
97+
9498
} finally {
9599
data.readerIndex(readerIndex);
96100
}
97101
}
98102

103+
@Nullable
104+
private Object postProcessResult(@Nullable Object decoded) {
105+
106+
if (decoded instanceof RefCursor) {
107+
return createCursor((RefCursor) decoded);
108+
}
109+
110+
return decoded;
111+
}
112+
113+
private AttachedRefCursor createCursor(RefCursor decoded) {
114+
return new AttachedRefCursor(this.context, decoded.getCursorName());
115+
}
116+
99117
@Override
100118
public int hashCode() {
101119
return Objects.hash(this.fields);
@@ -163,4 +181,42 @@ private void requireNotReleased() {
163181
}
164182
}
165183

184+
/**
185+
* Default {@link RefCursor} implementation that is attached to a {@link io.r2dbc.postgresql.api.PostgresqlConnection}.
186+
*/
187+
static class AttachedRefCursor implements RefCursor {
188+
189+
private final ConnectionContext context;
190+
191+
private final String portal;
192+
193+
AttachedRefCursor(ConnectionContext context, String portal) {
194+
this.context = Assert.requireNonNull(context, "connection must not be null");
195+
this.portal = Assert.requireNotEmpty(portal, "portal must not be empty");
196+
}
197+
198+
@Override
199+
public String getCursorName() {
200+
return this.portal;
201+
}
202+
203+
@Override
204+
public Mono<io.r2dbc.postgresql.api.PostgresqlResult> fetch() {
205+
return Mono.fromDirect(this.context.getConnection().createStatement("FETCH ALL IN \"" + getCursorName() + "\"").execute());
206+
}
207+
208+
@Override
209+
public Mono<Void> close() {
210+
return this.context.getConnection().createStatement("CLOSE \"" + getCursorName() + "\"").execute().flatMap(PostgresqlResult::getRowsUpdated).then();
211+
}
212+
213+
@Override
214+
public String toString() {
215+
return "AttachedRefCursor{" +
216+
"portal='" + this.portal + '\'' +
217+
", context=" + this.context +
218+
'}';
219+
}
220+
}
221+
166222
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql.api;
18+
19+
import io.r2dbc.spi.Closeable;
20+
import reactor.core.publisher.Mono;
21+
22+
/**
23+
* A ref cursor value object. Cursor objects can be attached to a {@link PostgresqlConnection} which allows interaction with the cursor object by {@link #fetch() fetching the cursor} and
24+
* {@link #close() closing} it.
25+
* Cursor objects are stateful resources. Obtaining a cursor typically keeps the cursor open on the database server. The cursor object can get invalidated if the transaction is closed. Cursor
26+
* objects should be {@link #close() closed} if they are no longer required.
27+
* Detached cursors that were instantiated without a connection may throw {@link UnsupportedOperationException} when attempting to fetch or close the cursor.
28+
*/
29+
public interface RefCursor extends Closeable {
30+
31+
/**
32+
* Return the ref cursor name (portal name).
33+
*
34+
* @return the ref cursor name (portal name).
35+
*/
36+
String getCursorName();
37+
38+
/**
39+
* Fetch the contents of the cursor using {@code FETCH ALL IN}.
40+
*
41+
* @return the {@link PostgresqlResult} associated with the ref cursor.
42+
*/
43+
Mono<PostgresqlResult> fetch();
44+
45+
/**
46+
* Close the cursor.
47+
*
48+
* @return a {@link Mono} terminating with success the cursor was closed.
49+
*/
50+
@Override
51+
Mono<Void> close();
52+
53+
}

src/main/java/io/r2dbc/postgresql/codec/DefaultCodecs.java

+2
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ public DefaultCodecs(ByteBufAllocator byteBufAllocator) {
8585

8686
new BlobCodec(byteBufAllocator),
8787
new ClobCodec(byteBufAllocator),
88+
RefCursorCodec.INSTANCE,
89+
RefCursorNameCodec.INSTANCE,
8890

8991
new ShortArrayCodec(byteBufAllocator),
9092
new StringArrayCodec(byteBufAllocator),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2017-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql.codec;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.r2dbc.postgresql.api.PostgresqlResult;
21+
import io.r2dbc.postgresql.api.RefCursor;
22+
import io.r2dbc.postgresql.client.Parameter;
23+
import io.r2dbc.postgresql.message.Format;
24+
import io.r2dbc.postgresql.type.PostgresqlObjectId;
25+
import io.r2dbc.postgresql.util.Assert;
26+
import io.r2dbc.postgresql.util.ByteBufUtils;
27+
import reactor.core.publisher.Mono;
28+
import reactor.util.annotation.Nullable;
29+
30+
import static io.r2dbc.postgresql.type.PostgresqlObjectId.REF_CURSOR;
31+
32+
final class RefCursorCodec extends AbstractCodec<RefCursor> {
33+
34+
static final RefCursorCodec INSTANCE = new RefCursorCodec();
35+
36+
RefCursorCodec() {
37+
super(RefCursor.class);
38+
}
39+
40+
@Override
41+
public Parameter encodeNull() {
42+
throw new UnsupportedOperationException("RefCursor cannot be encoded");
43+
}
44+
45+
@Override
46+
boolean doCanDecode(PostgresqlObjectId type, Format format) {
47+
Assert.requireNonNull(format, "format must not be null");
48+
Assert.requireNonNull(type, "type must not be null");
49+
50+
return REF_CURSOR == type;
51+
}
52+
53+
@Override
54+
RefCursor doDecode(ByteBuf buffer, PostgresqlObjectId dataType, @Nullable Format format, @Nullable Class<? extends RefCursor> type) {
55+
Assert.requireNonNull(buffer, "byteBuf must not be null");
56+
57+
return new SimpleRefCursor(ByteBufUtils.decode(buffer));
58+
}
59+
60+
@Override
61+
Parameter doEncode(RefCursor value) {
62+
throw new UnsupportedOperationException("RefCursor cannot be encoded");
63+
}
64+
65+
static class SimpleRefCursor implements RefCursor {
66+
67+
private final String portal;
68+
69+
SimpleRefCursor(String portal) {
70+
this.portal = portal;
71+
}
72+
73+
@Override
74+
public String getCursorName() {
75+
return this.portal;
76+
}
77+
78+
@Override
79+
public Mono<PostgresqlResult> fetch() {
80+
throw new UnsupportedOperationException("Stateless RefCursor does not support fetch()");
81+
}
82+
83+
@Override
84+
public Mono<Void> close() {
85+
throw new UnsupportedOperationException("Stateless RefCursor does not support close()");
86+
}
87+
88+
@Override
89+
public String toString() {
90+
return "SimpleRefCursor{" +
91+
"portal='" + this.portal + '\'' +
92+
'}';
93+
}
94+
}
95+
96+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2017-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql.codec;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.r2dbc.postgresql.client.Parameter;
21+
import io.r2dbc.postgresql.message.Format;
22+
import io.r2dbc.postgresql.type.PostgresqlObjectId;
23+
import io.r2dbc.postgresql.util.Assert;
24+
import io.r2dbc.postgresql.util.ByteBufUtils;
25+
import reactor.util.annotation.Nullable;
26+
27+
import static io.r2dbc.postgresql.type.PostgresqlObjectId.REF_CURSOR;
28+
29+
final class RefCursorNameCodec extends AbstractCodec<String> {
30+
31+
static final RefCursorNameCodec INSTANCE = new RefCursorNameCodec();
32+
33+
RefCursorNameCodec() {
34+
super(String.class);
35+
}
36+
37+
@Override
38+
public Parameter encodeNull() {
39+
throw new UnsupportedOperationException("Cannot encode RefCursor");
40+
}
41+
42+
@Override
43+
boolean doCanDecode(PostgresqlObjectId type, Format format) {
44+
Assert.requireNonNull(format, "format must not be null");
45+
Assert.requireNonNull(type, "type must not be null");
46+
47+
return REF_CURSOR == type;
48+
}
49+
50+
@Override
51+
String doDecode(ByteBuf buffer, PostgresqlObjectId dataType, @Nullable Format format, @Nullable Class<? extends String> type) {
52+
Assert.requireNonNull(buffer, "byteBuf must not be null");
53+
54+
return ByteBufUtils.decode(buffer);
55+
}
56+
57+
@Override
58+
Parameter doEncode(String value) {
59+
throw new UnsupportedOperationException("Cannot encode RefCursor");
60+
}
61+
62+
}

src/main/java/io/r2dbc/postgresql/codec/StringCodec.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

src/test/java/io/r2dbc/postgresql/AbstractIntegrationTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ public abstract class AbstractIntegrationTests {
3131
@RegisterExtension
3232
public static final PostgresqlServerExtension SERVER = new PostgresqlServerExtension();
3333

34-
PostgresqlConnectionFactory connectionFactory;
34+
public PostgresqlConnectionFactory connectionFactory;
3535

36-
PostgresqlConnection connection;
36+
public PostgresqlConnection connection;
3737

3838
/**
3939
* Entry-point to obtain a {@link PostgresqlConnectionFactory}.

0 commit comments

Comments
 (0)