@@ -19,6 +19,7 @@ import (
19
19
)
20
20
21
21
const requestsMap = 128
22
+ const ignoreStreamId = 0
22
23
const (
23
24
connDisconnected = 0
24
25
connConnected = 1
@@ -139,6 +140,8 @@ type Connection struct {
139
140
state uint32
140
141
dec * msgpack.Decoder
141
142
lenbuf [PacketLengthBytes ]byte
143
+
144
+ lastStreamId uint32
142
145
}
143
146
144
147
var _ = Connector (& Connection {}) // Check compatibility with connector interface.
@@ -468,16 +471,27 @@ func (conn *Connection) dial() (err error) {
468
471
return
469
472
}
470
473
471
- func pack (h * smallWBuf , enc * msgpack.Encoder , reqid uint32 , req Request , res SchemaResolver ) (err error ) {
474
+ func pack (h * smallWBuf , enc * msgpack.Encoder , reqid uint32 ,
475
+ req Request , streamId uint32 , res SchemaResolver ) (err error ) {
472
476
hl := h .Len ()
473
- h .Write ([]byte {
477
+
478
+ hMapLen := byte (0x82 ) // 2 element map.
479
+ if streamId != ignoreStreamId {
480
+ hMapLen = byte (0x83 ) // 3 element map.
481
+ }
482
+ hBytes := []byte {
474
483
0xce , 0 , 0 , 0 , 0 , // Length.
475
- 0x82 , // 2 element map.
484
+ hMapLen ,
476
485
KeyCode , byte (req .Code ()), // Request code.
477
486
KeySync , 0xce ,
478
487
byte (reqid >> 24 ), byte (reqid >> 16 ),
479
488
byte (reqid >> 8 ), byte (reqid ),
480
- })
489
+ }
490
+ if streamId != ignoreStreamId {
491
+ hBytes = append (hBytes , KeyStreamId , byte (streamId ))
492
+ }
493
+
494
+ h .Write (hBytes )
481
495
482
496
if err = req .Body (res , enc ); err != nil {
483
497
return
@@ -495,7 +509,7 @@ func pack(h *smallWBuf, enc *msgpack.Encoder, reqid uint32, req Request, res Sch
495
509
func (conn * Connection ) writeAuthRequest (w * bufio.Writer , scramble []byte ) (err error ) {
496
510
var packet smallWBuf
497
511
req := newAuthRequest (conn .opts .User , string (scramble ))
498
- err = pack (& packet , msgpack .NewEncoder (& packet ), 0 , req , conn .Schema )
512
+ err = pack (& packet , msgpack .NewEncoder (& packet ), 0 , req , ignoreStreamId , conn .Schema )
499
513
500
514
if err != nil {
501
515
return errors .New ("auth: pack error " + err .Error ())
@@ -785,16 +799,16 @@ func (conn *Connection) newFuture() (fut *Future) {
785
799
return
786
800
}
787
801
788
- func (conn * Connection ) send (req Request ) * Future {
802
+ func (conn * Connection ) send (req Request , streamId uint32 ) * Future {
789
803
fut := conn .newFuture ()
790
804
if fut .ready == nil {
791
805
return fut
792
806
}
793
- conn .putFuture (fut , req )
807
+ conn .putFuture (fut , req , streamId )
794
808
return fut
795
809
}
796
810
797
- func (conn * Connection ) putFuture (fut * Future , req Request ) {
811
+ func (conn * Connection ) putFuture (fut * Future , req Request , streamId uint32 ) {
798
812
shardn := fut .requestId & (conn .opts .Concurrency - 1 )
799
813
shard := & conn .shard [shardn ]
800
814
shard .bufmut .Lock ()
@@ -811,7 +825,7 @@ func (conn *Connection) putFuture(fut *Future, req Request) {
811
825
}
812
826
blen := shard .buf .Len ()
813
827
reqid := fut .requestId
814
- if err := pack (& shard .buf , shard .enc , reqid , req , conn .Schema ); err != nil {
828
+ if err := pack (& shard .buf , shard .enc , reqid , req , streamId , conn .Schema ); err != nil {
815
829
shard .buf .Trunc (blen )
816
830
shard .bufmut .Unlock ()
817
831
if f := conn .fetchFuture (reqid ); f == fut {
@@ -993,7 +1007,7 @@ func (conn *Connection) nextRequestId() (requestId uint32) {
993
1007
// An error is returned if the request was formed incorrectly, or failed to
994
1008
// create the future.
995
1009
func (conn * Connection ) Do (req Request ) * Future {
996
- return conn .send (req )
1010
+ return conn .send (req , ignoreStreamId )
997
1011
}
998
1012
999
1013
// ConfiguredTimeout returns a timeout from connection config.
@@ -1009,3 +1023,16 @@ func (conn *Connection) OverrideSchema(s *Schema) {
1009
1023
conn .Schema = s
1010
1024
}
1011
1025
}
1026
+
1027
+ // NewStream creates new Stream object for connection.
1028
+ //
1029
+ // Since v. 2.10.0, Tarantool supports streams and interactive transactions over them.
1030
+ // To use interactive transactions, memtx_use_mvcc_engine box option should be set to true.
1031
+ // Since 1.7.0
1032
+ func (conn * Connection ) NewStream () * Stream {
1033
+ atomic .AddUint32 (& conn .lastStreamId , 1 )
1034
+ return & Stream {
1035
+ Id : conn .lastStreamId ,
1036
+ Conn : conn ,
1037
+ }
1038
+ }
0 commit comments