Skip to content

Commit 6e11534

Browse files
committed
api: proposal to add the context support
This patch adds the support of using context in API. The proposed API is based on using request objects. Added tests that cover almost all cases of using the context in a query. Added benchamrk tests are equivalent to other, that use the same query but without any context. Closes #48
1 parent e1bb59c commit 6e11534

File tree

5 files changed

+470
-56
lines changed

5 files changed

+470
-56
lines changed

connection.go

+143-56
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package tarantool
55
import (
66
"bufio"
77
"bytes"
8+
"context"
89
"errors"
910
"fmt"
1011
"io"
@@ -125,8 +126,11 @@ type Connection struct {
125126
c net.Conn
126127
mutex sync.Mutex
127128
// Schema contains schema loaded on connection.
128-
Schema *Schema
129+
Schema *Schema
130+
// requestId contains the last request ID for requests with nil context.
129131
requestId uint32
132+
// contextRequestId contains the last request ID for requests with context.
133+
contextRequestId uint32
130134
// Greeting contains first message sent by Tarantool.
131135
Greeting *Greeting
132136

@@ -143,16 +147,57 @@ type Connection struct {
143147

144148
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
145149

150+
type futureList struct {
151+
first *Future
152+
last **Future
153+
}
154+
155+
func (list *futureList) findFuture(reqid uint32, fetch bool) *Future {
156+
root := &list.first
157+
for {
158+
fut := *root
159+
if fut == nil {
160+
return nil
161+
}
162+
if fut.requestId == reqid {
163+
if fetch {
164+
*root = fut.next
165+
if fut.next == nil {
166+
list.last = root
167+
} else {
168+
fut.next = nil
169+
}
170+
}
171+
return fut
172+
}
173+
root = &fut.next
174+
}
175+
}
176+
177+
func (list *futureList) addFuture(fut *Future) {
178+
*list.last = fut
179+
list.last = &fut.next
180+
}
181+
182+
func (list *futureList) clear(err error, conn *Connection) {
183+
fut := list.first
184+
list.first = nil
185+
list.last = &list.first
186+
for fut != nil {
187+
fut.SetError(err)
188+
conn.markDone(fut)
189+
fut, fut.next = fut.next, nil
190+
}
191+
}
192+
146193
type connShard struct {
147-
rmut sync.Mutex
148-
requests [requestsMap]struct {
149-
first *Future
150-
last **Future
151-
}
152-
bufmut sync.Mutex
153-
buf smallWBuf
154-
enc *msgpack.Encoder
155-
_pad [16]uint64 //nolint: unused,structcheck
194+
rmut sync.Mutex
195+
requests [requestsMap]futureList
196+
requestsWithCtx [requestsMap]futureList
197+
bufmut sync.Mutex
198+
buf smallWBuf
199+
enc *msgpack.Encoder
200+
_pad [16]uint64 //nolint: unused,structcheck
156201
}
157202

158203
// Greeting is a message sent by Tarantool on connect.
@@ -262,12 +307,13 @@ type SslOpts struct {
262307
// and will not finish to make attempts on authorization failures.
263308
func Connect(addr string, opts Opts) (conn *Connection, err error) {
264309
conn = &Connection{
265-
addr: addr,
266-
requestId: 0,
267-
Greeting: &Greeting{},
268-
control: make(chan struct{}),
269-
opts: opts,
270-
dec: msgpack.NewDecoder(&smallBuf{}),
310+
addr: addr,
311+
requestId: 0,
312+
contextRequestId: 1,
313+
Greeting: &Greeting{},
314+
control: make(chan struct{}),
315+
opts: opts,
316+
dec: msgpack.NewDecoder(&smallBuf{}),
271317
}
272318
maxprocs := uint32(runtime.GOMAXPROCS(-1))
273319
if conn.opts.Concurrency == 0 || conn.opts.Concurrency > maxprocs*128 {
@@ -283,8 +329,11 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
283329
conn.shard = make([]connShard, conn.opts.Concurrency)
284330
for i := range conn.shard {
285331
shard := &conn.shard[i]
286-
for j := range shard.requests {
287-
shard.requests[j].last = &shard.requests[j].first
332+
requestsLists := []*[requestsMap]futureList{&shard.requests, &shard.requestsWithCtx}
333+
for _, requests := range requestsLists {
334+
for j := range requests {
335+
shard.requests[j].last = &shard.requests[j].first
336+
}
288337
}
289338
}
290339

@@ -387,6 +436,14 @@ func (conn *Connection) Handle() interface{} {
387436
return conn.opts.Handle
388437
}
389438

439+
func (conn *Connection) cancelFuture(fut *Future, err error) error {
440+
if fut = conn.fetchFuture(fut.requestId); fut != nil {
441+
fut.SetError(err)
442+
conn.markDone(fut)
443+
}
444+
return nil
445+
}
446+
390447
func (conn *Connection) dial() (err error) {
391448
var connection net.Conn
392449
network := "tcp"
@@ -580,15 +637,10 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
580637
}
581638
for i := range conn.shard {
582639
conn.shard[i].buf.Reset()
583-
requests := &conn.shard[i].requests
584-
for pos := range requests {
585-
fut := requests[pos].first
586-
requests[pos].first = nil
587-
requests[pos].last = &requests[pos].first
588-
for fut != nil {
589-
fut.SetError(neterr)
590-
conn.markDone(fut)
591-
fut, fut.next = fut.next, nil
640+
requestsLists := []*[requestsMap]futureList{&conn.shard[i].requests, &conn.shard[i].requestsWithCtx}
641+
for _, requests := range requestsLists {
642+
for pos := range requests {
643+
requests[pos].clear(neterr, conn)
592644
}
593645
}
594646
}
@@ -721,7 +773,7 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
721773
}
722774
}
723775

724-
func (conn *Connection) newFuture() (fut *Future) {
776+
func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
725777
fut = NewFuture()
726778
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
727779
select {
@@ -736,7 +788,7 @@ func (conn *Connection) newFuture() (fut *Future) {
736788
return
737789
}
738790
}
739-
fut.requestId = conn.nextRequestId()
791+
fut.requestId = conn.nextRequestId(ctx != nil)
740792
shardn := fut.requestId & (conn.opts.Concurrency - 1)
741793
shard := &conn.shard[shardn]
742794
shard.rmut.Lock()
@@ -761,11 +813,20 @@ func (conn *Connection) newFuture() (fut *Future) {
761813
return
762814
}
763815
pos := (fut.requestId / conn.opts.Concurrency) & (requestsMap - 1)
764-
pair := &shard.requests[pos]
765-
*pair.last = fut
766-
pair.last = &fut.next
767-
if conn.opts.Timeout > 0 {
768-
fut.timeout = time.Since(epoch) + conn.opts.Timeout
816+
if ctx != nil {
817+
select {
818+
case <-ctx.Done():
819+
fut.SetError(fmt.Errorf("context is done"))
820+
shard.rmut.Unlock()
821+
return
822+
default:
823+
}
824+
shard.requestsWithCtx[pos].addFuture(fut)
825+
} else {
826+
shard.requests[pos].addFuture(fut)
827+
if conn.opts.Timeout > 0 {
828+
fut.timeout = time.Since(epoch) + conn.opts.Timeout
829+
}
769830
}
770831
shard.rmut.Unlock()
771832
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitWait {
@@ -785,12 +846,40 @@ func (conn *Connection) newFuture() (fut *Future) {
785846
return
786847
}
787848

849+
func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
850+
select {
851+
case <-fut.done:
852+
default:
853+
select {
854+
case <-ctx.Done():
855+
conn.cancelFuture(fut, fmt.Errorf("context is done"))
856+
default:
857+
select {
858+
case <-fut.done:
859+
case <-ctx.Done():
860+
conn.cancelFuture(fut, fmt.Errorf("context is done"))
861+
}
862+
}
863+
}
864+
}
865+
788866
func (conn *Connection) send(req Request) *Future {
789-
fut := conn.newFuture()
867+
fut := conn.newFuture(req.Ctx())
790868
if fut.ready == nil {
791869
return fut
792870
}
871+
if req.Ctx() != nil {
872+
select {
873+
case <-req.Ctx().Done():
874+
conn.cancelFuture(fut, fmt.Errorf("context is done"))
875+
return fut
876+
default:
877+
}
878+
}
793879
conn.putFuture(fut, req)
880+
if req.Ctx() != nil {
881+
go conn.contextWatchdog(fut, req.Ctx())
882+
}
794883
return fut
795884
}
796885

@@ -877,25 +966,10 @@ func (conn *Connection) fetchFuture(reqid uint32) (fut *Future) {
877966
func (conn *Connection) getFutureImp(reqid uint32, fetch bool) *Future {
878967
shard := &conn.shard[reqid&(conn.opts.Concurrency-1)]
879968
pos := (reqid / conn.opts.Concurrency) & (requestsMap - 1)
880-
pair := &shard.requests[pos]
881-
root := &pair.first
882-
for {
883-
fut := *root
884-
if fut == nil {
885-
return nil
886-
}
887-
if fut.requestId == reqid {
888-
if fetch {
889-
*root = fut.next
890-
if fut.next == nil {
891-
pair.last = root
892-
} else {
893-
fut.next = nil
894-
}
895-
}
896-
return fut
897-
}
898-
root = &fut.next
969+
if reqid%2 == 0 {
970+
return shard.requests[pos].findFuture(reqid, fetch)
971+
} else {
972+
return shard.requestsWithCtx[pos].findFuture(reqid, fetch)
899973
}
900974
}
901975

@@ -984,8 +1058,12 @@ func (conn *Connection) read(r io.Reader) (response []byte, err error) {
9841058
return
9851059
}
9861060

987-
func (conn *Connection) nextRequestId() (requestId uint32) {
988-
return atomic.AddUint32(&conn.requestId, 1)
1061+
func (conn *Connection) nextRequestId(context bool) (requestId uint32) {
1062+
if context {
1063+
return atomic.AddUint32(&conn.contextRequestId, 2)
1064+
} else {
1065+
return atomic.AddUint32(&conn.requestId, 2)
1066+
}
9891067
}
9901068

9911069
// Do performs a request asynchronously on the connection.
@@ -1000,6 +1078,15 @@ func (conn *Connection) Do(req Request) *Future {
10001078
return fut
10011079
}
10021080
}
1081+
if req.Ctx() != nil {
1082+
select {
1083+
case <-req.Ctx().Done():
1084+
fut := NewFuture()
1085+
fut.SetError(fmt.Errorf("context is done"))
1086+
return fut
1087+
default:
1088+
}
1089+
}
10031090
return conn.send(req)
10041091
}
10051092

prepared.go

+19
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package tarantool
22

33
import (
4+
"context"
45
"fmt"
56

67
"gopkg.in/vmihailenco/msgpack.v2"
@@ -58,6 +59,12 @@ func (req *PrepareRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error
5859
return fillPrepare(enc, req.expr)
5960
}
6061

62+
// Context sets a passed context to the request.
63+
func (req *PrepareRequest) Context(ctx context.Context) *PrepareRequest {
64+
req.ctx = ctx
65+
return req
66+
}
67+
6168
// UnprepareRequest helps you to create an unprepare request object for
6269
// execution by a Connection.
6370
type UnprepareRequest struct {
@@ -83,6 +90,12 @@ func (req *UnprepareRequest) Body(res SchemaResolver, enc *msgpack.Encoder) erro
8390
return fillUnprepare(enc, *req.stmt)
8491
}
8592

93+
// Context sets a passed context to the request.
94+
func (req *UnprepareRequest) Context(ctx context.Context) *UnprepareRequest {
95+
req.ctx = ctx
96+
return req
97+
}
98+
8699
// ExecutePreparedRequest helps you to create an execute prepared request
87100
// object for execution by a Connection.
88101
type ExecutePreparedRequest struct {
@@ -117,6 +130,12 @@ func (req *ExecutePreparedRequest) Body(res SchemaResolver, enc *msgpack.Encoder
117130
return fillExecutePrepared(enc, *req.stmt, req.args)
118131
}
119132

133+
// Context sets a passed context to the request.
134+
func (req *ExecutePreparedRequest) Context(ctx context.Context) *ExecutePreparedRequest {
135+
req.ctx = ctx
136+
return req
137+
}
138+
120139
func fillPrepare(enc *msgpack.Encoder, expr string) error {
121140
enc.EncodeMapLen(1)
122141
enc.EncodeUint64(KeySQLText)

0 commit comments

Comments
 (0)