From 771ea892cab254a79a751c6f921ddeb65994a191 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Fri, 30 Aug 2024 09:36:03 -0600 Subject: [PATCH 01/10] Add AsyncTransportSettings, ExecutorService --- .../connection/AsyncTransportSettings.java | 82 +++++++++++++++++++ .../connection/NettyTransportSettings.java | 2 +- .../mongodb/connection/TransportSettings.java | 10 +++ .../AsynchronousSocketChannelStream.java | 23 +++++- ...synchronousSocketChannelStreamFactory.java | 18 +++- ...nousSocketChannelStreamFactoryFactory.java | 15 +++- .../connection/StreamFactoryHelper.java | 47 +++++++++-- .../TlsChannelStreamFactoryFactory.java | 10 ++- .../async/AsynchronousTlsChannelGroup.java | 41 +++++----- .../connection/StreamFactoryHelperTest.java | 8 +- .../reactivestreams/client/MongoClients.java | 25 ++---- .../client/AsyncTransportSettingsTest.java | 70 ++++++++++++++++ .../connection/AsyncTransportSettings.scala | 32 ++++++++ .../scala/connection/TransportSettings.scala | 8 ++ .../mongodb/scala/connection/package.scala | 7 ++ .../client/internal/MongoClientImpl.java | 12 +-- 16 files changed, 349 insertions(+), 61 deletions(-) create mode 100644 driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java create mode 100644 driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java create mode 100644 driver-scala/src/main/scala/org/mongodb/scala/connection/AsyncTransportSettings.scala diff --git a/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java new file mode 100644 index 00000000000..65eefbf78ac --- /dev/null +++ b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java @@ -0,0 +1,82 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ +package com.mongodb.connection; + +import com.mongodb.lang.Nullable; + +import java.util.concurrent.ExecutorService; + +/** + * {@link TransportSettings} for an async transport implementation. + * + * @since 5.2 + */ +public class AsyncTransportSettings extends TransportSettings { + + private final ExecutorService executorService; + + public AsyncTransportSettings(final Builder builder) { + this.executorService = builder.executorService; + } + + public static Builder builder() { + return new Builder(); + } + + /** + * A builder for an instance of {@link AsyncTransportSettings} + */ + public static final class Builder { + + private ExecutorService executorService; + + private Builder() { + } + + /** + * Sets the executor service + * + * @param executorService the executor service + * @return this + * @see #getExecutorService() + */ + public Builder executorService(final ExecutorService executorService) { + this.executorService = executorService; + return this; + } + + /** + * Build an instance of {@link AsyncTransportSettings} + * @return an instance of {@link AsyncTransportSettings} + */ + public AsyncTransportSettings build() { + return new AsyncTransportSettings(this); + } + } + + /** + * Gets the executor service + * + * @return the executor service + * @see Builder#executorService(ExecutorService) + */ + @Nullable + public ExecutorService getExecutorService() { + return executorService; + } +} diff --git a/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java b/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java index ef9d68b32b4..e55f1adc111 100644 --- a/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java +++ b/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java @@ -137,7 +137,7 @@ public Builder sslContext(final SslContext sslContext) { /** * Build an instance of {@code NettyTransportSettings}. * - * @return factory for {@code NettyTransportSettings} + * @return an instance of {@code NettyTransportSettings} */ public NettyTransportSettings build() { return new NettyTransportSettings(this); diff --git a/driver-core/src/main/com/mongodb/connection/TransportSettings.java b/driver-core/src/main/com/mongodb/connection/TransportSettings.java index f897a481eb4..50797f541f5 100644 --- a/driver-core/src/main/com/mongodb/connection/TransportSettings.java +++ b/driver-core/src/main/com/mongodb/connection/TransportSettings.java @@ -35,4 +35,14 @@ public abstract class TransportSettings { public static NettyTransportSettings.Builder nettyBuilder() { return NettyTransportSettings.builder(); } + + /** + * A builder for {@link AsyncTransportSettings}. + * + * @return a builder for {@link AsyncTransportSettings} + * @since 5.2 + */ + public static AsyncTransportSettings.Builder asyncBuilder() { + return AsyncTransportSettings.builder(); + } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java index 4818b1f7ac4..cd20f2ef34e 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java @@ -28,10 +28,12 @@ import java.net.SocketAddress; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.LinkedList; import java.util.Queue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -46,13 +48,24 @@ public final class AsynchronousSocketChannelStream extends AsynchronousChannelSt private final ServerAddress serverAddress; private final InetAddressResolver inetAddressResolver; private final SocketSettings settings; + @Nullable + private final ExecutorService executorService; - public AsynchronousSocketChannelStream(final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver, + public AsynchronousSocketChannelStream( + final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver, final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider) { + this(serverAddress, inetAddressResolver, settings, bufferProvider, null); + } + + public AsynchronousSocketChannelStream( + final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver, + final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider, + @Nullable final ExecutorService executorService) { super(serverAddress, settings, bufferProvider); this.serverAddress = serverAddress; this.inetAddressResolver = inetAddressResolver; this.settings = settings; + this.executorService = executorService; } @Override @@ -77,7 +90,13 @@ private void initializeSocketChannel(final AsyncCompletionHandler handler, SocketAddress socketAddress = socketAddressQueue.poll(); try { - AsynchronousSocketChannel attemptConnectionChannel = AsynchronousSocketChannel.open(); + AsynchronousSocketChannel attemptConnectionChannel; + if (executorService != null) { + AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService); + attemptConnectionChannel = AsynchronousSocketChannel.open(group); + } else { + attemptConnectionChannel = AsynchronousSocketChannel.open(); + } attemptConnectionChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); attemptConnectionChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); if (settings.getReceiveBufferSize() > 0) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java index 65dd6194dcd..a8324d62034 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java @@ -19,8 +19,11 @@ import com.mongodb.ServerAddress; import com.mongodb.connection.SocketSettings; import com.mongodb.connection.SslSettings; +import com.mongodb.lang.Nullable; import com.mongodb.spi.dns.InetAddressResolver; +import java.util.concurrent.ExecutorService; + import static com.mongodb.assertions.Assertions.assertFalse; import static com.mongodb.assertions.Assertions.notNull; @@ -31,6 +34,8 @@ public class AsynchronousSocketChannelStreamFactory implements StreamFactory { private final PowerOfTwoBufferPool bufferProvider = PowerOfTwoBufferPool.DEFAULT; private final SocketSettings settings; private final InetAddressResolver inetAddressResolver; + @Nullable + private final ExecutorService executorService; /** * Create a new factory with the default {@code BufferProvider} and {@code AsynchronousChannelGroup}. @@ -38,16 +43,25 @@ public class AsynchronousSocketChannelStreamFactory implements StreamFactory { * @param settings the settings for the connection to a MongoDB server * @param sslSettings the settings for connecting via SSL */ - public AsynchronousSocketChannelStreamFactory(final InetAddressResolver inetAddressResolver, final SocketSettings settings, + public AsynchronousSocketChannelStreamFactory( + final InetAddressResolver inetAddressResolver, final SocketSettings settings, final SslSettings sslSettings) { + this(inetAddressResolver, settings, sslSettings, null); + } + + public AsynchronousSocketChannelStreamFactory( + final InetAddressResolver inetAddressResolver, final SocketSettings settings, + final SslSettings sslSettings, @Nullable final ExecutorService executorService) { assertFalse(sslSettings.isEnabled()); this.inetAddressResolver = inetAddressResolver; this.settings = notNull("settings", settings); + this.executorService = executorService; } @Override public Stream create(final ServerAddress serverAddress) { - return new AsynchronousSocketChannelStream(serverAddress, inetAddressResolver, settings, bufferProvider); + return new AsynchronousSocketChannelStream( + serverAddress, inetAddressResolver, settings, bufferProvider, executorService); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java index db9166eda64..9dd2cbb9fe2 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java @@ -18,8 +18,11 @@ import com.mongodb.connection.SocketSettings; import com.mongodb.connection.SslSettings; +import com.mongodb.lang.Nullable; import com.mongodb.spi.dns.InetAddressResolver; +import java.util.concurrent.ExecutorService; + /** * A {@code StreamFactoryFactory} implementation for AsynchronousSocketChannel-based streams. * @@ -27,14 +30,24 @@ */ public final class AsynchronousSocketChannelStreamFactoryFactory implements StreamFactoryFactory { private final InetAddressResolver inetAddressResolver; + @Nullable + private final ExecutorService executorService; public AsynchronousSocketChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) { + this(inetAddressResolver, null); + } + + public AsynchronousSocketChannelStreamFactoryFactory( + final InetAddressResolver inetAddressResolver, + @Nullable final ExecutorService executorService) { this.inetAddressResolver = inetAddressResolver; + this.executorService = executorService; } @Override public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) { - return new AsynchronousSocketChannelStreamFactory(inetAddressResolver, socketSettings, sslSettings); + return new AsynchronousSocketChannelStreamFactory( + inetAddressResolver, socketSettings, sslSettings, executorService); } @Override diff --git a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java index ef40c164cba..a82a3d752b5 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java +++ b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java @@ -17,26 +17,63 @@ package com.mongodb.internal.connection; import com.mongodb.MongoClientException; +import com.mongodb.MongoClientSettings; +import com.mongodb.connection.AsyncTransportSettings; import com.mongodb.connection.NettyTransportSettings; +import com.mongodb.connection.SocketSettings; import com.mongodb.connection.TransportSettings; import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory; import com.mongodb.spi.dns.InetAddressResolver; +import java.util.concurrent.ExecutorService; + /** *

This class is not part of the public API and may be removed or changed at any time

*/ public final class StreamFactoryHelper { - public static StreamFactoryFactory getStreamFactoryFactoryFromSettings(final TransportSettings transportSettings, + + public static StreamFactory getSyncStreamFactory(final MongoClientSettings settings, + final InetAddressResolver inetAddressResolver, final SocketSettings socketSettings) { + TransportSettings transportSettings = settings.getTransportSettings(); + if (transportSettings == null) { + return new SocketStreamFactory(inetAddressResolver, socketSettings, settings.getSslSettings()); + } else if (transportSettings instanceof AsyncTransportSettings) { + throw new MongoClientException("Unsupported async transport settings: " + transportSettings.getClass().getName()); + } else if (transportSettings instanceof NettyTransportSettings) { + return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings) + .create(socketSettings, settings.getSslSettings()); + } else { + throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName()); + } + } + + public static StreamFactoryFactory getAsyncStreamFactoryFactory(final MongoClientSettings settings, final InetAddressResolver inetAddressResolver) { - if (transportSettings instanceof NettyTransportSettings) { - return NettyStreamFactoryFactory.builder().applySettings((NettyTransportSettings) transportSettings) - .inetAddressResolver(inetAddressResolver) - .build(); + TransportSettings transportSettings = settings.getTransportSettings(); + if (transportSettings == null || transportSettings instanceof AsyncTransportSettings) { + ExecutorService executorService = transportSettings == null + ? null + : ((AsyncTransportSettings) transportSettings).getExecutorService(); + if (settings.getSslSettings().isEnabled()) { + return new TlsChannelStreamFactoryFactory(inetAddressResolver, executorService); + } else { + return new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver, executorService); + } + } else if (transportSettings instanceof NettyTransportSettings) { + return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings); } else { throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName()); } } + private static NettyStreamFactoryFactory getNettyStreamFactoryFactory(final InetAddressResolver inetAddressResolver, + final NettyTransportSettings transportSettings) { + return NettyStreamFactoryFactory.builder() + .applySettings(transportSettings) + .inetAddressResolver(inetAddressResolver) + .build(); + } + private StreamFactoryHelper() { } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java index 436fccb0996..de05e773101 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java @@ -46,6 +46,7 @@ import java.security.NoSuchAlgorithmException; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -71,13 +72,18 @@ public class TlsChannelStreamFactoryFactory implements StreamFactoryFactory { /** * Construct a new instance */ - public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) { + public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver, + @Nullable final ExecutorService executorService) { this.inetAddressResolver = inetAddressResolver; - this.group = new AsynchronousTlsChannelGroup(); + this.group = new AsynchronousTlsChannelGroup(executorService); selectorMonitor = new SelectorMonitor(); selectorMonitor.start(); } + public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) { + this(inetAddressResolver, null); + } + @Override public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) { assertTrue(sslSettings.isEnabled()); diff --git a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java index 2b34226ebac..a4e088f90ca 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java +++ b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java @@ -27,6 +27,7 @@ import com.mongodb.internal.connection.tlschannel.util.Util; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; +import com.mongodb.lang.Nullable; import java.io.IOException; import java.nio.channels.CancelledKeyException; @@ -72,6 +73,7 @@ public class AsynchronousTlsChannelGroup { private static final int queueLengthMultiplier = 32; private static final AtomicInteger globalGroupCount = new AtomicInteger(); + private final boolean executorIsExternal; class RegisteredSocket { @@ -199,35 +201,32 @@ private enum Shutdown { /** * Creates an instance of this class. - * - * @param nThreads number of threads in the executor used to assist the selector loop and run - * completion handlers. */ - public AsynchronousTlsChannelGroup(int nThreads) { + public AsynchronousTlsChannelGroup(@Nullable final ExecutorService executorService) { try { selector = Selector.open(); } catch (IOException e) { throw new RuntimeException(e); } timeoutExecutor.setRemoveOnCancelPolicy(true); - this.executor = - new ThreadPoolExecutor( - nThreads, - nThreads, - 0, - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(nThreads * queueLengthMultiplier), - runnable -> - new Thread(runnable, format("async-channel-group-%d-handler-executor", id)), - new ThreadPoolExecutor.CallerRunsPolicy()); + if (executorService != null) { + this.executorIsExternal = true; + this.executor = executorService; + } else { + int nThreads = Runtime.getRuntime().availableProcessors(); + this.executorIsExternal = false; + this.executor = new ThreadPoolExecutor( + nThreads, + nThreads, + 0, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(nThreads * queueLengthMultiplier), + runnable -> new Thread(runnable, format("async-channel-group-%d-handler-executor", id)), + new ThreadPoolExecutor.CallerRunsPolicy()); + } selectorThread.start(); } - /** Creates an instance of this class, using as many thread as available processors. */ - public AsynchronousTlsChannelGroup() { - this(Runtime.getRuntime().availableProcessors()); - } - void submit(final Runnable r) { executor.submit(r); } @@ -425,7 +424,9 @@ private void loop() { } catch (Throwable e) { LOGGER.error("error in selector loop", e); } finally { - executor.shutdown(); + if (!executorIsExternal) { + executor.shutdown(); + } // use shutdownNow to stop delayed tasks timeoutExecutor.shutdownNow(); try { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java index 90989a8e133..9afd1478fe4 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java @@ -16,6 +16,7 @@ package com.mongodb.internal.connection; +import com.mongodb.MongoClientSettings; import com.mongodb.connection.NettyTransportSettings; import com.mongodb.connection.TransportSettings; import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory; @@ -37,8 +38,13 @@ void streamFactoryFactoryIsDerivedFromTransportSettings() { .allocator(PooledByteBufAllocator.DEFAULT) .socketChannelClass(io.netty.channel.socket.oio.OioSocketChannel.class) .build(); + + MongoClientSettings settings = MongoClientSettings.builder() + .transportSettings(nettyTransportSettings) + .build(); + assertEquals(NettyStreamFactoryFactory.builder().applySettings(nettyTransportSettings) .inetAddressResolver(inetAddressResolver).build(), - StreamFactoryHelper.getStreamFactoryFactoryFromSettings(nettyTransportSettings, inetAddressResolver)); + StreamFactoryHelper.getAsyncStreamFactoryFactory(settings, inetAddressResolver)); } } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java index a2f5fb9d125..57ee076039e 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java @@ -20,15 +20,13 @@ import com.mongodb.MongoClientException; import com.mongodb.MongoClientSettings; import com.mongodb.MongoDriverInformation; -import com.mongodb.connection.TransportSettings; +import com.mongodb.connection.SocketSettings; import com.mongodb.internal.TimeoutSettings; -import com.mongodb.internal.connection.AsynchronousSocketChannelStreamFactoryFactory; import com.mongodb.internal.connection.Cluster; import com.mongodb.internal.connection.DefaultClusterFactory; import com.mongodb.internal.connection.InternalConnectionPoolSettings; import com.mongodb.internal.connection.StreamFactory; import com.mongodb.internal.connection.StreamFactoryFactory; -import com.mongodb.internal.connection.TlsChannelStreamFactoryFactory; import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.internal.MongoClientImpl; import com.mongodb.spi.dns.InetAddressResolver; @@ -36,7 +34,7 @@ import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.connection.ServerAddressHelper.getInetAddressResolver; -import static com.mongodb.internal.connection.StreamFactoryHelper.getStreamFactoryFactoryFromSettings; +import static com.mongodb.internal.connection.StreamFactoryHelper.getAsyncStreamFactoryFactory; import static com.mongodb.internal.event.EventListenerHelper.getCommandListener; @@ -115,17 +113,8 @@ public static MongoClient create(final MongoClientSettings settings, @Nullable f if (settings.getSocketSettings().getProxySettings().isProxyEnabled()) { throw new MongoClientException("Proxy is not supported for reactive clients"); } - InetAddressResolver inetAddressResolver = getInetAddressResolver(settings); - StreamFactoryFactory streamFactoryFactory; - TransportSettings transportSettings = settings.getTransportSettings(); - if (transportSettings != null) { - streamFactoryFactory = getStreamFactoryFactoryFromSettings(transportSettings, inetAddressResolver); - } else if (settings.getSslSettings().isEnabled()) { - streamFactoryFactory = new TlsChannelStreamFactoryFactory(inetAddressResolver); - } else { - streamFactoryFactory = new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver); - } + StreamFactoryFactory streamFactoryFactory = getAsyncStreamFactoryFactory(settings, inetAddressResolver); StreamFactory streamFactory = getStreamFactory(streamFactoryFactory, settings, false); StreamFactory heartbeatStreamFactory = getStreamFactory(streamFactoryFactory, settings, true); MongoDriverInformation wrappedMongoDriverInformation = wrapMongoDriverInformation(mongoDriverInformation); @@ -161,10 +150,12 @@ private static MongoDriverInformation wrapMongoDriverInformation(@Nullable final .driverName("reactive-streams").build(); } - private static StreamFactory getStreamFactory(final StreamFactoryFactory streamFactoryFactory, final MongoClientSettings settings, + private static StreamFactory getStreamFactory( + final StreamFactoryFactory streamFactoryFactory, final MongoClientSettings settings, final boolean isHeartbeat) { - return streamFactoryFactory.create(isHeartbeat ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings(), - settings.getSslSettings()); + SocketSettings socketSettings = isHeartbeat + ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings(); + return streamFactoryFactory.create(socketSettings, settings.getSslSettings()); } private MongoClients() { diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java new file mode 100644 index 00000000000..efb0a471b4e --- /dev/null +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java @@ -0,0 +1,70 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.reactivestreams.client; + +import com.mongodb.MongoClientSettings; +import com.mongodb.client.MongoClient; +import com.mongodb.connection.AsyncTransportSettings; +import com.mongodb.connection.TransportSettings; +import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +class AsyncTransportSettingsTest { + @Test + public void shouldDefaultAllValuesToNull() { + AsyncTransportSettings settings = TransportSettings.asyncBuilder().build(); + + assertNull(settings.getExecutorService()); + } + + @Test + public void shouldApplySettingsFromBuilder() { + ExecutorService executorService = Executors.newFixedThreadPool(1); + AsyncTransportSettings settings = TransportSettings.asyncBuilder() + .executorService(executorService) + .build(); + + assertEquals(executorService, settings.getExecutorService()); + } + + @Test + void testAsyncTransportSettings() { + ExecutorService executorService = spy(Executors.newFixedThreadPool(5)); + AsyncTransportSettings asyncTransportSettings = TransportSettings.asyncBuilder() + .executorService(executorService) + .build(); + MongoClientSettings mongoClientSettings = getMongoClientSettingsBuilder() + .transportSettings(asyncTransportSettings) + .build(); + + try (MongoClient client = new SyncMongoClient(MongoClients.create(mongoClientSettings))) { + client.listDatabases().first(); + } + verify(executorService, atLeastOnce()).execute(any()); + } +} diff --git a/driver-scala/src/main/scala/org/mongodb/scala/connection/AsyncTransportSettings.scala b/driver-scala/src/main/scala/org/mongodb/scala/connection/AsyncTransportSettings.scala new file mode 100644 index 00000000000..5157c58501d --- /dev/null +++ b/driver-scala/src/main/scala/org/mongodb/scala/connection/AsyncTransportSettings.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.mongodb.scala.connection + +import com.mongodb.connection.{ AsyncTransportSettings => JAsyncTransportSettings } + +/** + * Async transport settings for the driver. + * + * @since 5.2 + */ +object AsyncTransportSettings { + + /** + * AsyncTransportSettings builder type + */ + type Builder = JAsyncTransportSettings.Builder +} diff --git a/driver-scala/src/main/scala/org/mongodb/scala/connection/TransportSettings.scala b/driver-scala/src/main/scala/org/mongodb/scala/connection/TransportSettings.scala index 3e194ea96ca..c41bc958d84 100644 --- a/driver-scala/src/main/scala/org/mongodb/scala/connection/TransportSettings.scala +++ b/driver-scala/src/main/scala/org/mongodb/scala/connection/TransportSettings.scala @@ -31,4 +31,12 @@ object TransportSettings { * @return a new Builder for creating NettyTransportSettings. */ def nettyBuilder(): NettyTransportSettings.Builder = JTransportSettings.nettyBuilder() + + /** + * Creates a builder for AsyncTransportSettings. + * + * @return a new Builder for creating AsyncTransportSettings. + * @since 5.2 + */ + def asyncBuilder(): AsyncTransportSettings.Builder = JTransportSettings.asyncBuilder() } diff --git a/driver-scala/src/main/scala/org/mongodb/scala/connection/package.scala b/driver-scala/src/main/scala/org/mongodb/scala/connection/package.scala index adfb8a02c04..e283f4e07be 100644 --- a/driver-scala/src/main/scala/org/mongodb/scala/connection/package.scala +++ b/driver-scala/src/main/scala/org/mongodb/scala/connection/package.scala @@ -75,4 +75,11 @@ package object connection { * @since 4.11 */ type NettyTransportSettings = com.mongodb.connection.NettyTransportSettings + + /** + * TransportSettings for an async transport implementation. + * + * @since 5.2 + */ + type AsyncTransportSettings = com.mongodb.connection.AsyncTransportSettings } diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java index 473d8ec4e8e..d7ee2ff64ca 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java @@ -33,12 +33,10 @@ import com.mongodb.client.SynchronousContextProvider; import com.mongodb.connection.ClusterDescription; import com.mongodb.connection.SocketSettings; -import com.mongodb.connection.TransportSettings; import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.connection.Cluster; import com.mongodb.internal.connection.DefaultClusterFactory; import com.mongodb.internal.connection.InternalConnectionPoolSettings; -import com.mongodb.internal.connection.SocketStreamFactory; import com.mongodb.internal.connection.StreamFactory; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; @@ -58,7 +56,7 @@ import static com.mongodb.client.internal.Crypts.createCrypt; import static com.mongodb.internal.connection.ClientMetadataHelper.createClientMetadataDocument; import static com.mongodb.internal.connection.ServerAddressHelper.getInetAddressResolver; -import static com.mongodb.internal.connection.StreamFactoryHelper.getStreamFactoryFactoryFromSettings; +import static com.mongodb.internal.connection.StreamFactoryHelper.getSyncStreamFactory; import static com.mongodb.internal.event.EventListenerHelper.getCommandListener; import static java.lang.String.format; import static org.bson.codecs.configuration.CodecRegistries.withUuidRepresentation; @@ -270,14 +268,8 @@ private static Cluster createCluster(final MongoClientSettings settings, private static StreamFactory getStreamFactory(final MongoClientSettings settings, final boolean isHeartbeat) { SocketSettings socketSettings = isHeartbeat ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings(); - TransportSettings transportSettings = settings.getTransportSettings(); InetAddressResolver inetAddressResolver = getInetAddressResolver(settings); - if (transportSettings == null) { - return new SocketStreamFactory(inetAddressResolver, socketSettings, settings.getSslSettings()); - } else { - return getStreamFactoryFactoryFromSettings(transportSettings, inetAddressResolver) - .create(socketSettings, settings.getSslSettings()); - } + return getSyncStreamFactory(settings, inetAddressResolver, socketSettings); } public Cluster getCluster() { From b9cd7583fe8a6d31a3384c12b2013a94f489424a Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Wed, 25 Sep 2024 11:25:07 -0600 Subject: [PATCH 02/10] Apply suggestions from code review Co-authored-by: Valentin Kovalenko --- .../com/mongodb/connection/AsyncTransportSettings.java | 9 +++++---- .../AsynchronousSocketChannelStreamFactory.java | 2 +- .../AsynchronousSocketChannelStreamFactoryFactory.java | 2 +- .../connection/TlsChannelStreamFactoryFactory.java | 2 +- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java index 65eefbf78ac..fd56d7a2f67 100644 --- a/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java +++ b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java @@ -22,19 +22,19 @@ import java.util.concurrent.ExecutorService; /** - * {@link TransportSettings} for an async transport implementation. + * {@link TransportSettings} for a non-Netty-based async transport implementation. * * @since 5.2 */ -public class AsyncTransportSettings extends TransportSettings { +public final class AsyncTransportSettings extends TransportSettings { private final ExecutorService executorService; - public AsyncTransportSettings(final Builder builder) { + private AsyncTransportSettings(final Builder builder) { this.executorService = builder.executorService; } - public static Builder builder() { + static Builder builder() { return new Builder(); } @@ -54,6 +54,7 @@ private Builder() { * @param executorService the executor service * @return this * @see #getExecutorService() + * @see AsynchronousChannelGroup#withThreadPool(ExecutorService) */ public Builder executorService(final ExecutorService executorService) { this.executorService = executorService; diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java index a8324d62034..d39e48990ea 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java @@ -49,7 +49,7 @@ public AsynchronousSocketChannelStreamFactory( this(inetAddressResolver, settings, sslSettings, null); } - public AsynchronousSocketChannelStreamFactory( + AsynchronousSocketChannelStreamFactory( final InetAddressResolver inetAddressResolver, final SocketSettings settings, final SslSettings sslSettings, @Nullable final ExecutorService executorService) { assertFalse(sslSettings.isEnabled()); diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java index 9dd2cbb9fe2..c4e27e4a34f 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java @@ -37,7 +37,7 @@ public AsynchronousSocketChannelStreamFactoryFactory(final InetAddressResolver i this(inetAddressResolver, null); } - public AsynchronousSocketChannelStreamFactoryFactory( + AsynchronousSocketChannelStreamFactoryFactory( final InetAddressResolver inetAddressResolver, @Nullable final ExecutorService executorService) { this.inetAddressResolver = inetAddressResolver; diff --git a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java index de05e773101..daf0d8cecdd 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java @@ -72,7 +72,7 @@ public class TlsChannelStreamFactoryFactory implements StreamFactoryFactory { /** * Construct a new instance */ - public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver, + TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver, @Nullable final ExecutorService executorService) { this.inetAddressResolver = inetAddressResolver; this.group = new AsynchronousTlsChannelGroup(executorService); From 14b9fee7c90230566811854551f2dab9cdbf931d Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Thu, 26 Sep 2024 15:04:20 -0600 Subject: [PATCH 03/10] PR fixes --- .../connection/AsyncTransportSettings.java | 17 ++++++- .../connection/StreamFactoryHelper.java | 2 +- .../AsyncTransportSettingsTest.java | 45 +++++++++++++++++++ .../client/AsyncTransportSettingsTest.java | 32 ++++++------- 4 files changed, 77 insertions(+), 19 deletions(-) create mode 100644 driver-core/src/test/unit/com/mongodb/connection/AsyncTransportSettingsTest.java diff --git a/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java index fd56d7a2f67..e19c70909d0 100644 --- a/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java +++ b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java @@ -17,15 +17,20 @@ */ package com.mongodb.connection; +import com.mongodb.annotations.Immutable; import com.mongodb.lang.Nullable; +import java.nio.channels.AsynchronousChannelGroup; import java.util.concurrent.ExecutorService; +import static com.mongodb.assertions.Assertions.notNull; + /** * {@link TransportSettings} for a non-Netty-based async transport implementation. * * @since 5.2 */ +@Immutable public final class AsyncTransportSettings extends TransportSettings { private final ExecutorService executorService; @@ -49,7 +54,8 @@ private Builder() { } /** - * Sets the executor service + * Sets the executor service. This executor service will not be shut + * down by the driver code, and must be shut down by application code. * * @param executorService the executor service * @return this @@ -57,7 +63,7 @@ private Builder() { * @see AsynchronousChannelGroup#withThreadPool(ExecutorService) */ public Builder executorService(final ExecutorService executorService) { - this.executorService = executorService; + this.executorService = notNull("executorService", executorService); return this; } @@ -80,4 +86,11 @@ public AsyncTransportSettings build() { public ExecutorService getExecutorService() { return executorService; } + + @Override + public String toString() { + return "AsyncTransportSettings{" + + "executorService=" + executorService + + '}'; + } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java index a82a3d752b5..17372a92be4 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java +++ b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java @@ -38,7 +38,7 @@ public static StreamFactory getSyncStreamFactory(final MongoClientSettings setti if (transportSettings == null) { return new SocketStreamFactory(inetAddressResolver, socketSettings, settings.getSslSettings()); } else if (transportSettings instanceof AsyncTransportSettings) { - throw new MongoClientException("Unsupported async transport settings: " + transportSettings.getClass().getName()); + throw new MongoClientException("Unsupported transport settings in sync: " + transportSettings.getClass().getName()); } else if (transportSettings instanceof NettyTransportSettings) { return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings) .create(socketSettings, settings.getSslSettings()); diff --git a/driver-core/src/test/unit/com/mongodb/connection/AsyncTransportSettingsTest.java b/driver-core/src/test/unit/com/mongodb/connection/AsyncTransportSettingsTest.java new file mode 100644 index 00000000000..180894ceb78 --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/connection/AsyncTransportSettingsTest.java @@ -0,0 +1,45 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.connection; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +class AsyncTransportSettingsTest { + + @Test + public void shouldDefaultAllValuesToNull() { + AsyncTransportSettings settings = TransportSettings.asyncBuilder().build(); + + assertNull(settings.getExecutorService()); + } + + @Test + public void shouldApplySettingsFromBuilder() { + ExecutorService executorService = Executors.newFixedThreadPool(1); + AsyncTransportSettings settings = TransportSettings.asyncBuilder() + .executorService(executorService) + .build(); + + assertEquals(executorService, settings.getExecutorService()); + } +} diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java index efb0a471b4e..3a9a5a4d176 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java @@ -27,44 +27,44 @@ import java.util.concurrent.Executors; import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; class AsyncTransportSettingsTest { - @Test - public void shouldDefaultAllValuesToNull() { - AsyncTransportSettings settings = TransportSettings.asyncBuilder().build(); - - assertNull(settings.getExecutorService()); - } @Test - public void shouldApplySettingsFromBuilder() { - ExecutorService executorService = Executors.newFixedThreadPool(1); - AsyncTransportSettings settings = TransportSettings.asyncBuilder() + void testAsyncTransportSettings() { + ExecutorService executorService = spy(Executors.newFixedThreadPool(5)); + AsyncTransportSettings asyncTransportSettings = TransportSettings.asyncBuilder() .executorService(executorService) .build(); + MongoClientSettings mongoClientSettings = getMongoClientSettingsBuilder() + .transportSettings(asyncTransportSettings) + .build(); - assertEquals(executorService, settings.getExecutorService()); + try (MongoClient client = new SyncMongoClient(MongoClients.create(mongoClientSettings))) { + client.listDatabases().first(); + } + verify(executorService, atLeastOnce()).execute(any()); } @Test - void testAsyncTransportSettings() { + void testExternalExecutorNotShutDown() { ExecutorService executorService = spy(Executors.newFixedThreadPool(5)); AsyncTransportSettings asyncTransportSettings = TransportSettings.asyncBuilder() .executorService(executorService) .build(); MongoClientSettings mongoClientSettings = getMongoClientSettingsBuilder() + .applyToSslSettings(builder -> builder.enabled(true)) .transportSettings(asyncTransportSettings) .build(); - try (MongoClient client = new SyncMongoClient(MongoClients.create(mongoClientSettings))) { - client.listDatabases().first(); + try (MongoClient ignored = new SyncMongoClient(MongoClients.create(mongoClientSettings))) { + // ignored } - verify(executorService, atLeastOnce()).execute(any()); + verify(executorService, never()).shutdown(); } } From 7fcbbee0f5b97ae87fb2599d445075c76222997d Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Tue, 1 Oct 2024 16:22:03 -0600 Subject: [PATCH 04/10] PR fixes --- .../connection/AsyncTransportSettings.java | 17 ++++--- .../internal/ValueOrExceptionContainer.java | 50 +++++++++++++++++++ .../AsynchronousSocketChannelStream.java | 13 +++-- ...synchronousSocketChannelStreamFactory.java | 11 ++-- ...nousSocketChannelStreamFactoryFactory.java | 18 +++++-- .../connection/StreamFactoryHelper.java | 6 ++- .../async/AsynchronousTlsChannelGroup.java | 7 +-- .../client/AsyncTransportSettingsTest.java | 18 +++++-- 8 files changed, 104 insertions(+), 36 deletions(-) create mode 100644 driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java diff --git a/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java index e19c70909d0..e518b470da9 100644 --- a/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java +++ b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java @@ -20,7 +20,6 @@ import com.mongodb.annotations.Immutable; import com.mongodb.lang.Nullable; -import java.nio.channels.AsynchronousChannelGroup; import java.util.concurrent.ExecutorService; import static com.mongodb.assertions.Assertions.notNull; @@ -54,13 +53,17 @@ private Builder() { } /** - * Sets the executor service. This executor service will not be shut - * down by the driver code, and must be shut down by application code. + * The executor service, intended to be used exclusively by the mongo + * client. Closing the mongo client will result in orderly shutdown + * of the executor service. + * + *

When TLS is not enabled, see + * {@link java.nio.channels.AsynchronousChannelGroup#withThreadPool(ExecutorService)} + * for additional requirements for the executor service. * * @param executorService the executor service * @return this * @see #getExecutorService() - * @see AsynchronousChannelGroup#withThreadPool(ExecutorService) */ public Builder executorService(final ExecutorService executorService) { this.executorService = notNull("executorService", executorService); @@ -89,8 +92,8 @@ public ExecutorService getExecutorService() { @Override public String toString() { - return "AsyncTransportSettings{" + - "executorService=" + executorService + - '}'; + return "AsyncTransportSettings{" + + "executorService=" + executorService + + '}'; } } diff --git a/driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java b/driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java new file mode 100644 index 00000000000..f5b53352430 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java @@ -0,0 +1,50 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal; + +import com.mongodb.internal.function.CheckedSupplier; + +/** + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public class ValueOrExceptionContainer { + private final T value; + private final Exception exception; + + public ValueOrExceptionContainer(final CheckedSupplier supplier) { + T value = null; + Exception exception = null; + try { + value = supplier.get(); + } catch (Exception e) { + exception = e; + } + this.value = value; + this.exception = exception; + } + + public T get() throws Exception { + if (isCompletedExceptionally()) { + throw exception; + } + return value; + } + + public boolean isCompletedExceptionally() { + return exception != null; + } +} diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java index cd20f2ef34e..012d349bf67 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java @@ -21,6 +21,7 @@ import com.mongodb.ServerAddress; import com.mongodb.connection.AsyncCompletionHandler; import com.mongodb.connection.SocketSettings; +import com.mongodb.internal.ValueOrExceptionContainer; import com.mongodb.lang.Nullable; import com.mongodb.spi.dns.InetAddressResolver; @@ -33,7 +34,6 @@ import java.nio.channels.CompletionHandler; import java.util.LinkedList; import java.util.Queue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -49,7 +49,7 @@ public final class AsynchronousSocketChannelStream extends AsynchronousChannelSt private final InetAddressResolver inetAddressResolver; private final SocketSettings settings; @Nullable - private final ExecutorService executorService; + private final ValueOrExceptionContainer group; public AsynchronousSocketChannelStream( final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver, @@ -60,12 +60,12 @@ public AsynchronousSocketChannelStream( public AsynchronousSocketChannelStream( final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver, final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider, - @Nullable final ExecutorService executorService) { + @Nullable final ValueOrExceptionContainer group) { super(serverAddress, settings, bufferProvider); this.serverAddress = serverAddress; this.inetAddressResolver = inetAddressResolver; this.settings = settings; - this.executorService = executorService; + this.group = group; } @Override @@ -91,9 +91,8 @@ private void initializeSocketChannel(final AsyncCompletionHandler handler, try { AsynchronousSocketChannel attemptConnectionChannel; - if (executorService != null) { - AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService); - attemptConnectionChannel = AsynchronousSocketChannel.open(group); + if (group != null) { + attemptConnectionChannel = AsynchronousSocketChannel.open(group.get()); } else { attemptConnectionChannel = AsynchronousSocketChannel.open(); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java index d39e48990ea..7a158d0c810 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java @@ -19,10 +19,11 @@ import com.mongodb.ServerAddress; import com.mongodb.connection.SocketSettings; import com.mongodb.connection.SslSettings; +import com.mongodb.internal.ValueOrExceptionContainer; import com.mongodb.lang.Nullable; import com.mongodb.spi.dns.InetAddressResolver; -import java.util.concurrent.ExecutorService; +import java.nio.channels.AsynchronousChannelGroup; import static com.mongodb.assertions.Assertions.assertFalse; import static com.mongodb.assertions.Assertions.notNull; @@ -35,7 +36,7 @@ public class AsynchronousSocketChannelStreamFactory implements StreamFactory { private final SocketSettings settings; private final InetAddressResolver inetAddressResolver; @Nullable - private final ExecutorService executorService; + private final ValueOrExceptionContainer group; /** * Create a new factory with the default {@code BufferProvider} and {@code AsynchronousChannelGroup}. @@ -51,17 +52,17 @@ public AsynchronousSocketChannelStreamFactory( AsynchronousSocketChannelStreamFactory( final InetAddressResolver inetAddressResolver, final SocketSettings settings, - final SslSettings sslSettings, @Nullable final ExecutorService executorService) { + final SslSettings sslSettings, @Nullable final ValueOrExceptionContainer group) { assertFalse(sslSettings.isEnabled()); this.inetAddressResolver = inetAddressResolver; this.settings = notNull("settings", settings); - this.executorService = executorService; + this.group = group; } @Override public Stream create(final ServerAddress serverAddress) { return new AsynchronousSocketChannelStream( - serverAddress, inetAddressResolver, settings, bufferProvider, executorService); + serverAddress, inetAddressResolver, settings, bufferProvider, group); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java index c4e27e4a34f..b4009f7ce36 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java @@ -18,10 +18,11 @@ import com.mongodb.connection.SocketSettings; import com.mongodb.connection.SslSettings; +import com.mongodb.internal.ValueOrExceptionContainer; import com.mongodb.lang.Nullable; import com.mongodb.spi.dns.InetAddressResolver; -import java.util.concurrent.ExecutorService; +import java.nio.channels.AsynchronousChannelGroup; /** * A {@code StreamFactoryFactory} implementation for AsynchronousSocketChannel-based streams. @@ -31,7 +32,7 @@ public final class AsynchronousSocketChannelStreamFactoryFactory implements StreamFactoryFactory { private final InetAddressResolver inetAddressResolver; @Nullable - private final ExecutorService executorService; + private final ValueOrExceptionContainer group; public AsynchronousSocketChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) { this(inetAddressResolver, null); @@ -39,18 +40,25 @@ public AsynchronousSocketChannelStreamFactoryFactory(final InetAddressResolver i AsynchronousSocketChannelStreamFactoryFactory( final InetAddressResolver inetAddressResolver, - @Nullable final ExecutorService executorService) { + @Nullable final ValueOrExceptionContainer group) { this.inetAddressResolver = inetAddressResolver; - this.executorService = executorService; + this.group = group; } @Override public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) { return new AsynchronousSocketChannelStreamFactory( - inetAddressResolver, socketSettings, sslSettings, executorService); + inetAddressResolver, socketSettings, sslSettings, group); } @Override public void close() { + if (group != null && !group.isCompletedExceptionally()) { + try { + group.get().shutdown(); + } catch (Exception e) { + // will not occur, since it was not completed exceptionally + } + } } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java index 17372a92be4..15883c88b60 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java +++ b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java @@ -22,9 +22,11 @@ import com.mongodb.connection.NettyTransportSettings; import com.mongodb.connection.SocketSettings; import com.mongodb.connection.TransportSettings; +import com.mongodb.internal.ValueOrExceptionContainer; import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory; import com.mongodb.spi.dns.InetAddressResolver; +import java.nio.channels.AsynchronousChannelGroup; import java.util.concurrent.ExecutorService; /** @@ -57,7 +59,9 @@ public static StreamFactoryFactory getAsyncStreamFactoryFactory(final MongoClien if (settings.getSslSettings().isEnabled()) { return new TlsChannelStreamFactoryFactory(inetAddressResolver, executorService); } else { - return new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver, executorService); + ValueOrExceptionContainer group = new ValueOrExceptionContainer<>( + () -> AsynchronousChannelGroup.withThreadPool(executorService)); + return new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver, group); } } else if (transportSettings instanceof NettyTransportSettings) { return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings); diff --git a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java index a4e088f90ca..57db0df66e8 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java +++ b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java @@ -73,7 +73,6 @@ public class AsynchronousTlsChannelGroup { private static final int queueLengthMultiplier = 32; private static final AtomicInteger globalGroupCount = new AtomicInteger(); - private final boolean executorIsExternal; class RegisteredSocket { @@ -210,11 +209,9 @@ public AsynchronousTlsChannelGroup(@Nullable final ExecutorService executorServi } timeoutExecutor.setRemoveOnCancelPolicy(true); if (executorService != null) { - this.executorIsExternal = true; this.executor = executorService; } else { int nThreads = Runtime.getRuntime().availableProcessors(); - this.executorIsExternal = false; this.executor = new ThreadPoolExecutor( nThreads, nThreads, @@ -424,9 +421,7 @@ private void loop() { } catch (Throwable e) { LOGGER.error("error in selector loop", e); } finally { - if (!executorIsExternal) { - executor.shutdown(); - } + executor.shutdown(); // use shutdownNow to stop delayed tasks timeoutExecutor.shutdownNow(); try { diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java index 3a9a5a4d176..451130eb488 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java @@ -22,6 +22,8 @@ import com.mongodb.connection.TransportSettings; import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -29,8 +31,8 @@ import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; class AsyncTransportSettingsTest { @@ -51,20 +53,26 @@ void testAsyncTransportSettings() { verify(executorService, atLeastOnce()).execute(any()); } - @Test - void testExternalExecutorNotShutDown() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testExternalExecutorWasShutDown(final boolean tlsEnabled) { ExecutorService executorService = spy(Executors.newFixedThreadPool(5)); AsyncTransportSettings asyncTransportSettings = TransportSettings.asyncBuilder() .executorService(executorService) .build(); MongoClientSettings mongoClientSettings = getMongoClientSettingsBuilder() - .applyToSslSettings(builder -> builder.enabled(true)) + .applyToSslSettings(builder -> builder.enabled(tlsEnabled)) .transportSettings(asyncTransportSettings) .build(); try (MongoClient ignored = new SyncMongoClient(MongoClients.create(mongoClientSettings))) { // ignored } - verify(executorService, never()).shutdown(); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + verify(executorService, times(1)).shutdown(); } } From 2b717ccbf068069bb4c972e07842b31be29a2d3b Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Wed, 13 Nov 2024 14:30:35 -0700 Subject: [PATCH 05/10] Apply suggestions from code review Co-authored-by: Valentin Kovalenko --- .../com/mongodb/connection/AsyncTransportSettings.java | 4 ++-- .../mongodb/internal/ValueOrExceptionContainer.java | 2 +- .../connection/AsynchronousSocketChannelStream.java | 10 ++++------ 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java index e518b470da9..85521c6a4ef 100644 --- a/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java +++ b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java @@ -54,10 +54,10 @@ private Builder() { /** * The executor service, intended to be used exclusively by the mongo - * client. Closing the mongo client will result in orderly shutdown + * client. Closing the mongo client will result in {@linkplain ExecutorService#shutdown() orderly shutdown} * of the executor service. * - *

When TLS is not enabled, see + *

When {@linkplain SslSettings#isEnabled() TLS is not enabled}, see * {@link java.nio.channels.AsynchronousChannelGroup#withThreadPool(ExecutorService)} * for additional requirements for the executor service. * diff --git a/driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java b/driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java index f5b53352430..0992174a734 100644 --- a/driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java +++ b/driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java @@ -21,7 +21,7 @@ /** *

This class is not part of the public API and may be removed or changed at any time

*/ -public class ValueOrExceptionContainer { +public final class ValueOrExceptionContainer { private final T value; private final Exception exception; diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java index 012d349bf67..b74fa62cdef 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java @@ -51,7 +51,7 @@ public final class AsynchronousSocketChannelStream extends AsynchronousChannelSt @Nullable private final ValueOrExceptionContainer group; - public AsynchronousSocketChannelStream( + AsynchronousSocketChannelStream( final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver, final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider) { this(serverAddress, inetAddressResolver, settings, bufferProvider, null); @@ -91,11 +91,9 @@ private void initializeSocketChannel(final AsyncCompletionHandler handler, try { AsynchronousSocketChannel attemptConnectionChannel; - if (group != null) { - attemptConnectionChannel = AsynchronousSocketChannel.open(group.get()); - } else { - attemptConnectionChannel = AsynchronousSocketChannel.open(); - } + attemptConnectionChannel = group == null + ? AsynchronousSocketChannel.open() + : AsynchronousSocketChannel.open(group.get()); attemptConnectionChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); attemptConnectionChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); if (settings.getReceiveBufferSize() > 0) { From 6491f2f476a3868f54c40ac854660e3094930144 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Wed, 13 Nov 2024 16:12:06 -0700 Subject: [PATCH 06/10] PR fixes --- .../com/mongodb/connection/AsyncTransportSettings.java | 3 +-- .../com/mongodb/connection/NettyTransportSettings.java | 3 +-- .../com/mongodb/internal/ValueOrExceptionContainer.java | 4 ++-- .../AsynchronousSocketChannelStreamFactoryFactory.java | 8 ++++++-- .../client/AsyncTransportSettingsTest.java | 7 ++----- 5 files changed, 12 insertions(+), 13 deletions(-) diff --git a/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java index 85521c6a4ef..8e259392313 100644 --- a/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java +++ b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java @@ -17,7 +17,6 @@ */ package com.mongodb.connection; -import com.mongodb.annotations.Immutable; import com.mongodb.lang.Nullable; import java.util.concurrent.ExecutorService; @@ -26,10 +25,10 @@ /** * {@link TransportSettings} for a non-Netty-based async transport implementation. + * Shallowly immutable. * * @since 5.2 */ -@Immutable public final class AsyncTransportSettings extends TransportSettings { private final ExecutorService executorService; diff --git a/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java b/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java index e55f1adc111..cb3a7c7c090 100644 --- a/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java +++ b/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java @@ -16,7 +16,6 @@ package com.mongodb.connection; -import com.mongodb.annotations.Immutable; import com.mongodb.lang.Nullable; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.EventLoopGroup; @@ -33,10 +32,10 @@ /** * {@code TransportSettings} for a Netty-based transport implementation. + * Shallowly immutable. * * @since 4.11 */ -@Immutable public final class NettyTransportSettings extends TransportSettings { private final EventLoopGroup eventLoopGroup; diff --git a/driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java b/driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java index 0992174a734..e0121b8a47b 100644 --- a/driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java +++ b/driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java @@ -38,13 +38,13 @@ public ValueOrExceptionContainer(final CheckedSupplier supplier) { } public T get() throws Exception { - if (isCompletedExceptionally()) { + if (containsException()) { throw exception; } return value; } - public boolean isCompletedExceptionally() { + public boolean containsException() { return exception != null; } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java index b4009f7ce36..859d845de0d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java @@ -16,6 +16,7 @@ package com.mongodb.internal.connection; +import com.mongodb.assertions.Assertions; import com.mongodb.connection.SocketSettings; import com.mongodb.connection.SslSettings; import com.mongodb.internal.ValueOrExceptionContainer; @@ -53,12 +54,15 @@ public StreamFactory create(final SocketSettings socketSettings, final SslSettin @Override public void close() { - if (group != null && !group.isCompletedExceptionally()) { + if (group != null && !group.containsException()) { + AsynchronousChannelGroup asynchronousChannelGroup = null; try { - group.get().shutdown(); + asynchronousChannelGroup = group.get(); } catch (Exception e) { // will not occur, since it was not completed exceptionally + Assertions.fail(); } + asynchronousChannelGroup.shutdown(); } } } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java index 451130eb488..a65d8a90709 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java @@ -16,6 +16,7 @@ package com.mongodb.reactivestreams.client; +import com.mongodb.ClusterFixture; import com.mongodb.MongoClientSettings; import com.mongodb.client.MongoClient; import com.mongodb.connection.AsyncTransportSettings; @@ -68,11 +69,7 @@ void testExternalExecutorWasShutDown(final boolean tlsEnabled) { try (MongoClient ignored = new SyncMongoClient(MongoClients.create(mongoClientSettings))) { // ignored } - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + ClusterFixture.sleep(100); verify(executorService, times(1)).shutdown(); } } From 9f3d8a99efaee5ca7be60d70eed3d8c92f4b1675 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Thu, 14 Nov 2024 10:02:19 -0700 Subject: [PATCH 07/10] PR fixes --- .../internal/ValueOrExceptionContainer.java | 50 ------------------- .../AsynchronousSocketChannelStream.java | 7 ++- ...synchronousSocketChannelStreamFactory.java | 5 +- ...nousSocketChannelStreamFactoryFactory.java | 17 ++----- .../connection/StreamFactoryHelper.java | 10 ++-- 5 files changed, 16 insertions(+), 73 deletions(-) delete mode 100644 driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java diff --git a/driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java b/driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java deleted file mode 100644 index e0121b8a47b..00000000000 --- a/driver-core/src/main/com/mongodb/internal/ValueOrExceptionContainer.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2008-present MongoDB, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.mongodb.internal; - -import com.mongodb.internal.function.CheckedSupplier; - -/** - *

This class is not part of the public API and may be removed or changed at any time

- */ -public final class ValueOrExceptionContainer { - private final T value; - private final Exception exception; - - public ValueOrExceptionContainer(final CheckedSupplier supplier) { - T value = null; - Exception exception = null; - try { - value = supplier.get(); - } catch (Exception e) { - exception = e; - } - this.value = value; - this.exception = exception; - } - - public T get() throws Exception { - if (containsException()) { - throw exception; - } - return value; - } - - public boolean containsException() { - return exception != null; - } -} diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java index b74fa62cdef..c60981c115e 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java @@ -21,7 +21,6 @@ import com.mongodb.ServerAddress; import com.mongodb.connection.AsyncCompletionHandler; import com.mongodb.connection.SocketSettings; -import com.mongodb.internal.ValueOrExceptionContainer; import com.mongodb.lang.Nullable; import com.mongodb.spi.dns.InetAddressResolver; @@ -49,7 +48,7 @@ public final class AsynchronousSocketChannelStream extends AsynchronousChannelSt private final InetAddressResolver inetAddressResolver; private final SocketSettings settings; @Nullable - private final ValueOrExceptionContainer group; + private final AsynchronousChannelGroup group; AsynchronousSocketChannelStream( final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver, @@ -60,7 +59,7 @@ public final class AsynchronousSocketChannelStream extends AsynchronousChannelSt public AsynchronousSocketChannelStream( final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver, final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider, - @Nullable final ValueOrExceptionContainer group) { + @Nullable final AsynchronousChannelGroup group) { super(serverAddress, settings, bufferProvider); this.serverAddress = serverAddress; this.inetAddressResolver = inetAddressResolver; @@ -93,7 +92,7 @@ private void initializeSocketChannel(final AsyncCompletionHandler handler, AsynchronousSocketChannel attemptConnectionChannel; attemptConnectionChannel = group == null ? AsynchronousSocketChannel.open() - : AsynchronousSocketChannel.open(group.get()); + : AsynchronousSocketChannel.open(group); attemptConnectionChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); attemptConnectionChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); if (settings.getReceiveBufferSize() > 0) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java index 7a158d0c810..1ea15abe59d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java @@ -19,7 +19,6 @@ import com.mongodb.ServerAddress; import com.mongodb.connection.SocketSettings; import com.mongodb.connection.SslSettings; -import com.mongodb.internal.ValueOrExceptionContainer; import com.mongodb.lang.Nullable; import com.mongodb.spi.dns.InetAddressResolver; @@ -36,7 +35,7 @@ public class AsynchronousSocketChannelStreamFactory implements StreamFactory { private final SocketSettings settings; private final InetAddressResolver inetAddressResolver; @Nullable - private final ValueOrExceptionContainer group; + private final AsynchronousChannelGroup group; /** * Create a new factory with the default {@code BufferProvider} and {@code AsynchronousChannelGroup}. @@ -52,7 +51,7 @@ public AsynchronousSocketChannelStreamFactory( AsynchronousSocketChannelStreamFactory( final InetAddressResolver inetAddressResolver, final SocketSettings settings, - final SslSettings sslSettings, @Nullable final ValueOrExceptionContainer group) { + final SslSettings sslSettings, @Nullable final AsynchronousChannelGroup group) { assertFalse(sslSettings.isEnabled()); this.inetAddressResolver = inetAddressResolver; this.settings = notNull("settings", settings); diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java index 859d845de0d..8c5a8f654c5 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java @@ -16,10 +16,8 @@ package com.mongodb.internal.connection; -import com.mongodb.assertions.Assertions; import com.mongodb.connection.SocketSettings; import com.mongodb.connection.SslSettings; -import com.mongodb.internal.ValueOrExceptionContainer; import com.mongodb.lang.Nullable; import com.mongodb.spi.dns.InetAddressResolver; @@ -33,7 +31,7 @@ public final class AsynchronousSocketChannelStreamFactoryFactory implements StreamFactoryFactory { private final InetAddressResolver inetAddressResolver; @Nullable - private final ValueOrExceptionContainer group; + private final AsynchronousChannelGroup group; public AsynchronousSocketChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) { this(inetAddressResolver, null); @@ -41,7 +39,7 @@ public AsynchronousSocketChannelStreamFactoryFactory(final InetAddressResolver i AsynchronousSocketChannelStreamFactoryFactory( final InetAddressResolver inetAddressResolver, - @Nullable final ValueOrExceptionContainer group) { + @Nullable final AsynchronousChannelGroup group) { this.inetAddressResolver = inetAddressResolver; this.group = group; } @@ -54,15 +52,8 @@ public StreamFactory create(final SocketSettings socketSettings, final SslSettin @Override public void close() { - if (group != null && !group.containsException()) { - AsynchronousChannelGroup asynchronousChannelGroup = null; - try { - asynchronousChannelGroup = group.get(); - } catch (Exception e) { - // will not occur, since it was not completed exceptionally - Assertions.fail(); - } - asynchronousChannelGroup.shutdown(); + if (group != null) { + group.shutdown(); } } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java index 15883c88b60..79951c79f4c 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java +++ b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java @@ -22,10 +22,10 @@ import com.mongodb.connection.NettyTransportSettings; import com.mongodb.connection.SocketSettings; import com.mongodb.connection.TransportSettings; -import com.mongodb.internal.ValueOrExceptionContainer; import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory; import com.mongodb.spi.dns.InetAddressResolver; +import java.io.IOException; import java.nio.channels.AsynchronousChannelGroup; import java.util.concurrent.ExecutorService; @@ -59,8 +59,12 @@ public static StreamFactoryFactory getAsyncStreamFactoryFactory(final MongoClien if (settings.getSslSettings().isEnabled()) { return new TlsChannelStreamFactoryFactory(inetAddressResolver, executorService); } else { - ValueOrExceptionContainer group = new ValueOrExceptionContainer<>( - () -> AsynchronousChannelGroup.withThreadPool(executorService)); + AsynchronousChannelGroup group; + try { + group = AsynchronousChannelGroup.withThreadPool(executorService); + } catch (IOException e) { + throw new MongoClientException("Unable to create an asynchronous channel group", e); + } return new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver, group); } } else if (transportSettings instanceof NettyTransportSettings) { From e144364e1f8b678a94fd441784a0a945dc0047a9 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Thu, 14 Nov 2024 11:56:24 -0700 Subject: [PATCH 08/10] PR fix --- .../mongodb/internal/connection/StreamFactoryHelper.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java index 79951c79f4c..1100a4e27f1 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java +++ b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java @@ -58,15 +58,16 @@ public static StreamFactoryFactory getAsyncStreamFactoryFactory(final MongoClien : ((AsyncTransportSettings) transportSettings).getExecutorService(); if (settings.getSslSettings().isEnabled()) { return new TlsChannelStreamFactoryFactory(inetAddressResolver, executorService); - } else { - AsynchronousChannelGroup group; + } + AsynchronousChannelGroup group = null; + if (executorService != null) { try { group = AsynchronousChannelGroup.withThreadPool(executorService); } catch (IOException e) { throw new MongoClientException("Unable to create an asynchronous channel group", e); } - return new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver, group); } + return new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver, group); } else if (transportSettings instanceof NettyTransportSettings) { return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings); } else { From 43c2f94b7c337511227a265d68693e3e25828d10 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Tue, 19 Nov 2024 11:34:02 -0700 Subject: [PATCH 09/10] Update driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java Co-authored-by: Valentin Kovalenko --- .../reactivestreams/client/AsyncTransportSettingsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java index a65d8a90709..cc522d1ac80 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java @@ -56,6 +56,7 @@ void testAsyncTransportSettings() { @ParameterizedTest @ValueSource(booleans = {true, false}) + @SuppressWarnings("try") void testExternalExecutorWasShutDown(final boolean tlsEnabled) { ExecutorService executorService = spy(Executors.newFixedThreadPool(5)); AsyncTransportSettings asyncTransportSettings = TransportSettings.asyncBuilder() From e373347d6ac4c6e67cb5467499e0becdfccae389 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Wed, 20 Nov 2024 08:29:30 -0700 Subject: [PATCH 10/10] PR fix --- .../client/AsyncTransportSettingsTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java index cc522d1ac80..95201cc0890 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java @@ -16,7 +16,6 @@ package com.mongodb.reactivestreams.client; -import com.mongodb.ClusterFixture; import com.mongodb.MongoClientSettings; import com.mongodb.client.MongoClient; import com.mongodb.connection.AsyncTransportSettings; @@ -28,12 +27,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; class AsyncTransportSettingsTest { @@ -57,8 +57,8 @@ void testAsyncTransportSettings() { @ParameterizedTest @ValueSource(booleans = {true, false}) @SuppressWarnings("try") - void testExternalExecutorWasShutDown(final boolean tlsEnabled) { - ExecutorService executorService = spy(Executors.newFixedThreadPool(5)); + void testExternalExecutorWasShutDown(final boolean tlsEnabled) throws InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(5); AsyncTransportSettings asyncTransportSettings = TransportSettings.asyncBuilder() .executorService(executorService) .build(); @@ -70,7 +70,7 @@ void testExternalExecutorWasShutDown(final boolean tlsEnabled) { try (MongoClient ignored = new SyncMongoClient(MongoClients.create(mongoClientSettings))) { // ignored } - ClusterFixture.sleep(100); - verify(executorService, times(1)).shutdown(); + + assertTrue(executorService.awaitTermination(100, TimeUnit.MILLISECONDS)); } }