Insertion of too many rows do not complete #222

Closed
gabrieldn opened this issue Dec 29, 2019 · 7 comments
Labels
status: in-progress · type: bug
Milestone
0.8.1.RELEASE

Comments

@gabrieldn

Bug Report

Versions

  • Driver: 0.8.0.RELEASE
  • Database: postgres:9.0/postgres:12.1/postgres:12.1-alpine
  • Java: 1.8/11/13
  • OS: Windows/MacOS

Current Behavior

Insert queries that use both bind and RETURNING do not complete when inserting too many rows.

When inserting multiple rows, there is a limit to the number of rows that can be inserted; once that limit is reached, the execution just hangs forever.

The problem happens both with and without the transactional operator. The only difference between them is the number of rows inserted before the issue appears.
Without the transactional operator, 518 rows are inserted and then it hangs.
With the transactional operator, 260 rows are inserted and then it hangs.

Those numbers also change depending on the queries that run before the failing query. This behavior can be seen by changing the last argument passed to the behaviorChangingSample method in the repro code. In general, each query run beforehand increases by 2 the number of inserts that complete before the hang.

There is no error stack trace at all; it just hangs.

Table schema

CREATE TABLE foo
(
    id    SERIAL PRIMARY KEY,
    value CHAR(1) NOT NULL
);

Steps to reproduce

Execute the code below, or run the DemoApplication in the repo linked below.
There are five methods in the repro code. They show that the problem only happens in insert queries that use bind and RETURNING at the same time.

Repo link: https://github.com/gabrieldn/spring-r2dbc-postgres-insert-issue

Input Code
import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.r2dbc.connectionfactory.R2dbcTransactionManager;
import org.springframework.data.r2dbc.core.DatabaseClient;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;
import org.testcontainers.containers.PostgreSQLContainer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class DemoApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(DemoApplication.class);

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = createConnectionFactory();
        DatabaseClient client = createDatabaseClient(connectionFactory);
        ReactiveTransactionManager tm = createTransactionManager(connectionFactory);

        client.execute("create table foo (id serial primary key, value char(1) not null)")
                .fetch().rowsUpdated().block();

        TransactionalOperator operator = TransactionalOperator.create(tm);

//        LOGGER.info("rawInsertString inserted {} rows", rawInsertString(client, operator, 5000).size());
//        LOGGER.info("insertWithBind inserted {} rows", insertWithBind(client, operator, 5000).size());
//        LOGGER.info("rawInsertStringWithReturning inserted {} rows", rawInsertStringWithReturning(client, operator, 5000).size());
        LOGGER.info("insertWithBindAndReturning inserted {} rows", insertWithBindAndReturning(client, operator).size());

//        behaviorChangingSample(client, operator, 1);
    }

    // Bind parameter without RETURNING: completes fine.
    private static List<String> insertWithBind(DatabaseClient client, TransactionalOperator operator, long rowsToInsert) {
        return client.execute("INSERT INTO foo(value) VALUES (:value)")
                .bind("value", "x")
                .fetch().first()
                .repeat(rowsToInsert - 1).then()
                .as(operator::transactional)
                .then(client.execute("select * from foo")
                        .map((row, rowMetadata) -> String.valueOf(row.get("id")))
                        .all().collectList())
                .defaultIfEmpty(new ArrayList<>())
                .block();
    }

    // Raw SQL, no bind and no RETURNING: completes fine.
    private static List<String> rawInsertString(DatabaseClient client, TransactionalOperator operator, long rowsToInsert) {
        return client.execute("INSERT INTO foo(value) VALUES ('x')")
                .fetch().first()
                .repeat(rowsToInsert - 1).then()
                .as(operator::transactional)
                .then(client.execute("select * from foo")
                        .map((row, rowMetadata) -> String.valueOf(row.get("id")))
                        .all().collectList())
                .defaultIfEmpty(new ArrayList<>())
                .block();
    }

    // RETURNING without a bind parameter: completes fine.
    private static List<String> rawInsertStringWithReturning(DatabaseClient client, TransactionalOperator operator, long rowsToInsert) {
        return client.execute("INSERT INTO foo(value) VALUES ('x') RETURNING *")
                .map((row, metadata) -> {
                    LOGGER.info("rawInsertStringWithReturning Id: {}", row.get("id"));
                    return row;
                }).first()
                .repeat(rowsToInsert - 1).then()
                .as(operator::transactional)
                .then(client.execute("select * from foo")
                        .map((row, rowMetadata) -> String.valueOf(row.get("id")))
                        .all().collectList())
                .defaultIfEmpty(new ArrayList<>())
                .block();
    }

    // Bind parameter plus RETURNING: the failing case. Hangs after ~518 rows
    // (260 when run inside the transactional operator).
    private static List<String> insertWithBindAndReturning(DatabaseClient client, TransactionalOperator operator) {
        return client.execute("INSERT INTO foo(value) VALUES (:value) RETURNING *")
                .bind("value", "x")
                .map((row, metadata) -> {
                    LOGGER.info("insertWithBindAndReturning: {}", row.get("id"));
                    return row;
                }).first()
                .repeat(5000 - 1).then()
                .as(operator::transactional)
                .then(client.execute("select * from foo")
                        .map((row, rowMetadata) -> String.valueOf(row.get("id")))
                        .all().collectList())
                .defaultIfEmpty(new ArrayList<>())
                .block();
    }

    // Runs other queries first to show how they shift the row count at which the hang occurs.
    private static void behaviorChangingSample(DatabaseClient client, TransactionalOperator operator, long rowsToInsert) {
        LOGGER.info("rawInsertString inserted {} rows", insertWithBind(client, operator, rowsToInsert).size());
        LOGGER.info("insertWithBindAndReturning inserted {} rows", insertWithBindAndReturning(client, operator).size());
    }

    private static PostgreSQLContainer<?> createContainer() {
        PostgreSQLContainer<?> container = new PostgreSQLContainer<>("postgres:12.1-alpine");
        container.start();

        return container;
    }

    private static ConnectionFactory createConnectionFactory() {
        PostgreSQLContainer<?> container = createContainer();

        ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
                .option(ConnectionFactoryOptions.DRIVER, "pool")
                .option(ConnectionFactoryOptions.PROTOCOL, "postgres")
                .option(ConnectionFactoryOptions.HOST, container.getContainerIpAddress())
                .option(ConnectionFactoryOptions.PORT, container.getMappedPort(5432))
                .option(ConnectionFactoryOptions.USER, container.getUsername())
                .option(ConnectionFactoryOptions.PASSWORD, container.getPassword())
                .option(ConnectionFactoryOptions.DATABASE, container.getDatabaseName())
                .build());

        ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
                .maxIdleTime(Duration.ofMinutes(10))
                .maxLifeTime(Duration.ofMinutes(30))
                .initialSize(2)
                .maxSize(5)
                .build();

        return new ConnectionPool(configuration);
    }

    private static DatabaseClient createDatabaseClient(ConnectionFactory connectionFactory) {
        return DatabaseClient.create(connectionFactory);
    }

    private static ReactiveTransactionManager createTransactionManager(ConnectionFactory connectionFactory) {
        return new R2dbcTransactionManager(connectionFactory);
    }
}

Expected behavior/code

It should be possible to insert as many rows as needed.

Additional context

The problem seems to happen only with the Postgres driver. I have tested with both available MySQL drivers and the H2 driver, and they work just fine even when inserting a few million rows.

When running with Spring WebFlux, the problem gets progressively worse, sometimes failing after just a few rows; the worst case I hit was a POST request that should insert only 10 rows failing to complete.

mp911de added the type: bug label Jan 1, 2020
jowax commented Jan 12, 2020

I have the exact same problem, but it hangs on the 2590th insert every time.
I played around a little and noticed that if RowsFetchSpec.first() is used it hangs, but if RowsFetchSpec.one() is used it works flawlessly.

Example that hangs on the 2590th insert:

for (int i = 0; i < 100000; i++) {
    var person = new Person()
        .setFirstName("testing");
    System.out.println("Trying to save: " + i);
    databaseClient.insert()
        .into(Person.class)
        .using(person)
        .map(Function.identity())
        .first()
        .block();
    System.out.println("Done saving: " + i);
}

Example that works:

for (int i = 0; i < 100000; i++) {
    var person = new Person()
        .setFirstName("testing");
    System.out.println("Trying to save: " + i);
    databaseClient.insert()
        .into(Person.class)
        .using(person)
        .map(Function.identity())
        .one() // .first() has been replaced with .one()
        .block();
    System.out.println("Done saving: " + i);
}

mp911de (Collaborator) commented Jan 12, 2020

Thanks for investigating. I assume it's related to #2. The SQL Server and MySQL drivers already ship a …DiscardOnCancel operator (see r2dbc/r2dbc-mssql#106), and we should add a similar one here, too.
Calling first(…) may leave unconsumed protocol frames in the transport buffer, so the protocol state gets mixed up.
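The cancellation behavior behind this is easy to observe in plain Reactor, independent of any database. Below is a minimal, standalone sketch (Flux.range standing in for a query result; purely illustrative, not driver code): next(), which is the behavior first() exhibits here, takes one element and then cancels the upstream subscription, leaving whatever the source still had to emit unconsumed.

import reactor.core.publisher.Flux;

public class FirstCancelsDemo {

    public static void main(String[] args) {
        // next()/first() take a single element and then cancel upstream.
        // Against a database result, the rows behind the first one (and the
        // protocol frames carrying them) are never consumed.
        Flux.range(1, 5)
                .doOnNext(i -> System.out.println("emitted " + i))
                .doOnCancel(() -> System.out.println("cancelled after the first element"))
                .next()
                .block();
    }
}

Running this prints "emitted 1" followed by the cancellation message; elements 2 through 5 are never requested.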

vnobo commented Jan 15, 2020

I also encountered this, and temporarily replaced first() with one() as a workaround.

mp911de (Collaborator) commented Jan 20, 2020

I was able to reproduce the issue with just R2DBC Postgres:

for (int i = 0; i < 100000; i++) {

    connection.createStatement("INSERT INTO insert_test (value) VALUES($1)")
        .returnGeneratedValues()
        .bind("$1", "a")
        .execute().flatMap(postgresqlResult -> postgresqlResult.map((row, rowMetadata) -> row.get(0)), 1, 1)
        .collectList().block();

    System.out.println("Done saving: " + i);
}

What happens here is that after a couple of iterations (about 260 for me), one of the last statements puts the socket into explicit read mode because it no longer has demand (the one row that was requested via first() or next() has already been emitted). That is, when downstream consumers have no demand, Reactor Netty stops reading from the I/O channel.

The demand is left at zero. The subsequent query, the one that hangs, has been written to the channel, but because the receiving side has no demand, no packet is read, and therefore the entire receive side of the query result hangs.
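The demand mechanics can be reproduced without any I/O at all. In the sketch below (plain Reactor, purely illustrative), a subscriber requests exactly one element and never signals demand again; the source still has elements to deliver, but nothing moves, which is the same stall the driver hits when Reactor Netty stops reading the socket:

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class ZeroDemandDemo {

    public static void main(String[] args) {
        Flux.range(1, 10)
                .doOnRequest(n -> System.out.println("request(" + n + ")"))
                .subscribe(new BaseSubscriber<Integer>() {

                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        request(1); // demand for exactly one element
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        System.out.println("received " + value);
                        // no further request(...): demand stays at zero and
                        // elements 2..10 are never delivered
                    }
                });
    }
}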

mp911de added a commit that referenced this issue Jan 21, 2020
FluxDiscardOnCancel replays source signals unless cancelling
the subscription. On cancellation, the subscriber requests Long.MAX_VALUE
to drain the source and discard elements that are emitted afterwards.

[closes #222]
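The commit message above describes the operator's contract: pass signals through untouched, but turn a downstream cancel() into an unbounded request that drains and discards the rest. A rough sketch of that idea (a hypothetical FluxDiscardOnCancelSketch written against Reactor's public API for illustration, not the driver's actual class) could look like this:

import java.util.concurrent.atomic.AtomicBoolean;

import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

final class FluxDiscardOnCancelSketch<T> extends FluxOperator<T, T> {

    FluxDiscardOnCancelSketch(Flux<? extends T> source) {
        super(source);
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        source.subscribe(new DiscardOnCancelSubscriber<>(actual));
    }

    // The inherited AtomicBoolean tracks whether the downstream has cancelled.
    static final class DiscardOnCancelSubscriber<T> extends AtomicBoolean
            implements CoreSubscriber<T>, Subscription {

        final CoreSubscriber<? super T> actual;

        Subscription s;

        DiscardOnCancelSubscriber(CoreSubscriber<? super T> actual) {
            this.actual = actual;
        }

        @Override
        public void onSubscribe(Subscription s) {
            if (Operators.validate(this.s, s)) {
                this.s = s;
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            if (get()) {
                Operators.onDiscard(t, actual.currentContext()); // drop post-cancel elements
            } else {
                actual.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!get()) {
                actual.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!get()) {
                actual.onComplete();
            }
        }

        @Override
        public Context currentContext() {
            return actual.currentContext();
        }

        @Override
        public void request(long n) {
            s.request(n);
        }

        @Override
        public void cancel() {
            // Do not propagate cancel(); request everything that is left so the
            // source (here: the protocol frames) gets fully drained.
            if (compareAndSet(false, true)) {
                s.request(Long.MAX_VALUE);
            }
        }
    }
}

Wrapped around the row-emitting Flux, a first()-induced cancel then drains the remaining frames instead of leaving them in the transport buffer.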
mp911de added a commit that referenced this issue Jan 21, 2020
mp911de added this to the 0.8.1.RELEASE milestone Jan 21, 2020
mp911de added the status: in-progress label Jan 21, 2020
mp911de added a commit that referenced this issue Jan 22, 2020
gregturn pushed a commit that referenced this issue Jan 27, 2020
@anfy2002us

For two days I was wondering why, instead of 100k+ records, I could see exactly 2600 in the database… Either way, upgrading to the version that has the fix resolved my issue.

@jaswanthbellam

Is this issue resolved? I see the same issue with the latest version, 0.8.6.RELEASE.

mp911de (Collaborator) commented Dec 4, 2020

That fix was shipped with 0.8.1.RELEASE. Please file a new issue along with a minimal reproducer so we can look into it, @jaswanthbellam.
