Skip to content

Commit 3dc1638

Browse files
committed
Add Logical Decoding API
We now support consumption of replication streams through logical and physical decoding. The API exposes functionality to create replication slots and to consume a replication stream. ReplicationSlotRequest request = ReplicationSlotRequest.logical().slotName("my_slot").outputPlugin("test_decoding").temporary().build(); Mono<ReplicationSlot> replicationSlot = replicationConnection.createSlot(request); Mono<ReplicationStream> replicationStream = replicationConnection.startReplication(replicationRequest); // Later Flux<…> replicationMessages = replicationStream.map(byteBuf -> …); Mono<Void> close = replicationStream.close(); [resolves #18][resolves #179]
1 parent c682bd6 commit 3dc1638

27 files changed

+2080
-61
lines changed

README.md

+46-4
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ This driver provides the following features:
1010
* TLS
1111
* Explicit transactions
1212
* Notifications
13+
* Logical Decode
1314
* Binary data transfer
1415
* Execution of prepared statements with bindings
1516
* Execution of batch statements without bindings
1617
* Binary data transfer
1718
* Read and write support for all data types except LOB types (e.g. `BLOB`, `CLOB`)
18-
* Extension points to register `Codec`s to handle additional Postgres data types
19+
* Extension points to register `Codec`s to handle additional PostgreSQL data types
1920

2021
Next steps:
2122

@@ -160,7 +161,7 @@ If you'd rather like the latest snapshots of the upcoming major version, use our
160161

161162
## Listen/Notify
162163

163-
Listen and Notify provide a simple form of signal or inter-process communication mechanism for processes accessing the same Postgres database.
164+
Listen and Notify provide a simple form of signal or inter-process communication mechanism for processes accessing the same PostgreSQL database.
164165
For Listen/Notify, two actors are involved: The sender (notify) and the receiver (listen). The following example uses two connections
165166
to illustrate how they work together:
166167

@@ -184,7 +185,7 @@ The second connection broadcasts a notification to the `mymessage` channel upon
184185

185186
## JSON/JSONB support
186187

187-
Postgres supports JSON by storing values in `JSON`/`JSONB` columns. These values can be consumed and written using the regular R2DBC SPI and by using driver-specific extensions with the `io.r2dbc.postgresql.codec.Json` type.
188+
PostgreSQL supports JSON by storing values in `JSON`/`JSONB` columns. These values can be consumed and written using the regular R2DBC SPI and by using driver-specific extensions with the `io.r2dbc.postgresql.codec.Json` type.
188189

189190
You can choose from two approaches:
190191

@@ -238,6 +239,47 @@ The following types are supported for JSON exchange:
238239
* `String`
239240
* `InputStream` (must be closed after usage to avoid memory leaks)
240241

242+
## Logical Decode
243+
244+
PostgreSQL allows replication streaming and decoding persistent changes to a database's tables into useful chunks of data.
245+
In PostgreSQL, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements.
246+
247+
Consuming the replication stream is a four-step process:
248+
249+
1. Obtain a replication connection via `PostgresqlConnectionFactory.replication()`.
250+
2. Create a replication slot (physical/logical).
251+
3. Initiate replication using the replication slot.
252+
4. Once the replication stream is set up, you can consume and map the binary data using `ReplicationStream.map(…)`.
253+
254+
On application shutdown, `close()` the `ReplicationStream`.
255+
256+
Note that a connection is busy once the replication is active and a connection can have at most one active replication stream.
257+
258+
```java
259+
260+
Mono<PostgresqlReplicationConnection> replicationMono = connectionFactory.replication();
261+
262+
// later:
263+
ReplicationSlotRequest request = ReplicationSlotRequest.logical()
264+
.slotName("my_slot")
265+
.outputPlugin("test_decoding")
266+
.temporary()
267+
.build();
268+
Mono<ReplicationSlot> createSlot = replicationConnection.createSlot(request);
269+
270+
ReplicationRequest replicationRequest = ReplicationRequest.logical()
271+
.slotName("my_slot")
272+
.startPosition(LogSequenceNumber.valueOf(0))
273+
.slotOption("skip-empty-xacts", true)
274+
.slotOption("include-xids", false)
275+
.build();
276+
277+
Flux<T> replicationStream = replicationConnection.startReplication(replicationRequest).flatMapMany(it -> {
278+
return it.map(byteBuf -> {…})
279+
.doOnError(t -> it.close().subscribe());
280+
});
281+
```
282+
241283
## Data Type Mapping
242284

243285
This reference table shows the type mapping between [PostgreSQL][p] and Java data types:
@@ -360,7 +402,7 @@ Support for the following single-dimensional arrays (read and write):
360402
## Extension mechanism
361403
This driver accepts the following extensions:
362404

363-
* `CodecRegistrar` to contribute `Codec`s for Postgres ObjectIDs.
405+
* `CodecRegistrar` to contribute `Codec`s for PostgreSQL ObjectIDs.
364406

365407
Extensions can be registered programmatically using `PostgresConnectionConfiguration` or discovered using Java's `ServiceLoader` mechanism (from `META-INF/services/io.r2dbc.postgresql.extension.Extension`).
366408

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql;
18+
19+
import io.r2dbc.postgresql.api.PostgresqlConnectionMetadata;
20+
import io.r2dbc.postgresql.client.Client;
21+
import io.r2dbc.postgresql.message.backend.BackendMessage;
22+
import io.r2dbc.postgresql.message.backend.EmptyQueryResponse;
23+
import io.r2dbc.postgresql.message.backend.ErrorResponse;
24+
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
25+
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
26+
import io.r2dbc.postgresql.message.frontend.Query;
27+
import io.r2dbc.postgresql.replication.LogSequenceNumber;
28+
import io.r2dbc.postgresql.replication.ReplicationRequest;
29+
import io.r2dbc.postgresql.replication.ReplicationSlot;
30+
import io.r2dbc.postgresql.replication.ReplicationSlotRequest;
31+
import io.r2dbc.postgresql.replication.ReplicationStream;
32+
import io.r2dbc.postgresql.util.Assert;
33+
import io.r2dbc.spi.Row;
34+
import reactor.core.publisher.EmitterProcessor;
35+
import reactor.core.publisher.Mono;
36+
37+
import java.util.function.Predicate;
38+
39+
import static io.r2dbc.postgresql.util.PredicateUtils.or;
40+
41+
/**
42+
* Postgres replication connection.
43+
*/
44+
final class DefaultPostgresqlReplicationConnection implements io.r2dbc.postgresql.api.PostgresqlReplicationConnection {
45+
46+
private static final Predicate<BackendMessage> WINDOW_UNTIL = or(ReadyForQuery.class::isInstance, EmptyQueryResponse.class::isInstance, ErrorResponse.class::isInstance);
47+
48+
private final PostgresqlConnection delegate;
49+
50+
private final Client client;
51+
52+
DefaultPostgresqlReplicationConnection(PostgresqlConnection delegate) {
53+
this.delegate = delegate;
54+
this.client = delegate.getClient();
55+
}
56+
57+
@Override
58+
public Mono<Void> close() {
59+
return this.delegate.close();
60+
}
61+
62+
@Override
63+
public Mono<ReplicationSlot> createSlot(ReplicationSlotRequest request) {
64+
65+
Assert.requireNonNull(request, "request must not be null");
66+
67+
return this.delegate.createStatement(request.asSQL()).execute().flatMap(it -> {
68+
69+
return it.map((row, rowMetadata) -> getReplicationSlot(request, row));
70+
}).last();
71+
}
72+
73+
private static ReplicationSlot getReplicationSlot(ReplicationSlotRequest request, Row row) {
74+
return new ReplicationSlot(
75+
getString(row, "slot_name"),
76+
request.getReplicationType(),
77+
LogSequenceNumber.valueOf(getString(row, "consistent_point")),
78+
row.get("snapshot_name", String.class),
79+
row.get("output_plugin", String.class));
80+
}
81+
82+
@Override
83+
public Mono<ReplicationStream> startReplication(ReplicationRequest request) {
84+
85+
Assert.requireNonNull(request, "request must not be null");
86+
87+
String sql = request.asSQL();
88+
ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
89+
90+
EmitterProcessor<FrontendMessage> requestProcessor = EmitterProcessor.create();
91+
92+
return Mono.fromDirect(this.client.exchange(requestProcessor.startWith(new Query(sql)))
93+
.handle(exceptionFactory::handleErrorResponse)
94+
.windowUntil(WINDOW_UNTIL)
95+
.map(messages -> {
96+
return new PostgresReplicationStream(this.client.getByteBufAllocator(), request, requestProcessor, messages);
97+
}));
98+
}
99+
100+
@Override
101+
public PostgresqlConnectionMetadata getMetadata() {
102+
return this.delegate.getMetadata();
103+
}
104+
105+
private static String getString(Row row, String column) {
106+
String value = row.get(column, String.class);
107+
if (value == null) {
108+
throw new IllegalStateException(String.format("No value found for column %s", column));
109+
}
110+
return value;
111+
}
112+
113+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.ByteBufAllocator;
21+
import io.r2dbc.postgresql.message.frontend.CopyData;
22+
import io.r2dbc.postgresql.replication.LogSequenceNumber;
23+
24+
/**
25+
* Nested message for a Replication Stream using {@link CopyData} as carrier.
26+
*/
27+
final class KeepAliveMessage {
28+
29+
private static final byte KEEP_ALIVE_REPLY = 'r';
30+
31+
private static final int NO_REPLY_REQUIRED = 0;
32+
33+
private static final int REPLY_REQUIRED = 1;
34+
35+
private final LogSequenceNumber received;
36+
37+
private final LogSequenceNumber flushed;
38+
39+
private final LogSequenceNumber applied;
40+
41+
private final long systemClock;
42+
43+
private boolean replyRequired;
44+
45+
KeepAliveMessage(LogSequenceNumber received, LogSequenceNumber flushed, LogSequenceNumber applied, long systemClock, boolean replyRequired) {
46+
this.received = received;
47+
this.flushed = flushed;
48+
this.applied = applied;
49+
this.systemClock = systemClock;
50+
this.replyRequired = replyRequired;
51+
}
52+
53+
public ByteBuf encode(ByteBufAllocator allocator) {
54+
55+
ByteBuf out = allocator.buffer(34);
56+
57+
out.writeByte(KEEP_ALIVE_REPLY);
58+
out.writeLong(this.received.asLong());
59+
out.writeLong(this.flushed.asLong());
60+
out.writeLong(this.applied.asLong());
61+
out.writeLong(this.systemClock);
62+
63+
if (this.replyRequired) {
64+
out.writeByte((byte) REPLY_REQUIRED);
65+
} else {
66+
out.writeByte(this.received == LogSequenceNumber.INVALID_LSN ? (byte) REPLY_REQUIRED : (byte) NO_REPLY_REQUIRED);
67+
}
68+
69+
return out;
70+
}
71+
}

0 commit comments

Comments
 (0)