22
22
import org .apache .commons .logging .Log ;
23
23
import org .apache .commons .logging .LogFactory ;
24
24
25
- import org .springframework .core .Ordered ;
26
25
import org .springframework .dao .DataAccessResourceFailureException ;
27
26
import org .springframework .lang .Nullable ;
28
- import org .springframework .transaction .NoTransactionException ;
29
- import org .springframework .transaction .reactive .TransactionSynchronization ;
30
27
import org .springframework .transaction .reactive .TransactionSynchronizationManager ;
31
28
import org .springframework .util .Assert ;
32
29
@@ -68,8 +65,7 @@ private ConnectionFactoryUtils() {}
68
65
* @see #releaseConnection
69
66
*/
70
67
public static Mono <Connection > getConnection (ConnectionFactory connectionFactory ) {
71
- return doGetConnection (connectionFactory )
72
- .onErrorMap (e -> new DataAccessResourceFailureException ("Failed to obtain R2DBC Connection" , e ));
68
+ return org .springframework .r2dbc .connection .ConnectionFactoryUtils .getConnection (connectionFactory );
73
69
}
74
70
75
71
/**
@@ -83,64 +79,7 @@ public static Mono<Connection> getConnection(ConnectionFactory connectionFactory
83
79
* @return a R2DBC {@link io.r2dbc.spi.Connection} from the given {@link io.r2dbc.spi.ConnectionFactory}.
84
80
*/
85
81
public static Mono <Connection > doGetConnection (ConnectionFactory connectionFactory ) {
86
-
87
- Assert .notNull (connectionFactory , "ConnectionFactory must not be null!" );
88
-
89
- return TransactionSynchronizationManager .forCurrentTransaction ().flatMap (synchronizationManager -> {
90
-
91
- ConnectionHolder conHolder = (ConnectionHolder ) synchronizationManager .getResource (connectionFactory );
92
- if (conHolder != null && (conHolder .hasConnection () || conHolder .isSynchronizedWithTransaction ())) {
93
- conHolder .requested ();
94
- if (!conHolder .hasConnection ()) {
95
-
96
- if (logger .isDebugEnabled ()) {
97
- logger .debug ("Fetching resumed R2DBC Connection from ConnectionFactory" );
98
- }
99
- return fetchConnection (connectionFactory ).doOnNext (conHolder ::setConnection );
100
- }
101
- return Mono .just (conHolder .getConnection ());
102
- }
103
- // Else we either got no holder or an empty thread-bound holder here.
104
-
105
- if (logger .isDebugEnabled ()) {
106
- logger .debug ("Fetching R2DBC Connection from ConnectionFactory" );
107
- }
108
-
109
- Mono <Connection > con = fetchConnection (connectionFactory );
110
-
111
- if (synchronizationManager .isSynchronizationActive ()) {
112
-
113
- return con .flatMap (it -> {
114
-
115
- return Mono .just (it ).doOnNext (conn -> {
116
-
117
- // Use same Connection for further R2DBC actions within the transaction.
118
- // Thread-bound object will get removed by synchronization at transaction completion.
119
- ConnectionHolder holderToUse = conHolder ;
120
- if (holderToUse == null ) {
121
- holderToUse = new ConnectionHolder (conn );
122
- } else {
123
- holderToUse .setConnection (conn );
124
- }
125
- holderToUse .requested ();
126
- synchronizationManager
127
- .registerSynchronization (new ConnectionSynchronization (holderToUse , connectionFactory ));
128
- holderToUse .setSynchronizedWithTransaction (true );
129
- if (holderToUse != conHolder ) {
130
- synchronizationManager .bindResource (connectionFactory , holderToUse );
131
- }
132
- }).onErrorResume (e -> {
133
- // Unexpected exception from external delegation call -> close Connection and rethrow.
134
- return releaseConnection (it , connectionFactory ).then (Mono .error (e ));
135
- });
136
- });
137
- }
138
-
139
- return con ;
140
- }) //
141
- .onErrorResume (NoTransactionException .class , e -> {
142
- return Mono .from (connectionFactory .create ());
143
- });
82
+ return org .springframework .r2dbc .connection .ConnectionFactoryUtils .doGetConnection (connectionFactory );
144
83
}
145
84
146
85
/**
@@ -183,17 +122,8 @@ public static Mono<Void> releaseConnection(io.r2dbc.spi.Connection con, Connecti
183
122
public static Mono <Void > doReleaseConnection (io .r2dbc .spi .Connection connection ,
184
123
ConnectionFactory connectionFactory ) {
185
124
186
- return TransactionSynchronizationManager .forCurrentTransaction ().flatMap (it -> {
187
-
188
- ConnectionHolder conHolder = (ConnectionHolder ) it .getResource (connectionFactory );
189
- if (conHolder != null && connectionEquals (conHolder , connection )) {
190
- // It's the transactional Connection: Don't close it.
191
- conHolder .released ();
192
- }
193
- return Mono .from (connection .close ());
194
- }).onErrorResume (NoTransactionException .class , e -> {
195
- return doCloseConnection (connection , connectionFactory );
196
- });
125
+ return org .springframework .r2dbc .connection .ConnectionFactoryUtils .doReleaseConnection (connection ,
126
+ connectionFactory );
197
127
}
198
128
199
129
/**
@@ -246,37 +176,7 @@ public static Mono<Void> doCloseConnection(Connection connection, @Nullable Conn
246
176
*/
247
177
public static Mono <ConnectionFactory > currentConnectionFactory (ConnectionFactory connectionFactory ) {
248
178
249
- return TransactionSynchronizationManager .forCurrentTransaction ()
250
- .filter (TransactionSynchronizationManager ::isSynchronizationActive ).filter (it -> {
251
-
252
- ConnectionHolder conHolder = (ConnectionHolder ) it .getResource (connectionFactory );
253
- if (conHolder != null && (conHolder .hasConnection () || conHolder .isSynchronizedWithTransaction ())) {
254
- return true ;
255
- }
256
- return false ;
257
- }).map (it -> connectionFactory );
258
- }
259
-
260
- /**
261
- * Determine whether the given two {@link io.r2dbc.spi.Connection}s are equal, asking the target
262
- * {@link io.r2dbc.spi.Connection} in case of a proxy. Used to detect equality even if the user passed in a raw target
263
- * Connection while the held one is a proxy.
264
- *
265
- * @param conHolder the {@link .ConnectionHolder} for the held {@link io.r2dbc.spi.Connection} (potentially a proxy).
266
- * @param passedInCon the {@link io.r2dbc.spi.Connection} passed-in by the user (potentially a target
267
- * {@link io.r2dbc.spi.Connection} without proxy).
268
- * @return whether the given Connections are equal
269
- * @see #getTargetConnection
270
- */
271
- private static boolean connectionEquals (ConnectionHolder conHolder , Connection passedInCon ) {
272
-
273
- if (!conHolder .hasConnection ()) {
274
- return false ;
275
- }
276
- Connection heldCon = conHolder .getConnection ();
277
- // Explicitly check for identity too: for Connection handles that do not implement
278
- // "equals" properly).
279
- return (heldCon == passedInCon || heldCon .equals (passedInCon ) || getTargetConnection (heldCon ).equals (passedInCon ));
179
+ return org .springframework .r2dbc .connection .ConnectionFactoryUtils .currentConnectionFactory (connectionFactory );
280
180
}
281
181
282
182
/**
@@ -317,112 +217,4 @@ private static int getConnectionSynchronizationOrder(ConnectionFactory connectio
317
217
return order ;
318
218
}
319
219
320
- /**
321
- * Callback for resource cleanup at the end of a non-native R2DBC transaction.
322
- */
323
- private static class ConnectionSynchronization implements TransactionSynchronization , Ordered {
324
-
325
- private final ConnectionHolder connectionHolder ;
326
-
327
- private final ConnectionFactory connectionFactory ;
328
-
329
- private int order ;
330
-
331
- private boolean holderActive = true ;
332
-
333
- ConnectionSynchronization (ConnectionHolder connectionHolder , ConnectionFactory connectionFactory ) {
334
- this .connectionHolder = connectionHolder ;
335
- this .connectionFactory = connectionFactory ;
336
- this .order = getConnectionSynchronizationOrder (connectionFactory );
337
- }
338
-
339
- @ Override
340
- public int getOrder () {
341
- return this .order ;
342
- }
343
-
344
- @ Override
345
- public Mono <Void > suspend () {
346
- if (this .holderActive ) {
347
-
348
- return TransactionSynchronizationManager .forCurrentTransaction ().flatMap (it -> {
349
-
350
- it .unbindResource (this .connectionFactory );
351
- if (this .connectionHolder .hasConnection () && !this .connectionHolder .isOpen ()) {
352
- // Release Connection on suspend if the application doesn't keep
353
- // a handle to it anymore. We will fetch a fresh Connection if the
354
- // application accesses the ConnectionHolder again after resume,
355
- // assuming that it will participate in the same transaction.
356
- return releaseConnection (this .connectionHolder .getConnection (), this .connectionFactory )
357
- .doOnTerminate (() -> this .connectionHolder .setConnection (null ));
358
- }
359
- return Mono .empty ();
360
- });
361
- }
362
-
363
- return Mono .empty ();
364
- }
365
-
366
- @ Override
367
- public Mono <Void > resume () {
368
- if (this .holderActive ) {
369
- return TransactionSynchronizationManager .forCurrentTransaction ().doOnNext (it -> {
370
- it .bindResource (this .connectionFactory , this .connectionHolder );
371
- }).then ();
372
- }
373
- return Mono .empty ();
374
- }
375
-
376
- @ Override
377
- public Mono <Void > beforeCompletion () {
378
-
379
- // Release Connection early if the holder is not open anymore
380
- // (that is, not used by another resource
381
- // that has its own cleanup via transaction synchronization),
382
- // to avoid issues with strict transaction implementations that expect
383
- // the close call before transaction completion.
384
- if (!this .connectionHolder .isOpen ()) {
385
- return TransactionSynchronizationManager .forCurrentTransaction ().flatMap (it -> {
386
-
387
- it .unbindResource (this .connectionFactory );
388
- this .holderActive = false ;
389
- if (this .connectionHolder .hasConnection ()) {
390
- return releaseConnection (this .connectionHolder .getConnection (), this .connectionFactory );
391
- }
392
- return Mono .empty ();
393
- });
394
- }
395
-
396
- return Mono .empty ();
397
- }
398
-
399
- @ Override
400
- public Mono <Void > afterCompletion (int status ) {
401
-
402
- // If we haven't closed the Connection in beforeCompletion,
403
- // close it now. The holder might have been used for other
404
- // cleanup in the meantime, for example by a Hibernate Session.
405
- if (this .holderActive ) {
406
- // The thread-bound ConnectionHolder might not be available anymore,
407
- // since afterCompletion might get called from a different thread.
408
- return TransactionSynchronizationManager .forCurrentTransaction ().flatMap (it -> {
409
-
410
- it .unbindResourceIfPossible (this .connectionFactory );
411
- this .holderActive = false ;
412
- if (this .connectionHolder .hasConnection ()) {
413
- return releaseConnection (this .connectionHolder .getConnection (), this .connectionFactory )
414
- .doOnTerminate (() -> {
415
- // Reset the ConnectionHolder: It might remain bound to the context.
416
- this .connectionHolder .setConnection (null );
417
- });
418
- }
419
-
420
- return Mono .empty ();
421
- });
422
- }
423
-
424
- this .connectionHolder .reset ();
425
- return Mono .empty ();
426
- }
427
- }
428
220
}
0 commit comments