@@ -10,54 +10,42 @@ package mysql
10
10
11
11
import (
12
12
"io"
13
- "net"
14
- "time"
15
13
)
16
14
17
15
const defaultBufSize = 4096
18
16
const maxCachedBufSize = 256 * 1024
19
17
18
+ // readerFunc is a function that compatible with io.Reader.
19
+ // We use this function type instead of io.Reader because we want to
20
+ // just pass mc.readWithTimeout.
21
+ type readerFunc func ([]byte ) (int , error )
22
+
20
23
// A buffer which is used for both reading and writing.
21
24
// This is possible since communication on each connection is synchronous.
22
25
// In other words, we can't write and read simultaneously on the same connection.
23
26
// The buffer is similar to bufio.Reader / Writer but zero-copy-ish
24
27
// Also highly optimized for this particular use case.
25
- // This buffer is backed by two byte slices in a double-buffering scheme
26
28
type buffer struct {
27
- buf []byte // buf is a byte buffer who's length and capacity are equal.
28
- nc net.Conn
29
- idx int
30
- length int
31
- timeout time.Duration
32
- dbuf [2 ][]byte // dbuf is an array with the two byte slices that back this buffer
33
- flipcnt uint // flipccnt is the current buffer counter for double-buffering
29
+ buf []byte // read buffer.
30
+ cachedBuf []byte // buffer that will be reused. len(cachedBuf) <= maxCachedBufSize.
34
31
}
35
32
36
33
// newBuffer allocates and returns a new buffer.
37
- func newBuffer (nc net.Conn ) buffer {
38
- fg := make ([]byte , defaultBufSize )
34
+ func newBuffer () buffer {
39
35
return buffer {
40
- buf : fg ,
41
- nc : nc ,
42
- dbuf : [2 ][]byte {fg , nil },
36
+ cachedBuf : make ([]byte , defaultBufSize ),
43
37
}
44
38
}
45
39
46
- // flip replaces the active buffer with the background buffer
47
- // this is a delayed flip that simply increases the buffer counter;
48
- // the actual flip will be performed the next time we call `buffer.fill`
49
- func (b * buffer ) flip () {
50
- b .flipcnt += 1
40
+ // busy returns true if the read buffer is not empty.
41
+ func (b * buffer ) busy () bool {
42
+ return len (b .buf ) > 0
51
43
}
52
44
53
- // fill reads into the buffer until at least _need_ bytes are in it
54
- func (b * buffer ) fill (need int ) error {
55
- n := b .length
56
- // fill data into its double-buffering target: if we've called
57
- // flip on this buffer, we'll be copying to the background buffer,
58
- // and then filling it with network data; otherwise we'll just move
59
- // the contents of the current buffer to the front before filling it
60
- dest := b .dbuf [b .flipcnt & 1 ]
45
+ // fill reads into the read buffer until at least _need_ bytes are in it.
46
+ func (b * buffer ) fill (need int , r readerFunc ) error {
47
+ // we'll move the contents of the current buffer to dest before filling it.
48
+ dest := b .cachedBuf
61
49
62
50
// grow buffer if necessary to fit the whole packet.
63
51
if need > len (dest ) {
@@ -67,83 +55,67 @@ func (b *buffer) fill(need int) error {
67
55
// if the allocated buffer is not too large, move it to backing storage
68
56
// to prevent extra allocations on applications that perform large reads
69
57
if len (dest ) <= maxCachedBufSize {
70
- b .dbuf [ b . flipcnt & 1 ] = dest
58
+ b .cachedBuf = dest
71
59
}
72
60
}
73
61
74
- // if we're filling the fg buffer, move the existing data to the start of it.
75
- // if we're filling the bg buffer, copy over the data
76
- if n > 0 {
77
- copy (dest [:n ], b .buf [b .idx :])
78
- }
79
-
80
- b .buf = dest
81
- b .idx = 0
62
+ // move the existing data to the start of the buffer.
63
+ n := len (b .buf )
64
+ copy (dest [:n ], b .buf )
82
65
83
66
for {
84
- if b .timeout > 0 {
85
- if err := b .nc .SetReadDeadline (time .Now ().Add (b .timeout )); err != nil {
86
- return err
87
- }
67
+ nn , err := r (dest [n :])
68
+ n += nn
69
+
70
+ if err == nil && n < need {
71
+ continue
88
72
}
89
73
90
- nn , err := b .nc .Read (b .buf [n :])
91
- n += nn
74
+ b .buf = dest [:n ]
92
75
93
- switch err {
94
- case nil :
76
+ if err == io .EOF {
95
77
if n < need {
96
- continue
78
+ err = io .ErrUnexpectedEOF
79
+ } else {
80
+ err = nil
97
81
}
98
- b .length = n
99
- return nil
100
-
101
- case io .EOF :
102
- if n >= need {
103
- b .length = n
104
- return nil
105
- }
106
- return io .ErrUnexpectedEOF
107
-
108
- default :
109
- return err
110
82
}
83
+ return err
111
84
}
112
85
}
113
86
114
87
// returns next N bytes from buffer.
115
88
// The returned slice is only guaranteed to be valid until the next read
116
- func (b * buffer ) readNext (need int ) ([]byte , error ) {
117
- if b . length < need {
89
+ func (b * buffer ) readNext (need int , r readerFunc ) ([]byte , error ) {
90
+ if len ( b . buf ) < need {
118
91
// refill
119
- if err := b .fill (need ); err != nil {
92
+ if err := b .fill (need , r ); err != nil {
120
93
return nil , err
121
94
}
122
95
}
123
96
124
- offset := b .idx
125
- b .idx += need
126
- b .length -= need
127
- return b .buf [offset :b .idx ], nil
97
+ data := b .buf [:need ]
98
+ b .buf = b .buf [need :]
99
+ return data , nil
128
100
}
129
101
130
102
// takeBuffer returns a buffer with the requested size.
131
103
// If possible, a slice from the existing buffer is returned.
132
104
// Otherwise a bigger buffer is made.
133
105
// Only one buffer (total) can be used at a time.
134
106
func (b * buffer ) takeBuffer (length int ) ([]byte , error ) {
135
- if b .length > 0 {
107
+ if b .busy () {
136
108
return nil , ErrBusyBuffer
137
109
}
138
110
139
111
// test (cheap) general case first
140
- if length <= cap (b .buf ) {
141
- return b .buf [:length ], nil
112
+ if length <= len (b .cachedBuf ) {
113
+ return b .cachedBuf [:length ], nil
142
114
}
143
115
144
- if length < maxPacketSize {
145
- b .buf = make ([]byte , length )
146
- return b .buf , nil
116
+ if length < maxCachedBufSize {
117
+ b .cachedBuf = make ([]byte , length )
118
+ return b .cachedBuf , nil
147
119
}
148
120
149
121
// buffer is larger than we want to store.
@@ -154,29 +126,26 @@ func (b *buffer) takeBuffer(length int) ([]byte, error) {
154
126
// known to be smaller than defaultBufSize.
155
127
// Only one buffer (total) can be used at a time.
156
128
func (b * buffer ) takeSmallBuffer (length int ) ([]byte , error ) {
157
- if b .length > 0 {
129
+ if b .busy () {
158
130
return nil , ErrBusyBuffer
159
131
}
160
- return b .buf [:length ], nil
132
+ return b .cachedBuf [:length ], nil
161
133
}
162
134
163
135
// takeCompleteBuffer returns the complete existing buffer.
164
136
// This can be used if the necessary buffer size is unknown.
165
137
// cap and len of the returned buffer will be equal.
166
138
// Only one buffer (total) can be used at a time.
167
139
func (b * buffer ) takeCompleteBuffer () ([]byte , error ) {
168
- if b .length > 0 {
140
+ if b .busy () {
169
141
return nil , ErrBusyBuffer
170
142
}
171
- return b .buf , nil
143
+ return b .cachedBuf , nil
172
144
}
173
145
174
146
// store stores buf, an updated buffer, if its suitable to do so.
175
- func (b * buffer ) store (buf []byte ) error {
176
- if b .length > 0 {
177
- return ErrBusyBuffer
178
- } else if cap (buf ) <= maxPacketSize && cap (buf ) > cap (b .buf ) {
179
- b .buf = buf [:cap (buf )]
147
+ func (b * buffer ) store (buf []byte ) {
148
+ if cap (buf ) <= maxCachedBufSize && cap (buf ) > cap (b .cachedBuf ) {
149
+ b .cachedBuf = buf [:cap (buf )]
180
150
}
181
- return nil
182
151
}
0 commit comments