SimpleActor.kt

/*
 * Copyright 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.datastore.core

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.onClosed
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import java.util.concurrent.atomic.AtomicInteger

/**
 * A minimal actor: messages handed to [offer] are queued in an unlimited [Channel] and consumed
 * one at a time by `consumeMessage` inside coroutines launched in `scope`.
 *
 * A consumer coroutine is only running while there are messages to process. It is started by the
 * [offer] call that moves the pending-message count from 0 to 1 and it stops once that count
 * drops back to 0.
 *
 * If `scope` has a [Job], a completion handler is registered on it that calls `onComplete`,
 * closes the queue and then passes every message still queued to `onUndeliveredElement`.
 */
internal class SimpleActor<T>(
    /**
     * The scope in which to consume messages.
     */
    private val scope: CoroutineScope,
    /**
     * Function that will be called when scope is cancelled. Should *not* throw exceptions.
     *
     * It is invoked from the completion handler of the scope's [Job], before the message queue is
     * closed, and receives the cause passed to that handler.
     */
    onComplete: (Throwable?) -> Unit,
    /**
     * Function that will be called for each element when the scope is cancelled. Should *not*
     * throw exceptions.
     *
     * It is invoked once for every message that is still queued after the queue has been closed,
     * together with the cause passed to the completion handler.
     */
    onUndeliveredElement: (T, Throwable?) -> Unit,
    /**
     * Function that will be called once for each message.
     *
     * Must *not* throw an exception (other than CancellationException if scope is cancelled).
     */
    private val consumeMessage: suspend (T) -> Unit
) {
    /** Pending messages; unlimited capacity so that [offer] never has to suspend. */
    private val messageQueue = Channel<T>(capacity = UNLIMITED)

    /**
     * Count of the number of remaining messages to process. When the messageQueue is closed,
     * this is no longer used.
     *
     * Incremented by [offer] after a successful send and decremented by the consumer after each
     * message has been handled.
     */
    private val remainingMessages = AtomicInteger(0)

    init {
        // If the scope doesn't have a job, it won't be cancelled, so we don't need to register a
        // callback.
        scope.coroutineContext[Job]?.invokeOnCompletion { ex ->
            onComplete(ex)

            // TODO(rohitsat): replace this with Channel(onUndeliveredElement) when it
            // is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/2435

            // Close first so that no new message can be queued while we drain.
            messageQueue.close(ex)

            // Drain whatever is still queued. Success is decided with isSuccess rather than by
            // null-checking getOrNull(): T may be a nullable type, and a queued null message must
            // still be reported instead of silently ending the drain early.
            while (true) {
                val result = messageQueue.tryReceive()
                if (!result.isSuccess) break
                onUndeliveredElement(result.getOrThrow(), ex)
            }
        }
    }

    /**
     * Sends a message to a message queue to be processed by [consumeMessage] in [scope].
     *
     * If [offer] completes successfully, the msg *will* be processed either by
     * consumeMessage or
     * onUndeliveredElement. If [offer] throws an exception, the message may or may not be
     * processed.
     *
     * @param msg the message to enqueue.
     * @throws ClosedSendChannelException if the queue was closed without a cause; if it was
     * closed with a cause, that cause is thrown instead.
     * @throws IllegalStateException if the send fails although the queue is not closed.
     */
    fun offer(msg: T) {
        /*
         * Possible states:
         * 1) remainingMessages = 0
         *   All messages have been consumed, so there is no active consumer
         * 2) remainingMessages > 0, no active consumer
         *   One of the senders is responsible for triggering the consumer
         * 3) remainingMessages > 0, active consumer
         *   Consumer will continue to consume until remainingMessages is 0
         * 4) messageQueue is closed, there are remaining messages to consume
         *   Attempts to offer messages will fail, onComplete() will consume remaining messages
         *   with onUndelivered. The Consumer has already completed since close() is called by
         *   onComplete().
         * 5) messageQueue is closed, there are no remaining messages to consume
         *   Attempts to offer messages will fail.
         */

        // should never return false bc the channel capacity is unlimited
        check(
            messageQueue.trySend(msg)
                .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") }
                .isSuccess
        ) { "trySend failed although the message queue is unlimited and not closed" }

        // If the number of remaining messages was 0, there is no active consumer, since it quits
        // consuming once remaining messages hits 0. We must kick off a new consumer.
        // NOTE: the increment must stay *after* the send, so that a consumer that observes a
        // non-zero count always finds a message in the queue.
        if (remainingMessages.getAndIncrement() == 0) {
            scope.launch {
                // We shouldn't have started a new consumer unless there are remaining messages...
                check(remainingMessages.get() > 0) {
                    "Consumer started without any remaining messages"
                }

                do {
                    // We don't want to try to consume a new message unless we are still active.
                    // If ensureActive throws, the scope is no longer active, so it doesn't
                    // matter that we have remaining messages.
                    scope.ensureActive()

                    consumeMessage(messageQueue.receive())
                } while (remainingMessages.decrementAndGet() != 0)
            }
        }
    }
}