Skip to content

Commit 31d9f56

Browse files
goffrie (Convex, Inc.)
authored and committed
Thread prev_ts through database impls (#33277)
1. Merge DatabaseDocumentUpdate into DatabaseLogEntry, so that the same type is inserted into and queried from documents 2. Also modify `fn previous_revisions` to return DatabaseLogEntry 3. Thread prev_ts through all the SQL queries for sqlite/mysql/postgres. GitOrigin-RevId: 75040afced34435ee6915f571296acd0c18f6334
1 parent 7a87c93 commit 31d9f56

File tree

23 files changed

+551
-380
lines changed

23 files changed

+551
-380
lines changed

crates/common/src/persistence.rs

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,13 @@ use crate::{
5050
},
5151
};
5252

53-
pub type DocumentLogEntry = (Timestamp, InternalDocumentId, Option<ResolvedDocument>);
53+
#[derive(Debug, Clone, PartialEq)]
54+
pub struct DocumentLogEntry {
55+
pub ts: Timestamp,
56+
pub id: InternalDocumentId,
57+
pub value: Option<ResolvedDocument>,
58+
pub prev_ts: Option<Timestamp>,
59+
}
5460

5561
pub type DocumentStream<'a> = BoxStream<'a, anyhow::Result<DocumentLogEntry>>;
5662

@@ -161,14 +167,6 @@ impl PersistenceGlobalKey {
161167
}
162168
}
163169

164-
#[derive(Debug, Clone, PartialEq)]
165-
pub struct DatabaseDocumentUpdate {
166-
pub ts: Timestamp,
167-
pub id: InternalDocumentId,
168-
pub value: Option<ResolvedDocument>,
169-
pub prev_ts: Option<Timestamp>,
170-
}
171-
172170
#[async_trait]
173171
pub trait Persistence: Sync + Send + 'static {
174172
/// Whether the persistence layer is freshly created or not.
@@ -179,7 +177,7 @@ pub trait Persistence: Sync + Send + 'static {
179177
/// Writes documents and the respective derived indexes.
180178
async fn write(
181179
&self,
182-
documents: Vec<DatabaseDocumentUpdate>,
180+
documents: Vec<DocumentLogEntry>,
183181
indexes: BTreeSet<(Timestamp, DatabaseIndexUpdate)>,
184182
conflict_strategy: ConflictStrategy,
185183
) -> anyhow::Result<()>;
@@ -338,7 +336,7 @@ pub trait PersistenceReader: Send + Sync + 'static {
338336
retention_validator: Arc<dyn RetentionValidator>,
339337
) -> DocumentStream<'_> {
340338
self.load_documents(range, order, page_size, retention_validator)
341-
.try_filter(move |(_, doc_id, _)| future::ready(doc_id.table() == tablet_id))
339+
.try_filter(move |doc| future::ready(doc.id.table() == tablet_id))
342340
.boxed()
343341
}
344342

@@ -352,9 +350,7 @@ pub trait PersistenceReader: Send + Sync + 'static {
352350
&self,
353351
ids: BTreeSet<(InternalDocumentId, Timestamp)>,
354352
retention_validator: Arc<dyn RetentionValidator>,
355-
) -> anyhow::Result<
356-
BTreeMap<(InternalDocumentId, Timestamp), (Timestamp, Option<ResolvedDocument>)>,
357-
>;
353+
) -> anyhow::Result<BTreeMap<(InternalDocumentId, Timestamp), DocumentLogEntry>>;
358354

359355
/// Loads documentIds with respective timestamps that match the
360356
/// index query criteria.
@@ -431,7 +427,7 @@ pub trait PersistenceReader: Send + Sync + 'static {
431427
let max_repeatable =
432428
self.get_persistence_global(PersistenceGlobalKey::MaxRepeatableTimestamp);
433429
let (max_committed, max_repeatable) = try_join!(stream.try_next(), max_repeatable)?;
434-
let max_committed_ts = max_committed.map(|(ts, ..)| ts);
430+
let max_committed_ts = max_committed.map(|entry| entry.ts);
435431
let max_repeatable_ts = max_repeatable.map(Timestamp::try_from).transpose()?;
436432
let max_ts = cmp::max(max_committed_ts, max_repeatable_ts); // note None < Some
437433
Ok(max_ts)
@@ -525,7 +521,7 @@ impl RepeatablePersistence {
525521
*DEFAULT_DOCUMENTS_PAGE_SIZE,
526522
self.retention_validator.clone(),
527523
);
528-
Box::pin(stream.try_filter(|(ts, ..)| future::ready(*ts <= *self.upper_bound)))
524+
Box::pin(stream.try_filter(|entry| future::ready(entry.ts <= *self.upper_bound)))
529525
}
530526

531527
/// Same as `load_documents` but doesn't use the `RetentionValidator` from
@@ -543,15 +539,13 @@ impl RepeatablePersistence {
543539
*DEFAULT_DOCUMENTS_PAGE_SIZE,
544540
retention_validator,
545541
);
546-
Box::pin(stream.try_filter(|(ts, ..)| future::ready(*ts <= *self.upper_bound)))
542+
Box::pin(stream.try_filter(|entry| future::ready(entry.ts <= *self.upper_bound)))
547543
}
548544

549545
pub async fn previous_revisions(
550546
&self,
551547
ids: BTreeSet<(InternalDocumentId, Timestamp)>,
552-
) -> anyhow::Result<
553-
BTreeMap<(InternalDocumentId, Timestamp), (Timestamp, Option<ResolvedDocument>)>,
554-
> {
548+
) -> anyhow::Result<BTreeMap<(InternalDocumentId, Timestamp), DocumentLogEntry>> {
555549
for (_, ts) in &ids {
556550
// Reading documents <ts, so ts-1 needs to be repeatable.
557551
anyhow::ensure!(*ts <= self.upper_bound.succ()?);
@@ -566,9 +560,7 @@ impl RepeatablePersistence {
566560
&self,
567561
ids: BTreeSet<(InternalDocumentId, Timestamp)>,
568562
retention_validator: Arc<dyn RetentionValidator>,
569-
) -> anyhow::Result<
570-
BTreeMap<(InternalDocumentId, Timestamp), (Timestamp, Option<ResolvedDocument>)>,
571-
> {
563+
) -> anyhow::Result<BTreeMap<(InternalDocumentId, Timestamp), DocumentLogEntry>> {
572564
for (_, ts) in &ids {
573565
// Reading documents <ts, so ts-1 needs to be repeatable.
574566
anyhow::ensure!(*ts <= self.upper_bound.succ()?);

crates/common/src/persistence_helpers.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use crate::{
99
comparators::AsComparator,
1010
document::ResolvedDocument,
1111
knobs::DOCUMENTS_IN_MEMORY,
12-
persistence::RepeatablePersistence,
12+
persistence::{
13+
DocumentLogEntry,
14+
RepeatablePersistence,
15+
},
1316
try_chunks::TryChunksExt,
1417
types::Timestamp,
1518
};
@@ -41,8 +44,7 @@ impl RevisionPair {
4144
}
4245
}
4346

44-
type RevisionStreamEntry =
45-
anyhow::Result<(Timestamp, InternalDocumentId, Option<ResolvedDocument>)>;
47+
type RevisionStreamEntry = anyhow::Result<DocumentLogEntry>;
4648

4749
#[allow(clippy::needless_lifetimes)]
4850
#[try_stream(ok = RevisionPair, error = anyhow::Error)]
@@ -54,16 +56,26 @@ pub async fn stream_revision_pairs<'a>(
5456
futures::pin_mut!(documents);
5557

5658
while let Some(read_chunk) = documents.try_next().await? {
57-
let ids = read_chunk.iter().map(|(ts, id, _)| (*id, *ts)).collect();
59+
// TODO: use prev_ts when it is available
60+
let ids = read_chunk
61+
.iter()
62+
.map(|entry| (entry.id, entry.ts))
63+
.collect();
5864
let mut prev_revs = reader.previous_revisions(ids).await?;
59-
for (ts, id, document) in read_chunk {
65+
for DocumentLogEntry {
66+
ts,
67+
id,
68+
value: document,
69+
..
70+
} in read_chunk
71+
{
6072
let rev = DocumentRevision { ts, document };
6173
let prev_rev =
6274
prev_revs
6375
.remove((&id, &ts).as_comparator())
64-
.map(|(prev_ts, prev_document)| DocumentRevision {
65-
ts: prev_ts,
66-
document: prev_document,
76+
.map(|entry| DocumentRevision {
77+
ts: entry.ts,
78+
document: entry.value,
6779
});
6880
yield RevisionPair { id, rev, prev_rev };
6981
}

0 commit comments

Comments (0)