SharedByteBuffer.java

/*
 * Copyright 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.camera.video.internal;

import androidx.annotation.GuardedBy;
import androidx.annotation.NonNull;
import androidx.annotation.RequiresApi;
import androidx.camera.core.Logger;
import androidx.core.util.Pair;
import androidx.core.util.Preconditions;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * A reference-counted, read-only handle to a {@link ByteBuffer} that several clients can hold at
 * the same time.
 *
 * <p>Each {@code SharedByteBuffer} accounts for exactly one reference to the wrapped buffer and
 * must give that reference up by calling {@link #close()} once the client is done with it. When
 * the last outstanding reference is released, the final close action supplied at creation time
 * is submitted to its executor, which lets the owner reclaim or reuse the wrapped ByteBuffer.
 * Forgetting to call {@link #close()} can leak memory or stall a processing pipeline.
 *
 * <p>Reading from the {@link ByteBuffer} returned by {@link #get()} is safe only until
 * {@link #close()} is called on this instance. After that, the contents of the
 * {@link ByteBuffer} must be treated as invalid.
 */
@RequiresApi(21) // TODO(b/200306659): Remove and replace with annotation on package-info.java
public final class SharedByteBuffer implements Closeable {

    private static final String TAG = "SharedByteBuffer";

    // Per-instance lock protecting the closed flag and the ref count updates made by this
    // instance.
    private final Object mCloseLock = new Object();

    // Read-only view of the wrapped buffer that belongs to this instance.
    private final ByteBuffer mSharedByteBuffer;

    // Identifies the original SharedByteBuffer. Every instance produced by share() carries the
    // same value, which makes related instances easy to correlate in logs.
    private final int mShareId;

    // Executor/action pair to run once the final reference has been released.
    private final Pair<Executor, Runnable> mFinalCloseAction;

    // The very same counter object is handed to every instance created through share(), while
    // each instance has its own mCloseLock.
    @GuardedBy("mCloseLock")
    private final AtomicInteger mSharedRefCount;

    @GuardedBy("mCloseLock")
    private boolean mClosed = false;

    /**
     * Wraps a {@link ByteBuffer} in the first SharedByteBuffer of a new sharing group.
     *
     * <p>The returned instance accounts for the initial reference. Every {@link #share()} call
     * adds one more. The {@code finalCloseAction} is submitted only after the returned instance
     * and every instance obtained through {@link #share()} have been closed.
     *
     * @param sharedBuf The buffer to be wrapped.
     * @param closeActionExecutor The executor on which {@code finalCloseAction} is executed.
     * @param finalCloseAction The action to run after the last instance has been closed. A
     *                         typical action hands the ByteBuffer back to a pool for reuse.
     * @return the initial SharedByteBuffer, holding a reference count of one.
     * @throws NullPointerException if any argument is {@code null}.
     */
    @NonNull
    static SharedByteBuffer newSharedInstance(@NonNull ByteBuffer sharedBuf,
            @NonNull Executor closeActionExecutor, @NonNull Runnable finalCloseAction) {
        ByteBuffer readOnlyView = Preconditions.checkNotNull(sharedBuf).asReadOnlyBuffer();
        Executor executor = Preconditions.checkNotNull(closeActionExecutor);
        Runnable action = Preconditions.checkNotNull(finalCloseAction);
        Pair<Executor, Runnable> closeAction = new Pair<>(executor, action);

        // The share ID comes from the caller's buffer rather than from the read-only view.
        int id = System.identityHashCode(sharedBuf);
        return new SharedByteBuffer(readOnlyView, new AtomicInteger(1), closeAction, id);
    }

    /**
     * Builds an instance around an already prepared read-only buffer and a shared counter.
     *
     * <p>Callers must have incremented {@code sharedRefCount} on behalf of the new instance
     * before invoking this constructor; the constructor itself never touches the count.
     */
    private SharedByteBuffer(@NonNull ByteBuffer sharedBuf, @NonNull AtomicInteger sharedRefCount,
            @NonNull Pair<Executor, Runnable> finalCloseAction, int shareId) {
        mSharedByteBuffer = sharedBuf;
        mSharedRefCount = sharedRefCount;
        mFinalCloseAction = finalCloseAction;
        mShareId = shareId;

        // The sanity check below is only performed when debug logging is enabled.
        if (!Logger.isDebugEnabled(TAG)) {
            return;
        }

        final int observedCount = sharedRefCount.get();
        if (observedCount < 1) {
            throw new AssertionError(String.format(Locale.US,
                    "Cannot create new instance of SharedByteBuffer with invalid ref count %d. "
                            + "Ref count must be >= 1. [%s]", observedCount, this));
        }
    }

    /**
     * Produces another SharedByteBuffer backed by the same underlying ByteBuffer.
     *
     * <p>The new instance gets its own read-only {@link ByteBuffer}, available through
     * {@link #get()}. The two instances are independent of each other, but the new instance
     * starts out with the position, limit, and mark that this instance has at the moment
     * {@code share()} is called.
     *
     * @return a new instance that must be closed separately from this one.
     * @throws IllegalStateException if this SharedByteBuffer is closed.
     */
    @NonNull
    SharedByteBuffer share() {
        final int countAfterShare;
        synchronized (mCloseLock) {
            throwIfClosed("share()");
            // Take the reference for the new instance before it is constructed.
            countAfterShare = mSharedRefCount.incrementAndGet();
        }

        if (Logger.isDebugEnabled(TAG)) {
            if (countAfterShare < 2) {
                throw new AssertionError("Invalid ref count. share() should always produce a "
                        + "ref count of 2 or more.");
            }
            Logger.d(TAG, String.format(Locale.US, "Ref count incremented: %d [%s]",
                    countAfterShare, this));
        }

        ByteBuffer siblingView = mSharedByteBuffer.asReadOnlyBuffer();
        return new SharedByteBuffer(siblingView, mSharedRefCount, mFinalCloseAction, mShareId);
    }

    /**
     * Releases the reference held by this SharedByteBuffer.
     *
     * <p>After this call, {@link #get()} no longer hands out the buffer, and any buffer
     * previously obtained from {@link #get()} should not be used anymore.
     *
     * <p>This method is idempotent: the first invocation releases the reference, every later
     * invocation does nothing.
     */
    @Override
    public void close() {
        releaseReference();
    }

    /**
     * Provides the read-only {@link ByteBuffer} view owned by this instance.
     *
     * <p>This method stops working once {@link #close()} has been called. A buffer obtained
     * before closing should be considered invalid afterwards and should no longer be used.
     *
     * @return the underlying shared {@link ByteBuffer}
     * @throws IllegalStateException if this SharedByteBuffer is closed.
     * @see #close()
     */
    @NonNull
    public ByteBuffer get() {
        synchronized (mCloseLock) {
            throwIfClosed("get()");
            return mSharedByteBuffer;
        }
    }

    @NonNull
    @Override
    public String toString() {
        final int instanceId = System.identityHashCode(this);
        return String.format(Locale.US, "SharedByteBuffer[buf: %s, shareId: 0x%x, instanceId:0x%x]",
                mSharedByteBuffer, mShareId, instanceId);
    }

    // Safety net for instances that were never closed explicitly. Declaring a finalizer makes
    // construction more expensive, but an unclosed SharedByteBuffer is usually the bigger
    // problem, so the cost is accepted.
    @Override
    protected void finalize() throws Throwable {
        try {
            boolean closedByFinalizer = releaseReference();
            if (closedByFinalizer) {
                Logger.w(TAG, String.format(Locale.US,
                        "SharedByteBuffer closed by finalizer, but should have been closed manually "
                                + "with SharedByteBuffer.close() [%s]", this));
            }
        } finally {
            super.finalize();
        }
    }

    // Throws if this instance has been closed. The caller name is only used to build the
    // exception message.
    @GuardedBy("mCloseLock")
    private void throwIfClosed(@NonNull String caller) {
        if (!mClosed) {
            return;
        }
        throw new IllegalStateException("Cannot call " + caller + " on a closed SharedByteBuffer.");
    }

    // Marks this instance closed and drops its reference. Returns true when this call performed
    // the close, false when the instance had already been closed earlier.
    private boolean releaseReference() {
        final int countAfterClose;
        synchronized (mCloseLock) {
            if (mClosed) {
                // Only the first close() of an instance is allowed to change the ref count.
                return false;
            }
            mClosed = true;
            countAfterClose = mSharedRefCount.decrementAndGet();
        }

        if (Logger.isDebugEnabled(TAG)) {
            if (countAfterClose < 0) {
                throw new AssertionError("Invalid ref count. close() should never produce a ref "
                        + "count below 0");
            }
            // Logging happens outside of any mutual exclusion shared with the other instances,
            // so these messages may show up out of order relative to theirs. The counts can
            // therefore look as if they were decremented out of order even though they were not.
            Logger.d(TAG, String.format(Locale.US, "Ref count decremented: %d [%s]",
                    countAfterClose, this));
        }

        if (countAfterClose == 0) {
            dispatchFinalCloseAction();
        }
        return true;
    }

    // Submits the final close action to its executor. A rejection by the executor is logged
    // rather than propagated to the caller.
    private void dispatchFinalCloseAction() {
        if (Logger.isDebugEnabled(TAG)) {
            Logger.d(TAG, String.format(Locale.US,
                    "Final reference released. Running final close action. [%s]", this));
        }

        try {
            Executor closeExecutor = Preconditions.checkNotNull(mFinalCloseAction.first);
            Runnable closeAction = Preconditions.checkNotNull(mFinalCloseAction.second);
            closeExecutor.execute(closeAction);
        } catch (RejectedExecutionException e) {
            Logger.e(TAG, String.format(Locale.US,
                    "Unable to execute final close action. [%s]", this), e);
        }
    }
}