From dc0be6fbc16b7ca08a275644b4d428d351f3ae1e Mon Sep 17 00:00:00 2001 From: Arjan Schouten Date: Fri, 18 Mar 2022 09:52:35 +0100 Subject: [PATCH 1/3] Wip/add copy in (#1) * feat: add copy in support for r2dbc postgresql driver. --- .../io/r2dbc/postgresql/CopyInBenchmarks.java | 149 +++++++++++++++ .../postgresql/PostgresqlConnection.java | 6 + .../io/r2dbc/postgresql/PostgresqlCopyIn.java | 101 ++++++++++ .../postgresql/api/PostgresqlConnection.java | 11 ++ .../PostgresqlConnectionUnitTests.java | 25 +++ .../PostgresqlCopyInIntegrationTests.java | 175 +++++++++++++++++ .../postgresql/PostgresqlCopyInUnitTests.java | 178 ++++++++++++++++++ .../api/MockPostgresqlConnection.java | 7 + .../r2dbc/postgresql/client/TestClient.java | 6 +- 9 files changed, 655 insertions(+), 3 deletions(-) create mode 100644 src/jmh/java/io/r2dbc/postgresql/CopyInBenchmarks.java create mode 100644 src/main/java/io/r2dbc/postgresql/PostgresqlCopyIn.java create mode 100644 src/test/java/io/r2dbc/postgresql/PostgresqlCopyInIntegrationTests.java create mode 100644 src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java diff --git a/src/jmh/java/io/r2dbc/postgresql/CopyInBenchmarks.java b/src/jmh/java/io/r2dbc/postgresql/CopyInBenchmarks.java new file mode 100644 index 00000000..aaff14b7 --- /dev/null +++ b/src/jmh/java/io/r2dbc/postgresql/CopyInBenchmarks.java @@ -0,0 +1,149 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.r2dbc.postgresql; + +import io.r2dbc.postgresql.api.PostgresqlConnection; +import io.r2dbc.postgresql.util.PostgresqlServerExtension; +import org.junit.platform.commons.annotation.Testable; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.infra.Blackhole; +import org.postgresql.copy.CopyManager; +import org.postgresql.jdbc.PgConnection; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import reactor.core.publisher.Flux; + +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +/** + * Benchmarks for Copy operation. Contains the following execution methods: + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Testable +public class CopyInBenchmarks extends BenchmarkSettings { + + private static PostgresqlServerExtension extension = new PostgresqlServerExtension(); + + @State(Scope.Benchmark) + public static class ConnectionHolder { + + @Param({"0", "1", "100", "1000000"}) + int rows; + + final PgConnection jdbc; + + final CopyManager copyManager; + + final PostgresqlConnection r2dbc; + + Path csvFile; + + public ConnectionHolder() { + + extension.initialize(); + try { + jdbc = extension.getDataSource().getConnection() + .unwrap(PgConnection.class); + copyManager = jdbc.getCopyAPI(); + Statement statement = jdbc.createStatement(); + + try { + statement.execute("DROP TABLE IF EXISTS simple_test"); + } catch (SQLException e) { + } + + statement.execute("CREATE TABLE simple_test (name VARCHAR(255), age int)"); + + jdbc.setAutoCommit(false); + + r2dbc = new PostgresqlConnectionFactory(extension.getConnectionConfiguration()).create().block(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Setup(Level.Trial) + public void doSetup() throws IOException { + csvFile = Files.createTempFile("jmh-input", ".csv"); + + try (OutputStream outputStream = new FileOutputStream(csvFile.toFile())) { + IntStream.range(0, rows) + .mapToObj(i -> "some-input" + i + ";" + i + "\n") + .forEach(row -> { + try { + outputStream.write(row.getBytes(StandardCharsets.UTF_8)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + } + + @TearDown(Level.Trial) + public void doTearDown() throws IOException { + Files.delete(csvFile); + } + + } + + @Benchmark + public void copyInR2dbc(ConnectionHolder connectionHolder, Blackhole voodoo) { + int bufferSize = 65536; // BufferSize is the same as the one from JDBC's CopyManager + Flux input = DataBufferUtils.read(connectionHolder.csvFile, DefaultDataBufferFactory.sharedInstance, bufferSize, StandardOpenOption.READ) + .map(DataBuffer::asByteBuffer); + + Long rowsInserted = connectionHolder.r2dbc.copyIn("COPY simple_test (name, age) FROM STDIN DELIMITER ';'", input) + .block(); + + voodoo.consume(rowsInserted); + } + + @Benchmark + public void copyInJdbc(ConnectionHolder connectionHolder, Blackhole voodoo) throws IOException, SQLException { + try (InputStream inputStream = new FileInputStream(connectionHolder.csvFile.toFile())) { + + Long rowsInserted = connectionHolder.copyManager.copyIn("COPY simple_test (name, age) FROM STDIN DELIMITER ';'", inputStream); + + voodoo.consume(rowsInserted); + } + } + +} diff --git a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java index f6cf0834..e4dea595 100644 --- a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java +++ b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java @@ -49,6 +49,7 @@ import reactor.util.Loggers; import reactor.util.annotation.Nullable; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -406,6 +407,11 @@ public void onComplete() { }); } + @Override + public Mono copyIn(String sql, Publisher stdin) { + return new PostgresqlCopyIn(resources).copy(sql, stdin); + } + private static Function getTransactionIsolationLevelQuery(IsolationLevel isolationLevel) { return transactionStatus -> { if (transactionStatus == OPEN) { diff --git a/src/main/java/io/r2dbc/postgresql/PostgresqlCopyIn.java b/src/main/java/io/r2dbc/postgresql/PostgresqlCopyIn.java new file mode 100644 index 00000000..b8bcbcbe --- /dev/null +++ b/src/main/java/io/r2dbc/postgresql/PostgresqlCopyIn.java @@ -0,0 +1,101 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.r2dbc.postgresql; + +import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; +import io.r2dbc.postgresql.client.Client; +import io.r2dbc.postgresql.message.backend.BackendMessage; +import io.r2dbc.postgresql.message.backend.CommandComplete; +import io.r2dbc.postgresql.message.backend.CopyInResponse; +import io.r2dbc.postgresql.message.backend.ReadyForQuery; +import io.r2dbc.postgresql.message.frontend.CopyData; +import io.r2dbc.postgresql.message.frontend.CopyDone; +import io.r2dbc.postgresql.message.frontend.CopyFail; +import io.r2dbc.postgresql.message.frontend.FrontendMessage; +import io.r2dbc.postgresql.message.frontend.Query; +import io.r2dbc.postgresql.util.Assert; +import io.r2dbc.postgresql.util.Operators; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; + +import static io.r2dbc.postgresql.PostgresqlResult.toResult; + +/** + * An implementation for {@link CopyData} PostgreSQL queries. + */ +final class PostgresqlCopyIn { + + private final ConnectionResources context; + + PostgresqlCopyIn(ConnectionResources context) { + this.context = Assert.requireNonNull(context, "context must not be null"); + } + + Mono copy(String sql, Publisher stdin) { + return Flux.from(stdin) + .map(buffer -> new CopyData(Unpooled.wrappedBuffer(buffer))) + .as(messages -> copyIn(sql, messages)); + } + + private Mono copyIn(String sql, Flux copyDataMessages) { + Client client = context.getClient(); + + Flux backendMessages = copyDataMessages + .doOnNext(client::send) + .doOnError(e -> !(e instanceof IllegalArgumentException), (e) -> sendCopyFail(e.getMessage())) + .doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release) + .thenMany(client.exchange(Mono.just(CopyDone.INSTANCE))); + + return startCopy(sql) + .concatWith(backendMessages) + .doOnCancel(() -> sendCopyFail("Cancelled")) + .as(Operators::discardOnCancel) + .as(messages -> toResult(context, messages, ExceptionFactory.INSTANCE).getRowsUpdated()); + } + + private Flux startCopy(String sql) { + return context.getClient().exchange( + // ReadyForQuery is returned when an invalid query is provided + backendMessage -> backendMessage instanceof CopyInResponse || backendMessage instanceof ReadyForQuery, + Mono.just(new Query(sql)) + ) + .doOnNext(message -> { + if (message instanceof CommandComplete) { + throw new IllegalArgumentException("Copy from stdin query expected, sql='" + sql + "', message=" + message); + } + }); + } + + private void sendCopyFail(String message) { + context.getClient().exchange(Mono.just(new CopyFail("Copy operation failed: " + message))) + .as(Operators::discardOnCancel) + .subscribe(); + } + + @Override + public String toString() { + return "PostgresqlCopyIn{" + + "context=" + this.context + + '}'; + } + +} diff --git a/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java b/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java index 6b820a4e..29ed35d9 100644 --- a/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java +++ b/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java @@ -22,10 +22,12 @@ import io.r2dbc.spi.R2dbcNonTransientResourceException; import io.r2dbc.spi.TransactionDefinition; import io.r2dbc.spi.ValidationDepth; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; import java.time.Duration; /** @@ -170,4 +172,13 @@ public interface PostgresqlConnection extends Connection { @Override Mono validate(ValidationDepth depth); + /** + * Copy bulk data from client into a PostgreSQL table very fast. + * + * @param sql the COPY sql statement + * @param stdin the ByteBuffer publisher + * @return a {@link Mono} with the amount of rows inserted + */ + Mono copyIn(String sql, Publisher stdin); + } diff --git a/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionUnitTests.java b/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionUnitTests.java index 68be9062..41cd1012 100644 --- a/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionUnitTests.java +++ b/src/test/java/io/r2dbc/postgresql/PostgresqlConnectionUnitTests.java @@ -21,13 +21,18 @@ import io.r2dbc.postgresql.client.TestClient; import io.r2dbc.postgresql.client.Version; import io.r2dbc.postgresql.codec.MockCodecs; +import io.r2dbc.postgresql.message.Format; import io.r2dbc.postgresql.message.backend.CommandComplete; +import io.r2dbc.postgresql.message.backend.CopyInResponse; import io.r2dbc.postgresql.message.backend.ErrorResponse; +import io.r2dbc.postgresql.message.backend.ReadyForQuery; +import io.r2dbc.postgresql.message.frontend.CopyDone; import io.r2dbc.postgresql.message.frontend.Query; import io.r2dbc.postgresql.message.frontend.Terminate; import io.r2dbc.spi.IsolationLevel; import io.r2dbc.spi.R2dbcNonTransientResourceException; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.test.StepVerifier; import java.time.Duration; @@ -38,6 +43,7 @@ import static io.r2dbc.postgresql.client.TransactionStatus.IDLE; import static io.r2dbc.postgresql.client.TransactionStatus.OPEN; import static io.r2dbc.spi.IsolationLevel.READ_COMMITTED; +import static java.util.Collections.emptySet; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.mockito.Mockito.RETURNS_SMART_NULLS; @@ -502,6 +508,25 @@ void setTransactionIsolationLevelNonOpen() { .verifyComplete(); } + @Test + void copyIn() { + Client client = TestClient.builder() + .transactionStatus(IDLE) + .expectRequest(new Query("some-sql")).thenRespond(new CopyInResponse(emptySet(), Format.FORMAT_TEXT)) + .expectRequest(CopyDone.INSTANCE).thenRespond( + new CommandComplete("cmd", 1, 0), + new ReadyForQuery(ReadyForQuery.TransactionStatus.IDLE) + ) + .build(); + + PostgresqlConnection connection = createConnection(client, MockCodecs.empty(), this.statementCache); + + connection.copyIn("some-sql", Flux.empty()) + .as(StepVerifier::create) + .expectNext(0L) + .verifyComplete(); + } + @Test void setStatementTimeout() { Client client = TestClient.builder() diff --git a/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInIntegrationTests.java b/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInIntegrationTests.java new file mode 100644 index 00000000..130d4d86 --- /dev/null +++ b/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInIntegrationTests.java @@ -0,0 +1,175 @@ +/* + * Copyright 2019-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.r2dbc.postgresql; + +import io.r2dbc.postgresql.ExceptionFactory.PostgresqlBadGrammarException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.jdbc.core.JdbcOperations; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration tests for {@link PostgresqlCopyIn}. + */ +class PostgresqlCopyInIntegrationTests extends AbstractIntegrationTests { + + @BeforeEach + void setUp() { + super.setUp(); + getJdbcOperations().execute("DROP TABLE IF EXISTS test"); + getJdbcOperations().execute("CREATE TABLE test (id SERIAL PRIMARY KEY, val VARCHAR(255), timestamp TIMESTAMP)"); + } + + @AfterEach + void tearDown() { + super.tearDown(); + getJdbcOperations().execute("DROP TABLE IF EXISTS test"); + } + + private JdbcOperations getJdbcOperations() { + return SERVER.getJdbcOperations(); + } + + @Override + protected void customize(PostgresqlConnectionConfiguration.Builder builder) { + builder.preparedStatementCacheQueries(2); + } + + @Test + void shouldCopyDataIntoTable() { + String sql = "COPY test (val) FROM STDIN"; + + Flux data = Flux.just( + ByteBuffer.wrap("d\n".getBytes()), + ByteBuffer.wrap("d\n".getBytes()), + ByteBuffer.wrap("e\n".getBytes()) + ); + + this.connection.copyIn(sql, data) + .as(StepVerifier::create) + .expectNext(3L) + .verifyComplete(); + + // Verify the connection is no longer in COPY-IN mode and verify data is copied into the table. + verifyItemsInserted(asList("d", "d", "e")); + } + + @Test + void shouldHandleErrorOnFailureInInput() { + String sql = "COPY test (val) FROM STDIN"; + + Flux data = Flux.just( + ByteBuffer.wrap("d\n".getBytes()) + ) + .concatWith(Mono.error(new RuntimeException("Failed during input generation"))); + + this.connection.copyIn(sql, data) + .as(StepVerifier::create) + .expectError(RuntimeException.class) + .verify(); + + verifyItemsInserted(emptyList()); + } + + @Test + void shouldCopyNothingEmptyFlux() { + String sql = "COPY test (val) FROM STDIN"; + + Flux data = Flux.empty(); + + this.connection.copyIn(sql, data) + .as(StepVerifier::create) + .expectNext(0L) + .verifyComplete(); + } + + @Test + void shouldHandleErrorOnValidNonCopyInQuery() { + String sql = "SELECT 1"; + + Flux input = Flux.just(ByteBuffer.wrap(("something,something-invalid\n").getBytes())); + + this.connection.copyIn(sql, input) + .as(StepVerifier::create) + .consumeErrorWith(e -> assertThat(e) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Copy from stdin query expected, sql='SELECT 1', message=CommandComplete{command=SELECT, rowId=null, rows=1}") + ) + .verify(); + } + + @Test + void shouldHandleErrors() { + String sql = "COPY test (val) FROM STDIN"; + + int characterCountVarcharType = 256; + Flux input = Flux.just(String.join("", Collections.nCopies(characterCountVarcharType, "a"))) + .map(row -> ByteBuffer.wrap(row.getBytes())); + + verifyCopyInFailed(sql, input, "value too long for type character varying(255)"); + } + + @Test + void shouldFailOnInvalidStatement() { + String sql = "COPY invalid command"; + + Flux data = Flux.just(ByteBuffer.wrap(("something,something-invalid\n").getBytes())); + + verifyCopyInFailed(sql, data, "syntax error at or near \"command\""); + } + + @Test + void shouldFailOnInvalidDataType() { + String sql = "COPY test (val, timestamp) FROM STDIN WITH DELIMITER ','"; + + Flux data = Flux.just(ByteBuffer.wrap(("something,something-invalid\n").getBytes())); + + verifyCopyInFailed(sql, data, "invalid input syntax for type timestamp: \"something-invalid\""); + } + + private void verifyCopyInFailed(String sql, Flux data, String message) { + this.connection.copyIn(sql, data) + .as(StepVerifier::create) + .consumeErrorWith(e -> assertThat(e) + .isInstanceOf(PostgresqlBadGrammarException.class) + .hasMessage(message) + ) + .verify(); + } + + private void verifyItemsInserted(List t) { + this.connection.createStatement("SELECT val FROM test") + .execute() + .flatMap(res -> res.map(row -> row.get(0))) + .collectSortedList() + .as(StepVerifier::create) + .expectNext(t) + .verifyComplete(); + } + +} diff --git a/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java b/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java new file mode 100644 index 00000000..8668d909 --- /dev/null +++ b/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java @@ -0,0 +1,178 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.r2dbc.postgresql; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.r2dbc.postgresql.ExceptionFactory.PostgresqlNonTransientResourceException; +import io.r2dbc.postgresql.client.Client; +import io.r2dbc.postgresql.client.TestClient; +import io.r2dbc.postgresql.client.TransactionStatus; +import io.r2dbc.postgresql.message.Format; +import io.r2dbc.postgresql.message.backend.CommandComplete; +import io.r2dbc.postgresql.message.backend.CopyInResponse; +import io.r2dbc.postgresql.message.backend.ErrorResponse; +import io.r2dbc.postgresql.message.backend.ReadyForQuery; +import io.r2dbc.postgresql.message.frontend.CopyData; +import io.r2dbc.postgresql.message.frontend.CopyDone; +import io.r2dbc.postgresql.message.frontend.CopyFail; +import io.r2dbc.postgresql.message.frontend.Query; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; +import reactor.test.publisher.TestPublisher; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import static io.r2dbc.postgresql.message.backend.ReadyForQuery.TransactionStatus.IDLE; +import static java.util.Collections.emptyList; +import static java.util.Collections.emptySet; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for {@link PostgresqlCopyIn}. + */ +final class PostgresqlCopyInUnitTests { + + @Test + void copyIn() { + ByteBuffer byteBuffer = byteBuffer("a\n"); + Client client = TestClient.builder() + .expectRequest(new Query("some-sql")).thenRespond(new CopyInResponse(emptySet(), Format.FORMAT_TEXT)) + .expectRequest(new CopyData(Unpooled.wrappedBuffer(byteBuffer)), CopyDone.INSTANCE).thenRespond( + new CommandComplete("cmd", 1, 1), + new ReadyForQuery(IDLE) + ).build(); + + new PostgresqlCopyIn(MockContext.builder().client(client).build()) + .copy("some-sql", Flux.just(byteBuffer)) + .as(StepVerifier::create) + .expectNext(1L) + .verifyComplete(); + } + + @Test + void copyInInvalidQuery() { + ByteBuffer byteBuffer = byteBuffer("a\n"); + String sql = "invalid-sql"; + Client client = TestClient.builder() + .expectRequest(new Query(sql)).thenRespond(new CommandComplete("command", 0, 9)) + .build(); + + new PostgresqlCopyIn(MockContext.builder().client(client).build()) + .copy(sql, Flux.just(byteBuffer)) + .as(StepVerifier::create) + .consumeErrorWith(e -> assertThat(e) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Copy from stdin query expected, sql='invalid-sql', message=CommandComplete{command=command, rowId=0, rows=9}") + ) + .verify(); + } + + @Test + void copyInErrorResponse() { + ByteBuffer byteBuffer = byteBuffer("a\n"); + Client client = TestClient.builder() + .expectRequest(new Query("some-sql")).thenRespond(new ErrorResponse(emptyList())) + .build(); + + new PostgresqlCopyIn(MockContext.builder().client(client).build()) + .copy("some-sql", Flux.just(byteBuffer)) + .as(StepVerifier::create) + .expectError(PostgresqlNonTransientResourceException.class) + .verify(); + } + + @Test + void copyInEmpty() { + Client client = TestClient.builder() + .transactionStatus(TransactionStatus.IDLE) + .expectRequest(new Query("some-sql")).thenRespond(new CopyInResponse(emptySet(), Format.FORMAT_TEXT)) + .expectRequest(CopyDone.INSTANCE).thenRespond( + new CommandComplete("cmd", 1, 0), + new ReadyForQuery(ReadyForQuery.TransactionStatus.IDLE) + ) + .build(); + + new PostgresqlCopyIn(MockContext.builder().client(client).build()) + .copy("some-sql", Flux.empty()) + .as(StepVerifier::create) + .expectNext(0L) + .verifyComplete(); + } + + @Test + void copyInError() { + TestPublisher testPublisher = TestPublisher.createCold(); + testPublisher.next(byteBuffer("a\n")); + testPublisher.next(byteBuffer("b\n")); + testPublisher.error(new RuntimeException("Failed")); + + Client client = TestClient.builder() + .expectRequest(new Query("some-sql")).thenRespond(new CopyInResponse(emptySet(), Format.FORMAT_TEXT)) + .expectRequest( + new CopyData(byteBuf("a\n")), + new CopyData(byteBuf("b\n")), + new CopyFail("Copy operation failed: Failed") + ).thenRespond( + new CommandComplete("cmd", 1, 1), + new ReadyForQuery(IDLE) + ).build(); + + new PostgresqlCopyIn(MockContext.builder().client(client).build()) + .copy("some-sql", testPublisher.flux()) + .as(StepVerifier::create) + .expectError(RuntimeException.class) + .verify(); + } + + @Test + void copyInCancel() { + TestPublisher testPublisher = TestPublisher.create(); + + Client client = TestClient.builder() + .expectRequest(new Query("some-sql")).thenRespond(new CopyInResponse(emptySet(), Format.FORMAT_TEXT)) + .expectRequest( + new CopyData(byteBuf("a")), + new CopyData(byteBuf("b")), + new CopyFail("Copy operation failed: Cancelled") + ).thenRespond( + new CommandComplete("cmd", 1, 1), + new ReadyForQuery(IDLE) + ).build(); + + new PostgresqlCopyIn(MockContext.builder().client(client).build()) + .copy("some-sql", testPublisher.flux()) + .as(StepVerifier::create) + .then(() -> { + testPublisher.next(byteBuffer("a")); + testPublisher.next(byteBuffer("b")); + }) + .thenCancel() + .verify(); + } + + private ByteBuffer byteBuffer(String str) { + return ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8)); + } + + private ByteBuf byteBuf(String str) { + return Unpooled.wrappedBuffer(str.getBytes()); + } + +} diff --git a/src/test/java/io/r2dbc/postgresql/api/MockPostgresqlConnection.java b/src/test/java/io/r2dbc/postgresql/api/MockPostgresqlConnection.java index 7060f9b0..91f2048e 100644 --- a/src/test/java/io/r2dbc/postgresql/api/MockPostgresqlConnection.java +++ b/src/test/java/io/r2dbc/postgresql/api/MockPostgresqlConnection.java @@ -19,9 +19,11 @@ import io.r2dbc.spi.IsolationLevel; import io.r2dbc.spi.TransactionDefinition; import io.r2dbc.spi.ValidationDepth; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.nio.ByteBuffer; import java.time.Duration; public final class MockPostgresqlConnection implements PostgresqlConnection { @@ -132,4 +134,9 @@ public Mono validate(ValidationDepth depth) { return Mono.empty(); } + @Override + public Mono copyIn(String sql, Publisher stdin) { + return Mono.empty(); + } + } diff --git a/src/test/java/io/r2dbc/postgresql/client/TestClient.java b/src/test/java/io/r2dbc/postgresql/client/TestClient.java index 2fdc8216..78dc563f 100644 --- a/src/test/java/io/r2dbc/postgresql/client/TestClient.java +++ b/src/test/java/io/r2dbc/postgresql/client/TestClient.java @@ -326,10 +326,10 @@ public T done() { return this.chain; } - public Exchange.Builder> expectRequest(FrontendMessage request) { - Assert.requireNonNull(request, "request must not be null"); + public Exchange.Builder> expectRequest(FrontendMessage... requests) { + Assert.requireNonNull(requests, "requests must not be null"); - Exchange.Builder> exchange = new Exchange.Builder<>(this, request); + Exchange.Builder> exchange = new Exchange.Builder<>(this, requests); this.exchanges.add(exchange); return exchange; } From f6b057aed884ade9e16dfc6d241e5190fb78666f Mon Sep 17 00:00:00 2001 From: Arjan Schouten Date: Tue, 12 Apr 2022 16:09:37 +0200 Subject: [PATCH 2/3] review comments --- .../io/r2dbc/postgresql/CopyInBenchmarks.java | 30 +++++++++-------- .../postgresql/PostgresqlConnection.java | 3 +- .../io/r2dbc/postgresql/PostgresqlCopyIn.java | 13 +++----- .../postgresql/api/PostgresqlConnection.java | 10 +++--- .../PostgresqlCopyInIntegrationTests.java | 33 +++++++++++-------- .../postgresql/PostgresqlCopyInUnitTests.java | 25 +++++--------- .../api/MockPostgresqlConnection.java | 4 +-- 7 files changed, 59 insertions(+), 59 deletions(-) diff --git a/src/jmh/java/io/r2dbc/postgresql/CopyInBenchmarks.java b/src/jmh/java/io/r2dbc/postgresql/CopyInBenchmarks.java index aaff14b7..47891abb 100644 --- a/src/jmh/java/io/r2dbc/postgresql/CopyInBenchmarks.java +++ b/src/jmh/java/io/r2dbc/postgresql/CopyInBenchmarks.java @@ -16,6 +16,8 @@ package io.r2dbc.postgresql; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.r2dbc.postgresql.api.PostgresqlConnection; import io.r2dbc.postgresql.util.PostgresqlServerExtension; import org.junit.platform.commons.annotation.Testable; @@ -32,21 +34,20 @@ import org.openjdk.jmh.infra.Blackhole; import org.postgresql.copy.CopyManager; import org.postgresql.jdbc.PgConnection; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.DefaultDataBufferFactory; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.StandardOpenOption; import java.sql.SQLException; import java.sql.Statement; import java.util.concurrent.TimeUnit; @@ -125,15 +126,18 @@ public void doTearDown() throws IOException { } @Benchmark - public void copyInR2dbc(ConnectionHolder connectionHolder, Blackhole voodoo) { - int bufferSize = 65536; // BufferSize is the same as the one from JDBC's CopyManager - Flux input = DataBufferUtils.read(connectionHolder.csvFile, DefaultDataBufferFactory.sharedInstance, bufferSize, StandardOpenOption.READ) - .map(DataBuffer::asByteBuffer); + public void copyInR2dbc(ConnectionHolder connectionHolder, Blackhole voodoo) throws IOException { + File file = connectionHolder.csvFile.toFile(); + try (FileInputStream fileInputStream = new FileInputStream(file); + FileChannel fileChannel = fileInputStream.getChannel()) { + MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, file.length()); + ByteBuf byteBuf = Unpooled.wrappedBuffer(mappedByteBuffer); - Long rowsInserted = connectionHolder.r2dbc.copyIn("COPY simple_test (name, age) FROM STDIN DELIMITER ';'", input) - .block(); + Long rowsInserted = connectionHolder.r2dbc.copyIn("COPY simple_test (name, age) FROM STDIN DELIMITER ';'", Mono.just(byteBuf)) + .block(); - voodoo.consume(rowsInserted); + voodoo.consume(rowsInserted); + } } @Benchmark diff --git a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java index e4dea595..5ba545f8 100644 --- a/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java +++ b/src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java @@ -16,6 +16,7 @@ package io.r2dbc.postgresql; +import io.netty.buffer.ByteBuf; import io.r2dbc.postgresql.api.ErrorDetails; import io.r2dbc.postgresql.api.Notification; import io.r2dbc.postgresql.api.PostgresTransactionDefinition; @@ -408,7 +409,7 @@ public void onComplete() { } @Override - public Mono copyIn(String sql, Publisher stdin) { + public Mono copyIn(String sql, Publisher stdin) { return new PostgresqlCopyIn(resources).copy(sql, stdin); } diff --git a/src/main/java/io/r2dbc/postgresql/PostgresqlCopyIn.java b/src/main/java/io/r2dbc/postgresql/PostgresqlCopyIn.java index b8bcbcbe..55c5b9dc 100644 --- a/src/main/java/io/r2dbc/postgresql/PostgresqlCopyIn.java +++ b/src/main/java/io/r2dbc/postgresql/PostgresqlCopyIn.java @@ -16,7 +16,7 @@ package io.r2dbc.postgresql; -import io.netty.buffer.Unpooled; +import io.netty.buffer.ByteBuf; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; import io.r2dbc.postgresql.client.Client; @@ -27,7 +27,6 @@ import io.r2dbc.postgresql.message.frontend.CopyData; import io.r2dbc.postgresql.message.frontend.CopyDone; import io.r2dbc.postgresql.message.frontend.CopyFail; -import io.r2dbc.postgresql.message.frontend.FrontendMessage; import io.r2dbc.postgresql.message.frontend.Query; import io.r2dbc.postgresql.util.Assert; import io.r2dbc.postgresql.util.Operators; @@ -35,8 +34,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.nio.ByteBuffer; - import static io.r2dbc.postgresql.PostgresqlResult.toResult; /** @@ -50,10 +47,10 @@ final class PostgresqlCopyIn { this.context = Assert.requireNonNull(context, "context must not be null"); } - Mono copy(String sql, Publisher stdin) { + Mono copy(String sql, Publisher stdin) { return Flux.from(stdin) - .map(buffer -> new CopyData(Unpooled.wrappedBuffer(buffer))) - .as(messages -> copyIn(sql, messages)); + .map(CopyData::new) + .as(messages -> copyIn(sql, messages)); } private Mono copyIn(String sql, Flux copyDataMessages) { @@ -61,7 +58,7 @@ private Mono copyIn(String sql, Flux copyDataMessages) { Flux backendMessages = copyDataMessages .doOnNext(client::send) - .doOnError(e -> !(e instanceof IllegalArgumentException), (e) -> sendCopyFail(e.getMessage())) + .doOnError((e) -> sendCopyFail(e.getMessage())) .doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release) .thenMany(client.exchange(Mono.just(CopyDone.INSTANCE))); diff --git a/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java b/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java index 29ed35d9..db6ef57a 100644 --- a/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java +++ b/src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java @@ -16,6 +16,7 @@ package io.r2dbc.postgresql.api; +import io.netty.buffer.ByteBuf; import io.r2dbc.postgresql.message.frontend.CancelRequest; import io.r2dbc.spi.Connection; import io.r2dbc.spi.IsolationLevel; @@ -27,7 +28,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.nio.ByteBuffer; import java.time.Duration; /** @@ -173,12 +173,12 @@ public interface PostgresqlConnection extends Connection { Mono validate(ValidationDepth depth); /** - * Copy bulk data from client into a PostgreSQL table very fast. + * Use COPY FROM STDIN for very fast copying into a database table. * - * @param sql the COPY sql statement - * @param stdin the ByteBuffer publisher + * @param sql the COPY … FROM STDIN sql statement + * @param stdin the ByteBuf publisher * @return a {@link Mono} with the amount of rows inserted */ - Mono copyIn(String sql, Publisher stdin); + Mono copyIn(String sql, Publisher stdin); } diff --git a/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInIntegrationTests.java b/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInIntegrationTests.java index 130d4d86..d915df0d 100644 --- a/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInIntegrationTests.java +++ b/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInIntegrationTests.java @@ -16,6 +16,8 @@ package io.r2dbc.postgresql; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.r2dbc.postgresql.ExceptionFactory.PostgresqlBadGrammarException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -25,7 +27,6 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; @@ -64,10 +65,10 @@ protected void customize(PostgresqlConnectionConfiguration.Builder builder) { void shouldCopyDataIntoTable() { String sql = "COPY test (val) FROM STDIN"; - Flux data = Flux.just( - ByteBuffer.wrap("d\n".getBytes()), - ByteBuffer.wrap("d\n".getBytes()), - ByteBuffer.wrap("e\n".getBytes()) + Flux data = Flux.just( + byteBuf("d\n"), + byteBuf("d\n"), + byteBuf("e\n") ); this.connection.copyIn(sql, data) @@ -83,8 +84,8 @@ void shouldCopyDataIntoTable() { void shouldHandleErrorOnFailureInInput() { String sql = "COPY test (val) FROM STDIN"; - Flux data = Flux.just( - ByteBuffer.wrap("d\n".getBytes()) + Flux data = Flux.just( + byteBuf("d\n") ) .concatWith(Mono.error(new RuntimeException("Failed during input generation"))); @@ -100,7 +101,7 @@ void shouldHandleErrorOnFailureInInput() { void shouldCopyNothingEmptyFlux() { String sql = "COPY test (val) FROM STDIN"; - Flux data = Flux.empty(); + Flux data = Flux.empty(); this.connection.copyIn(sql, data) .as(StepVerifier::create) @@ -112,7 +113,7 @@ void shouldCopyNothingEmptyFlux() { void shouldHandleErrorOnValidNonCopyInQuery() { String sql = "SELECT 1"; - Flux input = Flux.just(ByteBuffer.wrap(("something,something-invalid\n").getBytes())); + Flux input = Flux.just(byteBuf("something,something-invalid\n")); this.connection.copyIn(sql, input) .as(StepVerifier::create) @@ -128,8 +129,8 @@ void shouldHandleErrors() { String sql = "COPY test (val) FROM STDIN"; int characterCountVarcharType = 256; - Flux input = Flux.just(String.join("", Collections.nCopies(characterCountVarcharType, "a"))) - .map(row -> ByteBuffer.wrap(row.getBytes())); + Flux input = Flux.just(String.join("", Collections.nCopies(characterCountVarcharType, "a"))) + .map(this::byteBuf); verifyCopyInFailed(sql, input, "value too long for type character varying(255)"); } @@ -138,7 +139,7 @@ void shouldHandleErrors() { void shouldFailOnInvalidStatement() { String sql = "COPY invalid command"; - Flux data = Flux.just(ByteBuffer.wrap(("something,something-invalid\n").getBytes())); + Flux data = Flux.just(byteBuf("something,something-invalid\n")); verifyCopyInFailed(sql, data, "syntax error at or near \"command\""); } @@ -147,12 +148,12 @@ void shouldFailOnInvalidStatement() { void shouldFailOnInvalidDataType() { String sql = "COPY test (val, timestamp) FROM STDIN WITH DELIMITER ','"; - Flux data = Flux.just(ByteBuffer.wrap(("something,something-invalid\n").getBytes())); + Flux data = Flux.just(byteBuf("something,something-invalid\n")); verifyCopyInFailed(sql, data, "invalid input syntax for type timestamp: \"something-invalid\""); } - private void verifyCopyInFailed(String sql, Flux data, String message) { + private void verifyCopyInFailed(String sql, Flux data, String message) { this.connection.copyIn(sql, data) .as(StepVerifier::create) .consumeErrorWith(e -> assertThat(e) @@ -172,4 +173,8 @@ private void verifyItemsInserted(List t) { .verifyComplete(); } + private ByteBuf byteBuf(String str) { + return Unpooled.wrappedBuffer(str.getBytes()); + } + } diff --git a/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java b/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java index 8668d909..74a2dc21 100644 --- a/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java +++ b/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java @@ -36,9 +36,6 @@ import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; - import static io.r2dbc.postgresql.message.backend.ReadyForQuery.TransactionStatus.IDLE; import static java.util.Collections.emptyList; import static java.util.Collections.emptySet; @@ -51,7 +48,7 @@ final class PostgresqlCopyInUnitTests { @Test void copyIn() { - ByteBuffer byteBuffer = byteBuffer("a\n"); + ByteBuf byteBuffer = byteBuf("a\n"); Client client = TestClient.builder() .expectRequest(new Query("some-sql")).thenRespond(new CopyInResponse(emptySet(), Format.FORMAT_TEXT)) .expectRequest(new CopyData(Unpooled.wrappedBuffer(byteBuffer)), CopyDone.INSTANCE).thenRespond( @@ -68,7 +65,7 @@ void copyIn() { @Test void copyInInvalidQuery() { - ByteBuffer byteBuffer = byteBuffer("a\n"); + ByteBuf byteBuffer = byteBuf("a\n"); String sql = "invalid-sql"; Client client = TestClient.builder() .expectRequest(new Query(sql)).thenRespond(new CommandComplete("command", 0, 9)) @@ -86,7 +83,7 @@ void copyInInvalidQuery() { @Test void copyInErrorResponse() { - ByteBuffer byteBuffer = byteBuffer("a\n"); + ByteBuf byteBuffer = byteBuf("a\n"); Client client = TestClient.builder() .expectRequest(new Query("some-sql")).thenRespond(new ErrorResponse(emptyList())) .build(); @@ -118,9 +115,9 @@ void copyInEmpty() { @Test void copyInError() { - TestPublisher testPublisher = TestPublisher.createCold(); - testPublisher.next(byteBuffer("a\n")); - testPublisher.next(byteBuffer("b\n")); + TestPublisher testPublisher = TestPublisher.createCold(); + testPublisher.next(byteBuf("a\n")); + testPublisher.next(byteBuf("b\n")); testPublisher.error(new RuntimeException("Failed")); Client client = TestClient.builder() @@ -143,7 +140,7 @@ void copyInError() { @Test void copyInCancel() { - TestPublisher testPublisher = TestPublisher.create(); + TestPublisher testPublisher = TestPublisher.create(); Client client = TestClient.builder() .expectRequest(new Query("some-sql")).thenRespond(new CopyInResponse(emptySet(), Format.FORMAT_TEXT)) @@ -160,17 +157,13 @@ void copyInCancel() { .copy("some-sql", testPublisher.flux()) .as(StepVerifier::create) .then(() -> { - testPublisher.next(byteBuffer("a")); - testPublisher.next(byteBuffer("b")); + testPublisher.next(byteBuf("a")); + testPublisher.next(byteBuf("b")); }) .thenCancel() .verify(); } - private ByteBuffer byteBuffer(String str) { - return ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8)); - } - private ByteBuf byteBuf(String str) { return Unpooled.wrappedBuffer(str.getBytes()); } diff --git a/src/test/java/io/r2dbc/postgresql/api/MockPostgresqlConnection.java b/src/test/java/io/r2dbc/postgresql/api/MockPostgresqlConnection.java index 91f2048e..5dc7dd07 100644 --- a/src/test/java/io/r2dbc/postgresql/api/MockPostgresqlConnection.java +++ b/src/test/java/io/r2dbc/postgresql/api/MockPostgresqlConnection.java @@ -16,6 +16,7 @@ package io.r2dbc.postgresql.api; +import io.netty.buffer.ByteBuf; import io.r2dbc.spi.IsolationLevel; import io.r2dbc.spi.TransactionDefinition; import io.r2dbc.spi.ValidationDepth; @@ -23,7 +24,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.nio.ByteBuffer; import java.time.Duration; public final class MockPostgresqlConnection implements PostgresqlConnection { @@ -135,7 +135,7 @@ public Mono validate(ValidationDepth depth) { } @Override - public Mono copyIn(String sql, Publisher stdin) { + public Mono copyIn(String sql, Publisher stdin) { return Mono.empty(); } From 17e54fca4fbd7b2e8514b565ec15578070db7bbd Mon Sep 17 00:00:00 2001 From: Arjan Schouten Date: Wed, 13 Apr 2022 11:37:06 +0200 Subject: [PATCH 3/3] fixup --- .../java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java b/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java index 74a2dc21..40c027d8 100644 --- a/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java +++ b/src/test/java/io/r2dbc/postgresql/PostgresqlCopyInUnitTests.java @@ -51,7 +51,7 @@ void copyIn() { ByteBuf byteBuffer = byteBuf("a\n"); Client client = TestClient.builder() .expectRequest(new Query("some-sql")).thenRespond(new CopyInResponse(emptySet(), Format.FORMAT_TEXT)) - .expectRequest(new CopyData(Unpooled.wrappedBuffer(byteBuffer)), CopyDone.INSTANCE).thenRespond( + .expectRequest(new CopyData(byteBuffer), CopyDone.INSTANCE).thenRespond( new CommandComplete("cmd", 1, 1), new ReadyForQuery(IDLE) ).build();