pool: add a connection even on connection error #374

Merged: 1 commit, Jan 30, 2024

6 changes: 4 additions & 2 deletions CHANGELOG.md
@@ -52,8 +52,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
connection objects (#136). This function now does not attempt to reconnect
and tries to establish a connection only once. Function might be canceled
via context. Context accepted as first argument.
`pool.Connect` and `pool.Add` now accept context as first argument, which
user may cancel in process. If `pool.Connect` is canceled in progress, an
`pool.Connect` and `pool.Add` now accept context as the first argument, which
user may cancel in process. If `pool.Connect` is canceled in progress, an
error will be returned. All created connections will be closed.
- `iproto.Feature` type now used instead of `ProtocolFeature` (#337)
- `iproto.IPROTO_FEATURE_` constants now used instead of local `Feature`
@@ -95,6 +95,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Renamed `StrangerResponse` to `MockResponse` (#237)
- `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` use a new type
`pool.Instance` to determine connection options (#356)
- `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add connections to
the pool even if the connection to the instance cannot be established (#372)

### Deprecated

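The behavioral change from #372 can be sketched from the user's side. The snippet below is an illustrative example rather than part of the PR: the instance names, addresses, and the `NetDialer`/import paths are assumptions based on the v2 API described in this changelog.

```go
package main

import (
	"context"
	"log"

	"github.com/tarantool/go-tarantool/v2"
	"github.com/tarantool/go-tarantool/v2/pool"
)

func main() {
	ctx := context.Background()

	// Hypothetical instances; "db2" may well be unreachable at Connect time.
	instances := []pool.Instance{
		{Name: "db1", Dialer: tarantool.NetDialer{Address: "127.0.0.1:3301"}},
		{Name: "db2", Dialer: tarantool.NetDialer{Address: "127.0.0.1:3302"}},
	}

	p, err := pool.Connect(ctx, instances)
	if err != nil {
		// With #372, an unreachable instance no longer fails Connect;
		// an error is expected only for a canceled context or duplicate names.
		log.Fatalf("pool.Connect failed: %s", err)
	}
	defer p.Close()

	// Unreachable instances stay in the pool and are retried by the
	// background endpoint controllers.
}
```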
3 changes: 3 additions & 0 deletions README.md
@@ -199,6 +199,9 @@ The subpackage has been deleted. You could use `pool` instead.
the second argument instead of a list of addresses. Each instance is
associated with a unique string name, `Dialer` and connection options which
allows instances to be independently configured.
* `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add instances into
the pool even if the pool is unable to connect to them. The pool will keep
trying to connect to such instances later.
* `pool.Add` now accepts context as the first argument, which user may cancel
in process.
* `pool.Add` now accepts `pool.Instance` as the second argument instead of
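Continuing the sketch above (same imports and pool `p`), `pool.Add` behaves the same way; the name and address below are illustrative:

```go
// Hypothetical extra instance; it may be down when Add is called.
err := p.Add(ctx, pool.Instance{
	Name:   "db3",
	Dialer: tarantool.NetDialer{Address: "127.0.0.1:3303"},
})
if err != nil {
	// Expected only for a canceled context, a duplicate name, or a
	// concurrent Close()/CloseGraceful(); a plain connection failure
	// does not remove the instance from the pool.
	log.Printf("pool.Add failed: %s", err)
}
```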
109 changes: 42 additions & 67 deletions pool/connection_pool.go
@@ -24,9 +24,7 @@ import (
)

var (
ErrEmptyInstances = errors.New("instances (second argument) should not be empty")
ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0")
ErrNoConnection = errors.New("no active connections")
ErrTooManyArgs = errors.New("too many arguments")
ErrIncorrectResponse = errors.New("incorrect response format")
ErrIncorrectStatus = errors.New("incorrect instance status: status should be `running`")
@@ -155,9 +153,6 @@ func newEndpoint(name string, dialer tarantool.Dialer, opts tarantool.Opts) *end
// opts. Instances must have unique names.
func ConnectWithOpts(ctx context.Context, instances []Instance,
opts Opts) (*ConnectionPool, error) {
if len(instances) == 0 {
return nil, ErrEmptyInstances
}
unique := make(map[string]bool)
for _, instance := range instances {
if _, ok := unique[instance.Name]; ok {
@@ -178,28 +173,23 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
connPool := &ConnectionPool{
ends: make(map[string]*endpoint),
opts: opts,
state: unknownState,
state: connectedState,
done: make(chan struct{}),
rwPool: rwPool,
roPool: roPool,
anyPool: anyPool,
}

somebodyAlive, ctxCanceled := connPool.fillPools(ctx, instances)
if !somebodyAlive {
canceled := connPool.fillPools(ctx, instances)
if canceled {
connPool.state.set(closedState)
if ctxCanceled {
return nil, ErrContextCanceled
}
return nil, ErrNoConnection
return nil, ErrContextCanceled
}

connPool.state.set(connectedState)

for _, s := range connPool.ends {
for _, endpoint := range connPool.ends {
endpointCtx, cancel := context.WithCancel(context.Background())
s.cancel = cancel
go connPool.controller(endpointCtx, s)
endpoint.cancel = cancel
go connPool.controller(endpointCtx, endpoint)
}

return connPool, nil
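After this hunk, `ConnectWithOpts` always starts one controller goroutine per endpoint, each with its own cancel function; whether the first connection attempt succeeded no longer matters. A self-contained sketch of that pattern follows (standard library only, simplified names; not the library's actual controller):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// endpoint is a stripped-down stand-in for the pool's per-instance endpoint.
type endpoint struct {
	name   string
	cancel context.CancelFunc
}

// controller mimics the background goroutine started per endpoint: it runs
// until its context is canceled and would (re)connect on every tick.
func controller(ctx context.Context, e *endpoint, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			fmt.Println(e.name, "controller stopped")
			return
		case <-time.After(20 * time.Millisecond):
			// An unreachable instance is simply retried on the next tick.
		}
	}
}

func main() {
	ends := map[string]*endpoint{
		"db1": {name: "db1"},
		"db2": {name: "db2"},
	}

	var wg sync.WaitGroup
	for _, e := range ends {
		ctx, cancel := context.WithCancel(context.Background())
		e.cancel = cancel
		wg.Add(1)
		go controller(ctx, e, &wg)
	}

	time.Sleep(50 * time.Millisecond)

	// Closing the pool cancels every endpoint's controller.
	for _, e := range ends {
		e.cancel()
	}
	wg.Wait()
}
```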
@@ -252,8 +242,12 @@ func (p *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, error) {
return conn.ConfiguredTimeout(), nil
}

// Add adds a new instance into the pool. This function adds the instance
// only after successful connection.
// Add adds a new instance into the pool. The pool will try to connect to the
// instance later if it is unable to establish a connection.
//
// The function may return an error and not add the instance into the pool
// if the context has been canceled or on a concurrent Close()/CloseGraceful()
// call.
func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error {
e := newEndpoint(instance.Name, instance.Dialer, instance.Opts)

@@ -268,19 +262,34 @@ func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error {
return ErrExists
}

endpointCtx, cancel := context.WithCancel(context.Background())
e.cancel = cancel
endpointCtx, endpointCancel := context.WithCancel(context.Background())
connectCtx, connectCancel := context.WithCancel(ctx)
e.cancel = func() {
connectCancel()
endpointCancel()
}

p.ends[instance.Name] = e
p.endsMutex.Unlock()

if err := p.tryConnect(ctx, e); err != nil {
p.endsMutex.Lock()
delete(p.ends, instance.Name)
p.endsMutex.Unlock()
e.cancel()
close(e.closed)
return err
if err := p.tryConnect(connectCtx, e); err != nil {
var canceled bool
select {
case <-connectCtx.Done():
canceled = true
case <-endpointCtx.Done():
canceled = true
default:
canceled = false
}
if canceled {
p.endsMutex.Lock()
delete(p.ends, instance.Name)
p.endsMutex.Unlock()
e.cancel()
close(e.closed)
return err
}
}

go p.controller(endpointCtx, e)
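The `Add` implementation above relies on two contexts: `connectCtx` (derived from the caller's context) bounds only the initial connection attempt, while `endpointCtx` (detached from the caller) owns the endpoint's background lifetime, and `e.cancel` tears down both. Below is a standalone, simplified sketch of that pattern using only the standard library and a fake dialer; it is not the library's code:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// dial stands in for the initial connection attempt and respects ctx.
func dial(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Millisecond):
		return errors.New("connection refused") // the instance is down
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	callerCtx := context.Background()

	// endpointCtx outlives the Add-like call; connectCtx is bounded by the caller.
	endpointCtx, endpointCancel := context.WithCancel(context.Background())
	connectCtx, connectCancel := context.WithCancel(callerCtx)
	cancel := func() { connectCancel(); endpointCancel() }
	defer cancel()

	if err := dial(connectCtx); err != nil {
		select {
		case <-connectCtx.Done():
			// Cancellation aborts the whole operation, as in Add.
			fmt.Println("canceled:", err)
			return
		default:
			// A plain connection error is tolerated: keep the endpoint and
			// let a background controller retry it on endpointCtx.
			fmt.Println("instance down, will retry in background:", err)
		}
	}

	_ = endpointCtx // the real controller goroutine would run on this context
}
```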
@@ -1145,64 +1154,30 @@ func (p *ConnectionPool) deactivateConnections() {
}
}

func (p *ConnectionPool) processConnection(conn *tarantool.Connection,
name string, end *endpoint) bool {
role, err := p.getConnectionRole(conn)
if err != nil {
conn.Close()
log.Printf("tarantool: storing connection to %s failed: %s\n", name, err)
return false
}

if !p.handlerDiscovered(name, conn, role) {
conn.Close()
return false
}
if p.addConnection(name, conn, role) != nil {
conn.Close()
p.handlerDeactivated(name, conn, role)
return false
}

end.conn = conn
end.role = role
return true
}

func (p *ConnectionPool) fillPools(ctx context.Context,
instances []Instance) (bool, bool) {
somebodyAlive := false
ctxCanceled := false

func (p *ConnectionPool) fillPools(ctx context.Context, instances []Instance) bool {
// It is called before controller() goroutines, so we don't expect
// concurrency issues here.
for _, instance := range instances {
end := newEndpoint(instance.Name, instance.Dialer, instance.Opts)
p.ends[instance.Name] = end
connOpts := instance.Opts
connOpts.Notify = end.notify
conn, err := tarantool.Connect(ctx, instance.Dialer, connOpts)
if err != nil {

if err := p.tryConnect(ctx, end); err != nil {
log.Printf("tarantool: connect to %s failed: %s\n",
instance.Name, err)
select {
case <-ctx.Done():
ctxCanceled = true

p.ends[instance.Name] = nil
log.Printf("tarantool: operation was canceled")

p.deactivateConnections()

return false, ctxCanceled
return true
default:
}
} else if p.processConnection(conn, instance.Name, end) {
somebodyAlive = true
}
}

return somebodyAlive, ctxCanceled
return false
}

func (p *ConnectionPool) updateConnection(e *endpoint) {
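The reworked `fillPools` distinguishes cancellation from ordinary connect failures with a non-blocking `select` on `ctx.Done()`. A minimal stdlib-only sketch of that idiom (the helper name is made up):

```go
package main

import (
	"context"
	"fmt"
)

// canceled mirrors the select/default check in fillPools: it reports whether
// ctx is already canceled, without waiting on it.
func canceled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	fmt.Println(canceled(ctx)) // false: a connect error alone does not abort the fill
	cancel()
	fmt.Println(canceled(ctx)) // true: cancellation aborts fillPools entirely
}
```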