Skip to content

Commit 40302db

Browse files
committed
api: proposal to add the context support
This patch adds the support of using context in API. The proposed API is based on using request objects. Added tests that cover almost all cases of using the context in a query. Added benchamrk tests are equivalent to other, that use the same query but without any context. Closes #48
1 parent daae235 commit 40302db

11 files changed

+359
-67
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1313
- SSL support (#155)
1414
- IPROTO_PUSH messages support (#67)
1515
- Public API with request object types (#126)
16+
- Context support for request objects (#48)
1617

1718
### Changed
1819

connection.go

+18-7
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package tarantool
55
import (
66
"bufio"
77
"bytes"
8+
"context"
89
"errors"
910
"fmt"
1011
"io"
@@ -785,8 +786,10 @@ func (conn *Connection) newFuture() (fut *Future) {
785786
return
786787
}
787788

788-
func (conn *Connection) send(req Request) *Future {
789+
func (conn *Connection) send(ctx context.Context, req Request) *Future {
789790
fut := conn.newFuture()
791+
fut.ctx = ctx
792+
fut.conn = conn
790793
if fut.ready == nil {
791794
return fut
792795
}
@@ -992,26 +995,34 @@ func (conn *Connection) nextRequestId() (requestId uint32) {
992995
//
993996
// An error is returned if the request was formed incorrectly, or failure to
994997
// communicate by the connection, or unable to decode the response.
995-
func (conn *Connection) Do(req Request) (*Response, error) {
996-
fut := conn.DoAsync(req)
998+
func (conn *Connection) Do(ctx context.Context, req Request) (*Response, error) {
999+
fut := conn.DoAsync(ctx, req)
9971000
return fut.Get()
9981001
}
9991002

10001003
// DoTyped verifies, sends the request and fills the typed result.
10011004
//
10021005
// An error is returned if the request was formed incorrectly, or failure to
10031006
// communicate by the connection, or unable to decode the response.
1004-
func (conn *Connection) DoTyped(req Request, result interface{}) error {
1005-
fut := conn.DoAsync(req)
1007+
func (conn *Connection) DoTyped(ctx context.Context, req Request, result interface{}) error {
1008+
fut := conn.DoAsync(ctx, req)
10061009
return fut.GetTyped(result)
10071010
}
10081011

10091012
// DoAsync verifies, sends the request and returns a future.
10101013
//
10111014
// An error is returned if the request was formed incorrectly, or failure to
10121015
// create the future.
1013-
func (conn *Connection) DoAsync(req Request) *Future {
1014-
return conn.send(req)
1016+
func (conn *Connection) DoAsync(ctx context.Context, req Request) *Future {
1017+
if ctx == nil {
1018+
return NewErrorFuture(errors.New("passed nil context"))
1019+
}
1020+
select {
1021+
case <-ctx.Done():
1022+
return NewErrorFuture(errors.New("context is done"))
1023+
default:
1024+
}
1025+
return conn.send(ctx, req)
10151026
}
10161027

10171028
// ConfiguredTimeout returns a timeout from connection config.

connection_pool/connection_pool.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
package connection_pool
1212

1313
import (
14+
"context"
1415
"errors"
1516
"log"
1617
"sync/atomic"
@@ -483,33 +484,33 @@ func (connPool *ConnectionPool) EvalAsync(expr string, args interface{}, userMod
483484
}
484485

485486
// Do sends the request and returns a response.
486-
func (connPool *ConnectionPool) Do(req tarantool.Request, userMode Mode) (*tarantool.Response, error) {
487+
func (connPool *ConnectionPool) Do(ctx context.Context, req tarantool.Request, userMode Mode) (*tarantool.Response, error) {
487488
conn, err := connPool.getNextConnection(userMode)
488489
if err != nil {
489490
return nil, err
490491
}
491492

492-
return conn.Do(req)
493+
return conn.Do(ctx, req)
493494
}
494495

495496
// DoTyped sends the request and fills the typed result.
496-
func (connPool *ConnectionPool) DoTyped(req tarantool.Request, result interface{}, userMode Mode) error {
497+
func (connPool *ConnectionPool) DoTyped(ctx context.Context, req tarantool.Request, result interface{}, userMode Mode) error {
497498
conn, err := connPool.getNextConnection(userMode)
498499
if err != nil {
499500
return err
500501
}
501502

502-
return conn.DoTyped(req, result)
503+
return conn.DoTyped(ctx, req, result)
503504
}
504505

505506
// DoAsync sends the request and returns a future.
506-
func (connPool *ConnectionPool) DoAsync(req tarantool.Request, userMode Mode) *tarantool.Future {
507+
func (connPool *ConnectionPool) DoAsync(ctx context.Context, req tarantool.Request, userMode Mode) *tarantool.Future {
507508
conn, err := connPool.getNextConnection(userMode)
508509
if err != nil {
509510
return tarantool.NewErrorFuture(err)
510511
}
511512

512-
return conn.DoAsync(req)
513+
return conn.DoAsync(ctx, req)
513514
}
514515

515516
//

connection_pool/connection_pool_test.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package connection_pool_test
22

33
import (
4+
"context"
45
"log"
56
"os"
67
"strings"
@@ -1266,27 +1267,27 @@ func TestDo(t *testing.T) {
12661267

12671268
req := tarantool.NewPingRequest()
12681269
// ANY
1269-
resp, err := connPool.Do(req, connection_pool.ANY)
1270+
resp, err := connPool.Do(context.Background(), req, connection_pool.ANY)
12701271
require.Nilf(t, err, "failed to Ping")
12711272
require.NotNilf(t, resp, "response is nil after Ping")
12721273

12731274
// RW
1274-
resp, err = connPool.Do(req, connection_pool.RW)
1275+
resp, err = connPool.Do(context.Background(), req, connection_pool.RW)
12751276
require.Nilf(t, err, "failed to Ping")
12761277
require.NotNilf(t, resp, "response is nil after Ping")
12771278

12781279
// RO
1279-
resp, err = connPool.Do(req, connection_pool.RO)
1280+
resp, err = connPool.Do(context.Background(), req, connection_pool.RO)
12801281
require.Nilf(t, err, "failed to Ping")
12811282
require.NotNilf(t, resp, "response is nil after Ping")
12821283

12831284
// PreferRW
1284-
resp, err = connPool.Do(req, connection_pool.PreferRW)
1285+
resp, err = connPool.Do(context.Background(), req, connection_pool.PreferRW)
12851286
require.Nilf(t, err, "failed to Ping")
12861287
require.NotNilf(t, resp, "response is nil after Ping")
12871288

12881289
// PreferRO
1289-
resp, err = connPool.Do(req, connection_pool.PreferRO)
1290+
resp, err = connPool.Do(context.Background(), req, connection_pool.PreferRO)
12901291
require.Nilf(t, err, "failed to Ping")
12911292
require.NotNilf(t, resp, "response is nil after Ping")
12921293
}

connection_pool/example_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package connection_pool_test
22

33
import (
4+
"context"
45
"fmt"
56

67
"github.com/tarantool/go-tarantool"
@@ -539,7 +540,7 @@ func ExampleConnectionPool_Do() {
539540

540541
// Ping a Tarantool instance to check connection.
541542
req := tarantool.NewPingRequest()
542-
resp, err := pool.Do(req, connection_pool.ANY)
543+
resp, err := pool.Do(context.Background(), req, connection_pool.ANY)
543544
fmt.Println("Ping Code", resp.Code)
544545
fmt.Println("Ping Data", resp.Data)
545546
fmt.Println("Ping Error", err)

connector.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package tarantool
22

3-
import "time"
3+
import (
4+
"context"
5+
"time"
6+
)
47

58
type Connector interface {
69
ConnectedNow() bool
@@ -39,7 +42,7 @@ type Connector interface {
3942
Call17Async(functionName string, args interface{}) *Future
4043
EvalAsync(expr string, args interface{}) *Future
4144

42-
Do(req Request) (resp *Response, err error)
43-
DoTyped(req Request, result interface{}) (err error)
44-
DoAsync(req Request) (fut *Future)
45+
Do(ctx context.Context, req Request) (resp *Response, err error)
46+
DoTyped(ctx context.Context, req Request, result interface{}) (err error)
47+
DoAsync(ctx context.Context, req Request) (fut *Future)
4548
}

example_test.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package tarantool_test
22

33
import (
4+
"context"
45
"fmt"
56
"time"
67

@@ -133,7 +134,7 @@ func ExampleSelectRequest() {
133134
req := tarantool.NewSelectRequest(517).
134135
Limit(100).
135136
Key(tarantool.IntKey{1111})
136-
resp, err := conn.Do(req)
137+
resp, err := conn.Do(context.Background(), req)
137138
if err != nil {
138139
fmt.Printf("error in do select request is %v", err)
139140
return
@@ -144,7 +145,7 @@ func ExampleSelectRequest() {
144145
Index("primary").
145146
Limit(100).
146147
Key(tarantool.IntKey{1111})
147-
fut := conn.DoAsync(req)
148+
fut := conn.DoAsync(context.Background(), req)
148149
resp, err = fut.Get()
149150
if err != nil {
150151
fmt.Printf("error in do async select request is %v", err)
@@ -163,7 +164,7 @@ func ExampleUpdateRequest() {
163164
req := tarantool.NewUpdateRequest(517).
164165
Key(tarantool.IntKey{1111}).
165166
Operations(tarantool.NewOperations().Assign(1, "bye"))
166-
resp, err := conn.Do(req)
167+
resp, err := conn.Do(context.Background(), req)
167168
if err != nil {
168169
fmt.Printf("error in do update request is %v", err)
169170
return
@@ -174,7 +175,7 @@ func ExampleUpdateRequest() {
174175
Index("primary").
175176
Key(tarantool.IntKey{1111}).
176177
Operations(tarantool.NewOperations().Assign(1, "hello"))
177-
fut := conn.DoAsync(req)
178+
fut := conn.DoAsync(context.Background(), req)
178179
resp, err = fut.Get()
179180
if err != nil {
180181
fmt.Printf("error in do async update request is %v", err)
@@ -194,7 +195,7 @@ func ExampleUpsertRequest() {
194195
req = tarantool.NewUpsertRequest(517).
195196
Tuple([]interface{}{uint(1113), "first", "first"}).
196197
Operations(tarantool.NewOperations().Assign(1, "updated"))
197-
resp, err := conn.Do(req)
198+
resp, err := conn.Do(context.Background(), req)
198199
if err != nil {
199200
fmt.Printf("error in do select upsert is %v", err)
200201
return
@@ -204,7 +205,7 @@ func ExampleUpsertRequest() {
204205
req = tarantool.NewUpsertRequest("test").
205206
Tuple([]interface{}{uint(1113), "second", "second"}).
206207
Operations(tarantool.NewOperations().Assign(2, "updated"))
207-
fut := conn.DoAsync(req)
208+
fut := conn.DoAsync(context.Background(), req)
208209
resp, err = fut.Get()
209210
if err != nil {
210211
fmt.Printf("error in do async upsert request is %v", err)
@@ -215,7 +216,7 @@ func ExampleUpsertRequest() {
215216
req = tarantool.NewSelectRequest(517).
216217
Limit(100).
217218
Key(tarantool.IntKey{1113})
218-
resp, err = conn.Do(req)
219+
resp, err = conn.Do(context.Background(), req)
219220
if err != nil {
220221
fmt.Printf("error in do select request is %v", err)
221222
return

future.go

+24-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package tarantool
22

33
import (
4+
"context"
5+
"fmt"
46
"sync"
57
"time"
68
)
@@ -16,13 +18,34 @@ type Future struct {
1618
err error
1719
ready chan struct{}
1820
done chan struct{}
21+
ctx context.Context
22+
conn *Connection
1923
}
2024

2125
func (fut *Future) wait() {
2226
if fut.done == nil {
2327
return
2428
}
25-
<-fut.done
29+
if fut.ctx == nil {
30+
<-fut.done
31+
return
32+
}
33+
select {
34+
case <-fut.done:
35+
default:
36+
select {
37+
case <-fut.ctx.Done():
38+
fut.conn.fetchFuture(fut.requestId)
39+
fut.SetError(fmt.Errorf("context is done"))
40+
default:
41+
select {
42+
case <-fut.done:
43+
case <-fut.ctx.Done():
44+
fut.conn.fetchFuture(fut.requestId)
45+
fut.SetError(fmt.Errorf("context is done"))
46+
}
47+
}
48+
}
2649
}
2750

2851
func (fut *Future) isDone() bool {

multi/multi.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
package multi
1313

1414
import (
15+
"context"
1516
"errors"
1617
"sync"
1718
"sync/atomic"
@@ -456,16 +457,16 @@ func (connMulti *ConnectionMulti) EvalAsync(expr string, args interface{}) *tara
456457
}
457458

458459
// Do sends the request and returns a response.
459-
func (connMulti *ConnectionMulti) Do(req tarantool.Request) (*tarantool.Response, error) {
460-
return connMulti.getCurrentConnection().Do(req)
460+
func (connMulti *ConnectionMulti) Do(ctx context.Context, req tarantool.Request) (*tarantool.Response, error) {
461+
return connMulti.getCurrentConnection().Do(ctx, req)
461462
}
462463

463464
// DoTyped sends the request and fills the typed result.
464-
func (connMulti *ConnectionMulti) DoTyped(req tarantool.Request, result interface{}) error {
465-
return connMulti.getCurrentConnection().DoTyped(req, result)
465+
func (connMulti *ConnectionMulti) DoTyped(ctx context.Context, req tarantool.Request, result interface{}) error {
466+
return connMulti.getCurrentConnection().DoTyped(ctx, req, result)
466467
}
467468

468469
// DoAsync sends the request and returns a future.
469-
func (connMulti *ConnectionMulti) DoAsync(req tarantool.Request) *tarantool.Future {
470-
return connMulti.getCurrentConnection().DoAsync(req)
470+
func (connMulti *ConnectionMulti) DoAsync(ctx context.Context, req tarantool.Request) *tarantool.Future {
471+
return connMulti.getCurrentConnection().DoAsync(ctx, req)
471472
}

0 commit comments

Comments
 (0)