ChainingListenableFuture.kt

/*
 * Copyright 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.camera.impl.utils.futures

import androidx.camera.impl.utils.executor.ViewfinderExecutors
import androidx.core.util.Preconditions
import com.google.common.util.concurrent.ListenableFuture
import java.lang.reflect.UndeclaredThrowableException
import java.util.concurrent.BlockingQueue
import java.util.concurrent.CancellationException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import kotlin.math.max

/**
 * The Class is based on the ChainingListenableFuture in Guava, the constructor of FutureChain
 * will use the CallbackToFutureAdapter instead of the AbstractFuture.
 *
 * An implementation of `ListenableFuture` that also implements
 * `Runnable` so that it can be used to nest ListenableFutures.
 * Once the passed-in `ListenableFuture` is complete, it calls the
 * passed-in `Function` to generate the result.
 *
 *
 * If the Function throws any checked exceptions, they should be wrapped
 * in a `UndeclaredThrowableException` so that this class can get access to the cause.
 *
 */
internal class ChainingListenableFuture<I, O>(
    function: AsyncFunction<in I, out O>,
    inputFuture: ListenableFuture<out I>
) :
    FutureChain<O>(), Runnable {
    private var mFunction: AsyncFunction<in I, out O>?
    private val mMayInterruptIfRunningChannel: BlockingQueue<Boolean> = LinkedBlockingQueue(1)
    private val mOutputCreated = CountDownLatch(1)
    private var mInputFuture: ListenableFuture<out I>?

    @Volatile
    var mOutputFuture: ListenableFuture<out O>? = null

    init {
        mFunction = Preconditions.checkNotNull(function)
        mInputFuture = Preconditions.checkNotNull(inputFuture)
    }

    /**
     * Delegate the get() to the input and output mFutures, in case
     * their implementations defer starting computation until their
     * own get() is invoked.
     */
    @Throws(InterruptedException::class, ExecutionException::class)
    override fun get(): O? {
        if (!isDone) {
            // Invoking get on the mInputFuture will ensure our own run()
            // method below is invoked as a listener when mInputFuture sets
            // its value.  Therefore when get() returns we should then see
            // the mOutputFuture be created.
            val inputFuture = mInputFuture
            inputFuture?.get()

            // If our listener was scheduled to run on an executor we may
            // need to wait for our listener to finish running before the
            // mOutputFuture has been constructed by the mFunction.
            mOutputCreated.await()

            // Like above with the mInputFuture, we have a listener on
            // the mOutputFuture that will set our own value when its
            // value is set.  Invoking get will ensure the output can
            // complete and invoke our listener, so that we can later
            // get the mResult.
            val outputFuture = mOutputFuture
            outputFuture?.get()
        }
        return super.get()
    }

    /**
     * Delegate the get() to the input and output mFutures, in case
     * their implementations defer starting computation until their
     * own get() is invoked.
     */
    @Throws(
        TimeoutException::class,
        ExecutionException::class,
        InterruptedException::class
    )
    override operator fun get(timeout: Long, unit: TimeUnit): O? {
        var resultTimeout = timeout
        var resultUnit = unit
        if (!isDone) {
            // Use a single time unit so we can decrease mRemaining timeout
            // as we wait for various phases to complete.
            if (resultUnit != TimeUnit.NANOSECONDS) {
                resultTimeout = TimeUnit.NANOSECONDS.convert(resultTimeout, resultUnit)
                resultUnit = TimeUnit.NANOSECONDS
            }

            // Invoking get on the mInputFuture will ensure our own run()
            // method below is invoked as a listener when mInputFuture sets
            // its value.  Therefore when get() returns we should then see
            // the mOutputFuture be created.
            val inputFuture = mInputFuture
            if (inputFuture != null) {
                val start = System.nanoTime()
                inputFuture[resultTimeout, resultUnit]
                resultTimeout -= max(0L, (System.nanoTime() - start))
            }

            // If our listener was scheduled to run on an executor we may
            // need to wait for our listener to finish running before the
            // mOutputFuture has been constructed by the mFunction.
            val start = System.nanoTime()
            if (!mOutputCreated.await(resultTimeout, resultUnit)) {
                throw TimeoutException()
            }
            resultTimeout -= max(0L, (System.nanoTime() - start))

            // Like above with the mInputFuture, we have a listener on
            // the mOutputFuture that will set our own value when its
            // value is set.  Invoking get will ensure the output can
            // complete and invoke our listener, so that we can later
            // get the mResult.
            val outputFuture = mOutputFuture
            outputFuture?.get(resultTimeout, resultUnit)
        }
        return super.get(resultTimeout, resultUnit)
    }

    override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
        /*
         * Our additional cancellation work needs to occur even if
         * !mayInterruptIfRunning, so we can't move it into interruptTask().
         */
        if (super.cancel(mayInterruptIfRunning)) {
            // This should never block since only one thread is allowed to cancel
            // this Future.
            putUninterruptibly(mMayInterruptIfRunningChannel, mayInterruptIfRunning)
            cancel(mInputFuture, mayInterruptIfRunning)
            cancel(mOutputFuture, mayInterruptIfRunning)
            return true
        }
        return false
    }

    private fun cancel(
        future: Future<*>?,
        mayInterruptIfRunning: Boolean
    ) {
        future?.cancel(mayInterruptIfRunning)
    }

    override fun run() {
        try {
            val sourceResult: I = try {
                Futures.getUninterruptibly(
                    mInputFuture!!
                )
            } catch (e: CancellationException) {
                // Cancel this future and return.
                // At this point, mInputFuture is cancelled and mOutputFuture doesn't
                // exist, so the value of mayInterruptIfRunning is irrelevant.
                cancel(false)
                return
            } catch (e: ExecutionException) {
                // Set the cause of the exception as this future's exception
                e.cause?.let { setException(it) }
                return
            }
            mOutputFuture = mFunction!!.apply(sourceResult)
            val outputFuture = mOutputFuture
            if (isCancelled) {
                // Handles the case where cancel was called while the mFunction was
                // being applied.
                // There is a gap in cancel(boolean) between calling sync.cancel()
                // and storing the value of mayInterruptIfRunning, so this thread
                // needs to block, waiting for that value.
                outputFuture!!.cancel(takeUninterruptibly(mMayInterruptIfRunningChannel))
                mOutputFuture = null
                return
            }
            outputFuture!!.addListener(Runnable {
                try {
                    // Here it would have been nice to have had an
                    // UninterruptibleListenableFuture, but we don't want to start a
                    // combinatorial explosion of interfaces, so we have to make do.
                    set(
                        Futures.getUninterruptibly(
                            outputFuture
                        )
                    )
                } catch (e: CancellationException) {
                    // Cancel this future and return.
                    // At this point, mInputFuture and mOutputFuture are done, so the
                    // value of mayInterruptIfRunning is irrelevant.
                    cancel(false)
                    return@Runnable
                } catch (e: ExecutionException) {
                    // Set the cause of the exception as this future's exception
                    e.cause?.let { setException(it) }
                } finally {
                    // Don't pin inputs beyond completion
                    mOutputFuture = null
                }
            }, ViewfinderExecutors.directExecutor())
        } catch (e: UndeclaredThrowableException) {
            // Set the cause of the exception as this future's exception
            e.cause?.let { setException(it) }
        } catch (e: Exception) {
            // This exception is irrelevant in this thread, but useful for the
            // client
            setException(e)
        } catch (e: Error) {
            // Propagate errors up ASAP - our superclass will rethrow the error
            setException(e)
        } finally {
            // Don't pin inputs beyond completion
            mFunction = null
            mInputFuture = null
            // Allow our get routines to examine mOutputFuture now.
            mOutputCreated.countDown()
        }
    }

    /**
     * Invokes `queue.`[take()][BlockingQueue.take] uninterruptibly.
     */
    private fun <E> takeUninterruptibly(queue: BlockingQueue<E>): E {
        var interrupted = false
        try {
            while (true) {
                interrupted = try {
                    return queue.take()
                } catch (e: InterruptedException) {
                    true
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt()
            }
        }
    }

    /**
     * Invokes `queue.`[put(element)][BlockingQueue.put]
     * uninterruptibly.
     */
    private fun <E> putUninterruptibly(queue: BlockingQueue<E>, element: E) {
        var interrupted = false
        try {
            while (true) {
                interrupted = try {
                    queue.put(element)
                    return
                } catch (e: InterruptedException) {
                    true
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt()
            }
        }
    }
}