Skip to content

Commit 67217d7

Browse files
committed
connection: make dialer mandatory
This patch modifies `Connect` api. Now, to connect to the Tarantool, you need to pass an object that satisfies `tarantool.Dialer` interface. You can use one of the existing implementations: `TtDialer` or `OpenSslDialer`. For example: ``` conn, err := tarantool.Connect(context.Background(), tarantool.TtDialer{ Address: "127.0.0.1:3301", User: "user", Password: "secret", }, tarantool.Opts{}) ``` To create a connection pool, you need to pass a `map[string]tarantool.Dialer`, where each dialer is associated with a unique ID (for example, it can be the server address). Connections will be distinguished from each other using these IDs. For example: ``` connPool, err := pool.Connect(context.Background(), map[string]tarantool.Dialer{ "127.0.0.1": tarantool.TtDialer{ Address: "127.0.0.1", User: "user", Password: "secret", }, }, tarantool.Opts{}) ``` The `conn.RemoteAddr` and `conn.LocalAddr` functions have been removed. To obtain the connection address, you can use `conn.Addr`. This function panics if the connection has not been established. Now, `NewWatcher` checks the actual features of the server, rather than relying on the features provided by the user during connection creation. In the case of connection pool, watchers are created for connections that support this feature. `ClientProtocolInfo`, `ServerProtocolInfo` were removed. Now, there is `ProtocolInfo`, which returns the server protocol info. Part of #321
1 parent 6225ec4 commit 67217d7

File tree

7 files changed

+443
-435
lines changed

7 files changed

+443
-435
lines changed

connection.go

+24-127
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"io"
1111
"log"
1212
"math"
13+
"net"
1314
"runtime"
1415
"sync"
1516
"sync/atomic"
@@ -89,16 +90,14 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
8990
case LogReconnectFailed:
9091
reconnects := v[0].(uint)
9192
err := v[1].(error)
92-
log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
93-
reconnects, conn.opts.MaxReconnects, conn.addr, err)
93+
log.Printf("tarantool: reconnect (%d/%d) failed: %s",
94+
reconnects, conn.opts.MaxReconnects, err)
9495
case LogLastReconnectFailed:
9596
err := v[0].(error)
96-
log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
97-
conn.addr, err)
97+
log.Printf("tarantool: last reconnect failed: %s, giving it up", err)
9898
case LogUnexpectedResultId:
9999
resp := v[0].(*Response)
100-
log.Printf("tarantool: connection %s got unexpected resultId (%d) in response",
101-
conn.addr, resp.RequestId)
100+
log.Printf("tarantool: got unexpected resultId (%d) in response", resp.RequestId)
102101
case LogWatchEventReadFailed:
103102
err := v[0].(error)
104103
log.Printf("tarantool: unable to parse watch event: %s", err)
@@ -156,10 +155,11 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
156155
// More on graceful shutdown:
157156
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
158157
type Connection struct {
159-
addr string
160-
c Conn
161-
mutex sync.Mutex
162-
cond *sync.Cond
158+
addr net.Addr
159+
dialer Dialer
160+
c Conn
161+
mutex sync.Mutex
162+
cond *sync.Cond
163163
// Schema contains schema loaded on connection.
164164
Schema *Schema
165165
// schemaResolver contains a SchemaResolver implementation.
@@ -262,11 +262,6 @@ const (
262262

263263
// Opts is a way to configure Connection
264264
type Opts struct {
265-
// Auth is an authentication method.
266-
Auth Auth
267-
// Dialer is a Dialer object used to create a new connection to a
268-
// Tarantool instance. TtDialer is a default one.
269-
Dialer Dialer
270265
// Timeout for response to a particular request. The timeout is reset when
271266
// push messages are received. If Timeout is zero, any request can be
272267
// blocked infinitely.
@@ -289,10 +284,6 @@ type Opts struct {
289284
// endlessly.
290285
// After MaxReconnects attempts Connection becomes closed.
291286
MaxReconnects uint
292-
// Username for logging in to Tarantool.
293-
User string
294-
// User password for logging in to Tarantool.
295-
Pass string
296287
// RateLimit limits number of 'in-fly' request, i.e. already put into
297288
// requests queue, but not yet answered by server or timeouted.
298289
// It is disabled by default.
@@ -317,83 +308,23 @@ type Opts struct {
317308
Handle interface{}
318309
// Logger is user specified logger used for error messages.
319310
Logger Logger
320-
// Transport is the connection type, by default the connection is unencrypted.
321-
Transport string
322-
// SslOpts is used only if the Transport == 'ssl' is set.
323-
Ssl SslOpts
324-
// RequiredProtocolInfo contains minimal protocol version and
325-
// list of protocol features that should be supported by
326-
// Tarantool server. By default there are no restrictions.
327-
RequiredProtocolInfo ProtocolInfo
328-
}
329-
330-
// SslOpts is a way to configure ssl transport.
331-
type SslOpts struct {
332-
// KeyFile is a path to a private SSL key file.
333-
KeyFile string
334-
// CertFile is a path to an SSL certificate file.
335-
CertFile string
336-
// CaFile is a path to a trusted certificate authorities (CA) file.
337-
CaFile string
338-
// Ciphers is a colon-separated (:) list of SSL cipher suites the connection
339-
// can use.
340-
//
341-
// We don't provide a list of supported ciphers. This is what OpenSSL
342-
// does. The only limitation is usage of TLSv1.2 (because other protocol
343-
// versions don't seem to support the GOST cipher). To add additional
344-
// ciphers (GOST cipher), you must configure OpenSSL.
345-
//
346-
// See also
347-
//
348-
// * https://www.openssl.org/docs/man1.1.1/man1/ciphers.html
349-
Ciphers string
350-
// Password is a password for decrypting the private SSL key file.
351-
// The priority is as follows: try to decrypt with Password, then
352-
// try PasswordFile.
353-
Password string
354-
// PasswordFile is a path to the list of passwords for decrypting
355-
// the private SSL key file. The connection tries every line from the
356-
// file as a password.
357-
PasswordFile string
358-
}
359-
360-
// Clone returns a copy of the Opts object.
361-
// Any changes in copy RequiredProtocolInfo will not affect the original
362-
// RequiredProtocolInfo value.
363-
func (opts Opts) Clone() Opts {
364-
optsCopy := opts
365-
optsCopy.RequiredProtocolInfo = opts.RequiredProtocolInfo.Clone()
366-
367-
return optsCopy
368311
}
369312

370313
// Connect creates and configures a new Connection.
371-
//
372-
// Address could be specified in following ways:
373-
//
374-
// - TCP connections (tcp://192.168.1.1:3013, tcp://my.host:3013,
375-
// tcp:192.168.1.1:3013, tcp:my.host:3013, 192.168.1.1:3013, my.host:3013)
376-
//
377-
// - Unix socket, first '/' or '.' indicates Unix socket
378-
// (unix:///abs/path/tnt.sock, unix:path/tnt.sock, /abs/path/tnt.sock,
379-
// ./rel/path/tnt.sock, unix/:path/tnt.sock)
380-
func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err error) {
314+
func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, err error) {
381315
conn = &Connection{
382-
addr: addr,
316+
dialer: dialer,
383317
requestId: 0,
384318
contextRequestId: 1,
385319
Greeting: &Greeting{},
386320
control: make(chan struct{}),
387-
opts: opts.Clone(),
321+
opts: opts,
388322
dec: msgpack.NewDecoder(&smallBuf{}),
389323
}
390324
maxprocs := uint32(runtime.GOMAXPROCS(-1))
391325
if conn.opts.Concurrency == 0 || conn.opts.Concurrency > maxprocs*128 {
392326
conn.opts.Concurrency = maxprocs * 4
393327
}
394-
if conn.opts.Dialer == nil {
395-
conn.opts.Dialer = TtDialer{}
396-
}
397328
if c := conn.opts.Concurrency; c&(c-1) != 0 {
398329
for i := uint(1); i < 32; i *= 2 {
399330
c |= c >> i
@@ -474,30 +405,11 @@ func (conn *Connection) CloseGraceful() error {
474405
}
475406

476407
// Addr returns a configured address of Tarantool socket.
477-
func (conn *Connection) Addr() string {
408+
// It panics, if the connection has not been successfully established.
409+
func (conn *Connection) Addr() net.Addr {
478410
return conn.addr
479411
}
480412

481-
// RemoteAddr returns an address of Tarantool socket.
482-
func (conn *Connection) RemoteAddr() string {
483-
conn.mutex.Lock()
484-
defer conn.mutex.Unlock()
485-
if conn.c == nil {
486-
return ""
487-
}
488-
return conn.c.RemoteAddr().String()
489-
}
490-
491-
// LocalAddr returns an address of outgoing socket.
492-
func (conn *Connection) LocalAddr() string {
493-
conn.mutex.Lock()
494-
defer conn.mutex.Unlock()
495-
if conn.c == nil {
496-
return ""
497-
}
498-
return conn.c.LocalAddr().String()
499-
}
500-
501413
// Handle returns a user-specified handle from Opts.
502414
func (conn *Connection) Handle() interface{} {
503415
return conn.opts.Handle
@@ -514,19 +426,14 @@ func (conn *Connection) dial(ctx context.Context) error {
514426
opts := conn.opts
515427

516428
var c Conn
517-
c, err := conn.opts.Dialer.Dial(ctx, conn.addr, DialOpts{
518-
IoTimeout: opts.Timeout,
519-
Transport: opts.Transport,
520-
Ssl: opts.Ssl,
521-
RequiredProtocol: opts.RequiredProtocolInfo,
522-
Auth: opts.Auth,
523-
User: opts.User,
524-
Password: opts.Pass,
429+
c, err := conn.dialer.Dial(ctx, DialOpts{
430+
IoTimeout: opts.Timeout,
525431
})
526432
if err != nil {
527433
return err
528434
}
529435

436+
conn.addr = c.Addr()
530437
conn.Greeting.Version = c.Greeting().Version
531438
conn.serverProtocolInfo = c.ProtocolInfo()
532439

@@ -1447,8 +1354,7 @@ func isFeatureInSlice(expected iproto.Feature, actualSlice []iproto.Feature) boo
14471354

14481355
// NewWatcher creates a new Watcher object for the connection.
14491356
//
1450-
// You need to require IPROTO_FEATURE_WATCHERS to use watchers, see examples
1451-
// for the function.
1357+
// Server must support IPROTO_FEATURE_WATCHERS to use watchers.
14521358
//
14531359
// After watcher creation, the watcher callback is invoked for the first time.
14541360
// In this case, the callback is triggered whether or not the key has already
@@ -1484,9 +1390,9 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
14841390
// That's why we can't just check the Tarantool response for an unsupported
14851391
// request error.
14861392
if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
1487-
conn.opts.RequiredProtocolInfo.Features) {
1488-
err := fmt.Errorf("the feature %s must be required by connection "+
1489-
"options to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
1393+
conn.c.ProtocolInfo().Features) {
1394+
err := fmt.Errorf("the feature %s must be supported by connection "+
1395+
"to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
14901396
return nil, err
14911397
}
14921398

@@ -1577,23 +1483,14 @@ func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watc
15771483
}, nil
15781484
}
15791485

1580-
// ServerProtocolVersion returns protocol version and protocol features
1486+
// ProtocolInfo returns protocol version and protocol features
15811487
// supported by connected Tarantool server. Beware that values might be
15821488
// outdated if connection is in a disconnected state.
15831489
// Since 1.10.0
1584-
func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
1490+
func (conn *Connection) ProtocolInfo() ProtocolInfo {
15851491
return conn.serverProtocolInfo.Clone()
15861492
}
15871493

1588-
// ClientProtocolVersion returns protocol version and protocol features
1589-
// supported by Go connection client.
1590-
// Since 1.10.0
1591-
func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
1592-
info := clientProtocolInfo.Clone()
1593-
info.Auth = conn.opts.Auth
1594-
return info
1595-
}
1596-
15971494
func shutdownEventCallback(event WatchEvent) {
15981495
// Receives "true" on server shutdown.
15991496
// See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/

0 commit comments

Comments
 (0)