Skip to content

Performance: Decode documents in background thread #559

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 34 commits into from
Jul 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
4da49cf
Decode documents in background thread
schmidt-sebastian Jun 21, 2019
629cd45
Merge branch 'master' into mrschmidt/parallel
schmidt-sebastian Jun 25, 2019
f9c9c1f
Use Task API
schmidt-sebastian Jun 25, 2019
00717dc
Realize Android Tasks are slow
schmidt-sebastian Jun 25, 2019
c5afc57
RFC: ThrottledForwardingExecutor
schmidt-sebastian Jun 26, 2019
d93feaf
Add IN and ARRAY_CONTAINS_ANY (not publicly exposed) (#519)
Jun 20, 2019
95de378
Add Remote Config smoke tests. (#543)
allisonbm92 Jun 20, 2019
d9ef372
Generalize parent repository logic. (#548)
allisonbm92 Jun 20, 2019
37d4803
Open source firebase remote config and firebase abt
miraziz Jun 20, 2019
f2e25f0
Backporting documentation changes (#550)
rsgowman Jun 21, 2019
45ca056
Update Remote Config and AB Testing versions (#553)
allisonbm92 Jun 21, 2019
1799a29
Disable FTL for AB Testing (#554)
allisonbm92 Jun 21, 2019
94b5b94
Move RC to firebase-config. (#555)
allisonbm92 Jun 21, 2019
1e99b39
Fix FirebaseFirestore formatting. (#556)
allisonbm92 Jun 21, 2019
6904cc9
Remove move product flavors from smoke tests. (#557)
allisonbm92 Jun 21, 2019
6c663cb
Add option to run tests against emulator (#540)
Jun 21, 2019
4fa2fef
Use Task API
schmidt-sebastian Jun 25, 2019
e4a3644
Realize Android Tasks are slow
schmidt-sebastian Jun 25, 2019
5237be0
RFC: ThrottledForwardingExecutor
schmidt-sebastian Jun 26, 2019
06173ad
Update documentation to reflect emulator is default for integration t…
Jun 24, 2019
afc4382
Allow IN queries with arrays of documentIds (#560)
Jun 24, 2019
2075fc7
Migrate ExperimentPayload to AB Testing (#563)
allisonbm92 Jun 25, 2019
21c8562
Run IN query integration tests only when running against emulator (#567)
Jun 25, 2019
adda141
Add Inapp Messaging smoke test. (#569)
allisonbm92 Jun 25, 2019
0afb5d7
Add Proguard Files to Smoke Tests (#572)
allisonbm92 Jun 26, 2019
5eca22c
Move DocumentId to public space (#571)
Jun 26, 2019
b9e15bb
fdsdfs
schmidt-sebastian Jun 27, 2019
a2fba58
Finishing touches
schmidt-sebastian Jun 27, 2019
bd24bfe
Merge
schmidt-sebastian Jun 27, 2019
0b2d9ad
Merge
schmidt-sebastian Jun 27, 2019
7fabd7f
Fix merge
schmidt-sebastian Jun 27, 2019
6fb8e9f
Cleanup
schmidt-sebastian Jun 27, 2019
1c8c04c
Make BackgroundQueue a top-level class
schmidt-sebastian Jul 2, 2019
28bda9e
Merge branch 'master' into mrschmidt/parallel
schmidt-sebastian Jul 2, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions firebase-firestore/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Unreleased
- [fixed] Fixed an internal assertion that was triggered when an
update with a `FieldValue.serverTimestamp()` and an update with a
- [fixed] Fixed an internal assertion that was triggered when an update
with a `FieldValue.serverTimestamp()` and an update with a
`FieldValue.increment()` were pending for the same document (#491).
- [changed] Improved performance of queries with large result sets.
- [changed] Improved performance for queries with filters that only return a
small subset of the documents in a collection.
- [changed] Instead of failing silently, Firestore now crashes the client app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@
import com.google.firebase.database.collection.ImmutableSortedMap;
import com.google.firebase.firestore.core.Query;
import com.google.firebase.firestore.model.Document;
import com.google.firebase.firestore.model.DocumentCollections;
import com.google.firebase.firestore.model.DocumentKey;
import com.google.firebase.firestore.model.MaybeDocument;
import com.google.firebase.firestore.model.ResourcePath;
import com.google.firebase.firestore.util.BackgroundQueue;
import com.google.firebase.firestore.util.Executors;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;

final class SQLiteRemoteDocumentCache implements RemoteDocumentCache {
Expand Down Expand Up @@ -118,7 +122,11 @@ public ImmutableSortedMap<DocumentKey, Document> getAllDocumentsMatchingQuery(Qu
String prefixPath = EncodedPath.encode(prefix);
String prefixSuccessorPath = EncodedPath.prefixSuccessor(prefixPath);

Map<DocumentKey, Document> results = new HashMap<>();
BackgroundQueue backgroundQueue = new BackgroundQueue();

ImmutableSortedMap<DocumentKey, Document>[] matchingDocuments =
(ImmutableSortedMap<DocumentKey, Document>[])
new ImmutableSortedMap[] {DocumentCollections.emptyDocumentMap()};

db.query("SELECT path, contents FROM remote_documents WHERE path >= ? AND path < ?")
.binding(prefixPath, prefixSuccessorPath)
Expand All @@ -136,20 +144,31 @@ public ImmutableSortedMap<DocumentKey, Document> getAllDocumentsMatchingQuery(Qu
return;
}

MaybeDocument maybeDoc = decodeMaybeDocument(row.getBlob(1));
if (!(maybeDoc instanceof Document)) {
return;
}

Document doc = (Document) maybeDoc;
if (!query.matches(doc)) {
return;
}

results.put(doc.getKey(), doc);
byte[] rawDocument = row.getBlob(1);

// Since scheduling background tasks incurs overhead, we only dispatch to a background
// thread if there are still some documents remaining.
Executor executor = row.isLast() ? Executors.DIRECT_EXECUTOR : backgroundQueue;
executor.execute(
() -> {
MaybeDocument maybeDoc = decodeMaybeDocument(rawDocument);

if (maybeDoc instanceof Document && query.matches((Document) maybeDoc)) {
synchronized (SQLiteRemoteDocumentCache.this) {
matchingDocuments[0] =
matchingDocuments[0].insert(maybeDoc.getKey(), (Document) maybeDoc);
}
}
});
});

return ImmutableSortedMap.Builder.fromMap(results, DocumentKey.comparator());
try {
backgroundQueue.drain();
} catch (InterruptedException e) {
fail("Interrupted while deserializing documents", e);
}

return matchingDocuments[0];
}

private String pathForKey(DocumentKey key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package com.google.firebase.firestore.remote;

import android.content.Context;
import android.os.AsyncTask;
import androidx.annotation.VisibleForTesting;
import com.google.android.gms.common.GooglePlayServicesNotAvailableException;
import com.google.android.gms.common.GooglePlayServicesRepairableException;
Expand All @@ -24,6 +23,7 @@
import com.google.android.gms.tasks.Tasks;
import com.google.firebase.firestore.core.DatabaseInfo;
import com.google.firebase.firestore.util.AsyncQueue;
import com.google.firebase.firestore.util.Executors;
import com.google.firebase.firestore.util.Logger;
import com.google.firebase.firestore.util.Supplier;
import com.google.firestore.v1.FirestoreGrpc;
Expand Down Expand Up @@ -69,11 +69,11 @@ public static void overrideChannelBuilder(
CallCredentials firestoreHeaders) {
this.asyncQueue = asyncQueue;

// We execute network initialization on a separate thred to not block operations that depend on
// We execute network initialization on a separate thread to not block operations that depend on
// the AsyncQueue.
this.channelTask =
Tasks.call(
AsyncTask.THREAD_POOL_EXECUTOR,
Executors.BACKGROUND_EXECUTOR,
() -> {
ManagedChannel channel = initChannel(context, databaseInfo);
FirestoreGrpc.FirestoreStub firestoreStub =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.firebase.firestore.util;

import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;

/**
* A simple queue that executes tasks in parallel on the Android's AsyncTask.THREAD_POOL_EXECUTOR
* and supports blocking on their completion.
*
* <p>This class is not thread-safe. In particular, `execute()` and `drain()` should not be called
* from parallel threads.
*/
public class BackgroundQueue implements Executor {
private Semaphore completedTasks = new Semaphore(0);
private int pendingTaskCount = 0;

/** Enqueue a task on Android's THREAD_POOL_EXECUTOR. */
@Override
public void execute(Runnable task) {
++pendingTaskCount;
Executors.BACKGROUND_EXECUTOR.execute(
() -> {
task.run();
completedTasks.release();
});
}

/** Wait for all currently scheduled tasks to complete. */
public void drain() throws InterruptedException {
completedTasks.acquire(pendingTaskCount);
pendingTaskCount = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,20 @@

package com.google.firebase.firestore.util;

import android.os.AsyncTask;
import com.google.android.gms.tasks.TaskExecutors;
import java.util.concurrent.Executor;

/** Helper class for executors. */
public final class Executors {
/**
* The maximum number of tasks we submit to AsyncTask.THREAD_POOL_EXECUTOR.
*
* <p>The limit is based on the number of core threads spun by THREAD_POOL_EXECUTOR and is well
* below the queue size limit of 120 pending tasks. Limiting our usage of the THREAD_POOL_EXECUTOR
* allows other users to schedule their own operations on the shared THREAD_POOL_EXECUTOR.
*/
private static final int ASYNC_THREAD_POOL_MAXIMUM_CONCURRENCY = 4;

/**
* The default executor for user visible callbacks. It is an executor scheduling callbacks on
Expand All @@ -29,6 +38,11 @@ public final class Executors {
/** An executor that executes the provided runnable immediately on the current thread. */
public static final Executor DIRECT_EXECUTOR = Runnable::run;

/** An executor that runs tasks in parallel on Android's AsyncTask.THREAD_POOL_EXECUTOR. */
public static final Executor BACKGROUND_EXECUTOR =
new ThrottledForwardingExecutor(
ASYNC_THREAD_POOL_MAXIMUM_CONCURRENCY, AsyncTask.THREAD_POOL_EXECUTOR);

private Executors() {
// Private constructor to prevent initialization
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.firebase.firestore.util;

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

/**
* An executor that forwards executions to another executor, but caps the number of pending
* operations. Tasks scheduled past the specified limit are directly invoked on the calling thread,
* reducing the total memory consumed by pending tasks.
*/
class ThrottledForwardingExecutor implements Executor {
private final Executor executor;
private final Semaphore availableSlots;

/**
* Instantiates a new ThrottledForwardingExecutor.
*
* @param maximumConcurrency The maximum number of pending tasks to schedule on the provided
* executor.
* @param executor The executor to forward tasks to.
*/
ThrottledForwardingExecutor(int maximumConcurrency, Executor executor) {
this.availableSlots = new Semaphore(maximumConcurrency);
this.executor = executor;
}

/**
* Forwards a task to the provided executor if the current number of pending tasks is less than
* the configured limit. Otherwise, executes the task directly.
*
* @param command The task to run.
*/
@Override
public void execute(Runnable command) {
if (availableSlots.tryAcquire()) {
try {
executor.execute(
() -> {
command.run();
availableSlots.release();
});
} catch (RejectedExecutionException e) {
command.run();
}
} else {
command.run();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.firebase.firestore.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;

public class ThrottledForwardingExecutorTest {
@Test
public void limitsNumberOfForwardedTasks() throws InterruptedException {
Semaphore completedTasks = new Semaphore(0);
int maximumConcurrency = 10;

CountingExecutor countingExecutor = new CountingExecutor();
ThrottledForwardingExecutor throttledExecutor =
new ThrottledForwardingExecutor(maximumConcurrency, countingExecutor);

// Schedule more than `maximumConcurrency` parallel tasks and wait until all scheduling has
// finished.
int numTasks = maximumConcurrency + 1;
CountDownLatch schedulingLatch = new CountDownLatch(1);
for (int i = 0; i < numTasks; ++i) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't seem like there's any difference between currentTask and i. Maybe just make the loop use currentTask?

Similarly, numTasks isn't used except as the loop bound, maybe just use maximumConcurrency + 1 as the loop condition? (I bet if you shortened it to maxConcurrency it might even all still fit in one line.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currentTask is the "effectively final" version of i and used in the inner class created below.

Furthermore, numTask is actually used twice - once in the loop and then below in line 60. For that reason, I left it as is.

int currentTask = i;
throttledExecutor.execute(
() -> {
try {
if (currentTask < maximumConcurrency) {
// Block if we are running on the forwarded executor. We can't block the thread that
// is running this test.
schedulingLatch.await();
}
completedTasks.release();
} catch (InterruptedException e) {
fail("Unexpected InterruptedException: " + e);
}
});
}
schedulingLatch.countDown();

// Verify that only `maximumConcurrency` tasks were forwarded to the executor.
completedTasks.acquire(numTasks);
assertEquals(maximumConcurrency, countingExecutor.getNumTasks());
}

@Test
public void handlesRejectedExecutionException() {
AtomicInteger result = new AtomicInteger(0);

ThrottledForwardingExecutor executor =
new ThrottledForwardingExecutor(
10,
command -> {
throw new RejectedExecutionException();
});

executor.execute(result::incrementAndGet);

assertEquals(1, result.get());
}

/** An executor that counts the number of tasks submitted. */
private static class CountingExecutor implements Executor {
int numTasks = 0;

@Override
public void execute(Runnable command) {
++numTasks;
new Thread() {
@Override
public void run() {
command.run();
}
}.start();
}

int getNumTasks() {
return numTasks;
}
}
}