Skip to content

Commit 6620388

Browse files
committed
api: add events subscription support
A user can create watcher by the Connection.NewWatcher() call: watcher = conn.NewWatcker("key", func(event WatchEvent) { // The callback code. }) After that, the watcher callback is invoked for the first time. In this case, the callback is triggered whether or not the key has already been broadcast. All subsequent invocations are triggered with box.broadcast() called on the remote host. If a watcher is subscribed for a key that has not been broadcast yet, the callback is triggered only once, after the registration of the watcher. If the key is updated while the watcher callback is running, the callback will be invoked again with the latest value as soon as it returns. Multiple watchers can be created for one key. If you don’t need the watcher anymore, you can unregister it using the Unregister method: watcher.Unregister() The api is similar to net.box implementation [1]. It also adds a BroadcastRequest to make it easier to send broadcast messages. 1. https://www.tarantool.io/en/doc/latest/reference/reference_lua/net_box/#conn-watch Closes #119
1 parent 1d69898 commit 6620388

21 files changed

+1575
-72
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1111
### Added
1212

1313
- Support iproto feature discovery (#120).
14+
- Event subscription support (#119)
1415

1516
### Changed
1617

connection.go

+291-3
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ const (
5454
// LogUnexpectedResultId is logged when response with unknown id was received.
5555
// Most probably it is due to request timeout.
5656
LogUnexpectedResultId
57+
// LogReadWatchEventFailed is logged when failed to read a watch event.
58+
LogReadWatchEventFailed
5759
)
5860

5961
// ConnEvent is sent throw Notify channel specified in Opts.
@@ -63,6 +65,12 @@ type ConnEvent struct {
6365
When time.Time
6466
}
6567

68+
// A raw watch event.
69+
type connWatchEvent struct {
70+
key string
71+
value interface{}
72+
}
73+
6674
var epoch = time.Now()
6775

6876
// Logger is logger type expected to be passed in options.
@@ -84,6 +92,9 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
8492
case LogUnexpectedResultId:
8593
resp := v[0].(*Response)
8694
log.Printf("tarantool: connection %s got unexpected resultId (%d) in response", conn.addr, resp.RequestId)
95+
case LogReadWatchEventFailed:
96+
err := v[0].(error)
97+
log.Printf("tarantool: unable to parse watch event: %s", err)
8798
default:
8899
args := append([]interface{}{"tarantool: unexpected event ", event, conn}, v...)
89100
log.Print(args...)
@@ -149,6 +160,8 @@ type Connection struct {
149160
lastStreamId uint64
150161

151162
serverProtocolInfo ProtocolInfo
163+
// watchMap is a map of key -> watchSharedData.
164+
watchMap sync.Map
152165
}
153166

154167
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -531,7 +544,7 @@ func (conn *Connection) dial() (err error) {
531544
return fmt.Errorf("identify: %w", err)
532545
}
533546

534-
// Auth
547+
// Auth.
535548
if opts.User != "" {
536549
scr, err := scramble(conn.Greeting.auth, opts.Pass)
537550
if err != nil {
@@ -549,7 +562,38 @@ func (conn *Connection) dial() (err error) {
549562
}
550563
}
551564

552-
// Only if connected and authenticated.
565+
// Watchers.
566+
watchersChecked := false
567+
conn.watchMap.Range(func(key, value interface{}) bool {
568+
if !watchersChecked {
569+
required := ProtocolInfo{Features: []ProtocolFeature{WatchersFeature}}
570+
err = checkProtocolInfo(required, conn.ServerProtocolInfo())
571+
if err != nil {
572+
return false
573+
}
574+
watchersChecked = true
575+
}
576+
577+
st := value.(chan watchState)
578+
state := <-st
579+
if state.cnt > 0 {
580+
req := newWatchRequest(key.(string))
581+
if err = conn.writeRequest(w, req); err != nil {
582+
st <- state
583+
return false
584+
}
585+
state.init = true
586+
state.ack = true
587+
}
588+
st <- state
589+
return true
590+
})
591+
592+
if err != nil {
593+
return fmt.Errorf("unable to register watch: %w", err)
594+
}
595+
596+
// Only if connected and fully initialized.
553597
conn.lockShards()
554598
conn.c = connection
555599
atomic.StoreUint32(&conn.state, connConnected)
@@ -843,7 +887,52 @@ func (conn *Connection) writer(w *bufio.Writer, c net.Conn) {
843887
}
844888
}
845889

890+
func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
891+
keyExist := false
892+
event := connWatchEvent{}
893+
d := newDecoder(reader)
894+
895+
l, err := d.DecodeMapLen()
896+
if err != nil {
897+
return event, err
898+
}
899+
900+
for ; l > 0; l-- {
901+
cd, err := d.DecodeInt()
902+
if err != nil {
903+
return event, err
904+
}
905+
906+
switch cd {
907+
case KeyEvent:
908+
if event.key, err = d.DecodeString(); err != nil {
909+
return event, err
910+
}
911+
keyExist = true
912+
case KeyEventData:
913+
if event.value, err = d.DecodeInterface(); err != nil {
914+
return event, err
915+
}
916+
default:
917+
if err = d.Skip(); err != nil {
918+
return event, err
919+
}
920+
}
921+
}
922+
923+
if !keyExist {
924+
return event, errors.New("watch event does not have a key")
925+
}
926+
927+
return event, nil
928+
}
929+
846930
func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
931+
events := make(chan connWatchEvent, 1024)
932+
defer close(events)
933+
934+
go conn.eventer(events)
935+
847936
for atomic.LoadUint32(&conn.state) != connClosed {
848937
respBytes, err := conn.read(r)
849938
if err != nil {
@@ -858,7 +947,14 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
858947
}
859948

860949
var fut *Future = nil
861-
if resp.Code == PushCode {
950+
if resp.Code == EventCode {
951+
if event, err := readWatchEvent(&resp.buf); err == nil {
952+
events <- event
953+
} else {
954+
conn.opts.Logger.Report(LogReadWatchEventFailed, conn, err)
955+
}
956+
continue
957+
} else if resp.Code == PushCode {
862958
if fut = conn.peekFuture(resp.RequestId); fut != nil {
863959
fut.AppendPush(resp)
864960
}
@@ -868,12 +964,37 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
868964
conn.markDone(fut)
869965
}
870966
}
967+
871968
if fut == nil {
872969
conn.opts.Logger.Report(LogUnexpectedResultId, conn, resp)
873970
}
874971
}
875972
}
876973

974+
// eventer goroutine gets watch events and updates values for watchers.
975+
func (conn *Connection) eventer(events <-chan connWatchEvent) {
976+
for {
977+
event, ok := <-events
978+
if !ok {
979+
// The channel is closed.
980+
break
981+
}
982+
983+
if value, ok := conn.watchMap.Load(event.key); ok {
984+
st := value.(chan watchState)
985+
state := <-st
986+
state.value = event.value
987+
state.init = false
988+
state.ack = false
989+
if state.changed != nil {
990+
close(state.changed)
991+
state.changed = nil
992+
}
993+
st <- state
994+
}
995+
}
996+
}
997+
877998
func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
878999
fut = NewFuture()
8791000
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
@@ -1029,6 +1150,18 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
10291150
return
10301151
}
10311152
shard.bufmut.Unlock()
1153+
1154+
if req.Async() {
1155+
if fut = conn.fetchFuture(reqid); fut != nil {
1156+
resp := &Response{
1157+
RequestId: reqid,
1158+
Code: OkCode,
1159+
}
1160+
fut.SetResponse(resp)
1161+
conn.markDone(fut)
1162+
}
1163+
}
1164+
10321165
if firstWritten {
10331166
conn.dirtyShard <- shardn
10341167
}
@@ -1233,6 +1366,161 @@ func (conn *Connection) NewStream() (*Stream, error) {
12331366
}, nil
12341367
}
12351368

1369+
// watchState is the current state of the watcher. See the idea at p. 70, 105:
1370+
// https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view
1371+
type watchState struct {
1372+
// value is a current value.
1373+
value interface{}
1374+
// init is true if it is an initial state (no events received).
1375+
init bool
1376+
// ack true if the acknowledge is already sended.
1377+
ack bool
1378+
// cnt is a count of active watchers for the key.
1379+
cnt int
1380+
// changed is a channel for broadcast the value changes.
1381+
changed chan struct{}
1382+
}
1383+
1384+
// connWatcher is an internal implementation of the Watcher interface.
1385+
type connWatcher struct {
1386+
unregister sync.Once
1387+
done chan struct{}
1388+
finished chan struct{}
1389+
}
1390+
1391+
// Unregister unregisters the connection watcher.
1392+
func (w *connWatcher) Unregister() {
1393+
w.unregister.Do(func() {
1394+
close(w.done)
1395+
})
1396+
<-w.finished
1397+
}
1398+
1399+
// NewWatcher creates a new Watcher object for the connection.
1400+
//
1401+
// After watcher creation, the watcher callback is invoked for the first time.
1402+
// In this case, the callback is triggered whether or not the key has already
1403+
// been broadcast. All subsequent invocations are triggered with
1404+
// box.broadcast() called on the remote host. If a watcher is subscribed for a
1405+
// key that has not been broadcast yet, the callback is triggered only once,
1406+
// after the registration of the watcher.
1407+
//
1408+
// The watcher callbacks are always invoked in a separate goroutine. A watcher
1409+
// callback is never executed in parallel with itself, but they can be executed
1410+
// in parallel to other watchers.
1411+
//
1412+
// If the key is updated while the watcher callback is running, the callback
1413+
// will be invoked again with the latest value as soon as it returns.
1414+
//
1415+
// Watchers survive reconnection. All registered watchers are automatically
1416+
// resubscribed when the connection is reestablished.
1417+
//
1418+
// Keep in mind that garbage collection of a watcher handle doesn’t lead to the
1419+
// watcher’s destruction. In this case, the watcher remains registered. You
1420+
// need to call Unregister() directly.
1421+
//
1422+
// Unregister() guarantees that there will be no the watcher's callback calls
1423+
// after it, but Unregister() call from the callback leads to a deadlock.
1424+
//
1425+
// See:
1426+
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/#box-watchers
1427+
//
1428+
// Since 1.10.0
1429+
func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, error) {
1430+
required := ProtocolInfo{Features: []ProtocolFeature{WatchersFeature}}
1431+
if err := checkProtocolInfo(required, conn.ServerProtocolInfo()); err != nil {
1432+
return nil, err
1433+
}
1434+
1435+
var st chan watchState
1436+
// Get or create a shared data for the key.
1437+
if val, ok := conn.watchMap.Load(key); !ok {
1438+
st = make(chan watchState, 1)
1439+
st <- watchState{
1440+
value: nil,
1441+
init: true,
1442+
ack: false,
1443+
cnt: 0,
1444+
changed: nil,
1445+
}
1446+
1447+
if val, ok := conn.watchMap.LoadOrStore(key, st); ok {
1448+
close(st)
1449+
st = val.(chan watchState)
1450+
}
1451+
} else {
1452+
st = val.(chan watchState)
1453+
}
1454+
1455+
state := <-st
1456+
// Send an initial watch request if needed.
1457+
if state.cnt == 0 {
1458+
if _, err := conn.Do(newWatchRequest(key)).Get(); err != nil {
1459+
st <- state
1460+
return nil, err
1461+
}
1462+
state.init = true
1463+
state.ack = true
1464+
}
1465+
state.cnt += 1
1466+
st <- state
1467+
1468+
// Start the watcher goroutine.
1469+
done := make(chan struct{})
1470+
finished := make(chan struct{})
1471+
1472+
go func() {
1473+
for {
1474+
state := <-st
1475+
if state.changed == nil {
1476+
state.changed = make(chan struct{})
1477+
}
1478+
st <- state
1479+
1480+
if !state.init {
1481+
callback(WatchEvent{
1482+
Conn: conn,
1483+
Key: key,
1484+
Value: state.value,
1485+
})
1486+
1487+
// Acknowledge the notification.
1488+
state = <-st
1489+
ack := state.ack
1490+
state.ack = true
1491+
st <- state
1492+
1493+
if !ack {
1494+
conn.Do(newWatchRequest(key)).Get()
1495+
// We expect a reconnect and re-subscribe if it fails to
1496+
// send the watch request. So it looks ok do not check a
1497+
// result.
1498+
}
1499+
}
1500+
1501+
select {
1502+
case <-done:
1503+
state := <-st
1504+
state.cnt -= 1
1505+
if state.cnt == 0 {
1506+
// The last one sends IPROTO_UNWATCH.
1507+
conn.Do(newUnwatchRequest(key)).Get()
1508+
}
1509+
st <- state
1510+
1511+
close(finished)
1512+
return
1513+
case <-state.changed:
1514+
}
1515+
}
1516+
}()
1517+
1518+
return &connWatcher{
1519+
done: done,
1520+
finished: finished,
1521+
}, nil
1522+
}
1523+
12361524
// checkProtocolInfo checks that expected protocol version is
12371525
// and protocol features are supported.
12381526
func checkProtocolInfo(expected ProtocolInfo, actual ProtocolInfo) error {

0 commit comments

Comments
 (0)