Skip to content

Commit 6e944e1

Browse files
authored
MariaDB Metadata skipping and DEPRECATE_EOF (#1708)
[MariaDB metadata skipping](https://mariadb.com/kb/en/mariadb-protocol-differences-with-mysql/#prepare-statement-skipping-metadata). With this change, MariaDB server won't send metadata when they have not changed, saving client parsing metadata and network. This feature rely on these changes: * extended capabilities support * EOF packet deprecation makes current implementation to be revised A benchmark BenchmarkReceiveMetadata has been added to show the difference.
1 parent 0fd55eb commit 6e944e1

8 files changed

+309
-147
lines changed

benchmark_test.go

+57-2
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func BenchmarkExec(b *testing.B) {
129129
b.ReportAllocs()
130130
b.ResetTimer()
131131

132-
for i := 0; i < concurrencyLevel; i++ {
132+
for i := 0; i < concurrencyLevel; i++ {
133133
go func() {
134134
for {
135135
if atomic.AddInt64(&remain, -1) < 0 {
@@ -400,7 +400,7 @@ func benchmark10kRows(b *testing.B, compress bool) {
400400
}
401401

402402
args := make([]any, 200)
403-
for i := 1; i < 200; i+=2 {
403+
for i := 1; i < 200; i += 2 {
404404
args[i] = sval
405405
}
406406
for i := 0; i < 10000; i += 100 {
@@ -455,3 +455,58 @@ func BenchmarkReceive10kRows(b *testing.B) {
455455
func BenchmarkReceive10kRowsCompressed(b *testing.B) {
456456
benchmark10kRows(b, true)
457457
}
458+
459+
// BenchmarkReceiveMetadata measures performance of receiving lots of metadata compare to data in rows
460+
func BenchmarkReceiveMetadata(b *testing.B) {
461+
tb := (*TB)(b)
462+
463+
// Create a table with 1000 integer fields
464+
createTableQuery := "CREATE TABLE large_integer_table ("
465+
for i := 0; i < 1000; i++ {
466+
createTableQuery += fmt.Sprintf("col_%d INT", i)
467+
if i < 999 {
468+
createTableQuery += ", "
469+
}
470+
}
471+
createTableQuery += ")"
472+
473+
// Initialize database
474+
db := initDB(b, false,
475+
"DROP TABLE IF EXISTS large_integer_table",
476+
createTableQuery,
477+
"INSERT INTO large_integer_table VALUES ("+
478+
strings.Repeat("0,", 999)+"0)", // Insert a row of zeros
479+
)
480+
defer db.Close()
481+
482+
b.Run("query", func(b *testing.B) {
483+
db.SetMaxIdleConns(0)
484+
db.SetMaxIdleConns(1)
485+
486+
// Create a slice to scan all columns
487+
values := make([]any, 1000)
488+
valuePtrs := make([]any, 1000)
489+
for j := range values {
490+
valuePtrs[j] = &values[j]
491+
}
492+
493+
b.ReportAllocs()
494+
b.ResetTimer()
495+
496+
// Prepare a SELECT query to retrieve metadata
497+
stmt := tb.checkStmt(db.Prepare("SELECT * FROM large_integer_table LIMIT 1"))
498+
defer stmt.Close()
499+
500+
// Benchmark metadata retrieval
501+
for range b.N {
502+
rows := tb.checkRows(stmt.Query())
503+
504+
rows.Next()
505+
// Scan the row
506+
err := rows.Scan(valuePtrs...)
507+
tb.check(err)
508+
509+
rows.Close()
510+
}
511+
})
512+
}

connection.go

+19-10
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ type mysqlConn struct {
3333
connector *connector
3434
maxAllowedPacket int
3535
maxWriteSize int
36-
flags clientFlag
36+
capabilities capabilityFlag
37+
extCapabilities extendedCapabilityFlag
3738
status statusFlag
3839
sequence uint8
3940
compressSequence uint8
@@ -223,13 +224,21 @@ func (mc *mysqlConn) Prepare(query string) (driver.Stmt, error) {
223224
columnCount, err := stmt.readPrepareResultPacket()
224225
if err == nil {
225226
if stmt.paramCount > 0 {
226-
if err = mc.readUntilEOF(); err != nil {
227+
if err = mc.skipColumns(stmt.paramCount); err != nil {
227228
return nil, err
228229
}
229230
}
230231

231232
if columnCount > 0 {
232-
err = mc.readUntilEOF()
233+
if mc.extCapabilities&clientCacheMetadata != 0 {
234+
if stmt.columns, err = mc.readColumns(int(columnCount)); err != nil {
235+
return nil, err
236+
}
237+
} else {
238+
if err = mc.skipColumns(int(columnCount)); err != nil {
239+
return nil, err
240+
}
241+
}
233242
}
234243
}
235244

@@ -370,19 +379,19 @@ func (mc *mysqlConn) exec(query string) error {
370379
}
371380

372381
// Read Result
373-
resLen, err := handleOk.readResultSetHeaderPacket()
382+
resLen, _, err := handleOk.readResultSetHeaderPacket()
374383
if err != nil {
375384
return err
376385
}
377386

378387
if resLen > 0 {
379388
// columns
380-
if err := mc.readUntilEOF(); err != nil {
389+
if err := mc.skipColumns(resLen); err != nil {
381390
return err
382391
}
383392

384393
// rows
385-
if err := mc.readUntilEOF(); err != nil {
394+
if err := mc.skipRows(); err != nil {
386395
return err
387396
}
388397
}
@@ -419,7 +428,7 @@ func (mc *mysqlConn) query(query string, args []driver.Value) (*textRows, error)
419428

420429
// Read Result
421430
var resLen int
422-
resLen, err = handleOk.readResultSetHeaderPacket()
431+
resLen, _, err = handleOk.readResultSetHeaderPacket()
423432
if err != nil {
424433
return nil, err
425434
}
@@ -453,22 +462,22 @@ func (mc *mysqlConn) getSystemVar(name string) ([]byte, error) {
453462
}
454463

455464
// Read Result
456-
resLen, err := handleOk.readResultSetHeaderPacket()
465+
resLen, _, err := handleOk.readResultSetHeaderPacket()
457466
if err == nil {
458467
rows := new(textRows)
459468
rows.mc = mc
460469
rows.rs.columns = []mysqlField{{fieldType: fieldTypeVarChar}}
461470

462471
if resLen > 0 {
463472
// Columns
464-
if err := mc.readUntilEOF(); err != nil {
473+
if err := mc.skipColumns(resLen); err != nil {
465474
return nil, err
466475
}
467476
}
468477

469478
dest := make([]driver.Value, resLen)
470479
if err = rows.readRow(dest); err == nil {
471-
return dest[0].([]byte), mc.readUntilEOF()
480+
return dest[0].([]byte), mc.skipRows()
472481
}
473482
}
474483
return nil, err

connector.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
131131
mc.buf = newBuffer()
132132

133133
// Reading Handshake Initialization Packet
134-
authData, plugin, err := mc.readHandshakePacket()
134+
authData, serverCapabilities, serverExtCapabilities, plugin, err := mc.readHandshakePacket()
135135
if err != nil {
136136
mc.cleanup()
137137
return nil, err
@@ -153,6 +153,7 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
153153
return nil, err
154154
}
155155
}
156+
mc.initCapabilities(serverCapabilities, serverExtCapabilities, mc.cfg)
156157
if err = mc.writeHandshakeResponsePacket(authResp, plugin); err != nil {
157158
mc.cleanup()
158159
return nil, err
@@ -167,7 +168,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
167168
return nil, err
168169
}
169170

170-
if mc.cfg.compress && mc.flags&clientCompress == clientCompress {
171+
// compression is enabled after auth, not right after sending handshake response.
172+
if mc.capabilities&clientCompress > 0 {
171173
mc.compress = true
172174
mc.compIO = newCompIO(mc)
173175
}

const.go

+16-3
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,12 @@ const (
4242
iERR byte = 0xff
4343
)
4444

45-
// https://dev.mysql.com/doc/internals/en/capability-flags.html#packet-Protocol::CapabilityFlags
46-
type clientFlag uint32
45+
// https://dev.mysql.com/doc/dev/mysql-server/latest/group__group__cs__capabilities__flags.html
46+
// https://mariadb.com/kb/en/connection/#capabilities
47+
type capabilityFlag uint32
4748

4849
const (
49-
clientLongPassword clientFlag = 1 << iota
50+
clientMySQL capabilityFlag = 1 << iota
5051
clientFoundRows
5152
clientLongFlag
5253
clientConnectWithDB
@@ -73,6 +74,18 @@ const (
7374
clientDeprecateEOF
7475
)
7576

77+
// https://mariadb.com/kb/en/connection/#capabilities
78+
type extendedCapabilityFlag uint32
79+
80+
const (
81+
progressIndicator extendedCapabilityFlag = 1 << iota
82+
clientComMulti
83+
clientStmtBulkOperations
84+
clientExtendedMetadata
85+
clientCacheMetadata
86+
clientUnitBulkResult
87+
)
88+
7689
const (
7790
comQuit byte = iota + 1
7891
comInitDB

0 commit comments

Comments
 (0)