AudioProcessingPipeline.java
/*
* Copyright 2022 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.media3.common.audio;
import static androidx.media3.common.audio.AudioProcessor.EMPTY_BUFFER;
import static androidx.media3.common.util.Assertions.checkState;
import androidx.annotation.Nullable;
import androidx.media3.common.audio.AudioProcessor.AudioFormat;
import androidx.media3.common.util.UnstableApi;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/**
* Handles passing buffers through multiple {@link AudioProcessor} instances.
*
* <p>Two instances of {@link AudioProcessingPipeline} are considered {@linkplain #equals(Object)
* equal} if they have the same underlying {@link AudioProcessor} references, in the same order.
*
* <p>To make use of this class, the caller must:
*
* <ul>
* <li>Initialize an instance, passing in all audio processors that may be used for processing.
* <li>Call {@link #configure(AudioFormat)} with the {@link AudioFormat} of the input data. This
* method will give back the {@link AudioFormat} that will be output from the pipeline when
* this configuration is in use.
* <li>Call {@link #flush()} to apply the pending configuration.
* <li>Check if the pipeline {@link #isOperational()}. If not, then the pipeline cannot be used
* to process buffers in the current configuration. This is because none of the underlying
* {@link AudioProcessor} instances are {@linkplain AudioProcessor#isActive active}.
* <li>If the pipeline {@link #isOperational()}, {@link #queueInput(ByteBuffer)} then {@link
* #getOutput()} to process buffers.
* <li>{@link #queueEndOfStream()} to inform the pipeline the current input stream is at an end.
* <li>Repeatedly call {@link #getOutput()} and handle those buffers until {@link #isEnded()}
* returns true.
* <li>When finished with the pipeline, call {@link #reset()} to release underlying resources.
* </ul>
*
* <p>If underlying {@link AudioProcessor} instances have pending configuration changes, or the
* {@link AudioFormat} of the input is changing:
*
* <ul>
* <li>Call {@link #configure(AudioFormat)} to configure the pipeline for the new input stream.
* You can still {@link #queueInput(ByteBuffer)} and {@link #getOutput()} in the old setup at
* this time.
* <li>{@link #queueEndOfStream()} to inform the pipeline the current input stream is at an end.
* <li>Repeatedly call {@link #getOutput()} until {@link #isEnded()} returns true.
* <li>Call {@link #flush()} to apply the new configuration and flush the pipeline.
* <li>Begin {@linkplain #queueInput(ByteBuffer) queuing input} and handling the {@linkplain
* #getOutput() output} in the new configuration.
* </ul>
*/
@UnstableApi
public final class AudioProcessingPipeline {

  /** The {@link AudioProcessor} instances passed to {@link AudioProcessingPipeline}. */
  private final ImmutableList<AudioProcessor> audioProcessors;

  /**
   * The processors that are {@linkplain AudioProcessor#isActive() active} based on the current
   * configuration.
   */
  private final List<AudioProcessor> activeAudioProcessors;

  /**
   * The buffers output by the {@link #activeAudioProcessors}. This has the same number of elements
   * as {@link #activeAudioProcessors}.
   */
  private ByteBuffer[] outputBuffers;

  /** The {@link AudioFormat} currently being output by the pipeline. */
  private AudioFormat outputAudioFormat;

  /** The {@link AudioFormat} that will be output following a {@link #flush()}. */
  private AudioFormat pendingOutputAudioFormat;

  /** Whether input has ended, either due to configuration change or end of stream. */
  private boolean inputEnded;

  /**
   * Creates an instance.
   *
   * @param audioProcessors The {@link AudioProcessor} instances to be used for processing buffers.
   */
  public AudioProcessingPipeline(ImmutableList<AudioProcessor> audioProcessors) {
    this.audioProcessors = audioProcessors;
    activeAudioProcessors = new ArrayList<>();
    outputBuffers = new ByteBuffer[0];
    outputAudioFormat = AudioFormat.NOT_SET;
    pendingOutputAudioFormat = AudioFormat.NOT_SET;
    inputEnded = false;
  }

  /**
   * Configures the pipeline to process input audio with the specified format. Returns the
   * configured output audio format.
   *
   * <p>To apply the new configuration for use, the pipeline must be {@linkplain #flush() flushed}.
   * Before applying the new configuration, it is safe to queue input and get output in the old
   * input/output formats/configuration. Call {@link #queueEndOfStream()} when no more input will be
   * supplied for processing in the old configuration.
   *
   * @param inputAudioFormat The format of audio that will be queued after the next call to {@link
   *     #flush()}.
   * @return The configured output audio format.
   * @throws AudioProcessor.UnhandledAudioFormatException If the specified format is not supported
   *     by the pipeline.
   */
  @CanIgnoreReturnValue
  public AudioFormat configure(AudioFormat inputAudioFormat)
      throws AudioProcessor.UnhandledAudioFormatException {
    if (inputAudioFormat.equals(AudioFormat.NOT_SET)) {
      throw new AudioProcessor.UnhandledAudioFormatException(inputAudioFormat);
    }

    // Chain the formats: each active processor's output format is the next processor's input
    // format. Inactive processors are configured too, but do not alter the format passed along.
    AudioFormat intermediateAudioFormat = inputAudioFormat;
    for (int i = 0; i < audioProcessors.size(); i++) {
      AudioProcessor audioProcessor = audioProcessors.get(i);
      AudioFormat nextFormat = audioProcessor.configure(intermediateAudioFormat);
      if (audioProcessor.isActive()) {
        checkState(!nextFormat.equals(AudioFormat.NOT_SET));
        intermediateAudioFormat = nextFormat;
      }
    }

    // Only becomes the current output format once flush() is called.
    pendingOutputAudioFormat = intermediateAudioFormat;
    return pendingOutputAudioFormat;
  }

  /**
   * Clears any buffered data and pending output. If any underlying audio processors are {@linkplain
   * AudioProcessor#isActive() active}, this also prepares them to receive a new stream of input in
   * the last {@linkplain #configure(AudioFormat) configured} (pending) format.
   *
   * <p>{@link #configure(AudioFormat)} must have been called at least once since the last call to
   * {@link #reset()} before calling this.
   */
  public void flush() {
    activeAudioProcessors.clear();
    outputAudioFormat = pendingOutputAudioFormat;
    inputEnded = false;

    // Flush every processor (active or not), then rebuild the list of active ones.
    for (int i = 0; i < audioProcessors.size(); i++) {
      AudioProcessor audioProcessor = audioProcessors.get(i);
      audioProcessor.flush();
      if (audioProcessor.isActive()) {
        activeAudioProcessors.add(audioProcessor);
      }
    }

    // One output buffer slot per active processor, seeded with each processor's current output.
    outputBuffers = new ByteBuffer[activeAudioProcessors.size()];
    for (int i = 0; i <= getFinalOutputBufferIndex(); i++) {
      outputBuffers[i] = activeAudioProcessors.get(i).getOutput();
    }
  }

  /**
   * Returns the {@link AudioFormat} of data being output through {@link #getOutput()}.
   *
   * @return The {@link AudioFormat} currently being output, or {@link AudioFormat#NOT_SET} if no
   *     {@linkplain #configure(AudioFormat) configuration} has been {@linkplain #flush() applied}.
   */
  public AudioFormat getOutputAudioFormat() {
    return outputAudioFormat;
  }

  /**
   * Whether the pipeline can be used for processing buffers.
   *
   * <p>For this to happen the pipeline must be {@linkplain #configure(AudioFormat) configured},
   * {@linkplain #flush() flushed} and have {@linkplain AudioProcessor#isActive() active}
   * {@linkplain AudioProcessor underlying audio processors} that are ready to process buffers with
   * the current configuration.
   */
  public boolean isOperational() {
    return !activeAudioProcessors.isEmpty();
  }

  /**
   * Queues audio data between the position and limit of the {@code inputBuffer} for processing.
   * After calling this method, processed output may be available via {@link #getOutput()}.
   *
   * <p>This is a no-op if the pipeline is not {@linkplain #isOperational() operational} or if end
   * of stream has already been {@linkplain #queueEndOfStream() queued}.
   *
   * @param inputBuffer The input buffer to process. It must be a direct {@link ByteBuffer} with
   *     native byte order. Its contents are treated as read-only. Its position will be advanced by
   *     the number of bytes consumed (which may be zero). The caller retains ownership of the
   *     provided buffer.
   */
  public void queueInput(ByteBuffer inputBuffer) {
    if (!isOperational() || inputEnded) {
      return;
    }
    processData(inputBuffer);
  }

  /**
   * Returns a {@link ByteBuffer} containing processed output data between its position and limit.
   * The buffer will be empty if no output is available.
   *
   * <p>Buffers returned from this method are retained by the pipeline, and it is necessary to
   * consume the data (or copy it into another buffer) to allow the pipeline to progress.
   *
   * @return A buffer containing processed output data between its position and limit.
   */
  public ByteBuffer getOutput() {
    if (!isOperational()) {
      return EMPTY_BUFFER;
    }

    ByteBuffer outputBuffer = outputBuffers[getFinalOutputBufferIndex()];
    if (outputBuffer.hasRemaining()) {
      return outputBuffer;
    }

    // The final buffer is drained: push any intermediate data through the chain without adding
    // new input, then return whatever the last processor produced.
    processData(EMPTY_BUFFER);
    return outputBuffers[getFinalOutputBufferIndex()];
  }

  /**
   * Queues an end of stream signal. After this method has been called, {@link
   * #queueInput(ByteBuffer)} should not be called until after the next call to {@link #flush()}.
   * Calling {@link #getOutput()} will return any remaining output data. Multiple calls may be
   * required to read all of the remaining output data. {@link #isEnded()} will return {@code true}
   * once all remaining output data has been read.
   */
  public void queueEndOfStream() {
    if (!isOperational() || inputEnded) {
      return;
    }
    inputEnded = true;
    // Only the first processor is signalled here; processData propagates end of stream to each
    // following processor once the preceding one has ended and its output has been consumed.
    activeAudioProcessors.get(0).queueEndOfStream();
  }

  /**
   * Returns whether the pipeline has ended.
   *
   * <p>The pipeline is considered ended when:
   *
   * <ul>
   *   <li>End of stream has been {@linkplain #queueEndOfStream() queued}.
   *   <li>Every {@linkplain #queueInput(ByteBuffer) input buffer} has been processed.
   *   <li>Every {@linkplain #getOutput() output buffer} has been fully consumed.
   * </ul>
   */
  public boolean isEnded() {
    // inputEnded is only ever set while the pipeline is operational, and is cleared by flush()
    // and reset(), so the indexed accesses below are only reached with a non-empty pipeline.
    return inputEnded
        && activeAudioProcessors.get(getFinalOutputBufferIndex()).isEnded()
        && !outputBuffers[getFinalOutputBufferIndex()].hasRemaining();
  }

  /**
   * Resets the pipeline and its underlying {@link AudioProcessor} instances to their unconfigured
   * state, releasing any resources.
   *
   * <p>After this call the pipeline is no longer {@linkplain #isOperational() operational} until
   * it has been {@linkplain #configure(AudioFormat) configured} and {@linkplain #flush() flushed}
   * again.
   */
  public void reset() {
    for (int i = 0; i < audioProcessors.size(); i++) {
      AudioProcessor audioProcessor = audioProcessors.get(i);
      audioProcessor.flush();
      audioProcessor.reset();
    }
    // Keep activeAudioProcessors in step with outputBuffers. Leaving the stale active list in
    // place would make isOperational() report true while outputBuffers is empty, causing
    // getOutput()/isEnded() to index position -1.
    activeAudioProcessors.clear();
    outputBuffers = new ByteBuffer[0];
    outputAudioFormat = AudioFormat.NOT_SET;
    pendingOutputAudioFormat = AudioFormat.NOT_SET;
    inputEnded = false;
  }

  /**
   * Indicates whether some other object is "equal to" this one.
   *
   * <p>Two instances of {@link AudioProcessingPipeline} are considered equal if they have the same
   * underlying {@link AudioProcessor} references in the same order.
   */
  @Override
  public boolean equals(@Nullable Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof AudioProcessingPipeline)) {
      return false;
    }
    AudioProcessingPipeline that = (AudioProcessingPipeline) o;
    if (this.audioProcessors.size() != that.audioProcessors.size()) {
      return false;
    }
    for (int i = 0; i < this.audioProcessors.size(); i++) {
      // Deliberately compares references rather than calling equals().
      if (this.audioProcessors.get(i) != that.audioProcessors.get(i)) {
        return false;
      }
    }
    return true;
  }

  /** Returns the hash code of the list of {@link AudioProcessor} instances passed in. */
  @Override
  public int hashCode() {
    return audioProcessors.hashCode();
  }

  /**
   * Passes data through the {@link #activeAudioProcessors} until a complete pass over all of them
   * makes no progress.
   *
   * <p>A processor is skipped while its previous output buffer still has unconsumed data. An ended
   * processor propagates end of stream to the next processor. Otherwise the processor is fed the
   * previous processor's output (or {@code inputBuffer} for the first processor) and its output
   * buffer is refreshed.
   *
   * @param inputBuffer The buffer supplied to the first active processor.
   */
  private void processData(ByteBuffer inputBuffer) {
    boolean progressMade = true;
    while (progressMade) {
      progressMade = false;
      for (int index = 0; index <= getFinalOutputBufferIndex(); index++) {
        if (outputBuffers[index].hasRemaining()) {
          // Processor at this index has output that has not been consumed. Do not queue input.
          continue;
        }

        AudioProcessor audioProcessor = activeAudioProcessors.get(index);
        if (audioProcessor.isEnded()) {
          // This processor is finished and drained: tell the next one that no more input is
          // coming. The final processor has no successor to notify.
          if (!outputBuffers[index].hasRemaining() && index < getFinalOutputBufferIndex()) {
            activeAudioProcessors.get(index + 1).queueEndOfStream();
          }
          continue;
        }

        ByteBuffer input =
            index > 0
                ? outputBuffers[index - 1]
                : inputBuffer.hasRemaining() ? inputBuffer : EMPTY_BUFFER;
        long inputBytes = input.remaining();
        audioProcessor.queueInput(input);
        outputBuffers[index] = audioProcessor.getOutput();

        // Progress means the processor consumed some input or produced some output.
        progressMade |= (inputBytes - input.remaining()) > 0 || outputBuffers[index].hasRemaining();
      }
    }
  }

  /**
   * Returns the index of the last element of {@link #outputBuffers}, or {@code -1} if it is empty.
   */
  private int getFinalOutputBufferIndex() {
    return outputBuffers.length - 1;
  }
}