Skip to content

Commit 3240a5a

Browse files
committed
api: add events subscription support
A user can create watcher by the Connection.NewWatcher() call: watcher = conn.NewWatcker("key", func(event WatchEvent) { // The callback code. }) After that, the watcher callback is invoked for the first time. In this case, the callback is triggered whether or not the key has already been broadcast. All subsequent invocations are triggered with box.broadcast() called on the remote host. If a watcher is subscribed for a key that has not been broadcast yet, the callback is triggered only once, after the registration of the watcher. If the key is updated while the watcher callback is running, the callback will be invoked again with the latest value as soon as it returns. Multiple watchers can be created for one key. If you don’t need the watcher anymore, you can unregister it using the Unregister method: watcher.Unregister() The api is similar to net.box implementation [1]. It also adds a BroadcastRequest to make it easier to send broadcast messages. 1. https://www.tarantool.io/en/doc/latest/reference/reference_lua/net_box/#conn-watch Closes #119
1 parent 7d4b3cc commit 3240a5a

19 files changed

+1497
-57
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1010

1111
### Added
1212

13+
- Event subscription support (#119)
14+
1315
### Changed
1416

1517
### Fixed

connection.go

+279-8
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ const (
5353
// LogUnexpectedResultId is logged when response with unknown id was received.
5454
// Most probably it is due to request timeout.
5555
LogUnexpectedResultId
56+
// LogReadWatchEventFailed is logged when failed to read a watch event.
57+
LogReadWatchEventFailed
5658
)
5759

5860
// ConnEvent is sent throw Notify channel specified in Opts.
@@ -62,6 +64,12 @@ type ConnEvent struct {
6264
When time.Time
6365
}
6466

67+
// A raw watch event.
68+
type connWatchEvent struct {
69+
key string
70+
value interface{}
71+
}
72+
6573
var epoch = time.Now()
6674

6775
// Logger is logger type expected to be passed in options.
@@ -83,6 +91,9 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
8391
case LogUnexpectedResultId:
8492
resp := v[0].(*Response)
8593
log.Printf("tarantool: connection %s got unexpected resultId (%d) in response", conn.addr, resp.RequestId)
94+
case LogReadWatchEventFailed:
95+
err := v[0].(error)
96+
log.Printf("tarantool: unable to parse watch event: %s\n", err)
8697
default:
8798
args := append([]interface{}{"tarantool: unexpected event ", event, conn}, v...)
8899
log.Print(args...)
@@ -146,6 +157,9 @@ type Connection struct {
146157
lenbuf [PacketLengthBytes]byte
147158

148159
lastStreamId uint64
160+
161+
// watchMap is a map of key -> watchSharedData.
162+
watchMap sync.Map
149163
}
150164

151165
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -502,7 +516,7 @@ func (conn *Connection) dial() (err error) {
502516
conn.Greeting.Version = bytes.NewBuffer(greeting[:64]).String()
503517
conn.Greeting.auth = bytes.NewBuffer(greeting[64:108]).String()
504518

505-
// Auth
519+
// Auth.
506520
if opts.User != "" {
507521
scr, err := scramble(conn.Greeting.auth, opts.Pass)
508522
if err != nil {
@@ -520,7 +534,20 @@ func (conn *Connection) dial() (err error) {
520534
}
521535
}
522536

523-
// Only if connected and authenticated.
537+
// Watchers.
538+
conn.watchMap.Range(func(key, value interface{}) bool {
539+
req := newWatchRequest(key.(string))
540+
if err = conn.writeRequest(w, req); err != nil {
541+
return false
542+
}
543+
return true
544+
})
545+
546+
if err != nil {
547+
return fmt.Errorf("unable to register watch: %w", err)
548+
}
549+
550+
// Only if connected and fully initialized.
524551
conn.lockShards()
525552
conn.c = connection
526553
atomic.StoreUint32(&conn.state, connConnected)
@@ -581,23 +608,33 @@ func pack(h *smallWBuf, enc *encoder, reqid uint32,
581608
return
582609
}
583610

584-
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
611+
func (conn *Connection) writeRequest(w *bufio.Writer, req Request) (err error) {
585612
var packet smallWBuf
586-
req := newAuthRequest(conn.opts.User, string(scramble))
587613
err = pack(&packet, newEncoder(&packet), 0, req, ignoreStreamId, conn.Schema)
588614

589615
if err != nil {
590-
return errors.New("auth: pack error " + err.Error())
616+
return fmt.Errorf("pack error %w", err)
591617
}
592618
if err := write(w, packet.b); err != nil {
593-
return errors.New("auth: write error " + err.Error())
619+
return fmt.Errorf("write error %w", err)
594620
}
595621
if err = w.Flush(); err != nil {
596-
return errors.New("auth: flush error " + err.Error())
622+
return fmt.Errorf("flush error %w", err)
597623
}
598624
return
599625
}
600626

627+
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
628+
req := newAuthRequest(conn.opts.User, string(scramble))
629+
630+
err = conn.writeRequest(w, req)
631+
if err != nil {
632+
return fmt.Errorf("auth: %w", err)
633+
}
634+
635+
return nil
636+
}
637+
601638
func (conn *Connection) readAuthResponse(r io.Reader) (err error) {
602639
respBytes, err := conn.read(r)
603640
if err != nil {
@@ -774,7 +811,50 @@ func (conn *Connection) writer(w *bufio.Writer, c net.Conn) {
774811
}
775812
}
776813

814+
func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
815+
keyExist := false
816+
event := connWatchEvent{}
817+
d := newDecoder(reader)
818+
819+
if l, err := d.DecodeMapLen(); err == nil {
820+
for ; l > 0; l-- {
821+
if cd, err := d.DecodeInt(); err == nil {
822+
switch cd {
823+
case KeyEvent:
824+
if event.key, err = d.DecodeString(); err != nil {
825+
return event, err
826+
}
827+
keyExist = true
828+
case KeyEventData:
829+
if event.value, err = d.DecodeInterface(); err != nil {
830+
return event, err
831+
}
832+
default:
833+
if err = d.Skip(); err != nil {
834+
return event, err
835+
}
836+
}
837+
} else {
838+
return event, err
839+
}
840+
}
841+
} else {
842+
return event, err
843+
}
844+
845+
if !keyExist {
846+
return event, errors.New("watch event does not have a key")
847+
}
848+
849+
return event, nil
850+
}
851+
777852
func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
853+
events := make(chan connWatchEvent, 1024)
854+
defer close(events)
855+
856+
go conn.eventer(events)
857+
778858
for atomic.LoadUint32(&conn.state) != connClosed {
779859
respBytes, err := conn.read(r)
780860
if err != nil {
@@ -789,7 +869,14 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
789869
}
790870

791871
var fut *Future = nil
792-
if resp.Code == PushCode {
872+
if resp.Code == EventCode {
873+
if event, err := readWatchEvent(&resp.buf); err == nil {
874+
events <- event
875+
} else {
876+
conn.opts.Logger.Report(LogReadWatchEventFailed, conn, err)
877+
}
878+
continue
879+
} else if resp.Code == PushCode {
793880
if fut = conn.peekFuture(resp.RequestId); fut != nil {
794881
fut.AppendPush(resp)
795882
}
@@ -799,12 +886,41 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
799886
conn.markDone(fut)
800887
}
801888
}
889+
802890
if fut == nil {
803891
conn.opts.Logger.Report(LogUnexpectedResultId, conn, resp)
804892
}
805893
}
806894
}
807895

896+
// eventer goroutine gets watch events and updates values for watchers.
897+
func (conn *Connection) eventer(events <-chan connWatchEvent) {
898+
for {
899+
event, ok := <-events
900+
if !ok {
901+
// The channel is closed.
902+
break
903+
}
904+
905+
if value, ok := conn.watchMap.Load(event.key); ok {
906+
shared := value.(*watchSharedData)
907+
state := <-shared.st
908+
if state.changed != nil {
909+
close(state.changed)
910+
}
911+
shared.st <- watchState{event.value, false, nil}
912+
913+
if atomic.LoadUint32(&conn.state) == connConnected {
914+
shared.mu.Lock()
915+
if shared.cnt > 0 {
916+
conn.Do(newWatchRequest(event.key))
917+
}
918+
shared.mu.Unlock()
919+
}
920+
}
921+
}
922+
}
923+
808924
func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
809925
fut = NewFuture()
810926
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
@@ -960,6 +1076,18 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
9601076
return
9611077
}
9621078
shard.bufmut.Unlock()
1079+
1080+
if req.Async() {
1081+
if fut = conn.fetchFuture(reqid); fut != nil {
1082+
resp := &Response{
1083+
RequestId: reqid,
1084+
Code: OkCode,
1085+
}
1086+
fut.SetResponse(resp)
1087+
conn.markDone(fut)
1088+
}
1089+
}
1090+
9631091
if firstWritten {
9641092
conn.dirtyShard <- shardn
9651093
}
@@ -1163,3 +1291,146 @@ func (conn *Connection) NewStream() (*Stream, error) {
11631291
Conn: conn,
11641292
}, nil
11651293
}
1294+
1295+
type watchState struct {
1296+
value interface{}
1297+
init bool
1298+
changed chan struct{}
1299+
}
1300+
1301+
// watchSharedData is a shared between watchers of some key.
1302+
type watchSharedData struct {
1303+
// st is the current state of the watcher. See the idea at page 70, 105:
1304+
// https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view
1305+
st chan watchState
1306+
1307+
// cnt is a number of active watchers.
1308+
cnt int32
1309+
// mu helps to send IPROTO_WATCH/IPROTO_UNWATCH without duplicates
1310+
// and intersections.
1311+
mu sync.Mutex
1312+
}
1313+
1314+
// connWatcher is an internal implementation of the Watcher interface.
1315+
type connWatcher struct {
1316+
shared *watchSharedData
1317+
unregister sync.Once
1318+
done chan struct{}
1319+
finished chan struct{}
1320+
}
1321+
1322+
// Unregister unregisters the connection watcher.
1323+
func (w *connWatcher) Unregister() {
1324+
w.unregister.Do(func() {
1325+
close(w.done)
1326+
})
1327+
<-w.finished
1328+
}
1329+
1330+
// NewWatcher creates a new Watcher object for the connection.
1331+
//
1332+
// After watcher creation, the watcher callback is invoked for the first time.
1333+
// In this case, the callback is triggered whether or not the key has already
1334+
// been broadcast. All subsequent invocations are triggered with
1335+
// box.broadcast() called on the remote host. If a watcher is subscribed for a
1336+
// key that has not been broadcast yet, the callback is triggered only once,
1337+
// after the registration of the watcher.
1338+
//
1339+
// The watcher callbacks are always invoked in a separate goroutine. A watcher
1340+
// callback is never executed in parallel with itself, but they can be executed
1341+
// in parallel to other watchers.
1342+
//
1343+
// If the key is updated while the watcher callback is running, the callback
1344+
// will be invoked again with the latest value as soon as it returns.
1345+
//
1346+
// Watchers survive reconnection. All registered watchers are automatically
1347+
// resubscribed when the connection is reestablished.
1348+
//
1349+
// Keep in mind that garbage collection of a watcher handle doesn’t lead to the
1350+
// watcher’s destruction. In this case, the watcher remains registered. You
1351+
// need to call Unregister() directly.
1352+
//
1353+
// Unregister() guarantees that there will be no the watcher's callback calls
1354+
// after it, but Unregister() call from the callback leads to a deadlock.
1355+
//
1356+
// See:
1357+
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/#box-watchers
1358+
//
1359+
// Since 1.10.0
1360+
func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, error) {
1361+
// TODO: check required features after:
1362+
//
1363+
// https://github.com/tarantool/go-tarantool/issues/120
1364+
var shared *watchSharedData
1365+
// Get or create a shared data for the key.
1366+
if val, ok := conn.watchMap.Load(key); !ok {
1367+
shared = &watchSharedData{
1368+
st: make(chan watchState, 1),
1369+
cnt: 0,
1370+
}
1371+
shared.st <- watchState{nil, true, nil}
1372+
1373+
if val, ok := conn.watchMap.LoadOrStore(key, shared); ok {
1374+
shared = val.(*watchSharedData)
1375+
}
1376+
} else {
1377+
shared = val.(*watchSharedData)
1378+
}
1379+
1380+
// Send an initial watch request.
1381+
shared.mu.Lock()
1382+
if shared.cnt == 0 {
1383+
<-shared.st
1384+
shared.st <- watchState{nil, true, nil}
1385+
1386+
if _, err := conn.Do(newWatchRequest(key)).Get(); err != nil {
1387+
shared.mu.Unlock()
1388+
return nil, err
1389+
}
1390+
}
1391+
shared.cnt += 1
1392+
shared.mu.Unlock()
1393+
1394+
// Start the watcher goroutine.
1395+
done := make(chan struct{})
1396+
finished := make(chan struct{})
1397+
1398+
go func() {
1399+
for {
1400+
state := <-shared.st
1401+
if state.changed == nil {
1402+
state.changed = make(chan struct{})
1403+
}
1404+
shared.st <- state
1405+
1406+
if !state.init {
1407+
callback(WatchEvent{
1408+
Conn: conn,
1409+
Key: key,
1410+
Value: state.value,
1411+
})
1412+
}
1413+
1414+
select {
1415+
case <-done:
1416+
shared.mu.Lock()
1417+
shared.cnt -= 1
1418+
if shared.cnt == 0 {
1419+
// A last one sends IPROTO_UNWATCH.
1420+
conn.Do(newUnwatchRequest(key))
1421+
}
1422+
shared.mu.Unlock()
1423+
1424+
close(finished)
1425+
return
1426+
case <-state.changed:
1427+
}
1428+
}
1429+
}()
1430+
1431+
return &connWatcher{
1432+
shared: shared,
1433+
done: done,
1434+
finished: finished,
1435+
}, nil
1436+
}

0 commit comments

Comments
 (0)