Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit a858b07

Browse files
committedNov 8, 2023
new api draft
1 parent c564e6d commit a858b07

File tree

5 files changed

+353
-342
lines changed

5 files changed

+353
-342
lines changed
 

‎connection.go

Lines changed: 14 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,15 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
9090
reconnects := v[0].(uint)
9191
err := v[1].(error)
9292
log.Printf("tarantool: reconnect (%d/%d) to %s failed: %s",
93-
reconnects, conn.opts.MaxReconnects, conn.addr, err)
93+
reconnects, conn.opts.MaxReconnects, conn.Addr(), err)
9494
case LogLastReconnectFailed:
9595
err := v[0].(error)
9696
log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
97-
conn.addr, err)
97+
conn.Addr(), err)
9898
case LogUnexpectedResultId:
9999
resp := v[0].(*Response)
100100
log.Printf("tarantool: connection %s got unexpected resultId (%d) in response",
101-
conn.addr, resp.RequestId)
101+
conn.Addr(), resp.RequestId)
102102
case LogWatchEventReadFailed:
103103
err := v[0].(error)
104104
log.Printf("tarantool: unable to parse watch event: %s", err)
@@ -156,10 +156,10 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
156156
// More on graceful shutdown:
157157
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
158158
type Connection struct {
159-
addr string
160-
c Conn
161-
mutex sync.Mutex
162-
cond *sync.Cond
159+
dialer Dialer
160+
c Conn
161+
mutex sync.Mutex
162+
cond *sync.Cond
163163
// Schema contains schema loaded on connection.
164164
Schema *Schema
165165
// requestId contains the last request ID for requests with nil context.
@@ -260,11 +260,6 @@ const (
260260

261261
// Opts is a way to configure Connection
262262
type Opts struct {
263-
// Auth is an authentication method.
264-
Auth Auth
265-
// Dialer is a Dialer object used to create a new connection to a
266-
// Tarantool instance. TtDialer is a default one.
267-
Dialer Dialer
268263
// Timeout for response to a particular request. The timeout is reset when
269264
// push messages are received. If Timeout is zero, any request can be
270265
// blocked infinitely.
@@ -287,10 +282,6 @@ type Opts struct {
287282
// endlessly.
288283
// After MaxReconnects attempts Connection becomes closed.
289284
MaxReconnects uint
290-
// Username for logging in to Tarantool.
291-
User string
292-
// User password for logging in to Tarantool.
293-
Pass string
294285
// RateLimit limits number of 'in-fly' request, i.e. already put into
295286
// requests queue, but not yet answered by server or timeouted.
296287
// It is disabled by default.
@@ -315,69 +306,20 @@ type Opts struct {
315306
Handle interface{}
316307
// Logger is user specified logger used for error messages.
317308
Logger Logger
318-
// Transport is the connection type, by default the connection is unencrypted.
319-
Transport string
320-
// SslOpts is used only if the Transport == 'ssl' is set.
321-
Ssl SslOpts
322-
// RequiredProtocolInfo contains minimal protocol version and
323-
// list of protocol features that should be supported by
324-
// Tarantool server. By default there are no restrictions.
325-
RequiredProtocolInfo ProtocolInfo
326-
}
327-
328-
// SslOpts is a way to configure ssl transport.
329-
type SslOpts struct {
330-
// KeyFile is a path to a private SSL key file.
331-
KeyFile string
332-
// CertFile is a path to an SSL certificate file.
333-
CertFile string
334-
// CaFile is a path to a trusted certificate authorities (CA) file.
335-
CaFile string
336-
// Ciphers is a colon-separated (:) list of SSL cipher suites the connection
337-
// can use.
338-
//
339-
// We don't provide a list of supported ciphers. This is what OpenSSL
340-
// does. The only limitation is usage of TLSv1.2 (because other protocol
341-
// versions don't seem to support the GOST cipher). To add additional
342-
// ciphers (GOST cipher), you must configure OpenSSL.
343-
//
344-
// See also
345-
//
346-
// * https://www.openssl.org/docs/man1.1.1/man1/ciphers.html
347-
Ciphers string
348-
// Password is a password for decrypting the private SSL key file.
349-
// The priority is as follows: try to decrypt with Password, then
350-
// try PasswordFile.
351-
Password string
352-
// PasswordFile is a path to the list of passwords for decrypting
353-
// the private SSL key file. The connection tries every line from the
354-
// file as a password.
355-
PasswordFile string
356309
}
357310

358311
// Clone returns a copy of the Opts object.
359312
// Any changes in copy RequiredProtocolInfo will not affect the original
360313
// RequiredProtocolInfo value.
361314
func (opts Opts) Clone() Opts {
362315
optsCopy := opts
363-
optsCopy.RequiredProtocolInfo = opts.RequiredProtocolInfo.Clone()
364-
365316
return optsCopy
366317
}
367318

368319
// Connect creates and configures a new Connection.
369-
//
370-
// Address could be specified in following ways:
371-
//
372-
// - TCP connections (tcp://192.168.1.1:3013, tcp://my.host:3013,
373-
// tcp:192.168.1.1:3013, tcp:my.host:3013, 192.168.1.1:3013, my.host:3013)
374-
//
375-
// - Unix socket, first '/' or '.' indicates Unix socket
376-
// (unix:///abs/path/tnt.sock, unix:path/tnt.sock, /abs/path/tnt.sock,
377-
// ./rel/path/tnt.sock, unix/:path/tnt.sock)
378-
func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err error) {
320+
func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, err error) {
379321
conn = &Connection{
380-
addr: addr,
322+
dialer: dialer,
381323
requestId: 0,
382324
contextRequestId: 1,
383325
Greeting: &Greeting{},
@@ -389,9 +331,6 @@ func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err
389331
if conn.opts.Concurrency == 0 || conn.opts.Concurrency > maxprocs*128 {
390332
conn.opts.Concurrency = maxprocs * 4
391333
}
392-
if conn.opts.Dialer == nil {
393-
conn.opts.Dialer = TtDialer{}
394-
}
395334
if c := conn.opts.Concurrency; c&(c-1) != 0 {
396335
for i := uint(1); i < 32; i *= 2 {
397336
c |= c >> i
@@ -473,27 +412,7 @@ func (conn *Connection) CloseGraceful() error {
473412

474413
// Addr returns a configured address of Tarantool socket.
475414
func (conn *Connection) Addr() string {
476-
return conn.addr
477-
}
478-
479-
// RemoteAddr returns an address of Tarantool socket.
480-
func (conn *Connection) RemoteAddr() string {
481-
conn.mutex.Lock()
482-
defer conn.mutex.Unlock()
483-
if conn.c == nil {
484-
return ""
485-
}
486-
return conn.c.RemoteAddr().String()
487-
}
488-
489-
// LocalAddr returns an address of outgoing socket.
490-
func (conn *Connection) LocalAddr() string {
491-
conn.mutex.Lock()
492-
defer conn.mutex.Unlock()
493-
if conn.c == nil {
494-
return ""
495-
}
496-
return conn.c.LocalAddr().String()
415+
return conn.c.GetAddr()
497416
}
498417

499418
// Handle returns a user-specified handle from Opts.
@@ -512,14 +431,8 @@ func (conn *Connection) dial(ctx context.Context) error {
512431
opts := conn.opts
513432

514433
var c Conn
515-
c, err := conn.opts.Dialer.Dial(ctx, conn.addr, DialOpts{
516-
IoTimeout: opts.Timeout,
517-
Transport: opts.Transport,
518-
Ssl: opts.Ssl,
519-
RequiredProtocol: opts.RequiredProtocolInfo,
520-
Auth: opts.Auth,
521-
User: opts.User,
522-
Password: opts.Pass,
434+
c, err := conn.dialer.Dial(ctx, DialOpts{
435+
IoTimeout: opts.Timeout,
523436
})
524437
if err != nil {
525438
return err
@@ -1474,7 +1387,7 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
14741387
// That's why we can't just check the Tarantool response for an unsupported
14751388
// request error.
14761389
if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
1477-
conn.opts.RequiredProtocolInfo.Features) {
1390+
conn.c.ProtocolInfo().Features) {
14781391
err := fmt.Errorf("the feature %s must be required by connection "+
14791392
"options to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
14801393
return nil, err
@@ -1580,7 +1493,7 @@ func (conn *Connection) ServerProtocolInfo() ProtocolInfo {
15801493
// Since 1.10.0
15811494
func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
15821495
info := clientProtocolInfo.Clone()
1583-
info.Auth = conn.opts.Auth
1496+
info.Auth = conn.serverProtocolInfo.Auth
15841497
return info
15851498
}
15861499

‎dial.go

Lines changed: 162 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@ import (
1515
"github.com/vmihailenco/msgpack/v5"
1616
)
1717

18-
const (
19-
dialTransportNone = ""
20-
dialTransportSsl = "ssl"
21-
)
18+
const bufSize = 128 * 1024
2219

2320
// Greeting is a message sent by Tarantool on connect.
2421
type Greeting struct {
@@ -45,34 +42,18 @@ type Conn interface {
4542
// Any blocked Read or Flush operations will be unblocked and return
4643
// errors.
4744
Close() error
48-
// LocalAddr returns the local network address, if known.
49-
LocalAddr() net.Addr
50-
// RemoteAddr returns the remote network address, if known.
51-
RemoteAddr() net.Addr
5245
// Greeting returns server greeting.
5346
Greeting() Greeting
5447
// ProtocolInfo returns server protocol info.
5548
ProtocolInfo() ProtocolInfo
49+
// GetAddr returns the connection address.
50+
GetAddr() string
5651
}
5752

5853
// DialOpts is a way to configure a Dial method to create a new Conn.
5954
type DialOpts struct {
6055
// IoTimeout is a timeout per a network read/write.
6156
IoTimeout time.Duration
62-
// Transport is a connect transport type.
63-
Transport string
64-
// Ssl configures "ssl" transport.
65-
Ssl SslOpts
66-
// RequiredProtocol contains minimal protocol version and
67-
// list of protocol features that should be supported by
68-
// Tarantool server. By default there are no restrictions.
69-
RequiredProtocol ProtocolInfo
70-
// Auth is an authentication method.
71-
Auth Auth
72-
// Username for logging in to Tarantool.
73-
User string
74-
// User password for logging in to Tarantool.
75-
Password string
7657
}
7758

7859
// Dialer is the interface that wraps a method to connect to a Tarantool
@@ -85,72 +66,198 @@ type DialOpts struct {
8566
type Dialer interface {
8667
// Dial connects to a Tarantool instance to the address with specified
8768
// options.
88-
Dial(ctx context.Context, address string, opts DialOpts) (Conn, error)
69+
Dial(ctx context.Context, opts DialOpts) (Conn, error)
8970
}
9071

9172
type tntConn struct {
73+
addr string
9274
net net.Conn
9375
reader io.Reader
9476
writer writeFlusher
9577
greeting Greeting
9678
protocol ProtocolInfo
9779
}
9880

99-
// TtDialer is a default implementation of the Dialer interface which is
100-
// used by the connector.
81+
// rawDial does basic dial operations:
82+
// reads greeting, identifies a protocol and validates it.
83+
func rawDial(conn *tntConn, requiredProto ProtocolInfo) (string, error) {
84+
version, salt, err := readGreeting(conn.reader)
85+
if err != nil {
86+
return "", fmt.Errorf("failed to read greeting: %w", err)
87+
}
88+
conn.greeting.Version = version
89+
90+
if conn.protocol, err = identify(conn.writer, conn.reader); err != nil {
91+
return "", fmt.Errorf("failed to read greeting: %w", err)
92+
}
93+
94+
if err = checkProtocolInfo(requiredProto, conn.protocol); err != nil {
95+
return "", fmt.Errorf("invalid server protocol: %w", err)
96+
}
97+
return salt, err
98+
}
99+
101100
type TtDialer struct {
101+
// Address is an address to connect.
102+
// It could be specified in following ways:
103+
//
104+
// - TCP connections (tcp://192.168.1.1:3013, tcp://my.host:3013,
105+
// tcp:192.168.1.1:3013, tcp:my.host:3013, 192.168.1.1:3013, my.host:3013)
106+
//
107+
// - Unix socket, first '/' or '.' indicates Unix socket
108+
// (unix:///abs/path/tnt.sock, unix:path/tnt.sock, /abs/path/tnt.sock,
109+
// ./rel/path/tnt.sock, unix/:path/tnt.sock)
110+
Address string
111+
// Username for logging in to Tarantool.
112+
User string
113+
// User password for logging in to Tarantool.
114+
Password string
115+
// RequiredProtocol contains minimal protocol version and
116+
// list of protocol features that should be supported by
117+
// Tarantool server. By default, there are no restrictions.
118+
RequiredProtocolInfo ProtocolInfo
102119
}
103120

104-
// Dial connects to a Tarantool instance to the address with specified
105-
// options.
106-
func (t TtDialer) Dial(ctx context.Context, address string, opts DialOpts) (Conn, error) {
121+
// Dial makes TtDialer satisfy the Dialer interface.
122+
func (d TtDialer) Dial(ctx context.Context, opts DialOpts) (Conn, error) {
107123
var err error
108124
conn := new(tntConn)
125+
conn.addr = d.Address
109126

110-
if conn.net, err = dial(ctx, address, opts); err != nil {
127+
network, address := parseAddress(d.Address)
128+
dialer := net.Dialer{}
129+
conn.net, err = dialer.DialContext(ctx, network, address)
130+
if err != nil {
111131
return nil, fmt.Errorf("failed to dial: %w", err)
112132
}
113133

114134
dc := &deadlineIO{to: opts.IoTimeout, c: conn.net}
115-
conn.reader = bufio.NewReaderSize(dc, 128*1024)
116-
conn.writer = bufio.NewWriterSize(dc, 128*1024)
135+
conn.reader = bufio.NewReaderSize(dc, bufSize)
136+
conn.writer = bufio.NewWriterSize(dc, bufSize)
117137

118-
var version, salt string
119-
if version, salt, err = readGreeting(conn.reader); err != nil {
138+
salt, err := rawDial(conn, d.RequiredProtocolInfo)
139+
if err != nil {
120140
conn.net.Close()
121-
return nil, fmt.Errorf("failed to read greeting: %w", err)
141+
return nil, err
122142
}
123-
conn.greeting.Version = version
124143

125-
if conn.protocol, err = identify(conn.writer, conn.reader); err != nil {
144+
if d.User == "" {
145+
return conn, nil
146+
}
147+
148+
if err = authenticate(conn, ChapSha1Auth, d.User, d.Password, salt); err != nil {
126149
conn.net.Close()
127-
return nil, fmt.Errorf("failed to identify: %w", err)
150+
return nil, fmt.Errorf("failed to authenticate: %w", err)
151+
}
152+
153+
return conn, nil
154+
}
155+
156+
type OpenSSLDialer struct {
157+
// Address is an address to connect.
158+
// It could be specified in following ways:
159+
//
160+
// - TCP connections (tcp://192.168.1.1:3013, tcp://my.host:3013,
161+
// tcp:192.168.1.1:3013, tcp:my.host:3013, 192.168.1.1:3013, my.host:3013)
162+
//
163+
// - Unix socket, first '/' or '.' indicates Unix socket
164+
// (unix:///abs/path/tnt.sock, unix:path/tnt.sock, /abs/path/tnt.sock,
165+
// ./rel/path/tnt.sock, unix/:path/tnt.sock)
166+
Address string
167+
// Auth is an authentication method.
168+
Auth Auth
169+
// Username for logging in to Tarantool.
170+
User string
171+
// User password for logging in to Tarantool.
172+
Password string
173+
// RequiredProtocol contains minimal protocol version and
174+
// list of protocol features that should be supported by
175+
// Tarantool server. By default, there are no restrictions.
176+
RequiredProtocolInfo ProtocolInfo
177+
// KeyFile is a path to a private SSL key file.
178+
KeyFile string
179+
// CertFile is a path to an SSL certificate file.
180+
CertFile string
181+
// CaFile is a path to a trusted certificate authorities (CA) file.
182+
CaFile string
183+
// Ciphers is a colon-separated (:) list of SSL cipher suites the connection
184+
// can use.
185+
//
186+
// We don't provide a list of supported ciphers. This is what OpenSSL
187+
// does. The only limitation is usage of TLSv1.2 (because other protocol
188+
// versions don't seem to support the GOST cipher). To add additional
189+
// ciphers (GOST cipher), you must configure OpenSSL.
190+
//
191+
// See also
192+
//
193+
// * https://www.openssl.org/docs/man1.1.1/man1/ciphers.html
194+
Ciphers string
195+
// SSLPassword is a password for decrypting the private SSL key file.
196+
// The priority is as follows: try to decrypt with SSLPassword, then
197+
// try PasswordFile.
198+
SSLPassword string
199+
// PasswordFile is a path to the list of passwords for decrypting
200+
// the private SSL key file. The connection tries every line from the
201+
// file as a password.
202+
PasswordFile string
203+
}
204+
205+
type TlsDialer = OpenSSLDialer
206+
207+
// Dial makes OpenSSLDialer satisfy the Dialer interface.
208+
func (d OpenSSLDialer) Dial(ctx context.Context, opts DialOpts) (Conn, error) {
209+
var err error
210+
conn := new(tntConn)
211+
conn.addr = d.Address
212+
213+
network, address := parseAddress(d.Address)
214+
conn.net, err = sslDialContext(ctx, network, address, sslOpts{
215+
keyFile: d.KeyFile,
216+
certFile: d.CertFile,
217+
caFile: d.CaFile,
218+
ciphers: d.Ciphers,
219+
password: d.SSLPassword,
220+
passwordFile: d.PasswordFile,
221+
})
222+
if err != nil {
223+
return nil, fmt.Errorf("failed to dial: %w", err)
128224
}
129225

130-
if err = checkProtocolInfo(opts.RequiredProtocol, conn.protocol); err != nil {
226+
dc := &deadlineIO{to: opts.IoTimeout, c: conn.net}
227+
conn.reader = bufio.NewReaderSize(dc, bufSize)
228+
conn.writer = bufio.NewWriterSize(dc, bufSize)
229+
230+
salt, err := rawDial(conn, d.RequiredProtocolInfo)
231+
if err != nil {
131232
conn.net.Close()
132-
return nil, fmt.Errorf("invalid server protocol: %w", err)
233+
return nil, err
133234
}
134235

135-
if opts.User != "" {
136-
if opts.Auth == AutoAuth {
137-
if conn.protocol.Auth != AutoAuth {
138-
opts.Auth = conn.protocol.Auth
139-
} else {
140-
opts.Auth = ChapSha1Auth
141-
}
142-
}
236+
if d.User == "" {
237+
return conn, nil
238+
}
143239

144-
err := authenticate(conn, opts, salt)
145-
if err != nil {
146-
conn.net.Close()
147-
return nil, fmt.Errorf("failed to authenticate: %w", err)
240+
if d.Auth == AutoAuth {
241+
if conn.protocol.Auth != AutoAuth {
242+
d.Auth = conn.protocol.Auth
243+
} else {
244+
d.Auth = ChapSha1Auth
148245
}
149246
}
150247

248+
if err = authenticate(conn, d.Auth, d.User, d.Password, salt); err != nil {
249+
conn.net.Close()
250+
return nil, fmt.Errorf("failed to authenticate: %w", err)
251+
}
252+
151253
return conn, nil
152254
}
153255

256+
// GetAddr makes tntConn satisfy the Conn interface.
257+
func (c *tntConn) GetAddr() string {
258+
return c.addr
259+
}
260+
154261
// Read makes tntConn satisfy the Conn interface.
155262
func (c *tntConn) Read(p []byte) (int, error) {
156263
return c.reader.Read(p)
@@ -177,16 +284,6 @@ func (c *tntConn) Close() error {
177284
return c.net.Close()
178285
}
179286

180-
// RemoteAddr makes tntConn satisfy the Conn interface.
181-
func (c *tntConn) RemoteAddr() net.Addr {
182-
return c.net.RemoteAddr()
183-
}
184-
185-
// LocalAddr makes tntConn satisfy the Conn interface.
186-
func (c *tntConn) LocalAddr() net.Addr {
187-
return c.net.LocalAddr()
188-
}
189-
190287
// Greeting makes tntConn satisfy the Conn interface.
191288
func (c *tntConn) Greeting() Greeting {
192289
return c.greeting
@@ -197,20 +294,6 @@ func (c *tntConn) ProtocolInfo() ProtocolInfo {
197294
return c.protocol
198295
}
199296

200-
// dial connects to a Tarantool instance.
201-
func dial(ctx context.Context, address string, opts DialOpts) (net.Conn, error) {
202-
network, address := parseAddress(address)
203-
switch opts.Transport {
204-
case dialTransportNone:
205-
dialer := net.Dialer{}
206-
return dialer.DialContext(ctx, network, address)
207-
case dialTransportSsl:
208-
return sslDialContext(ctx, network, address, opts.Ssl)
209-
default:
210-
return nil, fmt.Errorf("unsupported transport type: %s", opts.Transport)
211-
}
212-
}
213-
214297
// parseAddress split address into network and address parts.
215298
func parseAddress(address string) (string, string) {
216299
network := "tcp"
@@ -316,29 +399,21 @@ func checkProtocolInfo(required ProtocolInfo, actual ProtocolInfo) error {
316399
}
317400
}
318401

319-
// authenticate authenticate for a connection.
320-
func authenticate(c Conn, opts DialOpts, salt string) error {
321-
auth := opts.Auth
322-
user := opts.User
323-
pass := opts.Password
324-
402+
// authenticate authenticates for a connection.
403+
func authenticate(c Conn, auth Auth, user string, pass string, salt string) error {
325404
var req Request
326405
var err error
327406

328-
switch opts.Auth {
407+
switch auth {
329408
case ChapSha1Auth:
330409
req, err = newChapSha1AuthRequest(user, pass, salt)
331410
if err != nil {
332411
return err
333412
}
334413
case PapSha256Auth:
335-
if opts.Transport != dialTransportSsl {
336-
return errors.New("forbidden to use " + auth.String() +
337-
" unless SSL is enabled for the connection")
338-
}
339414
req = newPapSha256AuthRequest(user, pass)
340415
default:
341-
return errors.New("unsupported method " + opts.Auth.String())
416+
return errors.New("unsupported method " + auth.String())
342417
}
343418

344419
if err = writeRequest(c, req); err != nil {

‎pool/connection_pool.go

Lines changed: 119 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
)
2424

2525
var (
26-
ErrEmptyAddrs = errors.New("addrs (first argument) should not be empty")
26+
ErrEmptyDialers = errors.New("dialers (second argument) should not be empty")
2727
ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0")
2828
ErrNoConnection = errors.New("no active connections")
2929
ErrTooManyArgs = errors.New("too many arguments")
@@ -94,8 +94,8 @@ Main features:
9494
- Automatic master discovery by mode parameter.
9595
*/
9696
type ConnectionPool struct {
97-
addrs map[string]*endpoint
98-
addrsMutex sync.RWMutex
97+
ends map[string]*endpoint
98+
endsMutex sync.RWMutex
9999

100100
connOpts tarantool.Opts
101101
opts Opts
@@ -112,7 +112,8 @@ type ConnectionPool struct {
112112
var _ Pooler = (*ConnectionPool)(nil)
113113

114114
type endpoint struct {
115-
addr string
115+
id string
116+
dialer tarantool.Dialer
116117
notify chan tarantool.ConnEvent
117118
conn *tarantool.Connection
118119
role Role
@@ -124,9 +125,10 @@ type endpoint struct {
124125
closeErr error
125126
}
126127

127-
func newEndpoint(addr string) *endpoint {
128+
func newEndpoint(id string, dialer tarantool.Dialer) *endpoint {
128129
return &endpoint{
129-
addr: addr,
130+
id: id,
131+
dialer: dialer,
130132
notify: make(chan tarantool.ConnEvent, 100),
131133
conn: nil,
132134
role: UnknownRole,
@@ -137,24 +139,24 @@ func newEndpoint(addr string) *endpoint {
137139
}
138140
}
139141

140-
// ConnectWithOpts creates pool for instances with addresses addrs
141-
// with options opts.
142-
func ConnectWithOpts(ctx context.Context, addrs []string,
142+
// ConnectWithOpts creates pool for instances with specified dialers and options opts.
143+
// Each dialer corresponds to a certain id by which they will be distinguished.
144+
func ConnectWithOpts(ctx context.Context, dialers map[string]tarantool.Dialer,
143145
connOpts tarantool.Opts, opts Opts) (*ConnectionPool, error) {
144-
if len(addrs) == 0 {
145-
return nil, ErrEmptyAddrs
146+
if len(dialers) == 0 {
147+
return nil, ErrEmptyDialers
146148
}
147149
if opts.CheckTimeout <= 0 {
148150
return nil, ErrWrongCheckTimeout
149151
}
150152

151-
size := len(addrs)
153+
size := len(dialers)
152154
rwPool := newRoundRobinStrategy(size)
153155
roPool := newRoundRobinStrategy(size)
154156
anyPool := newRoundRobinStrategy(size)
155157

156158
connPool := &ConnectionPool{
157-
addrs: make(map[string]*endpoint),
159+
ends: make(map[string]*endpoint),
158160
connOpts: connOpts.Clone(),
159161
opts: opts,
160162
state: unknownState,
@@ -164,11 +166,7 @@ func ConnectWithOpts(ctx context.Context, addrs []string,
164166
anyPool: anyPool,
165167
}
166168

167-
for _, addr := range addrs {
168-
connPool.addrs[addr] = nil
169-
}
170-
171-
somebodyAlive, ctxCanceled := connPool.fillPools(ctx)
169+
somebodyAlive, ctxCanceled := connPool.fillPools(ctx, dialers)
172170
if !somebodyAlive {
173171
connPool.state.set(closedState)
174172
if ctxCanceled {
@@ -179,7 +177,7 @@ func ConnectWithOpts(ctx context.Context, addrs []string,
179177

180178
connPool.state.set(connectedState)
181179

182-
for _, s := range connPool.addrs {
180+
for _, s := range connPool.ends {
183181
endpointCtx, cancel := context.WithCancel(context.Background())
184182
s.cancel = cancel
185183
go connPool.controller(endpointCtx, s)
@@ -188,17 +186,18 @@ func ConnectWithOpts(ctx context.Context, addrs []string,
188186
return connPool, nil
189187
}
190188

191-
// ConnectWithOpts creates pool for instances with addresses addrs.
189+
// Connect creates pool for instances with specified dialers.
190+
// Each dialer corresponds to a certain id by which they will be distinguished.
192191
//
193192
// It is useless to set up tarantool.Opts.Reconnect value for a connection.
194193
// The connection pool has its own reconnection logic. See
195194
// Opts.CheckTimeout description.
196-
func Connect(ctx context.Context, addrs []string,
195+
func Connect(ctx context.Context, dialers map[string]tarantool.Dialer,
197196
connOpts tarantool.Opts) (*ConnectionPool, error) {
198197
opts := Opts{
199198
CheckTimeout: 1 * time.Second,
200199
}
201-
return ConnectWithOpts(ctx, addrs, connOpts, opts)
200+
return ConnectWithOpts(ctx, dialers, connOpts, opts)
202201
}
203202

204203
// ConnectedNow gets connected status of pool.
@@ -235,32 +234,32 @@ func (p *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, error) {
235234
return conn.ConfiguredTimeout(), nil
236235
}
237236

238-
// Add adds a new endpoint with the address into the pool. This function
237+
// Add adds a new endpoint with the id into the pool. This function
239238
// adds the endpoint only after successful connection.
240-
func (p *ConnectionPool) Add(ctx context.Context, addr string) error {
241-
e := newEndpoint(addr)
239+
func (p *ConnectionPool) Add(ctx context.Context, id string, dialer tarantool.Dialer) error {
240+
e := newEndpoint(id, dialer)
242241

243-
p.addrsMutex.Lock()
242+
p.endsMutex.Lock()
244243
// Ensure that Close()/CloseGraceful() not in progress/done.
245244
if p.state.get() != connectedState {
246-
p.addrsMutex.Unlock()
245+
p.endsMutex.Unlock()
247246
return ErrClosed
248247
}
249-
if _, ok := p.addrs[addr]; ok {
250-
p.addrsMutex.Unlock()
248+
if _, ok := p.ends[id]; ok {
249+
p.endsMutex.Unlock()
251250
return ErrExists
252251
}
253252

254253
endpointCtx, cancel := context.WithCancel(context.Background())
255254
e.cancel = cancel
256255

257-
p.addrs[addr] = e
258-
p.addrsMutex.Unlock()
256+
p.ends[id] = e
257+
p.endsMutex.Unlock()
259258

260259
if err := p.tryConnect(ctx, e); err != nil {
261-
p.addrsMutex.Lock()
262-
delete(p.addrs, addr)
263-
p.addrsMutex.Unlock()
260+
p.endsMutex.Lock()
261+
delete(p.ends, id)
262+
p.endsMutex.Unlock()
264263
e.cancel()
265264
close(e.closed)
266265
return err
@@ -270,13 +269,13 @@ func (p *ConnectionPool) Add(ctx context.Context, addr string) error {
270269
return nil
271270
}
272271

273-
// Remove removes an endpoint with the address from the pool. The call
272+
// Remove removes an endpoint with the id from the pool. The call
274273
// closes an active connection gracefully.
275-
func (p *ConnectionPool) Remove(addr string) error {
276-
p.addrsMutex.Lock()
277-
endpoint, ok := p.addrs[addr]
274+
func (p *ConnectionPool) Remove(id string) error {
275+
p.endsMutex.Lock()
276+
endpoint, ok := p.ends[id]
278277
if !ok {
279-
p.addrsMutex.Unlock()
278+
p.endsMutex.Unlock()
280279
return errors.New("endpoint not exist")
281280
}
282281

@@ -290,20 +289,20 @@ func (p *ConnectionPool) Remove(addr string) error {
290289
close(endpoint.shutdown)
291290
}
292291

293-
delete(p.addrs, addr)
294-
p.addrsMutex.Unlock()
292+
delete(p.ends, id)
293+
p.endsMutex.Unlock()
295294

296295
<-endpoint.closed
297296
return nil
298297
}
299298

300299
func (p *ConnectionPool) waitClose() []error {
301-
p.addrsMutex.RLock()
302-
endpoints := make([]*endpoint, 0, len(p.addrs))
303-
for _, e := range p.addrs {
300+
p.endsMutex.RLock()
301+
endpoints := make([]*endpoint, 0, len(p.ends))
302+
for _, e := range p.ends {
304303
endpoints = append(endpoints, e)
305304
}
306-
p.addrsMutex.RUnlock()
305+
p.endsMutex.RUnlock()
307306

308307
errs := make([]error, 0, len(endpoints))
309308
for _, e := range endpoints {
@@ -319,12 +318,12 @@ func (p *ConnectionPool) waitClose() []error {
319318
func (p *ConnectionPool) Close() []error {
320319
if p.state.cas(connectedState, closedState) ||
321320
p.state.cas(shutdownState, closedState) {
322-
p.addrsMutex.RLock()
323-
for _, s := range p.addrs {
321+
p.endsMutex.RLock()
322+
for _, s := range p.ends {
324323
s.cancel()
325324
close(s.close)
326325
}
327-
p.addrsMutex.RUnlock()
326+
p.endsMutex.RUnlock()
328327
}
329328

330329
return p.waitClose()
@@ -334,47 +333,31 @@ func (p *ConnectionPool) Close() []error {
334333
// for all requests to complete.
335334
func (p *ConnectionPool) CloseGraceful() []error {
336335
if p.state.cas(connectedState, shutdownState) {
337-
p.addrsMutex.RLock()
338-
for _, s := range p.addrs {
336+
p.endsMutex.RLock()
337+
for _, s := range p.ends {
339338
s.cancel()
340339
close(s.shutdown)
341340
}
342-
p.addrsMutex.RUnlock()
341+
p.endsMutex.RUnlock()
343342
}
344343

345344
return p.waitClose()
346345
}
347346

348-
// GetAddrs gets addresses of connections in pool.
349-
func (p *ConnectionPool) GetAddrs() []string {
350-
p.addrsMutex.RLock()
351-
defer p.addrsMutex.RUnlock()
352-
353-
cpy := make([]string, len(p.addrs))
354-
355-
i := 0
356-
for addr := range p.addrs {
357-
cpy[i] = addr
358-
i++
359-
}
360-
361-
return cpy
362-
}
363-
364-
// GetPoolInfo gets information of connections (connected status, ro/rw role).
365-
func (p *ConnectionPool) GetPoolInfo() map[string]*ConnectionInfo {
347+
// GetInfo gets information of connections (connected status, ro/rw role).
348+
func (p *ConnectionPool) GetInfo() map[string]*ConnectionInfo {
366349
info := make(map[string]*ConnectionInfo)
367350

368-
p.addrsMutex.RLock()
369-
defer p.addrsMutex.RUnlock()
351+
p.endsMutex.RLock()
352+
defer p.endsMutex.RUnlock()
370353
p.poolsMutex.RLock()
371354
defer p.poolsMutex.RUnlock()
372355

373356
if p.state.get() != connectedState {
374357
return info
375358
}
376359

377-
for addr := range p.addrs {
360+
for addr := range p.ends {
378361
conn, role := p.getConnectionFromPool(addr)
379362
if conn != nil {
380363
info[addr] = &ConnectionInfo{ConnectedNow: conn.ConnectedNow(), ConnRole: role}
@@ -932,16 +915,33 @@ func (p *ConnectionPool) NewPrepared(expr string, userMode Mode) (*tarantool.Pre
932915
// Since 1.10.0
933916
func (p *ConnectionPool) NewWatcher(key string,
934917
callback tarantool.WatchCallback, mode Mode) (tarantool.Watcher, error) {
935-
watchersRequired := false
936-
for _, feature := range p.connOpts.RequiredProtocolInfo.Features {
937-
if iproto.IPROTO_FEATURE_WATCHERS == feature {
938-
watchersRequired = true
918+
919+
rr := p.anyPool
920+
if mode == RW {
921+
rr = p.rwPool
922+
} else if mode == RO {
923+
rr = p.roPool
924+
}
925+
926+
conns := rr.GetConnections()
927+
928+
watchersRequired := true
929+
for _, conn := range conns {
930+
watchersRequired = false
931+
for _, feature := range conn.ServerProtocolInfo().Features {
932+
if iproto.IPROTO_FEATURE_WATCHERS == feature {
933+
watchersRequired = true
934+
break
935+
}
936+
}
937+
if !watchersRequired {
939938
break
940939
}
941940
}
941+
942942
if !watchersRequired {
943943
return nil, errors.New("the feature IPROTO_FEATURE_WATCHERS must " +
944-
"be required by connection options to create a watcher")
944+
"be required by any connection to create a watcher")
945945
}
946946

947947
watcher := &poolWatcher{
@@ -955,14 +955,6 @@ func (p *ConnectionPool) NewWatcher(key string,
955955

956956
watcher.container.add(watcher)
957957

958-
rr := p.anyPool
959-
if mode == RW {
960-
rr = p.rwPool
961-
} else if mode == RO {
962-
rr = p.roPool
963-
}
964-
965-
conns := rr.GetConnections()
966958
for _, conn := range conns {
967959
if err := watcher.watch(conn); err != nil {
968960
conn.Close()
@@ -1030,22 +1022,22 @@ func (p *ConnectionPool) getConnectionRole(conn *tarantool.Connection) (Role, er
10301022
return UnknownRole, nil
10311023
}
10321024

1033-
func (p *ConnectionPool) getConnectionFromPool(addr string) (*tarantool.Connection, Role) {
1034-
if conn := p.rwPool.GetConnByAddr(addr); conn != nil {
1025+
func (p *ConnectionPool) getConnectionFromPool(id string) (*tarantool.Connection, Role) {
1026+
if conn := p.rwPool.GetConnById(id); conn != nil {
10351027
return conn, MasterRole
10361028
}
10371029

1038-
if conn := p.roPool.GetConnByAddr(addr); conn != nil {
1030+
if conn := p.roPool.GetConnById(id); conn != nil {
10391031
return conn, ReplicaRole
10401032
}
10411033

1042-
return p.anyPool.GetConnByAddr(addr), UnknownRole
1034+
return p.anyPool.GetConnById(id), UnknownRole
10431035
}
10441036

1045-
func (p *ConnectionPool) deleteConnection(addr string) {
1046-
if conn := p.anyPool.DeleteConnByAddr(addr); conn != nil {
1047-
if conn := p.rwPool.DeleteConnByAddr(addr); conn == nil {
1048-
p.roPool.DeleteConnByAddr(addr)
1037+
func (p *ConnectionPool) deleteConnection(id string) {
1038+
if conn := p.anyPool.DeleteConnById(id); conn != nil {
1039+
if conn := p.rwPool.DeleteConnById(id); conn == nil {
1040+
p.roPool.DeleteConnById(id)
10491041
}
10501042
// The internal connection deinitialization.
10511043
p.watcherContainer.mutex.RLock()
@@ -1058,7 +1050,7 @@ func (p *ConnectionPool) deleteConnection(addr string) {
10581050
}
10591051
}
10601052

1061-
func (p *ConnectionPool) addConnection(addr string,
1053+
func (p *ConnectionPool) addConnection(id string,
10621054
conn *tarantool.Connection, role Role) error {
10631055
// The internal connection initialization.
10641056
p.watcherContainer.mutex.RLock()
@@ -1087,17 +1079,17 @@ func (p *ConnectionPool) addConnection(addr string,
10871079
for _, watcher := range watched {
10881080
watcher.unwatch(conn)
10891081
}
1090-
log.Printf("tarantool: failed initialize watchers for %s: %s", addr, err)
1082+
log.Printf("tarantool: failed initialize watchers for %s: %s", id, err)
10911083
return err
10921084
}
10931085

1094-
p.anyPool.AddConn(addr, conn)
1086+
p.anyPool.AddConn(id, conn)
10951087

10961088
switch role {
10971089
case MasterRole:
1098-
p.rwPool.AddConn(addr, conn)
1090+
p.rwPool.AddConn(id, conn)
10991091
case ReplicaRole:
1100-
p.roPool.AddConn(addr, conn)
1092+
p.roPool.AddConn(id, conn)
11011093
}
11021094
return nil
11031095
}
@@ -1130,35 +1122,35 @@ func (p *ConnectionPool) handlerDeactivated(conn *tarantool.Connection,
11301122
}
11311123
}
11321124

1133-
func (p *ConnectionPool) deactivateConnection(addr string,
1125+
func (p *ConnectionPool) deactivateConnection(id string,
11341126
conn *tarantool.Connection, role Role) {
1135-
p.deleteConnection(addr)
1127+
p.deleteConnection(id)
11361128
conn.Close()
11371129
p.handlerDeactivated(conn, role)
11381130
}
11391131

11401132
func (p *ConnectionPool) deactivateConnections() {
1141-
for address, endpoint := range p.addrs {
1133+
for id, endpoint := range p.ends {
11421134
if endpoint != nil && endpoint.conn != nil {
1143-
p.deactivateConnection(address, endpoint.conn, endpoint.role)
1135+
p.deactivateConnection(id, endpoint.conn, endpoint.role)
11441136
}
11451137
}
11461138
}
11471139

11481140
func (p *ConnectionPool) processConnection(conn *tarantool.Connection,
1149-
addr string, end *endpoint) bool {
1141+
id string, end *endpoint) bool {
11501142
role, err := p.getConnectionRole(conn)
11511143
if err != nil {
11521144
conn.Close()
1153-
log.Printf("tarantool: storing connection to %s failed: %s\n", addr, err)
1145+
log.Printf("tarantool: storing connection %s failed: %s\n", id, err)
11541146
return false
11551147
}
11561148

11571149
if !p.handlerDiscovered(conn, role) {
11581150
conn.Close()
11591151
return false
11601152
}
1161-
if p.addConnection(addr, conn, role) != nil {
1153+
if p.addConnection(id, conn, role) != nil {
11621154
conn.Close()
11631155
p.handlerDeactivated(conn, role)
11641156
return false
@@ -1169,34 +1161,35 @@ func (p *ConnectionPool) processConnection(conn *tarantool.Connection,
11691161
return true
11701162
}
11711163

1172-
func (p *ConnectionPool) fillPools(ctx context.Context) (bool, bool) {
1164+
func (p *ConnectionPool) fillPools(
1165+
ctx context.Context,
1166+
dialers map[string]tarantool.Dialer) (bool, bool) {
11731167
somebodyAlive := false
11741168
ctxCanceled := false
11751169

1176-
// It is called before controller() goroutines so we don't expect
1170+
// It is called before controller() goroutines, so we don't expect
11771171
// concurrency issues here.
1178-
for addr := range p.addrs {
1179-
end := newEndpoint(addr)
1180-
p.addrs[addr] = end
1181-
1172+
for id, dialer := range dialers {
1173+
end := newEndpoint(id, dialer)
1174+
p.ends[id] = end
11821175
connOpts := p.connOpts
11831176
connOpts.Notify = end.notify
1184-
conn, err := tarantool.Connect(ctx, addr, connOpts)
1177+
conn, err := tarantool.Connect(ctx, dialer, connOpts)
11851178
if err != nil {
1186-
log.Printf("tarantool: connect to %s failed: %s\n", addr, err.Error())
1179+
log.Printf("tarantool: connect to %s failed: %s\n", dialer.GetAddr(), err.Error())
11871180
select {
11881181
case <-ctx.Done():
11891182
ctxCanceled = true
11901183

1191-
p.addrs[addr] = nil
1184+
p.ends[id] = nil
11921185
log.Printf("tarantool: operation was canceled")
11931186

11941187
p.deactivateConnections()
11951188

11961189
return false, ctxCanceled
11971190
default:
11981191
}
1199-
} else if p.processConnection(conn, addr, end) {
1192+
} else if p.processConnection(conn, id, end) {
12001193
somebodyAlive = true
12011194
}
12021195
}
@@ -1214,7 +1207,7 @@ func (p *ConnectionPool) updateConnection(e *endpoint) {
12141207

12151208
if role, err := p.getConnectionRole(e.conn); err == nil {
12161209
if e.role != role {
1217-
p.deleteConnection(e.addr)
1210+
p.deleteConnection(e.id)
12181211
p.poolsMutex.Unlock()
12191212

12201213
p.handlerDeactivated(e.conn, e.role)
@@ -1237,7 +1230,7 @@ func (p *ConnectionPool) updateConnection(e *endpoint) {
12371230
return
12381231
}
12391232

1240-
if p.addConnection(e.addr, e.conn, role) != nil {
1233+
if p.addConnection(e.id, e.conn, role) != nil {
12411234
p.poolsMutex.Unlock()
12421235

12431236
e.conn.Close()
@@ -1251,7 +1244,7 @@ func (p *ConnectionPool) updateConnection(e *endpoint) {
12511244
p.poolsMutex.Unlock()
12521245
return
12531246
} else {
1254-
p.deleteConnection(e.addr)
1247+
p.deleteConnection(e.id)
12551248
p.poolsMutex.Unlock()
12561249

12571250
e.conn.Close()
@@ -1275,14 +1268,15 @@ func (p *ConnectionPool) tryConnect(ctx context.Context, e *endpoint) error {
12751268

12761269
connOpts := p.connOpts
12771270
connOpts.Notify = e.notify
1278-
conn, err := tarantool.Connect(ctx, e.addr, connOpts)
1271+
conn, err := tarantool.Connect(ctx, e.dialer, connOpts)
12791272
if err == nil {
12801273
role, err := p.getConnectionRole(conn)
12811274
p.poolsMutex.Unlock()
12821275

12831276
if err != nil {
12841277
conn.Close()
1285-
log.Printf("tarantool: storing connection to %s failed: %s\n", e.addr, err)
1278+
log.Printf("tarantool: storing connection to %s failed: %s\n",
1279+
e.dialer.GetAddr(), err)
12861280
return err
12871281
}
12881282

@@ -1300,7 +1294,7 @@ func (p *ConnectionPool) tryConnect(ctx context.Context, e *endpoint) error {
13001294
return ErrClosed
13011295
}
13021296

1303-
if err = p.addConnection(e.addr, conn, role); err != nil {
1297+
if err = p.addConnection(e.id, conn, role); err != nil {
13041298
p.poolsMutex.Unlock()
13051299
conn.Close()
13061300
p.handlerDeactivated(conn, role)
@@ -1322,7 +1316,7 @@ func (p *ConnectionPool) reconnect(ctx context.Context, e *endpoint) {
13221316
return
13231317
}
13241318

1325-
p.deleteConnection(e.addr)
1319+
p.deleteConnection(e.id)
13261320
p.poolsMutex.Unlock()
13271321

13281322
p.handlerDeactivated(e.conn, e.role)
@@ -1358,7 +1352,7 @@ func (p *ConnectionPool) controller(ctx context.Context, e *endpoint) {
13581352
case <-e.close:
13591353
if e.conn != nil {
13601354
p.poolsMutex.Lock()
1361-
p.deleteConnection(e.addr)
1355+
p.deleteConnection(e.id)
13621356
p.poolsMutex.Unlock()
13631357

13641358
if !shutdown {
@@ -1380,7 +1374,7 @@ func (p *ConnectionPool) controller(ctx context.Context, e *endpoint) {
13801374
shutdown = true
13811375
if e.conn != nil {
13821376
p.poolsMutex.Lock()
1383-
p.deleteConnection(e.addr)
1377+
p.deleteConnection(e.id)
13841378
p.poolsMutex.Unlock()
13851379

13861380
// We need to catch s.close in the current goroutine, so
@@ -1402,7 +1396,7 @@ func (p *ConnectionPool) controller(ctx context.Context, e *endpoint) {
14021396
if e.conn != nil && e.conn.ClosedNow() {
14031397
p.poolsMutex.Lock()
14041398
if p.state.get() == connectedState {
1405-
p.deleteConnection(e.addr)
1399+
p.deleteConnection(e.id)
14061400
p.poolsMutex.Unlock()
14071401
p.handlerDeactivated(e.conn, e.role)
14081402
e.conn = nil

‎pool/round_robin.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,56 +8,56 @@ import (
88
)
99

1010
type roundRobinStrategy struct {
11-
conns []*tarantool.Connection
12-
indexByAddr map[string]uint
13-
mutex sync.RWMutex
14-
size uint64
15-
current uint64
11+
conns []*tarantool.Connection
12+
indexById map[string]uint
13+
mutex sync.RWMutex
14+
size uint64
15+
current uint64
1616
}
1717

1818
func newRoundRobinStrategy(size int) *roundRobinStrategy {
1919
return &roundRobinStrategy{
20-
conns: make([]*tarantool.Connection, 0, size),
21-
indexByAddr: make(map[string]uint),
22-
size: 0,
23-
current: 0,
20+
conns: make([]*tarantool.Connection, 0, size),
21+
indexById: make(map[string]uint),
22+
size: 0,
23+
current: 0,
2424
}
2525
}
2626

27-
func (r *roundRobinStrategy) GetConnByAddr(addr string) *tarantool.Connection {
27+
func (r *roundRobinStrategy) GetConnById(id string) *tarantool.Connection {
2828
r.mutex.RLock()
2929
defer r.mutex.RUnlock()
3030

31-
index, found := r.indexByAddr[addr]
31+
index, found := r.indexById[id]
3232
if !found {
3333
return nil
3434
}
3535

3636
return r.conns[index]
3737
}
3838

39-
func (r *roundRobinStrategy) DeleteConnByAddr(addr string) *tarantool.Connection {
39+
func (r *roundRobinStrategy) DeleteConnById(id string) *tarantool.Connection {
4040
r.mutex.Lock()
4141
defer r.mutex.Unlock()
4242

4343
if r.size == 0 {
4444
return nil
4545
}
4646

47-
index, found := r.indexByAddr[addr]
47+
index, found := r.indexById[id]
4848
if !found {
4949
return nil
5050
}
5151

52-
delete(r.indexByAddr, addr)
52+
delete(r.indexById, id)
5353

5454
conn := r.conns[index]
5555
r.conns = append(r.conns[:index], r.conns[index+1:]...)
5656
r.size -= 1
5757

58-
for k, v := range r.indexByAddr {
58+
for k, v := range r.indexById {
5959
if v > index {
60-
r.indexByAddr[k] = v - 1
60+
r.indexById[k] = v - 1
6161
}
6262
}
6363

@@ -91,15 +91,15 @@ func (r *roundRobinStrategy) GetConnections() []*tarantool.Connection {
9191
return ret
9292
}
9393

94-
func (r *roundRobinStrategy) AddConn(addr string, conn *tarantool.Connection) {
94+
func (r *roundRobinStrategy) AddConn(id string, conn *tarantool.Connection) {
9595
r.mutex.Lock()
9696
defer r.mutex.Unlock()
9797

98-
if idx, ok := r.indexByAddr[addr]; ok {
98+
if idx, ok := r.indexById[id]; ok {
9999
r.conns[idx] = conn
100100
} else {
101101
r.conns = append(r.conns, conn)
102-
r.indexByAddr[addr] = uint(r.size)
102+
r.indexById[id] = uint(r.size)
103103
r.size += 1
104104
}
105105
}

‎ssl.go

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,37 @@ import (
1515
"github.com/tarantool/go-openssl"
1616
)
1717

18+
type sslOpts struct {
19+
// keyFile is a path to a private SSL key file.
20+
keyFile string
21+
// certFile is a path to an SSL certificate file.
22+
certFile string
23+
// caFile is a path to a trusted certificate authorities (CA) file.
24+
caFile string
25+
// ciphers is a colon-separated (:) list of SSL cipher suites the connection
26+
// can use.
27+
//
28+
// We don't provide a list of supported ciphers. This is what OpenSSL
29+
// does. The only limitation is usage of TLSv1.2 (because other protocol
30+
// versions don't seem to support the GOST cipher). To add additional
31+
// ciphers (GOST cipher), you must configure OpenSSL.
32+
//
33+
// See also
34+
//
35+
// * https://www.openssl.org/docs/man1.1.1/man1/ciphers.html
36+
ciphers string
37+
// password is a password for decrypting the private SSL key file.
38+
// The priority is as follows: try to decrypt with Password, then
39+
// try PasswordFile.
40+
password string
41+
// passwordFile is a path to the list of passwords for decrypting
42+
// the private SSL key file. The connection tries every line from the
43+
// file as a password.
44+
passwordFile string
45+
}
46+
1847
func sslDialContext(ctx context.Context, network, address string,
19-
opts SslOpts) (connection net.Conn, err error) {
48+
opts sslOpts) (connection net.Conn, err error) {
2049
var sslCtx interface{}
2150
if sslCtx, err = sslCreateContext(opts); err != nil {
2251
return
@@ -27,7 +56,7 @@ func sslDialContext(ctx context.Context, network, address string,
2756

2857
// interface{} is a hack. It helps to avoid dependency of go-openssl in build
2958
// of tests with the tag 'go_tarantool_ssl_disable'.
30-
func sslCreateContext(opts SslOpts) (ctx interface{}, err error) {
59+
func sslCreateContext(opts sslOpts) (ctx interface{}, err error) {
3160
var sslCtx *openssl.Ctx
3261

3362
// Require TLSv1.2, because other protocol versions don't seem to
@@ -39,28 +68,28 @@ func sslCreateContext(opts SslOpts) (ctx interface{}, err error) {
3968
sslCtx.SetMaxProtoVersion(openssl.TLS1_2_VERSION)
4069
sslCtx.SetMinProtoVersion(openssl.TLS1_2_VERSION)
4170

42-
if opts.CertFile != "" {
43-
if err = sslLoadCert(sslCtx, opts.CertFile); err != nil {
71+
if opts.certFile != "" {
72+
if err = sslLoadCert(sslCtx, opts.certFile); err != nil {
4473
return
4574
}
4675
}
4776

48-
if opts.KeyFile != "" {
49-
if err = sslLoadKey(sslCtx, opts.KeyFile, opts.Password, opts.PasswordFile); err != nil {
77+
if opts.keyFile != "" {
78+
if err = sslLoadKey(sslCtx, opts.keyFile, opts.password, opts.passwordFile); err != nil {
5079
return
5180
}
5281
}
5382

54-
if opts.CaFile != "" {
55-
if err = sslCtx.LoadVerifyLocations(opts.CaFile, ""); err != nil {
83+
if opts.caFile != "" {
84+
if err = sslCtx.LoadVerifyLocations(opts.caFile, ""); err != nil {
5685
return
5786
}
5887
verifyFlags := openssl.VerifyPeer | openssl.VerifyFailIfNoPeerCert
5988
sslCtx.SetVerify(verifyFlags, nil)
6089
}
6190

62-
if opts.Ciphers != "" {
63-
sslCtx.SetCipherList(opts.Ciphers)
91+
if opts.ciphers != "" {
92+
sslCtx.SetCipherList(opts.ciphers)
6493
}
6594

6695
return

0 commit comments

Comments
 (0)
Please sign in to comment.