Skip to content

DatabaseClient fetch all rows never completes #592

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Leuteris opened this issue May 1, 2021 · 3 comments
Closed

DatabaseClient fetch all rows never completes #592

Leuteris opened this issue May 1, 2021 · 3 comments
Labels
for: external-project For an external project and not something we can fix

Comments

@Leuteris
Copy link

Leuteris commented May 1, 2021

I'am using Spring Data R2DBC with PostgreSQL R2DBC driver. I have created the following method that fetches some rows from database and returns a Flux of objects. When the number of rows returned by the query are e.g. 1200 or 15.000.000, Flux never emits the onComplete signal. Here's a sample code:

	public Flux<UserProfile> fetchProfiles(RelationalDBJobConfiguration config) {
		DatabaseClient databaseClient = DatabaseClient.create(getConnectionFactory(config.getDatabaseDetails()));

		return databaseClient
				.sql("select userId, attr1, attr2, attr3, " +
					"attr4, attr5 from test_r2dbc.users limit 1200")
				.fetch()
				.all()
				.map(this::toProfile)
				.log(); //never emits the onComplete signal
	}

When adding a .log() call to the chain I see 256 request(1) before the stream stops. I would expect to see the onComplete signal:

Logs
2021-05-01 18:33:41,076  INFO data-extractor-quartz_Worker-1 r.F.M.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
 2021-05-01 18:33:41,077  INFO data-extractor-quartz_Worker-1 r.F.M.1 - | request(256) 2021-05-01 18:33:41,159  INFO reactor-tcp-epoll-1 r.F.M.1 - | onNext(UserProfile(identifier=3253647531, type=MSISDN, attributes={bs_min=1, bs_avg=32, relationship_advice=20000, sports_facts=4, world_records=5}))
 .
 .
 .
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | onNext(UserProfile(identifier=3211286379, type=MSISDN, attributes={bs_min=1, bs_avg=32, relationship_advice=20000, sports_facts=4, world_records=5}))
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,539  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,540  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,549  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,550  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,551  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,552  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,552  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,552  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,552  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,552  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,552  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,552  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,552  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,552  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,552  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,552  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,553  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,553  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,553  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,553  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,553  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,553  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,553  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,553  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,553  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,553  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,553  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,553  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,553  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)
2021-05-01 18:33:41,554  INFO reactor-tcp-epoll-1 r.F.M.1 - | request(1)

If I change the limit to 1201 instead of 1200, flux emits the onComplete signal as expected:

	public Flux<UserProfile> fetchProfiles(RelationalDBJobConfiguration config) {
		DatabaseClient databaseClient = DatabaseClient.create(getConnectionFactory(config.getDatabaseDetails()));

		return databaseClient
				.sql("select userId, attr1, attr2, attr3, " +
					"attr4, attr5 from test_r2dbc.users limit 1201")
				.fetch()
				.all()
				.map(this::toProfile)
				.log(); //emits onComplete signal
	}
Logs
2021-05-01 19:18:40,492  INFO data-extractor-quartz_Worker-2 r.F.M.2 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2021-05-01 19:18:40,492  INFO data-extractor-quartz_Worker-2 r.F.M.2 - | request(256)
2021-05-01 19:18:40,501  INFO reactor-tcp-epoll-2 r.F.M.2 - | onNext(UserProfile(identifier=4214902037, type=MSISDN, attributes={bs_min=1, bs_avg=32, relationship_advice=20000, sports_facts=4, world_records=5}))
.
.
.
2021-05-01 19:18:40,678  INFO reactor-tcp-epoll-1 r.F.M.2 - | request(1)
2021-05-01 19:18:40,678  INFO reactor-tcp-epoll-2 r.F.M.2 - | onComplete()

Full Code Sample

	public Flux<UserProfile> fetchProfiles(RelationalDBJobConfiguration config) {
		DatabaseClient databaseClient = DatabaseClient.create(getConnectionFactory(config.getDatabaseDetails()));

		return databaseClient
				.sql("select userId, attr1, attr2, attr3, " +
					"attr4, attr5 from test_r2dbc.users limit 1200")
				.fetch()
				.all()
				.map(this::toProfile)
				.log(); //never emits the onComplete signal
	}
	private UserProfile toProfile(Map<String, Object> result) {
		String userId = (String) result.get("userId");

		return UserProfile.builder()
				.identifier(userId)
				.type(UserIdentifierType.MSISDN)
				.attributes(result)
				.build();
	}

	private ConnectionFactory getConnectionFactory(DatabaseDetails details) {
		return ConnectionFactories.get(ConnectionFactoryOptions.builder()
				.option(ConnectionFactoryOptions.DRIVER, details.getDriver())
				.option(ConnectionFactoryOptions.HOST, details.getHost())
				.option(ConnectionFactoryOptions.PORT, details.getPort())
				.option(ConnectionFactoryOptions.USER, details.getUser())
				.option(ConnectionFactoryOptions.PASSWORD, details.getPassword())
				.option(ConnectionFactoryOptions.DATABASE, details.getDatabase())
				.build());
	}
	public Mono<Void> synchronizeProfiles(RelationalDBJobConfiguration jobConfig) {
		Flux<UserProfile> profileFlux = fetchProfiles(jobConfig);
		return submitProfiles(profileFlux) //submit items to external system using RSocket
				.then();
	}

Versions

  • Spring Data: 1.2.8, 1.3.0
  • PostgreSQL R2DBC Driver: 0.8.6.RELEASE, 0.8.7.RELEASE
  • Spring-R2DBC: 5.3.6
  • Database: PostgreSQL 13.2 (Debian 13.2-1.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit
  • Java: Java(TM) SE Runtime Environment (build 16+36-2231)
@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label May 1, 2021
@mp911de
Copy link
Member

mp911de commented May 3, 2021

Does the same happen when using R2DBC SPI directly? That is something like:

Flux.usingWhen(connectionFactory.create(),
connection -> {
return Flux.from(connection.createStatement("select userId, attr1, attr2, attr3, " +
					"attr4, attr5 from test_r2dbc.users limit 1200")
    .execute()).flatMap(result -> result.map(…));
}, Connection::close);

@mp911de mp911de added the status: waiting-for-feedback We need additional information before we can continue label May 3, 2021
@Leuteris
Copy link
Author

Leuteris commented May 3, 2021

@mp911de The same thing happens with the above code. By changing it to the following, the issue does not appear but a memory leak is occurred as described here pgjdbc/r2dbc-postgresql#396 (comment). I have run many tests with the Spring Data and the memory leak does not happen.

return Flux.usingWhen(
		conFactory.create(),
		conn -> Mono.from(conn.createStatement(config.getQuery()).execute())
				.flatMapMany(result -> result.map(this::toProfile)),
		Connection::close);

@mp911de
Copy link
Member

mp911de commented May 3, 2021

Thanks a lot. In that case, I'm closing this issue here as the problem originates in the driver.

@mp911de mp911de closed this as completed May 3, 2021
@mp911de mp911de added for: external-project For an external project and not something we can fix and removed status: waiting-for-feedback We need additional information before we can continue status: waiting-for-triage An issue we've not yet triaged labels May 3, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
for: external-project For an external project and not something we can fix
Projects
None yet
Development

No branches or pull requests

3 participants