Skip to content

Commit 017eb2a

Browse files
committed
Add public API with a request object for Select/Update/Upsert
This patch provides request types for part of space operations: Select, Update and Upstream. It allows to create requests step by step. The main idea here is too provide more extensible approach to create requests. Part of #126
1 parent a9e491f commit 017eb2a

11 files changed

+1006
-157
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1818
- queue-utube handling (#85)
1919
- Master discovery (#113)
2020
- SQL support (#62)
21+
- Add public API with a request object for Select/Update/Upstream (#126)
2122

2223
### Fixed
2324

client_tools.go

+73
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,79 @@ func (o Op) EncodeMsgpack(enc *msgpack.Encoder) error {
6767
return enc.Encode(o.Arg)
6868
}
6969

70+
const (
71+
appendOperator = "+"
72+
subtractionOperator = "-"
73+
bitwiseAndOperator = "&"
74+
bitwiseOrOperator = "|"
75+
bitwiseXorOperator = "^"
76+
spliceOperator = ":"
77+
insertOperator = "!"
78+
deleteOperator = "#"
79+
assignOperator = "="
80+
)
81+
82+
// Operations is a collection of update operations.
83+
type Operations struct {
84+
ops []Op
85+
}
86+
87+
// NewOperations returns a new empty collection of update operations.
88+
func NewOperations() *Operations {
89+
ops := new(Operations)
90+
return ops
91+
}
92+
93+
func (ops *Operations) append(op string, field int, arg interface{}) *Operations {
94+
ops.ops = append(ops.ops, Op{op, field, arg})
95+
return ops
96+
}
97+
98+
// Add adds an additional operation to the collection of update operations.
99+
func (ops *Operations) Add(field int, arg interface{}) *Operations {
100+
return ops.append(appendOperator, field, arg)
101+
}
102+
103+
// Subtract adds a subtraction operation to the collection of update operations.
104+
func (ops *Operations) Subtract(field int, arg interface{}) *Operations {
105+
return ops.append(subtractionOperator, field, arg)
106+
}
107+
108+
// BitwiseAnd adds a bitwise AND operation to the collection of update operations.
109+
func (ops *Operations) BitwiseAnd(field int, arg interface{}) *Operations {
110+
return ops.append(bitwiseAndOperator, field, arg)
111+
}
112+
113+
// BitwiseOr adds a bitwise OR operation to the collection of update operations.
114+
func (ops *Operations) BitwiseOr(field int, arg interface{}) *Operations {
115+
return ops.append(bitwiseOrOperator, field, arg)
116+
}
117+
118+
// BitwiseXor adds a bitwise XOR operation to the collection of update operations.
119+
func (ops *Operations) BitwiseXor(field int, arg interface{}) *Operations {
120+
return ops.append(bitwiseXorOperator, field, arg)
121+
}
122+
123+
// Splice adds a splice operation to the collection of update operations.
124+
func (ops *Operations) Splice(field int, arg interface{}) *Operations {
125+
return ops.append(spliceOperator, field, arg)
126+
}
127+
128+
// Insert adds an insert operation to the collection of update operations.
129+
func (ops *Operations) Insert(field int, arg interface{}) *Operations {
130+
return ops.append(insertOperator, field, arg)
131+
}
132+
133+
// Delete adds a delete operation to the collection of update operations.
134+
func (ops *Operations) Delete(field int, arg interface{}) *Operations {
135+
return ops.append(deleteOperator, field, arg)
136+
}
137+
138+
// Assign adds an assign operation to the collection of update operations.
139+
func (ops *Operations) Assign(field int, arg interface{}) *Operations {
140+
return ops.append(assignOperator, field, arg)
141+
}
142+
70143
type OpSplice struct {
71144
Op string
72145
Field int

connection.go

+38-1
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ func (conn *Connection) dial() (err error) {
431431
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
432432
request := &Future{
433433
requestId: 0,
434-
requestCode: AuthRequest,
434+
requestCode: AuthRequestCode,
435435
}
436436
var packet smallWBuf
437437
err = request.pack(&packet, msgpack.NewEncoder(&packet), func(enc *msgpack.Encoder) error {
@@ -874,6 +874,43 @@ func (conn *Connection) nextRequestId() (requestId uint32) {
874874
return atomic.AddUint32(&conn.requestId, 1)
875875
}
876876

877+
// Do verifies, sends the request and returns a response.
878+
//
879+
// An error is returned if the request was formed incorrectly, or failure to
880+
// communicate by the connection, or unable to decode the response.
881+
func (conn *Connection) Do(req Request) (*Response, error) {
882+
fut, err := conn.DoAsync(req)
883+
if err != nil {
884+
return nil, err
885+
}
886+
return fut.Get()
887+
}
888+
889+
// DoTyped verifies, sends the request and fills the typed result.
890+
//
891+
// An error is returned if the request was formed incorrectly, or failure to
892+
// communicate by the connection, or unable to decode the response.
893+
func (conn *Connection) DoTyped(req Request, result interface{}) error {
894+
fut, err := conn.DoAsync(req)
895+
if err != nil {
896+
return err
897+
}
898+
return fut.GetTyped(result)
899+
}
900+
901+
// DoAsync verifies, sends the request and returns a future.
902+
//
903+
// An error is returned if the request was formed incorrectly, or failure to
904+
// create the future.
905+
func (conn *Connection) DoAsync(req Request) (*Future, error) {
906+
bodyFunc, err := req.BodyFunc(conn.Schema)
907+
if err != nil {
908+
return nil, err
909+
}
910+
future := conn.newFuture(req.Code())
911+
return future.send(conn, bodyFunc), nil
912+
}
913+
877914
// ConfiguredTimeout returns a timeout from connection config.
878915
func (conn *Connection) ConfiguredTimeout() time.Duration {
879916
return conn.opts.Timeout

const.go

+13-13
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
package tarantool
22

33
const (
4-
SelectRequest = 1
5-
InsertRequest = 2
6-
ReplaceRequest = 3
7-
UpdateRequest = 4
8-
DeleteRequest = 5
9-
CallRequest = 6 /* call in 1.6 format */
10-
AuthRequest = 7
11-
EvalRequest = 8
12-
UpsertRequest = 9
13-
Call17Request = 10
14-
ExecuteRequest = 11
15-
PingRequest = 64
16-
SubscribeRequest = 66
4+
SelectRequestCode = 1
5+
InsertRequestCode = 2
6+
ReplaceRequestCode = 3
7+
UpdateRequestCode = 4
8+
DeleteRequestCode = 5
9+
CallRequestCode = 6 /* call in 1.6 format */
10+
AuthRequestCode = 7
11+
EvalRequestCode = 8
12+
UpsertRequestCode = 9
13+
Call17RequestCode = 10
14+
ExecuteRequestCode = 11
15+
PingRequestCode = 64
16+
SubscribeRequestCode = 66
1717

1818
KeyCode = 0x00
1919
KeySync = 0x01

example_test.go

+110
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,116 @@ func ExampleConnection_SelectAsync() {
108108
// Future 2 Data [[18 val 18 bla]]
109109
}
110110

111+
func ExampleSelectRequest() {
112+
conn := example_connect()
113+
defer conn.Close()
114+
115+
req := tarantool.NewSelectRequest(517).
116+
Limit(100).
117+
Key(tarantool.IntKey{1111})
118+
resp, err := conn.Do(req)
119+
if err != nil {
120+
fmt.Printf("error in do select request is %v", err)
121+
return
122+
}
123+
fmt.Printf("response is %#v\n", resp.Data)
124+
125+
req = tarantool.NewSelectRequest("test").
126+
Index("primary").
127+
Limit(100).
128+
Key(tarantool.IntKey{1111})
129+
fut, err := conn.DoAsync(req)
130+
if err != nil {
131+
fmt.Printf("error in do async select request is %v", err)
132+
}
133+
resp, err = fut.Get()
134+
if err != nil {
135+
fmt.Printf("error in do async select request is %v", err)
136+
return
137+
}
138+
fmt.Printf("response is %#v\n", resp.Data)
139+
// Output:
140+
// response is []interface {}{[]interface {}{0x457, "hello", "world"}}
141+
// response is []interface {}{[]interface {}{0x457, "hello", "world"}}
142+
}
143+
144+
func ExampleUpdateRequest() {
145+
conn := example_connect()
146+
defer conn.Close()
147+
148+
req := tarantool.NewUpdateRequest(517).
149+
Key(tarantool.IntKey{1111}).
150+
Operations(tarantool.NewOperations().Assign(1, "bye"))
151+
resp, err := conn.Do(req)
152+
if err != nil {
153+
fmt.Printf("error in do update request is %v", err)
154+
return
155+
}
156+
fmt.Printf("response is %#v\n", resp.Data)
157+
158+
req = tarantool.NewUpdateRequest("test").
159+
Index("primary").
160+
Key(tarantool.IntKey{1111}).
161+
Operations(tarantool.NewOperations().Assign(1, "hello"))
162+
fut, err := conn.DoAsync(req)
163+
if err != nil {
164+
fmt.Printf("error in do async update request is %v", err)
165+
}
166+
resp, err = fut.Get()
167+
if err != nil {
168+
fmt.Printf("error in do async update request is %v", err)
169+
return
170+
}
171+
fmt.Printf("response is %#v\n", resp.Data)
172+
// Output:
173+
// response is []interface {}{[]interface {}{0x457, "bye", "world"}}
174+
// response is []interface {}{[]interface {}{0x457, "hello", "world"}}
175+
}
176+
177+
func ExampleUpsertRequest() {
178+
conn := example_connect()
179+
defer conn.Close()
180+
181+
var req tarantool.Request
182+
req = tarantool.NewUpsertRequest(517).
183+
Tuple([]interface{}{uint(1113), "first", "first"}).
184+
Operations(tarantool.NewOperations().Assign(1, "updated"))
185+
resp, err := conn.Do(req)
186+
if err != nil {
187+
fmt.Printf("error in do select upsert is %v", err)
188+
return
189+
}
190+
fmt.Printf("response is %#v\n", resp.Data)
191+
192+
req = tarantool.NewUpsertRequest("test").
193+
Tuple([]interface{}{uint(1113), "second", "second"}).
194+
Operations(tarantool.NewOperations().Assign(2, "updated"))
195+
fut, err := conn.DoAsync(req)
196+
if err != nil {
197+
fmt.Printf("error in do async upsert request is %v", err)
198+
}
199+
resp, err = fut.Get()
200+
if err != nil {
201+
fmt.Printf("error in do async upsert request is %v", err)
202+
return
203+
}
204+
fmt.Printf("response is %#v\n", resp.Data)
205+
206+
req = tarantool.NewSelectRequest(517).
207+
Limit(100).
208+
Key(tarantool.IntKey{1113})
209+
resp, err = conn.Do(req)
210+
if err != nil {
211+
fmt.Printf("error in do select request is %v", err)
212+
return
213+
}
214+
fmt.Printf("response is %#v\n", resp.Data)
215+
// Output:
216+
// response is []interface {}{}
217+
// response is []interface {}{}
218+
// response is []interface {}{[]interface {}{0x459, "first", "updated"}}
219+
}
220+
111221
func ExampleConnection_Ping() {
112222
conn := example_connect()
113223
defer conn.Close()

export_test.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,23 @@
11
package tarantool
22

3-
func (schema *Schema) ResolveSpaceIndex(s interface{}, i interface{}) (spaceNo, indexNo uint32, err error) {
4-
return schema.resolveSpaceIndex(s, i)
3+
import (
4+
"gopkg.in/vmihailenco/msgpack.v2"
5+
)
6+
7+
// RefImplSelectBody is reference implementation for filling of a select
8+
// request's body.
9+
func RefImplSelectBody(enc *msgpack.Encoder, space, index, offset, limit, iterator uint32, key interface{}) error {
10+
return fillSelect(enc, space, index, offset, limit, iterator, key)
11+
}
12+
13+
// RefImplUpdateBody is reference implementation for filling of an update
14+
// request's body.
15+
func RefImplUpdateBody(enc *msgpack.Encoder, space, index uint32, key, ops interface{}) error {
16+
return fillUpdate(enc, space, index, key, ops)
17+
}
18+
19+
// RefImplUpsertBody is reference implementation for filling of an upsert
20+
// request's body.
21+
func RefImplUpsertBody(enc *msgpack.Encoder, space uint32, tuple, ops interface{}) error {
22+
return fillUpsert(enc, space, tuple, ops)
523
}

0 commit comments

Comments
 (0)