From 8835b501cc78d4e77e5e5d6a51ca4a89ea90100e Mon Sep 17 00:00:00 2001 From: Vladimir Kryachko Date: Thu, 10 Nov 2022 12:11:18 -0500 Subject: [PATCH] Adds generally useful executors Namely, SequentialExecutor and directExecutor. --- .../concurrent/ExecutorsRegistrar.java | 1 - .../concurrent/FirebaseExecutors.java | 49 ++++ .../concurrent/SequentialExecutor.java | 247 ++++++++++++++++++ .../firebase/concurrent/package-info.java | 16 ++ 4 files changed, 312 insertions(+), 1 deletion(-) create mode 100644 firebase-common/src/main/java/com/google/firebase/concurrent/FirebaseExecutors.java create mode 100644 firebase-common/src/main/java/com/google/firebase/concurrent/SequentialExecutor.java create mode 100644 firebase-common/src/main/java/com/google/firebase/concurrent/package-info.java diff --git a/firebase-common/src/main/java/com/google/firebase/concurrent/ExecutorsRegistrar.java b/firebase-common/src/main/java/com/google/firebase/concurrent/ExecutorsRegistrar.java index 1b28eb8aec9..c1a6aada792 100644 --- a/firebase-common/src/main/java/com/google/firebase/concurrent/ExecutorsRegistrar.java +++ b/firebase-common/src/main/java/com/google/firebase/concurrent/ExecutorsRegistrar.java @@ -32,7 +32,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; -/** @hide */ @SuppressLint("ThreadPoolCreation") public class ExecutorsRegistrar implements ComponentRegistrar { private static final Lazy BG_EXECUTOR = diff --git a/firebase-common/src/main/java/com/google/firebase/concurrent/FirebaseExecutors.java b/firebase-common/src/main/java/com/google/firebase/concurrent/FirebaseExecutors.java new file mode 100644 index 00000000000..a31ff759fa6 --- /dev/null +++ b/firebase-common/src/main/java/com/google/firebase/concurrent/FirebaseExecutors.java @@ -0,0 +1,49 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.firebase.concurrent; + +import java.util.concurrent.Executor; + +/** Provides commonly useful executors. */ +public class FirebaseExecutors { + private FirebaseExecutors() {} + + /** + * Creates a sequential executor. + * + *

Executes tasks sequentially and provides memory synchronization guarantees for any mutations + * of shared state. + * + *

For details see: + * https://guava.dev/releases/31.1-jre/api/docs/com/google/common/util/concurrent/MoreExecutors.html#newSequentialExecutor(java.util.concurrent.Executor) + */ + public static Executor newSequentialExecutor(Executor delegate) { + return new SequentialExecutor(delegate); + } + + /** Returns a direct executor. */ + public static Executor directExecutor() { + return DirectExecutor.INSTANCE; + } + + private enum DirectExecutor implements Executor { + INSTANCE; + + @Override + public void execute(Runnable command) { + command.run(); + } + } +} diff --git a/firebase-common/src/main/java/com/google/firebase/concurrent/SequentialExecutor.java b/firebase-common/src/main/java/com/google/firebase/concurrent/SequentialExecutor.java new file mode 100644 index 00000000000..cd190a914c6 --- /dev/null +++ b/firebase-common/src/main/java/com/google/firebase/concurrent/SequentialExecutor.java @@ -0,0 +1,247 @@ +package com.google.firebase.concurrent; + +import static com.google.android.gms.common.internal.Preconditions.checkNotNull; +import static com.google.firebase.concurrent.SequentialExecutor.WorkerRunningState.IDLE; +import static com.google.firebase.concurrent.SequentialExecutor.WorkerRunningState.QUEUED; +import static com.google.firebase.concurrent.SequentialExecutor.WorkerRunningState.QUEUING; +import static com.google.firebase.concurrent.SequentialExecutor.WorkerRunningState.RUNNING; +import static java.lang.System.identityHashCode; + +import androidx.annotation.GuardedBy; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.CheckForNull; + +/** + * Executor ensuring that all Runnables submitted are executed in order, using the provided + * Executor, and sequentially such that no two will ever be running at the same time. + * + *

Tasks submitted to {@link #execute(Runnable)} are executed in FIFO order. + * + *

The execution of tasks is done by one thread as long as there are tasks left in the queue. + * When a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks + * continues. See {@link QueueWorker#workOnQueue} for details. + * + *

{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking. + * If an {@code Error} is thrown, the error will propagate and execution will stop until it is + * restarted by a call to {@link #execute}. + */ +final class SequentialExecutor implements Executor { + private static final Logger log = Logger.getLogger(SequentialExecutor.class.getName()); + + enum WorkerRunningState { + /** Runnable is not running and not queued for execution */ + IDLE, + /** Runnable is not running, but is being queued for execution */ + QUEUING, + /** runnable has been submitted but has not yet begun execution */ + QUEUED, + RUNNING, + } + + /** Underlying executor that all submitted Runnable objects are run on. */ + private final Executor executor; + + @GuardedBy("queue") + private final Deque queue = new ArrayDeque<>(); + + /** see {@link WorkerRunningState} */ + @GuardedBy("queue") + private WorkerRunningState workerRunningState = IDLE; + + /** + * This counter prevents an ABA issue where a thread may successfully schedule the worker, the + * worker runs and exhausts the queue, another thread enqueues a task and fails to schedule the + * worker, and then the first thread's call to delegate.execute() returns. Without this counter, + * it would observe the QUEUING state and set it to QUEUED, and the worker would never be + * scheduled again for future submissions. + */ + @GuardedBy("queue") + private long workerRunCount = 0; + + private final QueueWorker worker = new QueueWorker(); + + SequentialExecutor(Executor executor) { + this.executor = checkNotNull(executor); + } + + /** + * Adds a task to the queue and makes sure a worker thread is running. + * + *

If this method throws, e.g. a {@code RejectedExecutionException} from the delegate executor, + * execution of tasks will stop until a call to this method is made. + */ + @Override + public void execute(Runnable task) { + checkNotNull(task); + Runnable submittedTask; + long oldRunCount; + synchronized (queue) { + // If the worker is already running (or execute() on the delegate returned successfully, and + // the worker has yet to start) then we don't need to start the worker. + if (workerRunningState == RUNNING || workerRunningState == QUEUED) { + queue.add(task); + return; + } + + oldRunCount = workerRunCount; + + // If the worker is not yet running, the delegate Executor might reject our attempt to start + // it. To preserve FIFO order and failure atomicity of rejected execution when the same + // Runnable is executed more than once, allocate a wrapper that we know is safe to remove by + // object identity. + // A data structure that returned a removal handle from add() would allow eliminating this + // allocation. + submittedTask = + new Runnable() { + @Override + public void run() { + task.run(); + } + + @Override + public String toString() { + return task.toString(); + } + }; + queue.add(submittedTask); + workerRunningState = QUEUING; + } + + try { + executor.execute(worker); + } catch (RuntimeException | Error t) { + synchronized (queue) { + boolean removed = + (workerRunningState == IDLE || workerRunningState == QUEUING) + && queue.removeLastOccurrence(submittedTask); + // If the delegate is directExecutor(), the submitted runnable could have thrown a REE. But + // that's handled by the log check that catches RuntimeExceptions in the queue worker. + if (!(t instanceof RejectedExecutionException) || removed) { + throw t; + } + } + return; + } + + /* + * This is an unsynchronized read! After the read, the function returns immediately or acquires + * the lock to check again. Since an IDLE state was observed inside the preceding synchronized + * block, and reference field assignment is atomic, this may save reacquiring the lock when + * another thread or the worker task has cleared the count and set the state. + * + *

When {@link #executor} is a directExecutor(), the value written to + * {@code workerRunningState} will be available synchronously, and behaviour will be + * deterministic. + */ + @SuppressWarnings("GuardedBy") + boolean alreadyMarkedQueued = workerRunningState != QUEUING; + if (alreadyMarkedQueued) { + return; + } + synchronized (queue) { + if (workerRunCount == oldRunCount && workerRunningState == QUEUING) { + workerRunningState = QUEUED; + } + } + } + + /** Worker that runs tasks from {@link #queue} until it is empty. */ + private final class QueueWorker implements Runnable { + @CheckForNull Runnable task; + + @Override + public void run() { + try { + workOnQueue(); + } catch (Error e) { + synchronized (queue) { + workerRunningState = IDLE; + } + throw e; + // The execution of a task has ended abnormally. + // We could have tasks left in the queue, so should perhaps try to restart a worker, + // but then the Error will get delayed if we are using a direct (same thread) executor. + } + } + + /** + * Continues executing tasks from {@link #queue} until it is empty. + * + *

The thread's interrupt bit is cleared before execution of each task. + * + *

If the Thread in use is interrupted before or during execution of the tasks in {@link + * #queue}, the Executor will complete its tasks, and then restore the interruption. This means + * that once the Thread returns to the Executor that this Executor composes, the interruption + * will still be present. If the composed Executor is an ExecutorService, it can respond to + * shutdown() by returning tasks queued on that Thread after {@link #worker} drains the queue. + */ + private void workOnQueue() { + boolean interruptedDuringTask = false; + boolean hasSetRunning = false; + try { + while (true) { + synchronized (queue) { + // Choose whether this thread will run or not after acquiring the lock on the first + // iteration + if (!hasSetRunning) { + if (workerRunningState == RUNNING) { + // Don't want to have two workers pulling from the queue. + return; + } else { + // Increment the run counter to avoid the ABA problem of a submitter marking the + // thread as QUEUED after it already ran and exhausted the queue before returning + // from execute(). + workerRunCount++; + workerRunningState = RUNNING; + hasSetRunning = true; + } + } + task = queue.poll(); + if (task == null) { + workerRunningState = IDLE; + return; + } + } + // Remove the interrupt bit before each task. The interrupt is for the "current task" when + // it is sent, so subsequent tasks in the queue should not be caused to be interrupted + // by a previous one in the queue being interrupted. + interruptedDuringTask |= Thread.interrupted(); + try { + task.run(); + } catch (RuntimeException e) { + log.log(Level.SEVERE, "Exception while executing runnable " + task, e); + } finally { + task = null; + } + } + } finally { + // Ensure that if the thread was interrupted at all while processing the task queue, it + // is returned to the delegate Executor interrupted so that it may handle the + // interruption if it likes. + if (interruptedDuringTask) { + Thread.currentThread().interrupt(); + } + } + } + + @SuppressWarnings("GuardedBy") + @Override + public String toString() { + Runnable currentlyRunning = task; + if (currentlyRunning != null) { + return "SequentialExecutorWorker{running=" + currentlyRunning + "}"; + } + return "SequentialExecutorWorker{state=" + workerRunningState + "}"; + } + } + + @Override + public String toString() { + return "SequentialExecutor@" + identityHashCode(this) + "{" + executor + "}"; + } +} diff --git a/firebase-common/src/main/java/com/google/firebase/concurrent/package-info.java b/firebase-common/src/main/java/com/google/firebase/concurrent/package-info.java new file mode 100644 index 00000000000..9452eb485fd --- /dev/null +++ b/firebase-common/src/main/java/com/google/firebase/concurrent/package-info.java @@ -0,0 +1,16 @@ +// Copyright 2018 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** @hide */ +package com.google.firebase.concurrent;