Skip to content

sql: support prepared statements #180

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Public API with request object types (#126)
- Support decimal type in msgpack (#96)
- Support datetime type in msgpack (#118)
- Prepared SQL statements (#117)

### Changed

Expand All @@ -30,6 +31,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
### Fixed

- Build with OpenSSL < 1.1.1 (#194)
- Add `ExecuteAsync` and `ExecuteTyped` to common connector interface (#62)

## [1.6.0] - 2022-06-01

Expand Down
17 changes: 17 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,13 @@ func (conn *Connection) nextRequestId() (requestId uint32) {
// An error is returned if the request was formed incorrectly, or failed to
// create the future.
func (conn *Connection) Do(req Request) *Future {
if connectedReq, ok := req.(ConnectedRequest); ok {
if connectedReq.Conn() != conn {
fut := NewFuture()
fut.SetError(fmt.Errorf("the passed connected request doesn't belong to the current connection or connection pool"))
return fut
}
}
return conn.send(req)
}

Expand All @@ -1009,3 +1016,13 @@ func (conn *Connection) OverrideSchema(s *Schema) {
conn.Schema = s
}
}

// NewPrepared passes a sql statement to Tarantool for preparation synchronously.
func (conn *Connection) NewPrepared(expr string) (*Prepared, error) {
req := NewPrepareRequest(expr)
resp, err := conn.Do(req).Get()
if err != nil {
return nil, err
}
return NewPreparedFromResponse(conn, resp)
}
15 changes: 15 additions & 0 deletions connection_pool/config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,21 @@ box.once("init", function()
parts = {{ field = 1, type = 'string' }},
if_not_exists = true
})

local sp = box.schema.space.create('SQL_TEST', {
id = 521,
if_not_exists = true,
format = {
{name = "NAME0", type = "unsigned"},
{name = "NAME1", type = "string"},
{name = "NAME2", type = "string"},
}
})
sp:create_index('primary', {type = 'tree', parts = {1, 'uint'}, if_not_exists = true})
sp:insert{1, "test", "test"}
-- grants for sql tests
box.schema.user.grant('test', 'create,read,write,drop,alter', 'space')
box.schema.user.grant('test', 'create', 'sequence')
end)

local function simple_incr(a)
Expand Down
19 changes: 19 additions & 0 deletions connection_pool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package connection_pool

import (
"errors"
"fmt"
"log"
"sync/atomic"
"time"
Expand Down Expand Up @@ -525,7 +526,16 @@ func (connPool *ConnectionPool) EvalAsync(expr string, args interface{}, userMod
}

// Do sends the request and returns a future.
// For requests that belong to an only one connection (e.g. Unprepare or ExecutePrepared)
// the argument of type Mode is unused.
func (connPool *ConnectionPool) Do(req tarantool.Request, userMode Mode) *tarantool.Future {
if connectedReq, ok := req.(tarantool.ConnectedRequest); ok {
conn, _ := connPool.getConnectionFromPool(connectedReq.Conn().Addr())
if conn == nil {
return newErrorFuture(fmt.Errorf("the passed connected request doesn't belong to the current connection or connection pool"))
}
return connectedReq.Conn().Do(req)
}
conn, err := connPool.getNextConnection(userMode)
if err != nil {
return newErrorFuture(err)
Expand Down Expand Up @@ -788,3 +798,12 @@ func newErrorFuture(err error) *tarantool.Future {
fut.SetError(err)
return fut
}

// NewPrepared passes a sql statement to Tarantool for preparation synchronously.
func (connPool *ConnectionPool) NewPrepared(expr string, userMode Mode) (*tarantool.Prepared, error) {
conn, err := connPool.getNextConnection(userMode)
if err != nil {
return nil, err
}
return conn.NewPrepared(expr)
}
93 changes: 93 additions & 0 deletions connection_pool/connection_pool_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package connection_pool_test

import (
"fmt"
"log"
"os"
"reflect"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -1276,6 +1278,97 @@ func TestDo(t *testing.T) {
require.NotNilf(t, resp, "response is nil after Ping")
}

func TestNewPrepared(t *testing.T) {
test_helpers.SkipIfSQLUnsupported(t)

roles := []bool{true, true, false, true, false}

err := test_helpers.SetClusterRO(servers, connOpts, roles)
require.Nilf(t, err, "fail to set roles for cluster")

connPool, err := connection_pool.Connect(servers, connOpts)
require.Nilf(t, err, "failed to connect")
require.NotNilf(t, connPool, "conn is nil after Connect")

defer connPool.Close()

stmt, err := connPool.NewPrepared("SELECT NAME0, NAME1 FROM SQL_TEST WHERE NAME0=:id AND NAME1=:name;", connection_pool.RO)
require.Nilf(t, err, "fail to prepare statement: %v", err)

if connPool.GetPoolInfo()[stmt.Conn.Addr()].ConnRole != connection_pool.RO {
t.Errorf("wrong role for the statement's connection")
}

executeReq := tarantool.NewExecutePreparedRequest(stmt)
unprepareReq := tarantool.NewUnprepareRequest(stmt)

resp, err := connPool.Do(executeReq.Args([]interface{}{1, "test"}), connection_pool.ANY).Get()
if err != nil {
t.Fatalf("failed to execute prepared: %v", err)
}
if resp == nil {
t.Fatalf("nil response")
}
if resp.Code != tarantool.OkCode {
t.Fatalf("failed to execute prepared: code %d", resp.Code)
}
if reflect.DeepEqual(resp.Data[0], []interface{}{1, "test"}) {
t.Error("Select with named arguments failed")
}
if resp.MetaData[0].FieldType != "unsigned" ||
resp.MetaData[0].FieldName != "NAME0" ||
resp.MetaData[1].FieldType != "string" ||
resp.MetaData[1].FieldName != "NAME1" {
t.Error("Wrong metadata")
}

// the second argument for unprepare request is unused - it already belongs to some connection
resp, err = connPool.Do(unprepareReq, connection_pool.ANY).Get()
if err != nil {
t.Errorf("failed to unprepare prepared statement: %v", err)
}
if resp.Code != tarantool.OkCode {
t.Errorf("failed to unprepare prepared statement: code %d", resp.Code)
}

_, err = connPool.Do(unprepareReq, connection_pool.ANY).Get()
if err == nil {
t.Errorf("the statement must be already unprepared")
}
require.Contains(t, err.Error(), "Prepared statement with id")

_, err = connPool.Do(executeReq, connection_pool.ANY).Get()
if err == nil {
t.Errorf("the statement must be already unprepared")
}
require.Contains(t, err.Error(), "Prepared statement with id")
}

func TestDoWithStrangerConn(t *testing.T) {
expectedErr := fmt.Errorf("the passed connected request doesn't belong to the current connection or connection pool")

roles := []bool{true, true, false, true, false}

err := test_helpers.SetClusterRO(servers, connOpts, roles)
require.Nilf(t, err, "fail to set roles for cluster")

connPool, err := connection_pool.Connect(servers, connOpts)
require.Nilf(t, err, "failed to connect")
require.NotNilf(t, connPool, "conn is nil after Connect")

defer connPool.Close()

req := test_helpers.NewStrangerRequest()

_, err = connPool.Do(req, connection_pool.ANY).Get()
if err == nil {
t.Fatalf("nil error catched")
}
if err.Error() != expectedErr.Error() {
t.Fatalf("Unexpected error catched")
}
}

// runTestMain is a body of TestMain function
// (see https://pkg.go.dev/testing#hdr-Main).
// Using defer + os.Exit is not works so TestMain body
Expand Down
25 changes: 25 additions & 0 deletions connection_pool/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,3 +548,28 @@ func ExampleConnectionPool_Do() {
// Ping Data []
// Ping Error <nil>
}

func ExampleConnectionPool_NewPrepared() {
pool, err := examplePool(testRoles)
if err != nil {
fmt.Println(err)
}
defer pool.Close()

stmt, err := pool.NewPrepared("SELECT 1", connection_pool.ANY)
if err != nil {
fmt.Println(err)
}

executeReq := tarantool.NewExecutePreparedRequest(stmt)
unprepareReq := tarantool.NewUnprepareRequest(stmt)

_, err = pool.Do(executeReq, connection_pool.ANY).Get()
if err != nil {
fmt.Printf("Failed to execute prepared stmt")
}
_, err = pool.Do(unprepareReq, connection_pool.ANY).Get()
if err != nil {
fmt.Printf("Failed to prepare")
}
}
4 changes: 4 additions & 0 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Connector interface {
Call16Typed(functionName string, args interface{}, result interface{}) (err error)
Call17Typed(functionName string, args interface{}, result interface{}) (err error)
EvalTyped(expr string, args interface{}, result interface{}) (err error)
ExecuteTyped(expr string, args interface{}, result interface{}) (SQLInfo, []ColumnMetaData, error)

SelectAsync(space, index interface{}, offset, limit, iterator uint32, key interface{}) *Future
InsertAsync(space interface{}, tuple interface{}) *Future
Expand All @@ -41,6 +42,9 @@ type Connector interface {
Call16Async(functionName string, args interface{}) *Future
Call17Async(functionName string, args interface{}) *Future
EvalAsync(expr string, args interface{}) *Future
ExecuteAsync(expr string, args interface{}) *Future

NewPrepared(expr string) (*Prepared, error)

Do(req Request) (fut *Future)
}
3 changes: 3 additions & 0 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
UpsertRequestCode = 9
Call17RequestCode = 10 /* call in >= 1.7 format */
ExecuteRequestCode = 11
PrepareRequestCode = 13
PingRequestCode = 64
SubscribeRequestCode = 66

Expand All @@ -31,9 +32,11 @@ const (
KeyData = 0x30
KeyError = 0x31
KeyMetaData = 0x32
KeyBindCount = 0x34
KeySQLText = 0x40
KeySQLBind = 0x41
KeySQLInfo = 0x42
KeyStmtID = 0x43

KeyFieldName = 0x00
KeyFieldType = 0x01
Expand Down
4 changes: 1 addition & 3 deletions errors.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package tarantool

import (
"fmt"
)
import "fmt"

// Error is wrapper around error returned by Tarantool.
type Error struct {
Expand Down
40 changes: 40 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,3 +651,43 @@ func ExampleConnection_Execute() {
fmt.Println("MetaData", resp.MetaData)
fmt.Println("SQL Info", resp.SQLInfo)
}

// To use prepared statements to query a tarantool instance, call NewPrepared.
func ExampleConnection_NewPrepared() {
// Tarantool supports SQL since version 2.0.0
isLess, err := test_helpers.IsTarantoolVersionLess(2, 0, 0)
if err != nil || isLess {
return
}

server := "127.0.0.1:3013"
opts := tarantool.Opts{
Timeout: 500 * time.Millisecond,
Reconnect: 1 * time.Second,
MaxReconnects: 3,
User: "test",
Pass: "test",
}
conn, err := tarantool.Connect(server, opts)
if err != nil {
fmt.Printf("Failed to connect: %s", err.Error())
}

stmt, err := conn.NewPrepared("SELECT 1")
if err != nil {
fmt.Printf("Failed to connect: %s", err.Error())
}

executeReq := tarantool.NewExecutePreparedRequest(stmt)
unprepareReq := tarantool.NewUnprepareRequest(stmt)

_, err = conn.Do(executeReq).Get()
if err != nil {
fmt.Printf("Failed to execute prepared stmt")
}

_, err = conn.Do(unprepareReq).Get()
if err != nil {
fmt.Printf("Failed to prepare")
}
}
18 changes: 18 additions & 0 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,21 @@ func RefImplEvalBody(enc *msgpack.Encoder, expr string, args interface{}) error
func RefImplExecuteBody(enc *msgpack.Encoder, expr string, args interface{}) error {
return fillExecute(enc, expr, args)
}

// RefImplPrepareBody is reference implementation for filling of an prepare
// request's body.
func RefImplPrepareBody(enc *msgpack.Encoder, expr string) error {
return fillPrepare(enc, expr)
}

// RefImplUnprepareBody is reference implementation for filling of an execute prepared
// request's body.
func RefImplExecutePreparedBody(enc *msgpack.Encoder, stmt Prepared, args interface{}) error {
return fillExecutePrepared(enc, stmt, args)
}

// RefImplUnprepareBody is reference implementation for filling of an unprepare
// request's body.
func RefImplUnprepareBody(enc *msgpack.Encoder, stmt Prepared) error {
return fillUnprepare(enc, stmt)
}
15 changes: 15 additions & 0 deletions multi/config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,21 @@ rawset(_G, 'get_cluster_nodes', get_cluster_nodes)
box.once("init", function()
box.schema.user.create('test', { password = 'test' })
box.schema.user.grant('test', 'read,write,execute', 'universe')

local sp = box.schema.space.create('SQL_TEST', {
id = 521,
if_not_exists = true,
format = {
{name = "NAME0", type = "unsigned"},
{name = "NAME1", type = "string"},
{name = "NAME2", type = "string"},
}
})
sp:create_index('primary', {type = 'tree', parts = {1, 'uint'}, if_not_exists = true})
sp:insert{1, "test", "test"}
-- grants for sql tests
box.schema.user.grant('test', 'create,read,write,drop,alter', 'space')
box.schema.user.grant('test', 'create', 'sequence')
end)

local function simple_incr(a)
Expand Down
Loading