Skip to content

[DRAFT] Support sub second timeout for BRPOP & BLPOP via RedisListCommands. #2127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,15 @@ public List<byte[]> bLPop(int timeout, byte[]... keys) {
return convertAndReturn(delegate.bLPop(timeout, keys), Converters.identityConverter());
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisListCommands#bLPop(int, java.util.concurrent.TimeUnit, byte[][])
*/
@Override
public List<byte[]> bLPop(int timeout, TimeUnit unit, byte[]... keys) {
return convertAndReturn(delegate.bLPop(timeout, unit, keys), Converters.identityConverter());
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisListCommands#bRPop(int, byte[][])
Expand All @@ -233,6 +242,15 @@ public List<byte[]> bRPop(int timeout, byte[]... keys) {
return convertAndReturn(delegate.bRPop(timeout, keys), Converters.identityConverter());
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisListCommands#bRPop(int, java.util.concurrent.TimeUnit byte[][])
*/
@Override
public List<byte[]> bRPop(int timeout, TimeUnit unit, byte[]... keys) {
return convertAndReturn(delegate.bRPop(timeout, unit, keys), Converters.identityConverter());
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisListCommands#bRPopLPush(int, byte[], byte[])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,13 +779,28 @@ default List<byte[]> bLPop(int timeout, byte[]... keys) {
return listCommands().bLPop(timeout, keys);
}

/** @deprecated in favor of {@link RedisConnection#listCommands()}}. */
@Override
@Deprecated
default List<byte[]> bLPop(int timeout, TimeUnit unit, byte[]... keys) {
return listCommands().bLPop(timeout, unit, keys);
}

/** @deprecated in favor of {@link RedisConnection#listCommands()}}. */
@Override
@Deprecated
default List<byte[]> bRPop(int timeout, byte[]... keys) {
return listCommands().bRPop(timeout, keys);
}

/** @deprecated in favor of {@link RedisConnection#listCommands()}}. */
@Override
@Deprecated
default List<byte[]> bRPop(int timeout, TimeUnit unit, byte[]... keys) {
return listCommands().bRPop(timeout, unit, keys);
}


/** @deprecated in favor of {@link RedisConnection#listCommands()}}. */
@Override
@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
Expand All @@ -43,6 +44,7 @@
* @author Christoph Strobl
* @author Mark Paluch
* @author dengliming
* @author ihaohong
* @since 2.0
*/
public interface ReactiveListCommands {
Expand Down Expand Up @@ -1233,13 +1235,15 @@ default Flux<ByteBuffer> rPop(ByteBuffer key, long count) {
class BPopCommand implements Command {

private final List<ByteBuffer> keys;
private final Duration timeout;
private final @Nullable Long timeout;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd like to keep Duration as that is the more expressive API. TimeUnit and timeout always require a tuple.

private final @Nullable TimeUnit timeUnit;
private final Direction direction;

private BPopCommand(List<ByteBuffer> keys, Duration timeout, Direction direction) {
private BPopCommand(List<ByteBuffer> keys, @Nullable Long timeout, @Nullable TimeUnit timeUnit, Direction direction) {

this.keys = keys;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.direction = direction;
}

Expand All @@ -1249,7 +1253,7 @@ private BPopCommand(List<ByteBuffer> keys, Duration timeout, Direction direction
* @return a new {@link BPopCommand} for right push ({@literal BRPOP}).
*/
public static BPopCommand right() {
return new BPopCommand(Collections.emptyList(), Duration.ZERO, Direction.RIGHT);
return new BPopCommand(Collections.emptyList(), null, null, Direction.RIGHT);
}

/**
Expand All @@ -1258,7 +1262,7 @@ public static BPopCommand right() {
* @return a new {@link BPopCommand} for right push ({@literal BLPOP}).
*/
public static BPopCommand left() {
return new BPopCommand(Collections.emptyList(), Duration.ZERO, Direction.LEFT);
return new BPopCommand(Collections.emptyList(), null, null, Direction.LEFT);
}

/**
Expand All @@ -1271,7 +1275,7 @@ public BPopCommand from(List<ByteBuffer> keys) {

Assert.notNull(keys, "Keys must not be null!");

return new BPopCommand(new ArrayList<>(keys), Duration.ZERO, direction);
return new BPopCommand(new ArrayList<>(keys), null, null, direction);
}

/**
Expand All @@ -1284,7 +1288,20 @@ public BPopCommand blockingFor(Duration timeout) {

Assert.notNull(timeout, "Timeout must not be null!");

return new BPopCommand(keys, timeout, direction);
return blockingFor(timeout.toMillis(), TimeUnit.MICROSECONDS);
}

/**
* Applies a {@link Duration timeout}. Constructs a new command instance with all previously configured properties.
*
* @param timeout must not be {@literal null}.
* @param timeUnit must node be {@literal null}.
* @return a new {@link BPopCommand} with {@link Duration timeout} applied.
*/
public BPopCommand blockingFor(long timeout, TimeUnit timeUnit) {
Assert.notNull(timeUnit, "TimeUnit must not be null!");

return new BPopCommand(getKeys(), timeout, timeUnit, direction);
}

/* (non-Javadoc)
Expand All @@ -1305,10 +1322,17 @@ public List<ByteBuffer> getKeys() {
/**
* @return
*/
public Duration getTimeout() {
public Long getTimeout() {
return timeout;
}

/**
* @return
*/
public TimeUnit getTimeUnit() {
return timeUnit;
}

/**
* @return
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.springframework.data.redis.connection;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;
Expand All @@ -27,6 +28,7 @@
* @author Christoph Strobl
* @author Mark Paluch
* @author dengliming
* @author ihaohong
*/
public interface RedisListCommands {

Expand Down Expand Up @@ -314,6 +316,21 @@ default Long lPos(byte[] key, byte[] element) {
@Nullable
List<byte[]> bLPop(int timeout, byte[]... keys);

/**
* Removes and returns first element from lists stored at {@code keys}. <br>
* <b>Blocks connection</b> until element available or {@code timeout} reached.
*
* @param timeout seconds to block.
* @param unit time unit for timeout
* @param keys must not be {@literal null}.
* @return empty {@link List} when no element could be popped and the timeout was reached. {@literal null} when used
* in pipeline / transaction.
* @see <a href="https://redis.io/commands/blpop">Redis Documentation: BLPOP</a>
* @see #lPop(byte[])
*/
@Nullable
List<byte[]> bLPop(int timeout, TimeUnit unit, byte[]... keys);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: @since 2.6


/**
* Removes and returns last element from lists stored at {@code keys}. <br>
* <b>Blocks connection</b> until element available or {@code timeout} reached.
Expand All @@ -328,6 +345,21 @@ default Long lPos(byte[] key, byte[] element) {
@Nullable
List<byte[]> bRPop(int timeout, byte[]... keys);

/**
* Removes and returns last element from lists stored at {@code keys}. <br>
* <b>Blocks connection</b> until element available or {@code timeout} reached.
*
* @param timeout seconds to block.
* @param unit time unit for timeout
* @param keys must not be {@literal null}.
* @return empty {@link List} when no element could be popped and the timeout was reached. {@literal null} when used
* in pipeline / transaction.
* @see <a href="https://redis.io/commands/brpop">Redis Documentation: BRPOP</a>
* @see #rPop(byte[])
*/
@Nullable
List<byte[]> bRPop(int timeout, TimeUnit unit, byte[]... keys);

/**
* Remove the last element from list at {@code srcKey}, append it to {@code dstKey} and return its value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/
package org.springframework.data.redis.connection.jedis;

import org.springframework.data.redis.core.TimeoutUtils;
import redis.clients.jedis.args.ListDirection;
import redis.clients.jedis.params.LPosParams;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.ClusterSlotHashUtil;
Expand All @@ -35,6 +37,7 @@
* @author Mark Paluch
* @author Jot Zhao
* @author dengliming
* @author ihaohong
* @since 2.0
*/
class JedisClusterListCommands implements RedisListCommands {
Expand Down Expand Up @@ -361,27 +364,48 @@ public List<byte[]> rPop(byte[] key, long count) {

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisListCommands#bLPop(int, byte[][])
* @see org.springframework.data.redis.connection.RedisListCommands#rPop(byte[], long)
*/
@Override
public List<byte[]> bLPop(int timeout, byte[]... keys) {
return bLPop(timeout, TimeUnit.SECONDS, keys);
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisListCommands#bLPop(int, java.util.concurrent.TimeUnit, byte[][])
*/
@Override
public List<byte[]> bLPop(int timeout, TimeUnit unit, byte[]... keys) {

Assert.notNull(keys, "Key must not be null!");
Assert.noNullElements(keys, "Keys must not contain null elements!");

if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
try {
return connection.getCluster().blpop(timeout, keys);
if (TimeUnit.MILLISECONDS == unit) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that one could use MICROSECONDS and NANOSECONDS as units. Ideally, we add a utility method to TimeoutUtils along the lines of supportsDoubleSeconds or isSubSecondUnit instead of repeating the same code to check the timeout across the code base.

return connection.getCluster().blpop(TimeoutUtils.toDoubleSeconds(timeout, unit), keys);
} else {
return connection.getCluster().blpop(timeout, keys);
}
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
}

return connection.getClusterCommandExecutor()
.executeMultiKeyCommand(
(JedisMultiKeyClusterCommandCallback<List<byte[]>>) (client, key) -> client.blpop(timeout, key),
Arrays.asList(keys))
.getFirstNonNullNotEmptyOrDefault(Collections.<byte[]> emptyList());
if (TimeUnit.MILLISECONDS == unit) {
return connection.getClusterCommandExecutor()
.executeMultiKeyCommand(
(JedisMultiKeyClusterCommandCallback<List<byte[]>>) (client, key) -> client.blpop(TimeoutUtils.toDoubleSeconds(timeout, unit), key),
Arrays.asList(keys))
.getFirstNonNullNotEmptyOrDefault(Collections.<byte[]> emptyList());
} else {
return connection.getClusterCommandExecutor()
.executeMultiKeyCommand(
(JedisMultiKeyClusterCommandCallback<List<byte[]>>) (client, key) -> client.blpop(timeout, key),
Arrays.asList(keys))
.getFirstNonNullNotEmptyOrDefault(Collections.<byte[]> emptyList());
}
}

/*
Expand All @@ -390,23 +414,44 @@ public List<byte[]> bLPop(int timeout, byte[]... keys) {
*/
@Override
public List<byte[]> bRPop(int timeout, byte[]... keys) {
return bRPop(timeout, TimeUnit.SECONDS, keys);
}

/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisListCommands#bRPop(int, byte[][])
*/
@Override
public List<byte[]> bRPop(int timeout, TimeUnit unit, byte[]... keys) {

Assert.notNull(keys, "Key must not be null!");
Assert.noNullElements(keys, "Keys must not contain null elements!");

if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
try {
return connection.getCluster().brpop(timeout, keys);
if (TimeUnit.MILLISECONDS == unit) {
return connection.getCluster().brpop(TimeoutUtils.toDoubleSeconds(timeout, unit), keys);
} else {
return connection.getCluster().brpop(timeout, keys);
}
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
}

return connection.getClusterCommandExecutor()
.executeMultiKeyCommand(
(JedisMultiKeyClusterCommandCallback<List<byte[]>>) (client, key) -> client.brpop(timeout, key),
Arrays.asList(keys))
.getFirstNonNullNotEmptyOrDefault(Collections.<byte[]> emptyList());
if (TimeUnit.MILLISECONDS == unit) {
return connection.getClusterCommandExecutor()
.executeMultiKeyCommand(
(JedisMultiKeyClusterCommandCallback<List<byte[]>>) (client, key) -> client.brpop(TimeoutUtils.toDoubleSeconds(timeout, unit), key),
Arrays.asList(keys))
.getFirstNonNullNotEmptyOrDefault(Collections.<byte[]> emptyList());
} else {
return connection.getClusterCommandExecutor()
.executeMultiKeyCommand(
(JedisMultiKeyClusterCommandCallback<List<byte[]>>) (client, key) -> client.brpop(timeout, key),
Arrays.asList(keys))
.getFirstNonNullNotEmptyOrDefault(Collections.<byte[]> emptyList());
}
}

/*
Expand Down
Loading