Skip to content

Commit 49cb7f6

Browse files
committed
api: create different responses for different requests
Different implementations of the `Response` interface created. Special types of responses are used with special requests. Thus `Response` interface was simplified: some special methods were moved to the corresponding implementations. This means that if a user wants to access this methods, the response should be casted to its actual type. `SelectResponse`, `ExecuteResponse`, `PrepareResponse`, `PushResponse` are part of a public API. `Pos()`, `MetaData()`, `SQLInfo()` methods created for them to get specific info. `Future` constructors now accept `Request` as their argument. `Future` methods `AppendPush` and `SetResponse` accepts response `Header` and data as their arguments. `IsPush()` method is used to return the information if the current response is a `PushResponse`. `PushCode` constant is removed. To get information, if the current response is a push response, `IsPush()` method could be used instead. Part of #237
1 parent bac0680 commit 49cb7f6

25 files changed

+1408
-743
lines changed

box_error_test.go

+2-7
Original file line numberDiff line numberDiff line change
@@ -304,9 +304,7 @@ func TestErrorTypeEval(t *testing.T) {
304304

305305
for name, testcase := range tupleCases {
306306
t.Run(name, func(t *testing.T) {
307-
resp, err := conn.Eval("return ...", []interface{}{&testcase.tuple.val})
308-
require.Nil(t, err)
309-
data, err := resp.Decode()
307+
data, err := conn.Eval("return ...", []interface{}{&testcase.tuple.val})
310308
require.Nil(t, err)
311309
require.NotNil(t, data)
312310
require.Equal(t, len(data), 1)
@@ -438,14 +436,11 @@ func TestErrorTypeSelect(t *testing.T) {
438436
_, err := conn.Eval(insertEval, []interface{}{})
439437
require.Nilf(t, err, "Tuple has been successfully inserted")
440438

441-
var resp Response
442439
var offset uint32 = 0
443440
var limit uint32 = 1
444-
resp, err = conn.Select(space, index, offset, limit, IterEq,
441+
data, err := conn.Select(space, index, offset, limit, IterEq,
445442
[]interface{}{testcase.tuple.pk})
446443
require.Nil(t, err)
447-
data, err := resp.Decode()
448-
require.Nil(t, err)
449444
require.NotNil(t, data)
450445
require.Equalf(t, len(data), 1, "Exactly one tuple had been found")
451446
tpl, ok := data[0].([]interface{})

connection.go

+20-16
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,9 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
9797
log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
9898
conn.Addr(), err)
9999
case LogUnexpectedResultId:
100-
respHeader := v[0].(header)
100+
header := v[0].(Header)
101101
log.Printf("tarantool: connection %s got unexpected resultId (%d) in response",
102-
conn.Addr(), respHeader.requestId)
102+
conn.Addr(), header.RequestId)
103103
case LogWatchEventReadFailed:
104104
err := v[0].(error)
105105
log.Printf("tarantool: unable to parse watch event: %s", err)
@@ -808,7 +808,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
808808
return
809809
}
810810
buf := smallBuf{b: respBytes}
811-
respHeader, err := decodeHeader(conn.dec, &buf)
811+
header, err := decodeHeader(conn.dec, &buf)
812812
if err != nil {
813813
err = ClientError{
814814
ErrProtocolError,
@@ -818,10 +818,9 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
818818
return
819819
}
820820

821-
resp := &ConnResponse{header: respHeader, buf: buf}
822821
var fut *Future = nil
823-
if iproto.Type(respHeader.code) == iproto.IPROTO_EVENT {
824-
if event, err := readWatchEvent(&resp.buf); err == nil {
822+
if iproto.Type(header.Code) == iproto.IPROTO_EVENT {
823+
if event, err := readWatchEvent(&buf); err == nil {
825824
events <- event
826825
} else {
827826
err = ClientError{
@@ -831,19 +830,19 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
831830
conn.opts.Logger.Report(LogWatchEventReadFailed, conn, err)
832831
}
833832
continue
834-
} else if respHeader.code == PushCode {
835-
if fut = conn.peekFuture(respHeader.requestId); fut != nil {
836-
fut.AppendPush(resp)
833+
} else if header.Code == PushCode {
834+
if fut = conn.peekFuture(header.RequestId); fut != nil {
835+
fut.AppendPush(header, &buf)
837836
}
838837
} else {
839-
if fut = conn.fetchFuture(respHeader.requestId); fut != nil {
840-
fut.SetResponse(resp)
838+
if fut = conn.fetchFuture(header.RequestId); fut != nil {
839+
fut.SetResponse(header, &buf)
841840
conn.markDone(fut)
842841
}
843842
}
844843

845844
if fut == nil {
846-
conn.opts.Logger.Report(LogUnexpectedResultId, conn, respHeader)
845+
conn.opts.Logger.Report(LogUnexpectedResultId, conn, header)
847846
}
848847
}
849848
}
@@ -873,8 +872,10 @@ func (conn *Connection) eventer(events <-chan connWatchEvent) {
873872
}
874873
}
875874

876-
func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
875+
func (conn *Connection) newFuture(req Request) (fut *Future) {
876+
ctx := req.Ctx()
877877
fut = NewFuture()
878+
fut.SetRequest(req)
878879
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
879880
select {
880881
case conn.rlimit <- struct{}{}:
@@ -984,7 +985,7 @@ func (conn *Connection) decrementRequestCnt() {
984985
func (conn *Connection) send(req Request, streamId uint64) *Future {
985986
conn.incrementRequestCnt()
986987

987-
fut := conn.newFuture(req.Ctx())
988+
fut := conn.newFuture(req)
988989
if fut.ready == nil {
989990
conn.decrementRequestCnt()
990991
return fut
@@ -1053,8 +1054,11 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
10531054

10541055
if req.Async() {
10551056
if fut = conn.fetchFuture(reqid); fut != nil {
1056-
resp := &ConnResponse{}
1057-
fut.SetResponse(resp)
1057+
header := Header{
1058+
RequestId: reqid,
1059+
Code: OkCode,
1060+
}
1061+
fut.SetResponse(header, nil)
10581062
conn.markDone(fut)
10591063
}
10601064
}

connector.go

+12-12
Original file line numberDiff line numberDiff line change
@@ -13,41 +13,41 @@ type Connector interface {
1313

1414
// Deprecated: the method will be removed in the next major version,
1515
// use a PingRequest object + Do() instead.
16-
Ping() (Response, error)
16+
Ping() ([]interface{}, error)
1717
// Deprecated: the method will be removed in the next major version,
1818
// use a SelectRequest object + Do() instead.
1919
Select(space, index interface{}, offset, limit uint32, iterator Iter,
20-
key interface{}) (Response, error)
20+
key interface{}) ([]interface{}, error)
2121
// Deprecated: the method will be removed in the next major version,
2222
// use an InsertRequest object + Do() instead.
23-
Insert(space interface{}, tuple interface{}) (Response, error)
23+
Insert(space interface{}, tuple interface{}) ([]interface{}, error)
2424
// Deprecated: the method will be removed in the next major version,
2525
// use a ReplicaRequest object + Do() instead.
26-
Replace(space interface{}, tuple interface{}) (Response, error)
26+
Replace(space interface{}, tuple interface{}) ([]interface{}, error)
2727
// Deprecated: the method will be removed in the next major version,
2828
// use a DeleteRequest object + Do() instead.
29-
Delete(space, index interface{}, key interface{}) (Response, error)
29+
Delete(space, index interface{}, key interface{}) ([]interface{}, error)
3030
// Deprecated: the method will be removed in the next major version,
3131
// use a UpdateRequest object + Do() instead.
32-
Update(space, index interface{}, key interface{}, ops *Operations) (Response, error)
32+
Update(space, index interface{}, key interface{}, ops *Operations) ([]interface{}, error)
3333
// Deprecated: the method will be removed in the next major version,
3434
// use a UpsertRequest object + Do() instead.
35-
Upsert(space interface{}, tuple interface{}, ops *Operations) (Response, error)
35+
Upsert(space interface{}, tuple interface{}, ops *Operations) ([]interface{}, error)
3636
// Deprecated: the method will be removed in the next major version,
3737
// use a CallRequest object + Do() instead.
38-
Call(functionName string, args interface{}) (Response, error)
38+
Call(functionName string, args interface{}) ([]interface{}, error)
3939
// Deprecated: the method will be removed in the next major version,
4040
// use a Call16Request object + Do() instead.
41-
Call16(functionName string, args interface{}) (Response, error)
41+
Call16(functionName string, args interface{}) ([]interface{}, error)
4242
// Deprecated: the method will be removed in the next major version,
4343
// use a Call17Request object + Do() instead.
44-
Call17(functionName string, args interface{}) (Response, error)
44+
Call17(functionName string, args interface{}) ([]interface{}, error)
4545
// Deprecated: the method will be removed in the next major version,
4646
// use an EvalRequest object + Do() instead.
47-
Eval(expr string, args interface{}) (Response, error)
47+
Eval(expr string, args interface{}) ([]interface{}, error)
4848
// Deprecated: the method will be removed in the next major version,
4949
// use an ExecuteRequest object + Do() instead.
50-
Execute(expr string, args interface{}) (Response, error)
50+
Execute(expr string, args interface{}) ([]interface{}, error)
5151

5252
// Deprecated: the method will be removed in the next major version,
5353
// use a SelectRequest object + Do() instead.

crud/common.go

+7
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ package crud
5555

5656
import (
5757
"context"
58+
"io"
5859

5960
"github.com/tarantool/go-iproto"
6061

@@ -84,6 +85,12 @@ func (req baseRequest) Async() bool {
8485
return req.impl.Async()
8586
}
8687

88+
// Response creates a response for the baseRequest.
89+
func (req baseRequest) Response(header tarantool.Header,
90+
body io.Reader) (tarantool.Response, error) {
91+
return req.impl.Response(header, body)
92+
}
93+
8794
type spaceRequest struct {
8895
baseRequest
8996
space string

crud/select.go

+7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package crud
22

33
import (
44
"context"
5+
"io"
56

67
"github.com/vmihailenco/msgpack/v5"
78

@@ -133,3 +134,9 @@ func (req SelectRequest) Context(ctx context.Context) SelectRequest {
133134

134135
return req
135136
}
137+
138+
// Response creates a response for the SelectRequest.
139+
func (req SelectRequest) Response(header tarantool.Header,
140+
body io.Reader) (tarantool.Response, error) {
141+
return req.impl.Response(header, body)
142+
}

dial.go

+8-13
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@ import (
1212
"strings"
1313
"time"
1414

15-
"github.com/vmihailenco/msgpack/v5"
16-
1715
"github.com/tarantool/go-iproto"
16+
"github.com/vmihailenco/msgpack/v5"
1817
)
1918

2019
const bufSize = 128 * 1024
@@ -405,15 +404,11 @@ func identify(w writeFlusher, r io.Reader) (ProtocolInfo, error) {
405404
}
406405
data, err := resp.Decode()
407406
if err != nil {
408-
switch err := err.(type) {
409-
case Error:
410-
if err.Code == iproto.ER_UNKNOWN_REQUEST_TYPE {
411-
return info, nil
412-
}
413-
return info, err
414-
default:
415-
return info, fmt.Errorf("decode response body error: %w", err)
407+
if iproto.Error(resp.Header().Code) == iproto.ER_UNKNOWN_REQUEST_TYPE {
408+
// IPROTO_ID requests are not supported by server.
409+
return info, nil
416410
}
411+
return info, err
417412
}
418413

419414
if len(data) == 0 {
@@ -511,12 +506,12 @@ func readResponse(r io.Reader) (Response, error) {
511506

512507
respBytes, err := read(r, lenbuf[:])
513508
if err != nil {
514-
return &ConnResponse{}, fmt.Errorf("read error: %w", err)
509+
return &BaseResponse{}, fmt.Errorf("read error: %w", err)
515510
}
516511

517512
buf := smallBuf{b: respBytes}
518-
respHeader, err := decodeHeader(msgpack.NewDecoder(&smallBuf{}), &buf)
519-
resp := &ConnResponse{header: respHeader, buf: buf}
513+
header, err := decodeHeader(msgpack.NewDecoder(&smallBuf{}), &buf)
514+
resp := &BaseResponse{header: header, buf: buf}
520515
if err != nil {
521516
return resp, fmt.Errorf("decode response header error: %w", err)
522517
}

0 commit comments

Comments
 (0)