Skip to content

Commit e31038f

Browse files
committed
Merge 1.1.x into master
Signed-off-by: Oleh Dokuka <[email protected]>
2 parents 84af160 + 000f6da commit e31038f

File tree

15 files changed

+406
-38
lines changed

15 files changed

+406
-38
lines changed

benchmarks/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ dependencies {
1414
compileOnly "io.rsocket:rsocket-transport-local:${perfBaselineVersion}"
1515
compileOnly "io.rsocket:rsocket-transport-netty:${perfBaselineVersion}"
1616

17-
implementation "org.openjdk.jmh:jmh-core:1.21"
18-
annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:1.21"
17+
implementation "org.openjdk.jmh:jmh-core:1.35"
18+
annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:1.35"
1919

2020
current project(':rsocket-core')
2121
current project(':rsocket-transport-local')

rsocket-core/src/jcstress/java/io/rsocket/core/ReconnectMonoStressTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ public void arbiter(IIIIII_Result r) {
517517
id = {"1, 0, 1, 0, 1, 2"},
518518
expect = ACCEPTABLE)
519519
@State
520-
public static class SubscribeBlockRace extends BaseStressTest {
520+
public static class SubscribeBlockConnectRace extends BaseStressTest {
521521

522522
String receivedValue;
523523

@@ -543,6 +543,11 @@ void subscribe() {
543543
reconnectMono.subscribe(stressSubscriber);
544544
}
545545

546+
@Actor
547+
void connect() {
548+
reconnectMono.resolvingInner.connect();
549+
}
550+
546551
@Arbiter
547552
public void arbiter(IIIIII_Result r) {
548553
r.r1 = stressSubscription.subscribes;

rsocket-core/src/main/java/io/rsocket/core/DefaultRSocketClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import reactor.core.publisher.Mono;
3636
import reactor.core.publisher.MonoOperator;
3737
import reactor.core.publisher.Operators;
38+
import reactor.core.publisher.Sinks;
3839
import reactor.util.annotation.Nullable;
3940
import reactor.util.context.Context;
4041

@@ -65,19 +66,27 @@ class DefaultRSocketClient extends ResolvingOperator<RSocket>
6566

6667
final Mono<RSocket> source;
6768

69+
final Sinks.Empty<Void> onDisposeSink;
70+
6871
volatile Subscription s;
6972

7073
static final AtomicReferenceFieldUpdater<DefaultRSocketClient, Subscription> S =
7174
AtomicReferenceFieldUpdater.newUpdater(DefaultRSocketClient.class, Subscription.class, "s");
7275

7376
DefaultRSocketClient(Mono<RSocket> source) {
7477
this.source = unwrapReconnectMono(source);
78+
this.onDisposeSink = Sinks.empty();
7579
}
7680

7781
private Mono<RSocket> unwrapReconnectMono(Mono<RSocket> source) {
7882
return source instanceof ReconnectMono ? ((ReconnectMono<RSocket>) source).getSource() : source;
7983
}
8084

85+
@Override
86+
public Mono<Void> onClose() {
87+
return this.onDisposeSink.asMono();
88+
}
89+
8190
@Override
8291
public Mono<RSocket> source() {
8392
return Mono.fromDirect(this);
@@ -194,6 +203,12 @@ protected void doOnValueExpired(RSocket value) {
194203
@Override
195204
protected void doOnDispose() {
196205
Operators.terminate(S, this);
206+
final RSocket value = this.value;
207+
if (value != null) {
208+
value.onClose().subscribe(null, onDisposeSink::tryEmitError, onDisposeSink::tryEmitEmpty);
209+
} else {
210+
onDisposeSink.tryEmitEmpty();
211+
}
197212
}
198213

199214
static final class FlatMapMain<R> implements CoreSubscriber<Payload>, Context, Scannable {

rsocket-core/src/main/java/io/rsocket/core/RSocketClient.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
*/
1616
package io.rsocket.core;
1717

18+
import io.rsocket.Closeable;
1819
import io.rsocket.Payload;
1920
import io.rsocket.RSocket;
2021
import org.reactivestreams.Publisher;
21-
import reactor.core.Disposable;
2222
import reactor.core.publisher.Flux;
2323
import reactor.core.publisher.Mono;
24+
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
2425

2526
/**
2627
* Contract for performing RSocket requests.
@@ -74,7 +75,22 @@
7475
* @since 1.1
7576
* @see io.rsocket.loadbalance.LoadbalanceRSocketClient
7677
*/
77-
public interface RSocketClient extends Disposable {
78+
public interface RSocketClient extends Closeable {
79+
80+
/**
81+
* Connect to the remote rsocket endpoint, if not yet connected. This method is a shortcut for
82+
* {@code RSocketClient#source().subscribe()}.
83+
*
84+
* @return {@code true} if an attempt to connect was triggered or if already connected, or {@code
85+
* false} if the client is terminated.
86+
*/
87+
default boolean connect() {
88+
throw new NotImplementedException();
89+
}
90+
91+
default Mono<Void> onClose() {
92+
return Mono.error(new NotImplementedException());
93+
}
7894

7995
/** Return the underlying source used to obtain a shared {@link RSocket} connection. */
8096
Mono<RSocket> source();

rsocket-core/src/main/java/io/rsocket/core/RSocketClientAdapter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,21 @@ public RSocket rsocket() {
4141
return rsocket;
4242
}
4343

44+
@Override
45+
public boolean connect() {
46+
throw new UnsupportedOperationException("Connect does not apply to a server side RSocket");
47+
}
48+
4449
@Override
4550
public Mono<RSocket> source() {
4651
return Mono.just(rsocket);
4752
}
4853

54+
@Override
55+
public Mono<Void> onClose() {
56+
return rsocket.onClose();
57+
}
58+
4959
@Override
5060
public Mono<Void> fireAndForget(Mono<Payload> payloadMono) {
5161
return payloadMono.flatMap(rsocket::fireAndForget);

rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,30 @@ protected void doOnDispose() {
331331
// no ops
332332
}
333333

334+
public final boolean connect() {
335+
for (; ; ) {
336+
final BiConsumer<T, Throwable>[] a = this.subscribers;
337+
338+
if (a == TERMINATED) {
339+
return false;
340+
}
341+
342+
if (a == READY) {
343+
return true;
344+
}
345+
346+
if (a != EMPTY_UNSUBSCRIBED) {
347+
// do nothing if already started
348+
return true;
349+
}
350+
351+
if (SUBSCRIBERS.compareAndSet(this, a, EMPTY_SUBSCRIBED)) {
352+
this.doSubscribe();
353+
return true;
354+
}
355+
}
356+
}
357+
334358
final int add(BiConsumer<T, Throwable> ps) {
335359
for (; ; ) {
336360
BiConsumer<T, Throwable>[] a = this.subscribers;

rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public void sendFrame(int streamId, ByteBuf frame) {
3939
protected abstract void doOnClose();
4040

4141
@Override
42-
public final Mono<Void> onClose() {
42+
public Mono<Void> onClose() {
4343
return onClose.asMono();
4444
}
4545

rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketClient.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,16 @@ private LoadbalanceRSocketClient(RSocketPool rSocketPool) {
4040
this.rSocketPool = rSocketPool;
4141
}
4242

43+
@Override
44+
public Mono<Void> onClose() {
45+
return rSocketPool.onClose();
46+
}
47+
48+
@Override
49+
public boolean connect() {
50+
return rSocketPool.connect();
51+
}
52+
4353
/** Return {@code Mono} that selects an RSocket from the underlying pool. */
4454
@Override
4555
public Mono<RSocket> source() {

rsocket-core/src/main/java/io/rsocket/loadbalance/PooledRSocket.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import reactor.core.publisher.Flux;
2727
import reactor.core.publisher.Mono;
2828
import reactor.core.publisher.Operators;
29+
import reactor.core.publisher.Sinks;
2930
import reactor.util.context.Context;
3031

3132
/** Default implementation of {@link RSocket} stored in {@link RSocketPool} */
@@ -35,6 +36,7 @@ final class PooledRSocket extends ResolvingOperator<RSocket>
3536
final RSocketPool parent;
3637
final Mono<RSocket> rSocketSource;
3738
final LoadbalanceTarget loadbalanceTarget;
39+
final Sinks.Empty<Void> onCloseSink;
3840

3941
volatile Subscription s;
4042

@@ -46,6 +48,7 @@ final class PooledRSocket extends ResolvingOperator<RSocket>
4648
this.parent = parent;
4749
this.rSocketSource = rSocketSource;
4850
this.loadbalanceTarget = loadbalanceTarget;
51+
this.onCloseSink = Sinks.unsafe().empty();
4952
}
5053

5154
@Override
@@ -155,6 +158,12 @@ void doCleanup(Throwable t) {
155158
break;
156159
}
157160
}
161+
162+
if (t == ON_DISPOSE) {
163+
this.onCloseSink.tryEmitEmpty();
164+
} else {
165+
this.onCloseSink.tryEmitError(t);
166+
}
158167
}
159168

160169
@Override
@@ -165,6 +174,13 @@ protected void doOnValueExpired(RSocket value) {
165174
@Override
166175
protected void doOnDispose() {
167176
Operators.terminate(S, this);
177+
178+
final RSocket value = this.value;
179+
if (value != null) {
180+
value.onClose().subscribe(null, onCloseSink::tryEmitError, onCloseSink::tryEmitEmpty);
181+
} else {
182+
onCloseSink.tryEmitEmpty();
183+
}
168184
}
169185

170186
@Override
@@ -193,7 +209,12 @@ public Mono<Void> metadataPush(Payload payload) {
193209
}
194210

195211
LoadbalanceTarget target() {
196-
return loadbalanceTarget;
212+
return this.loadbalanceTarget;
213+
}
214+
215+
@Override
216+
public Mono<Void> onClose() {
217+
return this.onCloseSink.asMono();
197218
}
198219

199220
@Override

rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.rsocket.loadbalance;
1717

1818
import io.netty.util.ReferenceCountUtil;
19+
import io.rsocket.Closeable;
1920
import io.rsocket.Payload;
2021
import io.rsocket.RSocket;
2122
import io.rsocket.core.RSocketConnector;
@@ -28,16 +29,18 @@
2829
import java.util.ListIterator;
2930
import java.util.concurrent.CancellationException;
3031
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
32+
import java.util.stream.Collectors;
3133
import org.reactivestreams.Publisher;
3234
import org.reactivestreams.Subscription;
3335
import reactor.core.CoreSubscriber;
3436
import reactor.core.publisher.Flux;
3537
import reactor.core.publisher.Mono;
3638
import reactor.core.publisher.Operators;
39+
import reactor.core.publisher.Sinks;
3740
import reactor.util.annotation.Nullable;
3841

3942
class RSocketPool extends ResolvingOperator<Object>
40-
implements CoreSubscriber<List<LoadbalanceTarget>> {
43+
implements CoreSubscriber<List<LoadbalanceTarget>>, Closeable {
4144

4245
static final AtomicReferenceFieldUpdater<RSocketPool, PooledRSocket[]> ACTIVE_SOCKETS =
4346
AtomicReferenceFieldUpdater.newUpdater(
@@ -49,6 +52,7 @@ class RSocketPool extends ResolvingOperator<Object>
4952
final DeferredResolutionRSocket deferredResolutionRSocket = new DeferredResolutionRSocket(this);
5053
final RSocketConnector connector;
5154
final LoadbalanceStrategy loadbalanceStrategy;
55+
final Sinks.Empty<Void> onAllClosedSink = Sinks.unsafe().empty();
5256
volatile PooledRSocket[] activeSockets;
5357
volatile Subscription s;
5458

@@ -64,6 +68,11 @@ public RSocketPool(
6468
targetPublisher.subscribe(this);
6569
}
6670

71+
@Override
72+
public Mono<Void> onClose() {
73+
return onAllClosedSink.asMono();
74+
}
75+
6776
@Override
6877
protected void doOnDispose() {
6978
Operators.terminate(S, this);
@@ -72,6 +81,14 @@ protected void doOnDispose() {
7281
for (RSocket rSocket : activeSockets) {
7382
rSocket.dispose();
7483
}
84+
85+
if (activeSockets.length > 0) {
86+
Mono.whenDelayError(
87+
Arrays.stream(activeSockets).map(RSocket::onClose).collect(Collectors.toList()))
88+
.subscribe(null, onAllClosedSink::tryEmitError, onAllClosedSink::tryEmitEmpty);
89+
} else {
90+
onAllClosedSink.tryEmitEmpty();
91+
}
7592
}
7693

7794
@Override

rsocket-core/src/main/java/io/rsocket/loadbalance/ResolvingOperator.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,30 @@ protected void doOnDispose() {
327327
// no ops
328328
}
329329

330+
public final boolean connect() {
331+
for (; ; ) {
332+
final BiConsumer<T, Throwable>[] a = this.subscribers;
333+
334+
if (a == TERMINATED) {
335+
return false;
336+
}
337+
338+
if (a == READY) {
339+
return true;
340+
}
341+
342+
if (a != EMPTY_UNSUBSCRIBED) {
343+
// do nothing if already started
344+
return true;
345+
}
346+
347+
if (SUBSCRIBERS.compareAndSet(this, a, EMPTY_SUBSCRIBED)) {
348+
this.doSubscribe();
349+
return true;
350+
}
351+
}
352+
}
353+
330354
final int add(BiConsumer<T, Throwable> ps) {
331355
for (; ; ) {
332356
BiConsumer<T, Throwable>[] a = this.subscribers;

0 commit comments

Comments
 (0)