Skip to content
This repository was archived by the owner on Feb 26, 2024. It is now read-only.

Commit f40a48a

Browse files
Damir Sayfutdinovfarbodsalimi
Damir Sayfutdinov
authored andcommitted
Check connection liveness in beginTx
This commit fixes for ClickHouse#213. If the connection closes from the server side for some reason, the database driver returns driver.ErrBadConn to database/sql. Usually, database/sql retries a request, assuming that the error occurs in a function that could be called first after retrieving a connection from the pool. But beginTx in clickhouse-go doesn't perform any network interaction and driver.ErrBadConn is returned later in the transaction. database/sql doesn't retry it because assumes that connection is alive - beginTx didn't return the error. This commit adds a method to check the connection liveness and performs that check in beginTx function. The check is taken from go-sql-driver/mysql#934. There is no way to check the liveness of a secure connection, as long as there is no access to raw TCP net.Conn in clickhouse-go.
1 parent e95986c commit f40a48a

6 files changed

+205
-30
lines changed

Diff for: README.md

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Golang SQL database driver for [Yandex ClickHouse](https://clickhouse.yandex/)
2626
* pool_size - maximum amount of preallocated byte chunks used in queries (default is 100). Decrease this if you experience memory problems at the expense of more GC pressure and vice versa.
2727
* debug - enable debug output (boolean value)
2828
* compress - enable lz4 compression (integer value, default is '0')
29+
* check_connection_liveness - on supported platforms non-secure connections retrieved from the connection pool are checked in beginTx() for liveness before using them. If the check fails, the respective connection is marked as bad and the query retried with another connection. (boolean value, default is 'true')
2930

3031
SSL/TLS parameters:
3132

Diff for: bootstrap.go

+30-20
Original file line numberDiff line numberDiff line change
@@ -85,22 +85,23 @@ func open(dsn string) (*clickhouse, error) {
8585
return nil, err
8686
}
8787
var (
88-
hosts = []string{url.Host}
89-
query = url.Query()
90-
secure = false
91-
skipVerify = false
92-
tlsConfigName = query.Get("tls_config")
93-
noDelay = true
94-
compress = false
95-
database = query.Get("database")
96-
username = query.Get("username")
97-
password = query.Get("password")
98-
blockSize = 1000000
99-
connTimeout = DefaultConnTimeout
100-
readTimeout = DefaultReadTimeout
101-
writeTimeout = DefaultWriteTimeout
102-
connOpenStrategy = connOpenRandom
103-
poolSize = 100
88+
hosts = []string{url.Host}
89+
query = url.Query()
90+
secure = false
91+
skipVerify = false
92+
tlsConfigName = query.Get("tls_config")
93+
noDelay = true
94+
compress = false
95+
database = query.Get("database")
96+
username = query.Get("username")
97+
password = query.Get("password")
98+
blockSize = 1000000
99+
connTimeout = DefaultConnTimeout
100+
readTimeout = DefaultReadTimeout
101+
writeTimeout = DefaultWriteTimeout
102+
connOpenStrategy = connOpenRandom
103+
poolSize = 100
104+
checkConnLiveness = true
104105
)
105106
if len(database) == 0 {
106107
database = DefaultDatabase
@@ -165,12 +166,21 @@ func open(dsn string) (*clickhouse, error) {
165166
compress = v
166167
}
167168

169+
if v, err := strconv.ParseBool(query.Get("check_connection_liveness")); err == nil {
170+
checkConnLiveness = v
171+
}
172+
if secure {
173+
// There is no way to check the liveness of a secure connection, as long as there is no access to raw TCP net.Conn
174+
checkConnLiveness = false
175+
}
176+
168177
var (
169178
ch = clickhouse{
170-
logf: func(string, ...interface{}) {},
171-
settings: settings,
172-
compress: compress,
173-
blockSize: blockSize,
179+
logf: func(string, ...interface{}) {},
180+
settings: settings,
181+
compress: compress,
182+
blockSize: blockSize,
183+
checkConnLiveness: checkConnLiveness,
174184
ServerInfo: data.ServerInfo{
175185
Timezone: time.Local,
176186
},

Diff for: clickhouse.go

+23-10
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,17 @@ type clickhouse struct {
4747
sync.Mutex
4848
data.ServerInfo
4949
data.ClientInfo
50-
logf logger
51-
conn *connect
52-
block *data.Block
53-
buffer *bufio.Writer
54-
decoder *binary.Decoder
55-
encoder *binary.Encoder
56-
settings *querySettings
57-
compress bool
58-
blockSize int
59-
inTransaction bool
50+
logf logger
51+
conn *connect
52+
block *data.Block
53+
buffer *bufio.Writer
54+
decoder *binary.Decoder
55+
encoder *binary.Encoder
56+
settings *querySettings
57+
compress bool
58+
blockSize int
59+
inTransaction bool
60+
checkConnLiveness bool
6061
}
6162

6263
func (ch *clickhouse) Prepare(query string) (driver.Stmt, error) {
@@ -124,6 +125,18 @@ func (ch *clickhouse) beginTx(ctx context.Context, opts txOptions) (*clickhouse,
124125
case ch.conn.closed:
125126
return nil, driver.ErrBadConn
126127
}
128+
129+
// Perform a stale connection check. We only perform this check in beginTx,
130+
// because database/sql retries driver.ErrBadConn only for first request,
131+
// but beginTx doesn't perform any other network interaction.
132+
if ch.checkConnLiveness {
133+
if err := ch.conn.connCheck(); err != nil {
134+
ch.logf("[begin] closing bad idle connection: %w", err)
135+
ch.Close()
136+
return ch, driver.ErrBadConn
137+
}
138+
}
139+
127140
if finish := ch.watchCancel(ctx); finish != nil {
128141
defer finish()
129142
}

Diff for: connect_check.go

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos
2+
3+
package clickhouse
4+
5+
import (
6+
"errors"
7+
"fmt"
8+
"io"
9+
"syscall"
10+
"time"
11+
)
12+
13+
var errUnexpectedRead = errors.New("unexpected read from socket")
14+
15+
func (conn *connect) connCheck() error {
16+
var sysErr error
17+
18+
sysConn, ok := conn.Conn.(syscall.Conn)
19+
if !ok {
20+
return nil
21+
}
22+
rawConn, err := sysConn.SyscallConn()
23+
if err != nil {
24+
return err
25+
}
26+
// If this connection has a ReadTimeout which we've been setting on
27+
// reads, reset it to zero value before we attempt a non-blocking
28+
// read, otherwise we may get os.ErrDeadlineExceeded for the cached
29+
// connection from the pool with an expired timeout.
30+
if conn.readTimeout != 0 {
31+
err = conn.SetReadDeadline(time.Time{})
32+
if err != nil {
33+
return fmt.Errorf("set read deadline: %w", err)
34+
}
35+
conn.lastReadDeadlineTime = time.Time{}
36+
}
37+
err = rawConn.Read(func(fd uintptr) bool {
38+
var buf [1]byte
39+
n, err := syscall.Read(int(fd), buf[:])
40+
switch {
41+
case n == 0 && err == nil:
42+
sysErr = io.EOF
43+
case n > 0:
44+
sysErr = errUnexpectedRead
45+
case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
46+
sysErr = nil
47+
default:
48+
sysErr = err
49+
}
50+
return true
51+
})
52+
if err != nil {
53+
return err
54+
}
55+
56+
return sysErr
57+
}

Diff for: connect_check_dummy.go

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos
2+
3+
package clickhouse
4+
5+
import "net"
6+
7+
func connCheck(conn net.Conn) error {
8+
return nil
9+
}

Diff for: connect_check_test.go

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package clickhouse
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"database/sql/driver"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func Test_ConnCheck(t *testing.T) {
14+
const (
15+
ddl = `
16+
CREATE TABLE clickhouse_test_conncheck (
17+
Value String
18+
) Engine = Memory
19+
`
20+
dml = `
21+
INSERT INTO clickhouse_test_conncheck
22+
VALUES (?)
23+
`
24+
)
25+
26+
if connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=false"); assert.NoError(t, err) {
27+
// We could change settings only at session level.
28+
// If we have only 1 connection, we guarantee that we change settings for them.
29+
connect.SetMaxOpenConns(1)
30+
if _, err := connect.Exec("DROP TABLE IF EXISTS clickhouse_test_conncheck"); assert.NoError(t, err) {
31+
if _, err := connect.Exec(ddl); assert.NoError(t, err) {
32+
_, err = connect.Exec("set idle_connection_timeout=1")
33+
assert.NoError(t, err)
34+
35+
_, err = connect.Exec("set tcp_keep_alive_timeout=0")
36+
assert.NoError(t, err)
37+
38+
time.Sleep(1100 * time.Millisecond)
39+
ctx := context.Background()
40+
tx, err := connect.BeginTx(ctx, nil)
41+
assert.NoError(t, err)
42+
43+
_, err = tx.PrepareContext(ctx, dml)
44+
assert.NoError(t, err)
45+
}
46+
}
47+
}
48+
}
49+
50+
func Test_ConnCheckNegative(t *testing.T) {
51+
const (
52+
ddl = `
53+
CREATE TABLE clickhouse_test_conncheck_negative (
54+
Value String
55+
) Engine = Memory
56+
`
57+
dml = `
58+
INSERT INTO clickhouse_test_conncheck_negative
59+
VALUES (?)
60+
`
61+
)
62+
63+
if connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=true&check_connection_liveness=false"); assert.NoError(t, err) {
64+
// We can only change the settings at the connection level.
65+
// If we have only one connection, we change the settings specifically for that connection.
66+
connect.SetMaxOpenConns(1)
67+
if _, err := connect.Exec("DROP TABLE IF EXISTS clickhouse_test_conncheck_negative"); assert.NoError(t, err) {
68+
if _, err := connect.Exec(ddl); assert.NoError(t, err) {
69+
_, err = connect.Exec("set idle_connection_timeout=1")
70+
assert.NoError(t, err)
71+
72+
_, err = connect.Exec("set tcp_keep_alive_timeout=0")
73+
assert.NoError(t, err)
74+
75+
time.Sleep(1100 * time.Millisecond)
76+
ctx := context.Background()
77+
tx, err := connect.BeginTx(ctx, nil)
78+
assert.NoError(t, err)
79+
80+
_, err = tx.PrepareContext(ctx, dml)
81+
assert.Equal(t, driver.ErrBadConn, err)
82+
}
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)