Skip to content

Commit 1701183

Browse files
committed
Fixed up merge issues.
1 parent 4978cec commit 1701183

File tree

5 files changed

+101
-164
lines changed

5 files changed

+101
-164
lines changed

src/main/java/com/couchbase/client/java/transactions/AttemptContextReactiveAccessor.java

-14
Original file line numberDiff line numberDiff line change
@@ -206,18 +206,4 @@ public static TransactionResult run(Transactions transactions, Consumer<Transact
206206
return reactive(transactions).runBlocking(transactionLogic, coreTransactionOptions);
207207
}
208208

209-
CoreTransactionAttemptContext coreTransactionsReactive;
210-
try {
211-
Field field = TransactionAttemptContext.class.getDeclaredField("internal");
212-
field.setAccessible(true);
213-
coreTransactionsReactive = (CoreTransactionAttemptContext) field.get(atr);
214-
} catch (Throwable err) {
215-
throw new RuntimeException(err);
216-
}
217-
return coreTransactionsReactive;
218-
}
219-
220-
public static ReactiveTransactionAttemptContext createReactiveTransactionAttemptContext(CoreTransactionAttemptContext core, JsonSerializer jsonSerializer) {
221-
return new ReactiveTransactionAttemptContext(core, jsonSerializer);
222-
}
223209
}

src/main/java/org/springframework/data/couchbase/core/ReactiveFindByQueryOperationSupport.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ public Mono<Long> count() {
276276
}
277277
}).flatMapMany(o -> o instanceof ReactiveQueryResult ? ((ReactiveQueryResult) o).rowsAsObject()
278278
: Flux.fromIterable(((TransactionQueryResult) o).rowsAsObject()))
279-
.map(row -> row.getLong(row.getNames().iterator().next())).elementAt(0);
279+
.map(row -> row.getLong(row.getNames().iterator().next())).next();
280280
}
281281

282282
@Override

src/main/java/org/springframework/data/couchbase/transaction/CouchbaseTransactionManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ protected void doBegin(Object transaction, TransactionDefinition definition) thr
133133

134134
CouchbaseTransactionObject couchbaseTransactionObject = extractCouchbaseTransaction(transaction);
135135
// should ACR already be in TSM? TransactionSynchronizationManager.bindResource(getRequiredDbFactory().getCluster(), resourceHolder);
136-
ReactiveCouchbaseResourceHolder resourceHolder = newResourceHolder(definition, TransactionOptions.transactionOptions(),
136+
ReactiveCouchbaseResourceHolder resourceHolder = newResourceHolder(getDatabaseFactory(), definition, TransactionOptions.transactionOptions(),
137137
null /* ((CouchbaseTransactionDefinition) definition).getAttemptContextReactive()*/);
138138
couchbaseTransactionObject.setResourceHolder(resourceHolder);
139139

Original file line numberDiff line numberDiff line change
@@ -1,157 +1,106 @@
11
package org.springframework.data.couchbase.transaction;
22

3-
import com.couchbase.client.core.error.transaction.TransactionOperationFailedException;
3+
import static org.springframework.data.couchbase.transaction.CouchbaseTransactionManager.debugString;
4+
import static org.springframework.data.couchbase.transaction.CouchbaseTransactionManager.newResourceHolder;
5+
6+
import reactor.util.annotation.Nullable;
7+
8+
import java.util.function.Consumer;
9+
10+
import org.springframework.data.couchbase.CouchbaseClientFactory;
11+
import org.springframework.transaction.support.TransactionSynchronizationManager;
12+
13+
import com.couchbase.client.core.error.transaction.internal.CoreTransactionFailedException;
14+
import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
15+
import com.couchbase.client.core.transaction.CoreTransactionResult;
16+
import com.couchbase.client.core.transaction.log.CoreTransactionLogger;
417
import com.couchbase.client.java.transactions.AttemptContextReactiveAccessor;
5-
import com.couchbase.client.java.transactions.ReactiveTransactionAttemptContext;
18+
import com.couchbase.client.java.transactions.TransactionAttemptContext;
619
import com.couchbase.client.java.transactions.TransactionResult;
20+
import com.couchbase.client.java.transactions.Transactions;
721
import com.couchbase.client.java.transactions.config.TransactionOptions;
8-
import org.springframework.data.couchbase.ReactiveCouchbaseClientFactory;
9-
import org.springframework.transaction.ReactiveTransaction;
10-
import org.springframework.transaction.TransactionDefinition;
11-
import org.springframework.transaction.reactive.TransactionContextManager;
12-
import org.springframework.transaction.reactive.TransactionSynchronizationManager;
13-
import reactor.core.publisher.Mono;
14-
15-
import java.time.Duration;
16-
import java.time.temporal.ChronoUnit;
17-
import java.util.function.Function;
22+
import com.couchbase.client.java.transactions.error.TransactionFailedException;
1823

1924
// todo gp needed now Transactions has gone?
20-
public class TransactionsWrapper {
21-
ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory;
22-
23-
public TransactionsWrapper(ReactiveCouchbaseClientFactory reactiveCouchbaseClientFactory) {
24-
this.reactiveCouchbaseClientFactory = reactiveCouchbaseClientFactory;
25-
}
26-
27-
/**
28-
* A convenience wrapper around {@link TransactionsReactive#run}, that provides a default
29-
* <code>PerTransactionConfig</code>.
30-
*/
31-
public Mono<TransactionResult> reactive(Function<ReactiveTransactionAttemptContext, Mono<?>> transactionLogic) {
32-
// TODO long duration for debugger
33-
Duration duration = Duration.ofMinutes(20);
34-
System.err.println("tx duration of " + duration);
35-
return run(transactionLogic, TransactionOptions.transactionOptions().timeout(duration));
36-
}
37-
38-
public Mono<TransactionResult> run(Function<ReactiveTransactionAttemptContext, Mono<?>> transactionLogic) {
39-
return run(transactionLogic,null);
40-
}
41-
public Mono<TransactionResult> run(Function<ReactiveTransactionAttemptContext, Mono<?>> transactionLogic,
42-
TransactionOptions perConfig) {
43-
// todo gp this is duplicating a lot of logic from the core loop, and is hopefully not needed..
44-
// todo mr it binds to with the TransactionSynchronizationManager - which is necessary.
45-
Mono<TransactionResult> txResult = reactiveCouchbaseClientFactory.getCluster().block().reactive().transactions().run((ctx) -> {
46-
ReactiveCouchbaseResourceHolder resourceHolder = reactiveCouchbaseClientFactory
47-
.getTransactionResources(TransactionOptions.transactionOptions(), AttemptContextReactiveAccessor.getCore(ctx));
48-
49-
Mono<TransactionSynchronizationManager> sync = TransactionContextManager.currentContext()
50-
.map(TransactionSynchronizationManager::new).flatMap(synchronizationManager -> {
51-
synchronizationManager.bindResource(reactiveCouchbaseClientFactory.getCluster().block(), resourceHolder);
52-
prepareSynchronization(synchronizationManager, null, new CouchbaseTransactionDefinition());
53-
Mono<?> result = transactionLogic.apply(ctx);
54-
result
55-
.onErrorResume(err -> {
56-
AttemptContextReactiveAccessor.getLogger(ctx).info(ctx.toString(), "caught exception '%s' in async, rethrowing", err);
57-
//logElidedStacktrace(ctx, err);
58-
59-
return Mono.error(new TransactionOperationFailedException(true, true, err, null));
60-
})
61-
.thenReturn(ctx);
62-
return result.then(Mono.just(synchronizationManager));
63-
});
64-
65-
return sync.contextWrite(TransactionContextManager.getOrCreateContext())
66-
.contextWrite(TransactionContextManager.getOrCreateContextHolder());
67-
});
68-
return txResult;
69-
/*
70-
TransactionsConfig config = TransactionsConfig.create().build();
71-
72-
ClusterEnvironment env = ClusterEnvironment.builder().build();
73-
return Mono.defer(() -> {
74-
MergedTransactionsConfig merged = new MergedTransactionsConfig(config, Optional.of(perConfig));
75-
76-
TransactionContext overall =
77-
new TransactionContext(env.requestTracer(),
78-
env.eventBus(),
79-
UUID.randomUUID().toString(),
80-
now(),
81-
Duration.ZERO,
82-
merged);
83-
AtomicReference<Long> startTime = new AtomicReference<>(0L);
84-
85-
Mono<ReactiveTransactionAttemptContext> ob = Mono.fromCallable(() -> {
86-
String txnId = UUID.randomUUID().toString();
87-
//overall.LOGGER.info(configDebug(config, perConfig));
88-
return reactiveCouchbaseClientFactory.getCluster().block().reactive().transactions().createAttemptContext(overall, merged, txnId);
89-
}).flatMap(ctx -> {
90-
91-
AttemptContextReactiveAccessor.getLogger(ctx).info("starting attempt %d/%s/%s",
92-
overall.numAttempts(), ctx.transactionId(), ctx.attemptId());
93-
94-
// begin spring-data-couchbase transaction 1/2 *
95-
ClientSession clientSession = reactiveCouchbaseClientFactory // couchbaseClientFactory
96-
.getSession(ClientSessionOptions.builder().causallyConsistent(true).build(), transactions, null, ctx);
97-
ReactiveCouchbaseResourceHolder resourceHolder = new ReactiveCouchbaseResourceHolder(clientSession,
98-
reactiveCouchbaseClientFactory);
99-
Mono<TransactionSynchronizationManager> sync = TransactionContextManager.currentContext()
100-
.map(TransactionSynchronizationManager::new).<TransactionSynchronizationManager>flatMap(synchronizationManager -> {
101-
synchronizationManager.bindResource(reactiveCouchbaseClientFactory.getCluster().block(), resourceHolder);
102-
prepareSynchronization(synchronizationManager, null, new CouchbaseTransactionDefinition());
103-
// end spring-data-couchbase transaction 1/2
104-
Mono<Void> result = transactionLogic.apply(ctx);
105-
result
106-
.onErrorResume(err -> {
107-
AttemptContextReactiveAccessor.getLogger(ctx).info(ctx.attemptId(), "caught exception '%s' in async, rethrowing", err);
108-
logElidedStacktrace(ctx, err);
109-
110-
return Mono.error(TransactionOperationFailed.convertToOperationFailedIfNeeded(err, ctx));
111-
})
112-
.thenReturn(ctx);
113-
return result.then(Mono.just(synchronizationManager));
114-
});
115-
// begin spring-data-couchbase transaction 2/2
116-
return sync.contextWrite(TransactionContextManager.getOrCreateContext())
117-
.contextWrite(TransactionContextManager.getOrCreateContextHolder()).then(Mono.just(ctx));
118-
// end spring-data-couchbase transaction 2/2
119-
}).doOnSubscribe(v -> startTime.set(System.nanoTime()))
120-
.doOnNext(v -> AttemptContextReactiveAccessor.getLogger(v).trace(v.attemptId(), "finished attempt %d in %sms",
121-
overall.numAttempts(), (System.nanoTime() - startTime.get()) / 1_000_000));
122-
123-
return transactions.reactive().executeTransaction(merged, overall, ob)
124-
.doOnNext(v -> overall.span().finish())
125-
.doOnError(err -> overall.span().failWith(err));
126-
});
127-
128-
*/
129-
}
130-
131-
// private void logElidedStacktrace(ReactiveTransactionAttemptContext ctx, Throwable err) {
132-
// transactions.reactive().logElidedStacktrace(ctx, err);
133-
// }
134-
//
135-
// private String configDebug(TransactionConfig config, PerTransactionConfig perConfig) {
136-
// return transactions.reactive().configDebug(config, perConfig);
137-
// }
138-
//
139-
private static Duration now() {
140-
return Duration.of(System.nanoTime(), ChronoUnit.NANOS);
141-
}
142-
143-
private static void prepareSynchronization(TransactionSynchronizationManager synchronizationManager,
144-
ReactiveTransaction status, TransactionDefinition definition) {
145-
146-
// if (status.isNewSynchronization()) {
147-
synchronizationManager.setActualTransactionActive(false /*status.hasTransaction()*/);
148-
synchronizationManager.setCurrentTransactionIsolationLevel(
149-
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel()
150-
: null);
151-
synchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
152-
synchronizationManager.setCurrentTransactionName(definition.getName());
153-
synchronizationManager.initSynchronization();
154-
// }
155-
}
25+
public class TransactionsWrapper /* wraps Transactions */ {
26+
CouchbaseClientFactory couchbaseClientFactory;
27+
28+
public TransactionsWrapper(CouchbaseClientFactory couchbaseClientFactory) {
29+
this.couchbaseClientFactory = couchbaseClientFactory;
30+
}
31+
32+
33+
/**
34+
* Runs supplied transactional logic until success or failure.
35+
* <p>
36+
* The supplied transactional logic will be run if necessary multiple times, until either:
37+
* <ul>
38+
* <li>The transaction successfully commits</li>
39+
* <li>The transactional logic requests an explicit rollback</li>
40+
* <li>The transaction timesout.</li>
41+
* <li>An exception is thrown, either inside the transaction library or by the supplied transaction logic, that cannot
42+
* be handled.
43+
* </ul>
44+
* <p>
45+
* The transaction logic {@link Consumer} is provided an {@link TransactionAttemptContext}, which contains methods
46+
* allowing it to read, mutate, insert and delete documents, as well as commit or rollback the transaction.
47+
* <p>
48+
* If the transaction logic performs a commit or rollback it must be the last operation performed. Else a
49+
* {@link com.couchbase.client.java.transactions.error.TransactionFailedException} will be thrown. Similarly, there
50+
* cannot be a commit followed by a rollback, or vice versa - this will also raise a
51+
* {@link CoreTransactionFailedException}.
52+
* <p>
53+
* If the transaction logic does not perform an explicit commit or rollback, then a commit will be performed anyway.
54+
*
55+
* @param transactionLogic the application's transaction logic
56+
* @param options the configuration to use for this transaction
57+
* @return there is no need to check the returned {@link CoreTransactionResult}, as success is implied by the lack of
58+
* a thrown exception. It contains information useful only for debugging and logging.
59+
* @throws TransactionFailedException or a derived exception if the transaction fails to commit for any reason,
60+
* possibly after multiple retries. The exception contains further details of the error
61+
*/
62+
63+
public TransactionResult run(Consumer<TransactionAttemptContext> transactionLogic,
64+
@Nullable TransactionOptions options) {
65+
Consumer<TransactionAttemptContext> newTransactionLogic = (ctx) -> {
66+
try {
67+
CoreTransactionLogger logger = AttemptContextReactiveAccessor.getLogger(ctx);
68+
CoreTransactionAttemptContext atr = AttemptContextReactiveAccessor.getCore(ctx);
69+
70+
// from CouchbaseTransactionManager
71+
ReactiveCouchbaseResourceHolder resourceHolder = newResourceHolder(couchbaseClientFactory,
72+
/*definition*/ new CouchbaseTransactionDefinition(), TransactionOptions.transactionOptions(), atr);
73+
// couchbaseTransactionObject.setResourceHolder(resourceHolder);
74+
75+
logger
76+
.debug(String.format("About to start transaction for session %s.", debugString(resourceHolder.getCore())));
77+
78+
logger.debug(String.format("Started transaction for session %s.", debugString(resourceHolder.getCore())));
79+
80+
TransactionSynchronizationManager.setActualTransactionActive(true);
81+
resourceHolder.setSynchronizedWithTransaction(true);
82+
TransactionSynchronizationManager.unbindResourceIfPossible(couchbaseClientFactory.getCluster());
83+
logger.debug("CouchbaseTransactionManager: " + this);
84+
logger.debug("bindResource: " + couchbaseClientFactory.getCluster() + " value: " + resourceHolder);
85+
TransactionSynchronizationManager.bindResource(couchbaseClientFactory.getCluster(), resourceHolder);
86+
87+
transactionLogic.accept(ctx);
88+
} finally {
89+
TransactionSynchronizationManager.unbindResource(couchbaseClientFactory.getCluster());
90+
}
91+
};
92+
93+
return AttemptContextReactiveAccessor.run(couchbaseClientFactory.getCluster().transactions(), newTransactionLogic,
94+
options == null ? null : options.build());
95+
}
96+
97+
/**
98+
* Runs supplied transactional logic until success or failure. A convenience overload for {@link Transactions#run}
99+
* that provides a default <code>PerTransactionConfig</code>
100+
*/
101+
102+
public TransactionResult run(Consumer<TransactionAttemptContext> transactionLogic) {
103+
return run(transactionLogic, null);
104+
}
156105

157106
}

src/test/java/org/springframework/data/couchbase/transactions/CouchbasePersonTransactionIntegrationTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,8 @@ private <T> Mono<T> throwSimulatedFailure(T p) {
667667
@Data
668668
static class EventLog {
669669

670+
public EventLog(){};
671+
670672
public EventLog(ObjectId oid, String action) {
671673
this.id = oid.toString();
672674
this.action = action;

0 commit comments

Comments
 (0)