@@ -25,14 +25,17 @@ const ignoreStreamId = 0
25
25
const (
26
26
connDisconnected = 0
27
27
connConnected = 1
28
- connClosed = 2
28
+ connShutdown = 2
29
+ connClosed = 3
29
30
)
30
31
31
32
const (
32
33
connTransportNone = ""
33
34
connTransportSsl = "ssl"
34
35
)
35
36
37
+ const shutdownEventKey = "box.shutdown"
38
+
36
39
type ConnEventKind int
37
40
type ConnLogKind int
38
41
@@ -45,6 +48,8 @@ const (
45
48
ReconnectFailed
46
49
// Either reconnect attempts exhausted, or explicit Close is called.
47
50
Closed
51
+ // Shutdown signals that shutdown callback is processing.
52
+ Shutdown
48
53
49
54
// LogReconnectFailed is logged when reconnect attempt failed.
50
55
LogReconnectFailed ConnLogKind = iota + 1
@@ -134,10 +139,20 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
134
139
// always returns array of array (array of tuples for space related methods).
135
140
// For Eval* and Call* Tarantool always returns array, but does not forces
136
141
// array of arrays.
142
+ //
143
+ // If connected to Tarantool 2.10 or newer and WatchersFeature is required,
144
+ // connection supports server graceful shutdown. In this case, server will
145
+ // wait until all client requests will be finished and client disconnects
146
+ // before going down (server also may go down by timeout). Client reconnect will
147
+ // happen if connection options enable reconnect. Beware that graceful shutdown
148
+ // event initialization is asynchronous.
149
+ //
150
+ // More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
137
151
type Connection struct {
138
152
addr string
139
153
c net.Conn
140
154
mutex sync.Mutex
155
+ cond * sync.Cond
141
156
// Schema contains schema loaded on connection.
142
157
Schema * Schema
143
158
// requestId contains the last request ID for requests with nil context.
@@ -162,6 +177,11 @@ type Connection struct {
162
177
serverProtocolInfo ProtocolInfo
163
178
// watchMap is a map of key -> chan watchState.
164
179
watchMap sync.Map
180
+
181
+ // shutdownWatcher is the "box.shutdown" event watcher.
182
+ shutdownWatcher Watcher
183
+ // requestCnt is a counter of active requests.
184
+ requestCnt int64
165
185
}
166
186
167
187
var _ = Connector (& Connection {}) // Check compatibility with connector interface.
@@ -385,6 +405,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
385
405
conn .opts .Logger = defaultLogger {}
386
406
}
387
407
408
+ conn .cond = sync .NewCond (& conn .mutex )
409
+
388
410
if err = conn .createConnection (false ); err != nil {
389
411
ter , ok := err .(Error )
390
412
if conn .opts .Reconnect <= 0 {
@@ -421,6 +443,16 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
421
443
}
422
444
}
423
445
446
+ // Subscribe shutdown event to process graceful shutdown.
447
+ if conn .isWatchersRequired () {
448
+ watcher , werr := conn .NewWatcher (shutdownEventKey , shutdownEventCallback )
449
+ if werr != nil {
450
+ conn .closeConnection (werr , true )
451
+ return nil , werr
452
+ }
453
+ conn .shutdownWatcher = watcher
454
+ }
455
+
424
456
return conn , err
425
457
}
426
458
@@ -589,6 +621,7 @@ func (conn *Connection) dial() (err error) {
589
621
conn .lockShards ()
590
622
conn .c = connection
591
623
atomic .StoreUint32 (& conn .state , connConnected )
624
+ conn .cond .Broadcast ()
592
625
conn .unlockShards ()
593
626
go conn .writer (w , connection )
594
627
go conn .reader (r , connection )
@@ -762,10 +795,17 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
762
795
if conn .state != connClosed {
763
796
close (conn .control )
764
797
atomic .StoreUint32 (& conn .state , connClosed )
798
+ conn .cond .Broadcast ()
799
+ // Free the resources.
800
+ if conn .shutdownWatcher != nil {
801
+ go conn .shutdownWatcher .Unregister ()
802
+ conn .shutdownWatcher = nil
803
+ }
765
804
conn .notify (Closed )
766
805
}
767
806
} else {
768
807
atomic .StoreUint32 (& conn .state , connDisconnected )
808
+ conn .cond .Broadcast ()
769
809
conn .notify (Disconnected )
770
810
}
771
811
if conn .c != nil {
@@ -784,9 +824,7 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
784
824
return
785
825
}
786
826
787
- func (conn * Connection ) reconnect (neterr error , c net.Conn ) {
788
- conn .mutex .Lock ()
789
- defer conn .mutex .Unlock ()
827
+ func (conn * Connection ) reconnectImpl (neterr error , c net.Conn ) {
790
828
if conn .opts .Reconnect > 0 {
791
829
if c == conn .c {
792
830
conn .closeConnection (neterr , false )
@@ -799,6 +837,13 @@ func (conn *Connection) reconnect(neterr error, c net.Conn) {
799
837
}
800
838
}
801
839
840
+ func (conn * Connection ) reconnect (neterr error , c net.Conn ) {
841
+ conn .mutex .Lock ()
842
+ defer conn .mutex .Unlock ()
843
+ conn .reconnectImpl (neterr , c )
844
+ conn .cond .Broadcast ()
845
+ }
846
+
802
847
func (conn * Connection ) lockShards () {
803
848
for i := range conn .shard {
804
849
conn .shard [i ].rmut .Lock ()
@@ -1026,6 +1071,15 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
1026
1071
fut .done = nil
1027
1072
shard .rmut .Unlock ()
1028
1073
return
1074
+ case connShutdown :
1075
+ fut .err = ClientError {
1076
+ ErrConnectionShutdown ,
1077
+ "server shutdown in progress" ,
1078
+ }
1079
+ fut .ready = nil
1080
+ fut .done = nil
1081
+ shard .rmut .Unlock ()
1082
+ return
1029
1083
}
1030
1084
pos := (fut .requestId / conn .opts .Concurrency ) & (requestsMap - 1 )
1031
1085
if ctx != nil {
@@ -1086,6 +1140,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
1086
1140
if fut .ready == nil {
1087
1141
return fut
1088
1142
}
1143
+
1089
1144
if req .Ctx () != nil {
1090
1145
select {
1091
1146
case <- req .Ctx ().Done ():
@@ -1094,10 +1149,15 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
1094
1149
default :
1095
1150
}
1096
1151
}
1152
+
1153
+ atomic .AddInt64 (& (conn .requestCnt ), int64 (1 ))
1154
+
1097
1155
conn .putFuture (fut , req , streamId )
1156
+
1098
1157
if req .Ctx () != nil {
1099
1158
go conn .contextWatchdog (fut , req .Ctx ())
1100
1159
}
1160
+
1101
1161
return fut
1102
1162
}
1103
1163
@@ -1164,6 +1224,10 @@ func (conn *Connection) markDone(fut *Future) {
1164
1224
if conn .rlimit != nil {
1165
1225
<- conn .rlimit
1166
1226
}
1227
+
1228
+ if atomic .AddInt64 (& (conn .requestCnt ), int64 (- 1 )) == 0 {
1229
+ conn .cond .Broadcast ()
1230
+ }
1167
1231
}
1168
1232
1169
1233
func (conn * Connection ) peekFuture (reqid uint32 ) (fut * Future ) {
@@ -1458,6 +1522,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error
1458
1522
return st , nil
1459
1523
}
1460
1524
1525
+ func (conn * Connection ) isWatchersRequired () bool {
1526
+ for _ , feature := range conn .opts .RequiredProtocolInfo .Features {
1527
+ if feature == WatchersFeature {
1528
+ return true
1529
+ }
1530
+ }
1531
+ return false
1532
+ }
1533
+
1461
1534
// NewWatcher creates a new Watcher object for the connection.
1462
1535
//
1463
1536
// You need to require WatchersFeature to use watchers, see examples for the
@@ -1496,15 +1569,7 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
1496
1569
// asynchronous. We do not expect any response from a Tarantool instance
1497
1570
// That's why we can't just check the Tarantool response for an unsupported
1498
1571
// request error.
1499
- watchersRequired := false
1500
- for _ , feature := range conn .opts .RequiredProtocolInfo .Features {
1501
- if feature == WatchersFeature {
1502
- watchersRequired = true
1503
- break
1504
- }
1505
- }
1506
-
1507
- if ! watchersRequired {
1572
+ if ! conn .isWatchersRequired () {
1508
1573
err := fmt .Errorf ("the feature %s must be required by connection " +
1509
1574
"options to create a watcher" , WatchersFeature )
1510
1575
return nil , err
@@ -1563,7 +1628,11 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
1563
1628
1564
1629
if state .cnt == 0 {
1565
1630
// The last one sends IPROTO_UNWATCH.
1566
- conn .Do (newUnwatchRequest (key )).Get ()
1631
+ if ! conn .ClosedNow () {
1632
+ // conn.ClosedNow() check is a workaround for calling
1633
+ // Unregister from connectionClose().
1634
+ conn .Do (newUnwatchRequest (key )).Get ()
1635
+ }
1567
1636
conn .watchMap .Delete (key )
1568
1637
close (state .unready )
1569
1638
}
@@ -1666,3 +1735,52 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
1666
1735
func (conn * Connection ) ClientProtocolInfo () ProtocolInfo {
1667
1736
return clientProtocolInfo .Clone ()
1668
1737
}
1738
+
1739
+ func shutdownEventCallback (event WatchEvent ) {
1740
+ // Receives "true" on server shutdown.
1741
+ // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1742
+ // step 2.
1743
+ val , ok := event .Value .(bool )
1744
+ if ok && val {
1745
+ go event .Conn .processShutdown ()
1746
+ }
1747
+ }
1748
+
1749
+ func (conn * Connection ) processShutdown () {
1750
+ // Forbid state changes.
1751
+ conn .mutex .Lock ()
1752
+ defer conn .mutex .Unlock ()
1753
+
1754
+ if ! atomic .CompareAndSwapUint32 (& (conn .state ), connConnected , connShutdown ) {
1755
+ return
1756
+ }
1757
+ conn .notify (Shutdown )
1758
+
1759
+ c := conn .c
1760
+ for (atomic .LoadUint32 (& (conn .state )) == connShutdown ) &&
1761
+ (atomic .LoadInt64 (& (conn .requestCnt )) != 0 ) &&
1762
+ (c == conn .c ) {
1763
+ // Use cond var on conn.mutex since request execution may
1764
+ // call reconnect(). It is ok if state changes as part of
1765
+ // reconnect since Tarantool server won't allow to reconnect
1766
+ // in the middle of shutting down.
1767
+ conn .cond .Wait ()
1768
+ }
1769
+ // Do not unregister task explicitly here since connection teardown
1770
+ // has the same effect. To clean up connection resources,
1771
+ // unregister on full close.
1772
+
1773
+ if (atomic .LoadUint32 (& (conn .state )) == connShutdown ) &&
1774
+ (c == conn .c ) {
1775
+ // Start to reconnect based on common rules, same as in net.box.
1776
+ // Reconnect also closes the connection: server waits until all
1777
+ // subscribed connections are terminated.
1778
+ // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1779
+ // step 3.
1780
+ conn .reconnectImpl (
1781
+ ClientError {
1782
+ ErrConnectionClosed ,
1783
+ "connection closed after server shutdown" ,
1784
+ }, conn .c )
1785
+ }
1786
+ }
0 commit comments