Skip to content

Commit 112b2d7

Browse files
committed
sql: support prepared statements
This patch adds the support of prepared statements. Added a new type for handling prepared statements. Added new IPROTO-constants for support of prepared statements in const.go. Added benchmarks for SQL-select prepared statement. Added examples of using Prepare in example_test.go. Fixed some grammar inconsistencies for the method Execute. Added a test helper for checking if SQL is supported in connected Tarantool. Updated CHANGELOG.md. Follows up #62 Closes #117
1 parent 2eb3fd8 commit 112b2d7

17 files changed

+649
-67
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1515
- Public API with request object types (#126)
1616
- Support decimal type in msgpack (#96)
1717
- Support datetime type in msgpack (#118)
18+
- Prepared SQL statements (#117)
1819

1920
### Changed
2021

connection.go

+23
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,12 @@ type Connection struct {
143143

144144
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
145145

146+
// ConnectedObject is an interface that provides the info about a Connection
147+
// the object belongs to.
148+
type ConnectedObject interface {
149+
Conn() *Connection
150+
}
151+
146152
type connShard struct {
147153
rmut sync.Mutex
148154
requests [requestsMap]struct {
@@ -993,6 +999,13 @@ func (conn *Connection) nextRequestId() (requestId uint32) {
993999
// An error is returned if the request was formed incorrectly, or failed to
9941000
// create the future.
9951001
func (conn *Connection) Do(req Request) *Future {
1002+
if connectedReq, ok := req.(ConnectedObject); ok {
1003+
if connectedReq.Conn() != conn {
1004+
fut := NewFuture()
1005+
fut.SetError(fmt.Errorf("the passed connected request doesn't belong to the current connection or connection pool"))
1006+
return fut
1007+
}
1008+
}
9961009
return conn.send(req)
9971010
}
9981011

@@ -1009,3 +1022,13 @@ func (conn *Connection) OverrideSchema(s *Schema) {
10091022
conn.Schema = s
10101023
}
10111024
}
1025+
1026+
// NewPrepared passes a sql statement to Tarantool for preparation synchronously.
1027+
func (conn *Connection) NewPrepared(expr string) (*Prepared, error) {
1028+
req := NewPrepareRequest(expr)
1029+
resp, err := conn.Do(req).Get()
1030+
if err != nil {
1031+
return nil, err
1032+
}
1033+
return NewPreparedFromResponse(conn, resp)
1034+
}

connection_pool/config.lua

+15
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,21 @@ box.once("init", function()
2121
parts = {{ field = 1, type = 'string' }},
2222
if_not_exists = true
2323
})
24+
25+
local sp = box.schema.space.create('SQL_TEST', {
26+
id = 521,
27+
if_not_exists = true,
28+
format = {
29+
{name = "NAME0", type = "unsigned"},
30+
{name = "NAME1", type = "string"},
31+
{name = "NAME2", type = "string"},
32+
}
33+
})
34+
sp:create_index('primary', {type = 'tree', parts = {1, 'uint'}, if_not_exists = true})
35+
sp:insert{1, "test", "test"}
36+
-- grants for sql tests
37+
box.schema.user.grant('test', 'create,read,write,drop,alter', 'space')
38+
box.schema.user.grant('test', 'create', 'sequence')
2439
end)
2540

2641
local function simple_incr(a)

connection_pool/connection_pool.go

+19
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ package connection_pool
1212

1313
import (
1414
"errors"
15+
"fmt"
1516
"log"
1617
"sync/atomic"
1718
"time"
@@ -525,7 +526,16 @@ func (connPool *ConnectionPool) EvalAsync(expr string, args interface{}, userMod
525526
}
526527

527528
// Do sends the request and returns a future.
529+
// For requests that belong to an only one connection (e.g. Unprepare or ExecutePrepared)
530+
// the argument of type Mode is unused.
528531
func (connPool *ConnectionPool) Do(req tarantool.Request, userMode Mode) *tarantool.Future {
532+
if connectedReq, ok := req.(tarantool.ConnectedObject); ok {
533+
conn, _ := connPool.getConnectionFromPool(connectedReq.Conn().Addr())
534+
if conn == nil {
535+
return newErrorFuture(fmt.Errorf("the passed connected request doesn't belong to the current connection or connection pool"))
536+
}
537+
return connectedReq.Conn().Do(req)
538+
}
529539
conn, err := connPool.getNextConnection(userMode)
530540
if err != nil {
531541
return newErrorFuture(err)
@@ -788,3 +798,12 @@ func newErrorFuture(err error) *tarantool.Future {
788798
fut.SetError(err)
789799
return fut
790800
}
801+
802+
// NewPrepared passes a sql statement to Tarantool for preparation synchronously.
803+
func (connPool *ConnectionPool) NewPrepared(expr string, userMode Mode) (*tarantool.Prepared, error) {
804+
conn, err := connPool.getNextConnection(userMode)
805+
if err != nil {
806+
return nil, err
807+
}
808+
return conn.NewPrepared(expr)
809+
}

connection_pool/connection_pool_test.go

+67
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package connection_pool_test
33
import (
44
"log"
55
"os"
6+
"reflect"
67
"strings"
78
"testing"
89
"time"
@@ -1276,6 +1277,72 @@ func TestDo(t *testing.T) {
12761277
require.NotNilf(t, resp, "response is nil after Ping")
12771278
}
12781279

1280+
func TestNewPrepared(t *testing.T) {
1281+
test_helpers.SkipIfSQLUnsupported(t)
1282+
1283+
roles := []bool{true, true, false, true, false}
1284+
1285+
err := test_helpers.SetClusterRO(servers, connOpts, roles)
1286+
require.Nilf(t, err, "fail to set roles for cluster")
1287+
1288+
connPool, err := connection_pool.Connect(servers, connOpts)
1289+
require.Nilf(t, err, "failed to connect")
1290+
require.NotNilf(t, connPool, "conn is nil after Connect")
1291+
1292+
defer connPool.Close()
1293+
1294+
stmt, err := connPool.NewPrepared("SELECT NAME0, NAME1 FROM SQL_TEST WHERE NAME0=:id AND NAME1=:name;", connection_pool.RO)
1295+
require.Nilf(t, err, "fail to prepare statement: %v", err)
1296+
1297+
if connPool.GetPoolInfo()[stmt.Conn.Addr()].ConnRole != connection_pool.RO {
1298+
t.Errorf("wrong role for the statement's connection")
1299+
}
1300+
1301+
executeReq := tarantool.NewExecutePreparedRequest(stmt)
1302+
unprepareReq := tarantool.NewUnprepareRequest(stmt)
1303+
1304+
resp, err := connPool.Do(executeReq.Args([]interface{}{1, "test"}), connection_pool.ANY).Get()
1305+
if err != nil {
1306+
t.Fatalf("failed to execute prepared: %v", err)
1307+
}
1308+
if resp == nil {
1309+
t.Fatalf("nil response")
1310+
}
1311+
if resp.Code != tarantool.OkCode {
1312+
t.Fatalf("failed to execute prepared: code %d", resp.Code)
1313+
}
1314+
if reflect.DeepEqual(resp.Data[0], []interface{}{1, "test"}) {
1315+
t.Error("Select with named arguments failed")
1316+
}
1317+
if resp.MetaData[0].FieldType != "unsigned" ||
1318+
resp.MetaData[0].FieldName != "NAME0" ||
1319+
resp.MetaData[1].FieldType != "string" ||
1320+
resp.MetaData[1].FieldName != "NAME1" {
1321+
t.Error("Wrong metadata")
1322+
}
1323+
1324+
// the second argument for unprepare request is unused - it already belongs to some connection
1325+
resp, err = connPool.Do(unprepareReq, connection_pool.ANY).Get()
1326+
if err != nil {
1327+
t.Errorf("failed to unprepare prepared statement: %v", err)
1328+
}
1329+
if resp.Code != tarantool.OkCode {
1330+
t.Errorf("failed to unprepare prepared statement: code %d", resp.Code)
1331+
}
1332+
1333+
_, err = connPool.Do(unprepareReq, connection_pool.ANY).Get()
1334+
if err == nil {
1335+
t.Errorf("the statement must be already unprepared")
1336+
}
1337+
require.Contains(t, err.Error(), "Prepared statement with id")
1338+
1339+
_, err = connPool.Do(executeReq, connection_pool.ANY).Get()
1340+
if err == nil {
1341+
t.Errorf("the statement must be already unprepared")
1342+
}
1343+
require.Contains(t, err.Error(), "Prepared statement with id")
1344+
}
1345+
12791346
// runTestMain is a body of TestMain function
12801347
// (see https://pkg.go.dev/testing#hdr-Main).
12811348
// Using defer + os.Exit is not works so TestMain body

connection_pool/example_test.go

+25
Original file line numberDiff line numberDiff line change
@@ -548,3 +548,28 @@ func ExampleConnectionPool_Do() {
548548
// Ping Data []
549549
// Ping Error <nil>
550550
}
551+
552+
func ExampleConnectionPool_NewPrepared() {
553+
pool, err := examplePool(testRoles)
554+
if err != nil {
555+
fmt.Println(err)
556+
}
557+
defer pool.Close()
558+
559+
stmt, err := pool.NewPrepared("SELECT 1", connection_pool.ANY)
560+
if err != nil {
561+
fmt.Println(err)
562+
}
563+
564+
executeReq := tarantool.NewExecutePreparedRequest(stmt)
565+
unprepareReq := tarantool.NewUnprepareRequest(stmt)
566+
567+
_, err = pool.Do(executeReq, connection_pool.ANY).Get()
568+
if err != nil {
569+
fmt.Printf("Failed to execute prepared stmt")
570+
}
571+
_, err = pool.Do(unprepareReq, connection_pool.ANY).Get()
572+
if err != nil {
573+
fmt.Printf("Failed to prepare")
574+
}
575+
}

connector.go

+2
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,7 @@ type Connector interface {
4444
EvalAsync(expr string, args interface{}) *Future
4545
ExecuteAsync(expr string, args interface{}) *Future
4646

47+
NewPrepared(expr string) (*Prepared, error)
48+
4749
Do(req Request) (fut *Future)
4850
}

const.go

+3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const (
1212
UpsertRequestCode = 9
1313
Call17RequestCode = 10 /* call in >= 1.7 format */
1414
ExecuteRequestCode = 11
15+
PrepareRequestCode = 13
1516
PingRequestCode = 64
1617
SubscribeRequestCode = 66
1718

@@ -31,9 +32,11 @@ const (
3132
KeyData = 0x30
3233
KeyError = 0x31
3334
KeyMetaData = 0x32
35+
KeyBindCount = 0x34
3436
KeySQLText = 0x40
3537
KeySQLBind = 0x41
3638
KeySQLInfo = 0x42
39+
KeyStmtID = 0x43
3740

3841
KeyFieldName = 0x00
3942
KeyFieldType = 0x01

errors.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package tarantool
22

3-
import (
4-
"fmt"
5-
)
3+
import "fmt"
64

75
// Error is wrapper around error returned by Tarantool.
86
type Error struct {

example_test.go

+40
Original file line numberDiff line numberDiff line change
@@ -651,3 +651,43 @@ func ExampleConnection_Execute() {
651651
fmt.Println("MetaData", resp.MetaData)
652652
fmt.Println("SQL Info", resp.SQLInfo)
653653
}
654+
655+
// To use prepared statements to query a tarantool instance, call NewPrepared.
656+
func ExampleConnection_NewPrepared() {
657+
// Tarantool supports SQL since version 2.0.0
658+
isLess, err := test_helpers.IsTarantoolVersionLess(2, 0, 0)
659+
if err != nil || isLess {
660+
return
661+
}
662+
663+
server := "127.0.0.1:3013"
664+
opts := tarantool.Opts{
665+
Timeout: 500 * time.Millisecond,
666+
Reconnect: 1 * time.Second,
667+
MaxReconnects: 3,
668+
User: "test",
669+
Pass: "test",
670+
}
671+
conn, err := tarantool.Connect(server, opts)
672+
if err != nil {
673+
fmt.Printf("Failed to connect: %s", err.Error())
674+
}
675+
676+
stmt, err := conn.NewPrepared("SELECT 1")
677+
if err != nil {
678+
fmt.Printf("Failed to connect: %s", err.Error())
679+
}
680+
681+
executeReq := tarantool.NewExecutePreparedRequest(stmt)
682+
unprepareReq := tarantool.NewUnprepareRequest(stmt)
683+
684+
_, err = conn.Do(executeReq).Get()
685+
if err != nil {
686+
fmt.Printf("Failed to execute prepared stmt")
687+
}
688+
689+
_, err = conn.Do(unprepareReq).Get()
690+
if err != nil {
691+
fmt.Printf("Failed to prepare")
692+
}
693+
}

export_test.go

+18
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,21 @@ func RefImplEvalBody(enc *msgpack.Encoder, expr string, args interface{}) error
7575
func RefImplExecuteBody(enc *msgpack.Encoder, expr string, args interface{}) error {
7676
return fillExecute(enc, expr, args)
7777
}
78+
79+
// RefImplPrepareBody is reference implementation for filling of an prepare
80+
// request's body.
81+
func RefImplPrepareBody(enc *msgpack.Encoder, expr string) error {
82+
return fillPrepare(enc, expr)
83+
}
84+
85+
// RefImplUnprepareBody is reference implementation for filling of an execute prepared
86+
// request's body.
87+
func RefImplExecutePreparedBody(enc *msgpack.Encoder, stmt Prepared, args interface{}) error {
88+
return fillExecutePrepared(enc, stmt, args)
89+
}
90+
91+
// RefImplUnprepareBody is reference implementation for filling of an unprepare
92+
// request's body.
93+
func RefImplUnprepareBody(enc *msgpack.Encoder, stmt Prepared) error {
94+
return fillUnprepare(enc, stmt)
95+
}

multi/multi.go

+15
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ package multi
1313

1414
import (
1515
"errors"
16+
"fmt"
1617
"sync"
1718
"sync/atomic"
1819
"time"
@@ -492,7 +493,21 @@ func (connMulti *ConnectionMulti) ExecuteAsync(expr string, args interface{}) *t
492493
return connMulti.getCurrentConnection().ExecuteAsync(expr, args)
493494
}
494495

496+
// NewPrepared passes a sql statement to Tarantool for preparation synchronously.
497+
func (connMulti *ConnectionMulti) NewPrepared(expr string) (*tarantool.Prepared, error) {
498+
return connMulti.getCurrentConnection().NewPrepared(expr)
499+
}
500+
495501
// Do sends the request and returns a future.
496502
func (connMulti *ConnectionMulti) Do(req tarantool.Request) *tarantool.Future {
503+
if connectedReq, ok := req.(tarantool.ConnectedObject); ok {
504+
_, belongs := connMulti.getConnectionFromPool(connectedReq.Conn().Addr())
505+
if !belongs {
506+
fut := tarantool.NewFuture()
507+
fut.SetError(fmt.Errorf("the passed connected request doesn't belong to the current connection or connection pool"))
508+
return fut
509+
}
510+
return connectedReq.Conn().Do(req)
511+
}
497512
return connMulti.getCurrentConnection().Do(req)
498513
}

0 commit comments

Comments
 (0)