Skip to content

Commit d3b6b28

Browse files
committed
Fix CRC calculation for heap buffers
Fixes #16
1 parent 5f481ad commit d3b6b28

File tree

2 files changed

+11
-4
lines changed

2 files changed

+11
-4
lines changed

src/main/java/com/rabbitmq/stream/impl/JdkChunkChecksum.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ class JdkChunkChecksum implements ChunkChecksum {
3939
public void checksum(ByteBuf byteBuf, long dataLength, long expected) {
4040
Checksum checksum = checksumSupplier.get();
4141
if (byteBuf.hasArray()) {
42-
checksum.update(byteBuf.array(), byteBuf.readerIndex(), byteBuf.readableBytes());
42+
checksum.update(
43+
byteBuf.array(), byteBuf.arrayOffset() + byteBuf.readerIndex(), byteBuf.readableBytes());
4344
} else {
4445
byteBuf.forEachByte(
4546
byteBuf.readerIndex(), byteBuf.readableBytes(), new UpdateProcessor(checksum));

src/test/java/com/rabbitmq/stream/impl/ClientTest.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import com.rabbitmq.stream.impl.Client.ClientParameters;
3333
import com.rabbitmq.stream.impl.Client.Response;
3434
import com.rabbitmq.stream.impl.Client.StreamParametersBuilder;
35+
import io.netty.buffer.ByteBufAllocator;
36+
import io.netty.buffer.UnpooledByteBufAllocator;
3537
import java.io.ByteArrayOutputStream;
3638
import java.io.DataOutputStream;
3739
import java.net.UnknownHostException;
@@ -477,9 +479,11 @@ void consume() throws Exception {
477479
client.close();
478480
}
479481

480-
@Test
481-
void publishAndConsume() throws Exception {
482-
int publishCount = 1000000;
482+
@ParameterizedTest
483+
@ValueSource(booleans = {true, false})
484+
void publishAndConsume(boolean directBuffer) throws Exception {
485+
ByteBufAllocator allocator = new UnpooledByteBufAllocator(directBuffer);
486+
int publishCount = 1_000_000;
483487

484488
CountDownLatch consumedLatch = new CountDownLatch(publishCount);
485489
Client.ChunkListener chunkListener =
@@ -494,6 +498,7 @@ void publishAndConsume() throws Exception {
494498
Client client =
495499
cf.get(
496500
new Client.ClientParameters()
501+
.byteBufAllocator(allocator)
497502
.chunkListener(chunkListener)
498503
.messageListener(messageListener));
499504
client.subscribe(b(1), stream, OffsetSpecification.first(), credit);
@@ -504,6 +509,7 @@ void publishAndConsume() throws Exception {
504509
Client publisher =
505510
cf.get(
506511
new Client.ClientParameters()
512+
.byteBufAllocator(allocator)
507513
.publishConfirmListener(
508514
(publisherId, correlationId) -> confirmedLatch.countDown()));
509515
int messageId = 0;

0 commit comments

Comments
 (0)