Skip to content

Propagate SecurityContext into @Transactional methods. #1979

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -89,6 +90,7 @@ public <T> T execute(TransactionDefinition definition, TransactionCallback<T> ca
@Stability.Internal
<T> Flux<T> executeReactive(TransactionDefinition definition,
org.springframework.transaction.reactive.TransactionCallback<T> callback) {
final CouchbaseResourceHolder couchbaseResourceHolder = new CouchbaseResourceHolder(null, getSecurityContext()); // caller's resources
return TransactionalSupport.checkForTransactionInThreadLocalStorage().flatMapMany(isInTransaction -> {
boolean isInExistingTransaction = isInTransaction.isPresent();
boolean createNewTransaction = handlePropagation(definition, isInExistingTransaction);
Expand All @@ -100,17 +102,20 @@ <T> Flux<T> executeReactive(TransactionDefinition definition,
} else {
return Mono.error(new UnsupportedOperationException("Unsupported operation"));
}
});
}).contextWrite( // set CouchbaseResourceHolder containing caller's SecurityContext
ctx -> ctx.put(CouchbaseResourceHolder.class, couchbaseResourceHolder));
}

private <T> T executeNewTransaction(TransactionCallback<T> callback) {
final AtomicReference<T> execResult = new AtomicReference<>();
final CouchbaseResourceHolder couchbaseResourceHolder = new CouchbaseResourceHolder(null, getSecurityContext());

// Each of these transactions will block one thread on the underlying SDK's transactions scheduler. This
// scheduler is effectively unlimited, but this can still potentially lead to high thread usage by the application.
// If this is an issue then users need to instead use the standard Couchbase reactive transactions SDK.
try {
TransactionResult ignored = couchbaseClientFactory.getCluster().transactions().run(ctx -> {
setSecurityContext(couchbaseResourceHolder.getSecurityContext()); // set the security context for the transaction
CouchbaseTransactionStatus status = new CouchbaseTransactionStatus(ctx, true, false, false, true, null);

T res = callback.doInTransaction(status);
Expand Down Expand Up @@ -173,12 +178,16 @@ public boolean isCompleted() {
}
};

return Flux.from(callback.doInTransaction(status)).doOnNext(v -> out.add(v)).then(Mono.defer(() -> {
if (status.isRollbackOnly()) {
return Mono.error(new TransactionRollbackRequestedException("TransactionStatus.isRollbackOnly() is set"));
}
return Mono.empty();
}));
// Get caller's resources, set SecurityContext for the transaction
return CouchbaseResourceOwner.get().map(cbrh -> setSecurityContext(cbrh.get().getSecurityContext()))
.flatMap(ignore -> Flux.from(callback.doInTransaction(status)).doOnNext(v -> out.add(v))
.then(Mono.defer(() -> {
if (status.isRollbackOnly()) {
return Mono.error(new TransactionRollbackRequestedException(
"TransactionStatus.isRollbackOnly() is set"));
}
return Mono.empty();
})));
});

}, this.options).thenMany(Flux.defer(() -> Flux.fromIterable(out))).onErrorMap(ex -> {
Expand Down Expand Up @@ -288,4 +297,26 @@ public void rollback(TransactionStatus ignored) throws TransactionException {
throw new UnsupportedOperationException(
"Direct programmatic use of the Couchbase PlatformTransactionManager is not supported");
}

static private Object getSecurityContext() {
try {
Class<?> securityContextHolderClass = Class
.forName("org.springframework.security.core.context.SecurityContextHolder");
return securityContextHolderClass.getMethod("getContext").invoke(null);
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException cnfe) {}
return null;
}

static private <S> S setSecurityContext(S sc) {
try {
Class<?> securityContextHolder = Class.forName("org.springframework.security.core.context.SecurityContext");
Class<?> securityContextHolderClass = Class
.forName("org.springframework.security.core.context.SecurityContextHolder");
securityContextHolderClass.getMethod("setContext", new Class[] { securityContextHolder }).invoke(null, sc);
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException cnfe) {}
return sc;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
public class CouchbaseResourceHolder extends ResourceHolderSupport {

private @Nullable CoreTransactionAttemptContext core; // which holds the atr
private @Nullable Object securityContext; // SecurityContext. We don't have the class.

Map<Integer, Object> getResultMap = new HashMap<>();

/**
Expand All @@ -42,7 +44,17 @@ public class CouchbaseResourceHolder extends ResourceHolderSupport {
* @param core the associated {@link CoreTransactionAttemptContext}. Can be {@literal null}.
*/
public CouchbaseResourceHolder(@Nullable CoreTransactionAttemptContext core) {
this(core, null);
}

/**
* Create a new {@link CouchbaseResourceHolder} for a given {@link CoreTransactionAttemptContext session}.
*
* @param core the associated {@link CoreTransactionAttemptContext}. Can be {@literal null}.
*/
public CouchbaseResourceHolder(@Nullable CoreTransactionAttemptContext core, @Nullable Object securityContext) {
this.core = core;
this.securityContext = securityContext;
}

/**
Expand All @@ -53,6 +65,14 @@ public CoreTransactionAttemptContext getCore() {
return core;
}

/**
* @return the associated {@link CoreTransactionAttemptContext}. Can be {@literal null}.
*/
@Nullable
public Object getSecurityContext() {
return securityContext;
}

public Object transactionResultHolder(Object holder, Object o) {
getResultMap.put(System.identityHashCode(o), holder);
return holder;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.springframework.data.couchbase.transaction;

import reactor.core.publisher.Mono;

import java.util.Optional;

import com.couchbase.client.core.annotation.Stability.Internal;

@Internal
public class CouchbaseResourceOwner {
private static final ThreadLocal<CouchbaseResourceHolder> marker = new ThreadLocal();

public CouchbaseResourceOwner() {}

public static void set(CouchbaseResourceHolder toInject) {
if (marker.get() != null) {
throw new IllegalStateException(
"Trying to set resource holder when already inside a transaction - likely an internal bug, please report it");
} else {
marker.set(toInject);
}
}

public static void clear() {
marker.remove();
}

public static Mono<Optional<CouchbaseResourceHolder>> get() {
return Mono.deferContextual((ctx) -> {
CouchbaseResourceHolder fromThreadLocal = marker.get();
CouchbaseResourceHolder fromReactive = ctx.hasKey(CouchbaseResourceHolder.class)
? ctx.get(CouchbaseResourceHolder.class)
: null;
if (fromThreadLocal != null) {
return Mono.just(Optional.of(fromThreadLocal));
} else {
return fromReactive != null ? Mono.just(Optional.of(fromReactive)) : Mono.just(Optional.empty());
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.couchbase.client.java.query.QueryScanConsistency.REQUEST_PLUS;

import org.springframework.data.couchbase.util.Util;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

Expand Down Expand Up @@ -119,6 +120,7 @@ public void shouldRollbackAfterExceptionOfTxAnnotatedMethod() {
@Test
public void commitShouldPersistTxEntries() {

System.err.println("parent SecurityContext: " + System.identityHashCode(Util.getSecurityContext()));
personService.savePerson(WalterWhite) //
.as(StepVerifier::create) //
.expectNextCount(1) //
Expand All @@ -130,6 +132,17 @@ public void commitShouldPersistTxEntries() {
.verifyComplete();
}

@Test
public void commitShouldPersistTxEntriesBlocking() {
System.err.println("parent SecurityContext: " + System.identityHashCode(Util.getSecurityContext()));
Person p = personService.savePersonBlocking(WalterWhite);

operations.findByQuery(Person.class).withConsistency(REQUEST_PLUS).count() //
.as(StepVerifier::create) //
.expectNext(1L) //
.verifyComplete();
}

@Test
public void commitShouldPersistTxEntriesOfTxAnnotatedMethod() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.springframework.data.couchbase.core.TransactionalSupport;
import org.springframework.data.couchbase.domain.PersonWithoutVersion;
import org.springframework.data.couchbase.transaction.CouchbaseResourceHolder;
import org.springframework.data.couchbase.util.Util;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -31,6 +33,9 @@
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.reactive.TransactionalOperator;

import java.lang.reflect.InvocationTargetException;
import java.util.Optional;

/**
* reactive PersonService for tests
*
Expand All @@ -57,8 +62,15 @@ public Mono<Person> savePersonErrors(Person person) {
.<Person> flatMap(it -> Mono.error(new SimulateFailureException()));
}

@Transactional
public Person savePersonBlocking(Person person) {
System.err.println("savePerson: "+Thread.currentThread().getName() +" "+ System.identityHashCode(Util.getSecurityContext()));
return personOperations.insertById(Person.class).one(person);
}

@Transactional
public Mono<Person> savePerson(Person person) {
System.err.println("savePerson: "+Thread.currentThread().getName() +" "+ System.identityHashCode(Util.getSecurityContext()));
return TransactionalSupport.checkForTransactionInThreadLocalStorage().map(stat -> {
assertTrue(stat.isPresent(), "Not in transaction");
System.err.println("In a transaction!!");
Expand Down
23 changes: 23 additions & 0 deletions src/test/java/org/springframework/data/couchbase/util/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.awaitility.Awaitility.with;

import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.Arrays;
import java.util.LinkedList;
Expand Down Expand Up @@ -170,4 +171,26 @@ public static void assertInAnnotationTransaction(boolean inTransaction) {
+ " but expected in-annotation-transaction = " + inTransaction);
}

static public Object getSecurityContext(){
Object sc = null;
try {
Class<?> securityContextHolderClass = Class
.forName("org.springframework.security.core.context.SecurityContextHolder");
sc = securityContextHolderClass.getMethod("getContext").invoke(null);
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException cnfe) {}
System.err.println(Thread.currentThread().getName() +" Util.get "+ System.identityHashCode(sc));
return sc;
}

static public void setSecurityContext(Object sc) {
System.err.println(Thread.currentThread().getName() +" Util.set "+ System.identityHashCode(sc));
try {
Class<?> securityContextHolderClass = Class
.forName("org.springframework.security.core.context.SecurityContextHolder");
sc = securityContextHolderClass.getMethod("setContext", new Class[]{securityContextHolderClass}).invoke(sc);
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
| InvocationTargetException cnfe) {}
}

}
Loading