• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MY_HANDLER_H_
18 
19 #define MY_HANDLER_H_
20 
21 //#define LOG_NDEBUG 0
22 #define LOG_TAG "MyHandler"
23 #include <utils/Log.h>
24 
25 #include "APacketSource.h"
26 #include "ARTPConnection.h"
27 #include "ARTSPConnection.h"
28 #include "ASessionDescription.h"
29 
30 #include <ctype.h>
31 
32 #include <media/stagefright/foundation/ABuffer.h>
33 #include <media/stagefright/foundation/ADebug.h>
34 #include <media/stagefright/foundation/ALooper.h>
35 #include <media/stagefright/foundation/AMessage.h>
36 #include <media/stagefright/MediaErrors.h>
37 #include <media/stagefright/Utils.h>
38 
39 #include <arpa/inet.h>
40 #include <sys/socket.h>
41 #include <netdb.h>
42 
43 #include "HTTPBase.h"
44 
// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
47 static int64_t kAccessUnitTimeoutUs = 10000000ll;
48 
49 // If no access units arrive for the first 10 secs after starting the
50 // stream, assume none ever will and signal EOS or switch transports.
51 static int64_t kStartupTimeoutUs = 10000000ll;
52 
53 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
54 
55 static int64_t kPauseDelayUs = 3000000ll;
56 
57 namespace android {
58 
GetAttribute(const char * s,const char * key,AString * value)59 static bool GetAttribute(const char *s, const char *key, AString *value) {
60     value->clear();
61 
62     size_t keyLen = strlen(key);
63 
64     for (;;) {
65         while (isspace(*s)) {
66             ++s;
67         }
68 
69         const char *colonPos = strchr(s, ';');
70 
71         size_t len =
72             (colonPos == NULL) ? strlen(s) : colonPos - s;
73 
74         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
75             value->setTo(&s[keyLen + 1], len - keyLen - 1);
76             return true;
77         }
78 
79         if (colonPos == NULL) {
80             return false;
81         }
82 
83         s = colonPos + 1;
84     }
85 }
86 
87 struct MyHandler : public AHandler {
88     enum {
89         kWhatConnected                  = 'conn',
90         kWhatDisconnected               = 'disc',
91         kWhatSeekDone                   = 'sdon',
92 
93         kWhatAccessUnit                 = 'accU',
94         kWhatEOS                        = 'eos!',
95         kWhatSeekDiscontinuity          = 'seeD',
96         kWhatNormalPlayTimeMapping      = 'nptM',
97     };
98 
99     MyHandler(
100             const char *url,
101             const sp<AMessage> &notify,
102             bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler103         : mNotify(notify),
104           mUIDValid(uidValid),
105           mUID(uid),
106           mNetLooper(new ALooper),
107           mConn(new ARTSPConnection(mUIDValid, mUID)),
108           mRTPConn(new ARTPConnection),
109           mOriginalSessionURL(url),
110           mSessionURL(url),
111           mSetupTracksSuccessful(false),
112           mSeekPending(false),
113           mFirstAccessUnit(true),
114           mAllTracksHaveTime(false),
115           mNTPAnchorUs(-1),
116           mMediaAnchorUs(-1),
117           mLastMediaTimeUs(0),
118           mNumAccessUnitsReceived(0),
119           mCheckPending(false),
120           mCheckGeneration(0),
121           mCheckTimeoutGeneration(0),
122           mTryTCPInterleaving(false),
123           mTryFakeRTCP(false),
124           mReceivedFirstRTCPPacket(false),
125           mReceivedFirstRTPPacket(false),
126           mSeekable(true),
127           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
128           mKeepAliveGeneration(0),
129           mPausing(false),
130           mPauseGeneration(0),
131           mPlayResponseParsed(false) {
132         mNetLooper->setName("rtsp net");
133         mNetLooper->start(false /* runOnCallingThread */,
134                           false /* canCallJava */,
135                           PRIORITY_HIGHEST);
136 
137         // Strip any authentication info from the session url, we don't
138         // want to transmit user/pass in cleartext.
139         AString host, path, user, pass;
140         unsigned port;
141         CHECK(ARTSPConnection::ParseURL(
142                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
143 
144         if (user.size() > 0) {
145             mSessionURL.clear();
146             mSessionURL.append("rtsp://");
147             mSessionURL.append(host);
148             mSessionURL.append(":");
149             mSessionURL.append(StringPrintf("%u", port));
150             mSessionURL.append(path);
151 
152             ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
153         }
154 
155         mSessionHost = host;
156     }
157 
connectMyHandler158     void connect() {
159         looper()->registerHandler(mConn);
160         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
161 
162         sp<AMessage> notify = new AMessage('biny', id());
163         mConn->observeBinaryData(notify);
164 
165         sp<AMessage> reply = new AMessage('conn', id());
166         mConn->connect(mOriginalSessionURL.c_str(), reply);
167     }
168 
loadSDPMyHandler169     void loadSDP(const sp<ASessionDescription>& desc) {
170         looper()->registerHandler(mConn);
171         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
172 
173         sp<AMessage> notify = new AMessage('biny', id());
174         mConn->observeBinaryData(notify);
175 
176         sp<AMessage> reply = new AMessage('sdpl', id());
177         reply->setObject("description", desc);
178         mConn->connect(mOriginalSessionURL.c_str(), reply);
179     }
180 
getControlURLMyHandler181     AString getControlURL(sp<ASessionDescription> desc) {
182         AString sessionLevelControlURL;
183         if (mSessionDesc->findAttribute(
184                 0,
185                 "a=control",
186                 &sessionLevelControlURL)) {
187             if (sessionLevelControlURL.compare("*") == 0) {
188                 return mBaseURL;
189             } else {
190                 AString controlURL;
191                 CHECK(MakeURL(
192                         mBaseURL.c_str(),
193                         sessionLevelControlURL.c_str(),
194                         &controlURL));
195                 return controlURL;
196             }
197         } else {
198             return mSessionURL;
199         }
200     }
201 
disconnectMyHandler202     void disconnect() {
203         (new AMessage('abor', id()))->post();
204     }
205 
seekMyHandler206     void seek(int64_t timeUs) {
207         sp<AMessage> msg = new AMessage('seek', id());
208         msg->setInt64("time", timeUs);
209         mPauseGeneration++;
210         msg->post();
211     }
212 
isSeekableMyHandler213     bool isSeekable() const {
214         return mSeekable;
215     }
216 
pauseMyHandler217     void pause() {
218         sp<AMessage> msg = new AMessage('paus', id());
219         mPauseGeneration++;
220         msg->setInt32("pausecheck", mPauseGeneration);
221         msg->post(kPauseDelayUs);
222     }
223 
resumeMyHandler224     void resume() {
225         sp<AMessage> msg = new AMessage('resu', id());
226         mPauseGeneration++;
227         msg->post();
228     }
229 
addRRMyHandler230     static void addRR(const sp<ABuffer> &buf) {
231         uint8_t *ptr = buf->data() + buf->size();
232         ptr[0] = 0x80 | 0;
233         ptr[1] = 201;  // RR
234         ptr[2] = 0;
235         ptr[3] = 1;
236         ptr[4] = 0xde;  // SSRC
237         ptr[5] = 0xad;
238         ptr[6] = 0xbe;
239         ptr[7] = 0xef;
240 
241         buf->setRange(0, buf->size() + 8);
242     }
243 
addSDESMyHandler244     static void addSDES(int s, const sp<ABuffer> &buffer) {
245         struct sockaddr_in addr;
246         socklen_t addrSize = sizeof(addr);
247         CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
248 
249         uint8_t *data = buffer->data() + buffer->size();
250         data[0] = 0x80 | 1;
251         data[1] = 202;  // SDES
252         data[4] = 0xde;  // SSRC
253         data[5] = 0xad;
254         data[6] = 0xbe;
255         data[7] = 0xef;
256 
257         size_t offset = 8;
258 
259         data[offset++] = 1;  // CNAME
260 
261         AString cname = "stagefright@";
262         cname.append(inet_ntoa(addr.sin_addr));
263         data[offset++] = cname.size();
264 
265         memcpy(&data[offset], cname.c_str(), cname.size());
266         offset += cname.size();
267 
268         data[offset++] = 6;  // TOOL
269 
270         AString tool = MakeUserAgent();
271 
272         data[offset++] = tool.size();
273 
274         memcpy(&data[offset], tool.c_str(), tool.size());
275         offset += tool.size();
276 
277         data[offset++] = 0;
278 
279         if ((offset % 4) > 0) {
280             size_t count = 4 - (offset % 4);
281             switch (count) {
282                 case 3:
283                     data[offset++] = 0;
284                 case 2:
285                     data[offset++] = 0;
286                 case 1:
287                     data[offset++] = 0;
288             }
289         }
290 
291         size_t numWords = (offset / 4) - 1;
292         data[2] = numWords >> 8;
293         data[3] = numWords & 0xff;
294 
295         buffer->setRange(buffer->offset(), buffer->size() + offset);
296     }
297 
298     // In case we're behind NAT, fire off two UDP packets to the remote
299     // rtp/rtcp ports to poke a hole into the firewall for future incoming
300     // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler301     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
302         struct sockaddr_in addr;
303         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
304         addr.sin_family = AF_INET;
305 
306         AString source;
307         AString server_port;
308         if (!GetAttribute(transport.c_str(),
309                           "source",
310                           &source)) {
311             ALOGW("Missing 'source' field in Transport response. Using "
312                  "RTSP endpoint address.");
313 
314             struct hostent *ent = gethostbyname(mSessionHost.c_str());
315             if (ent == NULL) {
316                 ALOGE("Failed to look up address of session host '%s'",
317                      mSessionHost.c_str());
318 
319                 return false;
320             }
321 
322             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
323         } else {
324             addr.sin_addr.s_addr = inet_addr(source.c_str());
325         }
326 
327         if (!GetAttribute(transport.c_str(),
328                                  "server_port",
329                                  &server_port)) {
330             ALOGI("Missing 'server_port' field in Transport response.");
331             return false;
332         }
333 
334         int rtpPort, rtcpPort;
335         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
336                 || rtpPort <= 0 || rtpPort > 65535
337                 || rtcpPort <=0 || rtcpPort > 65535
338                 || rtcpPort != rtpPort + 1) {
339             ALOGE("Server picked invalid RTP/RTCP port pair %s,"
340                  " RTP port must be even, RTCP port must be one higher.",
341                  server_port.c_str());
342 
343             return false;
344         }
345 
346         if (rtpPort & 1) {
347             ALOGW("Server picked an odd RTP port, it should've picked an "
348                  "even one, we'll let it pass for now, but this may break "
349                  "in the future.");
350         }
351 
352         if (addr.sin_addr.s_addr == INADDR_NONE) {
353             return true;
354         }
355 
356         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
357             // No firewalls to traverse on the loopback interface.
358             return true;
359         }
360 
361         // Make up an RR/SDES RTCP packet.
362         sp<ABuffer> buf = new ABuffer(65536);
363         buf->setRange(0, 0);
364         addRR(buf);
365         addSDES(rtpSocket, buf);
366 
367         addr.sin_port = htons(rtpPort);
368 
369         ssize_t n = sendto(
370                 rtpSocket, buf->data(), buf->size(), 0,
371                 (const sockaddr *)&addr, sizeof(addr));
372 
373         if (n < (ssize_t)buf->size()) {
374             ALOGE("failed to poke a hole for RTP packets");
375             return false;
376         }
377 
378         addr.sin_port = htons(rtcpPort);
379 
380         n = sendto(
381                 rtcpSocket, buf->data(), buf->size(), 0,
382                 (const sockaddr *)&addr, sizeof(addr));
383 
384         if (n < (ssize_t)buf->size()) {
385             ALOGE("failed to poke a hole for RTCP packets");
386             return false;
387         }
388 
389         ALOGV("successfully poked holes.");
390 
391         return true;
392     }
393 
isLiveStreamMyHandler394     static bool isLiveStream(const sp<ASessionDescription> &desc) {
395         AString attrLiveStream;
396         if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
397             ssize_t semicolonPos = attrLiveStream.find(";", 2);
398 
399             const char* liveStreamValue;
400             if (semicolonPos < 0) {
401                 liveStreamValue = attrLiveStream.c_str();
402             } else {
403                 AString valString;
404                 valString.setTo(attrLiveStream,
405                         semicolonPos + 1,
406                         attrLiveStream.size() - semicolonPos - 1);
407                 liveStreamValue = valString.c_str();
408             }
409 
410             uint32_t value = strtoul(liveStreamValue, NULL, 10);
411             if (value == 1) {
412                 ALOGV("found live stream");
413                 return true;
414             }
415         } else {
416             // It is a live stream if no duration is returned
417             int64_t durationUs;
418             if (!desc->getDurationUs(&durationUs)) {
419                 ALOGV("No duration found, assume live stream");
420                 return true;
421             }
422         }
423 
424         return false;
425     }
426 
onMessageReceivedMyHandler427     virtual void onMessageReceived(const sp<AMessage> &msg) {
428         switch (msg->what()) {
429             case 'conn':
430             {
431                 int32_t result;
432                 CHECK(msg->findInt32("result", &result));
433 
434                 ALOGI("connection request completed with result %d (%s)",
435                      result, strerror(-result));
436 
437                 if (result == OK) {
438                     AString request;
439                     request = "DESCRIBE ";
440                     request.append(mSessionURL);
441                     request.append(" RTSP/1.0\r\n");
442                     request.append("Accept: application/sdp\r\n");
443                     request.append("\r\n");
444 
445                     sp<AMessage> reply = new AMessage('desc', id());
446                     mConn->sendRequest(request.c_str(), reply);
447                 } else {
448                     (new AMessage('disc', id()))->post();
449                 }
450                 break;
451             }
452 
453             case 'disc':
454             {
455                 ++mKeepAliveGeneration;
456 
457                 int32_t reconnect;
458                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
459                     sp<AMessage> reply = new AMessage('conn', id());
460                     mConn->connect(mOriginalSessionURL.c_str(), reply);
461                 } else {
462                     (new AMessage('quit', id()))->post();
463                 }
464                 break;
465             }
466 
467             case 'desc':
468             {
469                 int32_t result;
470                 CHECK(msg->findInt32("result", &result));
471 
472                 ALOGI("DESCRIBE completed with result %d (%s)",
473                      result, strerror(-result));
474 
475                 if (result == OK) {
476                     sp<RefBase> obj;
477                     CHECK(msg->findObject("response", &obj));
478                     sp<ARTSPResponse> response =
479                         static_cast<ARTSPResponse *>(obj.get());
480 
481                     if (response->mStatusCode == 302) {
482                         ssize_t i = response->mHeaders.indexOfKey("location");
483                         CHECK_GE(i, 0);
484 
485                         mSessionURL = response->mHeaders.valueAt(i);
486 
487                         AString request;
488                         request = "DESCRIBE ";
489                         request.append(mSessionURL);
490                         request.append(" RTSP/1.0\r\n");
491                         request.append("Accept: application/sdp\r\n");
492                         request.append("\r\n");
493 
494                         sp<AMessage> reply = new AMessage('desc', id());
495                         mConn->sendRequest(request.c_str(), reply);
496                         break;
497                     }
498 
499                     if (response->mStatusCode != 200) {
500                         result = UNKNOWN_ERROR;
501                     } else if (response->mContent == NULL) {
502                         result = ERROR_MALFORMED;
503                         ALOGE("The response has no content.");
504                     } else {
505                         mSessionDesc = new ASessionDescription;
506 
507                         mSessionDesc->setTo(
508                                 response->mContent->data(),
509                                 response->mContent->size());
510 
511                         if (!mSessionDesc->isValid()) {
512                             ALOGE("Failed to parse session description.");
513                             result = ERROR_MALFORMED;
514                         } else {
515                             ssize_t i = response->mHeaders.indexOfKey("content-base");
516                             if (i >= 0) {
517                                 mBaseURL = response->mHeaders.valueAt(i);
518                             } else {
519                                 i = response->mHeaders.indexOfKey("content-location");
520                                 if (i >= 0) {
521                                     mBaseURL = response->mHeaders.valueAt(i);
522                                 } else {
523                                     mBaseURL = mSessionURL;
524                                 }
525                             }
526 
527                             mSeekable = !isLiveStream(mSessionDesc);
528 
529                             if (!mBaseURL.startsWith("rtsp://")) {
530                                 // Some misbehaving servers specify a relative
531                                 // URL in one of the locations above, combine
532                                 // it with the absolute session URL to get
533                                 // something usable...
534 
535                                 ALOGW("Server specified a non-absolute base URL"
536                                      ", combining it with the session URL to "
537                                      "get something usable...");
538 
539                                 AString tmp;
540                                 CHECK(MakeURL(
541                                             mSessionURL.c_str(),
542                                             mBaseURL.c_str(),
543                                             &tmp));
544 
545                                 mBaseURL = tmp;
546                             }
547 
548                             mControlURL = getControlURL(mSessionDesc);
549 
550                             if (mSessionDesc->countTracks() < 2) {
551                                 // There's no actual tracks in this session.
552                                 // The first "track" is merely session meta
553                                 // data.
554 
555                                 ALOGW("Session doesn't contain any playable "
556                                      "tracks. Aborting.");
557                                 result = ERROR_UNSUPPORTED;
558                             } else {
559                                 setupTrack(1);
560                             }
561                         }
562                     }
563                 }
564 
565                 if (result != OK) {
566                     sp<AMessage> reply = new AMessage('disc', id());
567                     mConn->disconnect(reply);
568                 }
569                 break;
570             }
571 
572             case 'sdpl':
573             {
574                 int32_t result;
575                 CHECK(msg->findInt32("result", &result));
576 
577                 ALOGI("SDP connection request completed with result %d (%s)",
578                      result, strerror(-result));
579 
580                 if (result == OK) {
581                     sp<RefBase> obj;
582                     CHECK(msg->findObject("description", &obj));
583                     mSessionDesc =
584                         static_cast<ASessionDescription *>(obj.get());
585 
586                     if (!mSessionDesc->isValid()) {
587                         ALOGE("Failed to parse session description.");
588                         result = ERROR_MALFORMED;
589                     } else {
590                         mBaseURL = mSessionURL;
591 
592                         mSeekable = !isLiveStream(mSessionDesc);
593 
594                         mControlURL = getControlURL(mSessionDesc);
595 
596                         if (mSessionDesc->countTracks() < 2) {
597                             // There's no actual tracks in this session.
598                             // The first "track" is merely session meta
599                             // data.
600 
601                             ALOGW("Session doesn't contain any playable "
602                                  "tracks. Aborting.");
603                             result = ERROR_UNSUPPORTED;
604                         } else {
605                             setupTrack(1);
606                         }
607                     }
608                 }
609 
610                 if (result != OK) {
611                     sp<AMessage> reply = new AMessage('disc', id());
612                     mConn->disconnect(reply);
613                 }
614                 break;
615             }
616 
617             case 'setu':
618             {
619                 size_t index;
620                 CHECK(msg->findSize("index", &index));
621 
622                 TrackInfo *track = NULL;
623                 size_t trackIndex;
624                 if (msg->findSize("track-index", &trackIndex)) {
625                     track = &mTracks.editItemAt(trackIndex);
626                 }
627 
628                 int32_t result;
629                 CHECK(msg->findInt32("result", &result));
630 
631                 ALOGI("SETUP(%d) completed with result %d (%s)",
632                      index, result, strerror(-result));
633 
634                 if (result == OK) {
635                     CHECK(track != NULL);
636 
637                     sp<RefBase> obj;
638                     CHECK(msg->findObject("response", &obj));
639                     sp<ARTSPResponse> response =
640                         static_cast<ARTSPResponse *>(obj.get());
641 
642                     if (response->mStatusCode != 200) {
643                         result = UNKNOWN_ERROR;
644                     } else {
645                         ssize_t i = response->mHeaders.indexOfKey("session");
646                         CHECK_GE(i, 0);
647 
648                         mSessionID = response->mHeaders.valueAt(i);
649 
650                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
651                         AString timeoutStr;
652                         if (GetAttribute(
653                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
654                             char *end;
655                             unsigned long timeoutSecs =
656                                 strtoul(timeoutStr.c_str(), &end, 10);
657 
658                             if (end == timeoutStr.c_str() || *end != '\0') {
659                                 ALOGW("server specified malformed timeout '%s'",
660                                      timeoutStr.c_str());
661 
662                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
663                             } else if (timeoutSecs < 15) {
664                                 ALOGW("server specified too short a timeout "
665                                      "(%lu secs), using default.",
666                                      timeoutSecs);
667 
668                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
669                             } else {
670                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
671 
672                                 ALOGI("server specified timeout of %lu secs.",
673                                      timeoutSecs);
674                             }
675                         }
676 
677                         i = mSessionID.find(";");
678                         if (i >= 0) {
679                             // Remove options, i.e. ";timeout=90"
680                             mSessionID.erase(i, mSessionID.size() - i);
681                         }
682 
683                         sp<AMessage> notify = new AMessage('accu', id());
684                         notify->setSize("track-index", trackIndex);
685 
686                         i = response->mHeaders.indexOfKey("transport");
687                         CHECK_GE(i, 0);
688 
689                         if (!track->mUsingInterleavedTCP) {
690                             AString transport = response->mHeaders.valueAt(i);
691 
692                             // We are going to continue even if we were
693                             // unable to poke a hole into the firewall...
694                             pokeAHole(
695                                     track->mRTPSocket,
696                                     track->mRTCPSocket,
697                                     transport);
698                         }
699 
700                         mRTPConn->addStream(
701                                 track->mRTPSocket, track->mRTCPSocket,
702                                 mSessionDesc, index,
703                                 notify, track->mUsingInterleavedTCP);
704 
705                         mSetupTracksSuccessful = true;
706                     }
707                 }
708 
709                 if (result != OK) {
710                     if (track) {
711                         if (!track->mUsingInterleavedTCP) {
712                             // Clear the tag
713                             if (mUIDValid) {
714                                 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
715                                 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
716                                 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
717                                 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
718                             }
719 
720                             close(track->mRTPSocket);
721                             close(track->mRTCPSocket);
722                         }
723 
724                         mTracks.removeItemsAt(trackIndex);
725                     }
726                 }
727 
728                 ++index;
729                 if (index < mSessionDesc->countTracks()) {
730                     setupTrack(index);
731                 } else if (mSetupTracksSuccessful) {
732                     ++mKeepAliveGeneration;
733                     postKeepAlive();
734 
735                     AString request = "PLAY ";
736                     request.append(mControlURL);
737                     request.append(" RTSP/1.0\r\n");
738 
739                     request.append("Session: ");
740                     request.append(mSessionID);
741                     request.append("\r\n");
742 
743                     request.append("\r\n");
744 
745                     sp<AMessage> reply = new AMessage('play', id());
746                     mConn->sendRequest(request.c_str(), reply);
747                 } else {
748                     sp<AMessage> reply = new AMessage('disc', id());
749                     mConn->disconnect(reply);
750                 }
751                 break;
752             }
753 
754             case 'play':
755             {
756                 int32_t result;
757                 CHECK(msg->findInt32("result", &result));
758 
759                 ALOGI("PLAY completed with result %d (%s)",
760                      result, strerror(-result));
761 
762                 if (result == OK) {
763                     sp<RefBase> obj;
764                     CHECK(msg->findObject("response", &obj));
765                     sp<ARTSPResponse> response =
766                         static_cast<ARTSPResponse *>(obj.get());
767 
768                     if (response->mStatusCode != 200) {
769                         result = UNKNOWN_ERROR;
770                     } else {
771                         parsePlayResponse(response);
772 
773                         sp<AMessage> timeout = new AMessage('tiou', id());
774                         mCheckTimeoutGeneration++;
775                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
776                         timeout->post(kStartupTimeoutUs);
777                     }
778                 }
779 
780                 if (result != OK) {
781                     sp<AMessage> reply = new AMessage('disc', id());
782                     mConn->disconnect(reply);
783                 }
784 
785                 break;
786             }
787 
788             case 'aliv':
789             {
790                 int32_t generation;
791                 CHECK(msg->findInt32("generation", &generation));
792 
793                 if (generation != mKeepAliveGeneration) {
794                     // obsolete event.
795                     break;
796                 }
797 
798                 AString request;
799                 request.append("OPTIONS ");
800                 request.append(mSessionURL);
801                 request.append(" RTSP/1.0\r\n");
802                 request.append("Session: ");
803                 request.append(mSessionID);
804                 request.append("\r\n");
805                 request.append("\r\n");
806 
807                 sp<AMessage> reply = new AMessage('opts', id());
808                 reply->setInt32("generation", mKeepAliveGeneration);
809                 mConn->sendRequest(request.c_str(), reply);
810                 break;
811             }
812 
813             case 'opts':
814             {
815                 int32_t result;
816                 CHECK(msg->findInt32("result", &result));
817 
818                 ALOGI("OPTIONS completed with result %d (%s)",
819                      result, strerror(-result));
820 
821                 int32_t generation;
822                 CHECK(msg->findInt32("generation", &generation));
823 
824                 if (generation != mKeepAliveGeneration) {
825                     // obsolete event.
826                     break;
827                 }
828 
829                 postKeepAlive();
830                 break;
831             }
832 
833             case 'abor':
834             {
835                 for (size_t i = 0; i < mTracks.size(); ++i) {
836                     TrackInfo *info = &mTracks.editItemAt(i);
837 
838                     if (!mFirstAccessUnit) {
839                         postQueueEOS(i, ERROR_END_OF_STREAM);
840                     }
841 
842                     if (!info->mUsingInterleavedTCP) {
843                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
844 
845                         // Clear the tag
846                         if (mUIDValid) {
847                             HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
848                             HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
849                             HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
850                             HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
851                         }
852 
853                         close(info->mRTPSocket);
854                         close(info->mRTCPSocket);
855                     }
856                 }
857                 mTracks.clear();
858                 mSetupTracksSuccessful = false;
859                 mSeekPending = false;
860                 mFirstAccessUnit = true;
861                 mAllTracksHaveTime = false;
862                 mNTPAnchorUs = -1;
863                 mMediaAnchorUs = -1;
864                 mNumAccessUnitsReceived = 0;
865                 mReceivedFirstRTCPPacket = false;
866                 mReceivedFirstRTPPacket = false;
867                 mPausing = false;
868                 mSeekable = true;
869 
870                 sp<AMessage> reply = new AMessage('tear', id());
871 
872                 int32_t reconnect;
873                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
874                     reply->setInt32("reconnect", true);
875                 }
876 
877                 AString request;
878                 request = "TEARDOWN ";
879 
880                 // XXX should use aggregate url from SDP here...
881                 request.append(mSessionURL);
882                 request.append(" RTSP/1.0\r\n");
883 
884                 request.append("Session: ");
885                 request.append(mSessionID);
886                 request.append("\r\n");
887 
888                 request.append("\r\n");
889 
890                 mConn->sendRequest(request.c_str(), reply);
891                 break;
892             }
893 
894             case 'tear':
895             {
896                 int32_t result;
897                 CHECK(msg->findInt32("result", &result));
898 
899                 ALOGI("TEARDOWN completed with result %d (%s)",
900                      result, strerror(-result));
901 
902                 sp<AMessage> reply = new AMessage('disc', id());
903 
904                 int32_t reconnect;
905                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
906                     reply->setInt32("reconnect", true);
907                 }
908 
909                 mConn->disconnect(reply);
910                 break;
911             }
912 
913             case 'quit':
914             {
915                 sp<AMessage> msg = mNotify->dup();
916                 msg->setInt32("what", kWhatDisconnected);
917                 msg->setInt32("result", UNKNOWN_ERROR);
918                 msg->post();
919                 break;
920             }
921 
922             case 'chek':
923             {
924                 int32_t generation;
925                 CHECK(msg->findInt32("generation", &generation));
926                 if (generation != mCheckGeneration) {
927                     // This is an outdated message. Ignore.
928                     break;
929                 }
930 
931                 if (mNumAccessUnitsReceived == 0) {
932 #if 1
933                     ALOGI("stream ended? aborting.");
934                     (new AMessage('abor', id()))->post();
935                     break;
936 #else
937                     ALOGI("haven't seen an AU in a looong time.");
938 #endif
939                 }
940 
941                 mNumAccessUnitsReceived = 0;
942                 msg->post(kAccessUnitTimeoutUs);
943                 break;
944             }
945 
946             case 'accu':
947             {
948                 int32_t timeUpdate;
949                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
950                     size_t trackIndex;
951                     CHECK(msg->findSize("track-index", &trackIndex));
952 
953                     uint32_t rtpTime;
954                     uint64_t ntpTime;
955                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
956                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
957 
958                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
959                     break;
960                 }
961 
962                 int32_t first;
963                 if (msg->findInt32("first-rtcp", &first)) {
964                     mReceivedFirstRTCPPacket = true;
965                     break;
966                 }
967 
968                 if (msg->findInt32("first-rtp", &first)) {
969                     mReceivedFirstRTPPacket = true;
970                     break;
971                 }
972 
973                 ++mNumAccessUnitsReceived;
974                 postAccessUnitTimeoutCheck();
975 
976                 size_t trackIndex;
977                 CHECK(msg->findSize("track-index", &trackIndex));
978 
979                 if (trackIndex >= mTracks.size()) {
980                     ALOGV("late packets ignored.");
981                     break;
982                 }
983 
984                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
985 
986                 int32_t eos;
987                 if (msg->findInt32("eos", &eos)) {
988                     ALOGI("received BYE on track index %d", trackIndex);
989                     if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
990                         ALOGI("No time established => fake existing data");
991 
992                         track->mEOSReceived = true;
993                         mTryFakeRTCP = true;
994                         mReceivedFirstRTCPPacket = true;
995                         fakeTimestamps();
996                     } else {
997                         postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
998                     }
999                     return;
1000                 }
1001 
1002                 sp<ABuffer> accessUnit;
1003                 CHECK(msg->findBuffer("access-unit", &accessUnit));
1004 
1005                 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1006 
1007                 if (mSeekPending) {
1008                     ALOGV("we're seeking, dropping stale packet.");
1009                     break;
1010                 }
1011 
1012                 if (seqNum < track->mFirstSeqNumInSegment) {
1013                     ALOGV("dropping stale access-unit (%d < %d)",
1014                          seqNum, track->mFirstSeqNumInSegment);
1015                     break;
1016                 }
1017 
1018                 if (track->mNewSegment) {
1019                     track->mNewSegment = false;
1020                 }
1021 
1022                 onAccessUnitComplete(trackIndex, accessUnit);
1023                 break;
1024             }
1025 
1026             case 'paus':
1027             {
1028                 int32_t generation;
1029                 CHECK(msg->findInt32("pausecheck", &generation));
1030                 if (generation != mPauseGeneration) {
1031                     ALOGV("Ignoring outdated pause message.");
1032                     break;
1033                 }
1034 
1035                 if (!mSeekable) {
1036                     ALOGW("This is a live stream, ignoring pause request.");
1037                     break;
1038                 }
1039                 mCheckPending = true;
1040                 ++mCheckGeneration;
1041                 mPausing = true;
1042 
1043                 AString request = "PAUSE ";
1044                 request.append(mControlURL);
1045                 request.append(" RTSP/1.0\r\n");
1046 
1047                 request.append("Session: ");
1048                 request.append(mSessionID);
1049                 request.append("\r\n");
1050 
1051                 request.append("\r\n");
1052 
1053                 sp<AMessage> reply = new AMessage('pau2', id());
1054                 mConn->sendRequest(request.c_str(), reply);
1055                 break;
1056             }
1057 
1058             case 'pau2':
1059             {
1060                 int32_t result;
1061                 CHECK(msg->findInt32("result", &result));
1062                 mCheckTimeoutGeneration++;
1063 
1064                 ALOGI("PAUSE completed with result %d (%s)",
1065                      result, strerror(-result));
1066                 break;
1067             }
1068 
1069             case 'resu':
1070             {
1071                 if (mPausing && mSeekPending) {
1072                     // If seeking, Play will be sent from see1 instead
1073                     break;
1074                 }
1075 
1076                 if (!mPausing) {
1077                     // Dont send PLAY if we have not paused
1078                     break;
1079                 }
1080                 AString request = "PLAY ";
1081                 request.append(mControlURL);
1082                 request.append(" RTSP/1.0\r\n");
1083 
1084                 request.append("Session: ");
1085                 request.append(mSessionID);
1086                 request.append("\r\n");
1087 
1088                 request.append("\r\n");
1089 
1090                 sp<AMessage> reply = new AMessage('res2', id());
1091                 mConn->sendRequest(request.c_str(), reply);
1092                 break;
1093             }
1094 
1095             case 'res2':
1096             {
1097                 int32_t result;
1098                 CHECK(msg->findInt32("result", &result));
1099 
1100                 ALOGI("PLAY completed with result %d (%s)",
1101                      result, strerror(-result));
1102 
1103                 mCheckPending = false;
1104                 postAccessUnitTimeoutCheck();
1105 
1106                 if (result == OK) {
1107                     sp<RefBase> obj;
1108                     CHECK(msg->findObject("response", &obj));
1109                     sp<ARTSPResponse> response =
1110                         static_cast<ARTSPResponse *>(obj.get());
1111 
1112                     if (response->mStatusCode != 200) {
1113                         result = UNKNOWN_ERROR;
1114                     } else {
1115                         parsePlayResponse(response);
1116 
1117                         // Post new timeout in order to make sure to use
1118                         // fake timestamps if no new Sender Reports arrive
1119                         sp<AMessage> timeout = new AMessage('tiou', id());
1120                         mCheckTimeoutGeneration++;
1121                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1122                         timeout->post(kStartupTimeoutUs);
1123                     }
1124                 }
1125 
1126                 if (result != OK) {
1127                     ALOGE("resume failed, aborting.");
1128                     (new AMessage('abor', id()))->post();
1129                 }
1130 
1131                 mPausing = false;
1132                 break;
1133             }
1134 
1135             case 'seek':
1136             {
1137                 if (!mSeekable) {
1138                     ALOGW("This is a live stream, ignoring seek request.");
1139 
1140                     sp<AMessage> msg = mNotify->dup();
1141                     msg->setInt32("what", kWhatSeekDone);
1142                     msg->post();
1143                     break;
1144                 }
1145 
1146                 int64_t timeUs;
1147                 CHECK(msg->findInt64("time", &timeUs));
1148 
1149                 mSeekPending = true;
1150 
1151                 // Disable the access unit timeout until we resumed
1152                 // playback again.
1153                 mCheckPending = true;
1154                 ++mCheckGeneration;
1155 
1156                 sp<AMessage> reply = new AMessage('see1', id());
1157                 reply->setInt64("time", timeUs);
1158 
1159                 if (mPausing) {
1160                     // PAUSE already sent
1161                     ALOGI("Pause already sent");
1162                     reply->post();
1163                     break;
1164                 }
1165                 AString request = "PAUSE ";
1166                 request.append(mControlURL);
1167                 request.append(" RTSP/1.0\r\n");
1168 
1169                 request.append("Session: ");
1170                 request.append(mSessionID);
1171                 request.append("\r\n");
1172 
1173                 request.append("\r\n");
1174 
1175                 mConn->sendRequest(request.c_str(), reply);
1176                 break;
1177             }
1178 
1179             case 'see1':
1180             {
1181                 // Session is paused now.
1182                 for (size_t i = 0; i < mTracks.size(); ++i) {
1183                     TrackInfo *info = &mTracks.editItemAt(i);
1184 
1185                     postQueueSeekDiscontinuity(i);
1186                     info->mEOSReceived = false;
1187 
1188                     info->mRTPAnchor = 0;
1189                     info->mNTPAnchorUs = -1;
1190                 }
1191 
1192                 mAllTracksHaveTime = false;
1193                 mNTPAnchorUs = -1;
1194 
1195                 // Start new timeoutgeneration to avoid getting timeout
1196                 // before PLAY response arrive
1197                 sp<AMessage> timeout = new AMessage('tiou', id());
1198                 mCheckTimeoutGeneration++;
1199                 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1200                 timeout->post(kStartupTimeoutUs);
1201 
1202                 int64_t timeUs;
1203                 CHECK(msg->findInt64("time", &timeUs));
1204 
1205                 AString request = "PLAY ";
1206                 request.append(mControlURL);
1207                 request.append(" RTSP/1.0\r\n");
1208 
1209                 request.append("Session: ");
1210                 request.append(mSessionID);
1211                 request.append("\r\n");
1212 
1213                 request.append(
1214                         StringPrintf(
1215                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1216 
1217                 request.append("\r\n");
1218 
1219                 sp<AMessage> reply = new AMessage('see2', id());
1220                 mConn->sendRequest(request.c_str(), reply);
1221                 break;
1222             }
1223 
1224             case 'see2':
1225             {
1226                 if (mTracks.size() == 0) {
1227                     // We have already hit abor, break
1228                     break;
1229                 }
1230 
1231                 int32_t result;
1232                 CHECK(msg->findInt32("result", &result));
1233 
1234                 ALOGI("PLAY completed with result %d (%s)",
1235                      result, strerror(-result));
1236 
1237                 mCheckPending = false;
1238                 postAccessUnitTimeoutCheck();
1239 
1240                 if (result == OK) {
1241                     sp<RefBase> obj;
1242                     CHECK(msg->findObject("response", &obj));
1243                     sp<ARTSPResponse> response =
1244                         static_cast<ARTSPResponse *>(obj.get());
1245 
1246                     if (response->mStatusCode != 200) {
1247                         result = UNKNOWN_ERROR;
1248                     } else {
1249                         parsePlayResponse(response);
1250 
1251                         // Post new timeout in order to make sure to use
1252                         // fake timestamps if no new Sender Reports arrive
1253                         sp<AMessage> timeout = new AMessage('tiou', id());
1254                         mCheckTimeoutGeneration++;
1255                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1256                         timeout->post(kStartupTimeoutUs);
1257 
1258                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1259                         CHECK_GE(i, 0);
1260 
1261                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1262 
1263                         ALOGI("seek completed.");
1264                     }
1265                 }
1266 
1267                 if (result != OK) {
1268                     ALOGE("seek failed, aborting.");
1269                     (new AMessage('abor', id()))->post();
1270                 }
1271 
1272                 mPausing = false;
1273                 mSeekPending = false;
1274 
1275                 sp<AMessage> msg = mNotify->dup();
1276                 msg->setInt32("what", kWhatSeekDone);
1277                 msg->post();
1278                 break;
1279             }
1280 
1281             case 'biny':
1282             {
1283                 sp<ABuffer> buffer;
1284                 CHECK(msg->findBuffer("buffer", &buffer));
1285 
1286                 int32_t index;
1287                 CHECK(buffer->meta()->findInt32("index", &index));
1288 
1289                 mRTPConn->injectPacket(index, buffer);
1290                 break;
1291             }
1292 
1293             case 'tiou':
1294             {
1295                 int32_t timeoutGenerationCheck;
1296                 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1297                 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1298                     // This is an outdated message. Ignore.
1299                     // This typically happens if a lot of seeks are
1300                     // performed, since new timeout messages now are
1301                     // posted at seek as well.
1302                     break;
1303                 }
1304                 if (!mReceivedFirstRTCPPacket) {
1305                     if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1306                         ALOGW("We received RTP packets but no RTCP packets, "
1307                              "using fake timestamps.");
1308 
1309                         mTryFakeRTCP = true;
1310 
1311                         mReceivedFirstRTCPPacket = true;
1312 
1313                         fakeTimestamps();
1314                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1315                         ALOGW("Never received any data, switching transports.");
1316 
1317                         mTryTCPInterleaving = true;
1318 
1319                         sp<AMessage> msg = new AMessage('abor', id());
1320                         msg->setInt32("reconnect", true);
1321                         msg->post();
1322                     } else {
1323                         ALOGW("Never received any data, disconnecting.");
1324                         (new AMessage('abor', id()))->post();
1325                     }
1326                 } else {
1327                     if (!mAllTracksHaveTime) {
1328                         ALOGW("We received some RTCP packets, but time "
1329                               "could not be established on all tracks, now "
1330                               "using fake timestamps");
1331 
1332                         fakeTimestamps();
1333                     }
1334                 }
1335                 break;
1336             }
1337 
1338             default:
1339                 TRESPASS();
1340                 break;
1341         }
1342     }
1343 
postKeepAliveMyHandler1344     void postKeepAlive() {
1345         sp<AMessage> msg = new AMessage('aliv', id());
1346         msg->setInt32("generation", mKeepAliveGeneration);
1347         msg->post((mKeepAliveTimeoutUs * 9) / 10);
1348     }
1349 
postAccessUnitTimeoutCheckMyHandler1350     void postAccessUnitTimeoutCheck() {
1351         if (mCheckPending) {
1352             return;
1353         }
1354 
1355         mCheckPending = true;
1356         sp<AMessage> check = new AMessage('chek', id());
1357         check->setInt32("generation", mCheckGeneration);
1358         check->post(kAccessUnitTimeoutUs);
1359     }
1360 
SplitStringMyHandler1361     static void SplitString(
1362             const AString &s, const char *separator, List<AString> *items) {
1363         items->clear();
1364         size_t start = 0;
1365         while (start < s.size()) {
1366             ssize_t offset = s.find(separator, start);
1367 
1368             if (offset < 0) {
1369                 items->push_back(AString(s, start, s.size() - start));
1370                 break;
1371             }
1372 
1373             items->push_back(AString(s, start, offset - start));
1374             start = offset + strlen(separator);
1375         }
1376     }
1377 
parsePlayResponseMyHandler1378     void parsePlayResponse(const sp<ARTSPResponse> &response) {
1379         mPlayResponseParsed = true;
1380         if (mTracks.size() == 0) {
1381             ALOGV("parsePlayResponse: late packets ignored.");
1382             return;
1383         }
1384 
1385         ssize_t i = response->mHeaders.indexOfKey("range");
1386         if (i < 0) {
1387             // Server doesn't even tell use what range it is going to
1388             // play, therefore we won't support seeking.
1389             return;
1390         }
1391 
1392         AString range = response->mHeaders.valueAt(i);
1393         ALOGV("Range: %s", range.c_str());
1394 
1395         AString val;
1396         CHECK(GetAttribute(range.c_str(), "npt", &val));
1397 
1398         float npt1, npt2;
1399         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1400             // This is a live stream and therefore not seekable.
1401 
1402             ALOGI("This is a live stream");
1403             return;
1404         }
1405 
1406         i = response->mHeaders.indexOfKey("rtp-info");
1407         CHECK_GE(i, 0);
1408 
1409         AString rtpInfo = response->mHeaders.valueAt(i);
1410         List<AString> streamInfos;
1411         SplitString(rtpInfo, ",", &streamInfos);
1412 
1413         int n = 1;
1414         for (List<AString>::iterator it = streamInfos.begin();
1415              it != streamInfos.end(); ++it) {
1416             (*it).trim();
1417             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1418 
1419             CHECK(GetAttribute((*it).c_str(), "url", &val));
1420 
1421             size_t trackIndex = 0;
1422             while (trackIndex < mTracks.size()
1423                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1424                 ++trackIndex;
1425             }
1426             CHECK_LT(trackIndex, mTracks.size());
1427 
1428             CHECK(GetAttribute((*it).c_str(), "seq", &val));
1429 
1430             char *end;
1431             unsigned long seq = strtoul(val.c_str(), &end, 10);
1432 
1433             TrackInfo *info = &mTracks.editItemAt(trackIndex);
1434             info->mFirstSeqNumInSegment = seq;
1435             info->mNewSegment = true;
1436 
1437             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1438 
1439             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1440 
1441             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1442 
1443             info->mNormalPlayTimeRTP = rtpTime;
1444             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1445 
1446             if (!mFirstAccessUnit) {
1447                 postNormalPlayTimeMapping(
1448                         trackIndex,
1449                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1450             }
1451 
1452             ++n;
1453         }
1454     }
1455 
getTrackFormatMyHandler1456     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1457         CHECK_GE(index, 0u);
1458         CHECK_LT(index, mTracks.size());
1459 
1460         const TrackInfo &info = mTracks.itemAt(index);
1461 
1462         *timeScale = info.mTimeScale;
1463 
1464         return info.mPacketSource->getFormat();
1465     }
1466 
countTracksMyHandler1467     size_t countTracks() const {
1468         return mTracks.size();
1469     }
1470 
1471 private:
1472     struct TrackInfo {
             // Per-track state for one entry of mTracks. An entry is
             // appended and initialised by setupTrack().

             // Track URL built by setupTrack(); parsePlayResponse() matches
             // it against the "url" attribute of each RTP-Info entry.
1473         AString mURL;
             // Socket pair filled in by ARTPConnection::MakePortPair(), or,
             // when mUsingInterleavedTCP is set, the two interleaved channel
             // indices requested in the SETUP Transport header.
1474         int mRTPSocket;
1475         int mRTCPSocket;
             // True when SETUP asked for RTP/AVP/TCP interleaved transport.
1476         bool mUsingInterleavedTCP;
             // "seq" value from RTP-Info; the 'accu' handler drops access
             // units whose sequence number is smaller than this.
1477         uint32_t mFirstSeqNumInSegment;
             // Set by setupTrack() and parsePlayResponse(); cleared by the
             // 'accu' handler once an access unit has been accepted.
1478         bool mNewSegment;
1479 
             // RTP time and NTP time (in us) recorded by onTimeUpdate();
             // reset to 0 and -1 by the 'see1' handler.
1480         uint32_t mRTPAnchor;
1481         int64_t mNTPAnchorUs;
             // Time scale parsed from the format description in setupTrack();
             // reported to callers by getTrackFormat().
1482         int32_t mTimeScale;
             // Set by the 'accu' handler when an "eos" arrives and timestamps
             // are faked; cleared by setupTrack() and the 'see1' handler.
1483         bool mEOSReceived;
1484 
             // "rtptime" from RTP-Info and the start of the npt range
             // converted to microseconds; set by parsePlayResponse().
1485         uint32_t mNormalPlayTimeRTP;
1486         int64_t mNormalPlayTimeUs;
1487 
             // Packet source created for this track in setupTrack().
1488         sp<APacketSource> mPacketSource;
1489 
1490         // Stores packets temporarily while no notion of time
1491         // has been established yet.
1492         List<sp<ABuffer> > mPackets;
1493     };
1494 
1495     sp<AMessage> mNotify;  // Duplicated to post kWhatConnected / kWhatSeekDone / kWhatDisconnected notifications.
1496     bool mUIDValid;  // When set, RTP/RTCP sockets are tagged and marked for mUID via HTTPBase.
1497     uid_t mUID;
1498     sp<ALooper> mNetLooper;
1499     sp<ARTSPConnection> mConn;  // Receives every RTSP request (sendRequest) and the disconnect call.
1500     sp<ARTPConnection> mRTPConn;  // Streams are removed from it on 'abor'; 'biny' packets are injected into it.
1501     sp<ASessionDescription> mSessionDesc;  // Consulted by setupTrack() for per-track attributes and format.
1502     AString mOriginalSessionURL;  // This one still has user:pass@
1503     AString mSessionURL;  // Request URL of the OPTIONS and TEARDOWN requests.
1504     AString mSessionHost;
1505     AString mBaseURL;  // Base against which setupTrack() resolves a track's a=control value.
1506     AString mControlURL;  // Request URL of the PAUSE and PLAY requests.
1507     AString mSessionID;  // Value of the "Session:" header added to requests.
1508     bool mSetupTracksSuccessful;
1509     bool mSeekPending;  // Set by 'seek', cleared by 'see2' and 'abor'; access units are dropped while set.
1510     bool mFirstAccessUnit;  // True until handleFirstAccessUnit() has run; set again by 'abor'.
1511 
1512     bool mAllTracksHaveTime;
1513     int64_t mNTPAnchorUs;  // Negative while unset; onTimeUpdate() fills it from the first time update.
1514     int64_t mMediaAnchorUs;
1515     int64_t mLastMediaTimeUs;
1516 
1517     int64_t mNumAccessUnitsReceived;  // Incremented per 'accu'; zeroed by each 'chek'.
1518     bool mCheckPending;  // Guards postAccessUnitTimeoutCheck() against posting a second 'chek'.
1519     int32_t mCheckGeneration;  // 'chek' messages carrying a different generation are ignored.
1520     int32_t mCheckTimeoutGeneration;  // 'tiou' messages carrying a different "tioucheck" are ignored.
1521     bool mTryTCPInterleaving;  // When set, setupTrack() requests RTP/AVP/TCP interleaved transport.
1522     bool mTryFakeRTCP;
1523     bool mReceivedFirstRTCPPacket;
1524     bool mReceivedFirstRTPPacket;
1525     bool mSeekable;  // When false, 'paus' and 'seek' requests are ignored as a live stream.
1526     int64_t mKeepAliveTimeoutUs;  // postKeepAlive() schedules 'aliv' after 9/10 of this value.
1527     int32_t mKeepAliveGeneration;  // 'aliv' / 'opts' messages carrying a different generation are ignored.
1528     bool mPausing;  // Set when 'paus' sends PAUSE; cleared by 'res2', 'see2' and 'abor'.
1529     int32_t mPauseGeneration;  // 'paus' messages carrying a different "pausecheck" are ignored.
1530 
1531     Vector<TrackInfo> mTracks;
1532 
1533     bool mPlayResponseParsed;  // Set by parsePlayResponse().
1534 
setupTrackMyHandler1535     void setupTrack(size_t index) {
1536         sp<APacketSource> source =
1537             new APacketSource(mSessionDesc, index);
1538 
1539         if (source->initCheck() != OK) {
1540             ALOGW("Unsupported format. Ignoring track #%d.", index);
1541 
1542             sp<AMessage> reply = new AMessage('setu', id());
1543             reply->setSize("index", index);
1544             reply->setInt32("result", ERROR_UNSUPPORTED);
1545             reply->post();
1546             return;
1547         }
1548 
1549         AString url;
1550         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1551 
1552         AString trackURL;
1553         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1554 
1555         mTracks.push(TrackInfo());
1556         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1557         info->mURL = trackURL;
1558         info->mPacketSource = source;
1559         info->mUsingInterleavedTCP = false;
1560         info->mFirstSeqNumInSegment = 0;
1561         info->mNewSegment = true;
1562         info->mRTPAnchor = 0;
1563         info->mNTPAnchorUs = -1;
1564         info->mNormalPlayTimeRTP = 0;
1565         info->mNormalPlayTimeUs = 0ll;
1566 
1567         unsigned long PT;
1568         AString formatDesc;
1569         AString formatParams;
1570         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1571 
1572         int32_t timescale;
1573         int32_t numChannels;
1574         ASessionDescription::ParseFormatDesc(
1575                 formatDesc.c_str(), &timescale, &numChannels);
1576 
1577         info->mTimeScale = timescale;
1578         info->mEOSReceived = false;
1579 
1580         ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1581 
1582         AString request = "SETUP ";
1583         request.append(trackURL);
1584         request.append(" RTSP/1.0\r\n");
1585 
1586         if (mTryTCPInterleaving) {
1587             size_t interleaveIndex = 2 * (mTracks.size() - 1);
1588             info->mUsingInterleavedTCP = true;
1589             info->mRTPSocket = interleaveIndex;
1590             info->mRTCPSocket = interleaveIndex + 1;
1591 
1592             request.append("Transport: RTP/AVP/TCP;interleaved=");
1593             request.append(interleaveIndex);
1594             request.append("-");
1595             request.append(interleaveIndex + 1);
1596         } else {
1597             unsigned rtpPort;
1598             ARTPConnection::MakePortPair(
1599                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1600 
1601             if (mUIDValid) {
1602                 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1603                                                 (uint32_t)*(uint32_t*) "RTP_");
1604                 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1605                                                 (uint32_t)*(uint32_t*) "RTP_");
1606                 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
1607                 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1608             }
1609 
1610             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1611             request.append(rtpPort);
1612             request.append("-");
1613             request.append(rtpPort + 1);
1614         }
1615 
1616         request.append("\r\n");
1617 
1618         if (index > 1) {
1619             request.append("Session: ");
1620             request.append(mSessionID);
1621             request.append("\r\n");
1622         }
1623 
1624         request.append("\r\n");
1625 
1626         sp<AMessage> reply = new AMessage('setu', id());
1627         reply->setSize("index", index);
1628         reply->setSize("track-index", mTracks.size() - 1);
1629         mConn->sendRequest(request.c_str(), reply);
1630     }
1631 
MakeURLMyHandler1632     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1633         out->clear();
1634 
1635         if (strncasecmp("rtsp://", baseURL, 7)) {
1636             // Base URL must be absolute
1637             return false;
1638         }
1639 
1640         if (!strncasecmp("rtsp://", url, 7)) {
1641             // "url" is already an absolute URL, ignore base URL.
1642             out->setTo(url);
1643             return true;
1644         }
1645 
1646         size_t n = strlen(baseURL);
1647         if (baseURL[n - 1] == '/') {
1648             out->setTo(baseURL);
1649             out->append(url);
1650         } else {
1651             const char *slashPos = strrchr(baseURL, '/');
1652 
1653             if (slashPos > &baseURL[6]) {
1654                 out->setTo(baseURL, slashPos - baseURL);
1655             } else {
1656                 out->setTo(baseURL);
1657             }
1658 
1659             out->append("/");
1660             out->append(url);
1661         }
1662 
1663         return true;
1664     }
1665 
fakeTimestampsMyHandler1666     void fakeTimestamps() {
1667         mNTPAnchorUs = -1ll;
1668         for (size_t i = 0; i < mTracks.size(); ++i) {
1669             onTimeUpdate(i, 0, 0ll);
1670         }
1671     }
1672 
dataReceivedOnAllChannelsMyHandler1673     bool dataReceivedOnAllChannels() {
1674         TrackInfo *track;
1675         for (size_t i = 0; i < mTracks.size(); ++i) {
1676             track = &mTracks.editItemAt(i);
1677             if (track->mPackets.empty()) {
1678                 return false;
1679             }
1680         }
1681         return true;
1682     }
1683 
handleFirstAccessUnitMyHandler1684     void handleFirstAccessUnit() {
1685         if (mFirstAccessUnit) {
1686             sp<AMessage> msg = mNotify->dup();
1687             msg->setInt32("what", kWhatConnected);
1688             msg->post();
1689 
1690             if (mSeekable) {
1691                 for (size_t i = 0; i < mTracks.size(); ++i) {
1692                     TrackInfo *info = &mTracks.editItemAt(i);
1693 
1694                     postNormalPlayTimeMapping(
1695                             i,
1696                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1697                 }
1698             }
1699 
1700             mFirstAccessUnit = false;
1701         }
1702     }
1703 
onTimeUpdateMyHandler1704     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1705         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
1706              trackIndex, rtpTime, ntpTime);
1707 
1708         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1709 
1710         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1711 
1712         track->mRTPAnchor = rtpTime;
1713         track->mNTPAnchorUs = ntpTimeUs;
1714 
1715         if (mNTPAnchorUs < 0) {
1716             mNTPAnchorUs = ntpTimeUs;
1717             mMediaAnchorUs = mLastMediaTimeUs;
1718         }
1719 
1720         if (!mAllTracksHaveTime) {
1721             bool allTracksHaveTime = true;
1722             for (size_t i = 0; i < mTracks.size(); ++i) {
1723                 TrackInfo *track = &mTracks.editItemAt(i);
1724                 if (track->mNTPAnchorUs < 0) {
1725                     allTracksHaveTime = false;
1726                     break;
1727                 }
1728             }
1729             if (allTracksHaveTime) {
1730                 mAllTracksHaveTime = true;
1731                 ALOGI("Time now established for all tracks.");
1732             }
1733         }
1734         if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1735             handleFirstAccessUnit();
1736 
1737             // Time is now established, lets start timestamping immediately
1738             for (size_t i = 0; i < mTracks.size(); ++i) {
1739                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1740                 while (!trackInfo->mPackets.empty()) {
1741                     sp<ABuffer> accessUnit = *trackInfo->mPackets.begin();
1742                     trackInfo->mPackets.erase(trackInfo->mPackets.begin());
1743 
1744                     if (addMediaTimestamp(i, trackInfo, accessUnit)) {
1745                         postQueueAccessUnit(i, accessUnit);
1746                     }
1747                 }
1748             }
1749             for (size_t i = 0; i < mTracks.size(); ++i) {
1750                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1751                 if (trackInfo->mEOSReceived) {
1752                     postQueueEOS(i, ERROR_END_OF_STREAM);
1753                     trackInfo->mEOSReceived = false;
1754                 }
1755             }
1756         }
1757     }
1758 
onAccessUnitCompleteMyHandler1759     void onAccessUnitComplete(
1760             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1761         ALOGV("onAccessUnitComplete track %d", trackIndex);
1762 
1763         if(!mPlayResponseParsed){
1764             ALOGI("play response is not parsed, storing accessunit");
1765             TrackInfo *track = &mTracks.editItemAt(trackIndex);
1766             track->mPackets.push_back(accessUnit);
1767             return;
1768         }
1769 
1770         handleFirstAccessUnit();
1771 
1772         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1773 
1774         if (!mAllTracksHaveTime) {
1775             ALOGV("storing accessUnit, no time established yet");
1776             track->mPackets.push_back(accessUnit);
1777             return;
1778         }
1779 
1780         while (!track->mPackets.empty()) {
1781             sp<ABuffer> accessUnit = *track->mPackets.begin();
1782             track->mPackets.erase(track->mPackets.begin());
1783 
1784             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1785                 postQueueAccessUnit(trackIndex, accessUnit);
1786             }
1787         }
1788 
1789         if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1790             postQueueAccessUnit(trackIndex, accessUnit);
1791         }
1792 
1793         if (track->mEOSReceived) {
1794             postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1795             track->mEOSReceived = false;
1796         }
1797     }
1798 
addMediaTimestampMyHandler1799     bool addMediaTimestamp(
1800             int32_t trackIndex, const TrackInfo *track,
1801             const sp<ABuffer> &accessUnit) {
1802         uint32_t rtpTime;
1803         CHECK(accessUnit->meta()->findInt32(
1804                     "rtp-time", (int32_t *)&rtpTime));
1805 
1806         int64_t relRtpTimeUs =
1807             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1808                 / track->mTimeScale;
1809 
1810         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1811 
1812         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1813 
1814         if (mediaTimeUs > mLastMediaTimeUs) {
1815             mLastMediaTimeUs = mediaTimeUs;
1816         }
1817 
1818         if (mediaTimeUs < 0) {
1819             ALOGV("dropping early accessUnit.");
1820             return false;
1821         }
1822 
1823         ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
1824              trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
1825 
1826         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1827 
1828         return true;
1829     }
1830 
postQueueAccessUnitMyHandler1831     void postQueueAccessUnit(
1832             size_t trackIndex, const sp<ABuffer> &accessUnit) {
1833         sp<AMessage> msg = mNotify->dup();
1834         msg->setInt32("what", kWhatAccessUnit);
1835         msg->setSize("trackIndex", trackIndex);
1836         msg->setBuffer("accessUnit", accessUnit);
1837         msg->post();
1838     }
1839 
postQueueEOSMyHandler1840     void postQueueEOS(size_t trackIndex, status_t finalResult) {
1841         sp<AMessage> msg = mNotify->dup();
1842         msg->setInt32("what", kWhatEOS);
1843         msg->setSize("trackIndex", trackIndex);
1844         msg->setInt32("finalResult", finalResult);
1845         msg->post();
1846     }
1847 
postQueueSeekDiscontinuityMyHandler1848     void postQueueSeekDiscontinuity(size_t trackIndex) {
1849         sp<AMessage> msg = mNotify->dup();
1850         msg->setInt32("what", kWhatSeekDiscontinuity);
1851         msg->setSize("trackIndex", trackIndex);
1852         msg->post();
1853     }
1854 
postNormalPlayTimeMappingMyHandler1855     void postNormalPlayTimeMapping(
1856             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1857         sp<AMessage> msg = mNotify->dup();
1858         msg->setInt32("what", kWhatNormalPlayTimeMapping);
1859         msg->setSize("trackIndex", trackIndex);
1860         msg->setInt32("rtpTime", rtpTime);
1861         msg->setInt64("nptUs", nptUs);
1862         msg->post();
1863     }
1864 
1865     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1866 };
1867 
1868 }  // namespace android
1869 
1870 #endif  // MY_HANDLER_H_
1871