From 09450c621e91d47b5caabb9d2af9a90a04519154 Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Mon, 14 Apr 2025 00:48:14 +0300
Subject: [PATCH 1/2] tests: fix flaky TestConnect_schema_update

The test sometimes flakes with the error:

```
failed to identify: read error: context deadline exceeded
```

It was decided to simply increase the connect timeout.
---
 tarantool_test.go | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/tarantool_test.go b/tarantool_test.go
index 3f1e90ef..ded7ba45 100644
--- a/tarantool_test.go
+++ b/tarantool_test.go
@@ -3927,11 +3927,12 @@ func TestConnect_schema_update(t *testing.T) {
 	conn := test_helpers.ConnectWithValidation(t, dialer, opts)
 	defer conn.Close()
 
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
 	for i := 0; i < 100; i++ {
 		fut := conn.Do(NewCallRequest("create_spaces"))
 
-		ctx, cancel := test_helpers.GetConnectContext()
-		defer cancel()
 		if conn, err := Connect(ctx, dialer, opts); err != nil {
 			if err.Error() != "concurrent schema update" {
 				t.Errorf("unexpected error: %s", err)

From 6a0c2df94f8152389b60248d958e8758fb0059bc Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Mon, 14 Apr 2025 00:18:25 +0300
Subject: [PATCH 2/2] pool: fix pool.Connect if a server i/o hangs

Previously, `pool.Connect` attempted to establish connections to instances
one after another. This could cause the entire chain to hang if a single
connection hung. Now connections are established in parallel. After the
first successful connection, the remaining connections are awaited with a
timeout of `pool.Opts.CheckTimeout`.

Closes #TNTP-2018
---
 CHANGELOG.md                 |   8 +++
 pool/connection_pool.go      | 100 +++++++++++++++++++++--------------
 pool/connection_pool_test.go |  75 ++++++++++++++++++++------
 3 files changed, 125 insertions(+), 58 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f9c59d95..c5a161d7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,10 +12,18 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
 
 ### Changed
 
+- Previously, `pool.Connect` attempted to establish connections to instances
+  one after another. This could cause the entire chain to hang if a single
+  connection hung. Now connections are established in parallel. After the
+  first successful connection, the remaining connections are awaited with a
+  timeout of `pool.Opts.CheckTimeout` (#444).
+
 ### Fixed
 
 - Connect() may not cancel Dial() call on context expiration if network
   connection hangs (#443).
+- pool.Connect() failed to connect to any instance if the first instance
+  connection hangs (#444).
 ## [v2.3.1] - 2025-04-03
 
diff --git a/pool/connection_pool.go b/pool/connection_pool.go
index fa5537a6..a47ec19a 100644
--- a/pool/connection_pool.go
+++ b/pool/connection_pool.go
@@ -171,7 +171,7 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
 	roPool := newRoundRobinStrategy(size)
 	anyPool := newRoundRobinStrategy(size)
 
-	connPool := &ConnectionPool{
+	p := &ConnectionPool{
 		ends:    make(map[string]*endpoint),
 		opts:    opts,
 		state:   connectedState,
@@ -181,19 +181,44 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
 		anyPool: anyPool,
 	}
 
-	canceled := connPool.fillPools(ctx, instances)
-	if canceled {
-		connPool.state.set(closedState)
-		return nil, ErrContextCanceled
+	fillCtx, fillCancel := context.WithCancel(ctx)
+	defer fillCancel()
+
+	var timeout <-chan time.Time
+
+	timeout = make(chan time.Time)
+	filled := p.fillPools(fillCtx, instances)
+	done := 0
+	success := len(instances) == 0
+
+	for done < len(instances) {
+		select {
+		case <-timeout:
+			fillCancel()
+			// To be sure that the branch is called only once.
+			timeout = make(chan time.Time)
+		case err := <-filled:
+			done++
+
+			if err == nil && !success {
+				timeout = time.After(opts.CheckTimeout)
+				success = true
+			}
+		}
+	}
+
+	if !success && ctx.Err() != nil {
+		p.state.set(closedState)
+		return nil, ctx.Err()
 	}
 
-	for _, endpoint := range connPool.ends {
+	for _, endpoint := range p.ends {
 		endpointCtx, cancel := context.WithCancel(context.Background())
 		endpoint.cancel = cancel
-		go connPool.controller(endpointCtx, endpoint)
+		go p.controller(endpointCtx, endpoint)
 	}
 
-	return connPool, nil
+	return p, nil
 }
 
 // Connect creates pool for instances with specified instances. Instances must
@@ -1184,45 +1209,33 @@ func (p *ConnectionPool) handlerDeactivated(name string, conn *tarantool.Connect
 	}
 }
 
-func (p *ConnectionPool) deactivateConnection(name string,
-	conn *tarantool.Connection, role Role) {
-	p.deleteConnection(name)
-	conn.Close()
-	p.handlerDeactivated(name, conn, role)
-}
+func (p *ConnectionPool) fillPools(ctx context.Context, instances []Instance) <-chan error {
+	done := make(chan error, len(instances))
 
-func (p *ConnectionPool) deactivateConnections() {
-	for name, endpoint := range p.ends {
-		if endpoint != nil && endpoint.conn != nil {
-			p.deactivateConnection(name, endpoint.conn, endpoint.role)
-		}
-	}
-}
-
-func (p *ConnectionPool) fillPools(ctx context.Context, instances []Instance) bool {
 	// It is called before controller() goroutines, so we don't expect
 	// concurrency issues here.
 	for _, instance := range instances {
 		end := newEndpoint(instance.Name, instance.Dialer, instance.Opts)
 		p.ends[instance.Name] = end
+	}
 
-		if err := p.tryConnect(ctx, end); err != nil {
-			log.Printf("tarantool: connect to %s failed: %s\n",
-				instance.Name, err)
-			select {
-			case <-ctx.Done():
-				p.ends[instance.Name] = nil
-				log.Printf("tarantool: operation was canceled")
+	for _, instance := range instances {
+		name := instance.Name
+		end := p.ends[name]
 
-				p.deactivateConnections()
+		go func() {
+			if err := p.tryConnect(ctx, end); err != nil {
+				log.Printf("tarantool: connect to %s failed: %s\n", name, err)
+				done <- fmt.Errorf("failed to connect to %s :%w", name, err)
 
-				return true
-			default:
+				return
 			}
-		}
+
+			done <- nil
+		}()
 	}
 
-	return false
+	return done
 }
 
 func (p *ConnectionPool) updateConnection(e *endpoint) {
@@ -1284,19 +1297,24 @@ func (p *ConnectionPool) updateConnection(e *endpoint) {
 }
 
 func (p *ConnectionPool) tryConnect(ctx context.Context, e *endpoint) error {
+	e.conn = nil
+	e.role = UnknownRole
+
+	connOpts := e.opts
+	connOpts.Notify = e.notify
+	conn, err := tarantool.Connect(ctx, e.dialer, connOpts)
+
 	p.poolsMutex.Lock()
 
 	if p.state.get() != connectedState {
+		if err == nil {
+			conn.Close()
+		}
+
 		p.poolsMutex.Unlock()
 		return ErrClosed
 	}
 
-	e.conn = nil
-	e.role = UnknownRole
-
-	connOpts := e.opts
-	connOpts.Notify = e.notify
-	conn, err := tarantool.Connect(ctx, e.dialer, connOpts)
 	if err == nil {
 		role, err := p.getConnectionRole(conn)
 		p.poolsMutex.Unlock()
diff --git a/pool/connection_pool_test.go b/pool/connection_pool_test.go
index 323ce37c..f3bf5f55 100644
--- a/pool/connection_pool_test.go
+++ b/pool/connection_pool_test.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"fmt"
 	"log"
+	"net"
 	"os"
 	"reflect"
 	"strings"
@@ -141,7 +142,7 @@ func TestConnSuccessfully(t *testing.T) {
 	}
 
 	err = test_helpers.CheckPoolStatuses(args)
-	require.Nil(t, err)
+	require.NoError(t, err)
 }
 
 func TestConn_no_execute_supported(t *testing.T) {
@@ -261,6 +262,51 @@ func TestConnect_unavailable(t *testing.T) {
 	}, connPool.GetInfo())
 }
 
+func TestConnect_single_server_hang(t *testing.T) {
+	l, err := net.Listen("tcp", "127.0.0.1:0")
+	require.NoError(t, err)
+	defer l.Close()
+
+	ctx, cancel := test_helpers.GetPoolConnectContext()
+	defer cancel()
+
+	insts := makeInstances([]string{l.Addr().String()}, connOpts)
+
+	connPool, err := pool.Connect(ctx, insts)
+	if connPool != nil {
+		defer connPool.Close()
+	}
+
+	require.ErrorIs(t, err, context.DeadlineExceeded)
+	require.Nil(t, connPool)
+}
+
+func TestConnect_server_hang(t *testing.T) {
+	l, err := net.Listen("tcp", "127.0.0.1:0")
+	require.NoError(t, err)
+	defer l.Close()
+
+	ctx, cancel := test_helpers.GetPoolConnectContext()
+	defer cancel()
+
+	servers := []string{l.Addr().String(), servers[0]}
+	insts := makeInstances(servers, connOpts)
+
+	connPool, err := pool.Connect(ctx, insts)
+	if connPool != nil {
+		defer connPool.Close()
+	}
+
+	require.NoError(t, err, "failed to create a pool")
+	require.NotNil(t, connPool, "pool is nil after Connect")
+	require.Equal(t, map[string]pool.ConnectionInfo{
+		servers[0]: pool.ConnectionInfo{
+			ConnectedNow: false, ConnRole: pool.UnknownRole, Instance: insts[0]},
+		servers[1]: pool.ConnectionInfo{
+			ConnectedNow: true, ConnRole: pool.MasterRole, Instance: insts[1]},
+	}, connPool.GetInfo())
+}
+
 func TestConnErrorAfterCtxCancel(t *testing.T) {
 	var connLongReconnectOpts = tarantool.Opts{
 		Timeout:   5 * time.Second,
@@ -279,7 +325,7 @@ func TestConnErrorAfterCtxCancel(t *testing.T) {
 	if connPool != nil || err == nil {
t.Fatalf("ConnectionPool was created after cancel") } - if !strings.Contains(err.Error(), "operation was canceled") { + if !strings.Contains(err.Error(), "context canceled") { t.Fatalf("Unexpected error, expected to contain %s, got %v", "operation was canceled", err) } @@ -287,7 +333,6 @@ func TestConnErrorAfterCtxCancel(t *testing.T) { type mockClosingDialer struct { addr string - cnt *int ctx context.Context ctxCancel context.CancelFunc } @@ -301,26 +346,21 @@ func (m *mockClosingDialer) Dial(ctx context.Context, } conn, err := dialer.Dial(m.ctx, tarantool.DialOpts{}) - if *m.cnt == 0 { - m.ctxCancel() - } - *m.cnt++ + m.ctxCancel() return conn, err } -func TestContextCancelInProgress(t *testing.T) { +func TestConnectContextCancelAfterConnect(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cnt := new(int) var instances []pool.Instance for _, server := range servers { instances = append(instances, pool.Instance{ Name: server, Dialer: &mockClosingDialer{ addr: server, - cnt: cnt, ctx: ctx, ctxCancel: cancel, }, @@ -329,11 +369,12 @@ func TestContextCancelInProgress(t *testing.T) { } connPool, err := pool.Connect(ctx, instances) - require.NotNilf(t, err, "expected err after ctx cancel") - assert.Truef(t, strings.Contains(err.Error(), "operation was canceled"), - fmt.Sprintf("unexpected error, expected to contain %s, got %v", - "operation was canceled", err)) - require.Nilf(t, connPool, "conn is not nil after ctx cancel") + if connPool != nil { + defer connPool.Close() + } + + assert.NoError(t, err, "expected err after ctx cancel") + assert.NotNil(t, connPool) } func TestConnSuccessfullyDuplicates(t *testing.T) { @@ -527,8 +568,8 @@ func TestAdd(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() connPool, err := pool.Connect(ctx, []pool.Instance{}) - require.Nilf(t, err, "failed to connect") - require.NotNilf(t, connPool, "conn is nil after Connect") + require.NoError(t, err, "failed to connect") + require.NotNil(t, connPool, "conn is nil after Connect") defer connPool.Close()