CoroutineFileObserver.kt

/*
 * Copyright (C) 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.test.services.shellexecutor

import android.os.Build
import android.os.FileObserver
import android.util.Log
import java.io.File
import java.util.concurrent.LinkedBlockingQueue
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.onClosed
import kotlinx.coroutines.channels.onFailure
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.runBlocking

/**
 * A FileObserver that is friendly with Kotlin coroutines.
 *
 * Note that the documentation on FileObserver is wrong: it doesn't see events from subdirectories.
 */
@Suppress("DEPRECATION") // the non-deprecated constructor needs API 29
open class CoroutineFileObserver(public val watch: File) :
  FileObserver(watch.toString(), FileObserver.ALL_EVENTS) {
  private data class Event(val event: Int, val file: File)

  private val eventChannel: EventChannel
  protected var logLevel: Int = 0 // by default, don't log at all; derived classes can override
  protected var logTag = "CoroutineFileObserver"

  init {
    // On API 21 and 22, about 1% of the time, Channel code will deadlock and, thirty seconds later,
    // crash the application with 'art/runtime/thread_list.cc:170] Thread suspend timeout'. In that
    // case, we resort to Java.
    if (
      Build.VERSION.SDK_INT < Build.VERSION_CODES.LOLLIPOP ||
        Build.VERSION.SDK_INT > Build.VERSION_CODES.LOLLIPOP_MR1
    ) {
      eventChannel = CoroutineEventChannel()
    } else {
      eventChannel = WorkaroundEventChannel()
    }
  }

  // This runs on a special FileObserver thread provided by Android.
  final override fun onEvent(event: Int, path: String?) {
    val file =
      when {
        path == null -> watch
        path.startsWith("/") -> File(path)
        else -> File(watch, path)
      }
    eventChannel.send(Event(event, file))
  }

  final fun stop(): Unit = eventChannel.stop()

  // Events are processed in order by run(). If you do nontrivial work in one of the handlers,
  // launch it in another job.
  final suspend fun run() {
    startWatching()
    try {
      onWatching()
      eventChannel.receive { event -> handleEvent(event) }
    } catch (x: Exception) {
      Log.w(logTag, "Error while processing events", x)
    } finally {
      log("stopWatching")
      stopWatching()
      log("stoppedWatching")
    }
  }

  private suspend fun handleEvent(event: Event) =
    when (event.event) {
      FileObserver.ACCESS -> onAccess(event.file)
      FileObserver.ATTRIB -> onAttrib(event.file)
      FileObserver.CLOSE_NOWRITE -> onCloseNoWrite(event.file)
      FileObserver.CLOSE_WRITE -> onCloseWrite(event.file)
      FileObserver.CREATE -> onCreate(event.file)
      FileObserver.DELETE -> onDelete(event.file)
      FileObserver.DELETE_SELF -> onDeleteSelf(event.file)
      FileObserver.MODIFY -> onModify(event.file)
      FileObserver.MOVED_FROM -> onMovedFrom(event.file)
      FileObserver.MOVED_TO -> onMovedTo(event.file)
      FileObserver.MOVE_SELF -> onMoveSelf(event.file)
      FileObserver.OPEN -> onOpen(event.file)
      else -> Unit
    }

  protected final fun log(message: String): Unit {
    if (logLevel >= Log.VERBOSE) Log.println(logLevel, logTag, message)
  }

  protected open suspend fun onAccess(file: File) = log("ACCESS $file")

  protected open suspend fun onAttrib(file: File) = log("ATTRIB $file")

  protected open suspend fun onCloseNoWrite(file: File) = log("CLOSE_NOWRITE $file")

  protected open suspend fun onCloseWrite(file: File) = log("CLOSE_WRITE $file")

  protected open suspend fun onCreate(file: File) = log("CREATE $file")

  protected open suspend fun onDelete(file: File) = log("DELETE $file")

  protected open suspend fun onDeleteSelf(file: File) = log("DELETE_SELF $file")

  protected open suspend fun onModify(file: File) = log("MODIFY $file")

  protected open suspend fun onMovedFrom(file: File) = log("MOVED_FROM $file")

  protected open suspend fun onMovedTo(file: File) = log("MOVED_TO $file")

  protected open suspend fun onMoveSelf(file: File) = log("MOVE_SELF $file")

  protected open suspend fun onOpen(file: File) = log("OPEN $file")

  /** Called in run() after startWatching(). Override as needed. */
  protected open fun onWatching() = Unit

  private companion object {
    private interface EventChannel {
      /** Send one event. */
      fun send(event: Event)

      /** Receive events until stop() is called. */
      suspend fun receive(handler: suspend (Event) -> Unit)

      /** Stops the channel. */
      fun stop()
    }

    private class CoroutineEventChannel : EventChannel {
      private val channel: Channel<Event> = Channel(Channel.UNLIMITED)

      override fun send(event: Event) {
        runBlocking {
          try {
            channel
              .trySendBlocking(event)
              .onFailure { t: Throwable? ->
                Log.w("CoroutineFileObserver", "Error while sending $event", t)
              }
              .onClosed { t: Throwable? ->
                Log.v("CoroutineFileObserver", "Event channel closed to $event", t)
              }
          } catch (x: InterruptedException) {
            Log.w("CoroutineFileObserver", "Interrupted while sending $event", x)
          }
        }
      }

      override suspend fun receive(handler: suspend (Event) -> Unit) {
        channel.receiveAsFlow().collect(handler)
      }

      override fun stop() {
        channel.close()
      }
    }

    private class WorkaroundEventChannel : EventChannel {
      private val queue = LinkedBlockingQueue<Event>()

      override fun send(event: Event) {
        queue.put(event)
      }

      override suspend fun receive(handler: suspend (Event) -> Unit) {
        while (true) {
          val event = queue.take()
          if (event.event < 0) return
          handler(event)
        }
      }

      override fun stop() {
        send(Event(-1, File(".")))
      }
    }
  }
}