Skip to content

Commit 3f7860e

Browse files
DerekBumoleg-jukovec
authored andcommitted
api: support IPROTO_WATCH_ONCE request type
Add support of `IPROTO_WATCH_ONCE` request type. It works only for Tarantool version >= 3.0.0-alpha1. Part of #337
1 parent a422262 commit 3f7860e

11 files changed

+159
-8
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
2020
- Support `fetch_latest_metadata` option for crud requests with metadata (#335)
2121
- Support `noreturn` option for data change crud requests (#335)
2222
- Support `crud.schema` request (#336)
23+
- Support `IPROTO_WATCH_ONCE` request type for Tarantool
24+
version >= 3.0.0-alpha1 (#337)
2325

2426
### Changed
2527

example_test.go

+25-1
Original file line numberDiff line numberDiff line change
@@ -626,12 +626,13 @@ func ExampleProtocolVersion() {
626626
fmt.Println("Connector client protocol feature:", f)
627627
}
628628
// Output:
629-
// Connector client protocol version: 4
629+
// Connector client protocol version: 6
630630
// Connector client protocol feature: StreamsFeature
631631
// Connector client protocol feature: TransactionsFeature
632632
// Connector client protocol feature: ErrorExtensionFeature
633633
// Connector client protocol feature: WatchersFeature
634634
// Connector client protocol feature: PaginationFeature
635+
// Connector client protocol feature: WatchOnceFeature
635636
}
636637

637638
func getTestTxnOpts() tarantool.Opts {
@@ -1230,3 +1231,26 @@ func ExampleConnection_CloseGraceful_force() {
12301231
// Result:
12311232
// <nil> connection closed by client (0x4001)
12321233
}
1234+
1235+
func ExampleWatchOnceRequest() {
1236+
const key = "foo"
1237+
const value = "bar"
1238+
1239+
// WatchOnce request present in Tarantool since version 3.0
1240+
isLess, err := test_helpers.IsTarantoolVersionLess(3, 0, 0)
1241+
if err != nil || isLess {
1242+
return
1243+
}
1244+
1245+
conn := exampleConnect(opts)
1246+
defer conn.Close()
1247+
1248+
conn.Do(tarantool.NewBroadcastRequest(key).Value(value)).Get()
1249+
1250+
resp, err := conn.Do(tarantool.NewWatchOnceRequest(key)).Get()
1251+
if err != nil {
1252+
fmt.Printf("Failed to execute the request: %s\n", err)
1253+
} else {
1254+
fmt.Println(resp.Data)
1255+
}
1256+
}

export_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,9 @@ func RefImplRollbackBody(enc *msgpack.Encoder) error {
120120
func RefImplIdBody(enc *msgpack.Encoder, protocolInfo ProtocolInfo) error {
121121
return fillId(enc, protocolInfo)
122122
}
123+
124+
// RefImplWatchOnceBody is reference implementation for filling of an watchOnce
125+
// request's body.
126+
func RefImplWatchOnceBody(enc *msgpack.Encoder, key string) error {
127+
return fillWatchOnce(enc, key)
128+
}

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
88
github.com/shopspring/decimal v1.3.1
99
github.com/stretchr/testify v1.7.1
10-
github.com/tarantool/go-iproto v0.1.0
10+
github.com/tarantool/go-iproto v0.1.1-0.20231025103136-cb7894473931
1111
github.com/tarantool/go-openssl v0.0.8-0.20231004103608-336ca939d2ca
1212
github.com/vmihailenco/msgpack/v5 v5.3.5
1313
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
1919
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
2020
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
2121
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
22-
github.com/tarantool/go-iproto v0.1.0 h1:zHN9AA8LDawT+JBD0/Nxgr/bIsWkkpDzpcMuaNPSIAQ=
23-
github.com/tarantool/go-iproto v0.1.0/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7p96hUySMlIo=
22+
github.com/tarantool/go-iproto v0.1.1-0.20231025103136-cb7894473931 h1:YrsRc1sDZ6HOZccvM2eJ3Nu2TMBq7NMZMsaT5KCu5qU=
23+
github.com/tarantool/go-iproto v0.1.1-0.20231025103136-cb7894473931/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7p96hUySMlIo=
2424
github.com/tarantool/go-openssl v0.0.8-0.20231004103608-336ca939d2ca h1:oOrBh73tDDyooIXajfr+0pfnM+89404ClAhJpTTHI7E=
2525
github.com/tarantool/go-openssl v0.0.8-0.20231004103608-336ca939d2ca/go.mod h1:M7H4xYSbzqpW/ZRBMyH0eyqQBsnhAMfsYk5mv0yid7A=
2626
github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU=

protocol.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
type ProtocolVersion uint64
1313

1414
// ProtocolVersion type stores a Tarantool protocol feature.
15-
type ProtocolFeature uint64
15+
type ProtocolFeature iproto.Feature
1616

1717
// ProtocolInfo type aggregates Tarantool protocol version and features info.
1818
type ProtocolInfo struct {
@@ -52,6 +52,8 @@ const (
5252
// PaginationFeature represents support of pagination
5353
// (supported by connector).
5454
PaginationFeature ProtocolFeature = 4
55+
// WatchOnceFeature represents support of WatchOnce request types.
56+
WatchOnceFeature ProtocolFeature = 6
5557
)
5658

5759
// String returns the name of a Tarantool feature.
@@ -68,6 +70,8 @@ func (ftr ProtocolFeature) String() string {
6870
return "WatchersFeature"
6971
case PaginationFeature:
7072
return "PaginationFeature"
73+
case WatchOnceFeature:
74+
return "WatchOnceFeature"
7175
default:
7276
return fmt.Sprintf("Unknown feature (code %d)", ftr)
7377
}
@@ -79,7 +83,7 @@ var clientProtocolInfo ProtocolInfo = ProtocolInfo{
7983
// introduced in master 948e5cd (possible 2.10.5 or 2.11.0).
8084
// Support of protocol version on connector side was introduced in
8185
// 1.10.0.
82-
Version: ProtocolVersion(4),
86+
Version: ProtocolVersion(6),
8387
// Streams and transactions were introduced in protocol version 1
8488
// (Tarantool 2.10.0), in connector since 1.7.0.
8589
// Error extension type was introduced in protocol
@@ -88,12 +92,15 @@ var clientProtocolInfo ProtocolInfo = ProtocolInfo{
8892
// connector since 1.10.0.
8993
// Pagination were introduced in protocol version 4 (Tarantool 2.11.0), in
9094
// connector since 1.11.0.
95+
// WatchOnce request type was introduces in protocol version 6
96+
// (Tarantool 3.0.0), in connector since 2.0.0.
9197
Features: []ProtocolFeature{
9298
StreamsFeature,
9399
TransactionsFeature,
94100
ErrorExtensionFeature,
95101
WatchersFeature,
96102
PaginationFeature,
103+
WatchOnceFeature,
97104
},
98105
}
99106

protocol_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func TestFeatureStringRepresentation(t *testing.T) {
3232
require.Equal(t, ErrorExtensionFeature.String(), "ErrorExtensionFeature")
3333
require.Equal(t, WatchersFeature.String(), "WatchersFeature")
3434
require.Equal(t, PaginationFeature.String(), "PaginationFeature")
35+
require.Equal(t, WatchOnceFeature.String(), "WatchOnceFeature")
3536

3637
require.Equal(t, ProtocolFeature(15532).String(), "Unknown feature (code 15532)")
3738
}

request.go

+36
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,16 @@ func fillPing(enc *msgpack.Encoder) error {
166166
return enc.EncodeMapLen(0)
167167
}
168168

169+
func fillWatchOnce(enc *msgpack.Encoder, key string) error {
170+
if err := enc.EncodeMapLen(1); err != nil {
171+
return err
172+
}
173+
if err := enc.EncodeUint(uint64(iproto.IPROTO_EVENT_KEY)); err != nil {
174+
return err
175+
}
176+
return enc.EncodeString(key)
177+
}
178+
169179
// Ping sends empty request to Tarantool to check connection.
170180
//
171181
// Deprecated: the method will be removed in the next major version,
@@ -1354,3 +1364,29 @@ func (req *ExecuteRequest) Context(ctx context.Context) *ExecuteRequest {
13541364
req.ctx = ctx
13551365
return req
13561366
}
1367+
1368+
// WatchOnceRequest synchronously fetches the value currently associated with a
1369+
// specified notification key without subscribing to changes.
1370+
type WatchOnceRequest struct {
1371+
baseRequest
1372+
key string
1373+
}
1374+
1375+
// NewWatchOnceRequest returns a new watchOnceRequest.
1376+
func NewWatchOnceRequest(key string) *WatchOnceRequest {
1377+
req := new(WatchOnceRequest)
1378+
req.rtype = iproto.IPROTO_WATCH_ONCE
1379+
req.key = key
1380+
return req
1381+
}
1382+
1383+
// Body fills an msgpack.Encoder with the watchOnce request body.
1384+
func (req *WatchOnceRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
1385+
return fillWatchOnce(enc, req.key)
1386+
}
1387+
1388+
// Context sets a passed context to the request.
1389+
func (req *WatchOnceRequest) Context(ctx context.Context) *WatchOnceRequest {
1390+
req.ctx = ctx
1391+
return req
1392+
}

request_test.go

+18
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ func TestRequestsTypes(t *testing.T) {
196196
{req: NewRollbackRequest(), rtype: iproto.IPROTO_ROLLBACK},
197197
{req: NewIdRequest(validProtocolInfo), rtype: iproto.IPROTO_ID},
198198
{req: NewBroadcastRequest(validKey), rtype: iproto.IPROTO_CALL},
199+
{req: NewWatchOnceRequest(validKey), rtype: iproto.IPROTO_WATCH_ONCE},
199200
}
200201

201202
for _, test := range tests {
@@ -231,6 +232,7 @@ func TestRequestsAsync(t *testing.T) {
231232
{req: NewRollbackRequest(), async: false},
232233
{req: NewIdRequest(validProtocolInfo), async: false},
233234
{req: NewBroadcastRequest(validKey), async: false},
235+
{req: NewWatchOnceRequest(validKey), async: false},
234236
}
235237

236238
for _, test := range tests {
@@ -265,6 +267,7 @@ func TestRequestsCtx_default(t *testing.T) {
265267
{req: NewRollbackRequest(), expected: nil},
266268
{req: NewIdRequest(validProtocolInfo), expected: nil},
267269
{req: NewBroadcastRequest(validKey), expected: nil},
270+
{req: NewWatchOnceRequest(validKey), expected: nil},
268271
}
269272

270273
for _, test := range tests {
@@ -300,6 +303,7 @@ func TestRequestsCtx_setter(t *testing.T) {
300303
{req: NewRollbackRequest().Context(ctx), expected: ctx},
301304
{req: NewIdRequest(validProtocolInfo).Context(ctx), expected: ctx},
302305
{req: NewBroadcastRequest(validKey).Context(ctx), expected: ctx},
306+
{req: NewWatchOnceRequest(validKey).Context(ctx), expected: ctx},
303307
}
304308

305309
for _, test := range tests {
@@ -823,3 +827,17 @@ func TestBroadcastRequestSetters(t *testing.T) {
823827
req := NewBroadcastRequest(validKey).Value(value)
824828
assertBodyEqual(t, refBuf.Bytes(), req)
825829
}
830+
831+
func TestWatchOnceRequestDefaultValues(t *testing.T) {
832+
var refBuf bytes.Buffer
833+
834+
refEnc := msgpack.NewEncoder(&refBuf)
835+
err := RefImplWatchOnceBody(refEnc, validKey)
836+
if err != nil {
837+
t.Errorf("An unexpected RefImplCallBody() error: %q", err.Error())
838+
return
839+
}
840+
841+
req := NewWatchOnceRequest(validKey)
842+
assertBodyEqual(t, refBuf.Bytes(), req)
843+
}

tarantool_test.go

+51-2
Original file line numberDiff line numberDiff line change
@@ -2606,6 +2606,53 @@ func TestConnectionDoSelectRequest(t *testing.T) {
26062606
testConnectionDoSelectRequestCheck(t, resp, err, false, 10, 1010)
26072607
}
26082608

2609+
func TestConnectionDoWatchOnceRequest(t *testing.T) {
2610+
test_helpers.SkipIfWatchOnceUnsupported(t)
2611+
2612+
conn := test_helpers.ConnectWithValidation(t, server, opts)
2613+
defer conn.Close()
2614+
2615+
_, err := conn.Do(NewBroadcastRequest("hello").Value("world")).Get()
2616+
if err != nil {
2617+
t.Fatalf("Failed to create a broadcast : %s", err.Error())
2618+
}
2619+
2620+
resp, err := conn.Do(NewWatchOnceRequest("hello")).Get()
2621+
if err != nil {
2622+
t.Fatalf("Failed to WatchOnce: %s", err.Error())
2623+
}
2624+
if resp.Code != OkCode {
2625+
t.Errorf("Failed to WatchOnce: wrong code returned %d", resp.Code)
2626+
}
2627+
if len(resp.Data) < 1 || resp.Data[0] != "world" {
2628+
t.Errorf("Failed to WatchOnce: wrong value returned %v", resp.Data)
2629+
}
2630+
}
2631+
2632+
func TestConnectionDoWatchOnceOnEmptyKey(t *testing.T) {
2633+
watchOnceNotSupported, err := test_helpers.IsTarantoolVersionLess(3, 0, 0)
2634+
if err != nil {
2635+
log.Fatalf("Could not check the Tarantool version")
2636+
}
2637+
if watchOnceNotSupported {
2638+
return
2639+
}
2640+
2641+
conn := test_helpers.ConnectWithValidation(t, server, opts)
2642+
defer conn.Close()
2643+
2644+
resp, err := conn.Do(NewWatchOnceRequest("notexists!")).Get()
2645+
if err != nil {
2646+
t.Fatalf("Failed to WatchOnce: %s", err.Error())
2647+
}
2648+
if resp.Code != OkCode {
2649+
t.Errorf("Failed to WatchOnce: wrong code returned %d", resp.Code)
2650+
}
2651+
if len(resp.Data) > 0 {
2652+
t.Errorf("Failed to WatchOnce: wrong value returned %v", resp.Data)
2653+
}
2654+
}
2655+
26092656
func TestConnectionDoSelectRequest_fetch_pos(t *testing.T) {
26102657
test_helpers.SkipIfPaginationUnsupported(t)
26112658

@@ -3247,13 +3294,14 @@ func TestConnectionProtocolInfoSupported(t *testing.T) {
32473294
require.Equal(t,
32483295
clientProtocolInfo,
32493296
ProtocolInfo{
3250-
Version: ProtocolVersion(4),
3297+
Version: ProtocolVersion(6),
32513298
Features: []ProtocolFeature{
32523299
StreamsFeature,
32533300
TransactionsFeature,
32543301
ErrorExtensionFeature,
32553302
WatchersFeature,
32563303
PaginationFeature,
3304+
WatchOnceFeature,
32573305
},
32583306
})
32593307

@@ -3364,13 +3412,14 @@ func TestConnectionProtocolInfoUnsupported(t *testing.T) {
33643412
require.Equal(t,
33653413
clientProtocolInfo,
33663414
ProtocolInfo{
3367-
Version: ProtocolVersion(4),
3415+
Version: ProtocolVersion(6),
33683416
Features: []ProtocolFeature{
33693417
StreamsFeature,
33703418
TransactionsFeature,
33713419
ErrorExtensionFeature,
33723420
WatchersFeature,
33733421
PaginationFeature,
3422+
WatchOnceFeature,
33743423
},
33753424
})
33763425

test_helpers/utils.go

+8
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,14 @@ func SkipIfPaginationUnsupported(t *testing.T) {
192192
SkipIfFeatureUnsupported(t, "pagination", 2, 11, 0)
193193
}
194194

195+
// SkipIfWatchOnceUnsupported skips test run if Tarantool without WatchOnce
196+
// request type is used.
197+
func SkipIfWatchOnceUnsupported(t *testing.T) {
198+
t.Helper()
199+
200+
SkipIfFeatureUnsupported(t, "watch once", 3, 0, 0)
201+
}
202+
195203
// CheckEqualBoxErrors checks equivalence of tarantool.BoxError objects.
196204
//
197205
// Tarantool errors are not comparable by nature:

0 commit comments

Comments
 (0)