Skip to content

Commit 74ae72d

Browse files
committed
pool: add a connection even on connection error
From a user's perspective, it is useful to add all target instances to the pool, even some that are not currently unavailable. This way the user don’t have to keep track of the list of actually added instances. The patch make it possible. Closes #372
1 parent b8d9914 commit 74ae72d

File tree

4 files changed

+90
-84
lines changed

4 files changed

+90
-84
lines changed

CHANGELOG.md

+4-2
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
5252
connection objects (#136). This function now does not attempt to reconnect
5353
and tries to establish a connection only once. Function might be canceled
5454
via context. Context accepted as first argument.
55-
`pool.Connect` and `pool.Add` now accept context as first argument, which
56-
user may cancel in process. If `pool.Connect` is canceled in progress, an
55+
`pool.Connect` now accept context as first argument, which user may cancel
56+
in process. If `pool.Connect` is canceled in progress, an
5757
error will be returned. All created connections will be closed.
5858
- `iproto.Feature` type now used instead of `ProtocolFeature` (#337)
5959
- `iproto.IPROTO_FEATURE_` constants now used instead of local `Feature`
@@ -95,6 +95,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
9595
- Renamed `StrangerResponse` to `MockResponse` (#237)
9696
- `pool.Connect`, `pool.ConnetcWithOpts` and `pool.Add` use a new type
9797
`pool.Instance` to determinate connection options (#356)
98+
- `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add connections to
99+
the pool even it is unable to connect to it (#372)
98100

99101
### Deprecated
100102

README.md

+4-3
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,10 @@ The subpackage has been deleted. You could use `pool` instead.
199199
the second argument instead of a list of addresses. Each instance is
200200
associated with a unique string name, `Dialer` and connection options which
201201
allows instances to be independently configured.
202-
* `pool.Add` now accepts context as the first argument, which user may cancel
203-
in process.
204-
* `pool.Add` now accepts `pool.Instance` as the second argument instead of
202+
* `pool.Connect`, `pool.ConnectWithOpts` and `pool.Add` add instances into
203+
the pool even it is unable to connect to it. The pool will try to connect to
204+
the instance later.
205+
* `pool.Add` now accepts `pool.Instance` as the first argument instead of
205206
an address, it allows to configure a new instance more flexible.
206207
* `pool.GetPoolInfo` has been renamed to `pool.GetInfo`. Return type has been
207208
changed to `map[string]ConnectionInfo`.

pool/connection_pool.go

+18-38
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@ import (
2424
)
2525

2626
var (
27-
ErrEmptyInstances = errors.New("instances (second argument) should not be empty")
2827
ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0")
29-
ErrNoConnection = errors.New("no active connections")
3028
ErrTooManyArgs = errors.New("too many arguments")
3129
ErrIncorrectResponse = errors.New("incorrect response format")
3230
ErrIncorrectStatus = errors.New("incorrect instance status: status should be `running`")
@@ -155,9 +153,6 @@ func newEndpoint(name string, dialer tarantool.Dialer, opts tarantool.Opts) *end
155153
// opts. Instances must have unique names.
156154
func ConnectWithOpts(ctx context.Context, instances []Instance,
157155
opts Opts) (*ConnectionPool, error) {
158-
if len(instances) == 0 {
159-
return nil, ErrEmptyInstances
160-
}
161156
unique := make(map[string]bool)
162157
for _, instance := range instances {
163158
if _, ok := unique[instance.Name]; ok {
@@ -185,13 +180,10 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
185180
anyPool: anyPool,
186181
}
187182

188-
somebodyAlive, ctxCanceled := connPool.fillPools(ctx, instances)
189-
if !somebodyAlive {
183+
canceled := connPool.fillPools(ctx, instances)
184+
if canceled {
190185
connPool.state.set(closedState)
191-
if ctxCanceled {
192-
return nil, ErrContextCanceled
193-
}
194-
return nil, ErrNoConnection
186+
return nil, ErrContextCanceled
195187
}
196188

197189
connPool.state.set(connectedState)
@@ -252,9 +244,9 @@ func (p *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, error) {
252244
return conn.ConfiguredTimeout(), nil
253245
}
254246

255-
// Add adds a new instance into the pool. This function adds the instance
256-
// only after successful connection.
257-
func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error {
247+
// Add adds a new instance into the pool. The pool will try to connect to the
248+
// instance later if it is unable to establish a connection.
249+
func (p *ConnectionPool) Add(instance Instance) error {
258250
e := newEndpoint(instance.Name, instance.Dialer, instance.Opts)
259251

260252
p.endsMutex.Lock()
@@ -274,14 +266,9 @@ func (p *ConnectionPool) Add(ctx context.Context, instance Instance) error {
274266
p.ends[instance.Name] = e
275267
p.endsMutex.Unlock()
276268

277-
if err := p.tryConnect(ctx, e); err != nil {
278-
p.endsMutex.Lock()
279-
delete(p.ends, instance.Name)
280-
p.endsMutex.Unlock()
281-
e.cancel()
282-
close(e.closed)
283-
return err
284-
}
269+
// The result does not matter, but we should try to connect to the instance
270+
// as fast as possible.
271+
_ = p.tryConnect(endpointCtx, e)
285272

286273
go p.controller(endpointCtx, e)
287274
return nil
@@ -1146,34 +1133,29 @@ func (p *ConnectionPool) deactivateConnections() {
11461133
}
11471134

11481135
func (p *ConnectionPool) processConnection(conn *tarantool.Connection,
1149-
name string, end *endpoint) bool {
1136+
name string, end *endpoint) {
11501137
role, err := p.getConnectionRole(conn)
11511138
if err != nil {
11521139
conn.Close()
11531140
log.Printf("tarantool: storing connection to %s failed: %s\n", name, err)
1154-
return false
1141+
return
11551142
}
11561143

11571144
if !p.handlerDiscovered(name, conn, role) {
11581145
conn.Close()
1159-
return false
1146+
return
11601147
}
11611148
if p.addConnection(name, conn, role) != nil {
11621149
conn.Close()
11631150
p.handlerDeactivated(name, conn, role)
1164-
return false
1151+
return
11651152
}
11661153

11671154
end.conn = conn
11681155
end.role = role
1169-
return true
11701156
}
11711157

1172-
func (p *ConnectionPool) fillPools(ctx context.Context,
1173-
instances []Instance) (bool, bool) {
1174-
somebodyAlive := false
1175-
ctxCanceled := false
1176-
1158+
func (p *ConnectionPool) fillPools(ctx context.Context, instances []Instance) bool {
11771159
// It is called before controller() goroutines, so we don't expect
11781160
// concurrency issues here.
11791161
for _, instance := range instances {
@@ -1187,22 +1169,20 @@ func (p *ConnectionPool) fillPools(ctx context.Context,
11871169
instance.Name, err)
11881170
select {
11891171
case <-ctx.Done():
1190-
ctxCanceled = true
1191-
11921172
p.ends[instance.Name] = nil
11931173
log.Printf("tarantool: operation was canceled")
11941174

11951175
p.deactivateConnections()
11961176

1197-
return false, ctxCanceled
1177+
return true
11981178
default:
11991179
}
1200-
} else if p.processConnection(conn, instance.Name, end) {
1201-
somebodyAlive = true
1180+
} else {
1181+
p.processConnection(conn, instance.Name, end)
12021182
}
12031183
}
12041184

1205-
return somebodyAlive, ctxCanceled
1185+
return false
12061186
}
12071187

12081188
func (p *ConnectionPool) updateConnection(e *endpoint) {

pool/connection_pool_test.go

+64-41
Original file line numberDiff line numberDiff line change
@@ -86,22 +86,6 @@ var defaultTimeoutRetry = 500 * time.Millisecond
8686

8787
var helpInstances []test_helpers.TarantoolInstance
8888

89-
func TestConnect_error_empty_instances(t *testing.T) {
90-
ctx, cancel := test_helpers.GetPoolConnectContext()
91-
connPool, err := pool.Connect(ctx, []pool.Instance{})
92-
cancel()
93-
require.Nilf(t, connPool, "conn is not nil with incorrect param")
94-
require.ErrorIs(t, err, pool.ErrEmptyInstances)
95-
}
96-
97-
func TestConnect_error_unavailable(t *testing.T) {
98-
ctx, cancel := test_helpers.GetPoolConnectContext()
99-
connPool, err := pool.Connect(ctx, makeInstances([]string{"err1", "err2"}, connOpts))
100-
cancel()
101-
require.Nilf(t, connPool, "conn is not nil with incorrect param")
102-
require.ErrorIs(t, err, pool.ErrNoConnection)
103-
}
104-
10589
func TestConnect_error_duplicate(t *testing.T) {
10690
ctx, cancel := test_helpers.GetPoolConnectContext()
10791
connPool, err := pool.Connect(ctx, makeInstances([]string{"foo", "foo"}, connOpts))
@@ -145,6 +129,48 @@ func TestConnSuccessfully(t *testing.T) {
145129
require.Nil(t, err)
146130
}
147131

132+
func TestConnect_empty(t *testing.T) {
133+
cases := []struct {
134+
Name string
135+
Instances []pool.Instance
136+
}{
137+
{"nil", nil},
138+
{"empty", []pool.Instance{}},
139+
}
140+
141+
for _, tc := range cases {
142+
t.Run(tc.Name, func(t *testing.T) {
143+
ctx, cancel := test_helpers.GetPoolConnectContext()
144+
defer cancel()
145+
connPool, err := pool.Connect(ctx, tc.Instances)
146+
if connPool != nil {
147+
defer connPool.Close()
148+
}
149+
require.NoError(t, err, "failed to create a pool")
150+
require.NotNilf(t, connPool, "pool is nil after Connect")
151+
require.Lenf(t, connPool.GetInfo(), 0, "empty pool expected")
152+
})
153+
}
154+
}
155+
156+
func TestConnect_unavailable(t *testing.T) {
157+
servers := []string{"err1", "err2"}
158+
ctx, cancel := test_helpers.GetPoolConnectContext()
159+
connPool, err := pool.Connect(ctx, makeInstances([]string{"err1", "err2"}, connOpts))
160+
cancel()
161+
162+
if connPool != nil {
163+
defer connPool.Close()
164+
}
165+
166+
require.NoError(t, err, "failed to create a pool")
167+
require.NotNilf(t, connPool, "pool is nil after Connect")
168+
require.Equal(t, map[string]pool.ConnectionInfo{
169+
servers[0]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole},
170+
servers[1]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole},
171+
}, connPool.GetInfo())
172+
}
173+
148174
func TestConnErrorAfterCtxCancel(t *testing.T) {
149175
var connLongReconnectOpts = tarantool.Opts{
150176
Timeout: 5 * time.Second,
@@ -410,16 +436,14 @@ func TestDisconnectAll(t *testing.T) {
410436
func TestAdd(t *testing.T) {
411437
ctx, cancel := test_helpers.GetPoolConnectContext()
412438
defer cancel()
413-
connPool, err := pool.Connect(ctx, makeInstances(servers[:1], connOpts))
439+
connPool, err := pool.Connect(ctx, []pool.Instance{})
414440
require.Nilf(t, err, "failed to connect")
415441
require.NotNilf(t, connPool, "conn is nil after Connect")
416442

417443
defer connPool.Close()
418444

419-
for _, server := range servers[1:] {
420-
ctx, cancel := test_helpers.GetConnectContext()
421-
err = connPool.Add(ctx, makeInstance(server, connOpts))
422-
cancel()
445+
for _, server := range servers {
446+
err = connPool.Add(makeInstance(server, connOpts))
423447
require.Nil(t, err)
424448
}
425449

@@ -452,9 +476,7 @@ func TestAdd_exist(t *testing.T) {
452476

453477
defer connPool.Close()
454478

455-
ctx, cancel = test_helpers.GetConnectContext()
456-
err = connPool.Add(ctx, makeInstance(server, connOpts))
457-
cancel()
479+
err = connPool.Add(makeInstance(server, connOpts))
458480
require.Equal(t, pool.ErrExists, err)
459481

460482
args := test_helpers.CheckStatusesArgs{
@@ -484,25 +506,23 @@ func TestAdd_unreachable(t *testing.T) {
484506
defer connPool.Close()
485507

486508
unhealthyServ := "127.0.0.2:6667"
487-
ctx, cancel = test_helpers.GetConnectContext()
488-
err = connPool.Add(ctx, pool.Instance{
509+
err = connPool.Add(pool.Instance{
489510
Name: unhealthyServ,
490511
Dialer: tarantool.NetDialer{
491512
Address: unhealthyServ,
492513
},
493514
Opts: connOpts,
494515
})
495-
cancel()
496-
// The OS-dependent error so we just check for existence.
497-
require.NotNil(t, err)
516+
require.NoError(t, err)
498517

499518
args := test_helpers.CheckStatusesArgs{
500519
ConnPool: connPool,
501520
Mode: pool.ANY,
502521
Servers: servers,
503522
ExpectedPoolStatus: true,
504523
ExpectedStatuses: map[string]bool{
505-
server: true,
524+
server: true,
525+
unhealthyServ: false,
506526
},
507527
}
508528

@@ -520,9 +540,7 @@ func TestAdd_afterClose(t *testing.T) {
520540
require.NotNilf(t, connPool, "conn is nil after Connect")
521541

522542
connPool.Close()
523-
ctx, cancel = test_helpers.GetConnectContext()
524-
err = connPool.Add(ctx, makeInstance(server, connOpts))
525-
cancel()
543+
err = connPool.Add(makeInstance(server, connOpts))
526544
assert.Equal(t, err, pool.ErrClosed)
527545
}
528546

@@ -541,9 +559,7 @@ func TestAdd_Close_concurrent(t *testing.T) {
541559
go func() {
542560
defer wg.Done()
543561

544-
ctx, cancel := test_helpers.GetConnectContext()
545-
err = connPool.Add(ctx, makeInstance(serv1, connOpts))
546-
cancel()
562+
err = connPool.Add(makeInstance(serv1, connOpts))
547563
if err != nil {
548564
assert.Equal(t, pool.ErrClosed, err)
549565
}
@@ -569,9 +585,7 @@ func TestAdd_CloseGraceful_concurrent(t *testing.T) {
569585
go func() {
570586
defer wg.Done()
571587

572-
ctx, cancel := test_helpers.GetConnectContext()
573-
err = connPool.Add(ctx, makeInstance(serv1, connOpts))
574-
cancel()
588+
err = connPool.Add(makeInstance(serv1, connOpts))
575589
if err != nil {
576590
assert.Equal(t, pool.ErrClosed, err)
577591
}
@@ -1028,8 +1042,17 @@ func TestConnectionHandlerOpenError(t *testing.T) {
10281042
if err == nil {
10291043
defer connPool.Close()
10301044
}
1031-
require.NotNilf(t, err, "success to connect")
1032-
require.Equalf(t, 2, h.discovered, "unexpected discovered count")
1045+
require.NoError(t, err, "failed to connect")
1046+
require.NotNil(t, connPool, "pool expected")
1047+
require.Equal(t, map[string]pool.ConnectionInfo{
1048+
servers[0]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole},
1049+
servers[1]: pool.ConnectionInfo{ConnectedNow: false, ConnRole: pool.UnknownRole},
1050+
}, connPool.GetInfo())
1051+
connPool.Close()
1052+
1053+
// It could happen additional reconnect attempts in the background, but
1054+
// at least 2 connects on start.
1055+
require.GreaterOrEqualf(t, h.discovered, 2, "unexpected discovered count")
10331056
require.Equalf(t, 0, h.deactivated, "unexpected deactivated count")
10341057
}
10351058

0 commit comments

Comments
 (0)