
Performance: Decode documents in background thread #559


Merged · 34 commits · Jul 2, 2019

Commits
4da49cf
Decode documents in background thread
schmidt-sebastian Jun 21, 2019
629cd45
Merge branch 'master' into mrschmidt/parallel
schmidt-sebastian Jun 25, 2019
f9c9c1f
Use Task API
schmidt-sebastian Jun 25, 2019
00717dc
Realize Android Tasks are slow
schmidt-sebastian Jun 25, 2019
c5afc57
RFC: ThrottledForwardingExecutor
schmidt-sebastian Jun 26, 2019
d93feaf
Add IN and ARRAY_CONTAINS_ANY (not publicly exposed) (#519)
Jun 20, 2019
95de378
Add Remote Config smoke tests. (#543)
allisonbm92 Jun 20, 2019
d9ef372
Generalize parent repository logic. (#548)
allisonbm92 Jun 20, 2019
37d4803
Open source firebase remote config and firebase abt
miraziz Jun 20, 2019
f2e25f0
Backporting documentation changes (#550)
rsgowman Jun 21, 2019
45ca056
Update Remote Config and AB Testing versions (#553)
allisonbm92 Jun 21, 2019
1799a29
Disable FTL for AB Testing (#554)
allisonbm92 Jun 21, 2019
94b5b94
Move RC to firebase-config. (#555)
allisonbm92 Jun 21, 2019
1e99b39
Fix FirebaseFirestore formatting. (#556)
allisonbm92 Jun 21, 2019
6904cc9
Remove move product flavors from smoke tests. (#557)
allisonbm92 Jun 21, 2019
6c663cb
Add option to run tests against emulator (#540)
Jun 21, 2019
4fa2fef
Use Task API
schmidt-sebastian Jun 25, 2019
e4a3644
Realize Android Tasks are slow
schmidt-sebastian Jun 25, 2019
5237be0
RFC: ThrottledForwardingExecutor
schmidt-sebastian Jun 26, 2019
06173ad
Update documentation to reflect emulator is default for integration t…
Jun 24, 2019
afc4382
Allow IN queries with arrays of documentIds (#560)
Jun 24, 2019
2075fc7
Migrate ExperimentPayload to AB Testing (#563)
allisonbm92 Jun 25, 2019
21c8562
Run IN query integration tests only when running against emulator (#567)
Jun 25, 2019
adda141
Add Inapp Messaging smoke test. (#569)
allisonbm92 Jun 25, 2019
0afb5d7
Add Proguard Files to Smoke Tests (#572)
allisonbm92 Jun 26, 2019
5eca22c
Move DocumentId to public space (#571)
Jun 26, 2019
b9e15bb
fdsdfs
schmidt-sebastian Jun 27, 2019
a2fba58
Finishing touches
schmidt-sebastian Jun 27, 2019
bd24bfe
Merge
schmidt-sebastian Jun 27, 2019
0b2d9ad
Merge
schmidt-sebastian Jun 27, 2019
7fabd7f
Fix merge
schmidt-sebastian Jun 27, 2019
6fb8e9f
Cleanup
schmidt-sebastian Jun 27, 2019
1c8c04c
Make BackgroundQueue a top-level class
schmidt-sebastian Jul 2, 2019
28bda9e
Merge branch 'master' into mrschmidt/parallel
schmidt-sebastian Jul 2, 2019
2 changes: 2 additions & 0 deletions firebase-firestore/CHANGELOG.md
@@ -1,4 +1,6 @@
# Unreleased
- [changed] Reduced execution time of queries with large result sets by up to
  60%.
- [changed] Instead of failing silently, Firestore now crashes the client app
  if it fails to load SSL Ciphers. To avoid these crashes, you must bundle
  Conscrypt to support non-GMSCore devices on Android KitKat or JellyBean (see

Contributor:

Using "reduced" as the verb here seems like the wrong pairing. You reduce resource consumption, but you improve (or increase) performance; those pairings seem more customary.

How about "Improved the performance of queries with large result sets by up to 60%"?

Contributor Author:

Done.
@@ -35,7 +35,11 @@
import java.util.List;
import java.util.Map;

/** Serializer for values stored in the LocalStore. */
/**
* Serializer for values stored in the LocalStore.
*
* <p>This class is thread-safe.
*/
public final class LocalSerializer {

private final RemoteSerializer rpcSerializer;
@@ -23,12 +23,16 @@
import com.google.firebase.firestore.model.DocumentKey;
import com.google.firebase.firestore.model.MaybeDocument;
import com.google.firebase.firestore.model.ResourcePath;
import com.google.firebase.firestore.util.Executors;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;

final class SQLiteRemoteDocumentCache implements RemoteDocumentCache {
@@ -118,7 +122,10 @@ public ImmutableSortedMap<DocumentKey, Document> getAllDocumentsMatchingQuery(Qu
String prefixPath = EncodedPath.encode(prefix);
String prefixSuccessorPath = EncodedPath.prefixSuccessor(prefixPath);

Map<DocumentKey, Document> results = new HashMap<>();
Map<DocumentKey, Document> allDocuments = new ConcurrentHashMap<>();

int[] pendingTaskCount = new int[] {0};
Semaphore completedTasks = new Semaphore(0);

db.query("SELECT path, contents FROM remote_documents WHERE path >= ? AND path < ?")
.binding(prefixPath, prefixSuccessorPath)
@@ -136,20 +143,38 @@ public ImmutableSortedMap<DocumentKey, Document> getAllDocumentsMatchingQuery(Qu
return;
}

MaybeDocument maybeDoc = decodeMaybeDocument(row.getBlob(1));
if (!(maybeDoc instanceof Document)) {
return;
}

Document doc = (Document) maybeDoc;
if (!query.matches(doc)) {
return;
}

results.put(doc.getKey(), doc);
byte[] rawContents = row.getBlob(1);
Contributor:

This one method is now huge, tightly intermingling threading implementation details with the query processing. Most of this could be generic. There are other circumstances where we potentially process large numbers of rows (e.g. the mutation queue) and this could be extended to those places. Probably even more importantly, if we pull this out into something separate we could test it directly :-).

One thing that's preventing this from being generic is that the intermediate results are stored in a concurrent map, which ties the results to the query structure. However, we don't need this to be a map of key to document until we're assembling the result. We could just as easily publish each document into a deque.

Additional notes:

  • This implementation leaves time on the table by not starting query matching until all the protos are decoded. The Firestore worker thread that was processing query results can't start assembling the result before the worker threads are done.
  • Also, we're performing query matching serially on the Firestore worker, when we could be performing it in the parallel block.
  • This is a very low number of threads that are all doing significant work per item. We could probably get away with just a simple lock wrapping our output without building an intermediate ConcurrentHashMap.

If we were targeting Java 8, I'd suggest that you've essentially reimplemented a subset of java.util.stream. There are ports of that to Android API < 24, but that's a lot of code for something that's pretty small.

At a high level this could be phrased as map/reduce, where the map phase is parsing and filtering (first by instanceof Document, second by query.matches). The reduce phase is the assembly of the matchingDocuments map.

I suggest a few refactoring experiments:

  • See if pulling query matching into the worker threads helps
  • See if directly assembling the ImmutableSortedMap while holding a lock works
    • If not, see if a regular Deque of matching docs while holding a lock is OK
    • Alternatively try a ConcurrentLinkedQueue
  • Then try to extract a component that does the parallel processing in a way that's more self-contained

WDYT?
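
For concreteness, here is a minimal, hypothetical sketch of the lock-plus-deque collection idea suggested above, assuming decoding and filtering can be folded into a single map step. The helper name, the MapFn interface, and the generic shape are illustrative and not part of this PR:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;

// Hypothetical helper, not part of this PR: runs a "map" step (e.g. decode +
// filter) on the given executor, collects non-null results into a Deque that
// is guarded by a plain lock, and returns once every submitted task finishes.
final class ParallelMapCollect {

  /** Minimal function type to avoid depending on java.util.function (API 24+). */
  interface MapFn<I, O> {
    O apply(I input);
  }

  static <I, O> Deque<O> mapAndCollect(List<I> inputs, MapFn<I, O> mapFn, Executor executor)
      throws InterruptedException {
    Deque<O> results = new ArrayDeque<>(); // guarded by synchronized (results)
    Semaphore completed = new Semaphore(0);
    for (I input : inputs) {
      executor.execute(
          () -> {
            O mapped = mapFn.apply(input); // decode + filter; null means "dropped"
            if (mapped != null) {
              synchronized (results) {
                results.add(mapped);
              }
            }
            completed.release();
          });
    }
    completed.acquire(inputs.size()); // wait for all map tasks to finish
    return results;
  }
}

A caller could pass the raw row blobs as the inputs, perform the proto decode and the query.matches check inside the map step, and assemble the ImmutableSortedMap from the returned Deque on the calling thread.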

Contributor Author (schmidt-sebastian, Jun 24, 2019):

I'll try to make this more generic, but I didn't want this to turn into a 500 line change with multiple new classes. I'll give it another go though.

Re: Query processing. Query matching is not thread safe (because of …) and if we do want to perform this concurrently, then I would rather tackle this in a follow-up PR.

Contributor:

Does this turn into a problem in practice? What if we access the orderBy in this thread before enqueuing any work?

Contributor Author:

@mikelehen mostly convinced me that Query matching is not a problem in practice. I also believe that Query matching itself is not a huge bottleneck, but it doesn't hurt to also execute it in the background, especially given that we are already running a task there.

I rewrote the code in this function to use the Tasks API. By that I mean I added APIs to Util that mirror the equivalent methods of the Tasks API, but with some significant differences:

  • My version of await can be called on the main thread to support unit testing of the RemoteDocumentStore.
  • My version of whenAllComplete doesn't execute the continuation on the main queue, which blocks our SpecTests on Robolectric.

If you like this approach, I can write dedicated tests for these helpers. We might also want to move them to a dedicated place (such as TaskUtil).
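
As a rough illustration only (the helper names, signatures, and their eventual home such as TaskUtil are assumptions, not the code added in this PR), an await that can block any thread and a whenAllComplete that runs its continuation on a caller-supplied executor might look like this:

import com.google.android.gms.tasks.Task;
import com.google.android.gms.tasks.TaskCompletionSource;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; the actual helpers added in this PR may differ.
final class TaskUtilSketch {

  /** Blocks the calling thread (even the main thread, e.g. in unit tests) until the task settles. */
  static <T> Task<T> await(Task<T> task) throws InterruptedException {
    Semaphore done = new Semaphore(0);
    // Runnable::run executes the listener inline on whichever thread completes the task.
    task.addOnCompleteListener(Runnable::run, completed -> done.release());
    done.acquire();
    return task;
  }

  /** Resolves once all tasks complete; the continuation runs on the given executor, not the main thread. */
  static Task<Void> whenAllComplete(Collection<? extends Task<?>> tasks, Executor executor) {
    TaskCompletionSource<Void> result = new TaskCompletionSource<>();
    if (tasks.isEmpty()) {
      result.setResult(null);
      return result.getTask();
    }
    AtomicInteger remaining = new AtomicInteger(tasks.size());
    for (Task<?> task : tasks) {
      task.addOnCompleteListener(
          executor,
          completed -> {
            if (remaining.decrementAndGet() == 0) {
              result.setResult(null);
            }
          });
    }
    return result.getTask();
  }
}

In production code an await like this should only run off the main thread, unless (as in the unit-test case described above) nothing the awaited task depends on is scheduled there.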


++pendingTaskCount[0];

// Since scheduling background tasks incurs overhead, we only dispatch to a background
// thread if there are still some documents remaining.
Executor deserializationExecutor =
row.isLast() ? Executors.DIRECT_EXECUTOR : Executors.BACKGROUND_EXECUTOR;
deserializationExecutor.execute(
() -> {
MaybeDocument maybeDoc = decodeMaybeDocument(rawContents);
if (maybeDoc instanceof Document) {
allDocuments.put(maybeDoc.getKey(), (Document) maybeDoc);
}
completedTasks.release();
});
});

return ImmutableSortedMap.Builder.fromMap(results, DocumentKey.comparator());
try {
completedTasks.acquire(pendingTaskCount[0]);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

ImmutableSortedMap<DocumentKey, Document> matchingDocuments =
ImmutableSortedMap.Builder.emptyMap(DocumentKey.comparator());
for (Map.Entry<DocumentKey, Document> entry : allDocuments.entrySet()) {
if (query.matches(entry.getValue())) {
matchingDocuments = matchingDocuments.insert(entry.getKey(), entry.getValue());
}
}
return matchingDocuments;
}

private String pathForKey(DocumentKey key) {
@@ -16,7 +16,11 @@

import androidx.annotation.NonNull;

/** Represents a particular database in Firestore */
/**
* Represents a particular database in Firestore.
*
* <p>This class is thread-safe.
*/
public final class DatabaseId implements Comparable<DatabaseId> {
public static final String DEFAULT_DATABASE_ID = "(default)";

@@ -15,7 +15,6 @@
package com.google.firebase.firestore.remote;

import android.content.Context;
import android.os.AsyncTask;
import androidx.annotation.VisibleForTesting;
import com.google.android.gms.common.GooglePlayServicesNotAvailableException;
import com.google.android.gms.common.GooglePlayServicesRepairableException;
@@ -24,6 +23,7 @@
import com.google.android.gms.tasks.Tasks;
import com.google.firebase.firestore.core.DatabaseInfo;
import com.google.firebase.firestore.util.AsyncQueue;
import com.google.firebase.firestore.util.Executors;
import com.google.firebase.firestore.util.Logger;
import com.google.firebase.firestore.util.Supplier;
import com.google.firestore.v1.FirestoreGrpc;
@@ -69,11 +69,11 @@ public static void overrideChannelBuilder(
CallCredentials firestoreHeaders) {
this.asyncQueue = asyncQueue;

// We execute network initialization on a separate thred to not block operations that depend on
// We execute network initialization on a separate thread to not block operations that depend on
// the AsyncQueue.
this.channelTask =
Tasks.call(
AsyncTask.THREAD_POOL_EXECUTOR,
Executors.BACKGROUND_EXECUTOR,
() -> {
ManagedChannel channel = initChannel(context, databaseInfo);
FirestoreGrpc.FirestoreStub firestoreStub =
@@ -105,7 +105,11 @@
import java.util.Map;
import java.util.Set;

/** Serializer that converts to and from Firestore API protos. */
/**
* Serializer that converts to and from Firestore API protos.
*
* <p>This class is thread-safe.
*/
public final class RemoteSerializer {

private final DatabaseId databaseId;
@@ -14,11 +14,22 @@

package com.google.firebase.firestore.util;

import android.os.AsyncTask;
import com.google.android.gms.tasks.TaskExecutors;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/** Helper class for executors. */
public final class Executors {
/**
* The maximum number of tasks we submit to AsyncTask.THREAD_POOL_EXECUTOR.
*
* <p>The limit is based on the number of core threads spun by THREAD_POOL_EXECUTOR and addresses
* its queue size limit of 120 pending tasks.
*/
private static final int MAXIMUM_CONCURRENT_BACKGROUND_TASKS = 4;

/**
* The default executor for user visible callbacks. It is an executor scheduling callbacks on
@@ -29,6 +40,39 @@ public final class Executors {
/** An executor that executes the provided runnable immediately on the current thread. */
public static final Executor DIRECT_EXECUTOR = Runnable::run;

/**
* An executor that runs tasks in parallel on Android's AsyncTask.THREAD_POOL_EXECUTOR.
*
* <p>Unlike the main THREAD_POOL_EXECUTOR, this executor manages its own queue of tasks and can
* handle an unbounded number of pending tasks.
Contributor:

Having an unbounded number of pending tasks seems problematic from a memory usage point of view.

Previously we'd unpack one blob, parse it, compare it, and then allow it to be GCed.

Now, if we get significantly ahead of the background threads' ability to clear this queue, each queued document will be resident. Previously, ignoring constant factors around the current document we're looking at, our memory usage was essentially O(matching documents), but this increases it to potentially be O(all documents in a collection). This could be a huge difference. We should block admission into the queue if the background threads aren't cleaning up.

A straightforward way to bound this is to just use a semaphore with a fixed number of permits to guard admission. This will have the side effect of fixing the thread-safety issue you've documented below, which makes me uncomfortable.

*/
public static final Executor BACKGROUND_EXECUTOR =
new Executor() {
Contributor:

Please don't make major components that we're going to reason about, like this one, anonymous classes. Make it a private static final class with a name. This will make stack traces much, much saner.

AtomicInteger activeRunnerCount = new AtomicInteger(0);
Queue<Runnable> pendingTasks = new ConcurrentLinkedQueue<>();

@Override
public void execute(Runnable command) {
pendingTasks.add(command);

if (activeRunnerCount.get() < MAXIMUM_CONCURRENT_BACKGROUND_TASKS) {
// Note that the runner count could temporarily exceed
// MAXIMUM_CONCURRENT_BACKGROUND_TASKS if this code path is executed in parallel.
// While undesired, this would merely queue another task on THREAD_POOL_EXECUTOR,
// and we are unlikely to hit the 120 pending task limit.
activeRunnerCount.incrementAndGet();
AsyncTask.THREAD_POOL_EXECUTOR.execute(
Contributor:

We still need to handle the fact that AsyncTask.THREAD_POOL_EXECUTOR.execute can reject adding this task.

We should implement a caller runs policy, rather than trying to block on the executor becoming available. i.e.

// acquire permit
Runnable wrappedCommand = // thing that runs and then releases semaphore
try {
  AsyncTask.THREAD_POOL_EXECUTOR.execute(wrappedCommand);
} catch (RejectedExecutionException ignored) {
  wrappedCommand.run();
} 

Contributor:

Actually it occurs to me that there's essentially no reason to block under any circumstance. If we choose to implement a semaphore to limit the number of tasks we admit into the underlying executor, we could tryAcquire, and if we don't get a ticket, just run the (unwrapped) command directly rather than blocking.

We probably still want a semaphore so that we don't abuse the AsyncTask.THREAD_POOL_EXECUTOR. If not for the fact that other participants are unlikely to be well behaved, we could just rely on the rejected exceptions + caller runs to implement pushback. (See the sketch after this diff.)

() -> {
Runnable r;
while ((r = pendingTasks.poll()) != null) {
r.run();
}
activeRunnerCount.decrementAndGet();
});
}
}
};

private Executors() {
// Private constructor to prevent initialization
}