RtspMessageChannel.java

/*
 * Copyright 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package androidx.media3.exoplayer.rtsp;

import static androidx.media3.common.util.Assertions.checkArgument;
import static androidx.media3.common.util.Assertions.checkState;
import static androidx.media3.common.util.Assertions.checkStateNotNull;
import static androidx.media3.exoplayer.rtsp.RtspMessageUtil.isRtspStartLine;
import static java.lang.annotation.ElementType.TYPE_USE;

import android.os.Handler;
import android.os.HandlerThread;
import androidx.annotation.IntDef;
import androidx.annotation.Nullable;
import androidx.media3.common.C;
import androidx.media3.common.ParserException;
import androidx.media3.common.util.UnstableApi;
import androidx.media3.exoplayer.upstream.Loader;
import androidx.media3.exoplayer.upstream.Loader.LoadErrorAction;
import androidx.media3.exoplayer.upstream.Loader.Loadable;
import com.google.common.base.Ascii;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

/** Sends and receives RTSP messages. */
@UnstableApi
/* package */ final class RtspMessageChannel implements Closeable {

  /** RTSP uses UTF-8 (RFC2326 Section 1.1). */
  public static final Charset CHARSET = Charsets.UTF_8;

  private static final String TAG = "RtspMessageChannel";

  /** A listener for received RTSP messages and possible failures. */
  public interface MessageListener {

    /**
     * Called when an RTSP message is received.
     *
     * @param message The non-empty list of received lines, with line terminators removed.
     */
    void onRtspMessageReceived(List<String> message);

    /**
     * Called when failed to send an RTSP message.
     *
     * @param message The list of lines making up the RTSP message that is failed to send.
     * @param e The thrown {@link Exception}.
     */
    default void onSendingFailed(List<String> message, Exception e) {}

    /**
     * Called when failed to receive an RTSP message.
     *
     * @param e The thrown {@link Exception}.
     */
    default void onReceivingFailed(Exception e) {}
  }

  /** A listener for received interleaved binary data from RTSP. */
  public interface InterleavedBinaryDataListener {

    /**
     * Called when interleaved binary data is received on RTSP.
     *
     * @param data The received binary data. The byte array will not be reused by {@link
     *     RtspMessageChannel}, and will always be full.
     */
    void onInterleavedBinaryDataReceived(byte[] data);
  }

  /**
   * The IANA-registered default port for RTSP. See <a
   * href="https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xhtml">here</a>
   */
  public static final int DEFAULT_RTSP_PORT = 554;

  private final MessageListener messageListener;
  private final Loader receiverLoader;
  private final Map<Integer, InterleavedBinaryDataListener> interleavedBinaryDataListeners;
  private @MonotonicNonNull Sender sender;
  private @MonotonicNonNull Socket socket;

  private volatile boolean closed;

  /**
   * Constructs a new instance.
   *
   * <p>A connected {@link Socket} must be provided in {@link #open} in order to send and receive
   * RTSP messages. {@link #close} must be called when done, which would also close the socket.
   *
   * <p>{@link MessageListener} and {@link InterleavedBinaryDataListener} implementations must not
   * make assumptions about which thread called their listener methods; and must be thread-safe.
   *
   * <p>Note: all method invocations must be made from the thread on which this class is created.
   *
   * @param messageListener The {@link MessageListener} to receive events.
   */
  public RtspMessageChannel(MessageListener messageListener) {
    this.messageListener = messageListener;
    this.receiverLoader = new Loader("ExoPlayer:RtspMessageChannel:ReceiverLoader");
    this.interleavedBinaryDataListeners = Collections.synchronizedMap(new HashMap<>());
  }

  /**
   * Opens the message channel to send and receive RTSP messages.
   *
   * <p>Note: If an {@link IOException} is thrown, callers must still call {@link #close()} to
   * ensure that any partial effects of the invocation are cleaned up.
   *
   * @param socket A connected {@link Socket}.
   */
  public void open(Socket socket) throws IOException {
    this.socket = socket;
    sender = new Sender(socket.getOutputStream());

    receiverLoader.startLoading(
        new Receiver(socket.getInputStream()),
        new LoaderCallbackImpl(),
        /* defaultMinRetryCount= */ 0);
  }

  /**
   * Closes the RTSP message channel.
   *
   * <p>The closed instance must not be re-opened again. The {@link MessageListener} will not
   * receive further messages after closing.
   *
   * @throws IOException If an error occurs closing the message channel.
   */
  @Override
  public void close() throws IOException {
    // TODO(internal b/172331505) Make sure most resources are closed before throwing, and close()
    // can be called again to close the resources that are still open.
    if (closed) {
      return;
    }
    try {
      if (sender != null) {
        sender.close();
      }
      receiverLoader.release();

      if (socket != null) {
        socket.close();
      }
    } finally {
      closed = true;
    }
  }

  /**
   * Sends a serialized RTSP message.
   *
   * @param message The list of strings representing the serialized RTSP message.
   */
  public void send(List<String> message) {
    checkStateNotNull(sender);
    sender.send(message);
  }

  /**
   * Registers an {@link InterleavedBinaryDataListener} to receive RTSP interleaved data.
   *
   * <p>The listener method {@link InterleavedBinaryDataListener#onInterleavedBinaryDataReceived} is
   * called on {@link RtspMessageChannel}'s internal thread for receiving RTSP messages.
   */
  public void registerInterleavedBinaryDataListener(
      int channel, InterleavedBinaryDataListener listener) {
    interleavedBinaryDataListeners.put(channel, listener);
  }

  private final class Sender implements Closeable {

    private final OutputStream outputStream;
    private final HandlerThread senderThread;
    private final Handler senderThreadHandler;

    /**
     * Creates a new instance.
     *
     * @param outputStream The {@link OutputStream} of the opened RTSP {@link Socket}, to which the
     *     request is sent. The caller needs to close the {@link OutputStream}.
     */
    public Sender(OutputStream outputStream) {
      this.outputStream = outputStream;
      this.senderThread = new HandlerThread("ExoPlayer:RtspMessageChannel:Sender");
      this.senderThread.start();
      this.senderThreadHandler = new Handler(this.senderThread.getLooper());
    }

    /**
     * Sends out RTSP messages that are in the forms of lists of strings.
     *
     * <p>If {@link Exception} is thrown while sending, the message {@link
     * MessageListener#onSendingFailed} is dispatched to the thread that created the {@link
     * RtspMessageChannel}.
     *
     * @param message The must of strings representing the serialized RTSP message.
     */
    public void send(List<String> message) {
      byte[] data = RtspMessageUtil.convertMessageToByteArray(message);
      senderThreadHandler.post(
          () -> {
            try {
              outputStream.write(data);
            } catch (Exception e) {
              if (!closed) {
                messageListener.onSendingFailed(message, e);
              }
            }
          });
    }

    @Override
    public void close() {
      senderThreadHandler.post(senderThread::quit);
      try {
        // Waits until all the messages posted to the sender thread are handled.
        senderThread.join();
      } catch (InterruptedException e) {
        senderThread.interrupt();
      }
    }
  }

  /** A {@link Loadable} for receiving RTSP responses. */
  private final class Receiver implements Loadable {

    /** ASCII dollar encapsulates the RTP packets in interleaved mode (RFC2326 Section 10.12). */
    private static final byte INTERLEAVED_MESSAGE_MARKER = '$';

    private final DataInputStream dataInputStream;
    private final MessageParser messageParser;
    private volatile boolean loadCanceled;

    /**
     * Creates a new instance.
     *
     * @param inputStream The {@link InputStream} of the opened RTSP {@link Socket}, from which the
     *     {@link RtspResponse RtspResponses} are received. The caller needs to close the {@link
     *     InputStream}.
     */
    public Receiver(InputStream inputStream) {
      dataInputStream = new DataInputStream(inputStream);
      messageParser = new MessageParser();
    }

    @Override
    public void cancelLoad() {
      loadCanceled = true;
    }

    @Override
    public void load() throws IOException {
      while (!loadCanceled) {
        byte firstByte = dataInputStream.readByte();
        if (firstByte == INTERLEAVED_MESSAGE_MARKER) {
          handleInterleavedBinaryData();
        } else {
          handleRtspMessage(firstByte);
        }
      }
    }

    /** Handles an entire RTSP message. */
    private void handleRtspMessage(byte firstByte) throws IOException {
      if (!closed) {
        messageListener.onRtspMessageReceived(messageParser.parseNext(firstByte, dataInputStream));
      }
    }

    private void handleInterleavedBinaryData() throws IOException {
      int channel = dataInputStream.readUnsignedByte();
      int size = dataInputStream.readUnsignedShort();
      byte[] data = new byte[size];
      dataInputStream.readFully(data, /* off= */ 0, size);

      @Nullable
      InterleavedBinaryDataListener listener = interleavedBinaryDataListeners.get(channel);
      if (listener != null && !closed) {
        listener.onInterleavedBinaryDataReceived(data);
      }
    }
  }

  private final class LoaderCallbackImpl implements Loader.Callback<Receiver> {
    @Override
    public void onLoadCompleted(Receiver loadable, long elapsedRealtimeMs, long loadDurationMs) {}

    @Override
    public void onLoadCanceled(
        Receiver loadable, long elapsedRealtimeMs, long loadDurationMs, boolean released) {}

    @Override
    public LoadErrorAction onLoadError(
        Receiver loadable,
        long elapsedRealtimeMs,
        long loadDurationMs,
        IOException error,
        int errorCount) {
      if (!closed) {
        messageListener.onReceivingFailed(error);
      }
      return Loader.DONT_RETRY;
    }
  }

  /** Processes RTSP messages line-by-line. */
  private static final class MessageParser {

    @Documented
    @Retention(RetentionPolicy.SOURCE)
    @Target(TYPE_USE)
    @IntDef({STATE_READING_FIRST_LINE, STATE_READING_HEADER, STATE_READING_BODY})
    @interface ReadingState {}

    private static final int STATE_READING_FIRST_LINE = 1;
    private static final int STATE_READING_HEADER = 2;
    private static final int STATE_READING_BODY = 3;

    private final List<String> messageLines;

    private @ReadingState int state;
    private long messageBodyLength;

    /** Creates a new instance. */
    public MessageParser() {
      messageLines = new ArrayList<>();
      state = STATE_READING_FIRST_LINE;
    }

    /**
     * Receives and parses an entire RTSP message.
     *
     * @param firstByte The first byte received for the RTSP message.
     * @param dataInputStream The {@link DataInputStream} on which RTSP messages are received.
     * @return An {@link ImmutableList} of the lines that make up an RTSP message.
     */
    public ImmutableList<String> parseNext(byte firstByte, DataInputStream dataInputStream)
        throws IOException {
      @Nullable
      ImmutableList<String> parsedMessageLines =
          addMessageLine(parseNextLine(firstByte, dataInputStream));

      while (parsedMessageLines == null) {
        if (state == STATE_READING_BODY) {
          if (messageBodyLength > 0) {
            // Message body's format is not regulated under RTSP, so it could use LF (instead of
            // RTSP's CRLF) as line ending. The length of the message body is included in the RTSP
            // Content-Length header.
            // Assume the message body length is within a 32-bit integer.
            int messageBodyLengthInt = Ints.checkedCast(messageBodyLength);
            checkState(messageBodyLengthInt != C.LENGTH_UNSET);
            byte[] messageBodyBytes = new byte[messageBodyLengthInt];
            dataInputStream.readFully(messageBodyBytes, /* off= */ 0, messageBodyLengthInt);
            parsedMessageLines = addMessageBody(messageBodyBytes);
          } else {
            throw new IllegalStateException("Expects a greater than zero Content-Length.");
          }
        } else {
          parsedMessageLines =
              addMessageLine(parseNextLine(dataInputStream.readByte(), dataInputStream));
        }
      }
      return parsedMessageLines;
    }

    /** Returns the byte representation of a complete RTSP line, with CRLF line terminator. */
    private static byte[] parseNextLine(byte firstByte, DataInputStream dataInputStream)
        throws IOException {
      ByteArrayOutputStream messageByteStream = new ByteArrayOutputStream();

      byte[] peekedBytes = new byte[2];
      peekedBytes[0] = firstByte;
      peekedBytes[1] = dataInputStream.readByte();
      messageByteStream.write(peekedBytes);

      while (peekedBytes[0] != Ascii.CR || peekedBytes[1] != Ascii.LF) {
        // Shift the CRLF buffer.
        peekedBytes[0] = peekedBytes[1];
        peekedBytes[1] = dataInputStream.readByte();
        messageByteStream.write(peekedBytes[1]);
      }

      return messageByteStream.toByteArray();
    }

    /**
     * Returns a list of completed RTSP message lines, without the CRLF line terminators; or {@code
     * null} if the message is not yet complete.
     */
    @Nullable
    private ImmutableList<String> addMessageLine(byte[] lineBytes) throws ParserException {
      // Trim CRLF. RTSP lists are terminated by a CRLF.
      checkArgument(
          lineBytes.length >= 2
              && lineBytes[lineBytes.length - 2] == Ascii.CR
              && lineBytes[lineBytes.length - 1] == Ascii.LF);
      String line =
          new String(lineBytes, /* offset= */ 0, /* length= */ lineBytes.length - 2, CHARSET);
      messageLines.add(line);

      switch (state) {
        case STATE_READING_FIRST_LINE:
          if (isRtspStartLine(line)) {
            state = STATE_READING_HEADER;
          }
          break;

        case STATE_READING_HEADER:
          // Check if the line contains RTSP Content-Length header.
          long contentLength = RtspMessageUtil.parseContentLengthHeader(line);
          if (contentLength != C.LENGTH_UNSET) {
            messageBodyLength = contentLength;
          }

          if (line.isEmpty()) {
            // An empty line signals the end of the header section.
            if (messageBodyLength > 0) {
              state = STATE_READING_BODY;
            } else {
              ImmutableList<String> linesToReturn = ImmutableList.copyOf(messageLines);
              reset();
              return linesToReturn;
            }
          }
          break;

        case STATE_READING_BODY:
          // Message body must be handled by addMessageBody().

        default:
          throw new IllegalStateException();
      }
      return null;
    }

    /** Returns a list of completed RTSP message lines, without the line terminators. */
    private ImmutableList<String> addMessageBody(byte[] messageBodyBytes) {
      checkState(state == STATE_READING_BODY);

      String messageBody;
      if (messageBodyBytes.length > 0
          && messageBodyBytes[messageBodyBytes.length - 1] == Ascii.LF) {
        if (messageBodyBytes.length > 1
            && messageBodyBytes[messageBodyBytes.length - 2] == Ascii.CR) {
          // Line ends with CRLF.
          messageBody =
              new String(
                  messageBodyBytes,
                  /* offset= */ 0,
                  /* length= */ messageBodyBytes.length - 2,
                  CHARSET);
        } else {
          // Line ends with LF.
          messageBody =
              new String(
                  messageBodyBytes,
                  /* offset= */ 0,
                  /* length= */ messageBodyBytes.length - 1,
                  CHARSET);
        }
      } else {
        throw new IllegalArgumentException("Message body is empty or does not end with a LF.");
      }

      messageLines.add(messageBody);
      ImmutableList<String> linesToReturn = ImmutableList.copyOf(messageLines);
      reset();
      return linesToReturn;
    }

    private void reset() {
      messageLines.clear();
      state = STATE_READING_FIRST_LINE;
      messageBodyLength = 0;
    }
  }
}