@@ -33,6 +33,13 @@ const (
33
33
connTransportSsl = "ssl"
34
34
)
35
35
36
+ const (
37
+ shutdownNotInProgress = 0
38
+ shutdownInProgress = 1
39
+ )
40
+
41
+ const shutdownEventKey = "box.shutdown"
42
+
36
43
type ConnEventKind int
37
44
type ConnLogKind int
38
45
@@ -134,6 +141,14 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
134
141
// always returns array of array (array of tuples for space related methods).
135
142
// For Eval* and Call* Tarantool always returns array, but does not forces
136
143
// array of arrays.
144
+ //
145
+ // If connected to Tarantool 2.10 or newer and WatchersFeature is required,
146
+ // connection supports server graceful shutdown. In this case, server will
147
+ // wait until all client requests will be finished and client disconnects
148
+ // before going down (server also may go down by timeout). Client reconnect will
149
+ // happen if connection options enable reconnect.
150
+ //
151
+ // More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
137
152
type Connection struct {
138
153
addr string
139
154
c net.Conn
@@ -162,6 +177,13 @@ type Connection struct {
162
177
serverProtocolInfo ProtocolInfo
163
178
// watchMap is a map of key -> chan watchState.
164
179
watchMap sync.Map
180
+
181
+ // shutdownInProgress defined whether shutdown now in progress.
182
+ shutdownInProgress uint32
183
+ // shutdownWatcher is the "box.shutdown" event watcher.
184
+ shutdownWatcher Watcher
185
+ // shutdownWg is the wait group to finish all tasks on shutdown.
186
+ shutdownWg sync.WaitGroup
165
187
}
166
188
167
189
var _ = Connector (& Connection {}) // Check compatibility with connector interface.
@@ -373,6 +395,7 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
373
395
}
374
396
}
375
397
}
398
+ conn .shutdownInProgress = shutdownNotInProgress
376
399
377
400
if conn .opts .RateLimit > 0 {
378
401
conn .rlimit = make (chan struct {}, conn .opts .RateLimit )
@@ -421,6 +444,16 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
421
444
}
422
445
}
423
446
447
+ // Subscribe shutdown event to process graceful shutdown.
448
+ if conn .isWatchersRequired () {
449
+ watcher , werr := conn .NewWatcher (shutdownEventKey , shutdownEventCallback )
450
+ if werr != nil {
451
+ conn .closeConnection (werr , true )
452
+ return nil , werr
453
+ }
454
+ conn .shutdownWatcher = watcher
455
+ }
456
+
424
457
return conn , err
425
458
}
426
459
@@ -1086,6 +1119,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
1086
1119
if fut .ready == nil {
1087
1120
return fut
1088
1121
}
1122
+
1089
1123
if req .Ctx () != nil {
1090
1124
select {
1091
1125
case <- req .Ctx ().Done ():
@@ -1094,13 +1128,31 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
1094
1128
default :
1095
1129
}
1096
1130
}
1131
+
1132
+ if atomic .LoadUint32 (& (conn .shutdownInProgress )) == shutdownInProgress {
1133
+ conn .cancelFuture (fut , fmt .Errorf ("server shutdown in progress" ))
1134
+ return fut
1135
+ }
1136
+
1097
1137
conn .putFuture (fut , req , streamId )
1138
+
1098
1139
if req .Ctx () != nil {
1099
1140
go conn .contextWatchdog (fut , req .Ctx ())
1100
1141
}
1142
+
1143
+ if conn .shutdownWatcher != nil {
1144
+ go conn .gracefulWait (fut )
1145
+ }
1146
+
1101
1147
return fut
1102
1148
}
1103
1149
1150
+ func (conn * Connection ) gracefulWait (fut * Future ) {
1151
+ conn .shutdownWg .Add (1 )
1152
+ <- fut .done
1153
+ conn .shutdownWg .Done ()
1154
+ }
1155
+
1104
1156
func (conn * Connection ) putFuture (fut * Future , req Request , streamId uint64 ) {
1105
1157
shardn := fut .requestId & (conn .opts .Concurrency - 1 )
1106
1158
shard := & conn .shard [shardn ]
@@ -1458,6 +1510,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error
1458
1510
return st , nil
1459
1511
}
1460
1512
1513
+ func (conn * Connection ) isWatchersRequired () bool {
1514
+ for _ , feature := range conn .opts .RequiredProtocolInfo .Features {
1515
+ if feature == WatchersFeature {
1516
+ return true
1517
+ }
1518
+ }
1519
+ return false
1520
+ }
1521
+
1461
1522
// NewWatcher creates a new Watcher object for the connection.
1462
1523
//
1463
1524
// You need to require WatchersFeature to use watchers, see examples for the
@@ -1496,15 +1557,7 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
1496
1557
// asynchronous. We do not expect any response from a Tarantool instance
1497
1558
// That's why we can't just check the Tarantool response for an unsupported
1498
1559
// request error.
1499
- watchersRequired := false
1500
- for _ , feature := range conn .opts .RequiredProtocolInfo .Features {
1501
- if feature == WatchersFeature {
1502
- watchersRequired = true
1503
- break
1504
- }
1505
- }
1506
-
1507
- if ! watchersRequired {
1560
+ if ! conn .isWatchersRequired () {
1508
1561
err := fmt .Errorf ("the feature %s must be required by connection " +
1509
1562
"options to create a watcher" , WatchersFeature )
1510
1563
return nil , err
@@ -1666,3 +1719,43 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
1666
1719
func (conn * Connection ) ClientProtocolInfo () ProtocolInfo {
1667
1720
return clientProtocolInfo .Clone ()
1668
1721
}
1722
+
1723
+ func shutdownEventCallback (event WatchEvent ) {
1724
+ // Receives "true" on server shutdown.
1725
+ // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1726
+ // step 2.
1727
+ val , ok := event .Value .(bool )
1728
+ if ok && val {
1729
+ // defer cause otherwise we'll block ourselves on ack wait.
1730
+ defer event .Conn .processShutdown ()
1731
+ }
1732
+ }
1733
+
1734
+ func (conn * Connection ) processShutdown () {
1735
+ // Forbid starting new tasks.
1736
+ atomic .StoreUint32 (& (conn .shutdownInProgress ), shutdownInProgress )
1737
+
1738
+ // After finish, allow starting new tasks, they will fail with
1739
+ // "not connected" instead.
1740
+ defer atomic .StoreUint32 (& (conn .shutdownInProgress ), shutdownNotInProgress )
1741
+
1742
+ // Wait for tasks to finish.
1743
+ conn .shutdownWg .Wait ()
1744
+
1745
+ // Do not unregister task explicitly since connection teardown
1746
+ // has the same effect.
1747
+
1748
+ if ! conn .ClosedNow () {
1749
+ err := ClientError {
1750
+ ErrConnectionClosed ,
1751
+ "connection closed after server shutdown" ,
1752
+ }
1753
+
1754
+ // Start to reconnect based on common rules, same as in net.box.
1755
+ // Reconnect also closes the connection: server waits until all
1756
+ // subscribed connections are terminated.
1757
+ // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1758
+ // step 3.
1759
+ defer conn .reconnect (err , conn .c )
1760
+ }
1761
+ }
0 commit comments