Skip to content

Commit a756468

Browse files
committed
pool: fix race condition at GetNextConnection()
The `r.current` value can be changed by concurrent threads because the change happens under read-lock. We could use the atomic counter for a current connection number to avoid the race condition. Closes #309 (cherry picked from dbfaab5)
1 parent 3ac003e commit a756468

File tree

3 files changed

+37
-7
lines changed

3 files changed

+37
-7
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1919
### Fixed
2020

2121
- Flaky decimal/TestSelect (#300)
22+
- Race condition at roundRobinStrategy.GetNextConnection() (#309)
2223

2324
## [1.12.0] - 2023-06-07
2425

connection_pool/connection_pool_test.go

+29
Original file line numberDiff line numberDiff line change
@@ -2065,6 +2065,35 @@ func TestDo(t *testing.T) {
20652065
require.NotNilf(t, resp, "response is nil after Ping")
20662066
}
20672067

2068+
func TestDo_concurrent(t *testing.T) {
2069+
roles := []bool{true, true, false, true, false}
2070+
2071+
err := test_helpers.SetClusterRO(servers, connOpts, roles)
2072+
require.Nilf(t, err, "fail to set roles for cluster")
2073+
2074+
connPool, err := connection_pool.Connect(servers, connOpts)
2075+
require.Nilf(t, err, "failed to connect")
2076+
require.NotNilf(t, connPool, "conn is nil after Connect")
2077+
2078+
defer connPool.Close()
2079+
2080+
req := tarantool.NewPingRequest()
2081+
const concurrency = 100
2082+
var wg sync.WaitGroup
2083+
wg.Add(concurrency)
2084+
2085+
for i := 0; i < concurrency; i++ {
2086+
go func() {
2087+
defer wg.Done()
2088+
2089+
_, err := connPool.Do(req, connection_pool.ANY).Get()
2090+
assert.Nil(t, err)
2091+
}()
2092+
}
2093+
2094+
wg.Wait()
2095+
}
2096+
20682097
func TestNewPrepared(t *testing.T) {
20692098
test_helpers.SkipIfSQLUnsupported(t)
20702099

connection_pool/round_robin.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package connection_pool
22

33
import (
44
"sync"
5+
"sync/atomic"
56

67
"github.com/tarantool/go-tarantool"
78
)
@@ -10,8 +11,8 @@ type RoundRobinStrategy struct {
1011
conns []*tarantool.Connection
1112
indexByAddr map[string]uint
1213
mutex sync.RWMutex
13-
size uint
14-
current uint
14+
size uint64
15+
current uint64
1516
}
1617

1718
func NewEmptyRoundRobin(size int) *RoundRobinStrategy {
@@ -98,13 +99,12 @@ func (r *RoundRobinStrategy) AddConn(addr string, conn *tarantool.Connection) {
9899
r.conns[idx] = conn
99100
} else {
100101
r.conns = append(r.conns, conn)
101-
r.indexByAddr[addr] = r.size
102+
r.indexByAddr[addr] = uint(r.size)
102103
r.size += 1
103104
}
104105
}
105106

106-
func (r *RoundRobinStrategy) nextIndex() uint {
107-
ret := r.current % r.size
108-
r.current++
109-
return ret
107+
func (r *RoundRobinStrategy) nextIndex() uint64 {
108+
next := atomic.AddUint64(&r.current, 1)
109+
return (next - 1) % r.size
110110
}

0 commit comments

Comments
 (0)