Skip to content

Commit 9d962bf

Browse files
authored
Merge branch 'master' into auth-error
2 parents 7923fbe + d6f5ffa commit 9d962bf

File tree

8 files changed

+67
-12
lines changed

8 files changed

+67
-12
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,8 @@ err := conn.ExecuteSelectStreaming(`select id, name from table LIMIT 100500`, &r
237237
// Copy it if you need.
238238
// ...
239239
}
240-
return false, nil
241-
})
240+
return nil
241+
}, nil)
242242

243243
// ...
244244
```

client/conn.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ type Conn struct {
3939
// This function will be called for every row in resultset from ExecuteSelectStreaming.
4040
type SelectPerRowCallback func(row []FieldValue) error
4141

42+
// This function will be called once per result from ExecuteSelectStreaming
43+
type SelectPerResultCallback func(result *Result) error
44+
4245
func getNetProto(addr string) string {
4346
proto := "tcp"
4447
if strings.Contains(addr, "/") {
@@ -183,6 +186,7 @@ func (c *Conn) Execute(command string, args ...interface{}) (*Result, error) {
183186

184187
// ExecuteSelectStreaming will call perRowCallback for every row in resultset
185188
// WITHOUT saving any row data to Result.{Values/RawPkg/RowDatas} fields.
189+
// When given, perResultCallback will be called once per result
186190
//
187191
// ExecuteSelectStreaming should be used only for SELECT queries with a large response resultset for memory preserving.
188192
//
@@ -193,14 +197,14 @@ func (c *Conn) Execute(command string, args ...interface{}) (*Result, error) {
193197
// // Use the row as you want.
194198
// // You must not save FieldValue.AsString() value after this callback is done. Copy it if you need.
195199
// return nil
196-
// })
200+
// }, nil)
197201
//
198-
func (c *Conn) ExecuteSelectStreaming(command string, result *Result, perRowCallback SelectPerRowCallback) error {
202+
func (c *Conn) ExecuteSelectStreaming(command string, result *Result, perRowCallback SelectPerRowCallback, perResultCallback SelectPerResultCallback) error {
199203
if err := c.writeCommandStr(COM_QUERY, command); err != nil {
200204
return errors.Trace(err)
201205
}
202206

203-
return c.readResultStreaming(false, result, perRowCallback)
207+
return c.readResultStreaming(false, result, perRowCallback, perResultCallback)
204208
}
205209

206210
func (c *Conn) Begin() error {

client/resp.go

+15-3
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func (c *Conn) readResult(binary bool) (*Result, error) {
235235
return c.readResultset(firstPkgBuf, binary)
236236
}
237237

238-
func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectPerRowCallback) error {
238+
func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectPerRowCallback, perResCb SelectPerResultCallback) error {
239239
firstPkgBuf, err := c.ReadPacketReuseMem(utils.ByteSliceGet(16)[:0])
240240
defer utils.ByteSlicePut(firstPkgBuf)
241241

@@ -269,7 +269,7 @@ func (c *Conn) readResultStreaming(binary bool, result *Result, perRowCb SelectP
269269
return ErrMalformPacket
270270
}
271271

272-
return c.readResultsetStreaming(firstPkgBuf, binary, result, perRowCb)
272+
return c.readResultsetStreaming(firstPkgBuf, binary, result, perRowCb, perResCb)
273273
}
274274

275275
func (c *Conn) readResultset(data []byte, binary bool) (*Result, error) {
@@ -295,7 +295,7 @@ func (c *Conn) readResultset(data []byte, binary bool) (*Result, error) {
295295
return result, nil
296296
}
297297

298-
func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result, perRowCb SelectPerRowCallback) error {
298+
func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result, perRowCb SelectPerRowCallback, perResCb SelectPerResultCallback) error {
299299
columnCount, _, n := LengthEncodedInt(data)
300300

301301
if n-len(data) != 0 {
@@ -309,14 +309,26 @@ func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result,
309309
result.Reset(int(columnCount))
310310
}
311311

312+
// this is a streaming resultset
313+
result.Resultset.Streaming = true
314+
312315
if err := c.readResultColumns(result); err != nil {
313316
return errors.Trace(err)
314317
}
315318

319+
if perResCb != nil {
320+
if err := perResCb(result); err != nil {
321+
return err
322+
}
323+
}
324+
316325
if err := c.readResultRowsStreaming(result, binary, perRowCb); err != nil {
317326
return errors.Trace(err)
318327
}
319328

329+
// this resultset is done streaming
330+
result.Resultset.StreamingDone = true
331+
320332
return nil
321333
}
322334

client/stmt.go

+8
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,14 @@ func (s *Stmt) Execute(args ...interface{}) (*Result, error) {
3939
return s.conn.readResult(true)
4040
}
4141

42+
func (s *Stmt) ExecuteSelectStreaming(result *Result, perRowCb SelectPerRowCallback, perResCb SelectPerResultCallback, args ...interface{}) error {
43+
if err := s.write(args...); err != nil {
44+
return errors.Trace(err)
45+
}
46+
47+
return s.conn.readResultStreaming(true, result, perRowCb, perResCb)
48+
}
49+
4250
func (s *Stmt) Close() error {
4351
if err := s.conn.writeCommandUint32(COM_STMT_CLOSE, s.id); err != nil {
4452
return errors.Trace(err)

mysql/resultset.go

+3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ type Resultset struct {
1717
RawPkg []byte
1818

1919
RowDatas []RowData
20+
21+
Streaming bool
22+
StreamingDone bool
2023
}
2124

2225
var (

mysql/resultset_helper.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/siddontang/go/hack"
99
)
1010

11-
func formatTextValue(value interface{}) ([]byte, error) {
11+
func FormatTextValue(value interface{}) ([]byte, error) {
1212
switch v := value.(type) {
1313
case int8:
1414
return strconv.AppendInt(nil, int64(v), 10), nil
@@ -165,7 +165,7 @@ func BuildSimpleTextResultset(names []string, values [][]interface{}) (*Resultse
165165
return nil, errors.Errorf("row types aren't consistent")
166166
}
167167
}
168-
b, err = formatTextValue(value)
168+
b, err = FormatTextValue(value)
169169

170170
if err != nil {
171171
return nil, errors.Trace(err)

server/command.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (c *Conn) HandleCommand() error {
4444

4545
v := c.dispatch(data)
4646

47-
err = c.writeValue(v)
47+
err = c.WriteValue(v)
4848

4949
if c.Conn != nil {
5050
c.ResetSequence()

server/resp.go

+29-1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,13 @@ func (c *Conn) writeAuthMoreDataFastAuth() error {
116116
}
117117

118118
func (c *Conn) writeResultset(r *Resultset) error {
119+
// for a streaming resultset, that handled rowdata separately in a callback
120+
// of type SelectPerRowCallback, we can suffice by ending the stream with
121+
// an EOF
122+
if r.StreamingDone {
123+
return c.writeEOF()
124+
}
125+
119126
columnLen := PutLengthEncodedInt(uint64(len(r.Fields)))
120127

121128
data := make([]byte, 4, 1024)
@@ -129,6 +136,12 @@ func (c *Conn) writeResultset(r *Resultset) error {
129136
return err
130137
}
131138

139+
// streaming resultsets handle rowdata in a separate callback of type
140+
// SelectPerRowCallback so we're done here
141+
if r.Streaming {
142+
return nil
143+
}
144+
132145
for _, v := range r.RowDatas {
133146
data = data[0:4]
134147
data = append(data, v...)
@@ -163,10 +176,23 @@ func (c *Conn) writeFieldList(fs []*Field, data []byte) error {
163176
return nil
164177
}
165178

179+
func (c *Conn) writeFieldValues(fv []FieldValue) error {
180+
data := make([]byte, 4, 1024)
181+
for _, v := range fv {
182+
tv, err := FormatTextValue(v.Value())
183+
if err != nil {
184+
return err
185+
}
186+
data = append(data, PutLengthEncodedString(tv)...)
187+
}
188+
189+
return c.WritePacket(data)
190+
}
191+
166192
type noResponse struct{}
167193
type eofResponse struct{}
168194

169-
func (c *Conn) writeValue(value interface{}) error {
195+
func (c *Conn) WriteValue(value interface{}) error {
170196
switch v := value.(type) {
171197
case noResponse:
172198
return nil
@@ -184,6 +210,8 @@ func (c *Conn) writeValue(value interface{}) error {
184210
}
185211
case []*Field:
186212
return c.writeFieldList(v, nil)
213+
case []FieldValue:
214+
return c.writeFieldValues(v)
187215
case *Stmt:
188216
return c.writePrepare(v)
189217
default:

0 commit comments

Comments
 (0)