Multicaster.kt
/*
* Copyright 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.paging.multicast
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.transform
/**
* Like a publish, shares 1 upstream value with multiple downstream receiver.
* It has one store specific behavior where upstream flow is suspended until at least 1 downstream
* flow emits the value to ensure we don't abuse the upstream flow of downstream cannot keep up.
*/
internal class Multicaster<T>(
/**
* The [CoroutineScope] to use for upstream subscription
*/
private val scope: CoroutineScope,
/**
* The buffer size that is used only if the upstream has not complete yet.
* Defaults to 0.
*/
bufferSize: Int = 0,
/**
* Source function to create a new flow when necessary.
*/
private val source: Flow<T>,
/**
* If true, downstream is never closed by the multicaster unless upstream throws an error.
* Instead, it is kept open and if a new downstream shows up that causes us to restart the flow,
* it will receive values as well.
*/
private val piggybackingDownstream: Boolean = false,
/**
* Called when upstream dispatches a value.
*/
private val onEach: suspend (T) -> Unit,
private val keepUpstreamAlive: Boolean = false
) {
private val channelManager by lazy(LazyThreadSafetyMode.SYNCHRONIZED) {
ChannelManager(
scope = scope,
bufferSize = bufferSize,
upstream = source,
piggybackingDownstream = piggybackingDownstream,
onEach = onEach,
keepUpstreamAlive = keepUpstreamAlive
)
}
val flow: Flow<T> = flow {
val channel = Channel<ChannelManager.Message.Dispatch.Value<T>>(Channel.UNLIMITED)
val subFlow = channel.consumeAsFlow()
.onStart {
channelManager.addDownstream(channel)
}
.transform {
emit(it.value)
it.delivered.complete(Unit)
}.onCompletion {
channelManager.removeDownstream(channel)
}
emitAll(subFlow)
}
suspend fun close() {
channelManager.close()
}
}