Fetch rows never completes #401
@Test
void noCompleteSignalTest() {
getJdbcOperations().execute("create table if not exists users\n" +
"(\n" +
"\tuserId integer,\n" +
"\tattr1 integer,\n" +
"\tattr2 integer,\n" +
"\tattr3 integer,\n" +
"\tattr4 integer,\n" +
"\tattr5 integer,\n" +
"\tattr6 integer,\n" +
"\tattr7 integer,\n" +
"\tattr8 integer,\n" +
"\tattr9 integer,\n" +
"\tattr10 integer,\n" +
"\tattr11 text,\n" +
"\tattr12 text,\n" +
"\tattr13 text,\n" +
"\tattr14 text,\n" +
"\tattr15 text,\n" +
"\tattr16 text,\n" +
"\tattr17 text,\n" +
"\tattr18 text,\n" +
"\tattr19 text,\n" +
"\tattr20 text\n" +
");\n");
getJdbcOperations().execute(c -> {
return c.prepareStatement("INSERT INTO users" +
"(userId, attr1, attr2, attr3, attr4, attr5, attr6, attr7, attr8, attr9, attr10, attr11, attr12, attr13, attr14, attr15, attr16, attr17, attr18, attr19, attr20) " +
"VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
}, (PreparedStatement stmt) -> {
for (int i = 0; i < 10000; i++) {
stmt.setInt(1, i);
stmt.setInt(2, i * 2);
stmt.setInt(3, i * 3);
stmt.setInt(4, i * 4);
stmt.setInt(5, i * 5);
stmt.setInt(6, i * 6);
stmt.setInt(7, i * 7);
stmt.setInt(8, i * 8);
stmt.setInt(9, i * 9);
stmt.setInt(10, i * 10);
stmt.setInt(11, i * 11);
stmt.setString(12, String.valueOf(i));
stmt.setString(13, String.valueOf(i * 2));
stmt.setString(14, String.valueOf(i * 3));
stmt.setString(15, String.valueOf(i * 4));
stmt.setString(16, String.valueOf(i * 5));
stmt.setString(17, String.valueOf(i * 6));
stmt.setString(18, String.valueOf(i * 7));
stmt.setString(19, String.valueOf(i * 8));
stmt.setString(20, String.valueOf(i * 9));
stmt.setString(21, String.valueOf(i * 10));
stmt.addBatch();
}
return stmt.executeBatch();
});
this.fetchProfiles(1200)
.as(StepVerifier::create)
.expectNextCount(1200)
.verifyComplete();
}
public Flux<UserProfile> fetchProfiles(int limit) {
PostgresqlConnectionFactory connectionFactory = (PostgresqlConnectionFactory) getConnectionFactory();
return Flux.usingWhen(
connectionFactory.create(),
connection -> connection.createStatement("select userId, attr1, attr2, attr3, attr4, attr5 from users limit " + limit)
.execute()
.flatMap(result -> result.map(UserProfile::new)),
Connection::close
)
.log();
}
private static class UserProfile {
private final String userId;
private final Map<String, Object> attributes = new HashMap<>();
private UserProfile(Row row, RowMetadata rowMetadata) {
for (String columnName : rowMetadata.getColumnNames()) {
Object columnValue = row.get(columnName);
if (columnValue != null) {
attributes.put(columnName, columnValue);
}
}
attributes.remove("userId");
this.userId = String.valueOf(row.get("userId", Integer.class));
}
}

@Leuteris does this look like a reproducer? I can't reproduce your behaviour.

Still nothing, test completes as expected. Can you reproduce your problem as a test in a …

@Squiry Try to consume your result stream to enable back-pressure.

Well, consuming with Thread.sleep or something like that is not enough. We should move consuming out of the event loop so that backpressure actually happens on the Reactive Streams side. A simple delay(1ms) does the job. After that I do a Thread.sleep for 2 ms and limit the rate to 1 to make the backpressure really bad. With all that in place I can reproduce it with 5000 elements on a …
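On the point about consuming off the event loop: a Thread.sleep inside the pipeline blocks the emitting thread rather than lowering demand; Reactive Streams back-pressure only exists where the subscriber's request(n) calls are bounded. Here is a stdlib sketch of that bounded-request handshake using java.util.concurrent.Flow (illustrative only; this is neither Reactor nor the driver's code, and the class name is made up):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

public class BackpressureSketch {

    // Deliver `count` items to a subscriber that requests one element at a time,
    // re-requesting only after each element is processed (limitRate(1)-style demand).
    static int deliverAll(int count) throws InterruptedException {
        AtomicInteger received = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1);                 // initial demand: exactly one element
                }

                @Override public void onNext(Integer item) {
                    received.incrementAndGet();   // "process" the element...
                    subscription.request(1);      // ...then signal demand for the next one
                }

                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            for (int i = 0; i < count; i++) {
                publisher.submit(i);              // blocks when the buffer is full: real back-pressure
            }
        }                                         // close() leads to onComplete once all items are consumed
        done.await();
        return received.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(deliverAll(100));
    }
}
```

In the test below, Reactor's limitRate(1, 1) plus delayElements produces exactly this bounded-request pattern while moving consumption off the connection's event loop, which is what makes the stall reproducible.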
Reproduced on master. Here's the test case:

@Test
void noCompleteSignalTest() {
getJdbcOperations().execute("create table if not exists users\n" +
"(\n" +
"\tuserId integer,\n" +
"\tattr1 integer,\n" +
"\tattr2 integer,\n" +
"\tattr3 integer,\n" +
"\tattr4 integer,\n" +
"\tattr5 integer,\n" +
"\tattr6 integer,\n" +
"\tattr7 integer,\n" +
"\tattr8 integer,\n" +
"\tattr9 integer,\n" +
"\tattr10 integer,\n" +
"\tattr11 text,\n" +
"\tattr12 text,\n" +
"\tattr13 text,\n" +
"\tattr14 text,\n" +
"\tattr15 text,\n" +
"\tattr16 text,\n" +
"\tattr17 text,\n" +
"\tattr18 text,\n" +
"\tattr19 text,\n" +
"\tattr20 text\n" +
");\n");
getJdbcOperations().execute(c -> {
return c.prepareStatement("INSERT INTO users" +
"(userId, attr1, attr2, attr3, attr4, attr5, attr6, attr7, attr8, attr9, attr10, attr11, attr12, attr13, attr14, attr15, attr16, attr17, attr18, attr19, attr20) " +
"VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
}, (PreparedStatement stmt) -> {
for (int i = 0; i < 10000; i++) {
stmt.setInt(1, i);
stmt.setInt(2, i * 2);
stmt.setInt(3, i * 3);
stmt.setInt(4, i * 4);
stmt.setInt(5, i * 5);
stmt.setInt(6, i * 6);
stmt.setInt(7, i * 7);
stmt.setInt(8, i * 8);
stmt.setInt(9, i * 9);
stmt.setInt(10, i * 10);
stmt.setInt(11, i * 11);
stmt.setString(12, String.valueOf(i));
stmt.setString(13, String.valueOf(i * 2));
stmt.setString(14, String.valueOf(i * 3));
stmt.setString(15, String.valueOf(i * 4));
stmt.setString(16, String.valueOf(i * 5));
stmt.setString(17, String.valueOf(i * 6));
stmt.setString(18, String.valueOf(i * 7));
stmt.setString(19, String.valueOf(i * 8));
stmt.setString(20, String.valueOf(i * 9));
stmt.setString(21, String.valueOf(i * 10));
stmt.addBatch();
}
return stmt.executeBatch();
});
this.fetchProfiles(5000)
.delayElements(Duration.ofMillis(1))
.doOnSubscribe(l -> System.out.println("SUBSCRIBE"))
.doOnRequest(l -> System.out.println(l))
.doOnComplete(() -> System.out.println("COMPLETE"))
.limitRate(1, 1)
.map(this::processProfile)
.as(StepVerifier::create)
.expectNextCount(5000)
.verifyComplete();
}
private UserProfile processProfile(UserProfile up) {
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return up;
}
public Flux<UserProfile> fetchProfiles(int limit) {
return Flux.usingWhen(
getConnectionFactory().create(),
connection -> Flux.from(connection.createStatement("select userId, attr1, attr2, attr3, attr4, attr5 from users limit " + limit)
.execute())
.flatMap(result -> result.map(UserProfile::new)),
Connection::close
);
}
private static class UserProfile {
private final String userId;
private final Map<String, Object> attributes = new HashMap<>();
private UserProfile(Row row, RowMetadata rowMetadata) {
for (String columnName : rowMetadata.getColumnNames()) {
Object columnValue = row.get(columnName);
if (columnValue != null) {
attributes.put(columnName, columnValue);
}
}
attributes.remove("userId");
this.userId = String.valueOf(row.get("userId", Integer.class));
}
}

The problem is in demand management again. We can fix it by adding … P.S. I have …
Thanks a lot for looking into it. I think that …

I wonder whether that would be the easier fix for now, to just omit …
Remove windowUntil(…) to avoid miscalculation of demand in long-running streams. Refine discardOnCancel operator calls to properly guard simple queries. [#401][#426] Signed-off-by: Mark Paluch <[email protected]>
The problem: when our receive buffer fills, conversation demand drops to zero, and the last item is `CommandComplete`, we never request more frames and so never receive `ReadyForQuery`. Signed-off-by: Mark Paluch <[email protected]> [closes #401][#426]
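That stall can be modelled in a toy demand loop (plain Java; the frame names mirror the Postgres wire protocol, but the logic is a simplified illustration, not the driver's actual code): if reading further frames is gated purely on remaining row demand, a buffered tail ending at `CommandComplete` is never followed by `ReadyForQuery`, so no completion signal is ever emitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class DemandStall {

    // Toy driver loop: delivers protocol frames while row demand remains.
    // gateAllFramesOnDemand = true models the buggy behaviour (completion frames
    // are also blocked once row demand hits zero); false models the fix, where
    // non-row frames are read regardless of remaining row demand.
    static List<String> drain(Queue<String> frames, long rowDemand, boolean gateAllFramesOnDemand) {
        List<String> delivered = new ArrayList<>();
        while (!frames.isEmpty()) {
            String next = frames.peek();
            if (next.equals("DataRow")) {
                if (rowDemand == 0) break;      // back-pressure: stop reading rows
                rowDemand--;
            } else if (gateAllFramesOnDemand && rowDemand == 0) {
                break;                          // BUG: ReadyForQuery never requested
            }
            delivered.add(frames.poll());
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> frames = List.of("DataRow", "DataRow", "CommandComplete", "ReadyForQuery");
        // Buggy: demand for 2 rows is used up, so the terminal frames stay unread.
        System.out.println(drain(new ArrayDeque<>(frames), 2, true));
        // Fixed: completion frames bypass the demand check, the stream can complete.
        System.out.println(drain(new ArrayDeque<>(frames), 2, false));
    }
}
```

Running it shows the buggy variant stopping before `ReadyForQuery` while the fixed variant drains every frame, which is the difference between a hanging `Flux` and one that emits `onComplete`.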
Bug Report

Versions

Current Behavior

I have created the following method that fetches some rows from the database and returns a `Flux` of objects. When the number of rows returned by the query is e.g. 1200 or 15,000,000, the `Flux` never emits the `onComplete` signal. Here's a sample code:

When adding a `.log()` call to the chain I see 256 `request(1)` calls before the stream stops. I would expect to see the `onComplete` signal:

Logs

If I change the limit to 1201 instead of 1200, the `Flux` emits the `onComplete` signal as expected:

Logs

Table schema

Input Code

Steps to reproduce

Input Code