• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MY_HANDLER_H_
18 
19 #define MY_HANDLER_H_
20 
21 //#define LOG_NDEBUG 0
22 
23 #ifndef LOG_TAG
24 #define LOG_TAG "MyHandler"
25 #endif
26 
27 #include <utils/Log.h>
28 
29 #include "APacketSource.h"
30 #include "ARTPConnection.h"
31 #include "ARTSPConnection.h"
32 #include "ASessionDescription.h"
33 
34 #include <ctype.h>
35 
36 #include <media/stagefright/foundation/ABuffer.h>
37 #include <media/stagefright/foundation/ADebug.h>
38 #include <media/stagefright/foundation/ALooper.h>
39 #include <media/stagefright/foundation/AMessage.h>
40 #include <media/stagefright/MediaErrors.h>
41 #include <media/stagefright/Utils.h>
42 
43 #include <arpa/inet.h>
44 #include <sys/socket.h>
45 #include <netdb.h>
46 
47 #include "HTTPBase.h"
48 
49 #if LOG_NDEBUG
50 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
51 #else
52 #define UNUSED_UNLESS_VERBOSE(x)
53 #endif
54 
55 // If no access units are received within 5 secs, assume that the rtp
56 // stream has ended and signal end of stream.
57 static int64_t kAccessUnitTimeoutUs = 10000000ll;
58 
59 // If no access units arrive for the first 10 secs after starting the
60 // stream, assume none ever will and signal EOS or switch transports.
61 static int64_t kStartupTimeoutUs = 10000000ll;
62 
63 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
64 
65 static int64_t kPauseDelayUs = 3000000ll;
66 
67 // The allowed maximum number of stale access units at the beginning of
68 // a new sequence.
69 static int32_t kMaxAllowedStaleAccessUnits = 20;
70 
71 namespace android {
72 
GetAttribute(const char * s,const char * key,AString * value)73 static bool GetAttribute(const char *s, const char *key, AString *value) {
74     value->clear();
75 
76     size_t keyLen = strlen(key);
77 
78     for (;;) {
79         while (isspace(*s)) {
80             ++s;
81         }
82 
83         const char *colonPos = strchr(s, ';');
84 
85         size_t len =
86             (colonPos == NULL) ? strlen(s) : colonPos - s;
87 
88         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
89             value->setTo(&s[keyLen + 1], len - keyLen - 1);
90             return true;
91         }
92 
93         if (colonPos == NULL) {
94             return false;
95         }
96 
97         s = colonPos + 1;
98     }
99 }
100 
101 struct MyHandler : public AHandler {
102     enum {
103         kWhatConnected                  = 'conn',
104         kWhatDisconnected               = 'disc',
105         kWhatSeekPaused                 = 'spau',
106         kWhatSeekDone                   = 'sdon',
107 
108         kWhatAccessUnit                 = 'accU',
109         kWhatEOS                        = 'eos!',
110         kWhatSeekDiscontinuity          = 'seeD',
111         kWhatNormalPlayTimeMapping      = 'nptM',
112     };
113 
114     MyHandler(
115             const char *url,
116             const sp<AMessage> &notify,
117             bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler118         : mNotify(notify),
119           mUIDValid(uidValid),
120           mUID(uid),
121           mNetLooper(new ALooper),
122           mConn(new ARTSPConnection(mUIDValid, mUID)),
123           mRTPConn(new ARTPConnection),
124           mOriginalSessionURL(url),
125           mSessionURL(url),
126           mSetupTracksSuccessful(false),
127           mSeekPending(false),
128           mFirstAccessUnit(true),
129           mAllTracksHaveTime(false),
130           mNTPAnchorUs(-1),
131           mMediaAnchorUs(-1),
132           mLastMediaTimeUs(0),
133           mNumAccessUnitsReceived(0),
134           mCheckPending(false),
135           mCheckGeneration(0),
136           mCheckTimeoutGeneration(0),
137           mTryTCPInterleaving(false),
138           mTryFakeRTCP(false),
139           mReceivedFirstRTCPPacket(false),
140           mReceivedFirstRTPPacket(false),
141           mSeekable(true),
142           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
143           mKeepAliveGeneration(0),
144           mPausing(false),
145           mPauseGeneration(0),
146           mPlayResponseParsed(false) {
147         mNetLooper->setName("rtsp net");
148         mNetLooper->start(false /* runOnCallingThread */,
149                           false /* canCallJava */,
150                           PRIORITY_HIGHEST);
151 
152         // Strip any authentication info from the session url, we don't
153         // want to transmit user/pass in cleartext.
154         AString host, path, user, pass;
155         unsigned port;
156         CHECK(ARTSPConnection::ParseURL(
157                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
158 
159         if (user.size() > 0) {
160             mSessionURL.clear();
161             mSessionURL.append("rtsp://");
162             mSessionURL.append(host);
163             mSessionURL.append(":");
164             mSessionURL.append(AStringPrintf("%u", port));
165             mSessionURL.append(path);
166 
167             ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
168         }
169 
170         mSessionHost = host;
171     }
172 
connectMyHandler173     void connect() {
174         looper()->registerHandler(mConn);
175         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
176 
177         sp<AMessage> notify = new AMessage('biny', this);
178         mConn->observeBinaryData(notify);
179 
180         sp<AMessage> reply = new AMessage('conn', this);
181         mConn->connect(mOriginalSessionURL.c_str(), reply);
182     }
183 
loadSDPMyHandler184     void loadSDP(const sp<ASessionDescription>& desc) {
185         looper()->registerHandler(mConn);
186         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
187 
188         sp<AMessage> notify = new AMessage('biny', this);
189         mConn->observeBinaryData(notify);
190 
191         sp<AMessage> reply = new AMessage('sdpl', this);
192         reply->setObject("description", desc);
193         mConn->connect(mOriginalSessionURL.c_str(), reply);
194     }
195 
getControlURLMyHandler196     AString getControlURL() {
197         AString sessionLevelControlURL;
198         if (mSessionDesc->findAttribute(
199                 0,
200                 "a=control",
201                 &sessionLevelControlURL)) {
202             if (sessionLevelControlURL.compare("*") == 0) {
203                 return mBaseURL;
204             } else {
205                 AString controlURL;
206                 CHECK(MakeURL(
207                         mBaseURL.c_str(),
208                         sessionLevelControlURL.c_str(),
209                         &controlURL));
210                 return controlURL;
211             }
212         } else {
213             return mSessionURL;
214         }
215     }
216 
disconnectMyHandler217     void disconnect() {
218         (new AMessage('abor', this))->post();
219     }
220 
seekMyHandler221     void seek(int64_t timeUs) {
222         sp<AMessage> msg = new AMessage('seek', this);
223         msg->setInt64("time", timeUs);
224         mPauseGeneration++;
225         msg->post();
226     }
227 
continueSeekAfterPauseMyHandler228     void continueSeekAfterPause(int64_t timeUs) {
229         sp<AMessage> msg = new AMessage('see1', this);
230         msg->setInt64("time", timeUs);
231         msg->post();
232     }
233 
isSeekableMyHandler234     bool isSeekable() const {
235         return mSeekable;
236     }
237 
pauseMyHandler238     void pause() {
239         sp<AMessage> msg = new AMessage('paus', this);
240         mPauseGeneration++;
241         msg->setInt32("pausecheck", mPauseGeneration);
242         msg->post();
243     }
244 
resumeMyHandler245     void resume() {
246         sp<AMessage> msg = new AMessage('resu', this);
247         mPauseGeneration++;
248         msg->post();
249     }
250 
addRRMyHandler251     static void addRR(const sp<ABuffer> &buf) {
252         uint8_t *ptr = buf->data() + buf->size();
253         ptr[0] = 0x80 | 0;
254         ptr[1] = 201;  // RR
255         ptr[2] = 0;
256         ptr[3] = 1;
257         ptr[4] = 0xde;  // SSRC
258         ptr[5] = 0xad;
259         ptr[6] = 0xbe;
260         ptr[7] = 0xef;
261 
262         buf->setRange(0, buf->size() + 8);
263     }
264 
addSDESMyHandler265     static void addSDES(int s, const sp<ABuffer> &buffer) {
266         struct sockaddr_in addr;
267         socklen_t addrSize = sizeof(addr);
268         if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
269             inet_aton("0.0.0.0", &(addr.sin_addr));
270         }
271 
272         uint8_t *data = buffer->data() + buffer->size();
273         data[0] = 0x80 | 1;
274         data[1] = 202;  // SDES
275         data[4] = 0xde;  // SSRC
276         data[5] = 0xad;
277         data[6] = 0xbe;
278         data[7] = 0xef;
279 
280         size_t offset = 8;
281 
282         data[offset++] = 1;  // CNAME
283 
284         AString cname = "stagefright@";
285         cname.append(inet_ntoa(addr.sin_addr));
286         data[offset++] = cname.size();
287 
288         memcpy(&data[offset], cname.c_str(), cname.size());
289         offset += cname.size();
290 
291         data[offset++] = 6;  // TOOL
292 
293         AString tool = MakeUserAgent();
294 
295         data[offset++] = tool.size();
296 
297         memcpy(&data[offset], tool.c_str(), tool.size());
298         offset += tool.size();
299 
300         data[offset++] = 0;
301 
302         if ((offset % 4) > 0) {
303             size_t count = 4 - (offset % 4);
304             switch (count) {
305                 case 3:
306                     data[offset++] = 0;
307                 case 2:
308                     data[offset++] = 0;
309                 case 1:
310                     data[offset++] = 0;
311             }
312         }
313 
314         size_t numWords = (offset / 4) - 1;
315         data[2] = numWords >> 8;
316         data[3] = numWords & 0xff;
317 
318         buffer->setRange(buffer->offset(), buffer->size() + offset);
319     }
320 
321     // In case we're behind NAT, fire off two UDP packets to the remote
322     // rtp/rtcp ports to poke a hole into the firewall for future incoming
323     // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler324     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
325         struct sockaddr_in addr;
326         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
327         addr.sin_family = AF_INET;
328 
329         AString source;
330         AString server_port;
331         if (!GetAttribute(transport.c_str(),
332                           "source",
333                           &source)) {
334             ALOGW("Missing 'source' field in Transport response. Using "
335                  "RTSP endpoint address.");
336 
337             struct hostent *ent = gethostbyname(mSessionHost.c_str());
338             if (ent == NULL) {
339                 ALOGE("Failed to look up address of session host '%s'",
340                      mSessionHost.c_str());
341 
342                 return false;
343             }
344 
345             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
346         } else {
347             addr.sin_addr.s_addr = inet_addr(source.c_str());
348         }
349 
350         if (!GetAttribute(transport.c_str(),
351                                  "server_port",
352                                  &server_port)) {
353             ALOGI("Missing 'server_port' field in Transport response.");
354             return false;
355         }
356 
357         int rtpPort, rtcpPort;
358         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
359                 || rtpPort <= 0 || rtpPort > 65535
360                 || rtcpPort <=0 || rtcpPort > 65535
361                 || rtcpPort != rtpPort + 1) {
362             ALOGE("Server picked invalid RTP/RTCP port pair %s,"
363                  " RTP port must be even, RTCP port must be one higher.",
364                  server_port.c_str());
365 
366             return false;
367         }
368 
369         if (rtpPort & 1) {
370             ALOGW("Server picked an odd RTP port, it should've picked an "
371                  "even one, we'll let it pass for now, but this may break "
372                  "in the future.");
373         }
374 
375         if (addr.sin_addr.s_addr == INADDR_NONE) {
376             return true;
377         }
378 
379         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
380             // No firewalls to traverse on the loopback interface.
381             return true;
382         }
383 
384         // Make up an RR/SDES RTCP packet.
385         sp<ABuffer> buf = new ABuffer(65536);
386         buf->setRange(0, 0);
387         addRR(buf);
388         addSDES(rtpSocket, buf);
389 
390         addr.sin_port = htons(rtpPort);
391 
392         ssize_t n = sendto(
393                 rtpSocket, buf->data(), buf->size(), 0,
394                 (const sockaddr *)&addr, sizeof(addr));
395 
396         if (n < (ssize_t)buf->size()) {
397             ALOGE("failed to poke a hole for RTP packets");
398             return false;
399         }
400 
401         addr.sin_port = htons(rtcpPort);
402 
403         n = sendto(
404                 rtcpSocket, buf->data(), buf->size(), 0,
405                 (const sockaddr *)&addr, sizeof(addr));
406 
407         if (n < (ssize_t)buf->size()) {
408             ALOGE("failed to poke a hole for RTCP packets");
409             return false;
410         }
411 
412         ALOGV("successfully poked holes.");
413 
414         return true;
415     }
416 
isLiveStreamMyHandler417     static bool isLiveStream(const sp<ASessionDescription> &desc) {
418         AString attrLiveStream;
419         if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
420             ssize_t semicolonPos = attrLiveStream.find(";", 2);
421 
422             const char* liveStreamValue;
423             if (semicolonPos < 0) {
424                 liveStreamValue = attrLiveStream.c_str();
425             } else {
426                 AString valString;
427                 valString.setTo(attrLiveStream,
428                         semicolonPos + 1,
429                         attrLiveStream.size() - semicolonPos - 1);
430                 liveStreamValue = valString.c_str();
431             }
432 
433             uint32_t value = strtoul(liveStreamValue, NULL, 10);
434             if (value == 1) {
435                 ALOGV("found live stream");
436                 return true;
437             }
438         } else {
439             // It is a live stream if no duration is returned
440             int64_t durationUs;
441             if (!desc->getDurationUs(&durationUs)) {
442                 ALOGV("No duration found, assume live stream");
443                 return true;
444             }
445         }
446 
447         return false;
448     }
449 
onMessageReceivedMyHandler450     virtual void onMessageReceived(const sp<AMessage> &msg) {
451         switch (msg->what()) {
452             case 'conn':
453             {
454                 int32_t result;
455                 CHECK(msg->findInt32("result", &result));
456 
457                 ALOGI("connection request completed with result %d (%s)",
458                      result, strerror(-result));
459 
460                 if (result == OK) {
461                     AString request;
462                     request = "DESCRIBE ";
463                     request.append(mSessionURL);
464                     request.append(" RTSP/1.0\r\n");
465                     request.append("Accept: application/sdp\r\n");
466                     request.append("\r\n");
467 
468                     sp<AMessage> reply = new AMessage('desc', this);
469                     mConn->sendRequest(request.c_str(), reply);
470                 } else {
471                     (new AMessage('disc', this))->post();
472                 }
473                 break;
474             }
475 
476             case 'disc':
477             {
478                 ++mKeepAliveGeneration;
479 
480                 int32_t reconnect;
481                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
482                     sp<AMessage> reply = new AMessage('conn', this);
483                     mConn->connect(mOriginalSessionURL.c_str(), reply);
484                 } else {
485                     (new AMessage('quit', this))->post();
486                 }
487                 break;
488             }
489 
490             case 'desc':
491             {
492                 int32_t result;
493                 CHECK(msg->findInt32("result", &result));
494 
495                 ALOGI("DESCRIBE completed with result %d (%s)",
496                      result, strerror(-result));
497 
498                 if (result == OK) {
499                     sp<RefBase> obj;
500                     CHECK(msg->findObject("response", &obj));
501                     sp<ARTSPResponse> response =
502                         static_cast<ARTSPResponse *>(obj.get());
503 
504                     if (response->mStatusCode == 301 || response->mStatusCode == 302) {
505                         ssize_t i = response->mHeaders.indexOfKey("location");
506                         CHECK_GE(i, 0);
507 
508                         mOriginalSessionURL = response->mHeaders.valueAt(i);
509                         mSessionURL = mOriginalSessionURL;
510 
511                         // Strip any authentication info from the session url, we don't
512                         // want to transmit user/pass in cleartext.
513                         AString host, path, user, pass;
514                         unsigned port;
515                         if (ARTSPConnection::ParseURL(
516                                     mSessionURL.c_str(), &host, &port, &path, &user, &pass)
517                                 && user.size() > 0) {
518                             mSessionURL.clear();
519                             mSessionURL.append("rtsp://");
520                             mSessionURL.append(host);
521                             mSessionURL.append(":");
522                             mSessionURL.append(AStringPrintf("%u", port));
523                             mSessionURL.append(path);
524 
525                             ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
526                         }
527 
528                         sp<AMessage> reply = new AMessage('conn', this);
529                         mConn->connect(mOriginalSessionURL.c_str(), reply);
530                         break;
531                     }
532 
533                     if (response->mStatusCode != 200) {
534                         result = UNKNOWN_ERROR;
535                     } else if (response->mContent == NULL) {
536                         result = ERROR_MALFORMED;
537                         ALOGE("The response has no content.");
538                     } else {
539                         mSessionDesc = new ASessionDescription;
540 
541                         mSessionDesc->setTo(
542                                 response->mContent->data(),
543                                 response->mContent->size());
544 
545                         if (!mSessionDesc->isValid()) {
546                             ALOGE("Failed to parse session description.");
547                             result = ERROR_MALFORMED;
548                         } else {
549                             ssize_t i = response->mHeaders.indexOfKey("content-base");
550                             if (i >= 0) {
551                                 mBaseURL = response->mHeaders.valueAt(i);
552                             } else {
553                                 i = response->mHeaders.indexOfKey("content-location");
554                                 if (i >= 0) {
555                                     mBaseURL = response->mHeaders.valueAt(i);
556                                 } else {
557                                     mBaseURL = mSessionURL;
558                                 }
559                             }
560 
561                             mSeekable = !isLiveStream(mSessionDesc);
562 
563                             if (!mBaseURL.startsWith("rtsp://")) {
564                                 // Some misbehaving servers specify a relative
565                                 // URL in one of the locations above, combine
566                                 // it with the absolute session URL to get
567                                 // something usable...
568 
569                                 ALOGW("Server specified a non-absolute base URL"
570                                      ", combining it with the session URL to "
571                                      "get something usable...");
572 
573                                 AString tmp;
574                                 CHECK(MakeURL(
575                                             mSessionURL.c_str(),
576                                             mBaseURL.c_str(),
577                                             &tmp));
578 
579                                 mBaseURL = tmp;
580                             }
581 
582                             mControlURL = getControlURL();
583 
584                             if (mSessionDesc->countTracks() < 2) {
585                                 // There's no actual tracks in this session.
586                                 // The first "track" is merely session meta
587                                 // data.
588 
589                                 ALOGW("Session doesn't contain any playable "
590                                      "tracks. Aborting.");
591                                 result = ERROR_UNSUPPORTED;
592                             } else {
593                                 setupTrack(1);
594                             }
595                         }
596                     }
597                 }
598 
599                 if (result != OK) {
600                     sp<AMessage> reply = new AMessage('disc', this);
601                     mConn->disconnect(reply);
602                 }
603                 break;
604             }
605 
606             case 'sdpl':
607             {
608                 int32_t result;
609                 CHECK(msg->findInt32("result", &result));
610 
611                 ALOGI("SDP connection request completed with result %d (%s)",
612                      result, strerror(-result));
613 
614                 if (result == OK) {
615                     sp<RefBase> obj;
616                     CHECK(msg->findObject("description", &obj));
617                     mSessionDesc =
618                         static_cast<ASessionDescription *>(obj.get());
619 
620                     if (!mSessionDesc->isValid()) {
621                         ALOGE("Failed to parse session description.");
622                         result = ERROR_MALFORMED;
623                     } else {
624                         mBaseURL = mSessionURL;
625 
626                         mSeekable = !isLiveStream(mSessionDesc);
627 
628                         mControlURL = getControlURL();
629 
630                         if (mSessionDesc->countTracks() < 2) {
631                             // There's no actual tracks in this session.
632                             // The first "track" is merely session meta
633                             // data.
634 
635                             ALOGW("Session doesn't contain any playable "
636                                  "tracks. Aborting.");
637                             result = ERROR_UNSUPPORTED;
638                         } else {
639                             setupTrack(1);
640                         }
641                     }
642                 }
643 
644                 if (result != OK) {
645                     sp<AMessage> reply = new AMessage('disc', this);
646                     mConn->disconnect(reply);
647                 }
648                 break;
649             }
650 
651             case 'setu':
652             {
653                 size_t index;
654                 CHECK(msg->findSize("index", &index));
655 
656                 TrackInfo *track = NULL;
657                 size_t trackIndex;
658                 if (msg->findSize("track-index", &trackIndex)) {
659                     track = &mTracks.editItemAt(trackIndex);
660                 }
661 
662                 int32_t result;
663                 CHECK(msg->findInt32("result", &result));
664 
665                 ALOGI("SETUP(%zu) completed with result %d (%s)",
666                      index, result, strerror(-result));
667 
668                 if (result == OK) {
669                     CHECK(track != NULL);
670 
671                     sp<RefBase> obj;
672                     CHECK(msg->findObject("response", &obj));
673                     sp<ARTSPResponse> response =
674                         static_cast<ARTSPResponse *>(obj.get());
675 
676                     if (response->mStatusCode != 200) {
677                         result = UNKNOWN_ERROR;
678                     } else {
679                         ssize_t i = response->mHeaders.indexOfKey("session");
680                         CHECK_GE(i, 0);
681 
682                         mSessionID = response->mHeaders.valueAt(i);
683 
684                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
685                         AString timeoutStr;
686                         if (GetAttribute(
687                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
688                             char *end;
689                             unsigned long timeoutSecs =
690                                 strtoul(timeoutStr.c_str(), &end, 10);
691 
692                             if (end == timeoutStr.c_str() || *end != '\0') {
693                                 ALOGW("server specified malformed timeout '%s'",
694                                      timeoutStr.c_str());
695 
696                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
697                             } else if (timeoutSecs < 15) {
698                                 ALOGW("server specified too short a timeout "
699                                      "(%lu secs), using default.",
700                                      timeoutSecs);
701 
702                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
703                             } else {
704                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
705 
706                                 ALOGI("server specified timeout of %lu secs.",
707                                      timeoutSecs);
708                             }
709                         }
710 
711                         i = mSessionID.find(";");
712                         if (i >= 0) {
713                             // Remove options, i.e. ";timeout=90"
714                             mSessionID.erase(i, mSessionID.size() - i);
715                         }
716 
717                         sp<AMessage> notify = new AMessage('accu', this);
718                         notify->setSize("track-index", trackIndex);
719 
720                         i = response->mHeaders.indexOfKey("transport");
721                         CHECK_GE(i, 0);
722 
723                         if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
724                             if (!track->mUsingInterleavedTCP) {
725                                 AString transport = response->mHeaders.valueAt(i);
726 
727                                 // We are going to continue even if we were
728                                 // unable to poke a hole into the firewall...
729                                 pokeAHole(
730                                         track->mRTPSocket,
731                                         track->mRTCPSocket,
732                                         transport);
733                             }
734 
735                             mRTPConn->addStream(
736                                     track->mRTPSocket, track->mRTCPSocket,
737                                     mSessionDesc, index,
738                                     notify, track->mUsingInterleavedTCP);
739 
740                             mSetupTracksSuccessful = true;
741                         } else {
742                             result = BAD_VALUE;
743                         }
744                     }
745                 }
746 
747                 if (result != OK) {
748                     if (track) {
749                         if (!track->mUsingInterleavedTCP) {
750                             // Clear the tag
751                             if (mUIDValid) {
752                                 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
753                                 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
754                                 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
755                                 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
756                             }
757 
758                             close(track->mRTPSocket);
759                             close(track->mRTCPSocket);
760                         }
761 
762                         mTracks.removeItemsAt(trackIndex);
763                     }
764                 }
765 
766                 ++index;
767                 if (result == OK && index < mSessionDesc->countTracks()) {
768                     setupTrack(index);
769                 } else if (mSetupTracksSuccessful) {
770                     ++mKeepAliveGeneration;
771                     postKeepAlive();
772 
773                     AString request = "PLAY ";
774                     request.append(mControlURL);
775                     request.append(" RTSP/1.0\r\n");
776 
777                     request.append("Session: ");
778                     request.append(mSessionID);
779                     request.append("\r\n");
780 
781                     request.append("\r\n");
782 
783                     sp<AMessage> reply = new AMessage('play', this);
784                     mConn->sendRequest(request.c_str(), reply);
785                 } else {
786                     sp<AMessage> reply = new AMessage('disc', this);
787                     mConn->disconnect(reply);
788                 }
789                 break;
790             }
791 
792             case 'play':
793             {
794                 int32_t result;
795                 CHECK(msg->findInt32("result", &result));
796 
797                 ALOGI("PLAY completed with result %d (%s)",
798                      result, strerror(-result));
799 
800                 if (result == OK) {
801                     sp<RefBase> obj;
802                     CHECK(msg->findObject("response", &obj));
803                     sp<ARTSPResponse> response =
804                         static_cast<ARTSPResponse *>(obj.get());
805 
806                     if (response->mStatusCode != 200) {
807                         result = UNKNOWN_ERROR;
808                     } else {
809                         parsePlayResponse(response);
810 
811                         sp<AMessage> timeout = new AMessage('tiou', this);
812                         mCheckTimeoutGeneration++;
813                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
814                         timeout->post(kStartupTimeoutUs);
815                     }
816                 }
817 
818                 if (result != OK) {
819                     sp<AMessage> reply = new AMessage('disc', this);
820                     mConn->disconnect(reply);
821                 }
822 
823                 break;
824             }
825 
826             case 'aliv':
827             {
828                 int32_t generation;
829                 CHECK(msg->findInt32("generation", &generation));
830 
831                 if (generation != mKeepAliveGeneration) {
832                     // obsolete event.
833                     break;
834                 }
835 
836                 AString request;
837                 request.append("OPTIONS ");
838                 request.append(mSessionURL);
839                 request.append(" RTSP/1.0\r\n");
840                 request.append("Session: ");
841                 request.append(mSessionID);
842                 request.append("\r\n");
843                 request.append("\r\n");
844 
845                 sp<AMessage> reply = new AMessage('opts', this);
846                 reply->setInt32("generation", mKeepAliveGeneration);
847                 mConn->sendRequest(request.c_str(), reply);
848                 break;
849             }
850 
851             case 'opts':
852             {
853                 int32_t result;
854                 CHECK(msg->findInt32("result", &result));
855 
856                 ALOGI("OPTIONS completed with result %d (%s)",
857                      result, strerror(-result));
858 
859                 int32_t generation;
860                 CHECK(msg->findInt32("generation", &generation));
861 
862                 if (generation != mKeepAliveGeneration) {
863                     // obsolete event.
864                     break;
865                 }
866 
867                 postKeepAlive();
868                 break;
869             }
870 
871             case 'abor':
872             {
873                 for (size_t i = 0; i < mTracks.size(); ++i) {
874                     TrackInfo *info = &mTracks.editItemAt(i);
875 
876                     if (!mFirstAccessUnit) {
877                         postQueueEOS(i, ERROR_END_OF_STREAM);
878                     }
879 
880                     if (!info->mUsingInterleavedTCP) {
881                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
882 
883                         // Clear the tag
884                         if (mUIDValid) {
885                             HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
886                             HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
887                             HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
888                             HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
889                         }
890 
891                         close(info->mRTPSocket);
892                         close(info->mRTCPSocket);
893                     }
894                 }
895                 mTracks.clear();
896                 mSetupTracksSuccessful = false;
897                 mSeekPending = false;
898                 mFirstAccessUnit = true;
899                 mAllTracksHaveTime = false;
900                 mNTPAnchorUs = -1;
901                 mMediaAnchorUs = -1;
902                 mNumAccessUnitsReceived = 0;
903                 mReceivedFirstRTCPPacket = false;
904                 mReceivedFirstRTPPacket = false;
905                 mPausing = false;
906                 mSeekable = true;
907 
908                 sp<AMessage> reply = new AMessage('tear', this);
909 
910                 int32_t reconnect;
911                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
912                     reply->setInt32("reconnect", true);
913                 }
914 
915                 AString request;
916                 request = "TEARDOWN ";
917 
918                 // XXX should use aggregate url from SDP here...
919                 request.append(mSessionURL);
920                 request.append(" RTSP/1.0\r\n");
921 
922                 request.append("Session: ");
923                 request.append(mSessionID);
924                 request.append("\r\n");
925 
926                 request.append("\r\n");
927 
928                 mConn->sendRequest(request.c_str(), reply);
929                 break;
930             }
931 
932             case 'tear':
933             {
934                 int32_t result;
935                 CHECK(msg->findInt32("result", &result));
936 
937                 ALOGI("TEARDOWN completed with result %d (%s)",
938                      result, strerror(-result));
939 
940                 sp<AMessage> reply = new AMessage('disc', this);
941 
942                 int32_t reconnect;
943                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
944                     reply->setInt32("reconnect", true);
945                 }
946 
947                 mConn->disconnect(reply);
948                 break;
949             }
950 
951             case 'quit':
952             {
953                 sp<AMessage> msg = mNotify->dup();
954                 msg->setInt32("what", kWhatDisconnected);
955                 msg->setInt32("result", UNKNOWN_ERROR);
956                 msg->post();
957                 break;
958             }
959 
960             case 'chek':
961             {
962                 int32_t generation;
963                 CHECK(msg->findInt32("generation", &generation));
964                 if (generation != mCheckGeneration) {
965                     // This is an outdated message. Ignore.
966                     break;
967                 }
968 
969                 if (mNumAccessUnitsReceived == 0) {
970 #if 1
971                     ALOGI("stream ended? aborting.");
972                     (new AMessage('abor', this))->post();
973                     break;
974 #else
975                     ALOGI("haven't seen an AU in a looong time.");
976 #endif
977                 }
978 
979                 mNumAccessUnitsReceived = 0;
980                 msg->post(kAccessUnitTimeoutUs);
981                 break;
982             }
983 
984             case 'accu':
985             {
986                 if (mSeekPending) {
987                     ALOGV("Stale access unit.");
988                     break;
989                 }
990 
991                 int32_t timeUpdate;
992                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
993                     size_t trackIndex;
994                     CHECK(msg->findSize("track-index", &trackIndex));
995 
996                     uint32_t rtpTime;
997                     uint64_t ntpTime;
998                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
999                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
1000 
1001                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
1002                     break;
1003                 }
1004 
1005                 int32_t first;
1006                 if (msg->findInt32("first-rtcp", &first)) {
1007                     mReceivedFirstRTCPPacket = true;
1008                     break;
1009                 }
1010 
1011                 if (msg->findInt32("first-rtp", &first)) {
1012                     mReceivedFirstRTPPacket = true;
1013                     break;
1014                 }
1015 
1016                 ++mNumAccessUnitsReceived;
1017                 postAccessUnitTimeoutCheck();
1018 
1019                 size_t trackIndex;
1020                 CHECK(msg->findSize("track-index", &trackIndex));
1021 
1022                 if (trackIndex >= mTracks.size()) {
1023                     ALOGV("late packets ignored.");
1024                     break;
1025                 }
1026 
1027                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1028 
1029                 int32_t eos;
1030                 if (msg->findInt32("eos", &eos)) {
1031                     ALOGI("received BYE on track index %zu", trackIndex);
1032                     if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1033                         ALOGI("No time established => fake existing data");
1034 
1035                         track->mEOSReceived = true;
1036                         mTryFakeRTCP = true;
1037                         mReceivedFirstRTCPPacket = true;
1038                         fakeTimestamps();
1039                     } else {
1040                         postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1041                     }
1042                     return;
1043                 }
1044 
1045                 if (mSeekPending) {
1046                     ALOGV("we're seeking, dropping stale packet.");
1047                     break;
1048                 }
1049 
1050                 sp<ABuffer> accessUnit;
1051                 CHECK(msg->findBuffer("access-unit", &accessUnit));
1052                 onAccessUnitComplete(trackIndex, accessUnit);
1053                 break;
1054             }
1055 
1056             case 'paus':
1057             {
1058                 int32_t generation;
1059                 CHECK(msg->findInt32("pausecheck", &generation));
1060                 if (generation != mPauseGeneration) {
1061                     ALOGV("Ignoring outdated pause message.");
1062                     break;
1063                 }
1064 
1065                 if (!mSeekable) {
1066                     ALOGW("This is a live stream, ignoring pause request.");
1067                     break;
1068                 }
1069 
1070                 if (mPausing) {
1071                     ALOGV("This stream is already paused.");
1072                     break;
1073                 }
1074 
1075                 mCheckPending = true;
1076                 ++mCheckGeneration;
1077                 mPausing = true;
1078 
1079                 AString request = "PAUSE ";
1080                 request.append(mControlURL);
1081                 request.append(" RTSP/1.0\r\n");
1082 
1083                 request.append("Session: ");
1084                 request.append(mSessionID);
1085                 request.append("\r\n");
1086 
1087                 request.append("\r\n");
1088 
1089                 sp<AMessage> reply = new AMessage('pau2', this);
1090                 mConn->sendRequest(request.c_str(), reply);
1091                 break;
1092             }
1093 
1094             case 'pau2':
1095             {
1096                 int32_t result;
1097                 CHECK(msg->findInt32("result", &result));
1098                 mCheckTimeoutGeneration++;
1099 
1100                 ALOGI("PAUSE completed with result %d (%s)",
1101                      result, strerror(-result));
1102                 break;
1103             }
1104 
1105             case 'resu':
1106             {
1107                 if (mPausing && mSeekPending) {
1108                     // If seeking, Play will be sent from see1 instead
1109                     break;
1110                 }
1111 
1112                 if (!mPausing) {
1113                     // Dont send PLAY if we have not paused
1114                     break;
1115                 }
1116                 AString request = "PLAY ";
1117                 request.append(mControlURL);
1118                 request.append(" RTSP/1.0\r\n");
1119 
1120                 request.append("Session: ");
1121                 request.append(mSessionID);
1122                 request.append("\r\n");
1123 
1124                 request.append("\r\n");
1125 
1126                 sp<AMessage> reply = new AMessage('res2', this);
1127                 mConn->sendRequest(request.c_str(), reply);
1128                 break;
1129             }
1130 
1131             case 'res2':
1132             {
1133                 int32_t result;
1134                 CHECK(msg->findInt32("result", &result));
1135 
1136                 ALOGI("PLAY (for resume) completed with result %d (%s)",
1137                      result, strerror(-result));
1138 
1139                 mCheckPending = false;
1140                 ++mCheckGeneration;
1141                 postAccessUnitTimeoutCheck();
1142 
1143                 if (result == OK) {
1144                     sp<RefBase> obj;
1145                     CHECK(msg->findObject("response", &obj));
1146                     sp<ARTSPResponse> response =
1147                         static_cast<ARTSPResponse *>(obj.get());
1148 
1149                     if (response->mStatusCode != 200) {
1150                         result = UNKNOWN_ERROR;
1151                     } else {
1152                         parsePlayResponse(response);
1153 
1154                         // Post new timeout in order to make sure to use
1155                         // fake timestamps if no new Sender Reports arrive
1156                         sp<AMessage> timeout = new AMessage('tiou', this);
1157                         mCheckTimeoutGeneration++;
1158                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1159                         timeout->post(kStartupTimeoutUs);
1160                     }
1161                 }
1162 
1163                 if (result != OK) {
1164                     ALOGE("resume failed, aborting.");
1165                     (new AMessage('abor', this))->post();
1166                 }
1167 
1168                 mPausing = false;
1169                 break;
1170             }
1171 
1172             case 'seek':
1173             {
1174                 if (!mSeekable) {
1175                     ALOGW("This is a live stream, ignoring seek request.");
1176 
1177                     sp<AMessage> msg = mNotify->dup();
1178                     msg->setInt32("what", kWhatSeekDone);
1179                     msg->post();
1180                     break;
1181                 }
1182 
1183                 int64_t timeUs;
1184                 CHECK(msg->findInt64("time", &timeUs));
1185 
1186                 mSeekPending = true;
1187 
1188                 // Disable the access unit timeout until we resumed
1189                 // playback again.
1190                 mCheckPending = true;
1191                 ++mCheckGeneration;
1192 
1193                 sp<AMessage> reply = new AMessage('see0', this);
1194                 reply->setInt64("time", timeUs);
1195 
1196                 if (mPausing) {
1197                     // PAUSE already sent
1198                     ALOGI("Pause already sent");
1199                     reply->post();
1200                     break;
1201                 }
1202                 AString request = "PAUSE ";
1203                 request.append(mControlURL);
1204                 request.append(" RTSP/1.0\r\n");
1205 
1206                 request.append("Session: ");
1207                 request.append(mSessionID);
1208                 request.append("\r\n");
1209 
1210                 request.append("\r\n");
1211 
1212                 mConn->sendRequest(request.c_str(), reply);
1213                 break;
1214             }
1215 
1216             case 'see0':
1217             {
1218                 // Session is paused now.
1219                 status_t err = OK;
1220                 msg->findInt32("result", &err);
1221 
1222                 int64_t timeUs;
1223                 CHECK(msg->findInt64("time", &timeUs));
1224 
1225                 sp<AMessage> notify = mNotify->dup();
1226                 notify->setInt32("what", kWhatSeekPaused);
1227                 notify->setInt32("err", err);
1228                 notify->setInt64("time", timeUs);
1229                 notify->post();
1230                 break;
1231 
1232             }
1233 
1234             case 'see1':
1235             {
1236                 for (size_t i = 0; i < mTracks.size(); ++i) {
1237                     TrackInfo *info = &mTracks.editItemAt(i);
1238 
1239                     postQueueSeekDiscontinuity(i);
1240                     info->mEOSReceived = false;
1241 
1242                     info->mRTPAnchor = 0;
1243                     info->mNTPAnchorUs = -1;
1244                 }
1245 
1246                 mAllTracksHaveTime = false;
1247                 mNTPAnchorUs = -1;
1248 
1249                 // Start new timeoutgeneration to avoid getting timeout
1250                 // before PLAY response arrive
1251                 sp<AMessage> timeout = new AMessage('tiou', this);
1252                 mCheckTimeoutGeneration++;
1253                 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1254                 timeout->post(kStartupTimeoutUs);
1255 
1256                 int64_t timeUs;
1257                 CHECK(msg->findInt64("time", &timeUs));
1258 
1259                 AString request = "PLAY ";
1260                 request.append(mControlURL);
1261                 request.append(" RTSP/1.0\r\n");
1262 
1263                 request.append("Session: ");
1264                 request.append(mSessionID);
1265                 request.append("\r\n");
1266 
1267                 request.append(
1268                         AStringPrintf(
1269                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1270 
1271                 request.append("\r\n");
1272 
1273                 sp<AMessage> reply = new AMessage('see2', this);
1274                 mConn->sendRequest(request.c_str(), reply);
1275                 break;
1276             }
1277 
1278             case 'see2':
1279             {
1280                 if (mTracks.size() == 0) {
1281                     // We have already hit abor, break
1282                     break;
1283                 }
1284 
1285                 int32_t result;
1286                 CHECK(msg->findInt32("result", &result));
1287 
1288                 ALOGI("PLAY (for seek) completed with result %d (%s)",
1289                      result, strerror(-result));
1290 
1291                 mCheckPending = false;
1292                 ++mCheckGeneration;
1293                 postAccessUnitTimeoutCheck();
1294 
1295                 if (result == OK) {
1296                     sp<RefBase> obj;
1297                     CHECK(msg->findObject("response", &obj));
1298                     sp<ARTSPResponse> response =
1299                         static_cast<ARTSPResponse *>(obj.get());
1300 
1301                     if (response->mStatusCode != 200) {
1302                         result = UNKNOWN_ERROR;
1303                     } else {
1304                         parsePlayResponse(response);
1305 
1306                         // Post new timeout in order to make sure to use
1307                         // fake timestamps if no new Sender Reports arrive
1308                         sp<AMessage> timeout = new AMessage('tiou', this);
1309                         mCheckTimeoutGeneration++;
1310                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1311                         timeout->post(kStartupTimeoutUs);
1312 
1313                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1314                         CHECK_GE(i, 0);
1315 
1316                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1317 
1318                         ALOGI("seek completed.");
1319                     }
1320                 }
1321 
1322                 if (result != OK) {
1323                     ALOGE("seek failed, aborting.");
1324                     (new AMessage('abor', this))->post();
1325                 }
1326 
1327                 mPausing = false;
1328                 mSeekPending = false;
1329 
1330                 // Discard all stale access units.
1331                 for (size_t i = 0; i < mTracks.size(); ++i) {
1332                     TrackInfo *track = &mTracks.editItemAt(i);
1333                     track->mPackets.clear();
1334                 }
1335 
1336                 sp<AMessage> msg = mNotify->dup();
1337                 msg->setInt32("what", kWhatSeekDone);
1338                 msg->post();
1339                 break;
1340             }
1341 
1342             case 'biny':
1343             {
1344                 sp<ABuffer> buffer;
1345                 CHECK(msg->findBuffer("buffer", &buffer));
1346 
1347                 int32_t index;
1348                 CHECK(buffer->meta()->findInt32("index", &index));
1349 
1350                 mRTPConn->injectPacket(index, buffer);
1351                 break;
1352             }
1353 
1354             case 'tiou':
1355             {
1356                 int32_t timeoutGenerationCheck;
1357                 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1358                 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1359                     // This is an outdated message. Ignore.
1360                     // This typically happens if a lot of seeks are
1361                     // performed, since new timeout messages now are
1362                     // posted at seek as well.
1363                     break;
1364                 }
1365                 if (!mReceivedFirstRTCPPacket) {
1366                     if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1367                         ALOGW("We received RTP packets but no RTCP packets, "
1368                              "using fake timestamps.");
1369 
1370                         mTryFakeRTCP = true;
1371 
1372                         mReceivedFirstRTCPPacket = true;
1373 
1374                         fakeTimestamps();
1375                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1376                         ALOGW("Never received any data, switching transports.");
1377 
1378                         mTryTCPInterleaving = true;
1379 
1380                         sp<AMessage> msg = new AMessage('abor', this);
1381                         msg->setInt32("reconnect", true);
1382                         msg->post();
1383                     } else {
1384                         ALOGW("Never received any data, disconnecting.");
1385                         (new AMessage('abor', this))->post();
1386                     }
1387                 } else {
1388                     if (!mAllTracksHaveTime) {
1389                         ALOGW("We received some RTCP packets, but time "
1390                               "could not be established on all tracks, now "
1391                               "using fake timestamps");
1392 
1393                         fakeTimestamps();
1394                     }
1395                 }
1396                 break;
1397             }
1398 
1399             default:
1400                 TRESPASS();
1401                 break;
1402         }
1403     }
1404 
postKeepAliveMyHandler1405     void postKeepAlive() {
1406         sp<AMessage> msg = new AMessage('aliv', this);
1407         msg->setInt32("generation", mKeepAliveGeneration);
1408         msg->post((mKeepAliveTimeoutUs * 9) / 10);
1409     }
1410 
postAccessUnitTimeoutCheckMyHandler1411     void postAccessUnitTimeoutCheck() {
1412         if (mCheckPending) {
1413             return;
1414         }
1415 
1416         mCheckPending = true;
1417         sp<AMessage> check = new AMessage('chek', this);
1418         check->setInt32("generation", mCheckGeneration);
1419         check->post(kAccessUnitTimeoutUs);
1420     }
1421 
SplitStringMyHandler1422     static void SplitString(
1423             const AString &s, const char *separator, List<AString> *items) {
1424         items->clear();
1425         size_t start = 0;
1426         while (start < s.size()) {
1427             ssize_t offset = s.find(separator, start);
1428 
1429             if (offset < 0) {
1430                 items->push_back(AString(s, start, s.size() - start));
1431                 break;
1432             }
1433 
1434             items->push_back(AString(s, start, offset - start));
1435             start = offset + strlen(separator);
1436         }
1437     }
1438 
parsePlayResponseMyHandler1439     void parsePlayResponse(const sp<ARTSPResponse> &response) {
1440         mPlayResponseParsed = true;
1441         if (mTracks.size() == 0) {
1442             ALOGV("parsePlayResponse: late packets ignored.");
1443             return;
1444         }
1445 
1446         ssize_t i = response->mHeaders.indexOfKey("range");
1447         if (i < 0) {
1448             // Server doesn't even tell use what range it is going to
1449             // play, therefore we won't support seeking.
1450             return;
1451         }
1452 
1453         AString range = response->mHeaders.valueAt(i);
1454         ALOGV("Range: %s", range.c_str());
1455 
1456         AString val;
1457         CHECK(GetAttribute(range.c_str(), "npt", &val));
1458 
1459         float npt1, npt2;
1460         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1461             // This is a live stream and therefore not seekable.
1462 
1463             ALOGI("This is a live stream");
1464             return;
1465         }
1466 
1467         i = response->mHeaders.indexOfKey("rtp-info");
1468         CHECK_GE(i, 0);
1469 
1470         AString rtpInfo = response->mHeaders.valueAt(i);
1471         List<AString> streamInfos;
1472         SplitString(rtpInfo, ",", &streamInfos);
1473 
1474         int n = 1;
1475         for (List<AString>::iterator it = streamInfos.begin();
1476              it != streamInfos.end(); ++it) {
1477             (*it).trim();
1478             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1479 
1480             CHECK(GetAttribute((*it).c_str(), "url", &val));
1481 
1482             size_t trackIndex = 0;
1483             while (trackIndex < mTracks.size()
1484                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1485                 ++trackIndex;
1486             }
1487             CHECK_LT(trackIndex, mTracks.size());
1488 
1489             CHECK(GetAttribute((*it).c_str(), "seq", &val));
1490 
1491             char *end;
1492             unsigned long seq = strtoul(val.c_str(), &end, 10);
1493 
1494             TrackInfo *info = &mTracks.editItemAt(trackIndex);
1495             info->mFirstSeqNumInSegment = seq;
1496             info->mNewSegment = true;
1497             info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1498 
1499             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1500 
1501             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1502 
1503             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1504 
1505             info->mNormalPlayTimeRTP = rtpTime;
1506             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1507 
1508             if (!mFirstAccessUnit) {
1509                 postNormalPlayTimeMapping(
1510                         trackIndex,
1511                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1512             }
1513 
1514             ++n;
1515         }
1516     }
1517 
getTrackFormatMyHandler1518     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1519         CHECK_GE(index, 0u);
1520         CHECK_LT(index, mTracks.size());
1521 
1522         const TrackInfo &info = mTracks.itemAt(index);
1523 
1524         *timeScale = info.mTimeScale;
1525 
1526         return info.mPacketSource->getFormat();
1527     }
1528 
countTracksMyHandler1529     size_t countTracks() const {
1530         return mTracks.size();
1531     }
1532 
1533 private:
1534     struct TrackInfo {
1535         AString mURL;
1536         int mRTPSocket;
1537         int mRTCPSocket;
1538         bool mUsingInterleavedTCP;
1539         uint32_t mFirstSeqNumInSegment;
1540         bool mNewSegment;
1541         int32_t mAllowedStaleAccessUnits;
1542 
1543         uint32_t mRTPAnchor;
1544         int64_t mNTPAnchorUs;
1545         int32_t mTimeScale;
1546         bool mEOSReceived;
1547 
1548         uint32_t mNormalPlayTimeRTP;
1549         int64_t mNormalPlayTimeUs;
1550 
1551         sp<APacketSource> mPacketSource;
1552 
1553         // Stores packets temporarily while no notion of time
1554         // has been established yet.
1555         List<sp<ABuffer> > mPackets;
1556     };
1557 
1558     sp<AMessage> mNotify;
1559     bool mUIDValid;
1560     uid_t mUID;
1561     sp<ALooper> mNetLooper;
1562     sp<ARTSPConnection> mConn;
1563     sp<ARTPConnection> mRTPConn;
1564     sp<ASessionDescription> mSessionDesc;
1565     AString mOriginalSessionURL;  // This one still has user:pass@
1566     AString mSessionURL;
1567     AString mSessionHost;
1568     AString mBaseURL;
1569     AString mControlURL;
1570     AString mSessionID;
1571     bool mSetupTracksSuccessful;
1572     bool mSeekPending;
1573     bool mFirstAccessUnit;
1574 
1575     bool mAllTracksHaveTime;
1576     int64_t mNTPAnchorUs;
1577     int64_t mMediaAnchorUs;
1578     int64_t mLastMediaTimeUs;
1579 
1580     int64_t mNumAccessUnitsReceived;
1581     bool mCheckPending;
1582     int32_t mCheckGeneration;
1583     int32_t mCheckTimeoutGeneration;
1584     bool mTryTCPInterleaving;
1585     bool mTryFakeRTCP;
1586     bool mReceivedFirstRTCPPacket;
1587     bool mReceivedFirstRTPPacket;
1588     bool mSeekable;
1589     int64_t mKeepAliveTimeoutUs;
1590     int32_t mKeepAliveGeneration;
1591     bool mPausing;
1592     int32_t mPauseGeneration;
1593 
1594     Vector<TrackInfo> mTracks;
1595 
1596     bool mPlayResponseParsed;
1597 
setupTrackMyHandler1598     void setupTrack(size_t index) {
1599         sp<APacketSource> source =
1600             new APacketSource(mSessionDesc, index);
1601 
1602         if (source->initCheck() != OK) {
1603             ALOGW("Unsupported format. Ignoring track #%zu.", index);
1604 
1605             sp<AMessage> reply = new AMessage('setu', this);
1606             reply->setSize("index", index);
1607             reply->setInt32("result", ERROR_UNSUPPORTED);
1608             reply->post();
1609             return;
1610         }
1611 
1612         AString url;
1613         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1614 
1615         AString trackURL;
1616         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1617 
1618         mTracks.push(TrackInfo());
1619         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1620         info->mURL = trackURL;
1621         info->mPacketSource = source;
1622         info->mUsingInterleavedTCP = false;
1623         info->mFirstSeqNumInSegment = 0;
1624         info->mNewSegment = true;
1625         info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1626         info->mRTPSocket = -1;
1627         info->mRTCPSocket = -1;
1628         info->mRTPAnchor = 0;
1629         info->mNTPAnchorUs = -1;
1630         info->mNormalPlayTimeRTP = 0;
1631         info->mNormalPlayTimeUs = 0ll;
1632 
1633         unsigned long PT;
1634         AString formatDesc;
1635         AString formatParams;
1636         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1637 
1638         int32_t timescale;
1639         int32_t numChannels;
1640         ASessionDescription::ParseFormatDesc(
1641                 formatDesc.c_str(), &timescale, &numChannels);
1642 
1643         info->mTimeScale = timescale;
1644         info->mEOSReceived = false;
1645 
1646         ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1647 
1648         AString request = "SETUP ";
1649         request.append(trackURL);
1650         request.append(" RTSP/1.0\r\n");
1651 
1652         if (mTryTCPInterleaving) {
1653             size_t interleaveIndex = 2 * (mTracks.size() - 1);
1654             info->mUsingInterleavedTCP = true;
1655             info->mRTPSocket = interleaveIndex;
1656             info->mRTCPSocket = interleaveIndex + 1;
1657 
1658             request.append("Transport: RTP/AVP/TCP;interleaved=");
1659             request.append(interleaveIndex);
1660             request.append("-");
1661             request.append(interleaveIndex + 1);
1662         } else {
1663             unsigned rtpPort;
1664             ARTPConnection::MakePortPair(
1665                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1666 
1667             if (mUIDValid) {
1668                 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1669                                                 (uint32_t)*(uint32_t*) "RTP_");
1670                 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1671                                                 (uint32_t)*(uint32_t*) "RTP_");
1672                 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
1673                 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1674             }
1675 
1676             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1677             request.append(rtpPort);
1678             request.append("-");
1679             request.append(rtpPort + 1);
1680         }
1681 
1682         request.append("\r\n");
1683 
1684         if (index > 1) {
1685             request.append("Session: ");
1686             request.append(mSessionID);
1687             request.append("\r\n");
1688         }
1689 
1690         request.append("\r\n");
1691 
1692         sp<AMessage> reply = new AMessage('setu', this);
1693         reply->setSize("index", index);
1694         reply->setSize("track-index", mTracks.size() - 1);
1695         mConn->sendRequest(request.c_str(), reply);
1696     }
1697 
MakeURLMyHandler1698     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1699         out->clear();
1700 
1701         if (strncasecmp("rtsp://", baseURL, 7)) {
1702             // Base URL must be absolute
1703             return false;
1704         }
1705 
1706         if (!strncasecmp("rtsp://", url, 7)) {
1707             // "url" is already an absolute URL, ignore base URL.
1708             out->setTo(url);
1709             return true;
1710         }
1711 
1712         size_t n = strlen(baseURL);
1713         out->setTo(baseURL);
1714         if (baseURL[n - 1] != '/') {
1715             out->append("/");
1716         }
1717         out->append(url);
1718 
1719         return true;
1720     }
1721 
fakeTimestampsMyHandler1722     void fakeTimestamps() {
1723         mNTPAnchorUs = -1ll;
1724         for (size_t i = 0; i < mTracks.size(); ++i) {
1725             onTimeUpdate(i, 0, 0ll);
1726         }
1727     }
1728 
dataReceivedOnAllChannelsMyHandler1729     bool dataReceivedOnAllChannels() {
1730         TrackInfo *track;
1731         for (size_t i = 0; i < mTracks.size(); ++i) {
1732             track = &mTracks.editItemAt(i);
1733             if (track->mPackets.empty()) {
1734                 return false;
1735             }
1736         }
1737         return true;
1738     }
1739 
handleFirstAccessUnitMyHandler1740     void handleFirstAccessUnit() {
1741         if (mFirstAccessUnit) {
1742             sp<AMessage> msg = mNotify->dup();
1743             msg->setInt32("what", kWhatConnected);
1744             msg->post();
1745 
1746             if (mSeekable) {
1747                 for (size_t i = 0; i < mTracks.size(); ++i) {
1748                     TrackInfo *info = &mTracks.editItemAt(i);
1749 
1750                     postNormalPlayTimeMapping(
1751                             i,
1752                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1753                 }
1754             }
1755 
1756             mFirstAccessUnit = false;
1757         }
1758     }
1759 
onTimeUpdateMyHandler1760     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1761         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1762              trackIndex, rtpTime, (long long)ntpTime);
1763 
1764         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1765 
1766         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1767 
1768         track->mRTPAnchor = rtpTime;
1769         track->mNTPAnchorUs = ntpTimeUs;
1770 
1771         if (mNTPAnchorUs < 0) {
1772             mNTPAnchorUs = ntpTimeUs;
1773             mMediaAnchorUs = mLastMediaTimeUs;
1774         }
1775 
1776         if (!mAllTracksHaveTime) {
1777             bool allTracksHaveTime = (mTracks.size() > 0);
1778             for (size_t i = 0; i < mTracks.size(); ++i) {
1779                 TrackInfo *track = &mTracks.editItemAt(i);
1780                 if (track->mNTPAnchorUs < 0) {
1781                     allTracksHaveTime = false;
1782                     break;
1783                 }
1784             }
1785             if (allTracksHaveTime) {
1786                 mAllTracksHaveTime = true;
1787                 ALOGI("Time now established for all tracks.");
1788             }
1789         }
1790         if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1791             handleFirstAccessUnit();
1792 
1793             // Time is now established, lets start timestamping immediately
1794             for (size_t i = 0; i < mTracks.size(); ++i) {
1795                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1796                 while (!trackInfo->mPackets.empty()) {
1797                     sp<ABuffer> accessUnit = *trackInfo->mPackets.begin();
1798                     trackInfo->mPackets.erase(trackInfo->mPackets.begin());
1799 
1800                     if (addMediaTimestamp(i, trackInfo, accessUnit)) {
1801                         postQueueAccessUnit(i, accessUnit);
1802                     }
1803                 }
1804             }
1805             for (size_t i = 0; i < mTracks.size(); ++i) {
1806                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1807                 if (trackInfo->mEOSReceived) {
1808                     postQueueEOS(i, ERROR_END_OF_STREAM);
1809                     trackInfo->mEOSReceived = false;
1810                 }
1811             }
1812         }
1813     }
1814 
onAccessUnitCompleteMyHandler1815     void onAccessUnitComplete(
1816             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1817         ALOGV("onAccessUnitComplete track %d", trackIndex);
1818 
1819         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1820         if(!mPlayResponseParsed){
1821             uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1822             ALOGI("play response is not parsed, storing accessunit %u", seqNum);
1823             track->mPackets.push_back(accessUnit);
1824             return;
1825         }
1826 
1827         handleFirstAccessUnit();
1828 
1829         if (!mAllTracksHaveTime) {
1830             ALOGV("storing accessUnit, no time established yet");
1831             track->mPackets.push_back(accessUnit);
1832             return;
1833         }
1834 
1835         while (!track->mPackets.empty()) {
1836             sp<ABuffer> accessUnit = *track->mPackets.begin();
1837             track->mPackets.erase(track->mPackets.begin());
1838 
1839             uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1840             if (track->mNewSegment) {
1841                 // The sequence number from RTP packet has only 16 bits and is extended
1842                 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
1843                 // RTSP "PLAY" command should be used to detect the first RTP packet
1844                 // after seeking.
1845                 if (track->mAllowedStaleAccessUnits > 0) {
1846                     uint32_t seqNum16 = seqNum & 0xffff;
1847                     uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
1848                     if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
1849                             || seqNum16 < firstSeqNumInSegment16) {
1850                         // Not the first rtp packet of the stream after seeking, discarding.
1851                         track->mAllowedStaleAccessUnits--;
1852                         ALOGV("discarding stale access unit (0x%x : 0x%x)",
1853                              seqNum, track->mFirstSeqNumInSegment);
1854                         continue;
1855                     }
1856                     ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
1857                             "Missing the first packet(%u), now take packet(%u) as first one",
1858                             track->mFirstSeqNumInSegment, seqNum);
1859                 } else { // track->mAllowedStaleAccessUnits <= 0
1860                     mNumAccessUnitsReceived = 0;
1861                     ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
1862                          "Still no first rtp packet after %d stale ones",
1863                          kMaxAllowedStaleAccessUnits);
1864                     track->mAllowedStaleAccessUnits = -1;
1865                     return;
1866                 }
1867 
1868                 // Now found the first rtp packet of the stream after seeking.
1869                 track->mFirstSeqNumInSegment = seqNum;
1870                 track->mNewSegment = false;
1871             }
1872 
1873             if (seqNum < track->mFirstSeqNumInSegment) {
1874                 ALOGV("dropping stale access-unit (%d < %d)",
1875                      seqNum, track->mFirstSeqNumInSegment);
1876                 continue;
1877             }
1878 
1879 
1880             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1881                 postQueueAccessUnit(trackIndex, accessUnit);
1882             }
1883         }
1884 
1885         if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1886             postQueueAccessUnit(trackIndex, accessUnit);
1887         }
1888 
1889         if (track->mEOSReceived) {
1890             postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1891             track->mEOSReceived = false;
1892         }
1893     }
1894 
addMediaTimestampMyHandler1895     bool addMediaTimestamp(
1896             int32_t trackIndex, const TrackInfo *track,
1897             const sp<ABuffer> &accessUnit) {
1898         UNUSED_UNLESS_VERBOSE(trackIndex);
1899 
1900         uint32_t rtpTime;
1901         CHECK(accessUnit->meta()->findInt32(
1902                     "rtp-time", (int32_t *)&rtpTime));
1903 
1904         int64_t relRtpTimeUs =
1905             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1906                 / track->mTimeScale;
1907 
1908         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1909 
1910         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1911 
1912         if (mediaTimeUs > mLastMediaTimeUs) {
1913             mLastMediaTimeUs = mediaTimeUs;
1914         }
1915 
1916         if (mediaTimeUs < 0) {
1917             ALOGV("dropping early accessUnit.");
1918             return false;
1919         }
1920 
1921         ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1922              trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1923 
1924         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1925 
1926         return true;
1927     }
1928 
postQueueAccessUnitMyHandler1929     void postQueueAccessUnit(
1930             size_t trackIndex, const sp<ABuffer> &accessUnit) {
1931         sp<AMessage> msg = mNotify->dup();
1932         msg->setInt32("what", kWhatAccessUnit);
1933         msg->setSize("trackIndex", trackIndex);
1934         msg->setBuffer("accessUnit", accessUnit);
1935         msg->post();
1936     }
1937 
postQueueEOSMyHandler1938     void postQueueEOS(size_t trackIndex, status_t finalResult) {
1939         sp<AMessage> msg = mNotify->dup();
1940         msg->setInt32("what", kWhatEOS);
1941         msg->setSize("trackIndex", trackIndex);
1942         msg->setInt32("finalResult", finalResult);
1943         msg->post();
1944     }
1945 
postQueueSeekDiscontinuityMyHandler1946     void postQueueSeekDiscontinuity(size_t trackIndex) {
1947         sp<AMessage> msg = mNotify->dup();
1948         msg->setInt32("what", kWhatSeekDiscontinuity);
1949         msg->setSize("trackIndex", trackIndex);
1950         msg->post();
1951     }
1952 
postNormalPlayTimeMappingMyHandler1953     void postNormalPlayTimeMapping(
1954             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1955         sp<AMessage> msg = mNotify->dup();
1956         msg->setInt32("what", kWhatNormalPlayTimeMapping);
1957         msg->setSize("trackIndex", trackIndex);
1958         msg->setInt32("rtpTime", rtpTime);
1959         msg->setInt64("nptUs", nptUs);
1960         msg->post();
1961     }
1962 
1963     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1964 };
1965 
1966 }  // namespace android
1967 
1968 #endif  // MY_HANDLER_H_
1969