Skip to content

Commit 1b6f13a

Browse files
katcharovstIncMale
andauthored
Add AsyncTransportSettings, ExecutorService (#1489)
JAVA-5505 Co-authored-by: Valentin Kovalenko <[email protected]>
1 parent 600f2c6 commit 1b6f13a

File tree

17 files changed

+419
-62
lines changed

17 files changed

+419
-62
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*
17+
*/
18+
package com.mongodb.connection;
19+
20+
import com.mongodb.lang.Nullable;
21+
22+
import java.util.concurrent.ExecutorService;
23+
24+
import static com.mongodb.assertions.Assertions.notNull;
25+
26+
/**
27+
* {@link TransportSettings} for a non-<a href="http://netty.io/">Netty</a>-based async transport implementation.
28+
* Shallowly immutable.
29+
*
30+
* @since 5.2
31+
*/
32+
public final class AsyncTransportSettings extends TransportSettings {
33+
34+
private final ExecutorService executorService;
35+
36+
private AsyncTransportSettings(final Builder builder) {
37+
this.executorService = builder.executorService;
38+
}
39+
40+
static Builder builder() {
41+
return new Builder();
42+
}
43+
44+
/**
45+
* A builder for an instance of {@link AsyncTransportSettings}
46+
*/
47+
public static final class Builder {
48+
49+
private ExecutorService executorService;
50+
51+
private Builder() {
52+
}
53+
54+
/**
55+
* The executor service, intended to be used exclusively by the mongo
56+
* client. Closing the mongo client will result in {@linkplain ExecutorService#shutdown() orderly shutdown}
57+
* of the executor service.
58+
*
59+
* <p>When {@linkplain SslSettings#isEnabled() TLS is not enabled}, see
60+
* {@link java.nio.channels.AsynchronousChannelGroup#withThreadPool(ExecutorService)}
61+
* for additional requirements for the executor service.
62+
*
63+
* @param executorService the executor service
64+
* @return this
65+
* @see #getExecutorService()
66+
*/
67+
public Builder executorService(final ExecutorService executorService) {
68+
this.executorService = notNull("executorService", executorService);
69+
return this;
70+
}
71+
72+
/**
73+
* Build an instance of {@link AsyncTransportSettings}
74+
* @return an instance of {@link AsyncTransportSettings}
75+
*/
76+
public AsyncTransportSettings build() {
77+
return new AsyncTransportSettings(this);
78+
}
79+
}
80+
81+
/**
82+
* Gets the executor service
83+
*
84+
* @return the executor service
85+
* @see Builder#executorService(ExecutorService)
86+
*/
87+
@Nullable
88+
public ExecutorService getExecutorService() {
89+
return executorService;
90+
}
91+
92+
@Override
93+
public String toString() {
94+
return "AsyncTransportSettings{"
95+
+ "executorService=" + executorService
96+
+ '}';
97+
}
98+
}

driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.mongodb.connection;
1818

19-
import com.mongodb.annotations.Immutable;
2019
import com.mongodb.lang.Nullable;
2120
import io.netty.buffer.ByteBufAllocator;
2221
import io.netty.channel.EventLoopGroup;
@@ -33,10 +32,10 @@
3332

3433
/**
3534
* {@code TransportSettings} for a <a href="http://netty.io/">Netty</a>-based transport implementation.
35+
* Shallowly immutable.
3636
*
3737
* @since 4.11
3838
*/
39-
@Immutable
4039
public final class NettyTransportSettings extends TransportSettings {
4140

4241
private final EventLoopGroup eventLoopGroup;
@@ -137,7 +136,7 @@ public Builder sslContext(final SslContext sslContext) {
137136
/**
138137
* Build an instance of {@code NettyTransportSettings}.
139138
*
140-
* @return factory for {@code NettyTransportSettings}
139+
* @return an instance of {@code NettyTransportSettings}
141140
*/
142141
public NettyTransportSettings build() {
143142
return new NettyTransportSettings(this);

driver-core/src/main/com/mongodb/connection/TransportSettings.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,14 @@ public abstract class TransportSettings {
3535
public static NettyTransportSettings.Builder nettyBuilder() {
3636
return NettyTransportSettings.builder();
3737
}
38+
39+
/**
40+
* A builder for {@link AsyncTransportSettings}.
41+
*
42+
* @return a builder for {@link AsyncTransportSettings}
43+
* @since 5.2
44+
*/
45+
public static AsyncTransportSettings.Builder asyncBuilder() {
46+
return AsyncTransportSettings.builder();
47+
}
3848
}

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.net.SocketAddress;
2929
import java.net.StandardSocketOptions;
3030
import java.nio.ByteBuffer;
31+
import java.nio.channels.AsynchronousChannelGroup;
3132
import java.nio.channels.AsynchronousSocketChannel;
3233
import java.nio.channels.CompletionHandler;
3334
import java.util.LinkedList;
@@ -46,13 +47,24 @@ public final class AsynchronousSocketChannelStream extends AsynchronousChannelSt
4647
private final ServerAddress serverAddress;
4748
private final InetAddressResolver inetAddressResolver;
4849
private final SocketSettings settings;
50+
@Nullable
51+
private final AsynchronousChannelGroup group;
4952

50-
public AsynchronousSocketChannelStream(final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
53+
AsynchronousSocketChannelStream(
54+
final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
5155
final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider) {
56+
this(serverAddress, inetAddressResolver, settings, bufferProvider, null);
57+
}
58+
59+
public AsynchronousSocketChannelStream(
60+
final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
61+
final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider,
62+
@Nullable final AsynchronousChannelGroup group) {
5263
super(serverAddress, settings, bufferProvider);
5364
this.serverAddress = serverAddress;
5465
this.inetAddressResolver = inetAddressResolver;
5566
this.settings = settings;
67+
this.group = group;
5668
}
5769

5870
@Override
@@ -77,7 +89,10 @@ private void initializeSocketChannel(final AsyncCompletionHandler<Void> handler,
7789
SocketAddress socketAddress = socketAddressQueue.poll();
7890

7991
try {
80-
AsynchronousSocketChannel attemptConnectionChannel = AsynchronousSocketChannel.open();
92+
AsynchronousSocketChannel attemptConnectionChannel;
93+
attemptConnectionChannel = group == null
94+
? AsynchronousSocketChannel.open()
95+
: AsynchronousSocketChannel.open(group);
8196
attemptConnectionChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
8297
attemptConnectionChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
8398
if (settings.getReceiveBufferSize() > 0) {

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
import com.mongodb.ServerAddress;
2020
import com.mongodb.connection.SocketSettings;
2121
import com.mongodb.connection.SslSettings;
22+
import com.mongodb.lang.Nullable;
2223
import com.mongodb.spi.dns.InetAddressResolver;
2324

25+
import java.nio.channels.AsynchronousChannelGroup;
26+
2427
import static com.mongodb.assertions.Assertions.assertFalse;
2528
import static com.mongodb.assertions.Assertions.notNull;
2629

@@ -31,23 +34,34 @@ public class AsynchronousSocketChannelStreamFactory implements StreamFactory {
3134
private final PowerOfTwoBufferPool bufferProvider = PowerOfTwoBufferPool.DEFAULT;
3235
private final SocketSettings settings;
3336
private final InetAddressResolver inetAddressResolver;
37+
@Nullable
38+
private final AsynchronousChannelGroup group;
3439

3540
/**
3641
* Create a new factory with the default {@code BufferProvider} and {@code AsynchronousChannelGroup}.
3742
*
3843
* @param settings the settings for the connection to a MongoDB server
3944
* @param sslSettings the settings for connecting via SSL
4045
*/
41-
public AsynchronousSocketChannelStreamFactory(final InetAddressResolver inetAddressResolver, final SocketSettings settings,
46+
public AsynchronousSocketChannelStreamFactory(
47+
final InetAddressResolver inetAddressResolver, final SocketSettings settings,
4248
final SslSettings sslSettings) {
49+
this(inetAddressResolver, settings, sslSettings, null);
50+
}
51+
52+
AsynchronousSocketChannelStreamFactory(
53+
final InetAddressResolver inetAddressResolver, final SocketSettings settings,
54+
final SslSettings sslSettings, @Nullable final AsynchronousChannelGroup group) {
4355
assertFalse(sslSettings.isEnabled());
4456
this.inetAddressResolver = inetAddressResolver;
4557
this.settings = notNull("settings", settings);
58+
this.group = group;
4659
}
4760

4861
@Override
4962
public Stream create(final ServerAddress serverAddress) {
50-
return new AsynchronousSocketChannelStream(serverAddress, inetAddressResolver, settings, bufferProvider);
63+
return new AsynchronousSocketChannelStream(
64+
serverAddress, inetAddressResolver, settings, bufferProvider, group);
5165
}
5266

5367
}

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,42 @@
1818

1919
import com.mongodb.connection.SocketSettings;
2020
import com.mongodb.connection.SslSettings;
21+
import com.mongodb.lang.Nullable;
2122
import com.mongodb.spi.dns.InetAddressResolver;
2223

24+
import java.nio.channels.AsynchronousChannelGroup;
25+
2326
/**
2427
* A {@code StreamFactoryFactory} implementation for AsynchronousSocketChannel-based streams.
2528
*
2629
* @see java.nio.channels.AsynchronousSocketChannel
2730
*/
2831
public final class AsynchronousSocketChannelStreamFactoryFactory implements StreamFactoryFactory {
2932
private final InetAddressResolver inetAddressResolver;
33+
@Nullable
34+
private final AsynchronousChannelGroup group;
3035

3136
public AsynchronousSocketChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
37+
this(inetAddressResolver, null);
38+
}
39+
40+
AsynchronousSocketChannelStreamFactoryFactory(
41+
final InetAddressResolver inetAddressResolver,
42+
@Nullable final AsynchronousChannelGroup group) {
3243
this.inetAddressResolver = inetAddressResolver;
44+
this.group = group;
3345
}
3446

3547
@Override
3648
public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) {
37-
return new AsynchronousSocketChannelStreamFactory(inetAddressResolver, socketSettings, sslSettings);
49+
return new AsynchronousSocketChannelStreamFactory(
50+
inetAddressResolver, socketSettings, sslSettings, group);
3851
}
3952

4053
@Override
4154
public void close() {
55+
if (group != null) {
56+
group.shutdown();
57+
}
4258
}
4359
}

driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,72 @@
1717
package com.mongodb.internal.connection;
1818

1919
import com.mongodb.MongoClientException;
20+
import com.mongodb.MongoClientSettings;
21+
import com.mongodb.connection.AsyncTransportSettings;
2022
import com.mongodb.connection.NettyTransportSettings;
23+
import com.mongodb.connection.SocketSettings;
2124
import com.mongodb.connection.TransportSettings;
2225
import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory;
2326
import com.mongodb.spi.dns.InetAddressResolver;
2427

28+
import java.io.IOException;
29+
import java.nio.channels.AsynchronousChannelGroup;
30+
import java.util.concurrent.ExecutorService;
31+
2532
/**
2633
* <p>This class is not part of the public API and may be removed or changed at any time</p>
2734
*/
2835
public final class StreamFactoryHelper {
29-
public static StreamFactoryFactory getStreamFactoryFactoryFromSettings(final TransportSettings transportSettings,
36+
37+
public static StreamFactory getSyncStreamFactory(final MongoClientSettings settings,
38+
final InetAddressResolver inetAddressResolver, final SocketSettings socketSettings) {
39+
TransportSettings transportSettings = settings.getTransportSettings();
40+
if (transportSettings == null) {
41+
return new SocketStreamFactory(inetAddressResolver, socketSettings, settings.getSslSettings());
42+
} else if (transportSettings instanceof AsyncTransportSettings) {
43+
throw new MongoClientException("Unsupported transport settings in sync: " + transportSettings.getClass().getName());
44+
} else if (transportSettings instanceof NettyTransportSettings) {
45+
return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings)
46+
.create(socketSettings, settings.getSslSettings());
47+
} else {
48+
throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName());
49+
}
50+
}
51+
52+
public static StreamFactoryFactory getAsyncStreamFactoryFactory(final MongoClientSettings settings,
3053
final InetAddressResolver inetAddressResolver) {
31-
if (transportSettings instanceof NettyTransportSettings) {
32-
return NettyStreamFactoryFactory.builder().applySettings((NettyTransportSettings) transportSettings)
33-
.inetAddressResolver(inetAddressResolver)
34-
.build();
54+
TransportSettings transportSettings = settings.getTransportSettings();
55+
if (transportSettings == null || transportSettings instanceof AsyncTransportSettings) {
56+
ExecutorService executorService = transportSettings == null
57+
? null
58+
: ((AsyncTransportSettings) transportSettings).getExecutorService();
59+
if (settings.getSslSettings().isEnabled()) {
60+
return new TlsChannelStreamFactoryFactory(inetAddressResolver, executorService);
61+
}
62+
AsynchronousChannelGroup group = null;
63+
if (executorService != null) {
64+
try {
65+
group = AsynchronousChannelGroup.withThreadPool(executorService);
66+
} catch (IOException e) {
67+
throw new MongoClientException("Unable to create an asynchronous channel group", e);
68+
}
69+
}
70+
return new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver, group);
71+
} else if (transportSettings instanceof NettyTransportSettings) {
72+
return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings);
3573
} else {
3674
throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName());
3775
}
3876
}
3977

78+
private static NettyStreamFactoryFactory getNettyStreamFactoryFactory(final InetAddressResolver inetAddressResolver,
79+
final NettyTransportSettings transportSettings) {
80+
return NettyStreamFactoryFactory.builder()
81+
.applySettings(transportSettings)
82+
.inetAddressResolver(inetAddressResolver)
83+
.build();
84+
}
85+
4086
private StreamFactoryHelper() {
4187
}
4288
}

driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.security.NoSuchAlgorithmException;
4747
import java.util.Iterator;
4848
import java.util.concurrent.ConcurrentLinkedDeque;
49+
import java.util.concurrent.ExecutorService;
4950
import java.util.concurrent.Future;
5051
import java.util.concurrent.TimeUnit;
5152

@@ -71,13 +72,18 @@ public class TlsChannelStreamFactoryFactory implements StreamFactoryFactory {
7172
/**
7273
* Construct a new instance
7374
*/
74-
public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
75+
TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver,
76+
@Nullable final ExecutorService executorService) {
7577
this.inetAddressResolver = inetAddressResolver;
76-
this.group = new AsynchronousTlsChannelGroup();
78+
this.group = new AsynchronousTlsChannelGroup(executorService);
7779
selectorMonitor = new SelectorMonitor();
7880
selectorMonitor.start();
7981
}
8082

83+
public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
84+
this(inetAddressResolver, null);
85+
}
86+
8187
@Override
8288
public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) {
8389
assertTrue(sslSettings.isEnabled());

0 commit comments

Comments
 (0)