Skip to content

Commit cc36724

Browse files
committed
Race condition in TarantoolClientImpl
A state of a client is a set of the following flags: {READING, WRITING, RECONNECT, CLOSED}. Let's name a state when no flags are set UNINITIALIZED. A reader thread sets READING, performs reading until an error or an interruption, drops READING and tries to trigger reconnection (if a state allows, see below). A writer do quite same things, but with the WRITING flag. The key point here is that a reconnection is triggered from a reader/writer thread and only when certain conditions are met. The prerequisite to set RECONNECT and signal (unpark) a connector thread is that a client has UNINITIALIZED state. There are several problems here: - Say, a reader stalls a bit after dropping READING, then a writer drops WRITING and trigger reconnection. Then reader wokes up and set RECONNECT again. - Calling unpark() N times for a connector thread when it is alive doesn't lead to skipping next N park() calls, so the problem above is not just about extra reconnection, but lead the connector thread to be stuck. - Say, a reader stalls just before setting READING. A writer is hit by an IO error and triggers reconnection (set RECONNECT, unpark connector). Then the reader wakes up and set READING+RECONNECT state that disallows a connector thread to proceed further (it expects pure RECONNECT). Even when the reader drops READING it will not wake up (unpark) the connector thread, because RECONNECT was already set (state is not UNINITIALIZED). This commit introduces several changes that eliminate the problems above: - Use ReentrantLock + Condition instead of park() / unpark() to never miss signals to reconnect, does not matter whether a connector is parked. - Ensure a reader and a writer threads from one generation (that are created on the same reconnection iteration) triggers reconnection once. - Hold RECONNECT state most of time a connector works (while acquiring a new socket, connecting and reading Tarantool greeting) and prevent to set READING/WRITING while RECONNECT is set. - Ensure a new reconnection iteration will start only after old reader and old writer threads exit (because we cannot receive a reconnection signal until we send it). Fixes: #142 Affects: #34, #136
1 parent acb17e4 commit cc36724

File tree

2 files changed

+134
-70
lines changed

2 files changed

+134
-70
lines changed

src/main/java/org/tarantool/TarantoolClientImpl.java

+131-66
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.concurrent.atomic.AtomicInteger;
2323
import java.util.concurrent.atomic.AtomicReference;
2424
import java.util.concurrent.locks.Condition;
25-
import java.util.concurrent.locks.LockSupport;
2625
import java.util.concurrent.locks.ReentrantLock;
2726

2827
public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements TarantoolClient {
@@ -72,10 +71,12 @@ public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements Tar
7271
@Override
7372
public void run() {
7473
while (!Thread.currentThread().isInterrupted()) {
75-
if (state.compareAndSet(StateHelper.RECONNECT, 0)) {
76-
reconnect(0, thumbstone);
74+
reconnect(0, thumbstone);
75+
try {
76+
state.awaitReconnection();
77+
} catch (InterruptedException e) {
78+
Thread.currentThread().interrupt();
7779
}
78-
LockSupport.park(state);
7980
}
8081
}
8182
});
@@ -146,14 +147,10 @@ protected void connect(final SocketChannel channel) throws Exception {
146147
TarantoolGreeting greeting = ProtoUtils.connect(channel, config.username, config.password);
147148
this.serverVersion = greeting.getServerVersion();
148149
} catch (IOException e) {
149-
try {
150-
channel.close();
151-
} catch (IOException ignored) {
152-
// No-op
153-
}
154-
150+
closeChannel(channel);
155151
throw new CommunicationException("Couldn't connect to tarantool", e);
156152
}
153+
157154
channel.configureBlocking(false);
158155
this.channel = channel;
159156
this.readChannel = new ReadableViaSelectorChannel(channel);
@@ -169,44 +166,42 @@ protected void connect(final SocketChannel channel) throws Exception {
169166
}
170167

171168
protected void startThreads(String threadName) throws InterruptedException {
172-
final CountDownLatch init = new CountDownLatch(2);
173-
reader = new Thread(new Runnable() {
174-
@Override
175-
public void run() {
176-
init.countDown();
177-
if (state.acquire(StateHelper.READING)) {
178-
try {
179-
readThread();
180-
} finally {
181-
state.release(StateHelper.READING);
182-
if (state.compareAndSet(0, StateHelper.RECONNECT)) {
183-
LockSupport.unpark(connector);
184-
}
169+
final CountDownLatch ioThreadStarted = new CountDownLatch(2);
170+
final AtomicInteger leftIoThreads = new AtomicInteger(2);
171+
reader = new Thread(() -> {
172+
ioThreadStarted.countDown();
173+
if (state.acquire(StateHelper.READING)) {
174+
try {
175+
readThread();
176+
} finally {
177+
state.release(StateHelper.READING);
178+
// only last of two IO-threads can signal for reconnection
179+
if (leftIoThreads.decrementAndGet() == 0) {
180+
state.trySignalForReconnection();
185181
}
186182
}
187183
}
188184
});
189-
writer = new Thread(new Runnable() {
190-
@Override
191-
public void run() {
192-
init.countDown();
193-
if (state.acquire(StateHelper.WRITING)) {
194-
try {
195-
writeThread();
196-
} finally {
197-
state.release(StateHelper.WRITING);
198-
if (state.compareAndSet(0, StateHelper.RECONNECT)) {
199-
LockSupport.unpark(connector);
200-
}
185+
writer = new Thread(() -> {
186+
ioThreadStarted.countDown();
187+
if (state.acquire(StateHelper.WRITING)) {
188+
try {
189+
writeThread();
190+
} finally {
191+
state.release(StateHelper.WRITING);
192+
// only last of two IO-threads can signal for reconnection
193+
if (leftIoThreads.decrementAndGet() == 0) {
194+
state.trySignalForReconnection();
201195
}
202196
}
203197
}
204198
});
199+
state.release(StateHelper.RECONNECT);
205200

206201
configureThreads(threadName);
207202
reader.start();
208203
writer.start();
209-
init.await();
204+
ioThreadStarted.await();
210205
}
211206

212207
protected void configureThreads(String threadName) {
@@ -356,25 +351,21 @@ private boolean directWrite(ByteBuffer buffer) throws InterruptedException, IOEx
356351
}
357352

358353
protected void readThread() {
359-
try {
360-
while (!Thread.currentThread().isInterrupted()) {
361-
try {
362-
TarantoolPacket packet = ProtoUtils.readPacket(readChannel);
354+
while (!Thread.currentThread().isInterrupted()) {
355+
try {
356+
TarantoolPacket packet = ProtoUtils.readPacket(readChannel);
363357

364-
Map<Integer, Object> headers = packet.getHeaders();
358+
Map<Integer, Object> headers = packet.getHeaders();
365359

366-
Long syncId = (Long) headers.get(Key.SYNC.getId());
367-
TarantoolOp<?> future = futures.remove(syncId);
368-
stats.received++;
369-
wait.decrementAndGet();
370-
complete(packet, future);
371-
} catch (Exception e) {
372-
die("Cant read answer", e);
373-
return;
374-
}
360+
Long syncId = (Long) headers.get(Key.SYNC.getId());
361+
TarantoolOp<?> future = futures.remove(syncId);
362+
stats.received++;
363+
wait.decrementAndGet();
364+
complete(packet, future);
365+
} catch (Exception e) {
366+
die("Cant read answer", e);
367+
return;
375368
}
376-
} catch (Exception e) {
377-
die("Cant init thread", e);
378369
}
379370
}
380371

@@ -489,7 +480,7 @@ protected void stopIO() {
489480
try {
490481
readChannel.close(); // also closes this.channel
491482
} catch (IOException ignored) {
492-
// No-op
483+
// no-op
493484
}
494485
}
495486
closeChannel(channel);
@@ -639,6 +630,7 @@ public TarantoolClientStats getStats() {
639630
* Manages state changes.
640631
*/
641632
protected final class StateHelper {
633+
static final int UNINITIALIZED = 0;
642634
static final int READING = 1;
643635
static final int WRITING = 2;
644636
static final int ALIVE = READING | WRITING;
@@ -648,10 +640,22 @@ protected final class StateHelper {
648640
private final AtomicInteger state;
649641

650642
private final AtomicReference<CountDownLatch> nextAliveLatch =
651-
new AtomicReference<CountDownLatch>(new CountDownLatch(1));
643+
new AtomicReference<>(new CountDownLatch(1));
652644

653645
private final CountDownLatch closedLatch = new CountDownLatch(1);
654646

647+
/**
648+
* The condition variable to signal a reconnection is needed from reader /
649+
* writer threads and waiting for that signal from the reconnection thread.
650+
*
651+
* The lock variable to access this condition.
652+
*
653+
* @see #awaitReconnection()
654+
* @see #trySignalForReconnection()
655+
*/
656+
protected final ReentrantLock connectorLock = new ReentrantLock();
657+
protected final Condition reconnectRequired = connectorLock.newCondition();
658+
655659
protected StateHelper(int state) {
656660
this.state = new AtomicInteger(state);
657661
}
@@ -660,39 +664,60 @@ protected int getState() {
660664
return state.get();
661665
}
662666

667+
/**
668+
* Set CLOSED state, drop RECONNECT state.
669+
*/
663670
protected boolean close() {
664671
for (; ; ) {
665-
int st = getState();
666-
if ((st & CLOSED) == CLOSED) {
672+
int currentState = getState();
673+
674+
/* CLOSED is the terminal state. */
675+
if ((currentState & CLOSED) == CLOSED) {
667676
return false;
668677
}
669-
if (compareAndSet(st, (st & ~RECONNECT) | CLOSED)) {
678+
679+
/* Drop RECONNECT, set CLOSED. */
680+
if (compareAndSet(currentState, (currentState & ~RECONNECT) | CLOSED)) {
670681
return true;
671682
}
672683
}
673684
}
674685

686+
/**
687+
* Move from a current state to a give one.
688+
*
689+
* Some moves are forbidden.
690+
*/
675691
protected boolean acquire(int mask) {
676692
for (; ; ) {
677-
int st = getState();
678-
if ((st & CLOSED) == CLOSED) {
693+
int currentState = getState();
694+
695+
/* CLOSED is the terminal state. */
696+
if ((currentState & CLOSED) == CLOSED) {
697+
return false;
698+
}
699+
700+
/* Don't move to READING, WRITING or ALIVE from RECONNECT. */
701+
if ((currentState & RECONNECT) > mask) {
679702
return false;
680703
}
681704

682-
if ((st & mask) != 0) {
705+
/* Cannot move from a state to the same state. */
706+
if ((currentState & mask) != 0) {
683707
throw new IllegalStateException("State is already " + mask);
684708
}
685709

686-
if (compareAndSet(st, st | mask)) {
710+
/* Set acquired state. */
711+
if (compareAndSet(currentState, currentState | mask)) {
687712
return true;
688713
}
689714
}
690715
}
691716

692717
protected void release(int mask) {
693718
for (; ; ) {
694-
int st = getState();
695-
if (compareAndSet(st, st & ~mask)) {
719+
int currentState = getState();
720+
if (compareAndSet(currentState, currentState & ~mask)) {
696721
return;
697722
}
698723
}
@@ -713,10 +738,18 @@ protected boolean compareAndSet(int expect, int update) {
713738
return true;
714739
}
715740

741+
/**
742+
* Reconnection uses another way to await state via receiving a signal
743+
* instead of latches.
744+
*/
716745
protected void awaitState(int state) throws InterruptedException {
717-
CountDownLatch latch = getStateLatch(state);
718-
if (latch != null) {
719-
latch.await();
746+
if (state == RECONNECT) {
747+
awaitReconnection();
748+
} else {
749+
CountDownLatch latch = getStateLatch(state);
750+
if (latch != null) {
751+
latch.await();
752+
}
720753
}
721754
}
722755

@@ -740,6 +773,38 @@ private CountDownLatch getStateLatch(int state) {
740773
}
741774
return null;
742775
}
776+
777+
/**
778+
* Blocks until a reconnection signal will be received.
779+
*
780+
* @see #trySignalForReconnection()
781+
*/
782+
private void awaitReconnection() throws InterruptedException {
783+
connectorLock.lock();
784+
try {
785+
while (getState() != StateHelper.RECONNECT) {
786+
reconnectRequired.await();
787+
}
788+
} finally {
789+
connectorLock.unlock();
790+
}
791+
}
792+
793+
/**
794+
* Signals to the connector that reconnection process can be performed.
795+
*
796+
* @see #awaitReconnection()
797+
*/
798+
private void trySignalForReconnection() {
799+
if (compareAndSet(StateHelper.UNINITIALIZED, StateHelper.RECONNECT)) {
800+
connectorLock.lock();
801+
try {
802+
reconnectRequired.signal();
803+
} finally {
804+
connectorLock.unlock();
805+
}
806+
}
807+
}
743808
}
744809

745810
protected static class TarantoolOp<V> extends CompletableFuture<V> {

src/test/java/org/tarantool/ClientReconnectIT.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,8 @@ public void testLongParallelCloseReconnects() {
231231
SocketChannelProvider provider = new TestSocketChannelProvider(host,
232232
port, RESTART_TIMEOUT).setSoLinger(0);
233233

234-
final AtomicReferenceArray<TarantoolClient> clients = new AtomicReferenceArray<TarantoolClient>(numClients);
234+
final AtomicReferenceArray<TarantoolClient> clients =
235+
new AtomicReferenceArray<>(numClients);
235236

236237
for (int idx = 0; idx < clients.length(); idx++) {
237238
clients.set(idx, makeClient(provider));
@@ -249,9 +250,7 @@ public void testLongParallelCloseReconnects() {
249250
@Override
250251
public void run() {
251252
while (!Thread.currentThread().isInterrupted() && deadline > System.currentTimeMillis()) {
252-
253253
int idx = rnd.nextInt(clients.length());
254-
255254
try {
256255
TarantoolClient cli = clients.get(idx);
257256

@@ -300,7 +299,7 @@ public void run() {
300299

301300
// Wait for all threads to finish.
302301
try {
303-
assertTrue(latch.await(RESTART_TIMEOUT, TimeUnit.MILLISECONDS));
302+
assertTrue(latch.await(RESTART_TIMEOUT * 2, TimeUnit.MILLISECONDS));
304303
} catch (InterruptedException e) {
305304
fail(e);
306305
}

0 commit comments

Comments
 (0)