27
27
import org .junit .jupiter .api .Test ;
28
28
import org .junit .jupiter .api .extension .RegisterExtension ;
29
29
import org .testcontainers .containers .PostgreSQLContainer ;
30
+ import reactor .core .publisher .Flux ;
30
31
import reactor .core .publisher .Mono ;
32
+ import reactor .netty .DisposableChannel ;
33
+ import reactor .netty .DisposableServer ;
34
+ import reactor .netty .tcp .TcpServer ;
31
35
import reactor .test .StepVerifier ;
32
36
33
37
import static org .assertj .core .api .Assertions .assertThat ;
@@ -119,6 +123,53 @@ void testTargetPreferSecondaryConnectedToStandby() {
119
123
.verifyComplete ();
120
124
}
121
125
126
+ @ Test
127
+ void testTargetPreferSecondaryConnectedToMasterOnStandbyFailure () {
128
+ DisposableServer failingServer = newServer ();
129
+ try {
130
+ isConnectedToPrimary (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (), failingServer )
131
+ .as (StepVerifier ::create )
132
+ .expectNext (true )
133
+ .verifyComplete ();
134
+ } finally {
135
+ failingServer .dispose ();
136
+ }
137
+ }
138
+
139
+ @ Test
140
+ void testMultipleCallsWithTargetPreferSecondaryConnectedToStandby () {
141
+ PostgresqlConnectionFactory connectionFactory = this .multiHostConnectionFactory (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (), SERVERS .getStandby ());
142
+
143
+ Mono <Boolean > allocator = Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
144
+ Flux <Boolean > connectionPool = Flux .merge (allocator , allocator );
145
+
146
+ connectionPool
147
+ .as (StepVerifier ::create )
148
+ .expectNext (false )
149
+ .expectNext (false )
150
+ .verifyComplete ();
151
+ }
152
+
153
+ @ Test
154
+ void testMultipleCallsWithTargetPreferSecondaryConnectedToMasterOnStandbyFailure () {
155
+ DisposableServer failingServer = newServer ();
156
+ try {
157
+ PostgresqlConnectionFactory connectionFactory = this .multiHostConnectionFactoryWithFailingServer (MultiHostConnectionStrategy .TargetServerType .PREFER_SECONDARY , SERVERS .getPrimary (),
158
+ failingServer );
159
+
160
+ Mono <Boolean > allocator = Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
161
+ Flux <Boolean > connectionPool = Flux .merge (allocator , allocator );
162
+
163
+ connectionPool
164
+ .as (StepVerifier ::create )
165
+ .expectNext (true )
166
+ .expectNext (true )
167
+ .verifyComplete ();
168
+ } finally {
169
+ failingServer .dispose ();
170
+ }
171
+ }
172
+
122
173
@ Test
123
174
void testTargetPrimaryChoosePrimary () {
124
175
isConnectedToPrimary (MultiHostConnectionStrategy .TargetServerType .PRIMARY , SERVERS .getPrimary (), SERVERS .getStandby ())
@@ -181,6 +232,12 @@ private Mono<Boolean> isConnectedToPrimary(MultiHostConnectionStrategy.TargetSer
181
232
return Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
182
233
}
183
234
235
+ private Mono <Boolean > isConnectedToPrimary (MultiHostConnectionStrategy .TargetServerType targetServerType , PostgreSQLContainer <?> primaryServer , DisposableServer failingServer ) {
236
+ PostgresqlConnectionFactory connectionFactory = this .multiHostConnectionFactoryWithFailingServer (targetServerType , primaryServer , failingServer );
237
+
238
+ return Mono .usingWhen (connectionFactory .create (), this ::isPrimary , Connection ::close );
239
+ }
240
+
184
241
private Mono <Boolean > isPrimary (PostgresqlConnection connection ) {
185
242
return connection .createStatement ("SHOW TRANSACTION_READ_ONLY" )
186
243
.execute ()
@@ -203,4 +260,25 @@ private PostgresqlConnectionFactory multiHostConnectionFactory(MultiHostConnecti
203
260
return new PostgresqlConnectionFactory (configuration );
204
261
}
205
262
263
+ private PostgresqlConnectionFactory multiHostConnectionFactoryWithFailingServer (MultiHostConnectionStrategy .TargetServerType targetServerType , PostgreSQLContainer <?> primaryServer ,
264
+ DisposableServer failingServer ) {
265
+ PostgresqlConnectionConfiguration .Builder builder = PostgresqlConnectionConfiguration .builder ();
266
+ builder .addHost (primaryServer .getHost (), primaryServer .getMappedPort (5432 ));
267
+ builder .addHost (failingServer .host (), failingServer .port ());
268
+
269
+ PostgresqlConnectionConfiguration configuration = builder
270
+ .targetServerType (targetServerType )
271
+ .username (primaryServer .getUsername ())
272
+ .password (primaryServer .getPassword ())
273
+ .build ();
274
+ return new PostgresqlConnectionFactory (configuration );
275
+ }
276
+
277
+ // Simulate server downtime, where connections are accepted and then closed immediately
278
+ static DisposableServer newServer () {
279
+ return TcpServer .create ()
280
+ .doOnConnection (DisposableChannel ::dispose )
281
+ .bindNow ();
282
+ }
283
+
206
284
}
0 commit comments