Skip to content

Commit ecca6ff

Browse files
committed
feat - database schema & enqueue/dequeue logic
1 parent 9b0e4a2 commit ecca6ff

File tree

4 files changed

+1052
-8
lines changed

4 files changed

+1052
-8
lines changed

database/src/lib.rs

Lines changed: 267 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use chrono::{DateTime, Utc};
33
use hashbrown::HashMap;
44
use intern::intern;
55
use serde::{Deserialize, Serialize};
6-
use std::fmt;
6+
use std::fmt::{self, Display, Formatter};
77
use std::hash;
8-
use std::ops::{Add, Sub};
8+
use std::ops::{Add, Deref, Sub};
99
use std::sync::Arc;
1010
use std::time::Duration;
1111

@@ -155,6 +155,15 @@ impl FromStr for CommitType {
155155
}
156156
}
157157

158+
impl Display for CommitType {
159+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
160+
match self {
161+
CommitType::Try => f.write_str("try"),
162+
CommitType::Master => f.write_str("master"),
163+
}
164+
}
165+
}
166+
158167
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
159168
pub struct Commit {
160169
pub sha: String,
@@ -794,3 +803,259 @@ pub struct ArtifactCollection {
794803
pub duration: Duration,
795804
pub end_time: DateTime<Utc>,
796805
}
806+
807+
#[derive(Debug, Clone, Serialize, Deserialize)]
808+
pub enum CommitJobType {
809+
Try(u32),
810+
Master(u32),
811+
Release(String),
812+
}
813+
814+
#[derive(Debug, Clone, Serialize, Deserialize)]
815+
pub struct CommitJobEntity {
816+
pub sha: String,
817+
pub parent_sha: String,
818+
pub commit_time: Date,
819+
pub target: Target,
820+
pub include: Option<String>,
821+
pub exclude: Option<String>,
822+
pub runs: Option<i32>,
823+
pub backends: Option<String>,
824+
pub job_type: CommitJobType,
825+
}
826+
827+
#[derive(Debug, Clone, Serialize, Deserialize)]
828+
pub struct CommitJobInProgress {
829+
pub commit_job: CommitJobEntity,
830+
pub machine_id: String,
831+
pub started_at: Date,
832+
}
833+
834+
#[derive(Debug, Clone, Serialize, Deserialize)]
835+
pub struct CommitJobFinished {
836+
pub commit_job: CommitJobEntity,
837+
pub machine_id: String,
838+
pub started_at: Date,
839+
pub finished_at: Date,
840+
}
841+
842+
#[derive(Debug, Clone, Serialize, Deserialize)]
843+
pub struct CommitJobFailed {
844+
pub commit_job: CommitJobEntity,
845+
pub machine_id: String,
846+
pub started_at: Date,
847+
pub finished_at: Date,
848+
}
849+
850+
#[derive(Debug, Clone, Serialize, Deserialize)]
851+
pub enum CommitJob {
852+
Queued(CommitJobEntity),
853+
InProgress(CommitJobInProgress),
854+
Finished(CommitJobFinished),
855+
Failed(CommitJobFailed),
856+
}
857+
858+
impl CommitJob {
859+
/// Returns `Some(&CommitJobEntity)` only if the job is still queued.
860+
pub fn as_queued(&self) -> Option<&CommitJobEntity> {
861+
match self {
862+
CommitJob::Queued(e) => Some(e),
863+
_ => None,
864+
}
865+
}
866+
867+
/// Returns `Some(&CommitJobInProgress)` while the job is running.
868+
pub fn as_in_progress(&self) -> Option<&CommitJobInProgress> {
869+
match self {
870+
CommitJob::InProgress(ip) => Some(ip),
871+
_ => None,
872+
}
873+
}
874+
875+
/// Returns `Some(&CommitJobFinished)` once the job is done.
876+
pub fn as_finished(&self) -> Option<&CommitJobFinished> {
877+
match self {
878+
CommitJob::Finished(fin) => Some(fin),
879+
_ => None,
880+
}
881+
}
882+
883+
/// Get the status as a string
884+
pub fn status(&self) -> &'static str {
885+
match self {
886+
CommitJob::Queued(_) => "queued",
887+
CommitJob::InProgress(_) => "in_progress",
888+
CommitJob::Finished(_) => "finished",
889+
CommitJob::Failed(_) => "failed",
890+
}
891+
}
892+
893+
/// True when `status == "finished"`.
894+
pub fn is_finished(&self) -> bool {
895+
matches!(self, CommitJob::Finished(_))
896+
}
897+
898+
/// Will compose the column names for the job type
899+
pub fn get_enqueue_column_names(&self) -> Vec<String> {
900+
let mut base_columns = vec![
901+
String::from("sha"),
902+
String::from("parent_sha"),
903+
String::from("commit_type"),
904+
String::from("commit_time"),
905+
String::from("status"),
906+
String::from("target"),
907+
String::from("include"),
908+
String::from("exclude"),
909+
String::from("runs"),
910+
String::from("backends"),
911+
];
912+
913+
/* This is the last column */
914+
match self.job_type {
915+
CommitJobType::Try(_) => base_columns.push("pr".into()),
916+
CommitJobType::Master(_) => base_columns.push("pr".into()),
917+
CommitJobType::Release(_) => base_columns.push("release_tag".into()),
918+
};
919+
920+
base_columns
921+
}
922+
}
923+
924+
impl Deref for CommitJob {
925+
type Target = CommitJobEntity;
926+
fn deref(&self) -> &Self::Target {
927+
match self {
928+
CommitJob::Queued(e) => e,
929+
CommitJob::InProgress(ip) => &ip.commit_job,
930+
CommitJob::Finished(fin) => &fin.commit_job,
931+
CommitJob::Failed(fail) => &fail.commit_job,
932+
}
933+
}
934+
}
935+
936+
/// Maps from the database to a Rust struct
937+
#[allow(clippy::too_many_arguments)]
938+
fn commit_job_create(
939+
sha: String,
940+
parent_sha: String,
941+
commit_type: &str,
942+
pr: Option<u32>,
943+
release_tag: Option<String>,
944+
commit_time: Date,
945+
target: Target,
946+
machine_id: Option<String>,
947+
started_at: Option<Date>,
948+
finished_at: Option<Date>,
949+
status: &str,
950+
include: Option<String>,
951+
exclude: Option<String>,
952+
runs: Option<i32>,
953+
backends: Option<String>,
954+
) -> CommitJob {
955+
let job_type = match commit_type {
956+
"try" => CommitJobType::Try(pr.expect("`pr` cannot be `None` for a Commit of type `try`")),
957+
"master" => {
958+
CommitJobType::Master(pr.expect("`pr` cannot be `None` for a Commit of type `master`"))
959+
}
960+
"release" => CommitJobType::Release(
961+
release_tag.expect("`release_tag` cannot be `None` for a Commit of type `release`"),
962+
),
963+
_ => panic!("Unhandled commit_type {}", commit_type),
964+
};
965+
966+
let commit_job = CommitJobEntity {
967+
sha,
968+
parent_sha,
969+
commit_time,
970+
target,
971+
include,
972+
exclude,
973+
runs,
974+
backends,
975+
job_type,
976+
};
977+
978+
match status {
979+
"queued" => CommitJob::Queued(commit_job),
980+
981+
"in_progress" => {
982+
let started_at =
983+
started_at.expect("`started_at` must be Some for an `in_progress` job");
984+
let machine_id =
985+
machine_id.expect("`machine_id` must be Some for an `in_progress` job");
986+
987+
CommitJob::InProgress(CommitJobInProgress {
988+
commit_job,
989+
started_at,
990+
machine_id,
991+
})
992+
}
993+
994+
"finished" | "failed" => {
995+
let started_at =
996+
started_at.expect("`started_at` must be Some for finished or failed job");
997+
let finished_at =
998+
finished_at.expect("`finished_at` must be Some for finished or failed");
999+
let machine_id =
1000+
machine_id.expect("`machine_id` must be Some for finished or failed a job");
1001+
1002+
if status == "finished" {
1003+
CommitJob::Finished(CommitJobFinished {
1004+
commit_job,
1005+
started_at,
1006+
finished_at,
1007+
machine_id,
1008+
})
1009+
} else {
1010+
CommitJob::Failed(CommitJobFailed {
1011+
commit_job,
1012+
started_at,
1013+
finished_at,
1014+
machine_id,
1015+
})
1016+
}
1017+
}
1018+
1019+
other => {
1020+
panic!("unknown status `{other}` (expected `queued`, `in_progress`, `finished` or `failed`)")
1021+
}
1022+
}
1023+
}
1024+
1025+
pub struct CommitsByType<'a> {
1026+
pub r#try: Vec<(&'a CommitJob, u32)>,
1027+
pub master: Vec<(&'a CommitJob, u32)>,
1028+
pub release: Vec<(&'a CommitJob, String)>,
1029+
}
1030+
1031+
/// Given a vector of `CommitJobs` bucket them out into;
1032+
/// `try`, `master` and `release` (in that order)
1033+
pub fn split_queued_commit_jobs(commit_jobs: &[CommitJob]) -> CommitsByType<'_> {
1034+
// Split jobs by type as that determines what we enter into the database,
1035+
// `ToSql` is quite finiky about lifetimes. Moreover the column names
1036+
// change depending on the commit job type. `master` and `try` have
1037+
// a `pr` column whereas `release` has a `release_rag` column
1038+
let (try_commits, master_commits, release_commits) = commit_jobs.iter().fold(
1039+
(vec![], vec![], vec![]),
1040+
|(mut try_commits, mut master_commits, mut release_commits), job| {
1041+
let entity = job
1042+
.as_queued()
1043+
.expect("Can only enqueue jobs with a status of `queued`");
1044+
1045+
match &entity.job_type {
1046+
crate::CommitJobType::Try(pr) => try_commits.push((job, *pr)),
1047+
crate::CommitJobType::Master(pr) => master_commits.push((job, *pr)),
1048+
crate::CommitJobType::Release(release_tag) => {
1049+
release_commits.push((job, release_tag.clone()))
1050+
}
1051+
}
1052+
(try_commits, master_commits, release_commits)
1053+
},
1054+
);
1055+
1056+
CommitsByType {
1057+
r#try: try_commits,
1058+
master: master_commits,
1059+
release: release_commits,
1060+
}
1061+
}

database/src/pool.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::{
2-
ArtifactCollection, ArtifactId, ArtifactIdNumber, CodegenBackend, CompileBenchmark, Target,
2+
ArtifactCollection, ArtifactId, ArtifactIdNumber, CodegenBackend, CommitJob, CompileBenchmark,
3+
Target,
34
};
45
use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step};
56
use chrono::{DateTime, Utc};
@@ -178,6 +179,16 @@ pub trait Connection: Send + Sync {
178179

179180
/// Removes all data associated with the given artifact.
180181
async fn purge_artifact(&self, aid: &ArtifactId);
182+
183+
/// Add a jobs to the queue
184+
async fn enqueue_commit_jobs(&self, jobs: &[CommitJob]);
185+
186+
/// Dequeue jobs, we pass `machine_id` and `target` in case there are jobs
187+
/// the machine was previously doing and can pick up again
188+
async fn dequeue_commit_job(&self, machine_id: &str, target: Target) -> Option<CommitJob>;
189+
190+
/// Mark the job as finished
191+
async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) -> bool;
181192
}
182193

183194
#[async_trait::async_trait]

0 commit comments

Comments
 (0)