Skip to content

Commit 6ccc607

Browse files
author
ivanovna
committed
add cluster discovery service
1 parent 7bd3be6 commit 6ccc607

11 files changed

+401
-138
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package org.tarantool;
2+
3+
import java.io.IOException;
4+
import java.net.InetSocketAddress;
5+
import java.nio.channels.SocketChannel;
6+
7+
public abstract class BaseSocketChannelProvider implements SocketChannelProvider {
8+
9+
/**
10+
* Limit of retries.
11+
*/
12+
private int retriesLimit = RETRY_NO_LIMIT;
13+
14+
@Override
15+
public final SocketChannel get(int retryNumber, Throwable lastError) {
16+
if (areRetriesExhausted(retryNumber)) {
17+
throw new CommunicationException("Connection retries exceeded.", lastError);
18+
}
19+
20+
return doRetry(retryNumber, lastError);
21+
}
22+
23+
protected abstract SocketChannel doRetry(int retryNumber, Throwable lastError);
24+
25+
/**
26+
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
27+
* The retry count is maintained by a {@link #get(int, Throwable)} caller
28+
* when a socket level connection was established.
29+
*
30+
* Negative value means unlimited attempts.
31+
*
32+
* @param retriesLimit Limit of retries to use.
33+
*/
34+
public void setRetriesLimit(int retriesLimit) {
35+
this.retriesLimit = retriesLimit;
36+
}
37+
38+
/**
39+
* @return Maximum reconnect attempts to make before raising exception.
40+
*/
41+
public int getRetriesLimit() {
42+
return retriesLimit;
43+
}
44+
45+
/**
46+
* Parse a string address in the form of [host]:[port]
47+
* and builds a socket address.
48+
*
49+
* @param address Server address.
50+
* @return Socket address.
51+
*/
52+
protected InetSocketAddress parseAddress(String address) {
53+
int idx = address.indexOf(':');
54+
String host = (idx < 0) ? address : address.substring(0, idx);
55+
int port = (idx < 0) ? 3301 : Integer.parseInt(address.substring(idx + 1));
56+
return new InetSocketAddress(host, port);
57+
}
58+
59+
protected SocketChannel openChannel(InetSocketAddress socketAddress, int timeout) throws IOException {
60+
SocketChannel channel = null;
61+
try {
62+
channel = SocketChannel.open();
63+
channel.socket().connect(socketAddress, timeout);
64+
return channel;
65+
} catch (IOException e) {
66+
if (channel != null) {
67+
try {
68+
channel.close();
69+
} catch (IOException ignored) {
70+
// No-op.
71+
}
72+
}
73+
throw e;
74+
}
75+
}
76+
77+
/**
78+
* Provides a decision on whether retries limit is hit.
79+
*
80+
* @param retries Current count of retries.
81+
* @return {@code true} if retries are exhausted.
82+
*/
83+
private boolean areRetriesExhausted(int retries) {
84+
int limit = getRetriesLimit();
85+
if (limit < 0)
86+
return false;
87+
return retries >= limit;
88+
}
89+
}

src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java

+66-93
Original file line numberDiff line numberDiff line change
@@ -3,62 +3,94 @@
33
import java.io.IOException;
44
import java.net.InetSocketAddress;
55
import java.nio.channels.SocketChannel;
6+
import java.util.ArrayList;
67
import java.util.Arrays;
8+
import java.util.Collection;
9+
import java.util.Collections;
10+
import java.util.List;
11+
import java.util.concurrent.atomic.AtomicInteger;
712

813
/**
914
* Basic reconnection strategy that changes addresses in a round-robin fashion.
1015
* To be used with {@link TarantoolClientImpl}.
1116
*/
12-
public class RoundRobinSocketProviderImpl implements SocketChannelProvider {
13-
/** Timeout to establish socket connection with an individual server. */
17+
public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider {
18+
19+
/**
20+
* Timeout to establish socket connection with an individual server.
21+
*/
1422
private int timeout; // 0 is infinite.
15-
/** Limit of retries. */
16-
private int retriesLimit = -1; // No-limit.
17-
/** Server addresses as configured. */
18-
private final String[] addrs;
19-
/** Socket addresses. */
20-
private final InetSocketAddress[] sockAddrs;
21-
/** Current position within {@link #sockAddrs} array. */
22-
private int pos;
23+
24+
/**
25+
* Server addresses as configured.
26+
*/
27+
private final List<String> addresses = new ArrayList<>();
28+
29+
/**
30+
* Socket addresses.
31+
*/
32+
private final List<InetSocketAddress> socketAddresses = new ArrayList<>();
33+
34+
/**
35+
* Current position within {@link #socketAddresses} array.
36+
*/
37+
private AtomicInteger currentPosition = new AtomicInteger(-1);
38+
39+
/**
40+
* Lock
41+
*/
42+
// private ReadWriteLock addressListLock = new ReentrantReadWriteLock();
2343

2444
/**
2545
* Constructs an instance.
2646
*
27-
* @param addrs Array of addresses in a form of [host]:[port].
47+
* @param addresses Array of addresses in a form of [host]:[port].
2848
*/
29-
public RoundRobinSocketProviderImpl(String... addrs) {
30-
if (addrs == null || addrs.length == 0)
31-
throw new IllegalArgumentException("addrs is null or empty.");
32-
33-
this.addrs = Arrays.copyOf(addrs, addrs.length);
49+
public RoundRobinSocketProviderImpl(String... addresses) {
50+
if (addresses == null || addresses.length == 0) {
51+
throw new IllegalArgumentException("Addresses are null or empty.");
52+
}
3453

35-
sockAddrs = new InetSocketAddress[this.addrs.length];
54+
updateAddressList(Arrays.asList(addresses));
55+
}
3656

37-
for (int i = 0; i < this.addrs.length; i++) {
38-
sockAddrs[i] = parseAddress(this.addrs[i]);
57+
private void updateAddressList(Collection<String> addresses) {
58+
String lastAddress = getLastObtainedAddress();
59+
this.addresses.clear();
60+
this.addresses.addAll(addresses);
61+
this.addresses.forEach(address -> socketAddresses.add(parseAddress(address)));
62+
if (lastAddress != null) {
63+
int recoveredPosition = this.addresses.indexOf(lastAddress);
64+
currentPosition.set(recoveredPosition);
3965
}
4066
}
4167

4268
/**
4369
* @return Configured addresses in a form of [host]:[port].
4470
*/
45-
public String[] getAddresses() {
46-
return this.addrs;
71+
public List<String> getAddresses() {
72+
return Collections.unmodifiableList(this.addresses);
73+
}
74+
75+
public String getLastObtainedAddress() {
76+
int index = currentPosition.get();
77+
return index >= 0 ? addresses.get(index) : null;
4778
}
4879

4980
/**
5081
* Sets maximum amount of time to wait for a socket connection establishment
5182
* with an individual server.
52-
*
83+
* <p>
5384
* Zero means infinite timeout.
5485
*
5586
* @param timeout Timeout value, ms.
5687
* @return {@code this}.
5788
* @throws IllegalArgumentException If timeout is negative.
5889
*/
5990
public RoundRobinSocketProviderImpl setTimeout(int timeout) {
60-
if (timeout < 0)
91+
if (timeout < 0) {
6192
throw new IllegalArgumentException("timeout is negative.");
93+
}
6294

6395
this.timeout = timeout;
6496

@@ -67,58 +99,20 @@ public RoundRobinSocketProviderImpl setTimeout(int timeout) {
6799

68100
/**
69101
* @return Maximum amount of time to wait for a socket connection establishment
70-
* with an individual server.
102+
* with an individual server.
71103
*/
72104
public int getTimeout() {
73105
return timeout;
74106
}
75107

76-
/**
77-
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
78-
* The retry count is maintained by a {@link #get(int, Throwable)} caller
79-
* when a socket level connection was established.
80-
*
81-
* Negative value means unlimited.
82-
*
83-
* @param retriesLimit Limit of retries to use.
84-
* @return {@code this}.
85-
*/
86-
public RoundRobinSocketProviderImpl setRetriesLimit(int retriesLimit) {
87-
this.retriesLimit = retriesLimit;
88-
89-
return this;
90-
}
91-
92-
/**
93-
* @return Maximum reconnect attempts to make before raising exception.
94-
*/
95-
public int getRetriesLimit() {
96-
return retriesLimit;
97-
}
98-
99-
/** {@inheritDoc} */
100108
@Override
101-
public SocketChannel get(int retryNumber, Throwable lastError) {
102-
if (areRetriesExhausted(retryNumber)) {
103-
throw new CommunicationException("Connection retries exceeded.", lastError);
104-
}
109+
protected SocketChannel doRetry(int retryNumber, Throwable lastError) {
105110
int attempts = getAddressCount();
106111
long deadline = System.currentTimeMillis() + timeout * attempts;
107112
while (!Thread.currentThread().isInterrupted()) {
108-
SocketChannel channel = null;
109113
try {
110-
channel = SocketChannel.open();
111-
InetSocketAddress addr = getNextSocketAddress();
112-
channel.socket().connect(addr, timeout);
113-
return channel;
114+
return openChannel(getNextSocketAddress(), timeout);
114115
} catch (IOException e) {
115-
if (channel != null) {
116-
try {
117-
channel.close();
118-
} catch (IOException ignored) {
119-
// No-op.
120-
}
121-
}
122116
long now = System.currentTimeMillis();
123117
if (deadline <= now) {
124118
throw new CommunicationException("Connection time out.", e);
@@ -141,42 +135,21 @@ public SocketChannel get(int retryNumber, Throwable lastError) {
141135
* @return Number of configured addresses.
142136
*/
143137
protected int getAddressCount() {
144-
return sockAddrs.length;
138+
return socketAddresses.size();
145139
}
146140

147141
/**
148142
* @return Socket address to use for the next reconnection attempt.
149143
*/
150144
protected InetSocketAddress getNextSocketAddress() {
151-
InetSocketAddress res = sockAddrs[pos];
152-
pos = (pos + 1) % sockAddrs.length;
153-
return res;
154-
}
155-
156-
/**
157-
* Parse a string address in the form of [host]:[port]
158-
* and builds a socket address.
159-
*
160-
* @param addr Server address.
161-
* @return Socket address.
162-
*/
163-
protected InetSocketAddress parseAddress(String addr) {
164-
int idx = addr.indexOf(':');
165-
String host = (idx < 0) ? addr : addr.substring(0, idx);
166-
int port = (idx < 0) ? 3301 : Integer.parseInt(addr.substring(idx + 1));
167-
return new InetSocketAddress(host, port);
145+
int position = currentPosition.updateAndGet(i -> (i + 1) % socketAddresses.size());
146+
return socketAddresses.get(position);
168147
}
169148

170-
/**
171-
* Provides a decision on whether retries limit is hit.
172-
*
173-
* @param retries Current count of retries.
174-
* @return {@code true} if retries are exhausted.
175-
*/
176-
private boolean areRetriesExhausted(int retries) {
177-
int limit = getRetriesLimit();
178-
if (limit < 0)
179-
return false;
180-
return retries >= limit;
149+
public void setAddresses(Collection<String> addresses) {
150+
if (addresses == null || addresses.isEmpty()) {
151+
throw new IllegalArgumentException("Addresses are null or empty.");
152+
}
153+
updateAddressList(addresses);
181154
}
182155
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package org.tarantool;
2+
3+
import java.io.IOException;
4+
import java.net.InetSocketAddress;
5+
import java.nio.channels.SocketChannel;
6+
7+
/**
8+
* Simple provider that produces a single connection
9+
* To be used with {@link TarantoolClientImpl}.
10+
*/
11+
public class SingleSocketChannelProviderImpl extends BaseSocketChannelProvider {
12+
13+
private InetSocketAddress address;
14+
private int timeout;
15+
16+
/**
17+
*
18+
*
19+
* @param address instance address
20+
* @param timeout time in millis
21+
*/
22+
public SingleSocketChannelProviderImpl(String address, int timeout) {
23+
this.address = parseAddress(address);
24+
this.timeout = timeout;
25+
}
26+
27+
@Override
28+
protected SocketChannel doRetry(int retryNumber, Throwable lastError) {
29+
long deadline = System.currentTimeMillis();
30+
try {
31+
return openChannel(address, timeout);
32+
} catch (IOException e) {
33+
if (deadline <= System.currentTimeMillis()) {
34+
throw new CommunicationException("Connection time out.", e);
35+
}
36+
throw new CommunicationException("Connection error.", e);
37+
}
38+
}
39+
40+
}

src/main/java/org/tarantool/SocketChannelProvider.java

+3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
import java.nio.channels.SocketChannel;
55

66
public interface SocketChannelProvider {
7+
8+
int RETRY_NO_LIMIT = -1;
9+
710
/**
811
* Provides socket channel to init restore connection.
912
* You could change hosts on fail and sleep between retries in this method

0 commit comments

Comments
 (0)