SequentialExecutor.java

/*
 * Copyright 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.camera.core.impl.utils.executor;

import static androidx.camera.core.impl.utils.executor.SequentialExecutor.WorkerRunningState.IDLE;
import static androidx.camera.core.impl.utils.executor.SequentialExecutor.WorkerRunningState.QUEUED;
import static androidx.camera.core.impl.utils.executor.SequentialExecutor.WorkerRunningState.QUEUING;
import static androidx.camera.core.impl.utils.executor.SequentialExecutor.WorkerRunningState.RUNNING;

import android.util.Log;

import androidx.annotation.GuardedBy;
import androidx.core.util.Preconditions;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

/**
 * Executor ensuring that all Runnables submitted are executed in order, using the provided
 * Executor, and sequentially such that no two will ever be running at the same time.
 *
 * <p>Tasks submitted to {@link #execute(Runnable)} are executed in FIFO order.
 *
 * <p>The execution of tasks is done by one thread as long as there are tasks left in the queue.
 * When a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks
 * continues. See {@link QueueWorker#workOnQueue} for details.
 *
 * <p>{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking.
 * If an {@code Error} is thrown, the error will propagate and execution will stop until it is
 * restarted by a call to {@link #execute}.
 *
 * <p>Copied and adapted from Guava.
 */
final class SequentialExecutor implements Executor {
    private static final String TAG = "SequentialExecutor";
    @SuppressWarnings("WeakerAccess") /* synthetic accessor */
    @GuardedBy("mQueue")
    final Deque<Runnable> mQueue = new ArrayDeque<>();
    /** Underlying executor that all submitted Runnable objects are run on. */
    private final Executor mExecutor;
    private final QueueWorker mWorker = new QueueWorker();
    /** see {@link WorkerRunningState} */
    @SuppressWarnings("WeakerAccess") /* synthetic accessor */
    @GuardedBy("mQueue")
    WorkerRunningState mWorkerRunningState = IDLE;

    /**
     * This counter prevents an ABA issue where a thread may successfully schedule the worker, the
     * worker runs and exhausts the queue, another thread enqueues a task and fails to schedule the
     * worker, and then the first thread's call to delegate.execute() returns. Without this counter,
     * it would observe the QUEUING state and set it to QUEUED, and the worker would never be
     * scheduled again for future submissions.
     */
    @SuppressWarnings("WeakerAccess") /* synthetic accessor */
    @GuardedBy("mQueue")
    long mWorkerRunCount = 0;

    /** Use {@link CameraXExecutors#newSequentialExecutor} */
    SequentialExecutor(Executor executor) {
        mExecutor = Preconditions.checkNotNull(executor);
    }

    /**
     * Adds a task to the queue and makes sure a worker thread is running.
     *
     * <p>If this method throws, e.g. a {@code RejectedExecutionException} from the delegate
     * executor, execution of tasks will stop until a call to this method is made.
     */
    @Override
    public void execute(final Runnable task) {
        Preconditions.checkNotNull(task);
        final Runnable submittedTask;
        final long oldRunCount;
        synchronized (mQueue) {
            // If the worker is already running (or execute() on the delegate returned
            // successfully, and
            // the worker has yet to start) then we don't need to start the worker.
            if (mWorkerRunningState == RUNNING || mWorkerRunningState == QUEUED) {
                mQueue.add(task);
                return;
            }

            oldRunCount = mWorkerRunCount;

            // If the worker is not yet running, the delegate Executor might reject our attempt
            // to start it. To preserve FIFO order and failure atomicity of rejected execution when
            // the same Runnable is executed more than once, allocate a wrapper that we know is safe
            // to remove by object identity. A data structure that returned a removal handle from
            // add() would allow eliminating this allocation.
            submittedTask =
                    new Runnable() {
                        @Override
                        public void run() {
                            task.run();
                        }
                    };
            mQueue.add(submittedTask);
            mWorkerRunningState = QUEUING;
        }

        try {
            mExecutor.execute(mWorker);
        } catch (RuntimeException | Error t) {
            synchronized (mQueue) {
                boolean removed =
                        (mWorkerRunningState == IDLE || mWorkerRunningState == QUEUING)
                                && mQueue.removeLastOccurrence(submittedTask);
                // If the delegate is directExecutor(), the submitted runnable could have thrown
                // a REE. But that's handled by the log check that catches RuntimeExceptions in the
                // queue worker.
                if (!(t instanceof RejectedExecutionException) || removed) {
                    throw t;
                }
            }
            return;
        }

        /*
         * This is an unsynchronized read! After the read, the function returns immediately or
         * acquires the lock to check again. Since an IDLE state was observed inside the preceding
         * synchronized block, and reference field assignment is atomic, this may save reacquiring
         * the lock when another thread or the worker task has cleared the count and set the state.
         *
         * <p>When {@link #executor} is a directExecutor(), the value written to
         * {@code mWorkerRunningState} will be available synchronously, and behaviour will be
         * deterministic.
         */
        @SuppressWarnings("GuardedBy")
        boolean alreadyMarkedQueued = mWorkerRunningState != QUEUING;
        if (alreadyMarkedQueued) {
            return;
        }
        synchronized (mQueue) {
            if (mWorkerRunCount == oldRunCount && mWorkerRunningState == QUEUING) {
                mWorkerRunningState = QUEUED;
            }
        }
    }

    enum WorkerRunningState {
        /** Runnable is not running and not queued for execution */
        IDLE,
        /** Runnable is not running, but is being queued for execution */
        QUEUING,
        /** Runnable has been submitted but has not yet begun execution */
        QUEUED,
        /** Runnable is running */
        RUNNING,
    }

    /** Worker that runs tasks from {@link #mQueue} until it is empty. */
    final class QueueWorker implements Runnable {
        @Override
        public void run() {
            try {
                workOnQueue();
            } catch (Error e) {
                synchronized (mQueue) {
                    mWorkerRunningState = IDLE;
                }
                throw e;
                // The execution of a task has ended abnormally.
                // We could have tasks left in the queue, so should perhaps try to restart a worker,
                // but then the Error will get delayed if we are using a direct (same thread)
                // executor.
            }
        }

        /**
         * Continues executing tasks from {@link #mQueue} until it is empty.
         *
         * <p>The thread's interrupt bit is cleared before execution of each task.
         *
         * <p>If the Thread in use is interrupted before or during execution of the tasks in {@link
         * #mQueue}, the Executor will complete its tasks, and then restore the interruption.
         * This means that once the Thread returns to the Executor that this Executor composes, the
         * interruption will still be present. If the composed Executor is an ExecutorService, it
         * can respond to shutdown() by returning tasks queued on that Thread after {@link #mWorker}
         * drains the queue.
         */
        private void workOnQueue() {
            boolean interruptedDuringTask = false;
            boolean hasSetRunning = false;
            try {
                while (true) {
                    Runnable task;
                    synchronized (mQueue) {
                        // Choose whether this thread will run or not after acquiring the lock on
                        // the first iteration
                        if (!hasSetRunning) {
                            if (mWorkerRunningState == RUNNING) {
                                // Don't want to have two workers pulling from the queue.
                                return;
                            } else {
                                // Increment the run counter to avoid the ABA problem of a submitter
                                // marking the thread as QUEUED after it already ran and exhausted
                                // the queue before returning from execute().
                                mWorkerRunCount++;
                                mWorkerRunningState = RUNNING;
                                hasSetRunning = true;
                            }
                        }
                        task = mQueue.poll();
                        if (task == null) {
                            mWorkerRunningState = IDLE;
                            return;
                        }
                    }
                    // Remove the interrupt bit before each task. The interrupt is for the
                    // "current task" when it is sent, so subsequent tasks in the queue should not
                    // be caused to be interrupted by a previous one in the queue being interrupted.
                    interruptedDuringTask |= Thread.interrupted();
                    try {
                        task.run();
                    } catch (RuntimeException e) {
                        Log.e(TAG, "Exception while executing runnable " + task, e);
                    }
                }
            } finally {
                // Ensure that if the thread was interrupted at all while processing the task
                // queue, it is returned to the delegate Executor interrupted so that it may handle
                // the interruption if it likes.
                if (interruptedDuringTask) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}