30
30
import io .lettuce .core .internal .ExceptionFactory ;
31
31
import io .lettuce .core .internal .LettuceAssert ;
32
32
import io .lettuce .core .resource .ClientResources ;
33
+ import io .netty .util .Timeout ;
34
+ import io .netty .util .Timer ;
33
35
34
36
/**
35
37
* Extension to {@link RedisChannelWriter} that expires commands. Command timeout starts at the time the command is written
36
38
* regardless to {@link #setAutoFlushCommands(boolean) flushing mode} (user-controlled batching).
37
39
*
38
40
* @author Mark Paluch
41
+ * @author Tianyi Yang
39
42
* @since 5.1
40
43
* @see io.lettuce.core.TimeoutOptions
41
44
*/
@@ -49,6 +52,8 @@ public class CommandExpiryWriter implements RedisChannelWriter {
49
52
50
53
private final ScheduledExecutorService executorService ;
51
54
55
+ private final Timer timer ;
56
+
52
57
private final boolean applyConnectionTimeout ;
53
58
54
59
private volatile long timeout = -1 ;
@@ -72,6 +77,7 @@ public CommandExpiryWriter(RedisChannelWriter delegate, ClientOptions clientOpti
72
77
this .applyConnectionTimeout = timeoutOptions .isApplyConnectionTimeout ();
73
78
this .timeUnit = source .getTimeUnit ();
74
79
this .executorService = clientResources .eventExecutorGroup ();
80
+ this .timer = clientResources .timer ();
75
81
}
76
82
77
83
/**
@@ -168,24 +174,19 @@ private void potentiallyExpire(RedisCommand<?, ?, ?> command, ScheduledExecutorS
168
174
if (timeout <= 0 ) {
169
175
return ;
170
176
}
171
-
172
- ScheduledFuture <?> schedule = executors .schedule (() -> {
173
-
177
+
178
+ Timeout commandTimeout = timer .newTimeout (t -> {
174
179
if (!command .isDone ()) {
175
- command .completeExceptionally (
176
- ExceptionFactory .createTimeoutException (Duration .ofNanos (timeUnit .toNanos (timeout ))));
177
- }
180
+ executors .submit (() -> command .completeExceptionally (
181
+ ExceptionFactory .createTimeoutException (Duration .ofNanos (timeUnit .toNanos (timeout )))));
178
182
183
+ }
179
184
}, timeout , timeUnit );
180
185
181
186
if (command instanceof CompleteableCommand ) {
182
- ((CompleteableCommand ) command ).onComplete ((o , o2 ) -> {
183
-
184
- if (!schedule .isDone ()) {
185
- schedule .cancel (false );
186
- }
187
- });
187
+ ((CompleteableCommand ) command ).onComplete ((o , o2 ) -> commandTimeout .cancel ());
188
188
}
189
+
189
190
}
190
191
191
192
}
0 commit comments