2
2
3
3
import org .tarantool .protocol .ProtoUtils ;
4
4
import org .tarantool .protocol .ReadableViaSelectorChannel ;
5
- import org .tarantool .protocol .TarantoolPacket ;
6
5
import org .tarantool .protocol .TarantoolGreeting ;
6
+ import org .tarantool .protocol .TarantoolPacket ;
7
7
8
8
import java .io .IOException ;
9
9
import java .nio .ByteBuffer ;
22
22
import java .util .concurrent .atomic .AtomicInteger ;
23
23
import java .util .concurrent .atomic .AtomicReference ;
24
24
import java .util .concurrent .locks .Condition ;
25
- import java .util .concurrent .locks .LockSupport ;
26
25
import java .util .concurrent .locks .ReentrantLock ;
27
26
28
27
@@ -66,14 +65,15 @@ public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements Tar
66
65
protected Thread reader ;
67
66
protected Thread writer ;
68
67
68
+ protected final ReentrantLock connectorLock = new ReentrantLock ();
69
+ protected final Condition reconnectRequired = connectorLock .newCondition ();
70
+
69
71
protected Thread connector = new Thread (new Runnable () {
70
72
@ Override
71
73
public void run () {
72
74
while (!Thread .currentThread ().isInterrupted ()) {
73
- if (state .compareAndSet (StateHelper .RECONNECT , 0 )) {
74
- reconnect (0 , thumbstone );
75
- }
76
- LockSupport .park (state );
75
+ reconnect (0 , thumbstone );
76
+ awaitReconnection ();
77
77
}
78
78
}
79
79
});
@@ -139,16 +139,13 @@ protected void reconnect(int retry, Throwable lastError) {
139
139
protected void connect (final SocketChannel channel ) throws Exception {
140
140
try {
141
141
TarantoolGreeting greeting = ProtoUtils .connect (channel ,
142
- config .username , config .password );
142
+ config .username , config .password );
143
143
this .serverVersion = greeting .getServerVersion ();
144
144
} catch (IOException e ) {
145
- try {
146
- channel .close ();
147
- } catch (IOException ignored ) {
148
- }
149
-
145
+ closeChannel (channel );
150
146
throw new CommunicationException ("Couldn't connect to tarantool" , e );
151
147
}
148
+
152
149
channel .configureBlocking (false );
153
150
this .channel = channel ;
154
151
this .readChannel = new ReadableViaSelectorChannel (channel );
@@ -165,6 +162,15 @@ protected void connect(final SocketChannel channel) throws Exception {
165
162
166
163
protected void startThreads (String threadName ) throws InterruptedException {
167
164
final CountDownLatch init = new CountDownLatch (2 );
165
+
166
+ if (reader != null ) {
167
+ reader .join (config .initTimeoutMillis / 2 );
168
+ }
169
+ if (writer != null ) {
170
+ writer .join (config .initTimeoutMillis / 2 );
171
+ }
172
+ state .release (StateHelper .RECONNECT );
173
+
168
174
reader = new Thread (new Runnable () {
169
175
@ Override
170
176
public void run () {
@@ -174,8 +180,7 @@ public void run() {
174
180
readThread ();
175
181
} finally {
176
182
state .release (StateHelper .READING );
177
- if (state .compareAndSet (0 , StateHelper .RECONNECT ))
178
- LockSupport .unpark (connector );
183
+ trySignalForReconnection ();
179
184
}
180
185
}
181
186
}
@@ -189,8 +194,7 @@ public void run() {
189
194
writeThread ();
190
195
} finally {
191
196
state .release (StateHelper .WRITING );
192
- if (state .compareAndSet (0 , StateHelper .RECONNECT ))
193
- LockSupport .unpark (connector );
197
+ trySignalForReconnection ();
194
198
}
195
199
}
196
200
}
@@ -337,25 +341,21 @@ private boolean directWrite(ByteBuffer buffer) throws InterruptedException, IOEx
337
341
}
338
342
339
343
protected void readThread () {
340
- try {
341
- while (!Thread .currentThread ().isInterrupted ()) {
342
- try {
343
- TarantoolPacket packet = ProtoUtils .readPacket (readChannel );
344
+ while (!Thread .currentThread ().isInterrupted ()) {
345
+ try {
346
+ TarantoolPacket packet = ProtoUtils .readPacket (readChannel );
344
347
345
- Map <Integer , Object > headers = packet .getHeaders ();
348
+ Map <Integer , Object > headers = packet .getHeaders ();
346
349
347
- Long syncId = (Long ) headers .get (Key .SYNC .getId ());
348
- CompletableFuture <?> future = futures .remove (syncId );
349
- stats .received ++;
350
- wait .decrementAndGet ();
351
- complete (packet , future );
352
- } catch (Exception e ) {
353
- die ("Cant read answer" , e );
354
- return ;
355
- }
350
+ Long syncId = (Long ) headers .get (Key .SYNC .getId ());
351
+ CompletableFuture <?> future = futures .remove (syncId );
352
+ stats .received ++;
353
+ wait .decrementAndGet ();
354
+ complete (packet , future );
355
+ } catch (Exception e ) {
356
+ die ("Cant read answer" , e );
357
+ return ;
356
358
}
357
- } catch (Exception e ) {
358
- die ("Cant init thread" , e );
359
359
}
360
360
}
361
361
@@ -414,7 +414,7 @@ protected void complete(TarantoolPacket packet, CompletableFuture<?> q) {
414
414
415
415
protected void completeSql (CompletableFuture <?> q , TarantoolPacket pack ) {
416
416
Long rowCount = SqlProtoUtils .getSqlRowCount (pack );
417
- if (rowCount != null ) {
417
+ if (rowCount != null ) {
418
418
((CompletableFuture ) q ).complete (rowCount );
419
419
} else {
420
420
List <Map <String , Object >> values = SqlProtoUtils .readSqlResult (pack );
@@ -477,6 +477,40 @@ protected void stopIO() {
477
477
closeChannel (channel );
478
478
}
479
479
480
+ /**
481
+ * Blocks until a reconnection process can be carried on
482
+ *
483
+ * @see #trySignalForReconnection()
484
+ */
485
+ private void awaitReconnection () {
486
+ connectorLock .lock ();
487
+ try {
488
+ while (state .getState () != StateHelper .RECONNECT ) {
489
+ reconnectRequired .await ();
490
+ }
491
+ } catch (InterruptedException ignored ) {
492
+ Thread .currentThread ().interrupt ();
493
+ } finally {
494
+ connectorLock .unlock ();
495
+ }
496
+ }
497
+
498
+ /**
499
+ * Signals to the connector that reconnection process can be performed
500
+ *
501
+ * @see #awaitReconnection()
502
+ */
503
+ private void trySignalForReconnection () {
504
+ if (state .compareAndSet (StateHelper .UNINITIALIZED , StateHelper .RECONNECT )) {
505
+ connectorLock .lock ();
506
+ try {
507
+ reconnectRequired .signal ();
508
+ } finally {
509
+ connectorLock .unlock ();
510
+ }
511
+ }
512
+ }
513
+
480
514
@ Override
481
515
public boolean isAlive () {
482
516
return state .getState () == StateHelper .ALIVE && thumbstone == null ;
@@ -499,7 +533,7 @@ public TarantoolClientOps<Integer, List<?>, Object, List<?>> syncOps() {
499
533
500
534
@ Override
501
535
public TarantoolClientOps <Integer , List <?>, Object , Future <List <?>>> asyncOps () {
502
- return (TarantoolClientOps )this ;
536
+ return (TarantoolClientOps ) this ;
503
537
}
504
538
505
539
@ Override
@@ -515,7 +549,7 @@ public TarantoolClientOps<Integer, List<?>, Object, Long> fireAndForgetOps() {
515
549
516
550
@ Override
517
551
public TarantoolSQLOps <Object , Long , List <Map <String , Object >>> sqlSyncOps () {
518
- return new TarantoolSQLOps <Object , Long , List <Map <String ,Object >>>() {
552
+ return new TarantoolSQLOps <Object , Long , List <Map <String , Object >>>() {
519
553
520
554
@ Override
521
555
public Long update (String sql , Object ... bind ) {
@@ -531,7 +565,7 @@ public List<Map<String, Object>> query(String sql, Object... bind) {
531
565
532
566
@ Override
533
567
public TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String , Object >>>> sqlAsyncOps () {
534
- return new TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String ,Object >>>>() {
568
+ return new TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String , Object >>>>() {
535
569
@ Override
536
570
public Future <Long > update (String sql , Object ... bind ) {
537
571
return (Future <Long >) exec (Code .EXECUTE , Key .SQL_TEXT , sql , Key .SQL_BIND , bind );
@@ -619,6 +653,7 @@ public TarantoolClientStats getStats() {
619
653
* Manages state changes.
620
654
*/
621
655
protected final class StateHelper {
656
+ static final int UNINITIALIZED = 0 ;
622
657
static final int READING = 1 ;
623
658
static final int WRITING = 2 ;
624
659
static final int ALIVE = READING | WRITING ;
@@ -628,7 +663,7 @@ protected final class StateHelper {
628
663
private final AtomicInteger state ;
629
664
630
665
private final AtomicReference <CountDownLatch > nextAliveLatch =
631
- new AtomicReference <CountDownLatch >(new CountDownLatch (1 ));
666
+ new AtomicReference <CountDownLatch >(new CountDownLatch (1 ));
632
667
633
668
private final CountDownLatch closedLatch = new CountDownLatch (1 );
634
669
@@ -641,7 +676,7 @@ protected int getState() {
641
676
}
642
677
643
678
protected boolean close () {
644
- for (;; ) {
679
+ for (; ; ) {
645
680
int st = getState ();
646
681
if ((st & CLOSED ) == CLOSED )
647
682
return false ;
@@ -651,24 +686,29 @@ protected boolean close() {
651
686
}
652
687
653
688
protected boolean acquire (int mask ) {
654
- for (;; ) {
655
- int st = getState ();
656
- if ((st & CLOSED ) == CLOSED )
689
+ for (; ; ) {
690
+ int currentState = getState ();
691
+ if ((currentState & CLOSED ) == CLOSED ) {
657
692
return false ;
658
-
659
- if ((st & mask ) != 0 )
693
+ }
694
+ if ((currentState & RECONNECT ) > mask ) {
695
+ return false ;
696
+ }
697
+ if ((currentState & mask ) != 0 ) {
660
698
throw new IllegalStateException ("State is already " + mask );
661
-
662
- if (compareAndSet (st , st | mask ))
699
+ }
700
+ if (compareAndSet (currentState , currentState | mask )) {
663
701
return true ;
702
+ }
664
703
}
665
704
}
666
705
667
706
protected void release (int mask ) {
668
- for (;; ) {
707
+ for (; ; ) {
669
708
int st = getState ();
670
- if (compareAndSet (st , st & ~mask ))
709
+ if (compareAndSet (st , st & ~mask )) {
671
710
return ;
711
+ }
672
712
}
673
713
}
674
714
@@ -710,7 +750,7 @@ private CountDownLatch getStateLatch(int state) {
710
750
CountDownLatch latch = nextAliveLatch .get ();
711
751
/* It may happen so that an error is detected but the state is still alive.
712
752
Wait for the 'next' alive state in such cases. */
713
- return (getState () == ALIVE && thumbstone == null ) ? null : latch ;
753
+ return (getState () == ALIVE && thumbstone == null ) ? null : latch ;
714
754
}
715
755
return null ;
716
756
}
0 commit comments