Skip to content

Commit 779224c

Browse files
author
Damir Sayfutdinov
committed
Check connection liveness in beginTx
This commit fixes for ClickHouse#213. If the connection closes from the server side for some reason, the database driver returns driver.ErrBadConn to database/sql. Usually, database/sql retries a request, assuming that the error occurs in a function that could be called first after retrieving a connection from the pool. But beginTx in clickhouse-go doesn't perform any network interaction and driver.ErrBadConn is returned later in the transaction. database/sql doesn't retry it because assumes that connection is alive - beginTx didn't return the error. This commit adds a method to check the connection liveness and performs that check in beginTx function. The check is taken from go-sql-driver/mysql#934. There is no way to check the liveness of a secure connection, as long as there is no access to raw TCP net.Conn in clickhouse-go.
1 parent a472d1b commit 779224c

6 files changed

+204
-29
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Golang SQL database driver for [Yandex ClickHouse](https://clickhouse.yandex/)
2626
* pool_size - the maximum amount of preallocated byte chunks used in queries (default is 100). Decrease this if you experience memory problems at the expense of more GC pressure and vice versa.
2727
* debug - enable debug output (boolean value)
2828
* compress - enable lz4 compression (integer value, default is '0')
29+
* check_connection_liveness - on supported platforms non-secure connections retrieved from the connection pool are checked in beginTx() for liveness before using them. If the check fails, the respective connection is marked as bad and the query retried with another connection. (boolean value, default is 'true')
2930

3031
SSL/TLS parameters:
3132

bootstrap.go

+29-19
Original file line numberDiff line numberDiff line change
@@ -83,21 +83,22 @@ func open(dsn string) (*clickhouse, error) {
8383
return nil, err
8484
}
8585
var (
86-
hosts = []string{url.Host}
87-
query = url.Query()
88-
secure = false
89-
skipVerify = false
90-
tlsConfigName = query.Get("tls_config")
91-
noDelay = true
92-
compress = false
93-
database = query.Get("database")
94-
username = query.Get("username")
95-
password = query.Get("password")
96-
blockSize = 1000000
97-
connTimeout = DefaultConnTimeout
98-
readTimeout = DefaultReadTimeout
99-
writeTimeout = DefaultWriteTimeout
100-
connOpenStrategy = connOpenRandom
86+
hosts = []string{url.Host}
87+
query = url.Query()
88+
secure = false
89+
skipVerify = false
90+
tlsConfigName = query.Get("tls_config")
91+
noDelay = true
92+
compress = false
93+
database = query.Get("database")
94+
username = query.Get("username")
95+
password = query.Get("password")
96+
blockSize = 1000000
97+
connTimeout = DefaultConnTimeout
98+
readTimeout = DefaultReadTimeout
99+
writeTimeout = DefaultWriteTimeout
100+
connOpenStrategy = connOpenRandom
101+
checkConnLiveness = true
101102
)
102103
if len(database) == 0 {
103104
database = DefaultDatabase
@@ -156,12 +157,21 @@ func open(dsn string) (*clickhouse, error) {
156157
compress = v
157158
}
158159

160+
if v, err := strconv.ParseBool(query.Get("check_connection_liveness")); err == nil {
161+
checkConnLiveness = v
162+
}
163+
if secure {
164+
// There is no way to check the liveness of a secure connection, as long as there is no access to raw TCP net.Conn
165+
checkConnLiveness = false
166+
}
167+
159168
var (
160169
ch = clickhouse{
161-
logf: func(string, ...interface{}) {},
162-
settings: settings,
163-
compress: compress,
164-
blockSize: blockSize,
170+
logf: func(string, ...interface{}) {},
171+
settings: settings,
172+
compress: compress,
173+
blockSize: blockSize,
174+
checkConnLiveness: checkConnLiveness,
165175
ServerInfo: data.ServerInfo{
166176
Timezone: time.Local,
167177
},

clickhouse.go

+23-10
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,17 @@ type clickhouse struct {
4747
sync.Mutex
4848
data.ServerInfo
4949
data.ClientInfo
50-
logf logger
51-
conn *connect
52-
block *data.Block
53-
buffer *bufio.Writer
54-
decoder *binary.Decoder
55-
encoder *binary.Encoder
56-
settings *querySettings
57-
compress bool
58-
blockSize int
59-
inTransaction bool
50+
logf logger
51+
conn *connect
52+
block *data.Block
53+
buffer *bufio.Writer
54+
decoder *binary.Decoder
55+
encoder *binary.Encoder
56+
settings *querySettings
57+
compress bool
58+
blockSize int
59+
inTransaction bool
60+
checkConnLiveness bool
6061
}
6162

6263
func (ch *clickhouse) Prepare(query string) (driver.Stmt, error) {
@@ -124,6 +125,18 @@ func (ch *clickhouse) beginTx(ctx context.Context, opts txOptions) (*clickhouse,
124125
case ch.conn.closed:
125126
return nil, driver.ErrBadConn
126127
}
128+
129+
// Perform a stale connection check. We only perform this check in beginTx,
130+
// because database/sql retries driver.ErrBadConn only for first request,
131+
// but beginTx doesn't perform any other network interaction.
132+
if ch.checkConnLiveness {
133+
if err := ch.conn.connCheck(); err != nil {
134+
ch.logf("[begin] closing bad idle connection: %w", err)
135+
ch.Close()
136+
return ch, driver.ErrBadConn
137+
}
138+
}
139+
127140
if finish := ch.watchCancel(ctx); finish != nil {
128141
defer finish()
129142
}

connect_check.go

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos
2+
3+
package clickhouse
4+
5+
import (
6+
"errors"
7+
"fmt"
8+
"io"
9+
"syscall"
10+
"time"
11+
)
12+
13+
var errUnexpectedRead = errors.New("unexpected read from socket")
14+
15+
func (conn *connect) connCheck() error {
16+
var sysErr error
17+
18+
sysConn, ok := conn.Conn.(syscall.Conn)
19+
if !ok {
20+
return nil
21+
}
22+
rawConn, err := sysConn.SyscallConn()
23+
if err != nil {
24+
return err
25+
}
26+
// If this connection has a ReadTimeout which we've been setting on
27+
// reads, reset it to zero value before we attempt a non-blocking
28+
// read, otherwise we may get os.ErrDeadlineExceeded for the cached
29+
// connection from the pool with an expired timeout.
30+
if conn.readTimeout != 0 {
31+
err = conn.SetReadDeadline(time.Time{})
32+
if err != nil {
33+
return fmt.Errorf("set read deadline: %w", err)
34+
}
35+
conn.lastReadDeadlineTime = time.Time{}
36+
}
37+
err = rawConn.Read(func(fd uintptr) bool {
38+
var buf [1]byte
39+
n, err := syscall.Read(int(fd), buf[:])
40+
switch {
41+
case n == 0 && err == nil:
42+
sysErr = io.EOF
43+
case n > 0:
44+
sysErr = errUnexpectedRead
45+
case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
46+
sysErr = nil
47+
default:
48+
sysErr = err
49+
}
50+
return true
51+
})
52+
if err != nil {
53+
return err
54+
}
55+
56+
return sysErr
57+
}

connect_check_dummy.go

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos
2+
3+
package clickhouse
4+
5+
import "net"
6+
7+
func connCheck(conn net.Conn) error {
8+
return nil
9+
}

connect_check_test.go

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package clickhouse
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"database/sql/driver"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func Test_ConnCheck(t *testing.T) {
14+
const (
15+
ddl = `
16+
CREATE TABLE clickhouse_test_conncheck (
17+
Value String
18+
) Engine = Memory
19+
`
20+
dml = `
21+
INSERT INTO clickhouse_test_conncheck
22+
VALUES (?)
23+
`
24+
)
25+
26+
if connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=false"); assert.NoError(t, err) {
27+
// We could change settings only at session level.
28+
// If we have only 1 connection, we guarantee that we change settings for them.
29+
connect.SetMaxOpenConns(1)
30+
if _, err := connect.Exec("DROP TABLE IF EXISTS clickhouse_test_conncheck"); assert.NoError(t, err) {
31+
if _, err := connect.Exec(ddl); assert.NoError(t, err) {
32+
_, err = connect.Exec("set idle_connection_timeout=1")
33+
assert.NoError(t, err)
34+
35+
_, err = connect.Exec("set tcp_keep_alive_timeout=0")
36+
assert.NoError(t, err)
37+
38+
time.Sleep(1100 * time.Millisecond)
39+
ctx := context.Background()
40+
tx, err := connect.BeginTx(ctx, nil)
41+
assert.NoError(t, err)
42+
43+
_, err = tx.PrepareContext(ctx, dml)
44+
assert.NoError(t, err)
45+
}
46+
}
47+
}
48+
}
49+
50+
func Test_ConnCheckNegative(t *testing.T) {
51+
const (
52+
ddl = `
53+
CREATE TABLE clickhouse_test_conncheck_negative (
54+
Value String
55+
) Engine = Memory
56+
`
57+
dml = `
58+
INSERT INTO clickhouse_test_conncheck_negative
59+
VALUES (?)
60+
`
61+
)
62+
63+
if connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=true&check_connection_liveness=false"); assert.NoError(t, err) {
64+
// We can only change the settings at the connection level.
65+
// If we have only one connection, we change the settings specifically for that connection.
66+
connect.SetMaxOpenConns(1)
67+
if _, err := connect.Exec("DROP TABLE IF EXISTS clickhouse_test_conncheck_negative"); assert.NoError(t, err) {
68+
if _, err := connect.Exec(ddl); assert.NoError(t, err) {
69+
_, err = connect.Exec("set idle_connection_timeout=1")
70+
assert.NoError(t, err)
71+
72+
_, err = connect.Exec("set tcp_keep_alive_timeout=0")
73+
assert.NoError(t, err)
74+
75+
time.Sleep(1100 * time.Millisecond)
76+
ctx := context.Background()
77+
tx, err := connect.BeginTx(ctx, nil)
78+
assert.NoError(t, err)
79+
80+
_, err = tx.PrepareContext(ctx, dml)
81+
assert.Equal(t, driver.ErrBadConn, err)
82+
}
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)