diff --git a/README.md b/README.md index 7393988..5d606e6 100644 --- a/README.md +++ b/README.md @@ -214,6 +214,93 @@ that is specific to Oracle Database and the Oracle JDBC Driver. Extended options are declared in the [OracleR2dbcOptions](src/main/java/oracle/r2dbc/OracleR2dbcOptions.java) class. +#### Support for Supplier and Publisher as Option Values +Most options can have a value provided by a `Supplier` or `Publisher`. + +Oracle R2DBC requests the value of an `Option` from a `Supplier` or `Publisher` +each time the `Publisher` returned by `ConnectionFactory.create()` creates a new +`Connection`. Each `Connection` can then be configured with values that change +over time, such as a password which is periodically rotated. + +If a `Supplier` provides the value of an `Option`, then Oracle R2DBC requests +the value by invoking `Supplier.get()`. If `get()` returns `null`, +then no value is configured for the `Option`. If `get()` throws a +`RuntimeException`, then it is set as the initial cause of an +`R2dbcException` emitted by the `Publisher` returned by +`ConnectionFactory.create()`. The `Supplier` must have a thread safe `get()` +method, as multiple subscribers may request connections concurrently. + +If a `Publisher` provides the value of an `Option`, then Oracle R2DBC requests +the value by subscribing to the `Publisher` and signalling demand. +The first value emitted to `onNext` will be used as the value of the `Option`. +If the `Publisher` emits `onComplete` before `onNext`, then no value is +configured for the `Option`. If the `Publisher` emits `onError` before `onNext`, +then the `Throwable` is set as the initial cause of an +`R2dbcException` emitted by the `Publisher` returned by +`ConnectionFactory.create()`. + +The following example configures the `PASSWORD` option with a `Supplier`: +```java + void configurePassword(ConnectionFactoryOptions.Builder optionsBuilder) { + + // Cast the PASSWORD option + Option> suppliedOption = OracleR2dbcOptions.supplied(PASSWORD); + + // Supply a password + Supplier supplier = () -> getPassword(); + + // Configure the builder + optionsBuilder.option(suppliedOption, supplier); + } +``` +A more concise example configures `TLS_WALLET_PASSWORD` as a `Publisher` +```java + void configurePassword(ConnectionFactoryOptions.Builder optionsBuilder) { + optionsBuilder.option( + OracleR2dbcOptions.published(TLS_WALLET_PASSWORD), + Mono.fromSupplier(() -> getWalletPassword())); + } +``` +These examples use the `supplied(Option)` and `published(Option)` methods +declared by `oracle.r2dbc.OracleR2dbcOptions`. These methods cast an `Option` +to `Option>` and `Option>`, respectively. It is +necessary to cast the generic type of the `Option` when calling +`ConnectionFactoryOptions.Builder.option(Option, T)` in order for the call to +compile and not throw a `ClassCastException` at runtime. It is not strictly +required that `supplied(Option)` or `published(Option)` be used to cast the +`Option`. These methods are only meant to offer code readability and +convenience. + +Note that the following code would compile, but fails at runtime with a +`ClassCastException`: +```java + void configurePassword(ConnectionFactoryOptions.Builder optionsBuilder) { + Publisher publisher = Mono.fromSupplier(() -> getPassword()); + // Doesn't work. Throws ClassCastException at runtime: + optionsBuilder.option(PASSWORD, PASSWORD.cast(publisher)); + } +``` +To avoid a `ClassCastException`, the generic type of an `Option` must match the +actual type of the value passed to +`ConnectionFactoryOptions.Builder.option(Option, T)`. + +For a small set of options, providing values with a `Supplier` or `Publisher` +is not supported: +- `DRIVER` +- `PROTOCOL` + +Providing values for these options would not be interoperable with +`io.r2dbc.spi.ConnectionFactories` and `r2dbc-pool`. + +Normally, Oracle R2DBC will not retain references to `Option` values after +`ConnectionFactories.create(ConnectionFactoryOptions)` returns. However, if +the value of at least one `Option` is provided by a `Supplier` or `Publisher`, +then Oracle R2DBC will retain a reference to all `Option` values until the +`ConnectionFactory.create()` `Publisher` emits a `Connection` or error. This is +important to keep in mind when `Option` values may be mutated. In particular, +a password may only be cleared from memory after the `create()` `Publisher` +emits a `Connection` or error. + #### Configuring an Oracle Net Descriptor The `oracle.r2dbc.OracleR2dbcOptions.DESCRIPTOR` option may be used to configure an Oracle Net Descriptor of the form ```(DESCRIPTION=...)```. If this option is diff --git a/src/main/java/oracle/r2dbc/OracleR2dbcOptions.java b/src/main/java/oracle/r2dbc/OracleR2dbcOptions.java index 2cf69f1..210b495 100644 --- a/src/main/java/oracle/r2dbc/OracleR2dbcOptions.java +++ b/src/main/java/oracle/r2dbc/OracleR2dbcOptions.java @@ -22,10 +22,12 @@ import io.r2dbc.spi.Option; import oracle.jdbc.OracleConnection; +import org.reactivestreams.Publisher; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; +import java.util.function.Supplier; /** * Extended {@link Option}s supported by the Oracle R2DBC Driver. @@ -522,4 +524,66 @@ public static Set> options() { return OPTIONS; } + /** + *

+ * Casts an Option<T> to + * Option<Supplier<T>>. For instance, if an + * Option<CharSequence> is passed to this method, it is + * returned as an + * Option<Supplier<CharSequence>>. + *

+ * This method can used when configuring an Option with values + * from a Supplier: + *

{@code
+   * void configurePassword(ConnectionFactoryOptions.Builder optionsBuilder) {
+   *   optionsBuilder.option(supplied(PASSWORD), () -> getPassword());
+   * }
+   *
+   * CharSequence getPassword() {
+   *   // ... return a database password ...
+   * }
+   * }
+ *

+ * It is not strictly necessary to use this method when configuring an + * Option with a value from a Supplier. This method + * is offered for code readability and convenience. + *

+ */ + public static Option> supplied(Option option) { + @SuppressWarnings("unchecked") + Option> supplierOption = (Option>)option; + return supplierOption; + } + + /** + *

+ * Casts an Option<T> to + * Option<Publisher<T>>. For instance, if an + * Option<CharSequence> is passed to this method, it + * is returned as an + * Option<Publisher<CharSequence>>. + *

+ * This method can used when configuring an Option with values + * from a Publisher: + *

{@code
+   * void configurePassword(ConnectionFactoryOptions.Builder optionsBuilder) {
+   *   optionsBuilder.option(published(PASSWORD), getPasswordPublisher());
+   * }
+   *
+   * Publisher getPasswordPublisher() {
+   *   // ... publish a database password ...
+   * }
+   * }
+ *

+ * It is not strictly necessary to use this method when configuring an + * Option with a value from a Publisher. This method + * is offered for code readability and convenience. + *

+ */ + public static Option> published(Option option) { + @SuppressWarnings("unchecked") + Option> publisherOption = (Option>)option; + return publisherOption; + } + } diff --git a/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryImpl.java b/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryImpl.java index 08b2a5a..aa482d0 100755 --- a/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryImpl.java +++ b/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryImpl.java @@ -279,7 +279,7 @@ public Publisher create() { */ @Override public ConnectionFactoryMetadata getMetadata() { - return () -> "Oracle Database"; + return OracleConnectionFactoryMetadataImpl.INSTANCE; } } diff --git a/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryMetadataImpl.java b/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryMetadataImpl.java new file mode 100644 index 0000000..cf05242 --- /dev/null +++ b/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryMetadataImpl.java @@ -0,0 +1,43 @@ +/* + Copyright (c) 2020, 2021, Oracle and/or its affiliates. + + This software is dual-licensed to you under the Universal Permissive License + (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License + 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose + either license. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package oracle.r2dbc.impl; + +import io.r2dbc.spi.ConnectionFactoryMetadata; + +/** + * Implementation of {@code ConnectionFactoryMetaData} which names + * "Oracle Database" as the database product that a + * {@link io.r2dbc.spi.ConnectionFactory} connects to. + */ +final class OracleConnectionFactoryMetadataImpl + implements ConnectionFactoryMetadata { + + static final OracleConnectionFactoryMetadataImpl INSTANCE = + new OracleConnectionFactoryMetadataImpl(); + + private OracleConnectionFactoryMetadataImpl() {} + + @Override + public String getName() { + return "Oracle Database"; + } +} diff --git a/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryProviderImpl.java b/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryProviderImpl.java index b9938b5..e31637f 100755 --- a/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryProviderImpl.java +++ b/src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryProviderImpl.java @@ -96,7 +96,11 @@ public OracleConnectionFactoryProviderImpl() { } public ConnectionFactory create(ConnectionFactoryOptions options) { assert supports(options) : "Options are not supported: " + options; requireNonNull(options, "options must not be null."); - return new OracleConnectionFactoryImpl(options); + + if (SuppliedOptionConnectionFactory.containsSuppliedValue(options)) + return new SuppliedOptionConnectionFactory(options); + else + return new OracleConnectionFactoryImpl(options); } /** diff --git a/src/main/java/oracle/r2dbc/impl/SuppliedOptionConnectionFactory.java b/src/main/java/oracle/r2dbc/impl/SuppliedOptionConnectionFactory.java new file mode 100644 index 0000000..780f4b7 --- /dev/null +++ b/src/main/java/oracle/r2dbc/impl/SuppliedOptionConnectionFactory.java @@ -0,0 +1,217 @@ +/* + Copyright (c) 2020, 2021, Oracle and/or its affiliates. + + This software is dual-licensed to you under the Universal Permissive License + (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License + 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose + either license. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package oracle.r2dbc.impl; + +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.ConnectionFactoryMetadata; +import io.r2dbc.spi.ConnectionFactoryOptions; +import io.r2dbc.spi.Option; +import oracle.r2dbc.OracleR2dbcOptions; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Objects; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A connection factory having {@link io.r2dbc.spi.ConnectionFactoryOptions} + * with values provided by a {@link Supplier} or {@link Publisher}. Supplied + * values are requested when {@link #create()} is called. After all requested + * values are supplied, this factory delegates to + * {@link OracleConnectionFactoryProviderImpl#create(ConnectionFactoryOptions)}, + * with {@link ConnectionFactoryOptions} composed of the supplied values. + */ +final class SuppliedOptionConnectionFactory implements ConnectionFactory { + + /** + * The set of all options recognized by Oracle R2DBC. This set includes + * the standard options declared by {@link ConnectionFactoryOptions} and + * the extended options declared by {@link OracleR2dbcOptions}. + * + * TODO: This set only includes standard options defined for version 1.0.0 of + * the SPI. If a future SPI version introduces new options, those must be + * added to this set. + */ + private static final Set> ALL_OPTIONS = + Stream.concat( + // Standard options: + Stream.of( + ConnectionFactoryOptions.CONNECT_TIMEOUT, + ConnectionFactoryOptions.DATABASE, + ConnectionFactoryOptions.DRIVER, + ConnectionFactoryOptions.HOST, + ConnectionFactoryOptions.LOCK_WAIT_TIMEOUT, + ConnectionFactoryOptions.PASSWORD, + ConnectionFactoryOptions.PORT, + ConnectionFactoryOptions.PROTOCOL, + ConnectionFactoryOptions.SSL, + ConnectionFactoryOptions.STATEMENT_TIMEOUT, + ConnectionFactoryOptions.USER), + // Extended options: + OracleR2dbcOptions.options().stream()) + .collect(Collectors.toUnmodifiableSet()); + + /** + * Publishers which emit the value of an option. The value may come from a + * user provided Publisher, from a user provided Supplier, or just a user + * provided value. So, all the option values below would get turned into a + * Publisher which is added to this set: + *
{@code
+   * ConnectionFactoryOptions.builder()
+   *   .option(DRIVER, "oracle")
+   *   .option(supplied(HOST, () -> "database-host")
+   *   .option(published(PORT, Mono.just(1521))
+   * }
+ */ + private final Set> optionValuePublishers; + + SuppliedOptionConnectionFactory(ConnectionFactoryOptions options) { + optionValuePublishers = ALL_OPTIONS.stream() + .map(option -> toOptionValuePublisher(option, options.getValue(option))) + .filter(Objects::nonNull) + .collect(Collectors.toUnmodifiableSet()); + } + + @Override + public Publisher create() { + return Flux.merge(optionValuePublishers) + .collectList() + .map(SuppliedOptionConnectionFactory::toConnectionFactoryOptions) + .flatMap(options -> + Mono.from(new OracleConnectionFactoryProviderImpl() + .create(options) + .create())); + } + + @Override + public ConnectionFactoryMetadata getMetadata() { + return OracleConnectionFactoryMetadataImpl.INSTANCE; + } + + /** + * Converts an {@link Option} and its value to a {@link Publisher} that + * emits an {@link OptionValue}. If the value passed to this + * method is a Supplier or Publisher, then the Publisher returned by this + * method emits an OptionValue with the value supplied by the + * Supplier or Publisher. If the value passed to this method is + * not a Supplier or Publisher, then the Publisher returned by this method + * just emits an OptionValue with the given value. + * + * @param option An option. Not null. + * @param value The value of the option, or null if there is no value, or a + * Supplier or Publisher which supplies the value of + * the option. + * @return A publisher that emits the option and its possibly supplied value, + * or this method returns null if the given value is null. + */ + private static Publisher toOptionValuePublisher( + Option option, Object value) { + final Publisher valuePublisher; + + if (value == null) + return null; + + if (value instanceof Supplier) + valuePublisher = Mono.fromSupplier((Supplier)value); + else if (value instanceof Publisher) + valuePublisher = Mono.from((Publisher)value); + else + return Mono.just(new OptionValue(option, value)); + + return Mono.from(valuePublisher) + .map(publishedValue -> new OptionValue(option, publishedValue)) + .onErrorMap(error -> OracleR2dbcExceptions.newNonTransientException( + "Error when requesting a value of " + option + + " from a Supplier or Publisher", + null, // sql + error)); + } + + /** + * Converts a collection of options and values into an instance of + * {@link ConnectionFactoryOptions}. + * + * @param optionValues Iterable options and values. Not null. + * @return ConnectionFactoryOptions configured with the given + * options and values. Not null. + */ + private static ConnectionFactoryOptions toConnectionFactoryOptions( + Iterable optionValues) { + + ConnectionFactoryOptions.Builder optionsBuilder = + ConnectionFactoryOptions.builder(); + + optionValues.forEach(optionValue -> + optionValue.configure(optionsBuilder)); + + return optionsBuilder.build(); + } + + /** + * Checks if the value of an option is supplied by a {@link Supplier} or + * {@link Publisher}. This method is used to check if + * {@link SuppliedOptionConnectionFactory} should be used to handle any + * supplied option values. If this method returns false, then there is no + * reason to use this connection factory. + * + * @param options Options that may a value which is a {@link Supplier} or + * {@link Publisher}. + * @return true if the value of at least one option is a {@link Supplier} or + * {@link Publisher}. Returns false otherwise. + */ + static boolean containsSuppliedValue( + ConnectionFactoryOptions options) { + return ALL_OPTIONS.stream() + .map(options::getValue) + .anyMatch(value -> + value instanceof Supplier || value instanceof Publisher); + } + + /** A record of an {@link Option} and its value */ + private static final class OptionValue { + + final Option option; + + final Object value; + + OptionValue(Option option, Object value) { + this.option = option; + this.value = value; + } + + /** + * @param builder Builder to configure with the {@link #value} of an + * {@link #option}. Not null. + */ + void configure(ConnectionFactoryOptions.Builder builder) { + @SuppressWarnings("unchecked") + Option option = (Option)this.option; + builder.option(option, value); + } + + } +} diff --git a/src/test/java/oracle/r2dbc/impl/OracleConnectionFactoryImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleConnectionFactoryImplTest.java index ec17096..dec6374 100644 --- a/src/test/java/oracle/r2dbc/impl/OracleConnectionFactoryImplTest.java +++ b/src/test/java/oracle/r2dbc/impl/OracleConnectionFactoryImplTest.java @@ -35,7 +35,14 @@ import java.time.Duration; import java.util.HashSet; import java.util.Set; - +import java.util.function.IntSupplier; +import java.util.function.Supplier; + +import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE; +import static io.r2dbc.spi.ConnectionFactoryOptions.HOST; +import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD; +import static io.r2dbc.spi.ConnectionFactoryOptions.PORT; +import static io.r2dbc.spi.ConnectionFactoryOptions.USER; import static oracle.r2dbc.test.DatabaseConfig.connectionFactoryOptions; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -59,8 +66,8 @@ public void testDiscovery() { ConnectionFactories .get(ConnectionFactoryOptions.builder() .option(ConnectionFactoryOptions.DRIVER, "oracle") - .option(ConnectionFactoryOptions.HOST, "dbhost") - .option(ConnectionFactoryOptions.PORT, 1521) + .option(HOST, "dbhost") + .option(PORT, 1521) .option(ConnectionFactoryOptions.DATABASE, "service_name") .build()) .getClass()); @@ -70,8 +77,8 @@ public void testDiscovery() { ConnectionFactories .find(ConnectionFactoryOptions.builder() .option(ConnectionFactoryOptions.DRIVER, "oracle") - .option(ConnectionFactoryOptions.HOST, "dbhost") - .option(ConnectionFactoryOptions.PORT, 1521) + .option(HOST, "dbhost") + .option(PORT, 1521) .option(ConnectionFactoryOptions.DATABASE, "service_name") .build()) .getClass()); @@ -107,25 +114,7 @@ public void testDiscovery() { public void testCreate() { Publisher connectionPublisher = new OracleConnectionFactoryImpl(connectionFactoryOptions()).create(); - - // Expect publisher to emit one connection to each subscriber - Set connections = new HashSet<>(); - Flux.from(connectionPublisher) - .doOnNext(connections::add) - .doOnNext(connection -> Mono.from(connection.close()).subscribe()) - .blockLast(DatabaseConfig.connectTimeout()); - assertEquals(1, connections.size()); - Flux.from(connectionPublisher) - .doOnNext(connections::add) - .doOnNext(connection -> Mono.from(connection.close()).subscribe()) - .blockLast(DatabaseConfig.connectTimeout()); - assertEquals(2, connections.size()); - Flux.from(connectionPublisher) - .doOnNext(connections::add) - .doOnNext(connection -> Mono.from(connection.close()).subscribe()) - .blockLast(DatabaseConfig.connectTimeout()); - assertEquals(3, connections.size()); - + verifyConnectionPublisher(connectionPublisher); } /** @@ -182,12 +171,35 @@ public void testGetMetadata() { new OracleConnectionFactoryImpl( ConnectionFactoryOptions.builder() .option(ConnectionFactoryOptions.DRIVER, "oracle") - .option(ConnectionFactoryOptions.HOST, "dbhost") - .option(ConnectionFactoryOptions.PORT, 1521) + .option(HOST, "dbhost") + .option(PORT, 1521) .option(ConnectionFactoryOptions.DATABASE, "service_name") .build()) .getMetadata() .getName()); } + /** Verifies that a publisher emits connections to multiple subscribers */ + private static void verifyConnectionPublisher( + Publisher connectionPublisher) { + + // Expect publisher to emit one connection to each subscriber + Set connections = new HashSet<>(); + Flux.from(connectionPublisher) + .doOnNext(connections::add) + .doOnNext(connection -> Mono.from(connection.close()).subscribe()) + .blockLast(DatabaseConfig.connectTimeout()); + assertEquals(1, connections.size()); + Flux.from(connectionPublisher) + .doOnNext(connections::add) + .doOnNext(connection -> Mono.from(connection.close()).subscribe()) + .blockLast(DatabaseConfig.connectTimeout()); + assertEquals(2, connections.size()); + Flux.from(connectionPublisher) + .doOnNext(connections::add) + .doOnNext(connection -> Mono.from(connection.close()).subscribe()) + .blockLast(DatabaseConfig.connectTimeout()); + assertEquals(3, connections.size()); + } + } diff --git a/src/test/java/oracle/r2dbc/impl/OracleConnectionFactoryProviderImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleConnectionFactoryProviderImplTest.java index ddce4f5..01273fc 100644 --- a/src/test/java/oracle/r2dbc/impl/OracleConnectionFactoryProviderImplTest.java +++ b/src/test/java/oracle/r2dbc/impl/OracleConnectionFactoryProviderImplTest.java @@ -21,11 +21,32 @@ package oracle.r2dbc.impl; +import io.r2dbc.spi.Connection; import io.r2dbc.spi.ConnectionFactories; +import io.r2dbc.spi.ConnectionFactory; import io.r2dbc.spi.ConnectionFactoryOptions; import io.r2dbc.spi.Option; +import io.r2dbc.spi.R2dbcException; +import oracle.r2dbc.test.DatabaseConfig; import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE; +import static io.r2dbc.spi.ConnectionFactoryOptions.HOST; +import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD; +import static io.r2dbc.spi.ConnectionFactoryOptions.PORT; +import static io.r2dbc.spi.ConnectionFactoryOptions.USER; +import static oracle.r2dbc.OracleR2dbcOptions.published; +import static oracle.r2dbc.OracleR2dbcOptions.supplied; +import static oracle.r2dbc.test.DatabaseConfig.connectionFactoryOptions; +import static oracle.r2dbc.test.DatabaseConfig.password; +import static oracle.r2dbc.util.Awaits.awaitError; +import static oracle.r2dbc.util.Awaits.awaitOne; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -152,4 +173,282 @@ public void testSupports() { .option(unsupported, "expect Oracle R2DBC to ignore this") .build())); } + + @Test + public void testSupplierOption() { + Supplier hostSupplier = DatabaseConfig::host; + Supplier portSupplier = DatabaseConfig::port; + Supplier databaseSupplier = DatabaseConfig::serviceName; + TestSupplier userSupplier = + new TestSupplier<>(DatabaseConfig.user()); + Supplier passwordSupplier = DatabaseConfig::password; + + ConnectionFactoryOptions connectionFactoryOptions = + connectionFactoryOptions() + .mutate() + .option(supplied(HOST), hostSupplier) + .option(supplied(PORT), portSupplier) + .option(supplied(DATABASE), databaseSupplier) + .option(supplied(USER), userSupplier) + .option(supplied(PASSWORD), passwordSupplier) + .build(); + + ConnectionFactory connectionFactory = + ConnectionFactories.get(connectionFactoryOptions); + + // Round 1: Expect success as all values are supplied + verifyConnection(connectionFactory); + + // Round 2: Expect ORA-01017 has an invalid username is supplied + userSupplier.value = "not" + DatabaseConfig.user(); + R2dbcException r2dbcException = verifyConnectionError(connectionFactory); + assertEquals(1017, r2dbcException.getErrorCode()); + + // Round 3: Expect success as all values are supplied + userSupplier.value = DatabaseConfig.user(); + verifyConnection(connectionFactory); + + } + + @Test + public void testSupplierOptionNull() { + Supplier hostSupplier = DatabaseConfig::host; + Supplier portSupplier = DatabaseConfig::port; + Supplier databaseSupplier = DatabaseConfig::serviceName; + Supplier userSupplier = DatabaseConfig::user; + TestSupplier passwordSupplier = new TestSupplier(password()); + + ConnectionFactoryOptions connectionFactoryOptions = + connectionFactoryOptions() + .mutate() + .option(supplied(HOST), hostSupplier) + .option(supplied(PORT), portSupplier) + .option(supplied(DATABASE), databaseSupplier) + .option(supplied(USER), userSupplier) + .option(supplied(PASSWORD), passwordSupplier) + .option( + // Oracle Database doesn't support this option, and Oracle R2DBC + // throws an exception if it is set. The supplied null value should + // have it not set. + supplied(ConnectionFactoryOptions.LOCK_WAIT_TIMEOUT), + () -> null) + .build(); + + ConnectionFactory connectionFactory = + ConnectionFactories.get(connectionFactoryOptions); + + // Round 1: Verify success with no lock wait timeout and a password + verifyConnection(connectionFactory); + + // Round 2: Verify failure with a null password. The expected error code + // may depend on the version of the test database. Expect ORA-01005 with a + // 23.3 database. + passwordSupplier.value = null; + R2dbcException r2dbcException = verifyConnectionError(connectionFactory); + assertEquals(1005, r2dbcException.getErrorCode()); + } + + @Test + public void testSupplierOptionError() { + class TestException extends RuntimeException { } + + Supplier hostSupplier = DatabaseConfig::host; + Supplier portSupplier = DatabaseConfig::port; + Supplier databaseSupplier = DatabaseConfig::serviceName; + TestSupplier userSupplier = new TestSupplier<>(new TestException()); + Supplier passwordSupplier = DatabaseConfig::password; + + ConnectionFactoryOptions connectionFactoryOptions = + connectionFactoryOptions() + .mutate() + .option(supplied(HOST), hostSupplier) + .option(supplied(PORT), portSupplier) + .option(supplied(DATABASE), databaseSupplier) + .option(supplied(USER), userSupplier) + .option(supplied(PASSWORD), passwordSupplier) + .build(); + + ConnectionFactory connectionFactory = + ConnectionFactories.get(connectionFactoryOptions); + + // Round 1: Expect a failure from the TestSupplier + R2dbcException r2dbcException = verifyConnectionError(connectionFactory); + + assertTrue( + r2dbcException.getCause() instanceof TestException, + "Unexpected cause: " + r2dbcException.getCause()); + + // Round 2: Expect success as the TestSupplier no longer throws an error + userSupplier.error = null; + userSupplier.value = DatabaseConfig.user(); + verifyConnection(connectionFactory); + } + + @Test + public void testPublisherOption() { + Publisher hostPublisher = Mono.fromSupplier(DatabaseConfig::host); + Publisher portPublisher = Mono.fromSupplier(DatabaseConfig::port); + Publisher databasePublisher = Mono.fromSupplier(DatabaseConfig::serviceName); + TestSupplier userPublisher = + new TestSupplier<>(DatabaseConfig.user()); + Publisher passwordPublisher = Mono.fromSupplier(DatabaseConfig::password); + + ConnectionFactoryOptions connectionFactoryOptions = + connectionFactoryOptions() + .mutate() + .option(published(HOST), hostPublisher) + .option(published(PORT), portPublisher) + .option(published(DATABASE), databasePublisher) + .option(published(USER), Mono.fromSupplier(userPublisher)) + .option(published(PASSWORD), passwordPublisher) + .build(); + + ConnectionFactory connectionFactory = + ConnectionFactories.get(connectionFactoryOptions); + + // Round 1: Expect success as all values are published + verifyConnection(connectionFactory); + + // Round 2: Expect ORA-01017 has an invalid username is published + userPublisher.value = "not" + DatabaseConfig.user(); + R2dbcException r2dbcException = verifyConnectionError(connectionFactory); + assertEquals(1017, r2dbcException.getErrorCode()); + + // Round 3: Expect success as all values are published + userPublisher.value = DatabaseConfig.user(); + verifyConnection(connectionFactory); + + } + + @Test + public void testPublisherOptionNull() { + Publisher hostPublisher = Mono.fromSupplier(DatabaseConfig::host); + Publisher portPublisher = Mono.fromSupplier(DatabaseConfig::port); + Publisher databasePublisher = Mono.fromSupplier(DatabaseConfig::serviceName); + Publisher userPublisher = Mono.fromSupplier(DatabaseConfig::user); + TestSupplier passwordPublisher = new TestSupplier(password()); + + ConnectionFactoryOptions connectionFactoryOptions = + connectionFactoryOptions() + .mutate() + .option(published(HOST), hostPublisher) + .option(published(PORT), portPublisher) + .option(published(DATABASE), databasePublisher) + .option(published(USER), userPublisher) + .option(published(PASSWORD), Mono.fromSupplier(passwordPublisher)) + .option( + // Oracle Database doesn't support this option, and Oracle R2DBC + // throws an exception if it is set. The published null value should + // have it not set. + published(ConnectionFactoryOptions.LOCK_WAIT_TIMEOUT), + Mono.empty()) + .build(); + + ConnectionFactory connectionFactory = + ConnectionFactories.get(connectionFactoryOptions); + + // Round 1: Verify success with no lock wait timeout and a password + verifyConnection(connectionFactory); + + // Round 2: Verify failure with a null password. The expected error code + // may depend on the version of the test database. Expect ORA-01005 with a + // 23.3 database. + passwordPublisher.value = null; + R2dbcException r2dbcException = verifyConnectionError(connectionFactory); + assertEquals(1005, r2dbcException.getErrorCode()); + } + + @Test + public void testPublisherOptionError() { + class TestException extends RuntimeException { } + + Publisher hostPublisher = Mono.fromSupplier(DatabaseConfig::host); + Publisher portPublisher = Mono.fromSupplier(DatabaseConfig::port); + Publisher databasePublisher = Mono.fromSupplier(DatabaseConfig::serviceName); + TestSupplier userPublisher = new TestSupplier<>(new TestException()); + Publisher passwordPublisher = Mono.fromSupplier(DatabaseConfig::password); + + ConnectionFactoryOptions connectionFactoryOptions = + connectionFactoryOptions() + .mutate() + .option(published(HOST), hostPublisher) + .option(published(PORT), portPublisher) + .option(published(DATABASE), databasePublisher) + .option(published(USER), Mono.fromSupplier(userPublisher)) + .option(published(PASSWORD), passwordPublisher) + .build(); + + ConnectionFactory connectionFactory = + ConnectionFactories.get(connectionFactoryOptions); + + // Round 1: Expect a failure from the TestSupplier + R2dbcException r2dbcException = verifyConnectionError(connectionFactory); + + assertTrue( + r2dbcException.getCause() instanceof TestException, + "Unexpected cause: " + r2dbcException.getCause()); + + // Round 2: Expect success as the TestSupplier no longer throws an error + userPublisher.error = null; + userPublisher.value = DatabaseConfig.user(); + verifyConnection(connectionFactory); + } + + /** Verifies that a connection can be created with the given options */ + private void verifyConnection(ConnectionFactory connectionFactory) { + + awaitOne( + 1, + Flux.usingWhen( + connectionFactory.create(), + connection -> + Flux.from( + connection.createStatement("SELECT 1 FROM sys.dual").execute()) + .flatMap(result -> + result.map(row -> row.get(0, Integer.class))), + Connection::close)); + } + + /** + * Verifies that a connection fails to be created with the given options, + * and returns the exception. + */ + private R2dbcException verifyConnectionError( + ConnectionFactory connectionFactory) { + + return awaitError( + R2dbcException.class, + Flux.usingWhen( + connectionFactory.create(), + connection -> + Flux.from( + connection.createStatement("SELECT 1 FROM sys.dual").execute()) + .flatMap(result -> + result.map(row -> row.get(0, Integer.class))), + Connection::close)); + } + + private static final class TestSupplier implements Supplier { + + T value; + + RuntimeException error; + + TestSupplier(RuntimeException error) { + this.error = error; + } + + TestSupplier(T value) { + this.value = value; + } + + @Override + public T get() { + if (error != null) + throw error; + + return value; + } + } + } diff --git a/src/test/java/oracle/r2dbc/util/Awaits.java b/src/test/java/oracle/r2dbc/util/Awaits.java index a9c241a..30bd782 100644 --- a/src/test/java/oracle/r2dbc/util/Awaits.java +++ b/src/test/java/oracle/r2dbc/util/Awaits.java @@ -104,9 +104,9 @@ public static void awaitNone(Publisher emptyPublisher) { * @throws Throwable If the publisher emits {@code onError} with a * {@code Throwable} that is not an instance of {@code errorType}. */ - public static void awaitError( - Class errorType, Publisher errorPublisher) { - assertThrows( + public static T awaitError( + Class errorType, Publisher errorPublisher) { + return assertThrows( errorType, () -> Flux.from(errorPublisher).blockLast(sqlTimeout()), "Unexpected signal from Publisher of an error");