Skip to content

Commit 276f279

Browse files
Performance: Decode documents in background thread (#559)
1 parent bbd9b7d commit 276f279

File tree

7 files changed

+262
-18
lines changed

7 files changed

+262
-18
lines changed

firebase-firestore/CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
# Unreleased
2-
- [fixed] Fixed an internal assertion that was triggered when an
3-
update with a `FieldValue.serverTimestamp()` and an update with a
2+
- [fixed] Fixed an internal assertion that was triggered when an update
3+
with a `FieldValue.serverTimestamp()` and an update with a
44
`FieldValue.increment()` were pending for the same document (#491).
5+
- [changed] Improved performance of queries with large result sets.
56
- [changed] Improved performance for queries with filters that only return a
67
small subset of the documents in a collection.
78
- [changed] Instead of failing silently, Firestore now crashes the client app

firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteRemoteDocumentCache.java

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,19 @@
2020
import com.google.firebase.database.collection.ImmutableSortedMap;
2121
import com.google.firebase.firestore.core.Query;
2222
import com.google.firebase.firestore.model.Document;
23+
import com.google.firebase.firestore.model.DocumentCollections;
2324
import com.google.firebase.firestore.model.DocumentKey;
2425
import com.google.firebase.firestore.model.MaybeDocument;
2526
import com.google.firebase.firestore.model.ResourcePath;
27+
import com.google.firebase.firestore.util.BackgroundQueue;
28+
import com.google.firebase.firestore.util.Executors;
2629
import com.google.protobuf.InvalidProtocolBufferException;
2730
import com.google.protobuf.MessageLite;
2831
import java.util.ArrayList;
2932
import java.util.HashMap;
3033
import java.util.List;
3134
import java.util.Map;
35+
import java.util.concurrent.Executor;
3236
import javax.annotation.Nullable;
3337

3438
final class SQLiteRemoteDocumentCache implements RemoteDocumentCache {
@@ -118,7 +122,11 @@ public ImmutableSortedMap<DocumentKey, Document> getAllDocumentsMatchingQuery(Qu
118122
String prefixPath = EncodedPath.encode(prefix);
119123
String prefixSuccessorPath = EncodedPath.prefixSuccessor(prefixPath);
120124

121-
Map<DocumentKey, Document> results = new HashMap<>();
125+
BackgroundQueue backgroundQueue = new BackgroundQueue();
126+
127+
ImmutableSortedMap<DocumentKey, Document>[] matchingDocuments =
128+
(ImmutableSortedMap<DocumentKey, Document>[])
129+
new ImmutableSortedMap[] {DocumentCollections.emptyDocumentMap()};
122130

123131
db.query("SELECT path, contents FROM remote_documents WHERE path >= ? AND path < ?")
124132
.binding(prefixPath, prefixSuccessorPath)
@@ -136,20 +144,31 @@ public ImmutableSortedMap<DocumentKey, Document> getAllDocumentsMatchingQuery(Qu
136144
return;
137145
}
138146

139-
MaybeDocument maybeDoc = decodeMaybeDocument(row.getBlob(1));
140-
if (!(maybeDoc instanceof Document)) {
141-
return;
142-
}
143-
144-
Document doc = (Document) maybeDoc;
145-
if (!query.matches(doc)) {
146-
return;
147-
}
148-
149-
results.put(doc.getKey(), doc);
147+
byte[] rawDocument = row.getBlob(1);
148+
149+
// Since scheduling background tasks incurs overhead, we only dispatch to a background
150+
// thread if there are still some documents remaining.
151+
Executor executor = row.isLast() ? Executors.DIRECT_EXECUTOR : backgroundQueue;
152+
executor.execute(
153+
() -> {
154+
MaybeDocument maybeDoc = decodeMaybeDocument(rawDocument);
155+
156+
if (maybeDoc instanceof Document && query.matches((Document) maybeDoc)) {
157+
synchronized (SQLiteRemoteDocumentCache.this) {
158+
matchingDocuments[0] =
159+
matchingDocuments[0].insert(maybeDoc.getKey(), (Document) maybeDoc);
160+
}
161+
}
162+
});
150163
});
151164

152-
return ImmutableSortedMap.Builder.fromMap(results, DocumentKey.comparator());
165+
try {
166+
backgroundQueue.drain();
167+
} catch (InterruptedException e) {
168+
fail("Interrupted while deserializing documents", e);
169+
}
170+
171+
return matchingDocuments[0];
153172
}
154173

155174
private String pathForKey(DocumentKey key) {

firebase-firestore/src/main/java/com/google/firebase/firestore/remote/GrpcCallProvider.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package com.google.firebase.firestore.remote;
1616

1717
import android.content.Context;
18-
import android.os.AsyncTask;
1918
import androidx.annotation.VisibleForTesting;
2019
import com.google.android.gms.common.GooglePlayServicesNotAvailableException;
2120
import com.google.android.gms.common.GooglePlayServicesRepairableException;
@@ -24,6 +23,7 @@
2423
import com.google.android.gms.tasks.Tasks;
2524
import com.google.firebase.firestore.core.DatabaseInfo;
2625
import com.google.firebase.firestore.util.AsyncQueue;
26+
import com.google.firebase.firestore.util.Executors;
2727
import com.google.firebase.firestore.util.Logger;
2828
import com.google.firebase.firestore.util.Supplier;
2929
import com.google.firestore.v1.FirestoreGrpc;
@@ -69,11 +69,11 @@ public static void overrideChannelBuilder(
6969
CallCredentials firestoreHeaders) {
7070
this.asyncQueue = asyncQueue;
7171

72-
// We execute network initialization on a separate thred to not block operations that depend on
72+
// We execute network initialization on a separate thread to not block operations that depend on
7373
// the AsyncQueue.
7474
this.channelTask =
7575
Tasks.call(
76-
AsyncTask.THREAD_POOL_EXECUTOR,
76+
Executors.BACKGROUND_EXECUTOR,
7777
() -> {
7878
ManagedChannel channel = initChannel(context, databaseInfo);
7979
FirestoreGrpc.FirestoreStub firestoreStub =
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2019 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.firebase.firestore.util;
16+
17+
import java.util.concurrent.Executor;
18+
import java.util.concurrent.Semaphore;
19+
20+
/**
21+
* A simple queue that executes tasks in parallel on the Android's AsyncTask.THREAD_POOL_EXECUTOR
22+
* and supports blocking on their completion.
23+
*
24+
* <p>This class is not thread-safe. In particular, `execute()` and `drain()` should not be called
25+
* from parallel threads.
26+
*/
27+
public class BackgroundQueue implements Executor {
28+
private Semaphore completedTasks = new Semaphore(0);
29+
private int pendingTaskCount = 0;
30+
31+
/** Enqueue a task on Android's THREAD_POOL_EXECUTOR. */
32+
@Override
33+
public void execute(Runnable task) {
34+
++pendingTaskCount;
35+
Executors.BACKGROUND_EXECUTOR.execute(
36+
() -> {
37+
task.run();
38+
completedTasks.release();
39+
});
40+
}
41+
42+
/** Wait for all currently scheduled tasks to complete. */
43+
public void drain() throws InterruptedException {
44+
completedTasks.acquire(pendingTaskCount);
45+
pendingTaskCount = 0;
46+
}
47+
}

firebase-firestore/src/main/java/com/google/firebase/firestore/util/Executors.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,20 @@
1414

1515
package com.google.firebase.firestore.util;
1616

17+
import android.os.AsyncTask;
1718
import com.google.android.gms.tasks.TaskExecutors;
1819
import java.util.concurrent.Executor;
1920

2021
/** Helper class for executors. */
2122
public final class Executors {
23+
/**
24+
* The maximum number of tasks we submit to AsyncTask.THREAD_POOL_EXECUTOR.
25+
*
26+
* <p>The limit is based on the number of core threads spun by THREAD_POOL_EXECUTOR and is well
27+
* below the queue size limit of 120 pending tasks. Limiting our usage of the THREAD_POOL_EXECUTOR
28+
* allows other users to schedule their own operations on the shared THREAD_POOL_EXECUTOR.
29+
*/
30+
private static final int ASYNC_THREAD_POOL_MAXIMUM_CONCURRENCY = 4;
2231

2332
/**
2433
* The default executor for user visible callbacks. It is an executor scheduling callbacks on
@@ -29,6 +38,11 @@ public final class Executors {
2938
/** An executor that executes the provided runnable immediately on the current thread. */
3039
public static final Executor DIRECT_EXECUTOR = Runnable::run;
3140

41+
/** An executor that runs tasks in parallel on Android's AsyncTask.THREAD_POOL_EXECUTOR. */
42+
public static final Executor BACKGROUND_EXECUTOR =
43+
new ThrottledForwardingExecutor(
44+
ASYNC_THREAD_POOL_MAXIMUM_CONCURRENCY, AsyncTask.THREAD_POOL_EXECUTOR);
45+
3246
private Executors() {
3347
// Private constructor to prevent initialization
3448
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright 2019 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.firebase.firestore.util;
16+
17+
import java.util.concurrent.Executor;
18+
import java.util.concurrent.RejectedExecutionException;
19+
import java.util.concurrent.Semaphore;
20+
21+
/**
22+
* An executor that forwards executions to another executor, but caps the number of pending
23+
* operations. Tasks scheduled past the specified limit are directly invoked on the calling thread,
24+
* reducing the total memory consumed by pending tasks.
25+
*/
26+
class ThrottledForwardingExecutor implements Executor {
27+
private final Executor executor;
28+
private final Semaphore availableSlots;
29+
30+
/**
31+
* Instantiates a new ThrottledForwardingExecutor.
32+
*
33+
* @param maximumConcurrency The maximum number of pending tasks to schedule on the provided
34+
* executor.
35+
* @param executor The executor to forward tasks to.
36+
*/
37+
ThrottledForwardingExecutor(int maximumConcurrency, Executor executor) {
38+
this.availableSlots = new Semaphore(maximumConcurrency);
39+
this.executor = executor;
40+
}
41+
42+
/**
43+
* Forwards a task to the provided executor if the current number of pending tasks is less than
44+
* the configured limit. Otherwise, executes the task directly.
45+
*
46+
* @param command The task to run.
47+
*/
48+
@Override
49+
public void execute(Runnable command) {
50+
if (availableSlots.tryAcquire()) {
51+
try {
52+
executor.execute(
53+
() -> {
54+
command.run();
55+
availableSlots.release();
56+
});
57+
} catch (RejectedExecutionException e) {
58+
command.run();
59+
}
60+
} else {
61+
command.run();
62+
}
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright 2019 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.firebase.firestore.util;
16+
17+
import static org.junit.Assert.assertEquals;
18+
import static org.junit.Assert.fail;
19+
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.Executor;
22+
import java.util.concurrent.RejectedExecutionException;
23+
import java.util.concurrent.Semaphore;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import org.junit.Test;
26+
27+
public class ThrottledForwardingExecutorTest {
28+
@Test
29+
public void limitsNumberOfForwardedTasks() throws InterruptedException {
30+
Semaphore completedTasks = new Semaphore(0);
31+
int maximumConcurrency = 10;
32+
33+
CountingExecutor countingExecutor = new CountingExecutor();
34+
ThrottledForwardingExecutor throttledExecutor =
35+
new ThrottledForwardingExecutor(maximumConcurrency, countingExecutor);
36+
37+
// Schedule more than `maximumConcurrency` parallel tasks and wait until all scheduling has
38+
// finished.
39+
int numTasks = maximumConcurrency + 1;
40+
CountDownLatch schedulingLatch = new CountDownLatch(1);
41+
for (int i = 0; i < numTasks; ++i) {
42+
int currentTask = i;
43+
throttledExecutor.execute(
44+
() -> {
45+
try {
46+
if (currentTask < maximumConcurrency) {
47+
// Block if we are running on the forwarded executor. We can't block the thread that
48+
// is running this test.
49+
schedulingLatch.await();
50+
}
51+
completedTasks.release();
52+
} catch (InterruptedException e) {
53+
fail("Unexpected InterruptedException: " + e);
54+
}
55+
});
56+
}
57+
schedulingLatch.countDown();
58+
59+
// Verify that only `maximumConcurrency` tasks were forwarded to the executor.
60+
completedTasks.acquire(numTasks);
61+
assertEquals(maximumConcurrency, countingExecutor.getNumTasks());
62+
}
63+
64+
@Test
65+
public void handlesRejectedExecutionException() {
66+
AtomicInteger result = new AtomicInteger(0);
67+
68+
ThrottledForwardingExecutor executor =
69+
new ThrottledForwardingExecutor(
70+
10,
71+
command -> {
72+
throw new RejectedExecutionException();
73+
});
74+
75+
executor.execute(result::incrementAndGet);
76+
77+
assertEquals(1, result.get());
78+
}
79+
80+
/** An executor that counts the number of tasks submitted. */
81+
private static class CountingExecutor implements Executor {
82+
int numTasks = 0;
83+
84+
@Override
85+
public void execute(Runnable command) {
86+
++numTasks;
87+
new Thread() {
88+
@Override
89+
public void run() {
90+
command.run();
91+
}
92+
}.start();
93+
}
94+
95+
int getNumTasks() {
96+
return numTasks;
97+
}
98+
}
99+
}

0 commit comments

Comments
 (0)