Skip to content

Commit fe07889

Browse files
feature: support graceful shutdown
If connected to Tarantool 2.10 or newer, after this patch a connection supports server graceful shutdown [1]. In this case, the server waits until all client requests are finished and the client disconnects before going down (the server may also go down by timeout). The client reconnects if the connection options enable reconnect. Beware that graceful shutdown event initialization is asynchronous. 1. https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/ Closes #214
1 parent 2faaa7d commit fe07889

File tree

4 files changed

+690
-14
lines changed

4 files changed

+690
-14
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1515
- Error type support in MessagePack (#209)
1616
- Event subscription support (#119)
1717
- Session settings support (#215)
18+
- Support graceful shutdown (#214)
1819

1920
### Changed
2021

connection.go

+137-14
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,17 @@ const ignoreStreamId = 0
2525
const (
2626
connDisconnected = 0
2727
connConnected = 1
28-
connClosed = 2
28+
connShutdown = 2
29+
connClosed = 3
2930
)
3031

3132
// Transport identifiers: the empty string and "ssl".
// NOTE(review): presumably matched against the transport selected in the
// connection options — confirm against the dialing code.
const (
3233
connTransportNone = ""
3334
connTransportSsl = "ssl"
3435
)
3536

37+
// shutdownEventKey is the event key that dial() subscribes to through
// newWatcherImpl in order to be notified about server graceful shutdown.
const shutdownEventKey = "box.shutdown"
38+
3639
type ConnEventKind int
3740
type ConnLogKind int
3841

@@ -45,6 +48,8 @@ const (
4548
ReconnectFailed
4649
// Either reconnect attempts exhausted, or explicit Close is called.
4750
Closed
51+
// Shutdown signals that shutdown callback is processing.
52+
Shutdown
4853

4954
// LogReconnectFailed is logged when reconnect attempt failed.
5055
LogReconnectFailed ConnLogKind = iota + 1
@@ -134,10 +139,19 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
134139
// always returns array of array (array of tuples for space related methods).
135140
// For Eval* and Call* Tarantool always returns array, but does not forces
136141
// array of arrays.
142+
//
143+
// If connected to Tarantool 2.10 or newer, connection supports server graceful
144+
// shutdown. In this case, the server waits until all client requests are
145+
// finished and the client disconnects before going down (the server may also
146+
// go down by timeout). The client reconnects if connection options enable
147+
// reconnect. Beware that graceful shutdown event initialization is asynchronous.
148+
//
149+
// More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
137150
type Connection struct {
138151
addr string
139152
c net.Conn
140153
mutex sync.Mutex
154+
cond *sync.Cond
141155
// Schema contains schema loaded on connection.
142156
Schema *Schema
143157
// requestId contains the last request ID for requests with nil context.
@@ -162,6 +176,11 @@ type Connection struct {
162176
serverProtocolInfo ProtocolInfo
163177
// watchMap is a map of key -> chan watchState.
164178
watchMap sync.Map
179+
180+
// shutdownWatcher is the "box.shutdown" event watcher.
181+
shutdownWatcher Watcher
182+
// requestCnt is a counter of active requests.
183+
requestCnt int64
165184
}
166185

167186
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -385,6 +404,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
385404
conn.opts.Logger = defaultLogger{}
386405
}
387406

407+
conn.cond = sync.NewCond(&conn.mutex)
408+
388409
if err = conn.createConnection(false); err != nil {
389410
ter, ok := err.(Error)
390411
if conn.opts.Reconnect <= 0 {
@@ -589,10 +610,20 @@ func (conn *Connection) dial() (err error) {
589610
conn.lockShards()
590611
conn.c = connection
591612
atomic.StoreUint32(&conn.state, connConnected)
613+
conn.cond.Broadcast()
592614
conn.unlockShards()
593615
go conn.writer(w, connection)
594616
go conn.reader(r, connection)
595617

618+
// Subscribe shutdown event to process graceful shutdown.
619+
if conn.shutdownWatcher == nil && conn.isFeatureInSlice(WatchersFeature, conn.serverProtocolInfo.Features) {
620+
watcher, werr := conn.newWatcherImpl(shutdownEventKey, shutdownEventCallback)
621+
if werr != nil {
622+
return werr
623+
}
624+
conn.shutdownWatcher = watcher
625+
}
626+
596627
return
597628
}
598629

@@ -762,10 +793,17 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
762793
if conn.state != connClosed {
763794
close(conn.control)
764795
atomic.StoreUint32(&conn.state, connClosed)
796+
conn.cond.Broadcast()
797+
// Free the resources.
798+
if conn.shutdownWatcher != nil {
799+
go conn.shutdownWatcher.Unregister()
800+
conn.shutdownWatcher = nil
801+
}
765802
conn.notify(Closed)
766803
}
767804
} else {
768805
atomic.StoreUint32(&conn.state, connDisconnected)
806+
conn.cond.Broadcast()
769807
conn.notify(Disconnected)
770808
}
771809
if conn.c != nil {
@@ -784,9 +822,7 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
784822
return
785823
}
786824

787-
func (conn *Connection) reconnect(neterr error, c net.Conn) {
788-
conn.mutex.Lock()
789-
defer conn.mutex.Unlock()
825+
func (conn *Connection) reconnectImpl(neterr error, c net.Conn) {
790826
if conn.opts.Reconnect > 0 {
791827
if c == conn.c {
792828
conn.closeConnection(neterr, false)
@@ -799,6 +835,13 @@ func (conn *Connection) reconnect(neterr error, c net.Conn) {
799835
}
800836
}
801837

838+
// reconnect is the locking entry point for reconnection: it takes conn.mutex,
// delegates the actual work to reconnectImpl and then wakes every goroutine
// waiting on conn.cond.
//
// Callers that already hold conn.mutex must call reconnectImpl directly.
func (conn *Connection) reconnect(neterr error, c net.Conn) {
839+
conn.mutex.Lock()
840+
defer conn.mutex.Unlock()
841+
conn.reconnectImpl(neterr, c)
842+
// shutdown() blocks on conn.cond; wake it so it re-checks the connection.
conn.cond.Broadcast()
843+
}
844+
802845
func (conn *Connection) lockShards() {
803846
for i := range conn.shard {
804847
conn.shard[i].rmut.Lock()
@@ -1026,6 +1069,15 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
10261069
fut.done = nil
10271070
shard.rmut.Unlock()
10281071
return
1072+
case connShutdown:
1073+
fut.err = ClientError{
1074+
ErrConnectionShutdown,
1075+
"server shutdown in progress",
1076+
}
1077+
fut.ready = nil
1078+
fut.done = nil
1079+
shard.rmut.Unlock()
1080+
return
10291081
}
10301082
pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1)
10311083
if ctx != nil {
@@ -1082,22 +1134,32 @@ func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
10821134
}
10831135

10841136
func (conn *Connection) send(req Request, streamId uint64) *Future {
1137+
atomic.AddInt64(&conn.requestCnt, int64(1))
1138+
10851139
fut := conn.newFuture(req.Ctx())
10861140
if fut.ready == nil {
1141+
atomic.AddInt64(&conn.requestCnt, int64(-1))
10871142
return fut
10881143
}
1144+
10891145
if req.Ctx() != nil {
10901146
select {
10911147
case <-req.Ctx().Done():
10921148
conn.cancelFuture(fut, fmt.Errorf("context is done"))
1149+
// future here does not belong to any shard yet,
1150+
// so cancelFuture don't call markDone.
1151+
atomic.AddInt64(&conn.requestCnt, int64(-1))
10931152
return fut
10941153
default:
10951154
}
10961155
}
1156+
10971157
conn.putFuture(fut, req, streamId)
1158+
10981159
if req.Ctx() != nil {
10991160
go conn.contextWatchdog(fut, req.Ctx())
11001161
}
1162+
11011163
return fut
11021164
}
11031165

@@ -1164,6 +1226,10 @@ func (conn *Connection) markDone(fut *Future) {
11641226
if conn.rlimit != nil {
11651227
<-conn.rlimit
11661228
}
1229+
1230+
if atomic.AddInt64(&conn.requestCnt, int64(-1)) == 0 {
1231+
conn.cond.Broadcast()
1232+
}
11671233
}
11681234

11691235
func (conn *Connection) peekFuture(reqid uint32) (fut *Future) {
@@ -1458,6 +1524,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error
14581524
return st, nil
14591525
}
14601526

1527+
func (conn *Connection) isFeatureInSlice(expected ProtocolFeature, actualSlice []ProtocolFeature) bool {
1528+
for _, actual := range actualSlice {
1529+
if expected == actual {
1530+
return true
1531+
}
1532+
}
1533+
return false
1534+
}
1535+
14611536
// NewWatcher creates a new Watcher object for the connection.
14621537
//
14631538
// You need to require WatchersFeature to use watchers, see examples for the
@@ -1496,20 +1571,16 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
14961571
// asynchronous. We do not expect any response from a Tarantool instance
14971572
// That's why we can't just check the Tarantool response for an unsupported
14981573
// request error.
1499-
watchersRequired := false
1500-
for _, feature := range conn.opts.RequiredProtocolInfo.Features {
1501-
if feature == WatchersFeature {
1502-
watchersRequired = true
1503-
break
1504-
}
1505-
}
1506-
1507-
if !watchersRequired {
1574+
if !conn.isFeatureInSlice(WatchersFeature, conn.opts.RequiredProtocolInfo.Features) {
15081575
err := fmt.Errorf("the feature %s must be required by connection "+
15091576
"options to create a watcher", WatchersFeature)
15101577
return nil, err
15111578
}
15121579

1580+
return conn.newWatcherImpl(key, callback)
1581+
}
1582+
1583+
func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watcher, error) {
15131584
st, err := subscribeWatchChannel(conn, key)
15141585
if err != nil {
15151586
return nil, err
@@ -1563,7 +1634,11 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
15631634

15641635
if state.cnt == 0 {
15651636
// The last one sends IPROTO_UNWATCH.
1566-
conn.Do(newUnwatchRequest(key)).Get()
1637+
if !conn.ClosedNow() {
1638+
// conn.ClosedNow() check is a workaround for calling
1639+
// Unregister from closeConnection().
1640+
conn.Do(newUnwatchRequest(key)).Get()
1641+
}
15671642
conn.watchMap.Delete(key)
15681643
close(state.unready)
15691644
}
@@ -1666,3 +1741,51 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
16661741
func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
16671742
// Returns the result of Clone() rather than clientProtocolInfo itself.
// NOTE(review): presumably so callers cannot mutate the package-level
// value — confirm against ProtocolInfo.Clone.
return clientProtocolInfo.Clone()
16681743
}
1744+
1745+
func shutdownEventCallback(event WatchEvent) {
1746+
// Receives "true" on server shutdown.
1747+
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1748+
// step 2.
1749+
val, ok := event.Value.(bool)
1750+
if ok && val {
1751+
go event.Conn.shutdown()
1752+
}
1753+
}
1754+
1755+
func (conn *Connection) shutdown() {
1756+
// Forbid state changes.
1757+
conn.mutex.Lock()
1758+
defer conn.mutex.Unlock()
1759+
1760+
if !atomic.CompareAndSwapUint32(&(conn.state), connConnected, connShutdown) {
1761+
return
1762+
}
1763+
conn.cond.Broadcast()
1764+
conn.notify(Shutdown)
1765+
1766+
c := conn.c
1767+
for {
1768+
if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) {
1769+
return
1770+
}
1771+
if atomic.LoadInt64(&conn.requestCnt) == 0 {
1772+
break
1773+
}
1774+
// Use cond var on conn.mutex since request execution may
1775+
// call reconnect(). It is ok if state changes as part of
1776+
// reconnect since Tarantool server won't allow to reconnect
1777+
// in the middle of shutting down.
1778+
conn.cond.Wait()
1779+
}
1780+
1781+
// Start to reconnect based on common rules, same as in net.box.
1782+
// Reconnect also closes the connection: server waits until all
1783+
// subscribed connections are terminated.
1784+
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1785+
// step 3.
1786+
conn.reconnectImpl(
1787+
ClientError{
1788+
ErrConnectionClosed,
1789+
"connection closed after server shutdown",
1790+
}, conn.c)
1791+
}

errors.go

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ const (
5555
ErrProtocolError = 0x4000 + iota
5656
ErrTimeouted = 0x4000 + iota
5757
ErrRateLimited = 0x4000 + iota
58+
ErrConnectionShutdown = 0x4000 + iota
5859
)
5960

6061
// Tarantool server error codes.

0 commit comments

Comments
 (0)