Skip to content

Commit 56149fa

Browse files
committed
dial: add the ability to connect via socket fd
This patch introduces `FdDialer`, which connects to Tarantool using an existing socket file descriptor. `FdDialer` is not authenticated when creating a connection. Closes #321
1 parent b00ce5c commit 56149fa

File tree

6 files changed

+260
-4
lines changed

6 files changed

+260
-4
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
work_dir*
55
.rocks
66
bench*
7+
testdata/sidecar

dial.go

+56
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"io"
1010
"net"
11+
"os"
1112
"strings"
1213
"time"
1314

@@ -267,6 +268,61 @@ func (d OpenSslDialer) Dial(ctx context.Context, opts DialOpts) (Conn, error) {
267268
return conn, nil
268269
}
269270

271+
type FdDialer struct {
272+
// Fd is a socket file descrpitor.
273+
Fd uintptr
274+
// RequiredProtocol contains minimal protocol version and
275+
// list of protocol features that should be supported by
276+
// Tarantool server. By default, there are no restrictions.
277+
RequiredProtocolInfo ProtocolInfo
278+
}
279+
280+
type fdAddr struct {
281+
fd uintptr
282+
}
283+
284+
func (a fdAddr) Network() string {
285+
return "fd"
286+
}
287+
288+
func (a fdAddr) String() string {
289+
return fmt.Sprintf("fd://%d", a.fd)
290+
}
291+
292+
type fdConn struct {
293+
fd uintptr
294+
net.Conn
295+
}
296+
297+
func (c *fdConn) LocalAddr() net.Addr {
298+
return fdAddr{fd: c.fd}
299+
}
300+
301+
// Dial makes FdDialer satisfy the Dialer interface.
302+
func (d FdDialer) Dial(ctx context.Context, opts DialOpts) (Conn, error) {
303+
file := os.NewFile(d.Fd, "")
304+
c, err := net.FileConn(file)
305+
if err != nil {
306+
return nil, fmt.Errorf("failed to dial: %w", err)
307+
}
308+
309+
conn := new(tntConn)
310+
conn.isLocal = true
311+
conn.net = &fdConn{fd: d.Fd, Conn: c}
312+
313+
dc := &deadlineIO{to: opts.IoTimeout, c: conn.net}
314+
conn.reader = bufio.NewReaderSize(dc, bufSize)
315+
conn.writer = bufio.NewWriterSize(dc, bufSize)
316+
317+
_, err = rawDial(conn, d.RequiredProtocolInfo)
318+
if err != nil {
319+
conn.net.Close()
320+
return nil, err
321+
}
322+
323+
return conn, nil
324+
}
325+
270326
// Addr makes tntConn satisfy the Conn interface.
271327
func (c *tntConn) Addr() net.Addr {
272328
if c.isLocal {

dial_test.go

+52-4
Original file line numberDiff line numberDiff line change
@@ -425,10 +425,11 @@ func assertRequest(t *testing.T, r io.Reader, expected []byte) {
425425
}
426426

427427
type testDialOpts struct {
428-
errGreeting bool
429-
errId bool
430-
errAuth bool
431-
idUnsupported bool
428+
errGreeting bool
429+
errId bool
430+
errAuth bool
431+
idUnsupported bool
432+
authNoRequired bool
432433
}
433434

434435
func testDialAccept(t *testing.T, ch chan struct{}, opts testDialOpts, l net.Listener) {
@@ -458,6 +459,9 @@ func testDialAccept(t *testing.T, ch chan struct{}, opts testDialOpts, l net.Lis
458459
client.Write(idResponse)
459460
}
460461

462+
if opts.authNoRequired {
463+
return
464+
}
461465
// Check Auth request.
462466
assertRequest(t, client, authRequestExpected)
463467
if opts.errAuth {
@@ -566,3 +570,47 @@ func TestOpenSslDialer_Dial(t *testing.T) {
566570
testDialer(t, l, dialer)
567571
}
568572
}
573+
574+
func TestFdDialer_Dial(t *testing.T) {
575+
l, err := net.Listen("tcp", "127.0.0.1:0")
576+
require.NoError(t, err)
577+
addr := l.Addr().String()
578+
579+
for _, cs := range testDialCases {
580+
opts := cs.opts
581+
if opts.errAuth {
582+
// No need to test FdDialer for auth errors.
583+
continue
584+
}
585+
// FdDialer doesn't make auth requests.
586+
opts.authNoRequired = true
587+
588+
t.Run(cs.name, func(t *testing.T) {
589+
ch := make(chan struct{})
590+
go testDialAccept(t, ch, opts, l)
591+
592+
sock, err := net.Dial("tcp", addr)
593+
require.NoError(t, err)
594+
f, err := sock.(*net.TCPConn).File()
595+
require.NoError(t, err)
596+
597+
dialer := tarantool.FdDialer{
598+
Fd: f.Fd(),
599+
}
600+
ctx, cancel := test_helpers.GetConnectContext()
601+
defer cancel()
602+
conn, err := dialer.Dial(ctx, tarantool.DialOpts{
603+
IoTimeout: time.Second * 2,
604+
})
605+
<-ch
606+
if cs.wantErr {
607+
require.Error(t, err)
608+
return
609+
}
610+
require.NoError(t, err)
611+
require.Equal(t, cs.protocolInfo, conn.ProtocolInfo())
612+
require.Equal(t, cs.version, []byte(conn.Greeting().Version))
613+
conn.Close()
614+
})
615+
}
616+
}

example_test.go

+31
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package tarantool_test
33
import (
44
"context"
55
"fmt"
6+
"net"
67
"time"
78

89
"github.com/tarantool/go-iproto"
@@ -1250,3 +1251,33 @@ func ExampleWatchOnceRequest() {
12501251
fmt.Println(resp.Data)
12511252
}
12521253
}
1254+
1255+
// This example demonstrates how to use an existing socket file descriptor
1256+
// to establish a connection with Tarantool. This can be useful if the socket fd
1257+
// was inherited from the Tarantool process itself.
1258+
// For details, please see TestFdDialer.
1259+
func ExampleFdDialer() {
1260+
addr := dialer.Address
1261+
c, err := net.Dial("tcp", addr)
1262+
if err != nil {
1263+
fmt.Printf("can't establish connection: %v\n", err)
1264+
return
1265+
}
1266+
f, err := c.(*net.TCPConn).File()
1267+
if err != nil {
1268+
fmt.Printf("unexpected error: %v\n", err)
1269+
}
1270+
dialer := tarantool.FdDialer{
1271+
Fd: f.Fd(),
1272+
}
1273+
// Use an existing socket fd to create connection with Tarantool.
1274+
conn, err := tarantool.Connect(context.Background(), dialer, opts)
1275+
if err != nil {
1276+
fmt.Printf("connect error: %v\n", err)
1277+
return
1278+
}
1279+
resp, err := conn.Do(tarantool.NewPingRequest()).Get()
1280+
fmt.Println(resp.Code, err)
1281+
// Output:
1282+
// 0 <nil>
1283+
}

tarantool_test.go

+81
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"log"
99
"math"
1010
"os"
11+
"os/exec"
12+
"path/filepath"
1113
"reflect"
1214
"runtime"
1315
"strings"
@@ -76,6 +78,7 @@ func (m *Member) DecodeMsgpack(d *msgpack.Decoder) error {
7678
}
7779

7880
var server = "127.0.0.1:3013"
81+
var fdDialerTestServer = "127.0.0.1:3014"
7982
var spaceNo = uint32(617)
8083
var spaceName = "test"
8184
var indexNo = uint32(0)
@@ -3989,6 +3992,84 @@ func TestConnect_context_cancel(t *testing.T) {
39893992
}
39903993
}
39913994

3995+
func buildSidecar(dir string) error {
3996+
goPath, err := exec.LookPath("go")
3997+
if err != nil {
3998+
return err
3999+
}
4000+
cmd := exec.Command(goPath, "build", "sidecar.go")
4001+
cmd.Dir = filepath.Join(dir, "testdata")
4002+
return cmd.Run()
4003+
}
4004+
4005+
func TestFdDialer(t *testing.T) {
4006+
isLess, err := test_helpers.IsTarantoolVersionLess(3, 0, 0)
4007+
if err != nil || isLess {
4008+
t.Skip("box.session.new present in Tarantool since version 3.0")
4009+
}
4010+
4011+
wd, err := os.Getwd()
4012+
require.NoError(t, err)
4013+
4014+
err = buildSidecar(wd)
4015+
require.NoErrorf(t, err, "failed to build sidecar: %v", err)
4016+
4017+
instOpts := startOpts
4018+
instOpts.Listen = fdDialerTestServer
4019+
_, err = test_helpers.StartTarantool(instOpts, TtDialer{
4020+
Address: fdDialerTestServer,
4021+
User: "test",
4022+
Password: "test",
4023+
})
4024+
require.NoError(t, err)
4025+
4026+
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
4027+
defer conn.Close()
4028+
4029+
sidecarExe := filepath.Join(wd, "testdata", "sidecar")
4030+
4031+
evalBody := fmt.Sprintf(`
4032+
local socket = require('socket')
4033+
local popen = require('popen')
4034+
local os = require('os')
4035+
local s1, s2 = socket.socketpair('AF_UNIX', 'SOCK_STREAM', 0)
4036+
4037+
--[[ Tell sidecar which fd use to connect. --]]
4038+
os.setenv('SOCKET_FD', tostring(s2:fd()))
4039+
4040+
box.session.new({
4041+
type = 'binary',
4042+
fd = s1:fd(),
4043+
user = 'test',
4044+
})
4045+
s1:detach()
4046+
4047+
local ph, err = popen.new({'%s'}, {
4048+
stdout = popen.opts.PIPE,
4049+
stderr = popen.opts.PIPE,
4050+
inherit_fds = {s2:fd()},
4051+
})
4052+
4053+
if err ~= nil then
4054+
return 1, err
4055+
end
4056+
4057+
ph:wait()
4058+
4059+
local status_code = ph:info().status.exit_code
4060+
local stderr = ph:read({stderr=true}):rstrip()
4061+
local stdout = ph:read({stdout=true}):rstrip()
4062+
return status_code, stderr, stdout
4063+
`, sidecarExe)
4064+
4065+
var resp []interface{}
4066+
err = conn.EvalTyped(evalBody, []interface{}{}, &resp)
4067+
require.NoError(t, err)
4068+
require.Equal(t, "", resp[1], resp[1])
4069+
require.Equal(t, "", resp[2], resp[2])
4070+
require.Equal(t, int8(0), resp[0])
4071+
}
4072+
39924073
// runTestMain is a body of TestMain function
39934074
// (see https://pkg.go.dev/testing#hdr-Main).
39944075
// Using defer + os.Exit is not works so TestMain body

testdata/sidecar.go

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"os"
6+
"reflect"
7+
"strconv"
8+
9+
"github.com/tarantool/go-tarantool/v2"
10+
)
11+
12+
func main() {
13+
fd, err := strconv.Atoi(os.Getenv("SOCKET_FD"))
14+
if err != nil {
15+
panic(err)
16+
}
17+
dialer := tarantool.FdDialer{
18+
Fd: uintptr(fd),
19+
}
20+
conn, err := tarantool.Connect(context.Background(), dialer, tarantool.Opts{})
21+
if err != nil {
22+
panic(err)
23+
}
24+
if _, err := conn.Do(tarantool.NewPingRequest()).
25+
Get(); err != nil {
26+
panic(err)
27+
}
28+
if _, err := conn.Do(tarantool.NewInsertRequest("test").
29+
Tuple([]interface{}{666})).Get(); err != nil {
30+
panic(err)
31+
}
32+
var res [][]int64
33+
if err := conn.Do(tarantool.NewSelectRequest("test")).GetTyped(&res); err != nil {
34+
panic(err)
35+
}
36+
if !reflect.DeepEqual([][]int64{[]int64{666}}, res) {
37+
panic("unexpected data")
38+
}
39+
}

0 commit comments

Comments
 (0)