Performance: Decode documents in background thread #559
@@ -23,12 +23,16 @@
 import com.google.firebase.firestore.model.DocumentKey;
 import com.google.firebase.firestore.model.MaybeDocument;
 import com.google.firebase.firestore.model.ResourcePath;
+import com.google.firebase.firestore.util.Executors;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.MessageLite;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
 import javax.annotation.Nullable;

 final class SQLiteRemoteDocumentCache implements RemoteDocumentCache {

@@ -118,7 +122,10 @@ public ImmutableSortedMap<DocumentKey, Document> getAllDocumentsMatchingQuery(Qu
     String prefixPath = EncodedPath.encode(prefix);
     String prefixSuccessorPath = EncodedPath.prefixSuccessor(prefixPath);

-    Map<DocumentKey, Document> results = new HashMap<>();
+    Map<DocumentKey, Document> allDocuments = new ConcurrentHashMap<>();
+
+    int[] pendingTaskCount = new int[] {0};
+    Semaphore completedTasks = new Semaphore(0);

     db.query("SELECT path, contents FROM remote_documents WHERE path >= ? AND path < ?")
         .binding(prefixPath, prefixSuccessorPath)

@@ -136,20 +143,38 @@ public ImmutableSortedMap<DocumentKey, Document> getAllDocumentsMatchingQuery(Qu
               return;
             }

-            MaybeDocument maybeDoc = decodeMaybeDocument(row.getBlob(1));
-            if (!(maybeDoc instanceof Document)) {
-              return;
-            }
-
-            Document doc = (Document) maybeDoc;
-            if (!query.matches(doc)) {
-              return;
-            }
-
-            results.put(doc.getKey(), doc);
+            byte[] rawContents = row.getBlob(1);
Reviewer: This one method is now huge, tightly intermingling threading implementation details with the query processing. Most of this could be generic. There are other circumstances where we potentially process large numbers of rows (e.g. the mutation queue), and this could be extended to those places. Probably even more importantly, if we pull this out into something separate we could test it directly :-).

One thing that's preventing this from being generic is that the intermediate results are stored in a concurrent map, which ties the results to the query structure. However, we don't need this to be a map of key to document until we're assembling the result; we could just as easily publish each document into a deque.

Additional notes: if we were targeting Java 8, I'd suggest that you've essentially reimplemented a subset of

At a high level this could be phrased as map/reduce, where the map phase is parsing and filtering (first by

I suggest a few refactoring experiments:

WDYT?

Author: I'll try to make this more generic, but I didn't want this to turn into a 500-line change with multiple new classes. I'll give it another go though.

Re: query processing. Query matching is not thread safe (because of firebase-firestore/src/main/java/com/google/firebase/firestore/core/Query.java, line 54 in 745272a)

Reviewer: Does this turn into a problem in practice? What if we access the orderBy in this thread before enqueuing any work?

Author: @mikelehen mostly convinced me that Query matching is not a problem in practice. I also believe that Query matching itself is not a huge bottleneck, but it doesn't hurt to also execute it in the background, especially given that we are already running a task there.

I rewrote the code in this function to use the Tasks API. By that I mean I added APIs to Util that mirror the equivalent methods of the Tasks API, but with some significant differences:

If you like this approach, I can write dedicated tests for these helpers. We might also want to move them to a dedicated place (such as TaskUtil).
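The map/reduce framing sketched in this thread can be illustrated with a small generic helper. This is a hypothetical sketch, not code from the SDK or this PR: `ParallelMap.mapFilter` is an invented name, and the doubling function stands in for protobuf decoding while the predicate stands in for query matching. The "map" phase runs in parallel on an executor; the "reduce" phase filters and assembles results on the calling thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Predicate;

public final class ParallelMap {
  /**
   * Runs {@code map} over every input in parallel on {@code pool} (the "map"
   * phase), then filters the results with {@code keep} on the calling thread
   * (the "reduce" phase). Result order matches input order.
   */
  public static <I, O> List<O> mapFilter(
      List<I> inputs, Function<I, O> map, Predicate<O> keep, ExecutorService pool)
      throws Exception {
    List<Future<O>> futures = new ArrayList<>();
    for (I input : inputs) {
      futures.add(pool.submit(() -> map.apply(input))); // map phase, in parallel
    }
    List<O> results = new ArrayList<>();
    for (Future<O> future : futures) {
      O value = future.get(); // blocks until this task finishes
      if (keep.test(value)) { // filter during the reduce phase
        results.add(value);
      }
    }
    return results;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    // Doubling stands in for decoding; the predicate for query matching.
    List<Integer> out = mapFilter(Arrays.asList(1, 2, 3, 4), x -> x * 2, y -> y > 4, pool);
    System.out.println(out); // prints [6, 8]
    pool.shutdown();
  }
}
```

Because the per-row work here is pure (no shared mutable state), the helper is straightforward to test directly, which is part of the reviewer's motivation for extracting it.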
+
+            ++pendingTaskCount[0];
+
+            // Since scheduling background tasks incurs overhead, we only dispatch to a background
+            // thread if there are still some documents remaining.
+            Executor deserializationExecutor =
+                row.isLast() ? Executors.DIRECT_EXECUTOR : Executors.BACKGROUND_EXECUTOR;
+            deserializationExecutor.execute(
+                () -> {
+                  MaybeDocument maybeDoc = decodeMaybeDocument(rawContents);
+                  if (maybeDoc instanceof Document) {
+                    allDocuments.put(maybeDoc.getKey(), (Document) maybeDoc);
+                  }
+                  completedTasks.release();
+                });
           });

-    return ImmutableSortedMap.Builder.fromMap(results, DocumentKey.comparator());
+    try {
+      completedTasks.acquire(pendingTaskCount[0]);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    ImmutableSortedMap<DocumentKey, Document> matchingDocuments =
+        ImmutableSortedMap.Builder.emptyMap(DocumentKey.comparator());
+    for (Map.Entry<DocumentKey, Document> entry : allDocuments.entrySet()) {
+      if (query.matches(entry.getValue())) {
+        matchingDocuments = matchingDocuments.insert(entry.getKey(), entry.getValue());
+      }
+    }
+    return matchingDocuments;
   }

   private String pathForKey(DocumentKey key) {
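The fan-out/fan-in pattern in the hunk above (count tasks as they are scheduled, release one semaphore permit per finished decode, then acquire that many permits to wait for all of them) can be reduced to a standalone sketch. This is illustrative only: `decodeAll` is a hypothetical name, and the string conversion stands in for the real protobuf decode.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;

public final class FanOutDecode {
  /** Decodes every blob on {@code executor}, then waits for all tasks to finish. */
  public static Map<String, String> decodeAll(List<byte[]> blobs, Executor executor)
      throws InterruptedException {
    Map<String, String> results = new ConcurrentHashMap<>();
    Semaphore completedTasks = new Semaphore(0); // starts with zero permits
    int pendingTaskCount = 0;

    for (byte[] blob : blobs) {
      pendingTaskCount++; // fan-out: one permit will be released per task
      executor.execute(
          () -> {
            // Stand-in for the real protobuf decode of the document blob.
            String decoded = new String(blob, StandardCharsets.UTF_8);
            results.put(decoded, decoded.toUpperCase());
            completedTasks.release(); // signal that this task is done
          });
    }

    // Fan-in: block until every scheduled decode has signaled completion.
    completedTasks.acquire(pendingTaskCount);
    return results;
  }
}
```

Note that the semaphore count must exactly match the number of scheduled tasks, which is why the PR tracks `pendingTaskCount` alongside the dispatch loop rather than relying on the result map's size (documents that fail the `instanceof` check still release a permit).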
@@ -14,11 +14,22 @@

 package com.google.firebase.firestore.util;

+import android.os.AsyncTask;
 import com.google.android.gms.tasks.TaskExecutors;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;

 /** Helper class for executors. */
 public final class Executors {
+  /**
+   * The maximum number of tasks we submit to AsyncTask.THREAD_POOL_EXECUTOR.
+   *
+   * <p>The limit is based on the number of core threads spun by THREAD_POOL_EXECUTOR and addresses
+   * its queue size limit of 120 pending tasks.
+   */
+  private static final int MAXIMUM_CONCURRENT_BACKGROUND_TASKS = 4;

   /**
    * The default executor for user visible callbacks. It is an executor scheduling callbacks on

@@ -29,6 +40,39 @@ public final class Executors {
   /** An executor that executes the provided runnable immediately on the current thread. */
   public static final Executor DIRECT_EXECUTOR = Runnable::run;

+  /**
+   * An executor that runs tasks in parallel on Android's AsyncTask.THREAD_POOL_EXECUTOR.
+   *
+   * <p>Unlike the main THREAD_POOL_EXECUTOR, this executor manages its own queue of tasks and can
+   * handle an unbounded number of pending tasks.
Reviewer: Having an unbounded number of pending tasks seems problematic from a memory-usage point of view. Previously we'd unpack one blob, parse it, compare it, and then allow it to be GCed. Now, if we get significantly ahead of the background threads' ability to clear this queue, each queued document will be resident. Previously, ignoring constant factors around the current document we're looking at, our memory usage was essentially O(matching documents), but this increases it to potentially O(all documents in a collection). This could be a huge difference.

We should block admission into the queue if the background threads aren't cleaning up. A straightforward way to bound this is to just use a semaphore with a fixed number of permits to guard admission. This will have the side effect of fixing the thread-safety issue you've documented below, which makes me uncomfortable.
+   */
+  public static final Executor BACKGROUND_EXECUTOR =
+      new Executor() {
Reviewer: Please don't make major components we're going to reason about, like this, anonymous classes. Make it a
+        AtomicInteger activeRunnerCount = new AtomicInteger(0);
+        Queue<Runnable> pendingTasks = new ConcurrentLinkedQueue<>();
+
+        @Override
+        public void execute(Runnable command) {
+          pendingTasks.add(command);
+
+          if (activeRunnerCount.get() < MAXIMUM_CONCURRENT_BACKGROUND_TASKS) {
+            // Note that the runner count could temporarily exceed
+            // MAXIMUM_CONCURRENT_BACKGROUND_TASKS if this code path was executed in parallel.
+            // While undesired, this would merely queue another task on THREAD_POOL_EXECUTOR,
+            // and we are unlikely to hit the 120 pending task limit.
+            activeRunnerCount.incrementAndGet();
+            AsyncTask.THREAD_POOL_EXECUTOR.execute(
Reviewer: We still need to handle the fact that

We should implement a caller-runs policy, rather than trying to block on the executor becoming available, i.e.:

```java
// acquire permit
Runnable wrappedCommand = ...; // thing that runs and then releases the semaphore
try {
  AsyncTask.THREAD_POOL_EXECUTOR.execute(wrappedCommand);
} catch (RejectedExecutionException ignored) {
  wrappedCommand.run();
}
```

Reviewer: Actually it occurs to me that there's essentially no reason to block under any circumstance. If we choose to implement a semaphore to limit the number of tasks we admit into the underlying executor, we could tryAcquire, and if we don't get a ticket just run the (unwrapped) command directly rather than blocking. We probably still want a semaphore so that we don't abuse the
+                () -> {
+                  Runnable r;
+                  while ((r = pendingTasks.poll()) != null) {
+                    r.run();
+                  }
+                  activeRunnerCount.decrementAndGet();
+                });
+          }
+        }
+      };

   private Executors() {
     // Private constructor to prevent initialization
   }
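The two reviewer suggestions above (bound admission with a semaphore, and fall back to running on the caller's thread rather than ever blocking) can be combined into a small wrapper. This is a sketch under stated assumptions: `ThrottledExecutor` is a hypothetical name introduced here, and the generic delegate stands in for `AsyncTask.THREAD_POOL_EXECUTOR`.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

public final class ThrottledExecutor implements Executor {
  private final Executor delegate;
  private final Semaphore permits;

  public ThrottledExecutor(Executor delegate, int maxConcurrentTasks) {
    this.delegate = delegate;
    this.permits = new Semaphore(maxConcurrentTasks);
  }

  @Override
  public void execute(Runnable command) {
    if (!permits.tryAcquire()) {
      // No permit available: run on the caller's thread instead of blocking,
      // which also provides natural backpressure on admission.
      command.run();
      return;
    }
    Runnable wrappedCommand =
        () -> {
          try {
            command.run();
          } finally {
            permits.release(); // always return the permit, even on failure
          }
        };
    try {
      delegate.execute(wrappedCommand);
    } catch (RejectedExecutionException ignored) {
      // Delegate saturated: caller-runs fallback, as suggested in review.
      wrappedCommand.run();
    }
  }
}
```

The caller-runs fallback means memory stays bounded at roughly O(maxConcurrentTasks) in-flight work items, addressing the unbounded-queue concern raised above, while slow producers simply decode inline as the original code did.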
Reviewer: Using "reduced" as the verb here seems like the wrong pairing. "Reduce" resource consumption but "improve" (or "increase") performance seem more customary. How about "Improved the performance of queries with large result sets by up to 60%"?

Author: Done.