Recomposer.kt
/*
* Copyright 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.compose.runtime
import androidx.compose.runtime.dispatch.BroadcastFrameClock
import androidx.compose.runtime.dispatch.DefaultMonotonicFrameClock
import androidx.compose.runtime.dispatch.MonotonicFrameClock
import androidx.compose.runtime.snapshots.MutableSnapshot
import androidx.compose.runtime.snapshots.Snapshot
import androidx.compose.runtime.snapshots.SnapshotApplyResult
import androidx.compose.runtime.snapshots.SnapshotReadObserver
import androidx.compose.runtime.snapshots.SnapshotWriteObserver
import androidx.compose.runtime.snapshots.takeMutableSnapshot
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.coroutines.resume
// TODO: Can we use rootKey for this since all compositions will have an eventual Recomposer parent?
private const val RecomposerCompoundHashKey = 1000
/**
* Runs [block] with a new, active [Recomposer] applying changes in the calling [CoroutineContext].
*/
suspend fun <R> withRunningRecomposer(
block: suspend CoroutineScope.(recomposer: Recomposer) -> R
): R = coroutineScope {
val recomposerJob = Job(coroutineContext[Job])
val recomposer = Recomposer(coroutineContext + recomposerJob)
// Will be cancelled when recomposerJob cancels
launch { recomposer.runRecomposeAndApplyChanges() }
try {
block(recomposer)
} finally {
recomposerJob.cancel()
}
}
/**
* The scheduler for performing recomposition and applying updates to one or more [Composition]s.
*/
// RedundantVisibilityModifier suppressed because metalava picks up internal function overrides
// if 'internal' is not explicitly specified - b/171342041
@Suppress("RedundantVisibilityModifier")
@OptIn(
ExperimentalComposeApi::class,
InternalComposeApi::class
)
class Recomposer(
effectCoroutineContext: CoroutineContext
) : CompositionReference() {
/**
* This collection is its own lock, shared with [invalidComposersAwaiter]
*/
private val invalidComposers = mutableSetOf<Composer<*>>()
/**
* The continuation to resume when there are invalid composers to process.
*/
private var invalidComposersAwaiter: Continuation<Unit>? = null
/**
* Track if any outstanding invalidated composers are awaiting recomposition.
* This latch is closed any time we resume invalidComposersAwaiter and opened
* by [recomposeAndApplyChanges] when it suspends when it has no further work to do.
*/
private val idlingLatch = Latch()
private val broadcastFrameClock = BroadcastFrameClock {
synchronized(invalidComposers) {
invalidComposersAwaiter?.let {
invalidComposersAwaiter = null
idlingLatch.closeLatch()
it.resume(Unit)
}
}
}
private val runningRecomposeJobOrException = AtomicReference<Any?>(null)
/**
* A [Job] used as a parent of any effects created by this [Recomposer]'s compositions.
*/
private val effectJob = Job(effectCoroutineContext[Job]).apply {
invokeOnCompletion { throwable ->
// Since the running recompose job is operating in a disjoint job if present,
// kick it out and make sure no new ones start.
val cancellation = throwable ?: CancellationException("Recomposer completed")
val old = runningRecomposeJobOrException.getAndSet(cancellation)
if (old is Job) {
old.cancel(CancellationException("Recomposer cancelled", cancellation))
}
}
}
/**
* The [effectCoroutineContext] is derived from [effectCoroutineContext]
*/
internal override val effectCoroutineContext: CoroutineContext =
effectCoroutineContext + broadcastFrameClock + effectJob
/**
* This collection is its own lock. When a composer performs initial composition it is added
* to this collection; when it is unregistered it is removed. Any composers in this set will
* receive state modification events from this [Recomposer] when snapshots change.
*/
private val recomposableComposers = mutableSetOf<Composer<*>>()
/**
* Record any changes in [snapshotChanges] with the currently registered
* [recomposableComposers], then drains [appliedChanges] of any further snapshot change sets
* and records them with the composers before returning.
*/
private fun recordModificationsForComposers(
snapshotChanges: Set<Any>,
appliedChanges: ReceiveChannel<Set<Any>>
) {
synchronized(recomposableComposers) {
recomposableComposers.forEach {
it.recordModificationsOf(snapshotChanges)
}
// Strange loop to keep nullability smart casts happy
while (true) {
appliedChanges.poll()?.also { additionalChanges ->
recomposableComposers.forEach {
it.recordModificationsOf(additionalChanges)
}
} ?: break
}
}
}
/**
* Await the invalidation of any associated [Composer]s, recompose them, and apply their
* changes to their associated [Composition]s if recomposition is successful.
*
* While [runRecomposeAndApplyChanges] is running, [awaitIdle] will suspend until there are no
* more invalid composers awaiting recomposition.
*
* This method never returns. Cancel the calling [CoroutineScope] to stop.
*/
@Suppress("DEPRECATION") // Remove once recomposeAndApplyChanges is removed from API
suspend fun runRecomposeAndApplyChanges(): Nothing {
recomposeAndApplyChanges(Long.MAX_VALUE)
error("this function never returns")
}
/**
* Await the invalidation of any associated [Composer]s, recompose them, and apply their
* changes to their associated [Composition]s if recomposition is successful.
*
* While [runRecomposeAndApplyChanges] is running, [awaitIdle] will suspend until there are no
* more invalid composers awaiting recomposition.
*
* This method returns after recomposing [frameCount] times, or throws [CancellationException]
* if the [Recomposer] is [shutDown] or if the [effectCoroutineContext] used to construct the
* [Recomposer] is cancelled.
*/
@Deprecated("Drive the calling MonotonicFrameClock to control recomposition by frame")
suspend fun recomposeAndApplyChanges(frameCount: Long) {
val parentFrameClock = coroutineContext[MonotonicFrameClock] ?: DefaultMonotonicFrameClock
withContext(broadcastFrameClock) {
// Enforce mutual exclusion of callers
val myJob = coroutineContext[Job]
while (true) {
when (val old = runningRecomposeJobOrException.get()) {
is Exception -> throw CancellationException("Recomposer cancelled", old)
is Job -> error("Recomposition is already running")
null -> if (runningRecomposeJobOrException.compareAndSet(null, myJob)) break
}
}
var framesRemaining = frameCount
val toRecompose = mutableListOf<Composer<*>>()
// Due to a breaking change in Coroutines 1.4.0, some update events might be dropped.
// see: https://github.com/Kotlin/kotlinx.coroutines/releases/tag/1.4.0-M1
// For Compose, we prefer them not to be lost hence we try to re-offer them back to
// the channel.
// see: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/
var addChangeBack: ((value: Set<Any>) -> Unit)? = null
val appliedChanges = Channel<Set<Any>>(Channel.UNLIMITED) { undeliveredChanges ->
addChangeBack?.invoke(undeliveredChanges)
}
addChangeBack = {
try {
appliedChanges.offer(it)
} catch (ignored: ClosedSendChannelException) {}
}
// unregisterApplyObserver is called as part of the big finally below
val unregisterApplyObserver = Snapshot.registerApplyObserver { changed, _ ->
appliedChanges.offer(changed)
}
try {
idlingLatch.closeLatch()
synchronized(recomposableComposers) {
// Invalidate everyone we know about since we weren't listening
// for snapshot changes while the recomposer wasn't running
recomposableComposers.forEach { it.invalidateAll() }
}
while (frameCount == Long.MAX_VALUE || framesRemaining-- > 0L) {
// Don't hold the monitor lock across suspension.
val hasInvalidComposers = synchronized(invalidComposers) {
invalidComposers.isNotEmpty()
}
if (!hasInvalidComposers && !broadcastFrameClock.hasAwaiters) {
// Listen for snapshot invalidations while we're suspending for reasons
// to perform a frame. Recording modifications with a composer might
// make us wake up. invalidator will cancelAndJoin below after wakeup
// ensuring we will not work with any one composer in parallel during
// the frame.
val invalidator = launch {
for (snapshotChanges in appliedChanges) {
recordModificationsForComposers(snapshotChanges, appliedChanges)
}
}
// Suspend until we have something to do. Work might come from:
// * Waking up for new invalidations as a result of the launched
// invalidator job above
// * Waking up for new invalidations as a result of other external
// composer invalidate calls, e.g. the manual invalidate composable
// * Waking up for new broadcast frame awaiters
suspendCancellableCoroutine<Unit> { co ->
synchronized(invalidComposers) {
if (invalidComposers.isEmpty()) {
invalidComposersAwaiter = co
idlingLatch.openLatch()
} else {
// We raced and lost, someone invalidated between our check
// and suspension. Resume immediately.
co.resume(Unit)
return@suspendCancellableCoroutine
}
}
co.invokeOnCancellation {
synchronized(invalidComposers) {
if (invalidComposersAwaiter === co) {
invalidComposersAwaiter = null
}
}
}
}
// Stop listening to frame invalidations for now, we'll catch the rest
// when we actually reach the next frame below. It's important that we not
// modify/operate a composer in parallel with itself, which might happen
// with recording changes vs. recomposing if we are on a parallel
// dispatcher.
invalidator.cancelAndJoin()
}
// Align work with the next frame to coalesce changes.
// Note: it is possible to resume from the above with no recompositions pending,
// instead someone might be awaiting our frame clock dispatch below.
// We use the cached frame clock from above not just so that we don't locate it
// each time, but because we've installed the broadcastFrameClock as the scope
// clock above for user code to locate.
parentFrameClock.withFrameNanos { frameTime ->
trace("recomposeFrame") {
// Propagate the frame time to anyone who is awaiting from the
// recomposer clock.
broadcastFrameClock.sendFrame(frameTime)
// Ensure any global changes are observed
Snapshot.sendApplyNotifications()
// Drain any composer invalidations from snapshot changes
appliedChanges.poll()?.let { snapshotChanges ->
recordModificationsForComposers(snapshotChanges, appliedChanges)
}
// ...and pick up any stragglers as a result of the above snapshot sync
synchronized(invalidComposers) {
toRecompose.addAll(invalidComposers)
invalidComposers.clear()
}
// Actually perform recomposition for any invalidated composers
if (toRecompose.isNotEmpty()) {
for (i in 0 until toRecompose.size) {
performRecompose(toRecompose[i])
}
toRecompose.clear()
}
}
}
}
} finally {
unregisterApplyObserver()
// Only replace the value if it currently matches; a new caller may have already
// set its own job as a replacement before we resume to cancel.
runningRecomposeJobOrException.compareAndSet(myJob, null)
// If we're not still running frames, we're effectively idle.
idlingLatch.openLatch()
}
}
}
/**
* Permanently shut down this [Recomposer] for future use. All ongoing recompositions will stop,
* new composer invalidations with this [Recomposer] at the root will no longer occur,
* and any [LaunchedEffect]s currently running in compositions managed by this [Recomposer]
* will be cancelled. Any [rememberCoroutineScope] scopes from compositions managed by this
* [Recomposer] will also be cancelled. See [join] to await the completion of all of these
* outstanding tasks.
*/
fun shutDown() {
effectJob.cancel()
}
/**
* Await the completion of a [shutDown] operation.
*/
suspend fun join() {
effectJob.join()
}
internal override fun composeInitial(
composer: Composer<*>,
composable: @Composable () -> Unit
) {
val composerWasComposing = composer.isComposing
composing(composer) {
composer.composeInitial(composable)
}
// TODO(b/143755743)
if (!composerWasComposing) {
Snapshot.notifyObjectsInitialized()
}
composer.applyChanges()
synchronized(recomposableComposers) {
recomposableComposers.add(composer)
}
if (!composerWasComposing) {
// Ensure that any state objects created during applyChanges are seen as changed
// if modified after this call.
Snapshot.notifyObjectsInitialized()
}
}
private fun performRecompose(composer: Composer<*>): Boolean {
if (composer.isComposing || composer.isDisposed) return false
return composing(composer) {
composer.recompose().also {
Snapshot.notifyObjectsInitialized()
composer.applyChanges()
}
}
}
private fun readObserverOf(composer: Composer<*>): SnapshotReadObserver {
return { value -> composer.recordReadOf(value) }
}
private fun writeObserverOf(composer: Composer<*>): SnapshotWriteObserver {
return { value -> composer.recordWriteOf(value) }
}
private inline fun <T> composing(composer: Composer<*>, block: () -> T): T {
val snapshot = takeMutableSnapshot(
readObserverOf(composer), writeObserverOf(composer)
)
try {
return snapshot.enter(block)
} finally {
applyAndCheck(snapshot)
}
}
private fun applyAndCheck(snapshot: MutableSnapshot) {
val applyResult = snapshot.apply()
if (applyResult is SnapshotApplyResult.Failure) {
error(
"Unsupported concurrent change during composition. A state object was " +
"modified by composition as well as being modified outside composition."
)
// TODO(chuckj): Consider lifting this restriction by forcing a recompose
}
}
/**
* Returns true if any pending invalidations have been scheduled.
*/
fun hasInvalidations(): Boolean =
!idlingLatch.isOpen || synchronized(invalidComposers) { invalidComposers.isNotEmpty() }
/**
* Suspends until the currently pending recomposition frame is complete.
* Any recomposition for this recomposer triggered by actions before this call begins
* will be complete and applied (if recomposition was successful) when this call returns.
*
* If [runRecomposeAndApplyChanges] is not currently running the [Recomposer] is considered idle
* and this method will not suspend.
*/
suspend fun awaitIdle(): Unit = idlingLatch.await()
// Recomposer always starts with a constant compound hash
internal override val compoundHashKey: Int
get() = RecomposerCompoundHashKey
// Collecting key sources happens at the level of a composer; starts as false
internal override val collectingKeySources: Boolean
get() = false
internal override fun recordInspectionTable(table: MutableSet<SlotTable>) {
// TODO: The root recomposer might be a better place to set up inspection
// than the current configuration with an ambient
}
internal override fun registerComposerWithRoot(composer: Composer<*>) {
// Do nothing.
}
internal override fun unregisterComposerWithRoot(composer: Composer<*>) {
synchronized(recomposableComposers) {
recomposableComposers.remove(composer)
}
synchronized(invalidComposers) {
invalidComposers.remove(composer)
}
}
internal override fun invalidate(composer: Composer<*>) {
synchronized(invalidComposers) {
invalidComposers.add(composer)
invalidComposersAwaiter?.let {
invalidComposersAwaiter = null
idlingLatch.closeLatch()
it.resume(Unit)
}
}
}
companion object {
@OptIn(ExperimentalCoroutinesApi::class)
private val mainRecomposer: Recomposer by lazy {
val embeddingContext = EmbeddingContext()
val mainScope = CoroutineScope(
NonCancellable + embeddingContext.mainThreadCompositionContext()
)
Recomposer(mainScope.coroutineContext).also {
// NOTE: Launching undispatched so that compositions created with the
// Recomposer.current() singleton instance can assume the recomposer is running
// when they perform initial composition. The relevant Recomposer code is
// appropriately thread-safe for this.
mainScope.launch(start = CoroutineStart.UNDISPATCHED) {
it.runRecomposeAndApplyChanges()
}
}
}
/**
* Retrieves [Recomposer] for the current thread. Needs to be the main thread.
*/
@TestOnly
fun current(): Recomposer = mainRecomposer
}
}