Skip to content

Commit 75960fb

Browse files
committed
Polishing.
Remove windowUntil(…) to avoid miscalculation of demand in long-running streams. Refine discardOnCancel operator calls to properly guard simple queries. [#401][#426] Signed-off-by: Mark Paluch <[email protected]>
1 parent 312c878 commit 75960fb

8 files changed

+94
-23
lines changed

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -360,8 +360,7 @@ private <T> Mono<T> withTransactionStatus(Function<TransactionStatus, T> f) {
360360
private <T> Publisher<T> exchange(String sql) {
361361
ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
362362
return (Publisher<T>) SimpleQueryMessageFlow.exchange(this.client, sql)
363-
.handle(exceptionFactory::handleErrorResponse)
364-
.as(Operators::discardOnCancel);
363+
.handle(exceptionFactory::handleErrorResponse);
365364
}
366365

367366
/**

src/main/java/io/r2dbc/postgresql/PostgresqlResult.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,20 @@ public Mono<Integer> getRowsUpdated() {
8282
sink.next(rowCount);
8383
}
8484
}
85-
}).singleOrEmpty();
85+
}).collectList().handle((list, sink) -> {
86+
87+
if (list.isEmpty()) {
88+
return;
89+
}
90+
91+
int sum = 0;
92+
93+
for (Integer integer : list) {
94+
sum += integer;
95+
}
96+
97+
sink.next(sum);
98+
});
8699
}
87100

88101
@Override

src/main/java/io/r2dbc/postgresql/SimpleQueryPostgresqlStatement.java

+3-16
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package io.r2dbc.postgresql;
1818

19-
import io.netty.util.ReferenceCountUtil;
20-
import io.netty.util.ReferenceCounted;
2119
import io.r2dbc.postgresql.api.PostgresqlStatement;
2220
import io.r2dbc.postgresql.client.Binding;
2321
import io.r2dbc.postgresql.client.SimpleQueryMessageFlow;
@@ -27,7 +25,6 @@
2725
import io.r2dbc.postgresql.message.backend.ErrorResponse;
2826
import io.r2dbc.postgresql.util.Assert;
2927
import io.r2dbc.postgresql.util.GeneratedValuesUtils;
30-
import io.r2dbc.postgresql.util.Operators;
3128
import io.r2dbc.spi.Statement;
3229
import reactor.core.publisher.Flux;
3330
import reactor.util.annotation.Nullable;
@@ -91,11 +88,7 @@ public SimpleQueryPostgresqlStatement bindNull(int index, @Nullable Class<?> typ
9188

9289
@Override
9390
public Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute() {
94-
if (this.generatedColumns == null) {
95-
return execute(this.sql);
96-
}
97-
98-
return execute(GeneratedValuesUtils.augment(this.sql, this.generatedColumns));
91+
return execute(this.generatedColumns == null ? this.sql : GeneratedValuesUtils.augment(this.sql, this.generatedColumns));
9992
}
10093

10194
@Override
@@ -148,17 +141,11 @@ private Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute(String sql) {
148141
if (this.fetchSize != NO_LIMIT) {
149142

150143
Flux<BackendMessage> messages = ExtendedFlowDelegate.runQuery(this.resources, factory, sql, Binding.EMPTY, Collections.emptyList(), this.fetchSize);
151-
152144
return Flux.just(new PostgresqlResult(this.resources, messages, factory));
153145
}
154146

155-
return SimpleQueryMessageFlow
156-
.exchange(this.resources.getClient(), sql)
157-
.windowUntil(WINDOW_UNTIL)
158-
.map(dataRow -> PostgresqlResult.toResult(this.resources, dataRow, factory))
159-
.cast(io.r2dbc.postgresql.api.PostgresqlResult.class)
160-
.as(Operators::discardOnCancel)
161-
.doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release);
147+
Flux<BackendMessage> messages = SimpleQueryMessageFlow.exchange(this.resources.getClient(), sql);
148+
return Flux.just(PostgresqlResult.toResult(this.resources, messages, factory));
162149
}
163150

164151
}

src/main/java/io/r2dbc/postgresql/client/SimpleQueryMessageFlow.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616

1717
package io.r2dbc.postgresql.client;
1818

19+
import io.netty.util.ReferenceCountUtil;
20+
import io.netty.util.ReferenceCounted;
1921
import io.r2dbc.postgresql.message.backend.BackendMessage;
2022
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
2123
import io.r2dbc.postgresql.message.frontend.Query;
2224
import io.r2dbc.postgresql.util.Assert;
25+
import io.r2dbc.postgresql.util.Operators;
2326
import reactor.core.publisher.Flux;
2427
import reactor.core.publisher.Mono;
2528

@@ -43,7 +46,8 @@ public static Flux<BackendMessage> exchange(Client client, String query) {
4346
Assert.requireNonNull(client, "client must not be null");
4447
Assert.requireNonNull(query, "query must not be null");
4548

46-
return client.exchange(Mono.<FrontendMessage>just(new Query(query)).doOnSubscribe(ignore -> QueryLogger.logQuery(client.getContext(), query)));
49+
return client.exchange(Mono.<FrontendMessage>just(new Query(query)).doOnSubscribe(ignore -> QueryLogger.logQuery(client.getContext(), query))).doOnDiscard(ReferenceCounted.class,
50+
ReferenceCountUtil::release).as(Operators::discardOnCancel);
4751
}
4852

4953
}

src/test/java/io/r2dbc/postgresql/PostgresNotificationIntegrationTests.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.netty.channel.Channel;
2020
import io.r2dbc.postgresql.api.Notification;
2121
import io.r2dbc.postgresql.api.PostgresqlConnection;
22+
import io.r2dbc.postgresql.api.PostgresqlResult;
2223
import io.r2dbc.postgresql.util.ConnectionIntrospector;
2324
import io.r2dbc.spi.R2dbcNonTransientResourceException;
2425
import org.junit.jupiter.api.Test;
@@ -44,13 +45,13 @@ void shouldReceivePubSubNotifications() throws Exception {
4445

4546
CountDownLatch await = new CountDownLatch(1);
4647
Disposable listener = this.connectionFactory.create().flatMapMany(it -> {
47-
return it.createStatement("LISTEN mymessage").execute().doOnComplete(await::countDown)
48+
return it.createStatement("LISTEN mymessage").execute().flatMap(PostgresqlResult::getRowsUpdated).doOnComplete(await::countDown)
4849
.thenMany(it.getNotifications()).doOnCancel(() -> it.close().subscribe());
4950
}).doOnNext(notifications::add).subscribe();
5051

5152
await.await(10, TimeUnit.SECONDS);
5253

53-
this.connectionFactory.create().flatMapMany(it -> it.createStatement("NOTIFY mymessage, 'Mötorhead'").execute().thenMany(it.close()))
54+
this.connectionFactory.create().flatMapMany(it -> it.createStatement("NOTIFY mymessage, 'Mötorhead'").execute().flatMap(PostgresqlResult::getRowsUpdated).thenMany(it.close()))
5455
.as(StepVerifier::create).verifyComplete();
5556

5657
Notification notification = notifications.poll(10, TimeUnit.SECONDS);

src/test/java/io/r2dbc/postgresql/PostgresqlBatchUnitTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ void add() {
4747
.add("test-query-2")
4848
.execute()
4949
.as(StepVerifier::create)
50-
.expectNextCount(2)
50+
.expectNextCount(1)
5151
.verifyComplete();
5252
}
5353

src/test/java/io/r2dbc/postgresql/PostgresqlTestKit.java

+7
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import io.r2dbc.spi.ConnectionFactory;
2222
import io.r2dbc.spi.ConnectionFactoryOptions;
2323
import io.r2dbc.spi.test.TestKit;
24+
import org.junit.jupiter.api.Disabled;
25+
import org.junit.jupiter.api.Test;
2426
import org.junit.jupiter.api.extension.RegisterExtension;
2527
import org.springframework.jdbc.core.JdbcOperations;
2628

@@ -95,4 +97,9 @@ public String getPlaceholder(int index) {
9597
return String.format("$%d", index + 1);
9698
}
9799

100+
@Test
101+
@Disabled("Disabled until we get a fixed windowUntil(…) or we have a better way to split result streams into multiple Result objects. https://github.com/pgjdbc/r2dbc-postgresql/issues/401")
102+
public void compoundStatement() {
103+
}
104+
98105
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql;
18+
19+
import org.junit.jupiter.api.AfterEach;
20+
import org.junit.jupiter.api.BeforeEach;
21+
import org.junit.jupiter.api.Test;
22+
import org.springframework.jdbc.core.JdbcOperations;
23+
import reactor.test.StepVerifier;
24+
import reactor.util.function.Tuples;
25+
26+
/**
27+
* Integration tests for {@link SimpleQueryPostgresqlStatement}.
28+
*/
29+
class SimpleQueryPostgresqlStatementIntegrationTests extends AbstractIntegrationTests {
30+
31+
@BeforeEach
32+
void setUp() {
33+
super.setUp();
34+
getJdbcOperations().execute("DROP TABLE IF EXISTS test");
35+
getJdbcOperations().execute("CREATE TABLE test (val VARCHAR(255))");
36+
getJdbcOperations().execute("INSERT INTO test (val) VALUES ('a'), ('b')");
37+
}
38+
39+
@AfterEach
40+
void tearDown() {
41+
super.tearDown();
42+
getJdbcOperations().execute("DROP TABLE IF EXISTS test");
43+
}
44+
45+
private JdbcOperations getJdbcOperations() {
46+
return SERVER.getJdbcOperations();
47+
}
48+
49+
@Test
50+
void shouldRunMultipleQueries() {
51+
52+
this.connection.createStatement("SELECT 1;SELECT val FROM test")
53+
.fetchSize(0).execute()
54+
.flatMap(it -> it.map((row, rowMetadata) -> Tuples.of(row.get(0), rowMetadata.getColumnMetadata(0).getName())))
55+
.as(StepVerifier::create)
56+
.expectNext(Tuples.of(1, "?column?"), Tuples.of("a", "val"), Tuples.of("b", "val"))
57+
.verifyComplete();
58+
}
59+
60+
}

0 commit comments

Comments
 (0)