Skip to content

Commit 4806ee9

Browse files
committed
Race condition in TarantoolClientImpl
- Avoid a possible race between the reading, writing and reconnecting threads when a reconnection process is started. A lagging thread (reading or writing) could reset the state to RECONNECT after the reconnecting thread had already started and set the state to 0. As a result, all subsequent reconnection attempts would never happen. Now the reconnect thread holds on to the state for as long as it is required.
- Avoid another possible race between the reading and writing threads when they are started during the reconnection process. One of the threads could crash while it was starting, while the other, slightly lagging thread set its flag. This could have led to the reconnecting thread seeing the RECONNECT + R/W state instead of pure RECONNECT. Again, this case broke all subsequent reconnection attempts. Now the reading and writing threads take into account whether the RECONNECT state is already set or not.
- Replace LockSupport with ReentrantLock.Condition for suspending and waking up a thread. Our cluster tests and standalone demo app show that LockSupport is not as safe a memory barrier as it could be. The reconnect thread relies on a visibility guarantee between park and unpark invocations which, in practice, sometimes does not hold. Also, according to the Javadoc, LockSupport is more of an internal component for building higher-level blocking primitives; using this class directly is not recommended. It was replaced by the ReentrantLock.Condition primitive, which is based on LockSupport but uses it properly internally.
Fixes: #142
Affects: #34, #136
1 parent 42c98b3 commit 4806ee9

File tree

1 file changed

+90
-74
lines changed

1 file changed

+90
-74
lines changed

src/main/java/org/tarantool/TarantoolClientImpl.java

+90-74
Original file line numberDiff line numberDiff line change
@@ -62,33 +62,19 @@ public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements Tar
6262
*/
6363
protected TarantoolClientStats stats;
6464
protected StateHelper state = new StateHelper(StateHelper.RECONNECT);
65-
protected Thread reader;
66-
protected Thread writer;
67-
68-
/**
69-
* The condition variable to signal a reconnection is needed from reader /
70-
* writer threads and waiting for that signal from the reconnection thread.
71-
*
72-
* The lock variable to access this condition.
73-
*
74-
* XXX: Maybe it is better to move signalling / waiting support to the
75-
* state helper (or fsm when we'll reimplement it in that way). Maybe
76-
* additional RECONNECTION_NEEDED state is required to distinguish whether
77-
* we're in the reconnection state already or just want to reconnect (don't
78-
* sure).
79-
*
80-
* @see #awaitReconnection()
81-
* @see #trySignalForReconnection()
82-
*/
83-
protected final ReentrantLock connectorLock = new ReentrantLock();
84-
protected final Condition reconnectRequired = connectorLock.newCondition();
65+
protected volatile Thread reader;
66+
protected volatile Thread writer;
8567

8668
protected Thread connector = new Thread(new Runnable() {
8769
@Override
8870
public void run() {
8971
while (!Thread.currentThread().isInterrupted()) {
9072
reconnect(0, thumbstone);
91-
awaitReconnection();
73+
try {
74+
state.awaitReconnection();
75+
} catch (InterruptedException e) {
76+
Thread.currentThread().interrupt();
77+
}
9278
}
9379
}
9480
});
@@ -177,15 +163,6 @@ protected void connect(final SocketChannel channel) throws Exception {
177163

178164
protected void startThreads(String threadName) throws InterruptedException {
179165
final CountDownLatch init = new CountDownLatch(2);
180-
181-
if (reader != null) {
182-
reader.join(config.initTimeoutMillis / 2);
183-
}
184-
if (writer != null) {
185-
writer.join(config.initTimeoutMillis / 2);
186-
}
187-
state.release(StateHelper.RECONNECT);
188-
189166
reader = new Thread(new Runnable() {
190167
@Override
191168
public void run() {
@@ -195,7 +172,16 @@ public void run() {
195172
readThread();
196173
} finally {
197174
state.release(StateHelper.READING);
198-
trySignalForReconnection();
175+
// there are two cases in which a read thread gets here
176+
// 1. it's a new generation thread inside/outside
177+
// a reconnection process (currentThread == reader)
178+
// 2. It's an old generation thread inside
179+
// a reconnection process (currentThread != reader)
180+
// Skip the old gen. attempt to reconnect
181+
if (state.getState() == StateHelper.UNINITIALIZED
182+
&& Thread.currentThread() == reader) {
183+
state.trySignalForReconnection();
184+
}
199185
}
200186
}
201187
}
@@ -209,12 +195,28 @@ public void run() {
209195
writeThread();
210196
} finally {
211197
state.release(StateHelper.WRITING);
212-
trySignalForReconnection();
198+
// there are two cases in which a write thread gets here
199+
// 1. it's a new generation thread inside/outside
200+
// a reconnection process (currentThread == writer)
201+
// 2. It's an old generation thread inside
202+
// a reconnection process (currentThread != writer)
203+
// Skip the old gen. attempt to reconnect
204+
if (state.getState() == StateHelper.UNINITIALIZED
205+
&& Thread.currentThread() == writer) {
206+
state.trySignalForReconnection();
207+
}
213208
}
214209
}
215210
}
216211
});
217212

213+
// reconnection preparation is done
214+
// before reconnection state will be released
215+
// reader/writer threads have been replaced by new ones
216+
// it's required to be sure that old r/w threads
217+
// won't affect new r/w threads.
218+
state.release(StateHelper.RECONNECT);
219+
218220
configureThreads(threadName);
219221
reader.start();
220222
writer.start();
@@ -492,40 +494,6 @@ protected void stopIO() {
492494
closeChannel(channel);
493495
}
494496

495-
/**
496-
* Blocks until a reconnection signal will be received.
497-
*
498-
* @see #trySignalForReconnection()
499-
*/
500-
private void awaitReconnection() {
501-
connectorLock.lock();
502-
try {
503-
while (state.getState() != StateHelper.RECONNECT) {
504-
reconnectRequired.await();
505-
}
506-
} catch (InterruptedException ignored) {
507-
Thread.currentThread().interrupt();
508-
} finally {
509-
connectorLock.unlock();
510-
}
511-
}
512-
513-
/**
514-
* Signals to the connector that reconnection process can be performed.
515-
*
516-
* @see #awaitReconnection()
517-
*/
518-
private void trySignalForReconnection() {
519-
if (state.compareAndSet(StateHelper.UNINITIALIZED, StateHelper.RECONNECT)) {
520-
connectorLock.lock();
521-
try {
522-
reconnectRequired.signal();
523-
} finally {
524-
connectorLock.unlock();
525-
}
526-
}
527-
}
528-
529497
@Override
530498
public boolean isAlive() {
531499
return state.getState() == StateHelper.ALIVE && thumbstone == null;
@@ -682,6 +650,18 @@ protected final class StateHelper {
682650

683651
private final CountDownLatch closedLatch = new CountDownLatch(1);
684652

653+
/**
654+
* The condition variable to signal a reconnection is needed from reader /
655+
* writer threads and waiting for that signal from the reconnection thread.
656+
*
657+
* The lock variable to access this condition.
658+
*
659+
* @see #awaitReconnection()
660+
* @see #trySignalForReconnection()
661+
*/
662+
protected final ReentrantLock connectorLock = new ReentrantLock();
663+
protected final Condition reconnectRequired = connectorLock.newCondition();
664+
685665
protected StateHelper(int state) {
686666
this.state = new AtomicInteger(state);
687667
}
@@ -701,11 +681,7 @@ protected boolean close() {
701681
if ((st & CLOSED) == CLOSED)
702682
return false;
703683

704-
/* Drop RECONNECT, set CLOSED.
705-
*
706-
* XXX: Should not we also drop other states? Or reimplement
707-
* this class as a finite state machine?
708-
*/
684+
/* Drop RECONNECT, set CLOSED. */
709685
if (compareAndSet(st, (st & ~RECONNECT) | CLOSED))
710686
return true;
711687
}
@@ -766,10 +742,18 @@ protected boolean compareAndSet(int expect, int update) {
766742
return true;
767743
}
768744

745+
/**
746+
* Reconnection uses another way to await state via receiving a signal
747+
* instead of latches.
748+
*/
769749
protected void awaitState(int state) throws InterruptedException {
770-
CountDownLatch latch = getStateLatch(state);
771-
if (latch != null) {
772-
latch.await();
750+
if (state == RECONNECT) {
751+
awaitReconnection();
752+
} else {
753+
CountDownLatch latch = getStateLatch(state);
754+
if (latch != null) {
755+
latch.await();
756+
}
773757
}
774758
}
775759

@@ -793,5 +777,37 @@ private CountDownLatch getStateLatch(int state) {
793777
}
794778
return null;
795779
}
780+
781+
/**
782+
* Blocks until a reconnection signal is received.
783+
*
784+
* @see #trySignalForReconnection()
785+
*/
786+
private void awaitReconnection() throws InterruptedException {
787+
connectorLock.lock();
788+
try {
789+
while (getState() != StateHelper.RECONNECT) {
790+
reconnectRequired.await();
791+
}
792+
} finally {
793+
connectorLock.unlock();
794+
}
795+
}
796+
797+
/**
798+
* Signals to the connector that reconnection process can be performed.
799+
*
800+
* @see #awaitReconnection()
801+
*/
802+
private void trySignalForReconnection() {
803+
if (compareAndSet(StateHelper.UNINITIALIZED, StateHelper.RECONNECT)) {
804+
connectorLock.lock();
805+
try {
806+
reconnectRequired.signal();
807+
} finally {
808+
connectorLock.unlock();
809+
}
810+
}
811+
}
796812
}
797813
}

0 commit comments

Comments
 (0)