ShellCommandFileObserverClient.kt
/*
* Copyright (C) 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.test.services.shellexecutor
import java.io.File
import java.io.InputStream
import java.io.PipedInputStream
import java.io.PipedOutputStream
import java.util.concurrent.Executors
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
/**
* Client that sends requests to the ShellCommandFileObserverExecutorServer.
*
* This client is designed to be callable from Java.
*/
public final class ShellCommandFileObserverClient {
private val scope = CoroutineScope(Executors.newCachedThreadPool().asCoroutineDispatcher())
public final fun run(secret: String, message: Messages.Command): Execution {
val execution = Execution(File(secret), message)
execution.start()
return execution
}
public inner class Execution
internal constructor(val exchangeDir: File, val message: Messages.Command) {
private val messageWritten: Semaphore = Semaphore(1, 1)
private val client = Client(exchangeDir, messageWritten, message)
private lateinit var clientJob: Job
internal fun start() {
runBlocking { clientJob = scope.launch { client.run() } }
}
/** Blocks until the message has been written. */
public fun waitForMessageWritten() {
runBlocking { messageWritten.acquire() }
}
/** Standard method for obtaining the response. */
public fun await(): Messages.CommandResult {
runBlocking { clientJob.join() }
return client.result
}
/** Alternative method for compatibility with methods that expect only an InputStream. */
public fun asStream(): InputStream {
val output = PipedOutputStream()
val input = PipedInputStream(output)
runBlocking {
scope.launch {
clientJob.join()
output.use { it.write(client.result.stdout) }
}
}
return input
}
}
private inner class Client(
val exchangeDir: File,
val messageWritten: Semaphore,
val message: Messages.Command
) : CoroutineFileObserver(exchangeDir) {
private lateinit var response: File
public lateinit var result: Messages.CommandResult
init {
// Uncomment this line to see the event-level chatter.
// logLevel = Log.INFO
logTag = "${TAG}.Client"
}
override fun onWatching() {
// Wait to write the request file until we're sure we'll see the response.
response = FileObserverProtocol.writeRequestFile(exchangeDir, message)
// Make sure any interested parties are notified that we've finished creating the request.
runBlocking { messageWritten.release() }
}
override suspend fun onCloseWrite(file: File) {
super.onCloseWrite(file)
if (file != response) return
result = FileObserverProtocol.readResponseFile(response)
stop()
}
}
private companion object {
const val TAG = "SCFOC"
}
}