
 * Copyright 2019 The Android Open Source Project
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.

package androidx.paging

import androidx.annotation.VisibleForTesting
import androidx.paging.LoadState.Error
import androidx.paging.LoadState.Loading
import androidx.paging.LoadState.NotLoading
import androidx.paging.LoadType.APPEND
import androidx.paging.LoadType.PREPEND
import androidx.paging.LoadType.REFRESH
import androidx.paging.PageEvent.LoadStateUpdate
import androidx.paging.PagingSource.LoadParams
import androidx.paging.PagingSource.LoadResult
import androidx.paging.PagingSource.LoadResult.Page
import androidx.paging.PagingSource.LoadResult.Page.Companion.COUNT_UNDEFINED
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.runningReduce
import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicBoolean

 * Holds a generation of pageable data, a snapshot of data loaded by [PagingSource]. An instance
 * of [PageFetcherSnapshot] and its corresponding [PageFetcherSnapshotState] should be launched
 * within a scope that is cancelled when [PagingSource.invalidate] is called.
internal class PageFetcherSnapshot<Key : Any, Value : Any>(
    internal val initialKey: Key?,
    internal val pagingSource: PagingSource<Key, Value>,
    private val config: PagingConfig,
    private val retryFlow: Flow<Unit>,
    private val triggerRemoteRefresh: Boolean = false,
    val remoteMediatorConnection: RemoteMediatorConnection<Key, Value>? = null,
    private val invalidate: () -> Unit = {}
) {
    init {
        require(config.jumpThreshold == COUNT_UNDEFINED || pagingSource.jumpingSupported) {
            "PagingConfig.jumpThreshold was set, but the associated PagingSource has not marked " +
                "support for jumps by overriding PagingSource.jumpingSupported to true."

    private val hintSharedFlow = MutableSharedFlow<ViewportHint>(replay = 1)
    private var lastHint: ViewportHint.Access? = null

    private val pageEventChCollected = AtomicBoolean(false)
    private val pageEventCh = Channel<PageEvent<Value>>(BUFFERED)
    private val stateHolder = PageFetcherSnapshotState.Holder<Key, Value>(config = config)

    private val pageEventChannelFlowJob = Job()

    val pageEventFlow: Flow<PageEvent<Value>> = cancelableChannelFlow(pageEventChannelFlowJob) {
        check(pageEventChCollected.compareAndSet(false, true)) {
            "Attempt to collect twice from pageEventFlow, which is an illegal operation. Did you " +
                "forget to call Flow<PagingData<*>>.cachedIn(coroutineScope)?"

        // Start collection on pageEventCh, which the rest of this class uses to send PageEvents
        // to this flow.
        launch {
            pageEventCh.consumeAsFlow().collect {
                // Protect against races where a subsequent call to submitData invoked close(),
                // but a pageEvent arrives after closing causing ClosedSendChannelException.
                try {
                } catch (e: ClosedSendChannelException) {
                    // Safe to drop PageEvent here, since collection has been cancelled.

        // Wrap collection behind a RendezvousChannel to prevent the RetryChannel from buffering
        // retry signals.
        val retryChannel = Channel<Unit>(Channel.RENDEZVOUS)
        launch { retryFlow.collect { retryChannel.offer(it) } }

        // Start collection on retry signals.
        launch {
                .collect {
                    val (sourceLoadStates, remotePagingState) = stateHolder.withLock { state ->
                        state.sourceLoadStates to state.currentPagingState(lastHint)
                    // tell remote mediator to retry and it will trigger necessary work / change
                    // its state as necessary.
                    // change source (local) states
                    sourceLoadStates.forEach { loadType, loadState ->
                        if (loadState !is Error) return@forEach

                        // Reset error state before sending hint.
                        if (loadType != REFRESH) {
                            stateHolder.withLock { state -> state.setLoading(loadType) }

                            loadType = loadType,
                            viewportHint = when (loadType) {
                                // ViewportHint is only used when retrying source PREPEND / APPEND.
                                REFRESH -> null
                                else -> stateHolder.withLock { state ->

                        // If retrying REFRESH from PagingSource succeeds, start collection on
                        // ViewportHints for PREPEND / APPEND loads.
                        if (loadType == REFRESH) {
                            val newRefreshState = stateHolder.withLock { state ->

                            if (newRefreshState !is Error) {

        if (triggerRemoteRefresh) {
            remoteMediatorConnection?.let {
                val pagingState = stateHolder.withLock { state -> state.currentPagingState(null) }
                it.requestLoad(REFRESH, pagingState)

        // Setup finished, start the initial load even if RemoteMediator throws an error.

        // Only start collection on ViewportHints if the initial load succeeded.
        if (stateHolder.withLock { state -> state.sourceLoadStates.get(REFRESH) } !is Error) {

    private suspend fun retryLoadError(
        loadType: LoadType,
        viewportHint: ViewportHint?
    ) {
        when (loadType) {
            REFRESH -> {
            else -> {
                check(viewportHint != null) {
                    "Cannot retry APPEND / PREPEND load on PagingSource without ViewportHint"


    fun accessHint(viewportHint: ViewportHint) {
        if (viewportHint is ViewportHint.Access) {
            lastHint = viewportHint


    fun close() {

    suspend fun refreshKeyInfo(): PagingState<Key, Value>? {
        return stateHolder.withLock { state ->
            lastHint?.let { lastHint ->
                if (state.pages.isEmpty()) {
                    // Default to initialKey if no pages loaded.
                } else {

    private fun CoroutineScope.startConsumingHints() {
        // Pseudo-tiling via invalidation on jumps.
        if (config.jumpThreshold != COUNT_UNDEFINED) {
            launch {
                    .filter { hint ->
                        hint.presentedItemsBefore * -1 > config.jumpThreshold ||
                            hint.presentedItemsAfter * -1 > config.jumpThreshold
                    .collectLatest { invalidate() }

        launch {
            stateHolder.withLock { state -> state.consumePrependGenerationIdAsFlow() }

        launch {
            stateHolder.withLock { state -> state.consumeAppendGenerationIdAsFlow() }

     * Maps a [Flow] of generation ids from [PageFetcherSnapshotState] to [ViewportHint]s from
     * [hintSharedFlow] with back-pressure handling via conflation by prioritizing hints which
     * either update presenter state or those that would load the most items.
     * @param loadType [PREPEND] or [APPEND]
    private suspend fun Flow<Int>.collectAsGenerationalViewportHints(
        loadType: LoadType
    ) = flatMapLatest { generationId ->
        // Reset state to Idle and setup a new flow for consuming incoming load hints.
        // Subsequent generationIds are normally triggered by cancellation.
        stateHolder.withLock { state ->
            // Skip this generationId of loads if there is no more to load in this
            // direction. In the case of the terminal page getting dropped, a new
            // generationId will be sent after load state is updated to Idle.
            if (state.sourceLoadStates.get(loadType) == NotLoading.Complete) {
                return@flatMapLatest flowOf()
            } else if (state.sourceLoadStates.get(loadType) !is Error) {
                state.setSourceLoadState(loadType, NotLoading.Incomplete)

            // Prevent infinite loop when competing PREPEND / APPEND cancel each other
            .drop(if (generationId == 0) 0 else 1)
            .map { hint -> GenerationalViewportHint(generationId, hint) }
        // Prioritize new hints that would load the maximum number of items.
        .runningReduce { previous, next ->
            if (next.shouldPrioritizeOver(previous, loadType)) next else previous
        .collect { generationalHint ->
            doLoad(loadType, generationalHint)

    private fun loadParams(loadType: LoadType, key: Key?) = LoadParams.create(
        loadType = loadType,
        key = key,
        loadSize = if (loadType == REFRESH) config.initialLoadSize else config.pageSize,
        placeholdersEnabled = config.enablePlaceholders,

    private suspend fun doInitialLoad() {
        stateHolder.withLock { state -> state.setLoading(REFRESH) }

        val params = loadParams(REFRESH, initialKey)
        when (val result = pagingSource.load(params)) {
            is Page<Key, Value> -> {
                val insertApplied = stateHolder.withLock { state ->
                    state.insert(0, REFRESH, result)

                // Update loadStates which are sent along with this load's Insert PageEvent.
                stateHolder.withLock { state ->
                    state.setSourceLoadState(REFRESH, NotLoading.Incomplete)
                    if (result.prevKey == null) {
                            type = PREPEND,
                            newState = NotLoading.Complete
                    if (result.nextKey == null) {
                            type = APPEND,
                            newState = NotLoading.Complete

                // Send insert event after load state updates, so that endOfPaginationReached is
                // correctly reflected in the insert event. Note that we only send the event if the
                // insert was successfully applied in the case of cancellation due to page dropping.
                if (insertApplied) {
                    stateHolder.withLock { state ->
                        with(state) {

                // Launch any RemoteMediator boundary calls after applying initial insert.
                if (remoteMediatorConnection != null) {
                    if (result.prevKey == null || result.nextKey == null) {
                        val pagingState = stateHolder.withLock { state ->

                        if (result.prevKey == null) {
                            remoteMediatorConnection.requestLoad(PREPEND, pagingState)

                        if (result.nextKey == null) {
                            remoteMediatorConnection.requestLoad(APPEND, pagingState)
            is LoadResult.Error -> stateHolder.withLock { state ->
                val loadState = Error(result.throwable)
                if (state.setSourceLoadState(REFRESH, loadState)) {
                    pageEventCh.send(LoadStateUpdate(REFRESH, false, loadState))

    // TODO: Consider making this a transform operation which emits PageEvents
    private suspend fun doLoad(
        loadType: LoadType,
        generationalHint: GenerationalViewportHint
    ) {
        require(loadType != REFRESH) { "Use doInitialLoad for LoadType == REFRESH" }

        // If placeholder counts differ between the hint and PageFetcherSnapshotState, then
        // assume fetcher is ahead of presenter and account for the difference in itemsLoaded.
        var itemsLoaded = 0
        stateHolder.withLock { state ->
            when (loadType) {
                PREPEND -> {
                    val firstPageIndex =
                        state.initialPageIndex + generationalHint.hint.originalPageOffsetFirst - 1
                    for (pageIndex in 0..firstPageIndex) {
                        itemsLoaded += state.pages[pageIndex].data.size
                APPEND -> {
                    val lastPageIndex =
                        state.initialPageIndex + generationalHint.hint.originalPageOffsetLast + 1
                    for (pageIndex in lastPageIndex..state.pages.lastIndex) {
                        itemsLoaded += state.pages[pageIndex].data.size
                REFRESH -> throw IllegalStateException("Use doInitialLoad for LoadType == REFRESH")

        var loadKey: Key? = stateHolder.withLock { state ->
                generationalHint.presentedItemsBeyondAnchor(loadType) + itemsLoaded,
            )?.also { state.setLoading(loadType) }

        // Keep track of whether endOfPaginationReached so we can update LoadState accordingly when
        // this load loop terminates due to fulfilling prefetchDistance.
        var endOfPaginationReached = false
        loop@ while (loadKey != null) {
            val params = loadParams(loadType, loadKey)
            val result: LoadResult<Key, Value> = pagingSource.load(params)
            when (result) {
                is Page<Key, Value> -> {
                    // First, check for common error case where the same key is re-used to load
                    // new pages, often resulting in infinite loops.
                    val nextKey = when (loadType) {
                        PREPEND -> result.prevKey
                        APPEND -> result.nextKey
                        else -> throw IllegalArgumentException(
                            "Use doInitialLoad for LoadType == REFRESH"

                    check(pagingSource.keyReuseSupported || nextKey != loadKey) {
                        val keyFieldName = if (loadType == PREPEND) "prevKey" else "nextKey"
                        """The same value, $loadKey, was passed as the $keyFieldName in two
                            | sequential Pages loaded from a PagingSource. Re-using load keys in
                            | PagingSource is often an error, and must be explicitly enabled by
                            | overriding PagingSource.keyReuseSupported.

                    val insertApplied = stateHolder.withLock { state ->
                        state.insert(generationalHint.generationId, loadType, result)

                    // Break if insert was skipped due to cancellation
                    if (!insertApplied) break@loop

                    itemsLoaded +=

                    // Set endOfPaginationReached to false if no more data to load in current
                    // direction.
                    if ((loadType == PREPEND && result.prevKey == null) ||
                        (loadType == APPEND && result.nextKey == null)
                    ) {
                        endOfPaginationReached = true
                is LoadResult.Error -> {
                    stateHolder.withLock { state ->
                        val loadState = Error(result.throwable)
                        if (state.setSourceLoadState(loadType, loadState)) {
                            pageEventCh.send(LoadStateUpdate(loadType, false, loadState))

                        // Save the hint for retry on incoming retry signal, typically sent from
                        // user interaction.
                        state.failedHintsByLoadType[loadType] = generationalHint.hint

            val dropType = when (loadType) {
                PREPEND -> APPEND
                else -> PREPEND

            stateHolder.withLock { state ->
                state.dropEventOrNull(dropType, generationalHint.hint)?.let { event ->

                loadKey = state.nextLoadKeyOrNull(
                    generationalHint.presentedItemsBeyondAnchor(loadType) + itemsLoaded,

                // Update load state to success if this is the final load result for this
                // load hint, and only if we didn't error out.
                if (loadKey == null && state.sourceLoadStates.get(loadType) !is Error) {
                        type = loadType,
                        newState = when {
                            endOfPaginationReached -> NotLoading.Complete
                            else -> NotLoading.Incomplete

                // Send page event for successful insert, now that PagerState has been updated.
                val pageEvent = with(state) {


            val endsPrepend = params is LoadParams.Prepend && result.prevKey == null
            val endsAppend = params is LoadParams.Append && result.nextKey == null
            if (remoteMediatorConnection != null && (endsPrepend || endsAppend)) {
                val pagingState = stateHolder.withLock { state ->

                if (endsPrepend) {
                    remoteMediatorConnection.requestLoad(PREPEND, pagingState)

                if (endsAppend) {
                    remoteMediatorConnection.requestLoad(APPEND, pagingState)

    private suspend fun PageFetcherSnapshotState<Key, Value>.setLoading(loadType: LoadType) {
        if (setSourceLoadState(loadType, Loading)) {
                LoadStateUpdate(loadType, fromMediator = false, Loading)

     * The next load key for a [loadType] or `null` if we should stop loading in that direction.
    private fun PageFetcherSnapshotState<Key, Value>.nextLoadKeyOrNull(
        loadType: LoadType,
        generationId: Int,
        presentedItemsBeyondAnchor: Int
    ): Key? {
        if (generationId != generationId(loadType)) return null
        // Skip load if in error state, unless retrying.
        if (sourceLoadStates.get(loadType) is Error) return null

        // Skip loading if prefetchDistance has been fulfilled.
        if (presentedItemsBeyondAnchor >= config.prefetchDistance) return null

        return if (loadType == PREPEND) {
        } else {

 * Generation of cancel token not [PageFetcherSnapshot]. [generationId] is used to differentiate
 * between loads from jobs that have been cancelled, but continued to run to completion.
internal data class GenerationalViewportHint(val generationId: Int, val hint: ViewportHint) {
     * @return Count of presented items between [hint], and either:
     *  * the beginning of the list if [loadType] == PREPEND
     *  * the end of the list if loadType == APPEND
    internal fun presentedItemsBeyondAnchor(loadType: LoadType): Int = when (loadType) {
        REFRESH -> throw IllegalArgumentException(
            "Cannot get presentedItems for loadType: REFRESH"
        PREPEND -> hint.presentedItemsBefore
        APPEND -> hint.presentedItemsAfter

 * Helper for [GenerationalViewportHint] prioritization in cases where item accesses are being sent
 * to PageFetcherSnapshot] faster than they can be processed. A [GenerationalViewportHint] is
 * prioritized if it represents an update to presenter state or if it would cause
 * [PageFetcherSnapshot] to load more items.
 * @param previous [GenerationalViewportHint] that would normally be processed next if [this]
 * [GenerationalViewportHint] was not sent.
 * @return `true` if [this] [GenerationalViewportHint] should be prioritized over [previous].
internal fun GenerationalViewportHint.shouldPrioritizeOver(
    previous: GenerationalViewportHint,
    loadType: LoadType
): Boolean {
    return when {
        // Prioritize hints from new generations, which increments after dropping.
        generationId > previous.generationId -> true
        // Prioritize Access hints over Initialize hints
        previous.hint is ViewportHint.Initial && hint is ViewportHint.Access -> true
        hint is ViewportHint.Initial && previous.hint is ViewportHint.Access -> false
        // Prioritize hints from most recent presenter state
        hint.originalPageOffsetFirst != previous.hint.originalPageOffsetFirst -> true
        hint.originalPageOffsetLast != previous.hint.originalPageOffsetLast -> true
        // Prioritize hints that would load the most items in PREPEND direction.
        loadType == PREPEND && previous.hint.presentedItemsBefore < hint.presentedItemsBefore -> {
        // Prioritize hints that would load the most items in APPEND direction.
        loadType == APPEND && previous.hint.presentedItemsAfter < hint.presentedItemsAfter -> {
        else -> true