Skip to content

Commit a73c034

Browse files
committed
Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues
Signed-off-by: Andrew Thornton <[email protected]>
1 parent 22572ec commit a73c034

File tree

4 files changed

+191
-26
lines changed

4 files changed

+191
-26
lines changed

modules/queue/queue_channel.go

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ package queue
77
import (
88
"context"
99
"fmt"
10+
"sync/atomic"
11+
"time"
1012

1113
"code.gitea.io/gitea/modules/log"
1214
)
@@ -51,7 +53,6 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
5153
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
5254

5355
queue := &ChannelQueue{
54-
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
5556
shutdownCtx: shutdownCtx,
5657
shutdownCtxCancel: shutdownCtxCancel,
5758
terminateCtx: terminateCtx,
@@ -60,6 +61,23 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
6061
workers: config.Workers,
6162
name: config.Name,
6263
}
64+
queue.WorkerPool = NewWorkerPool(func(data ...Data) []Data {
65+
unhandled := handle(data...)
66+
if len(unhandled) > 0 {
67+
// We can only pushback to the channel if we're paused.
68+
if queue.IsPaused() {
69+
atomic.AddInt64(&queue.numInQueue, int64(len(unhandled)))
70+
go func() {
71+
for _, datum := range data {
72+
queue.dataChan <- datum
73+
}
74+
}()
75+
return nil
76+
}
77+
}
78+
return unhandled
79+
}, config.WorkerPoolConfiguration)
80+
6381
queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar)
6482
return queue, nil
6583
}
@@ -81,6 +99,52 @@ func (q *ChannelQueue) Push(data Data) error {
8199
return nil
82100
}
83101

102+
// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
103+
func (q *ChannelQueue) Flush(timeout time.Duration) error {
104+
q.lock.Lock()
105+
paused := q.paused
106+
q.lock.Unlock()
107+
if paused {
108+
return nil
109+
}
110+
ctx, cancel := q.commonRegisterWorkers(1, timeout, true)
111+
defer cancel()
112+
return q.FlushWithContext(ctx)
113+
}
114+
115+
// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
116+
func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
117+
log.Trace("ChannelQueue: %d Flush", q.qid)
118+
for {
119+
q.lock.Lock()
120+
paused := q.paused
121+
q.lock.Unlock()
122+
if paused {
123+
return nil
124+
}
125+
select {
126+
case data := <-q.dataChan:
127+
if q.IsPaused() {
128+
// we're paused so we should push this back and stop
129+
// (whilst handle will check this too we need to stop the flusher for this to work.)
130+
go func() {
131+
q.dataChan <- data
132+
}()
133+
return nil
134+
} else if unhandled := q.handle(data); unhandled != nil {
135+
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
136+
}
137+
atomic.AddInt64(&q.numInQueue, -1)
138+
case <-q.baseCtx.Done():
139+
return q.baseCtx.Err()
140+
case <-ctx.Done():
141+
return ctx.Err()
142+
default:
143+
return nil
144+
}
145+
}
146+
}
147+
84148
// Shutdown processing from this queue
85149
func (q *ChannelQueue) Shutdown() {
86150
q.lock.Lock()
@@ -94,6 +158,7 @@ func (q *ChannelQueue) Shutdown() {
94158
log.Trace("ChannelQueue: %s Shutting down", q.name)
95159
go func() {
96160
log.Trace("ChannelQueue: %s Flushing", q.name)
161+
// We can't use Cleanup here because that will close the channel
97162
if err := q.FlushWithContext(q.terminateCtx); err != nil {
98163
log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
99164
return

modules/queue/queue_disk_channel.go

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,20 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
5151
}
5252
config := configInterface.(PersistableChannelQueueConfiguration)
5353

54-
channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{
54+
queue := &PersistableChannelQueue{
55+
closed: make(chan struct{}),
56+
}
57+
58+
wrappedHandle := func(data ...Data) (failed []Data) {
59+
for _, unhandled := range handle(data...) {
60+
if fail := queue.PushBack(unhandled); fail != nil {
61+
failed = append(failed, fail)
62+
}
63+
}
64+
return
65+
}
66+
67+
channelQueue, err := NewChannelQueue(wrappedHandle, ChannelQueueConfiguration{
5568
WorkerPoolConfiguration: WorkerPoolConfiguration{
5669
QueueLength: config.QueueLength,
5770
BatchLength: config.BatchLength,
@@ -84,15 +97,12 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
8497
DataDir: config.DataDir,
8598
}
8699

87-
levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
100+
levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar)
88101
if err == nil {
89-
queue := &PersistableChannelQueue{
90-
channelQueue: channelQueue.(*ChannelQueue),
91-
delayedStarter: delayedStarter{
92-
internal: levelQueue.(*LevelQueue),
93-
name: config.Name,
94-
},
95-
closed: make(chan struct{}),
102+
queue.channelQueue = channelQueue.(*ChannelQueue)
103+
queue.delayedStarter = delayedStarter{
104+
internal: levelQueue.(*LevelQueue),
105+
name: config.Name,
96106
}
97107
_ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
98108
return queue, nil
@@ -102,16 +112,13 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
102112
return nil, ErrInvalidConfiguration{cfg: cfg}
103113
}
104114

105-
queue := &PersistableChannelQueue{
106-
channelQueue: channelQueue.(*ChannelQueue),
107-
delayedStarter: delayedStarter{
108-
cfg: levelCfg,
109-
underlying: LevelQueueType,
110-
timeout: config.Timeout,
111-
maxAttempts: config.MaxAttempts,
112-
name: config.Name,
113-
},
114-
closed: make(chan struct{}),
115+
queue.channelQueue = channelQueue.(*ChannelQueue)
116+
queue.delayedStarter = delayedStarter{
117+
cfg: levelCfg,
118+
underlying: LevelQueueType,
119+
timeout: config.Timeout,
120+
maxAttempts: config.MaxAttempts,
121+
name: config.Name,
115122
}
116123
_ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
117124
return queue, nil
@@ -132,6 +139,19 @@ func (q *PersistableChannelQueue) Push(data Data) error {
132139
}
133140
}
134141

142+
// PushBack will push the indexer data to queue
143+
func (q *PersistableChannelQueue) PushBack(data Data) error {
144+
select {
145+
case <-q.closed:
146+
if pbr, ok := q.internal.(PushBackable); ok {
147+
return pbr.PushBack(data)
148+
}
149+
return q.internal.Push(data)
150+
default:
151+
return q.channelQueue.Push(data)
152+
}
153+
}
154+
135155
// Run starts to run the queue
136156
func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
137157
log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name)

modules/queue/unique_queue_channel.go

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"context"
99
"fmt"
1010
"sync"
11+
"sync/atomic"
12+
"time"
1113

1214
"code.gitea.io/gitea/modules/log"
1315
)
@@ -69,7 +71,16 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
6971
delete(queue.table, datum)
7072
queue.lock.Unlock()
7173
if u := handle(datum); u != nil {
72-
unhandled = append(unhandled, u...)
74+
if queue.IsPaused() {
75+
// We can only pushback to the channel if we're paused.
76+
go func() {
77+
if err := queue.Push(u); err != nil {
78+
log.Error("Unable to push back to queue %d", queue.qid)
79+
}
80+
}()
81+
} else {
82+
unhandled = append(unhandled, u...)
83+
}
7384
}
7485
}
7586
return unhandled
@@ -131,6 +142,52 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
131142
return has, nil
132143
}
133144

145+
// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
146+
func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error {
147+
q.lock.Lock()
148+
paused := q.paused
149+
q.lock.Unlock()
150+
if paused {
151+
return nil
152+
}
153+
ctx, cancel := q.commonRegisterWorkers(1, timeout, true)
154+
defer cancel()
155+
return q.FlushWithContext(ctx)
156+
}
157+
158+
// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
159+
func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
160+
log.Trace("ChannelUniqueQueue: %d Flush", q.qid)
161+
for {
162+
q.lock.Lock()
163+
paused := q.paused
164+
q.lock.Unlock()
165+
if paused {
166+
return nil
167+
}
168+
select {
169+
case data := <-q.dataChan:
170+
if q.IsPaused() {
171+
// we're paused so we should push this back and stop
172+
// (whilst handle will check this too we need to stop the flusher for this to work.)
173+
go func() {
174+
q.dataChan <- data
175+
}()
176+
return nil
177+
} else if unhandled := q.handle(data); unhandled != nil {
178+
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
179+
}
180+
atomic.AddInt64(&q.numInQueue, -1)
181+
case <-q.baseCtx.Done():
182+
return q.baseCtx.Err()
183+
case <-ctx.Done():
184+
return ctx.Err()
185+
default:
186+
return nil
187+
}
188+
}
189+
}
190+
134191
// Shutdown processing from this queue
135192
func (q *ChannelUniqueQueue) Shutdown() {
136193
log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)

modules/queue/unique_queue_disk_channel.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,20 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
5151
}
5252
config := configInterface.(PersistableChannelUniqueQueueConfiguration)
5353

54-
channelUniqueQueue, err := NewChannelUniqueQueue(handle, ChannelUniqueQueueConfiguration{
54+
queue := &PersistableChannelUniqueQueue{
55+
closed: make(chan struct{}),
56+
}
57+
58+
wrappedHandle := func(data ...Data) (failed []Data) {
59+
for _, unhandled := range handle(data...) {
60+
if fail := queue.PushBack(unhandled); fail != nil {
61+
failed = append(failed, fail)
62+
}
63+
}
64+
return
65+
}
66+
67+
channelUniqueQueue, err := NewChannelUniqueQueue(wrappedHandle, ChannelUniqueQueueConfiguration{
5568
WorkerPoolConfiguration: WorkerPoolConfiguration{
5669
QueueLength: config.QueueLength,
5770
BatchLength: config.BatchLength,
@@ -84,10 +97,7 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
8497
DataDir: config.DataDir,
8598
}
8699

87-
queue := &PersistableChannelUniqueQueue{
88-
channelQueue: channelUniqueQueue.(*ChannelUniqueQueue),
89-
closed: make(chan struct{}),
90-
}
100+
queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue)
91101

92102
levelQueue, err := NewLevelUniqueQueue(func(data ...Data) []Data {
93103
for _, datum := range data {
@@ -143,6 +153,19 @@ func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) err
143153
}
144154
}
145155

156+
// PushBack will push the indexer data to queue
157+
func (q *PersistableChannelUniqueQueue) PushBack(data Data) error {
158+
select {
159+
case <-q.closed:
160+
if pbr, ok := q.internal.(PushBackable); ok {
161+
return pbr.PushBack(data)
162+
}
163+
return q.internal.Push(data)
164+
default:
165+
return q.channelQueue.Push(data)
166+
}
167+
}
168+
146169
// Has will test if the queue has the data
147170
func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
148171
// This is more difficult...

0 commit comments

Comments
 (0)