Skip to content

Commit 0cbf552

Browse files
committed
pool: add a new method Pool.DoInstance
The method allows to execute a request on the target instance in a pool. Closes #376
1 parent f02579a commit 0cbf552

File tree

3 files changed

+82
-0
lines changed

3 files changed

+82
-0
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
3737
the response (#237)
3838
- Ability to mock connections for tests (#237). Added new types `MockDoer`,
3939
`MockRequest` to `test_helpers`.
40+
- New method `Pool.DoInstance` to execute a request on a target instance in
41+
a pool (#376).
4042

4143
### Changed
4244

pool/connection_pool.go

+10
Original file line numberDiff line numberDiff line change
@@ -1002,6 +1002,16 @@ func (p *ConnectionPool) Do(req tarantool.Request, userMode Mode) *tarantool.Fut
10021002
return conn.Do(req)
10031003
}
10041004

1005+
// DoInstance sends the request into a target instance and returns a future.
1006+
func (p *ConnectionPool) DoInstance(req tarantool.Request, name string) *tarantool.Future {
1007+
conn := p.anyPool.GetConnection(name)
1008+
if conn == nil {
1009+
return newErrorFuture(ErrNoHealthyInstance)
1010+
}
1011+
1012+
return conn.Do(req)
1013+
}
1014+
10051015
//
10061016
// private
10071017
//

pool/connection_pool_test.go

+70
Original file line numberDiff line numberDiff line change
@@ -2540,6 +2540,76 @@ func TestDo_concurrent(t *testing.T) {
25402540
wg.Wait()
25412541
}
25422542

2543+
func TestDoInstance(t *testing.T) {
2544+
ctx, cancel := test_helpers.GetPoolConnectContext()
2545+
defer cancel()
2546+
2547+
connPool, err := pool.Connect(ctx, instances)
2548+
require.Nilf(t, err, "failed to connect")
2549+
require.NotNilf(t, connPool, "conn is nil after Connect")
2550+
2551+
defer connPool.Close()
2552+
2553+
req := tarantool.NewEvalRequest("return box.cfg.listen")
2554+
for _, server := range servers {
2555+
data, err := connPool.DoInstance(req, server).Get()
2556+
require.NoError(t, err)
2557+
assert.Equal(t, []interface{}{server}, data)
2558+
}
2559+
}
2560+
2561+
func TestDoInstance_not_found(t *testing.T) {
2562+
roles := []bool{true, true, false, true, false}
2563+
2564+
err := test_helpers.SetClusterRO(dialers, connOpts, roles)
2565+
require.Nilf(t, err, "fail to set roles for cluster")
2566+
2567+
ctx, cancel := test_helpers.GetPoolConnectContext()
2568+
defer cancel()
2569+
2570+
connPool, err := pool.Connect(ctx, []pool.Instance{})
2571+
require.Nilf(t, err, "failed to connect")
2572+
require.NotNilf(t, connPool, "conn is nil after Connect")
2573+
2574+
defer connPool.Close()
2575+
2576+
data, err := connPool.DoInstance(tarantool.NewPingRequest(), "not_exist").Get()
2577+
assert.Nil(t, data)
2578+
require.ErrorIs(t, err, pool.ErrNoHealthyInstance)
2579+
}
2580+
2581+
func TestDoInstance_concurrent(t *testing.T) {
2582+
ctx, cancel := test_helpers.GetPoolConnectContext()
2583+
defer cancel()
2584+
connPool, err := pool.Connect(ctx, instances)
2585+
require.Nilf(t, err, "failed to connect")
2586+
require.NotNilf(t, connPool, "conn is nil after Connect")
2587+
2588+
defer connPool.Close()
2589+
2590+
eval := tarantool.NewEvalRequest("return box.cfg.listen")
2591+
ping := tarantool.NewPingRequest()
2592+
const concurrency = 100
2593+
var wg sync.WaitGroup
2594+
wg.Add(concurrency)
2595+
2596+
for i := 0; i < concurrency; i++ {
2597+
go func() {
2598+
defer wg.Done()
2599+
2600+
for _, server := range servers {
2601+
data, err := connPool.DoInstance(eval, server).Get()
2602+
require.NoError(t, err)
2603+
assert.Equal(t, []interface{}{server}, data)
2604+
}
2605+
_, err := connPool.DoInstance(ping, "not_exist").Get()
2606+
require.ErrorIs(t, err, pool.ErrNoHealthyInstance)
2607+
}()
2608+
}
2609+
2610+
wg.Wait()
2611+
}
2612+
25432613
func TestNewPrepared(t *testing.T) {
25442614
test_helpers.SkipIfSQLUnsupported(t)
25452615

0 commit comments

Comments
 (0)