Skip to content

Commit 7946786

Browse files
shutdown notify channel
1 parent e1a5390 commit 7946786

File tree

2 files changed

+57
-3
lines changed

2 files changed

+57
-3
lines changed

connection.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,9 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
144144
// connection supports server graceful shutdown. In this case, server will
145145
// wait until all client requests will be finished and client disconnects
146146
// before going down (server also may go down by timeout). Client reconnect will
147-
// happen if connection options enable reconnect.
147+
// happen if connection options enable reconnect. Beware that graceful shutdown
148+
// event initialization is asynchronous. You may use ShutdownNotify channel
149+
// in options to track events.
148150
//
149151
// More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
150152
type Connection struct {
@@ -308,6 +310,8 @@ type Opts struct {
308310
// list of protocol features that should be supported by
309311
// Tarantool server. By default there are no restrictions.
310312
RequiredProtocolInfo ProtocolInfo
313+
// ShutdownNotify is a channel which receives Shutdown event values.
314+
ShutdownNotify chan<- WatchEvent
311315
}
312316

313317
// SslOpts is a way to configure ssl transport.
@@ -883,6 +887,15 @@ func (conn *Connection) notify(kind ConnEventKind) {
883887
}
884888
}
885889

890+
func (conn *Connection) shutdownNotify(event WatchEvent) {
891+
if conn.opts.ShutdownNotify != nil {
892+
select {
893+
case conn.opts.ShutdownNotify <- event:
894+
default:
895+
}
896+
}
897+
}
898+
886899
func (conn *Connection) writer(w *bufio.Writer, c net.Conn) {
887900
var shardn uint32
888901
var packet smallWBuf
@@ -1740,6 +1753,7 @@ func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
17401753
}
17411754

17421755
func shutdownEventCallback(event WatchEvent) {
1756+
event.Conn.shutdownNotify(event)
17431757
// Receives "true" on server shutdown.
17441758
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
17451759
// step 2.

shutdown_test.go

+42-2
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,15 @@ func TestGracefulShutdown(t *testing.T) {
127127
require.Nil(t, err)
128128
defer test_helpers.StopTarantoolWithCleanup(inst)
129129

130+
shtdnNotify := make(chan WatchEvent, 10)
131+
shtdnClntOpts.ShutdownNotify = shtdnNotify
132+
130133
conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts)
131134
defer conn.Close()
132135

136+
// Wait till Tarantool register out watcher.
137+
<-shtdnNotify
138+
133139
testGracefulShutdown(t, conn, &inst)
134140
}
135141

@@ -143,17 +149,29 @@ func TestGracefulShutdownWithReconnect(t *testing.T) {
143149
require.Nil(t, err)
144150
defer test_helpers.StopTarantoolWithCleanup(inst)
145151

152+
shtdnNotify := make(chan WatchEvent, 10)
153+
shtdnClntOpts.ShutdownNotify = shtdnNotify
154+
146155
conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts)
147156
defer conn.Close()
148157

158+
// Wait till Tarantool register out watcher.
159+
<-shtdnNotify
160+
149161
testGracefulShutdown(t, conn, &inst)
150162

163+
// Skip shutdown event.
164+
<-shtdnNotify
165+
151166
err = test_helpers.RestartTarantool(&inst)
152167
require.Nilf(t, err, "Failed to restart tarantool")
153168

154169
connected := test_helpers.WaitUntilReconnected(conn, shtdnClntOpts.MaxReconnects, shtdnClntOpts.Reconnect)
155170
require.Truef(t, connected, "Reconnect success")
156171

172+
// Wait till Tarantool register out watcher.
173+
<-shtdnNotify
174+
157175
testGracefulShutdown(t, conn, &inst)
158176
}
159177

@@ -241,9 +259,15 @@ func TestGracefulShutdownRespectsClose(t *testing.T) {
241259
require.Nil(t, err)
242260
defer test_helpers.StopTarantoolWithCleanup(inst)
243261

262+
shtdnNotify := make(chan WatchEvent, 10)
263+
shtdnClntOpts.ShutdownNotify = shtdnNotify
264+
244265
conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts)
245266
defer conn.Close()
246267

268+
// Wait till Tarantool register out watcher.
269+
<-shtdnNotify
270+
247271
// Set a big timeout so it would be easy to differ
248272
// if server went down on timeout or after all connections were terminated.
249273
serverShutdownTimeout := 60 // in seconds
@@ -311,9 +335,15 @@ func TestGracefulShutdownNotRacesWithRequestReconnect(t *testing.T) {
311335
require.Nil(t, err)
312336
defer test_helpers.StopTarantoolWithCleanup(inst)
313337

338+
shtdnNotify := make(chan WatchEvent, 10)
339+
shtdnClntOpts.ShutdownNotify = shtdnNotify
340+
314341
conn = test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts)
315342
defer conn.Close()
316343

344+
// Wait till Tarantool register out watcher.
345+
<-shtdnNotify
346+
317347
// Set a small timeout so server will shutdown before requesst finishes.
318348
serverShutdownTimeout := 1 // in seconds
319349
_, err = conn.Call("box.ctl.set_on_shutdown_timeout", []interface{}{serverShutdownTimeout})
@@ -394,6 +424,9 @@ func TestGracefulShutdownCloseConcurrent(t *testing.T) {
394424
go func(i int) {
395425
defer caseWg.Done()
396426

427+
// Do not wait till Tarantool register out watcher,
428+
// test everything is ok even on async.
429+
397430
conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts)
398431
defer conn.Close()
399432

@@ -463,12 +496,18 @@ func TestGracefulShutdownConcurrent(t *testing.T) {
463496
// Create many connections.
464497
var ret error
465498
for i := 0; i < testConcurrency; i++ {
466-
go func(i int) {
499+
go func(i int, opts Opts) {
467500
defer caseWg.Done()
468501

469-
conn := test_helpers.ConnectWithValidation(t, shtdnServer, shtdnClntOpts)
502+
shtdnNotify := make(chan WatchEvent, 10)
503+
opts.ShutdownNotify = shtdnNotify
504+
505+
conn := test_helpers.ConnectWithValidation(t, shtdnServer, opts)
470506
defer conn.Close()
471507

508+
// Wait till Tarantool register out watcher.
509+
<-shtdnNotify
510+
472511
evalSleep := 1 // in seconds
473512
req := NewEvalRequest(evalBody).Args([]interface{}{evalSleep, evalMsg})
474513
fut := conn.Do(req)
@@ -482,6 +521,7 @@ func TestGracefulShutdownConcurrent(t *testing.T) {
482521
ret = gerr
483522
}
484523
}(i)
524+
}(i, shtdnClntOpts)
485525
}
486526

487527
var sret error

0 commit comments

Comments
 (0)