Query never completes fix #426
Conversation
The problem: when our receive buffer fills, conversation demand drops to zero, and the last item is `CommandComplete`, we never request more frames and therefore never get `ReadyForQuery`. To fix this, we check whether the conversation can be completed with the message and send `onComplete` even if demand is zero.
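A minimal sketch of the fix's shape, in plain Java. All names here (`Conversation`, `canBeCompletedWith`, the `demand` counter) are hypothetical stand-ins for illustration, not the actual r2dbc-postgresql classes: the point is only that a terminating frame completes the conversation even when downstream demand is zero.

```java
import java.util.function.Consumer;

// Hypothetical sketch: a conversation that completes on its terminating
// message even when downstream demand is zero. Names are illustrative.
public class ConversationSketch {

    enum Frame { ROW, COMMAND_COMPLETE, READY_FOR_QUERY }

    static class Conversation {
        long demand;                   // outstanding downstream requests
        boolean completed;
        final Consumer<Frame> downstream;

        Conversation(long demand, Consumer<Frame> downstream) {
            this.demand = demand;
            this.downstream = downstream;
        }

        // True if this frame terminates the conversation.
        boolean canBeCompletedWith(Frame frame) {
            return frame == Frame.COMMAND_COMPLETE;
        }

        void onFrame(Frame frame) {
            if (demand > 0) {
                demand--;
                downstream.accept(frame);
            }
            // The fix: complete even when demand == 0, so the protocol
            // state machine keeps requesting frames until ReadyForQuery.
            if (canBeCompletedWith(frame)) {
                completed = true;      // stands in for onComplete()
            }
        }
    }

    public static void main(String[] args) {
        Conversation c = new Conversation(0, f -> { }); // demand already zero
        c.onFrame(Frame.COMMAND_COMPLETE);
        System.out.println("completed=" + c.completed); // completed=true
    }
}
```

Before the fix, the zero-demand branch would swallow `CommandComplete` without completing, and no further frames would ever be requested.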
Thanks a lot. I currently do not understand why demand drops to zero. According to the test, we never cancel or issue a … Could it be that we have a bug in our accumulation of …
Actually, it looks more like …
Yikes. Let me have a look over the code to see how we can fix it by removing …
Oh! I got it! In window, onNext#234 goes to a guarded drain loop. But …
I don't have a total grasp of the issue, but maybe this can help: it is perfectly fine for a … Let's take the example of a publisher that can emit 1, 2, 3. If the subscriber …
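The point about legitimate zero demand can be sketched without Reactor, using the JDK's `java.util.concurrent.Flow` types: a subscriber that requests two of three available items and then simply stops requesting leaves demand at zero without ever calling `cancel()`, and that is perfectly legal under Reactive Streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Illustrative sketch (plain JDK Flow, not Reactor): demand may
// legitimately sit at zero without any cancellation.
public class DemandSketch {

    // A synchronous publisher of the items 1, 2, 3.
    static class ThreeItems implements Flow.Publisher<Integer> {
        public void subscribe(Flow.Subscriber<? super Integer> s) {
            int[] cursor = {1};
            s.onSubscribe(new Flow.Subscription() {
                public void request(long n) {
                    // Emit at most n items; complete only after item 3.
                    while (n-- > 0 && cursor[0] <= 3) {
                        s.onNext(cursor[0]++);
                    }
                    if (cursor[0] > 3) {
                        s.onComplete();
                    }
                }
                public void cancel() { }
            });
        }
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        new ThreeItems().subscribe(new Flow.Subscriber<Integer>() {
            public void onSubscribe(Flow.Subscription s) {
                s.request(2);  // ask for two items only, never more
            }
            public void onNext(Integer item) { received.add(item); }
            public void onError(Throwable t) { }
            public void onComplete() { }
        });
        // Items 1 and 2 were delivered; item 3 waits for more demand.
        System.out.println(received); // [1, 2]
    }
}
```

The publisher is not allowed to push item 3 on its own; it must hold it until the subscriber requests again. The bug being discussed is the mirror image: the consumer side stops requesting at exactly the wrong moment and nothing ever restarts demand.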
Ah, so there's a bug in Reactor then? 🕵️
It is, but I'm afraid that some of the demand management will go wrong after such demand hacks.
I guess someone with better Reactor knowledge should approve that; I could have understood the code wrong.
Nah, we have some problems in the Extended Flow because it's more complex and we had to write it another way, AFAIK. The Simple Flow still uses windows.
After looking at the code, I'm not sure about that part. I guess if you find a way to demonstrate the issue in a reactor-core test case, that would greatly help.
@simonbasle try this one. The window should be larger than the `windowUntil` buffer (256), plus a little bit more, to make sure that the command complete arrives after the buffer fills.

```java
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ReactorTest {

    @Test
    @Timeout(10)
    void windowUntilTest() {
        final int windowSize = 315;
        AtomicInteger produced = new AtomicInteger(0);
        Flux<Frames> p = Flux.generate(sink -> {
            int currentFrame = produced.incrementAndGet();
            if (currentFrame > windowSize) {
                System.out.println("READY_FOR_QUERY");
                sink.next(Frames.READY_FOR_QUERY);
                sink.complete();
                return;
            }
            if (currentFrame == windowSize) {
                System.out.println("COMMAND_COMPLETE");
                sink.next(Frames.COMMAND_COMPLETE);
                return;
            }
            System.out.println("ROW");
            sink.next(Frames.ROW);
        });

        Flux.from(p)
            .windowUntil(e -> e == Frames.COMMAND_COMPLETE)
            .concatMap(f -> f)
            .limitRate(1, 1)
            .delayElements(Duration.of(10, ChronoUnit.MILLIS), Schedulers.single())
            .limitRate(1, 1)
            .as(StepVerifier::create)
            .expectNextCount(windowSize + 2)
            .verifyComplete();
    }

    enum Frames {
        ROW, COMMAND_COMPLETE, READY_FOR_QUERY
    }
}
```
And it's not on another thread. The drain happens on the same thread, just higher up the stack.
@mp911de are we waiting for @simonbasle's response, or should I reimplement the simple flow without windows?
Let's move forward with removing …
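The merged change (per the commit message below) removes `windowUntil(…)` so no inner window tracks demand. Conceptually, the conversation can instead consume frames directly and stop after the delimiter, takeUntil-style. This is only a plain-Java sketch of that idea, with a hypothetical helper name, not the actual driver change:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Conceptual sketch: delimit one conversation's frames without windows by
// taking frames up to and including the delimiter. Helper name is made up.
public class TakeUntilSketch {

    enum Frame { ROW, COMMAND_COMPLETE, READY_FOR_QUERY }

    // Returns the prefix of frames up to and including the first frame
    // matching the delimiter predicate (takeUntil semantics).
    static List<Frame> takeUntilInclusive(List<Frame> frames, Predicate<Frame> delimiter) {
        List<Frame> result = new ArrayList<>();
        for (Frame f : frames) {
            result.add(f);
            if (delimiter.test(f)) {
                break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Frame> frames = List.of(
            Frame.ROW, Frame.ROW, Frame.COMMAND_COMPLETE, Frame.READY_FOR_QUERY);
        List<Frame> conversation =
            takeUntilInclusive(frames, f -> f == Frame.COMMAND_COMPLETE);
        System.out.println(conversation); // [ROW, ROW, COMMAND_COMPLETE]
    }
}
```

With no window operator in the chain, the demand miscalculation the test above provokes cannot occur: the conversation's own request accounting decides when to pull the next frame.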
The problem: when our receive buffer fills, conversation demand drops to zero, and the last item is `CommandComplete`, we never request more frames and will never get `ReadyForQuery`. Signed-off-by: Mark Paluch <[email protected]> [closes #401][#426]
Remove windowUntil(…) to avoid miscalculation of demand in long-running streams. Refine discardOnCancel operator calls to properly guard simple queries. [#401][#426] Signed-off-by: Mark Paluch <[email protected]>
Thank you for your contribution. That's merged, polished, and backported now. I removed …
[#401][#426] Signed-off-by: Mark Paluch <[email protected]>