|
21 | 21 | import io.netty.util.ReferenceCountUtil;
|
22 | 22 | import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
|
23 | 23 | import io.r2dbc.postgresql.PostgresqlConnectionFactory;
|
| 24 | +import io.r2dbc.postgresql.api.ErrorDetails; |
24 | 25 | import io.r2dbc.postgresql.api.PostgresqlConnection;
|
| 26 | +import io.r2dbc.postgresql.api.PostgresqlException; |
25 | 27 | import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler;
|
26 | 28 | import io.r2dbc.postgresql.message.backend.BackendMessage;
|
27 | 29 | import io.r2dbc.postgresql.message.backend.CommandComplete;
|
|
30 | 32 | import io.r2dbc.postgresql.message.backend.RowDescription;
|
31 | 33 | import io.r2dbc.postgresql.message.frontend.FrontendMessage;
|
32 | 34 | import io.r2dbc.postgresql.message.frontend.Query;
|
| 35 | +import io.r2dbc.postgresql.util.PgBouncer; |
33 | 36 | import io.r2dbc.postgresql.util.PostgresqlServerExtension;
|
| 37 | +import io.r2dbc.spi.R2dbcBadGrammarException; |
34 | 38 | import io.r2dbc.spi.R2dbcNonTransientResourceException;
|
35 | 39 | import io.r2dbc.spi.R2dbcPermissionDeniedException;
|
36 | 40 | import org.junit.jupiter.api.AfterEach;
|
@@ -336,29 +340,77 @@ public boolean verify(String s, SSLSession sslSession) {
|
336 | 340 |
|
337 | 341 | @Nested
|
338 | 342 | @TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
339 |
| - final class StatementCacheSizeTests { |
| 343 | + final class PgBouncerTests { |
340 | 344 |
|
341 | 345 | @ParameterizedTest
|
342 |
| - @ValueSource(ints = {0, 2, -1}) |
343 |
| - void multiplePreparedStatementsTest(int statementCacheSize) { |
344 |
| - PostgresqlConnectionFactory connectionFactory = this.createConnectionFactory(statementCacheSize); |
| 346 | + @ValueSource(strings = {"transaction", "statement"}) |
| 347 | + void disabledCacheWorksWithTransactionAndStatementModes(String poolMode) { |
| 348 | + try (PgBouncer pgBouncer = new PgBouncer(SERVER, poolMode)) { |
| 349 | + PostgresqlConnectionFactory connectionFactory = this.createConnectionFactory(pgBouncer, 0); |
| 350 | + |
| 351 | + connectionFactory.create().flatMapMany(connection -> { |
| 352 | + Flux<Integer> q1 = connection.createStatement("SELECT 1 WHERE $1 = 1").bind(0, 1).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 353 | + Flux<Integer> q2 = connection.createStatement("SELECT 2 WHERE $1 = 2").bind(0, 2).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 354 | + Flux<Integer> q3 = connection.createStatement("SELECT 3 WHERE $1 = 3").bind(0, 3).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 355 | + |
| 356 | + return Flux.concat(q1, q1, q2, q2, q3, q3, connection.close()); |
| 357 | + }) |
| 358 | + .as(StepVerifier::create) |
| 359 | + .expectNext(1, 1, 2, 2, 3, 3) |
| 360 | + .verifyComplete(); |
| 361 | + } |
| 362 | + } |
345 | 363 |
|
346 |
| - connectionFactory.create().flatMapMany(connection -> { |
347 |
| - Flux<Integer> firstQuery = connection.createStatement("SELECT 1 WHERE $1 = 1").bind(0, 1).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
348 |
| - Flux<Integer> secondQuery = connection.createStatement("SELECT 2 WHERE $1 = 2").bind(0, 2).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
349 |
| - Flux<Integer> thirdQuery = connection.createStatement("SELECT 3 WHERE $1 = 3").bind(0, 3).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 364 | + @ParameterizedTest |
| 365 | + @ValueSource(ints = {-1, 0, 2}) |
| 366 | + void sessionModeWorksWithAllCaches(int statementCacheSize) { |
| 367 | + try (PgBouncer pgBouncer = new PgBouncer(SERVER, "session")) { |
| 368 | + PostgresqlConnectionFactory connectionFactory = this.createConnectionFactory(pgBouncer, statementCacheSize); |
| 369 | + |
| 370 | + connectionFactory.create().flatMapMany(connection -> { |
| 371 | + Flux<Integer> q1 = connection.createStatement("SELECT 1 WHERE $1 = 1").bind(0, 1).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 372 | + Flux<Integer> q2 = connection.createStatement("SELECT 2 WHERE $1 = 2").bind(0, 2).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 373 | + Flux<Integer> q3 = connection.createStatement("SELECT 3 WHERE $1 = 3").bind(0, 3).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 374 | + |
| 375 | + return Flux.concat(q1, q1, q2, q2, q3, q3, connection.close()); |
| 376 | + }) |
| 377 | + .as(StepVerifier::create) |
| 378 | + .expectNext(1, 1, 2, 2, 3, 3) |
| 379 | + .verifyComplete(); |
| 380 | + } |
| 381 | + } |
350 | 382 |
|
351 |
| - return Flux.concat(firstQuery, secondQuery, thirdQuery, connection.close()); |
352 |
| - }) |
353 |
| - .as(StepVerifier::create) |
354 |
| - .expectNext(1, 2, 3) |
355 |
| - .verifyComplete(); |
| 383 | + @ParameterizedTest |
| 384 | + @ValueSource(strings = {"transaction", "statement"}) |
| 385 | + void statementCacheDoesntWorkWithTransactionAndStatementModes(String poolMode) { |
| 386 | + try (PgBouncer pgBouncer = new PgBouncer(SERVER, poolMode)) { |
| 387 | + PostgresqlConnectionFactory connectionFactory = this.createConnectionFactory(pgBouncer, -1); |
| 388 | + |
| 389 | + connectionFactory.create().flatMapMany(connection -> { |
| 390 | + Flux<Integer> q1 = connection.createStatement("SELECT 1 WHERE $1 = 1").bind(0, 1).execute().flatMap(r -> r.map((row, rowMetadata) -> row.get(0, Integer.class))); |
| 391 | + |
| 392 | + return Flux.concat(q1, q1, connection.close()); |
| 393 | + }) |
| 394 | + .as(StepVerifier::create) |
| 395 | + .expectNext(1) |
| 396 | + .verifyErrorMatches(e -> { |
| 397 | + if (!(e instanceof R2dbcBadGrammarException)) { |
| 398 | + return false; |
| 399 | + } |
| 400 | + if (!(e instanceof PostgresqlException)) { |
| 401 | + return false; |
| 402 | + } |
| 403 | + PostgresqlException pgException = (PostgresqlException) e; |
| 404 | + ErrorDetails errorDetails = pgException.getErrorDetails(); |
| 405 | + return errorDetails.getCode().equals("26000") && errorDetails.getMessage().equals("prepared statement \"S_0\" does not exist"); |
| 406 | + }); |
| 407 | + } |
356 | 408 | }
|
357 | 409 |
|
358 |
| - private PostgresqlConnectionFactory createConnectionFactory(int statementCacheSize) { |
| 410 | + private PostgresqlConnectionFactory createConnectionFactory(PgBouncer pgBouncer, int statementCacheSize) { |
359 | 411 | return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
|
360 |
| - .host(SERVER.getHost()) |
361 |
| - .port(SERVER.getPort()) |
| 412 | + .host(pgBouncer.getHost()) |
| 413 | + .port(pgBouncer.getPort()) |
362 | 414 | .username(SERVER.getUsername())
|
363 | 415 | .password(SERVER.getPassword())
|
364 | 416 | .database(SERVER.getDatabase())
|
|
0 commit comments