-
-
Notifications
You must be signed in to change notification settings - Fork 5.8k
Ensure that Webhook tasks are not double delivered #21558
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
f964274
790ecde
88cc381
57f13c1
5267a15
6225c5d
c830c79
be23648
4982907
0cd4215
0e82d12
cc86689
0413d73
4b1351d
877d101
8dbfd89
165d18c
91ad108
fa0bba0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ import ( | |
"code.gitea.io/gitea/modules/graceful" | ||
"code.gitea.io/gitea/modules/hostmatcher" | ||
"code.gitea.io/gitea/modules/log" | ||
"code.gitea.io/gitea/modules/process" | ||
"code.gitea.io/gitea/modules/proxy" | ||
"code.gitea.io/gitea/modules/queue" | ||
"code.gitea.io/gitea/modules/setting" | ||
|
@@ -43,7 +44,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { | |
return | ||
} | ||
// There was a panic whilst delivering a hook... | ||
log.Error("PANIC whilst trying to deliver webhook[%d] to %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2)) | ||
log.Error("PANIC whilst trying to deliver webhook task[%d] to webhook %s Panic: %v\nStacktrace: %s", t.ID, w.URL, err, log.Stack(2)) | ||
}() | ||
|
||
t.IsDelivered = true | ||
|
@@ -52,7 +53,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { | |
|
||
switch w.HTTPMethod { | ||
case "": | ||
log.Info("HTTP Method for webhook %d empty, setting to POST as default", t.ID) | ||
log.Info("HTTP Method for webhook %s empty, setting to POST as default", w.URL) | ||
fallthrough | ||
case http.MethodPost: | ||
switch w.ContentType { | ||
|
@@ -78,27 +79,27 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { | |
case http.MethodGet: | ||
u, err := url.Parse(w.URL) | ||
if err != nil { | ||
return err | ||
return fmt.Errorf("unable to deliver webhook task[%d] as cannot parse webhook url %s: %w", t.ID, w.URL, err) | ||
} | ||
vals := u.Query() | ||
vals["payload"] = []string{t.PayloadContent} | ||
u.RawQuery = vals.Encode() | ||
req, err = http.NewRequest("GET", u.String(), nil) | ||
if err != nil { | ||
return err | ||
return fmt.Errorf("unable to deliver webhook task[%d] as unable to create HTTP request for webhook url %s: %w", t.ID, w.URL, err) | ||
} | ||
case http.MethodPut: | ||
switch w.Type { | ||
case webhook_model.MATRIX: | ||
req, err = getMatrixHookRequest(w, t) | ||
if err != nil { | ||
return err | ||
return fmt.Errorf("unable to deliver webhook task[%d] as cannot create matrix request for webhook url %s: %w", t.ID, w.URL, err) | ||
} | ||
default: | ||
return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod) | ||
return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod) | ||
} | ||
default: | ||
return fmt.Errorf("invalid http method for webhook: [%d] %v", t.ID, w.HTTPMethod) | ||
return fmt.Errorf("invalid http method for webhook task[%d] in webhook %s: %v", t.ID, w.URL, w.HTTPMethod) | ||
} | ||
|
||
var signatureSHA1 string | ||
|
@@ -170,18 +171,32 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { | |
} | ||
}() | ||
|
||
// OK We're now ready to attempt to deliver the task - we must double check that it | ||
// has not been delivered in the meantime | ||
updated, err := webhook_model.MarkTaskDelivered(ctx, t) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we move these lines after line 194? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No. We must do it as soon as the defer is in action. ACTUALLY - We must do it BEFORE the defer is in action. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. read the defer
zeripath marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err != nil { | ||
log.Error("MarkTaskDelivered[%d]: %v", err) | ||
return fmt.Errorf("unable to mark task[%d] delivered in the db: %w", t.ID, err) | ||
} | ||
if !updated { | ||
// This webhook task has already been attempted to be delivered or is in the process of being delivered | ||
log.Trace("Webhook Task[%d] already delivered", t.ID) | ||
return nil | ||
} | ||
|
||
if setting.DisableWebhooks { | ||
return fmt.Errorf("webhook task skipped (webhooks disabled): [%d]", t.ID) | ||
} | ||
|
||
if !w.IsActive { | ||
log.Trace("Webhook %s in Webhook Task[%d] is not active", w.URL, t.ID) | ||
return nil | ||
} | ||
|
||
resp, err := webhookHTTPClient.Do(req.WithContext(ctx)) | ||
if err != nil { | ||
t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err) | ||
return err | ||
return fmt.Errorf("unable to deliver webhook task[%d] in %s due to error in http client: %w", t.ID, w.URL, err) | ||
} | ||
defer resp.Body.Close() | ||
|
||
|
@@ -195,7 +210,7 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error { | |
p, err := io.ReadAll(resp.Body) | ||
if err != nil { | ||
t.ResponseInfo.Body = fmt.Sprintf("read body: %s", err) | ||
return err | ||
return fmt.Errorf("unable to deliver webhook task[%d] in %s as unable to read response body: %w", t.ID, w.URL, err) | ||
} | ||
t.ResponseInfo.Body = string(p) | ||
return nil | ||
|
@@ -257,17 +272,37 @@ func Init() error { | |
} | ||
go graceful.GetManager().RunWithShutdownFns(hookQueue.Run) | ||
|
||
tasks, err := webhook_model.FindUndeliveredHookTasks(graceful.GetManager().HammerContext()) | ||
if err != nil { | ||
log.Error("FindUndeliveredHookTasks failed: %v", err) | ||
return err | ||
} | ||
go graceful.GetManager().RunWithShutdownContext(populateWebhookSendingQueue) | ||
|
||
return nil | ||
} | ||
|
||
func populateWebhookSendingQueue(ctx context.Context) { | ||
ctx, _, finished := process.GetManager().AddContext(ctx, "Webhook: Populate sending queue") | ||
defer finished() | ||
|
||
for _, task := range tasks { | ||
if err := enqueueHookTask(task); err != nil { | ||
log.Error("enqueueHookTask failed: %v", err) | ||
lowerID := int64(0) | ||
for { | ||
lunny marked this conversation as resolved.
Show resolved
Hide resolved
|
||
taskIDs, err := webhook_model.FindUndeliveredHookTaskIDs(ctx, lowerID) | ||
if err != nil { | ||
log.Error("Unable to populate webhook queue as FindUndeliveredHookTaskIDs failed: %v", err) | ||
return | ||
} | ||
if len(taskIDs) == 0 { | ||
return | ||
} | ||
lowerID = taskIDs[len(taskIDs)-1] | ||
|
||
for _, taskID := range taskIDs { | ||
select { | ||
case <-ctx.Done(): | ||
log.Warn("Shutdown before Webhook Sending queue finishing being populated") | ||
return | ||
default: | ||
} | ||
if err := enqueueHookTask(taskID); err != nil { | ||
log.Error("Unable to push HookTask[%d] to the Webhook Sending queue: %v", taskID, err) | ||
} | ||
} | ||
} | ||
|
||
return nil | ||
} |
Uh oh!
There was an error while loading. Please reload this page.