Skip to content

Commit f493df0

Browse files
committed
Add AsyncTransportSettings, ExecutorService
1 parent fc7084d commit f493df0

File tree

16 files changed

+349
-61
lines changed

16 files changed

+349
-61
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*
17+
*/
18+
package com.mongodb.connection;
19+
20+
import com.mongodb.lang.Nullable;
21+
22+
import java.util.concurrent.ExecutorService;
23+
24+
/**
25+
* {@link TransportSettings} for an async transport implementation.
26+
*
27+
* @since 5.2
28+
*/
29+
public class AsyncTransportSettings extends TransportSettings {
30+
31+
private final ExecutorService executorService;
32+
33+
public AsyncTransportSettings(final Builder builder) {
34+
this.executorService = builder.executorService;
35+
}
36+
37+
public static Builder builder() {
38+
return new Builder();
39+
}
40+
41+
/**
42+
* A builder for an instance of {@link AsyncTransportSettings}
43+
*/
44+
public static final class Builder {
45+
46+
private ExecutorService executorService;
47+
48+
private Builder() {
49+
}
50+
51+
/**
52+
* Sets the executor service
53+
*
54+
* @param executorService the executor service
55+
* @return this
56+
* @see #getExecutorService()
57+
*/
58+
public Builder executorService(final ExecutorService executorService) {
59+
this.executorService = executorService;
60+
return this;
61+
}
62+
63+
/**
64+
* Build an instance of {@link AsyncTransportSettings}
65+
* @return an instance of {@link AsyncTransportSettings}
66+
*/
67+
public AsyncTransportSettings build() {
68+
return new AsyncTransportSettings(this);
69+
}
70+
}
71+
72+
/**
73+
* Gets the executor service
74+
*
75+
* @return the executor service
76+
* @see Builder#executorService(ExecutorService)
77+
*/
78+
@Nullable
79+
public ExecutorService getExecutorService() {
80+
return executorService;
81+
}
82+
}

driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public Builder sslContext(final SslContext sslContext) {
137137
/**
138138
* Build an instance of {@code NettyTransportSettings}.
139139
*
140-
* @return factory for {@code NettyTransportSettings}
140+
* @return an instance of {@code NettyTransportSettings}
141141
*/
142142
public NettyTransportSettings build() {
143143
return new NettyTransportSettings(this);

driver-core/src/main/com/mongodb/connection/TransportSettings.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,14 @@ public abstract class TransportSettings {
3535
public static NettyTransportSettings.Builder nettyBuilder() {
3636
return NettyTransportSettings.builder();
3737
}
38+
39+
/**
40+
* A builder for {@link AsyncTransportSettings}.
41+
*
42+
* @return a builder for {@link AsyncTransportSettings}
43+
* @since 5.2
44+
*/
45+
public static AsyncTransportSettings.Builder asyncBuilder() {
46+
return AsyncTransportSettings.builder();
47+
}
3848
}

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@
2828
import java.net.SocketAddress;
2929
import java.net.StandardSocketOptions;
3030
import java.nio.ByteBuffer;
31+
import java.nio.channels.AsynchronousChannelGroup;
3132
import java.nio.channels.AsynchronousSocketChannel;
3233
import java.nio.channels.CompletionHandler;
3334
import java.util.LinkedList;
3435
import java.util.Queue;
36+
import java.util.concurrent.ExecutorService;
3537
import java.util.concurrent.Future;
3638
import java.util.concurrent.TimeUnit;
3739
import java.util.concurrent.atomic.AtomicReference;
@@ -46,13 +48,24 @@ public final class AsynchronousSocketChannelStream extends AsynchronousChannelSt
4648
private final ServerAddress serverAddress;
4749
private final InetAddressResolver inetAddressResolver;
4850
private final SocketSettings settings;
51+
@Nullable
52+
private final ExecutorService executorService;
4953

50-
public AsynchronousSocketChannelStream(final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
54+
public AsynchronousSocketChannelStream(
55+
final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
5156
final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider) {
57+
this(serverAddress, inetAddressResolver, settings, bufferProvider, null);
58+
}
59+
60+
public AsynchronousSocketChannelStream(
61+
final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
62+
final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider,
63+
@Nullable final ExecutorService executorService) {
5264
super(serverAddress, settings, bufferProvider);
5365
this.serverAddress = serverAddress;
5466
this.inetAddressResolver = inetAddressResolver;
5567
this.settings = settings;
68+
this.executorService = executorService;
5669
}
5770

5871
@Override
@@ -77,7 +90,13 @@ private void initializeSocketChannel(final AsyncCompletionHandler<Void> handler,
7790
SocketAddress socketAddress = socketAddressQueue.poll();
7891

7992
try {
80-
AsynchronousSocketChannel attemptConnectionChannel = AsynchronousSocketChannel.open();
93+
AsynchronousSocketChannel attemptConnectionChannel;
94+
if (executorService != null) {
95+
AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService);
96+
attemptConnectionChannel = AsynchronousSocketChannel.open(group);
97+
} else {
98+
attemptConnectionChannel = AsynchronousSocketChannel.open();
99+
}
81100
attemptConnectionChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
82101
attemptConnectionChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
83102
if (settings.getReceiveBufferSize() > 0) {

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
import com.mongodb.ServerAddress;
2020
import com.mongodb.connection.SocketSettings;
2121
import com.mongodb.connection.SslSettings;
22+
import com.mongodb.lang.Nullable;
2223
import com.mongodb.spi.dns.InetAddressResolver;
2324

25+
import java.util.concurrent.ExecutorService;
26+
2427
import static com.mongodb.assertions.Assertions.assertFalse;
2528
import static com.mongodb.assertions.Assertions.notNull;
2629

@@ -31,23 +34,34 @@ public class AsynchronousSocketChannelStreamFactory implements StreamFactory {
3134
private final PowerOfTwoBufferPool bufferProvider = PowerOfTwoBufferPool.DEFAULT;
3235
private final SocketSettings settings;
3336
private final InetAddressResolver inetAddressResolver;
37+
@Nullable
38+
private final ExecutorService executorService;
3439

3540
/**
3641
* Create a new factory with the default {@code BufferProvider} and {@code AsynchronousChannelGroup}.
3742
*
3843
* @param settings the settings for the connection to a MongoDB server
3944
* @param sslSettings the settings for connecting via SSL
4045
*/
41-
public AsynchronousSocketChannelStreamFactory(final InetAddressResolver inetAddressResolver, final SocketSettings settings,
46+
public AsynchronousSocketChannelStreamFactory(
47+
final InetAddressResolver inetAddressResolver, final SocketSettings settings,
4248
final SslSettings sslSettings) {
49+
this(inetAddressResolver, settings, sslSettings, null);
50+
}
51+
52+
public AsynchronousSocketChannelStreamFactory(
53+
final InetAddressResolver inetAddressResolver, final SocketSettings settings,
54+
final SslSettings sslSettings, @Nullable final ExecutorService executorService) {
4355
assertFalse(sslSettings.isEnabled());
4456
this.inetAddressResolver = inetAddressResolver;
4557
this.settings = notNull("settings", settings);
58+
this.executorService = executorService;
4659
}
4760

4861
@Override
4962
public Stream create(final ServerAddress serverAddress) {
50-
return new AsynchronousSocketChannelStream(serverAddress, inetAddressResolver, settings, bufferProvider);
63+
return new AsynchronousSocketChannelStream(
64+
serverAddress, inetAddressResolver, settings, bufferProvider, executorService);
5165
}
5266

5367
}

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,36 @@
1818

1919
import com.mongodb.connection.SocketSettings;
2020
import com.mongodb.connection.SslSettings;
21+
import com.mongodb.lang.Nullable;
2122
import com.mongodb.spi.dns.InetAddressResolver;
2223

24+
import java.util.concurrent.ExecutorService;
25+
2326
/**
2427
* A {@code StreamFactoryFactory} implementation for AsynchronousSocketChannel-based streams.
2528
*
2629
* @see java.nio.channels.AsynchronousSocketChannel
2730
*/
2831
public final class AsynchronousSocketChannelStreamFactoryFactory implements StreamFactoryFactory {
2932
private final InetAddressResolver inetAddressResolver;
33+
@Nullable
34+
private final ExecutorService executorService;
3035

3136
public AsynchronousSocketChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
37+
this(inetAddressResolver, null);
38+
}
39+
40+
public AsynchronousSocketChannelStreamFactoryFactory(
41+
final InetAddressResolver inetAddressResolver,
42+
@Nullable final ExecutorService executorService) {
3243
this.inetAddressResolver = inetAddressResolver;
44+
this.executorService = executorService;
3345
}
3446

3547
@Override
3648
public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) {
37-
return new AsynchronousSocketChannelStreamFactory(inetAddressResolver, socketSettings, sslSettings);
49+
return new AsynchronousSocketChannelStreamFactory(
50+
inetAddressResolver, socketSettings, sslSettings, executorService);
3851
}
3952

4053
@Override

driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,63 @@
1717
package com.mongodb.internal.connection;
1818

1919
import com.mongodb.MongoClientException;
20+
import com.mongodb.MongoClientSettings;
21+
import com.mongodb.connection.AsyncTransportSettings;
2022
import com.mongodb.connection.NettyTransportSettings;
23+
import com.mongodb.connection.SocketSettings;
2124
import com.mongodb.connection.TransportSettings;
2225
import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory;
2326
import com.mongodb.spi.dns.InetAddressResolver;
2427

28+
import java.util.concurrent.ExecutorService;
29+
2530
/**
2631
* <p>This class is not part of the public API and may be removed or changed at any time</p>
2732
*/
2833
public final class StreamFactoryHelper {
29-
public static StreamFactoryFactory getStreamFactoryFactoryFromSettings(final TransportSettings transportSettings,
34+
35+
public static StreamFactory getSyncStreamFactory(final MongoClientSettings settings,
36+
final InetAddressResolver inetAddressResolver, final SocketSettings socketSettings) {
37+
TransportSettings transportSettings = settings.getTransportSettings();
38+
if (transportSettings == null) {
39+
return new SocketStreamFactory(inetAddressResolver, socketSettings, settings.getSslSettings());
40+
} else if (transportSettings instanceof AsyncTransportSettings) {
41+
throw new MongoClientException("Unsupported async transport settings: " + transportSettings.getClass().getName());
42+
} else if (transportSettings instanceof NettyTransportSettings) {
43+
return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings)
44+
.create(socketSettings, settings.getSslSettings());
45+
} else {
46+
throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName());
47+
}
48+
}
49+
50+
public static StreamFactoryFactory getAsyncStreamFactoryFactory(final MongoClientSettings settings,
3051
final InetAddressResolver inetAddressResolver) {
31-
if (transportSettings instanceof NettyTransportSettings) {
32-
return NettyStreamFactoryFactory.builder().applySettings((NettyTransportSettings) transportSettings)
33-
.inetAddressResolver(inetAddressResolver)
34-
.build();
52+
TransportSettings transportSettings = settings.getTransportSettings();
53+
if (transportSettings == null || transportSettings instanceof AsyncTransportSettings) {
54+
ExecutorService executorService = transportSettings == null
55+
? null
56+
: ((AsyncTransportSettings) transportSettings).getExecutorService();
57+
if (settings.getSslSettings().isEnabled()) {
58+
return new TlsChannelStreamFactoryFactory(inetAddressResolver, executorService);
59+
} else {
60+
return new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver, executorService);
61+
}
62+
} else if (transportSettings instanceof NettyTransportSettings) {
63+
return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings);
3564
} else {
3665
throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName());
3766
}
3867
}
3968

69+
private static NettyStreamFactoryFactory getNettyStreamFactoryFactory(final InetAddressResolver inetAddressResolver,
70+
final NettyTransportSettings transportSettings) {
71+
return NettyStreamFactoryFactory.builder()
72+
.applySettings(transportSettings)
73+
.inetAddressResolver(inetAddressResolver)
74+
.build();
75+
}
76+
4077
private StreamFactoryHelper() {
4178
}
4279
}

driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.security.NoSuchAlgorithmException;
4747
import java.util.Iterator;
4848
import java.util.concurrent.ConcurrentLinkedDeque;
49+
import java.util.concurrent.ExecutorService;
4950
import java.util.concurrent.Future;
5051
import java.util.concurrent.TimeUnit;
5152

@@ -71,13 +72,18 @@ public class TlsChannelStreamFactoryFactory implements StreamFactoryFactory {
7172
/**
7273
* Construct a new instance
7374
*/
74-
public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
75+
public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver,
76+
@Nullable final ExecutorService executorService) {
7577
this.inetAddressResolver = inetAddressResolver;
76-
this.group = new AsynchronousTlsChannelGroup();
78+
this.group = new AsynchronousTlsChannelGroup(executorService);
7779
selectorMonitor = new SelectorMonitor();
7880
selectorMonitor.start();
7981
}
8082

83+
public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
84+
this(inetAddressResolver, null);
85+
}
86+
8187
@Override
8288
public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) {
8389
assertTrue(sslSettings.isEnabled());

0 commit comments

Comments
 (0)