Skip to content

Commit a057d23

Browse files
authored
Update client pool logic (#846)
* client/pool - allow to check a connection to DB inside pool initialization * fix Makefile after PR#803
1 parent b9563f7 commit a057d23

File tree

4 files changed

+225
-30
lines changed

4 files changed

+225
-30
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ test-local:
1919
-v $${PWD}/docker/resources/replication.cnf:/etc/mysql/conf.d/replication.cnf \
2020
mysql:$(MYSQL_VERSION)
2121
docker/resources/waitfor.sh 127.0.0.1 3306 \
22-
&& go test -race -v -timeout 2m ./... -gocheck.v
22+
&& go test -race -v -timeout 2m ./...
2323
docker stop go-mysql-server
2424

2525
fmt:

client/pool.go

+125-27
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"context"
5+
"log"
56
"math"
67
"math/rand"
78
"sync"
@@ -77,43 +78,46 @@ var (
7778
MaxNewConnectionAtOnce = 5
7879
)
7980

80-
// NewPool initializes new connection pool and uses params: addr, user, password, dbName and options.
81-
// minAlive specifies the minimum number of open connections that the pool will try to maintain.
82-
// maxAlive specifies the maximum number of open connections (for internal reasons,
83-
// may be greater by 1 inside newConnectionProducer).
84-
// maxIdle specifies the maximum number of idle connections (see DefaultIdleTimeout).
85-
func NewPool(
86-
logFunc LogFunc,
87-
minAlive int,
88-
maxAlive int,
89-
maxIdle int,
81+
// NewPoolWithOptions initializes new connection pool and uses params: addr, user, password, dbName and options.
82+
func NewPoolWithOptions(
9083
addr string,
9184
user string,
9285
password string,
9386
dbName string,
94-
options ...func(conn *Conn),
95-
) *Pool {
96-
if minAlive > maxAlive {
97-
minAlive = maxAlive
87+
options ...PoolOption,
88+
) (*Pool, error) {
89+
po := getDefaultPoolOptions()
90+
91+
po.addr = addr
92+
po.user = user
93+
po.password = password
94+
po.dbName = dbName
95+
96+
for _, o := range options {
97+
o(&po)
98+
}
99+
100+
if po.minAlive > po.maxAlive {
101+
po.minAlive = po.maxAlive
98102
}
99-
if maxIdle > maxAlive {
100-
maxIdle = maxAlive
103+
if po.maxIdle > po.maxAlive {
104+
po.maxIdle = po.maxAlive
101105
}
102-
if maxIdle <= minAlive {
103-
maxIdle = minAlive
106+
if po.maxIdle <= po.minAlive {
107+
po.maxIdle = po.minAlive
104108
}
105109

106110
pool := &Pool{
107-
logFunc: logFunc,
108-
minAlive: minAlive,
109-
maxAlive: maxAlive,
110-
maxIdle: maxIdle,
111+
logFunc: po.logFunc,
112+
minAlive: po.minAlive,
113+
maxAlive: po.maxAlive,
114+
maxIdle: po.maxIdle,
111115

112116
idleCloseTimeout: Timestamp(math.Ceil(DefaultIdleTimeout.Seconds())),
113117
idlePingTimeout: Timestamp(math.Ceil(MaxIdleTimeoutWithoutPing.Seconds())),
114118

115119
connect: func() (*Conn, error) {
116-
return Connect(addr, user, password, dbName, options...)
120+
return Connect(addr, user, password, dbName, po.connOptions...)
117121
},
118122

119123
readyConnection: make(chan Connection),
@@ -127,13 +131,56 @@ func NewPool(
127131
go pool.newConnectionProducer()
128132

129133
if pool.minAlive > 0 {
130-
pool.logFunc(`Pool: Setup %d new connections (minimal pool size)...`, pool.minAlive)
131-
pool.startNewConnections(pool.minAlive)
134+
go pool.startNewConnections(pool.minAlive)
132135
}
133136

134137
pool.wg.Add(1)
135138
go pool.closeOldIdleConnections()
136139

140+
if po.newPoolPingTimeout > 0 {
141+
ctx, cancel := context.WithTimeout(pool.ctx, po.newPoolPingTimeout)
142+
err := pool.checkConnection(ctx)
143+
cancel()
144+
if err != nil {
145+
pool.Close()
146+
return nil, errors.Errorf("checkConnection: %s", err)
147+
}
148+
}
149+
150+
return pool, nil
151+
}
152+
153+
// NewPool initializes new connection pool and uses params: addr, user, password, dbName and options.
154+
// minAlive specifies the minimum number of open connections that the pool will try to maintain.
155+
// maxAlive specifies the maximum number of open connections (for internal reasons,
156+
// may be greater by 1 inside newConnectionProducer).
157+
// maxIdle specifies the maximum number of idle connections (see DefaultIdleTimeout).
158+
//
159+
// Deprecated: use NewPoolWithOptions
160+
func NewPool(
161+
logFunc LogFunc,
162+
minAlive int,
163+
maxAlive int,
164+
maxIdle int,
165+
addr string,
166+
user string,
167+
password string,
168+
dbName string,
169+
options ...func(conn *Conn),
170+
) *Pool {
171+
pool, err := NewPoolWithOptions(
172+
addr,
173+
user,
174+
password,
175+
dbName,
176+
WithLogFunc(logFunc),
177+
WithPoolLimits(minAlive, maxAlive, maxIdle),
178+
WithConnOptions(options...),
179+
)
180+
if err != nil {
181+
pool.logFunc(`Pool: NewPool: %s`, err.Error())
182+
}
183+
137184
return pool
138185
}
139186

@@ -235,6 +282,7 @@ func (pool *Pool) putConnectionUnsafe(connection Connection) {
235282

236283
func (pool *Pool) newConnectionProducer() {
237284
defer pool.wg.Done()
285+
238286
var connection Connection
239287
var err error
240288

@@ -263,10 +311,24 @@ func (pool *Pool) newConnectionProducer() {
263311
pool.synchro.stats.TotalCount-- // Bad luck, should try again
264312
pool.synchro.Unlock()
265313

266-
time.Sleep(time.Duration(10+rand.Intn(90)) * time.Millisecond)
267-
continue
314+
pool.logFunc("Cannot establish new db connection: %s", err.Error())
315+
316+
timer := time.NewTimer(
317+
time.Duration(10+rand.Intn(90)) * time.Millisecond,
318+
)
319+
320+
select {
321+
case <-timer.C:
322+
continue
323+
case <-pool.ctx.Done():
324+
if !timer.Stop() {
325+
<-timer.C
326+
}
327+
return
328+
}
268329
}
269330
}
331+
270332
select {
271333
case pool.readyConnection <- connection:
272334
case <-pool.ctx.Done():
@@ -309,6 +371,7 @@ func (pool *Pool) getIdleConnectionUnsafe() Connection {
309371

310372
func (pool *Pool) closeOldIdleConnections() {
311373
defer pool.wg.Done()
374+
312375
var toPing []Connection
313376

314377
ticker := time.NewTicker(5 * time.Second)
@@ -468,13 +531,17 @@ func (pool *Pool) closeConn(conn *Conn) {
468531
}
469532

470533
func (pool *Pool) startNewConnections(count int) {
534+
pool.logFunc(`Pool: Setup %d new connections (minimal pool size)...`, count)
535+
471536
connections := make([]Connection, 0, count)
472537
for i := 0; i < count; i++ {
473538
if conn, err := pool.createNewConnection(); err == nil {
474539
pool.synchro.Lock()
475540
pool.synchro.stats.TotalCount++
476541
pool.synchro.Unlock()
477542
connections = append(connections, conn)
543+
} else {
544+
pool.logFunc(`Pool: createNewConnection: %s`, err)
478545
}
479546
}
480547

@@ -512,3 +579,34 @@ func (pool *Pool) Close() {
512579
pool.synchro.idleConnections = nil
513580
pool.synchro.Unlock()
514581
}
582+
583+
// checkConnection tries to connect and ping DB server
584+
func (pool *Pool) checkConnection(ctx context.Context) error {
585+
errChan := make(chan error, 1)
586+
587+
go func() {
588+
conn, err := pool.connect()
589+
if err == nil {
590+
err = conn.Ping()
591+
_ = conn.Close()
592+
}
593+
errChan <- err
594+
}()
595+
596+
select {
597+
case err := <-errChan:
598+
return err
599+
case <-ctx.Done():
600+
return ctx.Err()
601+
}
602+
}
603+
604+
// getDefaultPoolOptions returns pool config for low load services
605+
func getDefaultPoolOptions() poolOptions {
606+
return poolOptions{
607+
logFunc: log.Printf,
608+
minAlive: 1,
609+
maxAlive: 10,
610+
maxIdle: 2,
611+
}
612+
}

client/pool_options.go

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package client
2+
3+
import (
4+
"time"
5+
)
6+
7+
type (
8+
poolOptions struct {
9+
logFunc LogFunc
10+
11+
minAlive int
12+
maxAlive int
13+
maxIdle int
14+
15+
addr string
16+
user string
17+
password string
18+
dbName string
19+
20+
connOptions []func(conn *Conn)
21+
22+
newPoolPingTimeout time.Duration
23+
}
24+
)
25+
26+
type (
27+
PoolOption func(o *poolOptions)
28+
)
29+
30+
// WithPoolLimits sets pool limits:
31+
// - minAlive specifies the minimum number of open connections that the pool will try to maintain.
32+
// - maxAlive specifies the maximum number of open connections (for internal reasons,
33+
// may be greater by 1 inside newConnectionProducer).
34+
// - maxIdle specifies the maximum number of idle connections (see DefaultIdleTimeout).
35+
func WithPoolLimits(minAlive, maxAlive, maxIdle int) PoolOption {
36+
return func(o *poolOptions) {
37+
o.minAlive = minAlive
38+
o.maxAlive = maxAlive
39+
o.maxIdle = maxIdle
40+
}
41+
}
42+
43+
func WithLogFunc(f LogFunc) PoolOption {
44+
return func(o *poolOptions) {
45+
o.logFunc = f
46+
}
47+
}
48+
49+
func WithConnOptions(options ...func(conn *Conn)) PoolOption {
50+
return func(o *poolOptions) {
51+
o.connOptions = append(o.connOptions, options...)
52+
}
53+
}
54+
55+
// WithNewPoolPingTimeout enables connect & ping to DB during the pool initialization
56+
func WithNewPoolPingTimeout(timeout time.Duration) PoolOption {
57+
return func(o *poolOptions) {
58+
o.newPoolPingTimeout = timeout
59+
}
60+
}

client/pool_test.go

+39-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package client
33
import (
44
"context"
55
"fmt"
6+
"net"
67
"strings"
78
"testing"
9+
"time"
810

911
"github.com/go-mysql-org/go-mysql/test_util"
1012
"github.com/siddontang/go-log/log"
@@ -26,7 +28,12 @@ func TestPoolSuite(t *testing.T) {
2628

2729
func (s *poolTestSuite) TestPool_Close() {
2830
addr := fmt.Sprintf("%s:%s", *test_util.MysqlHost, s.port)
29-
pool := NewPool(log.Debugf, 5, 10, 5, addr, *testUser, *testPassword, "")
31+
pool, err := NewPoolWithOptions(addr, *testUser, *testPassword, "",
32+
WithPoolLimits(5, 10, 5),
33+
WithLogFunc(log.Debugf),
34+
)
35+
require.NoError(s.T(), err)
36+
3037
conn, err := pool.GetConn(context.Background())
3138
require.NoError(s.T(), err)
3239
err = conn.Ping()
@@ -35,8 +42,38 @@ func (s *poolTestSuite) TestPool_Close() {
3542
pool.Close()
3643
var poolStats ConnectionStats
3744
pool.GetStats(&poolStats)
38-
require.Equal(s.T(), 0, poolStats.TotalCount)
45+
require.Equal(s.T(), 0, poolStats.IdleCount)
3946
require.Len(s.T(), pool.readyConnection, 0)
4047
_, err = pool.GetConn(context.Background())
4148
require.Error(s.T(), err)
4249
}
50+
51+
func (s *poolTestSuite) TestPool_WrongPassword() {
52+
addr := fmt.Sprintf("%s:%s", *test_util.MysqlHost, s.port)
53+
54+
_, err := NewPoolWithOptions(addr, *testUser, "wrong-password", "",
55+
WithPoolLimits(5, 10, 5),
56+
WithLogFunc(log.Debugf),
57+
WithNewPoolPingTimeout(time.Second),
58+
)
59+
60+
require.ErrorContains(s.T(), err, "ERROR 1045 (28000): Access denied for user")
61+
}
62+
63+
func (s *poolTestSuite) TestPool_WrongAddr() {
64+
l, err := net.Listen("tcp4", "127.0.0.1:0")
65+
require.NoError(s.T(), err)
66+
67+
laddr, ok := l.Addr().(*net.TCPAddr)
68+
require.True(s.T(), ok)
69+
70+
_ = l.Close()
71+
72+
_, err = NewPoolWithOptions(laddr.String(), *testUser, *testPassword, "",
73+
WithPoolLimits(5, 10, 5),
74+
WithLogFunc(log.Debugf),
75+
WithNewPoolPingTimeout(time.Second),
76+
)
77+
78+
require.Error(s.T(), err)
79+
}

0 commit comments

Comments
 (0)