Skip to content

Fetch rows never completes #401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Leuteris opened this issue May 4, 2021 · 8 comments
Closed

Fetch rows never completes #401

Leuteris opened this issue May 4, 2021 · 8 comments
Assignees
Labels
type: bug A general bug
Milestone

Comments

@Leuteris
Copy link

Leuteris commented May 4, 2021

Bug Report

Versions

  • Driver: 0.8.6.RELEASE, 0.8.7.RELEASE
  • Database: PostgreSQL 13.2 (Debian 13.2-1.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit
  • Java: Java(TM) SE Runtime Environment (build 16+36-2231)
  • OS: Ubuntu 18.04.5 LTS

Current Behavior

I have created the following method that fetches some rows from database and returns a Flux of objects. When the number of rows returned by the query are e.g. 1200 or 15.000.000, Flux never emits the onComplete signal. Here's a sample code:

public Flux<UserProfile> fetchProfiles(RelationalDBJobConfiguration config) {
	ConnectionFactory connectionFactory = getConnectionFactory(config.getDatabaseDetails());
	return Flux.usingWhen(connectionFactory.create(),
			connection -> {
				return Flux.from(connection.createStatement("select userId, attr1, attr2, attr3, " +
					"attr4, attr5 from test_r2dbc.users limit 1200")
						.execute()).flatMap(result -> result.map(this::toProfile));
			}, Connection::close)
                        .log();//never emits the onComplete signal
}

When adding a .log() call to the chain I see 256 request(1) before the stream stops. I would expect to see the onComplete signal:

Logs
2021-05-04 12:34:58,015  INFO data-extractor-quartz_Worker-1 r.F.UsingWhen.1 - onSubscribe(FluxUsingWhen.ResourceSubscriber)
2021-05-04 12:34:58,015  INFO data-extractor-quartz_Worker-1 r.F.UsingWhen.1 - request(256)
2021-05-04 12:34:58,091  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - onNext(UserProfile(identifier=1953401825, type=MSISDN, attributes={world_records=5, relationship_advice=20000, bs_min=1, bs_avg=32, sports_facts=4}))
 .
 .
 .
2021-05-04 12:34:58,753  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - onNext(UserProfile(identifier=9095453157, type=MSISDN, attributes={world_records=5, relationship_advice=20000, bs_min=1, bs_avg=32, sports_facts=4}))
2021-05-04 12:34:58,754  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,755  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,755  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,755  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,755  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,756  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,757  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,757  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,757  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,758  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,758  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,759  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,760  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,760  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,760  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,761  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,761  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,761  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,763  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,763  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,764  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,764  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,765  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,765  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,765  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,766  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,766  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,766  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,766  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,767  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,767  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,767  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,768  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,768  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,768  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,769  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,770  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,770  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,770  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,770  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,770  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,771  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,771  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,771  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,771  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,771  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,772  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,772  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,772  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,773  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,773  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,773  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,774  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,774  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,775  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,775  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,777  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,780  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,780  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,781  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,782  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,784  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,784  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,785  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,786  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,786  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,786  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,789  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,789  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,790  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,791  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,791  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,793  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,793  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,794  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,796  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,796  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,797  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,797  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,797  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,798  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,798  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,798  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,799  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,800  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,801  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,801  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,801  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,801  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,805  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,807  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,807  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,808  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,808  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,809  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,810  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,812  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,813  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,813  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,813  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,814  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,816  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,816  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,816  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,816  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,817  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,817  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,818  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,820  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,820  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,821  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,822  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,822  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,823  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,823  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,824  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,824  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,824  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,825  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,825  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,825  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,826  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,826  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,827  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,829  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,830  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,832  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,833  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,833  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,833  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,833  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,834  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,834  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,835  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,835  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,835  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,835  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,836  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,836  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,836  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,836  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,836  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,836  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,837  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,837  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,837  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,837  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,837  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,838  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,838  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,839  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,840  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,840  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,841  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,841  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,841  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,841  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,841  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,842  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,842  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,842  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,842  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,842  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,842  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,842  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,843  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,843  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,843  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,843  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,843  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,844  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,847  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,847  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,847  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,847  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,847  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,847  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,848  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,849  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,849  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,849  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,849  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,850  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,850  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,850  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,852  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,852  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,852  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,852  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,852  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,853  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,853  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,853  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,854  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,854  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,854  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,854  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,854  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,855  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,855  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,855  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,856  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,856  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,856  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,856  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,856  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,856  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,857  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,857  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,857  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,859  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,859  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,860  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,860  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,862  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,863  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,863  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,864  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,864  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,864  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,866  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,866  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,867  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,867  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,867  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,868  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,868  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,868  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,869  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,870  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,871  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,871  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,871  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,871  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,871  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,871  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,872  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,873  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,873  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,873  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,874  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)
2021-05-04 12:34:58,876  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - request(1)

If I change the limit to 1201 instead of 1200, Flux emits the onComplete signal as expected:

public Flux<UserProfile> fetchProfiles(RelationalDBJobConfiguration config) {
	ConnectionFactory connectionFactory = getConnectionFactory(config.getDatabaseDetails());
	return Flux.usingWhen(connectionFactory.create(),
			connection -> {
				return Flux.from(connection.createStatement("select userId, attr1, attr2, attr3, " +
					"attr4, attr5 from test_r2dbc.users limit 1201")
						.execute()).flatMap(result -> result.map(this::toProfile));
			}, Connection::close)
                        .log();//emits the onComplete signal
}
Logs
2021-05-04 12:41:35,307  INFO data-extractor-quartz_Worker-1 r.F.UsingWhen.1 - onSubscribe(FluxUsingWhen.ResourceSubscriber)
2021-05-04 12:41:35,307  INFO data-extractor-quartz_Worker-1 r.F.UsingWhen.1 - request(256)
2021-05-04 12:41:35,362  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - onNext(UserProfile(identifier=3389648496, type=MSISDN, attributes={world_records=5, relationship_advice=20000, bs_min=1, bs_avg=32, sports_facts=4}))
.
.
.
2021-05-04 12:41:35,982  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - onNext(UserProfile(identifier=9637753866, type=MSISDN, attributes={world_records=5, relationship_advice=20000, bs_min=1, bs_avg=32, sports_facts=4}))
2021-05-04 12:41:35,996  INFO reactor-tcp-epoll-1 r.F.UsingWhen.1 - onComplete()

Table schema

Input Code
create table if not exists test_r2dbc.users
(
	"userId" integer,
	attr1 integer,
	attr2 integer,
	attr3 integer,
	attr4 integer,
	attr5 integer,
	attr6 integer,
	attr7 integer,
	attr8 integer,
	attr9 integer,
	attr10 integer,
	attr11 text,
	attr12 text,
	attr13 text,
	attr14 text,
	attr15 text,
	attr16 text,
	attr17 text,
	attr18 text,
	attr19 text,
	attr20 text
);

Steps to reproduce

Input Code
	public Flux<UserProfile> fetchProfiles(RelationalDBJobConfiguration config) {
		ConnectionFactory connectionFactory = getConnectionFactory(config.getDatabaseDetails());
		return Flux.usingWhen(connectionFactory.create(),
				connection -> {
					return Flux.from(connection.createStatement("select userId, attr1, attr2, attr3, " +
					"attr4, attr5 from test_r2dbc.users limit 1200")
							.execute()).flatMap(result -> result.map(this::toProfile));
				}, Connection::close)
				.log();
	}

	private UserProfile toProfile(Row row, RowMetadata rowMetadata) {
		Map<String, Object> attributes = mapAttributes(row, rowMetadata);
		return UserProfile.builder()
				.identifier(row.get("userId", String.class))
				.attributes(attributes)
				.build();
	}

	private Map<String, Object> mapAttributes(Row row, RowMetadata rowMetadata) {
		Map<String, Object> columns = new HashMap<>();
		rowMetadata.getColumnNames()
				.forEach(columnName -> {
					Object columnValue = row.get(columnName);
					if (columnValue != null) {
						columns.put(columnName, columnValue);
					}
				});
		columns.remove("userId");
		return columns;
	}

	private ConnectionFactory getConnectionFactory(DatabaseDetails details) {
		return ConnectionFactories.get(ConnectionFactoryOptions.builder()
				.option(ConnectionFactoryOptions.DRIVER, details.getDriver())
				.option(ConnectionFactoryOptions.HOST, details.getHost())
				.option(ConnectionFactoryOptions.PORT, details.getPort())
				.option(ConnectionFactoryOptions.USER, details.getUser())
				.option(ConnectionFactoryOptions.PASSWORD, details.getPassword())
				.option(ConnectionFactoryOptions.DATABASE, details.getDatabase())
				.build());
	}

	public Mono<Void> synchronizeProfiles(RelationalDBJobConfiguration jobConfig) {
		Flux<UserProfile> profileFlux = fetchProfiles(jobConfig);
		return submitProfiles(profileFlux) //submit items to external system using RSocket
				.then();
	}
@Leuteris Leuteris added the status: waiting-for-triage An issue we've not yet triaged label May 4, 2021
@mp911de mp911de added type: bug A general bug and removed status: waiting-for-triage An issue we've not yet triaged labels May 4, 2021
@Squiry
Copy link
Collaborator

Squiry commented Jul 2, 2021

    @Test
    void noCompleteSignalTest() {
        getJdbcOperations().execute("create table if not exists users\n" +
                "(\n" +
                "\tuserId integer,\n" +
                "\tattr1 integer,\n" +
                "\tattr2 integer,\n" +
                "\tattr3 integer,\n" +
                "\tattr4 integer,\n" +
                "\tattr5 integer,\n" +
                "\tattr6 integer,\n" +
                "\tattr7 integer,\n" +
                "\tattr8 integer,\n" +
                "\tattr9 integer,\n" +
                "\tattr10 integer,\n" +
                "\tattr11 text,\n" +
                "\tattr12 text,\n" +
                "\tattr13 text,\n" +
                "\tattr14 text,\n" +
                "\tattr15 text,\n" +
                "\tattr16 text,\n" +
                "\tattr17 text,\n" +
                "\tattr18 text,\n" +
                "\tattr19 text,\n" +
                "\tattr20 text\n" +
                ");\n");
        getJdbcOperations().execute(c -> {
            return c.prepareStatement("INSERT INTO users" +
                    "(userId, attr1, attr2, attr3, attr4, attr5, attr6, attr7, attr8, attr9, attr10, attr11, attr12, attr13, attr14, attr15, attr16, attr17, attr18, attr19, attr20) " +
                    "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
        }, (PreparedStatement stmt) -> {
            for (int i = 0; i < 10000; i++) {
                stmt.setInt(1, i);
                stmt.setInt(2, i * 2);
                stmt.setInt(3, i * 3);
                stmt.setInt(4, i * 4);
                stmt.setInt(5, i * 5);
                stmt.setInt(6, i * 6);
                stmt.setInt(7, i * 7);
                stmt.setInt(8, i * 8);
                stmt.setInt(9, i * 9);
                stmt.setInt(10, i * 10);
                stmt.setInt(11, i * 11);
                stmt.setString(12, String.valueOf(i));
                stmt.setString(13, String.valueOf(i * 2));
                stmt.setString(14, String.valueOf(i * 3));
                stmt.setString(15, String.valueOf(i * 4));
                stmt.setString(16, String.valueOf(i * 5));
                stmt.setString(17, String.valueOf(i * 6));
                stmt.setString(18, String.valueOf(i * 7));
                stmt.setString(19, String.valueOf(i * 8));
                stmt.setString(20, String.valueOf(i * 9));
                stmt.setString(21, String.valueOf(i * 10));
                stmt.addBatch();
            }
            return stmt.executeBatch();
        });

        this.fetchProfiles(1200)
                .as(StepVerifier::create)
                .expectNextCount(1200)
                .verifyComplete();
    }


    public Flux<UserProfile> fetchProfiles(int limit) {
        PostgresqlConnectionFactory connectionFactory = (PostgresqlConnectionFactory) getConnectionFactory();
        return Flux.usingWhen(
                connectionFactory.create(),
                connection -> connection.createStatement("select userId, attr1, attr2, attr3, attr4, attr5 from users limit " + limit)
                        .execute()
                        .flatMap(result -> result.map(UserProfile::new)),
                Connection::close
        )
                .log();
    }

    private static class UserProfile {
        private final String userId;
        private final Map<String, Object> attributes = new HashMap<>();

        private UserProfile(Row row, RowMetadata rowMetadata) {
            for (String columnName : rowMetadata.getColumnNames()) {
                Object columnValue = row.get(columnName);
                if (columnValue != null) {
                    attributes.put(columnName, columnValue);
                }
            }
            attributes.remove("userId");
            this.userId = String.valueOf(row.get("userId", Integer.class));
        }
    }

@Leuteris does this look like a reproducer? I can't reproduce your behaviour.

@Leuteris
Copy link
Author

Leuteris commented Jul 6, 2021

@Squiry Could you try to warp your statement in Flux.from:

Flux.from(connection.createStatement("select userId, attr1, attr2, attr3, attr4, attr5 from users limit " + limit).execute())

Note that this issue is also reproduced in spring-data-r2dbc project. You may find the corresponding ticket here.

@Squiry
Copy link
Collaborator

Squiry commented Jul 6, 2021

Still nothing, test completes as expected. Can you reproduce your problem as a test in a PostgresqlTestKit class?

@Leuteris
Copy link
Author

Leuteris commented Jul 7, 2021

@Squiry Try to consume your result stream to enable back-pressure.

@Leuteris Leuteris closed this as completed Jul 7, 2021
@Leuteris Leuteris reopened this Jul 7, 2021
@mp911de mp911de added the status: waiting-for-feedback We need additional information before we can continue label Jul 7, 2021
@Squiry
Copy link
Collaborator

Squiry commented Jul 7, 2021

Well consuming with thread.sleep or somthing like that is not enough. We should move consuming out of event loop so that backpressure is actually on a reactive streams side. Simple delay(1ms) does the job. After that I do thread sleep for 2 ms and limit rate with 1 to make backpressure really bad. And with all that stuff I do reproduce it with 5000 elements on a 0.8.7.RELEASE tag, but only with a simple query flow. So as a really quick fix I can suggest you to move limit value to parameter. I'll try find reasons later.

@Squiry Squiry self-assigned this Jul 7, 2021
@Squiry
Copy link
Collaborator

Squiry commented Jul 8, 2021

Reproduced on master. Here's the test case:

@Test
void noCompleteSignalTest() {
    getJdbcOperations().execute("create table if not exists users\n" +
            "(\n" +
            "\tuserId integer,\n" +
            "\tattr1 integer,\n" +
            "\tattr2 integer,\n" +
            "\tattr3 integer,\n" +
            "\tattr4 integer,\n" +
            "\tattr5 integer,\n" +
            "\tattr6 integer,\n" +
            "\tattr7 integer,\n" +
            "\tattr8 integer,\n" +
            "\tattr9 integer,\n" +
            "\tattr10 integer,\n" +
            "\tattr11 text,\n" +
            "\tattr12 text,\n" +
            "\tattr13 text,\n" +
            "\tattr14 text,\n" +
            "\tattr15 text,\n" +
            "\tattr16 text,\n" +
            "\tattr17 text,\n" +
            "\tattr18 text,\n" +
            "\tattr19 text,\n" +
            "\tattr20 text\n" +
            ");\n");
    getJdbcOperations().execute(c -> {
        return c.prepareStatement("INSERT INTO users" +
                "(userId, attr1, attr2, attr3, attr4, attr5, attr6, attr7, attr8, attr9, attr10, attr11, attr12, attr13, attr14, attr15, attr16, attr17, attr18, attr19, attr20) " +
                "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
    }, (PreparedStatement stmt) -> {
        for (int i = 0; i < 10000; i++) {
            stmt.setInt(1, i);
            stmt.setInt(2, i * 2);
            stmt.setInt(3, i * 3);
            stmt.setInt(4, i * 4);
            stmt.setInt(5, i * 5);
            stmt.setInt(6, i * 6);
            stmt.setInt(7, i * 7);
            stmt.setInt(8, i * 8);
            stmt.setInt(9, i * 9);
            stmt.setInt(10, i * 10);
            stmt.setInt(11, i * 11);
            stmt.setString(12, String.valueOf(i));
            stmt.setString(13, String.valueOf(i * 2));
            stmt.setString(14, String.valueOf(i * 3));
            stmt.setString(15, String.valueOf(i * 4));
            stmt.setString(16, String.valueOf(i * 5));
            stmt.setString(17, String.valueOf(i * 6));
            stmt.setString(18, String.valueOf(i * 7));
            stmt.setString(19, String.valueOf(i * 8));
            stmt.setString(20, String.valueOf(i * 9));
            stmt.setString(21, String.valueOf(i * 10));
            stmt.addBatch();
        }
        return stmt.executeBatch();
    });

    this.fetchProfiles(5000)
            .delayElements(Duration.ofMillis(1))
            .doOnSubscribe(l -> System.out.println("SUBSCRIBE"))
            .doOnRequest(l -> System.out.println(l))
            .doOnComplete(() -> System.out.println("COMPLETE"))
            .limitRate(1, 1)
            .map(this::processProfile)
            .as(StepVerifier::create)
            .expectNextCount(5000)
            .verifyComplete();
}

private UserProfile processProfile(UserProfile up) {
    try {
        Thread.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return up;
}


public Flux<UserProfile> fetchProfiles(int limit) {
    return Flux.usingWhen(
            getConnectionFactory().create(),
            connection -> Flux.from(connection.createStatement("select userId, attr1, attr2, attr3, attr4, attr5 from users limit " + limit)
                    .execute())
                    .flatMap(result -> result.map(UserProfile::new)),
            Connection::close
    );
}

private static class UserProfile {
    private final String userId;
    private final Map<String, Object> attributes = new HashMap<>();

    private UserProfile(Row row, RowMetadata rowMetadata) {
        for (String columnName : rowMetadata.getColumnNames()) {
            Object columnValue = row.get(columnName);
            if (columnValue != null) {
                attributes.put(columnName, columnValue);
            }
        }
        attributes.remove("userId");
        this.userId = String.valueOf(row.get("userId", Integer.class));
    }
}

The problem is in the demand managing again. We can fix it by adding || conversation.canComplete(this.buffer.peek()) here, but I'm still trying to find where are we losing that one unit of demaind before the ReadyForQuery frame to make a more correct fix.

P.S. I have PathCodecUnitTest failing all the time with true/false encoding/decoding issue. Is that some kind of windows problem or what?

@mp911de
Copy link
Collaborator

mp911de commented Jul 8, 2021

Thanks a lot for looking into it. I think that PathCodecUnitTest is broken and it never bubbled up because it does not match the naming scheme. Let me fix that.

@mp911de mp911de removed the status: waiting-for-feedback We need additional information before we can continue label Jul 8, 2021
@mp911de
Copy link
Collaborator

mp911de commented Jul 13, 2021

SimpleQueryPostgresqlStatement.execute(…) still uses windowUntil to cut multiple result sets into their according PostgresqlResult objects so each result set gets isolated. Having each result encapsulated by its Result object makes sense, however for the extended query flow (when using fetch size), we do not cut results into their own Result objects.

I wonder whether that would be the easier fix for now, to just omit windowUntil until we (or Project Reactor) can come up with a better fix in regard to windowUntil.

@mp911de mp911de added this to the 0.8.9.RELEASE milestone Jul 30, 2021
mp911de added a commit that referenced this issue Jul 30, 2021
Remove windowUntil(…) to avoid miscalculation of demand in long-running streams. Refine discardOnCancel operator calls to properly guard simple queries.

[#401][#426]

Signed-off-by: Mark Paluch <[email protected]>
mp911de pushed a commit that referenced this issue Jul 30, 2021
The problem: when our receive buffer fills and conversation demand drops to zero and last item is `CommandComplete` we never request more frames and will never get `ReadyForQuery`

Signed-off-by: Mark Paluch <[email protected]>

[closes #401][#426]
mp911de added a commit that referenced this issue Jul 30, 2021
Remove windowUntil(…) to avoid miscalculation of demand in long-running streams. Refine discardOnCancel operator calls to properly guard simple queries.

[#401][#426]

Signed-off-by: Mark Paluch <[email protected]>
mp911de added a commit that referenced this issue Jul 30, 2021
[#401][#426]

Signed-off-by: Mark Paluch <[email protected]>
mp911de added a commit that referenced this issue Jul 30, 2021
[#401][#426]

Signed-off-by: Mark Paluch <[email protected]>
mp911de added a commit that referenced this issue Jul 30, 2021
[#401][#426]

Signed-off-by: Mark Paluch <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: bug A general bug
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants