Skip to content

Commit 6d2c1a8

Browse files
committed
Polishing.
Extract builder API to enable flexibility in providing copy data. Add safeguards to terminate copy data in cases of cancellation or errors. Reorder methods. [resolves #500][#183] Signed-off-by: Mark Paluch <[email protected]>
1 parent 1f4c436 commit 6d2c1a8

9 files changed

+274
-109
lines changed

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package io.r2dbc.postgresql;
1818

19-
import io.netty.buffer.ByteBuf;
19+
import io.r2dbc.postgresql.api.CopyInBuilder;
2020
import io.r2dbc.postgresql.api.ErrorDetails;
2121
import io.r2dbc.postgresql.api.Notification;
2222
import io.r2dbc.postgresql.api.PostgresTransactionDefinition;
@@ -50,7 +50,6 @@
5050
import reactor.util.Loggers;
5151
import reactor.util.annotation.Nullable;
5252

53-
import java.nio.ByteBuffer;
5453
import java.time.Duration;
5554
import java.util.concurrent.atomic.AtomicReference;
5655
import java.util.function.Function;
@@ -215,6 +214,11 @@ public Mono<Void> commitTransaction() {
215214
});
216215
}
217216

217+
@Override
218+
public CopyInBuilder copyIn(String sql) {
219+
return new PostgresqlCopyIn.Builder(this.resources, sql);
220+
}
221+
218222
@Override
219223
public PostgresqlBatch createBatch() {
220224
return new PostgresqlBatch(this.resources);
@@ -408,11 +412,6 @@ public void onComplete() {
408412
});
409413
}
410414

411-
@Override
412-
public Mono<Long> copyIn(String sql, Publisher<ByteBuf> stdin) {
413-
return new PostgresqlCopyIn(resources).copy(sql, stdin);
414-
}
415-
416415
private static Function<TransactionStatus, String> getTransactionIsolationLevelQuery(IsolationLevel isolationLevel) {
417416
return transactionStatus -> {
418417
if (transactionStatus == OPEN) {

src/main/java/io/r2dbc/postgresql/PostgresqlCopyIn.java

+93-35
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,27 @@
1717
package io.r2dbc.postgresql;
1818

1919
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.CompositeByteBuf;
2021
import io.netty.util.ReferenceCountUtil;
2122
import io.netty.util.ReferenceCounted;
23+
import io.r2dbc.postgresql.api.CopyInBuilder;
2224
import io.r2dbc.postgresql.client.Client;
23-
import io.r2dbc.postgresql.message.backend.BackendMessage;
24-
import io.r2dbc.postgresql.message.backend.CommandComplete;
25-
import io.r2dbc.postgresql.message.backend.CopyInResponse;
25+
import io.r2dbc.postgresql.message.backend.ErrorResponse;
2626
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
2727
import io.r2dbc.postgresql.message.frontend.CopyData;
2828
import io.r2dbc.postgresql.message.frontend.CopyDone;
2929
import io.r2dbc.postgresql.message.frontend.CopyFail;
30+
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
3031
import io.r2dbc.postgresql.message.frontend.Query;
3132
import io.r2dbc.postgresql.util.Assert;
3233
import io.r2dbc.postgresql.util.Operators;
3334
import org.reactivestreams.Publisher;
3435
import reactor.core.publisher.Flux;
3536
import reactor.core.publisher.Mono;
37+
import reactor.core.publisher.Sinks;
38+
import reactor.util.annotation.Nullable;
39+
40+
import java.util.concurrent.atomic.AtomicBoolean;
3641

3742
import static io.r2dbc.postgresql.PostgresqlResult.toResult;
3843

@@ -43,49 +48,70 @@ final class PostgresqlCopyIn {
4348

4449
private final ConnectionResources context;
4550

46-
PostgresqlCopyIn(ConnectionResources context) {
47-
this.context = Assert.requireNonNull(context, "context must not be null");
51+
PostgresqlCopyIn(ConnectionResources resources) {
52+
this.context = Assert.requireNonNull(resources, "resources must not be null");
4853
}
4954

50-
Mono<Long> copy(String sql, Publisher<ByteBuf> stdin) {
55+
Mono<Long> copy(String sql, Publisher<? extends Publisher<ByteBuf>> stdin) {
56+
57+
ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
58+
5159
return Flux.from(stdin)
52-
.map(CopyData::new)
53-
.as(messages -> copyIn(sql, messages));
54-
}
60+
.<FrontendMessage>concatMap(data -> {
5561

56-
private Mono<Long> copyIn(String sql, Flux<CopyData> copyDataMessages) {
57-
Client client = context.getClient();
62+
CompositeByteBuf composite = this.context.getClient().getByteBufAllocator().compositeBuffer();
5863

59-
Flux<BackendMessage> backendMessages = copyDataMessages
60-
.doOnNext(client::send)
61-
.doOnError((e) -> sendCopyFail(e.getMessage()))
62-
.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release)
63-
.thenMany(client.exchange(Mono.just(CopyDone.INSTANCE)));
64+
return Flux.from(data)
65+
.reduce(composite, (l, r) -> l.addComponent(true, r))
66+
.map(CopyData::new)
67+
.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release);
6468

65-
return startCopy(sql)
66-
.concatWith(backendMessages)
67-
.doOnCancel(() -> sendCopyFail("Cancelled"))
68-
.as(Operators::discardOnCancel)
69-
.as(messages -> toResult(context, messages, ExceptionFactory.INSTANCE).getRowsUpdated());
69+
}).concatWithValues(CopyDone.INSTANCE).startWith(new Query(sql))
70+
.as(messages -> copyIn(exceptionFactory, messages));
7071
}
7172

72-
private Flux<BackendMessage> startCopy(String sql) {
73-
return context.getClient().exchange(
74-
// ReadyForQuery is returned when an invalid query is provided
75-
backendMessage -> backendMessage instanceof CopyInResponse || backendMessage instanceof ReadyForQuery,
76-
Mono.just(new Query(sql))
77-
)
78-
.doOnNext(message -> {
79-
if (message instanceof CommandComplete) {
80-
throw new IllegalArgumentException("Copy from stdin query expected, sql='" + sql + "', message=" + message);
73+
private Mono<Long> copyIn(ExceptionFactory exceptionFactory, Flux<FrontendMessage> copyDataMessages) {
74+
75+
Client client = this.context.getClient();
76+
AtomicBoolean stop = new AtomicBoolean();
77+
Sinks.Many<FrontendMessage> sink = Sinks.many().unicast().onBackpressureBuffer();
78+
Flux<FrontendMessage> requestMessages = sink.asFlux().mergeWith(copyDataMessages
79+
.doOnComplete(sink::tryEmitComplete)
80+
.filter(it -> !stop.get())
81+
.onErrorResume(e -> {
82+
copyFail(sink, stop, "Copy operation failed: " + e.getMessage());
83+
return Mono.empty();
84+
}));
85+
86+
return client.exchange(backendMessage -> backendMessage instanceof ReadyForQuery, requestMessages)
87+
.doOnNext(it -> {
88+
if (it instanceof ErrorResponse) {
89+
stop.set(true);
90+
sink.tryEmitComplete();
8191
}
82-
});
92+
})
93+
.doOnComplete(() -> {
94+
stop.set(true);
95+
sink.tryEmitComplete();
96+
})
97+
.doOnError((e) -> {
98+
copyFail(sink, stop, "Copy operation failed: " + e.getMessage());
99+
})
100+
.doOnCancel(() -> {
101+
copyFail(sink, stop, "Copy operation failed: Cancelled");
102+
})
103+
.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release)
104+
.as(Operators::discardOnCancel)
105+
.doOnCancel(() -> {
106+
copyFail(sink, stop, "Copy operation failed: Cancelled");
107+
})
108+
.as(messages -> toResult(this.context, messages, exceptionFactory).getRowsUpdated());
83109
}
84110

85-
private void sendCopyFail(String message) {
86-
context.getClient().exchange(Mono.just(new CopyFail("Copy operation failed: " + message)))
87-
.as(Operators::discardOnCancel)
88-
.subscribe();
111+
private void copyFail(Sinks.Many<FrontendMessage> sink, AtomicBoolean stop, String e) {
112+
sink.tryEmitNext(new CopyFail(e));
113+
sink.tryEmitComplete();
114+
stop.set(true);
89115
}
90116

91117
@Override
@@ -95,4 +121,36 @@ public String toString() {
95121
'}';
96122
}
97123

124+
static final class Builder implements CopyInBuilder {
125+
126+
private final ConnectionResources resources;
127+
128+
private final String sql;
129+
130+
@Nullable
131+
private Publisher<? extends Publisher<ByteBuf>> stdin;
132+
133+
Builder(ConnectionResources resources, String sql) {
134+
this.resources = resources;
135+
this.sql = sql;
136+
}
137+
138+
@Override
139+
public CopyInBuilder fromMany(Publisher<? extends Publisher<ByteBuf>> stdin) {
140+
this.stdin = Assert.requireNonNull(stdin, "stdin must not be null");
141+
return this;
142+
}
143+
144+
@Override
145+
public Mono<Long> build() {
146+
147+
if (this.stdin == null) {
148+
throw new IllegalArgumentException("No stdin configured for COPY IN");
149+
}
150+
151+
return new PostgresqlCopyIn(this.resources).copy(this.sql, this.stdin);
152+
}
153+
154+
}
155+
98156
}

src/main/java/io/r2dbc/postgresql/PostgresqlResult.java

+1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public Mono<Long> getRowsUpdated() {
8282
sink.next(rowCount);
8383
}
8484
}
85+
8586
}).collectList().handle((list, sink) -> {
8687

8788
if (list.isEmpty()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql.api;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.Unpooled;
21+
import io.r2dbc.postgresql.message.frontend.CopyData;
22+
import org.reactivestreams.Publisher;
23+
import reactor.core.publisher.Mono;
24+
25+
import java.nio.ByteBuffer;
26+
27+
/**
28+
* Interface specifying a builder contract to configure a {@code COPY FROM STDIN} operation.
29+
*
30+
* @since 1.0
31+
*/
32+
public interface CopyInBuilder {
33+
34+
/**
35+
* Postgres parse limit for large messages {@code 2^30 - 1} bytes.
36+
*/
37+
int MAX_FRAME_SIZE = 0x3fffffff - 1;
38+
39+
/**
40+
* Configure a {@link Publisher} emitting publishers of buffers that to write data to a {@link CopyData} frame per emitted publisher.
41+
* This method allows controlling flush behavior and chunking of buffers. The provided stream must ensure to not exceed size limits ({@link #MAX_FRAME_SIZE}) of the {@link CopyData} frame.
42+
* <p>If a provided publisher terminates with an error signal then the copy operation terminates with a failure and gets cancelled on the server.
43+
*
44+
* @param stdin the bytes to write to a {@link CopyData} frame.
45+
* @return {@code this} {@link CopyInBuilder builder}.
46+
*/
47+
CopyInBuilder fromMany(Publisher<? extends Publisher<ByteBuf>> stdin);
48+
49+
/**
50+
* Configure a {@link Publisher} emitting buffers that are written to a single {@link CopyData} frame.
51+
* If the total amount of data to be written exceeds the copy frame size limitation ({@link #MAX_FRAME_SIZE}), then use {@link #fromMany(Publisher)} to split up the input data to many
52+
* {@link CopyData} frames.
53+
* <p>If the provided publisher terminates with an error signal then the copy operation terminates with a failure and gets cancelled on the server.
54+
*
55+
* @param stdin the bytes to write to a {@link CopyData} frame.
56+
* @return {@code this} {@link CopyInBuilder builder}.
57+
*/
58+
default CopyInBuilder from(Publisher<ByteBuf> stdin) {
59+
return fromMany(Mono.just(stdin));
60+
}
61+
62+
/**
63+
* Configure an input buffer that is written to a single {@link CopyData} frame.
64+
*
65+
* @param stdin the bytes to write to a {@link CopyData} frame.
66+
* @return {@code this} {@link CopyInBuilder builder}.
67+
*/
68+
default CopyInBuilder from(ByteBuf stdin) {
69+
return from(Mono.just(stdin));
70+
}
71+
72+
/**
73+
* Configure an input buffer that is written to a single {@link CopyData} frame.
74+
*
75+
* @param stdin the bytes to write to a {@link CopyData} frame.
76+
* @return {@code this} {@link CopyInBuilder builder}.
77+
*/
78+
default CopyInBuilder from(ByteBuffer stdin) {
79+
return from(Unpooled.wrappedBuffer(stdin));
80+
}
81+
82+
/**
83+
* Configure an input buffer that is written to a single {@link CopyData} frame.
84+
*
85+
* @param stdin the bytes to write to a {@link CopyData} frame.
86+
* @return {@code this} {@link CopyInBuilder builder}.
87+
*/
88+
default CopyInBuilder from(byte[] stdin) {
89+
return from(Unpooled.wrappedBuffer(stdin));
90+
}
91+
92+
/**
93+
* Configure an input buffer along with {@code offset} and {@code length} whose specified chunk is written to a single {@link CopyData} frame.
94+
*
95+
* @param stdin the bytes to write to a {@link CopyData} frame.
96+
* @param offset the start offset in the data.
97+
* @param length the number of bytes to write.
98+
* @return {@code this} {@link CopyInBuilder builder}.
99+
*/
100+
default CopyInBuilder from(byte[] stdin, int offset, int length) {
101+
return from(Unpooled.wrappedBuffer(stdin, offset, length));
102+
}
103+
104+
/**
105+
* Build the final publisher that initiates the {@code COPY} operation. The copy data messages sent to the server are triggered by the provided input buffer.
106+
* Cancelling the copy operation sends a failure frame to the server to terminate the copy operation with an error.
107+
*
108+
* @return the publisher that initiates the {@code COPY} operation upon subscription.
109+
*/
110+
Mono<Long> build();
111+
112+
}

src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java

+22-9
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,28 @@ public interface PostgresqlConnection extends Connection {
6161
@Override
6262
Mono<Void> commitTransaction();
6363

64+
/**
65+
* Obtain a {@link CopyInBuilder} to configure a {@code COPY FROM STDIN} operation for very fast copying into a database table.
66+
*
67+
* @param sql the COPY … FROM STDIN sql statement
68+
* @return the builder to configure the copy operation.
69+
* @since 1.0
70+
*/
71+
CopyInBuilder copyIn(String sql);
72+
73+
/**
74+
* Use {@code COPY FROM STDIN} for very fast copying into a database table.
75+
*
76+
* @param sql the COPY … FROM STDIN sql statement
77+
* @param stdin the ByteBuf publisher
78+
* @return a {@link Mono} with the amount of rows inserted
79+
* @see CopyInBuilder
80+
* @since 1.0
81+
*/
82+
default Mono<Long> copyIn(String sql, Publisher<ByteBuf> stdin) {
83+
return copyIn(sql).from(stdin).build();
84+
}
85+
6486
/**
6587
* {@inheritDoc}
6688
*/
@@ -172,13 +194,4 @@ public interface PostgresqlConnection extends Connection {
172194
@Override
173195
Mono<Boolean> validate(ValidationDepth depth);
174196

175-
/**
176-
* Use COPY FROM STDIN for very fast copying into a database table.
177-
*
178-
* @param sql the COPY … FROM STDIN sql statement
179-
* @param stdin the ByteBuf publisher
180-
* @return a {@link Mono} with the amount of rows inserted
181-
*/
182-
Mono<Long> copyIn(String sql, Publisher<ByteBuf> stdin);
183-
184197
}

0 commit comments

Comments
 (0)