StateObservable.java

/*
 * Copyright 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.camera.core.impl;

import androidx.annotation.GuardedBy;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.camera.core.impl.utils.futures.Futures;
import androidx.core.util.Preconditions;

import com.google.auto.value.AutoValue;
import com.google.common.util.concurrent.ListenableFuture;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * An observable which reports a dynamic state.
 *
 * <p>The state of a state observable is conflated. That is, the value received by an
 * {@link androidx.camera.core.impl.Observable.Observer} will only be the latest state; some
 * state updates may never be observed if the state changes quickly enough.
 *
 * <p>State observables require an initial state, and thus always have a state available for
 * retrieval via {@link #fetchData()}, which will return an already-complete
 * {@link ListenableFuture}.
 *
 * <p>Errors are also possible as states, and when an error is present, any previous state
 * information is lost. State observables may transition in and out of error states at any time,
 * including the initial state.
 *
 * <p>All states, including errors, are conflated via {@link Object#equals(Object)}. That is, if
 * two states evaluate to {@code true}, it will be as if the state didn't change and no update
 * will be sent to observers.
 *
 * @param <T> The state type.
 */
public abstract class StateObservable<T> implements Observable<T> {
    private static final int INITIAL_VERSION = 0;

    private final Object mLock = new Object();
    private final AtomicReference<Object> mState;
    @GuardedBy("mLock")
    private int mVersion = INITIAL_VERSION;
    @GuardedBy("mLock")
    private boolean mUpdating = false;

    // Must be updated together under lock
    @GuardedBy("mLock")
    private final Map<Observer<? super T>, ObserverWrapper<T>> mWrapperMap = new HashMap<>();
    @GuardedBy("mLock")
    private final CopyOnWriteArraySet<ObserverWrapper<T>> mNotifySet = new CopyOnWriteArraySet<>();

    StateObservable(@Nullable Object initialState, boolean isError) {
        if (isError) {
            Preconditions.checkArgument(initialState instanceof Throwable, "Initial errors must "
                    + "be Throwable");
            mState = new AtomicReference<>(ErrorWrapper.wrap((Throwable) initialState));
        } else {
            mState = new AtomicReference<>(initialState);
        }

    }

    void updateState(@Nullable T state) {
        updateStateInternal(state);
    }

    void updateStateAsError(@NonNull Throwable error) {
        updateStateInternal(ErrorWrapper.wrap(error));
    }

    private void updateStateInternal(@Nullable Object newState) {
        Iterator<ObserverWrapper<T>> notifyIter;
        int currentVersion;
        synchronized (mLock) {
            Object oldState = mState.getAndSet(newState);
            // If new state is equal to old state, no need to do anything.
            if (Objects.equals(oldState, newState)) return;
            currentVersion = ++mVersion; // State was updated. Next version.
            if (mUpdating) return; // Already updating. New state will get used due to version bump.
            mUpdating = true;
            notifyIter = mNotifySet.iterator();
        }

        while (true) {
            // Update observers unlocked in case of direct executor.
            while (notifyIter.hasNext()) {
                notifyIter.next().update(currentVersion);
            }

            // Check if a new version was added while updating
            synchronized (mLock) {
                if (mVersion == currentVersion) {
                    // Updating complete. Break out.
                    mUpdating = false;
                    break;
                }

                // A new version was added. Update again on next loop.
                // Get a new iterator in case the observers changed during update.
                notifyIter = mNotifySet.iterator();
                currentVersion = mVersion;
            }
        }
    }

    /**
     * Fetch the latest state.
     *
     * <p>For state observables, the future returned by {@code fetchData()} is guaranteed to be
     * complete and will contain either the current state or an error state which will be thrown
     * as an exception from {@link ListenableFuture#get()}.
     *
     * @return A future which will contain the latest value or an error.
     */
    @SuppressWarnings("unchecked")
    @NonNull
    @Override
    public ListenableFuture<T> fetchData() {
        Object state = mState.get();
        if (state instanceof ErrorWrapper) {
            return Futures.immediateFailedFuture(((ErrorWrapper) state).getError());
        } else {
            return Futures.immediateFuture((T) state);
        }
    }

    @Override
    public void addObserver(@NonNull Executor executor, @NonNull Observer<? super T> observer) {
        ObserverWrapper<T> wrapper;
        synchronized (mLock) {
            // If observer is already registered, remove it. It will get notified again immediately.
            removeObserverLocked(observer);

            wrapper = new ObserverWrapper<>(mState, executor, observer);
            mWrapperMap.put(observer, wrapper);
            mNotifySet.add(wrapper);
        }

        // INITIAL_VERSION won't necessarily match the current tracked version constant, but it
        // will be the initial version this wrapper receives. Any future version updates will
        // always be higher than INITIAL_VERSION.
        wrapper.update(INITIAL_VERSION);
    }

    @Override
    public void removeObserver(@NonNull Observer<? super T> observer) {
        synchronized (mLock) {
            removeObserverLocked(observer);
        }
    }

    @GuardedBy("mLock")
    private void removeObserverLocked(@NonNull Observable.Observer<? super T> observer) {
        ObserverWrapper<T> wrapper = mWrapperMap.remove(observer);
        if (wrapper != null) {
            wrapper.close();
            mNotifySet.remove(wrapper);
        }
    }

    private static final class ObserverWrapper<T> implements Runnable {
        private static final Object NOT_SET = new Object();
        private static final int NO_VERSION = -1;

        private final Executor mExecutor;
        private final Observer<? super T> mObserver;
        private final AtomicBoolean mActive = new AtomicBoolean(true);
        private final AtomicReference<Object> mStateRef;

        // Since run() will always run sequentially, no need to lock for this variable.
        private Object mLastState = NOT_SET;
        @GuardedBy("this")
        private int mLatestSignalledVersion = NO_VERSION;
        @GuardedBy("this")
        private boolean mWrapperUpdating = false;

        ObserverWrapper(@NonNull AtomicReference<Object> stateRef, @NonNull Executor executor,
                @NonNull Observer<? super T> observer) {
            mStateRef = stateRef;
            mExecutor = executor;
            mObserver = observer;
        }

        @SuppressWarnings("unchecked")
        @Override
        public void run() {
            Object newState;
            int currentVersion;
            synchronized (this) {
                // Only update if we're still active.
                if (!mActive.get()) {
                    mWrapperUpdating = false;
                    return;
                }
                // Get latest state.
                newState = mStateRef.get();
                currentVersion = mLatestSignalledVersion;
            }

            // Continue to check if we're active before updating
            while (true) {
                // Conflate notification using equality
                if (!Objects.equals(mLastState, newState)) {
                    mLastState = newState;
                    if (newState instanceof ErrorWrapper) {
                        mObserver.onError(((ErrorWrapper) newState).getError());
                    } else {
                        mObserver.onNewData((T) newState);
                    }
                }

                synchronized (this) {
                    if (currentVersion == mLatestSignalledVersion || !mActive.get()) {
                        // Updating complete or no longer active. Break out of update loop.
                        mWrapperUpdating = false;
                        break;
                    }

                    // Get state and version for next update.
                    newState = mStateRef.get();
                    currentVersion = mLatestSignalledVersion;
                }
            }
        }

        void update(int version) {
            synchronized (this) {
                // If no longer active, then don't attempt update.
                if (!mActive.get()) return;
                // No need to update (but this probably shouldn't happen anyways)
                if (version <= mLatestSignalledVersion) return;
                mLatestSignalledVersion = version;
                // No need to update if already updating. Version bump will cause update.
                if (mWrapperUpdating) return;
                mWrapperUpdating = true;
            }

            try {
                mExecutor.execute(this);
            } catch (Throwable t) {
                // Unable to notify due to state of Executor. The update is lost, but there's
                // not much we can do here since the executor rejected the update. Note this
                // may also mean that any updates which occurred while mWrapperUpdating ==
                // true will have also been lost.
                synchronized (this) {
                    // Update mWrapperUpdating so the next update can try again
                    mWrapperUpdating = false;
                }
            }
        }

        void close() {
            // Best effort cancellation. In progress updates will not be cancelled.
            mActive.set(false);
        }
    }

    @AutoValue
    abstract static class ErrorWrapper {
        @NonNull
        static ErrorWrapper wrap(@NonNull Throwable error) {
            return new AutoValue_StateObservable_ErrorWrapper(error);
        }

        @NonNull
        public abstract Throwable getError();
    }
}