/*
* Copyright 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.camera.impl.utils.futures
import androidx.arch.core.util.Function
import androidx.camera.impl.utils.executor.CameraExecutors
import androidx.concurrent.futures.CallbackToFutureAdapter
import androidx.core.util.Preconditions
import com.google.common.util.concurrent.ListenableFuture
import java.util.concurrent.CancellationException
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executor
import java.util.concurrent.Future
import java.util.concurrent.ScheduledFuture
/**
* Utility class for generating specific implementations of [ListenableFuture].
*/
object Futures {
/**
 * Creates a [ListenableFuture] that already holds the given result.
 *
 * A `null` value is represented by [ImmediateFuture.nullFuture]; any other value is wrapped in
 * an [ImmediateFuture.ImmediateSuccessfulFuture].
 *
 * @param V The type of the result.
 * @param value The result that is immediately set on the future.
 * @return A future which immediately contains the result.
 */
private fun <V> immediateFuture(value: V?): ListenableFuture<V> =
    if (value != null) {
        ImmediateFuture.ImmediateSuccessfulFuture(value)
    } else {
        ImmediateFuture.nullFuture()
    }
/**
 * Creates a [ScheduledFuture] that already holds a failure, which [Future.get] will throw.
 *
 * @param V The type of the result.
 * @param cause The cause of the [ExecutionException] that will be thrown by [Future.get].
 * @return A future which immediately contains an exception.
 */
fun <V> immediateFailedScheduledFuture(cause: Throwable): ScheduledFuture<V> =
    ImmediateFuture.ImmediateFailedScheduledFuture(cause)
/**
 * Derives a new future asynchronously from the result of [input].
 *
 * A [ChainingListenableFuture] is created around [function] and [input] and registered as a
 * listener on [input], to be run on [executor]. Per the documented contract, if [input] fails
 * the returned future fails with the same exception and [function] is not invoked.
 *
 * @param input The future to transform.
 * @param function A function to transform the result of the input future to the result of the
 * output future.
 * @param executor Executor to run the function in.
 * @return A future that holds the result of the function (if the input succeeded) or the
 * original input's failure (if not).
 */
fun <I, O> transformAsync(
    input: ListenableFuture<I>,
    function: AsyncFunction<in I, out O>,
    executor: Executor
): ListenableFuture<O> =
    ChainingListenableFuture<I, O>(function, input).also { chained ->
        input.addListener(chained, executor)
    }
/**
 * Derives a new future from the result of [input] by applying a synchronous [function].
 *
 * The function's return value is wrapped in an already-completed future and handed to
 * [transformAsync]. Per the documented contract, if [input] fails the returned future fails
 * with the same exception and the function is not invoked.
 *
 * @param input The future to transform.
 * @param function A function to transform the results of the provided future to the results of
 * the returned future.
 * @param executor Executor to run the function in.
 * @return A future that holds the result of the transformation.
 */
fun <I, O> transform(
    input: ListenableFuture<I>,
    function: Function<in I?, out O>,
    executor: Executor
): ListenableFuture<O> {
    Preconditions.checkNotNull(function)
    return transformAsync(
        input,
        { inputResult -> immediateFuture(function.apply(inputResult)) },
        executor
    )
}
/**
 * Forwards the outcome of [input] unchanged to the given
 * [CallbackToFutureAdapter.Completer].
 *
 * If `input` fails, the failure will be propagated to the `completer`.
 *
 * @param input The future being propagated.
 * @param completer The completer which will receive the result of the provided future.
 */
@JvmStatic
fun <V> propagate(
    input: ListenableFuture<V>,
    completer: CallbackToFutureAdapter.Completer<V>
) {
    // The transformation is the identity function and should be quick, so the direct
    // executor is used.
    val executor = CameraExecutors.directExecutor()
    propagateTransform(input, { value -> value }, completer, executor)
}
/**
 * Forwards the outcome of [input] to the given [CallbackToFutureAdapter.Completer] after
 * applying [function] to a successful result.
 *
 * This overload always enables propagation of cancellation from the completer to [input].
 *
 * If `input` fails, the failure will be propagated to the `completer` (and the function is
 * not invoked).
 *
 * @param input The future to transform.
 * @param function A function to transform the results of the provided future to the results of
 * the provided completer.
 * @param completer The completer which will receive the result of the provided future.
 * @param executor Executor to run the function in.
 */
private fun <I, O> propagateTransform(
    input: ListenableFuture<I>,
    function: Function<in I, out O>,
    completer: CallbackToFutureAdapter.Completer<O>,
    executor: Executor
) {
    propagateTransform(
        propagateCancellation = true,
        input = input,
        function = function,
        completer = completer,
        executor = executor
    )
}
/**
 * Forwards the outcome of [input] to the given [CallbackToFutureAdapter.Completer] after
 * applying [function] to a successful result.
 *
 * If `input` fails, the failure will be propagated to the `completer` (and the function is
 * not invoked). If the function itself throws, that throwable is set on the completer.
 *
 * @param propagateCancellation `true` to propagate the cancellation from completer to input
 * future.
 * @param input The future to transform.
 * @param function A function to transform the results of the provided future to the results of
 * the provided completer.
 * @param completer The completer which will receive the result of the provided future.
 * @param executor Executor to run the function in.
 */
private fun <I, O> propagateTransform(
    propagateCancellation: Boolean,
    input: ListenableFuture<I>,
    function: Function<in I, out O>,
    completer: CallbackToFutureAdapter.Completer<O>,
    executor: Executor
) {
    Preconditions.checkNotNull(input)
    Preconditions.checkNotNull(function)
    Preconditions.checkNotNull(completer)
    Preconditions.checkNotNull(executor)

    // Bridges the input future's outcome onto the completer.
    val forwarder = object : FutureCallback<I> {
        override fun onSuccess(result: I?) {
            try {
                completer.set(function.apply(result))
            } catch (t: Throwable) {
                // Anything thrown while transforming or setting becomes the completer's failure.
                completer.setException(t)
            }
        }

        override fun onFailure(t: Throwable) {
            completer.setException(t)
        }
    }
    addCallback(input, forwarder, executor)

    if (!propagateCancellation) {
        return
    }
    // Propagate cancellation from completer to input future.
    completer.addCancellationListener(
        { input.cancel(true) },
        CameraExecutors.directExecutor()
    )
}
/**
 * Returns a `ListenableFuture` whose result is set from the supplied future when it completes.
 *
 * A future that is already done is returned as-is. Otherwise a new future is created whose
 * completer receives the supplied future's outcome, without registering a cancellation
 * listener back onto the supplied future.
 *
 * Cancelling the supplied future will also cancel the returned future, but cancelling the
 * returned future will have no effect on the supplied future.
 */
@JvmStatic
fun <V> nonCancellationPropagating(
    future: ListenableFuture<V>
): ListenableFuture<V> {
    Preconditions.checkNotNull(future)
    if (future.isDone) {
        return future
    }
    return CallbackToFutureAdapter.getFuture { completer ->
        // Identity transformation: the output is the same value as the input.
        propagateTransform(
            false,
            future,
            { value -> value },
            completer,
            CameraExecutors.directExecutor()
        )
        "nonCancellationPropagating[$future]"
    }
}
/**
 * Combines the given futures into a single `ListenableFuture` whose value is a list of the
 * values of all its successful input futures.
 *
 * The list of results is in the same order as the input list, and if any of the provided
 * futures fails or is canceled, its corresponding position will contain `null` (which is
 * indistinguishable from the future having a successful value of `null`).
 *
 * Canceling this future will attempt to cancel all the component futures.
 *
 * @param futures futures to combine
 * @return a future that provides a list of the results of the component futures
 */
fun <V> successfulAsList(
    futures: Collection<ListenableFuture<out V>>
): ListenableFuture<List<V?>?> {
    // Copy the inputs into a list before handing them to ListFuture.
    val components = futures.toList()
    return ListFuture(components, false, CameraExecutors.directExecutor())
}
/**
 * Combines the given futures into a single `ListenableFuture` whose value is a list of the
 * values of all its input futures, if all succeed.
 *
 * The list of results is in the same order as the input list.
 *
 * Canceling this future will attempt to cancel all the component futures, and if any of the
 * provided futures fails or is canceled, this one is, too.
 *
 * @param futures futures to combine
 * @return a future that provides a list of the results of the component futures
 */
fun <V> allAsList(
    futures: Collection<ListenableFuture<out V>>
): ListenableFuture<List<V?>?> {
    // Copy the inputs into a list before handing them to ListFuture.
    val components = futures.toList()
    return ListFuture(components, true, CameraExecutors.directExecutor())
}
/**
 * Registers separate success and failure callbacks to be run when the `Future`'s computation
 * is [complete][Future.isDone] or, if the computation is already complete, immediately.
 *
 * The callback is wrapped in a [CallbackListener] that is added as a listener on [future].
 *
 * @param future The future to attach the callback to.
 * @param callback The callback to invoke when `future` is completed.
 * @param executor The executor to run `callback` when the future completes.
 */
@JvmStatic
fun <V> addCallback(
    future: ListenableFuture<V>,
    callback: FutureCallback<in V>,
    executor: Executor
) {
    Preconditions.checkNotNull(callback)
    val listener = CallbackListener(future, callback)
    future.addListener(listener, executor)
}
/**
 * Returns the result of the input `Future`, which must have already completed.
 *
 * The benefits of this method are twofold. First, the name "getDone" suggests to readers
 * that the `Future` is already done. Second, if buggy code calls `getDone` on a `Future`
 * that is still pending, the program will throw instead of block.
 *
 * @param future The future to read; it must already be done.
 * @return The value produced by [getUninterruptibly] for `future`.
 * @throws ExecutionException if the `Future` failed with an exception
 * @throws CancellationException if the `Future` was cancelled
 * @throws IllegalStateException if the `Future` is not done
 */
@Throws(ExecutionException::class)
fun <V> getDone(future: Future<V>): V? {
    /*
     * We throw IllegalStateException, since the call could succeed later. Perhaps we
     * "should" throw IllegalArgumentException, since the call could succeed with a different
     * argument. Those exceptions' docs suggest that either is acceptable. Google's Java
     * Practices page recommends IllegalArgumentException here, in part to keep its
     * recommendation simple: Static methods should throw IllegalStateException only when
     * they use static state.
     *
     * Why do we deviate here? The answer: We want for fluentFuture.getDone() to throw the same
     * exception as Futures.getDone(fluentFuture).
     */
    // The message is built lazily so that future.toString() is only evaluated when the
    // check actually fails.
    check(future.isDone) { "Future was expected to be done, $future" }
    return getUninterruptibly(future)
}
/**
 * Invokes [Future.get] uninterruptibly.
 *
 * Each [InterruptedException] thrown by `get()` is swallowed and the call is retried. If any
 * interruption was observed, the current thread's interrupt flag is set again before this
 * method exits.
 *
 * @param future The future whose result is awaited.
 * @return The value returned by `future.get()`.
 * @throws ExecutionException if the computation threw an exception
 * @throws CancellationException if the computation was cancelled
 */
@Throws(ExecutionException::class)
fun <V> getUninterruptibly(future: Future<V>): V {
    var wasInterrupted = false
    try {
        while (true) {
            try {
                return future.get()
            } catch (ignored: InterruptedException) {
                // Remember the interruption and retry the blocking call.
                wasInterrupted = true
            }
        }
    } finally {
        // Restore the interrupt status that was consumed while waiting.
        if (wasInterrupted) {
            Thread.currentThread().interrupt()
        }
    }
}
/**
 * [Runnable] that reads the outcome of [mFuture] through [getDone] and reports it to
 * [mCallback]. See [addCallback] for behavioral notes.
 *
 * @param mFuture The future whose outcome is read when this listener runs.
 * @param callback The callback that receives either the value or the failure.
 */
private class CallbackListener<V>(
    val mFuture: Future<V>,
    callback: FutureCallback<in V>
) : Runnable {
    val mCallback: FutureCallback<in V> = callback

    /**
     * Reads the future's result and invokes exactly one of the callback's methods: `onSuccess`
     * with the value, or `onFailure` with the failure.
     */
    override fun run() {
        val value: V? = try {
            getDone(mFuture)
        } catch (e: ExecutionException) {
            // An ExecutionException may have no cause. Fall back to the exception itself so
            // the callback is always notified instead of the failure being silently dropped.
            mCallback.onFailure(e.cause ?: e)
            return
        } catch (e: RuntimeException) {
            mCallback.onFailure(e)
            return
        } catch (e: Error) {
            mCallback.onFailure(e)
            return
        }
        mCallback.onSuccess(value)
    }

    override fun toString(): String = "${javaClass.simpleName},$mCallback"
}
}