ObserverManager.java

/*
 * Copyright 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.appsearch.localstorage;

import android.util.Log;

import androidx.annotation.GuardedBy;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.RestrictTo;
import androidx.appsearch.localstorage.util.PrefixUtil;
import androidx.appsearch.localstorage.visibilitystore.CallerAccess;
import androidx.appsearch.localstorage.visibilitystore.VisibilityChecker;
import androidx.appsearch.localstorage.visibilitystore.VisibilityStore;
import androidx.appsearch.localstorage.visibilitystore.VisibilityUtil;
import androidx.appsearch.observer.DocumentChangeInfo;
import androidx.appsearch.observer.ObserverCallback;
import androidx.appsearch.observer.ObserverSpec;
import androidx.appsearch.observer.SchemaChangeInfo;
import androidx.collection.ArrayMap;
import androidx.collection.ArraySet;
import androidx.core.util.ObjectsCompat;
import androidx.core.util.Preconditions;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;

/**
 * Manages {@link ObserverCallback} instances and queues notifications to them for later
 * dispatch.
 *
 * <p>This class is thread-safe.
 *
 * @hide
 */
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public class ObserverManager {
    private static final String TAG = "AppSearchObserverManage";

    /** The combination of fields by which {@link DocumentChangeInfo} is grouped. */
    private static final class DocumentChangeGroupKey {
        final String mPackageName;
        final String mDatabaseName;
        final String mNamespace;
        final String mSchemaName;

        DocumentChangeGroupKey(
                @NonNull String packageName,
                @NonNull String databaseName,
                @NonNull String namespace,
                @NonNull String schemaName) {
            mPackageName = Preconditions.checkNotNull(packageName);
            mDatabaseName = Preconditions.checkNotNull(databaseName);
            mNamespace = Preconditions.checkNotNull(namespace);
            mSchemaName = Preconditions.checkNotNull(schemaName);
        }

        @Override
        public boolean equals(@Nullable Object o) {
            if (this == o) return true;
            if (!(o instanceof DocumentChangeGroupKey)) return false;
            DocumentChangeGroupKey that = (DocumentChangeGroupKey) o;
            return mPackageName.equals(that.mPackageName)
                    && mDatabaseName.equals(that.mDatabaseName)
                    && mNamespace.equals(that.mNamespace)
                    && mSchemaName.equals(that.mSchemaName);
        }

        @Override
        public int hashCode() {
            return ObjectsCompat.hash(mPackageName, mDatabaseName, mNamespace, mSchemaName);
        }
    }

    private static final class ObserverInfo {
        /** The package which registered the observer. */
        final CallerAccess mListeningPackageAccess;
        final ObserverSpec mObserverSpec;
        final Executor mExecutor;
        final ObserverCallback mObserverCallback;
        // Values is a set of document IDs
        volatile Map<DocumentChangeGroupKey, Set<String>> mDocumentChanges = new ArrayMap<>();
        // Keys are database prefixes, values are a set of schema names
        volatile Map<String, Set<String>> mSchemaChanges = new ArrayMap<>();

        ObserverInfo(
                @NonNull CallerAccess listeningPackageAccess,
                @NonNull ObserverSpec observerSpec,
                @NonNull Executor executor,
                @NonNull ObserverCallback observerCallback) {
            mListeningPackageAccess = Preconditions.checkNotNull(listeningPackageAccess);
            mObserverSpec = Preconditions.checkNotNull(observerSpec);
            mExecutor = Preconditions.checkNotNull(executor);
            mObserverCallback = Preconditions.checkNotNull(observerCallback);
        }
    }

    private final Object mLock = new Object();

    /** Maps target packages to ObserverInfos watching something in that package. */
    @GuardedBy("mLock")
    private final Map<String, List<ObserverInfo>> mObserversLocked = new ArrayMap<>();

    private volatile boolean mHasNotifications = false;

    /**
     * Adds an {@link ObserverCallback} to monitor changes within the databases owned by
     * {@code targetPackageName} if they match the given
     * {@link androidx.appsearch.observer.ObserverSpec}.
     *
     * <p>If the data owned by {@code targetPackageName} is not visible to you, the registration
     * call will succeed but no notifications will be dispatched. Notifications could start flowing
     * later if {@code targetPackageName} changes its schema visibility settings.
     *
     * <p>If no package matching {@code targetPackageName} exists on the system, the registration
     * call will succeed but no notifications will be dispatched. Notifications could start flowing
     * later if {@code targetPackageName} is installed and starts indexing data.
     *
     * <p>Note that this method does not take the standard read/write lock that guards I/O, so it
     * will not queue behind I/O. Therefore it is safe to call from any thread including UI or
     * binder threads.
     *
     * @param listeningPackageAccess Visibility information about the app that wants to receive
     *                               notifications.
     * @param targetPackageName      The package that owns the data the observerCallback wants to be
     *                               notified for.
     * @param spec                   Describes the kind of data changes the observerCallback should
     *                               trigger for.
     * @param executor               The executor on which to trigger the observerCallback callback
     *                               to deliver notifications.
     * @param observerCallback       The callback to trigger on notifications.
     */
    public void registerObserverCallback(
            @NonNull CallerAccess listeningPackageAccess,
            @NonNull String targetPackageName,
            @NonNull ObserverSpec spec,
            @NonNull Executor executor,
            @NonNull ObserverCallback observerCallback) {
        synchronized (mLock) {
            List<ObserverInfo> infos = mObserversLocked.get(targetPackageName);
            if (infos == null) {
                infos = new ArrayList<>();
                mObserversLocked.put(targetPackageName, infos);
            }
            infos.add(new ObserverInfo(listeningPackageAccess, spec, executor, observerCallback));
        }
    }

    /**
     * Removes all observers that match via {@link ObserverCallback#equals} to the given observer
     * from watching the targetPackageName.
     *
     * <p>Pending notifications queued for this observer, if any, are discarded.
     */
    public void unregisterObserverCallback(
            @NonNull String targetPackageName, @NonNull ObserverCallback observer) {
        synchronized (mLock) {
            List<ObserverInfo> infos = mObserversLocked.get(targetPackageName);
            if (infos == null) {
                return;
            }
            for (int i = 0; i < infos.size(); i++) {
                if (infos.get(i).mObserverCallback.equals(observer)) {
                    infos.remove(i);
                    i--;
                }
            }
        }
    }

    /**
     * Should be called when a change occurs to a document.
     *
     * <p>The notification will be queued in memory for later dispatch. You must call
     * {@link #dispatchAndClearPendingNotifications} to dispatch all such pending notifications.
     *
     * @param visibilityStore   Store for visibility information. If not provided, only access to
     *                          own data will be allowed.
     * @param visibilityChecker Checker for visibility access. If not provided, only access to own
     *                          data will be allowed.
     */
    public void onDocumentChange(
            @NonNull String packageName,
            @NonNull String databaseName,
            @NonNull String namespace,
            @NonNull String schemaType,
            @NonNull String documentId,
            @Nullable VisibilityStore visibilityStore,
            @Nullable VisibilityChecker visibilityChecker) {
        synchronized (mLock) {
            List<ObserverInfo> allObserverInfosForPackage = mObserversLocked.get(packageName);
            if (allObserverInfosForPackage == null || allObserverInfosForPackage.isEmpty()) {
                return; // No observers for this type
            }
            // Enqueue changes for later dispatch once the call returns
            String prefixedSchema =
                    PrefixUtil.createPrefix(packageName, databaseName) + schemaType;
            DocumentChangeGroupKey key = null;
            for (int i = 0; i < allObserverInfosForPackage.size(); i++) {
                ObserverInfo observerInfo = allObserverInfosForPackage.get(i);
                if (!matchesSpec(schemaType, observerInfo.mObserverSpec)) {
                    continue;  // Observer doesn't want this notification
                }
                if (!VisibilityUtil.isSchemaSearchableByCaller(
                        /*callerAccess=*/observerInfo.mListeningPackageAccess,
                        /*targetPackageName=*/packageName,
                        /*prefixedSchema=*/prefixedSchema,
                        visibilityStore,
                        visibilityChecker)) {
                    continue;  // Observer can't have this notification.
                }
                if (key == null) {
                    key = new DocumentChangeGroupKey(
                            packageName, databaseName, namespace, schemaType);
                }
                Set<String> changedDocumentIds = observerInfo.mDocumentChanges.get(key);
                if (changedDocumentIds == null) {
                    changedDocumentIds = new ArraySet<>();
                    observerInfo.mDocumentChanges.put(key, changedDocumentIds);
                }
                changedDocumentIds.add(documentId);
            }
            mHasNotifications = true;
        }
    }

    /**
     * Enqueues a change to a schema type for a single observer.
     *
     * <p>The notification will be queued in memory for later dispatch. You must call
     * {@link #dispatchAndClearPendingNotifications} to dispatch all such pending notifications.
     *
     * <p>Note that unlike {@link #onDocumentChange}, the changes reported here are not dropped
     * for observers that don't have visibility. This is because the observer might have had
     * visibility before the schema change, and a final deletion needs to be sent to it. Caller
     * is responsible for checking visibility of these notifications.
     *
     * @param listeningPackageName Name of package that subscribed to notifications and has been
     *                             validated by the caller to have the right access to receive
     *                             this notification.
     * @param targetPackageName Name of package that owns the changed schema types.
     * @param databaseName Database in which the changed schema types reside.
     * @param schemaName Unprefixed name of the changed schema type.
     */
    public void onSchemaChange(
            @NonNull String listeningPackageName,
            @NonNull String targetPackageName,
            @NonNull String databaseName,
            @NonNull String schemaName) {
        synchronized (mLock) {
            List<ObserverInfo> allObserverInfosForPackage = mObserversLocked.get(targetPackageName);
            if (allObserverInfosForPackage == null || allObserverInfosForPackage.isEmpty()) {
                return; // No observers for this type
            }
            // Enqueue changes for later dispatch once the call returns
            String prefix = null;
            for (int i = 0; i < allObserverInfosForPackage.size(); i++) {
                ObserverInfo observerInfo = allObserverInfosForPackage.get(i);
                if (!observerInfo.mListeningPackageAccess.getCallingPackageName()
                        .equals(listeningPackageName)) {
                    continue;  // Not the observer we've been requested to update right now.
                }
                if (!matchesSpec(schemaName, observerInfo.mObserverSpec)) {
                    continue;  // Observer doesn't want this notification
                }
                if (prefix == null) {
                    prefix = PrefixUtil.createPrefix(targetPackageName, databaseName);
                }
                Set<String> changedSchemaNames = observerInfo.mSchemaChanges.get(prefix);
                if (changedSchemaNames == null) {
                    changedSchemaNames = new ArraySet<>();
                    observerInfo.mSchemaChanges.put(prefix, changedSchemaNames);
                }
                changedSchemaNames.add(schemaName);
            }
            mHasNotifications = true;
        }
    }

    /** Returns whether there are any observers registered to watch the given package. */
    public boolean isPackageObserved(@NonNull String packageName) {
        synchronized (mLock) {
            return mObserversLocked.containsKey(packageName);
        }
    }

    /**
     * Returns whether there are any observers registered to watch the given package and
     * unprefixed schema type.
     */
    public boolean isSchemaTypeObserved(@NonNull String packageName, @NonNull String schemaType) {
        synchronized (mLock) {
            List<ObserverInfo> allObserverInfosForPackage = mObserversLocked.get(packageName);
            if (allObserverInfosForPackage == null) {
                return false;
            }
            for (int i = 0; i < allObserverInfosForPackage.size(); i++) {
                ObserverInfo observerInfo = allObserverInfosForPackage.get(i);
                if (matchesSpec(schemaType, observerInfo.mObserverSpec)) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Returns package names of listening packages registered for changes on the given
     * {@code packageName}, {@code databaseName} and unprefixed {@code schemaType}, only if they
     * have access to that type according to the provided {@code visibilityChecker}.
     */
    @NonNull
    public Set<String> getObserversForSchemaType(
            @NonNull String packageName,
            @NonNull String databaseName,
            @NonNull String schemaType,
            @Nullable VisibilityStore visibilityStore,
            @Nullable VisibilityChecker visibilityChecker) {
        synchronized (mLock) {
            List<ObserverInfo> allObserverInfosForPackage = mObserversLocked.get(packageName);
            if (allObserverInfosForPackage == null) {
                return Collections.emptySet();
            }
            Set<String> result = new ArraySet<>();
            String prefixedSchema = PrefixUtil.createPrefix(packageName, databaseName) + schemaType;
            for (int i = 0; i < allObserverInfosForPackage.size(); i++) {
                ObserverInfo observerInfo = allObserverInfosForPackage.get(i);
                if (!matchesSpec(schemaType, observerInfo.mObserverSpec)) {
                    continue;  // Observer doesn't want this notification
                }
                if (!VisibilityUtil.isSchemaSearchableByCaller(
                        /*callerAccess=*/observerInfo.mListeningPackageAccess,
                        /*targetPackageName=*/packageName,
                        /*prefixedSchema=*/prefixedSchema,
                        visibilityStore,
                        visibilityChecker)) {
                    continue;  // Observer can't have this notification.
                }
                result.add(observerInfo.mListeningPackageAccess.getCallingPackageName());
            }
            return result;
        }
    }

    /** Returns whether any notifications have been queued for dispatch. */
    public boolean hasNotifications() {
        return mHasNotifications;
    }

    /** Dispatches notifications on their corresponding executors. */
    public void dispatchAndClearPendingNotifications() {
        if (!mHasNotifications) {
            return;
        }
        synchronized (mLock) {
            if (mObserversLocked.isEmpty() || !mHasNotifications) {
                return;
            }
            for (List<ObserverInfo> observerInfos : mObserversLocked.values()) {
                for (int i = 0; i < observerInfos.size(); i++) {
                    dispatchAndClearPendingNotificationsLocked(observerInfos.get(i));
                }
            }
            mHasNotifications = false;
        }
    }

    /** Dispatches pending notifications for the given observerInfo and clears the pending list. */
    @GuardedBy("mLock")
    private void dispatchAndClearPendingNotificationsLocked(@NonNull ObserverInfo observerInfo) {
        // Get and clear the pending changes
        Map<String, Set<String>> schemaChanges = observerInfo.mSchemaChanges;
        Map<DocumentChangeGroupKey, Set<String>> documentChanges = observerInfo.mDocumentChanges;
        if (schemaChanges.isEmpty() && documentChanges.isEmpty()) {
            return;
        }
        if (!schemaChanges.isEmpty()) {
            observerInfo.mSchemaChanges = new ArrayMap<>();
        }
        if (!documentChanges.isEmpty()) {
            observerInfo.mDocumentChanges = new ArrayMap<>();
        }

        // Dispatch the pending changes
        observerInfo.mExecutor.execute(() -> {
            // Schema changes
            if (!schemaChanges.isEmpty()) {
                for (Map.Entry<String, Set<String>> entry : schemaChanges.entrySet()) {
                    SchemaChangeInfo schemaChangeInfo = new SchemaChangeInfo(
                            /*packageName=*/PrefixUtil.getPackageName(entry.getKey()),
                            /*databaseName=*/PrefixUtil.getDatabaseName(entry.getKey()),
                            /*changedSchemaNames=*/entry.getValue());

                    try {
                        observerInfo.mObserverCallback.onSchemaChanged(schemaChangeInfo);
                    } catch (Throwable t) {
                        Log.w(TAG, "ObserverCallback threw exception during dispatch", t);
                    }
                }
            }

            // Document changes
            if (!documentChanges.isEmpty()) {
                for (Map.Entry<DocumentChangeGroupKey, Set<String>> entry
                        : documentChanges.entrySet()) {
                    DocumentChangeInfo documentChangeInfo = new DocumentChangeInfo(
                            entry.getKey().mPackageName,
                            entry.getKey().mDatabaseName,
                            entry.getKey().mNamespace,
                            entry.getKey().mSchemaName,
                            entry.getValue());

                    try {
                        observerInfo.mObserverCallback.onDocumentChanged(documentChangeInfo);
                    } catch (Throwable t) {
                        Log.w(TAG, "ObserverCallback threw exception during dispatch", t);
                    }
                }
            }
        });
    }

    /**
     * Checks whether a change in the given {@code databaseName}, {@code namespace} and
     * {@code schemaType} passes all the filters defined in the given {@code observerSpec}.
     *
     * <p>Note that this method does not check packageName; you must only use it to check
     * observerSpecs which you know are observing the same package as the change.
     */
    private static boolean matchesSpec(
            @NonNull String schemaType, @NonNull ObserverSpec observerSpec) {
        Set<String> schemaFilters = observerSpec.getFilterSchemas();
        return schemaFilters.isEmpty() || schemaFilters.contains(schemaType);
    }
}