|
53 | 53 | import java.util.concurrent.Future;
|
54 | 54 | import java.util.concurrent.TimeUnit;
|
55 | 55 | import java.util.concurrent.atomic.AtomicLong;
|
| 56 | +import java.util.function.Function; |
56 | 57 | import java.util.function.Supplier;
|
57 | 58 |
|
58 | 59 | import org.springframework.beans.BeanUtils;
|
@@ -936,6 +937,79 @@ void pipeline(LettuceResult result) {
|
936 | 937 | }
|
937 | 938 | }
|
938 | 939 |
|
| 940 | + <S, T> void executeInPipeline(Function<RedisClusterAsyncCommands, RedisFuture<S>> command, |
| 941 | + Converter<S, T> converter) { |
| 942 | + |
| 943 | + try { |
| 944 | + pipeline(newLettuceResult(command.apply(getAsyncConnection()), converter)); |
| 945 | + } catch (Exception ex) { |
| 946 | + throw convertLettuceAccessException(ex); |
| 947 | + } |
| 948 | + } |
| 949 | + |
| 950 | + <S, T> void executeInTx(Function<RedisClusterAsyncCommands, RedisFuture<S>> command, Converter<S, T> converter) { |
| 951 | + |
| 952 | + try { |
| 953 | + transaction(newLettuceResult(command.apply(getAsyncConnection()), converter)); |
| 954 | + } catch (Exception ex) { |
| 955 | + throw convertLettuceAccessException(ex); |
| 956 | + } |
| 957 | + } |
| 958 | + |
| 959 | + <T> NullableResult<T> execute(Function<RedisClusterCommands, T> command) { |
| 960 | + |
| 961 | + try { |
| 962 | + |
| 963 | + T result = command.apply(getConnection()); |
| 964 | + return NullableResult.of(result); |
| 965 | + } catch (Exception ex) { |
| 966 | + throw convertLettuceAccessException(ex); |
| 967 | + } |
| 968 | + } |
| 969 | + |
| 970 | + <T> RedisFuture<T> executeAsync(Function<RedisClusterAsyncCommands, RedisFuture<T>> command) { |
| 971 | + |
| 972 | + try { |
| 973 | + return command.apply(getAsyncConnection()); |
| 974 | + } catch (Exception ex) { |
| 975 | + throw convertLettuceAccessException(ex); |
| 976 | + } |
| 977 | + } |
| 978 | + |
| 979 | + <S, T> T invoke(RedisFuture<S> future, Converter<S, T> converter) { |
| 980 | + |
| 981 | + if (isPipelined()) { |
| 982 | + pipeline(newLettuceResult(future, converter)); |
| 983 | + return null; |
| 984 | + } |
| 985 | + if (isQueueing()) { |
| 986 | + transaction(newLettuceResult(future, converter)); |
| 987 | + return null; |
| 988 | + } |
| 989 | + return NullableResult.of((S) await(future)).convert(converter).get(); |
| 990 | + } |
| 991 | + |
| 992 | + <T> T execute(Function<RedisClusterCommands, T> sync, Function<RedisClusterAsyncCommands, RedisFuture<T>> async) { |
| 993 | + return execute(sync, async, val -> val); |
| 994 | + } |
| 995 | + |
| 996 | + // use a future here and only async. |
| 997 | + <S, T> T execute(Function<RedisClusterCommands, S> sync, Function<RedisClusterAsyncCommands, RedisFuture<S>> async, |
| 998 | + Converter<S, T> converter) { |
| 999 | + |
| 1000 | + if (isPipelined()) { |
| 1001 | + executeInPipeline(async, converter); |
| 1002 | + return null; |
| 1003 | + } |
| 1004 | + if (isQueueing()) { |
| 1005 | + executeInTx(async, converter); |
| 1006 | + return null; |
| 1007 | + } |
| 1008 | + |
| 1009 | + // return execute(sync).convert(converter).get(); |
| 1010 | + return NullableResult.of((S) await(executeAsync(async))).convert(converter).get(); |
| 1011 | + } |
| 1012 | + |
939 | 1013 | void transaction(FutureResult<?> result) {
|
940 | 1014 | txResults.add(result);
|
941 | 1015 | }
|
|
0 commit comments