Skip to content

Commit a027a59

Browse files
committed
streams: interactive transactions and support
The main purpose of streams is transactions via iproto. Since v. 2.10.0, Tarantool supports streams and interactive transactions over them. Each stream can start its own transaction, so they allows multiplexing several transactions over one connection. API for this feature is the following: * `NewStream()` method to create a stream object for `Connection` and `NewStream(userMode Mode)` method to create a stream object for `ConnectionPool` * stream object `Stream` with `Do()`, `Begin()`, `Commit()`, `Rollback()` methods, `Begin()` - start transaction via iproto stream; `Commit()` - commit transaction; `Rollback()` - rollback transaction. Closes #101
1 parent 9b0ec8a commit a027a59

File tree

9 files changed

+1145
-24
lines changed

9 files changed

+1145
-24
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1515
- Public API with request object types (#126)
1616
- Support decimal type in msgpack (#96)
1717
- Support datetime type in msgpack (#118)
18+
- Streams and interactive transactions support (#101)
1819

1920
### Changed
2021

connection.go

+37-10
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
)
2020

2121
const requestsMap = 128
22+
const ignoreStreamId = 0
2223
const (
2324
connDisconnected = 0
2425
connConnected = 1
@@ -139,6 +140,8 @@ type Connection struct {
139140
state uint32
140141
dec *msgpack.Decoder
141142
lenbuf [PacketLengthBytes]byte
143+
144+
lastStreamId uint32
142145
}
143146

144147
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -468,16 +471,27 @@ func (conn *Connection) dial() (err error) {
468471
return
469472
}
470473

471-
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res SchemaResolver) (err error) {
474+
func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32,
475+
req Request, streamId uint32, res SchemaResolver) (err error) {
472476
hl := h.Len()
473-
h.Write([]byte{
477+
478+
hMapLen := byte(0x82) // 2 element map.
479+
if streamId != ignoreStreamId {
480+
hMapLen = byte(0x83) // 3 element map.
481+
}
482+
hBytes := []byte{
474483
0xce, 0, 0, 0, 0, // Length.
475-
0x82, // 2 element map.
484+
hMapLen,
476485
KeyCode, byte(req.Code()), // Request code.
477486
KeySync, 0xce,
478487
byte(reqid >> 24), byte(reqid >> 16),
479488
byte(reqid >> 8), byte(reqid),
480-
})
489+
}
490+
if streamId != ignoreStreamId {
491+
hBytes = append(hBytes, KeyStreamId, byte(streamId))
492+
}
493+
494+
h.Write(hBytes)
481495

482496
if err = req.Body(res, enc); err != nil {
483497
return
@@ -495,7 +509,7 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res Sch
495509
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
496510
var packet smallWBuf
497511
req := newAuthRequest(conn.opts.User, string(scramble))
498-
err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, conn.Schema)
512+
err = pack(&packet, msgpack.NewEncoder(&packet), 0, req, ignoreStreamId, conn.Schema)
499513

500514
if err != nil {
501515
return errors.New("auth: pack error " + err.Error())
@@ -785,16 +799,16 @@ func (conn *Connection) newFuture() (fut *Future) {
785799
return
786800
}
787801

788-
func (conn *Connection) send(req Request) *Future {
802+
func (conn *Connection) send(req Request, streamId uint32) *Future {
789803
fut := conn.newFuture()
790804
if fut.ready == nil {
791805
return fut
792806
}
793-
conn.putFuture(fut, req)
807+
conn.putFuture(fut, req, streamId)
794808
return fut
795809
}
796810

797-
func (conn *Connection) putFuture(fut *Future, req Request) {
811+
func (conn *Connection) putFuture(fut *Future, req Request, streamId uint32) {
798812
shardn := fut.requestId & (conn.opts.Concurrency - 1)
799813
shard := &conn.shard[shardn]
800814
shard.bufmut.Lock()
@@ -811,7 +825,7 @@ func (conn *Connection) putFuture(fut *Future, req Request) {
811825
}
812826
blen := shard.buf.Len()
813827
reqid := fut.requestId
814-
if err := pack(&shard.buf, shard.enc, reqid, req, conn.Schema); err != nil {
828+
if err := pack(&shard.buf, shard.enc, reqid, req, streamId, conn.Schema); err != nil {
815829
shard.buf.Trunc(blen)
816830
shard.bufmut.Unlock()
817831
if f := conn.fetchFuture(reqid); f == fut {
@@ -993,7 +1007,7 @@ func (conn *Connection) nextRequestId() (requestId uint32) {
9931007
// An error is returned if the request was formed incorrectly, or failed to
9941008
// create the future.
9951009
func (conn *Connection) Do(req Request) *Future {
996-
return conn.send(req)
1010+
return conn.send(req, ignoreStreamId)
9971011
}
9981012

9991013
// ConfiguredTimeout returns a timeout from connection config.
@@ -1009,3 +1023,16 @@ func (conn *Connection) OverrideSchema(s *Schema) {
10091023
conn.Schema = s
10101024
}
10111025
}
1026+
1027+
// NewStream creates new Stream object for connection.
1028+
//
1029+
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
1030+
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
1031+
// Since 1.7.0
1032+
func (conn *Connection) NewStream() *Stream {
1033+
atomic.AddUint32(&conn.lastStreamId, 1)
1034+
return &Stream{
1035+
Id: conn.lastStreamId,
1036+
Conn: conn,
1037+
}
1038+
}

connection_pool/connection_pool.go

+14
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,20 @@ func (connPool *ConnectionPool) Do(req tarantool.Request, userMode Mode) *tarant
534534
return conn.Do(req)
535535
}
536536

537+
// NewStream creates new Stream object for connection selected
538+
// by userMode from connPool.
539+
//
540+
// Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
541+
// To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
542+
// Since 1.7.0
543+
func (connPool *ConnectionPool) NewStream(userMode Mode) (*tarantool.Stream, error) {
544+
conn, err := connPool.getNextConnection(userMode)
545+
if err != nil {
546+
return nil, err
547+
}
548+
return conn.NewStream(), nil
549+
}
550+
537551
//
538552
// private
539553
//

0 commit comments

Comments
 (0)