Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 0ad6766

Browse files
committedApr 14, 2019
Support auto refresh a list of cluster nodes
Refactor SocketChannelProvider implementations. Now we have two SingleSocketChannelProviderImpl and RoundRobinSocketProviderImpl used by simple and cluster clients respectively. To achieve this a BaseSocketChannelProvider was extracted. Add a service discovery implementation based on a Tarantool stored procedure which is called to obtain a new list of cluster nodes. Integrate service discovery and current cluster client. The client now is aware of potential nodes changing using a configurable background job which periodically checks whether node addresses have changed. If so the client refreshes the RoundRobinSocketProviderImpl and replaces old nodes by new ones. It also requires some additional effort in case of missing the current node in the list. We need to reconnect to another node as soon as possible with a minimal delay between client requests. To achieve this we currently try to catch a moment when the requests in progress have been accomplished and we can finish reconnection process. Closes: #34 See also: #142
1 parent 06652db commit 0ad6766

21 files changed

+1794
-328
lines changed
 
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package org.tarantool;
2+
3+
import java.io.IOException;
4+
import java.net.InetSocketAddress;
5+
import java.nio.channels.SocketChannel;
6+
7+
public abstract class BaseSocketChannelProvider implements SocketChannelProvider {
8+
9+
/**
10+
* Limit of retries.
11+
*/
12+
private int retriesLimit = RETRY_NO_LIMIT;
13+
14+
/**
15+
* Timeout to establish socket connection with an individual server.
16+
*/
17+
private int timeout = NO_TIMEOUT;
18+
19+
/**
20+
* Tries to establish a new connection to the Tarantool instances.
21+
*
22+
* @param retryNumber number of current retry. Reset after successful connect.
23+
* @param lastError the last error occurs when reconnecting
24+
*
25+
* @return connected socket channel
26+
*
27+
* @throws CommunicationException if any I/O errors happen or there are
28+
* no addresses available
29+
*/
30+
@Override
31+
public final SocketChannel get(int retryNumber, Throwable lastError) {
32+
if (areRetriesExhausted(retryNumber)) {
33+
throw new CommunicationException("Connection retries exceeded.", lastError);
34+
}
35+
36+
long deadline = System.currentTimeMillis() + timeout;
37+
while (!Thread.currentThread().isInterrupted()) {
38+
try {
39+
InetSocketAddress address = getAddress(retryNumber, lastError);
40+
return openChannel(address);
41+
} catch (IOException e) {
42+
checkTimeout(deadline, e);
43+
}
44+
}
45+
throw new CommunicationException("Thread interrupted.", new InterruptedException());
46+
}
47+
48+
private void checkTimeout(long deadline, Exception e) {
49+
long timeLeft = deadline - System.currentTimeMillis();
50+
if (timeLeft <= 0) {
51+
throw new CommunicationException("Connection time out.", e);
52+
}
53+
try {
54+
Thread.sleep(timeLeft / 10);
55+
} catch (InterruptedException ignored) {
56+
Thread.currentThread().interrupt();
57+
}
58+
}
59+
60+
/**
61+
* Gets address to be used to establish a new connection
62+
* Address can be null
63+
*
64+
* @param retryNumber reconnection attempt number
65+
* @param lastError reconnection reason
66+
*
67+
* @return available address which is depended on implementation
68+
*
69+
* @throws IOException if any I/O errors occur
70+
*/
71+
protected abstract InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException;
72+
73+
/**
74+
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
75+
* The retry count is maintained by a {@link #get(int, Throwable)} caller
76+
* when a socket level connection was established.
77+
* <p>
78+
* Negative value means unlimited attempts.
79+
*
80+
* @param retriesLimit Limit of retries to use.
81+
*/
82+
public void setRetriesLimit(int retriesLimit) {
83+
this.retriesLimit = retriesLimit;
84+
}
85+
86+
/**
87+
* @return Maximum reconnect attempts to make before raising exception.
88+
*/
89+
public int getRetriesLimit() {
90+
return retriesLimit;
91+
}
92+
93+
/**
94+
* Parse a string address in the form of host[:port]
95+
* and builds a socket address.
96+
*
97+
* @param address Server address.
98+
*
99+
* @return Socket address.
100+
*/
101+
protected InetSocketAddress parseAddress(String address) {
102+
int separatorPosition = address.indexOf(':');
103+
String host = (separatorPosition < 0) ? address : address.substring(0, separatorPosition);
104+
int port = (separatorPosition < 0) ? 3301 : Integer.parseInt(address.substring(separatorPosition + 1));
105+
return new InetSocketAddress(host, port);
106+
}
107+
108+
protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOException {
109+
SocketChannel channel = null;
110+
try {
111+
channel = SocketChannel.open();
112+
channel.socket().connect(socketAddress, timeout);
113+
return channel;
114+
} catch (IOException e) {
115+
if (channel != null) {
116+
try {
117+
channel.close();
118+
} catch (IOException ignored) {
119+
// No-op.
120+
}
121+
}
122+
throw e;
123+
}
124+
}
125+
126+
/**
127+
* Sets maximum amount of time to wait for a socket connection establishment
128+
* with an individual server.
129+
* <p>
130+
* Zero means infinite timeout.
131+
*
132+
* @param timeout timeout value, ms.
133+
*
134+
* @throws IllegalArgumentException if timeout is negative.
135+
*/
136+
public void setTimeout(int timeout) {
137+
if (timeout < 0) {
138+
throw new IllegalArgumentException("timeout is negative.");
139+
}
140+
this.timeout = timeout;
141+
}
142+
143+
/**
144+
* @return Maximum amount of time to wait for a socket connection establishment
145+
* with an individual server.
146+
*/
147+
public int getTimeout() {
148+
return timeout;
149+
}
150+
151+
/**
152+
* Provides a decision on whether retries limit is hit.
153+
*
154+
* @param retries Current count of retries.
155+
*
156+
* @return {@code true} if retries are exhausted.
157+
*/
158+
private boolean areRetriesExhausted(int retries) {
159+
int limit = getRetriesLimit();
160+
if (limit < 0) {
161+
return false;
162+
}
163+
return retries >= limit;
164+
}
165+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.tarantool;
2+
3+
import java.net.SocketAddress;
4+
import java.util.Collection;
5+
6+
public interface RefreshableSocketProvider {
7+
8+
Collection<SocketAddress> getAddresses();
9+
10+
void refreshAddresses(Collection<String> addresses);
11+
12+
}

‎src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java

Lines changed: 104 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -2,181 +2,155 @@
22

33
import java.io.IOException;
44
import java.net.InetSocketAddress;
5-
import java.nio.channels.SocketChannel;
5+
import java.net.SocketAddress;
6+
import java.util.ArrayList;
67
import java.util.Arrays;
8+
import java.util.Collection;
9+
import java.util.Collections;
10+
import java.util.List;
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
import java.util.concurrent.locks.Lock;
13+
import java.util.concurrent.locks.ReadWriteLock;
14+
import java.util.concurrent.locks.ReentrantReadWriteLock;
15+
import java.util.stream.Collectors;
716

817
/**
918
* Basic reconnection strategy that changes addresses in a round-robin fashion.
1019
* To be used with {@link TarantoolClientImpl}.
1120
*/
12-
public class RoundRobinSocketProviderImpl implements SocketChannelProvider {
13-
/** Timeout to establish socket connection with an individual server. */
14-
private int timeout; // 0 is infinite.
15-
/** Limit of retries. */
16-
private int retriesLimit = -1; // No-limit.
17-
/** Server addresses as configured. */
18-
private final String[] addrs;
19-
/** Socket addresses. */
20-
private final InetSocketAddress[] sockAddrs;
21-
/** Current position within {@link #sockAddrs} array. */
22-
private int pos;
21+
public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider implements RefreshableSocketProvider {
22+
23+
private static final int UNSET_POSITION = -1;
2324

2425
/**
25-
* Constructs an instance.
26-
*
27-
* @param addrs Array of addresses in a form of [host]:[port].
26+
* Socket addresses pool.
2827
*/
29-
public RoundRobinSocketProviderImpl(String... addrs) {
30-
if (addrs == null || addrs.length == 0)
31-
throw new IllegalArgumentException("addrs is null or empty.");
32-
33-
this.addrs = Arrays.copyOf(addrs, addrs.length);
34-
35-
sockAddrs = new InetSocketAddress[this.addrs.length];
36-
37-
for (int i = 0; i < this.addrs.length; i++) {
38-
sockAddrs[i] = parseAddress(this.addrs[i]);
39-
}
40-
}
28+
private final List<InetSocketAddress> socketAddresses = new ArrayList<>();
4129

4230
/**
43-
* @return Configured addresses in a form of [host]:[port].
31+
* Current position within {@link #socketAddresses} list.
32+
* <p>
33+
* It is {@link #UNSET_POSITION} when no addresses from
34+
* the {@link #socketAddresses} pool have been processed yet.
35+
* <p>
36+
* When this provider receives new addresses it tries
37+
* to look for a new position for the last used address or
38+
* sets the position to {@link #UNSET_POSITION} otherwise.
39+
*
40+
* @see #getLastObtainedAddress()
41+
* @see #refreshAddresses(Collection)
4442
*/
45-
public String[] getAddresses() {
46-
return this.addrs;
47-
}
43+
private AtomicInteger currentPosition = new AtomicInteger(UNSET_POSITION);
4844

4945
/**
50-
* Sets maximum amount of time to wait for a socket connection establishment
51-
* with an individual server.
46+
* Address list lock for a thread-safe access to it
47+
* when a refresh operation occurs
5248
*
53-
* Zero means infinite timeout.
54-
*
55-
* @param timeout Timeout value, ms.
56-
* @return {@code this}.
57-
* @throws IllegalArgumentException If timeout is negative.
49+
* @see RefreshableSocketProvider#refreshAddresses(Collection)
5850
*/
59-
public RoundRobinSocketProviderImpl setTimeout(int timeout) {
60-
if (timeout < 0)
61-
throw new IllegalArgumentException("timeout is negative.");
62-
63-
this.timeout = timeout;
64-
65-
return this;
66-
}
51+
private ReadWriteLock addressListLock = new ReentrantReadWriteLock();
6752

6853
/**
69-
* @return Maximum amount of time to wait for a socket connection establishment
70-
* with an individual server.
54+
* Constructs an instance.
55+
*
56+
* @param addresses optional array of addresses in a form of host[:port]
57+
* @throws IllegalArgumentException if addresses aren't provided
7158
*/
72-
public int getTimeout() {
73-
return timeout;
59+
public RoundRobinSocketProviderImpl(String... addresses) {
60+
updateAddressList(Arrays.asList(addresses));
61+
}
62+
63+
private void updateAddressList(Collection<String> addresses) {
64+
if (addresses == null || addresses.isEmpty()) {
65+
throw new IllegalArgumentException("At least one address must be provided");
66+
}
67+
Lock writeLock = addressListLock.writeLock();
68+
writeLock.lock();
69+
try {
70+
InetSocketAddress lastAddress = getLastObtainedAddress();
71+
socketAddresses.clear();
72+
addresses.stream()
73+
.map(this::parseAddress)
74+
.collect(Collectors.toCollection(() -> socketAddresses));
75+
if (lastAddress != null) {
76+
int recoveredPosition = socketAddresses.indexOf(lastAddress);
77+
currentPosition.set(recoveredPosition);
78+
} else {
79+
currentPosition.set(UNSET_POSITION);
80+
}
81+
} finally {
82+
writeLock.unlock();
83+
}
7484
}
7585

7686
/**
77-
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
78-
* The retry count is maintained by a {@link #get(int, Throwable)} caller
79-
* when a socket level connection was established.
80-
*
81-
* Negative value means unlimited.
82-
*
83-
* @param retriesLimit Limit of retries to use.
84-
* @return {@code this}.
87+
* @return resolved socket addresses
8588
*/
86-
public RoundRobinSocketProviderImpl setRetriesLimit(int retriesLimit) {
87-
this.retriesLimit = retriesLimit;
88-
89-
return this;
89+
public List<SocketAddress> getAddresses() {
90+
Lock readLock = addressListLock.readLock();
91+
readLock.lock();
92+
try {
93+
return Collections.unmodifiableList(this.socketAddresses);
94+
} finally {
95+
readLock.unlock();
96+
}
9097
}
9198

9299
/**
93-
* @return Maximum reconnect attempts to make before raising exception.
100+
* Gets last used address from the pool if it exists
101+
*
102+
* @return last obtained address or <code>null</code>
103+
* if {@link #currentPosition} has {@link #UNSET_POSITION} value
94104
*/
95-
public int getRetriesLimit() {
96-
return retriesLimit;
105+
protected InetSocketAddress getLastObtainedAddress() {
106+
Lock readLock = addressListLock.readLock();
107+
readLock.lock();
108+
try {
109+
int index = currentPosition.get();
110+
return index != UNSET_POSITION ? socketAddresses.get(index) : null;
111+
} finally {
112+
readLock.unlock();
113+
}
97114
}
98115

99-
/** {@inheritDoc} */
100116
@Override
101-
public SocketChannel get(int retryNumber, Throwable lastError) {
102-
if (areRetriesExhausted(retryNumber)) {
103-
throw new CommunicationException("Connection retries exceeded.", lastError);
104-
}
105-
int attempts = getAddressCount();
106-
long deadline = System.currentTimeMillis() + timeout * attempts;
107-
while (!Thread.currentThread().isInterrupted()) {
108-
SocketChannel channel = null;
109-
try {
110-
channel = SocketChannel.open();
111-
InetSocketAddress addr = getNextSocketAddress();
112-
channel.socket().connect(addr, timeout);
113-
return channel;
114-
} catch (IOException e) {
115-
if (channel != null) {
116-
try {
117-
channel.close();
118-
} catch (IOException ignored) {
119-
// No-op.
120-
}
121-
}
122-
long now = System.currentTimeMillis();
123-
if (deadline <= now) {
124-
throw new CommunicationException("Connection time out.", e);
125-
}
126-
if (--attempts == 0) {
127-
// Tried all addresses without any lack, but still have time.
128-
attempts = getAddressCount();
129-
try {
130-
Thread.sleep((deadline - now) / attempts);
131-
} catch (InterruptedException ignored) {
132-
Thread.currentThread().interrupt();
133-
}
134-
}
135-
}
136-
}
137-
throw new CommunicationException("Thread interrupted.", new InterruptedException());
117+
protected InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException {
118+
return getNextSocketAddress();
138119
}
139120

140121
/**
141122
* @return Number of configured addresses.
142123
*/
143124
protected int getAddressCount() {
144-
return sockAddrs.length;
125+
Lock readLock = addressListLock.readLock();
126+
readLock.lock();
127+
try {
128+
return socketAddresses.size();
129+
} finally {
130+
readLock.unlock();
131+
}
145132
}
146133

147134
/**
148-
* @return Socket address to use for the next reconnection attempt.
135+
* @return Socket address to use for the next reconnection attempt
149136
*/
150137
protected InetSocketAddress getNextSocketAddress() {
151-
InetSocketAddress res = sockAddrs[pos];
152-
pos = (pos + 1) % sockAddrs.length;
153-
return res;
154-
}
155-
156-
/**
157-
* Parse a string address in the form of [host]:[port]
158-
* and builds a socket address.
159-
*
160-
* @param addr Server address.
161-
* @return Socket address.
162-
*/
163-
protected InetSocketAddress parseAddress(String addr) {
164-
int idx = addr.indexOf(':');
165-
String host = (idx < 0) ? addr : addr.substring(0, idx);
166-
int port = (idx < 0) ? 3301 : Integer.parseInt(addr.substring(idx + 1));
167-
return new InetSocketAddress(host, port);
138+
Lock readLock = addressListLock.readLock();
139+
readLock.lock();
140+
try {
141+
int position = currentPosition.updateAndGet(i -> (i + 1) % socketAddresses.size());
142+
return socketAddresses.get(position);
143+
} finally {
144+
readLock.unlock();
145+
}
168146
}
169147

170148
/**
171-
* Provides a decision on whether retries limit is hit.
149+
* @param addresses list of addresses to be applied
172150
*
173-
* @param retries Current count of retries.
174-
* @return {@code true} if retries are exhausted.
151+
* @throws IllegalArgumentException if addresses list is empty
175152
*/
176-
private boolean areRetriesExhausted(int retries) {
177-
int limit = getRetriesLimit();
178-
if (limit < 0)
179-
return false;
180-
return retries >= limit;
153+
public void refreshAddresses(Collection<String> addresses) {
154+
updateAddressList(addresses);
181155
}
182156
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package org.tarantool;
2+
3+
import org.tarantool.util.StringUtils;
4+
5+
import java.io.IOException;
6+
import java.net.InetSocketAddress;
7+
import java.net.SocketAddress;
8+
9+
/**
10+
* Simple provider that produces a single connection
11+
* To be used with {@link TarantoolClientImpl}.
12+
*/
13+
public class SingleSocketChannelProviderImpl extends BaseSocketChannelProvider {
14+
15+
private InetSocketAddress address;
16+
17+
/**
18+
* Creates a simple provider
19+
*
20+
* @param address instance address
21+
*/
22+
public SingleSocketChannelProviderImpl(String address) {
23+
setAddress(address);
24+
}
25+
26+
public SocketAddress getAddress() {
27+
return address;
28+
}
29+
30+
public void setAddress(String address) {
31+
if (StringUtils.isBlank(address)) {
32+
throw new IllegalArgumentException("address must not be empty");
33+
}
34+
35+
this.address = parseAddress(address);
36+
}
37+
38+
@Override
39+
protected InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException {
40+
return address;
41+
}
42+
43+
}

‎src/main/java/org/tarantool/SocketChannelProvider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
import java.nio.channels.SocketChannel;
55

66
public interface SocketChannelProvider {
7+
8+
int RETRY_NO_LIMIT = -1;
9+
int NO_TIMEOUT = 0;
10+
711
/**
812
* Provides socket channel to init restore connection.
913
* You could change hosts on fail and sleep between retries in this method
Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,50 @@
11
package org.tarantool;
22

3-
4-
import java.util.concurrent.TimeUnit;
5-
63
public class TarantoolClientConfig {
74

85
/**
9-
* username and password for authorization
6+
* Auth-related data
107
*/
118
public String username;
12-
139
public String password;
1410

1511
/**
16-
* default ByteArrayOutputStream size when make query serialization
12+
* Default request size when make query serialization
1713
*/
1814
public int defaultRequestSize = 4096;
1915

2016
/**
21-
* initial size for map which holds futures of sent request
17+
* Initial capacity for the map which holds futures of sent request
2218
*/
2319
public int predictedFutures = (int) ((1024 * 1024) / 0.75) + 1;
2420

25-
2621
public int writerThreadPriority = Thread.NORM_PRIORITY;
27-
2822
public int readerThreadPriority = Thread.NORM_PRIORITY;
2923

30-
3124
/**
32-
* shared buffer is place where client collect requests when socket is busy on write
25+
* Shared buffer size (place where client collects requests
26+
* when socket is busy on write)
3327
*/
3428
public int sharedBufferSize = 8 * 1024 * 1024;
29+
3530
/**
36-
* not put request into the shared buffer if request size is ge directWriteFactor * sharedBufferSize
31+
* Factor to calculate a threshold whether request will be accommodated
32+
* in the shared buffer.
33+
* if request size exceeds <code>directWriteFactor * sharedBufferSize</code>
34+
* request is sent directly.
3735
*/
3836
public double directWriteFactor = 0.5d;
3937

4038
/**
41-
* Use old call command https://github.com/tarantool/doc/issues/54,
42-
* please ensure that you server supports new call command
39+
* Use old call command https://github.com/tarantool/doc/issues/54,
40+
* please ensure that you server supports new call command
4341
*/
4442
public boolean useNewCall = false;
4543

4644
/**
47-
* Any blocking ops timeout
45+
* Limits for synchronous operations
4846
*/
49-
public long initTimeoutMillis = 60*1000L;
50-
51-
public long writeTimeoutMillis = 60*1000L;
47+
public long initTimeoutMillis = 60 * 1000L;
48+
public long writeTimeoutMillis = 60 * 1000L;
5249

5350
}

‎src/main/java/org/tarantool/TarantoolClientImpl.java

Lines changed: 186 additions & 63 deletions
Large diffs are not rendered by default.

‎src/main/java/org/tarantool/TarantoolClusterClient.java

Lines changed: 173 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
11
package org.tarantool;
22

3+
import org.tarantool.cluster.TarantoolClusterDiscoverer;
4+
import org.tarantool.cluster.TarantoolClusterStoredFunctionDiscoverer;
5+
import org.tarantool.protocol.TarantoolPacket;
6+
import org.tarantool.util.StringUtils;
7+
8+
import java.io.IOException;
9+
import java.net.SocketAddress;
310
import java.util.ArrayList;
411
import java.util.Collection;
12+
import java.util.Objects;
13+
import java.util.Set;
514
import java.util.concurrent.CompletableFuture;
615
import java.util.concurrent.ConcurrentHashMap;
716
import java.util.concurrent.Executor;
817
import java.util.concurrent.Executors;
9-
10-
import static org.tarantool.TarantoolClientImpl.StateHelper.CLOSED;
18+
import java.util.concurrent.ScheduledExecutorService;
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.locks.StampedLock;
1121

1222
/**
1323
* Basic implementation of a client that may work with the cluster
@@ -17,18 +27,30 @@
1727
* unless the configured expiration time is over.
1828
*/
1929
public class TarantoolClusterClient extends TarantoolClientImpl {
20-
/* Need some execution context to retry writes. */
30+
31+
/**
32+
* Need some execution context to retry writes.
33+
*/
2134
private Executor executor;
2235

23-
/* Collection of operations to be retried. */
24-
private ConcurrentHashMap<Long, ExpirableOp<?>> retries = new ConcurrentHashMap<Long, ExpirableOp<?>>();
36+
/**
37+
* Discovery activity
38+
*/
39+
private ScheduledExecutorService instancesDiscoveryExecutor;
40+
private Runnable instancesDiscovererTask;
41+
private StampedLock discoveryLock = new StampedLock();
2542

2643
/**
27-
* @param config Configuration.
28-
* @param addrs Array of addresses in the form of [host]:[port].
44+
* Collection of operations to be retried.
2945
*/
30-
public TarantoolClusterClient(TarantoolClusterClientConfig config, String... addrs) {
31-
this(config, new RoundRobinSocketProviderImpl(addrs).setTimeout(config.operationExpiryTimeMillis));
46+
private ConcurrentHashMap<Long, ExpirableOp<?>> retries = new ConcurrentHashMap<>();
47+
48+
/**
49+
* @param config Configuration.
50+
* @param addresses Array of addresses in the form of host[:port].
51+
*/
52+
public TarantoolClusterClient(TarantoolClusterClientConfig config, String... addresses) {
53+
this(config, makeClusterSocketProvider(addresses, config.operationExpiryTimeMillis));
3254
}
3355

3456
/**
@@ -38,13 +60,32 @@ public TarantoolClusterClient(TarantoolClusterClientConfig config, String... add
3860
public TarantoolClusterClient(TarantoolClusterClientConfig config, SocketChannelProvider provider) {
3961
super(provider, config);
4062

41-
this.executor = config.executor == null ?
42-
Executors.newSingleThreadExecutor() : config.executor;
63+
this.executor = config.executor == null
64+
? Executors.newSingleThreadExecutor()
65+
: config.executor;
66+
67+
if (StringUtils.isNotBlank(config.clusterDiscoveryEntryFunction)) {
68+
this.instancesDiscovererTask =
69+
createDiscoveryTask(new TarantoolClusterStoredFunctionDiscoverer(config, this));
70+
this.instancesDiscoveryExecutor
71+
= Executors.newSingleThreadScheduledExecutor(new TarantoolThreadDaemonFactory("tarantoolDiscoverer"));
72+
int delay = config.clusterDiscoveryDelayMillis > 0
73+
? config.clusterDiscoveryDelayMillis
74+
: TarantoolClusterClientConfig.DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS;
75+
76+
// todo: it's better to start a job later (out of ctor)
77+
this.instancesDiscoveryExecutor.scheduleWithFixedDelay(
78+
this.instancesDiscovererTask,
79+
0,
80+
delay,
81+
TimeUnit.MILLISECONDS
82+
);
83+
}
4384
}
4485

4586
@Override
4687
protected boolean isDead(CompletableFuture<?> q) {
47-
if ((state.getState() & CLOSED) != 0) {
88+
if ((state.getState() & StateHelper.CLOSED) != 0) {
4889
q.completeExceptionally(new CommunicationException("Connection is dead", thumbstone));
4990
return true;
5091
}
@@ -60,22 +101,40 @@ protected CompletableFuture<?> doExec(Code code, Object[] args) {
60101
validateArgs(args);
61102
long sid = syncId.incrementAndGet();
62103
ExpirableOp<?> future = makeFuture(sid, code, args);
104+
return registerOperation(future);
105+
}
63106

64-
if (isDead(future)) {
65-
return future;
66-
}
67-
futures.put(sid, future);
68-
if (isDead(future)) {
69-
futures.remove(sid);
70-
return future;
71-
}
107+
/**
108+
* Registers a new async operation which will be resolved later.
109+
* Registration is discovery-aware in term of synchronization and
110+
* it may be blocked util the discovery finishes its work.
111+
*
112+
* @param future operation to be performed
113+
* @return registered operation
114+
*/
115+
private CompletableFuture<?> registerOperation(ExpirableOp<?> future) {
116+
long stamp = discoveryLock.readLock();
72117
try {
73-
write(code, sid, null, args);
74-
} catch (Exception e) {
75-
futures.remove(sid);
76-
fail(future, e);
118+
if (isDead(future)) {
119+
return future;
120+
}
121+
futures.put(future.getId(), future);
122+
if (isDead(future)) {
123+
futures.remove(future.getId());
124+
return future;
125+
}
126+
127+
try {
128+
write(future.getCode(), future.getId(), null, future.getArgs());
129+
} catch (Exception e) {
130+
futures.remove(future.getId());
131+
fail(future, e);
132+
}
133+
134+
return future;
135+
} finally {
136+
discoveryLock.unlock(stamp);
77137
}
78-
return future;
79138
}
80139

81140
@Override
@@ -99,6 +158,10 @@ protected boolean checkFail(CompletableFuture<?> q, Exception e) {
99158
protected void close(Exception e) {
100159
super.close(e);
101160

161+
if (instancesDiscoveryExecutor != null) {
162+
instancesDiscoveryExecutor.shutdownNow();
163+
}
164+
102165
if (retries == null) {
103166
// May happen within constructor.
104167
return;
@@ -133,27 +196,99 @@ protected void onReconnect() {
133196
// First call is before the constructor finished. Skip it.
134197
return;
135198
}
136-
Collection<ExpirableOp<?>> futuresToRetry = new ArrayList<ExpirableOp<?>>(retries.values());
199+
Collection<ExpirableOp<?>> futuresToRetry = new ArrayList<>(retries.values());
137200
retries.clear();
138201
long now = System.currentTimeMillis();
139202
for (final ExpirableOp<?> future : futuresToRetry) {
140203
if (!future.hasExpired(now)) {
141-
executor.execute(new Runnable() {
142-
@Override
143-
public void run() {
144-
futures.put(future.getId(), future);
145-
try {
146-
write(future.getCode(), future.getId(), null, future.getArgs());
147-
} catch (Exception e) {
148-
futures.remove(future.getId());
149-
fail(future, e);
150-
}
151-
}
152-
});
204+
executor.execute(() -> registerOperation(future));
205+
}
206+
}
207+
}
208+
209+
@Override
210+
protected void complete(TarantoolPacket packet, TarantoolOp<?> future) {
211+
super.complete(packet, future);
212+
RefreshableSocketProvider provider = getRefreshableSocketProvider();
213+
if (provider != null) {
214+
renewConnectionIfRequired(provider.getAddresses());
215+
}
216+
}
217+
218+
protected void onInstancesRefreshed(Set<String> instances) {
219+
RefreshableSocketProvider provider = getRefreshableSocketProvider();
220+
if (provider != null) {
221+
provider.refreshAddresses(instances);
222+
renewConnectionIfRequired(provider.getAddresses());
223+
}
224+
}
225+
226+
private RefreshableSocketProvider getRefreshableSocketProvider() {
227+
return socketProvider instanceof RefreshableSocketProvider
228+
? (RefreshableSocketProvider) socketProvider
229+
: null;
230+
}
231+
232+
private void renewConnectionIfRequired(Collection<SocketAddress> addresses) {
233+
if (pendingResponsesCount.get() > 0 || !isAlive()) {
234+
return;
235+
}
236+
SocketAddress addressInUse = getCurrentAddressOrNull();
237+
if (!(addressInUse == null || addresses.contains(addressInUse))) {
238+
long stamp = discoveryLock.tryWriteLock();
239+
if (!discoveryLock.validate(stamp)) {
240+
return;
241+
}
242+
try {
243+
if (pendingResponsesCount.get() == 0) {
244+
stopIO();
245+
}
246+
} finally {
247+
discoveryLock.unlock(stamp);
153248
}
154249
}
155250
}
156251

252+
private SocketAddress getCurrentAddressOrNull() {
253+
try {
254+
return channel.getRemoteAddress();
255+
} catch (IOException ignored) {
256+
return null;
257+
}
258+
}
259+
260+
public void refreshInstances() {
261+
if (instancesDiscovererTask != null) {
262+
instancesDiscovererTask.run();
263+
}
264+
}
265+
266+
private static RoundRobinSocketProviderImpl makeClusterSocketProvider(String[] addresses,
267+
int connectionTimeout) {
268+
RoundRobinSocketProviderImpl socketProvider = new RoundRobinSocketProviderImpl(addresses);
269+
socketProvider.setTimeout(connectionTimeout);
270+
return socketProvider;
271+
}
272+
273+
private Runnable createDiscoveryTask(TarantoolClusterDiscoverer serviceDiscoverer) {
274+
return new Runnable() {
275+
276+
private Set<String> lastInstances;
277+
278+
@Override
279+
public synchronized void run() {
280+
try {
281+
Set<String> freshInstances = serviceDiscoverer.getInstances();
282+
if (!(freshInstances.isEmpty() || Objects.equals(lastInstances, freshInstances))) {
283+
lastInstances = freshInstances;
284+
onInstancesRefreshed(lastInstances);
285+
}
286+
} catch (Exception ignored) {
287+
}
288+
}
289+
};
290+
}
291+
157292
/**
158293
* Holds operation code and arguments for retry.
159294
*/

‎src/main/java/org/tarantool/TarantoolClusterClientConfig.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,30 @@
66
* Configuration for the {@link TarantoolClusterClient}.
77
*/
88
public class TarantoolClusterClientConfig extends TarantoolClientConfig {
9-
/* Amount of time (in milliseconds) the operation is eligible for retry. */
10-
public int operationExpiryTimeMillis = 500;
119

12-
/* Executor service that will be used as a thread of execution to retry writes. */
13-
public Executor executor = null;
10+
public static final int DEFAULT_OPERATION_EXPIRY_TIME_MILLIS = 500;
11+
public static final int DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS = 60_000;
12+
13+
/**
14+
* Period for the operation is eligible for retry.
15+
*/
16+
public int operationExpiryTimeMillis = DEFAULT_OPERATION_EXPIRY_TIME_MILLIS;
17+
18+
/**
19+
* Executor that will be used as a thread of
20+
* execution to retry writes.
21+
*/
22+
public Executor executor;
23+
24+
/**
25+
* Gets a name of the stored function to be used
26+
* to fetch list of instances.
27+
*/
28+
public String clusterDiscoveryEntryFunction;
29+
30+
/**
31+
* Scan period for refreshing a new list of instances.
32+
*/
33+
public int clusterDiscoveryDelayMillis = DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS;
34+
1435
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.tarantool;
2+
3+
import java.util.concurrent.ThreadFactory;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
public class TarantoolThreadDaemonFactory implements ThreadFactory {
7+
8+
private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
9+
private final AtomicInteger threadNumber = new AtomicInteger(1);
10+
private final String namePrefix;
11+
12+
public TarantoolThreadDaemonFactory(String namePrefix) {
13+
this.namePrefix = namePrefix + "-" + POOL_NUMBER.incrementAndGet() + "-thread-";
14+
}
15+
16+
@Override
17+
public Thread newThread(Runnable runnable) {
18+
Thread thread = new Thread(runnable, namePrefix + threadNumber.incrementAndGet());
19+
thread.setDaemon(true);
20+
21+
return thread;
22+
}
23+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package org.tarantool.cluster;
2+
3+
/**
4+
* Raised when {@link TarantoolClusterStoredFunctionDiscoverer} validates
5+
* a function result as unsupported.
6+
*/
7+
public class IllegalDiscoveryFunctionResult extends RuntimeException {
8+
9+
public IllegalDiscoveryFunctionResult(String message) {
10+
super(message);
11+
}
12+
13+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.tarantool.cluster;
2+
3+
import java.util.Set;
4+
5+
/**
6+
* Discovery strategy to obtain a list of the cluster nodes.
7+
* This one can be used by {@link org.tarantool.RefreshableSocketProvider}
8+
* to provide support for fault tolerance property.
9+
*
10+
* @see org.tarantool.RefreshableSocketProvider
11+
*/
12+
public interface TarantoolClusterDiscoverer {
13+
14+
/**
15+
* Gets nodes addresses in <code>host[:port]</code> format.
16+
*
17+
* @return list of the cluster nodes
18+
*/
19+
Set<String> getInstances();
20+
21+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package org.tarantool.cluster;
2+
3+
import org.tarantool.TarantoolClient;
4+
import org.tarantool.TarantoolClientOps;
5+
import org.tarantool.TarantoolClusterClientConfig;
6+
7+
import java.util.LinkedHashSet;
8+
import java.util.List;
9+
import java.util.Set;
10+
import java.util.stream.Collectors;
11+
12+
/**
13+
* A cluster nodes discoverer based on calling a predefined function
14+
* which returns list of nodes.
15+
*
16+
* The function has to have no arguments and return list of
17+
* the strings which follow <code>host[:port]</code> format
18+
*/
19+
public class TarantoolClusterStoredFunctionDiscoverer implements TarantoolClusterDiscoverer {
20+
21+
private TarantoolClient client;
22+
private String entryFunction;
23+
24+
public TarantoolClusterStoredFunctionDiscoverer(TarantoolClusterClientConfig clientConfig, TarantoolClient client) {
25+
this.client = client;
26+
this.entryFunction = clientConfig.clusterDiscoveryEntryFunction;
27+
}
28+
29+
@Override
30+
public Set<String> getInstances() {
31+
TarantoolClientOps<Integer, List<?>, Object, List<?>> syncOperations = client.syncOps();
32+
33+
List<?> list = syncOperations.call(entryFunction);
34+
// discoverer expects a single array result from the function now;
35+
// in order to protect this contract the discoverer does a strict
36+
// validation against the data returned;
37+
// this strict-mode allows us to extend the contract in a non-breaking
38+
// way for old clients just reserve an extra return value in
39+
// terms of LUA multi-result support.
40+
checkResult(list);
41+
42+
List<Object> funcResult = (List<Object>) list.get(0);
43+
return funcResult.stream()
44+
.map(Object::toString)
45+
.collect(Collectors.toCollection(LinkedHashSet::new));
46+
}
47+
48+
/**
49+
* Check whether the result follows the contract or not.
50+
* The contract is a mandatory <b>single array of strings</b>
51+
*
52+
* @param result result to be validated
53+
*/
54+
private void checkResult(List<?> result) {
55+
if (result == null || result.isEmpty()) {
56+
throw new IllegalDiscoveryFunctionResult("Discovery function returned no data");
57+
}
58+
if (!((List<Object>)result.get(0)).stream().allMatch(item -> item instanceof String)) {
59+
throw new IllegalDiscoveryFunctionResult("The first value must be an array of strings");
60+
}
61+
}
62+
63+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.tarantool.util;
2+
3+
public class StringUtils {
4+
5+
public static boolean isEmpty(String string) {
6+
return (string == null) || (string.isEmpty());
7+
}
8+
9+
public static boolean isNotEmpty(String string) {
10+
return !isEmpty(string);
11+
}
12+
13+
public static boolean isBlank(String string) {
14+
return (string == null) || (string.trim().isEmpty());
15+
}
16+
17+
public static boolean isNotBlank(String string) {
18+
return !isBlank(string);
19+
}
20+
21+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package org.tarantool;
2+
3+
import java.io.IOException;
4+
import java.net.InetSocketAddress;
5+
import java.net.Socket;
6+
import java.net.SocketAddress;
7+
import java.nio.channels.SocketChannel;
8+
import java.util.Collection;
9+
import java.util.stream.Collectors;
10+
11+
import static org.mockito.Mockito.anyObject;
12+
import static org.mockito.Mockito.doReturn;
13+
import static org.mockito.Mockito.doThrow;
14+
import static org.mockito.Mockito.mock;
15+
import static org.mockito.Mockito.spy;
16+
import static org.mockito.Mockito.when;
17+
18+
public class AbstractSocketProviderTest {
19+
20+
protected String extractRawHostAndPortString(SocketAddress socketAddress) {
21+
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
22+
return inetSocketAddress.getAddress().getHostName() + ":" + inetSocketAddress.getPort();
23+
}
24+
25+
protected Iterable<String> asRawHostAndPort(Collection<SocketAddress> addresses) {
26+
return addresses.stream()
27+
.map(this::extractRawHostAndPortString)
28+
.collect(Collectors.toList());
29+
}
30+
31+
protected <T extends BaseSocketChannelProvider> T wrapWithMockChannelProvider(T source) throws IOException {
32+
T wrapper = spy(source);
33+
doReturn(makeSocketChannel()).when(wrapper).openChannel(anyObject());
34+
return wrapper;
35+
}
36+
37+
protected <T extends BaseSocketChannelProvider> T wrapWithMockErroredChannelProvider(T source) throws IOException {
38+
T wrapper = spy(source);
39+
doThrow(IOException.class).when(wrapper).openChannel(anyObject());
40+
return wrapper;
41+
}
42+
43+
private SocketChannel makeSocketChannel() {
44+
SocketChannel socketChannel = mock(SocketChannel.class);
45+
when(socketChannel.socket()).thenReturn(mock(Socket.class));
46+
47+
return socketChannel;
48+
}
49+
50+
}

‎src/test/java/org/tarantool/AbstractTarantoolConnectorIT.java

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
import org.junit.jupiter.api.BeforeAll;
55
import org.opentest4j.AssertionFailedError;
66

7-
import java.math.BigInteger;
87
import java.io.IOException;
8+
import java.math.BigInteger;
99
import java.net.InetSocketAddress;
1010
import java.net.Socket;
1111
import java.util.List;
@@ -18,13 +18,13 @@
1818
import static org.junit.jupiter.api.Assertions.assertEquals;
1919
import static org.junit.jupiter.api.Assertions.assertNotNull;
2020
import static org.junit.jupiter.api.Assertions.assertTrue;
21-
2221
import static org.tarantool.TestUtils.makeInstanceEnv;
2322

2423
/**
2524
* Abstract test. Provides environment control and frequently used functions.
2625
*/
2726
public abstract class AbstractTarantoolConnectorIT {
27+
2828
protected static final String host = System.getProperty("tntHost", "localhost");
2929
protected static final int port = Integer.parseInt(System.getProperty("tntPort", "3301"));
3030
protected static final int consolePort = Integer.parseInt(System.getProperty("tntConsolePort", "3313"));
@@ -37,8 +37,7 @@ public abstract class AbstractTarantoolConnectorIT {
3737
protected static final int TIMEOUT = 500;
3838
protected static final int RESTART_TIMEOUT = 2000;
3939

40-
protected static final SocketChannelProvider socketChannelProvider = new TestSocketChannelProvider(host, port,
41-
RESTART_TIMEOUT);
40+
protected static final SocketChannelProvider socketChannelProvider = new TestSocketChannelProvider(host, port, RESTART_TIMEOUT);
4241

4342
protected static TarantoolControl control;
4443
protected static TarantoolConsole console;
@@ -53,36 +52,36 @@ public abstract class AbstractTarantoolConnectorIT {
5352
protected static int MPK_INDEX_ID;
5453
protected static int VIDX_INDEX_ID;
5554

56-
private static final String[] setupScript = new String[] {
57-
"box.schema.space.create('basic_test', { format = " +
58-
"{{name = 'id', type = 'integer'}," +
59-
" {name = 'val', type = 'string'} } })",
55+
private static final String[] setupScript = new String[]{
56+
"box.schema.space.create('basic_test', { format = " +
57+
"{{name = 'id', type = 'integer'}," +
58+
" {name = 'val', type = 'string'} } })",
6059

61-
"box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )",
62-
"box.space.basic_test:create_index('vidx', { type = 'TREE', unique = false, parts = {'val'} } )",
60+
"box.space.basic_test:create_index('pk', { type = 'TREE', parts = {'id'} } )",
61+
"box.space.basic_test:create_index('vidx', { type = 'TREE', unique = false, parts = {'val'} } )",
6362

64-
"box.space.basic_test:replace{1, 'one'}",
65-
"box.space.basic_test:replace{2, 'two'}",
66-
"box.space.basic_test:replace{3, 'three'}",
63+
"box.space.basic_test:replace{1, 'one'}",
64+
"box.space.basic_test:replace{2, 'two'}",
65+
"box.space.basic_test:replace{3, 'three'}",
6766

68-
"box.schema.space.create('multipart_test', { format = " +
69-
"{{name = 'id1', type = 'integer'}," +
70-
" {name = 'id2', type = 'string'}," +
71-
" {name = 'val1', type = 'string'} } })",
67+
"box.schema.space.create('multipart_test', { format = " +
68+
"{{name = 'id1', type = 'integer'}," +
69+
" {name = 'id2', type = 'string'}," +
70+
" {name = 'val1', type = 'string'} } })",
7271

73-
"box.space.multipart_test:create_index('pk', { type = 'TREE', parts = {'id1', 'id2'} })",
74-
"box.space.multipart_test:create_index('vidx', { type = 'TREE', unique = false, parts = {'val1'} })",
72+
"box.space.multipart_test:create_index('pk', { type = 'TREE', parts = {'id1', 'id2'} })",
73+
"box.space.multipart_test:create_index('vidx', { type = 'TREE', unique = false, parts = {'val1'} })",
7574

76-
"box.space.multipart_test:replace{1, 'one', 'o n e'}",
77-
"box.space.multipart_test:replace{2, 'two', 't w o'}",
78-
"box.space.multipart_test:replace{3, 'three', 't h r e e'}",
75+
"box.space.multipart_test:replace{1, 'one', 'o n e'}",
76+
"box.space.multipart_test:replace{2, 'two', 't w o'}",
77+
"box.space.multipart_test:replace{3, 'three', 't h r e e'}",
7978

80-
"function echo(...) return ... end"
79+
"function echo(...) return ... end"
8180
};
8281

83-
private static final String[] cleanScript = new String[] {
84-
"box.space.basic_test and box.space.basic_test:drop()",
85-
"box.space.multipart_test and box.space.multipart_test:drop()"
82+
private static final String[] cleanScript = new String[]{
83+
"box.space.basic_test and box.space.basic_test:drop()",
84+
"box.space.multipart_test and box.space.multipart_test:drop()"
8685
};
8786

8887
@BeforeAll
@@ -124,7 +123,7 @@ private static void executeLua(String[] exprs) {
124123
protected void checkTupleResult(Object res, List tuple) {
125124
assertNotNull(res);
126125
assertTrue(List.class.isAssignableFrom(res.getClass()));
127-
List list = (List)res;
126+
List list = (List) res;
128127
assertEquals(1, list.size());
129128
assertNotNull(list.get(0));
130129
assertTrue(List.class.isAssignableFrom(list.get(0).getClass()));
@@ -143,27 +142,27 @@ protected static TarantoolClientConfig makeClientConfig() {
143142
return fillClientConfig(new TarantoolClientConfig());
144143
}
145144

146-
protected static TarantoolClusterClientConfig makeClusterClientConfig() {
145+
public static TarantoolClusterClientConfig makeClusterClientConfig() {
147146
TarantoolClusterClientConfig config = fillClientConfig(new TarantoolClusterClientConfig());
148147
config.executor = null;
149148
config.operationExpiryTimeMillis = TIMEOUT;
150149
return config;
151150
}
152151

153-
private static <T> T fillClientConfig(TarantoolClientConfig config) {
152+
private static <T extends TarantoolClientConfig> T fillClientConfig(T config) {
154153
config.username = username;
155154
config.password = password;
156155
config.initTimeoutMillis = RESTART_TIMEOUT;
157156
config.sharedBufferSize = 128;
158-
return (T)config;
157+
return (T) config;
159158
}
160159

161160
protected static TarantoolConsole openConsole() {
162161
return TarantoolConsole.open(host, consolePort);
163162
}
164163

165164
protected static TarantoolConsole openConsole(String instance) {
166-
return TarantoolConsole.open(control.tntCtlWorkDir, instance);
165+
return TarantoolConsole.open(TarantoolControl.tntCtlWorkDir, instance);
167166
}
168167

169168
protected TarantoolConnection openConnection() {
@@ -247,7 +246,7 @@ protected static void startTarantool(String instance) {
247246
*
248247
* @param timeout Timeout in ms.
249248
* @param message Error message.
250-
* @param r Runnable.
249+
* @param r Runnable.
251250
*/
252251
protected void assertTimeoutPreemptively(int timeout, String message, Runnable r) {
253252
ExecutorService executorService = Executors.newSingleThreadExecutor();

‎src/test/java/org/tarantool/ClientReconnectClusterIT.java

Lines changed: 407 additions & 43 deletions
Large diffs are not rendered by default.
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package org.tarantool;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
5+
import static org.junit.jupiter.api.Assertions.assertThrows;
6+
7+
import org.junit.jupiter.api.DisplayName;
8+
import org.junit.jupiter.api.Test;
9+
10+
import java.io.IOException;
11+
import java.util.Arrays;
12+
import java.util.Collections;
13+
import java.util.List;
14+
15+
@DisplayName("A RR socket provider")
16+
public class RoundRobinSocketProviderImplTest extends AbstractSocketProviderTest {
17+
18+
@Test
19+
@DisplayName("initialized with a right addresses count")
20+
public void testAddressesCount() {
21+
RoundRobinSocketProviderImpl socketProvider
22+
= new RoundRobinSocketProviderImpl("localhost:3301", "127.0.0.1:3302", "10.0.0.10:3303");
23+
assertEquals(3, socketProvider.getAddressCount());
24+
25+
socketProvider.refreshAddresses(Collections.singletonList("10.0.0.1"));
26+
assertEquals(1, socketProvider.getAddressCount());
27+
}
28+
29+
@Test
30+
@DisplayName("initialized with a right addresses values")
31+
public void testAddresses() {
32+
String[] addresses = {"localhost:3301", "127.0.0.2:3302", "10.0.0.10:3303"};
33+
RoundRobinSocketProviderImpl socketProvider
34+
= new RoundRobinSocketProviderImpl(addresses);
35+
assertIterableEquals(Arrays.asList(addresses), asRawHostAndPort(socketProvider.getAddresses()));
36+
37+
List<String> strings = Collections.singletonList("10.0.0.1:3310");
38+
socketProvider.refreshAddresses(strings);
39+
assertIterableEquals(strings, asRawHostAndPort(socketProvider.getAddresses()));
40+
}
41+
42+
@Test
43+
@DisplayName("initialized failed when an empty addresses list is provided")
44+
public void testEmptyAddresses() {
45+
assertThrows(IllegalArgumentException.class, RoundRobinSocketProviderImpl::new);
46+
}
47+
48+
@Test
49+
@DisplayName("changed addresses list with a failure when a new list is empty")
50+
public void testResultWithEmptyAddresses() throws IOException {
51+
RoundRobinSocketProviderImpl socketProvider
52+
= wrapWithMockChannelProvider(new RoundRobinSocketProviderImpl("localhost:3301"));
53+
54+
assertThrows(IllegalArgumentException.class, () -> socketProvider.refreshAddresses(Collections.emptyList()));
55+
}
56+
57+
@Test
58+
@DisplayName("changed addresses list with a failure when a new list is null")
59+
public void testResultWithWrongAddress() throws IOException {
60+
RoundRobinSocketProviderImpl socketProvider
61+
= wrapWithMockChannelProvider(new RoundRobinSocketProviderImpl("localhost:3301"));
62+
63+
assertThrows(IllegalArgumentException.class, () -> socketProvider.refreshAddresses(null));
64+
}
65+
66+
@Test
67+
@DisplayName("initialized with a default timeout")
68+
public void testDefaultTimeout() {
69+
RoundRobinSocketProviderImpl socketProvider
70+
= new RoundRobinSocketProviderImpl("localhost");
71+
assertEquals(RoundRobinSocketProviderImpl.NO_TIMEOUT, socketProvider.getTimeout());
72+
}
73+
74+
@Test
75+
@DisplayName("changed its timeout to new value")
76+
public void testChangingTimeout() {
77+
RoundRobinSocketProviderImpl socketProvider
78+
= new RoundRobinSocketProviderImpl("localhost");
79+
int expectedTimeout = 10_000;
80+
socketProvider.setTimeout(expectedTimeout);
81+
assertEquals(expectedTimeout, socketProvider.getTimeout());
82+
}
83+
84+
@Test
85+
@DisplayName("changed to negative timeout with a failure")
86+
public void testWrongChangingTimeout() {
87+
RoundRobinSocketProviderImpl socketProvider
88+
= new RoundRobinSocketProviderImpl("localhost");
89+
int negativeValue = -200;
90+
assertThrows(IllegalArgumentException.class, () -> socketProvider.setTimeout(negativeValue));
91+
}
92+
93+
@Test
94+
@DisplayName("produced socket channels using a ring pool")
95+
public void testAddressRingPool() throws IOException {
96+
String[] addresses = {"localhost:3301", "10.0.0.1:3302", "10.0.0.2:3309"};
97+
RoundRobinSocketProviderImpl socketProvider
98+
= wrapWithMockChannelProvider(new RoundRobinSocketProviderImpl(addresses));
99+
100+
for (int i = 0; i < 27; i++) {
101+
socketProvider.get(0, null);
102+
assertEquals(addresses[i % 3], extractRawHostAndPortString(socketProvider.getLastObtainedAddress()));
103+
}
104+
}
105+
106+
@Test
107+
@DisplayName("produced socket channels for the same instance")
108+
public void testOneAddressPool() throws IOException {
109+
String expectedAddress = "10.0.0.1:3301";
110+
String[] addresses = {expectedAddress};
111+
RoundRobinSocketProviderImpl socketProvider
112+
= wrapWithMockChannelProvider(new RoundRobinSocketProviderImpl(addresses));
113+
114+
for (int i = 0; i < 5; i++) {
115+
socketProvider.get(0, null);
116+
assertEquals(expectedAddress, extractRawHostAndPortString(socketProvider.getLastObtainedAddress()));
117+
}
118+
}
119+
120+
@Test
121+
@DisplayName("produced socket channel with an exception when an attempt number is over")
122+
public void testTooManyAttempts() throws IOException {
123+
String expectedAddress = "10.0.0.1:3301";
124+
String[] addresses = {expectedAddress};
125+
RoundRobinSocketProviderImpl socketProvider
126+
= wrapWithMockChannelProvider(new RoundRobinSocketProviderImpl(addresses));
127+
128+
int retriesLimit = 5;
129+
socketProvider.setRetriesLimit(retriesLimit);
130+
131+
for (int i = 0; i < retriesLimit; i++) {
132+
socketProvider.get(0, null);
133+
assertEquals(expectedAddress, extractRawHostAndPortString(socketProvider.getLastObtainedAddress()));
134+
}
135+
136+
assertThrows(CommunicationException.class, () -> socketProvider.get(retriesLimit, null));
137+
}
138+
139+
@Test
140+
@DisplayName("produced a socket channel with a failure when an unreachable address is provided")
141+
public void testWrongAddress() throws IOException {
142+
RoundRobinSocketProviderImpl socketProvider
143+
= wrapWithMockErroredChannelProvider(new RoundRobinSocketProviderImpl("unreachable-host:3301"));
144+
assertThrows(CommunicationException.class, () -> socketProvider.get(0, null));
145+
}
146+
147+
@Test
148+
@DisplayName("produced a socket channel with a failure with an unreachable address after refresh")
149+
public void testWrongRefreshAddress() throws IOException {
150+
RoundRobinSocketProviderImpl socketProvider
151+
= wrapWithMockErroredChannelProvider(new RoundRobinSocketProviderImpl("unreachable-host:3301"));
152+
assertThrows(CommunicationException.class, () -> socketProvider.get(0, null));
153+
}
154+
155+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package org.tarantool;
2+
3+
import org.junit.jupiter.api.DisplayName;
4+
import org.junit.jupiter.api.Test;
5+
6+
import java.io.IOException;
7+
8+
import static org.junit.jupiter.api.Assertions.assertEquals;
9+
import static org.junit.jupiter.api.Assertions.assertThrows;
10+
11+
@DisplayName("A single socket provider")
12+
class SingleSocketChannelProviderImplTest extends AbstractSocketProviderTest {
13+
14+
@Test
15+
@DisplayName("initialized with a right address")
16+
public void testAddressesCount() {
17+
String expectedAddress = "localhost:3301";
18+
SingleSocketChannelProviderImpl socketProvider
19+
= new SingleSocketChannelProviderImpl(expectedAddress);
20+
assertEquals(expectedAddress, extractRawHostAndPortString(socketProvider.getAddress()));
21+
}
22+
23+
@Test
24+
@DisplayName("poorly initialized with an empty address")
25+
public void testEmptyAddresses() {
26+
assertThrows(IllegalArgumentException.class, () -> new SingleSocketChannelProviderImpl(null));
27+
}
28+
29+
@Test
30+
@DisplayName("initialized with a default timeout")
31+
public void testDefaultTimeout() {
32+
RoundRobinSocketProviderImpl socketProvider
33+
= new RoundRobinSocketProviderImpl("localhost");
34+
assertEquals(RoundRobinSocketProviderImpl.NO_TIMEOUT, socketProvider.getTimeout());
35+
}
36+
37+
@Test
38+
@DisplayName("changed its timeout to new value")
39+
public void testChangingTimeout() {
40+
RoundRobinSocketProviderImpl socketProvider
41+
= new RoundRobinSocketProviderImpl("localhost");
42+
int expectedTimeout = 10_000;
43+
socketProvider.setTimeout(expectedTimeout);
44+
assertEquals(expectedTimeout, socketProvider.getTimeout());
45+
}
46+
47+
@Test
48+
@DisplayName("changed to negative timeout with a failure")
49+
public void testWrongChangingTimeout() {
50+
RoundRobinSocketProviderImpl socketProvider
51+
= new RoundRobinSocketProviderImpl("localhost");
52+
int negativeValue = -100;
53+
assertThrows(IllegalArgumentException.class, () -> socketProvider.setTimeout(negativeValue));
54+
}
55+
56+
@Test
57+
@DisplayName("produced sockets with same address")
58+
public void testMultipleChannelGetting() throws IOException {
59+
String expectedAddresss = "localhost:3301";
60+
SingleSocketChannelProviderImpl socketProvider
61+
= wrapWithMockChannelProvider(new SingleSocketChannelProviderImpl(expectedAddresss));
62+
63+
for (int i = 0; i < 10; i++) {
64+
socketProvider.get(0, null);
65+
assertEquals(expectedAddresss, extractRawHostAndPortString(socketProvider.getAddress()));
66+
}
67+
}
68+
69+
}

‎src/test/java/org/tarantool/TestUtils.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,28 @@
11
package org.tarantool;
22

3+
import java.util.Collection;
34
import java.util.HashMap;
45
import java.util.List;
56
import java.util.Map;
7+
import java.util.stream.Collectors;
68

79
public class TestUtils {
10+
11+
public static String makeDiscoveryFunction(String functionName, Collection<?> addresses) {
12+
String functionResult = addresses.stream()
13+
.map(address -> "'" + address + "'")
14+
.collect(Collectors.joining(",", "{", "}"));
15+
return makeDiscoveryFunction(functionName, functionResult);
16+
}
17+
18+
public static String makeDiscoveryFunction(String functionName, Object result) {
19+
return makeDiscoveryFunction(functionName, result.toString());
20+
}
21+
22+
public static String makeDiscoveryFunction(String functionName, String body) {
23+
return "function " + functionName + "() return " + body + " end";
24+
}
25+
826
final static String replicationInfoRequest = "return " +
927
"box.info.id, " +
1028
"box.info.lsn, " +
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
package org.tarantool.cluster;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertNotNull;
5+
import static org.junit.jupiter.api.Assertions.assertThrows;
6+
import static org.junit.jupiter.api.Assertions.assertTrue;
7+
import static org.tarantool.TestUtils.makeDiscoveryFunction;
8+
import static org.tarantool.TestUtils.makeInstanceEnv;
9+
10+
import org.tarantool.AbstractTarantoolConnectorIT;
11+
import org.tarantool.CommunicationException;
12+
import org.tarantool.TarantoolClient;
13+
import org.tarantool.TarantoolClientImpl;
14+
import org.tarantool.TarantoolClusterClientConfig;
15+
import org.tarantool.TarantoolControl;
16+
import org.tarantool.TarantoolException;
17+
18+
import org.junit.jupiter.api.AfterAll;
19+
import org.junit.jupiter.api.BeforeAll;
20+
import org.junit.jupiter.api.BeforeEach;
21+
import org.junit.jupiter.api.DisplayName;
22+
import org.junit.jupiter.api.Test;
23+
24+
import java.util.Arrays;
25+
import java.util.Collections;
26+
import java.util.List;
27+
import java.util.Set;
28+
29+
@DisplayName("A cluster discoverer")
30+
public class ClusterServiceStoredFunctionDiscovererIT {
31+
32+
protected static final int INSTANCE_LISTEN_PORT = 3301;
33+
protected static final int INSTANCE_ADMIN_PORT = 3313;
34+
private static final String LUA_FILE = "jdk-testing.lua";
35+
36+
private static final String INSTANCE_NAME = "jdk-testing";
37+
private static TarantoolControl control;
38+
39+
private static String ENTRY_FUNCTION_NAME = "getAddresses";
40+
41+
private TarantoolClusterClientConfig clusterConfig;
42+
private TarantoolClient client;
43+
44+
@BeforeAll
45+
public static void setupEnv() {
46+
control = new TarantoolControl();
47+
control.createInstance(INSTANCE_NAME, LUA_FILE, makeInstanceEnv(INSTANCE_LISTEN_PORT, INSTANCE_ADMIN_PORT));
48+
49+
control.start(INSTANCE_NAME);
50+
control.waitStarted(INSTANCE_NAME);
51+
}
52+
53+
@BeforeEach
54+
public void setupTest() {
55+
clusterConfig = AbstractTarantoolConnectorIT.makeClusterClientConfig();
56+
clusterConfig.clusterDiscoveryEntryFunction = ENTRY_FUNCTION_NAME;
57+
58+
client = new TarantoolClientImpl("localhost:" + INSTANCE_LISTEN_PORT, clusterConfig);
59+
}
60+
61+
@AfterAll
62+
public static void tearDownEnv() {
63+
control.stop(INSTANCE_NAME);
64+
control.waitStopped(INSTANCE_NAME);
65+
}
66+
67+
@Test
68+
@DisplayName("fetched list of addresses")
69+
public void testSuccessfulAddressParsing() {
70+
List<String> addresses = Arrays.asList("localhost:3311", "127.0.0.1:3301");
71+
String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, addresses);
72+
control.openConsole(INSTANCE_NAME).exec(functionCode);
73+
74+
TarantoolClusterStoredFunctionDiscoverer discoverer =
75+
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
76+
77+
Set<String> instances = discoverer.getInstances();
78+
79+
assertNotNull(instances);
80+
assertEquals(2, instances.size());
81+
assertTrue(instances.contains(addresses.get(0)));
82+
assertTrue(instances.contains(addresses.get(1)));
83+
}
84+
85+
@Test
86+
@DisplayName("fetched duplicated addresses")
87+
public void testSuccessfulUniqueAddressParsing() {
88+
List<String> addresses = Arrays.asList("localhost:3311", "127.0.0.1:3301", "127.0.0.2:3301", "localhost:3311");
89+
90+
String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, addresses);
91+
control.openConsole(INSTANCE_NAME).exec(functionCode);
92+
93+
TarantoolClusterStoredFunctionDiscoverer discoverer =
94+
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
95+
96+
Set<String> instances = discoverer.getInstances();
97+
98+
assertNotNull(instances);
99+
assertEquals(3, instances.size());
100+
assertTrue(instances.contains(addresses.get(0)));
101+
assertTrue(instances.contains(addresses.get(1)));
102+
assertTrue(instances.contains(addresses.get(3)));
103+
}
104+
105+
106+
@Test
107+
@DisplayName("fetched empty address list")
108+
public void testFunctionReturnedEmptyList() {
109+
String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, Collections.emptyList());
110+
control.openConsole(INSTANCE_NAME).exec(functionCode);
111+
112+
TarantoolClusterStoredFunctionDiscoverer discoverer =
113+
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
114+
115+
Set<String> instances = discoverer.getInstances();
116+
117+
assertNotNull(instances);
118+
assertTrue(instances.isEmpty());
119+
}
120+
121+
@Test
122+
@DisplayName("fetched with an exception using wrong function name")
123+
public void testWrongFunctionName() {
124+
clusterConfig.clusterDiscoveryEntryFunction = "wrongFunction";
125+
126+
TarantoolClusterStoredFunctionDiscoverer discoverer =
127+
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
128+
129+
assertThrows(TarantoolException.class, discoverer::getInstances);
130+
}
131+
132+
@Test
133+
@DisplayName("fetched with an exception using a broken client")
134+
public void testWrongInstanceAddress() {
135+
clusterConfig.initTimeoutMillis = 1000;
136+
137+
client.close();
138+
TarantoolClusterStoredFunctionDiscoverer discoverer =
139+
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
140+
141+
assertThrows(CommunicationException.class, discoverer::getInstances);
142+
}
143+
144+
@Test
145+
@DisplayName("fetched with an exception when wrong data type returned")
146+
public void testWrongTypeResultData() {
147+
String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, 42);
148+
control.openConsole(INSTANCE_NAME).exec(functionCode);
149+
150+
TarantoolClusterStoredFunctionDiscoverer discoverer =
151+
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
152+
153+
assertThrows(IllegalDiscoveryFunctionResult.class, discoverer::getInstances);
154+
}
155+
156+
@Test
157+
@DisplayName("fetched with an exception using no return function")
158+
public void testFunctionWithNoReturn() {
159+
String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, "");
160+
control.openConsole(INSTANCE_NAME).exec(functionCode);
161+
162+
TarantoolClusterStoredFunctionDiscoverer discoverer =
163+
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
164+
165+
assertThrows(IllegalDiscoveryFunctionResult.class, discoverer::getInstances);
166+
}
167+
168+
@Test
169+
@DisplayName("fetched first result as a list and ignored other multi-results")
170+
public void testWrongMultiResultData() {
171+
String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, "{'host1'}, 'host2', 423");
172+
control.openConsole(INSTANCE_NAME).exec(functionCode);
173+
174+
TarantoolClusterStoredFunctionDiscoverer discoverer =
175+
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
176+
177+
Set<String> instances = discoverer.getInstances();
178+
179+
assertNotNull(instances);
180+
assertEquals(1, instances.size());
181+
assertTrue(instances.contains("host1"));
182+
}
183+
184+
@Test
185+
@DisplayName("fetched with an exception using error-prone function")
186+
public void testFunctionWithError() {
187+
String functionCode = makeDiscoveryFunction(ENTRY_FUNCTION_NAME, "error('msg')");
188+
control.openConsole(INSTANCE_NAME).exec(functionCode);
189+
190+
TarantoolClusterStoredFunctionDiscoverer discoverer =
191+
new TarantoolClusterStoredFunctionDiscoverer(clusterConfig, client);
192+
193+
assertThrows(TarantoolException.class, discoverer::getInstances);
194+
}
195+
196+
}

0 commit comments

Comments
 (0)
Please sign in to comment.