Skip to content

Commit c07223b

Browse files
committed
Auto merge of #2157 - jtgeibel:add-lock-to-update-downloads-job, r=sgrif
Ensure the update_downloads job doesn't run concurrently If multiple instances of this job are run concurrently then it is possible to overcount downloads, at least temporarily. The job first selects all matching `version_downloads` and later uses those values to calculate how many downloads to add to `versions` and `crates`. If a second job is run, it would select some rows from `version_downloads` that were already queued for processing by the first task. If an overcount were to occur, the next time the job is run it should calculate a negative adjustment and correct the situation. There's no point in doing extra work and if we eventually need concurrency we should built that out intentionally. Therefore, this commit wraps the entire job in a transaction and obtains an transaction level advisory lock from the database. If the lock has already been taken the job will fail and will be retried by swirl. If the duration of this job begins to approach the scheduling interval, then we will want to increase that interval to avoid triggering alerts.
2 parents c6d13eb + 0b03ae6 commit c07223b

File tree

2 files changed

+17
-2
lines changed

2 files changed

+17
-2
lines changed

src/tasks.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,8 @@ mod update_downloads;
33

44
pub use dump_db::dump_db;
55
pub use update_downloads::update_downloads;
6+
7+
use diesel::sql_types::BigInt;
8+
sql_function!(fn pg_try_advisory_xact_lock(key: BigInt) -> Bool);
9+
10+
const UPDATE_DOWNLOADS_ADVISORY_LOCK_KEY: i64 = 1;

src/tasks/update_downloads.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use super::pg_try_advisory_xact_lock;
2+
use super::UPDATE_DOWNLOADS_ADVISORY_LOCK_KEY as LOCK_KEY;
13
use crate::{
24
background_jobs::Environment,
35
models::VersionDownload,
@@ -9,9 +11,17 @@ use swirl::PerformError;
911

1012
#[swirl::background_job]
1113
pub fn update_downloads(env: &Environment) -> Result<(), PerformError> {
14+
use diesel::select;
15+
1216
let conn = env.connection()?;
13-
update(&conn)?;
14-
Ok(())
17+
conn.transaction::<_, PerformError, _>(|| {
18+
// If this job runs concurrently with itself, it could result in a overcount
19+
if !select(pg_try_advisory_xact_lock(LOCK_KEY)).get_result(&*conn)? {
20+
return Err("The advisory lock for update_downloads is already taken".into());
21+
}
22+
23+
update(&conn).map_err(Into::into)
24+
})
1525
}
1626

1727
fn update(conn: &PgConnection) -> QueryResult<()> {

0 commit comments

Comments
 (0)