• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MY_HANDLER_H_
18 
19 #define MY_HANDLER_H_
20 
21 //#define LOG_NDEBUG 0
22 #define LOG_TAG "MyHandler"
23 #include <utils/Log.h>
24 
25 #include "APacketSource.h"
26 #include "ARTPConnection.h"
27 #include "ARTSPConnection.h"
28 #include "ASessionDescription.h"
29 
30 #include <ctype.h>
31 #include <cutils/properties.h>
32 
33 #include <media/stagefright/foundation/ABuffer.h>
34 #include <media/stagefright/foundation/ADebug.h>
35 #include <media/stagefright/foundation/ALooper.h>
36 #include <media/stagefright/foundation/AMessage.h>
37 #include <media/stagefright/MediaErrors.h>
38 
39 #include <arpa/inet.h>
40 #include <sys/socket.h>
41 #include <netdb.h>
42 
43 #include "HTTPBase.h"
44 
45 // If no access units are received within 5 secs, assume that the rtp
46 // stream has ended and signal end of stream.
47 static int64_t kAccessUnitTimeoutUs = 10000000ll;
48 
49 // If no access units arrive for the first 10 secs after starting the
50 // stream, assume none ever will and signal EOS or switch transports.
51 static int64_t kStartupTimeoutUs = 10000000ll;
52 
53 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
54 
55 namespace android {
56 
MakeUserAgentString(AString * s)57 static void MakeUserAgentString(AString *s) {
58     s->setTo("stagefright/1.1 (Linux;Android ");
59 
60 #if (PROPERTY_VALUE_MAX < 8)
61 #error "PROPERTY_VALUE_MAX must be at least 8"
62 #endif
63 
64     char value[PROPERTY_VALUE_MAX];
65     property_get("ro.build.version.release", value, "Unknown");
66     s->append(value);
67     s->append(")");
68 }
69 
GetAttribute(const char * s,const char * key,AString * value)70 static bool GetAttribute(const char *s, const char *key, AString *value) {
71     value->clear();
72 
73     size_t keyLen = strlen(key);
74 
75     for (;;) {
76         while (isspace(*s)) {
77             ++s;
78         }
79 
80         const char *colonPos = strchr(s, ';');
81 
82         size_t len =
83             (colonPos == NULL) ? strlen(s) : colonPos - s;
84 
85         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
86             value->setTo(&s[keyLen + 1], len - keyLen - 1);
87             return true;
88         }
89 
90         if (colonPos == NULL) {
91             return false;
92         }
93 
94         s = colonPos + 1;
95     }
96 }
97 
98 struct MyHandler : public AHandler {
99     enum {
100         kWhatConnected                  = 'conn',
101         kWhatDisconnected               = 'disc',
102         kWhatSeekDone                   = 'sdon',
103 
104         kWhatAccessUnit                 = 'accU',
105         kWhatEOS                        = 'eos!',
106         kWhatSeekDiscontinuity          = 'seeD',
107         kWhatNormalPlayTimeMapping      = 'nptM',
108     };
109 
110     MyHandler(
111             const char *url,
112             const sp<AMessage> &notify,
113             bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler114         : mNotify(notify),
115           mUIDValid(uidValid),
116           mUID(uid),
117           mNetLooper(new ALooper),
118           mConn(new ARTSPConnection(mUIDValid, mUID)),
119           mRTPConn(new ARTPConnection),
120           mOriginalSessionURL(url),
121           mSessionURL(url),
122           mSetupTracksSuccessful(false),
123           mSeekPending(false),
124           mFirstAccessUnit(true),
125           mAllTracksHaveTime(false),
126           mNTPAnchorUs(-1),
127           mMediaAnchorUs(-1),
128           mLastMediaTimeUs(0),
129           mNumAccessUnitsReceived(0),
130           mCheckPending(false),
131           mCheckGeneration(0),
132           mTryTCPInterleaving(false),
133           mTryFakeRTCP(false),
134           mReceivedFirstRTCPPacket(false),
135           mReceivedFirstRTPPacket(false),
136           mSeekable(false),
137           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
138           mKeepAliveGeneration(0) {
139         mNetLooper->setName("rtsp net");
140         mNetLooper->start(false /* runOnCallingThread */,
141                           false /* canCallJava */,
142                           PRIORITY_HIGHEST);
143 
144         // Strip any authentication info from the session url, we don't
145         // want to transmit user/pass in cleartext.
146         AString host, path, user, pass;
147         unsigned port;
148         CHECK(ARTSPConnection::ParseURL(
149                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
150 
151         if (user.size() > 0) {
152             mSessionURL.clear();
153             mSessionURL.append("rtsp://");
154             mSessionURL.append(host);
155             mSessionURL.append(":");
156             mSessionURL.append(StringPrintf("%u", port));
157             mSessionURL.append(path);
158 
159             ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
160         }
161 
162         mSessionHost = host;
163     }
164 
connectMyHandler165     void connect() {
166         looper()->registerHandler(mConn);
167         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
168 
169         sp<AMessage> notify = new AMessage('biny', id());
170         mConn->observeBinaryData(notify);
171 
172         sp<AMessage> reply = new AMessage('conn', id());
173         mConn->connect(mOriginalSessionURL.c_str(), reply);
174     }
175 
disconnectMyHandler176     void disconnect() {
177         (new AMessage('abor', id()))->post();
178     }
179 
seekMyHandler180     void seek(int64_t timeUs) {
181         sp<AMessage> msg = new AMessage('seek', id());
182         msg->setInt64("time", timeUs);
183         msg->post();
184     }
185 
addRRMyHandler186     static void addRR(const sp<ABuffer> &buf) {
187         uint8_t *ptr = buf->data() + buf->size();
188         ptr[0] = 0x80 | 0;
189         ptr[1] = 201;  // RR
190         ptr[2] = 0;
191         ptr[3] = 1;
192         ptr[4] = 0xde;  // SSRC
193         ptr[5] = 0xad;
194         ptr[6] = 0xbe;
195         ptr[7] = 0xef;
196 
197         buf->setRange(0, buf->size() + 8);
198     }
199 
addSDESMyHandler200     static void addSDES(int s, const sp<ABuffer> &buffer) {
201         struct sockaddr_in addr;
202         socklen_t addrSize = sizeof(addr);
203         CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
204 
205         uint8_t *data = buffer->data() + buffer->size();
206         data[0] = 0x80 | 1;
207         data[1] = 202;  // SDES
208         data[4] = 0xde;  // SSRC
209         data[5] = 0xad;
210         data[6] = 0xbe;
211         data[7] = 0xef;
212 
213         size_t offset = 8;
214 
215         data[offset++] = 1;  // CNAME
216 
217         AString cname = "stagefright@";
218         cname.append(inet_ntoa(addr.sin_addr));
219         data[offset++] = cname.size();
220 
221         memcpy(&data[offset], cname.c_str(), cname.size());
222         offset += cname.size();
223 
224         data[offset++] = 6;  // TOOL
225 
226         AString tool;
227         MakeUserAgentString(&tool);
228 
229         data[offset++] = tool.size();
230 
231         memcpy(&data[offset], tool.c_str(), tool.size());
232         offset += tool.size();
233 
234         data[offset++] = 0;
235 
236         if ((offset % 4) > 0) {
237             size_t count = 4 - (offset % 4);
238             switch (count) {
239                 case 3:
240                     data[offset++] = 0;
241                 case 2:
242                     data[offset++] = 0;
243                 case 1:
244                     data[offset++] = 0;
245             }
246         }
247 
248         size_t numWords = (offset / 4) - 1;
249         data[2] = numWords >> 8;
250         data[3] = numWords & 0xff;
251 
252         buffer->setRange(buffer->offset(), buffer->size() + offset);
253     }
254 
255     // In case we're behind NAT, fire off two UDP packets to the remote
256     // rtp/rtcp ports to poke a hole into the firewall for future incoming
257     // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler258     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
259         struct sockaddr_in addr;
260         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
261         addr.sin_family = AF_INET;
262 
263         AString source;
264         AString server_port;
265         if (!GetAttribute(transport.c_str(),
266                           "source",
267                           &source)) {
268             ALOGW("Missing 'source' field in Transport response. Using "
269                  "RTSP endpoint address.");
270 
271             struct hostent *ent = gethostbyname(mSessionHost.c_str());
272             if (ent == NULL) {
273                 ALOGE("Failed to look up address of session host '%s'",
274                      mSessionHost.c_str());
275 
276                 return false;
277             }
278 
279             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
280         } else {
281             addr.sin_addr.s_addr = inet_addr(source.c_str());
282         }
283 
284         if (!GetAttribute(transport.c_str(),
285                                  "server_port",
286                                  &server_port)) {
287             ALOGI("Missing 'server_port' field in Transport response.");
288             return false;
289         }
290 
291         int rtpPort, rtcpPort;
292         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
293                 || rtpPort <= 0 || rtpPort > 65535
294                 || rtcpPort <=0 || rtcpPort > 65535
295                 || rtcpPort != rtpPort + 1) {
296             ALOGE("Server picked invalid RTP/RTCP port pair %s,"
297                  " RTP port must be even, RTCP port must be one higher.",
298                  server_port.c_str());
299 
300             return false;
301         }
302 
303         if (rtpPort & 1) {
304             ALOGW("Server picked an odd RTP port, it should've picked an "
305                  "even one, we'll let it pass for now, but this may break "
306                  "in the future.");
307         }
308 
309         if (addr.sin_addr.s_addr == INADDR_NONE) {
310             return true;
311         }
312 
313         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
314             // No firewalls to traverse on the loopback interface.
315             return true;
316         }
317 
318         // Make up an RR/SDES RTCP packet.
319         sp<ABuffer> buf = new ABuffer(65536);
320         buf->setRange(0, 0);
321         addRR(buf);
322         addSDES(rtpSocket, buf);
323 
324         addr.sin_port = htons(rtpPort);
325 
326         ssize_t n = sendto(
327                 rtpSocket, buf->data(), buf->size(), 0,
328                 (const sockaddr *)&addr, sizeof(addr));
329 
330         if (n < (ssize_t)buf->size()) {
331             ALOGE("failed to poke a hole for RTP packets");
332             return false;
333         }
334 
335         addr.sin_port = htons(rtcpPort);
336 
337         n = sendto(
338                 rtcpSocket, buf->data(), buf->size(), 0,
339                 (const sockaddr *)&addr, sizeof(addr));
340 
341         if (n < (ssize_t)buf->size()) {
342             ALOGE("failed to poke a hole for RTCP packets");
343             return false;
344         }
345 
346         ALOGV("successfully poked holes.");
347 
348         return true;
349     }
350 
onMessageReceivedMyHandler351     virtual void onMessageReceived(const sp<AMessage> &msg) {
352         switch (msg->what()) {
353             case 'conn':
354             {
355                 int32_t result;
356                 CHECK(msg->findInt32("result", &result));
357 
358                 ALOGI("connection request completed with result %d (%s)",
359                      result, strerror(-result));
360 
361                 if (result == OK) {
362                     AString request;
363                     request = "DESCRIBE ";
364                     request.append(mSessionURL);
365                     request.append(" RTSP/1.0\r\n");
366                     request.append("Accept: application/sdp\r\n");
367                     request.append("\r\n");
368 
369                     sp<AMessage> reply = new AMessage('desc', id());
370                     mConn->sendRequest(request.c_str(), reply);
371                 } else {
372                     (new AMessage('disc', id()))->post();
373                 }
374                 break;
375             }
376 
377             case 'disc':
378             {
379                 ++mKeepAliveGeneration;
380 
381                 int32_t reconnect;
382                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
383                     sp<AMessage> reply = new AMessage('conn', id());
384                     mConn->connect(mOriginalSessionURL.c_str(), reply);
385                 } else {
386                     (new AMessage('quit', id()))->post();
387                 }
388                 break;
389             }
390 
391             case 'desc':
392             {
393                 int32_t result;
394                 CHECK(msg->findInt32("result", &result));
395 
396                 ALOGI("DESCRIBE completed with result %d (%s)",
397                      result, strerror(-result));
398 
399                 if (result == OK) {
400                     sp<RefBase> obj;
401                     CHECK(msg->findObject("response", &obj));
402                     sp<ARTSPResponse> response =
403                         static_cast<ARTSPResponse *>(obj.get());
404 
405                     if (response->mStatusCode == 302) {
406                         ssize_t i = response->mHeaders.indexOfKey("location");
407                         CHECK_GE(i, 0);
408 
409                         mSessionURL = response->mHeaders.valueAt(i);
410 
411                         AString request;
412                         request = "DESCRIBE ";
413                         request.append(mSessionURL);
414                         request.append(" RTSP/1.0\r\n");
415                         request.append("Accept: application/sdp\r\n");
416                         request.append("\r\n");
417 
418                         sp<AMessage> reply = new AMessage('desc', id());
419                         mConn->sendRequest(request.c_str(), reply);
420                         break;
421                     }
422 
423                     if (response->mStatusCode != 200) {
424                         result = UNKNOWN_ERROR;
425                     } else {
426                         mSessionDesc = new ASessionDescription;
427 
428                         mSessionDesc->setTo(
429                                 response->mContent->data(),
430                                 response->mContent->size());
431 
432                         if (!mSessionDesc->isValid()) {
433                             ALOGE("Failed to parse session description.");
434                             result = ERROR_MALFORMED;
435                         } else {
436                             ssize_t i = response->mHeaders.indexOfKey("content-base");
437                             if (i >= 0) {
438                                 mBaseURL = response->mHeaders.valueAt(i);
439                             } else {
440                                 i = response->mHeaders.indexOfKey("content-location");
441                                 if (i >= 0) {
442                                     mBaseURL = response->mHeaders.valueAt(i);
443                                 } else {
444                                     mBaseURL = mSessionURL;
445                                 }
446                             }
447 
448                             if (!mBaseURL.startsWith("rtsp://")) {
449                                 // Some misbehaving servers specify a relative
450                                 // URL in one of the locations above, combine
451                                 // it with the absolute session URL to get
452                                 // something usable...
453 
454                                 ALOGW("Server specified a non-absolute base URL"
455                                      ", combining it with the session URL to "
456                                      "get something usable...");
457 
458                                 AString tmp;
459                                 CHECK(MakeURL(
460                                             mSessionURL.c_str(),
461                                             mBaseURL.c_str(),
462                                             &tmp));
463 
464                                 mBaseURL = tmp;
465                             }
466 
467                             if (mSessionDesc->countTracks() < 2) {
468                                 // There's no actual tracks in this session.
469                                 // The first "track" is merely session meta
470                                 // data.
471 
472                                 ALOGW("Session doesn't contain any playable "
473                                      "tracks. Aborting.");
474                                 result = ERROR_UNSUPPORTED;
475                             } else {
476                                 setupTrack(1);
477                             }
478                         }
479                     }
480                 }
481 
482                 if (result != OK) {
483                     sp<AMessage> reply = new AMessage('disc', id());
484                     mConn->disconnect(reply);
485                 }
486                 break;
487             }
488 
489             case 'setu':
490             {
491                 size_t index;
492                 CHECK(msg->findSize("index", &index));
493 
494                 TrackInfo *track = NULL;
495                 size_t trackIndex;
496                 if (msg->findSize("track-index", &trackIndex)) {
497                     track = &mTracks.editItemAt(trackIndex);
498                 }
499 
500                 int32_t result;
501                 CHECK(msg->findInt32("result", &result));
502 
503                 ALOGI("SETUP(%d) completed with result %d (%s)",
504                      index, result, strerror(-result));
505 
506                 if (result == OK) {
507                     CHECK(track != NULL);
508 
509                     sp<RefBase> obj;
510                     CHECK(msg->findObject("response", &obj));
511                     sp<ARTSPResponse> response =
512                         static_cast<ARTSPResponse *>(obj.get());
513 
514                     if (response->mStatusCode != 200) {
515                         result = UNKNOWN_ERROR;
516                     } else {
517                         ssize_t i = response->mHeaders.indexOfKey("session");
518                         CHECK_GE(i, 0);
519 
520                         mSessionID = response->mHeaders.valueAt(i);
521 
522                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
523                         AString timeoutStr;
524                         if (GetAttribute(
525                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
526                             char *end;
527                             unsigned long timeoutSecs =
528                                 strtoul(timeoutStr.c_str(), &end, 10);
529 
530                             if (end == timeoutStr.c_str() || *end != '\0') {
531                                 ALOGW("server specified malformed timeout '%s'",
532                                      timeoutStr.c_str());
533 
534                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
535                             } else if (timeoutSecs < 15) {
536                                 ALOGW("server specified too short a timeout "
537                                      "(%lu secs), using default.",
538                                      timeoutSecs);
539 
540                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
541                             } else {
542                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
543 
544                                 ALOGI("server specified timeout of %lu secs.",
545                                      timeoutSecs);
546                             }
547                         }
548 
549                         i = mSessionID.find(";");
550                         if (i >= 0) {
551                             // Remove options, i.e. ";timeout=90"
552                             mSessionID.erase(i, mSessionID.size() - i);
553                         }
554 
555                         sp<AMessage> notify = new AMessage('accu', id());
556                         notify->setSize("track-index", trackIndex);
557 
558                         i = response->mHeaders.indexOfKey("transport");
559                         CHECK_GE(i, 0);
560 
561                         if (!track->mUsingInterleavedTCP) {
562                             AString transport = response->mHeaders.valueAt(i);
563 
564                             // We are going to continue even if we were
565                             // unable to poke a hole into the firewall...
566                             pokeAHole(
567                                     track->mRTPSocket,
568                                     track->mRTCPSocket,
569                                     transport);
570                         }
571 
572                         mRTPConn->addStream(
573                                 track->mRTPSocket, track->mRTCPSocket,
574                                 mSessionDesc, index,
575                                 notify, track->mUsingInterleavedTCP);
576 
577                         mSetupTracksSuccessful = true;
578                     }
579                 }
580 
581                 if (result != OK) {
582                     if (track) {
583                         if (!track->mUsingInterleavedTCP) {
584                             // Clear the tag
585                             if (mUIDValid) {
586                                 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
587                                 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
588                             }
589 
590                             close(track->mRTPSocket);
591                             close(track->mRTCPSocket);
592                         }
593 
594                         mTracks.removeItemsAt(trackIndex);
595                     }
596                 }
597 
598                 ++index;
599                 if (index < mSessionDesc->countTracks()) {
600                     setupTrack(index);
601                 } else if (mSetupTracksSuccessful) {
602                     ++mKeepAliveGeneration;
603                     postKeepAlive();
604 
605                     AString request = "PLAY ";
606                     request.append(mSessionURL);
607                     request.append(" RTSP/1.0\r\n");
608 
609                     request.append("Session: ");
610                     request.append(mSessionID);
611                     request.append("\r\n");
612 
613                     request.append("\r\n");
614 
615                     sp<AMessage> reply = new AMessage('play', id());
616                     mConn->sendRequest(request.c_str(), reply);
617                 } else {
618                     sp<AMessage> reply = new AMessage('disc', id());
619                     mConn->disconnect(reply);
620                 }
621                 break;
622             }
623 
624             case 'play':
625             {
626                 int32_t result;
627                 CHECK(msg->findInt32("result", &result));
628 
629                 ALOGI("PLAY completed with result %d (%s)",
630                      result, strerror(-result));
631 
632                 if (result == OK) {
633                     sp<RefBase> obj;
634                     CHECK(msg->findObject("response", &obj));
635                     sp<ARTSPResponse> response =
636                         static_cast<ARTSPResponse *>(obj.get());
637 
638                     if (response->mStatusCode != 200) {
639                         result = UNKNOWN_ERROR;
640                     } else {
641                         parsePlayResponse(response);
642 
643                         sp<AMessage> timeout = new AMessage('tiou', id());
644                         timeout->post(kStartupTimeoutUs);
645                     }
646                 }
647 
648                 if (result != OK) {
649                     sp<AMessage> reply = new AMessage('disc', id());
650                     mConn->disconnect(reply);
651                 }
652 
653                 break;
654             }
655 
656             case 'aliv':
657             {
658                 int32_t generation;
659                 CHECK(msg->findInt32("generation", &generation));
660 
661                 if (generation != mKeepAliveGeneration) {
662                     // obsolete event.
663                     break;
664                 }
665 
666                 AString request;
667                 request.append("OPTIONS ");
668                 request.append(mSessionURL);
669                 request.append(" RTSP/1.0\r\n");
670                 request.append("Session: ");
671                 request.append(mSessionID);
672                 request.append("\r\n");
673                 request.append("\r\n");
674 
675                 sp<AMessage> reply = new AMessage('opts', id());
676                 reply->setInt32("generation", mKeepAliveGeneration);
677                 mConn->sendRequest(request.c_str(), reply);
678                 break;
679             }
680 
681             case 'opts':
682             {
683                 int32_t result;
684                 CHECK(msg->findInt32("result", &result));
685 
686                 ALOGI("OPTIONS completed with result %d (%s)",
687                      result, strerror(-result));
688 
689                 int32_t generation;
690                 CHECK(msg->findInt32("generation", &generation));
691 
692                 if (generation != mKeepAliveGeneration) {
693                     // obsolete event.
694                     break;
695                 }
696 
697                 postKeepAlive();
698                 break;
699             }
700 
701             case 'abor':
702             {
703                 for (size_t i = 0; i < mTracks.size(); ++i) {
704                     TrackInfo *info = &mTracks.editItemAt(i);
705 
706                     if (!mFirstAccessUnit) {
707                         postQueueEOS(i, ERROR_END_OF_STREAM);
708                     }
709 
710                     if (!info->mUsingInterleavedTCP) {
711                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
712 
713                         // Clear the tag
714                         if (mUIDValid) {
715                             HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
716                             HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
717                         }
718 
719                         close(info->mRTPSocket);
720                         close(info->mRTCPSocket);
721                     }
722                 }
723                 mTracks.clear();
724                 mSetupTracksSuccessful = false;
725                 mSeekPending = false;
726                 mFirstAccessUnit = true;
727                 mAllTracksHaveTime = false;
728                 mNTPAnchorUs = -1;
729                 mMediaAnchorUs = -1;
730                 mNumAccessUnitsReceived = 0;
731                 mReceivedFirstRTCPPacket = false;
732                 mReceivedFirstRTPPacket = false;
733                 mSeekable = false;
734 
735                 sp<AMessage> reply = new AMessage('tear', id());
736 
737                 int32_t reconnect;
738                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
739                     reply->setInt32("reconnect", true);
740                 }
741 
742                 AString request;
743                 request = "TEARDOWN ";
744 
745                 // XXX should use aggregate url from SDP here...
746                 request.append(mSessionURL);
747                 request.append(" RTSP/1.0\r\n");
748 
749                 request.append("Session: ");
750                 request.append(mSessionID);
751                 request.append("\r\n");
752 
753                 request.append("\r\n");
754 
755                 mConn->sendRequest(request.c_str(), reply);
756                 break;
757             }
758 
759             case 'tear':
760             {
761                 int32_t result;
762                 CHECK(msg->findInt32("result", &result));
763 
764                 ALOGI("TEARDOWN completed with result %d (%s)",
765                      result, strerror(-result));
766 
767                 sp<AMessage> reply = new AMessage('disc', id());
768 
769                 int32_t reconnect;
770                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
771                     reply->setInt32("reconnect", true);
772                 }
773 
774                 mConn->disconnect(reply);
775                 break;
776             }
777 
778             case 'quit':
779             {
780                 sp<AMessage> msg = mNotify->dup();
781                 msg->setInt32("what", kWhatDisconnected);
782                 msg->setInt32("result", UNKNOWN_ERROR);
783                 msg->post();
784                 break;
785             }
786 
787             case 'chek':
788             {
789                 int32_t generation;
790                 CHECK(msg->findInt32("generation", &generation));
791                 if (generation != mCheckGeneration) {
792                     // This is an outdated message. Ignore.
793                     break;
794                 }
795 
796                 if (mNumAccessUnitsReceived == 0) {
797 #if 1
798                     ALOGI("stream ended? aborting.");
799                     (new AMessage('abor', id()))->post();
800                     break;
801 #else
802                     ALOGI("haven't seen an AU in a looong time.");
803 #endif
804                 }
805 
806                 mNumAccessUnitsReceived = 0;
807                 msg->post(kAccessUnitTimeoutUs);
808                 break;
809             }
810 
811             case 'accu':
812             {
813                 int32_t timeUpdate;
814                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
815                     size_t trackIndex;
816                     CHECK(msg->findSize("track-index", &trackIndex));
817 
818                     uint32_t rtpTime;
819                     uint64_t ntpTime;
820                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
821                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
822 
823                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
824                     break;
825                 }
826 
827                 int32_t first;
828                 if (msg->findInt32("first-rtcp", &first)) {
829                     mReceivedFirstRTCPPacket = true;
830                     break;
831                 }
832 
833                 if (msg->findInt32("first-rtp", &first)) {
834                     mReceivedFirstRTPPacket = true;
835                     break;
836                 }
837 
838                 ++mNumAccessUnitsReceived;
839                 postAccessUnitTimeoutCheck();
840 
841                 size_t trackIndex;
842                 CHECK(msg->findSize("track-index", &trackIndex));
843 
844                 if (trackIndex >= mTracks.size()) {
845                     ALOGV("late packets ignored.");
846                     break;
847                 }
848 
849                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
850 
851                 int32_t eos;
852                 if (msg->findInt32("eos", &eos)) {
853                     ALOGI("received BYE on track index %d", trackIndex);
854 #if 0
855                     track->mPacketSource->signalEOS(ERROR_END_OF_STREAM);
856 #endif
857                     return;
858                 }
859 
860                 sp<ABuffer> accessUnit;
861                 CHECK(msg->findBuffer("access-unit", &accessUnit));
862 
863                 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
864 
865                 if (mSeekPending) {
866                     ALOGV("we're seeking, dropping stale packet.");
867                     break;
868                 }
869 
870                 if (seqNum < track->mFirstSeqNumInSegment) {
871                     ALOGV("dropping stale access-unit (%d < %d)",
872                          seqNum, track->mFirstSeqNumInSegment);
873                     break;
874                 }
875 
876                 if (track->mNewSegment) {
877                     track->mNewSegment = false;
878                 }
879 
880                 onAccessUnitComplete(trackIndex, accessUnit);
881                 break;
882             }
883 
884             case 'seek':
885             {
886                 if (!mSeekable) {
887                     ALOGW("This is a live stream, ignoring seek request.");
888 
889                     sp<AMessage> msg = mNotify->dup();
890                     msg->setInt32("what", kWhatSeekDone);
891                     msg->post();
892                     break;
893                 }
894 
895                 int64_t timeUs;
896                 CHECK(msg->findInt64("time", &timeUs));
897 
898                 mSeekPending = true;
899 
900                 // Disable the access unit timeout until we resumed
901                 // playback again.
902                 mCheckPending = true;
903                 ++mCheckGeneration;
904 
905                 AString request = "PAUSE ";
906                 request.append(mSessionURL);
907                 request.append(" RTSP/1.0\r\n");
908 
909                 request.append("Session: ");
910                 request.append(mSessionID);
911                 request.append("\r\n");
912 
913                 request.append("\r\n");
914 
915                 sp<AMessage> reply = new AMessage('see1', id());
916                 reply->setInt64("time", timeUs);
917                 mConn->sendRequest(request.c_str(), reply);
918                 break;
919             }
920 
921             case 'see1':
922             {
923                 // Session is paused now.
924                 for (size_t i = 0; i < mTracks.size(); ++i) {
925                     TrackInfo *info = &mTracks.editItemAt(i);
926 
927                     postQueueSeekDiscontinuity(i);
928 
929                     info->mRTPAnchor = 0;
930                     info->mNTPAnchorUs = -1;
931                 }
932 
933                 mAllTracksHaveTime = false;
934                 mNTPAnchorUs = -1;
935 
936                 int64_t timeUs;
937                 CHECK(msg->findInt64("time", &timeUs));
938 
939                 AString request = "PLAY ";
940                 request.append(mSessionURL);
941                 request.append(" RTSP/1.0\r\n");
942 
943                 request.append("Session: ");
944                 request.append(mSessionID);
945                 request.append("\r\n");
946 
947                 request.append(
948                         StringPrintf(
949                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
950 
951                 request.append("\r\n");
952 
953                 sp<AMessage> reply = new AMessage('see2', id());
954                 mConn->sendRequest(request.c_str(), reply);
955                 break;
956             }
957 
958             case 'see2':
959             {
960                 CHECK(mSeekPending);
961 
962                 int32_t result;
963                 CHECK(msg->findInt32("result", &result));
964 
965                 ALOGI("PLAY completed with result %d (%s)",
966                      result, strerror(-result));
967 
968                 mCheckPending = false;
969                 postAccessUnitTimeoutCheck();
970 
971                 if (result == OK) {
972                     sp<RefBase> obj;
973                     CHECK(msg->findObject("response", &obj));
974                     sp<ARTSPResponse> response =
975                         static_cast<ARTSPResponse *>(obj.get());
976 
977                     if (response->mStatusCode != 200) {
978                         result = UNKNOWN_ERROR;
979                     } else {
980                         parsePlayResponse(response);
981 
982                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
983                         CHECK_GE(i, 0);
984 
985                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
986 
987                         ALOGI("seek completed.");
988                     }
989                 }
990 
991                 if (result != OK) {
992                     ALOGE("seek failed, aborting.");
993                     (new AMessage('abor', id()))->post();
994                 }
995 
996                 mSeekPending = false;
997 
998                 sp<AMessage> msg = mNotify->dup();
999                 msg->setInt32("what", kWhatSeekDone);
1000                 msg->post();
1001                 break;
1002             }
1003 
1004             case 'biny':
1005             {
1006                 sp<ABuffer> buffer;
1007                 CHECK(msg->findBuffer("buffer", &buffer));
1008 
1009                 int32_t index;
1010                 CHECK(buffer->meta()->findInt32("index", &index));
1011 
1012                 mRTPConn->injectPacket(index, buffer);
1013                 break;
1014             }
1015 
1016             case 'tiou':
1017             {
1018                 if (!mReceivedFirstRTCPPacket) {
1019                     if (mReceivedFirstRTPPacket && !mTryFakeRTCP) {
1020                         ALOGW("We received RTP packets but no RTCP packets, "
1021                              "using fake timestamps.");
1022 
1023                         mTryFakeRTCP = true;
1024 
1025                         mReceivedFirstRTCPPacket = true;
1026 
1027                         fakeTimestamps();
1028                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1029                         ALOGW("Never received any data, switching transports.");
1030 
1031                         mTryTCPInterleaving = true;
1032 
1033                         sp<AMessage> msg = new AMessage('abor', id());
1034                         msg->setInt32("reconnect", true);
1035                         msg->post();
1036                     } else {
1037                         ALOGW("Never received any data, disconnecting.");
1038                         (new AMessage('abor', id()))->post();
1039                     }
1040                 } else {
1041                     if (!mAllTracksHaveTime) {
1042                         ALOGW("We received some RTCP packets, but time "
1043                               "could not be established on all tracks, now "
1044                               "using fake timestamps");
1045 
1046                         fakeTimestamps();
1047                     }
1048                 }
1049                 break;
1050             }
1051 
1052             default:
1053                 TRESPASS();
1054                 break;
1055         }
1056     }
1057 
postKeepAliveMyHandler1058     void postKeepAlive() {
1059         sp<AMessage> msg = new AMessage('aliv', id());
1060         msg->setInt32("generation", mKeepAliveGeneration);
1061         msg->post((mKeepAliveTimeoutUs * 9) / 10);
1062     }
1063 
postAccessUnitTimeoutCheckMyHandler1064     void postAccessUnitTimeoutCheck() {
1065         if (mCheckPending) {
1066             return;
1067         }
1068 
1069         mCheckPending = true;
1070         sp<AMessage> check = new AMessage('chek', id());
1071         check->setInt32("generation", mCheckGeneration);
1072         check->post(kAccessUnitTimeoutUs);
1073     }
1074 
SplitStringMyHandler1075     static void SplitString(
1076             const AString &s, const char *separator, List<AString> *items) {
1077         items->clear();
1078         size_t start = 0;
1079         while (start < s.size()) {
1080             ssize_t offset = s.find(separator, start);
1081 
1082             if (offset < 0) {
1083                 items->push_back(AString(s, start, s.size() - start));
1084                 break;
1085             }
1086 
1087             items->push_back(AString(s, start, offset - start));
1088             start = offset + strlen(separator);
1089         }
1090     }
1091 
parsePlayResponseMyHandler1092     void parsePlayResponse(const sp<ARTSPResponse> &response) {
1093         mSeekable = false;
1094 
1095         ssize_t i = response->mHeaders.indexOfKey("range");
1096         if (i < 0) {
1097             // Server doesn't even tell use what range it is going to
1098             // play, therefore we won't support seeking.
1099             return;
1100         }
1101 
1102         AString range = response->mHeaders.valueAt(i);
1103         ALOGV("Range: %s", range.c_str());
1104 
1105         AString val;
1106         CHECK(GetAttribute(range.c_str(), "npt", &val));
1107 
1108         float npt1, npt2;
1109         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1110             // This is a live stream and therefore not seekable.
1111 
1112             ALOGI("This is a live stream");
1113             return;
1114         }
1115 
1116         i = response->mHeaders.indexOfKey("rtp-info");
1117         CHECK_GE(i, 0);
1118 
1119         AString rtpInfo = response->mHeaders.valueAt(i);
1120         List<AString> streamInfos;
1121         SplitString(rtpInfo, ",", &streamInfos);
1122 
1123         int n = 1;
1124         for (List<AString>::iterator it = streamInfos.begin();
1125              it != streamInfos.end(); ++it) {
1126             (*it).trim();
1127             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1128 
1129             CHECK(GetAttribute((*it).c_str(), "url", &val));
1130 
1131             size_t trackIndex = 0;
1132             while (trackIndex < mTracks.size()
1133                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1134                 ++trackIndex;
1135             }
1136             CHECK_LT(trackIndex, mTracks.size());
1137 
1138             CHECK(GetAttribute((*it).c_str(), "seq", &val));
1139 
1140             char *end;
1141             unsigned long seq = strtoul(val.c_str(), &end, 10);
1142 
1143             TrackInfo *info = &mTracks.editItemAt(trackIndex);
1144             info->mFirstSeqNumInSegment = seq;
1145             info->mNewSegment = true;
1146 
1147             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1148 
1149             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1150 
1151             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1152 
1153             info->mNormalPlayTimeRTP = rtpTime;
1154             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1155 
1156             if (!mFirstAccessUnit) {
1157                 postNormalPlayTimeMapping(
1158                         trackIndex,
1159                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1160             }
1161 
1162             ++n;
1163         }
1164 
1165         mSeekable = true;
1166     }
1167 
getTrackFormatMyHandler1168     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1169         CHECK_GE(index, 0u);
1170         CHECK_LT(index, mTracks.size());
1171 
1172         const TrackInfo &info = mTracks.itemAt(index);
1173 
1174         *timeScale = info.mTimeScale;
1175 
1176         return info.mPacketSource->getFormat();
1177     }
1178 
countTracksMyHandler1179     size_t countTracks() const {
1180         return mTracks.size();
1181     }
1182 
1183 private:
1184     struct TrackInfo {
1185         AString mURL;
1186         int mRTPSocket;
1187         int mRTCPSocket;
1188         bool mUsingInterleavedTCP;
1189         uint32_t mFirstSeqNumInSegment;
1190         bool mNewSegment;
1191 
1192         uint32_t mRTPAnchor;
1193         int64_t mNTPAnchorUs;
1194         int32_t mTimeScale;
1195 
1196         uint32_t mNormalPlayTimeRTP;
1197         int64_t mNormalPlayTimeUs;
1198 
1199         sp<APacketSource> mPacketSource;
1200 
1201         // Stores packets temporarily while no notion of time
1202         // has been established yet.
1203         List<sp<ABuffer> > mPackets;
1204     };
1205 
1206     sp<AMessage> mNotify;
1207     bool mUIDValid;
1208     uid_t mUID;
1209     sp<ALooper> mNetLooper;
1210     sp<ARTSPConnection> mConn;
1211     sp<ARTPConnection> mRTPConn;
1212     sp<ASessionDescription> mSessionDesc;
1213     AString mOriginalSessionURL;  // This one still has user:pass@
1214     AString mSessionURL;
1215     AString mSessionHost;
1216     AString mBaseURL;
1217     AString mSessionID;
1218     bool mSetupTracksSuccessful;
1219     bool mSeekPending;
1220     bool mFirstAccessUnit;
1221 
1222     bool mAllTracksHaveTime;
1223     int64_t mNTPAnchorUs;
1224     int64_t mMediaAnchorUs;
1225     int64_t mLastMediaTimeUs;
1226 
1227     int64_t mNumAccessUnitsReceived;
1228     bool mCheckPending;
1229     int32_t mCheckGeneration;
1230     bool mTryTCPInterleaving;
1231     bool mTryFakeRTCP;
1232     bool mReceivedFirstRTCPPacket;
1233     bool mReceivedFirstRTPPacket;
1234     bool mSeekable;
1235     int64_t mKeepAliveTimeoutUs;
1236     int32_t mKeepAliveGeneration;
1237 
1238     Vector<TrackInfo> mTracks;
1239 
setupTrackMyHandler1240     void setupTrack(size_t index) {
1241         sp<APacketSource> source =
1242             new APacketSource(mSessionDesc, index);
1243 
1244         if (source->initCheck() != OK) {
1245             ALOGW("Unsupported format. Ignoring track #%d.", index);
1246 
1247             sp<AMessage> reply = new AMessage('setu', id());
1248             reply->setSize("index", index);
1249             reply->setInt32("result", ERROR_UNSUPPORTED);
1250             reply->post();
1251             return;
1252         }
1253 
1254         AString url;
1255         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1256 
1257         AString trackURL;
1258         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1259 
1260         mTracks.push(TrackInfo());
1261         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1262         info->mURL = trackURL;
1263         info->mPacketSource = source;
1264         info->mUsingInterleavedTCP = false;
1265         info->mFirstSeqNumInSegment = 0;
1266         info->mNewSegment = true;
1267         info->mRTPAnchor = 0;
1268         info->mNTPAnchorUs = -1;
1269         info->mNormalPlayTimeRTP = 0;
1270         info->mNormalPlayTimeUs = 0ll;
1271 
1272         unsigned long PT;
1273         AString formatDesc;
1274         AString formatParams;
1275         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1276 
1277         int32_t timescale;
1278         int32_t numChannels;
1279         ASessionDescription::ParseFormatDesc(
1280                 formatDesc.c_str(), &timescale, &numChannels);
1281 
1282         info->mTimeScale = timescale;
1283 
1284         ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1285 
1286         AString request = "SETUP ";
1287         request.append(trackURL);
1288         request.append(" RTSP/1.0\r\n");
1289 
1290         if (mTryTCPInterleaving) {
1291             size_t interleaveIndex = 2 * (mTracks.size() - 1);
1292             info->mUsingInterleavedTCP = true;
1293             info->mRTPSocket = interleaveIndex;
1294             info->mRTCPSocket = interleaveIndex + 1;
1295 
1296             request.append("Transport: RTP/AVP/TCP;interleaved=");
1297             request.append(interleaveIndex);
1298             request.append("-");
1299             request.append(interleaveIndex + 1);
1300         } else {
1301             unsigned rtpPort;
1302             ARTPConnection::MakePortPair(
1303                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1304 
1305             if (mUIDValid) {
1306                 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1307                                                 (uint32_t)*(uint32_t*) "RTP_");
1308                 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1309                                                 (uint32_t)*(uint32_t*) "RTP_");
1310             }
1311 
1312             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1313             request.append(rtpPort);
1314             request.append("-");
1315             request.append(rtpPort + 1);
1316         }
1317 
1318         request.append("\r\n");
1319 
1320         if (index > 1) {
1321             request.append("Session: ");
1322             request.append(mSessionID);
1323             request.append("\r\n");
1324         }
1325 
1326         request.append("\r\n");
1327 
1328         sp<AMessage> reply = new AMessage('setu', id());
1329         reply->setSize("index", index);
1330         reply->setSize("track-index", mTracks.size() - 1);
1331         mConn->sendRequest(request.c_str(), reply);
1332     }
1333 
MakeURLMyHandler1334     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1335         out->clear();
1336 
1337         if (strncasecmp("rtsp://", baseURL, 7)) {
1338             // Base URL must be absolute
1339             return false;
1340         }
1341 
1342         if (!strncasecmp("rtsp://", url, 7)) {
1343             // "url" is already an absolute URL, ignore base URL.
1344             out->setTo(url);
1345             return true;
1346         }
1347 
1348         size_t n = strlen(baseURL);
1349         if (baseURL[n - 1] == '/') {
1350             out->setTo(baseURL);
1351             out->append(url);
1352         } else {
1353             const char *slashPos = strrchr(baseURL, '/');
1354 
1355             if (slashPos > &baseURL[6]) {
1356                 out->setTo(baseURL, slashPos - baseURL);
1357             } else {
1358                 out->setTo(baseURL);
1359             }
1360 
1361             out->append("/");
1362             out->append(url);
1363         }
1364 
1365         return true;
1366     }
1367 
fakeTimestampsMyHandler1368     void fakeTimestamps() {
1369         mNTPAnchorUs = -1ll;
1370         for (size_t i = 0; i < mTracks.size(); ++i) {
1371             onTimeUpdate(i, 0, 0ll);
1372         }
1373     }
1374 
onTimeUpdateMyHandler1375     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1376         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
1377              trackIndex, rtpTime, ntpTime);
1378 
1379         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1380 
1381         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1382 
1383         track->mRTPAnchor = rtpTime;
1384         track->mNTPAnchorUs = ntpTimeUs;
1385 
1386         if (mNTPAnchorUs < 0) {
1387             mNTPAnchorUs = ntpTimeUs;
1388             mMediaAnchorUs = mLastMediaTimeUs;
1389         }
1390 
1391         if (!mAllTracksHaveTime) {
1392             bool allTracksHaveTime = true;
1393             for (size_t i = 0; i < mTracks.size(); ++i) {
1394                 TrackInfo *track = &mTracks.editItemAt(i);
1395                 if (track->mNTPAnchorUs < 0) {
1396                     allTracksHaveTime = false;
1397                     break;
1398                 }
1399             }
1400             if (allTracksHaveTime) {
1401                 mAllTracksHaveTime = true;
1402                 ALOGI("Time now established for all tracks.");
1403             }
1404         }
1405     }
1406 
onAccessUnitCompleteMyHandler1407     void onAccessUnitComplete(
1408             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1409         ALOGV("onAccessUnitComplete track %d", trackIndex);
1410 
1411         if (mFirstAccessUnit) {
1412             sp<AMessage> msg = mNotify->dup();
1413             msg->setInt32("what", kWhatConnected);
1414             msg->post();
1415 
1416             if (mSeekable) {
1417                 for (size_t i = 0; i < mTracks.size(); ++i) {
1418                     TrackInfo *info = &mTracks.editItemAt(i);
1419 
1420                     postNormalPlayTimeMapping(
1421                             i,
1422                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1423                 }
1424             }
1425 
1426             mFirstAccessUnit = false;
1427         }
1428 
1429         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1430 
1431         if (!mAllTracksHaveTime) {
1432             ALOGV("storing accessUnit, no time established yet");
1433             track->mPackets.push_back(accessUnit);
1434             return;
1435         }
1436 
1437         while (!track->mPackets.empty()) {
1438             sp<ABuffer> accessUnit = *track->mPackets.begin();
1439             track->mPackets.erase(track->mPackets.begin());
1440 
1441             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1442                 postQueueAccessUnit(trackIndex, accessUnit);
1443             }
1444         }
1445 
1446         if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1447             postQueueAccessUnit(trackIndex, accessUnit);
1448         }
1449     }
1450 
addMediaTimestampMyHandler1451     bool addMediaTimestamp(
1452             int32_t trackIndex, const TrackInfo *track,
1453             const sp<ABuffer> &accessUnit) {
1454         uint32_t rtpTime;
1455         CHECK(accessUnit->meta()->findInt32(
1456                     "rtp-time", (int32_t *)&rtpTime));
1457 
1458         int64_t relRtpTimeUs =
1459             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1460                 / track->mTimeScale;
1461 
1462         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1463 
1464         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1465 
1466         if (mediaTimeUs > mLastMediaTimeUs) {
1467             mLastMediaTimeUs = mediaTimeUs;
1468         }
1469 
1470         if (mediaTimeUs < 0) {
1471             ALOGV("dropping early accessUnit.");
1472             return false;
1473         }
1474 
1475         ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
1476              trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
1477 
1478         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1479 
1480         return true;
1481     }
1482 
postQueueAccessUnitMyHandler1483     void postQueueAccessUnit(
1484             size_t trackIndex, const sp<ABuffer> &accessUnit) {
1485         sp<AMessage> msg = mNotify->dup();
1486         msg->setInt32("what", kWhatAccessUnit);
1487         msg->setSize("trackIndex", trackIndex);
1488         msg->setBuffer("accessUnit", accessUnit);
1489         msg->post();
1490     }
1491 
postQueueEOSMyHandler1492     void postQueueEOS(size_t trackIndex, status_t finalResult) {
1493         sp<AMessage> msg = mNotify->dup();
1494         msg->setInt32("what", kWhatEOS);
1495         msg->setSize("trackIndex", trackIndex);
1496         msg->setInt32("finalResult", finalResult);
1497         msg->post();
1498     }
1499 
postQueueSeekDiscontinuityMyHandler1500     void postQueueSeekDiscontinuity(size_t trackIndex) {
1501         sp<AMessage> msg = mNotify->dup();
1502         msg->setInt32("what", kWhatSeekDiscontinuity);
1503         msg->setSize("trackIndex", trackIndex);
1504         msg->post();
1505     }
1506 
postNormalPlayTimeMappingMyHandler1507     void postNormalPlayTimeMapping(
1508             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1509         sp<AMessage> msg = mNotify->dup();
1510         msg->setInt32("what", kWhatNormalPlayTimeMapping);
1511         msg->setSize("trackIndex", trackIndex);
1512         msg->setInt32("rtpTime", rtpTime);
1513         msg->setInt64("nptUs", nptUs);
1514         msg->post();
1515     }
1516 
1517     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1518 };
1519 
1520 }  // namespace android
1521 
1522 #endif  // MY_HANDLER_H_
1523