-
Notifications
You must be signed in to change notification settings - Fork 0
Wip/add copy in #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
/* | ||
* Copyright 2019 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.r2dbc.postgresql; | ||
|
||
import io.r2dbc.postgresql.api.PostgresqlConnection; | ||
import io.r2dbc.postgresql.util.PostgresqlServerExtension; | ||
import org.junit.platform.commons.annotation.Testable; | ||
import org.openjdk.jmh.annotations.Benchmark; | ||
import org.openjdk.jmh.annotations.BenchmarkMode; | ||
import org.openjdk.jmh.annotations.Level; | ||
import org.openjdk.jmh.annotations.Mode; | ||
import org.openjdk.jmh.annotations.OutputTimeUnit; | ||
import org.openjdk.jmh.annotations.Param; | ||
import org.openjdk.jmh.annotations.Scope; | ||
import org.openjdk.jmh.annotations.Setup; | ||
import org.openjdk.jmh.annotations.State; | ||
import org.openjdk.jmh.annotations.TearDown; | ||
import org.openjdk.jmh.infra.Blackhole; | ||
import org.postgresql.copy.CopyManager; | ||
import org.postgresql.jdbc.PgConnection; | ||
import org.springframework.core.io.buffer.DataBuffer; | ||
import org.springframework.core.io.buffer.DataBufferUtils; | ||
import org.springframework.core.io.buffer.DefaultDataBufferFactory; | ||
import reactor.core.publisher.Flux; | ||
|
||
import java.io.FileInputStream; | ||
import java.io.FileOutputStream; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.OutputStream; | ||
import java.nio.ByteBuffer; | ||
import java.nio.charset.StandardCharsets; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.nio.file.StandardOpenOption; | ||
import java.sql.SQLException; | ||
import java.sql.Statement; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.stream.IntStream; | ||
|
||
/** | ||
* Benchmarks for Copy operation. Contains the following execution methods: | ||
*/ | ||
@BenchmarkMode(Mode.Throughput) | ||
@OutputTimeUnit(TimeUnit.SECONDS) | ||
@Testable | ||
public class CopyInBenchmarks extends BenchmarkSettings { | ||
|
||
private static PostgresqlServerExtension extension = new PostgresqlServerExtension(); | ||
|
||
@State(Scope.Benchmark) | ||
public static class ConnectionHolder { | ||
|
||
@Param({"0", "1", "100", "1000000"}) | ||
int rows; | ||
|
||
final PgConnection jdbc; | ||
|
||
final CopyManager copyManager; | ||
|
||
final PostgresqlConnection r2dbc; | ||
|
||
Path csvFile; | ||
|
||
public ConnectionHolder() { | ||
|
||
extension.initialize(); | ||
try { | ||
jdbc = extension.getDataSource().getConnection() | ||
.unwrap(PgConnection.class); | ||
copyManager = jdbc.getCopyAPI(); | ||
Statement statement = jdbc.createStatement(); | ||
|
||
try { | ||
statement.execute("DROP TABLE IF EXISTS simple_test"); | ||
} catch (SQLException e) { | ||
} | ||
|
||
statement.execute("CREATE TABLE simple_test (name VARCHAR(255), age int)"); | ||
|
||
jdbc.setAutoCommit(false); | ||
|
||
r2dbc = new PostgresqlConnectionFactory(extension.getConnectionConfiguration()).create().block(); | ||
} catch (SQLException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
@Setup(Level.Trial) | ||
public void doSetup() throws IOException { | ||
csvFile = Files.createTempFile("jmh-input", ".csv"); | ||
|
||
try (OutputStream outputStream = new FileOutputStream(csvFile.toFile())) { | ||
IntStream.range(0, rows) | ||
.mapToObj(i -> "some-input" + i + ";" + i + "\n") | ||
.forEach(row -> { | ||
try { | ||
outputStream.write(row.getBytes(StandardCharsets.UTF_8)); | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
}); | ||
} | ||
} | ||
|
||
@TearDown(Level.Trial) | ||
public void doTearDown() throws IOException { | ||
Files.delete(csvFile); | ||
} | ||
|
||
} | ||
|
||
@Benchmark | ||
public void copyInR2dbc(ConnectionHolder connectionHolder, Blackhole voodoo) { | ||
int bufferSize = 65536; // BufferSize is the same as the one from JDBC's CopyManager | ||
Flux<ByteBuffer> input = DataBufferUtils.read(connectionHolder.csvFile, DefaultDataBufferFactory.sharedInstance, bufferSize, StandardOpenOption.READ) | ||
.map(DataBuffer::asByteBuffer); | ||
|
||
Long rowsInserted = connectionHolder.r2dbc.copyIn("COPY simple_test (name, age) FROM STDIN DELIMITER ';'", input) | ||
.block(); | ||
|
||
voodoo.consume(rowsInserted); | ||
} | ||
|
||
@Benchmark | ||
public void copyInJdbc(ConnectionHolder connectionHolder, Blackhole voodoo) throws IOException, SQLException { | ||
try (InputStream inputStream = new FileInputStream(connectionHolder.csvFile.toFile())) { | ||
|
||
Long rowsInserted = connectionHolder.copyManager.copyIn("COPY simple_test (name, age) FROM STDIN DELIMITER ';'", inputStream); | ||
|
||
voodoo.consume(rowsInserted); | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
/* | ||
* Copyright 2017 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.r2dbc.postgresql; | ||
|
||
import io.netty.buffer.Unpooled; | ||
import io.netty.util.ReferenceCountUtil; | ||
import io.netty.util.ReferenceCounted; | ||
import io.r2dbc.postgresql.client.Client; | ||
import io.r2dbc.postgresql.message.backend.BackendMessage; | ||
import io.r2dbc.postgresql.message.backend.CommandComplete; | ||
import io.r2dbc.postgresql.message.backend.CopyInResponse; | ||
import io.r2dbc.postgresql.message.backend.ReadyForQuery; | ||
import io.r2dbc.postgresql.message.frontend.CopyData; | ||
import io.r2dbc.postgresql.message.frontend.CopyDone; | ||
import io.r2dbc.postgresql.message.frontend.CopyFail; | ||
import io.r2dbc.postgresql.message.frontend.FrontendMessage; | ||
import io.r2dbc.postgresql.message.frontend.Query; | ||
import io.r2dbc.postgresql.util.Assert; | ||
import io.r2dbc.postgresql.util.Operators; | ||
import org.reactivestreams.Publisher; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
|
||
import java.nio.ByteBuffer; | ||
|
||
import static io.r2dbc.postgresql.PostgresqlResult.toResult; | ||
|
||
/**
 * An implementation for {@link CopyData} PostgreSQL queries.
 *
 * <p>Drives the frontend/backend COPY-IN exchange: issues the COPY query, streams
 * {@link CopyData} frames to the server, and terminates the operation with either
 * {@link CopyDone} (success) or {@link CopyFail} (error/cancellation).
 */
final class PostgresqlCopyIn {

    // Shared connection resources; provides access to the protocol Client.
    private final ConnectionResources context;

    PostgresqlCopyIn(ConnectionResources context) {
        this.context = Assert.requireNonNull(context, "context must not be null");
    }

    /**
     * Copies the given byte buffers into the table addressed by {@code sql}.
     *
     * @param sql   the {@code COPY ... FROM STDIN} statement to execute
     * @param stdin the data to stream to the server
     * @return a {@link Mono} emitting the number of rows inserted
     */
    Mono<Long> copy(String sql, Publisher<ByteBuffer> stdin) {
        return Flux.from(stdin)
            // Unpooled.wrappedBuffer does not copy the bytes; it only wraps the
            // incoming ByteBuffer in a Netty ByteBuf for the CopyData frame.
            .map(buffer -> new CopyData(Unpooled.wrappedBuffer(buffer)))
            .as(messages -> copyIn(sql, messages));
    }

    private Mono<Long> copyIn(String sql, Flux<CopyData> copyDataMessages) {
        Client client = context.getClient();

        Flux<BackendMessage> backendMessages = copyDataMessages
            // Forward every CopyData frame to the server as it arrives.
            .doOnNext(client::send)
            // On upstream failure, abort the server-side COPY. IllegalArgumentException
            // is excluded because it signals the statement never entered copy-in mode
            // in the first place (see startCopy), so there is nothing to fail.
            .doOnError(e -> !(e instanceof IllegalArgumentException), (e) -> sendCopyFail(e.getMessage()))
            // Release any reference-counted buffers dropped on cancel/error.
            .doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release)
            // Once all data has been sent, complete the COPY and collect the
            // server's responses (CommandComplete / ReadyForQuery).
            .thenMany(client.exchange(Mono.just(CopyDone.INSTANCE)));

        return startCopy(sql)
            .concatWith(backendMessages)
            // A downstream cancel must also abort the in-progress server-side COPY.
            .doOnCancel(() -> sendCopyFail("Cancelled"))
            .as(Operators::discardOnCancel)
            .as(messages -> toResult(context, messages, ExceptionFactory.INSTANCE).getRowsUpdated());
    }

    // Sends the COPY query and waits for the server to enter copy-in mode.
    private Flux<BackendMessage> startCopy(String sql) {
        return context.getClient().exchange(
            // ReadyForQuery is returned when an invalid query is provided
            backendMessage -> backendMessage instanceof CopyInResponse || backendMessage instanceof ReadyForQuery,
            Mono.just(new Query(sql))
        )
            .doOnNext(message -> {
                // CommandComplete here means the statement ran to completion without
                // entering copy-in mode, i.e. it was not a COPY ... FROM STDIN query.
                if (message instanceof CommandComplete) {
                    throw new IllegalArgumentException("Copy from stdin query expected, sql='" + sql + "', message=" + message);
                }
            });
    }

    // Best-effort, fire-and-forget abort of the in-progress COPY on the server.
    private void sendCopyFail(String message) {
        context.getClient().exchange(Mono.just(new CopyFail("Copy operation failed: " + message)))
            .as(Operators::discardOnCancel)
            .subscribe();
    }

    @Override
    public String toString() {
        return "PostgresqlCopyIn{" +
            "context=" + this.context +
            '}';
    }

}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,10 +22,12 @@ | |
import io.r2dbc.spi.R2dbcNonTransientResourceException; | ||
import io.r2dbc.spi.TransactionDefinition; | ||
import io.r2dbc.spi.ValidationDepth; | ||
import org.reactivestreams.Publisher; | ||
import org.reactivestreams.Subscriber; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.time.Duration; | ||
|
||
/** | ||
|
@@ -170,4 +172,13 @@ public interface PostgresqlConnection extends Connection { | |
@Override | ||
Mono<Boolean> validate(ValidationDepth depth); | ||
|
||
/** | ||
* Copy bulk data from client into a PostgreSQL table very fast. | ||
* | ||
* @param sql the COPY sql statement | ||
* @param stdin the ByteBuffer publisher | ||
* @return a {@link Mono} with the amount of rows inserted | ||
*/ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moet er niet een There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ik weet nog niet in welke versie dit komt en of het ook in de 0.8.x releases gaat komen of dat we moeten wachten op de nieuwe spring-data releases?! Voor nu ff laten zodat het de verantwoordelijkheid is van r2dbc team? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. okido |
||
Mono<Long> copyIn(String sql, Publisher<ByteBuffer> stdin); | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ik krijg geen Rule 1.3 errors meer. Wel vreemd dat jdbc meer dan 2 keer zo snel is:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Goed om te horen van die Rule 1.3. Nog geen idee door wat dat dan veroorzaakt werd.
Ik denk dat we ons er niet op moeten blindstaren. Ik had het idee dat het wellicht kan liggen aan het lezen van de file, waarbij FileInputStream sneller is. Testje gedaan en dit is bij mij lokaal de output:
Met de volgende code:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Heb jij een idee om de r2dbc variant te verbeteren? Dat is namelijk wel chique richting r2dbc om cijfers te hebben die representatief zijn...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nee, veel efficienter lijkt het me niet te krijgen dan in die benchmark. Ik weet alleen niet of 1 en 100 zulke goede benchmark cases zijn.