Skip to content

Commit 7116a33

Browse files
committed
code health: all requests use request objects
The patch is a refactoring of an internal logic. It replaces the usage of closures to request objects to construct a request body. After the patch all Connection.* requests use request objects inside. Closes #126
1 parent 1e1f8f4 commit 7116a33

File tree

8 files changed

+159
-258
lines changed

8 files changed

+159
-258
lines changed

connection.go

+39-40
Original file line numberDiff line numberDiff line change
@@ -468,18 +468,35 @@ func (conn *Connection) dial() (err error) {
468468
return
469469
}
470470

471-
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
472-
request := &Future{
473-
requestId: 0,
474-
requestCode: AuthRequestCode,
471+
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res SchemaResolver) (err error) {
472+
hl := h.Len()
473+
h.Write([]byte{
474+
0xce, 0, 0, 0, 0, // Length.
475+
0x82, // 2 element map.
476+
KeyCode, byte(req.Code()), // Request code.
477+
KeySync, 0xce,
478+
byte(reqid >> 24), byte(reqid >> 16),
479+
byte(reqid >> 8), byte(reqid),
480+
})
481+
482+
if err = req.Body(res, enc); err != nil {
483+
return
475484
}
485+
486+
l := uint32(h.Len() - 5 - hl)
487+
h.b[hl+1] = byte(l >> 24)
488+
h.b[hl+2] = byte(l >> 16)
489+
h.b[hl+3] = byte(l >> 8)
490+
h.b[hl+4] = byte(l)
491+
492+
return
493+
}
494+
495+
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
476496
var packet smallWBuf
477-
err = request.pack(&packet, msgpack.NewEncoder(&packet), func(enc *msgpack.Encoder) error {
478-
return enc.Encode(map[uint32]interface{}{
479-
KeyUserName: conn.opts.User,
480-
KeyTuple: []interface{}{string("chap-sha1"), string(scramble)},
481-
})
482-
})
497+
req := newAuthRequest(conn.opts.User, string(scramble))
498+
err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, conn.Schema)
499+
483500
if err != nil {
484501
return errors.New("auth: pack error " + err.Error())
485502
}
@@ -704,7 +721,7 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
704721
}
705722
}
706723

707-
func (conn *Connection) newFuture(requestCode int32) (fut *Future) {
724+
func (conn *Connection) newFuture() (fut *Future) {
708725
fut = NewFuture()
709726
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
710727
select {
@@ -720,7 +737,6 @@ func (conn *Connection) newFuture(requestCode int32) (fut *Future) {
720737
}
721738
}
722739
fut.requestId = conn.nextRequestId()
723-
fut.requestCode = requestCode
724740
shardn := fut.requestId & (conn.opts.Concurrency - 1)
725741
shard := &conn.shard[shardn]
726742
shard.rmut.Lock()
@@ -769,23 +785,16 @@ func (conn *Connection) newFuture(requestCode int32) (fut *Future) {
769785
return
770786
}
771787

772-
func (conn *Connection) sendFuture(fut *Future, body func(*msgpack.Encoder) error) *Future {
788+
func (conn *Connection) send(req Request) *Future {
789+
fut := conn.newFuture()
773790
if fut.ready == nil {
774791
return fut
775792
}
776-
conn.putFuture(fut, body)
777-
return fut
778-
}
779-
780-
func (conn *Connection) failFuture(fut *Future, err error) *Future {
781-
if f := conn.fetchFuture(fut.requestId); f == fut {
782-
fut.SetError(err)
783-
conn.markDone(fut)
784-
}
793+
conn.putFuture(fut, req)
785794
return fut
786795
}
787796

788-
func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error) {
797+
func (conn *Connection) putFuture(fut *Future, req Request) {
789798
shardn := fut.requestId & (conn.opts.Concurrency - 1)
790799
shard := &conn.shard[shardn]
791800
shard.bufmut.Lock()
@@ -801,10 +810,11 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error
801810
shard.enc = msgpack.NewEncoder(&shard.buf)
802811
}
803812
blen := shard.buf.Len()
804-
if err := fut.pack(&shard.buf, shard.enc, body); err != nil {
813+
reqid := fut.requestId
814+
if err := pack(&shard.buf, shard.enc, reqid, req, conn.Schema); err != nil {
805815
shard.buf.Trunc(blen)
806816
shard.bufmut.Unlock()
807-
if f := conn.fetchFuture(fut.requestId); f == fut {
817+
if f := conn.fetchFuture(reqid); f == fut {
808818
fut.SetError(err)
809819
conn.markDone(fut)
810820
} else if f != nil {
@@ -983,10 +993,7 @@ func (conn *Connection) nextRequestId() (requestId uint32) {
983993
// An error is returned if the request was formed incorrectly, or failed to
984994
// communicate by the connection, or unable to decode the response.
985995
func (conn *Connection) Do(req Request) (*Response, error) {
986-
fut, err := conn.DoAsync(req)
987-
if err != nil {
988-
return nil, err
989-
}
996+
fut := conn.DoAsync(req)
990997
return fut.Get()
991998
}
992999

@@ -995,24 +1002,16 @@ func (conn *Connection) Do(req Request) (*Response, error) {
9951002
// An error is returned if the request was formed incorrectly, or failed to
9961003
// communicate by the connection, or unable to decode the response.
9971004
func (conn *Connection) DoTyped(req Request, result interface{}) error {
998-
fut, err := conn.DoAsync(req)
999-
if err != nil {
1000-
return err
1001-
}
1005+
fut := conn.DoAsync(req)
10021006
return fut.GetTyped(result)
10031007
}
10041008

10051009
// DoAsync verifies, sends the request and returns a future.
10061010
//
10071011
// An error is returned if the request was formed incorrectly, or failed to
10081012
// create the future.
1009-
func (conn *Connection) DoAsync(req Request) (*Future, error) {
1010-
bodyFunc, err := req.BodyFunc(conn.Schema)
1011-
if err != nil {
1012-
return nil, err
1013-
}
1014-
future := conn.newFuture(req.Code())
1015-
return conn.sendFuture(future, bodyFunc), nil
1013+
func (conn *Connection) DoAsync(req Request) *Future {
1014+
return conn.send(req)
10161015
}
10171016

10181017
// ConfiguredTimeout returns a timeout from connection config.

connection_pool/connection_pool.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -545,10 +545,10 @@ func (connPool *ConnectionPool) DoTyped(req tarantool.Request, result interface{
545545
}
546546

547547
// DoAsync sends the request and returns a future.
548-
func (connPool *ConnectionPool) DoAsync(req tarantool.Request, userMode Mode) (*tarantool.Future, error) {
548+
func (connPool *ConnectionPool) DoAsync(req tarantool.Request, userMode Mode) *tarantool.Future {
549549
conn, err := connPool.getNextConnection(userMode)
550550
if err != nil {
551-
return nil, err
551+
return tarantool.NewErrorFuture(err)
552552
}
553553

554554
return conn.DoAsync(req)

connector.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,5 @@ type Connector interface {
4444

4545
Do(req Request) (resp *Response, err error)
4646
DoTyped(req Request, result interface{}) (err error)
47-
DoAsync(req Request) (fut *Future, err error)
47+
DoAsync(req Request) (fut *Future)
4848
}

example_test.go

+3-12
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,7 @@ func ExampleSelectRequest() {
144144
Index("primary").
145145
Limit(100).
146146
Key(tarantool.IntKey{1111})
147-
fut, err := conn.DoAsync(req)
148-
if err != nil {
149-
fmt.Printf("error in do async select request is %v", err)
150-
}
147+
fut := conn.DoAsync(req)
151148
resp, err = fut.Get()
152149
if err != nil {
153150
fmt.Printf("error in do async select request is %v", err)
@@ -177,10 +174,7 @@ func ExampleUpdateRequest() {
177174
Index("primary").
178175
Key(tarantool.IntKey{1111}).
179176
Operations(tarantool.NewOperations().Assign(1, "hello"))
180-
fut, err := conn.DoAsync(req)
181-
if err != nil {
182-
fmt.Printf("error in do async update request is %v", err)
183-
}
177+
fut := conn.DoAsync(req)
184178
resp, err = fut.Get()
185179
if err != nil {
186180
fmt.Printf("error in do async update request is %v", err)
@@ -210,10 +204,7 @@ func ExampleUpsertRequest() {
210204
req = tarantool.NewUpsertRequest("test").
211205
Tuple([]interface{}{uint(1113), "second", "second"}).
212206
Operations(tarantool.NewOperations().Assign(2, "updated"))
213-
fut, err := conn.DoAsync(req)
214-
if err != nil {
215-
fmt.Printf("error in do async upsert request is %v", err)
216-
}
207+
fut := conn.DoAsync(req)
217208
resp, err = fut.Get()
218209
if err != nil {
219210
fmt.Printf("error in do async upsert request is %v", err)

future.go

+9-37
Original file line numberDiff line numberDiff line change
@@ -3,47 +3,19 @@ package tarantool
33
import (
44
"sync"
55
"time"
6-
7-
"gopkg.in/vmihailenco/msgpack.v2"
86
)
97

108
// Future is a handle for asynchronous request.
119
type Future struct {
12-
requestId uint32
13-
requestCode int32
14-
timeout time.Duration
15-
mutex sync.Mutex
16-
pushes []*Response
17-
resp *Response
18-
err error
19-
ready chan struct{}
20-
done chan struct{}
21-
next *Future
22-
}
23-
24-
func (fut *Future) pack(h *smallWBuf, enc *msgpack.Encoder, body func(*msgpack.Encoder) error) (err error) {
25-
rid := fut.requestId
26-
hl := h.Len()
27-
h.Write([]byte{
28-
0xce, 0, 0, 0, 0, // Length.
29-
0x82, // 2 element map.
30-
KeyCode, byte(fut.requestCode), // Request code.
31-
KeySync, 0xce,
32-
byte(rid >> 24), byte(rid >> 16),
33-
byte(rid >> 8), byte(rid),
34-
})
35-
36-
if err = body(enc); err != nil {
37-
return
38-
}
39-
40-
l := uint32(h.Len() - 5 - hl)
41-
h.b[hl+1] = byte(l >> 24)
42-
h.b[hl+2] = byte(l >> 16)
43-
h.b[hl+3] = byte(l >> 8)
44-
h.b[hl+4] = byte(l)
45-
46-
return
10+
requestId uint32
11+
next *Future
12+
timeout time.Duration
13+
mutex sync.Mutex
14+
pushes []*Response
15+
resp *Response
16+
err error
17+
ready chan struct{}
18+
done chan struct{}
4719
}
4820

4921
func (fut *Future) wait() {

multi/multi.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,6 @@ func (connMulti *ConnectionMulti) DoTyped(req tarantool.Request, result interfac
493493
}
494494

495495
// DoAsync sends the request and returns a future.
496-
func (connMulti *ConnectionMulti) DoAsync(req tarantool.Request) (*tarantool.Future, error) {
496+
func (connMulti *ConnectionMulti) DoAsync(req tarantool.Request) *tarantool.Future {
497497
return connMulti.getCurrentConnection().DoAsync(req)
498498
}

0 commit comments

Comments
 (0)