/*
* Copyright 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.paging
import androidx.paging.RemoteMediator.MediatorResult
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
/**
 * Gatekeeper for a [RemoteMediator]. [PageFetcher] and [PageFetcherSnapshot] are expected to
 * reach the mediator only through this class, which keeps track of the remote load job
 * registered for each [LoadType].
 */
@OptIn(ExperimentalPagingApi::class)
internal class RemoteMediatorAccessor<Key : Any, Value : Any>(
    private val remoteMediator: RemoteMediator<Key, Value>
) {
    /** Guards every read and write of [jobsByLoadType]. */
    private val jobsByLoadTypeLock = Mutex()

    /** Most recently registered load job for each [LoadType]; entries may already be finished. */
    private val jobsByLoadType = HashMap<LoadType, Deferred<MediatorResult>>()

    /** Forwards directly to [RemoteMediator.initialize]. */
    suspend fun initialize(): RemoteMediator.InitializeAction = remoteMediator.initialize()

    /**
     * Returns the [MediatorResult] of a remote load for [loadType].
     *
     * When a job for [loadType] is still active, no new request is started and the result of
     * that job is awaited instead. Otherwise a new job is started in [scope] via `async` and
     * registered as the current job for [loadType].
     *
     * @param scope scope in which a newly created load job is launched.
     * @param loadType kind of load being requested.
     * @param state paging state handed to [RemoteMediator.load].
     * @return the result produced by the (new or already running) job for [loadType].
     */
    internal suspend fun load(
        scope: CoroutineScope,
        loadType: LoadType,
        state: PagingState<Key, Value>
    ): MediatorResult {
        val request = jobsByLoadTypeLock.withLock {
            val running = jobsByLoadType[loadType]?.takeIf { it.isActive }
            running ?: run {
                // Snapshot (as an independent list) of every job registered before this one.
                val priorJobs = jobsByLoadType.values.toList()
                val priorBoundaryJobs = listOfNotNull(
                    jobsByLoadType[LoadType.PREPEND],
                    jobsByLoadType[LoadType.APPEND]
                )
                // RemoteMediator.load itself runs asynchronously, so jobsByLoadTypeLock is
                // released without waiting for the load to finish.
                scope.async {
                    doLoad(loadType, state, priorJobs, priorBoundaryJobs)
                }.also { launched -> jobsByLoadType[loadType] = launched }
            }
        }
        // Await outside of the lock.
        return request.await()
    }

    /**
     * Waits for previously registered jobs and then calls [RemoteMediator.load].
     *
     * @param loadType kind of load being performed.
     * @param state paging state handed to [RemoteMediator.load].
     * @param existingJobs jobs registered before this one; all of them are joined first.
     * @param existingBoundaryJobs previously registered PREPEND / APPEND jobs; these are
     * cancelled and joined first when [loadType] is [LoadType.REFRESH].
     */
    private suspend fun doLoad(
        loadType: LoadType,
        state: PagingState<Key, Value>,
        existingJobs: List<Job>,
        existingBoundaryJobs: List<Job>
    ): MediatorResult {
        if (loadType == LoadType.REFRESH) {
            // RemoteMediator commonly writes to the local DB, so cancelling alone is not
            // enough before a REFRESH: cancellation is cooperative, and an older boundary
            // load (e.g. PREPEND) might still perform its write after being cancelled and
            // land in the DB after the newer REFRESH. Cancel first, then wait until the
            // boundary jobs have really finished.
            for (boundaryJob in existingBoundaryJobs) {
                boundaryJob.cancel()
            }
            existingBoundaryJobs.joinAll()
        }

        // Allow only one RemoteMediator.load to be active at a time: wait for every job that
        // was registered in jobsByLoadType ahead of this one.
        existingJobs.joinAll()

        return remoteMediator.load(loadType, state)
    }
}