Skip to content

Commit b7f43ac

Browse files
committed
Add support for tarantool clusters
Refactor SocketChannelProvider implementations. Now we have two SingleSocketChannelProviderImpl and RoundRobinSocketProviderImpl used by simple and cluster clients respectively. To achieve this a BaseSocketChannelProvider was extracted. Add a service discovery implementation based on a tarantool stored procedure which is called to obtain a new list of cluster nodes. Integrate service discovery and current cluster client. The client now is aware of potential nodes changing using a configurable background job which periodically checks whether node addresses have changed. If so the client refreshes the RoundRobinSocketProviderImpl and replaces old nodes by new ones. It also requires some additional effort in case of missing the current node in the list. We need to reconnect to another node as soon as possible with a minimal delay between client requests. To achieve this we currently try to catch a moment when the requests in progress have been accomplished and we can finish reconnection process. Fix a regression in TarantoolClientImpl. It is a wrong comparison between response result code and original request operation code. To perform a right thing TarantoolOp class was created to wrap an original future (see TarantoolClientImpl.complete(packet, feature)). Closes: 34
1 parent 9b3f91f commit b7f43ac

22 files changed

+1364
-305
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package org.tarantool;
2+
3+
import java.io.IOException;
4+
import java.net.InetSocketAddress;
5+
import java.nio.channels.SocketChannel;
6+
7+
public abstract class BaseSocketChannelProvider implements SocketChannelProvider {
8+
9+
/**
10+
* Limit of retries.
11+
*/
12+
private int retriesLimit = RETRY_NO_LIMIT;
13+
14+
/**
15+
* Timeout to establish socket connection with an individual server.
16+
*/
17+
private int timeout = NO_TIMEOUT;
18+
19+
@Override
20+
public final SocketChannel get(int retryNumber, Throwable lastError) {
21+
if (areRetriesExhausted(retryNumber)) {
22+
throw new CommunicationException("Connection retries exceeded.", lastError);
23+
}
24+
25+
return doRetry(retryNumber, lastError);
26+
}
27+
28+
protected abstract SocketChannel doRetry(int retryNumber, Throwable lastError);
29+
30+
/**
31+
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
32+
* The retry count is maintained by a {@link #get(int, Throwable)} caller
33+
* when a socket level connection was established.
34+
*
35+
* Negative value means unlimited attempts.
36+
*
37+
* @param retriesLimit Limit of retries to use.
38+
*/
39+
public void setRetriesLimit(int retriesLimit) {
40+
this.retriesLimit = retriesLimit;
41+
}
42+
43+
/**
44+
* @return Maximum reconnect attempts to make before raising exception.
45+
*/
46+
public int getRetriesLimit() {
47+
return retriesLimit;
48+
}
49+
50+
/**
51+
* Parse a string address in the form of [host]:[port]
52+
* and builds a socket address.
53+
*
54+
* @param address Server address.
55+
* @return Socket address.
56+
*/
57+
protected InetSocketAddress parseAddress(String address) {
58+
int idx = address.indexOf(':');
59+
String host = (idx < 0) ? address : address.substring(0, idx);
60+
int port = (idx < 0) ? 3301 : Integer.parseInt(address.substring(idx + 1));
61+
return new InetSocketAddress(host, port);
62+
}
63+
64+
protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOException {
65+
SocketChannel channel = null;
66+
try {
67+
channel = SocketChannel.open();
68+
channel.socket().connect(socketAddress, timeout);
69+
return channel;
70+
} catch (IOException e) {
71+
if (channel != null) {
72+
try {
73+
channel.close();
74+
} catch (IOException ignored) {
75+
// No-op.
76+
}
77+
}
78+
throw e;
79+
}
80+
}
81+
82+
/**
83+
* Sets maximum amount of time to wait for a socket connection establishment
84+
* with an individual server.
85+
* <p>
86+
* Zero means infinite timeout.
87+
*
88+
* @param timeout timeout value, ms.
89+
* @throws IllegalArgumentException if timeout is negative.
90+
*/
91+
public void setTimeout(int timeout) {
92+
if (timeout < 0) {
93+
throw new IllegalArgumentException("timeout is negative.");
94+
}
95+
this.timeout = timeout;
96+
}
97+
98+
/**
99+
* @return Maximum amount of time to wait for a socket connection establishment
100+
* with an individual server.
101+
*/
102+
public int getTimeout() {
103+
return timeout;
104+
}
105+
106+
/**
107+
* Provides a decision on whether retries limit is hit.
108+
*
109+
* @param retries Current count of retries.
110+
* @return {@code true} if retries are exhausted.
111+
*/
112+
private boolean areRetriesExhausted(int retries) {
113+
int limit = getRetriesLimit();
114+
if (limit < 0) {
115+
return false;
116+
}
117+
return retries >= limit;
118+
}
119+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.tarantool;
2+
3+
import java.net.SocketAddress;
4+
import java.util.Collection;
5+
6+
public interface RefreshableSocketProvider {
7+
8+
Collection<SocketAddress> getAddresses();
9+
10+
void refreshAddresses(Collection<String> addresses);
11+
12+
}

src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java

+86-112
Original file line numberDiff line numberDiff line change
@@ -2,123 +2,106 @@
22

33
import java.io.IOException;
44
import java.net.InetSocketAddress;
5+
import java.net.SocketAddress;
56
import java.nio.channels.SocketChannel;
7+
import java.util.ArrayList;
68
import java.util.Arrays;
9+
import java.util.Collection;
10+
import java.util.Collections;
11+
import java.util.List;
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
import java.util.concurrent.locks.Lock;
14+
import java.util.concurrent.locks.ReadWriteLock;
15+
import java.util.concurrent.locks.ReentrantReadWriteLock;
16+
import java.util.stream.Collectors;
717

818
/**
919
* Basic reconnection strategy that changes addresses in a round-robin fashion.
1020
* To be used with {@link TarantoolClientImpl}.
1121
*/
12-
public class RoundRobinSocketProviderImpl implements SocketChannelProvider {
13-
/** Timeout to establish socket connection with an individual server. */
14-
private int timeout; // 0 is infinite.
15-
/** Limit of retries. */
16-
private int retriesLimit = -1; // No-limit.
17-
/** Server addresses as configured. */
18-
private final String[] addrs;
19-
/** Socket addresses. */
20-
private final InetSocketAddress[] sockAddrs;
21-
/** Current position within {@link #sockAddrs} array. */
22-
private int pos;
22+
public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider implements RefreshableSocketProvider {
2323

2424
/**
25-
* Constructs an instance.
26-
*
27-
* @param addrs Array of addresses in a form of [host]:[port].
25+
* Socket addresses.
2826
*/
29-
public RoundRobinSocketProviderImpl(String... addrs) {
30-
if (addrs == null || addrs.length == 0)
31-
throw new IllegalArgumentException("addrs is null or empty.");
32-
33-
this.addrs = Arrays.copyOf(addrs, addrs.length);
27+
private final List<InetSocketAddress> socketAddresses = new ArrayList<>();
3428

35-
sockAddrs = new InetSocketAddress[this.addrs.length];
36-
37-
for (int i = 0; i < this.addrs.length; i++) {
38-
sockAddrs[i] = parseAddress(this.addrs[i]);
39-
}
40-
}
29+
/**
30+
* Current position within {@link #socketAddresses} array.
31+
*/
32+
private AtomicInteger currentPosition = new AtomicInteger(-1);
4133

4234
/**
43-
* @return Configured addresses in a form of [host]:[port].
35+
* Lock
4436
*/
45-
public String[] getAddresses() {
46-
return this.addrs;
47-
}
37+
private ReadWriteLock addressListLock = new ReentrantReadWriteLock();
4838

4939
/**
50-
* Sets maximum amount of time to wait for a socket connection establishment
51-
* with an individual server.
52-
*
53-
* Zero means infinite timeout.
40+
* Constructs an instance.
5441
*
55-
* @param timeout Timeout value, ms.
56-
* @return {@code this}.
57-
* @throws IllegalArgumentException If timeout is negative.
42+
* @param addresses Array of addresses in a form of [host]:[port].
5843
*/
59-
public RoundRobinSocketProviderImpl setTimeout(int timeout) {
60-
if (timeout < 0)
61-
throw new IllegalArgumentException("timeout is negative.");
62-
63-
this.timeout = timeout;
44+
public RoundRobinSocketProviderImpl(String... addresses) {
45+
if (addresses.length == 0) {
46+
throw new IllegalArgumentException("Addresses list must contain at least one address.");
47+
}
6448

65-
return this;
49+
updateAddressList(Arrays.asList(addresses));
6650
}
6751

68-
/**
69-
* @return Maximum amount of time to wait for a socket connection establishment
70-
* with an individual server.
71-
*/
72-
public int getTimeout() {
73-
return timeout;
52+
private void updateAddressList(Collection<String> addresses) {
53+
Lock writeLock = addressListLock.writeLock();
54+
writeLock.lock();
55+
try {
56+
InetSocketAddress lastAddress = getLastObtainedAddress();
57+
socketAddresses.clear();
58+
addresses.stream()
59+
.map(this::parseAddress)
60+
.collect(Collectors.toCollection(() -> socketAddresses));
61+
if (lastAddress != null) {
62+
int recoveredPosition = socketAddresses.indexOf(lastAddress);
63+
currentPosition.set(recoveredPosition);
64+
} else {
65+
currentPosition.set(-1);
66+
}
67+
} finally {
68+
writeLock.unlock();
69+
}
7470
}
7571

7672
/**
77-
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
78-
* The retry count is maintained by a {@link #get(int, Throwable)} caller
79-
* when a socket level connection was established.
80-
*
81-
* Negative value means unlimited.
82-
*
83-
* @param retriesLimit Limit of retries to use.
84-
* @return {@code this}.
73+
* @return resolved socket addresses
8574
*/
86-
public RoundRobinSocketProviderImpl setRetriesLimit(int retriesLimit) {
87-
this.retriesLimit = retriesLimit;
88-
89-
return this;
75+
public List<SocketAddress> getAddresses() {
76+
Lock readLock = addressListLock.readLock();
77+
readLock.lock();
78+
try {
79+
return Collections.unmodifiableList(this.socketAddresses);
80+
} finally {
81+
readLock.unlock();
82+
}
9083
}
9184

92-
/**
93-
* @return Maximum reconnect attempts to make before raising exception.
94-
*/
95-
public int getRetriesLimit() {
96-
return retriesLimit;
85+
protected InetSocketAddress getLastObtainedAddress() {
86+
Lock readLock = addressListLock.readLock();
87+
readLock.lock();
88+
try {
89+
int index = currentPosition.get();
90+
return index >= 0 ? socketAddresses.get(index) : null;
91+
} finally {
92+
readLock.unlock();
93+
}
9794
}
9895

99-
/** {@inheritDoc} */
10096
@Override
101-
public SocketChannel get(int retryNumber, Throwable lastError) {
102-
if (areRetriesExhausted(retryNumber)) {
103-
throw new CommunicationException("Connection retries exceeded.", lastError);
104-
}
97+
protected SocketChannel doRetry(int retryNumber, Throwable lastError) {
10598
int attempts = getAddressCount();
106-
long deadline = System.currentTimeMillis() + timeout * attempts;
99+
// todo: recalc deadline?
100+
long deadline = System.currentTimeMillis() + getTimeout() * attempts;
107101
while (!Thread.currentThread().isInterrupted()) {
108-
SocketChannel channel = null;
109102
try {
110-
channel = SocketChannel.open();
111-
InetSocketAddress addr = getNextSocketAddress();
112-
channel.socket().connect(addr, timeout);
113-
return channel;
103+
return openChannel(getNextSocketAddress());
114104
} catch (IOException e) {
115-
if (channel != null) {
116-
try {
117-
channel.close();
118-
} catch (IOException ignored) {
119-
// No-op.
120-
}
121-
}
122105
long now = System.currentTimeMillis();
123106
if (deadline <= now) {
124107
throw new CommunicationException("Connection time out.", e);
@@ -141,42 +124,33 @@ public SocketChannel get(int retryNumber, Throwable lastError) {
141124
* @return Number of configured addresses.
142125
*/
143126
protected int getAddressCount() {
144-
return sockAddrs.length;
127+
Lock readLock = addressListLock.readLock();
128+
readLock.lock();
129+
try {
130+
return socketAddresses.size();
131+
} finally {
132+
readLock.unlock();
133+
}
145134
}
146135

147136
/**
148137
* @return Socket address to use for the next reconnection attempt.
149138
*/
150139
protected InetSocketAddress getNextSocketAddress() {
151-
InetSocketAddress res = sockAddrs[pos];
152-
pos = (pos + 1) % sockAddrs.length;
153-
return res;
154-
}
155-
156-
/**
157-
* Parse a string address in the form of [host]:[port]
158-
* and builds a socket address.
159-
*
160-
* @param addr Server address.
161-
* @return Socket address.
162-
*/
163-
protected InetSocketAddress parseAddress(String addr) {
164-
int idx = addr.indexOf(':');
165-
String host = (idx < 0) ? addr : addr.substring(0, idx);
166-
int port = (idx < 0) ? 3301 : Integer.parseInt(addr.substring(idx + 1));
167-
return new InetSocketAddress(host, port);
140+
Lock readLock = addressListLock.readLock();
141+
readLock.lock();
142+
try {
143+
int position = currentPosition.updateAndGet(i -> (i + 1) % socketAddresses.size());
144+
return socketAddresses.get(position);
145+
} finally {
146+
readLock.unlock();
147+
}
168148
}
169149

170-
/**
171-
* Provides a decision on whether retries limit is hit.
172-
*
173-
* @param retries Current count of retries.
174-
* @return {@code true} if retries are exhausted.
175-
*/
176-
private boolean areRetriesExhausted(int retries) {
177-
int limit = getRetriesLimit();
178-
if (limit < 0)
179-
return false;
180-
return retries >= limit;
150+
public void refreshAddresses(Collection<String> addresses) {
151+
if (addresses == null || addresses.isEmpty()) {
152+
throw new IllegalArgumentException("Addresses are null or empty.");
153+
}
154+
updateAddressList(addresses);
181155
}
182156
}

0 commit comments

Comments
 (0)