Skip to content

Commit 6b08e7b

Browse files
committed
streams: interactive transactions and support
The main purpose of streams is transactions via iproto. Since v. 2.10.0, Tarantool supports streams and interactive transactions over them. Each stream can start its own transaction, so they allows multiplexing several transactions over one connection. API for this feature is the following: * `NewStream()` method to create a stream object for `Connection` and `NewStream(userMode Mode)` method to create a stream object for `ConnectionPool` * stream object `Stream` with `Do()` method and new request objects to work with stream, `BeginRequest` - start transaction via iproto stream; `CommitRequest` - commit transaction; `RollbackRequest` - rollback transaction. Closes #101
1 parent 903b091 commit 6b08e7b

17 files changed

+1749
-33
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1616
- Support decimal type in msgpack (#96)
1717
- Support datetime type in msgpack (#118)
1818
- Prepared SQL statements (#117)
19+
- Streams and interactive transactions support (#101)
1920

2021
### Changed
2122

connection.go

+37-10
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
)
2020

2121
const requestsMap = 128
22+
const ignoreStreamId = 0
2223
const (
2324
connDisconnected = 0
2425
connConnected = 1
@@ -139,6 +140,8 @@ type Connection struct {
139140
state uint32
140141
dec *msgpack.Decoder
141142
lenbuf [PacketLengthBytes]byte
143+
144+
lastStreamId uint64
142145
}
143146

144147
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -468,16 +471,27 @@ func (conn *Connection) dial() (err error) {
468471
return
469472
}
470473

471-
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res SchemaResolver) (err error) {
474+
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
475+
req Request, streamId uint64, res SchemaResolver) (err error) {
472476
hl := h.Len()
473-
h.Write([]byte{
477+
478+
hMapLen := byte(0x82) // 2 element map.
479+
if streamId != ignoreStreamId {
480+
hMapLen = byte(0x83) // 3 element map.
481+
}
482+
hBytes := []byte{
474483
0xce, 0, 0, 0, 0, // Length.
475-
0x82, // 2 element map.
484+
hMapLen,
476485
KeyCode, byte(req.Code()), // Request code.
477486
KeySync, 0xce,
478487
byte(reqid >> 24), byte(reqid >> 16),
479488
byte(reqid >> 8), byte(reqid),
480-
})
489+
}
490+
if streamId != ignoreStreamId {
491+
hBytes = append(hBytes, KeyStreamId, byte(streamId))
492+
}
493+
494+
h.Write(hBytes)
481495

482496
if err = req.Body(res, enc); err != nil {
483497
return
@@ -495,7 +509,7 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res Sch
495509
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
496510
var packet smallWBuf
497511
req := newAuthRequest(conn.opts.User, string(scramble))
498-
err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, conn.Schema)
512+
err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, ignoreStreamId, conn.Schema)
499513

500514
if err != nil {
501515
return errors.New("auth: pack error " + err.Error())
@@ -785,16 +799,16 @@ func (conn *Connection) newFuture() (fut *Future) {
785799
return
786800
}
787801

788-
func (conn *Connection) send(req Request) *Future {
802+
func (conn *Connection) send(req Request, streamId uint64) *Future {
789803
fut := conn.newFuture()
790804
if fut.ready == nil {
791805
return fut
792806
}
793-
conn.putFuture(fut, req)
807+
conn.putFuture(fut, req, streamId)
794808
return fut
795809
}
796810

797-
func (conn *Connection) putFuture(fut *Future, req Request) {
811+
func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
798812
shardn := fut.requestId & (conn.opts.Concurrency - 1)
799813
shard := &conn.shard[shardn]
800814
shard.bufmut.Lock()
@@ -811,7 +825,7 @@ func (conn *Connection) putFuture(fut *Future, req Request) {
811825
}
812826
blen := shard.buf.Len()
813827
reqid := fut.requestId
814-
if err := pack(&shard.buf, shard.enc, reqid, req, conn.Schema); err != nil {
828+
if err := pack(&shard.buf, shard.enc, reqid, req, streamId, conn.Schema); err != nil {
815829
shard.buf.Trunc(blen)
816830
shard.bufmut.Unlock()
817831
if f := conn.fetchFuture(reqid); f == fut {
@@ -1000,7 +1014,7 @@ func (conn *Connection) Do(req Request) *Future {
10001014
return fut
10011015
}
10021016
}
1003-
return conn.send(req)
1017+
return conn.send(req, ignoreStreamId)
10041018
}
10051019

10061020
// ConfiguredTimeout returns a timeout from connection config.
@@ -1026,3 +1040,16 @@ func (conn *Connection) NewPrepared(expr string) (*Prepared, error) {
10261040
}
10271041
return NewPreparedFromResponse(conn, resp)
10281042
}
1043+
1044+
// NewStream creates new Stream object for connection.
1045+
//
1046+
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
1047+
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
1048+
// Since 1.7.0
1049+
func (conn *Connection) NewStream() (*Stream, error) {
1050+
next := atomic.AddUint64(&conn.lastStreamId, 1)
1051+
return &Stream{
1052+
Id: next,
1053+
Conn: conn,
1054+
}, nil
1055+
}

connection_pool/connection_pool.go

+14
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,20 @@ func (connPool *ConnectionPool) Do(req tarantool.Request, userMode Mode) *tarant
544544
return conn.Do(req)
545545
}
546546

547+
// NewStream creates new Stream object for connection selected
548+
// by userMode from connPool.
549+
//
550+
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
551+
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
552+
// Since 1.7.0
553+
func (connPool *ConnectionPool) NewStream(userMode Mode) (*tarantool.Stream, error) {
554+
conn, err := connPool.getNextConnection(userMode)
555+
if err != nil {
556+
return nil, err
557+
}
558+
return conn.NewStream()
559+
}
560+
547561
//
548562
// private
549563
//

0 commit comments

Comments
 (0)