Skip to content

Commit 2a5dd34

Browse files
committed
Upgrade to Reactor 2022.0.0-M1
[resolves #503] Signed-off-by: Mark Paluch <[email protected]>
1 parent b812b9d commit 2a5dd34

File tree

6 files changed

+41
-36
lines changed

6 files changed

+41
-36
lines changed

pom.xml

+6-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
4949
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
5050
<r2dbc-spi.version>1.0.0.BUILD-SNAPSHOT</r2dbc-spi.version>
51-
<reactor.version>2020.0.17</reactor.version>
51+
<reactor.version>2022.0.0-M1</reactor.version>
5252
<scram-client.version>2.1</scram-client.version>
5353
<spring-framework.version>5.3.16</spring-framework.version>
5454
<testcontainers.version>1.16.3</testcontainers.version>
@@ -648,6 +648,11 @@
648648
<enabled>true</enabled>
649649
</snapshots>
650650
</repository>
651+
652+
<repository>
653+
<id>spring-libs-milestone</id>
654+
<url>https://repo.spring.io/libs-milestone</url>
655+
</repository>
651656
</repositories>
652657

653658
</project>

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@
3838
import io.r2dbc.spi.TransactionDefinition;
3939
import io.r2dbc.spi.ValidationDepth;
4040
import org.reactivestreams.Publisher;
41-
import org.reactivestreams.Subscriber;
4241
import org.reactivestreams.Subscription;
4342
import reactor.core.CoreSubscriber;
4443
import reactor.core.Disposable;
44+
import reactor.core.publisher.BaseSubscriber;
4545
import reactor.core.publisher.Flux;
4646
import reactor.core.publisher.Mono;
4747
import reactor.core.publisher.Sinks;
@@ -474,28 +474,31 @@ void dispose() {
474474

475475
void register(Client client) {
476476

477-
this.subscription = client.addNotificationListener(new Subscriber<NotificationResponse>() {
477+
BaseSubscriber<NotificationResponse> subscriber = new BaseSubscriber<NotificationResponse>() {
478478

479479
@Override
480-
public void onSubscribe(Subscription subscription) {
480+
protected void hookOnSubscribe(Subscription subscription) {
481481
subscription.request(Long.MAX_VALUE);
482482
}
483483

484484
@Override
485-
public void onNext(NotificationResponse notificationResponse) {
485+
public void hookOnNext(NotificationResponse notificationResponse) {
486486
NotificationAdapter.this.sink.emitNext(new NotificationResponseWrapper(notificationResponse), Sinks.EmitFailureHandler.FAIL_FAST);
487487
}
488488

489489
@Override
490-
public void onError(Throwable throwable) {
490+
public void hookOnError(Throwable throwable) {
491491
NotificationAdapter.this.sink.emitError(throwable, Sinks.EmitFailureHandler.FAIL_FAST);
492492
}
493493

494494
@Override
495-
public void onComplete() {
495+
public void hookOnComplete() {
496496
NotificationAdapter.this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
497497
}
498-
});
498+
};
499+
500+
this.subscription = subscriber;
501+
client.addNotificationListener(subscriber);
499502
}
500503

501504
Flux<Notification> getEvents() {

src/main/java/io/r2dbc/postgresql/client/Client.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public interface Client {
5757
* @throws IllegalArgumentException if {@code consumer} is {@code null}
5858
* @since 0.8.1
5959
*/
60-
Disposable addNotificationListener(Subscriber<NotificationResponse> consumer);
60+
void addNotificationListener(Subscriber<NotificationResponse> consumer);
6161

6262
/**
6363
* Release any resources held by the {@link Client}.

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,8 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) {
137137
Assert.requireNonNull(connection, "Connection must not be null");
138138
this.settings = Assert.requireNonNull(settings, "ConnectionSettings must not be null");
139139

140-
connection.addHandler(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE - 5, 1, 4, -4, 0));
141-
connection.addHandler(new EnsureSubscribersCompleteChannelHandler(this.requestSink));
140+
connection.addHandlerFirst(new EnsureSubscribersCompleteChannelHandler(this.requestSink));
141+
connection.addHandlerLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE - 5, 1, 4, -4, 0));
142142
this.connection = connection;
143143
this.byteBufAllocator = connection.outbound().alloc();
144144
this.context = new ConnectionContext().withChannelId(connection.channel().toString());
@@ -422,8 +422,8 @@ public Disposable addNotificationListener(Consumer<NotificationResponse> consume
422422
}
423423

424424
@Override
425-
public Disposable addNotificationListener(Subscriber<NotificationResponse> consumer) {
426-
return this.notificationProcessor.asFlux().subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe);
425+
public void addNotificationListener(Subscriber<NotificationResponse> consumer) {
426+
this.notificationProcessor.asFlux().subscribe(consumer);
427427
}
428428

429429
@Override
@@ -816,7 +816,7 @@ public void onComplete() {
816816
@Override
817817
public Context currentContext() {
818818
Conversation receiver = this.conversations.peek();
819-
return receiver != null ? receiver.sink.currentContext() : Context.empty();
819+
return receiver != null ? Context.of(receiver.sink.currentContext()) : Context.empty();
820820
}
821821

822822
private void tryDrainLoop() {

src/test/java/io/r2dbc/postgresql/PostgresqlConnectionFactoryUnitTests.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,10 @@ void createAuthenticationMD5Password() {
6666
// @formatter:off
6767
Client client = TestClient.builder()
6868
.window()
69-
.expectRequest(new StartupMessage("test-application-name", "test-database", "test-username", Collections.emptyMap())).thenRespond(new AuthenticationMD5Password(TEST.buffer(4).writeInt(100)))
70-
.expectRequest(new PasswordMessage("md55e9836cdb369d50e3bc7d127e88b4804")).thenRespond(AuthenticationOk.INSTANCE)
69+
.expectRequest(new StartupMessage("test-application-name", "test-database", "test-username", Collections.emptyMap()))
70+
.thenRespond(new AuthenticationMD5Password(TEST.buffer(4).writeInt(100)))
71+
.expectRequest(new PasswordMessage("md55e9836cdb369d50e3bc7d127e88b4804"))
72+
.thenRespond(AuthenticationOk.INSTANCE)
7173
.done()
7274
.build();
7375
// @formatter:on

src/test/java/io/r2dbc/postgresql/client/TestClient.java

+15-20
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import org.reactivestreams.Subscriber;
2727
import reactor.core.Disposable;
2828
import reactor.core.publisher.Flux;
29-
import reactor.core.publisher.FluxSink;
3029
import reactor.core.publisher.Mono;
30+
import reactor.core.publisher.Sinks;
3131
import reactor.util.annotation.Nullable;
3232

3333
import java.util.ArrayList;
@@ -42,7 +42,6 @@
4242
/**
4343
* Test {@link Client} implementation that allows specification of expectations and assertions.
4444
*/
45-
@SuppressWarnings("deprecation")
4645
public final class TestClient implements Client {
4746

4847
public static final TestClient NO_OP = new TestClient(false, true, null, null, Flux.empty(), IDLE, new Version("9.4"));
@@ -51,15 +50,13 @@ public final class TestClient implements Client {
5150

5251
private final boolean connected;
5352

54-
private final reactor.core.publisher.EmitterProcessor<NotificationResponse> notificationProcessor = reactor.core.publisher.EmitterProcessor.create(false);
53+
private final Sinks.Many<NotificationResponse> notificationProcessor = Sinks.many().multicast().onBackpressureBuffer();
5554

5655
private final Integer processId;
5756

58-
private final reactor.core.publisher.EmitterProcessor<FrontendMessage> requestProcessor = reactor.core.publisher.EmitterProcessor.create(false);
57+
private final Sinks.Many<FrontendMessage> requestProcessor = Sinks.many().multicast().onBackpressureBuffer();
5958

60-
private final FluxSink<FrontendMessage> requests = this.requestProcessor.sink();
61-
62-
private final reactor.core.publisher.EmitterProcessor<Flux<BackendMessage>> responseProcessor = reactor.core.publisher.EmitterProcessor.create(false);
59+
private final Sinks.Many<Flux<BackendMessage>> responseProcessor = Sinks.many().replay().all();
6360

6461
private final Integer secretKey;
6562

@@ -75,14 +72,12 @@ private TestClient(boolean expectClose, boolean connected, @Nullable Integer pro
7572
this.transactionStatus = Assert.requireNonNull(transactionStatus, "transactionStatus must not be null");
7673
this.version = version;
7774

78-
FluxSink<Flux<BackendMessage>> responses = this.responseProcessor.sink();
79-
8075
Assert.requireNonNull(windows, "windows must not be null")
8176
.map(window -> window.exchanges)
8277
.map(exchanges -> exchanges
8378
.concatMap(exchange ->
8479

85-
this.requestProcessor.zipWith(exchange.requests)
80+
this.requestProcessor.asFlux().zipWith(exchange.requests)
8681
.handle((tuple, sink) -> {
8782
FrontendMessage actual = tuple.getT1();
8883
FrontendMessage expected = tuple.getT2();
@@ -92,7 +87,7 @@ private TestClient(boolean expectClose, boolean connected, @Nullable Integer pro
9287
}
9388
})
9489
.thenMany(exchange.responses)))
95-
.subscribe(responses::next, responses::error, responses::complete);
90+
.subscribe(this.responseProcessor::tryEmitNext, this.responseProcessor::tryEmitError, this.responseProcessor::tryEmitComplete);
9691
}
9792

9893
public static Builder builder() {
@@ -108,20 +103,20 @@ public Mono<Void> close() {
108103
public Flux<BackendMessage> exchange(Publisher<FrontendMessage> requests) {
109104
Assert.requireNonNull(requests, "requests must not be null");
110105

111-
return this.responseProcessor
106+
return this.responseProcessor.asFlux()
112107
.doOnSubscribe(s ->
113108
Flux.from(requests)
114-
.subscribe(this.requests::next, this.requests::error))
109+
.subscribe(this.requestProcessor::tryEmitNext, this.requestProcessor::tryEmitError))
115110
.next()
116111
.flatMapMany(Function.identity());
117112
}
118113

119114
@Override
120115
public Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publisher<FrontendMessage> requests) {
121-
return this.responseProcessor
116+
return this.responseProcessor.asFlux()
122117
.doOnSubscribe(s ->
123118
Flux.from(requests)
124-
.subscribe(this.requests::next, this.requests::error))
119+
.subscribe(this.requestProcessor::tryEmitNext, this.requestProcessor::tryEmitError))
125120
.next()
126121
.flatMapMany(Function.identity())
127122
.takeWhile(takeUntil.negate());
@@ -169,21 +164,21 @@ public Mono<Void> cancelRequest() {
169164

170165
@Override
171166
public void send(FrontendMessage message) {
172-
this.requests.next(message);
167+
this.requestProcessor.tryEmitNext(message);
173168
}
174169

175170
@Override
176171
public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
177-
return this.notificationProcessor.subscribe(consumer);
172+
return this.notificationProcessor.asFlux().subscribe(consumer);
178173
}
179174

180175
@Override
181-
public Disposable addNotificationListener(Subscriber<NotificationResponse> consumer) {
182-
return this.notificationProcessor.subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe);
176+
public void addNotificationListener(Subscriber<NotificationResponse> consumer) {
177+
this.notificationProcessor.asFlux().subscribe(consumer);
183178
}
184179

185180
public void notify(NotificationResponse notification) {
186-
this.notificationProcessor.onNext(notification);
181+
this.notificationProcessor.tryEmitNext(notification);
187182
}
188183

189184
public static final class Builder {

0 commit comments

Comments
 (0)