Skip to content

Commit 3a04952

Browse files
committed
Replace EmitterProcessor with Sinks API.
[resolves #445] Signed-off-by: Mark Paluch <[email protected]>
1 parent 835178b commit 3a04952

File tree

5 files changed

+68
-68
lines changed

5 files changed

+68
-68
lines changed

src/main/java/io/r2dbc/postgresql/DefaultPostgresqlReplicationConnection.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.r2dbc.postgresql.util.Assert;
3333
import io.r2dbc.spi.Row;
3434
import reactor.core.publisher.Mono;
35+
import reactor.core.publisher.Sinks;
3536

3637
import java.util.function.Predicate;
3738

@@ -78,7 +79,6 @@ private static ReplicationSlot getReplicationSlot(ReplicationSlotRequest request
7879
row.get("output_plugin", String.class));
7980
}
8081

81-
@SuppressWarnings("deprecation")
8282
@Override
8383
public Mono<ReplicationStream> startReplication(ReplicationRequest request) {
8484

@@ -87,14 +87,16 @@ public Mono<ReplicationStream> startReplication(ReplicationRequest request) {
8787
String sql = request.asSQL();
8888
ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
8989

90-
reactor.core.publisher.EmitterProcessor<FrontendMessage> requestProcessor = reactor.core.publisher.EmitterProcessor.create();
90+
Sinks.Many<FrontendMessage> requestSink = Sinks.many().unicast().onBackpressureBuffer();
9191

92-
return Mono.fromDirect(this.client.exchange(requestProcessor.startWith(new Query(sql)))
92+
return Mono.fromDirect(this.client.exchange(requestSink.asFlux())
9393
.handle(exceptionFactory::handleErrorResponse)
9494
.windowUntil(WINDOW_UNTIL)
9595
.map(messages -> {
96-
return new PostgresReplicationStream(this.client.getByteBufAllocator(), request, requestProcessor, messages);
97-
}));
96+
return (ReplicationStream) new PostgresReplicationStream(this.client.getByteBufAllocator(), request, requestSink, messages);
97+
})).doOnSubscribe(it -> {
98+
requestSink.emitNext(new Query(sql), Sinks.EmitFailureHandler.FAIL_FAST);
99+
});
98100
}
99101

100102
@Override

src/main/java/io/r2dbc/postgresql/ExtendedQueryPostgresqlStatement.java

+17-12
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,17 @@
2727
import io.r2dbc.postgresql.util.Assert;
2828
import io.r2dbc.postgresql.util.GeneratedValuesUtils;
2929
import io.r2dbc.spi.Statement;
30-
import reactor.core.publisher.EmitterProcessor;
3130
import reactor.core.publisher.Flux;
3231
import reactor.core.publisher.Mono;
32+
import reactor.core.publisher.Sinks;
3333

3434
import java.util.ArrayList;
3535
import java.util.Arrays;
3636
import java.util.HashSet;
3737
import java.util.Iterator;
3838
import java.util.List;
3939
import java.util.Set;
40+
import java.util.concurrent.atomic.AtomicBoolean;
4041
import java.util.regex.Matcher;
4142

4243
import static io.r2dbc.postgresql.client.ExtendedQueryMessageFlow.PARAMETER_SYMBOL;
@@ -195,37 +196,39 @@ private Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute(String sql) {
195196
}
196197

197198
Iterator<Binding> iterator = this.bindings.bindings.iterator();
198-
EmitterProcessor<Binding> bindingEmitter = EmitterProcessor.create(true);
199-
return bindingEmitter.startWith(iterator.next())
199+
Sinks.Many<Binding> bindings = Sinks.many().unicast().onBackpressureBuffer();
200+
AtomicBoolean canceled = new AtomicBoolean();
201+
return bindings.asFlux()
200202
.map(it -> {
201203

202204
Flux<BackendMessage> messages =
203205
collectBindingParameters(it).flatMapMany(values -> ExtendedFlowDelegate.runQuery(this.resources, factory, sql, it, values, this.fetchSize)).doOnComplete(() -> {
204-
tryNextBinding(iterator, bindingEmitter);
206+
tryNextBinding(iterator, bindings, canceled);
205207
});
206208

207209
return PostgresqlResult.toResult(this.resources, messages, factory);
208210
})
209-
.doOnCancel(() -> clearBindings(iterator))
210-
.doOnError(e -> clearBindings(iterator));
211+
.doOnCancel(() -> clearBindings(iterator, canceled))
212+
.doOnError(e -> clearBindings(iterator, canceled))
213+
.doOnSubscribe(it -> bindings.emitNext(iterator.next(), Sinks.EmitFailureHandler.FAIL_FAST));
211214

212215
}).cast(io.r2dbc.postgresql.api.PostgresqlResult.class);
213216
}
214217

215-
private static void tryNextBinding(Iterator<Binding> iterator, EmitterProcessor<Binding> boundRequests) {
218+
private static void tryNextBinding(Iterator<Binding> iterator, Sinks.Many<Binding> bindingSink, AtomicBoolean canceled) {
216219

217-
if (boundRequests.isCancelled()) {
220+
if (canceled.get()) {
218221
return;
219222
}
220223

221224
try {
222225
if (iterator.hasNext()) {
223-
boundRequests.onNext(iterator.next());
226+
bindingSink.emitNext(iterator.next(), Sinks.EmitFailureHandler.FAIL_FAST);
224227
} else {
225-
boundRequests.onComplete();
228+
bindingSink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
226229
}
227230
} catch (Exception e) {
228-
boundRequests.onError(e);
231+
bindingSink.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST);
229232
}
230233
}
231234

@@ -253,7 +256,9 @@ private int getIndex(String identifier) {
253256
return Integer.parseInt(matcher.group(1)) - 1;
254257
}
255258

256-
private void clearBindings(Iterator<Binding> iterator) {
259+
private void clearBindings(Iterator<Binding> iterator, AtomicBoolean canceled) {
260+
261+
canceled.set(true);
257262

258263
while (iterator.hasNext()) {
259264
// exhaust iterator, ignore returned elements

src/main/java/io/r2dbc/postgresql/PostgresReplicationStream.java

+18-14
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import reactor.core.Disposable;
3333
import reactor.core.publisher.Flux;
3434
import reactor.core.publisher.Mono;
35+
import reactor.core.publisher.Sinks;
3536
import reactor.core.scheduler.Scheduler;
3637
import reactor.core.scheduler.Schedulers;
3738

@@ -41,7 +42,6 @@
4142
import java.util.concurrent.atomic.AtomicReference;
4243
import java.util.function.Function;
4344

44-
@SuppressWarnings("deprecation")
4545
final class PostgresReplicationStream implements ReplicationStream {
4646

4747
public static final long POSTGRES_EPOCH_2000_01_01 = 946684800000L;
@@ -50,9 +50,9 @@ final class PostgresReplicationStream implements ReplicationStream {
5050

5151
private static final char X_LOG_DATA = 'w';
5252

53-
private final reactor.core.publisher.EmitterProcessor<CopyData> responseProcessor = reactor.core.publisher.EmitterProcessor.create(false);
53+
private final Sinks.Many<FrontendMessage> requestSink;
5454

55-
private final reactor.core.publisher.EmitterProcessor<FrontendMessage> requestProcessor;
55+
private final Flux<CopyData> stream;
5656

5757
private final AtomicReference<Disposable> subscription = new AtomicReference<>();
5858

@@ -70,13 +70,13 @@ final class PostgresReplicationStream implements ReplicationStream {
7070

7171
private volatile LogSequenceNumber lastFlushedLSN = LogSequenceNumber.INVALID_LSN;
7272

73-
PostgresReplicationStream(ByteBufAllocator allocator, ReplicationRequest replicationRequest, reactor.core.publisher.EmitterProcessor<FrontendMessage> requestProcessor,
73+
PostgresReplicationStream(ByteBufAllocator allocator, ReplicationRequest replicationRequest, Sinks.Many<FrontendMessage> requestSink,
7474
Flux<BackendMessage> messages) {
7575
this.allocator = allocator;
7676
this.replicationRequest = replicationRequest;
77-
this.requestProcessor = requestProcessor;
77+
this.requestSink = requestSink;
7878

79-
Flux<CopyData> stream = messages
79+
this.stream = messages
8080
.takeUntil(ReadyForQuery.class::isInstance)
8181
.doOnError(throwable -> {
8282
close().subscribe();
@@ -85,8 +85,12 @@ final class PostgresReplicationStream implements ReplicationStream {
8585
.doOnComplete(() -> {
8686
this.closeFuture.complete(null);
8787
})
88+
.doOnCancel(() -> {
89+
Disposable scheduler = Schedulers.parallel().schedule(() -> this.closeFuture.complete(null), 1, TimeUnit.MINUTES);
90+
this.closeFuture.whenComplete(((unused, throwable) -> scheduler.dispose()));
91+
})
8892
.ofType(CopyData.class)
89-
.handle((message, sink) -> {
93+
.<CopyData>handle((message, sink) -> {
9094

9195
try {
9296

@@ -112,9 +116,8 @@ final class PostgresReplicationStream implements ReplicationStream {
112116
} finally {
113117
ReferenceCountUtil.release(message);
114118
}
115-
});
119+
}).share();
116120

117-
stream.subscribeWith(this.responseProcessor);
118121
Disposable disposable = () -> {
119122
};
120123

@@ -164,7 +167,7 @@ private void processXLogData(ByteBuf buffer) {
164167
private void sendStatusUpdate() {
165168
ByteBuf byteBuf = prepareUpdateStatus(this.lastReceiveLSN, this.lastFlushedLSN, this.lastAppliedLSN, false);
166169
io.r2dbc.postgresql.message.frontend.CopyData copyData = new io.r2dbc.postgresql.message.frontend.CopyData(byteBuf);
167-
this.requestProcessor.onNext(copyData);
170+
this.requestSink.emitNext(copyData, Sinks.EmitFailureHandler.FAIL_FAST);
168171
}
169172

170173
private ByteBuf prepareUpdateStatus(LogSequenceNumber received, LogSequenceNumber flushed,
@@ -181,13 +184,14 @@ private ByteBuf prepareUpdateStatus(LogSequenceNumber received, LogSequenceNumbe
181184
public Mono<Void> close() {
182185

183186
Disposable disposable = this.subscription.get();
187+
184188
if (disposable != null && this.subscription.compareAndSet(disposable, null)) {
185189
disposable.dispose();
186190

187-
this.requestProcessor.onNext(CopyDone.INSTANCE);
188-
this.requestProcessor.onComplete();
191+
this.requestSink.emitNext(CopyDone.INSTANCE, Sinks.EmitFailureHandler.FAIL_FAST);
192+
this.requestSink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
189193

190-
return this.responseProcessor.ignoreElements().then(Mono.fromCompletionStage(this.closeFuture));
194+
return this.stream.ignoreElements().then(Mono.fromCompletionStage(this.closeFuture));
191195
}
192196

193197
return Mono.fromCompletionStage(this.closeFuture);
@@ -201,7 +205,7 @@ public boolean isClosed() {
201205
@Override
202206
public <T> Flux<T> map(Function<ByteBuf, ? extends T> mappingFunction) {
203207
Assert.requireNonNull(mappingFunction, "mappingFunction must not be null");
204-
return this.responseProcessor.map(data -> {
208+
return this.stream.map(data -> {
205209

206210
try {
207211
return mappingFunction.apply(data.getData());

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+19-28
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import reactor.core.publisher.FluxSink;
5454
import reactor.core.publisher.Mono;
5555
import reactor.core.publisher.Operators;
56+
import reactor.core.publisher.Sinks;
5657
import reactor.core.scheduler.Schedulers;
5758
import reactor.netty.Connection;
5859
import reactor.netty.resources.LoopResources;
@@ -88,7 +89,6 @@
8889
*
8990
* @see TcpClient
9091
*/
91-
@SuppressWarnings("deprecation")
9292
public final class ReactorNettyClient implements Client {
9393

9494
private static final Logger logger = Loggers.getLogger(ReactorNettyClient.class);
@@ -107,11 +107,9 @@ public final class ReactorNettyClient implements Client {
107107

108108
private ConnectionContext context;
109109

110-
private final reactor.core.publisher.EmitterProcessor<Publisher<FrontendMessage>> requestProcessor = reactor.core.publisher.EmitterProcessor.create(false);
110+
private final Sinks.Many<Publisher<FrontendMessage>> requestSink = Sinks.many().unicast().onBackpressureBuffer();
111111

112-
private final FluxSink<Publisher<FrontendMessage>> requests = this.requestProcessor.sink();
113-
114-
private final reactor.core.publisher.DirectProcessor<NotificationResponse> notificationProcessor = reactor.core.publisher.DirectProcessor.create();
112+
private final Sinks.Many<NotificationResponse> notificationProcessor = Sinks.many().multicast().directBestEffort();
115113

116114
private final AtomicBoolean isClosed = new AtomicBoolean(false);
117115

@@ -143,7 +141,7 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) {
143141
this.settings = Assert.requireNonNull(settings, "ConnectionSettings must not be null");
144142

145143
connection.addHandler(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE - 5, 1, 4, -4, 0));
146-
connection.addHandler(new EnsureSubscribersCompleteChannelHandler(this.requestProcessor));
144+
connection.addHandler(new EnsureSubscribersCompleteChannelHandler(this.requestSink));
147145
this.connection = connection;
148146
this.byteBufAllocator = connection.outbound().alloc();
149147
this.context = new ConnectionContext().withChannelId(connection.channel().toString());
@@ -166,7 +164,7 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) {
166164
})
167165
.subscribe(this.messageSubscriber);
168166

169-
Mono<Void> request = this.requestProcessor
167+
Mono<Void> request = this.requestSink.asFlux()
170168
.concatMap(Function.identity())
171169
.flatMap(message -> {
172170
if (DEBUG_ENABLED) {
@@ -186,9 +184,7 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) {
186184
public Mono<Void> close() {
187185
return Mono.defer(() -> {
188186

189-
if (!this.notificationProcessor.isTerminated()) {
190-
this.notificationProcessor.onComplete();
191-
}
187+
this.notificationProcessor.tryEmitComplete();
192188

193189
drainError(EXPECTED);
194190

@@ -221,20 +217,20 @@ public Flux<BackendMessage> exchange(Predicate<BackendMessage> takeUntil, Publis
221217
Assert.requireNonNull(takeUntil, "takeUntil must not be null");
222218
Assert.requireNonNull(requests, "requests must not be null");
223219

224-
return this.messageSubscriber.addConversation(takeUntil, requests, this.requests::next, this::isConnected);
220+
return this.messageSubscriber.addConversation(takeUntil, requests, it -> this.requestSink.emitNext(it, Sinks.EmitFailureHandler.FAIL_FAST), this::isConnected);
225221
}
226222

227223
@Override
228224
public void send(FrontendMessage message) {
229225
Assert.requireNonNull(message, "requests must not be null");
230226

231-
this.requests.next(Mono.just(message));
227+
this.requestSink.emitNext(Mono.just(message), Sinks.EmitFailureHandler.FAIL_FAST);
232228
}
233229

234230
private Mono<Void> resumeError(Throwable throwable) {
235231

236232
handleConnectionError(throwable);
237-
this.requestProcessor.onComplete();
233+
this.requestSink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
238234

239235
if (isSslException(throwable)) {
240236
logger.debug(this.context.getMessage("Connection Error"), throwable);
@@ -291,7 +287,7 @@ private boolean consumeMessage(BackendMessage message) {
291287
}
292288

293289
if (message.getClass() == NotificationResponse.class) {
294-
this.notificationProcessor.onNext((NotificationResponse) message);
290+
this.notificationProcessor.tryEmitNext((NotificationResponse) message);
295291
return true;
296292
}
297293

@@ -423,12 +419,13 @@ private static Mono<? extends Void> registerSslHandler(SSLConfig sslConfig, Conn
423419

424420
@Override
425421
public Disposable addNotificationListener(Consumer<NotificationResponse> consumer) {
426-
return this.notificationProcessor.subscribe(consumer);
422+
return this.notificationProcessor.asFlux().subscribe(consumer);
427423
}
428424

429425
@Override
426+
@SuppressWarnings("deprecation")
430427
public Disposable addNotificationListener(Subscriber<NotificationResponse> consumer) {
431-
return this.notificationProcessor.subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe);
428+
return this.notificationProcessor.asFlux().subscribe(consumer::onNext, consumer::onError, consumer::onComplete, consumer::onSubscribe);
432429
}
433430

434431
@Override
@@ -467,10 +464,6 @@ public boolean isConnected() {
467464
return false;
468465
}
469466

470-
if (this.requestProcessor.isDisposed()) {
471-
return false;
472-
}
473-
474467
Channel channel = this.connection.channel();
475468
return channel.isOpen();
476469
}
@@ -513,17 +506,15 @@ private void drainError(Supplier<? extends Throwable> supplier) {
513506

514507
this.messageSubscriber.close(supplier);
515508

516-
if (!this.notificationProcessor.isTerminated()) {
517-
this.notificationProcessor.onError(supplier.get());
518-
}
509+
this.notificationProcessor.tryEmitError(supplier.get());
519510
}
520511

521512
private final class EnsureSubscribersCompleteChannelHandler extends ChannelDuplexHandler {
522513

523-
private final reactor.core.publisher.EmitterProcessor<Publisher<FrontendMessage>> requestProcessor;
514+
private final Sinks.Many<?> requestSink;
524515

525-
private EnsureSubscribersCompleteChannelHandler(reactor.core.publisher.EmitterProcessor<Publisher<FrontendMessage>> requestProcessor) {
526-
this.requestProcessor = requestProcessor;
516+
private EnsureSubscribersCompleteChannelHandler(Sinks.Many<?> requestSink) {
517+
this.requestSink = requestSink;
527518
}
528519

529520
@Override
@@ -535,7 +526,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
535526
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
536527
super.channelUnregistered(ctx);
537528

538-
this.requestProcessor.onComplete();
529+
this.requestSink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
539530
handleClose();
540531
}
541532

@@ -870,7 +861,7 @@ public void onError(Throwable throwable) {
870861
}
871862

872863
handleConnectionError(throwable);
873-
ReactorNettyClient.this.requestProcessor.onComplete();
864+
ReactorNettyClient.this.requestSink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
874865
this.terminated = true;
875866

876867
if (isSslException(throwable)) {

0 commit comments

Comments
 (0)