diff --git a/replication/row_event.go b/replication/row_event.go index 5f481e18c..3883ae1dc 100644 --- a/replication/row_event.go +++ b/replication/row_event.go @@ -905,6 +905,7 @@ type RowsEvent struct { // for mariadb *_COMPRESSED_EVENT_V1 compressed bool + // raw event type associated with a RowsEvent eventType EventType Table *TableMapEvent @@ -950,6 +951,29 @@ type RowsEvent struct { ignoreJSONDecodeErr bool } +// EnumRowsEventType is an abridged type describing the operation which triggered the given RowsEvent. +type EnumRowsEventType byte + +const ( + EnumRowsEventTypeUnknown = EnumRowsEventType(iota) + EnumRowsEventTypeInsert + EnumRowsEventTypeUpdate + EnumRowsEventTypeDelete +) + +func (t EnumRowsEventType) String() string { + switch t { + case EnumRowsEventTypeInsert: + return "insert" + case EnumRowsEventTypeUpdate: + return "update" + case EnumRowsEventTypeDelete: + return "delete" + default: + return fmt.Sprintf("unknown (%d)", t) + } +} + // EnumRowImageType is allowed types for every row in mysql binlog. // See https://github.com/mysql/mysql-server/blob/1bfe02bdad6604d54913c62614bde57a055c8332/sql/rpl_record.h#L39 // enum class enum_row_image_type { WRITE_AI, UPDATE_BI, UPDATE_AI, DELETE_BI }; @@ -1120,6 +1144,19 @@ func (e *RowsEvent) Decode(data []byte) error { return e.DecodeData(pos, data) } +func (e *RowsEvent) Type() EnumRowsEventType { + switch e.eventType { + case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2, MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: + return EnumRowsEventTypeInsert + case UPDATE_ROWS_EVENTv0, UPDATE_ROWS_EVENTv1, UPDATE_ROWS_EVENTv2, MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1: + return EnumRowsEventTypeUpdate + case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2, MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1: + return EnumRowsEventTypeDelete + default: + return EnumRowsEventTypeUnknown + } +} + func isBitSet(bitmap []byte, i int) bool { return bitmap[i>>3]&(1<<(uint(i)&7)) > 0 } @@ -1817,6 +1854,7 @@ func (e *RowsEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Flags: %d\n", e.Flags) fmt.Fprintf(w, "Column count: %d\n", e.ColumnCount) fmt.Fprintf(w, "NDB data: %s\n", e.NdbData) + fmt.Fprintf(w, "Event type: %s (%s)", e.Type(), e.eventType) fmt.Fprintf(w, "Values:\n") for _, rows := range e.Rows { diff --git a/replication/row_event_test.go b/replication/row_event_test.go index ea3a5363e..7b004c50a 100644 --- a/replication/row_event_test.go +++ b/replication/row_event_test.go @@ -1176,6 +1176,37 @@ func TestRowsDataExtraData(t *testing.T) { } } +func TestRowsEventType(t *testing.T) { + testcases := []struct { + eventType EventType + want EnumRowsEventType + }{ + {WRITE_ROWS_EVENTv0, EnumRowsEventTypeInsert}, + {WRITE_ROWS_EVENTv1, EnumRowsEventTypeInsert}, + {WRITE_ROWS_EVENTv2, EnumRowsEventTypeInsert}, + {MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1, EnumRowsEventTypeInsert}, + {UPDATE_ROWS_EVENTv0, EnumRowsEventTypeUpdate}, + {UPDATE_ROWS_EVENTv1, EnumRowsEventTypeUpdate}, + {UPDATE_ROWS_EVENTv2, EnumRowsEventTypeUpdate}, + {MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1, EnumRowsEventTypeUpdate}, + {DELETE_ROWS_EVENTv0, EnumRowsEventTypeDelete}, + {DELETE_ROWS_EVENTv1, EnumRowsEventTypeDelete}, + {DELETE_ROWS_EVENTv2, EnumRowsEventTypeDelete}, + {MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1, EnumRowsEventTypeDelete}, + + // Whoops, these are not rows events at all + {EXEC_LOAD_EVENT, EnumRowsEventTypeUnknown}, + {HEARTBEAT_EVENT, EnumRowsEventTypeUnknown}, + } + + for _, tc := range testcases { + rev := new(RowsEvent) + rev.eventType = tc.eventType + + require.Equal(t, tc.want, rev.Type()) + } +} + func TestTableMapHelperMaps(t *testing.T) { /* CREATE TABLE `_types` ( diff --git a/replication/transaction_payload_event_test.go b/replication/transaction_payload_event_test.go index 829d4602a..1d517a11a 100644 --- a/replication/transaction_payload_event_test.go +++ b/replication/transaction_payload_event_test.go @@ -69,6 +69,8 @@ func TestTransactionPayloadEventDecode(t *testing.T) { } err := e.decodePayload() require.NoError(t, err) + + // Check raw events require.Len(t, e.Events, 8) require.Equal(t, QUERY_EVENT, e.Events[0].Header.EventType) require.Equal(t, TABLE_MAP_EVENT, e.Events[1].Header.EventType) @@ -78,4 +80,17 @@ func TestTransactionPayloadEventDecode(t *testing.T) { require.Equal(t, TABLE_MAP_EVENT, e.Events[5].Header.EventType) require.Equal(t, DELETE_ROWS_EVENTv2, e.Events[6].Header.EventType) require.Equal(t, XID_EVENT, e.Events[7].Header.EventType) + + // Check insert/update/delete rows events casting + ievent, ok := e.Events[2].Event.(*RowsEvent) + require.True(t, ok) + require.Equal(t, ievent.Type(), EnumRowsEventTypeInsert) + + uevent, ok := e.Events[4].Event.(*RowsEvent) + require.True(t, ok) + require.Equal(t, uevent.Type(), EnumRowsEventTypeUpdate) + + devent, ok := e.Events[6].Event.(*RowsEvent) + require.True(t, ok) + require.Equal(t, devent.Type(), EnumRowsEventTypeDelete) }