Skip to content

Commit f395893

Browse files
mp911deavinash-anand
authored andcommitted
Ensure demand propagation from onRequest(…)
We now make sure to call demandMore() regardless of the drain loop to ensure that demand is propagated upstream even if the drain conditions are not met. Previously, concurrent consumption lead to an empty buffer without demand propagation and so emission got eventually stuck. [resolves pgjdbc#292]
1 parent c29517b commit f395893

File tree

2 files changed

+37
-18
lines changed

2 files changed

+37
-18
lines changed

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -781,7 +781,7 @@ public Flux<BackendMessage> addConversation(Predicate<BackendMessage> takeUntil,
781781

782782
public void onRequest(Conversation conversation, long n) {
783783
conversation.incrementDemand(n);
784-
784+
demandMore();
785785
tryDrainLoop();
786786
}
787787

src/test/java/io/r2dbc/postgresql/client/ReactorNettyClientIntegrationTests.java

+36-17
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,14 @@
4545
import org.junit.jupiter.api.Nested;
4646
import org.junit.jupiter.api.Test;
4747
import org.junit.jupiter.api.TestInstance;
48+
import org.junit.jupiter.api.Timeout;
4849
import org.junit.jupiter.api.extension.RegisterExtension;
4950
import org.springframework.util.ReflectionUtils;
5051
import reactor.core.publisher.EmitterProcessor;
5152
import reactor.core.publisher.Flux;
5253
import reactor.core.publisher.Mono;
5354
import reactor.core.publisher.UnicastProcessor;
55+
import reactor.core.scheduler.Schedulers;
5456
import reactor.netty.Connection;
5557
import reactor.test.StepVerifier;
5658

@@ -75,6 +77,9 @@
7577
import static org.assertj.core.api.Assertions.fail;
7678
import static org.assertj.core.api.Assumptions.assumeThat;
7779

80+
/**
81+
* Integration tests for {@link ReactorNettyClient}.
82+
*/
7883
final class ReactorNettyClientIntegrationTests {
7984

8085
@RegisterExtension
@@ -92,6 +97,37 @@ final class ReactorNettyClientIntegrationTests {
9297
Collections.emptyMap()))
9398
.block();
9499

100+
@BeforeEach
101+
void before() {
102+
SERVER.getJdbcOperations().execute("DROP TABLE IF EXISTS test");
103+
SERVER.getJdbcOperations().execute("CREATE TABLE test ( value INTEGER )");
104+
}
105+
106+
@AfterEach
107+
void after() {
108+
SERVER.getJdbcOperations().execute("DROP TABLE IF EXISTS test");
109+
this.client.close()
110+
.block();
111+
}
112+
113+
114+
@Test
115+
@Timeout(50)
116+
void concurrentConsumptionShouldComplete() {
117+
118+
IntStream.range(0, 1000)
119+
.forEach(i -> SERVER.getJdbcOperations().update("INSERT INTO test VALUES(?),(?),(?),(?),(?),(?),(?),(?),(?),(?)", i, i, i, i, i, i, i, i, i, i));
120+
121+
this.client
122+
.exchange(Mono.just(new Query("SELECT value FROM test")))
123+
.limitRate(1)
124+
.publishOn(Schedulers.elastic())
125+
.doOnNext(ReferenceCountUtil::release)
126+
.as(StepVerifier::create)
127+
.thenConsumeWhile((t) -> true)
128+
.verifyComplete();
129+
}
130+
95131
@Test
96132
void close() {
97133
this.client.close()
@@ -158,30 +194,13 @@ void shouldCancelExchangeOnCloseInFlight() throws Exception {
158194
}
159195
}
160196

161-
@AfterEach
162-
void closeClient() {
163-
SERVER.getJdbcOperations().execute("DROP TABLE IF EXISTS test");
164-
this.client.close()
165-
.block();
166-
}
167197

168198
@Test
169199
void constructorNoHost() {
170200
assertThatIllegalArgumentException().isThrownBy(() -> ReactorNettyClient.connect(null, SERVER.getPort()))
171201
.withMessage("host must not be null");
172202
}
173203

174-
@BeforeEach
175-
void createTable() {
176-
dropTable();
177-
SERVER.getJdbcOperations().execute("CREATE TABLE test ( value INTEGER )");
178-
}
179-
180-
@AfterEach
181-
void dropTable() {
182-
SERVER.getJdbcOperations().execute("DROP TABLE IF EXISTS test");
183-
}
184-
185204
@Test
186205
void exchange() {
187206
SERVER.getJdbcOperations().execute("INSERT INTO test VALUES (100)");

0 commit comments

Comments
 (0)