Performance: Decode documents in background thread #559


Merged
merged 34 commits into master from mrschmidt/parallel
Jul 2, 2019

Conversation

schmidt-sebastian
Contributor

@schmidt-sebastian schmidt-sebastian commented Jun 21, 2019

This PR reduces the time it takes to execute getAllDocumentsMatchingQuery by moving the proto deserialization into Android's background queue.

Note that the only real complication here is that Android's THREAD_POOL_EXECUTOR uses a LinkedBlockingQueue with a fixed size of 120. We can't enqueue more than 120 elements without triggering RejectedExecutionExceptions. Instead, this PR uses its own queue and up to 4 runners (the core pool size of the THREAD_POOL_EXECUTOR).
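The approach described above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: a plain `Executor` stands in for `AsyncTask.THREAD_POOL_EXECUTOR`, and the names (`QueuedExecutor`, `maxRunners`) are made up. Tasks go into an unbounded queue of our own, and at most `maxRunners` drain loops are ever submitted to the delegate, so the delegate's 120-slot queue is never overrun:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

class QueuedExecutor implements Executor {
  private final Executor delegate;
  private final int maxRunners;
  private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
  private final AtomicInteger activeRunners = new AtomicInteger();

  QueuedExecutor(Executor delegate, int maxRunners) {
    this.delegate = delegate;
    this.maxRunners = maxRunners;
  }

  @Override
  public void execute(Runnable command) {
    tasks.add(command);
    maybeStartRunner();
  }

  private void maybeStartRunner() {
    while (!tasks.isEmpty()) {
      int runners = activeRunners.get();
      if (runners >= maxRunners) {
        return; // an already-active runner will pick the task up
      }
      if (activeRunners.compareAndSet(runners, runners + 1)) {
        delegate.execute(this::drainQueue);
        return;
      }
    }
  }

  private void drainQueue() {
    Runnable task;
    while ((task = tasks.poll()) != null) {
      task.run();
    }
    activeRunners.decrementAndGet();
    // Re-check: a task enqueued between our last poll() and the decrement
    // would otherwise be stranded until the next execute() call.
    maybeStartRunner();
  }
}
```

The final re-check in `drainQueue` closes the race where a producer enqueues a task just as the last runner is shutting down.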

On a Nexus 5X, reading 1700 documents from a single collection takes:

  • without PR: 3621ms, 3600ms, 3637ms, 3550ms, 3593ms (avg 3605 ms)
  • with this PR: 1498ms, 1438ms, 1502ms, 1496ms, 1433ms (avg 1473 ms)

Reading a collection with 10 documents 100 times:

  • without PR: 2295 ms, 2280 ms, 2319 ms, 2296ms, 2315ms (avg 2301 ms)
  • with PR: 1380ms, 1646 ms, 1503 ms, 1191ms, 1451 ms (avg 1416 ms)

Reading a collection with 2 documents 1000 times:

  • without PR: 8902ms, 8520ms, 7472ms, 7953ms, 10907ms (avg 8751 ms)
  • with PR: 7901ms, 8458ms, 9109ms, 8730ms, 7781ms (avg 8395 ms)

Reading a collection with 1 document 1000 times:

  • without PR: 780ms, 763ms, 724ms, 774ms, 727ms (avg 753 ms)
  • with PR: 796ms, 752ms, 724ms, 786ms, 722ms (avg 756 ms)

Note that without the special handling of single-document collections, the per-read time for a one-document collection roughly doubled (from ~0.7 ms to ~1.5 ms).

@googlebot googlebot added cla: yes Override cla labels Jun 21, 2019
@schmidt-sebastian schmidt-sebastian force-pushed the mrschmidt/parallel branch 2 times, most recently from 84bea0f to 8e5872e on June 21, 2019 22:29
@schmidt-sebastian
Contributor Author

/retest

@@ -1,4 +1,6 @@
# Unreleased
- [changed] Reduced execution time of queries with large result sets by up to
Contributor

Using "reduced" as the verb here seems like the wrong pairing. You reduce resource consumption but improve (or increase) performance; that seems more customary.

How about "Improved the performance of queries with large result sets by up to 60%"?

Contributor Author

Done.

}

results.put(doc.getKey(), doc);
byte[] rawContents = row.getBlob(1);
Contributor

This one method is now huge, tightly intermingling threading implementation details with the query processing. Most of this could be generic. There are other circumstances where we potentially process large numbers of rows (e.g. the mutation queue) and this could be extended to those places. Probably even more importantly, if we pull this out into something separate we could test it directly :-).

One thing that's preventing this from being generic is that the intermediate results are stored in a concurrent map, which ties the results to the query structure. However, we don't need this to be a map of key to document until we're assembling the result. We could just as easily publish each document into a deque.

Additional notes:

  • This implementation leaves time on the table by not starting query matching until all the protos are decoded. The Firestore worker thread that was processing query results can't start assembling the result before the worker threads are done.
  • Also, we're performing query matching serially on the Firestore worker, when we could be performing it in the parallel block.
  • This is a very low number of threads that are all doing significant work per item. We could probably get away with just a simple lock wrapping our output without building an intermediate ConcurrentHashMap.

If we were targeting Java 8, I'd suggest that you've essentially reimplemented a subset of java.util.stream. There are ports of that to Android API < 24, but that's a lot of code for something that's pretty small.

At a high level this could be phrased as map/reduce, where the map phase is parsing and filtering (first by instanceof Document, second by query.matches). The reduce phase is the assembly of the matchingDocuments map.

I suggest a few refactoring experiments:

  • See if pulling query matching into the worker threads helps
  • See if directly assembling the ImmutableSortedMap while holding a lock works
    • If not, see if a regular Deque of matching docs while holding a lock is OK
    • Alternatively try a ConcurrentLinkedQueue
  • Then try to extract a component that does the parallel processing in a way that's more self contained

WDYT?
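The map/reduce shape suggested above could be sketched as follows, under the assumption that decoding and query matching are both safe off the worker thread (an assumption the thread below goes on to debate). All names here are illustrative; `decode` and `matches` stand in for proto parsing and `query.matches`:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;

class ParallelMapper {
  /** Decodes each raw row in parallel, keeps the rows matching the predicate,
   *  and collects results into a plain deque guarded by a simple lock. */
  static <R, D> Deque<D> mapAndFilter(
      List<R> rows, Function<R, D> decode, Predicate<D> matches, int threads)
      throws InterruptedException {
    Deque<D> results = new ArrayDeque<>();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (R row : rows) {
      pool.execute(() -> {
        D doc = decode.apply(row);   // "map": parse the proto off the worker thread
        if (matches.test(doc)) {     // filter: e.g. instanceof Document, query.matches
          synchronized (results) {   // a plain lock instead of a ConcurrentHashMap
            results.add(doc);
          }
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    return results;                  // "reduce": caller assembles the sorted map
  }
}
```

With few threads each doing significant work per item, the contention on the single lock should be negligible, which is the point of the third bullet above.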

Contributor Author

@schmidt-sebastian schmidt-sebastian Jun 24, 2019

I'll try to make this more generic, but I didn't want this to turn into a 500 line change with multiple new classes. I'll give it another go though.

Re: Query processing. Query matching is not thread safe (because of

) and if we do want to perform this concurrently, then I would rather tackle it in a follow-up PR.

Contributor

Does this turn into a problem in practice? What if we access the orderBy in this thread before enqueuing any work?

Contributor Author

@mikelehen mostly convinced me that query matching is not a problem in practice. I also believe that query matching itself is not a huge bottleneck, but it doesn't hurt to also execute it in the background, especially given that we are already running a task there.

I rewrote the code in this function to use the Tasks API. By that I mean I added APIs to Util that mirror the equivalent methods of the Tasks API, but with some significant differences:

  • My version of await can be called on the main thread to support unit testing of the RemoteDocumentStore.
  • My version of whenAllComplete doesn't execute the continuation on the main queue, which blocks our SpecTests on Robolectric.

If you like this approach, I can write dedicated tests for these helpers. We might also want to move them to a dedicated place (such as TaskUtil).

@schmidt-sebastian
Contributor Author

Taking this one back for now, since the Tasks change likely makes single document fetches much slower. It always queues an execution on a background thread.

@schmidt-sebastian
Contributor Author

schmidt-sebastian commented Jun 25, 2019

I updated the PR to add a new queue for background tasks, which doesn't rely on Android's Task API to do its management (since that would move all state keeping to a background thread). The queue is not super general-purpose, but it works for the use case in this PR and should also work if we choose to do something similar in the mutation queue.

Contributor

@wilhuff wilhuff left a comment

Have only reviewed the executor here.

* handle an unbounded number of pending tasks.
*/
public static final Executor BACKGROUND_EXECUTOR =
new Executor() {
Contributor

Please don't make major components we're going to reason about like this anonymous classes. Make it a private static final class with a name. This will make stack traces much, much saner.

* An executor that runs tasks in parallel on Android's AsyncTask.THREAD_POOL_EXECUTOR.
*
* <p>Unlike the main THREAD_POOL_EXECUTOR, this executor manages its own queue of tasks and can
* handle an unbounded number of pending tasks.
Contributor

Having an unbounded number of pending tasks seems problematic from a memory usage point of view.

Previously we'd unpack one blob, parse it, compare it, and then allow it to be GCed.

Now, if we get significantly ahead of the background threads' ability to clear this queue each queued document will be resident. Previously, ignoring constant factors around the current document we're looking at, our memory usage was essentially O(matching documents), but this increases it to potentially be O(all documents in a collection). This could be a huge difference. We should block admission into the queue if the background threads aren't cleaning up.

A straightforward way to bound this is to just use a semaphore with a fixed number of permits to guard admission. This will have the side effect of fixing the thread-safety issue you've documented below, which makes me uncomfortable.

// While undesired, this would merely queue another task on THREAD_POOL_EXECUTOR,
// and we are unlikely to hit the 120 pending task limit.
activeRunnerCount.incrementAndGet();
AsyncTask.THREAD_POOL_EXECUTOR.execute(
Contributor

We still need to handle the fact that AsyncTask.THREAD_POOL_EXECUTOR.execute can reject adding this task.

We should implement a caller runs policy, rather than trying to block on the executor becoming available. i.e.

semaphore.acquire();
Runnable wrappedCommand = () -> {
  try {
    command.run();
  } finally {
    semaphore.release();
  }
};
try {
  AsyncTask.THREAD_POOL_EXECUTOR.execute(wrappedCommand);
} catch (RejectedExecutionException ignored) {
  wrappedCommand.run();
}

Contributor

Actually it occurs to me that there's essentially no reason to block under any circumstance. If we choose to implement a semaphore to limit the number of tasks we admit into the underlying executor, we could tryAcquire, and if we don't get a ticket just run the (unwrapped) command directly rather than blocking.

We probably still want a semaphore so that we don't abuse the AsyncTask.THREAD_POOL_EXECUTOR. If not for the fact that other participants are unlikely to be well behaved, we could just rely on the rejected exceptions + caller runs to implement pushback.
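The non-blocking variant suggested in this comment could look roughly like the following sketch. A plain `Executor` stands in for `AsyncTask.THREAD_POOL_EXECUTOR`, and the class name echoes the ThrottledForwardingExecutor the PR eventually adds, but the details here are illustrative only:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

class ThrottledForwardingExecutorSketch implements Executor {
  private final Executor delegate;
  private final Semaphore permits;

  ThrottledForwardingExecutorSketch(Executor delegate, int maxConcurrency) {
    this.delegate = delegate;
    this.permits = new Semaphore(maxConcurrency);
  }

  @Override
  public void execute(Runnable command) {
    if (!permits.tryAcquire()) {
      command.run(); // no permit: caller runs, providing natural pushback
      return;
    }
    Runnable wrapped = () -> {
      try {
        command.run();
      } finally {
        permits.release();
      }
    };
    try {
      delegate.execute(wrapped);
    } catch (RejectedExecutionException e) {
      wrapped.run(); // delegate full despite the permit: caller runs
    }
  }
}
```

Running on the caller's thread bounds both memory (at most `maxConcurrency` tasks are in flight) and the load placed on the shared delegate pool, without ever blocking the producer.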

Contributor

@wilhuff wilhuff left a comment

Completed the review.

The shape of this is now pretty great, but I have a bunch of questions/notes about the details.


import java.util.concurrent.Semaphore;

/**
* A queue for parallel execution of independent tasks.
Contributor

This comment should describe more about what's going on here.

First: why does this thing exist? There are plenty of mechanisms for parallel execution of independent tasks out there. In six months we're going to be wondering why this didn't just use Android Tasks or otherwise use some of the existing mechanisms you tried and discarded.

Second: what's the intended use case? Normally people think of task queues as things that live for a long time. This thing is meant to represent a stream of small work items that make up some larger unit of work, where that larger unit of work needs to be something that can be completed. Instances of the class should essentially only live for the duration of that larger unit of work.

Your thread safety comment sort of hints at this, but doesn't spell it out. The intent is that there's some single thread producing work items to insert here. This can't be used in circumstances where there are multiple producer threads.

Contributor Author

I'll defer addressing this comment in case we end up removing this class again.

*
* <p>This class is not thread-safe. All public methods must be called from the same thread.
*/
public class TaskQueue<T> {
Contributor

This name is pretty generic, and especially when compared against AsyncQueue which already exists I start to wonder when I might use one over the other. Also, AsyncQueue executes serially so it could be helpful to name this in a way that explicitly shows itself to be non-serial.

Consider renaming this to something that more closely suggests its purpose: ParallelMapper? ParallelRunner? BackgroundTaskRunner?

Contributor Author

I'll defer addressing this comment in case we end up removing this class again.

}

@Override
public int compareTo(TaskResult<T> other) {
Contributor

Does execution order matter? In the one case we have, it doesn't, because each result is just fed into a map that will order by key.

I suppose maybe this matters in the case of processing the mutation queue? If so please adjust the comment above about the rationale for this. "Allowing for sorting" doesn't intrinsically justify why we're doing this.

Contributor Author

I feel this class is a lot more useful if it returns an ordered set of results. It's true that this is not needed here, but if we simplify it to what's actually needed we just end up with a task counter and a semaphore (we also never throw checked exceptions). We might also not want to store the intermediate map that often mostly contains null values.
All that brings me back to the original version of this PR, which was simpler and a bit more explicit about what it did, at the cost of increasing method size.

TaskResult<T> currentResult = taskResults.take();

if (currentResult.exception != null) {
throw new ExecutionException("Unhandled exception in task", currentResult.exception);
Contributor

Assuming task result ordering doesn't matter, since your policy here is just to throw on the first exception we encountered you could save all this translation code (and the TaskResult type declaration even) if you made the completedTasks buffer just contain T (and then stored the first exception separately).
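The simplification proposed here could be sketched as follows. `ResultBuffer`, `onSuccess`, and `onFailure` are illustrative names, not the PR's code: results are buffered as plain `T` values and only the first exception is retained, replacing the per-item `TaskResult` wrapper:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

class ResultBuffer<T> {
  private final BlockingQueue<Optional<T>> completed = new LinkedBlockingQueue<>();
  private final AtomicReference<Exception> firstError = new AtomicReference<>();

  void onSuccess(T result) {
    completed.add(Optional.of(result));
  }

  void onFailure(Exception e) {
    firstError.compareAndSet(null, e); // remember only the first failure
    completed.add(Optional.empty());   // still counts toward completion
  }

  /** Blocks until expectedCount tasks have finished; throws the first failure. */
  List<T> await(int expectedCount) throws Exception {
    List<T> results = new ArrayList<>(expectedCount);
    for (int i = 0; i < expectedCount; i++) {
      completed.take().ifPresent(results::add);
    }
    Exception e = firstError.get();
    if (e != null) {
      throw e;
    }
    return results;
  }
}
```

The empty `Optional` on failure keeps `await` from blocking forever when a task throws instead of producing a result.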

}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Contributor

I worry slightly that this will return partial allResults if this method is interrupted. If the caller performs no further I/O or blocking operations, they may not see another exception and may proceed as if the contents of allResults are all the actual results. It seems risky to rely on something else to observe the interrupted status to conclude that the results are actually incorrect.

go/java-practices/interruptedexception recommends throwing an alternative exception (after re-interrupting). Should we just propagate InterruptedException here and let the caller figure it out? Alternatively, should we throw the appropriate FirebaseFirestoreException right here? It's going to be hard to describe what we were trying to do, so it seems like propagating is the right thing.
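One way to follow that recommendation is sketched below: re-interrupt, then fail loudly rather than returning partial results. The helper name and the choice of an unchecked `IllegalStateException` are illustrative stand-ins (the thread debates `InterruptedException`, `FirebaseFirestoreException`, and an assertion failure as alternatives):

```java
import java.util.concurrent.BlockingQueue;

class AwaitHelper {
  static <T> T takeOrFail(BlockingQueue<T> queue) {
    try {
      return queue.take();
    } catch (InterruptedException e) {
      // Restore the interrupted status so code further up still observes it...
      Thread.currentThread().interrupt();
      // ...but also surface a distinct failure so the caller cannot silently
      // proceed with an incomplete set of results.
      throw new IllegalStateException("Interrupted while awaiting task results", e);
    }
  }
}
```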

Contributor Author

The page you linked to mostly deals with cancelled RPCs, in which case an interrupt might be an expected event that needs to be treated explicitly. If we follow this advice, then awaitResults needs to re-throw some checked exception (either InterruptedException or FirebaseFirestoreException). This places all the burden on the callsite and again makes me question the usefulness of this class.

If we do see an InterruptedException here, we can probably just assume that the client will no longer be in a usable state afterwards (`acquire()` and `take()` always run on our AsyncQueue). I'm trying to convince myself that throwing an AssertionFailure is the right thing to do here.

schmidt-sebastian and others added 12 commits June 27, 2019 14:26
* Migrate ExperimentPayload to AB Testing

* Add issue link.

* Export proto definition.
Inapp Messaging does not have an interface to trigger updates from the
device. As a result, this test simply verifies initialization and build
compatibility.

However, the latest release of Inapp Messaging is build incompatible
with the head state of AB Testing. As such, this test is disabled for
now.
This change adds the necessary proguard files and a configuration block
for release builds. However, this change does not yet enable testing
against the release build.
* Move DocumentId to public space

* add nest object testing

* add changelog
@schmidt-sebastian
Contributor Author

Updated the PR based on offline discussion:

  • Added tests/comments for ThrottledForwardingExecutor.
  • Made the "TaskQueue" much less generic and only capable of handling the needs of SQLiteRemoteDocumentCache.

@schmidt-sebastian
Contributor Author

@wilhuff When you have time, can you take another look?

Contributor

@wilhuff wilhuff left a comment

LGTM with a few final nits.

* <p>This class is not thread-safe. In particular, `execute()` and `drain()` should not be called
* from parallel threads.
*/
static class BackgroundQueue implements Executor {
Contributor

nit: The class in which you've placed this class is huge. I'd love to see us move things out of here if possible.

Why not promote this to top-level? It's also totally generic, so you could also dump it in util, but leaving it local seems fine too.

Contributor Author

Moved to a top-level class in Util. For now, it doesn't have tests though.
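Based on the snippet above, a top-level BackgroundQueue might look roughly like this sketch (assumed details, not the PR's actual code): an `Executor` that counts submitted tasks and lets the single producer thread `drain()` until all of them have completed, using a semaphore that workers release as they finish:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;

class BackgroundQueue implements Executor {
  private final Executor delegate;
  private final Semaphore completedTasks = new Semaphore(0);
  private int submittedTasks = 0;

  BackgroundQueue(Executor delegate) {
    this.delegate = delegate;
  }

  // Not thread-safe: execute() and drain() must be called from one thread.
  @Override
  public void execute(Runnable command) {
    submittedTasks++;
    delegate.execute(() -> {
      try {
        command.run();
      } finally {
        completedTasks.release(); // signal one task done
      }
    });
  }

  /** Blocks until every task submitted so far has run. */
  void drain() throws InterruptedException {
    completedTasks.acquire(submittedTasks);
    submittedTasks = 0;
  }
}
```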

// finished.
int numTasks = maximumConcurrency + 1;
CountDownLatch schedulingLatch = new CountDownLatch(1);
for (int i = 0; i < numTasks; ++i) {
Contributor

It doesn't seem like there's any difference between currentTask and i. Maybe just make the loop use currentTask?

Similarly, numTasks isn't used except as the loop bound, maybe just use maximumConcurrency + 1 as the loop condition? (I bet if you shortened it to maxConcurrency it might even all still fit in one line.)

Contributor Author

currentTask is the "effectively final" version of i and is used in the inner class created below.

Furthermore, numTasks is actually used twice: once in the loop and then again below in line 60. For that reason, I left it as is.

@wilhuff wilhuff assigned schmidt-sebastian and unassigned wilhuff Jul 2, 2019
@schmidt-sebastian schmidt-sebastian merged commit 276f279 into master Jul 2, 2019
@schmidt-sebastian schmidt-sebastian deleted the mrschmidt/parallel branch July 3, 2019 15:46
@firebase firebase locked and limited conversation to collaborators Oct 9, 2019
Labels
cla: yes Override cla size/L

7 participants