Skip to content

Commit bf9f8ec

Browse files
authored
Merge branch 'main' into bugfix/avatar-align
2 parents 65aab92 + 787f6c3 commit bf9f8ec

File tree

4 files changed

+99
-32
lines changed

4 files changed

+99
-32
lines changed

models/webhook/hooktask.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -233,14 +233,30 @@ func ReplayHookTask(ctx context.Context, hookID int64, uuid string) (*HookTask,
233233
return newTask, db.Insert(ctx, newTask)
234234
}
235235

236-
// FindUndeliveredHookTasks represents find the undelivered hook tasks
237-
func FindUndeliveredHookTasks(ctx context.Context) ([]*HookTask, error) {
238-
tasks := make([]*HookTask, 0, 10)
236+
// FindUndeliveredHookTaskIDs will find the next 100 undelivered hook tasks with ID greater than the provided lowerID
237+
func FindUndeliveredHookTaskIDs(ctx context.Context, lowerID int64) ([]int64, error) {
238+
const batchSize = 100
239+
240+
tasks := make([]int64, 0, batchSize)
239241
return tasks, db.GetEngine(ctx).
242+
Select("id").
243+
Table(new(HookTask)).
240244
Where("is_delivered=?", false).
245+
And("id > ?", lowerID).
246+
Asc("id").
247+
Limit(batchSize).
241248
Find(&tasks)
242249
}
243250

251+
func MarkTaskDelivered(ctx context.Context, task *HookTask) (bool, error) {
252+
count, err := db.GetEngine(ctx).ID(task.ID).Where("is_delivered = ?", false).Cols("is_delivered").Update(&HookTask{
253+
ID: task.ID,
254+
IsDelivered: true,
255+
})
256+
257+
return count != 0, err
258+
}
259+
244260
// CleanupHookTaskTable deletes rows from hook_task as needed.
245261
func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, olderThan time.Duration, numberToKeep int) error {
246262
log.Trace("Doing: CleanupHookTaskTable")

services/webhook/deliver.go

Lines changed: 55 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"code.gitea.io/gitea/modules/graceful"
2424
"code.gitea.io/gitea/modules/hostmatcher"
2525
"code.gitea.io/gitea/modules/log"
26+
"code.gitea.io/gitea/modules/process"
2627
"code.gitea.io/gitea/modules/proxy"
2728
"code.gitea.io/gitea/modules/queue"
2829
"code.gitea.io/gitea/modules/setting"
@@ -43,7 +44,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
4344
return
4445
}
4546
// There was a panic whilst delivering a hook...
46-
log.Error("PANIC whilst trying to deliver webhook[%d] to %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2))
47+
log.Error("PANIC whilst trying to deliver webhook task[%d] to webhook %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2))
4748
}()
4849

4950
t.IsDelivered = true
@@ -52,7 +53,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
5253

5354
switch w.HTTPMethod {
5455
case "":
55-
log.Info("HTTP Method for webhook %d empty, setting to POST as default", t.ID)
56+
log.Info("HTTP Method for webhook %s empty, setting to POST as default", w.URL)
5657
fallthrough
5758
case http.MethodPost:
5859
switch w.ContentType {
@@ -78,14 +79,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
7879
case http.MethodGet:
7980
u, err := url.Parse(w.URL)
8081
if err != nil {
81-
return err
82+
return fmt.Errorf("unable to deliver webhook task[%d] as cannot parse webhook url %s: %w", t.ID, w.URL, err)
8283
}
8384
vals := u.Query()
8485
vals["payload"] = []string{t.PayloadContent}
8586
u.RawQuery = vals.Encode()
8687
req, err = http.NewRequest("GET", u.String(), nil)
8788
if err != nil {
88-
return err
89+
return fmt.Errorf("unable to deliver webhook task[%d] as unable to create HTTP request for webhook url %s: %w", t.ID, w.URL, err)
8990
}
9091
case http.MethodPut:
9192
switch w.Type {
@@ -97,13 +98,13 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
9798
url := fmt.Sprintf("%s/%s", w.URL, url.PathEscape(txnID))
9899
req, err = http.NewRequest("PUT", url, strings.NewReader(t.PayloadContent))
99100
if err != nil {
100-
return err
101+
return fmt.Errorf("unable to deliver webhook task[%d] as cannot create matrix request for webhook url %s: %w", t.ID, w.URL, err)
101102
}
102103
default:
103-
return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod)
104+
return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
104105
}
105106
default:
106-
return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod)
107+
return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod)
107108
}
108109

109110
var signatureSHA1 string
@@ -159,6 +160,20 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
159160
Headers: map[string]string{},
160161
}
161162

163+
// OK We're now ready to attempt to deliver the task - we must double check that it
164+
// has not been delivered in the meantime
165+
updated, err := webhook_model.MarkTaskDelivered(ctx, t)
166+
if err != nil {
167+
log.Error("MarkTaskDelivered[%d]: %v", t.ID, err)
168+
return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err)
169+
}
170+
if !updated {
171+
// This webhook task has already been attempted to be delivered or is in the process of being delivered
172+
log.Trace("Webhook Task[%d] already delivered", t.ID)
173+
return nil
174+
}
175+
176+
// All code from this point will update the hook task
162177
defer func() {
163178
t.Delivered = time.Now().UnixNano()
164179
if t.IsSucceed {
@@ -190,13 +205,14 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
190205
}
191206

192207
if !w.IsActive {
208+
log.Trace("Webhook %s in Webhook Task[%d] is not active", w.URL, t.ID)
193209
return nil
194210
}
195211

196212
resp, err := webhookHTTPClient.Do(req.WithContext(ctx))
197213
if err != nil {
198214
t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err)
199-
return err
215+
return fmt.Errorf("unable to deliver webhook task[%d] in %s due to error in http client: %w", t.ID, w.URL, err)
200216
}
201217
defer resp.Body.Close()
202218

@@ -210,7 +226,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
210226
p, err := io.ReadAll(resp.Body)
211227
if err != nil {
212228
t.ResponseInfo.Body = fmt.Sprintf("read body: %s", err)
213-
return err
229+
return fmt.Errorf("unable to deliver webhook task[%d] in %s as unable to read response body: %w", t.ID, w.URL, err)
214230
}
215231
t.ResponseInfo.Body = string(p)
216232
return nil
@@ -272,17 +288,37 @@ func Init() error {
272288
}
273289
go graceful.GetManager().RunWithShutdownFns(hookQueue.Run)
274290

275-
tasks, err := webhook_model.FindUndeliveredHookTasks(graceful.GetManager().HammerContext())
276-
if err != nil {
277-
log.Error("FindUndeliveredHookTasks failed: %v", err)
278-
return err
279-
}
291+
go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue)
292+
293+
return nil
294+
}
295+
296+
func populateWebhookSendingQueue(ctx context.Context) {
297+
ctx, _, finished := process.GetManager().AddContext(ctx, "Webhook: Populate sending queue")
298+
defer finished()
280299

281-
for _, task := range tasks {
282-
if err := enqueueHookTask(task); err != nil {
283-
log.Error("enqueueHookTask failed: %v", err)
300+
lowerID := int64(0)
301+
for {
302+
taskIDs, err := webhook_model.FindUndeliveredHookTaskIDs(ctx, lowerID)
303+
if err != nil {
304+
log.Error("Unable to populate webhook queue as FindUndeliveredHookTaskIDs failed: %v", err)
305+
return
306+
}
307+
if len(taskIDs) == 0 {
308+
return
309+
}
310+
lowerID = taskIDs[len(taskIDs)-1]
311+
312+
for _, taskID := range taskIDs {
313+
select {
314+
case <-ctx.Done():
315+
log.Warn("Shutdown before Webhook Sending queue finishing being populated")
316+
return
317+
default:
318+
}
319+
if err := enqueueHookTask(taskID); err != nil {
320+
log.Error("Unable to push HookTask[%d] to the Webhook Sending queue: %v", taskID, err)
321+
}
284322
}
285323
}
286-
287-
return nil
288324
}

services/webhook/deliver_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"code.gitea.io/gitea/models/unittest"
1717
webhook_model "code.gitea.io/gitea/models/webhook"
1818
"code.gitea.io/gitea/modules/setting"
19+
api "code.gitea.io/gitea/modules/structs"
1920

2021
"github.com/stretchr/testify/assert"
2122
)
@@ -67,8 +68,15 @@ func TestWebhookDeliverAuthorizationHeader(t *testing.T) {
6768
err := hook.SetHeaderAuthorization("Bearer s3cr3t-t0ken")
6869
assert.NoError(t, err)
6970
assert.NoError(t, webhook_model.CreateWebhook(db.DefaultContext, hook))
71+
db.GetEngine(db.DefaultContext).NoAutoTime().DB().Logger.ShowSQL(true)
7072

71-
hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush}
73+
hookTask := &webhook_model.HookTask{HookID: hook.ID, EventType: webhook_model.HookEventPush, Payloader: &api.PushPayload{}}
74+
75+
hookTask, err = webhook_model.CreateHookTask(db.DefaultContext, hookTask)
76+
assert.NoError(t, err)
77+
if !assert.NotNil(t, hookTask) {
78+
return
79+
}
7280

7381
assert.NoError(t, Deliver(context.Background(), hookTask))
7482
select {

services/webhook/webhook.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,19 +116,26 @@ func handle(data ...queue.Data) []queue.Data {
116116
for _, taskID := range data {
117117
task, err := webhook_model.GetHookTaskByID(ctx, taskID.(int64))
118118
if err != nil {
119-
log.Error("GetHookTaskByID failed: %v", err)
120-
} else {
121-
if err := Deliver(ctx, task); err != nil {
122-
log.Error("webhook.Deliver failed: %v", err)
123-
}
119+
log.Error("GetHookTaskByID[%d] failed: %v", taskID.(int64), err)
120+
continue
121+
}
122+
123+
if task.IsDelivered {
124+
// Already delivered in the meantime
125+
log.Trace("Task[%d] has already been delivered", task.ID)
126+
continue
127+
}
128+
129+
if err := Deliver(ctx, task); err != nil {
130+
log.Error("Unable to deliver webhook task[%d]: %v", task.ID, err)
124131
}
125132
}
126133

127134
return nil
128135
}
129136

130-
func enqueueHookTask(task *webhook_model.HookTask) error {
131-
err := hookQueue.PushFunc(task.ID, nil)
137+
func enqueueHookTask(taskID int64) error {
138+
err := hookQueue.Push(taskID)
132139
if err != nil && err != queue.ErrAlreadyInQueue {
133140
return err
134141
}
@@ -205,7 +212,7 @@ func PrepareWebhook(ctx context.Context, w *webhook_model.Webhook, event webhook
205212
return fmt.Errorf("CreateHookTask: %w", err)
206213
}
207214

208-
return enqueueHookTask(task)
215+
return enqueueHookTask(task.ID)
209216
}
210217

211218
// PrepareWebhooks adds new webhooks to task queue for given payload.
@@ -265,5 +272,5 @@ func ReplayHookTask(ctx context.Context, w *webhook_model.Webhook, uuid string)
265272
return err
266273
}
267274

268-
return enqueueHookTask(task)
275+
return enqueueHookTask(task.ID)
269276
}

0 commit comments

Comments
 (0)