Skip to content

Graceful shutdown #249

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Event subscription support (#119)
- Session settings support (#215)
- pap-sha256 authorization method support (Tarantool EE feature) (#243)
- Support graceful shutdown (#214)

### Changed

Expand Down
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ golangci-lint:

.PHONY: test
test:
@echo "Running all packages tests"
go clean -testcache
go test -tags "$(TAGS)" ./... -v -p 1

.PHONY: testdata
Expand Down
153 changes: 139 additions & 14 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ const ignoreStreamId = 0
const (
connDisconnected = 0
connConnected = 1
connClosed = 2
connShutdown = 2
connClosed = 3
)

// Transport identifiers: the empty string and "ssl".
// NOTE(review): presumably compared against a transport connection option —
// the place where these are consumed is not visible here; confirm.
const (
connTransportNone = ""
connTransportSsl = "ssl"
)

// shutdownEventKey is the event key the connection subscribes to in order
// to process server graceful shutdown.
const shutdownEventKey = "box.shutdown"

type ConnEventKind int
type ConnLogKind int

Expand All @@ -45,6 +48,8 @@ const (
ReconnectFailed
// Either reconnect attempts exhausted, or explicit Close is called.
Closed
// Shutdown signals that shutdown callback is processing.
Shutdown

// LogReconnectFailed is logged when reconnect attempt failed.
LogReconnectFailed ConnLogKind = iota + 1
Expand Down Expand Up @@ -134,10 +139,19 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
// always returns array of array (array of tuples for space related methods).
// For Eval* and Call* Tarantool always returns array, but does not forces
// array of arrays.
//
// If connected to Tarantool 2.10 or newer, the connection supports server
// graceful shutdown. In this case, the server waits until all client requests
// are finished and the client disconnects before going down (the server may
// also go down by timeout). The client reconnects if the connection options
// enable reconnect. Beware that graceful shutdown event initialization is
// asynchronous.
//
// More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
type Connection struct {
addr string
c net.Conn
mutex sync.Mutex
cond *sync.Cond
// Schema contains schema loaded on connection.
Schema *Schema
// requestId contains the last request ID for requests with nil context.
Expand All @@ -162,6 +176,11 @@ type Connection struct {
serverProtocolInfo ProtocolInfo
// watchMap is a map of key -> chan watchState.
watchMap sync.Map

// shutdownWatcher is the "box.shutdown" event watcher.
shutdownWatcher Watcher
// requestCnt is a counter of active requests.
requestCnt int64
}

var _ = Connector(&Connection{}) // Check compatibility with connector interface.
Expand Down Expand Up @@ -387,6 +406,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
conn.opts.Logger = defaultLogger{}
}

conn.cond = sync.NewCond(&conn.mutex)

if err = conn.createConnection(false); err != nil {
ter, ok := err.(Error)
if conn.opts.Reconnect <= 0 {
Expand Down Expand Up @@ -612,10 +633,20 @@ func (conn *Connection) dial() (err error) {
conn.lockShards()
conn.c = connection
atomic.StoreUint32(&conn.state, connConnected)
conn.cond.Broadcast()
conn.unlockShards()
go conn.writer(w, connection)
go conn.reader(r, connection)

// Subscribe shutdown event to process graceful shutdown.
if conn.shutdownWatcher == nil && isFeatureInSlice(WatchersFeature, conn.serverProtocolInfo.Features) {
watcher, werr := conn.newWatcherImpl(shutdownEventKey, shutdownEventCallback)
if werr != nil {
return werr
}
conn.shutdownWatcher = watcher
}

return
}

Expand Down Expand Up @@ -745,10 +776,17 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
if conn.state != connClosed {
close(conn.control)
atomic.StoreUint32(&conn.state, connClosed)
conn.cond.Broadcast()
// Free the resources.
if conn.shutdownWatcher != nil {
go conn.shutdownWatcher.Unregister()
conn.shutdownWatcher = nil
}
conn.notify(Closed)
}
} else {
atomic.StoreUint32(&conn.state, connDisconnected)
conn.cond.Broadcast()
conn.notify(Disconnected)
}
if conn.c != nil {
Expand All @@ -767,9 +805,7 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
return
}

func (conn *Connection) reconnect(neterr error, c net.Conn) {
conn.mutex.Lock()
defer conn.mutex.Unlock()
func (conn *Connection) reconnectImpl(neterr error, c net.Conn) {
if conn.opts.Reconnect > 0 {
if c == conn.c {
conn.closeConnection(neterr, false)
Expand All @@ -782,6 +818,13 @@ func (conn *Connection) reconnect(neterr error, c net.Conn) {
}
}

// reconnect is the locking wrapper around reconnectImpl: it takes conn.mutex,
// runs reconnectImpl with the error neterr and the network connection c it
// relates to, and then wakes every goroutine waiting on conn.cond.
func (conn *Connection) reconnect(neterr error, c net.Conn) {
conn.mutex.Lock()
defer conn.mutex.Unlock()
conn.reconnectImpl(neterr, c)
// conn.cond is built on conn.mutex; waiters such as shutdown() re-check
// the connection state once they are woken up.
conn.cond.Broadcast()
}

func (conn *Connection) lockShards() {
for i := range conn.shard {
conn.shard[i].rmut.Lock()
Expand Down Expand Up @@ -1009,6 +1052,15 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
fut.done = nil
shard.rmut.Unlock()
return
case connShutdown:
fut.err = ClientError{
ErrConnectionShutdown,
"server shutdown in progress",
}
fut.ready = nil
fut.done = nil
shard.rmut.Unlock()
return
}
pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1)
if ctx != nil {
Expand Down Expand Up @@ -1060,11 +1112,25 @@ func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
}
}

// incrementRequestCnt atomically registers one more active request on the
// connection.
func (conn *Connection) incrementRequestCnt() {
	atomic.AddInt64(&conn.requestCnt, 1)
}

// decrementRequestCnt atomically removes one active request from the
// connection's counter. When the counter reaches zero, it wakes all
// goroutines waiting on conn.cond.
func (conn *Connection) decrementRequestCnt() {
	remaining := atomic.AddInt64(&conn.requestCnt, -1)
	if remaining != 0 {
		return
	}
	// Waiters such as shutdown() re-check the counter after being woken up.
	conn.cond.Broadcast()
}

func (conn *Connection) send(req Request, streamId uint64) *Future {
conn.incrementRequestCnt()

fut := conn.newFuture(req.Ctx())
if fut.ready == nil {
conn.decrementRequestCnt()
return fut
}

if req.Ctx() != nil {
select {
case <-req.Ctx().Done():
Expand All @@ -1075,6 +1141,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
go conn.contextWatchdog(fut, req.Ctx())
}
conn.putFuture(fut, req, streamId)

return fut
}

Expand Down Expand Up @@ -1141,6 +1208,7 @@ func (conn *Connection) markDone(fut *Future) {
if conn.rlimit != nil {
<-conn.rlimit
}
conn.decrementRequestCnt()
}

func (conn *Connection) peekFuture(reqid uint32) (fut *Future) {
Expand Down Expand Up @@ -1426,6 +1494,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error
return st, nil
}

// isFeatureInSlice reports whether expected occurs in actualSlice.
// A nil or empty slice yields false.
func isFeatureInSlice(expected ProtocolFeature, actualSlice []ProtocolFeature) bool {
	for i := 0; i < len(actualSlice); i++ {
		if actualSlice[i] == expected {
			return true
		}
	}
	return false
}

// NewWatcher creates a new Watcher object for the connection.
//
// You need to require WatchersFeature to use watchers, see examples for the
Expand Down Expand Up @@ -1464,20 +1541,16 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
// asynchronous. We do not expect any response from a Tarantool instance
// That's why we can't just check the Tarantool response for an unsupported
// request error.
watchersRequired := false
for _, feature := range conn.opts.RequiredProtocolInfo.Features {
if feature == WatchersFeature {
watchersRequired = true
break
}
}

if !watchersRequired {
if !isFeatureInSlice(WatchersFeature, conn.opts.RequiredProtocolInfo.Features) {
err := fmt.Errorf("the feature %s must be required by connection "+
"options to create a watcher", WatchersFeature)
return nil, err
}

return conn.newWatcherImpl(key, callback)
}

func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watcher, error) {
st, err := subscribeWatchChannel(conn, key)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1531,7 +1604,11 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,

if state.cnt == 0 {
// The last one sends IPROTO_UNWATCH.
conn.Do(newUnwatchRequest(key)).Get()
if !conn.ClosedNow() {
// conn.ClosedNow() check is a workaround for calling
// Unregister from closeConnection().
conn.Do(newUnwatchRequest(key)).Get()
}
conn.watchMap.Delete(key)
close(state.unready)
}
Expand Down Expand Up @@ -1637,3 +1714,51 @@ func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
info.Auth = conn.opts.Auth
return info
}

// shutdownEventCallback is the watcher callback for the "box.shutdown"
// event. It starts the connection's graceful shutdown procedure in a
// separate goroutine when the event value is the boolean true; any other
// value is ignored.
func shutdownEventCallback(event WatchEvent) {
	// The server publishes "true" on shutdown.
	// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
	// step 2.
	if started, isBool := event.Value.(bool); !isBool || !started {
		return
	}
	go event.Conn.shutdown()
}

// shutdown performs the client side of a server graceful shutdown.
//
// It moves a connected connection into the connShutdown state, emits the
// Shutdown notification, waits until the active request counter drops to
// zero and then calls reconnectImpl with an ErrConnectionClosed client error.
// It returns early, without calling reconnectImpl, if the connection was not
// in the connConnected state, or if the state or the underlying net.Conn
// changes while it is waiting.
func (conn *Connection) shutdown() {
// Forbid state changes.
conn.mutex.Lock()
defer conn.mutex.Unlock()

// Only a connected connection can start shutting down; any other state
// means there is nothing to do here.
if !atomic.CompareAndSwapUint32(&(conn.state), connConnected, connShutdown) {
return
}
conn.cond.Broadcast()
conn.notify(Shutdown)

// Remember the current network connection to detect its replacement
// while this goroutine sleeps in Wait() with the mutex released.
c := conn.c
for {
// Someone else changed the state or replaced the connection:
// the shutdown procedure is no longer relevant.
if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) {
return
}
// No active requests are left: the connection may be terminated.
if atomic.LoadInt64(&conn.requestCnt) == 0 {
break
}
// Use cond var on conn.mutex since request execution may
// call reconnect(). It is ok if state changes as part of
// reconnect since Tarantool server won't allow to reconnect
// in the middle of shutting down.
// NOTE(review): decrementRequestCnt() broadcasts without holding
// conn.mutex — confirm a wake-up cannot be missed between the
// counter check above and Wait().
conn.cond.Wait()
}

// Start to reconnect based on common rules, same as in net.box.
// Reconnect also closes the connection: server waits until all
// subscribed connections are terminated.
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
// step 3.
conn.reconnectImpl(
ClientError{
ErrConnectionClosed,
"connection closed after server shutdown",
}, conn.c)
}
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
ErrProtocolError = 0x4000 + iota
ErrTimeouted = 0x4000 + iota
ErrRateLimited = 0x4000 + iota
ErrConnectionShutdown = 0x4000 + iota
)

// Tarantool server error codes.
Expand Down
Loading