Skip to content

Commit d68d8d1

Browse files
Review
1 parent a0eb1df commit d68d8d1

File tree

6 files changed

+108
-104
lines changed

6 files changed

+108
-104
lines changed

packages/firestore/src/core/event_manager.ts

Lines changed: 34 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,7 @@ import { Query } from './query';
2222
import { SyncEngine, SyncEngineListener } from './sync_engine';
2323
import { OnlineState } from './types';
2424
import { ChangeType, DocumentViewChange, ViewSnapshot } from './view_snapshot';
25-
import { logError } from '../util/log';
26-
import { Code, FirestoreError } from '../util/error';
27-
import { executeWithIndexedDbRecovery } from '../util/async_queue';
28-
29-
const LOG_TAG = 'EventManager';
25+
import { wrapInUserErrorIfRecoverable } from '../util/async_queue';
3026

3127
/**
3228
* Holds the listeners and the last received ViewSnapshot for a query being
@@ -63,45 +59,45 @@ export class EventManager implements SyncEngineListener {
6359
this.syncEngine.subscribe(this);
6460
}
6561

66-
listen(listener: QueryListener): Promise<void> {
67-
return executeWithIndexedDbRecovery(
68-
async () => {
69-
const query = listener.query;
70-
let firstListen = false;
62+
async listen(listener: QueryListener): Promise<void> {
63+
const query = listener.query;
64+
let firstListen = false;
7165

72-
let queryInfo = this.queries.get(query);
73-
if (!queryInfo) {
74-
firstListen = true;
75-
queryInfo = new QueryListenersInfo();
76-
}
66+
let queryInfo = this.queries.get(query);
67+
if (!queryInfo) {
68+
firstListen = true;
69+
queryInfo = new QueryListenersInfo();
70+
}
7771

78-
if (firstListen) {
79-
queryInfo.viewSnap = await this.syncEngine.listen(query);
80-
}
72+
if (firstListen) {
73+
try {
74+
queryInfo.viewSnap = await this.syncEngine.listen(query);
75+
} catch (e) {
76+
const firestoreError = wrapInUserErrorIfRecoverable(
77+
e,
78+
`Initialization of query '${listener.query}' failed`
79+
);
80+
listener.onError(firestoreError);
81+
}
82+
return;
83+
}
8184

82-
this.queries.set(query, queryInfo);
83-
queryInfo.listeners.push(listener);
85+
this.queries.set(query, queryInfo);
86+
queryInfo.listeners.push(listener);
8487

85-
// Run global snapshot listeners if a consistent snapshot has been emitted.
86-
const raisedEvent = listener.applyOnlineStateChange(this.onlineState);
87-
debugAssert(
88-
!raisedEvent,
89-
"applyOnlineStateChange() shouldn't raise an event for brand-new listeners."
90-
);
88+
// Run global snapshot listeners if a consistent snapshot has been emitted.
89+
const raisedEvent = listener.applyOnlineStateChange(this.onlineState);
90+
debugAssert(
91+
!raisedEvent,
92+
"applyOnlineStateChange() shouldn't raise an event for brand-new listeners."
93+
);
9194

92-
if (queryInfo.viewSnap) {
93-
const raisedEvent = listener.onViewSnapshot(queryInfo.viewSnap);
94-
if (raisedEvent) {
95-
this.raiseSnapshotsInSyncEvent();
96-
}
97-
}
98-
},
99-
e => {
100-
const msg = `Initialization of query '${listener.query}' failed: ${e}`;
101-
logError(LOG_TAG, msg);
102-
listener.onError(new FirestoreError(Code.UNAVAILABLE, msg));
95+
if (queryInfo.viewSnap) {
96+
const raisedEvent = listener.onViewSnapshot(queryInfo.viewSnap);
97+
if (raisedEvent) {
98+
this.raiseSnapshotsInSyncEvent();
10399
}
104-
);
100+
}
105101
}
106102

107103
async unlisten(listener: QueryListener): Promise<void> {

packages/firestore/src/core/sync_engine.ts

Lines changed: 32 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import { RemoteStore } from '../remote/remote_store';
3838
import { RemoteSyncer } from '../remote/remote_syncer';
3939
import { debugAssert, fail, hardAssert } from '../util/assert';
4040
import { Code, FirestoreError } from '../util/error';
41-
import { logDebug, logError } from '../util/log';
41+
import { logDebug } from '../util/log';
4242
import { primitiveComparator } from '../util/misc';
4343
import { ObjectMap } from '../util/obj_map';
4444
import { Deferred } from '../util/promise';
@@ -72,7 +72,7 @@ import {
7272
ViewDocumentChanges
7373
} from './view';
7474
import { ViewSnapshot } from './view_snapshot';
75-
import { AsyncQueue, executeWithIndexedDbRecovery } from '../util/async_queue';
75+
import { AsyncQueue, wrapInUserErrorIfRecoverable } from '../util/async_queue';
7676
import { TransactionRunner } from './transaction_runner';
7777

7878
const LOG_TAG = 'SyncEngine';
@@ -349,25 +349,21 @@ export class SyncEngine implements RemoteSyncer {
349349
* userCallback is resolved once the write was acked/rejected by the
350350
* backend (or failed locally for any other reason).
351351
*/
352-
write(batch: Mutation[], userCallback: Deferred<void>): Promise<void> {
352+
async write(batch: Mutation[], userCallback: Deferred<void>): Promise<void> {
353353
this.assertSubscribed('write()');
354354

355-
return executeWithIndexedDbRecovery(
356-
async () => {
357-
const result = await this.localStore.localWrite(batch);
358-
this.sharedClientState.addPendingMutation(result.batchId);
359-
this.addMutationCallback(result.batchId, userCallback);
360-
await this.emitNewSnapsAndNotifyLocalStore(result.changes);
361-
await this.remoteStore.fillWritePipeline();
362-
},
363-
e => {
364-
// If we can't persist the mutation, we reject the user callback and
365-
// don't send the mutation. The user can then retry the write.
366-
const msg = `Failed to persist write: ${e}`;
367-
logError(LOG_TAG, msg);
368-
userCallback.reject(new FirestoreError(Code.UNAVAILABLE, msg));
369-
}
370-
);
355+
try {
356+
const result = await this.localStore.localWrite(batch);
357+
this.sharedClientState.addPendingMutation(result.batchId);
358+
this.addMutationCallback(result.batchId, userCallback);
359+
await this.emitNewSnapsAndNotifyLocalStore(result.changes);
360+
await this.remoteStore.fillWritePipeline();
361+
} catch (e) {
362+
// If we can't persist the mutation, we reject the user callback and
363+
// don't send the mutation. The user can then retry the write.
364+
const error = wrapInUserErrorIfRecoverable(e, `Failed to persist write`);
365+
userCallback.reject(error);
366+
}
371367
}
372368

373369
/**
@@ -582,24 +578,24 @@ export class SyncEngine implements RemoteSyncer {
582578
);
583579
}
584580

585-
return executeWithIndexedDbRecovery(
586-
async () => {
587-
const highestBatchId = await this.localStore.getHighestUnacknowledgedBatchId();
588-
if (highestBatchId === BATCHID_UNKNOWN) {
589-
// Trigger the callback right away if there is no pending writes at the moment.
590-
callback.resolve();
591-
return;
592-
}
593-
const callbacks = this.pendingWritesCallbacks.get(highestBatchId) || [];
594-
callbacks.push(callback);
595-
this.pendingWritesCallbacks.set(highestBatchId, callbacks);
596-
},
597-
e => {
598-
const msg = `Initialization of waitForPendingWrites() operation failed: ${e}`;
599-
logError(LOG_TAG, msg);
600-
callback.reject(new FirestoreError(Code.UNAVAILABLE, msg));
581+
try {
582+
const highestBatchId = await this.localStore.getHighestUnacknowledgedBatchId();
583+
if (highestBatchId === BATCHID_UNKNOWN) {
584+
// Trigger the callback right away if there is no pending writes at the moment.
585+
callback.resolve();
586+
return;
601587
}
602-
);
588+
589+
const callbacks = this.pendingWritesCallbacks.get(highestBatchId) || [];
590+
callbacks.push(callback);
591+
this.pendingWritesCallbacks.set(highestBatchId, callbacks);
592+
} catch (e) {
593+
const firestoreError = wrapInUserErrorIfRecoverable(
594+
e,
595+
'Initialization of waitForPendingWrites() operation failed'
596+
);
597+
callback.reject(firestoreError);
598+
}
603599
}
604600

605601
/**

packages/firestore/src/local/lru_garbage_collector.ts

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,7 @@
1818
import { ListenSequence } from '../core/listen_sequence';
1919
import { ListenSequenceNumber, TargetId } from '../core/types';
2020
import { debugAssert } from '../util/assert';
21-
import {
22-
AsyncQueue,
23-
TimerId,
24-
executeWithIndexedDbRecovery
25-
} from '../util/async_queue';
21+
import { AsyncQueue, TimerId } from '../util/async_queue';
2622
import { getLogLevel, logDebug, LogLevel } from '../util/log';
2723
import { primitiveComparator } from '../util/misc';
2824
import { CancelablePromise } from '../util/promise';
@@ -35,6 +31,9 @@ import {
3531
} from './persistence';
3632
import { PersistencePromise } from './persistence_promise';
3733
import { TargetData } from './target_data';
34+
import { isIndexedDbTransactionError } from './simple_db';
35+
36+
const LOG_TAG = 'LruGarbageCollector';
3837

3938
/**
4039
* Persistence layers intending to use LRU Garbage collection should have reference delegates that
@@ -269,15 +268,23 @@ export class LruScheduler implements GarbageCollectionScheduler {
269268
this.gcTask = this.asyncQueue.enqueueAfterDelay(
270269
TimerId.LruGarbageCollection,
271270
delay,
272-
() => {
271+
async () => {
273272
this.gcTask = null;
274273
this.hasRun = true;
275-
return executeWithIndexedDbRecovery(
276-
() => localStore.collectGarbage(this.garbageCollector).then(() => {}),
277-
/* recoveryHandler= */ () => {}
278-
)
279-
.then(() => this.scheduleGC(localStore))
280-
.catch(ignoreIfPrimaryLeaseLoss);
274+
try {
275+
await localStore.collectGarbage(this.garbageCollector);
276+
} catch (e) {
277+
if (isIndexedDbTransactionError(e)) {
278+
logDebug(
279+
LOG_TAG,
280+
'Ignoring IndexedDB error during garbage collection: ',
281+
e
282+
);
283+
} else {
284+
await ignoreIfPrimaryLeaseLoss(e);
285+
}
286+
}
287+
await this.scheduleGC(localStore);
281288
}
282289
);
283290
}

packages/firestore/src/local/simple_db.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,13 @@ export class IndexedDbTransactionError extends FirestoreError {
418418
}
419419
}
420420

421+
/** Verifies whether `e` is an IndexedDbTransactionError. */
422+
export function isIndexedDbTransactionError(e: Error): boolean {
423+
// Use name equality, as instanceof checks on errors don't work with errors
424+
// that wrap other errors.
425+
return e.name === 'IndexedDbTransactionError';
426+
}
427+
421428
/**
422429
* Wraps an IDBTransaction and exposes a store() method to get a handle to a
423430
* specific object store.

packages/firestore/src/remote/remote_store.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import {
5454
WatchTargetChangeState
5555
} from './watch_change';
5656
import { ByteString } from '../util/byte_string';
57+
import { isIndexedDbTransactionError } from '../local/simple_db';
5758

5859
const LOG_TAG = 'RemoteStore';
5960

@@ -448,7 +449,7 @@ export class RemoteStore implements TargetMetadataProvider {
448449
* `enqueueRetryable()`.
449450
*/
450451
private async disableNetworkUntilRecovery(e: FirestoreError): Promise<void> {
451-
if (e.name === 'IndexedDbTransactionError') {
452+
if (isIndexedDbTransactionError(e)) {
452453
debugAssert(
453454
!this.indexedDbFailed,
454455
'Unexpected network event when IndexedDB was marked failed.'

packages/firestore/src/util/async_queue.ts

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { logDebug, logError } from './log';
2121
import { CancelablePromise, Deferred } from './promise';
2222
import { ExponentialBackoff } from '../remote/backoff';
2323
import { PlatformSupport } from '../platform/platform';
24-
import { IndexedDbTransactionError } from '../local/simple_db';
24+
import { isIndexedDbTransactionError } from '../local/simple_db';
2525

2626
const LOG_TAG = 'AsyncQueue';
2727

@@ -339,7 +339,7 @@ export class AsyncQueue {
339339
deferred.resolve();
340340
this.backoff.reset();
341341
} catch (e) {
342-
if (e.name === 'IndexedDbTransactionError') {
342+
if (isIndexedDbTransactionError(e)) {
343343
logDebug(LOG_TAG, 'Operation failed with retryable error: ' + e);
344344
this.backoff.backoffAndRun(retryingOp);
345345
} else {
@@ -504,20 +504,17 @@ export class AsyncQueue {
504504
}
505505

506506
/**
507-
* Runs the provided `op`. If `op` fails with an `IndexedDbTransactionError`,
508-
* calls `recoveryHandler` and returns a resolved Promise. If `op` is successful
509-
* or fails with another type of error, returns op's result.
507+
* Returns a FirestoreError that can be surfaced to the user if the provided
508+
* error is an IndexedDbTransactionError. Re-throws the error otherwise.
510509
*/
511-
export function executeWithIndexedDbRecovery<T>(
512-
op: () => Promise<void>,
513-
recoveryHandler: (e: IndexedDbTransactionError) => void
514-
): Promise<void> {
515-
return op().catch(e => {
516-
logDebug(LOG_TAG, 'Internal operation failed: ', e);
517-
if (e.name === 'IndexedDbTransactionError') {
518-
recoveryHandler(e);
519-
} else {
520-
throw e;
521-
}
522-
});
510+
export function wrapInUserErrorIfRecoverable(
511+
e: Error,
512+
msg: string
513+
): FirestoreError {
514+
logError(LOG_TAG, `${msg}: ${e}`);
515+
if (isIndexedDbTransactionError(e)) {
516+
return new FirestoreError(Code.UNAVAILABLE, `${msg}: ${e}`);
517+
} else {
518+
throw e;
519+
}
523520
}

0 commit comments

Comments
 (0)