Skip to content

Commit 7bd3be6

Browse files
author
ivanovna
committed
add fluent client config builders
1 parent 434bd46 commit 7bd3be6

File tree

6 files changed

+401
-123
lines changed

6 files changed

+401
-123
lines changed

src/main/java/org/tarantool/TarantoolBase.java

+1
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,5 @@ public void setInitialRequestSize(int initialRequestSize) {
8383
public String getServerVersion() {
8484
return serverVersion;
8585
}
86+
8687
}
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,226 @@
11
package org.tarantool;
22

3+
public class TarantoolClientConfig {
34

4-
import java.util.concurrent.TimeUnit;
5+
private String username;
6+
private String password;
57

6-
public class TarantoolClientConfig {
8+
private int defaultRequestSize;
9+
private int predictedFutures;
710

8-
/**
9-
* username and password for authorization
10-
*/
11-
public String username;
11+
private int writerThreadPriority;
12+
private int readerThreadPriority;
13+
14+
private int sharedBufferSize;
15+
private double directWriteFactor;
1216

13-
public String password;
17+
private boolean useNewCall;
18+
19+
private long initTimeoutMillis;
20+
private long writeTimeoutMillis;
21+
22+
public static TarantoolClientConfigBuilder builder() {
23+
return new TarantoolClientConfigBuilder();
24+
}
25+
26+
protected TarantoolClientConfig(TarantoolClientConfigBuilder builder) {
27+
this.username = builder.username;
28+
this.password = builder.password;
29+
this.defaultRequestSize = builder.defaultRequestSize;
30+
this.predictedFutures = builder.predictedFutures;
31+
this.writerThreadPriority = builder.writerThreadPriority;
32+
this.readerThreadPriority = builder.readerThreadPriority;
33+
this.sharedBufferSize = builder.sharedBufferSize;
34+
this.directWriteFactor = builder.directWriteFactor;
35+
this.useNewCall = builder.useNewCall;
36+
this.initTimeoutMillis = builder.initTimeoutMillis;
37+
this.writeTimeoutMillis = builder.writeTimeoutMillis;
38+
}
1439

1540
/**
16-
* default ByteArrayOutputStream size when make query serialization
41+
* Gets an username for auth
42+
*
43+
* @return username
1744
*/
18-
public int defaultRequestSize = 4096;
45+
public String getUsername() {
46+
return username;
47+
}
1948

2049
/**
21-
* initial size for map which holds futures of sent request
50+
* Gets a password for auth
51+
*
52+
* @return password
2253
*/
23-
public int predictedFutures = (int) ((1024 * 1024) / 0.75) + 1;
54+
public String getPassword() {
55+
return password;
56+
}
2457

58+
/**
59+
* Gets a default request size when make query serialization
60+
*
61+
* @return
62+
*/
63+
public int getDefaultRequestSize() {
64+
return defaultRequestSize;
65+
}
2566

26-
public int writerThreadPriority = Thread.NORM_PRIORITY;
67+
/**
68+
* Gets an initial capacity for the map which holds futures of sent request
69+
*
70+
* @return initial capacity
71+
*/
72+
public int getPredictedFutures() {
73+
return predictedFutures;
74+
}
2775

28-
public int readerThreadPriority = Thread.NORM_PRIORITY;
76+
/**
77+
* Gets a writer thread priority
78+
*
79+
* @return thread priority
80+
*/
81+
public int getWriterThreadPriority() {
82+
return writerThreadPriority;
83+
}
2984

85+
/**
86+
* Gets a reader thread priority
87+
*
88+
* @return thread priority
89+
*/
90+
public int getReaderThreadPriority() {
91+
return readerThreadPriority;
92+
}
3093

3194
/**
32-
* shared buffer is place where client collect requests when socket is busy on write
95+
* Gets a shared buffer size (place where client collects requests
96+
* when socket is busy on write)
97+
*
98+
* @return buffer size
3399
*/
34-
public int sharedBufferSize = 8 * 1024 * 1024;
100+
public int getSharedBufferSize() {
101+
return sharedBufferSize;
102+
}
103+
35104
/**
36-
* not put request into the shared buffer if request size is ge directWriteFactor * sharedBufferSize
105+
* Gets a factor to calculate a threshold whether request will be accommodated
106+
* in the shared buffer.
107+
* if request size exceeds <code>directWriteFactor * sharedBufferSize</code>
108+
* request is sent directly.
109+
*
110+
* @return buffer factor
37111
*/
38-
public double directWriteFactor = 0.5d;
112+
public double getDirectWriteFactor() {
113+
return directWriteFactor;
114+
}
39115

40116
/**
117+
* Gets a flag whether the client will use new CALL version.
118+
*
41119
* Use old call command https://github.com/tarantool/doc/issues/54,
42120
* please ensure that you server supports new call command
121+
*
122+
* @return flag indicating CALL command version
43123
*/
44-
public boolean useNewCall = false;
124+
public boolean isUseNewCall() {
125+
return useNewCall;
126+
}
45127

46128
/**
47-
* Any blocking ops timeout
129+
* Gets an init timeout for synchronous operations
130+
*
131+
* @return init timeout
48132
*/
49-
public long initTimeoutMillis = 60*1000L;
133+
public long getInitTimeoutMillis() {
134+
return initTimeoutMillis;
135+
}
50136

51-
public long writeTimeoutMillis = 60*1000L;
137+
/**
138+
* Gets a write timeout for synchronous operations
139+
*
140+
* @return write timeout
141+
*/
142+
public long getWriteTimeoutMillis() {
143+
return writeTimeoutMillis;
144+
}
145+
146+
public static class TarantoolClientConfigBuilder<T extends TarantoolClientConfigBuilder<T>> {
147+
148+
private String username;
149+
private String password;
150+
private int defaultRequestSize = 4096;
151+
private int predictedFutures = (int) ((1024 * 1024) / 0.75) + 1;
152+
private int writerThreadPriority = Thread.NORM_PRIORITY;
153+
private int readerThreadPriority = Thread.NORM_PRIORITY;
154+
private int sharedBufferSize = 8 * 1024 * 1024;
155+
private double directWriteFactor = 0.5d;
156+
private boolean useNewCall = false;
157+
private long initTimeoutMillis = 60 * 1000L;
158+
private long writeTimeoutMillis = 60 * 1000L;
159+
160+
public T setUsername(String username) {
161+
this.username = username;
162+
return self();
163+
}
164+
165+
public T setPassword(String password) {
166+
this.password = password;
167+
return self();
168+
}
169+
170+
public T setDefaultRequestSize(int defaultRequestSize) {
171+
this.defaultRequestSize = defaultRequestSize;
172+
return self();
173+
}
174+
175+
public T setPredictedFutures(int predictedFutures) {
176+
this.predictedFutures = predictedFutures;
177+
return self();
178+
}
179+
180+
public TarantoolClientConfigBuilder setWriterThreadPriority(int writerThreadPriority) {
181+
this.writerThreadPriority = writerThreadPriority;
182+
return self();
183+
}
184+
185+
public T setReaderThreadPriority(int readerThreadPriority) {
186+
this.readerThreadPriority = readerThreadPriority;
187+
return self();
188+
}
189+
190+
public T setSharedBufferSize(int sharedBufferSize) {
191+
this.sharedBufferSize = sharedBufferSize;
192+
return self();
193+
}
194+
195+
public T setDirectWriteFactor(double directWriteFactor) {
196+
this.directWriteFactor = directWriteFactor;
197+
return self();
198+
}
199+
200+
public T setUseNewCall(boolean useNewCall) {
201+
this.useNewCall = useNewCall;
202+
return self();
203+
}
204+
205+
public T setInitTimeoutMillis(long initTimeoutMillis) {
206+
this.initTimeoutMillis = initTimeoutMillis;
207+
return self();
208+
}
209+
210+
public T setWriteTimeoutMillis(long writeTimeoutMillis) {
211+
this.writeTimeoutMillis = writeTimeoutMillis;
212+
return self();
213+
}
214+
215+
public TarantoolClientConfig build() {
216+
return new TarantoolClientConfig(this);
217+
}
218+
219+
@SuppressWarnings("unchecked")
220+
protected T self() {
221+
return (T) this;
222+
}
223+
224+
}
52225

53226
}

src/main/java/org/tarantool/TarantoolClientImpl.java

+25-21
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727

2828

2929
public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements TarantoolClient {
30+
3031
public static final CommunicationException NOT_INIT_EXCEPTION = new CommunicationException("Not connected, initializing connection");
32+
3133
protected TarantoolClientConfig config;
3234

3335
/**
@@ -82,27 +84,27 @@ public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClient
8284
super();
8385
this.thumbstone = NOT_INIT_EXCEPTION;
8486
this.config = config;
85-
this.initialRequestSize = config.defaultRequestSize;
87+
this.initialRequestSize = config.getDefaultRequestSize();
8688
this.socketProvider = socketProvider;
8789
this.stats = new TarantoolClientStats();
88-
this.futures = new ConcurrentHashMap<>(config.predictedFutures);
89-
this.sharedBuffer = ByteBuffer.allocateDirect(config.sharedBufferSize);
90+
this.futures = new ConcurrentHashMap<>(config.getPredictedFutures());
91+
this.sharedBuffer = ByteBuffer.allocateDirect(config.getSharedBufferSize());
9092
this.writerBuffer = ByteBuffer.allocateDirect(sharedBuffer.capacity());
9193
this.connector.setDaemon(true);
9294
this.connector.setName("Tarantool connector");
9395
this.syncOps = new SyncOps();
9496
this.composableAsyncOps = new ComposableAsyncOps();
9597
this.fireAndForgetOps = new FireAndForgetOps();
96-
if (config.useNewCall) {
98+
if (config.isUseNewCall()) {
9799
setCallCode(Code.CALL);
98100
this.syncOps.setCallCode(Code.CALL);
99101
this.fireAndForgetOps.setCallCode(Code.CALL);
100102
this.composableAsyncOps.setCallCode(Code.CALL);
101103
}
102104
connector.start();
103105
try {
104-
if (!waitAlive(config.initTimeoutMillis, TimeUnit.MILLISECONDS)) {
105-
CommunicationException e = new CommunicationException(config.initTimeoutMillis +
106+
if (!waitAlive(config.getInitTimeoutMillis(), TimeUnit.MILLISECONDS)) {
107+
CommunicationException e = new CommunicationException(config.getInitTimeoutMillis() +
106108
"ms is exceeded when waiting for client initialization. " +
107109
"You could configure init timeout in TarantoolConfig");
108110

@@ -139,7 +141,7 @@ protected void reconnect(int retry, Throwable lastError) {
139141
protected void connect(final SocketChannel channel) throws Exception {
140142
try {
141143
TarantoolInstanceConnectionMeta connectMeta = BinaryProtoUtils
142-
.connect(channel, config.username, config.password);
144+
.connect(channel, config.getUsername(), config.getPassword());
143145

144146
this.salt = connectMeta.getSalt();
145147
this.serverVersion = connectMeta.getServerVersion();
@@ -207,8 +209,8 @@ public void run() {
207209
protected void configureThreads(String threadName) {
208210
reader.setName("Tarantool " + threadName + " reader");
209211
writer.setName("Tarantool " + threadName + " writer");
210-
writer.setPriority(config.writerThreadPriority);
211-
reader.setPriority(config.readerThreadPriority);
212+
writer.setPriority(config.getWriterThreadPriority());
213+
reader.setPriority(config.getReaderThreadPriority());
212214
}
213215

214216
protected Future<?> exec(Code code, Object... args) {
@@ -282,7 +284,9 @@ protected void write(Code code, Long syncId, Long schemaId, Object... args)
282284

283285
protected void sharedWrite(ByteBuffer buffer) throws InterruptedException, TimeoutException {
284286
long start = System.currentTimeMillis();
285-
if (bufferLock.tryLock(config.writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
287+
long writeTimeoutMillis = config.getWriteTimeoutMillis();
288+
289+
if (bufferLock.tryLock(writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
286290
try {
287291
int rem = buffer.remaining();
288292
stats.sharedMaxPacketSize = Math.max(stats.sharedMaxPacketSize, rem);
@@ -291,11 +295,11 @@ protected void sharedWrite(ByteBuffer buffer) throws InterruptedException, Timeo
291295
}
292296
while (sharedBuffer.remaining() < buffer.limit()) {
293297
stats.sharedEmptyAwait++;
294-
long remaining = config.writeTimeoutMillis - (System.currentTimeMillis() - start);
298+
long remaining = writeTimeoutMillis - (System.currentTimeMillis() - start);
295299
try {
296300
if (remaining < 1 || !bufferEmpty.await(remaining, TimeUnit.MILLISECONDS)) {
297301
stats.sharedEmptyAwaitTimeouts++;
298-
throw new TimeoutException(config.writeTimeoutMillis + "ms is exceeded while waiting for empty buffer you could configure write timeout it in TarantoolConfig");
302+
throw new TimeoutException(writeTimeoutMillis + "ms is exceeded while waiting for empty buffer you could configure write timeout it in TarantoolConfig");
299303
}
300304
} catch (InterruptedException e) {
301305
throw new CommunicationException("Interrupted", e);
@@ -310,13 +314,13 @@ protected void sharedWrite(ByteBuffer buffer) throws InterruptedException, Timeo
310314
}
311315
} else {
312316
stats.sharedWriteLockTimeouts++;
313-
throw new TimeoutException(config.writeTimeoutMillis + "ms is exceeded while waiting for shared buffer lock you could configure write timeout in TarantoolConfig");
317+
throw new TimeoutException(writeTimeoutMillis + "ms is exceeded while waiting for shared buffer lock you could configure write timeout in TarantoolConfig");
314318
}
315319
}
316320

317321
private boolean directWrite(ByteBuffer buffer) throws InterruptedException, IOException, TimeoutException {
318-
if (sharedBuffer.capacity() * config.directWriteFactor <= buffer.limit()) {
319-
if (writeLock.tryLock(config.writeTimeoutMillis, TimeUnit.MILLISECONDS)) {
322+
if (sharedBuffer.capacity() * config.getDirectWriteFactor() <= buffer.limit()) {
323+
if (writeLock.tryLock(config.getWriteTimeoutMillis(), TimeUnit.MILLISECONDS)) {
320324
try {
321325
int rem = buffer.remaining();
322326
stats.directMaxPacketSize = Math.max(stats.directMaxPacketSize, rem);
@@ -332,7 +336,7 @@ private boolean directWrite(ByteBuffer buffer) throws InterruptedException, IOEx
332336
return true;
333337
} else {
334338
stats.directWriteLockTimeouts++;
335-
throw new TimeoutException(config.writeTimeoutMillis + "ms is exceeded while waiting for channel lock you could configure write timeout in TarantoolConfig");
339+
throw new TimeoutException(config.getWriteTimeoutMillis() + "ms is exceeded while waiting for channel lock you could configure write timeout in TarantoolConfig");
336340
}
337341
}
338342
return false;
@@ -621,11 +625,11 @@ public TarantoolClientStats getStats() {
621625
* Manages state changes.
622626
*/
623627
protected final class StateHelper {
624-
static final int READING = 1;
625-
static final int WRITING = 2;
626-
static final int ALIVE = READING | WRITING;
627-
static final int RECONNECT = 4;
628-
static final int CLOSED = 8;
628+
static public final int READING = 1;
629+
static public final int WRITING = 2;
630+
static public final int ALIVE = READING | WRITING;
631+
static public final int RECONNECT = 4;
632+
static public final int CLOSED = 8;
629633

630634
private final AtomicInteger state;
631635

0 commit comments

Comments
 (0)