20
20
21
21
import static reactor .adapter .JdkFlowAdapter .flowPublisherToFlux ;
22
22
23
- import java .time .Duration ;
24
- import java .util .Map ;
25
- import java .util .Optional ;
26
23
import java .util .concurrent .CompletionStage ;
27
24
import lombok .Getter ;
28
25
import lombok .Setter ;
29
- import neo4j .org .testkit .backend .CustomDriverError ;
30
26
import neo4j .org .testkit .backend .TestkitState ;
31
27
import neo4j .org .testkit .backend .holder .AsyncTransactionHolder ;
32
28
import neo4j .org .testkit .backend .holder .ReactiveTransactionHolder ;
43
39
import org .neo4j .driver .reactive .RxSession ;
44
40
import reactor .core .publisher .Mono ;
45
41
46
- @ Setter
47
- @ Getter
48
- public class SessionBeginTransaction implements TestkitRequest {
49
- private SessionBeginTransactionBody data ;
50
-
51
- private void configureTimeout (TransactionConfig .Builder builder ) {
52
- if (data .getTimeoutPresent ()) {
53
- try {
54
- if (data .getTimeout () != null ) {
55
- builder .withTimeout (Duration .ofMillis (data .getTimeout ()));
56
- } else {
57
- builder .withDefaultTimeout ();
58
- }
59
- } catch (IllegalArgumentException e ) {
60
- throw new CustomDriverError (e );
61
- }
62
- }
63
- }
64
-
42
+ public class SessionBeginTransaction
43
+ extends AbstractTestkitRequestWithTransactionConfig <SessionBeginTransaction .SessionBeginTransactionBody > {
65
44
@ Override
66
45
public TestkitResponse process (TestkitState testkitState ) {
67
46
SessionHolder sessionHolder = testkitState .getSessionHolder (data .getSessionId ());
68
47
Session session = sessionHolder .getSession ();
69
- TransactionConfig .Builder builder = TransactionConfig .builder ();
70
- Optional .ofNullable (data .txMeta ).ifPresent (builder ::withMetadata );
71
48
72
- configureTimeout (builder );
73
-
74
- org .neo4j .driver .Transaction transaction = session .beginTransaction (builder .build ());
49
+ org .neo4j .driver .Transaction transaction = session .beginTransaction (buildTxConfig ());
75
50
return transaction (testkitState .addTransactionHolder (new TransactionHolder (sessionHolder , transaction )));
76
51
}
77
52
@@ -80,11 +55,8 @@ public CompletionStage<TestkitResponse> processAsync(TestkitState testkitState)
80
55
return testkitState .getAsyncSessionHolder (data .getSessionId ()).thenCompose (sessionHolder -> {
81
56
AsyncSession session = sessionHolder .getSession ();
82
57
TransactionConfig .Builder builder = TransactionConfig .builder ();
83
- Optional .ofNullable (data .txMeta ).ifPresent (builder ::withMetadata );
84
-
85
- configureTimeout (builder );
86
58
87
- return session .beginTransactionAsync (builder . build ())
59
+ return session .beginTransactionAsync (buildTxConfig ())
88
60
.thenApply (tx -> transaction (
89
61
testkitState .addAsyncTransactionHolder (new AsyncTransactionHolder (sessionHolder , tx ))));
90
62
});
@@ -96,11 +68,8 @@ public Mono<TestkitResponse> processRx(TestkitState testkitState) {
96
68
return testkitState .getRxSessionHolder (data .getSessionId ()).flatMap (sessionHolder -> {
97
69
RxSession session = sessionHolder .getSession ();
98
70
TransactionConfig .Builder builder = TransactionConfig .builder ();
99
- Optional .ofNullable (data .txMeta ).ifPresent (builder ::withMetadata );
100
71
101
- configureTimeout (builder );
102
-
103
- return Mono .fromDirect (session .beginTransaction (builder .build ()))
72
+ return Mono .fromDirect (session .beginTransaction (buildTxConfig ()))
104
73
.map (tx -> transaction (
105
74
testkitState .addRxTransactionHolder (new RxTransactionHolder (sessionHolder , tx ))));
106
75
});
@@ -111,11 +80,8 @@ public Mono<TestkitResponse> processReactive(TestkitState testkitState) {
111
80
return testkitState .getReactiveSessionHolder (data .getSessionId ()).flatMap (sessionHolder -> {
112
81
ReactiveSession session = sessionHolder .getSession ();
113
82
TransactionConfig .Builder builder = TransactionConfig .builder ();
114
- Optional .ofNullable (data .txMeta ).ifPresent (builder ::withMetadata );
115
-
116
- configureTimeout (builder );
117
83
118
- return Mono .fromDirect (flowPublisherToFlux (session .beginTransaction (builder . build ())))
84
+ return Mono .fromDirect (flowPublisherToFlux (session .beginTransaction (buildTxConfig ())))
119
85
.map (tx -> transaction (testkitState .addReactiveTransactionHolder (
120
86
new ReactiveTransactionHolder (sessionHolder , tx ))));
121
87
});
@@ -126,11 +92,8 @@ public Mono<TestkitResponse> processReactiveStreams(TestkitState testkitState) {
126
92
return testkitState .getReactiveSessionStreamsHolder (data .getSessionId ()).flatMap (sessionHolder -> {
127
93
var session = sessionHolder .getSession ();
128
94
TransactionConfig .Builder builder = TransactionConfig .builder ();
129
- Optional .ofNullable (data .txMeta ).ifPresent (builder ::withMetadata );
130
95
131
- configureTimeout (builder );
132
-
133
- return Mono .fromDirect (session .beginTransaction (builder .build ()))
96
+ return Mono .fromDirect (session .beginTransaction (buildTxConfig ()))
134
97
.map (tx -> transaction (testkitState .addReactiveTransactionStreamsHolder (
135
98
new ReactiveTransactionStreamsHolder (sessionHolder , tx ))));
136
99
});
@@ -144,15 +107,8 @@ private Transaction transaction(String txId) {
144
107
145
108
@ Getter
146
109
@ Setter
147
- public static class SessionBeginTransactionBody {
110
+ public static class SessionBeginTransactionBody
111
+ extends AbstractTestkitRequestWithTransactionConfig .TransactionConfigBody {
148
112
private String sessionId ;
149
- private Map <String , Object > txMeta ;
150
- private Integer timeout ;
151
- private Boolean timeoutPresent = false ;
152
-
153
- public void setTimeout (Integer timeout ) {
154
- this .timeout = timeout ;
155
- timeoutPresent = true ;
156
- }
157
113
}
158
114
}
0 commit comments