LiveDataObservable.java
/*
* Copyright 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.camera.core.impl;
import android.os.SystemClock;
import androidx.annotation.GuardedBy;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.RequiresApi;
import androidx.camera.core.impl.utils.executor.CameraXExecutors;
import androidx.concurrent.futures.CallbackToFutureAdapter;
import androidx.core.util.Preconditions;
import androidx.lifecycle.LiveData;
import androidx.lifecycle.MutableLiveData;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* An observable implemented using {@link LiveData}.
*
* <p>While this class can provide error reporting, it is prone to other issues. First, all updates
* will originate from the main thread before being sent to the observer's executor. Second, there
* exists the possibility of error and value elision. This means that some posted values and some
* posted errors may be ignored if a newer error/value is posted before the observers can be
* updated. If it is important for observers to receive all updates, then this class should not be
* used.
*
* @param <T> The data type used for
* {@link Observable.Observer#onNewData(Object)}.
*/
@RequiresApi(21) // TODO(b/200306659): Remove and replace with annotation on package-info.java
public final class LiveDataObservable<T> implements Observable<T> {
@SuppressWarnings("WeakerAccess") /* synthetic accessor */
final MutableLiveData<Result<T>> mLiveData = new MutableLiveData<>();
@GuardedBy("mObservers")
private final Map<Observer<? super T>, LiveDataObserverAdapter<T>> mObservers = new HashMap<>();
/**
* Posts a new value to be used as the current value of this Observable.
*/
public void postValue(@Nullable T value) {
mLiveData.postValue(Result.fromValue(value));
}
/**
* Posts a new error to be used as the current error state of this Observable.
*/
public void postError(@NonNull Throwable error) {
mLiveData.postValue(Result.fromError(error));
}
/**
* Returns the underlying {@link LiveData} used to store and update {@link Result Results}.
*/
@NonNull
public LiveData<Result<T>> getLiveData() {
return mLiveData;
}
@NonNull
@Override
@SuppressWarnings("ObjectToString")
public ListenableFuture<T> fetchData() {
return CallbackToFutureAdapter.getFuture(completer -> {
CameraXExecutors.mainThreadExecutor().execute(() -> {
Result<T> result = mLiveData.getValue();
if (result == null) {
completer.setException(new IllegalStateException(
"Observable has not yet been initialized with a value."));
} else if (result.completedSuccessfully()) {
completer.set(result.getValue());
} else {
Preconditions.checkNotNull(result.getError());
completer.setException(result.getError());
}
});
return LiveDataObservable.this + " [fetch@" + SystemClock.uptimeMillis() + "]";
});
}
@Override
public void addObserver(@NonNull Executor executor, @NonNull Observer<? super T> observer) {
synchronized (mObservers) {
final LiveDataObserverAdapter<T> oldAdapter = mObservers.get(observer);
if (oldAdapter != null) {
oldAdapter.disable();
}
final LiveDataObserverAdapter<T> newAdapter = new LiveDataObserverAdapter<>(executor,
observer);
mObservers.put(observer, newAdapter);
CameraXExecutors.mainThreadExecutor().execute(() -> {
if (oldAdapter != null) {
mLiveData.removeObserver(oldAdapter);
}
mLiveData.observeForever(newAdapter);
});
}
}
@Override
public void removeObserver(@NonNull Observer<? super T> observer) {
synchronized (mObservers) {
LiveDataObserverAdapter<T> adapter = mObservers.remove(observer);
if (adapter != null) {
adapter.disable();
CameraXExecutors.mainThreadExecutor().execute(
() -> mLiveData.removeObserver(adapter));
}
}
}
/**
* A wrapper class that allows error reporting.
*
* A Result can contain either a value or an error, but not both.
*
* @param <T> The data type used for
* {@link Observable.Observer#onNewData(Object)}.
*/
public static final class Result<T> {
@Nullable
private final T mValue;
@Nullable
private final Throwable mError;
private Result(@Nullable T value, @Nullable Throwable error) {
mValue = value;
mError = error;
}
/**
* Creates a successful result that contains a value.
*/
static <T> Result<T> fromValue(@Nullable T value) {
return new Result<>(value, null);
}
/**
* Creates a failed result that contains an error.
*/
static <T> Result<T> fromError(@NonNull Throwable error) {
return new Result<>(null, Preconditions.checkNotNull(error));
}
/**
* Returns whether this result contains a value or an error.
*
* <p>A successful result will contain a value.
*/
public boolean completedSuccessfully() {
return mError == null;
}
/**
* Returns the value contained within this result.
*
* @throws IllegalStateException if the result contains an error rather than a value.
*/
@Nullable
public T getValue() {
if (!completedSuccessfully()) {
throw new IllegalStateException(
"Result contains an error. Does not contain a value.");
}
return mValue;
}
/**
* Returns the error contained within this result, or {@code null} if the result contains
* a value.
*/
@Nullable
public Throwable getError() {
return mError;
}
@Override
@NonNull
public String toString() {
return "[Result: <" + (completedSuccessfully() ? "Value: " + mValue :
"Error: " + mError) + ">]";
}
}
private static final class LiveDataObserverAdapter<T> implements
androidx.lifecycle.Observer<Result<T>> {
final AtomicBoolean mActive = new AtomicBoolean(true);
final Observer<? super T> mObserver;
final Executor mExecutor;
LiveDataObserverAdapter(@NonNull Executor executor, @NonNull Observer<? super T> observer) {
mExecutor = executor;
mObserver = observer;
}
void disable() {
mActive.set(false);
}
@Override
public void onChanged(@NonNull final Result<T> result) {
mExecutor.execute(() -> {
if (!mActive.get()) {
// Observer has been disabled.
return;
}
if (result.completedSuccessfully()) {
mObserver.onNewData(result.getValue());
} else {
Preconditions.checkNotNull(result.getError());
mObserver.onError(result.getError());
}
});
}
}
}