Skip to content

Commit 66a161f

Browse files
author
Damir Sayfutdinov
committed
Check connection liveness in beginTx
This commit fixes for ClickHouse#213. If the connection closes from the server side for some reason, the database driver returns driver.ErrBadConn to database/sql. Usually, database/sql retries a request, assuming that the error occurs in a function that could be called first after retrieving a connection from the pool. But beginTx in clickhouse-go doesn't perform any network interaction and driver.ErrBadConn is returned later in the transaction. database/sql doesn't retry it because assumes that connection is alive - beginTx didn't return the error. This commit adds a method to check the connection liveness and performs that check in beginTx function. The check is taken from go-sql-driver/mysql#934. There is no way to check the liveness of a secure connection, as long as there is no access to raw TCP net.Conn in clickhouse-go.
1 parent a472d1b commit 66a161f

6 files changed

+209
-31
lines changed

Diff for: README.md

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Golang SQL database driver for [Yandex ClickHouse](https://clickhouse.yandex/)
2626
* pool_size - the maximum amount of preallocated byte chunks used in queries (default is 100). Decrease this if you experience memory problems at the expense of more GC pressure and vice versa.
2727
* debug - enable debug output (boolean value)
2828
* compress - enable lz4 compression (integer value, default is '0')
29+
* check_connection_liveness - on supported platforms non-secure connections retrieved from the connection pool are checked in beginTx() for liveness before using them. If the check fails, the respective connection is marked as bad and the query retried with another connection. (boolean value, default is 'true')
2930

3031
SSL/TLS parameters:
3132

Diff for: bootstrap.go

+29-19
Original file line numberDiff line numberDiff line change
@@ -83,21 +83,22 @@ func open(dsn string) (*clickhouse, error) {
8383
return nil, err
8484
}
8585
var (
86-
hosts = []string{url.Host}
87-
query = url.Query()
88-
secure = false
89-
skipVerify = false
90-
tlsConfigName = query.Get("tls_config")
91-
noDelay = true
92-
compress = false
93-
database = query.Get("database")
94-
username = query.Get("username")
95-
password = query.Get("password")
96-
blockSize = 1000000
97-
connTimeout = DefaultConnTimeout
98-
readTimeout = DefaultReadTimeout
99-
writeTimeout = DefaultWriteTimeout
100-
connOpenStrategy = connOpenRandom
86+
hosts = []string{url.Host}
87+
query = url.Query()
88+
secure = false
89+
skipVerify = false
90+
tlsConfigName = query.Get("tls_config")
91+
noDelay = true
92+
compress = false
93+
database = query.Get("database")
94+
username = query.Get("username")
95+
password = query.Get("password")
96+
blockSize = 1000000
97+
connTimeout = DefaultConnTimeout
98+
readTimeout = DefaultReadTimeout
99+
writeTimeout = DefaultWriteTimeout
100+
connOpenStrategy = connOpenRandom
101+
checkConnLiveness = true
101102
)
102103
if len(database) == 0 {
103104
database = DefaultDatabase
@@ -156,12 +157,21 @@ func open(dsn string) (*clickhouse, error) {
156157
compress = v
157158
}
158159

160+
if v, err := strconv.ParseBool(query.Get("check_connection_liveness")); err == nil {
161+
checkConnLiveness = v
162+
}
163+
if secure {
164+
// There is no way to check the liveness of a secure connection, as long as there is no access to raw TCP net.Conn
165+
checkConnLiveness = false
166+
}
167+
159168
var (
160169
ch = clickhouse{
161-
logf: func(string, ...interface{}) {},
162-
settings: settings,
163-
compress: compress,
164-
blockSize: blockSize,
170+
logf: func(string, ...interface{}) {},
171+
settings: settings,
172+
compress: compress,
173+
blockSize: blockSize,
174+
checkConnLiveness: checkConnLiveness,
165175
ServerInfo: data.ServerInfo{
166176
Timezone: time.Local,
167177
},

Diff for: clickhouse.go

+26-12
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@ import (
1313
"sync"
1414
"time"
1515

16-
"github.com/ClickHouse/clickhouse-go/lib/binary"
1716
"github.com/ClickHouse/clickhouse-go/lib/column"
1817
"github.com/ClickHouse/clickhouse-go/lib/data"
19-
"github.com/ClickHouse/clickhouse-go/lib/protocol"
2018
"github.com/ClickHouse/clickhouse-go/lib/types"
19+
20+
"github.com/ClickHouse/clickhouse-go/lib/binary"
21+
"github.com/ClickHouse/clickhouse-go/lib/protocol"
2122
)
2223

2324
type (
@@ -47,16 +48,17 @@ type clickhouse struct {
4748
sync.Mutex
4849
data.ServerInfo
4950
data.ClientInfo
50-
logf logger
51-
conn *connect
52-
block *data.Block
53-
buffer *bufio.Writer
54-
decoder *binary.Decoder
55-
encoder *binary.Encoder
56-
settings *querySettings
57-
compress bool
58-
blockSize int
59-
inTransaction bool
51+
logf logger
52+
conn *connect
53+
block *data.Block
54+
buffer *bufio.Writer
55+
decoder *binary.Decoder
56+
encoder *binary.Encoder
57+
settings *querySettings
58+
compress bool
59+
blockSize int
60+
inTransaction bool
61+
checkConnLiveness bool
6062
}
6163

6264
func (ch *clickhouse) Prepare(query string) (driver.Stmt, error) {
@@ -124,6 +126,18 @@ func (ch *clickhouse) beginTx(ctx context.Context, opts txOptions) (*clickhouse,
124126
case ch.conn.closed:
125127
return nil, driver.ErrBadConn
126128
}
129+
130+
// Perform a stale connection check. We only perform this check in beginTx,
131+
// because database/sql retries driver.ErrBadConn only for first request,
132+
// but beginTx doesn't perform any other network interaction.
133+
if ch.checkConnLiveness {
134+
if err := ch.conn.connCheck(); err != nil {
135+
ch.logf("[begin] closing bad idle connection: %w", err)
136+
ch.Close()
137+
return ch, driver.ErrBadConn
138+
}
139+
}
140+
127141
if finish := ch.watchCancel(ctx); finish != nil {
128142
defer finish()
129143
}

Diff for: connect_check.go

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
//go:build linux || darwin || dragonfly || freebsd || netbsd || openbsd || solaris || illumos
2+
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos
3+
4+
package clickhouse
5+
6+
import (
7+
"errors"
8+
"fmt"
9+
"io"
10+
"syscall"
11+
"time"
12+
)
13+
14+
var errUnexpectedRead = errors.New("unexpected read from socket")
15+
16+
func (conn *connect) connCheck() error {
17+
var sysErr error
18+
19+
sysConn, ok := conn.Conn.(syscall.Conn)
20+
if !ok {
21+
return nil
22+
}
23+
rawConn, err := sysConn.SyscallConn()
24+
if err != nil {
25+
return err
26+
}
27+
// If this connection has a ReadTimeout which we've been setting on
28+
// reads, reset it to zero value before we attempt a non-blocking
29+
// read, otherwise we may get os.ErrDeadlineExceeded for the cached
30+
// connection from the pool with an expired timeout.
31+
if conn.readTimeout != 0 {
32+
err = conn.SetReadDeadline(time.Time{})
33+
if err != nil {
34+
return fmt.Errorf("set read deadline: %w", err)
35+
}
36+
conn.lastReadDeadlineTime = time.Time{}
37+
}
38+
err = rawConn.Read(func(fd uintptr) bool {
39+
var buf [1]byte
40+
n, err := syscall.Read(int(fd), buf[:])
41+
switch {
42+
case n == 0 && err == nil:
43+
sysErr = io.EOF
44+
case n > 0:
45+
sysErr = errUnexpectedRead
46+
case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
47+
sysErr = nil
48+
default:
49+
sysErr = err
50+
}
51+
return true
52+
})
53+
if err != nil {
54+
return err
55+
}
56+
57+
return sysErr
58+
}

Diff for: connect_check_dummy.go

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
//go:build !linux && !darwin && !dragonfly && !freebsd && !netbsd && !openbsd && !solaris && !illumos
2+
// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos
3+
4+
package clickhouse
5+
6+
import "net"
7+
8+
func connCheck(conn net.Conn) error {
9+
return nil
10+
}

Diff for: connect_check_test.go

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package clickhouse
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"database/sql/driver"
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func Test_ConnCheck(t *testing.T) {
14+
const (
15+
ddl = `
16+
CREATE TABLE clickhouse_test_conncheck (
17+
Value String
18+
) Engine = Memory
19+
`
20+
dml = `
21+
INSERT INTO clickhouse_test_conncheck
22+
VALUES (?)
23+
`
24+
)
25+
26+
if connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=false"); assert.NoError(t, err) {
27+
// We could change settings only at session level.
28+
// If we have only 1 connection, we guarantee that we change settings for them.
29+
connect.SetMaxOpenConns(1)
30+
if _, err := connect.Exec("DROP TABLE IF EXISTS clickhouse_test_conncheck"); assert.NoError(t, err) {
31+
if _, err := connect.Exec(ddl); assert.NoError(t, err) {
32+
_, err = connect.Exec("set idle_connection_timeout=1")
33+
assert.NoError(t, err)
34+
35+
_, err = connect.Exec("set tcp_keep_alive_timeout=0")
36+
assert.NoError(t, err)
37+
38+
time.Sleep(1100 * time.Millisecond)
39+
ctx := context.Background()
40+
tx, err := connect.BeginTx(ctx, nil)
41+
assert.NoError(t, err)
42+
43+
_, err = tx.PrepareContext(ctx, dml)
44+
assert.NoError(t, err)
45+
}
46+
}
47+
}
48+
}
49+
50+
func Test_ConnCheckNegative(t *testing.T) {
51+
const (
52+
ddl = `
53+
CREATE TABLE clickhouse_test_conncheck_negative (
54+
Value String
55+
) Engine = Memory
56+
`
57+
dml = `
58+
INSERT INTO clickhouse_test_conncheck_negative
59+
VALUES (?)
60+
`
61+
)
62+
63+
if connect, err := sql.Open("clickhouse", "tcp://127.0.0.1:9000?debug=true&check_connection_liveness=false"); assert.NoError(t, err) {
64+
// We can only change the settings at the connection level.
65+
// If we have only one connection, we change the settings specifically for that connection.
66+
connect.SetMaxOpenConns(1)
67+
if _, err := connect.Exec("DROP TABLE IF EXISTS clickhouse_test_conncheck_negative"); assert.NoError(t, err) {
68+
if _, err := connect.Exec(ddl); assert.NoError(t, err) {
69+
_, err = connect.Exec("set idle_connection_timeout=1")
70+
assert.NoError(t, err)
71+
72+
_, err = connect.Exec("set tcp_keep_alive_timeout=0")
73+
assert.NoError(t, err)
74+
75+
time.Sleep(1100 * time.Millisecond)
76+
ctx := context.Background()
77+
tx, err := connect.BeginTx(ctx, nil)
78+
assert.NoError(t, err)
79+
80+
_, err = tx.PrepareContext(ctx, dml)
81+
assert.Equal(t, driver.ErrBadConn, err)
82+
}
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)