Skip to content

Commit 5237be0

Browse files
RFC: ThrottledForwardingExecutor
1 parent e4a3644 commit 5237be0

File tree

2 files changed

+53
-37
lines changed

2 files changed

+53
-37
lines changed

firebase-firestore/src/main/java/com/google/firebase/firestore/util/Executors.java

Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,18 @@
1616

1717
import android.os.AsyncTask;
1818
import com.google.android.gms.tasks.TaskExecutors;
19-
import java.util.Queue;
20-
import java.util.concurrent.ConcurrentLinkedQueue;
2119
import java.util.concurrent.Executor;
22-
import java.util.concurrent.atomic.AtomicInteger;
2320

2421
/** Helper class for executors. */
2522
public final class Executors {
2623
/**
2724
* The maximum number of tasks we submit to AsyncTask.THREAD_POOL_EXECUTOR.
2825
*
29-
* <p>The limit is based on the number of core threads spun by THREAD_POOL_EXECUTOR and addresses
30-
* its queue size limit of 120 pending tasks.
26+
* <p>The limit is based on the number of core threads spun by THREAD_POOL_EXECUTOR and is well
27+
* below the queue size limit of 120 pending tasks. Limiting our usage of the THREAD_POOL_EXECUTOR
28+
* allows other users to schedule their own operations on the shared THREAD_POOL_EXECUTOR.
3129
*/
32-
private static final int MAXIMUM_CONCURRENT_BACKGROUND_TASKS = 4;
30+
private static final int ASYNC_THREAD_POOL_MAXIMUM_CONCURRENCY = 4;
3331

3432
/**
3533
* The default executor for user visible callbacks. It is an executor scheduling callbacks on
@@ -40,38 +38,10 @@ public final class Executors {
4038
/** An executor that executes the provided runnable immediately on the current thread. */
4139
public static final Executor DIRECT_EXECUTOR = Runnable::run;
4240

43-
/**
44-
* An executor that runs tasks in parallel on Android's AsyncTask.THREAD_POOL_EXECUTOR.
45-
*
46-
* <p>Unlike the main THREAD_POOL_EXECUTOR, this executor manages its own queue of tasks and can
47-
* handle an unbounded number of pending tasks.
48-
*/
41+
/** An executor that runs tasks in parallel on Android's AsyncTask.THREAD_POOL_EXECUTOR. */
4942
public static final Executor BACKGROUND_EXECUTOR =
50-
new Executor() {
51-
AtomicInteger activeRunnerCount = new AtomicInteger(0);
52-
Queue<Runnable> pendingTasks = new ConcurrentLinkedQueue<>();
53-
54-
@Override
55-
public void execute(Runnable command) {
56-
pendingTasks.add(command);
57-
58-
if (activeRunnerCount.get() < MAXIMUM_CONCURRENT_BACKGROUND_TASKS) {
59-
// Note that the runner count could temporarily exceed
60-
// MAXIMUM_CONCURRENT_BACKGROUND_TASKS if this is code path was executed in parallel.
61-
// While undesired, this would merely queue another task on THREAD_POOL_EXECUTOR,
62-
// and we are unlikely to hit the 120 pending task limit.
63-
activeRunnerCount.incrementAndGet();
64-
AsyncTask.THREAD_POOL_EXECUTOR.execute(
65-
() -> {
66-
Runnable r;
67-
while ((r = pendingTasks.poll()) != null) {
68-
r.run();
69-
}
70-
activeRunnerCount.decrementAndGet();
71-
});
72-
}
73-
}
74-
};
43+
new ThrottledForwardingExecutor(
44+
ASYNC_THREAD_POOL_MAXIMUM_CONCURRENCY, AsyncTask.THREAD_POOL_EXECUTOR);
7545

7646
private Executors() {
7747
// Private constructor to prevent initialization
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright 2019 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.firebase.firestore.util;
16+
17+
import java.util.concurrent.Executor;
18+
import java.util.concurrent.RejectedExecutionException;
19+
import java.util.concurrent.Semaphore;
20+
21+
public class ThrottledForwardingExecutor implements Executor {
22+
private final Executor executor;
23+
private final Semaphore availableSlots;
24+
25+
public ThrottledForwardingExecutor(int maximumConcurrency, Executor executor) {
26+
this.availableSlots = new Semaphore(maximumConcurrency);
27+
this.executor = executor;
28+
}
29+
30+
@Override
31+
public void execute(Runnable command) {
32+
if (availableSlots.tryAcquire()) {
33+
try {
34+
executor.execute(
35+
() -> {
36+
command.run();
37+
availableSlots.release();
38+
});
39+
} catch (RejectedExecutionException e) {
40+
command.run();
41+
}
42+
} else {
43+
command.run();
44+
}
45+
}
46+
}

0 commit comments

Comments
 (0)