RxDataStore.kt
/*
* Copyright 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:JvmName("RxDataStore")
package androidx.datastore.rxjava3
import androidx.datastore.core.DataStore
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.functions.Function
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.rx3.asFlowable
import kotlinx.coroutines.rx3.asSingle
import kotlinx.coroutines.rx3.await
/**
* Gets a reactivex.Flowable of the data from DataStore. See [DataStore.data] for more information.
*
* Provides efficient, cached (when possible) access to the latest durably persisted state.
* The flow will always either emit a value or throw an exception encountered when attempting
* to read from disk. If an exception is encountered, collecting again will attempt to read the
* data again.
*
* Do not layer a cache on top of this API: it will be be impossible to guarantee consistency.
* Instead, use data.first() to access a single snapshot.
*
* The Flowable will complete with an IOException when an exception is encountered when reading
* data.
*
* @return a flow representing the current state of the data
*/
@ExperimentalCoroutinesApi
public fun <T : Any> DataStore<T>.data(): Flowable<T> {
return this.data.asFlowable()
}
/**
* See [DataStore.updateData]
*
* Updates the data transactionally in an atomic read-modify-write operation. All operations
* are serialized, and the transform itself is a async so it can perform heavy work
* such as RPCs.
*
* The Single completes when the data has been persisted durably to disk (after which
* [data] will reflect the update). If the transform or write to disk fails, the
* transaction is aborted and the returned Single is completed with the error.
*
* The transform will be run on the scheduler that DataStore was constructed with.
*
* @return the snapshot returned by the transform
* @throws Exception when thrown by the transform function
*/
@ExperimentalCoroutinesApi
public fun <T : Any> DataStore<T>.updateDataAsync(transform: Function<T, Single<T>>): Single<T> {
return CoroutineScope(Dispatchers.Unconfined).async {
this@updateDataAsync.updateData {
transform.apply(it).await()
}
}.asSingle(Dispatchers.Unconfined)
}