/*
 * Copyright (C) 2010 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MY_HANDLER_H_

#define MY_HANDLER_H_

//#define LOG_NDEBUG 0

#ifndef LOG_TAG
#define LOG_TAG "MyHandler"
#endif

#include <utils/Log.h>

#include "APacketSource.h"
#include "ARTPConnection.h"
#include "ARTSPConnection.h"
#include "ASessionDescription.h"

#include <ctype.h>

#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/ALooper.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/MediaErrors.h>
#include <media/stagefright/Utils.h>

#include <arpa/inet.h>
#include <sys/socket.h>
#include <netdb.h>

#include "HTTPBase.h"

// UNUSED_UNLESS_VERBOSE(x) marks a variable that is only referenced from
// verbose logging. When LOG_NDEBUG is non-zero the macro expands to a
// void-cast of x (silencing unused-variable warnings); otherwise it
// expands to nothing.
#if LOG_NDEBUG
#define UNUSED_UNLESS_VERBOSE(x) (void)(x)
#else
#define UNUSED_UNLESS_VERBOSE(x)
#endif

// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
static int64_t kAccessUnitTimeoutUs = 10000000ll;

// If no access units arrive for the first 10 secs after starting the
// stream, assume none ever will and signal EOS or switch transports.
static int64_t kStartupTimeoutUs = 10000000ll;

// Default session keep-alive timeout (60 secs). Used unless the server's
// "Session" header supplies a well-formed "timeout" of at least 15 secs.
static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;

// Delay (3 secs) applied when pause() posts its 'paus' message.
static int64_t kPauseDelayUs = 3000000ll;

namespace android {

// Searches a semicolon-separated list of items (e.g. "a;key=value;b=c")
// for an item of the form "<key>=<value>".
//
//   s      NUL-terminated list to search.
//   key    NUL-terminated attribute name; matched exactly (case-sensitive)
//          and must be immediately followed by '='.
//   value  Always cleared first. On success receives the text between the
//          '=' and the next ';' (or the end of the string).
//
// Whitespace at the start of each item is skipped before matching.
// Returns true iff a matching item was found.
static bool GetAttribute(const char *s, const char *key, AString *value) {
    value->clear();

    const size_t keyLen = strlen(key);

    for (;;) {
        // isspace() is undefined for negative arguments other than EOF, so
        // plain (possibly signed) chars must go through unsigned char.
        while (isspace(static_cast<unsigned char>(*s))) {
            ++s;
        }

        const char *semicolonPos = strchr(s, ';');

        // Length of the current item, excluding the terminating ';'.
        const size_t len = (semicolonPos == NULL)
                ? strlen(s)
                : static_cast<size_t>(semicolonPos - s);

        if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
            value->setTo(&s[keyLen + 1], len - keyLen - 1);
            return true;
        }

        if (semicolonPos == NULL) {
            // That was the last item.
            return false;
        }

        s = semicolonPos + 1;
    }
}

struct MyHandler : public AHandler {
    // Four-character codes used as the "what" value of notifications sent
    // to the client on a copy of mNotify (the 'quit' case of
    // onMessageReceived posts kWhatDisconnected this way).
    // NOTE(review): the remaining codes are not posted in the part of the
    // file reviewed here; their meaning is inferred from their names only.
    enum {
        kWhatConnected                  = 'conn',
        kWhatDisconnected               = 'disc',
        kWhatSeekPaused                 = 'spau',
        kWhatSeekDone                   = 'sdon',

        kWhatAccessUnit                 = 'accU',
        kWhatEOS                        = 'eos!',
        kWhatSeekDiscontinuity          = 'seeD',
        kWhatNormalPlayTimeMapping      = 'nptM',
    };

    MyHandler(
            const char *url,
            const sp<AMessage> &notify,
            bool uidValid = false, uid_t uid = 0)
        : mNotify(notify),
          mUIDValid(uidValid),
          mUID(uid),
          mNetLooper(new ALooper),
          mConn(new ARTSPConnection(mUIDValid, mUID)),
          mRTPConn(new ARTPConnection),
          mOriginalSessionURL(url),
          mSessionURL(url),
          mSetupTracksSuccessful(false),
          mSeekPending(false),
          mFirstAccessUnit(true),
          mAllTracksHaveTime(false),
          mNTPAnchorUs(-1),
          mMediaAnchorUs(-1),
          mLastMediaTimeUs(0),
          mNumAccessUnitsReceived(0),
          mCheckPending(false),
          mCheckGeneration(0),
          mCheckTimeoutGeneration(0),
          mTryTCPInterleaving(false),
          mTryFakeRTCP(false),
          mReceivedFirstRTCPPacket(false),
          mReceivedFirstRTPPacket(false),
          mSeekable(true),
          mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
          mKeepAliveGeneration(0),
          mPausing(false),
          mPauseGeneration(0),
          mPlayResponseParsed(false) {
        mNetLooper->setName("rtsp net");
        mNetLooper->start(false /* runOnCallingThread */,
                          false /* canCallJava */,
                          PRIORITY_HIGHEST);

        // Strip any authentication info from the session url, we don't
        // want to transmit user/pass in cleartext.
        AString host, path, user, pass;
        unsigned port;
        CHECK(ARTSPConnection::ParseURL(
                    mSessionURL.c_str(), &host, &port, &path, &user, &pass));

        if (user.size() > 0) {
            mSessionURL.clear();
            mSessionURL.append("rtsp://");
            mSessionURL.append(host);
            mSessionURL.append(":");
            mSessionURL.append(AStringPrintf("%u", port));
            mSessionURL.append(path);

            ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
        }

        mSessionHost = host;
    }

    void connect() {
        looper()->registerHandler(mConn);
        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);

        sp<AMessage> notify = new AMessage('biny', this);
        mConn->observeBinaryData(notify);

        sp<AMessage> reply = new AMessage('conn', this);
        mConn->connect(mOriginalSessionURL.c_str(), reply);
    }

    void loadSDP(const sp<ASessionDescription>& desc) {
        looper()->registerHandler(mConn);
        (1 ? mNetLooper : looper())->registerHandler(mRTPConn);

        sp<AMessage> notify = new AMessage('biny', this);
        mConn->observeBinaryData(notify);

        sp<AMessage> reply = new AMessage('sdpl', this);
        reply->setObject("description", desc);
        mConn->connect(mOriginalSessionURL.c_str(), reply);
    }

    // Returns the URL to use for session-wide requests:
    //  - mSessionURL if the session description has no "a=control"
    //    attribute at index 0 (the session-level section),
    //  - mBaseURL if that attribute is "*",
    //  - otherwise the attribute resolved against mBaseURL (CHECKed).
    AString getControlURL() {
        AString sessionLevelControlURL;
        const bool hasControl = mSessionDesc->findAttribute(
                0, "a=control", &sessionLevelControlURL);

        if (!hasControl) {
            return mSessionURL;
        }

        if (sessionLevelControlURL.compare("*") == 0) {
            return mBaseURL;
        }

        AString controlURL;
        CHECK(MakeURL(
                mBaseURL.c_str(),
                sessionLevelControlURL.c_str(),
                &controlURL));
        return controlURL;
    }

    void disconnect() {
        (new AMessage('abor', this))->post();
    }

    void seek(int64_t timeUs) {
        sp<AMessage> msg = new AMessage('seek', this);
        msg->setInt64("time", timeUs);
        mPauseGeneration++;
        msg->post();
    }

    void continueSeekAfterPause(int64_t timeUs) {
        sp<AMessage> msg = new AMessage('see1', this);
        msg->setInt64("time", timeUs);
        msg->post();
    }

    // Returns the current value of mSeekable. It starts out true and is
    // set to !isLiveStream(...) once a session description is accepted.
    bool isSeekable() const {
        return mSeekable;
    }

    void pause() {
        sp<AMessage> msg = new AMessage('paus', this);
        mPauseGeneration++;
        msg->setInt32("pausecheck", mPauseGeneration);
        msg->post(kPauseDelayUs);
    }

    void resume() {
        sp<AMessage> msg = new AMessage('resu', this);
        mPauseGeneration++;
        msg->post();
    }

    // Appends an 8-byte RTCP receiver report (packet type 201) with no
    // report blocks and the fixed SSRC 0xdeadbeef to the end of |buf|'s
    // current range, then grows the range to cover it.
    //
    // The caller must ensure |buf| has at least 8 bytes of capacity left
    // after its current range.
    static void addRR(const sp<ABuffer> &buf) {
        uint8_t *ptr = buf->data() + buf->size();
        ptr[0] = 0x80 | 0;  // 0x80 header bits, report count 0
        ptr[1] = 201;  // RR
        ptr[2] = 0;    // length, high byte
        ptr[3] = 1;    // length, low byte: (8 bytes / 4) - 1
        ptr[4] = 0xde;  // SSRC
        ptr[5] = 0xad;
        ptr[6] = 0xbe;
        ptr[7] = 0xef;

        // data() is relative to the buffer's range offset, so the offset
        // must be preserved (as addSDES does) rather than reset to 0;
        // otherwise the range would not cover the bytes just written when
        // the buffer's offset is non-zero.
        buf->setRange(buf->offset(), buf->size() + 8);
    }

    // Appends an RTCP SDES packet (packet type 202) with the fixed SSRC
    // 0xdeadbeef to the end of |buffer|'s current range and grows the
    // range to cover it. The packet carries two items:
    //   CNAME (1): "stagefright@<local IPv4 address of socket |s|>"
    //              ("0.0.0.0" if the address cannot be queried),
    //   TOOL  (6): the result of MakeUserAgent().
    // The item list is terminated by a zero octet and padded with zeros
    // to a multiple of 4 bytes.
    //
    // The caller must ensure |buffer| has enough spare capacity.
    static void addSDES(int s, const sp<ABuffer> &buffer) {
        // Each SDES item stores its length in a single octet.
        const size_t kMaxItemLen = 255;

        // Zero everything so no field is left indeterminate if
        // getsockname() fails.
        struct sockaddr_in addr;
        memset(&addr, 0, sizeof(addr));
        socklen_t addrSize = sizeof(addr);
        if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
            inet_aton("0.0.0.0", &(addr.sin_addr));
        }

        uint8_t *data = buffer->data() + buffer->size();
        data[0] = 0x80 | 1;  // 0x80 header bits, one source chunk
        data[1] = 202;  // SDES
        // data[2] and data[3] (length) are filled in once the size is known.
        data[4] = 0xde;  // SSRC
        data[5] = 0xad;
        data[6] = 0xbe;
        data[7] = 0xef;

        size_t offset = 8;

        AString cname = "stagefright@";
        cname.append(inet_ntoa(addr.sin_addr));

        // Clamp so the length octet and the number of bytes copied always
        // agree; previously an over-long string was copied in full while
        // its length was truncated modulo 256.
        size_t cnameLen = cname.size();
        if (cnameLen > kMaxItemLen) {
            cnameLen = kMaxItemLen;
        }

        data[offset++] = 1;  // CNAME
        data[offset++] = (uint8_t)cnameLen;
        memcpy(&data[offset], cname.c_str(), cnameLen);
        offset += cnameLen;

        AString tool = MakeUserAgent();

        size_t toolLen = tool.size();
        if (toolLen > kMaxItemLen) {
            toolLen = kMaxItemLen;
        }

        data[offset++] = 6;  // TOOL
        data[offset++] = (uint8_t)toolLen;
        memcpy(&data[offset], tool.c_str(), toolLen);
        offset += toolLen;

        // End-of-list marker.
        data[offset++] = 0;

        // Pad to the next 32-bit boundary.
        while ((offset % 4) != 0) {
            data[offset++] = 0;
        }

        // Length field: packet size in 32-bit words, minus one.
        size_t numWords = (offset / 4) - 1;
        data[2] = numWords >> 8;
        data[3] = numWords & 0xff;

        buffer->setRange(buffer->offset(), buffer->size() + offset);
    }

    // In case we're behind NAT, fire off two UDP packets to the remote
    // rtp/rtcp ports to poke a hole into the firewall for future incoming
    // packets. We're going to send an RR/SDES RTCP packet to both of them.
    //
    //   rtpSocket, rtcpSocket  Local UDP sockets to send from.
    //   transport              Value of the server's "Transport" header.
    //                          Its "server_port" item supplies the remote
    //                          ports; its "source" item, if present,
    //                          supplies the remote address, otherwise the
    //                          session host is resolved instead.
    //
    // Returns false if the remote address or ports cannot be determined
    // or a send comes up short. Returns true without sending anything if
    // the remote address is INADDR_NONE or a loopback address.
    bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
        // Zero the whole structure so no field is left indeterminate.
        struct sockaddr_in addr;
        memset(&addr, 0, sizeof(addr));
        addr.sin_family = AF_INET;

        AString source;
        AString server_port;
        if (!GetAttribute(transport.c_str(),
                          "source",
                          &source)) {
            ALOGW("Missing 'source' field in Transport response. Using "
                 "RTSP endpoint address.");

            struct hostent *ent = gethostbyname(mSessionHost.c_str());
            if (ent == NULL) {
                ALOGE("Failed to look up address of session host '%s'",
                     mSessionHost.c_str());

                return false;
            }

            // Only an IPv4 result of the expected size can be stored in a
            // sockaddr_in.
            if (ent->h_addrtype != AF_INET
                    || ent->h_length != (int)sizeof(addr.sin_addr)
                    || ent->h_addr == NULL) {
                ALOGE("Session host '%s' has no usable IPv4 address",
                     mSessionHost.c_str());

                return false;
            }

            // memcpy avoids reading through a possibly misaligned,
            // type-punned pointer.
            memcpy(&addr.sin_addr, ent->h_addr, sizeof(addr.sin_addr));
        } else {
            addr.sin_addr.s_addr = inet_addr(source.c_str());
        }

        if (!GetAttribute(transport.c_str(),
                                 "server_port",
                                 &server_port)) {
            ALOGI("Missing 'server_port' field in Transport response.");
            return false;
        }

        int rtpPort, rtcpPort;
        if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
                || rtpPort <= 0 || rtpPort > 65535
                || rtcpPort <= 0 || rtcpPort > 65535
                || rtcpPort != rtpPort + 1) {
            ALOGE("Server picked invalid RTP/RTCP port pair %s,"
                 " RTP port must be even, RTCP port must be one higher.",
                 server_port.c_str());

            return false;
        }

        if (rtpPort & 1) {
            ALOGW("Server picked an odd RTP port, it should've picked an "
                 "even one, we'll let it pass for now, but this may break "
                 "in the future.");
        }

        if (addr.sin_addr.s_addr == INADDR_NONE) {
            // 'source' was not a dotted-quad address; nothing to send to.
            return true;
        }

        if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
            // No firewalls to traverse on the loopback interface.
            return true;
        }

        // Make up an RR/SDES RTCP packet.
        sp<ABuffer> buf = new ABuffer(65536);
        buf->setRange(0, 0);
        addRR(buf);
        addSDES(rtpSocket, buf);

        addr.sin_port = htons(rtpPort);

        ssize_t n = sendto(
                rtpSocket, buf->data(), buf->size(), 0,
                (const sockaddr *)&addr, sizeof(addr));

        if (n < (ssize_t)buf->size()) {
            ALOGE("failed to poke a hole for RTP packets");
            return false;
        }

        addr.sin_port = htons(rtcpPort);

        n = sendto(
                rtcpSocket, buf->data(), buf->size(), 0,
                (const sockaddr *)&addr, sizeof(addr));

        if (n < (ssize_t)buf->size()) {
            ALOGE("failed to poke a hole for RTCP packets");
            return false;
        }

        ALOGV("successfully poked holes.");

        return true;
    }

    // Decides whether the session described by |desc| is a live stream.
    //
    // If the session-level section (index 0) has an "a=LiveStream"
    // attribute, the stream is live iff the attribute's numeric value is
    // 1. The number is read from the text after the first ';' found at or
    // beyond index 2, or from the whole attribute if there is no such ';'.
    //
    // Without that attribute, the stream is considered live iff the
    // description reports no duration.
    static bool isLiveStream(const sp<ASessionDescription> &desc) {
        AString attrLiveStream;
        if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
            ssize_t semicolonPos = attrLiveStream.find(";", 2);

            // valString must stay alive for as long as liveStreamValue may
            // point into it. It used to be declared inside the else branch,
            // leaving liveStreamValue dangling by the time strtoul() read
            // it.
            AString valString;
            const char* liveStreamValue;
            if (semicolonPos < 0) {
                liveStreamValue = attrLiveStream.c_str();
            } else {
                valString.setTo(attrLiveStream,
                        semicolonPos + 1,
                        attrLiveStream.size() - semicolonPos - 1);
                liveStreamValue = valString.c_str();
            }

            uint32_t value = strtoul(liveStreamValue, NULL, 10);
            if (value == 1) {
                ALOGV("found live stream");
                return true;
            }
        } else {
            // It is a live stream if no duration is returned
            int64_t durationUs;
            if (!desc->getDurationUs(&durationUs)) {
                ALOGV("No duration found, assume live stream");
                return true;
            }
        }

        return false;
    }

    virtual void onMessageReceived(const sp<AMessage> &msg) {
        switch (msg->what()) {
            case 'conn':
            {
                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("connection request completed with result %d (%s)",
                     result, strerror(-result));

                if (result == OK) {
                    AString request;
                    request = "DESCRIBE ";
                    request.append(mSessionURL);
                    request.append(" RTSP/1.0\r\n");
                    request.append("Accept: application/sdp\r\n");
                    request.append("\r\n");

                    sp<AMessage> reply = new AMessage('desc', this);
                    mConn->sendRequest(request.c_str(), reply);
                } else {
                    (new AMessage('disc', this))->post();
                }
                break;
            }

            case 'disc':
            {
                ++mKeepAliveGeneration;

                int32_t reconnect;
                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
                    sp<AMessage> reply = new AMessage('conn', this);
                    mConn->connect(mOriginalSessionURL.c_str(), reply);
                } else {
                    (new AMessage('quit', this))->post();
                }
                break;
            }

            case 'desc':
            {
                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("DESCRIBE completed with result %d (%s)",
                     result, strerror(-result));

                if (result == OK) {
                    sp<RefBase> obj;
                    CHECK(msg->findObject("response", &obj));
                    sp<ARTSPResponse> response =
                        static_cast<ARTSPResponse *>(obj.get());

                    if (response->mStatusCode == 301 || response->mStatusCode == 302) {
                        ssize_t i = response->mHeaders.indexOfKey("location");
                        CHECK_GE(i, 0);

                        mOriginalSessionURL = response->mHeaders.valueAt(i);
                        mSessionURL = mOriginalSessionURL;

                        // Strip any authentication info from the session url, we don't
                        // want to transmit user/pass in cleartext.
                        AString host, path, user, pass;
                        unsigned port;
                        if (ARTSPConnection::ParseURL(
                                    mSessionURL.c_str(), &host, &port, &path, &user, &pass)
                                && user.size() > 0) {
                            mSessionURL.clear();
                            mSessionURL.append("rtsp://");
                            mSessionURL.append(host);
                            mSessionURL.append(":");
                            mSessionURL.append(AStringPrintf("%u", port));
                            mSessionURL.append(path);

                            ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
                        }

                        sp<AMessage> reply = new AMessage('conn', this);
                        mConn->connect(mOriginalSessionURL.c_str(), reply);
                        break;
                    }

                    if (response->mStatusCode != 200) {
                        result = UNKNOWN_ERROR;
                    } else if (response->mContent == NULL) {
                        result = ERROR_MALFORMED;
                        ALOGE("The response has no content.");
                    } else {
                        mSessionDesc = new ASessionDescription;

                        mSessionDesc->setTo(
                                response->mContent->data(),
                                response->mContent->size());

                        if (!mSessionDesc->isValid()) {
                            ALOGE("Failed to parse session description.");
                            result = ERROR_MALFORMED;
                        } else {
                            ssize_t i = response->mHeaders.indexOfKey("content-base");
                            if (i >= 0) {
                                mBaseURL = response->mHeaders.valueAt(i);
                            } else {
                                i = response->mHeaders.indexOfKey("content-location");
                                if (i >= 0) {
                                    mBaseURL = response->mHeaders.valueAt(i);
                                } else {
                                    mBaseURL = mSessionURL;
                                }
                            }

                            mSeekable = !isLiveStream(mSessionDesc);

                            if (!mBaseURL.startsWith("rtsp://")) {
                                // Some misbehaving servers specify a relative
                                // URL in one of the locations above, combine
                                // it with the absolute session URL to get
                                // something usable...

                                ALOGW("Server specified a non-absolute base URL"
                                     ", combining it with the session URL to "
                                     "get something usable...");

                                AString tmp;
                                CHECK(MakeURL(
                                            mSessionURL.c_str(),
                                            mBaseURL.c_str(),
                                            &tmp));

                                mBaseURL = tmp;
                            }

                            mControlURL = getControlURL();

                            if (mSessionDesc->countTracks() < 2) {
                                // There's no actual tracks in this session.
                                // The first "track" is merely session meta
                                // data.

                                ALOGW("Session doesn't contain any playable "
                                     "tracks. Aborting.");
                                result = ERROR_UNSUPPORTED;
                            } else {
                                setupTrack(1);
                            }
                        }
                    }
                }

                if (result != OK) {
                    sp<AMessage> reply = new AMessage('disc', this);
                    mConn->disconnect(reply);
                }
                break;
            }

            case 'sdpl':
            {
                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("SDP connection request completed with result %d (%s)",
                     result, strerror(-result));

                if (result == OK) {
                    sp<RefBase> obj;
                    CHECK(msg->findObject("description", &obj));
                    mSessionDesc =
                        static_cast<ASessionDescription *>(obj.get());

                    if (!mSessionDesc->isValid()) {
                        ALOGE("Failed to parse session description.");
                        result = ERROR_MALFORMED;
                    } else {
                        mBaseURL = mSessionURL;

                        mSeekable = !isLiveStream(mSessionDesc);

                        mControlURL = getControlURL();

                        if (mSessionDesc->countTracks() < 2) {
                            // There's no actual tracks in this session.
                            // The first "track" is merely session meta
                            // data.

                            ALOGW("Session doesn't contain any playable "
                                 "tracks. Aborting.");
                            result = ERROR_UNSUPPORTED;
                        } else {
                            setupTrack(1);
                        }
                    }
                }

                if (result != OK) {
                    sp<AMessage> reply = new AMessage('disc', this);
                    mConn->disconnect(reply);
                }
                break;
            }

            case 'setu':
            {
                size_t index;
                CHECK(msg->findSize("index", &index));

                TrackInfo *track = NULL;
                size_t trackIndex;
                if (msg->findSize("track-index", &trackIndex)) {
                    track = &mTracks.editItemAt(trackIndex);
                }

                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("SETUP(%zu) completed with result %d (%s)",
                     index, result, strerror(-result));

                if (result == OK) {
                    CHECK(track != NULL);

                    sp<RefBase> obj;
                    CHECK(msg->findObject("response", &obj));
                    sp<ARTSPResponse> response =
                        static_cast<ARTSPResponse *>(obj.get());

                    if (response->mStatusCode != 200) {
                        result = UNKNOWN_ERROR;
                    } else {
                        ssize_t i = response->mHeaders.indexOfKey("session");
                        CHECK_GE(i, 0);

                        mSessionID = response->mHeaders.valueAt(i);

                        mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
                        AString timeoutStr;
                        if (GetAttribute(
                                    mSessionID.c_str(), "timeout", &timeoutStr)) {
                            char *end;
                            unsigned long timeoutSecs =
                                strtoul(timeoutStr.c_str(), &end, 10);

                            if (end == timeoutStr.c_str() || *end != '\0') {
                                ALOGW("server specified malformed timeout '%s'",
                                     timeoutStr.c_str());

                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
                            } else if (timeoutSecs < 15) {
                                ALOGW("server specified too short a timeout "
                                     "(%lu secs), using default.",
                                     timeoutSecs);

                                mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
                            } else {
                                mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;

                                ALOGI("server specified timeout of %lu secs.",
                                     timeoutSecs);
                            }
                        }

                        i = mSessionID.find(";");
                        if (i >= 0) {
                            // Remove options, i.e. ";timeout=90"
                            mSessionID.erase(i, mSessionID.size() - i);
                        }

                        sp<AMessage> notify = new AMessage('accu', this);
                        notify->setSize("track-index", trackIndex);

                        i = response->mHeaders.indexOfKey("transport");
                        CHECK_GE(i, 0);

                        if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
                            if (!track->mUsingInterleavedTCP) {
                                AString transport = response->mHeaders.valueAt(i);

                                // We are going to continue even if we were
                                // unable to poke a hole into the firewall...
                                pokeAHole(
                                        track->mRTPSocket,
                                        track->mRTCPSocket,
                                        transport);
                            }

                            mRTPConn->addStream(
                                    track->mRTPSocket, track->mRTCPSocket,
                                    mSessionDesc, index,
                                    notify, track->mUsingInterleavedTCP);

                            mSetupTracksSuccessful = true;
                        } else {
                            result = BAD_VALUE;
                        }
                    }
                }

                if (result != OK) {
                    if (track) {
                        if (!track->mUsingInterleavedTCP) {
                            // Clear the tag
                            if (mUIDValid) {
                                HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
                                HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
                                HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
                                HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
                            }

                            close(track->mRTPSocket);
                            close(track->mRTCPSocket);
                        }

                        mTracks.removeItemsAt(trackIndex);
                    }
                }

                ++index;
                if (result == OK && index < mSessionDesc->countTracks()) {
                    setupTrack(index);
                } else if (mSetupTracksSuccessful) {
                    ++mKeepAliveGeneration;
                    postKeepAlive();

                    AString request = "PLAY ";
                    request.append(mControlURL);
                    request.append(" RTSP/1.0\r\n");

                    request.append("Session: ");
                    request.append(mSessionID);
                    request.append("\r\n");

                    request.append("\r\n");

                    sp<AMessage> reply = new AMessage('play', this);
                    mConn->sendRequest(request.c_str(), reply);
                } else {
                    sp<AMessage> reply = new AMessage('disc', this);
                    mConn->disconnect(reply);
                }
                break;
            }

            case 'play':
            {
                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("PLAY completed with result %d (%s)",
                     result, strerror(-result));

                if (result == OK) {
                    sp<RefBase> obj;
                    CHECK(msg->findObject("response", &obj));
                    sp<ARTSPResponse> response =
                        static_cast<ARTSPResponse *>(obj.get());

                    if (response->mStatusCode != 200) {
                        result = UNKNOWN_ERROR;
                    } else {
                        parsePlayResponse(response);

                        sp<AMessage> timeout = new AMessage('tiou', this);
                        mCheckTimeoutGeneration++;
                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
                        timeout->post(kStartupTimeoutUs);
                    }
                }

                if (result != OK) {
                    sp<AMessage> reply = new AMessage('disc', this);
                    mConn->disconnect(reply);
                }

                break;
            }

            case 'aliv':
            {
                int32_t generation;
                CHECK(msg->findInt32("generation", &generation));

                if (generation != mKeepAliveGeneration) {
                    // obsolete event.
                    break;
                }

                AString request;
                request.append("OPTIONS ");
                request.append(mSessionURL);
                request.append(" RTSP/1.0\r\n");
                request.append("Session: ");
                request.append(mSessionID);
                request.append("\r\n");
                request.append("\r\n");

                sp<AMessage> reply = new AMessage('opts', this);
                reply->setInt32("generation", mKeepAliveGeneration);
                mConn->sendRequest(request.c_str(), reply);
                break;
            }

            case 'opts':
            {
                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("OPTIONS completed with result %d (%s)",
                     result, strerror(-result));

                int32_t generation;
                CHECK(msg->findInt32("generation", &generation));

                if (generation != mKeepAliveGeneration) {
                    // obsolete event.
                    break;
                }

                postKeepAlive();
                break;
            }

            case 'abor':
            {
                for (size_t i = 0; i < mTracks.size(); ++i) {
                    TrackInfo *info = &mTracks.editItemAt(i);

                    if (!mFirstAccessUnit) {
                        postQueueEOS(i, ERROR_END_OF_STREAM);
                    }

                    if (!info->mUsingInterleavedTCP) {
                        mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);

                        // Clear the tag
                        if (mUIDValid) {
                            HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
                            HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
                            HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
                            HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
                        }

                        close(info->mRTPSocket);
                        close(info->mRTCPSocket);
                    }
                }
                mTracks.clear();
                mSetupTracksSuccessful = false;
                mSeekPending = false;
                mFirstAccessUnit = true;
                mAllTracksHaveTime = false;
                mNTPAnchorUs = -1;
                mMediaAnchorUs = -1;
                mNumAccessUnitsReceived = 0;
                mReceivedFirstRTCPPacket = false;
                mReceivedFirstRTPPacket = false;
                mPausing = false;
                mSeekable = true;

                sp<AMessage> reply = new AMessage('tear', this);

                int32_t reconnect;
                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
                    reply->setInt32("reconnect", true);
                }

                AString request;
                request = "TEARDOWN ";

                // XXX should use aggregate url from SDP here...
                request.append(mSessionURL);
                request.append(" RTSP/1.0\r\n");

                request.append("Session: ");
                request.append(mSessionID);
                request.append("\r\n");

                request.append("\r\n");

                mConn->sendRequest(request.c_str(), reply);
                break;
            }

            case 'tear':
            {
                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("TEARDOWN completed with result %d (%s)",
                     result, strerror(-result));

                sp<AMessage> reply = new AMessage('disc', this);

                int32_t reconnect;
                if (msg->findInt32("reconnect", &reconnect) && reconnect) {
                    reply->setInt32("reconnect", true);
                }

                mConn->disconnect(reply);
                break;
            }

            case 'quit':
            {
                sp<AMessage> msg = mNotify->dup();
                msg->setInt32("what", kWhatDisconnected);
                msg->setInt32("result", UNKNOWN_ERROR);
                msg->post();
                break;
            }

            case 'chek':
            {
                int32_t generation;
                CHECK(msg->findInt32("generation", &generation));
                if (generation != mCheckGeneration) {
                    // This is an outdated message. Ignore.
                    break;
                }

                if (mNumAccessUnitsReceived == 0) {
#if 1
                    ALOGI("stream ended? aborting.");
                    (new AMessage('abor', this))->post();
                    break;
#else
                    ALOGI("haven't seen an AU in a looong time.");
#endif
                }

                mNumAccessUnitsReceived = 0;
                msg->post(kAccessUnitTimeoutUs);
                break;
            }

            case 'accu':
            {
                int32_t timeUpdate;
                if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
                    size_t trackIndex;
                    CHECK(msg->findSize("track-index", &trackIndex));

                    uint32_t rtpTime;
                    uint64_t ntpTime;
                    CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
                    CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));

                    onTimeUpdate(trackIndex, rtpTime, ntpTime);
                    break;
                }

                int32_t first;
                if (msg->findInt32("first-rtcp", &first)) {
                    mReceivedFirstRTCPPacket = true;
                    break;
                }

                if (msg->findInt32("first-rtp", &first)) {
                    mReceivedFirstRTPPacket = true;
                    break;
                }

                ++mNumAccessUnitsReceived;
                postAccessUnitTimeoutCheck();

                size_t trackIndex;
                CHECK(msg->findSize("track-index", &trackIndex));

                if (trackIndex >= mTracks.size()) {
                    ALOGV("late packets ignored.");
                    break;
                }

                TrackInfo *track = &mTracks.editItemAt(trackIndex);

                int32_t eos;
                if (msg->findInt32("eos", &eos)) {
                    ALOGI("received BYE on track index %zu", trackIndex);
                    if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
                        ALOGI("No time established => fake existing data");

                        track->mEOSReceived = true;
                        mTryFakeRTCP = true;
                        mReceivedFirstRTCPPacket = true;
                        fakeTimestamps();
                    } else {
                        postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
                    }
                    return;
                }

                sp<ABuffer> accessUnit;
                CHECK(msg->findBuffer("access-unit", &accessUnit));

                uint32_t seqNum = (uint32_t)accessUnit->int32Data();

                if (mSeekPending) {
                    ALOGV("we're seeking, dropping stale packet.");
                    break;
                }

                if (seqNum < track->mFirstSeqNumInSegment) {
                    ALOGV("dropping stale access-unit (%d < %d)",
                         seqNum, track->mFirstSeqNumInSegment);
                    break;
                }

                if (track->mNewSegment) {
                    track->mNewSegment = false;
                }

                onAccessUnitComplete(trackIndex, accessUnit);
                break;
            }

            case 'paus':
            {
                int32_t generation;
                CHECK(msg->findInt32("pausecheck", &generation));
                if (generation != mPauseGeneration) {
                    ALOGV("Ignoring outdated pause message.");
                    break;
                }

                if (!mSeekable) {
                    ALOGW("This is a live stream, ignoring pause request.");
                    break;
                }
                mCheckPending = true;
                ++mCheckGeneration;
                mPausing = true;

                AString request = "PAUSE ";
                request.append(mControlURL);
                request.append(" RTSP/1.0\r\n");

                request.append("Session: ");
                request.append(mSessionID);
                request.append("\r\n");

                request.append("\r\n");

                sp<AMessage> reply = new AMessage('pau2', this);
                mConn->sendRequest(request.c_str(), reply);
                break;
            }

            case 'pau2':
            {
                int32_t result;
                CHECK(msg->findInt32("result", &result));
                mCheckTimeoutGeneration++;

                ALOGI("PAUSE completed with result %d (%s)",
                     result, strerror(-result));
                break;
            }

            case 'resu':
            {
                if (mPausing && mSeekPending) {
                    // If seeking, Play will be sent from see1 instead
                    break;
                }

                if (!mPausing) {
                    // Dont send PLAY if we have not paused
                    break;
                }
                AString request = "PLAY ";
                request.append(mControlURL);
                request.append(" RTSP/1.0\r\n");

                request.append("Session: ");
                request.append(mSessionID);
                request.append("\r\n");

                request.append("\r\n");

                sp<AMessage> reply = new AMessage('res2', this);
                mConn->sendRequest(request.c_str(), reply);
                break;
            }

            case 'res2':
            {
                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("PLAY completed with result %d (%s)",
                     result, strerror(-result));

                mCheckPending = false;
                postAccessUnitTimeoutCheck();

                if (result == OK) {
                    sp<RefBase> obj;
                    CHECK(msg->findObject("response", &obj));
                    sp<ARTSPResponse> response =
                        static_cast<ARTSPResponse *>(obj.get());

                    if (response->mStatusCode != 200) {
                        result = UNKNOWN_ERROR;
                    } else {
                        parsePlayResponse(response);

                        // Post new timeout in order to make sure to use
                        // fake timestamps if no new Sender Reports arrive
                        sp<AMessage> timeout = new AMessage('tiou', this);
                        mCheckTimeoutGeneration++;
                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
                        timeout->post(kStartupTimeoutUs);
                    }
                }

                if (result != OK) {
                    ALOGE("resume failed, aborting.");
                    (new AMessage('abor', this))->post();
                }

                mPausing = false;
                break;
            }

            case 'seek':
            {
                if (!mSeekable) {
                    ALOGW("This is a live stream, ignoring seek request.");

                    sp<AMessage> msg = mNotify->dup();
                    msg->setInt32("what", kWhatSeekDone);
                    msg->post();
                    break;
                }

                int64_t timeUs;
                CHECK(msg->findInt64("time", &timeUs));

                mSeekPending = true;

                // Disable the access unit timeout until we resumed
                // playback again.
                mCheckPending = true;
                ++mCheckGeneration;

                sp<AMessage> reply = new AMessage('see0', this);
                reply->setInt64("time", timeUs);

                if (mPausing) {
                    // PAUSE already sent
                    ALOGI("Pause already sent");
                    reply->post();
                    break;
                }
                AString request = "PAUSE ";
                request.append(mControlURL);
                request.append(" RTSP/1.0\r\n");

                request.append("Session: ");
                request.append(mSessionID);
                request.append("\r\n");

                request.append("\r\n");

                mConn->sendRequest(request.c_str(), reply);
                break;
            }

            case 'see0':
            {
                // Session is paused now.
                status_t err = OK;
                msg->findInt32("result", &err);

                int64_t timeUs;
                CHECK(msg->findInt64("time", &timeUs));

                sp<AMessage> notify = mNotify->dup();
                notify->setInt32("what", kWhatSeekPaused);
                notify->setInt32("err", err);
                notify->setInt64("time", timeUs);
                notify->post();
                break;

            }

            case 'see1':
            {
                for (size_t i = 0; i < mTracks.size(); ++i) {
                    TrackInfo *info = &mTracks.editItemAt(i);

                    postQueueSeekDiscontinuity(i);
                    info->mEOSReceived = false;

                    info->mRTPAnchor = 0;
                    info->mNTPAnchorUs = -1;
                }

                mAllTracksHaveTime = false;
                mNTPAnchorUs = -1;

                // Start new timeoutgeneration to avoid getting timeout
                // before PLAY response arrive
                sp<AMessage> timeout = new AMessage('tiou', this);
                mCheckTimeoutGeneration++;
                timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
                timeout->post(kStartupTimeoutUs);

                int64_t timeUs;
                CHECK(msg->findInt64("time", &timeUs));

                AString request = "PLAY ";
                request.append(mControlURL);
                request.append(" RTSP/1.0\r\n");

                request.append("Session: ");
                request.append(mSessionID);
                request.append("\r\n");

                request.append(
                        AStringPrintf(
                            "Range: npt=%lld-\r\n", timeUs / 1000000ll));

                request.append("\r\n");

                sp<AMessage> reply = new AMessage('see2', this);
                mConn->sendRequest(request.c_str(), reply);
                break;
            }

            case 'see2':
            {
                if (mTracks.size() == 0) {
                    // We have already hit abor, break
                    break;
                }

                int32_t result;
                CHECK(msg->findInt32("result", &result));

                ALOGI("PLAY completed with result %d (%s)",
                     result, strerror(-result));

                mCheckPending = false;
                postAccessUnitTimeoutCheck();

                if (result == OK) {
                    sp<RefBase> obj;
                    CHECK(msg->findObject("response", &obj));
                    sp<ARTSPResponse> response =
                        static_cast<ARTSPResponse *>(obj.get());

                    if (response->mStatusCode != 200) {
                        result = UNKNOWN_ERROR;
                    } else {
                        parsePlayResponse(response);

                        // Post new timeout in order to make sure to use
                        // fake timestamps if no new Sender Reports arrive
                        sp<AMessage> timeout = new AMessage('tiou', this);
                        mCheckTimeoutGeneration++;
                        timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
                        timeout->post(kStartupTimeoutUs);

                        ssize_t i = response->mHeaders.indexOfKey("rtp-info");
                        CHECK_GE(i, 0);

                        ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());

                        ALOGI("seek completed.");
                    }
                }

                if (result != OK) {
                    ALOGE("seek failed, aborting.");
                    (new AMessage('abor', this))->post();
                }

                mPausing = false;
                mSeekPending = false;

                sp<AMessage> msg = mNotify->dup();
                msg->setInt32("what", kWhatSeekDone);
                msg->post();
                break;
            }

            case 'biny':
            {
                sp<ABuffer> buffer;
                CHECK(msg->findBuffer("buffer", &buffer));

                int32_t index;
                CHECK(buffer->meta()->findInt32("index", &index));

                mRTPConn->injectPacket(index, buffer);
                break;
            }

            case 'tiou':
            {
                int32_t timeoutGenerationCheck;
                CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
                if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
                    // This is an outdated message. Ignore.
                    // This typically happens if a lot of seeks are
                    // performed, since new timeout messages now are
                    // posted at seek as well.
                    break;
                }
                if (!mReceivedFirstRTCPPacket) {
                    if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
                        ALOGW("We received RTP packets but no RTCP packets, "
                             "using fake timestamps.");

                        mTryFakeRTCP = true;

                        mReceivedFirstRTCPPacket = true;

                        fakeTimestamps();
                    } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
                        ALOGW("Never received any data, switching transports.");

                        mTryTCPInterleaving = true;

                        sp<AMessage> msg = new AMessage('abor', this);
                        msg->setInt32("reconnect", true);
                        msg->post();
                    } else {
                        ALOGW("Never received any data, disconnecting.");
                        (new AMessage('abor', this))->post();
                    }
                } else {
                    if (!mAllTracksHaveTime) {
                        ALOGW("We received some RTCP packets, but time "
                              "could not be established on all tracks, now "
                              "using fake timestamps");

                        fakeTimestamps();
                    }
                }
                break;
            }

            default:
                TRESPASS();
                break;
        }
    }

    void postKeepAlive() {
        sp<AMessage> msg = new AMessage('aliv', this);
        msg->setInt32("generation", mKeepAliveGeneration);
        msg->post((mKeepAliveTimeoutUs * 9) / 10);
    }

    void postAccessUnitTimeoutCheck() {
        if (mCheckPending) {
            return;
        }

        mCheckPending = true;
        sp<AMessage> check = new AMessage('chek', this);
        check->setInt32("generation", mCheckGeneration);
        check->post(kAccessUnitTimeoutUs);
    }

    // Splits |s| at every occurrence of |separator| and stores the pieces, in
    // order, in |items|, which is cleared first.
    //
    // A piece between two adjacent separators is stored as an empty string,
    // but nothing is stored after a trailing separator, and an empty |s|
    // produces an empty list. An empty |separator| cannot split anything, so
    // a non-empty |s| is then stored as a single piece.
    static void SplitString(
            const AString &s, const char *separator, List<AString> *items) {
        items->clear();

        const size_t separatorLen = strlen(separator);
        if (separatorLen == 0) {
            // If find() reports a zero-length match at |start| (as substring
            // searches conventionally do for an empty needle), |start| would
            // never advance and the loop below would keep appending empty
            // pieces forever. Hand back the whole string instead.
            if (s.size() > 0) {
                items->push_back(AString(s, 0, s.size()));
            }
            return;
        }

        size_t start = 0;
        while (start < s.size()) {
            ssize_t offset = s.find(separator, start);

            if (offset < 0) {
                // No further separator, the remainder is the last piece.
                items->push_back(AString(s, start, s.size() - start));
                break;
            }

            items->push_back(AString(s, start, offset - start));
            start = offset + separatorLen;
        }
    }

    void parsePlayResponse(const sp<ARTSPResponse> &response) {
        mPlayResponseParsed = true;
        if (mTracks.size() == 0) {
            ALOGV("parsePlayResponse: late packets ignored.");
            return;
        }

        ssize_t i = response->mHeaders.indexOfKey("range");
        if (i < 0) {
            // Server doesn't even tell use what range it is going to
            // play, therefore we won't support seeking.
            return;
        }

        AString range = response->mHeaders.valueAt(i);
        ALOGV("Range: %s", range.c_str());

        AString val;
        CHECK(GetAttribute(range.c_str(), "npt", &val));

        float npt1, npt2;
        if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
            // This is a live stream and therefore not seekable.

            ALOGI("This is a live stream");
            return;
        }

        i = response->mHeaders.indexOfKey("rtp-info");
        CHECK_GE(i, 0);

        AString rtpInfo = response->mHeaders.valueAt(i);
        List<AString> streamInfos;
        SplitString(rtpInfo, ",", &streamInfos);

        int n = 1;
        for (List<AString>::iterator it = streamInfos.begin();
             it != streamInfos.end(); ++it) {
            (*it).trim();
            ALOGV("streamInfo[%d] = %s", n, (*it).c_str());

            CHECK(GetAttribute((*it).c_str(), "url", &val));

            size_t trackIndex = 0;
            while (trackIndex < mTracks.size()
                    && !(val == mTracks.editItemAt(trackIndex).mURL)) {
                ++trackIndex;
            }
            CHECK_LT(trackIndex, mTracks.size());

            CHECK(GetAttribute((*it).c_str(), "seq", &val));

            char *end;
            unsigned long seq = strtoul(val.c_str(), &end, 10);

            TrackInfo *info = &mTracks.editItemAt(trackIndex);
            info->mFirstSeqNumInSegment = seq;
            info->mNewSegment = true;

            CHECK(GetAttribute((*it).c_str(), "rtptime", &val));

            uint32_t rtpTime = strtoul(val.c_str(), &end, 10);

            ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);

            info->mNormalPlayTimeRTP = rtpTime;
            info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);

            if (!mFirstAccessUnit) {
                postNormalPlayTimeMapping(
                        trackIndex,
                        info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
            }

            ++n;
        }
    }

    // Returns the format reported by the packet source of the track at
    // position |index| in mTracks and stores that track's time scale in
    // |*timeScale|. |index| must be a valid position (CHECKed).
    sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
        CHECK_GE(index, 0u);
        CHECK_LT(index, mTracks.size());

        const TrackInfo &track = mTracks.itemAt(index);
        *timeScale = track.mTimeScale;

        sp<MetaData> format = track.mPacketSource->getFormat();
        return format;
    }

    size_t countTracks() const {
        return mTracks.size();
    }

private:
    // Per-track state, one entry per track for which setupTrack() issued a
    // SETUP request.
    struct TrackInfo {
        // Absolute control URL of the track; also used to match the entries
        // of the "rtp-info" header of a PLAY response.
        AString mURL;
        // For UDP transport these are filled in by
        // ARTPConnection::MakePortPair(). With TCP interleaving they hold
        // the interleaved channel numbers (2 * n and 2 * n + 1) instead.
        int mRTPSocket;
        int mRTCPSocket;
        bool mUsingInterleavedTCP;
        // Taken from the "seq" attribute of the PLAY response; access units
        // with a smaller sequence number are dropped as stale.
        uint32_t mFirstSeqNumInSegment;
        // Set when the track is created and by every parsed PLAY response,
        // cleared when the next access unit of the track is accepted.
        bool mNewSegment;

        // RTP time and NTP time (in us) of the most recent time update.
        // mNTPAnchorUs is negative while no update has been received.
        uint32_t mRTPAnchor;
        int64_t mNTPAnchorUs;
        // Time scale parsed from the track's format description; used as
        // the divisor when converting RTP time differences to microseconds.
        int32_t mTimeScale;
        // Set when a BYE arrives before time could be established, so that
        // EOS is signalled only after the queued packets were delivered.
        bool mEOSReceived;

        // RTP time ("rtptime" attribute) and the normal play time in us that
        // the last parsed PLAY response associated with each other.
        uint32_t mNormalPlayTimeRTP;
        int64_t mNormalPlayTimeUs;

        sp<APacketSource> mPacketSource;

        // Stores packets temporarily while no notion of time
        // has been established yet (or the PLAY response is still
        // outstanding).
        List<sp<ABuffer> > mPackets;
    };

    // Template for outgoing notifications: it is dup()ed, given a "what"
    // field and posted for every event that is reported.
    sp<AMessage> mNotify;
    // When mUIDValid is set, the UDP RTP/RTCP sockets created in
    // setupTrack() are tagged and marked with mUID.
    bool mUIDValid;
    uid_t mUID;
    sp<ALooper> mNetLooper;
    // Connection all RTSP requests are sent on.
    sp<ARTSPConnection> mConn;
    // Receives the packets that arrive interleaved on the RTSP connection
    // ('biny' messages) through injectPacket().
    sp<ARTPConnection> mRTPConn;
    // Session description the tracks are set up from.
    sp<ASessionDescription> mSessionDesc;
    AString mOriginalSessionURL;  // This one still has user:pass@
    // Request URL of the TEARDOWN request.
    AString mSessionURL;
    AString mSessionHost;
    // Base that the per-track "a=control" attributes are resolved against.
    AString mBaseURL;
    // Request URL of the PAUSE and PLAY requests.
    AString mControlURL;
    // Value sent in the "Session" header of requests.
    AString mSessionID;
    bool mSetupTracksSuccessful;
    // Set by a seek request and cleared once the PLAY that completes the
    // seek was handled; access units arriving in between are dropped.
    bool mSeekPending;
    // True until handleFirstAccessUnit() has posted kWhatConnected.
    bool mFirstAccessUnit;

    // True once every track has a non-negative NTP anchor.
    bool mAllTracksHaveTime;
    // Session-wide anchor: NTP time (negative while unset) and the media
    // time that corresponds to it.
    int64_t mNTPAnchorUs;
    int64_t mMediaAnchorUs;
    // Largest media time computed by addMediaTimestamp() so far.
    int64_t mLastMediaTimeUs;

    // Incremented for every access unit message, reset by each 'chek'.
    int64_t mNumAccessUnitsReceived;
    // True while a 'chek' is scheduled; also forced to true to suppress
    // scheduling while paused or seeking.
    bool mCheckPending;
    // 'chek' and 'tiou' messages carrying a different generation than the
    // current one are ignored.
    int32_t mCheckGeneration;
    int32_t mCheckTimeoutGeneration;
    // When set, SETUP requests ask for RTP interleaved on the RTSP
    // connection instead of UDP.
    bool mTryTCPInterleaving;
    // Set once fake timestamps were used in place of RTCP sender reports.
    bool mTryFakeRTCP;
    bool mReceivedFirstRTCPPacket;
    bool mReceivedFirstRTPPacket;
    // When false, seek and pause requests are ignored.
    bool mSeekable;
    // Keep-alive messages are scheduled after 90% of this timeout and are
    // stamped with mKeepAliveGeneration.
    int64_t mKeepAliveTimeoutUs;
    int32_t mKeepAliveGeneration;
    // Set when a PAUSE request is sent for a pause request, cleared once
    // the following PLAY was handled.
    bool mPausing;
    // 'paus' messages carrying a different generation are ignored.
    int32_t mPauseGeneration;

    Vector<TrackInfo> mTracks;

    // Set by parsePlayResponse(); until then incoming access units are
    // only queued on their track.
    bool mPlayResponseParsed;

    void setupTrack(size_t index) {
        sp<APacketSource> source =
            new APacketSource(mSessionDesc, index);

        if (source->initCheck() != OK) {
            ALOGW("Unsupported format. Ignoring track #%zu.", index);

            sp<AMessage> reply = new AMessage('setu', this);
            reply->setSize("index", index);
            reply->setInt32("result", ERROR_UNSUPPORTED);
            reply->post();
            return;
        }

        AString url;
        CHECK(mSessionDesc->findAttribute(index, "a=control", &url));

        AString trackURL;
        CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));

        mTracks.push(TrackInfo());
        TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
        info->mURL = trackURL;
        info->mPacketSource = source;
        info->mUsingInterleavedTCP = false;
        info->mFirstSeqNumInSegment = 0;
        info->mNewSegment = true;
        info->mRTPSocket = -1;
        info->mRTCPSocket = -1;
        info->mRTPAnchor = 0;
        info->mNTPAnchorUs = -1;
        info->mNormalPlayTimeRTP = 0;
        info->mNormalPlayTimeUs = 0ll;

        unsigned long PT;
        AString formatDesc;
        AString formatParams;
        mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);

        int32_t timescale;
        int32_t numChannels;
        ASessionDescription::ParseFormatDesc(
                formatDesc.c_str(), &timescale, &numChannels);

        info->mTimeScale = timescale;
        info->mEOSReceived = false;

        ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());

        AString request = "SETUP ";
        request.append(trackURL);
        request.append(" RTSP/1.0\r\n");

        if (mTryTCPInterleaving) {
            size_t interleaveIndex = 2 * (mTracks.size() - 1);
            info->mUsingInterleavedTCP = true;
            info->mRTPSocket = interleaveIndex;
            info->mRTCPSocket = interleaveIndex + 1;

            request.append("Transport: RTP/AVP/TCP;interleaved=");
            request.append(interleaveIndex);
            request.append("-");
            request.append(interleaveIndex + 1);
        } else {
            unsigned rtpPort;
            ARTPConnection::MakePortPair(
                    &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);

            if (mUIDValid) {
                HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
                                                (uint32_t)*(uint32_t*) "RTP_");
                HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
                                                (uint32_t)*(uint32_t*) "RTP_");
                HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
                HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
            }

            request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
            request.append(rtpPort);
            request.append("-");
            request.append(rtpPort + 1);
        }

        request.append("\r\n");

        if (index > 1) {
            request.append("Session: ");
            request.append(mSessionID);
            request.append("\r\n");
        }

        request.append("\r\n");

        sp<AMessage> reply = new AMessage('setu', this);
        reply->setSize("index", index);
        reply->setSize("track-index", mTracks.size() - 1);
        mConn->sendRequest(request.c_str(), reply);
    }

    // Resolves |url| against |baseURL| and stores the result in |out|.
    //
    // Returns false, leaving |out| empty, unless |baseURL| starts with
    // "rtsp://" (compared case-insensitively). A |url| that itself starts
    // with "rtsp://" is copied unchanged; anything else is appended to
    // |baseURL|, separated by a '/' unless |baseURL| already ends in one.
    static bool MakeURL(const char *baseURL, const char *url, AString *out) {
        out->clear();

        const bool baseIsAbsolute = (strncasecmp("rtsp://", baseURL, 7) == 0);
        if (!baseIsAbsolute) {
            // Base URL must be absolute
            return false;
        }

        const bool urlIsAbsolute = (strncasecmp("rtsp://", url, 7) == 0);
        if (urlIsAbsolute) {
            // Nothing to resolve, the base URL is not needed.
            out->setTo(url);
            return true;
        }

        // baseURL holds at least the 7 character scheme prefix at this
        // point, so indexing its last character is safe.
        const size_t baseLen = strlen(baseURL);
        const bool needsSlash = (baseURL[baseLen - 1] != '/');

        out->setTo(baseURL);
        if (needsSlash) {
            out->append("/");
        }
        out->append(url);

        return true;
    }

    // Invalidates the session-wide NTP anchor and then feeds every track a
    // time update with RTP time 0 and NTP time 0, in track order.
    void fakeTimestamps() {
        mNTPAnchorUs = -1ll;

        size_t trackIndex = 0;
        while (trackIndex < mTracks.size()) {
            onTimeUpdate(trackIndex, 0, 0ll);
            ++trackIndex;
        }
    }

    bool dataReceivedOnAllChannels() {
        TrackInfo *track;
        for (size_t i = 0; i < mTracks.size(); ++i) {
            track = &mTracks.editItemAt(i);
            if (track->mPackets.empty()) {
                return false;
            }
        }
        return true;
    }

    void handleFirstAccessUnit() {
        if (mFirstAccessUnit) {
            sp<AMessage> msg = mNotify->dup();
            msg->setInt32("what", kWhatConnected);
            msg->post();

            if (mSeekable) {
                for (size_t i = 0; i < mTracks.size(); ++i) {
                    TrackInfo *info = &mTracks.editItemAt(i);

                    postNormalPlayTimeMapping(
                            i,
                            info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
                }
            }

            mFirstAccessUnit = false;
        }
    }

    void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
        ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
             trackIndex, rtpTime, (long long)ntpTime);

        int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));

        TrackInfo *track = &mTracks.editItemAt(trackIndex);

        track->mRTPAnchor = rtpTime;
        track->mNTPAnchorUs = ntpTimeUs;

        if (mNTPAnchorUs < 0) {
            mNTPAnchorUs = ntpTimeUs;
            mMediaAnchorUs = mLastMediaTimeUs;
        }

        if (!mAllTracksHaveTime) {
            bool allTracksHaveTime = (mTracks.size() > 0);
            for (size_t i = 0; i < mTracks.size(); ++i) {
                TrackInfo *track = &mTracks.editItemAt(i);
                if (track->mNTPAnchorUs < 0) {
                    allTracksHaveTime = false;
                    break;
                }
            }
            if (allTracksHaveTime) {
                mAllTracksHaveTime = true;
                ALOGI("Time now established for all tracks.");
            }
        }
        if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
            handleFirstAccessUnit();

            // Time is now established, lets start timestamping immediately
            for (size_t i = 0; i < mTracks.size(); ++i) {
                TrackInfo *trackInfo = &mTracks.editItemAt(i);
                while (!trackInfo->mPackets.empty()) {
                    sp<ABuffer> accessUnit = *trackInfo->mPackets.begin();
                    trackInfo->mPackets.erase(trackInfo->mPackets.begin());

                    if (addMediaTimestamp(i, trackInfo, accessUnit)) {
                        postQueueAccessUnit(i, accessUnit);
                    }
                }
            }
            for (size_t i = 0; i < mTracks.size(); ++i) {
                TrackInfo *trackInfo = &mTracks.editItemAt(i);
                if (trackInfo->mEOSReceived) {
                    postQueueEOS(i, ERROR_END_OF_STREAM);
                    trackInfo->mEOSReceived = false;
                }
            }
        }
    }

    void onAccessUnitComplete(
            int32_t trackIndex, const sp<ABuffer> &accessUnit) {
        ALOGV("onAccessUnitComplete track %d", trackIndex);

        if(!mPlayResponseParsed){
            ALOGI("play response is not parsed, storing accessunit");
            TrackInfo *track = &mTracks.editItemAt(trackIndex);
            track->mPackets.push_back(accessUnit);
            return;
        }

        handleFirstAccessUnit();

        TrackInfo *track = &mTracks.editItemAt(trackIndex);

        if (!mAllTracksHaveTime) {
            ALOGV("storing accessUnit, no time established yet");
            track->mPackets.push_back(accessUnit);
            return;
        }

        while (!track->mPackets.empty()) {
            sp<ABuffer> accessUnit = *track->mPackets.begin();
            track->mPackets.erase(track->mPackets.begin());

            if (addMediaTimestamp(trackIndex, track, accessUnit)) {
                postQueueAccessUnit(trackIndex, accessUnit);
            }
        }

        if (addMediaTimestamp(trackIndex, track, accessUnit)) {
            postQueueAccessUnit(trackIndex, accessUnit);
        }

        if (track->mEOSReceived) {
            postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
            track->mEOSReceived = false;
        }
    }

    // Computes the media time of |accessUnit| from its "rtp-time" meta entry
    // and stores it as "timeUs" in the unit's meta data.
    //
    // The RTP time is taken relative to the track's RTP anchor, converted to
    // microseconds with the track's time scale and mapped through the track
    // and session NTP anchors onto the media time line. mLastMediaTimeUs is
    // raised if the result exceeds it.
    //
    // Returns true if the unit was timestamped, false if it must be dropped
    // because its media time is negative or because the track has no usable
    // (positive) time scale.
    bool addMediaTimestamp(
            int32_t trackIndex, const TrackInfo *track,
            const sp<ABuffer> &accessUnit) {
        UNUSED_UNLESS_VERBOSE(trackIndex);

        uint32_t rtpTime;
        CHECK(accessUnit->meta()->findInt32(
                    "rtp-time", (int32_t *)&rtpTime));

        if (track->mTimeScale <= 0) {
            // The time scale is stored unvalidated by setupTrack(); dividing
            // by zero below would be fatal and a negative value would run
            // time backwards, so such units cannot be timestamped.
            ALOGV("dropping accessUnit, invalid time scale %d.",
                 track->mTimeScale);
            return false;
        }

        int64_t relRtpTimeUs =
            (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
                / track->mTimeScale;

        int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;

        int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;

        if (mediaTimeUs > mLastMediaTimeUs) {
            mLastMediaTimeUs = mediaTimeUs;
        }

        if (mediaTimeUs < 0) {
            ALOGV("dropping early accessUnit.");
            return false;
        }

        ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
             trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);

        accessUnit->meta()->setInt64("timeUs", mediaTimeUs);

        return true;
    }

    void postQueueAccessUnit(
            size_t trackIndex, const sp<ABuffer> &accessUnit) {
        sp<AMessage> msg = mNotify->dup();
        msg->setInt32("what", kWhatAccessUnit);
        msg->setSize("trackIndex", trackIndex);
        msg->setBuffer("accessUnit", accessUnit);
        msg->post();
    }

    void postQueueEOS(size_t trackIndex, status_t finalResult) {
        sp<AMessage> msg = mNotify->dup();
        msg->setInt32("what", kWhatEOS);
        msg->setSize("trackIndex", trackIndex);
        msg->setInt32("finalResult", finalResult);
        msg->post();
    }

    void postQueueSeekDiscontinuity(size_t trackIndex) {
        sp<AMessage> msg = mNotify->dup();
        msg->setInt32("what", kWhatSeekDiscontinuity);
        msg->setSize("trackIndex", trackIndex);
        msg->post();
    }

    void postNormalPlayTimeMapping(
            size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
        sp<AMessage> msg = mNotify->dup();
        msg->setInt32("what", kWhatNormalPlayTimeMapping);
        msg->setSize("trackIndex", trackIndex);
        msg->setInt32("rtpTime", rtpTime);
        msg->setInt64("nptUs", nptUs);
        msg->post();
    }

    DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
};

}  // namespace android

#endif  // MY_HANDLER_H_
