Skip to content

Commit 2a378a6

Browse files
committed
api: support IPROTO_WATCH_ONCE request type
Add support of `IPROTO_WATCH_ONCE` request type. It works only for Tarantool version >= 3.0.0. Closes #337
1 parent bd6aab9 commit 2a378a6

11 files changed

+175
-5
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
2020
- Support `fetch_latest_metadata` option for crud requests with metadata (#335)
2121
- Support `noreturn` option for data change crud requests (#335)
2222
- Support `crud.schema` request (#336)
23+
- Support `IPROTO_WATCH_ONCE` request type for Tarantool version >= 3.0.0 (#337)
2324

2425
### Changed
2526

example_test.go

+37
Original file line numberDiff line numberDiff line change
@@ -1230,3 +1230,40 @@ func ExampleConnection_CloseGraceful_force() {
12301230
// Result:
12311231
// <nil> connection closed by client (0x4001)
12321232
}
1233+
1234+
func ExampleConnection_watchOnce() {
1235+
const key = "foo"
1236+
const value = "bar"
1237+
1238+
// WatchOnce request present in Tarantool since version 3.0
1239+
isLess, err := test_helpers.IsTarantoolVersionLess(3, 0, 0)
1240+
if err != nil || isLess {
1241+
return
1242+
}
1243+
1244+
server := "127.0.0.1:3013"
1245+
opts := tarantool.Opts{
1246+
Timeout: 5 * time.Second,
1247+
Reconnect: 5 * time.Second,
1248+
MaxReconnects: 3,
1249+
User: "test",
1250+
Pass: "test",
1251+
}
1252+
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
1253+
defer cancel()
1254+
conn, err := tarantool.Connect(ctx, server, opts)
1255+
if err != nil {
1256+
fmt.Printf("Failed to connect: %s\n", err)
1257+
return
1258+
}
1259+
defer conn.Close()
1260+
1261+
conn.Do(tarantool.NewBroadcastRequest(key).Value(value)).Get()
1262+
1263+
resp, err := conn.Do(tarantool.NewWatchOnceRequest(key)).Get()
1264+
if err != nil {
1265+
fmt.Printf("Failed to execute the request: %s\n", err)
1266+
} else {
1267+
fmt.Println(resp.Data)
1268+
}
1269+
}

export_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,9 @@ func RefImplRollbackBody(enc *msgpack.Encoder) error {
120120
func RefImplIdBody(enc *msgpack.Encoder, protocolInfo ProtocolInfo) error {
121121
return fillId(enc, protocolInfo)
122122
}
123+
124+
// RefImplWatchOnceBody is reference implementation for filling of an watchOnce
125+
// request's body.
126+
func RefImplWatchOnceBody(enc *msgpack.Encoder, key string) error {
127+
return fillWatchOnce(enc, key)
128+
}

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
88
github.com/shopspring/decimal v1.3.1
99
github.com/stretchr/testify v1.7.1
10-
github.com/tarantool/go-iproto v0.1.0
10+
github.com/tarantool/go-iproto v0.1.1-0.20231025103136-cb7894473931
1111
github.com/tarantool/go-openssl v0.0.8-0.20231004103608-336ca939d2ca
1212
github.com/vmihailenco/msgpack/v5 v5.3.5
1313
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
1919
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
2020
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
2121
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
22-
github.com/tarantool/go-iproto v0.1.0 h1:zHN9AA8LDawT+JBD0/Nxgr/bIsWkkpDzpcMuaNPSIAQ=
23-
github.com/tarantool/go-iproto v0.1.0/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7p96hUySMlIo=
22+
github.com/tarantool/go-iproto v0.1.1-0.20231025103136-cb7894473931 h1:YrsRc1sDZ6HOZccvM2eJ3Nu2TMBq7NMZMsaT5KCu5qU=
23+
github.com/tarantool/go-iproto v0.1.1-0.20231025103136-cb7894473931/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7p96hUySMlIo=
2424
github.com/tarantool/go-openssl v0.0.8-0.20231004103608-336ca939d2ca h1:oOrBh73tDDyooIXajfr+0pfnM+89404ClAhJpTTHI7E=
2525
github.com/tarantool/go-openssl v0.0.8-0.20231004103608-336ca939d2ca/go.mod h1:M7H4xYSbzqpW/ZRBMyH0eyqQBsnhAMfsYk5mv0yid7A=
2626
github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU=

protocol.go

+17-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
type ProtocolVersion uint64
1313

1414
// ProtocolVersion type stores a Tarantool protocol feature.
15-
type ProtocolFeature uint64
15+
type ProtocolFeature iproto.Feature
1616

1717
// ProtocolInfo type aggregates Tarantool protocol version and features info.
1818
type ProtocolInfo struct {
@@ -52,6 +52,11 @@ const (
5252
// PaginationFeature represents support of pagination
5353
// (supported by connector).
5454
PaginationFeature ProtocolFeature = 4
55+
// SpaceAndIndexNamesFeature represents support of using space [index] names
56+
// instead of identifiers support.
57+
SpaceAndIndexNamesFeature ProtocolFeature = 5
58+
// WatchOnceFeature represents support of WatchOnce request types.
59+
WatchOnceFeature ProtocolFeature = 6
5560
)
5661

5762
// String returns the name of a Tarantool feature.
@@ -68,6 +73,10 @@ func (ftr ProtocolFeature) String() string {
6873
return "WatchersFeature"
6974
case PaginationFeature:
7075
return "PaginationFeature"
76+
case SpaceAndIndexNamesFeature:
77+
return "SpaceAndIndexNamesFeature"
78+
case WatchOnceFeature:
79+
return "WatchOnceFeature"
7180
default:
7281
return fmt.Sprintf("Unknown feature (code %d)", ftr)
7382
}
@@ -79,7 +88,7 @@ var clientProtocolInfo ProtocolInfo = ProtocolInfo{
7988
// introduced in master 948e5cd (possible 2.10.5 or 2.11.0).
8089
// Support of protocol version on connector side was introduced in
8190
// 1.10.0.
82-
Version: ProtocolVersion(4),
91+
Version: ProtocolVersion(6),
8392
// Streams and transactions were introduced in protocol version 1
8493
// (Tarantool 2.10.0), in connector since 1.7.0.
8594
// Error extension type was introduced in protocol
@@ -88,12 +97,18 @@ var clientProtocolInfo ProtocolInfo = ProtocolInfo{
8897
// connector since 1.10.0.
8998
// Pagination were introduced in protocol version 4 (Tarantool 2.11.0), in
9099
// connector since 1.11.0.
100+
// SpaceAndIndexNames was introduces in protocol version 5
101+
// (Tarantool 3.0.0), in connector since 2.0.0.
102+
// WatchOnce request type was introduces in protocol version 6
103+
// (Tarantool 3.0.0), in connector since 2.0.0.
91104
Features: []ProtocolFeature{
92105
StreamsFeature,
93106
TransactionsFeature,
94107
ErrorExtensionFeature,
95108
WatchersFeature,
96109
PaginationFeature,
110+
SpaceAndIndexNamesFeature,
111+
WatchOnceFeature,
97112
},
98113
}
99114

protocol_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ func TestFeatureStringRepresentation(t *testing.T) {
3232
require.Equal(t, ErrorExtensionFeature.String(), "ErrorExtensionFeature")
3333
require.Equal(t, WatchersFeature.String(), "WatchersFeature")
3434
require.Equal(t, PaginationFeature.String(), "PaginationFeature")
35+
require.Equal(t, SpaceAndIndexNamesFeature.String(), "SpaceAndIndexNamesFeature")
36+
require.Equal(t, WatchOnceFeature.String(), "WatchOnceFeature")
3537

3638
require.Equal(t, ProtocolFeature(15532).String(), "Unknown feature (code 15532)")
3739
}

request.go

+36
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,16 @@ func fillPing(enc *msgpack.Encoder) error {
166166
return enc.EncodeMapLen(0)
167167
}
168168

169+
func fillWatchOnce(enc *msgpack.Encoder, key string) error {
170+
if err := enc.EncodeMapLen(1); err != nil {
171+
return err
172+
}
173+
if err := enc.EncodeUint(uint64(iproto.IPROTO_EVENT_KEY)); err != nil {
174+
return err
175+
}
176+
return enc.EncodeString(key)
177+
}
178+
169179
// Ping sends empty request to Tarantool to check connection.
170180
//
171181
// Deprecated: the method will be removed in the next major version,
@@ -1354,3 +1364,29 @@ func (req *ExecuteRequest) Context(ctx context.Context) *ExecuteRequest {
13541364
req.ctx = ctx
13551365
return req
13561366
}
1367+
1368+
// WatchOnceRequest synchronously fetches the value currently associated with a
1369+
// specified notification key without subscribing to changes.
1370+
type WatchOnceRequest struct {
1371+
baseRequest
1372+
key string
1373+
}
1374+
1375+
// NewWatchOnceRequest returns a new watchOnceRequest.
1376+
func NewWatchOnceRequest(key string) *WatchOnceRequest {
1377+
req := new(WatchOnceRequest)
1378+
req.rtype = iproto.IPROTO_WATCH_ONCE
1379+
req.key = key
1380+
return req
1381+
}
1382+
1383+
// Body fills an msgpack.Encoder with the watchOnce request body.
1384+
func (req *WatchOnceRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
1385+
return fillWatchOnce(enc, req.key)
1386+
}
1387+
1388+
// Context sets a passed context to the request.
1389+
func (req *WatchOnceRequest) Context(ctx context.Context) *WatchOnceRequest {
1390+
req.ctx = ctx
1391+
return req
1392+
}

request_test.go

+18
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ func TestRequestsTypes(t *testing.T) {
196196
{req: NewRollbackRequest(), rtype: iproto.IPROTO_ROLLBACK},
197197
{req: NewIdRequest(validProtocolInfo), rtype: iproto.IPROTO_ID},
198198
{req: NewBroadcastRequest(validKey), rtype: iproto.IPROTO_CALL},
199+
{req: NewWatchOnceRequest(validKey), rtype: iproto.IPROTO_WATCH_ONCE},
199200
}
200201

201202
for _, test := range tests {
@@ -231,6 +232,7 @@ func TestRequestsAsync(t *testing.T) {
231232
{req: NewRollbackRequest(), async: false},
232233
{req: NewIdRequest(validProtocolInfo), async: false},
233234
{req: NewBroadcastRequest(validKey), async: false},
235+
{req: NewWatchOnceRequest(validKey), async: false},
234236
}
235237

236238
for _, test := range tests {
@@ -265,6 +267,7 @@ func TestRequestsCtx_default(t *testing.T) {
265267
{req: NewRollbackRequest(), expected: nil},
266268
{req: NewIdRequest(validProtocolInfo), expected: nil},
267269
{req: NewBroadcastRequest(validKey), expected: nil},
270+
{req: NewWatchOnceRequest(validKey), expected: nil},
268271
}
269272

270273
for _, test := range tests {
@@ -300,6 +303,7 @@ func TestRequestsCtx_setter(t *testing.T) {
300303
{req: NewRollbackRequest().Context(ctx), expected: ctx},
301304
{req: NewIdRequest(validProtocolInfo).Context(ctx), expected: ctx},
302305
{req: NewBroadcastRequest(validKey).Context(ctx), expected: ctx},
306+
{req: NewWatchOnceRequest(validKey).Context(ctx), expected: ctx},
303307
}
304308

305309
for _, test := range tests {
@@ -823,3 +827,17 @@ func TestBroadcastRequestSetters(t *testing.T) {
823827
req := NewBroadcastRequest(validKey).Value(value)
824828
assertBodyEqual(t, refBuf.Bytes(), req)
825829
}
830+
831+
func TestWatchOnceRequestDefaultValues(t *testing.T) {
832+
var refBuf bytes.Buffer
833+
834+
refEnc := msgpack.NewEncoder(&refBuf)
835+
err := RefImplWatchOnceBody(refEnc, validKey)
836+
if err != nil {
837+
t.Errorf("An unexpected RefImplCallBody() error: %q", err.Error())
838+
return
839+
}
840+
841+
req := NewWatchOnceRequest(validKey)
842+
assertBodyEqual(t, refBuf.Bytes(), req)
843+
}

tarantool_test.go

+47
Original file line numberDiff line numberDiff line change
@@ -2606,6 +2606,53 @@ func TestConnectionDoSelectRequest(t *testing.T) {
26062606
testConnectionDoSelectRequestCheck(t, resp, err, false, 10, 1010)
26072607
}
26082608

2609+
func TestConnectionDoWatchOnceRequest(t *testing.T) {
2610+
test_helpers.SkipIfWatchOnceUnsupported(t)
2611+
2612+
conn := test_helpers.ConnectWithValidation(t, server, opts)
2613+
defer conn.Close()
2614+
2615+
_, err := conn.Do(NewBroadcastRequest("hello").Value("world")).Get()
2616+
if err != nil {
2617+
t.Fatalf("Failed to create a broadcast : %s", err.Error())
2618+
}
2619+
2620+
resp, err := conn.Do(NewWatchOnceRequest("hello")).Get()
2621+
if err != nil {
2622+
t.Fatalf("Failed to WatchOnce: %s", err.Error())
2623+
}
2624+
if resp.Code != OkCode {
2625+
t.Errorf("Failed to WatchOnce: wrong code returned %d", resp.Code)
2626+
}
2627+
if len(resp.Data) < 1 || resp.Data[0] != "world" {
2628+
t.Errorf("Failed to WatchOnce: wrong value returned %v", resp.Data)
2629+
}
2630+
}
2631+
2632+
func TestConnectionDoWatchOnceOnEmptyKey(t *testing.T) {
2633+
watchOnceNotSupported, err := test_helpers.IsTarantoolVersionLess(3, 0, 0)
2634+
if err != nil {
2635+
log.Fatalf("Could not check the Tarantool version")
2636+
}
2637+
if watchOnceNotSupported {
2638+
return
2639+
}
2640+
2641+
conn := test_helpers.ConnectWithValidation(t, server, opts)
2642+
defer conn.Close()
2643+
2644+
resp, err := conn.Do(NewWatchOnceRequest("notexists!")).Get()
2645+
if err != nil {
2646+
t.Fatalf("Failed to WatchOnce: %s", err.Error())
2647+
}
2648+
if resp.Code != OkCode {
2649+
t.Errorf("Failed to WatchOnce: wrong code returned %d", resp.Code)
2650+
}
2651+
if len(resp.Data) > 0 {
2652+
t.Errorf("Failed to WatchOnce: wrong value returned %v", resp.Data)
2653+
}
2654+
}
2655+
26092656
func TestConnectionDoSelectRequest_fetch_pos(t *testing.T) {
26102657
test_helpers.SkipIfPaginationUnsupported(t)
26112658

test_helpers/utils.go

+8
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,14 @@ func SkipIfPaginationUnsupported(t *testing.T) {
192192
SkipIfFeatureUnsupported(t, "pagination", 2, 11, 0)
193193
}
194194

195+
// SkipIfWatchOnceUnsupported skips test run if Tarantool without WatchOnce
196+
// request type is used.
197+
func SkipIfWatchOnceUnsupported(t *testing.T) {
198+
t.Helper()
199+
200+
SkipIfFeatureUnsupported(t, "WatchOnce", 3, 0, 0)
201+
}
202+
195203
// CheckEqualBoxErrors checks equivalence of tarantool.BoxError objects.
196204
//
197205
// Tarantool errors are not comparable by nature:

0 commit comments

Comments
 (0)