Skip to content

Commit c01e004

Browse files
committed
api: proposal to add the context support
This patch adds the support of using context in API. The proposed API is based on using request objects. Added tests that cover almost all cases of using the context in a query. Added benchamrk tests are equivalent to other, that use the same query but without any context. Added a new interface for canceling Future objects from the common queue. Closes #48
1 parent daae235 commit c01e004

13 files changed

+421
-80
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1313
- SSL support (#155)
1414
- IPROTO_PUSH messages support (#67)
1515
- Public API with request object types (#126)
16+
- Context support for request objects (#48)
1617

1718
### Changed
1819

connection.go

+32-10
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package tarantool
55
import (
66
"bufio"
77
"bytes"
8+
"context"
89
"errors"
910
"fmt"
1011
"io"
@@ -139,6 +140,8 @@ type Connection struct {
139140
state uint32
140141
dec *msgpack.Decoder
141142
lenbuf [PacketLengthBytes]byte
143+
144+
futCanceler futureCanceler
142145
}
143146

144147
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -334,6 +337,7 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
334337
return nil, err
335338
}
336339
}
340+
conn.futCanceler = newFutCancelerImpl(conn.cancelFuture)
337341

338342
return conn, err
339343
}
@@ -387,6 +391,16 @@ func (conn *Connection) Handle() interface{} {
387391
return conn.opts.Handle
388392
}
389393

394+
// cancelFuture removes the passed future from the internal queue of Future objects.
395+
func (conn *Connection) cancelFuture(fut *Future, err error) error {
396+
if fut == nil {
397+
return fmt.Errorf("passed nil future")
398+
}
399+
fut.SetError(err)
400+
conn.fetchFuture(fut.requestId)
401+
return nil
402+
}
403+
390404
func (conn *Connection) dial() (err error) {
391405
var connection net.Conn
392406
network := "tcp"
@@ -721,8 +735,8 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
721735
}
722736
}
723737

724-
func (conn *Connection) newFuture() (fut *Future) {
725-
fut = NewFuture()
738+
func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
739+
fut = NewFuture(ctx, conn.futCanceler)
726740
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
727741
select {
728742
case conn.rlimit <- struct{}{}:
@@ -785,8 +799,8 @@ func (conn *Connection) newFuture() (fut *Future) {
785799
return
786800
}
787801

788-
func (conn *Connection) send(req Request) *Future {
789-
fut := conn.newFuture()
802+
func (conn *Connection) send(ctx context.Context, req Request) *Future {
803+
fut := conn.newFuture(ctx)
790804
if fut.ready == nil {
791805
return fut
792806
}
@@ -992,26 +1006,34 @@ func (conn *Connection) nextRequestId() (requestId uint32) {
9921006
//
9931007
// An error is returned if the request was formed incorrectly, or failure to
9941008
// communicate by the connection, or unable to decode the response.
995-
func (conn *Connection) Do(req Request) (*Response, error) {
996-
fut := conn.DoAsync(req)
1009+
func (conn *Connection) Do(ctx context.Context, req Request) (*Response, error) {
1010+
fut := conn.DoAsync(ctx, req)
9971011
return fut.Get()
9981012
}
9991013

10001014
// DoTyped verifies, sends the request and fills the typed result.
10011015
//
10021016
// An error is returned if the request was formed incorrectly, or failure to
10031017
// communicate by the connection, or unable to decode the response.
1004-
func (conn *Connection) DoTyped(req Request, result interface{}) error {
1005-
fut := conn.DoAsync(req)
1018+
func (conn *Connection) DoTyped(ctx context.Context, req Request, result interface{}) error {
1019+
fut := conn.DoAsync(ctx, req)
10061020
return fut.GetTyped(result)
10071021
}
10081022

10091023
// DoAsync verifies, sends the request and returns a future.
10101024
//
10111025
// An error is returned if the request was formed incorrectly, or failure to
10121026
// create the future.
1013-
func (conn *Connection) DoAsync(req Request) *Future {
1014-
return conn.send(req)
1027+
func (conn *Connection) DoAsync(ctx context.Context, req Request) *Future {
1028+
if ctx == nil {
1029+
return NewErrorFuture(errors.New("passed nil context"))
1030+
}
1031+
select {
1032+
case <-ctx.Done():
1033+
return NewErrorFuture(errors.New("context is done"))
1034+
default:
1035+
}
1036+
return conn.send(ctx, req)
10151037
}
10161038

10171039
// ConfiguredTimeout returns a timeout from connection config.

connection_pool/connection_pool.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
package connection_pool
1212

1313
import (
14+
"context"
1415
"errors"
1516
"log"
1617
"sync/atomic"
@@ -483,33 +484,33 @@ func (connPool *ConnectionPool) EvalAsync(expr string, args interface{}, userMod
483484
}
484485

485486
// Do sends the request and returns a response.
486-
func (connPool *ConnectionPool) Do(req tarantool.Request, userMode Mode) (*tarantool.Response, error) {
487+
func (connPool *ConnectionPool) Do(ctx context.Context, req tarantool.Request, userMode Mode) (*tarantool.Response, error) {
487488
conn, err := connPool.getNextConnection(userMode)
488489
if err != nil {
489490
return nil, err
490491
}
491492

492-
return conn.Do(req)
493+
return conn.Do(ctx, req)
493494
}
494495

495496
// DoTyped sends the request and fills the typed result.
496-
func (connPool *ConnectionPool) DoTyped(req tarantool.Request, result interface{}, userMode Mode) error {
497+
func (connPool *ConnectionPool) DoTyped(ctx context.Context, req tarantool.Request, result interface{}, userMode Mode) error {
497498
conn, err := connPool.getNextConnection(userMode)
498499
if err != nil {
499500
return err
500501
}
501502

502-
return conn.DoTyped(req, result)
503+
return conn.DoTyped(ctx, req, result)
503504
}
504505

505506
// DoAsync sends the request and returns a future.
506-
func (connPool *ConnectionPool) DoAsync(req tarantool.Request, userMode Mode) *tarantool.Future {
507+
func (connPool *ConnectionPool) DoAsync(ctx context.Context, req tarantool.Request, userMode Mode) *tarantool.Future {
507508
conn, err := connPool.getNextConnection(userMode)
508509
if err != nil {
509510
return tarantool.NewErrorFuture(err)
510511
}
511512

512-
return conn.DoAsync(req)
513+
return conn.DoAsync(ctx, req)
513514
}
514515

515516
//

connection_pool/connection_pool_test.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package connection_pool_test
22

33
import (
4+
"context"
45
"log"
56
"os"
67
"strings"
@@ -1266,27 +1267,27 @@ func TestDo(t *testing.T) {
12661267

12671268
req := tarantool.NewPingRequest()
12681269
// ANY
1269-
resp, err := connPool.Do(req, connection_pool.ANY)
1270+
resp, err := connPool.Do(context.Background(), req, connection_pool.ANY)
12701271
require.Nilf(t, err, "failed to Ping")
12711272
require.NotNilf(t, resp, "response is nil after Ping")
12721273

12731274
// RW
1274-
resp, err = connPool.Do(req, connection_pool.RW)
1275+
resp, err = connPool.Do(context.Background(), req, connection_pool.RW)
12751276
require.Nilf(t, err, "failed to Ping")
12761277
require.NotNilf(t, resp, "response is nil after Ping")
12771278

12781279
// RO
1279-
resp, err = connPool.Do(req, connection_pool.RO)
1280+
resp, err = connPool.Do(context.Background(), req, connection_pool.RO)
12801281
require.Nilf(t, err, "failed to Ping")
12811282
require.NotNilf(t, resp, "response is nil after Ping")
12821283

12831284
// PreferRW
1284-
resp, err = connPool.Do(req, connection_pool.PreferRW)
1285+
resp, err = connPool.Do(context.Background(), req, connection_pool.PreferRW)
12851286
require.Nilf(t, err, "failed to Ping")
12861287
require.NotNilf(t, resp, "response is nil after Ping")
12871288

12881289
// PreferRO
1289-
resp, err = connPool.Do(req, connection_pool.PreferRO)
1290+
resp, err = connPool.Do(context.Background(), req, connection_pool.PreferRO)
12901291
require.Nilf(t, err, "failed to Ping")
12911292
require.NotNilf(t, resp, "response is nil after Ping")
12921293
}

connection_pool/example_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package connection_pool_test
22

33
import (
4+
"context"
45
"fmt"
56

67
"github.com/tarantool/go-tarantool"
@@ -539,7 +540,7 @@ func ExampleConnectionPool_Do() {
539540

540541
// Ping a Tarantool instance to check connection.
541542
req := tarantool.NewPingRequest()
542-
resp, err := pool.Do(req, connection_pool.ANY)
543+
resp, err := pool.Do(context.Background(), req, connection_pool.ANY)
543544
fmt.Println("Ping Code", resp.Code)
544545
fmt.Println("Ping Data", resp.Data)
545546
fmt.Println("Ping Error", err)

connector.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package tarantool
22

3-
import "time"
3+
import (
4+
"context"
5+
"time"
6+
)
47

58
type Connector interface {
69
ConnectedNow() bool
@@ -39,7 +42,7 @@ type Connector interface {
3942
Call17Async(functionName string, args interface{}) *Future
4043
EvalAsync(expr string, args interface{}) *Future
4144

42-
Do(req Request) (resp *Response, err error)
43-
DoTyped(req Request, result interface{}) (err error)
44-
DoAsync(req Request) (fut *Future)
45+
Do(ctx context.Context, req Request) (resp *Response, err error)
46+
DoTyped(ctx context.Context, req Request, result interface{}) (err error)
47+
DoAsync(ctx context.Context, req Request) (fut *Future)
4548
}

example_test.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package tarantool_test
22

33
import (
4+
"context"
45
"fmt"
56
"time"
67

@@ -133,7 +134,7 @@ func ExampleSelectRequest() {
133134
req := tarantool.NewSelectRequest(517).
134135
Limit(100).
135136
Key(tarantool.IntKey{1111})
136-
resp, err := conn.Do(req)
137+
resp, err := conn.Do(context.Background(), req)
137138
if err != nil {
138139
fmt.Printf("error in do select request is %v", err)
139140
return
@@ -144,7 +145,7 @@ func ExampleSelectRequest() {
144145
Index("primary").
145146
Limit(100).
146147
Key(tarantool.IntKey{1111})
147-
fut := conn.DoAsync(req)
148+
fut := conn.DoAsync(context.Background(), req)
148149
resp, err = fut.Get()
149150
if err != nil {
150151
fmt.Printf("error in do async select request is %v", err)
@@ -163,7 +164,7 @@ func ExampleUpdateRequest() {
163164
req := tarantool.NewUpdateRequest(517).
164165
Key(tarantool.IntKey{1111}).
165166
Operations(tarantool.NewOperations().Assign(1, "bye"))
166-
resp, err := conn.Do(req)
167+
resp, err := conn.Do(context.Background(), req)
167168
if err != nil {
168169
fmt.Printf("error in do update request is %v", err)
169170
return
@@ -174,7 +175,7 @@ func ExampleUpdateRequest() {
174175
Index("primary").
175176
Key(tarantool.IntKey{1111}).
176177
Operations(tarantool.NewOperations().Assign(1, "hello"))
177-
fut := conn.DoAsync(req)
178+
fut := conn.DoAsync(context.Background(), req)
178179
resp, err = fut.Get()
179180
if err != nil {
180181
fmt.Printf("error in do async update request is %v", err)
@@ -194,7 +195,7 @@ func ExampleUpsertRequest() {
194195
req = tarantool.NewUpsertRequest(517).
195196
Tuple([]interface{}{uint(1113), "first", "first"}).
196197
Operations(tarantool.NewOperations().Assign(1, "updated"))
197-
resp, err := conn.Do(req)
198+
resp, err := conn.Do(context.Background(), req)
198199
if err != nil {
199200
fmt.Printf("error in do select upsert is %v", err)
200201
return
@@ -204,7 +205,7 @@ func ExampleUpsertRequest() {
204205
req = tarantool.NewUpsertRequest("test").
205206
Tuple([]interface{}{uint(1113), "second", "second"}).
206207
Operations(tarantool.NewOperations().Assign(2, "updated"))
207-
fut := conn.DoAsync(req)
208+
fut := conn.DoAsync(context.Background(), req)
208209
resp, err = fut.Get()
209210
if err != nil {
210211
fmt.Printf("error in do async upsert request is %v", err)
@@ -215,7 +216,7 @@ func ExampleUpsertRequest() {
215216
req = tarantool.NewSelectRequest(517).
216217
Limit(100).
217218
Key(tarantool.IntKey{1113})
218-
resp, err = conn.Do(req)
219+
resp, err = conn.Do(context.Background(), req)
219220
if err != nil {
220221
fmt.Printf("error in do select request is %v", err)
221222
return

future.go

+26-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package tarantool
22

33
import (
4+
"context"
5+
"fmt"
46
"sync"
57
"time"
68
)
@@ -16,13 +18,32 @@ type Future struct {
1618
err error
1719
ready chan struct{}
1820
done chan struct{}
21+
ctx context.Context
22+
canceler futureCanceler
1923
}
2024

2125
func (fut *Future) wait() {
2226
if fut.done == nil {
2327
return
2428
}
25-
<-fut.done
29+
if fut.ctx == nil {
30+
<-fut.done
31+
return
32+
}
33+
select {
34+
case <-fut.done:
35+
default:
36+
select {
37+
case <-fut.ctx.Done():
38+
fut.canceler.Cancel(fut, fmt.Errorf("context is done"))
39+
default:
40+
select {
41+
case <-fut.done:
42+
case <-fut.ctx.Done():
43+
fut.canceler.Cancel(fut, fmt.Errorf("context is done"))
44+
}
45+
}
46+
}
2647
}
2748

2849
func (fut *Future) isDone() bool {
@@ -118,8 +139,10 @@ func (it *asyncResponseIterator) nextResponse() (resp *Response) {
118139
}
119140

120141
// NewFuture creates a new empty Future.
121-
func NewFuture() (fut *Future) {
142+
func NewFuture(ctx context.Context, canceler futureCanceler) (fut *Future) {
122143
fut = &Future{}
144+
fut.ctx = ctx
145+
fut.canceler = canceler
123146
fut.ready = make(chan struct{}, 1000000000)
124147
fut.done = make(chan struct{})
125148
fut.pushes = make([]*Response, 0)
@@ -128,7 +151,7 @@ func NewFuture() (fut *Future) {
128151

129152
// NewErrorFuture returns new set empty Future with filled error field.
130153
func NewErrorFuture(err error) *Future {
131-
fut := NewFuture()
154+
fut := NewFuture(context.Background(), nil)
132155
fut.SetError(err)
133156
return fut
134157
}

future_canceler.go

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package tarantool
2+
3+
type futureCanceler interface {
4+
Cancel(fut *Future, err error) error
5+
}
6+
7+
type futCancelerImpl struct {
8+
cancel func(fut *Future, err error) error
9+
}
10+
11+
func newFutCancelerImpl(cancelFunc func(fut *Future, err error) error) *futCancelerImpl {
12+
return &futCancelerImpl{cancel: cancelFunc}
13+
}
14+
15+
func (fc *futCancelerImpl) Cancel(fut *Future, err error) error {
16+
return fc.cancel(fut, err)
17+
}

0 commit comments

Comments
 (0)