Wip/add copy in #1

Merged: 2 commits merged on Mar 18, 2022
149 changes: 149 additions & 0 deletions src/jmh/java/io/r2dbc/postgresql/CopyInBenchmarks.java
@@ -0,0 +1,149 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.r2dbc.postgresql;

import io.r2dbc.postgresql.api.PostgresqlConnection;
import io.r2dbc.postgresql.util.PostgresqlServerExtension;
import org.junit.platform.commons.annotation.Testable;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.infra.Blackhole;
import org.postgresql.copy.CopyManager;
import org.postgresql.jdbc.PgConnection;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
* Benchmarks for the COPY operation. Contains the following execution methods:
* <ul>
* <li>{@link #copyInR2dbc} - bulk copy through the R2DBC {@code copyIn} API</li>
* <li>{@link #copyInJdbc} - bulk copy through JDBC's {@link CopyManager}</li>
* </ul>
*/
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Testable
public class CopyInBenchmarks extends BenchmarkSettings {

private static PostgresqlServerExtension extension = new PostgresqlServerExtension();

@State(Scope.Benchmark)
public static class ConnectionHolder {

@Param({"0", "1", "100", "1000000"})
int rows;

final PgConnection jdbc;

final CopyManager copyManager;

final PostgresqlConnection r2dbc;

Path csvFile;

public ConnectionHolder() {

extension.initialize();
try {
jdbc = extension.getDataSource().getConnection()
.unwrap(PgConnection.class);
copyManager = jdbc.getCopyAPI();
Statement statement = jdbc.createStatement();

try {
statement.execute("DROP TABLE IF EXISTS simple_test");
} catch (SQLException e) {
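// ignore: dropping the table here is best-effort setup cleanup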
}

statement.execute("CREATE TABLE simple_test (name VARCHAR(255), age int)");

jdbc.setAutoCommit(false);

r2dbc = new PostgresqlConnectionFactory(extension.getConnectionConfiguration()).create().block();
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

@Setup(Level.Trial)
public void doSetup() throws IOException {
csvFile = Files.createTempFile("jmh-input", ".csv");

try (OutputStream outputStream = new FileOutputStream(csvFile.toFile())) {
IntStream.range(0, rows)
.mapToObj(i -> "some-input" + i + ";" + i + "\n")
.forEach(row -> {
try {
outputStream.write(row.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

@TearDown(Level.Trial)
public void doTearDown() throws IOException {
Files.delete(csvFile);
}

}

@Benchmark
public void copyInR2dbc(ConnectionHolder connectionHolder, Blackhole voodoo) {
int bufferSize = 65536; // BufferSize is the same as the one from JDBC's CopyManager
Flux<ByteBuffer> input = DataBufferUtils.read(connectionHolder.csvFile, DefaultDataBufferFactory.sharedInstance, bufferSize, StandardOpenOption.READ)
.map(DataBuffer::asByteBuffer);

Long rowsInserted = connectionHolder.r2dbc.copyIn("COPY simple_test (name, age) FROM STDIN DELIMITER ';'", input)


I'm not getting Rule 1.3 errors anymore. Still odd that JDBC is more than twice as fast, though:

Benchmark                      (rows)   Mode  Cnt     Score     Error  Units
CopyInBenchmarks.copyInJdbc         0  thrpt    5  6464.981 ± 1673.001  ops/s
CopyInBenchmarks.copyInJdbc         1  thrpt    5  6083.269 ± 1046.740  ops/s
CopyInBenchmarks.copyInJdbc       100  thrpt    5  3822.937 ±  443.608  ops/s
CopyInBenchmarks.copyInJdbc      1000  thrpt    5   395.994 ±  87.967  ops/s
CopyInBenchmarks.copyInJdbc     10000  thrpt    5    97.100 ±  17.249  ops/s
CopyInBenchmarks.copyInJdbc   1000000  thrpt    5     1.076 ±   1.220  ops/s
CopyInBenchmarks.copyInR2dbc        0  thrpt    5  4280.554 ±  870.727  ops/s
CopyInBenchmarks.copyInR2dbc        1  thrpt    5   559.015 ±   77.062  ops/s
CopyInBenchmarks.copyInR2dbc      100  thrpt    5   515.306 ±  112.103  ops/s
CopyInBenchmarks.copyInR2dbc     1000  thrpt    5   122.167 ±  53.674  ops/s
CopyInBenchmarks.copyInR2dbc    10000  thrpt    5    47.122 ±   6.171  ops/s
CopyInBenchmarks.copyInR2dbc  1000000  thrpt    5     0.856 ±   0.710  ops/s

Author:

Good to hear about the Rule 1.3 errors. Still no idea what was causing them.

I don't think we should fixate on it too much. My idea was that it might come down to reading the file, with FileInputStream being faster. I ran a quick test, and this is the output on my machine:

Benchmark                      (rows)   Mode  Cnt      Score       Error  Units
CopyInBenchmarks.copyInJdbc         0  thrpt    5  35548.527 ± 14809.055  ops/s
CopyInBenchmarks.copyInJdbc         1  thrpt    5  34664.394 ±  8705.782  ops/s
CopyInBenchmarks.copyInJdbc       100  thrpt    5  14261.899 ±  9395.174  ops/s
CopyInBenchmarks.copyInJdbc   1000000  thrpt    5      2.595 ±     0.240  ops/s
CopyInBenchmarks.copyInR2dbc        0  thrpt    5  28322.464 ±  2161.058  ops/s
CopyInBenchmarks.copyInR2dbc        1  thrpt    5  20449.850 ±   617.851  ops/s
CopyInBenchmarks.copyInR2dbc      100  thrpt    5  20647.997 ±  1632.350  ops/s
CopyInBenchmarks.copyInR2dbc  1000000  thrpt    5    116.070 ±     6.872  ops/s

Using the following code:

    @Benchmark
    public void copyInR2dbc(ConnectionHolder connectionHolder, Blackhole voodoo) {
        int bufferSize = 65536; // BufferSize is the same as the one from JDBC's CopyManager
        Long rowsInserted = DataBufferUtils.read(connectionHolder.csvFile, DefaultDataBufferFactory.sharedInstance, bufferSize, StandardOpenOption.READ)
            .map(x -> {
                DataBufferUtils.release(x);
                return 1;
            })
            .count()
            .block();

        voodoo.consume(rowsInserted);
    }

    @Benchmark
    public void copyInJdbc(ConnectionHolder connectionHolder, Blackhole voodoo) throws IOException, SQLException {
        try (InputStream inputStream = new FileInputStream(connectionHolder.csvFile.toFile())) {

            int i = 0;
            Scanner s = new Scanner(inputStream).useDelimiter("\n");
            while (s.hasNext()) {
                i++;
                voodoo.consume(s.next());
            }
            
            voodoo.consume(i);
        }
    }

Author:

Do you have any idea how to improve the r2dbc variant? It would be good form towards r2dbc to have numbers that are representative...


No, I don't think it can be made much more efficient than in that benchmark. I'm just not sure whether 1 and 100 are such good benchmark cases.

.block();

voodoo.consume(rowsInserted);
}

@Benchmark
public void copyInJdbc(ConnectionHolder connectionHolder, Blackhole voodoo) throws IOException, SQLException {
try (InputStream inputStream = new FileInputStream(connectionHolder.csvFile.toFile())) {

Long rowsInserted = connectionHolder.copyManager.copyIn("COPY simple_test (name, age) FROM STDIN DELIMITER ';'", inputStream);

voodoo.consume(rowsInserted);
}
}

}
6 changes: 6 additions & 0 deletions src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java
@@ -49,6 +49,7 @@
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -406,6 +407,11 @@ public void onComplete() {
});
}

@Override
public Mono<Long> copyIn(String sql, Publisher<ByteBuffer> stdin) {
return new PostgresqlCopyIn(resources).copy(sql, stdin);
}

private static Function<TransactionStatus, String> getTransactionIsolationLevelQuery(IsolationLevel isolationLevel) {
return transactionStatus -> {
if (transactionStatus == OPEN) {
101 changes: 101 additions & 0 deletions src/main/java/io/r2dbc/postgresql/PostgresqlCopyIn.java
@@ -0,0 +1,101 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.r2dbc.postgresql;

import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.CommandComplete;
import io.r2dbc.postgresql.message.backend.CopyInResponse;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.CopyData;
import io.r2dbc.postgresql.message.frontend.CopyDone;
import io.r2dbc.postgresql.message.frontend.CopyFail;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.message.frontend.Query;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.postgresql.util.Operators;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;

import static io.r2dbc.postgresql.PostgresqlResult.toResult;

/**
* An implementation of the PostgreSQL COPY FROM STDIN operation, streaming {@link CopyData} messages to the server.
*/
final class PostgresqlCopyIn {

private final ConnectionResources context;

PostgresqlCopyIn(ConnectionResources context) {
this.context = Assert.requireNonNull(context, "context must not be null");
}

Mono<Long> copy(String sql, Publisher<ByteBuffer> stdin) {
return Flux.from(stdin)
.map(buffer -> new CopyData(Unpooled.wrappedBuffer(buffer)))
.as(messages -> copyIn(sql, messages));
}

private Mono<Long> copyIn(String sql, Flux<CopyData> copyDataMessages) {
Client client = context.getClient();

Flux<BackendMessage> backendMessages = copyDataMessages
.doOnNext(client::send)
.doOnError(e -> !(e instanceof IllegalArgumentException), (e) -> sendCopyFail(e.getMessage()))
.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release)
.thenMany(client.exchange(Mono.just(CopyDone.INSTANCE)));

return startCopy(sql)
.concatWith(backendMessages)
.doOnCancel(() -> sendCopyFail("Cancelled"))
.as(Operators::discardOnCancel)
.as(messages -> toResult(context, messages, ExceptionFactory.INSTANCE).getRowsUpdated());
}

private Flux<BackendMessage> startCopy(String sql) {
return context.getClient().exchange(
// ReadyForQuery is returned when an invalid query is provided
backendMessage -> backendMessage instanceof CopyInResponse || backendMessage instanceof ReadyForQuery,
Mono.just(new Query(sql))
)
.doOnNext(message -> {
if (message instanceof CommandComplete) {
throw new IllegalArgumentException("Copy from stdin query expected, sql='" + sql + "', message=" + message);
}
});
}

private void sendCopyFail(String message) {
context.getClient().exchange(Mono.just(new CopyFail("Copy operation failed: " + message)))
.as(Operators::discardOnCancel)
.subscribe();
}

@Override
public String toString() {
return "PostgresqlCopyIn{" +
"context=" + this.context +
'}';
}

}
11 changes: 11 additions & 0 deletions src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java
@@ -22,10 +22,12 @@
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import io.r2dbc.spi.TransactionDefinition;
import io.r2dbc.spi.ValidationDepth;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;
import java.time.Duration;

/**
@@ -170,4 +172,13 @@ public interface PostgresqlConnection extends Connection {
@Override
Mono<Boolean> validate(ValidationDepth depth);

/**
* Copy bulk data from the client into a PostgreSQL table using the COPY protocol.
*
* @param sql the COPY ... FROM STDIN sql statement
* @param stdin the publisher of {@link ByteBuffer}s supplying the data to copy
* @return a {@link Mono} emitting the number of rows inserted
*/


Shouldn't there be an @since tag?

Author:

I don't know yet which version this will land in, whether it will also make it into the 0.8.x releases, or whether we have to wait for the new spring-data releases?!

Leave it for now, so it's the r2dbc team's responsibility?


Okey-dokey.

Mono<Long> copyIn(String sql, Publisher<ByteBuffer> stdin);

}
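For illustration, here is a minimal usage sketch of the new copyIn API outside of the benchmark (not part of this diff). The connection settings and CSV rows are placeholders, and the simple_test table from the benchmark is assumed to exist.

import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CopyInExample {

    public static void main(String[] args) {
        // Placeholder connection settings; adjust to your environment.
        PostgresqlConnectionFactory connectionFactory = new PostgresqlConnectionFactory(
            PostgresqlConnectionConfiguration.builder()
                .host("localhost")
                .username("postgres")
                .password("postgres")
                .database("postgres")
                .build());

        // Each ByteBuffer carries raw COPY text; here one CSV row per buffer.
        Flux<ByteBuffer> stdin = Flux.just("alice;30\n", "bob;41\n")
            .map(row -> ByteBuffer.wrap(row.getBytes(StandardCharsets.UTF_8)));

        Mono<Long> rowsInserted = connectionFactory.create()
            .flatMap(connection -> connection
                .copyIn("COPY simple_test (name, age) FROM STDIN DELIMITER ';'", stdin)
                .doFinally(signal -> connection.close().subscribe()));

        System.out.println("Rows inserted: " + rowsInserted.block());
    }

}

Each row is a separate buffer here for clarity; the benchmark above streams 64 KiB chunks instead, which is closer to what the JDBC CopyManager does.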
@@ -21,13 +21,18 @@
import io.r2dbc.postgresql.client.TestClient;
import io.r2dbc.postgresql.client.Version;
import io.r2dbc.postgresql.codec.MockCodecs;
import io.r2dbc.postgresql.message.Format;
import io.r2dbc.postgresql.message.backend.CommandComplete;
import io.r2dbc.postgresql.message.backend.CopyInResponse;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.CopyDone;
import io.r2dbc.postgresql.message.frontend.Query;
import io.r2dbc.postgresql.message.frontend.Terminate;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.time.Duration;
@@ -38,6 +43,7 @@
import static io.r2dbc.postgresql.client.TransactionStatus.IDLE;
import static io.r2dbc.postgresql.client.TransactionStatus.OPEN;
import static io.r2dbc.spi.IsolationLevel.READ_COMMITTED;
import static java.util.Collections.emptySet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.Mockito.RETURNS_SMART_NULLS;
@@ -502,6 +508,25 @@ void setTransactionIsolationLevelNonOpen() {
.verifyComplete();
}

@Test
void copyIn() {
Client client = TestClient.builder()
.transactionStatus(IDLE)
.expectRequest(new Query("some-sql")).thenRespond(new CopyInResponse(emptySet(), Format.FORMAT_TEXT))
.expectRequest(CopyDone.INSTANCE).thenRespond(
new CommandComplete("cmd", 1, 0),
new ReadyForQuery(ReadyForQuery.TransactionStatus.IDLE)
)
.build();

PostgresqlConnection connection = createConnection(client, MockCodecs.empty(), this.statementCache);

connection.copyIn("some-sql", Flux.empty())
.as(StepVerifier::create)
.expectNext(0L)
.verifyComplete();
}

@Test
void setStatementTimeout() {
Client client = TestClient.builder()