@@ -25,14 +25,17 @@ const ignoreStreamId = 0
25
25
const (
26
26
connDisconnected = 0
27
27
connConnected = 1
28
- connClosed = 2
28
+ connShutdown = 2
29
+ connClosed = 3
29
30
)
30
31
31
32
const (
32
33
connTransportNone = ""
33
34
connTransportSsl = "ssl"
34
35
)
35
36
37
+ const shutdownEventKey = "box.shutdown"
38
+
36
39
type ConnEventKind int
37
40
type ConnLogKind int
38
41
@@ -45,6 +48,8 @@ const (
45
48
ReconnectFailed
46
49
// Either reconnect attempts exhausted, or explicit Close is called.
47
50
Closed
51
+ // Shutdown signals that shutdown callback is processing.
52
+ Shutdown
48
53
49
54
// LogReconnectFailed is logged when reconnect attempt failed.
50
55
LogReconnectFailed ConnLogKind = iota + 1
@@ -134,10 +139,19 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
134
139
// always returns array of array (array of tuples for space related methods).
135
140
// For Eval* and Call* Tarantool always returns array, but does not forces
136
141
// array of arrays.
142
+ //
143
+ // If connected to Tarantool 2.10 or newer and WatchersFeature is required,
144
+ // connection supports server graceful shutdown. In this case, server will
145
+ // wait until all client requests will be finished and client disconnects
146
+ // before going down (server also may go down by timeout). Client reconnect will
147
+ // happen if connection options enable reconnect.
148
+ //
149
+ // More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
137
150
type Connection struct {
138
151
addr string
139
152
c net.Conn
140
153
mutex sync.Mutex
154
+ cond * sync.Cond
141
155
// Schema contains schema loaded on connection.
142
156
Schema * Schema
143
157
// requestId contains the last request ID for requests with nil context.
@@ -162,6 +176,11 @@ type Connection struct {
162
176
serverProtocolInfo ProtocolInfo
163
177
// watchMap is a map of key -> chan watchState.
164
178
watchMap sync.Map
179
+
180
+ // shutdownWatcher is the "box.shutdown" event watcher.
181
+ shutdownWatcher Watcher
182
+ // requestCnt is a counter of active requests.
183
+ requestCnt uint32
165
184
}
166
185
167
186
var _ = Connector (& Connection {}) // Check compatibility with connector interface.
@@ -385,6 +404,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
385
404
conn .opts .Logger = defaultLogger {}
386
405
}
387
406
407
+ conn .cond = sync .NewCond (& conn .mutex )
408
+
388
409
if err = conn .createConnection (false ); err != nil {
389
410
ter , ok := err .(Error )
390
411
if conn .opts .Reconnect <= 0 {
@@ -421,6 +442,16 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
421
442
}
422
443
}
423
444
445
+ // Subscribe shutdown event to process graceful shutdown.
446
+ if conn .isWatchersRequired () {
447
+ watcher , werr := conn .NewWatcher (shutdownEventKey , shutdownEventCallback )
448
+ if werr != nil {
449
+ conn .closeConnection (werr , true )
450
+ return nil , werr
451
+ }
452
+ conn .shutdownWatcher = watcher
453
+ }
454
+
424
455
return conn , err
425
456
}
426
457
@@ -762,6 +793,11 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
762
793
if conn .state != connClosed {
763
794
close (conn .control )
764
795
atomic .StoreUint32 (& conn .state , connClosed )
796
+ // Free the resources.
797
+ if conn .shutdownWatcher != nil {
798
+ conn .shutdownWatcher .Unregister ()
799
+ conn .shutdownWatcher = nil
800
+ }
765
801
conn .notify (Closed )
766
802
}
767
803
} else {
@@ -1026,6 +1062,15 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
1026
1062
fut .done = nil
1027
1063
shard .rmut .Unlock ()
1028
1064
return
1065
+ case connShutdown :
1066
+ fut .err = ClientError {
1067
+ ErrConnectionShutdown ,
1068
+ "server shutdown in progress" ,
1069
+ }
1070
+ fut .ready = nil
1071
+ fut .done = nil
1072
+ shard .rmut .Unlock ()
1073
+ return
1029
1074
}
1030
1075
pos := (fut .requestId / conn .opts .Concurrency ) & (requestsMap - 1 )
1031
1076
if ctx != nil {
@@ -1086,6 +1131,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
1086
1131
if fut .ready == nil {
1087
1132
return fut
1088
1133
}
1134
+
1089
1135
if req .Ctx () != nil {
1090
1136
select {
1091
1137
case <- req .Ctx ().Done ():
@@ -1094,13 +1140,30 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
1094
1140
default :
1095
1141
}
1096
1142
}
1143
+
1144
+ if conn .shutdownWatcher != nil {
1145
+ atomic .AddUint32 (& (conn .requestCnt ), uint32 (1 ))
1146
+ go conn .gracefulWait (fut )
1147
+ }
1148
+
1097
1149
conn .putFuture (fut , req , streamId )
1150
+
1098
1151
if req .Ctx () != nil {
1099
1152
go conn .contextWatchdog (fut , req .Ctx ())
1100
1153
}
1154
+
1101
1155
return fut
1102
1156
}
1103
1157
1158
+ func (conn * Connection ) gracefulWait (fut * Future ) {
1159
+ <- fut .done
1160
+ // This is a real advice from Go documentation
1161
+ // about how to decrement atomic uint32.
1162
+ // https://pkg.go.dev/sync/atomic#AddUint32
1163
+ atomic .AddUint32 (& (conn .requestCnt ), ^ uint32 (0 ))
1164
+ conn .cond .Broadcast ()
1165
+ }
1166
+
1104
1167
func (conn * Connection ) putFuture (fut * Future , req Request , streamId uint64 ) {
1105
1168
shardn := fut .requestId & (conn .opts .Concurrency - 1 )
1106
1169
shard := & conn .shard [shardn ]
@@ -1458,6 +1521,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error
1458
1521
return st , nil
1459
1522
}
1460
1523
1524
+ func (conn * Connection ) isWatchersRequired () bool {
1525
+ for _ , feature := range conn .opts .RequiredProtocolInfo .Features {
1526
+ if feature == WatchersFeature {
1527
+ return true
1528
+ }
1529
+ }
1530
+ return false
1531
+ }
1532
+
1461
1533
// NewWatcher creates a new Watcher object for the connection.
1462
1534
//
1463
1535
// You need to require WatchersFeature to use watchers, see examples for the
@@ -1496,15 +1568,7 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
1496
1568
// asynchronous. We do not expect any response from a Tarantool instance
1497
1569
// That's why we can't just check the Tarantool response for an unsupported
1498
1570
// request error.
1499
- watchersRequired := false
1500
- for _ , feature := range conn .opts .RequiredProtocolInfo .Features {
1501
- if feature == WatchersFeature {
1502
- watchersRequired = true
1503
- break
1504
- }
1505
- }
1506
-
1507
- if ! watchersRequired {
1571
+ if ! conn .isWatchersRequired () {
1508
1572
err := fmt .Errorf ("the feature %s must be required by connection " +
1509
1573
"options to create a watcher" , WatchersFeature )
1510
1574
return nil , err
@@ -1563,7 +1627,11 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
1563
1627
1564
1628
if state .cnt == 0 {
1565
1629
// The last one sends IPROTO_UNWATCH.
1566
- conn .Do (newUnwatchRequest (key )).Get ()
1630
+ if ! conn .ClosedNow () {
1631
+ // conn.ClosedNow() check is a workaround for calling
1632
+ // Unregister from connectionClose().
1633
+ conn .Do (newUnwatchRequest (key )).Get ()
1634
+ }
1567
1635
conn .watchMap .Delete (key )
1568
1636
close (state .unready )
1569
1637
}
@@ -1666,3 +1734,47 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
1666
1734
func (conn * Connection ) ClientProtocolInfo () ProtocolInfo {
1667
1735
return clientProtocolInfo .Clone ()
1668
1736
}
1737
+
1738
+ func shutdownEventCallback (event WatchEvent ) {
1739
+ // Receives "true" on server shutdown.
1740
+ // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1741
+ // step 2.
1742
+ val , ok := event .Value .(bool )
1743
+ if ok && val {
1744
+ event .Conn .processShutdown ()
1745
+ }
1746
+ }
1747
+
1748
+ func (conn * Connection ) processShutdown () {
1749
+ // Forbid state changes.
1750
+ conn .mutex .Lock ()
1751
+ defer conn .mutex .Unlock ()
1752
+
1753
+ conn .state = connShutdown
1754
+ conn .notify (Shutdown )
1755
+
1756
+ for conn .state == connConnected && atomic .LoadUint32 (& (conn .requestCnt )) != 0 {
1757
+ // Use cond var on conn.mutex since request execution may
1758
+ // call reconnect(). It is ok if state changes as part of
1759
+ // reconnect since Tarantool server won't allow to reconnect
1760
+ // in the middle of shutting down.
1761
+ conn .cond .Wait ()
1762
+ }
1763
+
1764
+ // Do not unregister task explicitly here since connection teardown
1765
+ // has the same effect. To clean up connection resources,
1766
+ // unregister on full close.
1767
+
1768
+ if conn .state == connConnected {
1769
+ // Start to reconnect based on common rules, same as in net.box.
1770
+ // Reconnect also closes the connection: server waits until all
1771
+ // subscribed connections are terminated.
1772
+ // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1773
+ // step 3.
1774
+ go conn .reconnect (
1775
+ ClientError {
1776
+ ErrConnectionClosed ,
1777
+ "connection closed after server shutdown" ,
1778
+ }, conn .c )
1779
+ }
1780
+ }
0 commit comments