From 5f9eed585c0b2a3f15e8c82f876aa1c705b32eda Mon Sep 17 00:00:00 2001
From: Georgy Moiseev
Date: Fri, 16 Dec 2022 12:54:11 +0300
Subject: [PATCH 1/4] code health: reuse Retry in test helpers

Part of #214
---
 test_helpers/utils.go | 18 +++++++-----------
 1 file changed, 7 insertions(+), 11 deletions(-)

diff --git a/test_helpers/utils.go b/test_helpers/utils.go
index 4862d90d8..f1002e954 100644
--- a/test_helpers/utils.go
+++ b/test_helpers/utils.go
@@ -47,20 +47,16 @@ func DeleteRecordByKey(t *testing.T, conn tarantool.Connector,
 // Returns false in case of connection is not in the connected state
 // after specified retries count, true otherwise.
 func WaitUntilReconnected(conn *tarantool.Connection, retries uint, timeout time.Duration) bool {
-	for i := uint(0); ; i++ {
+	err := Retry(func(arg interface{}) error {
+		conn := arg.(*tarantool.Connection)
 		connected := conn.ConnectedNow()
-		if connected {
-			return true
+		if !connected {
+			return fmt.Errorf("not connected")
 		}
+		return nil
+	}, conn, int(retries), timeout)
 
-		if i == retries {
-			break
-		}
-
-		time.Sleep(timeout)
-	}
-
-	return false
+	return err == nil
 }
 
 func SkipIfSQLUnsupported(t testing.TB) {

From decb45f70f3b404abf0120db0d1fafa04eb1c761 Mon Sep 17 00:00:00 2001
From: Georgy Moiseev
Date: Fri, 16 Dec 2022 14:43:16 +0300
Subject: [PATCH 2/4] make: clean cache for test

After this patch, "make test" behaves similarly to "make test-*": the Go
test cache is cleaned before the tests are run.

Part of #214
---
 Makefile | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/Makefile b/Makefile
index 718139967..7ed0aa88e 100644
--- a/Makefile
+++ b/Makefile
@@ -38,6 +38,8 @@ golangci-lint:
 
 .PHONY: test
 test:
+	@echo "Running all packages tests"
+	go clean -testcache
 	go test -tags "$(TAGS)" ./... -v -p 1
 
 .PHONY: testdata

From f3cc24d235924203084121ffddc5e737686530c6 Mon Sep 17 00:00:00 2001
From: Georgy Moiseev
Date: Tue, 20 Dec 2022 16:49:58 +0300
Subject: [PATCH 3/4] test: idempotent stop in helpers

Before this patch, calling StopTarantool wasn't idempotent: the helper
receives the instance struct by value, so assigning nil to Cmd only
changed the local copy and the caller's instance kept pointing at the
stopped process. Setting Cmd.Process to nil does work, since Cmd is a
pointer and the change is visible to the caller.

Reworking the helpers to accept a pointer would be cleaner, but it
would break the existing API.
---
 test_helpers/main.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test_helpers/main.go b/test_helpers/main.go
index f2a97bef9..3363ed375 100644
--- a/test_helpers/main.go
+++ b/test_helpers/main.go
@@ -298,7 +298,7 @@ func StopTarantool(inst TarantoolInstance) {
 			log.Fatalf("Failed to wait for Tarantool process to exit, got %s", err)
 		}
 
-		inst.Cmd = nil
+		inst.Cmd.Process = nil
 	}
 }
 

From 4eb8ac84c0487cc8074f49d7e05d51fbea5a5365 Mon Sep 17 00:00:00 2001
From: Georgy Moiseev
Date: Fri, 16 Dec 2022 14:53:59 +0300
Subject: [PATCH 4/4] feature: support graceful shutdown

If connected to Tarantool 2.10 or newer, after this patch a connection
supports server graceful shutdown [1]. In this case, the server waits
until all client requests are finished and the client disconnects
before going down (the server may also go down by timeout). The client
reconnects if the connection options enable reconnect. Beware that
graceful shutdown event initialization is asynchronous.

1.
https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/ Closes #214 --- CHANGELOG.md | 1 + connection.go | 153 +++++++++++-- errors.go | 1 + shutdown_test.go | 551 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 692 insertions(+), 14 deletions(-) create mode 100644 shutdown_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 7521d3796..80baf62ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Event subscription support (#119) - Session settings support (#215) - pap-sha256 authorization method support (Tarantool EE feature) (#243) +- Support graceful shutdown (#214) ### Changed diff --git a/connection.go b/connection.go index e584d44d5..331e1c805 100644 --- a/connection.go +++ b/connection.go @@ -25,7 +25,8 @@ const ignoreStreamId = 0 const ( connDisconnected = 0 connConnected = 1 - connClosed = 2 + connShutdown = 2 + connClosed = 3 ) const ( @@ -33,6 +34,8 @@ const ( connTransportSsl = "ssl" ) +const shutdownEventKey = "box.shutdown" + type ConnEventKind int type ConnLogKind int @@ -45,6 +48,8 @@ const ( ReconnectFailed // Either reconnect attempts exhausted, or explicit Close is called. Closed + // Shutdown signals that shutdown callback is processing. + Shutdown // LogReconnectFailed is logged when reconnect attempt failed. LogReconnectFailed ConnLogKind = iota + 1 @@ -134,10 +139,19 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac // always returns array of array (array of tuples for space related methods). // For Eval* and Call* Tarantool always returns array, but does not forces // array of arrays. +// +// If connected to Tarantool 2.10 or newer, connection supports server graceful +// shutdown. In this case, server will wait until all client requests will be +// finished and client disconnects before going down (server also may go down +// by timeout). Client reconnect will happen if connection options enable +// reconnect. Beware that graceful shutdown event initialization is asynchronous. +// +// More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/ type Connection struct { addr string c net.Conn mutex sync.Mutex + cond *sync.Cond // Schema contains schema loaded on connection. Schema *Schema // requestId contains the last request ID for requests with nil context. @@ -162,6 +176,11 @@ type Connection struct { serverProtocolInfo ProtocolInfo // watchMap is a map of key -> chan watchState. watchMap sync.Map + + // shutdownWatcher is the "box.shutdown" event watcher. + shutdownWatcher Watcher + // requestCnt is a counter of active requests. + requestCnt int64 } var _ = Connector(&Connection{}) // Check compatibility with connector interface. @@ -387,6 +406,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) { conn.opts.Logger = defaultLogger{} } + conn.cond = sync.NewCond(&conn.mutex) + if err = conn.createConnection(false); err != nil { ter, ok := err.(Error) if conn.opts.Reconnect <= 0 { @@ -612,10 +633,20 @@ func (conn *Connection) dial() (err error) { conn.lockShards() conn.c = connection atomic.StoreUint32(&conn.state, connConnected) + conn.cond.Broadcast() conn.unlockShards() go conn.writer(w, connection) go conn.reader(r, connection) + // Subscribe shutdown event to process graceful shutdown. 
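+ // The watcher is registered at most once per connection and only when
+ // the server reports the watchers feature; if the registration fails,
+ // the whole dial attempt fails.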
+ if conn.shutdownWatcher == nil && isFeatureInSlice(WatchersFeature, conn.serverProtocolInfo.Features) { + watcher, werr := conn.newWatcherImpl(shutdownEventKey, shutdownEventCallback) + if werr != nil { + return werr + } + conn.shutdownWatcher = watcher + } + return } @@ -745,10 +776,17 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error) if conn.state != connClosed { close(conn.control) atomic.StoreUint32(&conn.state, connClosed) + conn.cond.Broadcast() + // Free the resources. + if conn.shutdownWatcher != nil { + go conn.shutdownWatcher.Unregister() + conn.shutdownWatcher = nil + } conn.notify(Closed) } } else { atomic.StoreUint32(&conn.state, connDisconnected) + conn.cond.Broadcast() conn.notify(Disconnected) } if conn.c != nil { @@ -767,9 +805,7 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error) return } -func (conn *Connection) reconnect(neterr error, c net.Conn) { - conn.mutex.Lock() - defer conn.mutex.Unlock() +func (conn *Connection) reconnectImpl(neterr error, c net.Conn) { if conn.opts.Reconnect > 0 { if c == conn.c { conn.closeConnection(neterr, false) @@ -782,6 +818,13 @@ func (conn *Connection) reconnect(neterr error, c net.Conn) { } } +func (conn *Connection) reconnect(neterr error, c net.Conn) { + conn.mutex.Lock() + defer conn.mutex.Unlock() + conn.reconnectImpl(neterr, c) + conn.cond.Broadcast() +} + func (conn *Connection) lockShards() { for i := range conn.shard { conn.shard[i].rmut.Lock() @@ -1009,6 +1052,15 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) { fut.done = nil shard.rmut.Unlock() return + case connShutdown: + fut.err = ClientError{ + ErrConnectionShutdown, + "server shutdown in progress", + } + fut.ready = nil + fut.done = nil + shard.rmut.Unlock() + return } pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1) if ctx != nil { @@ -1060,11 +1112,25 @@ func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) { } } +func (conn *Connection) incrementRequestCnt() { + atomic.AddInt64(&conn.requestCnt, int64(1)) +} + +func (conn *Connection) decrementRequestCnt() { + if atomic.AddInt64(&conn.requestCnt, int64(-1)) == 0 { + conn.cond.Broadcast() + } +} + func (conn *Connection) send(req Request, streamId uint64) *Future { + conn.incrementRequestCnt() + fut := conn.newFuture(req.Ctx()) if fut.ready == nil { + conn.decrementRequestCnt() return fut } + if req.Ctx() != nil { select { case <-req.Ctx().Done(): @@ -1075,6 +1141,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future { go conn.contextWatchdog(fut, req.Ctx()) } conn.putFuture(fut, req, streamId) + return fut } @@ -1141,6 +1208,7 @@ func (conn *Connection) markDone(fut *Future) { if conn.rlimit != nil { <-conn.rlimit } + conn.decrementRequestCnt() } func (conn *Connection) peekFuture(reqid uint32) (fut *Future) { @@ -1426,6 +1494,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error return st, nil } +func isFeatureInSlice(expected ProtocolFeature, actualSlice []ProtocolFeature) bool { + for _, actual := range actualSlice { + if expected == actual { + return true + } + } + return false +} + // NewWatcher creates a new Watcher object for the connection. // // You need to require WatchersFeature to use watchers, see examples for the @@ -1464,20 +1541,16 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, // asynchronous. 
We do not expect any response from a Tarantool instance // That's why we can't just check the Tarantool response for an unsupported // request error. - watchersRequired := false - for _, feature := range conn.opts.RequiredProtocolInfo.Features { - if feature == WatchersFeature { - watchersRequired = true - break - } - } - - if !watchersRequired { + if !isFeatureInSlice(WatchersFeature, conn.opts.RequiredProtocolInfo.Features) { err := fmt.Errorf("the feature %s must be required by connection "+ "options to create a watcher", WatchersFeature) return nil, err } + return conn.newWatcherImpl(key, callback) +} + +func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watcher, error) { st, err := subscribeWatchChannel(conn, key) if err != nil { return nil, err @@ -1531,7 +1604,11 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, if state.cnt == 0 { // The last one sends IPROTO_UNWATCH. - conn.Do(newUnwatchRequest(key)).Get() + if !conn.ClosedNow() { + // conn.ClosedNow() check is a workaround for calling + // Unregister from connectionClose(). + conn.Do(newUnwatchRequest(key)).Get() + } conn.watchMap.Delete(key) close(state.unready) } @@ -1637,3 +1714,51 @@ func (conn *Connection) ClientProtocolInfo() ProtocolInfo { info.Auth = conn.opts.Auth return info } + +func shutdownEventCallback(event WatchEvent) { + // Receives "true" on server shutdown. + // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/ + // step 2. + val, ok := event.Value.(bool) + if ok && val { + go event.Conn.shutdown() + } +} + +func (conn *Connection) shutdown() { + // Forbid state changes. + conn.mutex.Lock() + defer conn.mutex.Unlock() + + if !atomic.CompareAndSwapUint32(&(conn.state), connConnected, connShutdown) { + return + } + conn.cond.Broadcast() + conn.notify(Shutdown) + + c := conn.c + for { + if (atomic.LoadUint32(&conn.state) != connShutdown) || (c != conn.c) { + return + } + if atomic.LoadInt64(&conn.requestCnt) == 0 { + break + } + // Use cond var on conn.mutex since request execution may + // call reconnect(). It is ok if state changes as part of + // reconnect since Tarantool server won't allow to reconnect + // in the middle of shutting down. + conn.cond.Wait() + } + + // Start to reconnect based on common rules, same as in net.box. + // Reconnect also closes the connection: server waits until all + // subscribed connections are terminated. + // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/ + // step 3. + conn.reconnectImpl( + ClientError{ + ErrConnectionClosed, + "connection closed after server shutdown", + }, conn.c) +} diff --git a/errors.go b/errors.go index 02e4635bb..906184c24 100644 --- a/errors.go +++ b/errors.go @@ -55,6 +55,7 @@ const ( ErrProtocolError = 0x4000 + iota ErrTimeouted = 0x4000 + iota ErrRateLimited = 0x4000 + iota + ErrConnectionShutdown = 0x4000 + iota ) // Tarantool server error codes. diff --git a/shutdown_test.go b/shutdown_test.go new file mode 100644 index 000000000..d9b1db111 --- /dev/null +++ b/shutdown_test.go @@ -0,0 +1,551 @@ +//go:build linux || (darwin && !cgo) +// +build linux darwin,!cgo + +// Use OS build flags since signals are system-dependent. + +package tarantool_test + +import ( + "fmt" + "sync" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/require" + . 
"github.com/tarantool/go-tarantool" + "github.com/tarantool/go-tarantool/test_helpers" +) + +var shtdnServer = "127.0.0.1:3014" +var shtdnClntOpts = Opts{ + User: opts.User, + Pass: opts.Pass, + Timeout: 20 * time.Second, + Reconnect: 200 * time.Millisecond, + MaxReconnects: 10, + RequiredProtocolInfo: ProtocolInfo{Features: []ProtocolFeature{WatchersFeature}}, +} +var shtdnSrvOpts = test_helpers.StartOpts{ + InitScript: "config.lua", + Listen: shtdnServer, + User: shtdnClntOpts.User, + Pass: shtdnClntOpts.Pass, + WaitStart: 100 * time.Millisecond, + ConnectRetry: 3, + RetryTimeout: 500 * time.Millisecond, +} + +var evalMsg = "got enough sleep" +var evalBody = ` + local fiber = require('fiber') + local time, msg = ... + fiber.sleep(time) + return msg +` + +func testGracefulShutdown(t *testing.T, conn *Connection, inst *test_helpers.TarantoolInstance) { + var resp *Response + var err error + + // Set a big timeout so it would be easy to differ + // if server went down on timeout or after all connections were terminated. + serverShutdownTimeout := 60 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + + // Send request with sleep. + evalSleep := 1 // in seconds + require.Lessf(t, + time.Duration(evalSleep)*time.Second, + shtdnClntOpts.Timeout, + "test request won't be failed by timeout") + + // Create a helper watcher to ensure that async + // shutdown is set up. + helperCh := make(chan WatchEvent, 10) + helperW, herr := conn.NewWatcher("box.shutdown", func(event WatchEvent) { + helperCh <- event + }) + require.Nil(t, herr) + defer helperW.Unregister() + <-helperCh + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + fut := conn.Do(req) + + // SIGTERM the server. + shutdownStart := time.Now() + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Check that we can't send new requests after shutdown starts. + // Retry helps to wait a bit until server starts to shutdown + // and send us the shutdown event. + shutdownWaitRetries := 5 + shutdownWaitTimeout := 100 * time.Millisecond + + err = test_helpers.Retry(func(interface{}) error { + _, err = conn.Do(NewPingRequest()).Get() + if err == nil { + return fmt.Errorf("expected error for requests sent on shutdown") + } + + if err.Error() != "server shutdown in progress (0x4005)" { + return err + } + + return nil + }, nil, shutdownWaitRetries, shutdownWaitTimeout) + require.Nil(t, err) + + // Check that requests started before the shutdown finish successfully. + resp, err = fut.Get() + require.Nil(t, err) + require.NotNil(t, resp) + require.Equal(t, resp.Data, []interface{}{evalMsg}) + + // Wait until server go down. + // Server will go down only when it process all requests from our connection + // (or on timeout). + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + shutdownFinish := time.Now() + shutdownTime := shutdownFinish.Sub(shutdownStart) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + // Check that it wasn't a timeout. + require.Lessf(t, + shutdownTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") + + // Connection is unavailable when server is down. 
+ require.Equal(t, false, conn.ConnectedNow()) +} + +func TestGracefulShutdown(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var conn *Connection + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + testGracefulShutdown(t, conn, &inst) +} + +func TestGracefulShutdownWithReconnect(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + testGracefulShutdown(t, conn, &inst) + + err = test_helpers.RestartTarantool(&inst) + require.Nilf(t, err, "Failed to restart tarantool") + + connected := test_helpers.WaitUntilReconnected(conn, shtdnClntOpts.MaxReconnects, shtdnClntOpts.Reconnect) + require.Truef(t, connected, "Reconnect success") + + testGracefulShutdown(t, conn, &inst) +} + +func TestNoGracefulShutdown(t *testing.T) { + // No watchers = no graceful shutdown. + noShtdnClntOpts := shtdnClntOpts.Clone() + noShtdnClntOpts.RequiredProtocolInfo = ProtocolInfo{} + test_helpers.SkipIfWatchersSupported(t) + + var inst test_helpers.TarantoolInstance + var conn *Connection + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, noShtdnClntOpts) + defer conn.Close() + + evalSleep := 10 // in seconds + serverShutdownTimeout := 60 // in seconds + require.Less(t, evalSleep, serverShutdownTimeout) + + // Send request with sleep. + require.Lessf(t, + time.Duration(evalSleep)*time.Second, + shtdnClntOpts.Timeout, + "test request won't be failed by timeout") + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + fut := conn.Do(req) + + // SIGTERM the server. + shutdownStart := time.Now() + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Check that request was interrupted. + _, err = fut.Get() + require.NotNilf(t, err, "sleep request error") + + // Wait until server go down. + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + shutdownFinish := time.Now() + shutdownTime := shutdownFinish.Sub(shutdownStart) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + // Check that server finished without waiting for eval to finish. + require.Lessf(t, + shutdownTime, + time.Duration(evalSleep/2)*time.Second, + "server went down without any additional waiting") +} + +func TestGracefulShutdownRespectsClose(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var conn *Connection + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Create a helper watcher to ensure that async + // shutdown is set up. 
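+ // Receiving the initial event below guarantees that the server has
+ // already processed the watcher registration for this connection, so
+ // SIGTERM is sent only after graceful shutdown support is in place.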
+ helperCh := make(chan WatchEvent, 10) + helperW, herr := conn.NewWatcher("box.shutdown", func(event WatchEvent) { + helperCh <- event + }) + require.Nil(t, herr) + defer helperW.Unregister() + <-helperCh + + // Set a big timeout so it would be easy to differ + // if server went down on timeout or after all connections were terminated. + serverShutdownTimeout := 60 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + + // Send request with sleep. + evalSleep := 10 // in seconds + require.Lessf(t, + time.Duration(evalSleep)*time.Second, + shtdnClntOpts.Timeout, + "test request won't be failed by timeout") + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + fut := conn.Do(req) + + // SIGTERM the server. + shutdownStart := time.Now() + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Close the connection. + conn.Close() + + // Connection is closed. + require.Equal(t, true, conn.ClosedNow()) + + // Check that request was interrupted. + _, err = fut.Get() + require.NotNilf(t, err, "sleep request error") + + // Wait until server go down. + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + shutdownFinish := time.Now() + shutdownTime := shutdownFinish.Sub(shutdownStart) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + // Check that server finished without waiting for eval to finish. + require.Lessf(t, + shutdownTime, + time.Duration(evalSleep/2)*time.Second, + "server went down without any additional waiting") + + // Check that it wasn't a timeout. + require.Lessf(t, + shutdownTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") + + // Connection is still closed. + require.Equal(t, true, conn.ClosedNow()) +} + +func TestGracefulShutdownNotRacesWithRequestReconnect(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var conn *Connection + var err error + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Create a helper watcher to ensure that async + // shutdown is set up. + helperCh := make(chan WatchEvent, 10) + helperW, herr := conn.NewWatcher("box.shutdown", func(event WatchEvent) { + helperCh <- event + }) + require.Nil(t, herr) + defer helperW.Unregister() + <-helperCh + + // Set a small timeout so server will shutdown before requesst finishes. + serverShutdownTimeout := 1 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + + // Send request with sleep. + evalSleep := 5 // in seconds + require.Lessf(t, + serverShutdownTimeout, + evalSleep, + "test request will be failed by timeout") + require.Lessf(t, + time.Duration(serverShutdownTimeout)*time.Second, + shtdnClntOpts.Timeout, + "test request will be failed by timeout") + + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + + evalStart := time.Now() + fut := conn.Do(req) + + // SIGTERM the server. + require.Nil(t, inst.Cmd.Process.Signal(syscall.SIGTERM)) + + // Wait until server go down. + // Server is expected to go down on timeout. + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + + // Help test helpers to properly clean up. 
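+ // The process has already exited and been waited for; clearing
+ // Process keeps the deferred StopTarantoolWithCleanup from touching
+ // it again (see the idempotent stop patch).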
+ inst.Cmd.Process = nil + + // Check that request failed by server disconnect, not a client timeout. + _, err = fut.Get() + require.NotNil(t, err) + require.NotContains(t, err.Error(), "client timeout for request") + + evalFinish := time.Now() + evalTime := evalFinish.Sub(evalStart) + + // Check that it wasn't a client timeout. + require.Lessf(t, + evalTime, + shtdnClntOpts.Timeout, + "server went down not by timeout") +} + +func TestGracefulShutdownCloseConcurrent(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var err error + var srvShtdnStart, srvShtdnFinish time.Time + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Create a helper watcher to ensure that async + // shutdown is set up. + helperCh := make(chan WatchEvent, 10) + helperW, herr := conn.NewWatcher("box.shutdown", func(event WatchEvent) { + helperCh <- event + }) + require.Nil(t, herr) + defer helperW.Unregister() + <-helperCh + + // Set a big timeout so it would be easy to differ + // if server went down on timeout or after all connections were terminated. + serverShutdownTimeout := 60 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + conn.Close() + + const testConcurrency = 50 + + var caseWg, srvToStop, srvStop sync.WaitGroup + caseWg.Add(testConcurrency) + srvToStop.Add(testConcurrency) + srvStop.Add(1) + + // Create many connections. + for i := 0; i < testConcurrency; i++ { + go func(i int) { + defer caseWg.Done() + + // Do not wait till Tarantool register out watcher, + // test everything is ok even on async. + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Wait till all connections created. + srvToStop.Done() + srvStop.Wait() + }(i) + } + + var sret error + go func(inst *test_helpers.TarantoolInstance) { + srvToStop.Wait() + srvShtdnStart = time.Now() + cerr := inst.Cmd.Process.Signal(syscall.SIGTERM) + if cerr != nil { + sret = cerr + } + srvStop.Done() + }(&inst) + + srvStop.Wait() + require.Nil(t, sret, "No errors on server SIGTERM") + + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + srvShtdnFinish = time.Now() + srvShtdnTime := srvShtdnFinish.Sub(srvShtdnStart) + + require.Less(t, + srvShtdnTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") +} + +func TestGracefulShutdownConcurrent(t *testing.T) { + test_helpers.SkipIfWatchersUnsupported(t) + + var inst test_helpers.TarantoolInstance + var err error + var srvShtdnStart, srvShtdnFinish time.Time + + inst, err = test_helpers.StartTarantool(shtdnSrvOpts) + require.Nil(t, err) + defer test_helpers.StopTarantoolWithCleanup(inst) + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Set a big timeout so it would be easy to differ + // if server went down on timeout or after all connections were terminated. 
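+ // Each test connection only sleeps for about a second, so a graceful
+ // exit is expected well before this timeout.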
+ serverShutdownTimeout := 60 // in seconds + _, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout}) + require.Nil(t, err) + conn.Close() + + const testConcurrency = 50 + + var caseWg, srvToStop, srvStop sync.WaitGroup + caseWg.Add(testConcurrency) + srvToStop.Add(testConcurrency) + srvStop.Add(1) + + // Create many connections. + var ret error + for i := 0; i < testConcurrency; i++ { + go func(i int) { + defer caseWg.Done() + + conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts) + defer conn.Close() + + // Create a helper watcher to ensure that async + // shutdown is set up. + helperCh := make(chan WatchEvent, 10) + helperW, _ := conn.NewWatcher("box.shutdown", func(event WatchEvent) { + helperCh <- event + }) + defer helperW.Unregister() + <-helperCh + + evalSleep := 1 // in seconds + req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg}) + fut := conn.Do(req) + + // Wait till all connections had started sleeping. + srvToStop.Done() + srvStop.Wait() + + _, gerr := fut.Get() + if gerr != nil { + ret = gerr + } + }(i) + } + + var sret error + go func(inst *test_helpers.TarantoolInstance) { + srvToStop.Wait() + srvShtdnStart = time.Now() + cerr := inst.Cmd.Process.Signal(syscall.SIGTERM) + if cerr != nil { + sret = cerr + } + srvStop.Done() + }(&inst) + + srvStop.Wait() + require.Nil(t, sret, "No errors on server SIGTERM") + + caseWg.Wait() + require.Nil(t, ret, "No errors on concurrent wait") + + _, err = inst.Cmd.Process.Wait() + require.Nil(t, err) + + // Help test helpers to properly clean up. + inst.Cmd.Process = nil + + srvShtdnFinish = time.Now() + srvShtdnTime := srvShtdnFinish.Sub(srvShtdnStart) + + require.Less(t, + srvShtdnTime, + time.Duration(serverShutdownTimeout/2)*time.Second, + "server went down not by timeout") +}