diff --git a/CHANGELOG.md b/CHANGELOG.md index f9c59d95..c5a161d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,10 +12,18 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ### Changed +- Previously, `pool.Connect` attempted to connect to the instances one after + another. This could cause the entire chain to hang if one connection + hung. Now connections are established in parallel. After the first + successful connection, the remaining connections wait with a timeout of + `pool.Opts.CheckTimeout` (#444). + ### Fixed - Connect() may not cancel Dial() call on context expiration if network connection hangs (#443). +- pool.Connect() failed to connect to any instance if the first instance + connection hangs (#444). ## [v2.3.1] - 2025-04-03 diff --git a/pool/connection_pool.go b/pool/connection_pool.go index fa5537a6..a47ec19a 100644 --- a/pool/connection_pool.go +++ b/pool/connection_pool.go @@ -171,7 +171,7 @@ func ConnectWithOpts(ctx context.Context, instances []Instance, roPool := newRoundRobinStrategy(size) anyPool := newRoundRobinStrategy(size) - connPool := &ConnectionPool{ + p := &ConnectionPool{ ends: make(map[string]*endpoint), opts: opts, state: connectedState, @@ -181,19 +181,44 @@ func ConnectWithOpts(ctx context.Context, instances []Instance, anyPool: anyPool, } - canceled := connPool.fillPools(ctx, instances) - if canceled { - connPool.state.set(closedState) - return nil, ErrContextCanceled + fillCtx, fillCancel := context.WithCancel(ctx) + defer fillCancel() + + var timeout <-chan time.Time + + timeout = make(chan time.Time) + filled := p.fillPools(fillCtx, instances) + done := 0 + success := len(instances) == 0 + + for done < len(instances) { + select { + case <-timeout: + fillCancel() + // To be sure that the branch is called only once. 
+ timeout = make(chan time.Time) + case err := <-filled: + done++ + + if err == nil && !success { + timeout = time.After(opts.CheckTimeout) + success = true + } + } + } + + if !success && ctx.Err() != nil { + p.state.set(closedState) + return nil, ctx.Err() } - for _, endpoint := range connPool.ends { + for _, endpoint := range p.ends { endpointCtx, cancel := context.WithCancel(context.Background()) endpoint.cancel = cancel - go connPool.controller(endpointCtx, endpoint) + go p.controller(endpointCtx, endpoint) } - return connPool, nil + return p, nil } // Connect creates pool for instances with specified instances. Instances must @@ -1184,45 +1209,33 @@ func (p *ConnectionPool) handlerDeactivated(name string, conn *tarantool.Connect } } -func (p *ConnectionPool) deactivateConnection(name string, - conn *tarantool.Connection, role Role) { - p.deleteConnection(name) - conn.Close() - p.handlerDeactivated(name, conn, role) -} +func (p *ConnectionPool) fillPools(ctx context.Context, instances []Instance) <-chan error { + done := make(chan error, len(instances)) -func (p *ConnectionPool) deactivateConnections() { - for name, endpoint := range p.ends { - if endpoint != nil && endpoint.conn != nil { - p.deactivateConnection(name, endpoint.conn, endpoint.role) - } - } -} - -func (p *ConnectionPool) fillPools(ctx context.Context, instances []Instance) bool { // It is called before controller() goroutines, so we don't expect // concurrency issues here. 
for _, instance := range instances { end := newEndpoint(instance.Name, instance.Dialer, instance.Opts) p.ends[instance.Name] = end + } - if err := p.tryConnect(ctx, end); err != nil { - log.Printf("tarantool: connect to %s failed: %s\n", - instance.Name, err) - select { - case <-ctx.Done(): - p.ends[instance.Name] = nil - log.Printf("tarantool: operation was canceled") + for _, instance := range instances { + name := instance.Name + end := p.ends[name] - p.deactivateConnections() + go func() { + if err := p.tryConnect(ctx, end); err != nil { + log.Printf("tarantool: connect to %s failed: %s\n", name, err) + done <- fmt.Errorf("failed to connect to %s :%w", name, err) - return true - default: + return } - } + + done <- nil + }() } - return false + return done } func (p *ConnectionPool) updateConnection(e *endpoint) { @@ -1284,19 +1297,24 @@ func (p *ConnectionPool) updateConnection(e *endpoint) { } func (p *ConnectionPool) tryConnect(ctx context.Context, e *endpoint) error { + e.conn = nil + e.role = UnknownRole + + connOpts := e.opts + connOpts.Notify = e.notify + conn, err := tarantool.Connect(ctx, e.dialer, connOpts) + p.poolsMutex.Lock() if p.state.get() != connectedState { + if err == nil { + conn.Close() + } + p.poolsMutex.Unlock() return ErrClosed } - e.conn = nil - e.role = UnknownRole - - connOpts := e.opts - connOpts.Notify = e.notify - conn, err := tarantool.Connect(ctx, e.dialer, connOpts) if err == nil { role, err := p.getConnectionRole(conn) p.poolsMutex.Unlock() diff --git a/pool/connection_pool_test.go b/pool/connection_pool_test.go index 323ce37c..f3bf5f55 100644 --- a/pool/connection_pool_test.go +++ b/pool/connection_pool_test.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "log" + "net" "os" "reflect" "strings" @@ -141,7 +142,7 @@ func TestConnSuccessfully(t *testing.T) { } err = test_helpers.CheckPoolStatuses(args) - require.Nil(t, err) + require.NoError(t, err) } func TestConn_no_execute_supported(t *testing.T) { @@ -261,6 +262,51 @@ func 
TestConnect_unavailable(t *testing.T) { }, connPool.GetInfo()) } +func TestConnect_single_server_hang(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer l.Close() + + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + + insts := makeInstances([]string{l.Addr().String()}, connOpts) + + connPool, err := pool.Connect(ctx, insts) + if connPool != nil { + defer connPool.Close() + } + + require.ErrorIs(t, err, context.DeadlineExceeded) + require.Nil(t, connPool) +} + +func TestConnect_server_hang(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer l.Close() + + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + + servers := []string{l.Addr().String(), servers[0]} + insts := makeInstances(servers, connOpts) + + connPool, err := pool.Connect(ctx, insts) + if connPool != nil { + defer connPool.Close() + } + + require.NoError(t, err, "failed to create a pool") + require.NotNil(t, connPool, "pool is nil after Connect") + require.Equal(t, map[string]pool.ConnectionInfo{ + servers[0]: pool.ConnectionInfo{ + ConnectedNow: false, ConnRole: pool.UnknownRole, Instance: insts[0]}, + servers[1]: pool.ConnectionInfo{ + ConnectedNow: true, ConnRole: pool.MasterRole, Instance: insts[1]}, + }, connPool.GetInfo()) +} + func TestConnErrorAfterCtxCancel(t *testing.T) { var connLongReconnectOpts = tarantool.Opts{ Timeout: 5 * time.Second, @@ -279,7 +325,7 @@ func TestConnErrorAfterCtxCancel(t *testing.T) { if connPool != nil || err == nil { t.Fatalf("ConnectionPool was created after cancel") } - if !strings.Contains(err.Error(), "operation was canceled") { + if !strings.Contains(err.Error(), "context canceled") { t.Fatalf("Unexpected error, expected to contain %s, got %v", "operation was canceled", err) } @@ -287,7 +333,6 @@ func TestConnErrorAfterCtxCancel(t *testing.T) { type mockClosingDialer struct { addr string - cnt *int ctx context.Context ctxCancel 
context.CancelFunc } @@ -301,26 +346,21 @@ func (m *mockClosingDialer) Dial(ctx context.Context, } conn, err := dialer.Dial(m.ctx, tarantool.DialOpts{}) - if *m.cnt == 0 { - m.ctxCancel() - } - *m.cnt++ + m.ctxCancel() return conn, err } -func TestContextCancelInProgress(t *testing.T) { +func TestConnectContextCancelAfterConnect(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cnt := new(int) var instances []pool.Instance for _, server := range servers { instances = append(instances, pool.Instance{ Name: server, Dialer: &mockClosingDialer{ addr: server, - cnt: cnt, ctx: ctx, ctxCancel: cancel, }, @@ -329,11 +369,12 @@ func TestContextCancelInProgress(t *testing.T) { } connPool, err := pool.Connect(ctx, instances) - require.NotNilf(t, err, "expected err after ctx cancel") - assert.Truef(t, strings.Contains(err.Error(), "operation was canceled"), - fmt.Sprintf("unexpected error, expected to contain %s, got %v", - "operation was canceled", err)) - require.Nilf(t, connPool, "conn is not nil after ctx cancel") + if connPool != nil { + defer connPool.Close() + } + + assert.NoError(t, err, "expected err after ctx cancel") + assert.NotNil(t, connPool) } func TestConnSuccessfullyDuplicates(t *testing.T) { @@ -527,8 +568,8 @@ func TestAdd(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() connPool, err := pool.Connect(ctx, []pool.Instance{}) - require.Nilf(t, err, "failed to connect") - require.NotNilf(t, connPool, "conn is nil after Connect") + require.NoError(t, err, "failed to connect") + require.NotNil(t, connPool, "conn is nil after Connect") defer connPool.Close() diff --git a/tarantool_test.go b/tarantool_test.go index 3f1e90ef..ded7ba45 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -3927,11 +3927,12 @@ func TestConnect_schema_update(t *testing.T) { conn := test_helpers.ConnectWithValidation(t, dialer, opts) defer conn.Close() + ctx, cancel := context.WithTimeout(context.Background(), 
5*time.Second) + defer cancel() + for i := 0; i < 100; i++ { fut := conn.Do(NewCallRequest("create_spaces")) - ctx, cancel := test_helpers.GetConnectContext() - defer cancel() if conn, err := Connect(ctx, dialer, opts); err != nil { if err.Error() != "concurrent schema update" { t.Errorf("unexpected error: %s", err)