Skip to content

api: add ability to mock connections in tests #363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ jobs:
- macos-12
tarantool:
- brew
- 1.10.14
- 1.10.15

env:
# Make sense only for non-brew jobs.
Expand Down
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
in requests instead of their IDs.
- `GetSchema` function to get the actual schema (#7)
- Support connection via an existing socket fd (#321)
- `Header` struct for the response header (#237). It can be accessed via
`Header()` method of the `Response` interface.
- `Response` method added to the `Request` interface (#237)
- New `LogAppendPushFailed` connection log constant (#237).
It is logged when connection fails to append a push response.
- `ErrorNo` constant that indicates that no error has occurred while getting
the response (#237)
- Ability to mock connections for tests (#237). Added new types `MockDoer`,
`MockRequest` to `test_helpers`.

### Changed

Expand Down Expand Up @@ -67,6 +76,23 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
it (#321)
- Rename `pool.GetPoolInfo` to `pool.GetInfo`. Change return type to
`map[string]ConnectionInfo` (#321)
- `Response` is now an interface (#237)
- All responses are now implementations of the `Response` interface (#237).
`SelectResponse`, `ExecuteResponse`, `PrepareResponse`, `PushResponse` are part
of a public API. `Pos()`, `MetaData()`, `SQLInfo()` methods created for them
to get specific info.
Special types of responses are used with special requests.
- `IsPush()` method is added to the response iterator (#237). It returns
the information if the current response is a `PushResponse`.
`PushCode` constant is removed.
- Method `Get` for `Future` now returns response data (#237). To get the actual
response new `GetResponse` method has been added. Methods `AppendPush` and
`SetResponse` accept response `Header` and data as their arguments.
- `Future` constructors now accept `Request` as their argument (#237)
- Operations `Ping`, `Select`, `Insert`, `Replace`, `Delete`, `Update`, `Upsert`,
`Call`, `Call16`, `Call17`, `Eval`, `Execute` of a `Connector` and `Pooler`
return response data instead of an actual responses (#237)
- Renamed `StrangerResponse` to `MockResponse` (#237)

### Deprecated

Expand All @@ -89,6 +115,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- IPROTO constants (#158)
- Code() method from the Request interface (#158)
- `Schema` field from the `Connection` struct (#7)
- `OkCode` and `PushCode` constants (#237)

### Fixed

Expand Down
51 changes: 46 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,13 @@ func main() {
if err != nil {
fmt.Println("Connection refused:", err)
}
resp, err := conn.Do(tarantool.NewInsertRequest(999).
data, err := conn.Do(tarantool.NewInsertRequest(999).
Tuple([]interface{}{99999, "BB"}),
).Get()
if err != nil {
fmt.Println("Error", err)
fmt.Println("Code", resp.Code)
} else {
fmt.Printf("Data: %v", data)
}
}
```
Expand Down Expand Up @@ -201,12 +202,19 @@ The subpackage has been deleted. You could use `pool` instead.
unique string ID, which allows them to be distinguished.
* `pool.GetPoolInfo` has been renamed to `pool.GetInfo`. Return type has been changed
to `map[string]ConnectionInfo`.
* Operations `Ping`, `Select`, `Insert`, `Replace`, `Delete`, `Update`, `Upsert`,
`Call`, `Call16`, `Call17`, `Eval`, `Execute` of a `Pooler` return
response data instead of an actual responses.

#### crud package

* `crud` operations `Timeout` option has `crud.OptFloat64` type
instead of `crud.OptUint`.

#### test_helpers package

Renamed `StrangerResponse` to `MockResponse`.

#### msgpack.v5

Most function names and argument types in `msgpack.v5` and `msgpack.v2`
Expand Down Expand Up @@ -241,7 +249,11 @@ of the requests is an array instead of array of arrays.

#### IPROTO constants

IPROTO constants have been moved to a separate package [go-iproto](https://github.com/tarantool/go-iproto).
* IPROTO constants have been moved to a separate package [go-iproto](https://github.com/tarantool/go-iproto).
* `PushCode` constant is removed. To check whether the current response is
a push response, use `IsPush()` method of the response iterator instead.
* `ErrorNo` constant is added to indicate that no error has occurred while
getting the response. It should be used instead of the removed `OkCode`.

#### Request changes

Expand All @@ -254,6 +266,35 @@ longer accept `ops` argument (operations) as an `interface{}`. `*Operations`
needs to be passed instead.
* `UpdateRequest` and `UpsertRequest` structs no longer accept `interface{}`
for an `ops` field. `*Operations` needs to be used instead.
* `Response` method added to the `Request` interface.

#### Response changes

* `Response` is now an interface.
* Response header stored in a new `Header` struct. It could be accessed via
`Header()` method.
* `ResponseIterator` interface now has `IsPush()` method.
It returns true if the current response is a push response.
* For each request type, a different response type is created. They all
implement a `Response` interface. `SelectResponse`, `PrepareResponse`,
`ExecuteResponse`, `PushResponse` are a part of a public API.
`Pos()`, `MetaData()`, `SQLInfo()` methods created for them to get specific info.
Special types of responses are used with special requests.

#### Future changes

* Method `Get` now returns response data instead of the actual response.
* New method `GetResponse` added to get an actual response.
* `Future` constructors now accept `Request` as their argument.
* Methods `AppendPush` and `SetResponse` accepts response `Header` and data
as their arguments.

#### Connector changes

* Operations `Ping`, `Select`, `Insert`, `Replace`, `Delete`, `Update`, `Upsert`,
`Call`, `Call16`, `Call17`, `Eval`, `Execute` of a `Connector` return
response data instead of an actual responses.
* New interface `Doer` is added as a child-interface instead of a `Do` method.

#### Connect function

Expand All @@ -270,8 +311,8 @@ for creating a connection are now stored in corresponding `Dialer`, not in `Opts
#### Connection schema

* Removed `Schema` field from the `Connection` struct. Instead, new
`GetSchema(Connector)` function was added to get the actual connection
schema on demand.
`GetSchema(Doer)` function was added to get the actual connection
schema on demand.
* `OverrideSchema(*Schema)` method replaced with the `SetSchema(Schema)`.

#### Protocol changes
Expand Down
17 changes: 8 additions & 9 deletions box_error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,11 @@ func TestErrorTypeEval(t *testing.T) {

for name, testcase := range tupleCases {
t.Run(name, func(t *testing.T) {
resp, err := conn.Eval("return ...", []interface{}{&testcase.tuple.val})
data, err := conn.Eval("return ...", []interface{}{&testcase.tuple.val})
require.Nil(t, err)
require.NotNil(t, resp.Data)
require.Equal(t, len(resp.Data), 1)
actual, ok := resp.Data[0].(*BoxError)
require.NotNil(t, data)
require.Equal(t, len(data), 1)
actual, ok := data[0].(*BoxError)
require.Truef(t, ok, "Response data has valid type")
require.Equal(t, testcase.tuple.val, *actual)
})
Expand Down Expand Up @@ -436,15 +436,14 @@ func TestErrorTypeSelect(t *testing.T) {
_, err := conn.Eval(insertEval, []interface{}{})
require.Nilf(t, err, "Tuple has been successfully inserted")

var resp *Response
var offset uint32 = 0
var limit uint32 = 1
resp, err = conn.Select(space, index, offset, limit, IterEq,
data, err := conn.Select(space, index, offset, limit, IterEq,
[]interface{}{testcase.tuple.pk})
require.Nil(t, err)
require.NotNil(t, resp.Data)
require.Equalf(t, len(resp.Data), 1, "Exactly one tuple had been found")
tpl, ok := resp.Data[0].([]interface{})
require.NotNil(t, data)
require.Equalf(t, len(data), 1, "Exactly one tuple had been found")
tpl, ok := data[0].([]interface{})
require.Truef(t, ok, "Tuple has valid type")
require.Equal(t, testcase.tuple.pk, tpl[0])
actual, ok := tpl[1].(*BoxError)
Expand Down
54 changes: 34 additions & 20 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ const (
LogUnexpectedResultId
// LogWatchEventReadFailed is logged when failed to read a watch event.
LogWatchEventReadFailed
// LogAppendPushFailed is logged when failed to append a push response.
LogAppendPushFailed
)

// ConnEvent is sent throw Notify channel specified in Opts.
Expand Down Expand Up @@ -97,12 +99,15 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
log.Printf("tarantool: last reconnect to %s failed: %s, giving it up",
conn.Addr(), err)
case LogUnexpectedResultId:
resp := v[0].(*Response)
header := v[0].(Header)
log.Printf("tarantool: connection %s got unexpected resultId (%d) in response",
conn.Addr(), resp.RequestId)
conn.Addr(), header.RequestId)
case LogWatchEventReadFailed:
err := v[0].(error)
log.Printf("tarantool: unable to parse watch event: %s", err)
case LogAppendPushFailed:
err := v[0].(error)
log.Printf("tarantool: unable to append a push response: %s", err)
default:
args := append([]interface{}{"tarantool: unexpected event ", event, conn}, v...)
log.Print(args...)
Expand Down Expand Up @@ -807,8 +812,8 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
conn.reconnect(err, c)
return
}
resp := &Response{buf: smallBuf{b: respBytes}}
err = resp.decodeHeader(conn.dec)
buf := smallBuf{b: respBytes}
header, code, err := decodeHeader(conn.dec, &buf)
if err != nil {
err = ClientError{
ErrProtocolError,
Expand All @@ -819,8 +824,8 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
}

var fut *Future = nil
if iproto.Type(resp.Code) == iproto.IPROTO_EVENT {
if event, err := readWatchEvent(&resp.buf); err == nil {
if code == iproto.IPROTO_EVENT {
if event, err := readWatchEvent(&buf); err == nil {
events <- event
} else {
err = ClientError{
Expand All @@ -830,19 +835,27 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
conn.opts.Logger.Report(LogWatchEventReadFailed, conn, err)
}
continue
} else if resp.Code == PushCode {
if fut = conn.peekFuture(resp.RequestId); fut != nil {
fut.AppendPush(resp)
} else if code == iproto.IPROTO_CHUNK {
if fut = conn.peekFuture(header.RequestId); fut != nil {
if err := fut.AppendPush(header, &buf); err != nil {
err = ClientError{
ErrProtocolError,
fmt.Sprintf("failed to append push response: %s", err),
}
conn.opts.Logger.Report(LogAppendPushFailed, conn, err)
}
}
} else {
if fut = conn.fetchFuture(resp.RequestId); fut != nil {
fut.SetResponse(resp)
if fut = conn.fetchFuture(header.RequestId); fut != nil {
if err := fut.SetResponse(header, &buf); err != nil {
fut.SetError(fmt.Errorf("failed to set response: %w", err))
}
conn.markDone(fut)
}
}

if fut == nil {
conn.opts.Logger.Report(LogUnexpectedResultId, conn, resp)
conn.opts.Logger.Report(LogUnexpectedResultId, conn, header)
}
}
}
Expand Down Expand Up @@ -872,8 +885,9 @@ func (conn *Connection) eventer(events <-chan connWatchEvent) {
}
}

func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
fut = NewFuture()
func (conn *Connection) newFuture(req Request) (fut *Future) {
ctx := req.Ctx()
fut = NewFuture(req)
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
select {
case conn.rlimit <- struct{}{}:
Expand Down Expand Up @@ -983,7 +997,7 @@ func (conn *Connection) decrementRequestCnt() {
func (conn *Connection) send(req Request, streamId uint64) *Future {
conn.incrementRequestCnt()

fut := conn.newFuture(req.Ctx())
fut := conn.newFuture(req)
if fut.ready == nil {
conn.decrementRequestCnt()
return fut
Expand Down Expand Up @@ -1052,11 +1066,11 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {

if req.Async() {
if fut = conn.fetchFuture(reqid); fut != nil {
resp := &Response{
header := Header{
RequestId: reqid,
Code: OkCode,
Error: ErrorNo,
}
fut.SetResponse(resp)
fut.SetResponse(header, nil)
conn.markDone(fut)
}
}
Expand Down Expand Up @@ -1202,7 +1216,7 @@ func (conn *Connection) nextRequestId(context bool) (requestId uint32) {
func (conn *Connection) Do(req Request) *Future {
if connectedReq, ok := req.(ConnectedRequest); ok {
if connectedReq.Conn() != conn {
fut := NewFuture()
fut := NewFuture(req)
fut.SetError(errUnknownRequest)
return fut
}
Expand Down Expand Up @@ -1236,7 +1250,7 @@ func (conn *Connection) SetSchema(s Schema) {
// NewPrepared passes a sql statement to Tarantool for preparation synchronously.
func (conn *Connection) NewPrepared(expr string) (*Prepared, error) {
req := NewPrepareRequest(expr)
resp, err := conn.Do(req).Get()
resp, err := conn.Do(req).GetResponse()
if err != nil {
return nil, err
}
Expand Down
32 changes: 19 additions & 13 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,58 @@ package tarantool

import "time"

// Doer is an interface that performs requests asynchronously.
type Doer interface {
// Do performs a request asynchronously.
Do(req Request) (fut *Future)
}

type Connector interface {
Doer
ConnectedNow() bool
Close() error
ConfiguredTimeout() time.Duration
NewPrepared(expr string) (*Prepared, error)
NewStream() (*Stream, error)
NewWatcher(key string, callback WatchCallback) (Watcher, error)
Do(req Request) (fut *Future)

// Deprecated: the method will be removed in the next major version,
// use a PingRequest object + Do() instead.
Ping() (*Response, error)
Ping() ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a SelectRequest object + Do() instead.
Select(space, index interface{}, offset, limit uint32, iterator Iter,
key interface{}) (*Response, error)
key interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use an InsertRequest object + Do() instead.
Insert(space interface{}, tuple interface{}) (*Response, error)
Insert(space interface{}, tuple interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a ReplicaRequest object + Do() instead.
Replace(space interface{}, tuple interface{}) (*Response, error)
Replace(space interface{}, tuple interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a DeleteRequest object + Do() instead.
Delete(space, index interface{}, key interface{}) (*Response, error)
Delete(space, index interface{}, key interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a UpdateRequest object + Do() instead.
Update(space, index interface{}, key interface{}, ops *Operations) (*Response, error)
Update(space, index interface{}, key interface{}, ops *Operations) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a UpsertRequest object + Do() instead.
Upsert(space interface{}, tuple interface{}, ops *Operations) (*Response, error)
Upsert(space interface{}, tuple interface{}, ops *Operations) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a CallRequest object + Do() instead.
Call(functionName string, args interface{}) (*Response, error)
Call(functionName string, args interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a Call16Request object + Do() instead.
Call16(functionName string, args interface{}) (*Response, error)
Call16(functionName string, args interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use a Call17Request object + Do() instead.
Call17(functionName string, args interface{}) (*Response, error)
Call17(functionName string, args interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use an EvalRequest object + Do() instead.
Eval(expr string, args interface{}) (*Response, error)
Eval(expr string, args interface{}) ([]interface{}, error)
// Deprecated: the method will be removed in the next major version,
// use an ExecuteRequest object + Do() instead.
Execute(expr string, args interface{}) (*Response, error)
Execute(expr string, args interface{}) ([]interface{}, error)

// Deprecated: the method will be removed in the next major version,
// use a SelectRequest object + Do() instead.
Expand Down
Loading