ListFuture.kt

/*
 * Copyright 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.camera.impl.utils.futures

import androidx.camera.impl.utils.executor.CameraExecutors
import androidx.concurrent.futures.CallbackToFutureAdapter
import androidx.core.util.Preconditions
import com.google.common.util.concurrent.ListenableFuture
import java.util.concurrent.CancellationException
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executor
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger

/**
 * A future that aggregates a list of input futures into a single future of a list of results.
 *
 * This class is based on the `ListFuture` in Guava, but uses [CallbackToFutureAdapter] instead of
 * `AbstractFuture`. It implements [Futures.allAsList] and [Futures.successfulAsList].
 *
 * The idea is to create a (null-filled) list and register a listener with each component future
 * that fills in the corresponding slot of the list when that future completes. Once every
 * component has reported, the aggregate future is completed with a copy of that list.
 *
 * @param futures all the futures to build the list from
 * @param allMustSucceed whether a single failure or cancellation should propagate to this future;
 * when `false`, the slot of a failed or cancelled input is left as `null`
 * @param listenerExecutor used to run listeners on all the passed in futures
 */
internal class ListFuture<V>(
    // Deliberately a plain constructor parameter, not a property: the only long-lived reference
    // to the inputs is [futuresInternal], which is cleared once the result is done.
    futures: List<ListenableFuture<out V>>,
    private val allMustSucceed: Boolean,
    listenerExecutor: Executor
) : ListenableFuture<List<V?>?> {

    /** The input futures; set to `null` by the cleanup listener once [result] is done. */
    private var futuresInternal: List<ListenableFuture<out V>>? = futures

    /** One slot per input, initially `null`; set to `null` by the cleanup listener. */
    private var values: MutableList<V?>? = MutableList<V?>(futures.size) { null }

    /** Number of inputs whose listener has not yet finished reporting. */
    private val remaining = AtomicInteger(futures.size)

    /**
     * Completer of [result]. Must be declared (and reset to `null`) before [result] is created,
     * because it is assigned from within `attachCompleter` while [result] is being built.
     */
    var resultNotifier: CallbackToFutureAdapter.Completer<List<V?>?>? = null

    /** The future every [ListenableFuture] method of this class delegates to. */
    private val result: ListenableFuture<List<V?>?>

    /** Non-null view of [resultNotifier]; fails with a clear message if it was never attached. */
    private val notifier: CallbackToFutureAdapter.Completer<List<V?>?>
        get() = checkNotNull(resultNotifier) { "The result completer has not been attached" }

    init {
        result = CallbackToFutureAdapter.getFuture(
            object : CallbackToFutureAdapter.Resolver<List<V?>?> {
                override fun attachCompleter(
                    completer: CallbackToFutureAdapter.Completer<List<V?>?>
                ): Any {
                    Preconditions.checkState(
                        resultNotifier == null,
                        "The result can only set once!"
                    )
                    resultNotifier = completer
                    // Note: `this` is the anonymous Resolver here, not the ListFuture.
                    return "ListFuture[$this]"
                }
            })

        // First, schedule cleanup to execute when the result is done. By then the values list
        // has either been copied into the result or (in case of failure) is no longer useful.
        addListener({
            values = null
            // Let go of the memory held by the input futures.
            futuresInternal = null
        }, CameraExecutors.directExecutor())

        if (futures.isEmpty()) {
            // Corner case: nothing to wait for, complete immediately with an empty list.
            // The code relies on the completer having been attached by getFuture() above.
            notifier.set(ArrayList<V?>())
        } else {
            // Register a listener on each input to update the state of this future.
            // If every input is already done, the last addListener() call may complete the
            // result and run the cleanup listener, nulling futuresInternal; iterating the
            // constructor parameter instead of the field keeps this loop safe.
            futures.forEachIndexed { index, input ->
                input.addListener({ setOneValue(index, input) }, listenerExecutor)
            }
        }
    }

    /**
     * Stores the outcome of [future] in slot [index] and completes [result] once every input
     * has reported.
     *
     * A failed or cancelled input fails or cancels the aggregate only when [allMustSucceed] is
     * set; otherwise its slot stays `null`.
     */
    private fun setOneValue(index: Int, future: Future<out V>) {
        val pendingValues = values
        if (isDone || pendingValues == null) {
            // Some other input failed or was cancelled (only propagated when allMustSucceed is
            // true), or the aggregate itself was cancelled before this listener ran.
            Preconditions.checkState(
                allMustSucceed || result.isCancelled,
                "Future was done before all dependencies completed"
            )
            return
        }
        try {
            Preconditions.checkState(
                future.isDone,
                "Tried to set value from future which is not done"
            )
            pendingValues[index] = Futures.getUninterruptibly(future)
        } catch (e: CancellationException) {
            if (allMustSucceed) {
                // Set ourselves as cancelled. Note that cancel() also forwards the
                // cancellation to every input future that is still referenced.
                cancel(false)
            }
        } catch (e: ExecutionException) {
            if (allMustSucceed) {
                // As soon as the first one fails, propagate the failure.
                // The result of all other inputs is then ignored.
                // Fall back to the wrapper itself if it carries no cause.
                notifier.setException(e.cause ?: e)
            }
        } catch (e: RuntimeException) {
            if (allMustSucceed) {
                notifier.setException(e)
            }
        } catch (e: Error) {
            // Errors are propagated regardless of allMustSucceed.
            notifier.setException(e)
        } finally {
            val newRemaining = remaining.decrementAndGet()
            Preconditions.checkState(newRemaining >= 0, "Less than 0 remaining futures")
            if (newRemaining == 0) {
                // Re-read the field: the result may have completed (and been cleaned up)
                // inside one of the catch branches above.
                val completedValues = values
                if (completedValues != null) {
                    notifier.set(ArrayList(completedValues))
                } else {
                    Preconditions.checkState(isDone)
                }
            }
        }
    }

    /** Registers [listener] on the aggregate result. */
    override fun addListener(listener: Runnable, executor: Executor) {
        result.addListener(listener, executor)
    }

    /**
     * Cancels every input future that is still referenced, then the aggregate result.
     *
     * @return whatever cancelling the aggregate result returns
     */
    override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
        // Read the field once: the cleanup listener may null it while we iterate.
        futuresInternal?.forEach { it.cancel(mayInterruptIfRunning) }
        return result.cancel(mayInterruptIfRunning)
    }

    override fun isCancelled(): Boolean {
        return result.isCancelled
    }

    override fun isDone(): Boolean {
        return result.isDone
    }

    /** Waits on the input futures first (see [callAllGets]) and then on the aggregate result. */
    @Throws(InterruptedException::class, ExecutionException::class)
    override fun get(): List<V?>? {
        callAllGets()

        // This may still block in spite of the calls above, as the listeners may
        // be scheduled for execution in other threads.
        return result.get()
    }

    /** Waits on the aggregate result only; unlike [get], the inputs are not polled first. */
    @Throws(
        InterruptedException::class,
        ExecutionException::class,
        TimeoutException::class
    )
    override fun get(timeout: Long, unit: TimeUnit): List<V?>? {
        return result.get(timeout, unit)
    }

    /**
     * Calls the get method of all dependency futures to work around a bug in
     * some ListenableFutures where the listeners aren't called until get() is
     * called.
     */
    @Throws(InterruptedException::class)
    private fun callAllGets() {
        val pendingFutures = futuresInternal
        if (pendingFutures == null || isDone) {
            return
        }
        for (future in pendingFutures) {
            // Block on each input in turn. The loop re-checks isDone in case get() returned
            // exceptionally while the future does not yet report itself as done.
            while (!future.isDone) {
                try {
                    future.get()
                } catch (e: Error) {
                    throw e
                } catch (e: InterruptedException) {
                    throw e
                } catch (e: Throwable) {
                    // ExecutionException / CancellationException / RuntimeException
                    if (allMustSucceed) {
                        // The aggregate is going to fail anyway; no point waiting further.
                        return
                    }
                    // Otherwise fall through and re-check this input.
                }
            }
        }
    }
}