Skip to content

Commit 43aa9b9

Browse files
authored
Merge branch 'master' into tls-in-driver
2 parents 0da459f + e7478b1 commit 43aa9b9

21 files changed

+670
-48
lines changed

.github/workflows/ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ jobs:
55
test:
66
strategy:
77
matrix:
8-
go: [ 1.16, 1.15 ]
8+
go: [ 1.18, 1.17, 1.16, 1.15 ]
99
name: Tests Go ${{ matrix.go }}
1010
runs-on: ubuntu-18.04
1111

client/auth.go

+26
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,20 @@ func (c *Conn) genAuthResponse(authData []byte) ([]byte, bool, error) {
139139
}
140140
}
141141

142+
// generate connection attributes data
143+
func (c *Conn) genAttributes() []byte {
144+
if len(c.attributes) == 0 {
145+
return nil
146+
}
147+
148+
attrData := make([]byte, 0)
149+
for k, v := range c.attributes {
150+
attrData = append(attrData, PutLengthEncodedString([]byte(k))...)
151+
attrData = append(attrData, PutLengthEncodedString([]byte(v))...)
152+
}
153+
return append(PutLengthEncodedInt(uint64(len(attrData))), attrData...)
154+
}
155+
142156
// See: http://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::HandshakeResponse
143157
func (c *Conn) writeAuthHandshake() error {
144158
if !authPluginAllowed(c.authPluginName) {
@@ -195,6 +209,12 @@ func (c *Conn) writeAuthHandshake() error {
195209
capability |= CLIENT_CONNECT_WITH_DB
196210
length += len(c.db) + 1
197211
}
212+
// connection attributes
213+
attrData := c.genAttributes()
214+
if len(attrData) > 0 {
215+
capability |= CLIENT_CONNECT_ATTRS
216+
length += len(attrData)
217+
}
198218

199219
data := make([]byte, length+4)
200220

@@ -264,6 +284,12 @@ func (c *Conn) writeAuthHandshake() error {
264284
// Assume native client during response
265285
pos += copy(data[pos:], c.authPluginName)
266286
data[pos] = 0x00
287+
pos++
288+
289+
// connection attributes
290+
if len(attrData) > 0 {
291+
copy(data[pos:], attrData)
292+
}
267293

268294
return c.WritePacket(data)
269295
}

client/auth_test.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package client
2+
3+
import (
4+
"bytes"
5+
"testing"
6+
7+
"github.com/go-mysql-org/go-mysql/mysql"
8+
)
9+
10+
func TestConnGenAttributes(t *testing.T) {
11+
c := &Conn{
12+
// example data from
13+
// https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::HandshakeResponse41
14+
attributes: map[string]string{
15+
"_os": "debian6.0",
16+
"_client_name": "libmysql",
17+
"_pid": "22344",
18+
"_client_version": "5.6.6-m9",
19+
"_platform": "x86_64",
20+
"foo": "bar",
21+
},
22+
}
23+
24+
data := c.genAttributes()
25+
26+
// the order of the attributes map cannot be guaranteed so to test the content
27+
// of the attribute data we need to check its partial contents
28+
29+
if len(data) != 98 {
30+
t.Fatalf("unexpected data length, got %d", len(data))
31+
}
32+
if data[0] != 0x61 {
33+
t.Fatalf("unexpected length-encoded int, got %#x", data[0])
34+
}
35+
36+
for k, v := range c.attributes {
37+
fixt := append(mysql.PutLengthEncodedString([]byte(k)), mysql.PutLengthEncodedString([]byte(v))...)
38+
if !bytes.Contains(data, fixt) {
39+
t.Fatalf("%s attribute not found", k)
40+
}
41+
}
42+
}

client/conn.go

+72
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
. "github.com/go-mysql-org/go-mysql/mysql"
1212
"github.com/go-mysql-org/go-mysql/packet"
13+
"github.com/go-mysql-org/go-mysql/utils"
1314
"github.com/pingcap/errors"
1415
)
1516

@@ -27,6 +28,8 @@ type Conn struct {
2728
// client-set capabilities only
2829
ccaps uint32
2930

31+
attributes map[string]string
32+
3033
status uint16
3134

3235
charset string
@@ -43,6 +46,9 @@ type SelectPerRowCallback func(row []FieldValue) error
4346
// This function will be called once per result from ExecuteSelectStreaming
4447
type SelectPerResultCallback func(result *Result) error
4548

49+
// This function will be called once per result from ExecuteMultiple
50+
type ExecPerResultCallback func(result *Result, err error)
51+
4652
func getNetProto(addr string) string {
4753
proto := "tcp"
4854
if strings.Contains(addr, "/") {
@@ -198,6 +204,68 @@ func (c *Conn) Execute(command string, args ...interface{}) (*Result, error) {
198204
}
199205
}
200206

207+
// ExecuteMultiple will call perResultCallback for every result of the multiple queries
208+
// that are executed.
209+
//
210+
// When ExecuteMultiple is used, the connection should have the SERVER_MORE_RESULTS_EXISTS
211+
// flag set to signal the server multiple queries are executed. Handling the responses
212+
// is up to the implementation of perResultCallback.
213+
//
214+
// Example:
215+
//
216+
// queries := "SELECT 1; SELECT NOW();"
217+
// conn.ExecuteMultiple(queries, func(result *mysql.Result, err error) {
218+
// // Use the result as you want
219+
// })
220+
//
221+
func (c *Conn) ExecuteMultiple(query string, perResultCallback ExecPerResultCallback) (*Result, error) {
222+
if err := c.writeCommandStr(COM_QUERY, query); err != nil {
223+
return nil, errors.Trace(err)
224+
}
225+
226+
var buf []byte
227+
var err error
228+
var result *Result
229+
defer utils.ByteSlicePut(buf)
230+
231+
for {
232+
buf, err = c.ReadPacketReuseMem(utils.ByteSliceGet(16)[:0])
233+
if err != nil {
234+
return nil, errors.Trace(err)
235+
}
236+
237+
switch buf[0] {
238+
case OK_HEADER:
239+
result, err = c.handleOKPacket(buf)
240+
case ERR_HEADER:
241+
err = c.handleErrorPacket(append([]byte{}, buf...))
242+
result = nil
243+
case LocalInFile_HEADER:
244+
err = ErrMalformPacket
245+
result = nil
246+
default:
247+
result, err = c.readResultset(buf, false)
248+
}
249+
250+
// call user-defined callback
251+
perResultCallback(result, err)
252+
253+
// if there was an error of this was the last result, stop looping
254+
if err != nil || result.Status&SERVER_MORE_RESULTS_EXISTS == 0 {
255+
break
256+
}
257+
}
258+
259+
// return an empty result(set) signaling we're done streaming a multiple
260+
// streaming session
261+
// if this would end up in WriteValue, it would just be ignored as all
262+
// responses should have been handled in perResultCallback
263+
return &Result{Resultset: &Resultset{
264+
Streaming: StreamingMultiple,
265+
StreamingDone: true,
266+
}}, nil
267+
}
268+
201269
// ExecuteSelectStreaming will call perRowCallback for every row in resultset
202270
// WITHOUT saving any row data to Result.{Values/RawPkg/RowDatas} fields.
203271
// When given, perResultCallback will be called once per result
@@ -236,6 +304,10 @@ func (c *Conn) Rollback() error {
236304
return errors.Trace(err)
237305
}
238306

307+
func (c *Conn) SetAttributes(attributes map[string]string) {
308+
c.attributes = attributes
309+
}
310+
239311
func (c *Conn) SetCharset(charset string) error {
240312
if c.charset == charset {
241313
return nil

client/conn_test.go

+45-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"fmt"
5+
"strings"
56

67
. "github.com/pingcap/check"
78

@@ -16,7 +17,10 @@ type connTestSuite struct {
1617
func (s *connTestSuite) SetUpSuite(c *C) {
1718
var err error
1819
addr := fmt.Sprintf("%s:%s", *testHost, s.port)
19-
s.c, err = Connect(addr, *testUser, *testPassword, "")
20+
s.c, err = Connect(addr, *testUser, *testPassword, "", func(c *Conn) {
21+
// required for the ExecuteMultiple test
22+
c.SetCapability(mysql.CLIENT_MULTI_STATEMENTS)
23+
})
2024
if err != nil {
2125
c.Fatal(err)
2226
}
@@ -78,6 +82,46 @@ func (s *connTestSuite) testExecute_DropTable(c *C) {
7882
c.Assert(err, IsNil)
7983
}
8084

85+
func (s *connTestSuite) TestExecuteMultiple(c *C) {
86+
queries := []string{
87+
`INSERT INTO ` + testExecuteSelectStreamingTablename + ` (id, str) VALUES (999, "executemultiple")`,
88+
`SELECT id FROM ` + testExecuteSelectStreamingTablename + ` LIMIT 2`,
89+
`DELETE FROM ` + testExecuteSelectStreamingTablename + ` WHERE id=999`,
90+
`THIS IS BOGUS()`,
91+
}
92+
93+
count := 0
94+
result, err := s.c.ExecuteMultiple(strings.Join(queries, "; "), func(result *mysql.Result, err error) {
95+
switch count {
96+
// the INSERT/DELETE query have no resultset, but should have set affectedrows
97+
// the err should be nil
98+
// also, since this is not the last query, the SERVER_MORE_RESULTS_EXISTS
99+
// flag should be set
100+
case 0, 2:
101+
c.Assert(result.Status&mysql.SERVER_MORE_RESULTS_EXISTS, Not(Equals), 0)
102+
c.Assert(result.Resultset, IsNil)
103+
c.Assert(result.AffectedRows, Equals, uint64(1))
104+
c.Assert(err, IsNil)
105+
case 1:
106+
// the SELECT query should have an resultset
107+
// still not the last query, flag should be set
108+
c.Assert(result.Status&mysql.SERVER_MORE_RESULTS_EXISTS, Not(Equals), 0)
109+
c.Assert(result.Resultset, NotNil)
110+
c.Assert(err, IsNil)
111+
case 3:
112+
// this query is obviously bogus so the error should be non-nil
113+
c.Assert(result, IsNil)
114+
c.Assert(err, NotNil)
115+
}
116+
count++
117+
})
118+
119+
c.Assert(count, Equals, 4)
120+
c.Assert(err, IsNil)
121+
c.Assert(result.StreamingDone, Equals, true)
122+
c.Assert(result.Streaming, Equals, mysql.StreamingMultiple)
123+
}
124+
81125
func (s *connTestSuite) TestExecuteSelectStreaming(c *C) {
82126
var (
83127
expectedRowId int64

client/resp.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result,
309309
}
310310

311311
// this is a streaming resultset
312-
result.Resultset.Streaming = true
312+
result.Resultset.Streaming = StreamingSelect
313313

314314
if err := c.readResultColumns(result); err != nil {
315315
return errors.Trace(err)

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ require (
1414
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24
1515
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
1616
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07
17+
github.com/stretchr/testify v1.7.0
1718
go.uber.org/multierr v1.6.0 // indirect
1819
go.uber.org/zap v1.16.0 // indirect
1920
golang.org/x/text v0.3.6 // indirect

go.sum

+5-1
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,12 @@ github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8
5151
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw=
5252
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 h1:oI+RNwuC9jF2g2lP0u0cVEEZrc/AYBCuFdvwrLWM/6Q=
5353
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4=
54+
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
5455
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
5556
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
56-
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
5757
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
58+
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
59+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
5860
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
5961
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
6062
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
@@ -112,5 +114,7 @@ gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXL
112114
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
113115
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
114116
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
117+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
118+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
115119
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
116120
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=

0 commit comments

Comments
 (0)