Skip to content

Commit aa9b61c

Browse files
committed
Auto merge of #1804 - sgrif:sg-more-resilient-job-runner, r=jtgeibel
Make the job runner a bit more resilient to slow jobs or other errors A brief incident was caused by #1798. A band-aid fix is in place, and #1803 (included in this branch) makes it possible to apply similar band-aids in the future without requiring a rebuild of the code. This commit attempts to better address the root problem though. The short version (which is expanded on below, but not required to understand this commit or why it's needed) is that `update_downloads` takes longer than our job timeout to run. When we moved that task to a background job, we did not increase the number of concurrent jobs, nor did we increase the timeout. This meant that swirl timed out trying to start new jobs, and our behavior in that case was to crash the process. This would mean that `update_downloads` never completes, and remains at the front of the queue. This PR addresses all 3 of the problematic cases. - Increasing concurrency - When this system was added, the only jobs we had were index updates. These want to be serial, so we set the thread pool size to 1. We added readme renderings, which probably should have been parallel, but only happen with crate publishes anyway so it was fine. `update_downloads` *always* takes longer than the timeout to run though. We can't have it block everything else while it's running. The main downside to this is that index updates are no longer guaranteed to run in serial, which means that if two crates are uploaded simultaneously one job will fail and will have to wait for a retry to update the index. In theory if a crate happened to be uploaded at the exact instant of the retry 7 or 8 times in a row this could even result in getting paged. This is exceptionally unlikely, and I'm not concerned about it for now. As more features land in swirl we may want to move index updates to their own queue or tweak the retry behavior on that job though. Swirl will eventually handle this for us by default, and we should use its defaults once that lands. - Increasing the default timeout - 10s was a bit too aggressive. Fundamentally there is always a condition where we hit this timeout, and if the reason for hitting it is that we are receiving more jobs than we can process (either because of volume of jobs, or our jobs are too slow). The most common reason we would hit this is that all threads are occupied by a job which takes longer than the timeout to execute. Increasing the concurrency makes this less likely to occur since our jobs are low volume, but we were actually seeing this crash before the addition of `update_downloads` meaning that our other jobs are sometimes taking >10s to run. Increasing the concurrency beyond 2 would make it extremely unlikely we will ever hit this, but since we theoretically can with a burst of crate uploads at any concurrency, I've also upped the timeout. - Rebuild the runner a few times before crashing the process - This is the most important change, though it's the only one that wouldn't fix the problem by itself. The first two changes address why the problem occurred, this last change addresses why it placed us in an unrecoverable state. What would happen is we would time out trying to start another job after `update_downloads`, and then the process would crash. This would mean that `update_downloads` would never complete, so as soon as we restarted, we'd just try to run it again (I may also change swirl to increment the retry counter before even beginning to run the job, but there are issues with that which are out of scope for this commit to discuss). This commit changes the behavior to instead built a new runner (which means a new thread pool and DB pool) up to 5 times before crashing the process. This means that any spawned threads will get a bit more time to run before the process itself crashes, so any jobs clogging the runner still get a chance to complete. I've opted to have a hard limit on the number of failures in the runner to avoid potentially unbounded growth in DB connections. We do still want to eventually fail, since being unable to start jobs can indicate issues that are only solved by starting a new process or moving to another physical machine. More specific technical details on the issue that are not required to review this PR, but may be interesting -- I've written this issue up at sgrif/swirl#16 as well. The main entry point for a Swirl runner today is `run_all_pending_jobs`. This method is fairly low level. The intent is to eventually add a "reasonable defaults" binary shipped with swirl, probably somewhat based on what crates.io needs here. This method will run in a loop, attempting to fully saturate its thread pool on each iteration. It will check the number of availble threads, spawning that many tasks. Each task that is spawned will quickly communicate back to the coordinator via an mpsc channel. The coordinator keeps track of how many messages it's expecting (we get exactly 1 message per spawned task). If we aren't currently expecting any messages, and there are also 0 available threads, we will attempt to spawn 1 task no matter what. This is to ensure we don't loop forever waiting for a free thread, and respsect the given timeout. We do this in a loop until we hear from a thread that there was no job available, or receive an error (caused by a thread being unable to get a DB connection, an error loading the job from the DB [which should only happen if the DB has gone away], or if we time out waiting to hear back at all). That's exactly what happened in this case. We would see 1 available thread, spawn 1 task, and have 1 pending message. The worker would communicate back that it got a job. We'd loop. There are 0 available threads. We are expecting 0 messages, so we spawn 1 task anyway. We are now expecting 1 pending message. We block waiting for it. The only way we will receive a message is for the job we started in the first iteration to complete before the timeout. It doesn't, so `run_all_pending_jobs` returns an error. Our runner was calling `.expect` on that, so the process crashes. This shows several issues both in the configuration that was being used by crates.io, and also in Swirl itself. I discussed the configuration issues above, but there are also questions WRT Swirl's design. The first issue is whether this case should be separated from not getting a response from the worker at all. The latter should *never* happen under reasonable circumstances, so my gut is that we can assume if it does happen it was due to this case... The second issue is that this was put us in an unrecoverable state rather than causing one class of issues to fail to run. This could be prevented by increasing the retry counter outside of a transaction before running the job. This has issues though, which are out of scope for this commit, but basically boil down to introducing non-atomic pieces to an otherwise atomic operation.
2 parents f73b7a7 + ad2bfe7 commit aa9b61c

File tree

2 files changed

+57
-21
lines changed

2 files changed

+57
-21
lines changed

src/background_jobs.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::panic::AssertUnwindSafe;
2-
use std::sync::{Mutex, MutexGuard, PoisonError};
2+
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
33
use swirl::PerformError;
44

55
use crate::db::{DieselPool, DieselPooledConn};
@@ -21,14 +21,28 @@ impl swirl::db::DieselPool for DieselPool {
2121

2222
#[allow(missing_debug_implementations)]
2323
pub struct Environment {
24-
index: Mutex<Repository>,
24+
index: Arc<Mutex<Repository>>,
2525
pub credentials: Option<(String, String)>,
2626
// FIXME: https://github.com/sfackler/r2d2/pull/70
2727
pub connection_pool: AssertUnwindSafe<DieselPool>,
2828
pub uploader: Uploader,
2929
http_client: AssertUnwindSafe<reqwest::Client>,
3030
}
3131

32+
// FIXME: AssertUnwindSafe should be `Clone`, this can be replaced with
33+
// `#[derive(Clone)]` if that is fixed in the standard lib
34+
impl Clone for Environment {
35+
fn clone(&self) -> Self {
36+
Self {
37+
index: self.index.clone(),
38+
credentials: self.credentials.clone(),
39+
connection_pool: AssertUnwindSafe(self.connection_pool.0.clone()),
40+
uploader: self.uploader.clone(),
41+
http_client: AssertUnwindSafe(self.http_client.0.clone()),
42+
}
43+
}
44+
}
45+
3246
impl Environment {
3347
pub fn new(
3448
index: Repository,
@@ -38,7 +52,7 @@ impl Environment {
3852
http_client: reqwest::Client,
3953
) -> Self {
4054
Self {
41-
index: Mutex::new(index),
55+
index: Arc::new(Mutex::new(index)),
4256
credentials,
4357
connection_pool: AssertUnwindSafe(connection_pool),
4458
uploader,

src/bin/background-worker.rs

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1-
// Runs enqueued background jobs
2-
//
3-
// This binary will loop until interrupted. Every second, it will attempt to
4-
// run any jobs in the background queue. Panics if attempting to count
5-
// available jobs fails.
6-
//
7-
// Usage:
8-
// cargo run --bin background-worker
1+
//! Runs enqueued background jobs
2+
//!
3+
//! This binary will loop until interrupted. It will run all jobs in the
4+
//! background queue, sleeping for 1 second whenever the queue is empty. If we
5+
//! are unable to spawn workers to run jobs (either because we couldn't connect
6+
//! to the DB, an error occurred while loading, or we just never heard back from
7+
//! the worker thread), we will rebuild the runner and try again up to 5 times.
8+
//! After the 5th occurrance, we will panic.
9+
//!
10+
//! Usage:
11+
//! cargo run --bin background-worker
912
1013
#![deny(warnings, clippy::all, rust_2018_idioms)]
1114

@@ -20,8 +23,13 @@ fn main() {
2023

2124
let config = cargo_registry::Config::default();
2225

23-
// We're only using 1 thread, so we only need 2 connections
24-
let db_config = r2d2::Pool::builder().max_size(2);
26+
// 2x the thread pool size -- not all our jobs need a DB connection,
27+
// but we want to always be able to run our jobs in parallel, rather
28+
// than adjusting based on how many concurrent jobs need a connection.
29+
// Eventually swirl will do this for us, and this will be the default
30+
// -- we should just let it do a thread pool size of CPU count, and a
31+
// a connection pool size of 2x that when that lands.
32+
let db_config = r2d2::Pool::builder().max_size(4);
2533
let db_pool = db::diesel_pool(&config.db_url, config.env, db_config);
2634

2735
let username = dotenv::var("GIT_HTTP_USER");
@@ -32,7 +40,7 @@ fn main() {
3240
};
3341

3442
let job_start_timeout = dotenv::var("BACKGROUND_JOB_TIMEOUT")
35-
.unwrap_or_else(|_| "10".into())
43+
.unwrap_or_else(|_| "30".into())
3644
.parse()
3745
.expect("Invalid value for `BACKGROUND_JOB_TIMEOUT`");
3846

@@ -48,17 +56,31 @@ fn main() {
4856
reqwest::Client::new(),
4957
);
5058

51-
let runner = swirl::Runner::builder(db_pool, environment)
52-
.thread_count(1)
53-
.job_start_timeout(Duration::from_secs(job_start_timeout))
54-
.build();
59+
let build_runner = || {
60+
swirl::Runner::builder(db_pool.clone(), environment.clone())
61+
.thread_count(2)
62+
.job_start_timeout(Duration::from_secs(job_start_timeout))
63+
.build()
64+
};
65+
let mut runner = build_runner();
5566

5667
println!("Runner booted, running jobs");
5768

69+
let mut failure_count = 0;
70+
5871
loop {
59-
runner
60-
.run_all_pending_jobs()
61-
.expect("Could not begin running jobs");
72+
if let Err(e) = runner.run_all_pending_jobs() {
73+
failure_count += 1;
74+
if failure_count < 5 {
75+
eprintln!(
76+
"Error running jobs (n = {}) -- retrying: {:?}",
77+
failure_count, e,
78+
);
79+
runner = build_runner();
80+
} else {
81+
panic!("Failed to begin running jobs 5 times. Restarting the process");
82+
}
83+
}
6284
sleep(Duration::from_secs(1));
6385
}
6486
}

0 commit comments

Comments
 (0)