Skip to content

Commit 47667e9

Browse files
authored
Add transaction close support to async and reactive APIs (#1119) (#1124)
1 parent 2aac13d commit 47667e9

File tree

7 files changed

+53
-10
lines changed

7 files changed

+53
-10
lines changed

driver/clirr-ignored-differences.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,16 @@
3131
<method>org.neo4j.driver.Logger getLog(java.lang.Class)</method>
3232
</difference>
3333

34+
<difference>
35+
<className>org/neo4j/driver/async/AsyncTransaction</className>
36+
<differenceType>7012</differenceType>
37+
<method>java.util.concurrent.CompletionStage closeAsync()</method>
38+
</difference>
39+
40+
<difference>
41+
<className>org/neo4j/driver/reactive/RxTransaction</className>
42+
<differenceType>7012</differenceType>
43+
<method>org.reactivestreams.Publisher close()</method>
44+
</difference>
45+
3446
</differences>

driver/src/main/java/org/neo4j/driver/async/AsyncTransaction.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
import java.util.concurrent.Executor;
2424
import java.util.function.Function;
2525

26-
import org.neo4j.driver.Session;
2726
import org.neo4j.driver.Query;
2827
import org.neo4j.driver.QueryRunner;
28+
import org.neo4j.driver.Session;
2929

3030
/**
3131
* Logical container for an atomic unit of work.
@@ -90,4 +90,12 @@ public interface AsyncTransaction extends AsyncQueryRunner
9090
* be completed exceptionally when rollback fails.
9191
*/
9292
CompletionStage<Void> rollbackAsync();
93+
94+
/**
95+
* Close the transaction. If the transaction has been {@link #commitAsync() committed} or {@link #rollbackAsync() rolled back}, the close is optional and no
96+
* operation is performed. Otherwise, the transaction will be rolled back by default by this method.
97+
*
98+
* @return new {@link CompletionStage} that gets completed with {@code null} when close is successful, otherwise it gets completed exceptionally.
99+
*/
100+
CompletionStage<Void> closeAsync();
93101
}

driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,13 @@ public CompletionStage<Void> rollbackAsync()
4545
}
4646

4747
@Override
48-
public CompletionStage<ResultCursor> runAsync(Query query)
48+
public CompletionStage<Void> closeAsync()
49+
{
50+
return tx.closeAsync();
51+
}
52+
53+
@Override
54+
public CompletionStage<ResultCursor> runAsync( Query query )
4955
{
5056
return tx.runAsync( query );
5157
}

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public <T> Publisher<T> rollback()
7676
return createEmptyPublisher( tx::rollbackAsync );
7777
}
7878

79-
Publisher<Void> close()
79+
public Publisher<Void> close()
8080
{
8181
return close( false );
8282
}

driver/src/main/java/org/neo4j/driver/reactive/RxTransaction.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,19 @@ public interface RxTransaction extends RxQueryRunner
4141
<T> Publisher<T> commit();
4242

4343
/**
44-
* Rolls back the transaction.
45-
* It completes without publishing anything if transaction is rolled back successfully.
46-
* Otherwise, errors when there is any error to roll back.
44+
* Rolls back the transaction. It completes without publishing anything if transaction is rolled back successfully. Otherwise, errors when there is any
45+
* error to roll back.
46+
*
4747
* @param <T> makes it easier to be chained after other publishers.
4848
* @return an empty publisher.
4949
*/
5050
<T> Publisher<T> rollback();
51+
52+
/**
53+
* Close the transaction. If the transaction has been {@link #commit() committed} or {@link #rollback() rolled back}, the close is optional and no operation
54+
* is performed. Otherwise, the transaction will be rolled back by default by this method.
55+
*
56+
* @return new {@link Publisher} that gets completed when close is successful, otherwise an error is signalled.
57+
*/
58+
Publisher<Void> close();
5159
}

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,13 @@ public class GetFeatures implements TestkitRequest
5959
"Temporary:GetConnectionPoolMetrics",
6060
"Temporary:CypherPathAndRelationship",
6161
"Temporary:FullSummary",
62-
"Temporary:ResultKeys"
62+
"Temporary:ResultKeys",
63+
"Temporary:TransactionClose"
6364
) );
6465

6566
private static final Set<String> SYNC_FEATURES = new HashSet<>( Arrays.asList(
6667
"Feature:Bolt:3.0",
6768
"Optimization:PullPipelining",
68-
"Temporary:TransactionClose",
6969
"Feature:API:Result.List",
7070
"Optimization:ResultListFetchAll"
7171
) );

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@
2121
import lombok.Getter;
2222
import lombok.Setter;
2323
import neo4j.org.testkit.backend.TestkitState;
24+
import neo4j.org.testkit.backend.holder.AbstractTransactionHolder;
2425
import neo4j.org.testkit.backend.messages.responses.TestkitResponse;
2526
import neo4j.org.testkit.backend.messages.responses.Transaction;
2627
import reactor.core.publisher.Mono;
2728

2829
import java.util.concurrent.CompletionStage;
2930

31+
import org.neo4j.driver.async.AsyncTransaction;
32+
3033
@Setter
3134
@Getter
3235
public class TransactionClose implements TestkitRequest
@@ -43,13 +46,19 @@ public TestkitResponse process( TestkitState testkitState )
4346
@Override
4447
public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState )
4548
{
46-
throw new UnsupportedOperationException();
49+
return testkitState.getAsyncTransactionHolder( data.getTxId() )
50+
.thenApply( AbstractTransactionHolder::getTransaction )
51+
.thenCompose( AsyncTransaction::closeAsync )
52+
.thenApply( ignored -> createResponse( data.getTxId() ) );
4753
}
4854

4955
@Override
5056
public Mono<TestkitResponse> processRx( TestkitState testkitState )
5157
{
52-
throw new UnsupportedOperationException( "Operation not supported" );
58+
return testkitState.getRxTransactionHolder( data.getTxId() )
59+
.map( AbstractTransactionHolder::getTransaction )
60+
.flatMap( tx -> Mono.fromDirect( tx.close() ) )
61+
.then( Mono.just( createResponse( data.getTxId() ) ) );
5362
}
5463

5564
private Transaction createResponse( String txId )

0 commit comments

Comments
 (0)