// VideoFrameProcessingTaskExecutor.java

/*
 * Copyright 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package androidx.media3.effect;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import androidx.annotation.GuardedBy;
import androidx.annotation.Nullable;
import androidx.media3.common.VideoFrameProcessingException;
import androidx.media3.common.util.GlUtil;
import androidx.media3.common.util.UnstableApi;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

/**
 * Wrapper around a single thread {@link ExecutorService} for executing {@link Task} instances.
 *
 * <p>Public methods can be called from any thread.
 *
 * <p>Calls {@link ErrorListener#onError} for errors that occur during these tasks. The listener is
 * invoked from the {@link ExecutorService} for exceptions thrown by tasks, and from the calling
 * thread when a {@linkplain #submit(Task) submission} is rejected or {@link #release(Task)} times
 * out.
 *
 * <p>{@linkplain #submitWithHighPriority(Task) High priority tasks} are always executed before
 * {@linkplain #submit(Task) default priority tasks}. Tasks with equal priority are executed in FIFO
 * order.
 */
@UnstableApi
/* package */ final class VideoFrameProcessingTaskExecutor {
  /**
   * Interface for tasks that may throw a {@link GlUtil.GlException} or {@link
   * VideoFrameProcessingException}.
   */
  interface Task {
    /** Runs the task. */
    void run() throws VideoFrameProcessingException, GlUtil.GlException;
  }

  /** Listener for errors. */
  interface ErrorListener {
    /**
     * Called when an exception occurs while executing submitted tasks.
     *
     * <p>If this is called, the calling {@link VideoFrameProcessingTaskExecutor} must immediately
     * be {@linkplain VideoFrameProcessingTaskExecutor#release released}.
     */
    void onError(VideoFrameProcessingException exception);
  }

  /** Maximum time {@link #release(Task)} waits for the executor service to terminate. */
  private static final long RELEASE_WAIT_TIME_MS = 500;

  private final boolean shouldShutdownExecutorService;
  private final ExecutorService singleThreadExecutorService;
  private final ErrorListener errorListener;
  private final Object lock;

  /** High-priority tasks that have been submitted but not yet picked up by the executor. */
  @GuardedBy("lock")
  private final Queue<Task> highPriorityTasks;

  /**
   * Whether regular tasks are currently dropped instead of being run. Set while flushing, when
   * releasing, and after an error has been reported; cleared by the flush task.
   */
  @GuardedBy("lock")
  private boolean shouldCancelTasks;

  /**
   * Creates a new instance.
   *
   * @param singleThreadExecutorService The {@link ExecutorService} on which tasks are run.
   * @param shouldShutdownExecutorService Whether {@link #release(Task)} shuts down {@code
   *     singleThreadExecutorService}.
   * @param errorListener The {@link ErrorListener} notified of errors.
   */
  public VideoFrameProcessingTaskExecutor(
      ExecutorService singleThreadExecutorService,
      boolean shouldShutdownExecutorService,
      ErrorListener errorListener) {
    this.singleThreadExecutorService = singleThreadExecutorService;
    this.shouldShutdownExecutorService = shouldShutdownExecutorService;
    this.errorListener = errorListener;
    lock = new Object();
    highPriorityTasks = new ArrayDeque<>();
  }

  /**
   * Submits the given {@link Task} to be executed after all pending tasks have completed.
   *
   * <p>The task is silently dropped if tasks are currently being canceled. If the executor service
   * rejects the task, the rejection is reported through {@link #handleException}.
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  public void submit(Task task) {
    @Nullable RejectedExecutionException executionException = null;
    synchronized (lock) {
      if (shouldCancelTasks) {
        return;
      }
      try {
        wrapTaskAndSubmitToExecutorService(task, /* isFlushOrReleaseTask= */ false);
      } catch (RejectedExecutionException e) {
        executionException = e;
      }
    }

    // Report outside the synchronized block so that the listener is not invoked while the lock is
    // held.
    if (executionException != null) {
      handleException(executionException);
    }
  }

  /**
   * Submits the given {@link Task} to be executed after the currently running task and all
   * previously submitted high-priority tasks have completed.
   *
   * <p>Tasks that were previously {@linkplain #submit(Task) submitted} without high-priority and
   * have not started executing will be executed after this task is complete.
   */
  public void submitWithHighPriority(Task task) {
    synchronized (lock) {
      if (shouldCancelTasks) {
        return;
      }
      highPriorityTasks.add(task);
    }
    // If the ExecutorService has non-started tasks, the first of these non-started tasks will run
    // the task passed to this method. Just in case there are no non-started tasks, submit another
    // task to run high-priority tasks.
    submit(() -> {});
  }

  /**
   * Flushes all scheduled tasks.
   *
   * <p>During flush, the {@code VideoFrameProcessingTaskExecutor} ignores the {@linkplain #submit
   * submission of new tasks}. The tasks that are submitted before flushing are either executed or
   * canceled when this method returns.
   *
   * @throws InterruptedException If interrupted while waiting for the flush task to run.
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  public void flush() throws InterruptedException {
    synchronized (lock) {
      shouldCancelTasks = true;
      highPriorityTasks.clear();
    }

    // The flush task is queued behind all previously submitted tasks. Once it runs, those tasks
    // have either executed or been skipped, so cancelation can be lifted and the caller unblocked.
    CountDownLatch latch = new CountDownLatch(1);
    wrapTaskAndSubmitToExecutorService(
        () -> {
          synchronized (lock) {
            shouldCancelTasks = false;
          }
          latch.countDown();
        },
        /* isFlushOrReleaseTask= */ true);
    latch.await();
  }

  /**
   * Cancels remaining tasks and runs the given release task.
   *
   * <p>If {@code shouldShutdownExecutorService} is {@code true}, shuts down the {@linkplain
   * ExecutorService background thread} and waits up to {@link #RELEASE_WAIT_TIME_MS} milliseconds
   * for it to terminate. A timeout is reported to the {@link ErrorListener} on the calling thread.
   *
   * <p>This {@link VideoFrameProcessingTaskExecutor} instance must not be used after this method is
   * called.
   *
   * @param releaseTask A {@link Task} to execute before shutting down the background thread.
   * @throws InterruptedException If interrupted while releasing resources.
   */
  public void release(Task releaseTask) throws InterruptedException {
    synchronized (lock) {
      shouldCancelTasks = true;
      highPriorityTasks.clear();
    }
    Future<?> unused =
        wrapTaskAndSubmitToExecutorService(releaseTask, /* isFlushOrReleaseTask= */ true);
    if (shouldShutdownExecutorService) {
      singleThreadExecutorService.shutdown();
      if (!singleThreadExecutorService.awaitTermination(RELEASE_WAIT_TIME_MS, MILLISECONDS)) {
        errorListener.onError(
            new VideoFrameProcessingException(
                "Release timed out. OpenGL resources may not be cleaned up properly."));
      }
    }
  }

  /**
   * Wraps the given task so that pending high-priority tasks run before it, and submits the wrapper
   * to the executor service.
   *
   * <p>Regular tasks are skipped while tasks are being canceled, and are skipped if a preceding
   * high-priority task throws. Flush and release tasks always run: {@link #flush()} blocks until
   * its task has run, and the release task is responsible for cleaning up resources.
   *
   * @param defaultPriorityTask The {@link Task} to run after the pending high-priority tasks.
   * @param isFlushOrReleaseTask Whether the task is the internal flush task or a release task.
   * @return The {@link Future} returned by the executor service.
   * @throws RejectedExecutionException If the executor service rejects the wrapper.
   */
  private Future<?> wrapTaskAndSubmitToExecutorService(
      Task defaultPriorityTask, boolean isFlushOrReleaseTask) {
    return singleThreadExecutorService.submit(
        () -> {
          synchronized (lock) {
            if (shouldCancelTasks && !isFlushOrReleaseTask) {
              return;
            }
          }

          try {
            runPendingHighPriorityTasks();
          } catch (Exception e) {
            handleException(e);
            if (!isFlushOrReleaseTask) {
              return;
            }
            // Fall through: skipping a flush task would leave flush() waiting on its latch
            // forever, and skipping a release task would leave resources unreleased.
          }

          try {
            defaultPriorityTask.run();
          } catch (Exception e) {
            handleException(e);
          }
        });
  }

  /** Runs queued high-priority tasks in FIFO order until the queue is empty or a task throws. */
  private void runPendingHighPriorityTasks()
      throws VideoFrameProcessingException, GlUtil.GlException {
    while (true) {
      @Nullable Task nextHighPriorityTask;
      synchronized (lock) {
        // Lock only polling to prevent blocking the public method calls.
        nextHighPriorityTask = highPriorityTasks.poll();
      }
      if (nextHighPriorityTask == null) {
        return;
      }
      nextHighPriorityTask.run();
    }
  }

  /**
   * Reports the given exception to the {@link ErrorListener} and starts canceling tasks, unless
   * tasks are already being canceled, in which case the exception is dropped.
   */
  private void handleException(Exception exception) {
    synchronized (lock) {
      if (shouldCancelTasks) {
        // Ignore exception after cancelation as it can be caused by a previously reported exception
        // that is the reason for the cancelation.
        return;
      }
      shouldCancelTasks = true;
    }
    // Notify outside the synchronized block so that the listener is not invoked while the lock is
    // held.
    errorListener.onError(VideoFrameProcessingException.from(exception));
  }
}