Skip to content

Commit 622be3b

Browse files
committed
Use wrapper class instead of AtomicBoolean
1 parent 07c1365 commit 622be3b

File tree

2 files changed

+21
-5
lines changed

2 files changed

+21
-5
lines changed

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import com.rabbitmq.stream.impl.Client.StreamMetadata;
6969
import com.rabbitmq.stream.impl.Client.StreamStatsResponse;
7070
import com.rabbitmq.stream.impl.Client.SubscriptionOffset;
71+
import com.rabbitmq.stream.impl.Utils.MutableBoolean;
7172
import com.rabbitmq.stream.metrics.MetricsCollector;
7273
import io.netty.buffer.ByteBuf;
7374
import io.netty.buffer.ByteBufInputStream;
@@ -85,7 +86,6 @@
8586
import java.util.Map;
8687
import java.util.Objects;
8788
import java.util.concurrent.ConcurrentMap;
88-
import java.util.concurrent.atomic.AtomicBoolean;
8989
import org.slf4j.Logger;
9090
import org.slf4j.LoggerFactory;
9191

@@ -345,7 +345,7 @@ static int handleMessage(
345345
ByteBuf bb,
346346
int read,
347347
boolean filter,
348-
AtomicBoolean messageFiltered,
348+
MutableBoolean messageFiltered,
349349
long offset,
350350
long offsetLimit,
351351
long chunkTimestamp,
@@ -465,7 +465,6 @@ static int handleDeliver(
465465
final boolean filter = offsetLimit != -1;
466466

467467
try {
468-
// TODO handle exception in exception handler
469468
chunkChecksum.checksum(message, dataLength, crc);
470469
} catch (ChunkChecksumValidationException e) {
471470
LOGGER.warn(
@@ -478,7 +477,7 @@ static int handleDeliver(
478477

479478
metricsCollector.chunk(numEntries);
480479
long messagesRead = 0;
481-
AtomicBoolean messageFiltered = new AtomicBoolean(false);
480+
MutableBoolean messageFiltered = new MutableBoolean(false);
482481

483482
while (numRecords != 0) {
484483
byte entryType = message.readByte();
@@ -539,7 +538,7 @@ static int handleDeliver(
539538
ByteBuf outBb = client.channel.alloc().heapBuffer(uncompressedDataSize);
540539
ByteBuf slice = message.slice(message.readerIndex(), dataSize);
541540
InputStream inputStream = compressionCodec.decompress(new ByteBufInputStream(slice));
542-
byte[] inBuffer = new byte[uncompressedDataSize < 1024 ? uncompressedDataSize : 1024];
541+
byte[] inBuffer = new byte[Math.min(uncompressedDataSize, 1024)];
543542
int n;
544543
try {
545544
while (-1 != (n = inputStream.read(inBuffer))) {

src/main/java/com/rabbitmq/stream/impl/Utils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,4 +630,21 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, Ti
630630
@Override
631631
public void execute(Runnable command) {}
632632
}
633+
634+
static class MutableBoolean {
635+
636+
private boolean value;
637+
638+
MutableBoolean(boolean initialValue) {
639+
this.value = initialValue;
640+
}
641+
642+
void set(boolean value) {
643+
this.value = value;
644+
}
645+
646+
boolean get() {
647+
return this.value;
648+
}
649+
}
633650
}

0 commit comments

Comments
 (0)