RxRoom.java

/*
 * Copyright 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.room.rxjava3;

import androidx.annotation.NonNull;
import androidx.annotation.RestrictTo;
import androidx.room.InvalidationTracker;
import androidx.room.RoomDatabase;

import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.MaybeSource;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.schedulers.Schedulers;

/**
 * Helper class to add RxJava3 support to Room.
 */
public final class RxRoom {
    /**
     * Data dispatched by the publisher created by {@link #createFlowable(RoomDatabase, String...)}.
     */
    @NonNull
    public static final Object NOTHING = new Object();

    /**
     * Creates a {@link Flowable} that emits at least once and also re-emits whenever one of the
     * observed tables is updated.
     * <p>
     * You can easily chain a database operation to downstream of this {@link Flowable} to ensure
     * that it re-runs when database is modified.
     * <p>
     * Since database invalidation is batched, multiple changes in the database may results in just
     * 1 emission.
     *
     * @param database   The database instance
     * @param tableNames The list of table names that should be observed
     * @return A {@link Flowable} which emits {@link #NOTHING} when one of the observed tables
     * is modified (also once when the invalidation tracker connection is established).
     */
    @NonNull
    public static Flowable<Object> createFlowable(@NonNull final RoomDatabase database,
            @NonNull final String... tableNames) {
        return Flowable.create(emitter -> {
            final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
                    tableNames) {
                @Override
                public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
                    if (!emitter.isCancelled()) {
                        emitter.onNext(NOTHING);
                    }
                }
            };
            if (!emitter.isCancelled()) {
                database.getInvalidationTracker().addObserver(observer);
                emitter.setDisposable(Disposable.fromAction(
                        () -> database.getInvalidationTracker().removeObserver(observer)));
            }

            // emit once to avoid missing any data and also easy chaining
            if (!emitter.isCancelled()) {
                emitter.onNext(NOTHING);
            }
        }, BackpressureStrategy.LATEST);
    }

    /**
     * Helper method used by generated code to bind a Callable such that it will be run in
     * our disk io thread and will automatically block null values since RxJava3 does not like null.
     *
     * @hide
     */
    @NonNull
    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
    public static <T> Flowable<T> createFlowable(@NonNull final RoomDatabase database,
            final boolean inTransaction, @NonNull final String[] tableNames,
            @NonNull final Callable<T> callable) {
        Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
        final Maybe<T> maybe = Maybe.fromCallable(callable);
        return createFlowable(database, tableNames)
                .subscribeOn(scheduler)
                .unsubscribeOn(scheduler)
                .observeOn(scheduler)
                .flatMapMaybe((Function<Object, MaybeSource<T>>) o -> maybe);
    }

    /**
     * Creates a {@link Observable} that emits at least once and also re-emits whenever one of the
     * observed tables is updated.
     * <p>
     * You can easily chain a database operation to downstream of this {@link Observable} to ensure
     * that it re-runs when database is modified.
     * <p>
     * Since database invalidation is batched, multiple changes in the database may results in just
     * 1 emission.
     *
     * @param database   The database instance
     * @param tableNames The list of table names that should be observed
     * @return A {@link Observable} which emits {@link #NOTHING} when one of the observed tables
     * is modified (also once when the invalidation tracker connection is established).
     */
    @NonNull
    public static Observable<Object> createObservable(@NonNull final RoomDatabase database,
            @NonNull final String... tableNames) {
        return Observable.create(emitter -> {
            final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
                    tableNames) {
                @Override
                public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
                    emitter.onNext(NOTHING);
                }
            };
            database.getInvalidationTracker().addObserver(observer);
            emitter.setDisposable(Disposable.fromAction(
                    () -> database.getInvalidationTracker().removeObserver(observer)));

            // emit once to avoid missing any data and also easy chaining
            emitter.onNext(NOTHING);
        });
    }

    /**
     * Helper method used by generated code to bind a Callable such that it will be run in
     * our disk io thread and will automatically block null values since RxJava3 does not like null.
     *
     * @hide
     */
    @NonNull
    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
    public static <T> Observable<T> createObservable(@NonNull final RoomDatabase database,
            final boolean inTransaction, @NonNull final String[] tableNames,
            @NonNull final Callable<T> callable) {
        Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
        final Maybe<T> maybe = Maybe.fromCallable(callable);
        return createObservable(database, tableNames)
                .subscribeOn(scheduler)
                .unsubscribeOn(scheduler)
                .observeOn(scheduler)
                .flatMapMaybe(o -> maybe);
    }

    /**
     * Helper method used by generated code to create a Single from a Callable that will ignore
     * the EmptyResultSetException if the stream is already disposed.
     *
     * @hide
     */
    @NonNull
    @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
    public static <T> Single<T> createSingle(@NonNull final Callable<T> callable) {
        return Single.create(emitter -> {
            try {
                emitter.onSuccess(callable.call());
            } catch (EmptyResultSetException e) {
                emitter.tryOnError(e);
            }
        });
    }

    private static Executor getExecutor(@NonNull RoomDatabase database, boolean inTransaction) {
        if (inTransaction) {
            return database.getTransactionExecutor();
        } else {
            return database.getQueryExecutor();
        }
    }

    private RxRoom() {
    }
}