/*
* Copyright 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.camera.core.impl.utils.futures;
import androidx.annotation.Nullable;
import androidx.camera.core.impl.utils.executor.CameraXExecutors;
import androidx.core.util.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A future made up of a collection of sub-futures.
*
* <p>Copied and adapted from Guava.
*
* @param <InputT> the type of the individual inputs
* @param <OutputT> the type of the output (i.e. this) future
*/
abstract class AggregateFuture<InputT, OutputT> extends AbstractFuture.TrustedFuture<OutputT> {
@SuppressWarnings("WeakerAccess") /* synthetic access */
static final Logger sLogger = Logger.getLogger(AggregateFuture.class.getName());
/*
* In certain circumstances, this field might theoretically not be visible to an afterDone()
* call triggered by cancel(). For details, see the comments on the fields of TimeoutFuture.
*/
@SuppressWarnings("WeakerAccess") /* synthetic access */
@Nullable
RunningState mRunningState;
@Override
protected final void afterDone() {
super.afterDone();
RunningState localRunningState = mRunningState;
if (localRunningState != null) {
// Let go of the memory held by the running state
this.mRunningState = null;
Collection<? extends ListenableFuture<? extends InputT>> futures =
localRunningState.mFutures;
boolean wasInterrupted = wasInterrupted();
if (wasInterrupted) {
localRunningState.interruptTask();
}
if (isCancelled() & futures != null) {
for (ListenableFuture<?> future : futures) {
future.cancel(wasInterrupted);
}
}
}
}
@Override
protected String pendingToString() {
RunningState localRunningState = mRunningState;
if (localRunningState == null) {
return null;
}
Collection<? extends ListenableFuture<? extends InputT>> localFutures =
localRunningState.mFutures;
if (localFutures != null) {
return "mFutures=[" + localFutures + "]";
}
return null;
}
/** Must be called at the end of each sub-class's constructor. */
final void init(RunningState runningState) {
this.mRunningState = runningState;
runningState.init();
}
abstract class RunningState extends AggregateFutureState implements Runnable {
@SuppressWarnings("WeakerAccess") /* synthetic access */
Collection<? extends ListenableFuture<? extends InputT>> mFutures;
private final boolean mAllMustSucceed;
private final boolean mCollectsValues;
RunningState(
Collection<? extends ListenableFuture<? extends InputT>> futures,
boolean allMustSucceed,
boolean collectsValues) {
super(futures.size());
this.mFutures = Preconditions.checkNotNull(futures);
this.mAllMustSucceed = allMustSucceed;
this.mCollectsValues = collectsValues;
}
/* Used in the !mAllMustSucceed case so we don't have to instantiate a listener. */
@Override
public final void run() {
decrementCountAndMaybeComplete();
}
/**
* The "real" initialization; we can't put this in the constructor because, in the case
* where mFutures are already complete, we would not initialize the subclass before calling
* {@link #handleOneInputDone}. As this is called after the subclass is constructed, we're
* guaranteed to have properly initialized the subclass.
*/
void init() {
// Corner case: List is empty.
if (mFutures.isEmpty()) {
handleAllCompleted();
return;
}
// NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture
// as we'll need to handle RejectedExecutionException
if (mAllMustSucceed) {
// We need fail fast, so we have to keep track of which future failed so we can
// propagate the exception immediately
// Register a listener on each Future in the list to update the state of this
// future. Note that if all the mFutures on the list are done prior to completing
// this loop, the last call to addListener() will callback to setOneValue(),
// transitively call our cleanup listener, and set this.mFutures to null. This is
// not actually a problem, since the foreach only needs this.mFutures to be
// non-null at the beginning of the loop.
int i = 0;
for (final ListenableFuture<? extends InputT> listenable : mFutures) {
final int index = i++;
listenable.addListener(
new Runnable() {
@Override
public void run() {
try {
handleOneInputDone(index, listenable);
} finally {
decrementCountAndMaybeComplete();
}
}
},
CameraXExecutors.directExecutor());
}
} else {
// We'll only call the callback when all mFutures complete, regardless of whether
// some failed. Hold off on calling setOneValue until all complete, so we can share
// the same listener.
for (ListenableFuture<? extends InputT> listenable : mFutures) {
listenable.addListener(this, CameraXExecutors.directExecutor());
}
}
}
/**
* Fails this future with the given Throwable if {@link #mAllMustSucceed} is true. Also,
* logs the throwable if it is an {@link Error} or if {@link #mAllMustSucceed} is {@code
* true}, the throwable did not cause this future to fail, and it is the first time we've
* seen that particular Throwable.
*/
private void handleException(Throwable throwable) {
Preconditions.checkNotNull(throwable);
boolean completedWithFailure = false;
boolean firstTimeSeeingThisException = true;
if (mAllMustSucceed) {
// As soon as the first one fails, throw the exception up.
// The result of all other inputs is then ignored.
completedWithFailure = setException(throwable);
if (completedWithFailure) {
releaseResourcesAfterFailure();
} else {
// Go up the causal chain to see if we've already seen this cause; if we
// have, even if it's wrapped by a different exception, don't log it.
firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(),
throwable);
}
}
// | and & used because it's faster than the branch required for || and &&
if (throwable instanceof Error
| (mAllMustSucceed & !completedWithFailure & firstTimeSeeingThisException)) {
String message =
(throwable instanceof Error)
? "Input Future failed with Error"
: "Got more than one input Future failure. Logging failures after"
+ " the first";
sLogger.log(Level.SEVERE, message, throwable);
}
}
@Override
final void addInitialException(Set<Throwable> seen) {
if (!isCancelled()) {
boolean unused = addCausalChain(seen, tryInternalFastPathGetFailure());
}
}
/** Handles the input at the given index completing. */
void handleOneInputDone(int index, Future<? extends InputT> future) {
// The only cases in which this Future should already be done are (a) if it was
// cancelled or (b) if an input failed and we propagated that immediately because of
// mAllMustSucceed.
Preconditions.checkState(
mAllMustSucceed || !isDone() || isCancelled(),
"Future was done before all dependencies completed");
try {
Preconditions.checkState(future.isDone(),
"Tried to set value from future which is not done");
if (mAllMustSucceed) {
if (future.isCancelled()) {
// clear running state prior to cancelling children, this sets our own
// state but lets the input mFutures keep running as some of them may be
// used elsewhere.
mRunningState = null;
cancel(false);
} else {
// We always get the result so that we can have fail-fast, even if we
// don't collect
InputT result = Futures.getDone(future);
if (mCollectsValues) {
collectOneValue(mAllMustSucceed, index, result);
}
}
} else if (mCollectsValues && !future.isCancelled()) {
collectOneValue(mAllMustSucceed, index, Futures.getDone(future));
}
} catch (ExecutionException e) {
handleException(e.getCause());
} catch (Throwable t) {
handleException(t);
}
}
void decrementCountAndMaybeComplete() {
int newRemaining = decrementRemainingAndGet();
Preconditions.checkState(newRemaining >= 0, "Less than 0 remaining mFutures");
if (newRemaining == 0) {
processCompleted();
}
}
private void processCompleted() {
// Collect the values if (a) our output requires collecting them and (b) we haven't been
// collecting them as we go. (We've collected them as we go only if we needed to fail
// fast)
if (mCollectsValues & !mAllMustSucceed) {
int i = 0;
for (ListenableFuture<? extends InputT> listenable : mFutures) {
handleOneInputDone(i++, listenable);
}
}
handleAllCompleted();
}
/**
* Listeners implicitly keep a reference to {@link RunningState} as they're inner
* classes, so we free resources here as well for the mAllMustSucceed=true case (i.e. when a
* future fails, we immediately release resources we no longer need); additionally, the
* future will release its reference to {@link RunningState}, which should free all
* associated memory when all the mFutures complete and the listeners are released.
*/
void releaseResourcesAfterFailure() {
this.mFutures = null;
}
/**
* Called only if {@code mCollectsValues} is true.
*
* <p>If {@code mAllMustSucceed} is true, called as each future completes; otherwise,
* called for each future when all mFutures complete.
*/
abstract void collectOneValue(boolean allMustSucceed, int index,
@Nullable InputT returnValue);
abstract void handleAllCompleted();
void interruptTask() {
}
}
/** Adds the chain to the seen set, and returns whether all the chain was new to us. */
static boolean addCausalChain(Set<Throwable> seen, Throwable t) {
for (; t != null; t = t.getCause()) {
boolean firstTimeSeen = seen.add(t);
if (!firstTimeSeen) {
/*
* We've seen this, so we've seen its causes, too. No need to re-add them. (There's
* one case where this isn't true, but we ignore it: If we record an exception, then
* someone calls initCause() on it, and then we examine it again, we'll conclude
* that we've seen the whole chain before when it fact we haven't. But this should
* be rare.)
*/
return false;
}
}
return true;
}
}