Skip to content

Merging the latest merge into the previous merge #1077

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/firestore/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
# Unreleased
- [changed] Improved how Firestore handles idle queries to reduce the cost of
re-listening within 30 minutes.
- [changed] Improved offline performance with many outstanding writes.

# 0.6.0
- [fixed] Fixed an issue where queries returned fewer results than they should,
caused by documents that were cached as deleted when they should not have
been (firebase/firebase-ios-sdk#1548). Because some cache data is cleared,
Expand Down
14 changes: 1 addition & 13 deletions packages/firestore/src/core/sync_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import {
MutationBatchState,
OnlineState,
OnlineStateSource,
ProtoByteString,
TargetId
} from './types';
import {
Expand Down Expand Up @@ -88,12 +87,6 @@ class QueryView {
* stream to identify this query.
*/
public targetId: TargetId,
/**
* An identifier from the datastore backend that indicates the last state
* of the results that was received. This can be used to indicate where
* to continue receiving new doc changes for the query.
*/
public resumeToken: ProtoByteString,
/**
* The view is responsible for computing the final merged truth of what
* docs are in the query. It gets notified of local and remote changes,
Expand Down Expand Up @@ -274,12 +267,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
'applyChanges for new view should always return a snapshot'
);

const data = new QueryView(
query,
queryData.targetId,
queryData.resumeToken,
view
);
const data = new QueryView(query, queryData.targetId, view);
this.queryViewsByQuery.set(query, data);
this.queryViewsByTarget[queryData.targetId] = data;
return viewChange.snapshot!;
Expand Down
122 changes: 86 additions & 36 deletions packages/firestore/src/local/indexeddb_mutation_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { Timestamp } from '../api/timestamp';
import { User } from '../auth/user';
import { Query } from '../core/query';
import { BatchId, ProtoByteString } from '../core/types';
import { DocumentKeySet } from '../model/collections';
import { DocumentKey } from '../model/document_key';
import { Mutation } from '../model/mutation';
import { BATCHID_UNKNOWN, MutationBatch } from '../model/mutation_batch';
Expand All @@ -40,8 +41,8 @@ import { LocalSerializer } from './local_serializer';
import { MutationQueue } from './mutation_queue';
import { PersistenceTransaction } from './persistence';
import { PersistencePromise } from './persistence_promise';
import { SimpleDb, SimpleDbStore } from './simple_db';
import { DocumentKeySet } from '../model/collections';
import { SimpleDbStore } from './simple_db';
import { IndexedDbPersistence } from './indexeddb_persistence';

/** A mutation queue for a specific user, backed by IndexedDB. */
export class IndexedDbMutationQueue implements MutationQueue {
Expand Down Expand Up @@ -342,6 +343,50 @@ export class IndexedDbMutationQueue implements MutationQueue {
.next(() => results);
}

getAllMutationBatchesAffectingDocumentKeys(
transaction: PersistenceTransaction,
documentKeys: DocumentKeySet
): PersistencePromise<MutationBatch[]> {
let uniqueBatchIDs = new SortedSet<BatchId>(primitiveComparator);

const promises: Array<PersistencePromise<void>> = [];
documentKeys.forEach(documentKey => {
const indexStart = DbDocumentMutation.prefixForPath(
this.userId,
documentKey.path
);
const range = IDBKeyRange.lowerBound(indexStart);

const promise = documentMutationsStore(transaction).iterate(
{ range },
(indexKey, _, control) => {
const [userID, encodedPath, batchID] = indexKey;

// Only consider rows matching exactly the specific key of
// interest. Note that because we order by path first, and we
// order terminators before path separators, we'll encounter all
// the index rows for documentKey contiguously. In particular, all
// the rows for documentKey will occur before any rows for
// documents nested in a subcollection beneath documentKey so we
// can stop as soon as we hit any such row.
const path = EncodedResourcePath.decode(encodedPath);
if (userID !== this.userId || !documentKey.path.isEqual(path)) {
control.done();
return;
}

uniqueBatchIDs = uniqueBatchIDs.add(batchID);
}
);

promises.push(promise);
});

return PersistencePromise.waitFor(promises).next(() =>
this.lookupMutationBatches(transaction, uniqueBatchIDs)
);
}

getAllMutationBatchesAffectingQuery(
transaction: PersistenceTransaction,
query: Query
Expand Down Expand Up @@ -393,34 +438,39 @@ export class IndexedDbMutationQueue implements MutationQueue {
}
uniqueBatchIDs = uniqueBatchIDs.add(batchID);
})
.next(() => {
const results: MutationBatch[] = [];
const promises: Array<PersistencePromise<void>> = [];
// TODO(rockwood): Implement this using iterate.
uniqueBatchIDs.forEach(batchId => {
promises.push(
mutationsStore(transaction)
.get(batchId)
.next(mutation => {
if (!mutation) {
fail(
'Dangling document-mutation reference found, ' +
'which points to ' +
batchId
);
}
assert(
mutation.userId === this.userId,
`Unexpected user '${
mutation.userId
}' for mutation batch ${batchId}`
);
results.push(this.serializer.fromDbMutationBatch(mutation!));
})
);
});
return PersistencePromise.waitFor(promises).next(() => results);
});
.next(() => this.lookupMutationBatches(transaction, uniqueBatchIDs));
}

private lookupMutationBatches(
transaction: PersistenceTransaction,
batchIDs: SortedSet<BatchId>
): PersistencePromise<MutationBatch[]> {
const results: MutationBatch[] = [];
const promises: Array<PersistencePromise<void>> = [];
// TODO(rockwood): Implement this using iterate.
batchIDs.forEach(batchId => {
promises.push(
mutationsStore(transaction)
.get(batchId)
.next(mutation => {
if (mutation === null) {
fail(
'Dangling document-mutation reference found, ' +
'which points to ' +
batchId
);
}
assert(
mutation.userId === this.userId,
`Unexpected user '${
mutation.userId
}' for mutation batch ${batchId}`
);
results.push(this.serializer.fromDbMutationBatch(mutation!));
})
);
});
return PersistencePromise.waitFor(promises).next(() => results);
}

removeMutationBatches(
Expand Down Expand Up @@ -567,7 +617,7 @@ function convertStreamToken(token: ProtoByteString): string {
function mutationsStore(
txn: PersistenceTransaction
): SimpleDbStore<DbMutationBatchKey, DbMutationBatch> {
return SimpleDb.getStore<DbMutationBatchKey, DbMutationBatch>(
return IndexedDbPersistence.getStore<DbMutationBatchKey, DbMutationBatch>(
txn,
DbMutationBatch.store
);
Expand All @@ -579,10 +629,10 @@ function mutationsStore(
function documentMutationsStore(
txn: PersistenceTransaction
): SimpleDbStore<DbDocumentMutationKey, DbDocumentMutation> {
return SimpleDb.getStore<DbDocumentMutationKey, DbDocumentMutation>(
txn,
DbDocumentMutation.store
);
return IndexedDbPersistence.getStore<
DbDocumentMutationKey,
DbDocumentMutation
>(txn, DbDocumentMutation.store);
}

/**
Expand All @@ -591,7 +641,7 @@ function documentMutationsStore(
function mutationQueuesStore(
txn: PersistenceTransaction
): SimpleDbStore<DbMutationQueueKey, DbMutationQueue> {
return SimpleDb.getStore<DbMutationQueueKey, DbMutationQueue>(
return IndexedDbPersistence.getStore<DbMutationQueueKey, DbMutationQueue>(
txn,
DbMutationQueue.store
);
Expand Down
89 changes: 57 additions & 32 deletions packages/firestore/src/local/indexeddb_persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import { User } from '../auth/user';
import { DatabaseInfo } from '../core/database_info';
import { JsonProtoSerializer } from '../remote/serializer';
import { assert } from '../util/assert';
import { assert, fail } from '../util/assert';
import { Code, FirestoreError } from '../util/error';
import * as log from '../util/log';

Expand Down Expand Up @@ -83,6 +83,12 @@ const UNSUPPORTED_PLATFORM_ERROR_MSG =
// firestore_zombie_<persistence_prefix>_<instance_key>
const ZOMBIED_CLIENTS_KEY_PREFIX = 'firestore_zombie';

export class IndexedDbTransaction extends PersistenceTransaction {
constructor(readonly simpleDbTransaction: SimpleDbTransaction) {
super();
}
}

/**
* An IndexedDB-backed instance of Persistence. Data is stored persistently
* across sessions.
Expand Down Expand Up @@ -115,6 +121,17 @@ const ZOMBIED_CLIENTS_KEY_PREFIX = 'firestore_zombie';
* TODO(multitab): Update this comment with multi-tab changes.
*/
export class IndexedDbPersistence implements Persistence {
static getStore<Key extends IDBValidKey, Value>(
txn: PersistenceTransaction,
store: string
): SimpleDbStore<Key, Value> {
if (txn instanceof IndexedDbTransaction) {
return SimpleDb.getStore<Key, Value>(txn.simpleDbTransaction, store);
} else {
fail('IndexedDbPersistence must use instances of IndexedDbTransaction');
}
}

/**
* The name of the main (and currently only) IndexedDB database. this name is
* appended to the prefix provided to the IndexedDbPersistence constructor.
Expand Down Expand Up @@ -470,7 +487,7 @@ export class IndexedDbPersistence implements Persistence {
action: string,
requirePrimaryLease: boolean,
transactionOperation: (
transaction: PersistenceTransaction
transaction: IndexedDbTransaction
) => PersistencePromise<T>
): Promise<T> {
// TODO(multitab): Consider removing `requirePrimaryLease` and exposing
Expand All @@ -483,39 +500,47 @@ export class IndexedDbPersistence implements Persistence {

// Do all transactions as readwrite against all object stores, since we
// are the only reader/writer.
return this.simpleDb.runTransaction('readwrite', ALL_STORES, txn => {
if (requirePrimaryLease) {
// While we merely verify that we have (or can acquire) the lease
// immediately, we wait to extend the primary lease until after
// executing transactionOperation(). This ensures that even if the
// transactionOperation takes a long time, we'll use a recent
// leaseTimestampMs in the extended (or newly acquired) lease.
return this.canActAsPrimary(txn)
.next(canActAsPrimary => {
if (!canActAsPrimary) {
// TODO(multitab): Handle this gracefully and transition back to
// secondary state.
log.error(
`Failed to obtain primary lease for action '${action}'.`
return this.simpleDb.runTransaction(
'readwrite',
ALL_STORES,
simpleDbTxn => {
if (requirePrimaryLease) {
// While we merely verify that we have (or can acquire) the lease
// immediately, we wait to extend the primary lease until after
// executing transactionOperation(). This ensures that even if the
// transactionOperation takes a long time, we'll use a recent
// leaseTimestampMs in the extended (or newly acquired) lease.
return this.canActAsPrimary(simpleDbTxn)
.next(canActAsPrimary => {
if (!canActAsPrimary) {
// TODO(multitab): Handle this gracefully and transition back to
// secondary state.
log.error(
`Failed to obtain primary lease for action '${action}'.`
);
this.isPrimary = false;
this.queue.enqueue(() => this.primaryStateListener(false));
throw new FirestoreError(
Code.FAILED_PRECONDITION,
PRIMARY_LEASE_LOST_ERROR_MSG
);
}
return transactionOperation(
new IndexedDbTransaction(simpleDbTxn)
);
this.isPrimary = false;
this.queue.enqueue(() => this.primaryStateListener(false));
throw new FirestoreError(
Code.FAILED_PRECONDITION,
PRIMARY_LEASE_LOST_ERROR_MSG
})
.next(result => {
return this.acquireOrExtendPrimaryLease(simpleDbTxn).next(
() => result
);
}
return transactionOperation(txn);
})
.next(result => {
return this.acquireOrExtendPrimaryLease(txn).next(() => result);
});
} else {
return this.verifyAllowTabSynchronization(txn).next(() =>
transactionOperation(txn)
);
});
} else {
return this.verifyAllowTabSynchronization(simpleDbTxn).next(() =>
transactionOperation(new IndexedDbTransaction(simpleDbTxn))
);
}
}
});
);
}

/**
Expand Down
Loading