Skip to content

Commit faffc7e

Browse files
committed
Switch to ConcurrentLinkedQueue to avoid expensive size calls #2602
See also #2601
1 parent 6185ebd commit faffc7e

File tree

2 files changed

+23
-6
lines changed

2 files changed

+23
-6
lines changed

src/main/java/io/lettuce/core/internal/LettuceFactories.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.Queue;
2121
import java.util.concurrent.ArrayBlockingQueue;
2222
import java.util.concurrent.BlockingQueue;
23-
import java.util.concurrent.ConcurrentLinkedDeque;
23+
import java.util.concurrent.ConcurrentLinkedQueue;
2424
import java.util.concurrent.LinkedBlockingQueue;
2525

2626
/**
@@ -41,13 +41,13 @@ public class LettuceFactories {
4141
/**
4242
* Creates a new, optionally bounded, {@link Queue} that does not require external synchronization.
4343
*
44-
* @param maxSize queue size. If {@link Integer#MAX_VALUE}, then creates an {@link ConcurrentLinkedDeque unbounded queue}.
44+
* @param maxSize queue size. If {@link Integer#MAX_VALUE}, then creates an {@link ConcurrentLinkedQueue unbounded queue}.
4545
* @return a new, empty {@link Queue}.
4646
*/
4747
public static <T> Queue<T> newConcurrentQueue(int maxSize) {
4848

4949
if (maxSize == Integer.MAX_VALUE) {
50-
return new ConcurrentLinkedDeque<>();
50+
return new ConcurrentLinkedQueue<>();
5151
}
5252

5353
return maxSize > ARRAY_QUEUE_THRESHOLD ? new LinkedBlockingQueue<>(maxSize) : new ArrayBlockingQueue<>(maxSize);

src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -696,7 +696,7 @@ public void notifyDrainQueuedCommands(HasQueuedCommands queuedCommands) {
696696
logger.debug("{} notifyQueuedCommands adding {} command(s) to buffer", logPrefix(), commands.size());
697697
}
698698

699-
commands.addAll(drainCommands(disconnectedBuffer));
699+
drainCommands(disconnectedBuffer, commands);
700700

701701
for (RedisCommand<?, ?, ?> command : commands) {
702702

@@ -745,8 +745,8 @@ protected <T> T doExclusive(Supplier<T> supplier) {
745745

746746
List<RedisCommand<?, ?, ?>> target = new ArrayList<>(disconnectedBuffer.size() + commandBuffer.size());
747747

748-
target.addAll(drainCommands(disconnectedBuffer));
749-
target.addAll(drainCommands(commandBuffer));
748+
drainCommands(disconnectedBuffer, target);
749+
drainCommands(commandBuffer, target);
750750

751751
return target;
752752
}
@@ -769,9 +769,26 @@ protected <T> T doExclusive(Supplier<T> supplier) {
769769
}
770770
}
771771

772+
drainCommands(source, target);
772773
return target;
773774
}
774775

776+
/**
777+
* Drain commands from a queue and return only active commands.
778+
*
779+
* @param source the source queue.
780+
*/
781+
private static void drainCommands(Queue<? extends RedisCommand<?, ?, ?>> source, Collection<RedisCommand<?, ?, ?>> target) {
782+
783+
RedisCommand<?, ?, ?> cmd;
784+
while ((cmd = source.poll()) != null) {
785+
786+
if (!cmd.isDone() && !ActivationCommand.isActivationCommand(cmd)) {
787+
target.add(cmd);
788+
}
789+
}
790+
}
791+
775792
private void cancelBufferedCommands(String message) {
776793
cancelCommands(message, doExclusive(this::drainCommands), RedisCommand::cancel);
777794
}

0 commit comments

Comments
 (0)