/*
* Copyright 2021 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package androidx.media3.exoplayer.rtsp;
import static androidx.media3.common.util.Assertions.checkNotNull;
import android.os.SystemClock;
import androidx.annotation.GuardedBy;
import androidx.annotation.Nullable;
import androidx.media3.common.C;
import androidx.media3.common.util.ParsableByteArray;
import androidx.media3.exoplayer.rtsp.reader.DefaultRtpPayloadReaderFactory;
import androidx.media3.exoplayer.rtsp.reader.RtpPayloadReader;
import androidx.media3.extractor.Extractor;
import androidx.media3.extractor.ExtractorInput;
import androidx.media3.extractor.ExtractorOutput;
import androidx.media3.extractor.PositionHolder;
import androidx.media3.extractor.SeekMap;
import java.io.IOException;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
/**
 * Extracts data from RTP packets.
 *
 * <p>Each {@link #read} call reads at most one RTP packet from the {@link ExtractorInput}, offers
 * it to an {@link RtpPacketReorderingQueue}, and forwards the packets released by the queue to an
 * {@link RtpPayloadReader}.
 *
 * <p>The seek state ({@code isSeekPending}, {@code nextRtpTimestamp}, {@code playbackStartTimeUs})
 * is guarded by an internal lock. The first-packet state is held in volatile fields so that values
 * written by one thread are visible to another.
 */
/* package */ final class RtpExtractor implements Extractor {

  /**
   * How long, in milliseconds, a packet may wait in the reordering queue for an out-of-order
   * predecessor before it is released anyway.
   *
   * <p>TODO(internal b/172331505) 30ms is roughly the time for one video frame. It is not
   * rigorously chosen and will need fine tuning in the future.
   */
  private static final long PACKET_REORDERING_CUTOFF_MS = 30;

  private final RtpPayloadReader payloadReader;
  private final ParsableByteArray rtpPacketScratchBuffer;
  private final ParsableByteArray rtpPacketDataBuffer;
  private final int trackId;
  private final Object lock;
  private final RtpPacketReorderingQueue reorderingQueue;

  private @MonotonicNonNull ExtractorOutput output;

  // Volatile for the same reason as firstTimestamp and firstSequenceNumber: the flag is written in
  // read() and exposed through the public hasReadFirstRtpPacket(), which is presumably queried
  // from the thread that delivers the RTSP-provided values. TODO: confirm against callers.
  private volatile boolean firstPacketRead;
  private volatile long firstTimestamp;
  private volatile int firstSequenceNumber;

  @GuardedBy("lock")
  private boolean isSeekPending;

  @GuardedBy("lock")
  private long nextRtpTimestamp;

  @GuardedBy("lock")
  private long playbackStartTimeUs;

  /**
   * Creates an instance.
   *
   * @param payloadFormat The {@link RtpPayloadFormat} used to create the {@link RtpPayloadReader}.
   *     A {@link NullPointerException} is thrown by {@code checkNotNull} if the default factory
   *     returns no reader for this format.
   * @param trackId The track ID passed to {@link RtpPayloadReader#createTracks} in {@link #init}.
   */
  public RtpExtractor(RtpPayloadFormat payloadFormat, int trackId) {
    this.trackId = trackId;

    payloadReader =
        checkNotNull(new DefaultRtpPayloadReaderFactory().createPayloadReader(payloadFormat));
    rtpPacketScratchBuffer = new ParsableByteArray(RtpPacket.MAX_SIZE);
    rtpPacketDataBuffer = new ParsableByteArray();
    lock = new Object();
    reorderingQueue = new RtpPacketReorderingQueue();
    firstTimestamp = C.TIME_UNSET;
    firstSequenceNumber = C.INDEX_UNSET;
    nextRtpTimestamp = C.TIME_UNSET;
    playbackStartTimeUs = C.TIME_UNSET;
  }

  /** Sets the timestamp of the first RTP packet to arrive. */
  public void setFirstTimestamp(long firstTimestamp) {
    this.firstTimestamp = firstTimestamp;
  }

  /** Sets the sequence number of the first RTP packet to arrive. */
  public void setFirstSequenceNumber(int firstSequenceNumber) {
    this.firstSequenceNumber = firstSequenceNumber;
  }

  /** Returns whether the first RTP packet is processed. */
  public boolean hasReadFirstRtpPacket() {
    return firstPacketRead;
  }

  /**
   * Signals when performing an RTSP seek that involves RTSP message exchange.
   *
   * <p>{@link #seek} must be called after a successful RTSP seek.
   *
   * <p>After this method is called, the incoming RTP packets are read from the {@link
   * ExtractorInput}, but they are not further processed by the {@link RtpPayloadReader readers}.
   *
   * <p>The user must clear the {@link ExtractorOutput} after calling this method, to ensure no
   * samples are written to {@link ExtractorOutput}.
   */
  public void preSeek() {
    synchronized (lock) {
      isSeekPending = true;
    }
  }

  /**
   * Not supported.
   *
   * @throws UnsupportedOperationException Always.
   */
  @Override
  public boolean sniff(ExtractorInput input) {
    throw new UnsupportedOperationException(
        "RTP packets are transmitted in a packet stream do not support sniffing.");
  }

  /**
   * Creates the tracks through the {@link RtpPayloadReader}, ends track declaration, and publishes
   * an unseekable {@link SeekMap} with an unset duration.
   */
  @Override
  public void init(ExtractorOutput output) {
    payloadReader.createTracks(output, trackId);
    output.endTracks();
    // RTP does not embed duration or seek info.
    output.seekMap(new SeekMap.Unseekable(C.TIME_UNSET));
    this.output = output;
  }

  /**
   * Reads at most one RTP packet from {@code input} and processes the packets released by the
   * reordering queue.
   *
   * <p>{@link #init} must have been called before this method. {@code seekPosition} is not used.
   *
   * @return {@link Extractor#RESULT_END_OF_INPUT} if the input has ended, otherwise {@link
   *     Extractor#RESULT_CONTINUE}.
   * @throws IOException If reading from {@code input} fails.
   */
  @Override
  public int read(ExtractorInput input, PositionHolder seekPosition) throws IOException {
    checkNotNull(output); // Asserts init is called.

    // Reads one RTP packet at a time.
    int bytesRead = input.read(rtpPacketScratchBuffer.getData(), 0, RtpPacket.MAX_SIZE);
    if (bytesRead == C.RESULT_END_OF_INPUT) {
      return Extractor.RESULT_END_OF_INPUT;
    } else if (bytesRead == 0) {
      return Extractor.RESULT_CONTINUE;
    }

    rtpPacketScratchBuffer.setPosition(0);
    rtpPacketScratchBuffer.setLimit(bytesRead);
    @Nullable RtpPacket packet = RtpPacket.parse(rtpPacketScratchBuffer);
    if (packet == null) {
      // The bytes could not be parsed as an RTP packet; skip them.
      return RESULT_CONTINUE;
    }

    long packetArrivalTimeMs = SystemClock.elapsedRealtime();
    long packetCutoffTimeMs = getCutoffTimeMs(packetArrivalTimeMs);
    reorderingQueue.offer(packet, packetArrivalTimeMs);
    @Nullable RtpPacket dequeuedPacket = reorderingQueue.poll(packetCutoffTimeMs);
    if (dequeuedPacket == null) {
      // No packet is available for reading.
      return RESULT_CONTINUE;
    }
    packet = dequeuedPacket;

    if (!firstPacketRead) {
      // firstTimestamp and firstSequenceNumber are transmitted over RTSP. There is no guarantee
      // that they arrive before the RTP packets. We use whichever comes first.
      if (firstTimestamp == C.TIME_UNSET) {
        firstTimestamp = packet.timestamp;
      }
      if (firstSequenceNumber == C.INDEX_UNSET) {
        firstSequenceNumber = packet.sequenceNumber;
      }
      payloadReader.onReceivingFirstPacket(firstTimestamp, firstSequenceNumber);
      firstPacketRead = true;
    }

    synchronized (lock) {
      // Ignores the incoming packets while seek is pending.
      if (isSeekPending) {
        // The seek is only completed once seek() has supplied both values. Until then the
        // dequeued packet is dropped.
        if (nextRtpTimestamp != C.TIME_UNSET && playbackStartTimeUs != C.TIME_UNSET) {
          reorderingQueue.reset();
          payloadReader.seek(nextRtpTimestamp, playbackStartTimeUs);
          isSeekPending = false;
          nextRtpTimestamp = C.TIME_UNSET;
          playbackStartTimeUs = C.TIME_UNSET;
        }
      } else {
        do {
          // Deplete the reordering queue as much as possible.
          rtpPacketDataBuffer.reset(packet.payloadData);
          payloadReader.consume(
              rtpPacketDataBuffer, packet.timestamp, packet.sequenceNumber, packet.marker);
          packet = reorderingQueue.poll(packetCutoffTimeMs);
        } while (packet != null);
      }
    }
    return RESULT_CONTINUE;
  }

  /**
   * Records the parameters of a seek.
   *
   * <p>The values are stored only. They are forwarded to the {@link RtpPayloadReader} by a later
   * {@link #read} call, and only if a seek was marked pending through {@link #preSeek}.
   *
   * @param nextRtpTimestamp The value later passed as the first argument of {@link
   *     RtpPayloadReader#seek}.
   * @param playbackStartTimeUs The value later passed as the second argument of {@link
   *     RtpPayloadReader#seek}.
   */
  @Override
  public void seek(long nextRtpTimestamp, long playbackStartTimeUs) {
    synchronized (lock) {
      this.nextRtpTimestamp = nextRtpTimestamp;
      this.playbackStartTimeUs = playbackStartTimeUs;
    }
  }

  @Override
  public void release() {
    // Do nothing.
  }

  /**
   * Returns the cutoff time of waiting for an out-of-order packet.
   *
   * <p>Returns the cutoff time to pass to {@link RtpPacketReorderingQueue#poll(long)} based on the
   * given RtpPacket arrival time.
   */
  private static long getCutoffTimeMs(long packetArrivalTimeMs) {
    return packetArrivalTimeMs - PACKET_REORDERING_CUTOFF_MS;
  }
}