FrameDeferringContinuationInterceptor.kt
/*
* Copyright 2022 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.compose.ui.test
import androidx.compose.ui.test.FrameDeferringContinuationInterceptor.FrameDeferredContinuation
import androidx.compose.ui.test.internal.DelayPropagatingContinuationInterceptorWrapper
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineDispatcher
/**
* A [ContinuationInterceptor] that wraps continuations in a [FrameDeferredContinuation] to
* defer dispatching while frame callbacks are being ran by [TestMonotonicFrameClock.performFrame].
*
* Only delegates to the parent/wrapped interceptor if it is a [CoroutineDispatcher] that says
* the continuation needs to be dispatched.
*/
@OptIn(InternalTestApi::class)
internal class FrameDeferringContinuationInterceptor(
parentInterceptor: ContinuationInterceptor?
) : DelayPropagatingContinuationInterceptorWrapper(parentInterceptor) {
private val parentDispatcher = parentInterceptor as? CoroutineDispatcher
private val toRunTrampolined = ArrayDeque<TrampolinedTask<*>>()
private val lock = Any()
private var isDeferringContinuations = false
val hasTrampolinedTasks: Boolean
get() = synchronized(lock) {
toRunTrampolined.isNotEmpty()
}
/**
* Runs [block] so that any continuations that are resumed on this dispatcher will not be
* dispatched until after [block] returns. All such continuations will be dispatched before
* this function returns.
*/
fun runWithoutResumingCoroutines(block: () -> Unit) {
synchronized(lock) {
check(!isDeferringContinuations)
isDeferringContinuations = true
}
// TODO(aosp/2121435) Tested in CL that adds onPerformTraversals callback.
try {
block()
} catch (e: Throwable) {
// We still need to resume any queued trampoline tasks so they don't hang, but any
// individual failures need to also report the traversal failure in case they win the
// race to cancel the test job. See the kdoc on resumeWithSuppressed for more
// information.
finishFrameTasks {
it.resumeWithSuppressed(e)
}
throw e
}
// Resume any continuations that were dispatched inside the frame callbacks, as well as the
// coroutines that called withFrameNanos.
finishFrameTasks {
it.resume()
}
}
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
val deferringContinuation = FrameDeferredContinuation(continuation)
// If we also ask the parent dispatcher to intercept the continuation when it doesn't
// need to dispatch, it may return a DispatchedContinuation and dispatch anyway.
return if (parentDispatcher?.isDispatchNeeded(continuation.context) == true) {
// Since the parent wraps the deferred continuation, if the parent is a dispatcher
// it will immediately _dispatch_ the continuation using whatever mechanism it
// otherwise would, but the underlying continuation won't actually resume until
// after the frame callbacks. If instead we asked the parent to wrap the original
// continuation and then wrapped the parent's in a FrameDeferredContinuation, the
// dispatch itself would not happen until after the frame callback. In practice,
// both should be effectively the same as long as nothing else is dispatching
// directly to the parent dispatcher without going through this wrapper (which it
// shouldn't, since the compose test harness installs both the clock and the
// dispatcher).
parentDispatcher.interceptContinuation(deferringContinuation)
} else {
deferringContinuation
}
}
/**
* Dequeues every [toRunTrampolined] task and passes it to [block], then sets
* [runningFrameCallbacks] to false. Does the proper synchronization to ensure that no
* trampolined tasks will be enqueued but not processed by this call until
* [runningFrameCallbacks] is explicitly set to true again.
*/
private inline fun finishFrameTasks(block: (TrampolinedTask<*>) -> Unit) {
do {
var task = nextTrampolinedTask()
while (task != null) {
block(task)
task = nextTrampolinedTask()
}
} while (
// We don't dispatch holding the lock so that other tasks can get in on our
// trampolining time slice, but once we're done, make sure nothing added a new task
// before we set runningFrameCallbacks = false, which would prevent the next dispatch
// from being correctly scheduled. Loop to run these stragglers now.
synchronized(lock) {
if (toRunTrampolined.isEmpty()) {
// Setting this to false means that once the lock is released, any dispatches
// will be delegated directly to the parent dispatcher again.
isDeferringContinuations = false
false
} else true
}
)
}
private fun nextTrampolinedTask(): TrampolinedTask<*>? = synchronized(lock) {
toRunTrampolined.removeFirstOrNull()
}
private class TrampolinedTask<T>(
private val continuation: Continuation<T>,
private val result: Result<T>
) {
fun resume() = continuation.resumeWith(result)
/**
* Resume the continuation with [result], but include [cause] as a suppressed exception if
* the result is a failure.
*
* If this method is being called, it means the [runWithoutResumingCoroutines]' block failed
* somehow after executing individual `withFrameNanos` callbacks. That failure is
* significant and should be reported as at least part of the test failure, by cancelling
* the root test job.
* If all the frame callbacks have a success result, then [performFrame] will throw its
* exception and cancel the test job, failing the test.
* However, if a `withFrameNanos` callback failed, and the underlying dispatcher is
* unconfined and the coroutine call stack doesn't block the exception, resuming the
* continuation with the exception result may end up bubbling up to the test job first
* and cancelling it and failing the test before the more general frame failure has a
* chance to. If that happens, we still want to report the frame failure somewhere, so
* we add it to the suppressed list of the individual failure's exception.
*
* TODO(b/255802670): It's still possible for a coroutine that is resumed successfully and
* dispatched synchronously to immediately throw _after_ returning, and thus still beat us
* to failing the test.
*/
fun resumeWithSuppressed(cause: Throwable) {
result.exceptionOrNull()?.addSuppressed(cause)
continuation.resumeWith(result)
}
}
/**
* A [Continuation] wrapper that defers dispatching continuations that are resumed while
* [TestMonotonicFrameClock.performFrame] is running frame callbacks. Such continuations are
* instead dispatched after all the frame callbacks have finished executing.
*/
private inner class FrameDeferredContinuation<T>(
private val continuation: Continuation<T>
) : Continuation<T> {
override val context: CoroutineContext
get() = continuation.context
override fun resumeWith(result: Result<T>) {
val defer = synchronized(lock) {
if (isDeferringContinuations) {
// Defer all continuations resumed while running frame callbacks until
// after the frame callbacks have finished running. This needs to be
// done while holding the lock to ensure the task actually gets executed
// by the current performFrame.
toRunTrampolined.addLast(TrampolinedTask(continuation, result))
true
} else {
false
}
}
// If resuming immediately, do so outside the critical section above to avoid
// deadlocking if the continuation tries to resume another continuation.
if (!defer) {
continuation.resumeWith(result)
}
}
}
}