Skip to content

Commit 3ce90ac

Browse files
committed
streams: interactive transactions and support
The main purpose of streams is transactions via iproto. Since v. 2.10.0, Tarantool supports streams and interactive transactions over them. Each stream can start its own transaction, so they allows multiplexing several transactions over one connection. API for this feature is the following: * `NewStream()` method to create a stream object for `Connection` and `NewStream(userMode Mode)` method to create a stream object for `ConnectionPool` * stream object `Stream` with `Do()`, `Begin()`, `Commit()`, `Rollback()` methods, `Begin()` - start transaction via iproto stream; `Commit()` - commit transaction; `Rollback()` - rollback transaction. Closes #101
1 parent 9b0ec8a commit 3ce90ac

File tree

9 files changed

+786
-22
lines changed

9 files changed

+786
-22
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1515
- Public API with request object types (#126)
1616
- Support decimal type in msgpack (#96)
1717
- Support datetime type in msgpack (#118)
18+
- Streams and interactive transactions support (#101)
1819

1920
### Changed
2021

connection.go

+26-8
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
)
2020

2121
const requestsMap = 128
22+
const defaultStreamId = 0
2223
const (
2324
connDisconnected = 0
2425
connConnected = 1
@@ -139,6 +140,8 @@ type Connection struct {
139140
state uint32
140141
dec *msgpack.Decoder
141142
lenbuf [PacketLengthBytes]byte
143+
144+
lastStreamId uint32
142145
}
143146

144147
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -468,15 +471,17 @@ func (conn *Connection) dial() (err error) {
468471
return
469472
}
470473

471-
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res SchemaResolver) (err error) {
474+
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
475+
req Request, streamId uint32, res SchemaResolver) (err error) {
472476
hl := h.Len()
473477
h.Write([]byte{
474478
0xce, 0, 0, 0, 0, // Length.
475-
0x82, // 2 element map.
479+
0x83, // 3 element map.
476480
KeyCode, byte(req.Code()), // Request code.
477481
KeySync, 0xce,
478482
byte(reqid >> 24), byte(reqid >> 16),
479483
byte(reqid >> 8), byte(reqid),
484+
KeyStreamId, byte(streamId),
480485
})
481486

482487
if err = req.Body(res, enc); err != nil {
@@ -495,7 +500,7 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res Sch
495500
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
496501
var packet smallWBuf
497502
req := newAuthRequest(conn.opts.User, string(scramble))
498-
err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, conn.Schema)
503+
err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, defaultStreamId, conn.Schema)
499504

500505
if err != nil {
501506
return errors.New("auth: pack error " + err.Error())
@@ -785,16 +790,16 @@ func (conn *Connection) newFuture() (fut *Future) {
785790
return
786791
}
787792

788-
func (conn *Connection) send(req Request) *Future {
793+
func (conn *Connection) send(req Request, streamId uint32) *Future {
789794
fut := conn.newFuture()
790795
if fut.ready == nil {
791796
return fut
792797
}
793-
conn.putFuture(fut, req)
798+
conn.putFuture(fut, req, streamId)
794799
return fut
795800
}
796801

797-
func (conn *Connection) putFuture(fut *Future, req Request) {
802+
func (conn *Connection) putFuture(fut *Future, req Request, streamId uint32) {
798803
shardn := fut.requestId & (conn.opts.Concurrency - 1)
799804
shard := &conn.shard[shardn]
800805
shard.bufmut.Lock()
@@ -811,7 +816,7 @@ func (conn *Connection) putFuture(fut *Future, req Request) {
811816
}
812817
blen := shard.buf.Len()
813818
reqid := fut.requestId
814-
if err := pack(&shard.buf, shard.enc, reqid, req, conn.Schema); err != nil {
819+
if err := pack(&shard.buf, shard.enc, reqid, req, streamId, conn.Schema); err != nil {
815820
shard.buf.Trunc(blen)
816821
shard.bufmut.Unlock()
817822
if f := conn.fetchFuture(reqid); f == fut {
@@ -993,7 +998,7 @@ func (conn *Connection) nextRequestId() (requestId uint32) {
993998
// An error is returned if the request was formed incorrectly, or failed to
994999
// create the future.
9951000
func (conn *Connection) Do(req Request) *Future {
996-
return conn.send(req)
1001+
return conn.send(req, defaultStreamId)
9971002
}
9981003

9991004
// ConfiguredTimeout returns a timeout from connection config.
@@ -1009,3 +1014,16 @@ func (conn *Connection) OverrideSchema(s *Schema) {
10091014
conn.Schema = s
10101015
}
10111016
}
1017+
1018+
// NewStream creates new Stream object for connection.
1019+
//
1020+
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
1021+
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
1022+
// Since 1.7.0
1023+
func (conn *Connection) NewStream() *Stream {
1024+
conn.lastStreamId += 1
1025+
return &Stream{
1026+
Id: conn.lastStreamId,
1027+
Conn: conn,
1028+
}
1029+
}

connection_pool/connection_pool.go

+14
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,20 @@ func (connPool *ConnectionPool) Do(req tarantool.Request, userMode Mode) *tarant
534534
return conn.Do(req)
535535
}
536536

537+
// NewStream creates new Stream object for connection selected
538+
// by userMode from connPool.
539+
//
540+
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
541+
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
542+
// Since 1.7.0
543+
func (connPool *ConnectionPool) NewStream(userMode Mode) (*tarantool.Stream, error) {
544+
conn, err := connPool.getNextConnection(userMode)
545+
if err != nil {
546+
return nil, err
547+
}
548+
return conn.NewStream(), nil
549+
}
550+
537551
//
538552
// private
539553
//

connection_pool/connection_pool_test.go

+217-6
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ var defaultTimeoutRetry = 500 * time.Millisecond
3939

4040
var instances []test_helpers.TarantoolInstance
4141

42+
var tarantoolVersionIsLess bool
43+
4244
func TestConnError_IncorrectParams(t *testing.T) {
4345
connPool, err := connection_pool.Connect([]string{}, tarantool.Opts{})
4446
require.Nilf(t, connPool, "conn is not nil with incorrect param")
@@ -1276,6 +1278,203 @@ func TestDo(t *testing.T) {
12761278
require.NotNilf(t, resp, "response is nil after Ping")
12771279
}
12781280

1281+
func TestStream_Commit(t *testing.T) {
1282+
var req tarantool.Request
1283+
var resp *tarantool.Response
1284+
var err error
1285+
1286+
// Tarantool supports streams and interactive transactions since version 2.10.0
1287+
if tarantoolVersionIsLess {
1288+
t.Skip()
1289+
}
1290+
1291+
roles := []bool{true, true, false, true, true}
1292+
1293+
err = test_helpers.SetClusterRO(servers, connOpts, roles)
1294+
require.Nilf(t, err, "fail to set roles for cluster")
1295+
1296+
connPool, err := connection_pool.Connect(servers, connOpts)
1297+
require.Nilf(t, err, "failed to connect")
1298+
require.NotNilf(t, connPool, "conn is nil after Connect")
1299+
defer connPool.Close()
1300+
1301+
stream, err := connPool.NewStream(connection_pool.PreferRW)
1302+
require.Nilf(t, err, "failed to create stream")
1303+
require.NotNilf(t, connPool, "stream is nil after NewStream")
1304+
1305+
// Begin transaction
1306+
resp, err = stream.Begin()
1307+
require.Nilf(t, err, "failed to Begin")
1308+
require.NotNilf(t, resp, "response is nil after Begin")
1309+
require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Begin: wrong code returned")
1310+
1311+
// Insert in stream
1312+
req = tarantool.NewInsertRequest(spaceName).
1313+
Tuple([]interface{}{"commit_key", "commit_value"})
1314+
resp, err = stream.Do(req).Get()
1315+
require.Nilf(t, err, "failed to Insert")
1316+
require.NotNilf(t, resp, "response is nil after Insert")
1317+
require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Insert: wrong code returned")
1318+
1319+
// Connect to servers[2] to check if tuple
1320+
// was inserted outside of stream on RW instance
1321+
// before transaction commit
1322+
conn, err := tarantool.Connect(servers[2], connOpts)
1323+
require.Nilf(t, err, "failed to connect %s", servers[2])
1324+
require.NotNilf(t, conn, "conn is nil after Connect")
1325+
1326+
// Select not related to the transaction
1327+
// while transaction is not committed
1328+
// result of select is empty
1329+
req = tarantool.NewSelectRequest(spaceNo).
1330+
Index(indexNo).
1331+
Offset(0).
1332+
Limit(1).
1333+
Iterator(tarantool.IterEq).
1334+
Key([]interface{}{"commit_key"})
1335+
resp, err = conn.Do(req).Get()
1336+
require.Nilf(t, err, "failed to Select")
1337+
require.NotNilf(t, resp, "response is nil after Select")
1338+
require.Equalf(t, 0, len(resp.Data), "response Data len != 0")
1339+
1340+
// Select in stream
1341+
resp, err = stream.Do(req).Get()
1342+
require.Nilf(t, err, "failed to Select")
1343+
require.NotNilf(t, resp, "response is nil after Select")
1344+
require.Equalf(t, 1, len(resp.Data), "response Body len != 1 after Select")
1345+
1346+
tpl, ok := resp.Data[0].([]interface{})
1347+
require.Truef(t, ok, "unexpected body of Select")
1348+
require.Equalf(t, 2, len(tpl), "unexpected body of Select")
1349+
1350+
key, ok := tpl[0].(string)
1351+
require.Truef(t, ok, "unexpected body of Select (0)")
1352+
require.Equalf(t, "commit_key", key, "unexpected body of Select (0)")
1353+
1354+
value, ok := tpl[1].(string)
1355+
require.Truef(t, ok, "unexpected body of Select (1)")
1356+
require.Equalf(t, "commit_value", value, "unexpected body of Select (1)")
1357+
1358+
// Commit transaction
1359+
resp, err = stream.Commit()
1360+
require.Nilf(t, err, "failed to Commit")
1361+
require.NotNilf(t, resp, "response is nil after Commit")
1362+
require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Commit: wrong code returned")
1363+
1364+
// Select outside of transaction
1365+
resp, err = conn.Do(req).Get()
1366+
require.Nilf(t, err, "failed to Select")
1367+
require.NotNilf(t, resp, "response is nil after Select")
1368+
require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select")
1369+
1370+
tpl, ok = resp.Data[0].([]interface{})
1371+
require.Truef(t, ok, "unexpected body of Select")
1372+
require.Equalf(t, 2, len(tpl), "unexpected body of Select")
1373+
1374+
key, ok = tpl[0].(string)
1375+
require.Truef(t, ok, "unexpected body of Select (0)")
1376+
require.Equalf(t, "commit_key", key, "unexpected body of Select (0)")
1377+
1378+
value, ok = tpl[1].(string)
1379+
require.Truef(t, ok, "unexpected body of Select (1)")
1380+
require.Equalf(t, "commit_value", value, "unexpected body of Select (1)")
1381+
}
1382+
1383+
func TestStream_Rollback(t *testing.T) {
1384+
var req tarantool.Request
1385+
var resp *tarantool.Response
1386+
var err error
1387+
1388+
// Tarantool supports streams and interactive transactions since version 2.10.0
1389+
if tarantoolVersionIsLess {
1390+
t.Skip()
1391+
}
1392+
1393+
roles := []bool{true, true, false, true, true}
1394+
1395+
err = test_helpers.SetClusterRO(servers, connOpts, roles)
1396+
require.Nilf(t, err, "fail to set roles for cluster")
1397+
1398+
connPool, err := connection_pool.Connect(servers, connOpts)
1399+
require.Nilf(t, err, "failed to connect")
1400+
require.NotNilf(t, connPool, "conn is nil after Connect")
1401+
defer connPool.Close()
1402+
1403+
stream, err := connPool.NewStream(connection_pool.PreferRW)
1404+
require.Nilf(t, err, "failed to create stream")
1405+
require.NotNilf(t, connPool, "stream is nil after NewStream")
1406+
1407+
// Begin transaction
1408+
resp, err = stream.Begin()
1409+
require.Nilf(t, err, "failed to Begin")
1410+
require.NotNilf(t, resp, "response is nil after Begin")
1411+
require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Begin: wrong code returned")
1412+
1413+
// Insert in stream
1414+
req = tarantool.NewInsertRequest(spaceName).
1415+
Tuple([]interface{}{"rollback_key", "rollback_value"})
1416+
resp, err = stream.Do(req).Get()
1417+
require.Nilf(t, err, "failed to Insert")
1418+
require.NotNilf(t, resp, "response is nil after Insert")
1419+
require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Insert: wrong code returned")
1420+
1421+
// Connect to servers[2] to check if tuple
1422+
// was not inserted outside of stream on RW instance
1423+
conn, err := tarantool.Connect(servers[2], connOpts)
1424+
require.Nilf(t, err, "failed to connect %s", servers[2])
1425+
require.NotNilf(t, conn, "conn is nil after Connect")
1426+
1427+
// Select not related to the transaction
1428+
// while transaction is not committed
1429+
// result of select is empty
1430+
req = tarantool.NewSelectRequest(spaceNo).
1431+
Index(indexNo).
1432+
Offset(0).
1433+
Limit(1).
1434+
Iterator(tarantool.IterEq).
1435+
Key([]interface{}{"rollback_key"})
1436+
resp, err = conn.Do(req).Get()
1437+
require.Nilf(t, err, "failed to Select")
1438+
require.NotNilf(t, resp, "response is nil after Select")
1439+
require.Equalf(t, 0, len(resp.Data), "response Data len != 0")
1440+
1441+
// Select in stream
1442+
resp, err = stream.Do(req).Get()
1443+
require.Nilf(t, err, "failed to Select")
1444+
require.NotNilf(t, resp, "response is nil after Select")
1445+
require.Equalf(t, 1, len(resp.Data), "response Body len != 1 after Select")
1446+
1447+
tpl, ok := resp.Data[0].([]interface{})
1448+
require.Truef(t, ok, "unexpected body of Select")
1449+
require.Equalf(t, 2, len(tpl), "unexpected body of Select")
1450+
1451+
key, ok := tpl[0].(string)
1452+
require.Truef(t, ok, "unexpected body of Select (0)")
1453+
require.Equalf(t, "rollback_key", key, "unexpected body of Select (0)")
1454+
1455+
value, ok := tpl[1].(string)
1456+
require.Truef(t, ok, "unexpected body of Select (1)")
1457+
require.Equalf(t, "rollback_value", value, "unexpected body of Select (1)")
1458+
1459+
// Rollback transaction
1460+
resp, err = stream.Rollback()
1461+
require.Nilf(t, err, "failed to Rollback")
1462+
require.NotNilf(t, resp, "response is nil after Rollback")
1463+
require.Equalf(t, tarantool.OkCode, resp.Code, "failed to Rollback: wrong code returned")
1464+
1465+
// Select outside of transaction
1466+
resp, err = conn.Do(req).Get()
1467+
require.Nilf(t, err, "failed to Select")
1468+
require.NotNilf(t, resp, "response is nil after Select")
1469+
require.Equalf(t, 0, len(resp.Data), "response Body len != 0 after Select")
1470+
1471+
// Select inside of stream after rollback
1472+
resp, err = stream.Do(req).Get()
1473+
require.Nilf(t, err, "failed to Select")
1474+
require.NotNilf(t, resp, "response is nil after Select")
1475+
require.Equalf(t, 0, len(resp.Data), "response Body len != 0 after Select")
1476+
}
1477+
12791478
// runTestMain is a body of TestMain function
12801479
// (see https://pkg.go.dev/testing#hdr-Main).
12811480
// Using defer + os.Exit is not works so TestMain body
@@ -1292,13 +1491,25 @@ func runTestMain(m *testing.M) int {
12921491
"work_dir5"}
12931492
var err error
12941493

1494+
memtxUseMvccEngine := true
1495+
1496+
// Tarantool supports streams and interactive transactions since version 2.10.0
1497+
tarantoolVersionIsLess, err = test_helpers.IsTarantoolVersionLess(2, 10, 0)
1498+
if err != nil {
1499+
log.Fatalf("Could not check the Tarantool version")
1500+
}
1501+
if tarantoolVersionIsLess {
1502+
memtxUseMvccEngine = false
1503+
}
1504+
12951505
instances, err = test_helpers.StartTarantoolInstances(servers, workDirs, test_helpers.StartOpts{
1296-
InitScript: initScript,
1297-
User: connOpts.User,
1298-
Pass: connOpts.Pass,
1299-
WaitStart: waitStart,
1300-
ConnectRetry: connectRetry,
1301-
RetryTimeout: retryTimeout,
1506+
InitScript: initScript,
1507+
User: connOpts.User,
1508+
Pass: connOpts.Pass,
1509+
WaitStart: waitStart,
1510+
ConnectRetry: connectRetry,
1511+
RetryTimeout: retryTimeout,
1512+
MemtxUseMvccEngine: memtxUseMvccEngine,
13021513
})
13031514

13041515
if err != nil {

const.go

+4
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@ const (
1212
UpsertRequestCode = 9
1313
Call17RequestCode = 10 /* call in >= 1.7 format */
1414
ExecuteRequestCode = 11
15+
BeginRequestCode = 14
16+
CommitRequestCode = 15
17+
RollbackRequestCode = 16
1518
PingRequestCode = 64
1619
SubscribeRequestCode = 66
1720

1821
KeyCode = 0x00
1922
KeySync = 0x01
23+
KeyStreamId = 0x0a
2024
KeySpaceNo = 0x10
2125
KeyIndexNo = 0x11
2226
KeyLimit = 0x12

0 commit comments

Comments
 (0)