Skip to content

Commit 649ed1d

Browse files
IndexedDB recovery for waitForPendingWrites()
1 parent a57dac5 commit 649ed1d

File tree

4 files changed

+98
-80
lines changed

4 files changed

+98
-80
lines changed

packages/firestore/src/core/event_manager.ts

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import { OnlineState } from './types';
2424
import { ChangeType, DocumentViewChange, ViewSnapshot } from './view_snapshot';
2525
import { logError } from '../util/log';
2626
import { Code, FirestoreError } from '../util/error';
27+
import { executeWithIndexedDbRecovery } from '../util/async_queue';
2728

2829
const LOG_TAG = 'EventManager';
2930

@@ -62,47 +63,45 @@ export class EventManager implements SyncEngineListener {
6263
this.syncEngine.subscribe(this);
6364
}
6465

65-
async listen(listener: QueryListener): Promise<void> {
66-
const query = listener.query;
67-
let firstListen = false;
66+
listen(listener: QueryListener): Promise<void> {
67+
return executeWithIndexedDbRecovery(
68+
async () => {
69+
const query = listener.query;
70+
let firstListen = false;
6871

69-
let queryInfo = this.queries.get(query);
70-
if (!queryInfo) {
71-
firstListen = true;
72-
queryInfo = new QueryListenersInfo();
73-
}
72+
let queryInfo = this.queries.get(query);
73+
if (!queryInfo) {
74+
firstListen = true;
75+
queryInfo = new QueryListenersInfo();
76+
}
7477

75-
if (firstListen) {
76-
try {
77-
queryInfo.viewSnap = await this.syncEngine.listen(query);
78-
} catch (e) {
79-
const msg = `Initialization of query '${query}' failed: ${e}`;
80-
logError(LOG_TAG, msg);
81-
if (e.name === 'IndexedDbTransactionError') {
82-
listener.onError(new FirestoreError(Code.UNAVAILABLE, msg));
83-
} else {
84-
throw e;
78+
if (firstListen) {
79+
queryInfo.viewSnap = await this.syncEngine.listen(query);
8580
}
86-
return;
87-
}
88-
}
8981

90-
this.queries.set(query, queryInfo);
91-
queryInfo.listeners.push(listener);
82+
this.queries.set(query, queryInfo);
83+
queryInfo.listeners.push(listener);
9284

93-
// Run global snapshot listeners if a consistent snapshot has been emitted.
94-
const raisedEvent = listener.applyOnlineStateChange(this.onlineState);
95-
debugAssert(
96-
!raisedEvent,
97-
"applyOnlineStateChange() shouldn't raise an event for brand-new listeners."
98-
);
99-
100-
if (queryInfo.viewSnap) {
101-
const raisedEvent = listener.onViewSnapshot(queryInfo.viewSnap);
102-
if (raisedEvent) {
103-
this.raiseSnapshotsInSyncEvent();
85+
// Run global snapshot listeners if a consistent snapshot has been emitted.
86+
const raisedEvent = listener.applyOnlineStateChange(this.onlineState);
87+
debugAssert(
88+
!raisedEvent,
89+
"applyOnlineStateChange() shouldn't raise an event for brand-new listeners."
90+
);
91+
92+
if (queryInfo.viewSnap) {
93+
const raisedEvent = listener.onViewSnapshot(queryInfo.viewSnap);
94+
if (raisedEvent) {
95+
this.raiseSnapshotsInSyncEvent();
96+
}
97+
}
98+
},
99+
e => {
100+
const msg = `Initialization of query '${listener.query}' failed: ${e}`;
101+
logError(LOG_TAG, msg);
102+
listener.onError(new FirestoreError(Code.UNAVAILABLE, msg));
104103
}
105-
}
104+
);
106105
}
107106

108107
async unlisten(listener: QueryListener): Promise<void> {

packages/firestore/src/core/sync_engine.ts

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ import {
7373
ViewDocumentChanges
7474
} from './view';
7575
import { ViewSnapshot } from './view_snapshot';
76-
import { AsyncQueue } from '../util/async_queue';
76+
import { AsyncQueue, executeWithIndexedDbRecovery } from '../util/async_queue';
7777
import { TransactionRunner } from './transaction_runner';
7878

7979
const LOG_TAG = 'SyncEngine';
@@ -353,27 +353,22 @@ export class SyncEngine implements RemoteSyncer {
353353
async write(batch: Mutation[], userCallback: Deferred<void>): Promise<void> {
354354
this.assertSubscribed('write()');
355355

356-
let result: LocalWriteResult;
357-
try {
358-
result = await this.localStore.localWrite(batch);
359-
} catch (e) {
360-
if (e.name === 'IndexedDbTransactionError') {
356+
await executeWithIndexedDbRecovery(
357+
async () => {
358+
const result = await this.localStore.localWrite(batch);
359+
this.sharedClientState.addPendingMutation(result.batchId);
360+
this.addMutationCallback(result.batchId, userCallback);
361+
await this.emitNewSnapsAndNotifyLocalStore(result.changes);
362+
await this.remoteStore.fillWritePipeline();
363+
},
364+
e => {
361365
// If we can't persist the mutation, we reject the user callback and
362366
// don't send the mutation. The user can then retry the write.
363-
logError(LOG_TAG, 'Dropping write that cannot be persisted: ' + e);
364-
userCallback.reject(
365-
new FirestoreError(Code.UNAVAILABLE, 'Failed to persist write: ' + e)
366-
);
367-
return;
368-
} else {
369-
throw e;
367+
const msg = `Failed to persist write: ${e}`;
368+
logError(LOG_TAG, msg);
369+
userCallback.reject(new FirestoreError(Code.UNAVAILABLE, msg));
370370
}
371-
}
372-
373-
this.sharedClientState.addPendingMutation(result.batchId);
374-
this.addMutationCallback(result.batchId, userCallback);
375-
await this.emitNewSnapsAndNotifyLocalStore(result.changes);
376-
await this.remoteStore.fillWritePipeline();
371+
);
377372
}
378373

379374
/**
@@ -584,16 +579,24 @@ export class SyncEngine implements RemoteSyncer {
584579
);
585580
}
586581

587-
const highestBatchId = await this.localStore.getHighestUnacknowledgedBatchId();
588-
if (highestBatchId === BATCHID_UNKNOWN) {
589-
// Trigger the callback right away if there is no pending writes at the moment.
590-
callback.resolve();
591-
return;
592-
}
593-
594-
const callbacks = this.pendingWritesCallbacks.get(highestBatchId) || [];
595-
callbacks.push(callback);
596-
this.pendingWritesCallbacks.set(highestBatchId, callbacks);
582+
return executeWithIndexedDbRecovery(
583+
async () => {
584+
const highestBatchId = await this.localStore.getHighestUnacknowledgedBatchId();
585+
if (highestBatchId === BATCHID_UNKNOWN) {
586+
// Trigger the callback right away if there is no pending writes at the moment.
587+
callback.resolve();
588+
return;
589+
}
590+
const callbacks = this.pendingWritesCallbacks.get(highestBatchId) || [];
591+
callbacks.push(callback);
592+
this.pendingWritesCallbacks.set(highestBatchId, callbacks);
593+
},
594+
e => {
595+
const msg = `Initialization of waitForPendingWrites() operation failed: ${e}`;
596+
logError(LOG_TAG, msg);
597+
callback.reject(new FirestoreError(Code.UNAVAILABLE, msg));
598+
}
599+
);
597600
}
598601

599602
/**

packages/firestore/src/local/lru_garbage_collector.ts

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818
import { ListenSequence } from '../core/listen_sequence';
1919
import { ListenSequenceNumber, TargetId } from '../core/types';
2020
import { debugAssert } from '../util/assert';
21-
import { AsyncQueue, TimerId } from '../util/async_queue';
21+
import {
22+
AsyncQueue,
23+
TimerId,
24+
executeWithIndexedDbRecovery
25+
} from '../util/async_queue';
2226
import { getLogLevel, logDebug, LogLevel } from '../util/log';
2327
import { primitiveComparator } from '../util/misc';
2428
import { CancelablePromise } from '../util/promise';
@@ -267,23 +271,15 @@ export class LruScheduler implements GarbageCollectionScheduler {
267271
this.gcTask = this.asyncQueue.enqueueAfterDelay(
268272
TimerId.LruGarbageCollection,
269273
delay,
270-
async () => {
274+
() => {
271275
this.gcTask = null;
272276
this.hasRun = true;
273-
try {
274-
await localStore.collectGarbage(this.garbageCollector);
275-
} catch (e) {
276-
if (e.name === 'IndexedDbTransactionError') {
277-
logDebug(
278-
LOG_TAG,
279-
'Ignoring IndexedDB error during garbage collection: ',
280-
e
281-
);
282-
} else {
283-
await ignoreIfPrimaryLeaseLoss(e);
284-
}
285-
}
286-
await this.scheduleGC(localStore);
277+
return executeWithIndexedDbRecovery(
278+
() => localStore.collectGarbage(this.garbageCollector).then(() => {}),
279+
/* recoveryHandler= */ () => {}
280+
)
281+
.then(() => this.scheduleGC(localStore))
282+
.catch(ignoreIfPrimaryLeaseLoss);
287283
}
288284
);
289285
}

packages/firestore/src/util/async_queue.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { logDebug, logError } from './log';
2121
import { CancelablePromise, Deferred } from './promise';
2222
import { ExponentialBackoff } from '../remote/backoff';
2323
import { PlatformSupport } from '../platform/platform';
24+
import { IndexedDbTransactionError } from '../local/simple_db';
2425

2526
const LOG_TAG = 'AsyncQueue';
2627

@@ -501,3 +502,22 @@ export class AsyncQueue {
501502
this.delayedOperations.splice(index, 1);
502503
}
503504
}
505+
506+
/**
507+
* Runs the provided `op`. If `op` fails with an `IndexedDbTransactionError`,
508+
* calls `recoveryHandler` and returns a resolved Promise. If `op` is successful
509+
* or fails with another type of error, returns op's result.
510+
*/
511+
export function executeWithIndexedDbRecovery<T>(
512+
op: () => Promise<void>,
513+
recoveryHandler: (e: IndexedDbTransactionError) => void
514+
): Promise<void> {
515+
return op().catch(e => {
516+
logDebug(LOG_TAG, 'Internal operation failed: ', e);
517+
if (e.name === 'IndexedDbTransactionError') {
518+
recoveryHandler(e);
519+
} else {
520+
throw e;
521+
}
522+
});
523+
}

0 commit comments

Comments
 (0)