-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[DRAFT] Support sub second timeout for BRPOP & BLPOP via RedisListCommands. #2127
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
78013be
e5fca25
1894a9f
c2420b4
0f08542
4e0ee3a
7904495
90ba014
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
package org.springframework.data.redis.connection; | ||
|
||
import java.util.List; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.springframework.lang.Nullable; | ||
import org.springframework.util.CollectionUtils; | ||
|
@@ -27,6 +28,7 @@ | |
* @author Christoph Strobl | ||
* @author Mark Paluch | ||
* @author dengliming | ||
* @author ihaohong | ||
*/ | ||
public interface RedisListCommands { | ||
|
||
|
@@ -314,6 +316,21 @@ default Long lPos(byte[] key, byte[] element) { | |
@Nullable | ||
List<byte[]> bLPop(int timeout, byte[]... keys); | ||
|
||
/** | ||
* Removes and returns first element from lists stored at {@code keys}. <br> | ||
* <b>Blocks connection</b> until element available or {@code timeout} reached. | ||
* | ||
* @param timeout seconds to block. | ||
* @param unit time unit for timeout | ||
* @param keys must not be {@literal null}. | ||
* @return empty {@link List} when no element could be popped and the timeout was reached. {@literal null} when used | ||
* in pipeline / transaction. | ||
* @see <a href="https://redis.io/commands/blpop">Redis Documentation: BLPOP</a> | ||
* @see #lPop(byte[]) | ||
*/ | ||
@Nullable | ||
List<byte[]> bLPop(int timeout, TimeUnit unit, byte[]... keys); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: |
||
|
||
/** | ||
* Removes and returns last element from lists stored at {@code keys}. <br> | ||
* <b>Blocks connection</b> until element available or {@code timeout} reached. | ||
|
@@ -328,6 +345,21 @@ default Long lPos(byte[] key, byte[] element) { | |
@Nullable | ||
List<byte[]> bRPop(int timeout, byte[]... keys); | ||
|
||
/** | ||
* Removes and returns last element from lists stored at {@code keys}. <br> | ||
* <b>Blocks connection</b> until element available or {@code timeout} reached. | ||
* | ||
* @param timeout seconds to block. | ||
* @param unit time unit for timeout | ||
* @param keys must not be {@literal null}. | ||
* @return empty {@link List} when no element could be popped and the timeout was reached. {@literal null} when used | ||
* in pipeline / transaction. | ||
* @see <a href="https://redis.io/commands/brpop">Redis Documentation: BRPOP</a> | ||
* @see #rPop(byte[]) | ||
*/ | ||
@Nullable | ||
List<byte[]> bRPop(int timeout, TimeUnit unit, byte[]... keys); | ||
|
||
/** | ||
* Remove the last element from list at {@code srcKey}, append it to {@code dstKey} and return its value. | ||
* | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,12 +15,14 @@ | |
*/ | ||
package org.springframework.data.redis.connection.jedis; | ||
|
||
import org.springframework.data.redis.core.TimeoutUtils; | ||
import redis.clients.jedis.args.ListDirection; | ||
import redis.clients.jedis.params.LPosParams; | ||
|
||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.springframework.dao.DataAccessException; | ||
import org.springframework.data.redis.connection.ClusterSlotHashUtil; | ||
|
@@ -35,6 +37,7 @@ | |
* @author Mark Paluch | ||
* @author Jot Zhao | ||
* @author dengliming | ||
* @author ihaohong | ||
* @since 2.0 | ||
*/ | ||
class JedisClusterListCommands implements RedisListCommands { | ||
|
@@ -361,27 +364,48 @@ public List<byte[]> rPop(byte[] key, long count) { | |
|
||
/* | ||
* (non-Javadoc) | ||
* @see org.springframework.data.redis.connection.RedisListCommands#bLPop(int, byte[][]) | ||
* @see org.springframework.data.redis.connection.RedisListCommands#rPop(byte[], long) | ||
*/ | ||
@Override | ||
public List<byte[]> bLPop(int timeout, byte[]... keys) { | ||
return bLPop(timeout, TimeUnit.SECONDS, keys); | ||
} | ||
|
||
/* | ||
* (non-Javadoc) | ||
* @see org.springframework.data.redis.connection.RedisListCommands#bLPop(int, java.util.concurrent.TimeUnit, byte[][]) | ||
*/ | ||
@Override | ||
public List<byte[]> bLPop(int timeout, TimeUnit unit, byte[]... keys) { | ||
|
||
Assert.notNull(keys, "Key must not be null!"); | ||
Assert.noNullElements(keys, "Keys must not contain null elements!"); | ||
|
||
if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) { | ||
try { | ||
return connection.getCluster().blpop(timeout, keys); | ||
if (TimeUnit.MILLISECONDS == unit) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that one could use |
||
return connection.getCluster().blpop(TimeoutUtils.toDoubleSeconds(timeout, unit), keys); | ||
} else { | ||
return connection.getCluster().blpop(timeout, keys); | ||
} | ||
} catch (Exception ex) { | ||
throw convertJedisAccessException(ex); | ||
} | ||
} | ||
|
||
return connection.getClusterCommandExecutor() | ||
.executeMultiKeyCommand( | ||
(JedisMultiKeyClusterCommandCallback<List<byte[]>>) (client, key) -> client.blpop(timeout, key), | ||
Arrays.asList(keys)) | ||
.getFirstNonNullNotEmptyOrDefault(Collections.<byte[]> emptyList()); | ||
if (TimeUnit.MILLISECONDS == unit) { | ||
return connection.getClusterCommandExecutor() | ||
.executeMultiKeyCommand( | ||
(JedisMultiKeyClusterCommandCallback<List<byte[]>>) (client, key) -> client.blpop(TimeoutUtils.toDoubleSeconds(timeout, unit), key), | ||
Arrays.asList(keys)) | ||
.getFirstNonNullNotEmptyOrDefault(Collections.<byte[]> emptyList()); | ||
} else { | ||
return connection.getClusterCommandExecutor() | ||
.executeMultiKeyCommand( | ||
(JedisMultiKeyClusterCommandCallback<List<byte[]>>) (client, key) -> client.blpop(timeout, key), | ||
Arrays.asList(keys)) | ||
.getFirstNonNullNotEmptyOrDefault(Collections.<byte[]> emptyList()); | ||
} | ||
} | ||
|
||
/* | ||
|
@@ -390,23 +414,44 @@ public List<byte[]> bLPop(int timeout, byte[]... keys) { | |
*/ | ||
@Override | ||
public List<byte[]> bRPop(int timeout, byte[]... keys) { | ||
return bRPop(timeout, TimeUnit.SECONDS, keys); | ||
} | ||
|
||
/* | ||
* (non-Javadoc) | ||
* @see org.springframework.data.redis.connection.RedisListCommands#bRPop(int, byte[][]) | ||
*/ | ||
@Override | ||
public List<byte[]> bRPop(int timeout, TimeUnit unit, byte[]... keys) { | ||
|
||
Assert.notNull(keys, "Key must not be null!"); | ||
Assert.noNullElements(keys, "Keys must not contain null elements!"); | ||
|
||
if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) { | ||
try { | ||
return connection.getCluster().brpop(timeout, keys); | ||
if (TimeUnit.MILLISECONDS == unit) { | ||
return connection.getCluster().brpop(TimeoutUtils.toDoubleSeconds(timeout, unit), keys); | ||
} else { | ||
return connection.getCluster().brpop(timeout, keys); | ||
} | ||
} catch (Exception ex) { | ||
throw convertJedisAccessException(ex); | ||
} | ||
} | ||
|
||
return connection.getClusterCommandExecutor() | ||
.executeMultiKeyCommand( | ||
(JedisMultiKeyClusterCommandCallback<List<byte[]>>) (client, key) -> client.brpop(timeout, key), | ||
Arrays.asList(keys)) | ||
.getFirstNonNullNotEmptyOrDefault(Collections.<byte[]> emptyList()); | ||
if (TimeUnit.MILLISECONDS == unit) { | ||
return connection.getClusterCommandExecutor() | ||
.executeMultiKeyCommand( | ||
(JedisMultiKeyClusterCommandCallback<List<byte[]>>) (client, key) -> client.brpop(TimeoutUtils.toDoubleSeconds(timeout, unit), key), | ||
Arrays.asList(keys)) | ||
.getFirstNonNullNotEmptyOrDefault(Collections.<byte[]> emptyList()); | ||
} else { | ||
return connection.getClusterCommandExecutor() | ||
.executeMultiKeyCommand( | ||
(JedisMultiKeyClusterCommandCallback<List<byte[]>>) (client, key) -> client.brpop(timeout, key), | ||
Arrays.asList(keys)) | ||
.getFirstNonNullNotEmptyOrDefault(Collections.<byte[]> emptyList()); | ||
} | ||
} | ||
|
||
/* | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd like to keep
Duration
as that is the more expressive API.TimeUnit
and timeout always require a tuple.