18
18
import reactor .core .publisher .Flux ;
19
19
import reactor .core .publisher .Mono ;
20
20
21
+ import java .lang .reflect .InvocationTargetException ;
21
22
import java .time .Duration ;
22
23
import java .util .ArrayList ;
23
24
import java .util .List ;
@@ -89,6 +90,7 @@ public <T> T execute(TransactionDefinition definition, TransactionCallback<T> ca
89
90
@ Stability .Internal
90
91
<T > Flux <T > executeReactive (TransactionDefinition definition ,
91
92
org .springframework .transaction .reactive .TransactionCallback <T > callback ) {
93
+ final CouchbaseResourceHolder couchbaseResourceHolder = new CouchbaseResourceHolder (null , getSecurityContext ()); // caller's resources
92
94
return TransactionalSupport .checkForTransactionInThreadLocalStorage ().flatMapMany (isInTransaction -> {
93
95
boolean isInExistingTransaction = isInTransaction .isPresent ();
94
96
boolean createNewTransaction = handlePropagation (definition , isInExistingTransaction );
@@ -100,17 +102,20 @@ <T> Flux<T> executeReactive(TransactionDefinition definition,
100
102
} else {
101
103
return Mono .error (new UnsupportedOperationException ("Unsupported operation" ));
102
104
}
103
- });
105
+ }).contextWrite ( // set CouchbaseResourceHolder containing caller's SecurityContext
106
+ ctx -> ctx .put (CouchbaseResourceHolder .class , couchbaseResourceHolder ));
104
107
}
105
108
106
109
private <T > T executeNewTransaction (TransactionCallback <T > callback ) {
107
110
final AtomicReference <T > execResult = new AtomicReference <>();
111
+ final CouchbaseResourceHolder couchbaseResourceHolder = new CouchbaseResourceHolder (null , getSecurityContext ());
108
112
109
113
// Each of these transactions will block one thread on the underlying SDK's transactions scheduler. This
110
114
// scheduler is effectively unlimited, but this can still potentially lead to high thread usage by the application.
111
115
// If this is an issue then users need to instead use the standard Couchbase reactive transactions SDK.
112
116
try {
113
117
TransactionResult ignored = couchbaseClientFactory .getCluster ().transactions ().run (ctx -> {
118
+ setSecurityContext (couchbaseResourceHolder .getSecurityContext ()); // set the security context for the transaction
114
119
CouchbaseTransactionStatus status = new CouchbaseTransactionStatus (ctx , true , false , false , true , null );
115
120
116
121
T res = callback .doInTransaction (status );
@@ -173,12 +178,16 @@ public boolean isCompleted() {
173
178
}
174
179
};
175
180
176
- return Flux .from (callback .doInTransaction (status )).doOnNext (v -> out .add (v )).then (Mono .defer (() -> {
177
- if (status .isRollbackOnly ()) {
178
- return Mono .error (new TransactionRollbackRequestedException ("TransactionStatus.isRollbackOnly() is set" ));
179
- }
180
- return Mono .empty ();
181
- }));
181
+ // Get caller's resources, set SecurityContext for the transaction
182
+ return CouchbaseResourceOwner .get ().map (cbrh -> setSecurityContext (cbrh .get ().getSecurityContext ()))
183
+ .flatMap (ignore -> Flux .from (callback .doInTransaction (status )).doOnNext (v -> out .add (v ))
184
+ .then (Mono .defer (() -> {
185
+ if (status .isRollbackOnly ()) {
186
+ return Mono .error (new TransactionRollbackRequestedException (
187
+ "TransactionStatus.isRollbackOnly() is set" ));
188
+ }
189
+ return Mono .empty ();
190
+ })));
182
191
});
183
192
184
193
}, this .options ).thenMany (Flux .defer (() -> Flux .fromIterable (out ))).onErrorMap (ex -> {
@@ -288,4 +297,26 @@ public void rollback(TransactionStatus ignored) throws TransactionException {
288
297
throw new UnsupportedOperationException (
289
298
"Direct programmatic use of the Couchbase PlatformTransactionManager is not supported" );
290
299
}
300
+
301
+ static private Object getSecurityContext () {
302
+ try {
303
+ Class <?> securityContextHolderClass = Class
304
+ .forName ("org.springframework.security.core.context.SecurityContextHolder" );
305
+ return securityContextHolderClass .getMethod ("getContext" ).invoke (null );
306
+ } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
307
+ | InvocationTargetException cnfe ) {}
308
+ return null ;
309
+ }
310
+
311
+ static private <S > S setSecurityContext (S sc ) {
312
+ try {
313
+ Class <?> securityContextHolder = Class .forName ("org.springframework.security.core.context.SecurityContext" );
314
+ Class <?> securityContextHolderClass = Class
315
+ .forName ("org.springframework.security.core.context.SecurityContextHolder" );
316
+ securityContextHolderClass .getMethod ("setContext" , new Class [] { securityContextHolder }).invoke (null , sc );
317
+ } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
318
+ | InvocationTargetException cnfe ) {}
319
+ return sc ;
320
+ }
321
+
291
322
}
0 commit comments