ChainingListenableFuture.java

/*
 * Copyright 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.test.espresso.web.util.concurrent;

import static androidx.test.espresso.web.util.concurrent.Futures.getUninterruptibly;
import static androidx.test.internal.util.Checks.checkNotNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.test.platform.concurrent.DirectExecutor;
import com.google.common.util.concurrent.ListenableFuture;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Forked from androidx.camera package to avoid full Guava dependency.
 *
 * <p>This class is based on the ChainingListenableFuture in Guava; the constructor of FutureChain
 * uses CallbackToFutureAdapter instead of AbstractFuture.
 *
 * <p>An implementation of {@code ListenableFuture} that also implements {@code Runnable} so that it
 * can be used to nest ListenableFutures. Once the passed-in {@code ListenableFuture} is complete,
 * it calls the passed-in {@code Function} to generate the result.
 *
 * <p>If the Function throws any checked exceptions, they should be wrapped in a {@code
 * UndeclaredThrowableException} so that this class can get access to the cause.
 */
class ChainingListenableFuture<I, O> extends FutureChain<O> implements Runnable {
  /** Function applied to the input's result; cleared at the end of {@link #run()}. */
  @Nullable private AsyncFunction<? super I, ? extends O> function;

  /**
   * One-slot channel through which {@link #cancel(boolean)} hands its {@code mayInterruptIfRunning}
   * argument to {@link #run()}, for the case where cancellation races with the function call.
   */
  private final BlockingQueue<Boolean> mayInterruptIfRunningChannel = new LinkedBlockingQueue<>(1);

  /** Released at the end of {@link #run()}, after which {@code outputFuture} may be examined. */
  private final CountDownLatch outputCreated = new CountDownLatch(1);

  /** Future supplying the function's argument; cleared at the end of {@link #run()}. */
  @Nullable private ListenableFuture<? extends I> inputFuture;

  /** Future returned by the function; cleared again once its result has been transferred. */
  @Nullable volatile ListenableFuture<? extends O> outputFuture;

  /**
   * Creates a future that applies {@code function} to the result of {@code inputFuture}.
   *
   * <p>The constructor only stores its arguments; it does not register a listener on {@code
   * inputFuture}.
   *
   * @param function the function applied to the input's result in {@link #run()}
   * @param inputFuture the future supplying the function's argument
   */
  ChainingListenableFuture(
      @NonNull AsyncFunction<? super I, ? extends O> function,
      @NonNull ListenableFuture<? extends I> inputFuture) {
    super();
    this.function = checkNotNull(function);
    this.inputFuture = checkNotNull(inputFuture);
  }

  /**
   * Delegates the get() to the input and output futures, in case their implementations defer
   * starting computation until their own get() is invoked.
   *
   * @return the result as reported by the superclass
   * @throws InterruptedException if the calling thread is interrupted while waiting
   * @throws ExecutionException if the input future, the output future or this future failed
   */
  @Override
  @Nullable
  public O get() throws InterruptedException, ExecutionException {
    if (!isDone()) {
      // Invoking get on the inputFuture will ensure our own run()
      // method below is invoked as a listener when inputFuture sets
      // its value.  Therefore when get() returns we should then see
      // the outputFuture be created.
      ListenableFuture<? extends I> inputFuture = this.inputFuture;
      if (inputFuture != null) {
        inputFuture.get();
      }

      // If our listener was scheduled to run on an executor we may
      // need to wait for our listener to finish running before the
      // outputFuture has been constructed by the function.
      outputCreated.await();

      // Like above with the inputFuture, we have a listener on
      // the outputFuture that will set our own value when its
      // value is set.  Invoking get will ensure the output can
      // complete and invoke our listener, so that we can later
      // get the result.
      ListenableFuture<? extends O> outputFuture = this.outputFuture;
      if (outputFuture != null) {
        outputFuture.get();
      }
    }
    return super.get();
  }

  /**
   * Delegates the get() to the input and output futures, in case their implementations defer
   * starting computation until their own get() is invoked.
   *
   * <p>The timeout is treated as one budget shared by every waiting phase: the time spent in each
   * phase is deducted (never below zero) before the next phase starts.
   *
   * @param timeout the maximum time to wait
   * @param unit the unit of {@code timeout}
   * @return the result as reported by the superclass
   * @throws TimeoutException if the budget runs out before a result is available
   * @throws ExecutionException if the input future, the output future or this future failed
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  @Override
  @Nullable
  public O get(long timeout, @NonNull TimeUnit unit)
      throws TimeoutException, ExecutionException, InterruptedException {
    if (!isDone()) {
      // Use a single time unit so we can decrease the remaining timeout
      // as we wait for various phases to complete.
      if (unit != NANOSECONDS) {
        timeout = NANOSECONDS.convert(timeout, unit);
        unit = NANOSECONDS;
      }

      // Invoking get on the inputFuture will ensure our own run()
      // method below is invoked as a listener when inputFuture sets
      // its value.  Therefore when get() returns we should then see
      // the outputFuture be created.
      ListenableFuture<? extends I> inputFuture = this.inputFuture;
      if (inputFuture != null) {
        long start = System.nanoTime();
        inputFuture.get(timeout, unit);
        timeout = remainingNanos(timeout, start);
      }

      // If our listener was scheduled to run on an executor we may
      // need to wait for our listener to finish running before the
      // outputFuture has been constructed by the function.
      long start = System.nanoTime();
      if (!outputCreated.await(timeout, unit)) {
        throw new TimeoutException();
      }
      timeout = remainingNanos(timeout, start);

      // Like above with the inputFuture, we have a listener on
      // the outputFuture that will set our own value when its
      // value is set.  Invoking get will ensure the output can
      // complete and invoke our listener, so that we can later
      // get the result.
      ListenableFuture<? extends O> outputFuture = this.outputFuture;
      if (outputFuture != null) {
        start = System.nanoTime();
        outputFuture.get(timeout, unit);
        // Charge this phase too, so the final super.get() below cannot wait for
        // the full budget a second time.
        timeout = remainingNanos(timeout, start);
      }
    }
    return super.get(timeout, unit);
  }

  /**
   * Returns what is left of a nanosecond budget after the time elapsed since {@code startNanos}.
   *
   * <p>The result is clamped at zero and computed without overflow, even for extreme budgets.
   */
  private static long remainingNanos(long budgetNanos, long startNanos) {
    long elapsedNanos = Math.max(0, System.nanoTime() - startNanos);
    return budgetNanos <= elapsedNanos ? 0 : budgetNanos - elapsedNanos;
  }

  /**
   * Cancels this future and, if that succeeds, also the input and output futures.
   *
   * @param mayInterruptIfRunning forwarded to the superclass and to the input and output futures
   * @return {@code true} if the superclass reported that this call cancelled the future
   */
  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
    /*
     * Our additional cancellation work needs to occur regardless of the
     * value of mayInterruptIfRunning.
     */
    if (super.cancel(mayInterruptIfRunning)) {
      // This should never block, assuming super.cancel() returns true for at
      // most one caller: the channel has one slot and this is its only writer.
      putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning);
      cancel(inputFuture, mayInterruptIfRunning);
      cancel(outputFuture, mayInterruptIfRunning);
      return true;
    }
    return false;
  }

  /** Cancels {@code future} with the given flag, doing nothing when it is {@code null}. */
  private void cancel(@Nullable Future<?> future, boolean mayInterruptIfRunning) {
    if (future != null) {
      future.cancel(mayInterruptIfRunning);
    }
  }

  /**
   * Reads the input future's result, applies the function to it and arranges for the resulting
   * output future's outcome to become this future's outcome.
   *
   * <p>Whatever happens, the function and input references are cleared and {@code outputCreated}
   * is released before this method returns.
   */
  @Override
  public void run() {
    try {
      I sourceResult;
      try {
        sourceResult = getUninterruptibly(inputFuture);
      } catch (CancellationException e) {
        // Cancel this future and return.
        // At this point, inputFuture is cancelled and outputFuture doesn't
        // exist, so the value of mayInterruptIfRunning is irrelevant.
        cancel(false);
        return;
      } catch (ExecutionException e) {
        // Set the cause of the exception as this future's exception
        setException(e.getCause());
        return;
      }

      final ListenableFuture<? extends O> outputFuture =
          this.outputFuture = function.apply(sourceResult);
      if (isCancelled()) {
        // Handles the case where cancel was called while the function was
        // being applied.
        // There is a gap in cancel(boolean) between calling super.cancel()
        // and storing the value of mayInterruptIfRunning, so this thread
        // needs to block, waiting for that value.
        outputFuture.cancel(takeUninterruptibly(mayInterruptIfRunningChannel));
        this.outputFuture = null;
        return;
      }
      outputFuture.addListener(
          new Runnable() {
            @Override
            public void run() {
              try {
                // Here it would have been nice to have had an
                // UninterruptibleListenableFuture, but we don't want to start a
                // combinatorial explosion of interfaces, so we have to make do.
                set(getUninterruptibly(outputFuture));
              } catch (CancellationException e) {
                // Cancel this future and return.
                // At this point, inputFuture and outputFuture are done, so the
                // value of mayInterruptIfRunning is irrelevant.
                cancel(false);
                return;
              } catch (ExecutionException e) {
                // Set the cause of the exception as this future's exception
                setException(e.getCause());
              } finally {
                // Don't pin inputs beyond completion
                ChainingListenableFuture.this.outputFuture = null;
              }
            }
          },
          DirectExecutor.INSTANCE);
    } catch (UndeclaredThrowableException e) {
      // Set the cause of the exception as this future's exception
      setException(e.getCause());
    } catch (Exception e) {
      // This exception is irrelevant in this thread, but useful for the
      // client
      setException(e);
    } catch (Error e) {
      // Report errors through this future as well.
      // NOTE(review): the inherited comment claimed the superclass rethrows
      // the error; FutureChain is not visible here, so confirm that.
      setException(e);
    } finally {
      // Don't pin inputs beyond completion
      function = null;
      inputFuture = null;
      // Allow our get routines to examine outputFuture now.
      outputCreated.countDown();
    }
  }

  /**
   * Invokes {@code queue.}{@link BlockingQueue#take() take()} uninterruptibly.
   *
   * <p>Interrupts received while waiting are remembered and the thread's interrupt status is
   * restored before returning.
   */
  private <E> E takeUninterruptibly(@NonNull BlockingQueue<E> queue) {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return queue.take();
        } catch (InterruptedException e) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Invokes {@code queue.}{@link BlockingQueue#put(Object) put(element)} uninterruptibly.
   *
   * <p>Interrupts received while waiting are remembered and the thread's interrupt status is
   * restored before returning.
   */
  private <E> void putUninterruptibly(@NonNull BlockingQueue<E> queue, @NonNull E element) {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          queue.put(element);
          return;
        } catch (InterruptedException e) {
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
}