-
Notifications
You must be signed in to change notification settings - Fork 619
Performance: Decode documents in background thread #559
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 32 commits
4da49cf
629cd45
f9c9c1f
00717dc
c5afc57
d93feaf
95de378
d9ef372
37d4803
f2e25f0
45ca056
1799a29
94b5b94
1e99b39
6904cc9
6c663cb
4fa2fef
e4a3644
5237be0
06173ad
afc4382
2075fc7
21c8562
adda141
0afb5d7
5eca22c
b9e15bb
a2fba58
bd24bfe
0b2d9ad
7fabd7f
6fb8e9f
1c8c04c
28bda9e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
// Copyright 2019 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package com.google.firebase.firestore.util; | ||
|
||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.RejectedExecutionException; | ||
import java.util.concurrent.Semaphore; | ||
|
||
/** | ||
* An executor that forwards executions to another executor, but caps the number of pending | ||
* operations. Tasks scheduled past the specified limit are directly invoked on the calling thread, | ||
* reducing the total memory consumed by pending tasks. | ||
*/ | ||
class ThrottledForwardingExecutor implements Executor { | ||
private final Executor executor; | ||
private final Semaphore availableSlots; | ||
|
||
/** | ||
* Instantiates a new ThrottledForwardingExecutor. | ||
* | ||
* @param maximumConcurrency The maximum number of pending tasks to schedule on the provided | ||
* executor. | ||
* @param executor The executor to forward tasks to. | ||
*/ | ||
ThrottledForwardingExecutor(int maximumConcurrency, Executor executor) { | ||
this.availableSlots = new Semaphore(maximumConcurrency); | ||
this.executor = executor; | ||
} | ||
|
||
/** | ||
* Forwards a task to the provided executor if the current number of pending tasks is less than | ||
* the configured limit. Otherwise, executes the task directly. | ||
* | ||
* @param command The task to run. | ||
*/ | ||
@Override | ||
public void execute(Runnable command) { | ||
if (availableSlots.tryAcquire()) { | ||
try { | ||
executor.execute( | ||
() -> { | ||
command.run(); | ||
availableSlots.release(); | ||
}); | ||
} catch (RejectedExecutionException e) { | ||
command.run(); | ||
} | ||
} else { | ||
command.run(); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
// Copyright 2019 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package com.google.firebase.firestore.util; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.fail; | ||
|
||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.RejectedExecutionException; | ||
import java.util.concurrent.Semaphore; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import org.junit.Test; | ||
|
||
public class ThrottledForwardingExecutorTest { | ||
@Test | ||
public void limitsNumberOfForwardedTasks() throws InterruptedException { | ||
Semaphore completedTasks = new Semaphore(0); | ||
int maximumConcurrency = 10; | ||
|
||
CountingExecutor countingExecutor = new CountingExecutor(); | ||
ThrottledForwardingExecutor throttledExecutor = | ||
new ThrottledForwardingExecutor(maximumConcurrency, countingExecutor); | ||
|
||
// Schedule more than `maximumConcurrency` parallel tasks and wait until all scheduling has | ||
// finished. | ||
int numTasks = maximumConcurrency + 1; | ||
CountDownLatch schedulingLatch = new CountDownLatch(1); | ||
for (int i = 0; i < numTasks; ++i) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't seem like there's any difference between Similarly, numTasks isn't used except as the loop bound, maybe just use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Furthermore, |
||
int currentTask = i; | ||
throttledExecutor.execute( | ||
() -> { | ||
try { | ||
if (currentTask < maximumConcurrency) { | ||
// Block if we are running on the forwarded executor. We can't block the thread that | ||
// is running this test. | ||
schedulingLatch.await(); | ||
} | ||
completedTasks.release(); | ||
} catch (InterruptedException e) { | ||
fail("Unexpected InterruptedException: " + e); | ||
} | ||
}); | ||
} | ||
schedulingLatch.countDown(); | ||
|
||
// Verify that only `maximumConcurrency` tasks were forwarded to the executor. | ||
completedTasks.acquire(numTasks); | ||
assertEquals(maximumConcurrency, countingExecutor.getNumTasks()); | ||
} | ||
|
||
@Test | ||
public void handlesRejectedExecutionException() { | ||
AtomicInteger result = new AtomicInteger(0); | ||
|
||
ThrottledForwardingExecutor executor = | ||
new ThrottledForwardingExecutor( | ||
10, | ||
command -> { | ||
throw new RejectedExecutionException(); | ||
}); | ||
|
||
executor.execute(result::incrementAndGet); | ||
|
||
assertEquals(1, result.get()); | ||
} | ||
|
||
/** An executor that counts the number of tasks submitted. */ | ||
private static class CountingExecutor implements Executor { | ||
int numTasks = 0; | ||
|
||
@Override | ||
public void execute(Runnable command) { | ||
++numTasks; | ||
new Thread() { | ||
@Override | ||
public void run() { | ||
command.run(); | ||
} | ||
}.start(); | ||
} | ||
|
||
int getNumTasks() { | ||
return numTasks; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: This particular class in which you've placed this class is huge. I'd love to see us move things out of here if possible.
Why not promote this to top-level? It's also totally generic, so you could also dump it in
util
, but leaving itlocal
seems fine too.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to a top-level class in Util. For now, it doesn't have tests though.