Skip to content

Commit ee2d1d8

Browse files
reconnect works
1 parent 4805be7 commit ee2d1d8

File tree

3 files changed

+48
-15
lines changed

3 files changed

+48
-15
lines changed

connection.go

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -454,26 +454,30 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
454454

455455
func (conn *Connection) shutdownOnServerEvent() {
456456
conn.mutex.Lock()
457-
defer conn.mutex.Unlock()
458457

459-
fmt.Printf("starting shutdown...\n")
458+
fmt.Printf("starting shutdown... (%v)\n", conn.UniqueId())
460459
conn.shutdownOn.Store(true)
461460
defer conn.shutdownOn.Store(false)
462461

463-
fmt.Printf("wait for futures to finish...\n")
462+
fmt.Printf("wait for futures to finish... (%v)\n", conn.UniqueId())
464463
conn.shutdownWg.Wait()
465464

466465
// fmt.Printf("defer unregister watcher...\n")
467466
// defer conn.shutdownWatcher.Unregister()
468467

469-
fmt.Printf("close the connection...\n")
470-
conn.closeConnection(
471-
ClientError{
472-
ErrConnectionClosed,
473-
"connection closed after server shutdown",
474-
}, false)
468+
err := ClientError{
469+
ErrConnectionClosed,
470+
"connection closed after server shutdown",
471+
}
472+
473+
fmt.Printf("close the connection... (%v)\n", conn.UniqueId())
474+
conn.closeConnection(err, false)
475+
476+
fmt.Printf("finish shutdown... (%v)\n", conn.UniqueId())
475477

476-
fmt.Printf("finish shutdown...\n")
478+
conn.mutex.Unlock()
479+
480+
defer conn.reconnect(err, conn.c)
477481
}
478482

479483
// ConnectedNow reports if connection is established at the moment.
@@ -779,7 +783,10 @@ func (conn *Connection) readIdResponse(r io.Reader) (Response, error) {
779783

780784
func (conn *Connection) createConnection(reconnect bool) (err error) {
781785
var reconnects uint
786+
fmt.Printf("conn.state is %v (%v)\n", conn.state, conn.UniqueId())
782787
for conn.c == nil && conn.state == connDisconnected {
788+
fmt.Printf("createConnection iteration (%v)\n", conn.UniqueId())
789+
783790
now := time.Now()
784791
err = conn.dial()
785792
if err == nil || !reconnect {
@@ -797,9 +804,12 @@ func (conn *Connection) createConnection(reconnect bool) (err error) {
797804
conn.opts.Logger.Report(LogReconnectFailed, conn, reconnects, err)
798805
conn.notify(ReconnectFailed)
799806
reconnects++
807+
808+
fmt.Printf("until sleep %v (%v)\n", time.Now(), conn.UniqueId())
800809
conn.mutex.Unlock()
801810
time.Sleep(time.Until(now.Add(conn.opts.Reconnect)))
802811
conn.mutex.Lock()
812+
fmt.Printf("after sleep %v (%v)\n", time.Now(), conn.UniqueId())
803813
}
804814
if conn.state == connClosed {
805815
err = ClientError{ErrConnectionClosed, "using closed connection"}
@@ -836,19 +846,29 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
836846
return
837847
}
838848

849+
func (conn *Connection) UniqueId() string {
850+
return fmt.Sprintf("conn %v", conn.control)
851+
}
852+
839853
func (conn *Connection) reconnect(neterr error, c net.Conn) {
854+
fmt.Printf("reconnect head (%v)\n", conn.UniqueId())
840855
conn.mutex.Lock()
841856
defer conn.mutex.Unlock()
857+
fmt.Printf("conn.opts.Reconnect %d (%v)\n", conn.opts.Reconnect, conn.UniqueId())
842858
if conn.opts.Reconnect > 0 {
843859
if c == conn.c {
860+
fmt.Printf("reconnect attempt (%v)\n", conn.UniqueId())
844861
conn.closeConnection(neterr, false)
845862
if err := conn.createConnection(true); err != nil {
863+
fmt.Printf("kill that one (folded)(%v)\n", conn.UniqueId())
846864
conn.closeConnection(err, true)
847865
}
848866
}
849867
} else {
868+
fmt.Printf("kill that one (%v)\n", conn.UniqueId())
850869
conn.closeConnection(neterr, true)
851870
}
871+
fmt.Printf("reconnect end (%v)\n", conn.UniqueId())
852872
}
853873

854874
func (conn *Connection) lockShards() {
@@ -1166,11 +1186,11 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
11661186
}
11671187

11681188
func (conn *Connection) gracefulWait(fut *Future) {
1169-
fmt.Printf("start tracking %v\n", fut)
1189+
// fmt.Printf("start tracking %v\n", fut)
11701190
<-fut.done
1171-
fmt.Printf("done tracking %v\n", fut)
1191+
// fmt.Printf("done tracking %v\n", fut)
11721192
conn.shutdownWg.Done()
1173-
fmt.Printf("wg done %v\n", fut)
1193+
// fmt.Printf("wg done %v\n", fut)
11741194
}
11751195

11761196
func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
@@ -1619,7 +1639,7 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
16191639

16201640
if sendAck {
16211641
fut := conn.Do(newWatchRequest(key))
1622-
fmt.Printf("ack request %v %v: %v\n", key, state.value, fut)
1642+
// fmt.Printf("ack request %v %v: %v\n", key, state.value, fut)
16231643
fut.Get()
16241644
// We expect a reconnect and re-subscribe if it fails to
16251645
// send the watch request. So it looks ok do not check a

graceful_shutdown/tarantool_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ func TestGracefulShutdown(t *testing.T) {
4545
inst, err := test_helpers.StartTarantool(instOpts)
4646
require.Nil(t, err)
4747

48+
// fmt.Printf("opts are is %v\n", opts)
49+
// fmt.Printf("opts.Reconnect is %v\n", opts.Reconnect)
4850
conn := test_helpers.ConnectWithValidation(t, server, opts)
51+
fmt.Printf("test connect unique id: %v\n", conn.UniqueId())
4952

5053
_, err = conn.Eval("box.ctl.set_on_shutdown_timeout(10)", []interface{}{})
5154
require.Nil(t, err)
@@ -93,6 +96,7 @@ func TestGracefulShutdown(t *testing.T) {
9396
err = test_helpers.RestartTarantool(&inst)
9497
require.Nilf(t, err, "Failed to restart tarantool")
9598

99+
fmt.Printf("opts are is %v\n", opts)
96100
connected := test_helpers.WaitUntilReconnected(conn, retries, timeout)
97101
require.True(t, connected, "Reconnect success")
98102

@@ -125,5 +129,5 @@ func TestGracefulShutdown(t *testing.T) {
125129

126130
// time.Sleep(2 * time.Second)
127131

128-
require.Equal(t, true, conn.ClosedNow())
132+
require.Equal(t, false, conn.ConnectedNow())
129133
}

test_helpers/utils.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,14 @@ func DeleteRecordByKey(t *testing.T, conn tarantool.Connector,
4747
// Returns false in case of connection is not in the connected state
4848
// after specified retries count, true otherwise.
4949
func WaitUntilReconnected(conn *tarantool.Connection, retries uint, timeout time.Duration) bool {
50+
fmt.Printf("WaitUntilReconnected %v %v \n", retries, timeout)
51+
5052
for i := uint(0); ; i++ {
53+
fmt.Printf("reconnect in tests iteration i=%v\n", i)
54+
5155
connected := conn.ConnectedNow()
5256
if connected {
57+
fmt.Printf("ConnectedNow true\n")
5358
return true
5459
}
5560

@@ -58,6 +63,10 @@ func WaitUntilReconnected(conn *tarantool.Connection, retries uint, timeout time
5863
}
5964

6065
time.Sleep(timeout)
66+
67+
// resp, err := conn.Do(tarantool.NewPingRequest()).Get()
68+
fmt.Printf("reconnect in tests iteration i=%v\n", i)
69+
// fmt.Printf("reconnect tests; err ping %v\n", err)
6170
}
6271

6372
return false

0 commit comments

Comments
 (0)