Skip to content

Commit d364f6b

Browse files
committed
Use Netty's pooled memory allocator
Instead of the adaptative one, which is the default in 4.2. The pooled allocator was the default in 4.1.
1 parent 35beeeb commit d364f6b

File tree

9 files changed

+16
-12
lines changed

9 files changed

+16
-12
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ public Client(ClientParameters parameters) {
256256
b.option(
257257
ChannelOption.ALLOCATOR,
258258
parameters.byteBufAllocator == null
259-
? ByteBufAllocator.DEFAULT
259+
? Utils.byteBufAllocator()
260260
: parameters.byteBufAllocator);
261261
}
262262

src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ private void maybeCompleteBatch(State<T> state, boolean increaseIfCompleted) {
111111
state.items = new ArrayList<>(state.batchSize);
112112
}
113113
} catch (Exception e) {
114+
// e.printStackTrace();
114115
LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage());
115116
}
116117
}

src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ static class DefaultNettyConfiguration implements NettyConfiguration {
422422

423423
private final EnvironmentBuilder environmentBuilder;
424424
private EventLoopGroup eventLoopGroup;
425-
private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT;
425+
private ByteBufAllocator byteBufAllocator = Utils.byteBufAllocator();
426426
private Consumer<Channel> channelCustomizer = noOpConsumer();
427427
private Consumer<Bootstrap> bootstrapCustomizer = noOpConsumer();
428428

src/main/java/com/rabbitmq/stream/impl/Utils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import com.rabbitmq.stream.*;
2121
import com.rabbitmq.stream.impl.Client.ClientParameters;
22+
import io.netty.buffer.ByteBufAllocator;
23+
import io.netty.buffer.PooledByteBufAllocator;
2224
import io.netty.channel.ConnectTimeoutException;
2325
import io.netty.channel.EventLoopGroup;
2426
import io.netty.channel.MultiThreadIoEventLoopGroup;
@@ -415,6 +417,10 @@ static EventLoopGroup eventLoopGroup() {
415417
return new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
416418
}
417419

420+
static ByteBufAllocator byteBufAllocator() {
421+
return PooledByteBufAllocator.DEFAULT;
422+
}
423+
418424
/*
419425
class to help testing SAC on super streams
420426
*/

src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.rabbitmq.stream.impl.ServerFrameHandler.DeliverVersion1FrameHandler;
2121
import com.rabbitmq.stream.metrics.NoOpMetricsCollector;
2222
import io.netty.buffer.ByteBuf;
23-
import io.netty.buffer.ByteBufAllocator;
2423
import java.util.ArrayList;
2524
import java.util.List;
2625
import java.util.Random;
@@ -51,7 +50,7 @@ public MessageBuilder messageBuilder() {
5150

5251
ByteBuf generateFrameBuffer(
5352
int nbMessages, long chunkOffset, int dataSize, Iterable<byte[]> messages) {
54-
ByteBuf bb = ByteBufAllocator.DEFAULT.buffer(1024);
53+
ByteBuf bb = Utils.byteBufAllocator().buffer(1024);
5554
bb.writeShort(Utils.encodeRequestCode(Constants.COMMAND_DELIVER))
5655
.writeShort(Constants.VERSION_1)
5756
.writeByte(1) // subscription id

src/test/java/com/rabbitmq/stream/impl/FrameTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.rabbitmq.stream.Message;
2727
import com.rabbitmq.stream.Properties;
2828
import io.netty.buffer.ByteBuf;
29-
import io.netty.buffer.ByteBufAllocator;
3029
import io.netty.channel.Channel;
3130
import io.netty.channel.ChannelFuture;
3231
import java.io.ByteArrayOutputStream;
@@ -149,7 +148,7 @@ public TestDesc(String description, List<Integer> sizes, int expectedCalls) {
149148
tests.forEach(
150149
test -> {
151150
Channel channel = Mockito.mock(Channel.class);
152-
Mockito.when(channel.alloc()).thenReturn(ByteBufAllocator.DEFAULT);
151+
Mockito.when(channel.alloc()).thenReturn(Utils.byteBufAllocator());
153152
Mockito.when(channel.writeAndFlush(Mockito.any()))
154153
.thenReturn(Mockito.mock(ChannelFuture.class));
155154

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ void producersAndConsumersShouldBeClosedWhenEnvironmentIsClosed(boolean lazyInit
216216
}
217217

218218
@Test
219-
void growShrinkResourcesWhenProducersConsumersAreOpenedAndClosed(TestInfo info) throws Exception {
219+
void growShrinkResourcesWhenProducersConsumersAreOpenedAndClosed(TestInfo info) {
220220
int messageCount = 100;
221221
int streamCount = 20;
222222
int producersCount = ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT * 3 + 10;

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.rabbitmq.stream.StreamException;
2727
import com.rabbitmq.stream.impl.Client.ClientParameters;
2828
import com.rabbitmq.stream.impl.StreamEnvironment.LocatorNotAvailableException;
29-
import io.netty.buffer.ByteBufAllocator;
3029
import java.net.URI;
3130
import java.time.Duration;
3231
import java.util.Arrays;
@@ -94,7 +93,7 @@ Client.ClientParameters duplicate() {
9493
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT,
9594
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
9695
null,
97-
ByteBufAllocator.DEFAULT,
96+
Utils.byteBufAllocator(),
9897
false,
9998
type -> "locator-connection",
10099
cf,
@@ -163,7 +162,7 @@ void shouldTryUrisOnInitializationFailure() throws Exception {
163162
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT,
164163
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
165164
null,
166-
ByteBufAllocator.DEFAULT,
165+
Utils.byteBufAllocator(),
167166
false,
168167
type -> "locator-connection",
169168
cf,
@@ -195,7 +194,7 @@ void shouldNotOpenConnectionWhenLazyInitIsEnabled(
195194
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT,
196195
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
197196
null,
198-
ByteBufAllocator.DEFAULT,
197+
Utils.byteBufAllocator(),
199198
lazyInit,
200199
type -> "locator-connection",
201200
cf,

src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public class StreamProducerUnitTest {
7575
void init() {
7676
mocks = MockitoAnnotations.openMocks(this);
7777
executorService = Executors.newScheduledThreadPool(2);
78-
when(channel.alloc()).thenReturn(ByteBufAllocator.DEFAULT);
78+
when(channel.alloc()).thenReturn(Utils.byteBufAllocator());
7979
when(channel.writeAndFlush(Mockito.any())).thenReturn(channelFuture);
8080
when(client.allocateNoCheck(any(ByteBufAllocator.class), anyInt()))
8181
.thenAnswer(

0 commit comments

Comments
 (0)