31
31
import java .util .concurrent .CompletionStage ;
32
32
import java .util .concurrent .ConcurrentHashMap ;
33
33
34
+ import java .util .concurrent .atomic .AtomicReference ;
34
35
import org .apache .commons .logging .Log ;
35
36
import org .apache .commons .logging .LogFactory ;
36
37
import org .apache .commons .pool2 .impl .GenericObjectPool ;
37
38
import org .apache .commons .pool2 .impl .GenericObjectPoolConfig ;
38
39
import org .springframework .beans .factory .DisposableBean ;
40
+ import org .springframework .context .SmartLifecycle ;
39
41
import org .springframework .data .redis .connection .PoolException ;
40
42
import org .springframework .util .Assert ;
41
43
56
58
* @author Mark Paluch
57
59
* @author Christoph Strobl
58
60
* @author Asmir Mustafic
61
+ * @author UHyeon Jeong
59
62
* @since 2.0
60
63
* @see #getConnection(Class)
61
64
*/
62
- class LettucePoolingConnectionProvider implements LettuceConnectionProvider , RedisClientProvider , DisposableBean {
65
+ class LettucePoolingConnectionProvider implements LettuceConnectionProvider , RedisClientProvider , DisposableBean ,
66
+ SmartLifecycle {
63
67
64
68
private static final Log log = LogFactory .getLog (LettucePoolingConnectionProvider .class );
65
69
70
+ private final AtomicReference <State > state = new AtomicReference <>(State .CREATED );
66
71
private final LettuceConnectionProvider connectionProvider ;
67
72
private final GenericObjectPoolConfig <StatefulConnection <?, ?>> poolConfig ;
68
73
private final Map <StatefulConnection <?, ?>, GenericObjectPool <StatefulConnection <?, ?>>> poolRef = new ConcurrentHashMap <>(
@@ -76,6 +81,10 @@ class LettucePoolingConnectionProvider implements LettuceConnectionProvider, Red
76
81
private final Map <Class <?>, AsyncPool <StatefulConnection <?, ?>>> asyncPools = new ConcurrentHashMap <>(32 );
77
82
private final BoundedPoolConfig asyncPoolConfig ;
78
83
84
+ enum State {
85
+ CREATED , STARTING , STARTED , STOPPING , STOPPED , DESTROYED ;
86
+ }
87
+
79
88
LettucePoolingConnectionProvider (LettuceConnectionProvider connectionProvider ,
80
89
LettucePoolingClientConfiguration clientConfiguration ) {
81
90
@@ -206,39 +215,51 @@ public CompletableFuture<Void> releaseAsync(StatefulConnection<?, ?> connection)
206
215
207
216
@ Override
208
217
public void destroy () throws Exception {
218
+ stop ();
219
+ state .set (State .DESTROYED );
220
+ }
209
221
210
- List <CompletableFuture <?>> futures = new ArrayList <>();
211
- if (!poolRef .isEmpty () || !asyncPoolRef .isEmpty ()) {
212
- log .warn ("LettucePoolingConnectionProvider contains unreleased connections" );
213
- }
214
222
215
- if (!inProgressAsyncPoolRef .isEmpty ()) {
223
+ @ Override
224
+ public void start () {
225
+ state .set (State .STARTED );
226
+ }
216
227
217
- log .warn ("LettucePoolingConnectionProvider has active connection retrievals" );
218
- inProgressAsyncPoolRef .forEach ((k , v ) -> futures .add (k .thenApply (StatefulConnection ::closeAsync )));
219
- }
228
+ @ Override
229
+ public void stop () {
230
+ if (state .compareAndSet (State .STARTED , State .STOPPING )) {
231
+ List <CompletableFuture <?>> futures = new ArrayList <>();
232
+ if (!poolRef .isEmpty () || !asyncPoolRef .isEmpty ()) {
233
+ log .warn ("LettucePoolingConnectionProvider contains unreleased connections" );
234
+ }
220
235
221
- if (!poolRef .isEmpty ()) {
236
+ if (!inProgressAsyncPoolRef .isEmpty ()) {
222
237
223
- poolRef . forEach (( connection , pool ) -> pool . returnObject ( connection ) );
224
- poolRef . clear ( );
225
- }
238
+ log . warn ( "LettucePoolingConnectionProvider has active connection retrievals" );
239
+ inProgressAsyncPoolRef . forEach (( k , v ) -> futures . add ( k . thenApply ( StatefulConnection :: closeAsync )) );
240
+ }
226
241
227
- if (!asyncPoolRef .isEmpty ()) {
242
+ if (!poolRef .isEmpty ()) {
228
243
229
- asyncPoolRef .forEach ((connection , pool ) -> futures .add (pool .release (connection )));
230
- asyncPoolRef .clear ();
231
- }
244
+ poolRef .forEach ((connection , pool ) -> pool .returnObject (connection ));
245
+ poolRef .clear ();
246
+ }
247
+
248
+ if (!asyncPoolRef .isEmpty ()) {
249
+
250
+ asyncPoolRef .forEach ((connection , pool ) -> futures .add (pool .release (connection )));
251
+ asyncPoolRef .clear ();
252
+ }
232
253
233
- pools .forEach ((type , pool ) -> pool .close ());
254
+ pools .forEach ((type , pool ) -> pool .close ());
234
255
235
- CompletableFuture
256
+ CompletableFuture
236
257
.allOf (futures .stream ().map (it -> it .exceptionally (LettuceFutureUtils .ignoreErrors ()))
237
- .toArray (CompletableFuture []::new )) //
258
+ .toArray (CompletableFuture []::new )) //
238
259
.thenCompose (ignored -> {
239
260
240
261
CompletableFuture [] poolClose = asyncPools .values ().stream ().map (AsyncPool ::closeAsync )
241
- .map (it -> it .exceptionally (LettuceFutureUtils .ignoreErrors ())).toArray (CompletableFuture []::new );
262
+ .map (it -> it .exceptionally (LettuceFutureUtils .ignoreErrors ())).toArray (CompletableFuture []::new );
242
263
243
264
return CompletableFuture .allOf (poolClose );
244
265
}) //
@@ -248,6 +269,18 @@ public void destroy() throws Exception {
248
269
}) //
249
270
.join ();
250
271
251
- pools .clear ();
272
+ pools .clear ();
273
+ }
274
+ state .set (State .STOPPED );
275
+ }
276
+
277
+ @ Override
278
+ public boolean isRunning () {
279
+ return State .STARTED .equals (this .state .get ());
280
+ }
281
+
282
+ @ Override
283
+ public boolean isAutoStartup () {
284
+ return true ;
252
285
}
253
286
}
0 commit comments