Skip to content

Commit c65683e

Browse files
author
Oleh Dokuka
committed
ensures last frame is delivered in UP
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: OlegDokuka <[email protected]>
1 parent 00d8311 commit c65683e

File tree

6 files changed

+208
-48
lines changed

6 files changed

+208
-48
lines changed

rsocket-core/src/jcstress/java/io/rsocket/internal/UnboundedProcessorStressTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -895,6 +895,70 @@ public void arbiter(LLL_Result r) {
895895
}
896896
}
897897

898+
@JCStressTest
899+
@Outcome(
900+
id = {
901+
"0, 1, 0, 5",
902+
"1, 1, 0, 5",
903+
"2, 1, 0, 5",
904+
"3, 1, 0, 5",
905+
"4, 1, 0, 5",
906+
"5, 1, 0, 5",
907+
},
908+
expect = Expect.ACCEPTABLE,
909+
desc = "onComplete()")
910+
@State
911+
public static class Smoke33StressTest extends UnboundedProcessorStressTest {
912+
913+
final StressSubscriber<ByteBuf> stressSubscriber =
914+
new StressSubscriber<>(Long.MAX_VALUE, Fuseable.NONE);
915+
final ByteBuf byteBuf1 = UnpooledByteBufAllocator.DEFAULT.buffer().writeByte(1);
916+
final ByteBuf byteBuf2 = UnpooledByteBufAllocator.DEFAULT.buffer().writeByte(2);
917+
final ByteBuf byteBuf3 = UnpooledByteBufAllocator.DEFAULT.buffer().writeByte(3);
918+
final ByteBuf byteBuf4 = UnpooledByteBufAllocator.DEFAULT.buffer().writeByte(4);
919+
final ByteBuf byteBuf5 = UnpooledByteBufAllocator.DEFAULT.buffer().writeByte(5);
920+
921+
{
922+
unboundedProcessor.subscribe(stressSubscriber);
923+
}
924+
925+
@Actor
926+
public void next1() {
927+
unboundedProcessor.tryEmitNormal(byteBuf1);
928+
unboundedProcessor.tryEmitPrioritized(byteBuf2);
929+
}
930+
931+
@Actor
932+
public void next2() {
933+
unboundedProcessor.tryEmitPrioritized(byteBuf3);
934+
unboundedProcessor.tryEmitNormal(byteBuf4);
935+
}
936+
937+
@Actor
938+
public void complete() {
939+
unboundedProcessor.tryEmitFinal(byteBuf5);
940+
}
941+
942+
@Arbiter
943+
public void arbiter(LLLL_Result r) {
944+
r.r1 = stressSubscriber.onNextCalls;
945+
r.r2 =
946+
stressSubscriber.onCompleteCalls
947+
+ stressSubscriber.onErrorCalls * 2
948+
+ stressSubscriber.droppedErrors.size() * 3;
949+
950+
r.r4 = stressSubscriber.values.get(stressSubscriber.values.size() - 1).readByte();
951+
stressSubscriber.values.forEach(ByteBuf::release);
952+
953+
r.r3 =
954+
byteBuf1.refCnt()
955+
+ byteBuf2.refCnt()
956+
+ byteBuf3.refCnt()
957+
+ byteBuf4.refCnt()
958+
+ byteBuf5.refCnt();
959+
}
960+
}
961+
898962
@JCStressTest
899963
@Outcome(
900964
id = {

rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ public BaseDuplexConnection() {}
3030
@Override
3131
public void sendFrame(int streamId, ByteBuf frame) {
3232
if (streamId == 0) {
33-
sender.onNextPrioritized(frame);
33+
sender.tryEmitPrioritized(frame);
3434
} else {
35-
sender.onNext(frame);
35+
sender.tryEmitNormal(frame);
3636
}
3737
}
3838

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 125 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ public final class UnboundedProcessor extends Flux<ByteBuf>
9191
static final AtomicLongFieldUpdater<UnboundedProcessor> REQUESTED =
9292
AtomicLongFieldUpdater.newUpdater(UnboundedProcessor.class, "requested");
9393

94+
ByteBuf last;
95+
9496
boolean outputFused;
9597

9698
public UnboundedProcessor() {
@@ -121,78 +123,127 @@ public Object scanUnsafe(Attr key) {
121123
return null;
122124
}
123125

124-
public void onNextPrioritized(ByteBuf t) {
126+
public boolean tryEmitPrioritized(ByteBuf t) {
125127
if (this.done || this.cancelled) {
126128
release(t);
127-
return;
129+
return false;
128130
}
129131

130132
if (!this.priorityQueue.offer(t)) {
131133
onError(Operators.onOperatorError(null, Exceptions.failWithOverflow(), t, currentContext()));
132134
release(t);
133-
return;
135+
return false;
134136
}
135137

136138
final long previousState = markValueAdded(this);
137139
if (isFinalized(previousState)) {
138140
this.clearSafely();
139-
return;
141+
return false;
140142
}
141143

142144
if (isSubscriberReady(previousState)) {
143145
if (this.outputFused) {
144146
// fast path for fusion
145147
this.actual.onNext(null);
146-
return;
148+
return true;
147149
}
148150

149151
if (isWorkInProgress(previousState)) {
150-
return;
152+
return true;
151153
}
152154

153155
if (hasRequest(previousState)) {
154156
drainRegular(previousState);
155157
}
156158
}
159+
return true;
157160
}
158161

159-
@Override
160-
public void onNext(ByteBuf t) {
162+
public boolean tryEmitNormal(ByteBuf t) {
161163
if (this.done || this.cancelled) {
162164
release(t);
163-
return;
165+
return false;
164166
}
165167

166168
if (!this.queue.offer(t)) {
167169
onError(Operators.onOperatorError(null, Exceptions.failWithOverflow(), t, currentContext()));
168170
release(t);
169-
return;
171+
return false;
170172
}
171173

172174
final long previousState = markValueAdded(this);
173175
if (isFinalized(previousState)) {
174176
this.clearSafely();
175-
return;
177+
return false;
176178
}
177179

178180
if (isSubscriberReady(previousState)) {
179181
if (this.outputFused) {
180182
// fast path for fusion
181183
this.actual.onNext(null);
182-
return;
184+
return true;
183185
}
184186

185187
if (isWorkInProgress(previousState)) {
186-
return;
188+
return true;
187189
}
188190

189191
if (hasRequest(previousState)) {
190192
drainRegular(previousState);
191193
}
192194
}
195+
196+
return true;
197+
}
198+
199+
public boolean tryEmitFinal(ByteBuf t) {
200+
if (this.done || this.cancelled) {
201+
release(t);
202+
return false;
203+
}
204+
205+
this.last = t;
206+
this.done = true;
207+
208+
final long previousState = markValueAddedAndTerminated(this);
209+
if (isFinalized(previousState)) {
210+
this.clearSafely();
211+
return false;
212+
}
213+
214+
if (isSubscriberReady(previousState)) {
215+
if (this.outputFused) {
216+
// fast path for fusion
217+
this.actual.onNext(null);
218+
this.actual.onComplete();
219+
return true;
220+
}
221+
222+
if (isWorkInProgress(previousState)) {
223+
return true;
224+
}
225+
226+
if (hasRequest(previousState)) {
227+
drainRegular(previousState);
228+
}
229+
}
230+
231+
return true;
232+
}
233+
234+
@Deprecated
235+
public void onNextPrioritized(ByteBuf t) {
236+
tryEmitPrioritized(t);
193237
}
194238

195239
@Override
240+
@Deprecated
241+
public void onNext(ByteBuf t) {
242+
tryEmitNormal(t);
243+
}
244+
245+
@Override
246+
@Deprecated
196247
public void onError(Throwable t) {
197248
if (this.done || this.cancelled) {
198249
Operators.onErrorDropped(t, currentContext());
@@ -235,6 +286,7 @@ public void onError(Throwable t) {
235286
}
236287

237288
@Override
289+
@Deprecated
238290
public void onComplete() {
239291
if (this.done || this.cancelled) {
240292
return;
@@ -363,6 +415,11 @@ boolean checkTerminated(boolean done, boolean empty, CoreSubscriber<? super Byte
363415
}
364416

365417
if (done && empty) {
418+
final ByteBuf last = this.last;
419+
if (last != null) {
420+
this.last = null;
421+
a.onNext(last);
422+
}
366423
clearAndFinalize(this);
367424
Throwable e = this.error;
368425
if (e != null) {
@@ -515,6 +572,7 @@ public void cancel() {
515572
}
516573

517574
@Override
575+
@Deprecated
518576
public void dispose() {
519577
this.cancelled = true;
520578

@@ -553,7 +611,24 @@ public ByteBuf poll() {
553611
if (t != null) {
554612
return t;
555613
}
556-
return this.queue.poll();
614+
615+
t = this.queue.poll();
616+
if (t != null) {
617+
return t;
618+
}
619+
620+
t = this.last;
621+
if (t != null) {
622+
try {
623+
this.last = null;
624+
return t;
625+
} finally {
626+
627+
clearAndFinalize(this);
628+
}
629+
}
630+
631+
return null;
557632
}
558633

559634
@Override
@@ -598,6 +673,12 @@ void clearUnsafely() {
598673
final Queue<ByteBuf> queue = this.queue;
599674
final Queue<ByteBuf> priorityQueue = this.priorityQueue;
600675

676+
final ByteBuf last = this.last;
677+
678+
if (last != null) {
679+
release(last);
680+
}
681+
601682
ByteBuf byteBuf;
602683
while ((byteBuf = queue.poll()) != null) {
603684
release(byteBuf);
@@ -745,6 +826,36 @@ static long markValueAdded(UnboundedProcessor instance) {
745826
}
746827
}
747828

829+
/**
830+
* Sets {@link #FLAG_HAS_VALUE} flag if it was not set before and if flags {@link
831+
* #FLAG_FINALIZED}, {@link #FLAG_CANCELLED}, {@link #FLAG_DISPOSED} are unset. Also, this method
832+
* increments number of work in progress (WIP) if {@link #FLAG_HAS_REQUEST} is set
833+
*
834+
* @return previous state
835+
*/
836+
static long markValueAddedAndTerminated(UnboundedProcessor instance) {
837+
for (; ; ) {
838+
final long state = instance.state;
839+
840+
if (isFinalized(state)) {
841+
return state;
842+
}
843+
844+
long nextState = state;
845+
if (isWorkInProgress(state)) {
846+
nextState = addWork(state);
847+
} else if (isSubscriberReady(state) && !instance.outputFused) {
848+
if (hasRequest(state)) {
849+
nextState = addWork(state);
850+
}
851+
}
852+
853+
if (STATE.compareAndSet(instance, state, nextState | FLAG_HAS_VALUE | FLAG_TERMINATED)) {
854+
return state;
855+
}
856+
}
857+
}
858+
748859
/**
749860
* Sets {@link #FLAG_TERMINATED} flag if it was not set before and if flags {@link
750861
* #FLAG_FINALIZED}, {@link #FLAG_CANCELLED}, {@link #FLAG_DISPOSED} are unset. Also, this method

rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,17 +89,16 @@ public Flux<ByteBuf> receive() {
8989
@Override
9090
public void sendFrame(int streamId, ByteBuf frame) {
9191
if (streamId == 0) {
92-
out.onNextPrioritized(frame);
92+
out.tryEmitPrioritized(frame);
9393
} else {
94-
out.onNext(frame);
94+
out.tryEmitNormal(frame);
9595
}
9696
}
9797

9898
@Override
9999
public void sendErrorAndClose(RSocketErrorException e) {
100100
final ByteBuf errorFrame = ErrorFrameCodec.encode(allocator, 0, e);
101-
out.onNext(errorFrame);
102-
dispose();
101+
out.tryEmitFinal(errorFrame);
103102
}
104103

105104
@Override

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,11 @@ public TcpDuplexConnection(Connection connection) {
4343
this.connection = Objects.requireNonNull(connection, "connection must not be null");
4444

4545
connection
46-
.channel()
47-
.closeFuture()
48-
.addListener(
49-
future -> {
50-
if (!isDisposed()) dispose();
51-
});
52-
53-
connection.outbound().send(sender).then().subscribe();
46+
.outbound()
47+
.send(sender.hide())
48+
.then()
49+
.doFinally(__ -> connection.dispose())
50+
.subscribe();
5451
}
5552

5653
@Override
@@ -70,17 +67,13 @@ protected void doOnClose() {
7067

7168
@Override
7269
public Mono<Void> onClose() {
73-
return super.onClose().and(connection.onDispose());
70+
return Mono.whenDelayError(super.onClose(), connection.onTerminate());
7471
}
7572

7673
@Override
7774
public void sendErrorAndClose(RSocketErrorException e) {
7875
final ByteBuf errorFrame = ErrorFrameCodec.encode(alloc(), 0, e);
79-
connection
80-
.outbound()
81-
.sendObject(FrameLengthCodec.encode(alloc(), errorFrame.readableBytes(), errorFrame))
82-
.subscribe(connection.disposeSubscriber());
83-
sender.onComplete();
76+
sender.tryEmitFinal(FrameLengthCodec.encode(alloc(), errorFrame.readableBytes(), errorFrame));
8477
}
8578

8679
@Override

0 commit comments

Comments
 (0)