DataRow memory leak #396


Closed

Leuteris opened this issue Apr 16, 2021 · 9 comments
Labels: status: waiting-for-feedback, status: waiting-for-triage

Comments


Leuteris commented Apr 16, 2021

Bug Report

Versions

Driver: 0.8.6.RELEASE
Database: PostgreSQL 13.2 (Debian 13.2-1.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit
Java:
java version "15.0.1" 2020-10-20
Java(TM) SE Runtime Environment (build 15.0.1+9-18)
Java HotSpot(TM) 64-Bit Server VM (build 15.0.1+9-18, mixed mode, sharing)
OS: Ubuntu 18.04.5 LTS

Current Behavior

We are facing a memory leak while executing a select query with a LIMIT of 30M rows. A heap analysis shows that more than 4M instances of io.r2dbc.postgresql.message.backend.DataRow are created within a few seconds. This does not happen every time, but it is very common when we run the query more than once.

This is the JProfiler report after a few minutes of execution:
(screenshot: memory-leak-jprofiler)

Stack trace
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "boundedElastic-evictor-1"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "parallel-1"

Table schema

Input Code
create table if not exists test_r2dbc.users
(
	"userId" integer,
	attr1 integer,
	attr2 integer,
	attr3 integer,
	attr4 integer,
	attr5 integer,
	attr6 integer,
	attr7 integer,
	attr8 integer,
	attr9 integer,
	attr10 integer,
	attr11 text,
	attr12 text,
	attr13 text,
	attr14 text,
	attr15 text,
	attr16 text,
	attr17 text,
	attr18 text,
	attr19 text,
	attr20 text
);

Steps to reproduce

Input Code
ConnectionFactory conFactory = getConnectionFactory(config.getDatabaseDetails());
return Flux.usingWhen(
		conFactory.create(),
		conn -> Mono.from(conn.createStatement("select attr1, attr2, attr3, attr4, attr5 from test_r2dbc.users limit 30000000").execute())
				.flatMapMany(result -> result.map(this::toProfile)),
		Connection::close);

private ConnectionFactory getConnectionFactory(DatabaseDetails details) {
	return ConnectionFactories.get(ConnectionFactoryOptions.builder()
			.option(ConnectionFactoryOptions.DRIVER, details.getDriver())
			.option(ConnectionFactoryOptions.HOST, details.getHost())
			.option(ConnectionFactoryOptions.PORT, details.getPort())
			.option(ConnectionFactoryOptions.USER, details.getUser())
			.option(ConnectionFactoryOptions.PASSWORD, details.getPassword())
			.option(ConnectionFactoryOptions.DATABASE, details.getDatabase())
			.build());
}
Leuteris added the status: waiting-for-triage label Apr 16, 2021
mp911de commented Apr 16, 2021

Could you share what toProfile looks like? Row instances are created as the response stream comes in and are released upon consumption of the row via Result.map(…).
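
For context, a minimal sketch (not from this thread, with a hypothetical users table) of the consumption pattern described above: values are copied out inside Result.map(…), and the Row itself never escapes the mapping function, because its underlying buffers are released once the function returns.

	// connection is an open io.r2dbc.spi.Connection (hypothetical setup).
	Flux<String> names = Mono.from(connection
			.createStatement("SELECT name FROM users")
			.execute())
		.flatMapMany(result -> result.map((row, metadata) ->
				row.get("name", String.class))); // copy the value out; do not retain the Row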

mp911de added the status: waiting-for-feedback label Apr 16, 2021

Leuteris commented Apr 16, 2021

Here is my mapper:

private UserProfile toProfile(Row row, RowMetadata rowMetadata) {
	Map<String, Object> attributes = mapAttributes(row, rowMetadata);
	return UserProfile.builder()
			.identifier(row.get("userId", String.class))
			.type(UserIdentifierType.MSISDN)
			.attributes(attributes)
			.build();
}

private Map<String, Object> mapAttributes(Row row, RowMetadata rowMetadata) {
	Map<String, Object> columns = new HashMap<>();
	rowMetadata.getColumnNames()
			.forEach(columnName -> {
				Object columnValue = row.get(columnName);
				if (columnValue != null) {
					columns.put(columnName, columnValue);
				}
			});
	columns.remove("userId");
	return columns;
}

mp911de commented Apr 16, 2021

Thank you. May I assume that you no longer hold on to the Row object in your mapping function? Maybe it is just an effect of heavy load in combination with direct buffers. You could switch to unpooled heap buffers instead of direct memory to let the GC clean up the buffers.

Leuteris commented:

How can I switch to unpooled buffers? Thank you.

mp911de commented Apr 16, 2021

Via system property; see netty/netty#6305 (comment) for all Netty property names.

Specifically, io.netty.allocator.type=unpooled and io.netty.noPreferDirect=true
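
For illustration, a sketch of how these could be applied (an assumption about the setup, not from the thread). The properties must be set before any Netty allocator class is initialized, so JVM flags are the safest route:

	// Option 1: JVM flags, set before any Netty class loads:
	//   java -Dio.netty.allocator.type=unpooled -Dio.netty.noPreferDirect=true -jar app.jar

	// Option 2: set programmatically, first thing in main(...),
	// before anything touches Netty.
	public final class Bootstrap {
		public static void main(String[] args) {
			System.setProperty("io.netty.allocator.type", "unpooled");
			System.setProperty("io.netty.noPreferDirect", "true");
			Application.run(args); // hypothetical entry point of the app under test
		}
	}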

Leuteris commented:

I tested it with the system properties -Dio.netty.noPreferDirect=true -Dio.netty.allocator.type=unpooled. The OOM error persists: millions of instances of io.r2dbc.postgresql.message.backend.DataRow are created in a matter of seconds.

The unpooled heap buffer is used, as shown below:
(screenshot: unpooled-memory-leak-jprofiler)

The stack trace has changed to the following:

java.lang.OutOfMemoryError: Java heap space

Leuteris commented:

More logs:

2021-04-22 22:47:54,731 ERROR reactor-tcp-epoll-8 r.n.c.ChannelOperationsHandler - [id:324f5be1, L:/127.0.0.1:55474 - R:localhost/127.0.0.1:5432] Error was received while reading the incoming data. The connection will be closed.
java.lang.OutOfMemoryError: Java heap space
at io.netty.buffer.AbstractByteBuf.slice(AbstractByteBuf.java:1221)
at io.netty.buffer.AbstractUnpooledSlicedByteBuf.slice(AbstractUnpooledSlicedByteBuf.java:230)
at io.netty.buffer.AbstractByteBuf.retainedSlice(AbstractByteBuf.java:1226)
at io.netty.buffer.AbstractByteBuf.readRetainedSlice(AbstractByteBuf.java:888)
at io.r2dbc.postgresql.message.backend.DataRow.decodeColumn(DataRow.java:110)
at io.r2dbc.postgresql.message.backend.DataRow.decode(DataRow.java:100)
at io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decodeBody(BackendMessageDecoder.java:65)
at io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decode(BackendMessageDecoder.java:39)
at io.r2dbc.postgresql.client.ReactorNettyClient$$Lambda$1320/0x00000008013df4c8.apply(Unknown Source)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:281)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:280)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:389)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
2021-04-22 22:47:56,553 ERROR reactor-tcp-epoll-8 r.c.p.Operators - Operator called default onErrorDropped
reactor.netty.ReactorNetty$InternalNettyException: java.lang.OutOfMemoryError: Java heap space
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ ⇢ at reactor.netty.channel.ChannelOperations.receiveObject(ChannelOperations.java:267)
|_ Flux.from ⇢ at reactor.netty.ReactorNetty.publisherOrScalarMap(ReactorNetty.java:516)
|_ Flux.map ⇢ at reactor.netty.ReactorNetty.publisherOrScalarMap(ReactorNetty.java:517)
|_ Flux.from ⇢ at reactor.netty.ByteBufFlux.fromInbound(ByteBufFlux.java:72)
|_ Flux.doOnError ⇢ at io.r2dbc.postgresql.client.ReactorNettyClient.<init>(ReactorNettyClient.java:153)
|_ Flux.handle ⇢ at io.r2dbc.postgresql.client.ReactorNettyClient.<init>(ReactorNettyClient.java:157)
Stack trace:
Caused by: java.lang.OutOfMemoryError: Java heap space
at io.netty.buffer.AbstractByteBuf.slice(AbstractByteBuf.java:1221)
at io.netty.buffer.AbstractUnpooledSlicedByteBuf.slice(AbstractUnpooledSlicedByteBuf.java:230)
at io.netty.buffer.AbstractByteBuf.retainedSlice(AbstractByteBuf.java:1226)
at io.netty.buffer.AbstractByteBuf.readRetainedSlice(AbstractByteBuf.java:888)
at io.r2dbc.postgresql.message.backend.DataRow.decodeColumn(DataRow.java:110)
at io.r2dbc.postgresql.message.backend.DataRow.decode(DataRow.java:100)
at io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decodeBody(BackendMessageDecoder.java:65)
at io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decode(BackendMessageDecoder.java:39)
at io.r2dbc.postgresql.client.ReactorNettyClient$$Lambda$1320/0x00000008013df4c8.apply(Unknown Source)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:281)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:280)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:389)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)


mp911de commented May 10, 2021

I tried to reproduce the issue with your parameters, but had no luck. My data set had a size of 7M rows. I attached graphs showing memory usage over time. Objects created by the driver are not retained within the mapping function.

Code used to reproduce:

// Assumes connection is the driver's PostgresqlConnection (whose execute()
// returns a Flux) and counter is a java.util.concurrent.atomic.AtomicInteger.
connection.createStatement("SELECT * from MANY_USERS").execute()
    .flatMap(it -> it.map((row, rowMetadata) -> {

        // simulate interaction
        row.get(0);
        row.get(10);

        int i = counter.incrementAndGet();
        if (i % 1000 == 0) {
            System.out.println(i);
        }
        return new Object();
    })).blockLast();

(screenshots, 2021-05-10: memory usage over time while running the reproducer)

mp911de commented Jul 5, 2021

Closing as this ticket isn't actionable. If you would like us to look at this issue, please provide additional information and we will re-open the issue.

mp911de closed this as completed Jul 30, 2021