20
20
import static org .springframework .data .couchbase .util .JavaIntegrationTests .throwSimulateFailureException ;
21
21
import static org .springframework .data .couchbase .util .Util .assertInAnnotationTransaction ;
22
22
23
- import reactor .core .publisher .Flux ;
24
23
import reactor .core .publisher .Mono ;
25
24
26
25
import java .util .List ;
32
31
import org .springframework .data .couchbase .domain .Person ;
33
32
import org .springframework .stereotype .Component ;
34
33
import org .springframework .stereotype .Service ;
35
- import org .springframework .transaction .annotation .EnableTransactionManagement ;
36
34
import org .springframework .transaction .annotation .Transactional ;
37
- import org .springframework .transaction .reactive .TransactionalOperator ;
38
35
39
36
/**
40
37
* PersonService for tests
41
38
*
42
39
* @author Michael Reiche
43
40
*/
44
- @ Service
45
- @ Component
46
- @ EnableTransactionManagement
41
+ @ Service // this will work in the unit tests even without @Service because of explicit loading by @SpringJUnitConfig
47
42
class PersonService {
48
43
49
44
final CouchbaseOperations personOperations ;
50
- final ReactiveCouchbaseOperations personOperationsRx ;
51
- final TransactionalOperator transactionalOperator ;
45
+ final ReactiveCouchbaseOperations reactivePersonOperations ;
52
46
53
- public PersonService (CouchbaseOperations ops , ReactiveCouchbaseOperations opsRx ,
54
- TransactionalOperator transactionalOperator ) {
47
+ public PersonService (CouchbaseOperations ops , ReactiveCouchbaseOperations reactiveOps ) {
55
48
personOperations = ops ;
56
- personOperationsRx = opsRx ;
57
- this .transactionalOperator = transactionalOperator ;
49
+ reactivePersonOperations = reactiveOps ;
58
50
}
59
51
52
+ @ Transactional
60
53
public Person savePersonErrors (Person person ) {
61
54
assertInAnnotationTransaction (false );
62
-
63
- return personOperationsRx .insertById (Person .class ).one (person )//
64
- .<Person > flatMap (it -> Mono .error (new SimulateFailureException ()))//
65
- .as (transactionalOperator ::transactional ).block ();
55
+ Person p = personOperations .insertById (Person .class ).one (person );
56
+ SimulateFailureException .throwEx ("savePersonErrors" );
57
+ return p ;
66
58
}
67
59
60
+ @ Transactional
68
61
public Person savePerson (Person person ) {
69
- assertInAnnotationTransaction (false );
70
- return personOperationsRx .insertById (Person .class ).one (person )//
71
- .as (transactionalOperator ::transactional ).block ();
62
+ assertInAnnotationTransaction (true );
63
+ return personOperations .insertById (Person .class ).one (person );
72
64
}
73
65
66
+ @ Transactional
74
67
public Long countDuringTx (Person person ) {
75
- assertInAnnotationTransaction (false );
76
- return personOperationsRx .insertById (Person .class ).one (person )//
77
- .then (personOperationsRx .findByQuery (Person .class ).withConsistency (REQUEST_PLUS ).count ())
78
- .as (transactionalOperator ::transactional ).block ();
68
+ assertInAnnotationTransaction (true );
69
+ Person p = personOperations .insertById (Person .class ).one (person );
70
+ return personOperations .findByQuery (Person .class ).withConsistency (REQUEST_PLUS ).count ();
79
71
}
80
72
73
+ @ Transactional
81
74
public List <CouchbasePersonTransactionIntegrationTests .EventLog > saveWithLogs (Person person ) {
82
- assertInAnnotationTransaction (false );
83
- return Flux
84
- .merge (
85
- personOperationsRx .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
86
- .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "beforeConvert" )),
87
- personOperationsRx .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
88
- .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "afterConvert" )),
89
- personOperationsRx .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
90
- .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "beforeInsert" )),
91
- personOperationsRx .insertById (Person .class ).one (person ),
92
- personOperationsRx .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
93
- .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "afterInsert" ))) //
94
- .thenMany (personOperationsRx .findByQuery (CouchbasePersonTransactionIntegrationTests .EventLog .class )
95
- .withConsistency (REQUEST_PLUS ).all ()) //
96
- .as (transactionalOperator ::transactional ).collectList ().block ();
75
+ assertInAnnotationTransaction (true );
76
+ personOperations .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
77
+ .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "beforeConvert" ));
78
+ personOperations .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
79
+ .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "afterConvert" ));
80
+ personOperations .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
81
+ .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "beforeInsert" ));
82
+ personOperations .insertById (Person .class ).one (person );
83
+ personOperations .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
84
+ .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "afterInsert" ));
85
+ return personOperations .findByQuery (CouchbasePersonTransactionIntegrationTests .EventLog .class )
86
+ .withConsistency (REQUEST_PLUS ).all ();
97
87
}
98
88
89
+ @ Transactional
99
90
public List <CouchbasePersonTransactionIntegrationTests .EventLog > saveWithErrorLogs (Person person ) {
100
- assertInAnnotationTransaction (false );
101
-
102
- return Flux
103
- .merge (
104
- personOperationsRx .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
105
- .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "beforeConvert" )),
106
- //
107
- personOperationsRx .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
108
- .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "afterConvert" )),
109
- //
110
- personOperationsRx .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
111
- .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "beforeInsert" )),
112
- //
113
- personOperationsRx .insertById (Person .class ).one (person ),
114
- //
115
- personOperationsRx .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
116
- .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "afterInsert" ))) //
117
- .thenMany (personOperationsRx .findByQuery (CouchbasePersonTransactionIntegrationTests .EventLog .class )
118
- .withConsistency (REQUEST_PLUS ).all ()) //
119
- .<CouchbasePersonTransactionIntegrationTests .EventLog > flatMap (it -> Mono .error (new SimulateFailureException ()))
120
- .as (transactionalOperator ::transactional ).collectList ().block ();
121
-
91
+ assertInAnnotationTransaction (true );
92
+ personOperations .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
93
+ .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "beforeConvert" ));
94
+ personOperations .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
95
+ .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "afterConvert" ));
96
+ personOperations .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
97
+ .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "beforeInsert" ));
98
+ personOperations .insertById (Person .class ).one (person );
99
+ personOperations .insertById (CouchbasePersonTransactionIntegrationTests .EventLog .class )
100
+ .one (new CouchbasePersonTransactionIntegrationTests .EventLog (new ObjectId (), "afterInsert" ));
101
+ SimulateFailureException .throwEx ("saveEventError" );
102
+ return personOperations .findByQuery (CouchbasePersonTransactionIntegrationTests .EventLog .class )
103
+ .withConsistency (REQUEST_PLUS ).all ();
122
104
}
123
105
124
106
// org.springframework.beans.factory.NoUniqueBeanDefinitionException:
@@ -160,14 +142,12 @@ public Person declarativeFindReplacePersonCallback(Person person, AtomicInteger
160
142
* @param person
161
143
* @return
162
144
*/
163
- // @Transactional(transactionManager = BeanNames.REACTIVE_COUCHBASE_TRANSACTION_MANAGER)
164
- // must use transactionalOperator
145
+ @ Transactional
165
146
public Mono <Person > declarativeFindReplacePersonReactive (Person person , AtomicInteger tryCount ) {
166
- // assertInAnnotationTransaction(true);
167
- return personOperationsRx .findById (Person .class ).one (person .id ())
147
+ assertInAnnotationTransaction (true );
148
+ return reactivePersonOperations .findById (Person .class ).one (person .id ())
168
149
.map ((p ) -> ReplaceLoopThread .updateOutOfTransaction (personOperations , p , tryCount .incrementAndGet ()))
169
- .flatMap (p -> personOperationsRx .replaceById (Person .class ).one (p .withFirstName (person .getFirstname ())))
170
- .as (transactionalOperator ::transactional );
150
+ .flatMap (p -> reactivePersonOperations .replaceById (Person .class ).one (p .withFirstName (person .getFirstname ())));
171
151
}
172
152
173
153
/**
@@ -183,19 +163,17 @@ public Person declarativeFindReplacePerson(Person person, AtomicInteger tryCount
183
163
return personOperations .replaceById (Person .class ).one (p .withFirstName (person .getFirstname ()));
184
164
}
185
165
186
- // @Transactional(transactionManager = BeanNames.REACTIVE_COUCHBASE_TRANSACTION_MANAGER)
187
- // must use transactionalOperator
166
+ @ Transactional
188
167
public Mono <Person > declarativeSavePersonReactive (Person person ) {
189
- // assertInAnnotationTransaction(true);
190
- return personOperationsRx .insertById (Person .class ).one (person ). as ( transactionalOperator :: transactional );
168
+ assertInAnnotationTransaction (true );
169
+ return reactivePersonOperations .insertById (Person .class ).one (person );
191
170
}
192
171
193
- // @Transactional(transactionManager = BeanNames.REACTIVE_COUCHBASE_TRANSACTION_MANAGER)
194
- // must use transactionalOperator
172
+ @ Transactional
195
173
public Mono <Person > declarativeSavePersonErrorsReactive (Person person ) {
196
- // assertInAnnotationTransaction(true);
197
- return personOperationsRx .insertById (Person .class ).one (person ).map ((pp ) -> throwSimulateFailureException (pp ))
198
- . as ( transactionalOperator :: transactional ); //
174
+ assertInAnnotationTransaction (true );
175
+ return reactivePersonOperations .insertById (Person .class ).one (person ).map ((pp ) -> throwSimulateFailureException (pp ));
176
+
199
177
}
200
178
201
179
}
0 commit comments