Skip to content

Commit f62aac9

Browse files
committed
simplify shard choose + separate lock for buf and queue
1 parent ee19637 commit f62aac9

File tree

2 files changed

+81
-82
lines changed

2 files changed

+81
-82
lines changed

connection.go

+81-78
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,22 @@ import (
1515
)
1616

1717
const shards = 512
18-
const requestsMap = 16
18+
const requestsMap = 32
1919

2020
var epoch = time.Now()
2121

22+
type connShard struct {
23+
rmut sync.Mutex
24+
requests [requestsMap]*Future
25+
first *Future
26+
last **Future
27+
bufmut sync.Mutex
28+
buf smallWBuf
29+
enc *msgpack.Encoder
30+
bcache smallWBuf
31+
_pad [16]uint64
32+
}
33+
2234
type Connection struct {
2335
addr string
2436
c *net.TCPConn
@@ -28,18 +40,11 @@ type Connection struct {
2840
Schema *Schema
2941
requestId uint32
3042
Greeting *Greeting
31-
shard [shards]struct {
32-
sync.Mutex
33-
count uint32
34-
requests [requestsMap]*Future
35-
first *Future
36-
last **Future
37-
buf smallWBuf
38-
enc *msgpack.Encoder
39-
bcache smallWBuf
40-
}
43+
44+
shard [shards]connShard
45+
dirtyShard chan uint32
46+
4147
rlimit chan struct{}
42-
packets chan struct{}
4348
control chan struct{}
4449
opts Opts
4550
closed bool
@@ -62,14 +67,14 @@ type Opts struct {
6267
func Connect(addr string, opts Opts) (conn *Connection, err error) {
6368

6469
conn = &Connection{
65-
addr: addr,
66-
requestId: 0,
67-
Greeting: &Greeting{},
68-
rlimit: make(chan struct{}, 1024*1024),
69-
packets: make(chan struct{}, 1024*1024),
70-
control: make(chan struct{}),
71-
opts: opts,
72-
dec: msgpack.NewDecoder(&smallBuf{}),
70+
addr: addr,
71+
requestId: 0,
72+
Greeting: &Greeting{},
73+
rlimit: make(chan struct{}, 1024*1024),
74+
dirtyShard: make(chan uint32, shards),
75+
control: make(chan struct{}),
76+
opts: opts,
77+
dec: msgpack.NewDecoder(&smallBuf{}),
7378
}
7479
for i := range conn.shard {
7580
conn.shard[i].last = &conn.shard[i].first
@@ -277,13 +282,13 @@ func (conn *Connection) closeConnection(neterr error, r *bufio.Reader, w *bufio.
277282

278283
func (conn *Connection) lockShards() {
279284
for i := range conn.shard {
280-
conn.shard[i].Lock()
285+
conn.shard[i].rmut.Lock()
281286
}
282287
}
283288

284289
func (conn *Connection) unlockShards() {
285290
for i := range conn.shard {
286-
conn.shard[i].Unlock()
291+
conn.shard[i].rmut.Unlock()
287292
}
288293
}
289294

@@ -299,20 +304,20 @@ func (conn *Connection) closeConnectionForever(err error) error {
299304
func (conn *Connection) writer() {
300305
var w *bufio.Writer
301306
var err error
302-
var shardn int
307+
var shardn uint32
303308
Main:
304309
for !conn.closed {
305310
select {
306-
case <-conn.packets:
311+
case shardn = <-conn.dirtyShard:
307312
default:
308313
runtime.Gosched()
309-
if len(conn.packets) == 0 && w != nil {
314+
if len(conn.dirtyShard) == 0 && w != nil {
310315
if err := w.Flush(); err != nil {
311316
_, w, _ = conn.closeConnection(err, nil, w)
312317
}
313318
}
314319
select {
315-
case <-conn.packets:
320+
case shardn = <-conn.dirtyShard:
316321
case <-conn.control:
317322
return
318323
}
@@ -323,31 +328,18 @@ Main:
323328
return
324329
}
325330
}
326-
for stop := shardn + shards; shardn != stop; shardn++ {
327-
shard := &conn.shard[shardn&(shards-1)]
328-
if nreq := atomic.LoadUint32(&shard.count); nreq > 0 {
329-
shard.Lock()
330-
nreq, shard.count = shard.count, 0
331-
packet := shard.buf
332-
shard.buf = nil
333-
shard.Unlock()
334-
if err := write(w, packet); err != nil {
335-
_, w, _ = conn.closeConnection(err, nil, w)
336-
continue Main
337-
}
338-
shard.Lock()
339-
shard.bcache = packet[0:0]
340-
shard.Unlock()
341-
for ; nreq > 1; nreq-- {
342-
select {
343-
case <-conn.packets:
344-
default:
345-
break
346-
}
347-
}
348-
break
349-
}
331+
shard := &conn.shard[shardn]
332+
shard.bufmut.Lock()
333+
packet := shard.buf
334+
shard.buf = nil
335+
shard.bufmut.Unlock()
336+
if err := write(w, packet); err != nil {
337+
_, w, _ = conn.closeConnection(err, nil, w)
338+
continue Main
350339
}
340+
shard.bufmut.Lock()
341+
shard.bcache = packet[0:0]
342+
shard.bufmut.Unlock()
351343
}
352344
}
353345

@@ -385,24 +377,31 @@ func (conn *Connection) reader() {
385377
func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error) {
386378
shardn := fut.requestId & (shards - 1)
387379
shard := &conn.shard[shardn]
388-
shard.Lock()
389-
if conn.closed {
390-
shard.Unlock()
391-
fut.err = ClientError{ErrConnectionClosed, "using closed connection"}
392-
return
393-
}
380+
shard.bufmut.Lock()
381+
firstWritten := len(shard.buf) == 0
394382
if cap(shard.buf) == 0 {
395383
shard.buf, shard.bcache = shard.bcache, nil
396384
if cap(shard.buf) == 0 {
397385
shard.buf = make(smallWBuf, 0, 128)
398386
}
399387
shard.enc = msgpack.NewEncoder(&shard.buf)
400388
}
389+
blen := len(shard.buf)
401390
if err := fut.pack(&shard.buf, shard.enc, body); err != nil {
391+
shard.buf = shard.buf[:blen]
402392
fut.err = err
403-
shard.Unlock()
393+
shard.bufmut.Unlock()
394+
return
395+
}
396+
shard.rmut.Lock()
397+
if conn.closed {
398+
shard.buf = shard.buf[:blen]
399+
shard.bufmut.Unlock()
400+
shard.rmut.Unlock()
401+
fut.err = ClientError{ErrConnectionClosed, "using closed connection"}
404402
return
405403
}
404+
shard.bufmut.Unlock()
406405
pos := (fut.requestId / shards) & (requestsMap - 1)
407406
fut.next = shard.requests[pos]
408407
shard.requests[pos] = fut
@@ -412,17 +411,19 @@ func (conn *Connection) putFuture(fut *Future, body func(*msgpack.Encoder) error
412411
*shard.last = fut
413412
shard.last = &fut.time.next
414413
}
415-
atomic.AddUint32(&shard.count, 1)
416-
shard.Unlock()
414+
shard.rmut.Unlock()
415+
if firstWritten {
416+
conn.dirtyShard <- shardn
417+
}
417418
}
418419

419-
func (conn *Connection) unlinkFutureTime(shard uint32, fut *Future) {
420+
func (conn *Connection) unlinkFutureTime(fut *Future) {
420421
if fut.time.prev != nil {
421-
i := fut.requestId & (shards - 1)
422422
*fut.time.prev = fut.time.next
423423
if fut.time.next != nil {
424424
fut.time.next.time.prev = fut.time.prev
425425
} else {
426+
i := fut.requestId & (shards - 1)
426427
conn.shard[i].last = fut.time.prev
427428
}
428429
fut.time.next = nil
@@ -431,33 +432,34 @@ func (conn *Connection) unlinkFutureTime(shard uint32, fut *Future) {
431432
}
432433

433434
func (conn *Connection) fetchFuture(reqid uint32) (fut *Future) {
434-
conn.shard[reqid&(shards-1)].Lock()
435+
shard := &conn.shard[reqid&(shards-1)]
436+
shard.rmut.Lock()
435437
fut = conn.fetchFutureImp(reqid)
436-
conn.shard[reqid&(shards-1)].Unlock()
438+
shard.rmut.Unlock()
437439
return fut
438440
}
439441

440442
func (conn *Connection) fetchFutureImp(reqid uint32) *Future {
441-
shardn := reqid & (shards - 1)
442-
shard := &conn.shard[shardn]
443+
shard := &conn.shard[reqid&(shards-1)]
443444
pos := (reqid / shards) & (requestsMap - 1)
444445
fut := shard.requests[pos]
445446
if fut == nil {
446447
return nil
447448
}
448449
if fut.requestId == reqid {
449450
shard.requests[pos] = fut.next
450-
conn.unlinkFutureTime(shardn, fut)
451+
conn.unlinkFutureTime(fut)
451452
return fut
452453
}
453454
for fut.next != nil {
454-
if fut.next.requestId == reqid {
455-
fut, fut.next = fut.next, fut.next.next
456-
conn.unlinkFutureTime(shardn, fut)
455+
next := fut.next
456+
if next.requestId == reqid {
457+
fut, fut.next = next, next.next
457458
fut.next = nil
459+
conn.unlinkFutureTime(fut)
458460
return fut
459461
}
460-
fut = fut.next
462+
fut = next
461463
}
462464
return nil
463465
}
@@ -479,17 +481,18 @@ func (conn *Connection) timeouts() {
479481
}
480482
minNext := nowepoch + timeout
481483
for i := range conn.shard {
482-
conn.shard[i].Lock()
483-
for conn.shard[i].first != nil && conn.shard[i].first.timeout < nowepoch {
484-
fut := conn.shard[i].first
485-
conn.shard[i].Unlock()
484+
shard := &conn.shard[i]
485+
shard.rmut.Lock()
486+
for shard.first != nil && shard.first.timeout < nowepoch {
487+
fut := shard.first
488+
shard.rmut.Unlock()
486489
fut.timeouted()
487-
conn.shard[i].Lock()
490+
shard.rmut.Lock()
488491
}
489-
if conn.shard[i].first != nil && conn.shard[i].first.timeout < minNext {
490-
minNext = conn.shard[i].first.timeout
492+
if shard.first != nil && shard.first.timeout < minNext {
493+
minNext = shard.first.timeout
491494
}
492-
conn.shard[i].Unlock()
495+
shard.rmut.Unlock()
493496
}
494497
t.Reset(minNext - time.Now().Sub(epoch))
495498
}

request.go

-4
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,6 @@ func (conn *Connection) EvalAsync(expr string, args []interface{}) *Future {
278278

279279
func (fut *Future) pack(h *smallWBuf, enc *msgpack.Encoder, body func(*msgpack.Encoder) error) (err error) {
280280
rid := fut.requestId
281-
//h := make(smallWBuf, 0, 48)
282281
hl := len(*h)
283282
*h = append(*h, smallWBuf{
284283
0xce, 0, 0, 0, 0, // length
@@ -290,7 +289,6 @@ func (fut *Future) pack(h *smallWBuf, enc *msgpack.Encoder, body func(*msgpack.E
290289
}...)
291290

292291
if err = body(enc); err != nil {
293-
*h = (*h)[:hl]
294292
return
295293
}
296294

@@ -324,8 +322,6 @@ func (fut *Future) send(body func(*msgpack.Encoder) error) *Future {
324322
return fut
325323
}
326324

327-
fut.conn.packets <- struct{}{}
328-
329325
return fut
330326
}
331327

0 commit comments

Comments
 (0)