Skip to content

Adds generally useful executors #4305

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

/** @hide */
@SuppressLint("ThreadPoolCreation")
public class ExecutorsRegistrar implements ComponentRegistrar {
private static final Lazy<ScheduledExecutorService> BG_EXECUTOR =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.firebase.concurrent;

import java.util.concurrent.Executor;

/** Provides commonly useful executors. */
public class FirebaseExecutors {
private FirebaseExecutors() {}

/**
* Creates a sequential executor.
*
* <p>Executes tasks sequentially and provides memory synchronization guarantees for any mutations
* of shared state.
*
* <p>For details see:
* https://guava.dev/releases/31.1-jre/api/docs/com/google/common/util/concurrent/MoreExecutors.html#newSequentialExecutor(java.util.concurrent.Executor)
*/
public static Executor newSequentialExecutor(Executor delegate) {
return new SequentialExecutor(delegate);
}

/** Returns a direct executor. */
public static Executor directExecutor() {
return DirectExecutor.INSTANCE;
}

private enum DirectExecutor implements Executor {
INSTANCE;

@Override
public void execute(Runnable command) {
command.run();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
package com.google.firebase.concurrent;

import static com.google.android.gms.common.internal.Preconditions.checkNotNull;
import static com.google.firebase.concurrent.SequentialExecutor.WorkerRunningState.IDLE;
import static com.google.firebase.concurrent.SequentialExecutor.WorkerRunningState.QUEUED;
import static com.google.firebase.concurrent.SequentialExecutor.WorkerRunningState.QUEUING;
import static com.google.firebase.concurrent.SequentialExecutor.WorkerRunningState.RUNNING;
import static java.lang.System.identityHashCode;

import androidx.annotation.GuardedBy;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckForNull;

/**
* Executor ensuring that all Runnables submitted are executed in order, using the provided
* Executor, and sequentially such that no two will ever be running at the same time.
*
* <p>Tasks submitted to {@link #execute(Runnable)} are executed in FIFO order.
*
* <p>The execution of tasks is done by one thread as long as there are tasks left in the queue.
* When a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks
* continues. See {@link QueueWorker#workOnQueue} for details.
*
* <p>{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking.
* If an {@code Error} is thrown, the error will propagate and execution will stop until it is
* restarted by a call to {@link #execute}.
*/
final class SequentialExecutor implements Executor {
private static final Logger log = Logger.getLogger(SequentialExecutor.class.getName());

enum WorkerRunningState {
/** Runnable is not running and not queued for execution */
IDLE,
/** Runnable is not running, but is being queued for execution */
QUEUING,
/** runnable has been submitted but has not yet begun execution */
QUEUED,
RUNNING,
}

/** Underlying executor that all submitted Runnable objects are run on. */
private final Executor executor;

@GuardedBy("queue")
private final Deque<Runnable> queue = new ArrayDeque<>();

/** see {@link WorkerRunningState} */
@GuardedBy("queue")
private WorkerRunningState workerRunningState = IDLE;

/**
* This counter prevents an ABA issue where a thread may successfully schedule the worker, the
* worker runs and exhausts the queue, another thread enqueues a task and fails to schedule the
* worker, and then the first thread's call to delegate.execute() returns. Without this counter,
* it would observe the QUEUING state and set it to QUEUED, and the worker would never be
* scheduled again for future submissions.
*/
@GuardedBy("queue")
private long workerRunCount = 0;

private final QueueWorker worker = new QueueWorker();

SequentialExecutor(Executor executor) {
this.executor = checkNotNull(executor);
}

/**
* Adds a task to the queue and makes sure a worker thread is running.
*
* <p>If this method throws, e.g. a {@code RejectedExecutionException} from the delegate executor,
* execution of tasks will stop until a call to this method is made.
*/
@Override
public void execute(Runnable task) {
checkNotNull(task);
Runnable submittedTask;
long oldRunCount;
synchronized (queue) {
// If the worker is already running (or execute() on the delegate returned successfully, and
// the worker has yet to start) then we don't need to start the worker.
if (workerRunningState == RUNNING || workerRunningState == QUEUED) {
queue.add(task);
return;
}

oldRunCount = workerRunCount;

// If the worker is not yet running, the delegate Executor might reject our attempt to start
// it. To preserve FIFO order and failure atomicity of rejected execution when the same
// Runnable is executed more than once, allocate a wrapper that we know is safe to remove by
// object identity.
// A data structure that returned a removal handle from add() would allow eliminating this
// allocation.
submittedTask =
new Runnable() {
@Override
public void run() {
task.run();
}

@Override
public String toString() {
return task.toString();
}
};
queue.add(submittedTask);
workerRunningState = QUEUING;
}

try {
executor.execute(worker);
} catch (RuntimeException | Error t) {
synchronized (queue) {
boolean removed =
(workerRunningState == IDLE || workerRunningState == QUEUING)
&& queue.removeLastOccurrence(submittedTask);
// If the delegate is directExecutor(), the submitted runnable could have thrown a REE. But
// that's handled by the log check that catches RuntimeExceptions in the queue worker.
if (!(t instanceof RejectedExecutionException) || removed) {
throw t;
}
}
return;
}

/*
* This is an unsynchronized read! After the read, the function returns immediately or acquires
* the lock to check again. Since an IDLE state was observed inside the preceding synchronized
* block, and reference field assignment is atomic, this may save reacquiring the lock when
* another thread or the worker task has cleared the count and set the state.
*
* <p>When {@link #executor} is a directExecutor(), the value written to
* {@code workerRunningState} will be available synchronously, and behaviour will be
* deterministic.
*/
@SuppressWarnings("GuardedBy")
boolean alreadyMarkedQueued = workerRunningState != QUEUING;
if (alreadyMarkedQueued) {
return;
}
synchronized (queue) {
if (workerRunCount == oldRunCount && workerRunningState == QUEUING) {
workerRunningState = QUEUED;
}
}
}

/** Worker that runs tasks from {@link #queue} until it is empty. */
private final class QueueWorker implements Runnable {
@CheckForNull Runnable task;

@Override
public void run() {
try {
workOnQueue();
} catch (Error e) {
synchronized (queue) {
workerRunningState = IDLE;
}
throw e;
// The execution of a task has ended abnormally.
// We could have tasks left in the queue, so should perhaps try to restart a worker,
// but then the Error will get delayed if we are using a direct (same thread) executor.
}
}

/**
* Continues executing tasks from {@link #queue} until it is empty.
*
* <p>The thread's interrupt bit is cleared before execution of each task.
*
* <p>If the Thread in use is interrupted before or during execution of the tasks in {@link
* #queue}, the Executor will complete its tasks, and then restore the interruption. This means
* that once the Thread returns to the Executor that this Executor composes, the interruption
* will still be present. If the composed Executor is an ExecutorService, it can respond to
* shutdown() by returning tasks queued on that Thread after {@link #worker} drains the queue.
*/
private void workOnQueue() {
boolean interruptedDuringTask = false;
boolean hasSetRunning = false;
try {
while (true) {
synchronized (queue) {
// Choose whether this thread will run or not after acquiring the lock on the first
// iteration
if (!hasSetRunning) {
if (workerRunningState == RUNNING) {
// Don't want to have two workers pulling from the queue.
return;
} else {
// Increment the run counter to avoid the ABA problem of a submitter marking the
// thread as QUEUED after it already ran and exhausted the queue before returning
// from execute().
workerRunCount++;
workerRunningState = RUNNING;
hasSetRunning = true;
}
}
task = queue.poll();
if (task == null) {
workerRunningState = IDLE;
return;
}
}
// Remove the interrupt bit before each task. The interrupt is for the "current task" when
// it is sent, so subsequent tasks in the queue should not be caused to be interrupted
// by a previous one in the queue being interrupted.
interruptedDuringTask |= Thread.interrupted();
try {
task.run();
} catch (RuntimeException e) {
log.log(Level.SEVERE, "Exception while executing runnable " + task, e);
} finally {
task = null;
}
}
} finally {
// Ensure that if the thread was interrupted at all while processing the task queue, it
// is returned to the delegate Executor interrupted so that it may handle the
// interruption if it likes.
if (interruptedDuringTask) {
Thread.currentThread().interrupt();
}
}
}

@SuppressWarnings("GuardedBy")
@Override
public String toString() {
Runnable currentlyRunning = task;
if (currentlyRunning != null) {
return "SequentialExecutorWorker{running=" + currentlyRunning + "}";
}
return "SequentialExecutorWorker{state=" + workerRunningState + "}";
}
}

@Override
public String toString() {
return "SequentialExecutor@" + identityHashCode(this) + "{" + executor + "}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/** @hide */
package com.google.firebase.concurrent;