• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MY_HANDLER_H_
18 
19 #define MY_HANDLER_H_
20 
21 //#define LOG_NDEBUG 0
22 #define LOG_TAG "MyHandler"
23 #include <utils/Log.h>
24 
25 #include "APacketSource.h"
26 #include "ARTPConnection.h"
27 #include "ARTSPConnection.h"
28 #include "ASessionDescription.h"
29 
30 #include <ctype.h>
31 
32 #include <media/stagefright/foundation/ABuffer.h>
33 #include <media/stagefright/foundation/ADebug.h>
34 #include <media/stagefright/foundation/ALooper.h>
35 #include <media/stagefright/foundation/AMessage.h>
36 #include <media/stagefright/MediaErrors.h>
37 #include <media/stagefright/Utils.h>
38 
39 #include <arpa/inet.h>
40 #include <sys/socket.h>
41 #include <netdb.h>
42 
43 #include "HTTPBase.h"
44 
45 // If no access units are received within 5 secs, assume that the rtp
46 // stream has ended and signal end of stream.
47 static int64_t kAccessUnitTimeoutUs = 10000000ll;
48 
49 // If no access units arrive for the first 10 secs after starting the
50 // stream, assume none ever will and signal EOS or switch transports.
51 static int64_t kStartupTimeoutUs = 10000000ll;
52 
53 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
54 
55 static int64_t kPauseDelayUs = 3000000ll;
56 
57 namespace android {
58 
GetAttribute(const char * s,const char * key,AString * value)59 static bool GetAttribute(const char *s, const char *key, AString *value) {
60     value->clear();
61 
62     size_t keyLen = strlen(key);
63 
64     for (;;) {
65         while (isspace(*s)) {
66             ++s;
67         }
68 
69         const char *colonPos = strchr(s, ';');
70 
71         size_t len =
72             (colonPos == NULL) ? strlen(s) : colonPos - s;
73 
74         if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
75             value->setTo(&s[keyLen + 1], len - keyLen - 1);
76             return true;
77         }
78 
79         if (colonPos == NULL) {
80             return false;
81         }
82 
83         s = colonPos + 1;
84     }
85 }
86 
87 struct MyHandler : public AHandler {
88     enum {
89         kWhatConnected                  = 'conn',
90         kWhatDisconnected               = 'disc',
91         kWhatSeekDone                   = 'sdon',
92 
93         kWhatAccessUnit                 = 'accU',
94         kWhatEOS                        = 'eos!',
95         kWhatSeekDiscontinuity          = 'seeD',
96         kWhatNormalPlayTimeMapping      = 'nptM',
97     };
98 
99     MyHandler(
100             const char *url,
101             const sp<AMessage> &notify,
102             bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler103         : mNotify(notify),
104           mUIDValid(uidValid),
105           mUID(uid),
106           mNetLooper(new ALooper),
107           mConn(new ARTSPConnection(mUIDValid, mUID)),
108           mRTPConn(new ARTPConnection),
109           mOriginalSessionURL(url),
110           mSessionURL(url),
111           mSetupTracksSuccessful(false),
112           mSeekPending(false),
113           mFirstAccessUnit(true),
114           mAllTracksHaveTime(false),
115           mNTPAnchorUs(-1),
116           mMediaAnchorUs(-1),
117           mLastMediaTimeUs(0),
118           mNumAccessUnitsReceived(0),
119           mCheckPending(false),
120           mCheckGeneration(0),
121           mCheckTimeoutGeneration(0),
122           mTryTCPInterleaving(false),
123           mTryFakeRTCP(false),
124           mReceivedFirstRTCPPacket(false),
125           mReceivedFirstRTPPacket(false),
126           mSeekable(true),
127           mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
128           mKeepAliveGeneration(0),
129           mPausing(false),
130           mPauseGeneration(0) {
131         mNetLooper->setName("rtsp net");
132         mNetLooper->start(false /* runOnCallingThread */,
133                           false /* canCallJava */,
134                           PRIORITY_HIGHEST);
135 
136         // Strip any authentication info from the session url, we don't
137         // want to transmit user/pass in cleartext.
138         AString host, path, user, pass;
139         unsigned port;
140         CHECK(ARTSPConnection::ParseURL(
141                     mSessionURL.c_str(), &host, &port, &path, &user, &pass));
142 
143         if (user.size() > 0) {
144             mSessionURL.clear();
145             mSessionURL.append("rtsp://");
146             mSessionURL.append(host);
147             mSessionURL.append(":");
148             mSessionURL.append(StringPrintf("%u", port));
149             mSessionURL.append(path);
150 
151             ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
152         }
153 
154         mSessionHost = host;
155     }
156 
connectMyHandler157     void connect() {
158         looper()->registerHandler(mConn);
159         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
160 
161         sp<AMessage> notify = new AMessage('biny', id());
162         mConn->observeBinaryData(notify);
163 
164         sp<AMessage> reply = new AMessage('conn', id());
165         mConn->connect(mOriginalSessionURL.c_str(), reply);
166     }
167 
loadSDPMyHandler168     void loadSDP(const sp<ASessionDescription>& desc) {
169         looper()->registerHandler(mConn);
170         (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
171 
172         sp<AMessage> notify = new AMessage('biny', id());
173         mConn->observeBinaryData(notify);
174 
175         sp<AMessage> reply = new AMessage('sdpl', id());
176         reply->setObject("description", desc);
177         mConn->connect(mOriginalSessionURL.c_str(), reply);
178     }
179 
getControlURLMyHandler180     AString getControlURL(sp<ASessionDescription> desc) {
181         AString sessionLevelControlURL;
182         if (mSessionDesc->findAttribute(
183                 0,
184                 "a=control",
185                 &sessionLevelControlURL)) {
186             if (sessionLevelControlURL.compare("*") == 0) {
187                 return mBaseURL;
188             } else {
189                 AString controlURL;
190                 CHECK(MakeURL(
191                         mBaseURL.c_str(),
192                         sessionLevelControlURL.c_str(),
193                         &controlURL));
194                 return controlURL;
195             }
196         } else {
197             return mSessionURL;
198         }
199     }
200 
disconnectMyHandler201     void disconnect() {
202         (new AMessage('abor', id()))->post();
203     }
204 
seekMyHandler205     void seek(int64_t timeUs) {
206         sp<AMessage> msg = new AMessage('seek', id());
207         msg->setInt64("time", timeUs);
208         mPauseGeneration++;
209         msg->post();
210     }
211 
isSeekableMyHandler212     bool isSeekable() const {
213         return mSeekable;
214     }
215 
pauseMyHandler216     void pause() {
217         sp<AMessage> msg = new AMessage('paus', id());
218         mPauseGeneration++;
219         msg->setInt32("pausecheck", mPauseGeneration);
220         msg->post(kPauseDelayUs);
221     }
222 
resumeMyHandler223     void resume() {
224         sp<AMessage> msg = new AMessage('resu', id());
225         mPauseGeneration++;
226         msg->post();
227     }
228 
addRRMyHandler229     static void addRR(const sp<ABuffer> &buf) {
230         uint8_t *ptr = buf->data() + buf->size();
231         ptr[0] = 0x80 | 0;
232         ptr[1] = 201;  // RR
233         ptr[2] = 0;
234         ptr[3] = 1;
235         ptr[4] = 0xde;  // SSRC
236         ptr[5] = 0xad;
237         ptr[6] = 0xbe;
238         ptr[7] = 0xef;
239 
240         buf->setRange(0, buf->size() + 8);
241     }
242 
addSDESMyHandler243     static void addSDES(int s, const sp<ABuffer> &buffer) {
244         struct sockaddr_in addr;
245         socklen_t addrSize = sizeof(addr);
246         CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
247 
248         uint8_t *data = buffer->data() + buffer->size();
249         data[0] = 0x80 | 1;
250         data[1] = 202;  // SDES
251         data[4] = 0xde;  // SSRC
252         data[5] = 0xad;
253         data[6] = 0xbe;
254         data[7] = 0xef;
255 
256         size_t offset = 8;
257 
258         data[offset++] = 1;  // CNAME
259 
260         AString cname = "stagefright@";
261         cname.append(inet_ntoa(addr.sin_addr));
262         data[offset++] = cname.size();
263 
264         memcpy(&data[offset], cname.c_str(), cname.size());
265         offset += cname.size();
266 
267         data[offset++] = 6;  // TOOL
268 
269         AString tool = MakeUserAgent();
270 
271         data[offset++] = tool.size();
272 
273         memcpy(&data[offset], tool.c_str(), tool.size());
274         offset += tool.size();
275 
276         data[offset++] = 0;
277 
278         if ((offset % 4) > 0) {
279             size_t count = 4 - (offset % 4);
280             switch (count) {
281                 case 3:
282                     data[offset++] = 0;
283                 case 2:
284                     data[offset++] = 0;
285                 case 1:
286                     data[offset++] = 0;
287             }
288         }
289 
290         size_t numWords = (offset / 4) - 1;
291         data[2] = numWords >> 8;
292         data[3] = numWords & 0xff;
293 
294         buffer->setRange(buffer->offset(), buffer->size() + offset);
295     }
296 
297     // In case we're behind NAT, fire off two UDP packets to the remote
298     // rtp/rtcp ports to poke a hole into the firewall for future incoming
299     // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler300     bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
301         struct sockaddr_in addr;
302         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
303         addr.sin_family = AF_INET;
304 
305         AString source;
306         AString server_port;
307         if (!GetAttribute(transport.c_str(),
308                           "source",
309                           &source)) {
310             ALOGW("Missing 'source' field in Transport response. Using "
311                  "RTSP endpoint address.");
312 
313             struct hostent *ent = gethostbyname(mSessionHost.c_str());
314             if (ent == NULL) {
315                 ALOGE("Failed to look up address of session host '%s'",
316                      mSessionHost.c_str());
317 
318                 return false;
319             }
320 
321             addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
322         } else {
323             addr.sin_addr.s_addr = inet_addr(source.c_str());
324         }
325 
326         if (!GetAttribute(transport.c_str(),
327                                  "server_port",
328                                  &server_port)) {
329             ALOGI("Missing 'server_port' field in Transport response.");
330             return false;
331         }
332 
333         int rtpPort, rtcpPort;
334         if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
335                 || rtpPort <= 0 || rtpPort > 65535
336                 || rtcpPort <=0 || rtcpPort > 65535
337                 || rtcpPort != rtpPort + 1) {
338             ALOGE("Server picked invalid RTP/RTCP port pair %s,"
339                  " RTP port must be even, RTCP port must be one higher.",
340                  server_port.c_str());
341 
342             return false;
343         }
344 
345         if (rtpPort & 1) {
346             ALOGW("Server picked an odd RTP port, it should've picked an "
347                  "even one, we'll let it pass for now, but this may break "
348                  "in the future.");
349         }
350 
351         if (addr.sin_addr.s_addr == INADDR_NONE) {
352             return true;
353         }
354 
355         if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
356             // No firewalls to traverse on the loopback interface.
357             return true;
358         }
359 
360         // Make up an RR/SDES RTCP packet.
361         sp<ABuffer> buf = new ABuffer(65536);
362         buf->setRange(0, 0);
363         addRR(buf);
364         addSDES(rtpSocket, buf);
365 
366         addr.sin_port = htons(rtpPort);
367 
368         ssize_t n = sendto(
369                 rtpSocket, buf->data(), buf->size(), 0,
370                 (const sockaddr *)&addr, sizeof(addr));
371 
372         if (n < (ssize_t)buf->size()) {
373             ALOGE("failed to poke a hole for RTP packets");
374             return false;
375         }
376 
377         addr.sin_port = htons(rtcpPort);
378 
379         n = sendto(
380                 rtcpSocket, buf->data(), buf->size(), 0,
381                 (const sockaddr *)&addr, sizeof(addr));
382 
383         if (n < (ssize_t)buf->size()) {
384             ALOGE("failed to poke a hole for RTCP packets");
385             return false;
386         }
387 
388         ALOGV("successfully poked holes.");
389 
390         return true;
391     }
392 
isLiveStreamMyHandler393     static bool isLiveStream(const sp<ASessionDescription> &desc) {
394         AString attrLiveStream;
395         if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
396             ssize_t semicolonPos = attrLiveStream.find(";", 2);
397 
398             const char* liveStreamValue;
399             if (semicolonPos < 0) {
400                 liveStreamValue = attrLiveStream.c_str();
401             } else {
402                 AString valString;
403                 valString.setTo(attrLiveStream,
404                         semicolonPos + 1,
405                         attrLiveStream.size() - semicolonPos - 1);
406                 liveStreamValue = valString.c_str();
407             }
408 
409             uint32_t value = strtoul(liveStreamValue, NULL, 10);
410             if (value == 1) {
411                 ALOGV("found live stream");
412                 return true;
413             }
414         } else {
415             // It is a live stream if no duration is returned
416             int64_t durationUs;
417             if (!desc->getDurationUs(&durationUs)) {
418                 ALOGV("No duration found, assume live stream");
419                 return true;
420             }
421         }
422 
423         return false;
424     }
425 
onMessageReceivedMyHandler426     virtual void onMessageReceived(const sp<AMessage> &msg) {
427         switch (msg->what()) {
428             case 'conn':
429             {
430                 int32_t result;
431                 CHECK(msg->findInt32("result", &result));
432 
433                 ALOGI("connection request completed with result %d (%s)",
434                      result, strerror(-result));
435 
436                 if (result == OK) {
437                     AString request;
438                     request = "DESCRIBE ";
439                     request.append(mSessionURL);
440                     request.append(" RTSP/1.0\r\n");
441                     request.append("Accept: application/sdp\r\n");
442                     request.append("\r\n");
443 
444                     sp<AMessage> reply = new AMessage('desc', id());
445                     mConn->sendRequest(request.c_str(), reply);
446                 } else {
447                     (new AMessage('disc', id()))->post();
448                 }
449                 break;
450             }
451 
452             case 'disc':
453             {
454                 ++mKeepAliveGeneration;
455 
456                 int32_t reconnect;
457                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
458                     sp<AMessage> reply = new AMessage('conn', id());
459                     mConn->connect(mOriginalSessionURL.c_str(), reply);
460                 } else {
461                     (new AMessage('quit', id()))->post();
462                 }
463                 break;
464             }
465 
466             case 'desc':
467             {
468                 int32_t result;
469                 CHECK(msg->findInt32("result", &result));
470 
471                 ALOGI("DESCRIBE completed with result %d (%s)",
472                      result, strerror(-result));
473 
474                 if (result == OK) {
475                     sp<RefBase> obj;
476                     CHECK(msg->findObject("response", &obj));
477                     sp<ARTSPResponse> response =
478                         static_cast<ARTSPResponse *>(obj.get());
479 
480                     if (response->mStatusCode == 302) {
481                         ssize_t i = response->mHeaders.indexOfKey("location");
482                         CHECK_GE(i, 0);
483 
484                         mSessionURL = response->mHeaders.valueAt(i);
485 
486                         AString request;
487                         request = "DESCRIBE ";
488                         request.append(mSessionURL);
489                         request.append(" RTSP/1.0\r\n");
490                         request.append("Accept: application/sdp\r\n");
491                         request.append("\r\n");
492 
493                         sp<AMessage> reply = new AMessage('desc', id());
494                         mConn->sendRequest(request.c_str(), reply);
495                         break;
496                     }
497 
498                     if (response->mStatusCode != 200) {
499                         result = UNKNOWN_ERROR;
500                     } else if (response->mContent == NULL) {
501                         result = ERROR_MALFORMED;
502                         ALOGE("The response has no content.");
503                     } else {
504                         mSessionDesc = new ASessionDescription;
505 
506                         mSessionDesc->setTo(
507                                 response->mContent->data(),
508                                 response->mContent->size());
509 
510                         if (!mSessionDesc->isValid()) {
511                             ALOGE("Failed to parse session description.");
512                             result = ERROR_MALFORMED;
513                         } else {
514                             ssize_t i = response->mHeaders.indexOfKey("content-base");
515                             if (i >= 0) {
516                                 mBaseURL = response->mHeaders.valueAt(i);
517                             } else {
518                                 i = response->mHeaders.indexOfKey("content-location");
519                                 if (i >= 0) {
520                                     mBaseURL = response->mHeaders.valueAt(i);
521                                 } else {
522                                     mBaseURL = mSessionURL;
523                                 }
524                             }
525 
526                             mSeekable = !isLiveStream(mSessionDesc);
527 
528                             if (!mBaseURL.startsWith("rtsp://")) {
529                                 // Some misbehaving servers specify a relative
530                                 // URL in one of the locations above, combine
531                                 // it with the absolute session URL to get
532                                 // something usable...
533 
534                                 ALOGW("Server specified a non-absolute base URL"
535                                      ", combining it with the session URL to "
536                                      "get something usable...");
537 
538                                 AString tmp;
539                                 CHECK(MakeURL(
540                                             mSessionURL.c_str(),
541                                             mBaseURL.c_str(),
542                                             &tmp));
543 
544                                 mBaseURL = tmp;
545                             }
546 
547                             mControlURL = getControlURL(mSessionDesc);
548 
549                             if (mSessionDesc->countTracks() < 2) {
550                                 // There's no actual tracks in this session.
551                                 // The first "track" is merely session meta
552                                 // data.
553 
554                                 ALOGW("Session doesn't contain any playable "
555                                      "tracks. Aborting.");
556                                 result = ERROR_UNSUPPORTED;
557                             } else {
558                                 setupTrack(1);
559                             }
560                         }
561                     }
562                 }
563 
564                 if (result != OK) {
565                     sp<AMessage> reply = new AMessage('disc', id());
566                     mConn->disconnect(reply);
567                 }
568                 break;
569             }
570 
571             case 'sdpl':
572             {
573                 int32_t result;
574                 CHECK(msg->findInt32("result", &result));
575 
576                 ALOGI("SDP connection request completed with result %d (%s)",
577                      result, strerror(-result));
578 
579                 if (result == OK) {
580                     sp<RefBase> obj;
581                     CHECK(msg->findObject("description", &obj));
582                     mSessionDesc =
583                         static_cast<ASessionDescription *>(obj.get());
584 
585                     if (!mSessionDesc->isValid()) {
586                         ALOGE("Failed to parse session description.");
587                         result = ERROR_MALFORMED;
588                     } else {
589                         mBaseURL = mSessionURL;
590 
591                         mSeekable = !isLiveStream(mSessionDesc);
592 
593                         mControlURL = getControlURL(mSessionDesc);
594 
595                         if (mSessionDesc->countTracks() < 2) {
596                             // There's no actual tracks in this session.
597                             // The first "track" is merely session meta
598                             // data.
599 
600                             ALOGW("Session doesn't contain any playable "
601                                  "tracks. Aborting.");
602                             result = ERROR_UNSUPPORTED;
603                         } else {
604                             setupTrack(1);
605                         }
606                     }
607                 }
608 
609                 if (result != OK) {
610                     sp<AMessage> reply = new AMessage('disc', id());
611                     mConn->disconnect(reply);
612                 }
613                 break;
614             }
615 
616             case 'setu':
617             {
618                 size_t index;
619                 CHECK(msg->findSize("index", &index));
620 
621                 TrackInfo *track = NULL;
622                 size_t trackIndex;
623                 if (msg->findSize("track-index", &trackIndex)) {
624                     track = &mTracks.editItemAt(trackIndex);
625                 }
626 
627                 int32_t result;
628                 CHECK(msg->findInt32("result", &result));
629 
630                 ALOGI("SETUP(%d) completed with result %d (%s)",
631                      index, result, strerror(-result));
632 
633                 if (result == OK) {
634                     CHECK(track != NULL);
635 
636                     sp<RefBase> obj;
637                     CHECK(msg->findObject("response", &obj));
638                     sp<ARTSPResponse> response =
639                         static_cast<ARTSPResponse *>(obj.get());
640 
641                     if (response->mStatusCode != 200) {
642                         result = UNKNOWN_ERROR;
643                     } else {
644                         ssize_t i = response->mHeaders.indexOfKey("session");
645                         CHECK_GE(i, 0);
646 
647                         mSessionID = response->mHeaders.valueAt(i);
648 
649                         mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
650                         AString timeoutStr;
651                         if (GetAttribute(
652                                     mSessionID.c_str(), "timeout", &timeoutStr)) {
653                             char *end;
654                             unsigned long timeoutSecs =
655                                 strtoul(timeoutStr.c_str(), &end, 10);
656 
657                             if (end == timeoutStr.c_str() || *end != '\0') {
658                                 ALOGW("server specified malformed timeout '%s'",
659                                      timeoutStr.c_str());
660 
661                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
662                             } else if (timeoutSecs < 15) {
663                                 ALOGW("server specified too short a timeout "
664                                      "(%lu secs), using default.",
665                                      timeoutSecs);
666 
667                                 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
668                             } else {
669                                 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
670 
671                                 ALOGI("server specified timeout of %lu secs.",
672                                      timeoutSecs);
673                             }
674                         }
675 
676                         i = mSessionID.find(";");
677                         if (i >= 0) {
678                             // Remove options, i.e. ";timeout=90"
679                             mSessionID.erase(i, mSessionID.size() - i);
680                         }
681 
682                         sp<AMessage> notify = new AMessage('accu', id());
683                         notify->setSize("track-index", trackIndex);
684 
685                         i = response->mHeaders.indexOfKey("transport");
686                         CHECK_GE(i, 0);
687 
688                         if (!track->mUsingInterleavedTCP) {
689                             AString transport = response->mHeaders.valueAt(i);
690 
691                             // We are going to continue even if we were
692                             // unable to poke a hole into the firewall...
693                             pokeAHole(
694                                     track->mRTPSocket,
695                                     track->mRTCPSocket,
696                                     transport);
697                         }
698 
699                         mRTPConn->addStream(
700                                 track->mRTPSocket, track->mRTCPSocket,
701                                 mSessionDesc, index,
702                                 notify, track->mUsingInterleavedTCP);
703 
704                         mSetupTracksSuccessful = true;
705                     }
706                 }
707 
708                 if (result != OK) {
709                     if (track) {
710                         if (!track->mUsingInterleavedTCP) {
711                             // Clear the tag
712                             if (mUIDValid) {
713                                 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
714                                 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
715                             }
716 
717                             close(track->mRTPSocket);
718                             close(track->mRTCPSocket);
719                         }
720 
721                         mTracks.removeItemsAt(trackIndex);
722                     }
723                 }
724 
725                 ++index;
726                 if (index < mSessionDesc->countTracks()) {
727                     setupTrack(index);
728                 } else if (mSetupTracksSuccessful) {
729                     ++mKeepAliveGeneration;
730                     postKeepAlive();
731 
732                     AString request = "PLAY ";
733                     request.append(mControlURL);
734                     request.append(" RTSP/1.0\r\n");
735 
736                     request.append("Session: ");
737                     request.append(mSessionID);
738                     request.append("\r\n");
739 
740                     request.append("\r\n");
741 
742                     sp<AMessage> reply = new AMessage('play', id());
743                     mConn->sendRequest(request.c_str(), reply);
744                 } else {
745                     sp<AMessage> reply = new AMessage('disc', id());
746                     mConn->disconnect(reply);
747                 }
748                 break;
749             }
750 
751             case 'play':
752             {
753                 int32_t result;
754                 CHECK(msg->findInt32("result", &result));
755 
756                 ALOGI("PLAY completed with result %d (%s)",
757                      result, strerror(-result));
758 
759                 if (result == OK) {
760                     sp<RefBase> obj;
761                     CHECK(msg->findObject("response", &obj));
762                     sp<ARTSPResponse> response =
763                         static_cast<ARTSPResponse *>(obj.get());
764 
765                     if (response->mStatusCode != 200) {
766                         result = UNKNOWN_ERROR;
767                     } else {
768                         parsePlayResponse(response);
769 
770                         sp<AMessage> timeout = new AMessage('tiou', id());
771                         mCheckTimeoutGeneration++;
772                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
773                         timeout->post(kStartupTimeoutUs);
774                     }
775                 }
776 
777                 if (result != OK) {
778                     sp<AMessage> reply = new AMessage('disc', id());
779                     mConn->disconnect(reply);
780                 }
781 
782                 break;
783             }
784 
785             case 'aliv':
786             {
787                 int32_t generation;
788                 CHECK(msg->findInt32("generation", &generation));
789 
790                 if (generation != mKeepAliveGeneration) {
791                     // obsolete event.
792                     break;
793                 }
794 
795                 AString request;
796                 request.append("OPTIONS ");
797                 request.append(mSessionURL);
798                 request.append(" RTSP/1.0\r\n");
799                 request.append("Session: ");
800                 request.append(mSessionID);
801                 request.append("\r\n");
802                 request.append("\r\n");
803 
804                 sp<AMessage> reply = new AMessage('opts', id());
805                 reply->setInt32("generation", mKeepAliveGeneration);
806                 mConn->sendRequest(request.c_str(), reply);
807                 break;
808             }
809 
810             case 'opts':
811             {
812                 int32_t result;
813                 CHECK(msg->findInt32("result", &result));
814 
815                 ALOGI("OPTIONS completed with result %d (%s)",
816                      result, strerror(-result));
817 
818                 int32_t generation;
819                 CHECK(msg->findInt32("generation", &generation));
820 
821                 if (generation != mKeepAliveGeneration) {
822                     // obsolete event.
823                     break;
824                 }
825 
826                 postKeepAlive();
827                 break;
828             }
829 
830             case 'abor':
831             {
832                 for (size_t i = 0; i < mTracks.size(); ++i) {
833                     TrackInfo *info = &mTracks.editItemAt(i);
834 
835                     if (!mFirstAccessUnit) {
836                         postQueueEOS(i, ERROR_END_OF_STREAM);
837                     }
838 
839                     if (!info->mUsingInterleavedTCP) {
840                         mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
841 
842                         // Clear the tag
843                         if (mUIDValid) {
844                             HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
845                             HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
846                         }
847 
848                         close(info->mRTPSocket);
849                         close(info->mRTCPSocket);
850                     }
851                 }
852                 mTracks.clear();
853                 mSetupTracksSuccessful = false;
854                 mSeekPending = false;
855                 mFirstAccessUnit = true;
856                 mAllTracksHaveTime = false;
857                 mNTPAnchorUs = -1;
858                 mMediaAnchorUs = -1;
859                 mNumAccessUnitsReceived = 0;
860                 mReceivedFirstRTCPPacket = false;
861                 mReceivedFirstRTPPacket = false;
862                 mPausing = false;
863                 mSeekable = true;
864 
865                 sp<AMessage> reply = new AMessage('tear', id());
866 
867                 int32_t reconnect;
868                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
869                     reply->setInt32("reconnect", true);
870                 }
871 
872                 AString request;
873                 request = "TEARDOWN ";
874 
875                 // XXX should use aggregate url from SDP here...
876                 request.append(mSessionURL);
877                 request.append(" RTSP/1.0\r\n");
878 
879                 request.append("Session: ");
880                 request.append(mSessionID);
881                 request.append("\r\n");
882 
883                 request.append("\r\n");
884 
885                 mConn->sendRequest(request.c_str(), reply);
886                 break;
887             }
888 
889             case 'tear':
890             {
891                 int32_t result;
892                 CHECK(msg->findInt32("result", &result));
893 
894                 ALOGI("TEARDOWN completed with result %d (%s)",
895                      result, strerror(-result));
896 
897                 sp<AMessage> reply = new AMessage('disc', id());
898 
899                 int32_t reconnect;
900                 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
901                     reply->setInt32("reconnect", true);
902                 }
903 
904                 mConn->disconnect(reply);
905                 break;
906             }
907 
908             case 'quit':
909             {
910                 sp<AMessage> msg = mNotify->dup();
911                 msg->setInt32("what", kWhatDisconnected);
912                 msg->setInt32("result", UNKNOWN_ERROR);
913                 msg->post();
914                 break;
915             }
916 
917             case 'chek':
918             {
919                 int32_t generation;
920                 CHECK(msg->findInt32("generation", &generation));
921                 if (generation != mCheckGeneration) {
922                     // This is an outdated message. Ignore.
923                     break;
924                 }
925 
926                 if (mNumAccessUnitsReceived == 0) {
927 #if 1
928                     ALOGI("stream ended? aborting.");
929                     (new AMessage('abor', id()))->post();
930                     break;
931 #else
932                     ALOGI("haven't seen an AU in a looong time.");
933 #endif
934                 }
935 
936                 mNumAccessUnitsReceived = 0;
937                 msg->post(kAccessUnitTimeoutUs);
938                 break;
939             }
940 
941             case 'accu':
942             {
943                 int32_t timeUpdate;
944                 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
945                     size_t trackIndex;
946                     CHECK(msg->findSize("track-index", &trackIndex));
947 
948                     uint32_t rtpTime;
949                     uint64_t ntpTime;
950                     CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
951                     CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
952 
953                     onTimeUpdate(trackIndex, rtpTime, ntpTime);
954                     break;
955                 }
956 
957                 int32_t first;
958                 if (msg->findInt32("first-rtcp", &first)) {
959                     mReceivedFirstRTCPPacket = true;
960                     break;
961                 }
962 
963                 if (msg->findInt32("first-rtp", &first)) {
964                     mReceivedFirstRTPPacket = true;
965                     break;
966                 }
967 
968                 ++mNumAccessUnitsReceived;
969                 postAccessUnitTimeoutCheck();
970 
971                 size_t trackIndex;
972                 CHECK(msg->findSize("track-index", &trackIndex));
973 
974                 if (trackIndex >= mTracks.size()) {
975                     ALOGV("late packets ignored.");
976                     break;
977                 }
978 
979                 TrackInfo *track = &mTracks.editItemAt(trackIndex);
980 
981                 int32_t eos;
982                 if (msg->findInt32("eos", &eos)) {
983                     ALOGI("received BYE on track index %d", trackIndex);
984                     if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
985                         ALOGI("No time established => fake existing data");
986 
987                         track->mEOSReceived = true;
988                         mTryFakeRTCP = true;
989                         mReceivedFirstRTCPPacket = true;
990                         fakeTimestamps();
991                     } else {
992                         postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
993                     }
994                     return;
995                 }
996 
997                 sp<ABuffer> accessUnit;
998                 CHECK(msg->findBuffer("access-unit", &accessUnit));
999 
1000                 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1001 
1002                 if (mSeekPending) {
1003                     ALOGV("we're seeking, dropping stale packet.");
1004                     break;
1005                 }
1006 
1007                 if (seqNum < track->mFirstSeqNumInSegment) {
1008                     ALOGV("dropping stale access-unit (%d < %d)",
1009                          seqNum, track->mFirstSeqNumInSegment);
1010                     break;
1011                 }
1012 
1013                 if (track->mNewSegment) {
1014                     track->mNewSegment = false;
1015                 }
1016 
1017                 onAccessUnitComplete(trackIndex, accessUnit);
1018                 break;
1019             }
1020 
1021             case 'paus':
1022             {
1023                 int32_t generation;
1024                 CHECK(msg->findInt32("pausecheck", &generation));
1025                 if (generation != mPauseGeneration) {
1026                     ALOGV("Ignoring outdated pause message.");
1027                     break;
1028                 }
1029 
1030                 if (!mSeekable) {
1031                     ALOGW("This is a live stream, ignoring pause request.");
1032                     break;
1033                 }
1034                 mCheckPending = true;
1035                 ++mCheckGeneration;
1036                 mPausing = true;
1037 
1038                 AString request = "PAUSE ";
1039                 request.append(mControlURL);
1040                 request.append(" RTSP/1.0\r\n");
1041 
1042                 request.append("Session: ");
1043                 request.append(mSessionID);
1044                 request.append("\r\n");
1045 
1046                 request.append("\r\n");
1047 
1048                 sp<AMessage> reply = new AMessage('pau2', id());
1049                 mConn->sendRequest(request.c_str(), reply);
1050                 break;
1051             }
1052 
1053             case 'pau2':
1054             {
1055                 int32_t result;
1056                 CHECK(msg->findInt32("result", &result));
1057                 mCheckTimeoutGeneration++;
1058 
1059                 ALOGI("PAUSE completed with result %d (%s)",
1060                      result, strerror(-result));
1061                 break;
1062             }
1063 
1064             case 'resu':
1065             {
1066                 if (mPausing && mSeekPending) {
1067                     // If seeking, Play will be sent from see1 instead
1068                     break;
1069                 }
1070 
1071                 if (!mPausing) {
1072                     // Dont send PLAY if we have not paused
1073                     break;
1074                 }
1075                 AString request = "PLAY ";
1076                 request.append(mControlURL);
1077                 request.append(" RTSP/1.0\r\n");
1078 
1079                 request.append("Session: ");
1080                 request.append(mSessionID);
1081                 request.append("\r\n");
1082 
1083                 request.append("\r\n");
1084 
1085                 sp<AMessage> reply = new AMessage('res2', id());
1086                 mConn->sendRequest(request.c_str(), reply);
1087                 break;
1088             }
1089 
1090             case 'res2':
1091             {
1092                 int32_t result;
1093                 CHECK(msg->findInt32("result", &result));
1094 
1095                 ALOGI("PLAY completed with result %d (%s)",
1096                      result, strerror(-result));
1097 
1098                 mCheckPending = false;
1099                 postAccessUnitTimeoutCheck();
1100 
1101                 if (result == OK) {
1102                     sp<RefBase> obj;
1103                     CHECK(msg->findObject("response", &obj));
1104                     sp<ARTSPResponse> response =
1105                         static_cast<ARTSPResponse *>(obj.get());
1106 
1107                     if (response->mStatusCode != 200) {
1108                         result = UNKNOWN_ERROR;
1109                     } else {
1110                         parsePlayResponse(response);
1111 
1112                         // Post new timeout in order to make sure to use
1113                         // fake timestamps if no new Sender Reports arrive
1114                         sp<AMessage> timeout = new AMessage('tiou', id());
1115                         mCheckTimeoutGeneration++;
1116                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1117                         timeout->post(kStartupTimeoutUs);
1118                     }
1119                 }
1120 
1121                 if (result != OK) {
1122                     ALOGE("resume failed, aborting.");
1123                     (new AMessage('abor', id()))->post();
1124                 }
1125 
1126                 mPausing = false;
1127                 break;
1128             }
1129 
1130             case 'seek':
1131             {
1132                 if (!mSeekable) {
1133                     ALOGW("This is a live stream, ignoring seek request.");
1134 
1135                     sp<AMessage> msg = mNotify->dup();
1136                     msg->setInt32("what", kWhatSeekDone);
1137                     msg->post();
1138                     break;
1139                 }
1140 
1141                 int64_t timeUs;
1142                 CHECK(msg->findInt64("time", &timeUs));
1143 
1144                 mSeekPending = true;
1145 
1146                 // Disable the access unit timeout until we resumed
1147                 // playback again.
1148                 mCheckPending = true;
1149                 ++mCheckGeneration;
1150 
1151                 sp<AMessage> reply = new AMessage('see1', id());
1152                 reply->setInt64("time", timeUs);
1153 
1154                 if (mPausing) {
1155                     // PAUSE already sent
1156                     ALOGI("Pause already sent");
1157                     reply->post();
1158                     break;
1159                 }
1160                 AString request = "PAUSE ";
1161                 request.append(mControlURL);
1162                 request.append(" RTSP/1.0\r\n");
1163 
1164                 request.append("Session: ");
1165                 request.append(mSessionID);
1166                 request.append("\r\n");
1167 
1168                 request.append("\r\n");
1169 
1170                 mConn->sendRequest(request.c_str(), reply);
1171                 break;
1172             }
1173 
1174             case 'see1':
1175             {
1176                 // Session is paused now.
1177                 for (size_t i = 0; i < mTracks.size(); ++i) {
1178                     TrackInfo *info = &mTracks.editItemAt(i);
1179 
1180                     postQueueSeekDiscontinuity(i);
1181                     info->mEOSReceived = false;
1182 
1183                     info->mRTPAnchor = 0;
1184                     info->mNTPAnchorUs = -1;
1185                 }
1186 
1187                 mAllTracksHaveTime = false;
1188                 mNTPAnchorUs = -1;
1189 
1190                 // Start new timeoutgeneration to avoid getting timeout
1191                 // before PLAY response arrive
1192                 sp<AMessage> timeout = new AMessage('tiou', id());
1193                 mCheckTimeoutGeneration++;
1194                 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1195                 timeout->post(kStartupTimeoutUs);
1196 
1197                 int64_t timeUs;
1198                 CHECK(msg->findInt64("time", &timeUs));
1199 
1200                 AString request = "PLAY ";
1201                 request.append(mControlURL);
1202                 request.append(" RTSP/1.0\r\n");
1203 
1204                 request.append("Session: ");
1205                 request.append(mSessionID);
1206                 request.append("\r\n");
1207 
1208                 request.append(
1209                         StringPrintf(
1210                             "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1211 
1212                 request.append("\r\n");
1213 
1214                 sp<AMessage> reply = new AMessage('see2', id());
1215                 mConn->sendRequest(request.c_str(), reply);
1216                 break;
1217             }
1218 
1219             case 'see2':
1220             {
1221                 if (mTracks.size() == 0) {
1222                     // We have already hit abor, break
1223                     break;
1224                 }
1225 
1226                 int32_t result;
1227                 CHECK(msg->findInt32("result", &result));
1228 
1229                 ALOGI("PLAY completed with result %d (%s)",
1230                      result, strerror(-result));
1231 
1232                 mCheckPending = false;
1233                 postAccessUnitTimeoutCheck();
1234 
1235                 if (result == OK) {
1236                     sp<RefBase> obj;
1237                     CHECK(msg->findObject("response", &obj));
1238                     sp<ARTSPResponse> response =
1239                         static_cast<ARTSPResponse *>(obj.get());
1240 
1241                     if (response->mStatusCode != 200) {
1242                         result = UNKNOWN_ERROR;
1243                     } else {
1244                         parsePlayResponse(response);
1245 
1246                         // Post new timeout in order to make sure to use
1247                         // fake timestamps if no new Sender Reports arrive
1248                         sp<AMessage> timeout = new AMessage('tiou', id());
1249                         mCheckTimeoutGeneration++;
1250                         timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1251                         timeout->post(kStartupTimeoutUs);
1252 
1253                         ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1254                         CHECK_GE(i, 0);
1255 
1256                         ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1257 
1258                         ALOGI("seek completed.");
1259                     }
1260                 }
1261 
1262                 if (result != OK) {
1263                     ALOGE("seek failed, aborting.");
1264                     (new AMessage('abor', id()))->post();
1265                 }
1266 
1267                 mPausing = false;
1268                 mSeekPending = false;
1269 
1270                 sp<AMessage> msg = mNotify->dup();
1271                 msg->setInt32("what", kWhatSeekDone);
1272                 msg->post();
1273                 break;
1274             }
1275 
1276             case 'biny':
1277             {
1278                 sp<ABuffer> buffer;
1279                 CHECK(msg->findBuffer("buffer", &buffer));
1280 
1281                 int32_t index;
1282                 CHECK(buffer->meta()->findInt32("index", &index));
1283 
1284                 mRTPConn->injectPacket(index, buffer);
1285                 break;
1286             }
1287 
1288             case 'tiou':
1289             {
1290                 int32_t timeoutGenerationCheck;
1291                 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1292                 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1293                     // This is an outdated message. Ignore.
1294                     // This typically happens if a lot of seeks are
1295                     // performed, since new timeout messages now are
1296                     // posted at seek as well.
1297                     break;
1298                 }
1299                 if (!mReceivedFirstRTCPPacket) {
1300                     if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1301                         ALOGW("We received RTP packets but no RTCP packets, "
1302                              "using fake timestamps.");
1303 
1304                         mTryFakeRTCP = true;
1305 
1306                         mReceivedFirstRTCPPacket = true;
1307 
1308                         fakeTimestamps();
1309                     } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1310                         ALOGW("Never received any data, switching transports.");
1311 
1312                         mTryTCPInterleaving = true;
1313 
1314                         sp<AMessage> msg = new AMessage('abor', id());
1315                         msg->setInt32("reconnect", true);
1316                         msg->post();
1317                     } else {
1318                         ALOGW("Never received any data, disconnecting.");
1319                         (new AMessage('abor', id()))->post();
1320                     }
1321                 } else {
1322                     if (!mAllTracksHaveTime) {
1323                         ALOGW("We received some RTCP packets, but time "
1324                               "could not be established on all tracks, now "
1325                               "using fake timestamps");
1326 
1327                         fakeTimestamps();
1328                     }
1329                 }
1330                 break;
1331             }
1332 
1333             default:
1334                 TRESPASS();
1335                 break;
1336         }
1337     }
1338 
postKeepAliveMyHandler1339     void postKeepAlive() {
1340         sp<AMessage> msg = new AMessage('aliv', id());
1341         msg->setInt32("generation", mKeepAliveGeneration);
1342         msg->post((mKeepAliveTimeoutUs * 9) / 10);
1343     }
1344 
postAccessUnitTimeoutCheckMyHandler1345     void postAccessUnitTimeoutCheck() {
1346         if (mCheckPending) {
1347             return;
1348         }
1349 
1350         mCheckPending = true;
1351         sp<AMessage> check = new AMessage('chek', id());
1352         check->setInt32("generation", mCheckGeneration);
1353         check->post(kAccessUnitTimeoutUs);
1354     }
1355 
SplitStringMyHandler1356     static void SplitString(
1357             const AString &s, const char *separator, List<AString> *items) {
1358         items->clear();
1359         size_t start = 0;
1360         while (start < s.size()) {
1361             ssize_t offset = s.find(separator, start);
1362 
1363             if (offset < 0) {
1364                 items->push_back(AString(s, start, s.size() - start));
1365                 break;
1366             }
1367 
1368             items->push_back(AString(s, start, offset - start));
1369             start = offset + strlen(separator);
1370         }
1371     }
1372 
parsePlayResponseMyHandler1373     void parsePlayResponse(const sp<ARTSPResponse> &response) {
1374         if (mTracks.size() == 0) {
1375             ALOGV("parsePlayResponse: late packets ignored.");
1376             return;
1377         }
1378 
1379         ssize_t i = response->mHeaders.indexOfKey("range");
1380         if (i < 0) {
1381             // Server doesn't even tell use what range it is going to
1382             // play, therefore we won't support seeking.
1383             return;
1384         }
1385 
1386         AString range = response->mHeaders.valueAt(i);
1387         ALOGV("Range: %s", range.c_str());
1388 
1389         AString val;
1390         CHECK(GetAttribute(range.c_str(), "npt", &val));
1391 
1392         float npt1, npt2;
1393         if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1394             // This is a live stream and therefore not seekable.
1395 
1396             ALOGI("This is a live stream");
1397             return;
1398         }
1399 
1400         i = response->mHeaders.indexOfKey("rtp-info");
1401         CHECK_GE(i, 0);
1402 
1403         AString rtpInfo = response->mHeaders.valueAt(i);
1404         List<AString> streamInfos;
1405         SplitString(rtpInfo, ",", &streamInfos);
1406 
1407         int n = 1;
1408         for (List<AString>::iterator it = streamInfos.begin();
1409              it != streamInfos.end(); ++it) {
1410             (*it).trim();
1411             ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1412 
1413             CHECK(GetAttribute((*it).c_str(), "url", &val));
1414 
1415             size_t trackIndex = 0;
1416             while (trackIndex < mTracks.size()
1417                     && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1418                 ++trackIndex;
1419             }
1420             CHECK_LT(trackIndex, mTracks.size());
1421 
1422             CHECK(GetAttribute((*it).c_str(), "seq", &val));
1423 
1424             char *end;
1425             unsigned long seq = strtoul(val.c_str(), &end, 10);
1426 
1427             TrackInfo *info = &mTracks.editItemAt(trackIndex);
1428             info->mFirstSeqNumInSegment = seq;
1429             info->mNewSegment = true;
1430 
1431             CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1432 
1433             uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1434 
1435             ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1436 
1437             info->mNormalPlayTimeRTP = rtpTime;
1438             info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1439 
1440             if (!mFirstAccessUnit) {
1441                 postNormalPlayTimeMapping(
1442                         trackIndex,
1443                         info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1444             }
1445 
1446             ++n;
1447         }
1448     }
1449 
getTrackFormatMyHandler1450     sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1451         CHECK_GE(index, 0u);
1452         CHECK_LT(index, mTracks.size());
1453 
1454         const TrackInfo &info = mTracks.itemAt(index);
1455 
1456         *timeScale = info.mTimeScale;
1457 
1458         return info.mPacketSource->getFormat();
1459     }
1460 
countTracksMyHandler1461     size_t countTracks() const {
1462         return mTracks.size();
1463     }
1464 
1465 private:
1466     struct TrackInfo {
1467         AString mURL;
1468         int mRTPSocket;
1469         int mRTCPSocket;
1470         bool mUsingInterleavedTCP;
1471         uint32_t mFirstSeqNumInSegment;
1472         bool mNewSegment;
1473 
1474         uint32_t mRTPAnchor;
1475         int64_t mNTPAnchorUs;
1476         int32_t mTimeScale;
1477         bool mEOSReceived;
1478 
1479         uint32_t mNormalPlayTimeRTP;
1480         int64_t mNormalPlayTimeUs;
1481 
1482         sp<APacketSource> mPacketSource;
1483 
1484         // Stores packets temporarily while no notion of time
1485         // has been established yet.
1486         List<sp<ABuffer> > mPackets;
1487     };
1488 
1489     sp<AMessage> mNotify;
1490     bool mUIDValid;
1491     uid_t mUID;
1492     sp<ALooper> mNetLooper;
1493     sp<ARTSPConnection> mConn;
1494     sp<ARTPConnection> mRTPConn;
1495     sp<ASessionDescription> mSessionDesc;
1496     AString mOriginalSessionURL;  // This one still has user:pass@
1497     AString mSessionURL;
1498     AString mSessionHost;
1499     AString mBaseURL;
1500     AString mControlURL;
1501     AString mSessionID;
1502     bool mSetupTracksSuccessful;
1503     bool mSeekPending;
1504     bool mFirstAccessUnit;
1505 
1506     bool mAllTracksHaveTime;
1507     int64_t mNTPAnchorUs;
1508     int64_t mMediaAnchorUs;
1509     int64_t mLastMediaTimeUs;
1510 
1511     int64_t mNumAccessUnitsReceived;
1512     bool mCheckPending;
1513     int32_t mCheckGeneration;
1514     int32_t mCheckTimeoutGeneration;
1515     bool mTryTCPInterleaving;
1516     bool mTryFakeRTCP;
1517     bool mReceivedFirstRTCPPacket;
1518     bool mReceivedFirstRTPPacket;
1519     bool mSeekable;
1520     int64_t mKeepAliveTimeoutUs;
1521     int32_t mKeepAliveGeneration;
1522     bool mPausing;
1523     int32_t mPauseGeneration;
1524 
1525     Vector<TrackInfo> mTracks;
1526 
setupTrackMyHandler1527     void setupTrack(size_t index) {
1528         sp<APacketSource> source =
1529             new APacketSource(mSessionDesc, index);
1530 
1531         if (source->initCheck() != OK) {
1532             ALOGW("Unsupported format. Ignoring track #%d.", index);
1533 
1534             sp<AMessage> reply = new AMessage('setu', id());
1535             reply->setSize("index", index);
1536             reply->setInt32("result", ERROR_UNSUPPORTED);
1537             reply->post();
1538             return;
1539         }
1540 
1541         AString url;
1542         CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1543 
1544         AString trackURL;
1545         CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1546 
1547         mTracks.push(TrackInfo());
1548         TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1549         info->mURL = trackURL;
1550         info->mPacketSource = source;
1551         info->mUsingInterleavedTCP = false;
1552         info->mFirstSeqNumInSegment = 0;
1553         info->mNewSegment = true;
1554         info->mRTPAnchor = 0;
1555         info->mNTPAnchorUs = -1;
1556         info->mNormalPlayTimeRTP = 0;
1557         info->mNormalPlayTimeUs = 0ll;
1558 
1559         unsigned long PT;
1560         AString formatDesc;
1561         AString formatParams;
1562         mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1563 
1564         int32_t timescale;
1565         int32_t numChannels;
1566         ASessionDescription::ParseFormatDesc(
1567                 formatDesc.c_str(), &timescale, &numChannels);
1568 
1569         info->mTimeScale = timescale;
1570         info->mEOSReceived = false;
1571 
1572         ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1573 
1574         AString request = "SETUP ";
1575         request.append(trackURL);
1576         request.append(" RTSP/1.0\r\n");
1577 
1578         if (mTryTCPInterleaving) {
1579             size_t interleaveIndex = 2 * (mTracks.size() - 1);
1580             info->mUsingInterleavedTCP = true;
1581             info->mRTPSocket = interleaveIndex;
1582             info->mRTCPSocket = interleaveIndex + 1;
1583 
1584             request.append("Transport: RTP/AVP/TCP;interleaved=");
1585             request.append(interleaveIndex);
1586             request.append("-");
1587             request.append(interleaveIndex + 1);
1588         } else {
1589             unsigned rtpPort;
1590             ARTPConnection::MakePortPair(
1591                     &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1592 
1593             if (mUIDValid) {
1594                 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1595                                                 (uint32_t)*(uint32_t*) "RTP_");
1596                 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1597                                                 (uint32_t)*(uint32_t*) "RTP_");
1598             }
1599 
1600             request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1601             request.append(rtpPort);
1602             request.append("-");
1603             request.append(rtpPort + 1);
1604         }
1605 
1606         request.append("\r\n");
1607 
1608         if (index > 1) {
1609             request.append("Session: ");
1610             request.append(mSessionID);
1611             request.append("\r\n");
1612         }
1613 
1614         request.append("\r\n");
1615 
1616         sp<AMessage> reply = new AMessage('setu', id());
1617         reply->setSize("index", index);
1618         reply->setSize("track-index", mTracks.size() - 1);
1619         mConn->sendRequest(request.c_str(), reply);
1620     }
1621 
MakeURLMyHandler1622     static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1623         out->clear();
1624 
1625         if (strncasecmp("rtsp://", baseURL, 7)) {
1626             // Base URL must be absolute
1627             return false;
1628         }
1629 
1630         if (!strncasecmp("rtsp://", url, 7)) {
1631             // "url" is already an absolute URL, ignore base URL.
1632             out->setTo(url);
1633             return true;
1634         }
1635 
1636         size_t n = strlen(baseURL);
1637         if (baseURL[n - 1] == '/') {
1638             out->setTo(baseURL);
1639             out->append(url);
1640         } else {
1641             const char *slashPos = strrchr(baseURL, '/');
1642 
1643             if (slashPos > &baseURL[6]) {
1644                 out->setTo(baseURL, slashPos - baseURL);
1645             } else {
1646                 out->setTo(baseURL);
1647             }
1648 
1649             out->append("/");
1650             out->append(url);
1651         }
1652 
1653         return true;
1654     }
1655 
fakeTimestampsMyHandler1656     void fakeTimestamps() {
1657         mNTPAnchorUs = -1ll;
1658         for (size_t i = 0; i < mTracks.size(); ++i) {
1659             onTimeUpdate(i, 0, 0ll);
1660         }
1661     }
1662 
dataReceivedOnAllChannelsMyHandler1663     bool dataReceivedOnAllChannels() {
1664         TrackInfo *track;
1665         for (size_t i = 0; i < mTracks.size(); ++i) {
1666             track = &mTracks.editItemAt(i);
1667             if (track->mPackets.empty()) {
1668                 return false;
1669             }
1670         }
1671         return true;
1672     }
1673 
onTimeUpdateMyHandler1674     void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1675         ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
1676              trackIndex, rtpTime, ntpTime);
1677 
1678         int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1679 
1680         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1681 
1682         track->mRTPAnchor = rtpTime;
1683         track->mNTPAnchorUs = ntpTimeUs;
1684 
1685         if (mNTPAnchorUs < 0) {
1686             mNTPAnchorUs = ntpTimeUs;
1687             mMediaAnchorUs = mLastMediaTimeUs;
1688         }
1689 
1690         if (!mAllTracksHaveTime) {
1691             bool allTracksHaveTime = true;
1692             for (size_t i = 0; i < mTracks.size(); ++i) {
1693                 TrackInfo *track = &mTracks.editItemAt(i);
1694                 if (track->mNTPAnchorUs < 0) {
1695                     allTracksHaveTime = false;
1696                     break;
1697                 }
1698             }
1699             if (allTracksHaveTime) {
1700                 mAllTracksHaveTime = true;
1701                 ALOGI("Time now established for all tracks.");
1702             }
1703         }
1704         if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1705             // Time is now established, lets start timestamping immediately
1706             for (size_t i = 0; i < mTracks.size(); ++i) {
1707                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1708                 while (!trackInfo->mPackets.empty()) {
1709                     sp<ABuffer> accessUnit = *trackInfo->mPackets.begin();
1710                     trackInfo->mPackets.erase(trackInfo->mPackets.begin());
1711 
1712                     if (addMediaTimestamp(i, trackInfo, accessUnit)) {
1713                         postQueueAccessUnit(i, accessUnit);
1714                     }
1715                 }
1716             }
1717             for (size_t i = 0; i < mTracks.size(); ++i) {
1718                 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1719                 if (trackInfo->mEOSReceived) {
1720                     postQueueEOS(i, ERROR_END_OF_STREAM);
1721                     trackInfo->mEOSReceived = false;
1722                 }
1723             }
1724         }
1725     }
1726 
onAccessUnitCompleteMyHandler1727     void onAccessUnitComplete(
1728             int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1729         ALOGV("onAccessUnitComplete track %d", trackIndex);
1730 
1731         if (mFirstAccessUnit) {
1732             sp<AMessage> msg = mNotify->dup();
1733             msg->setInt32("what", kWhatConnected);
1734             msg->post();
1735 
1736             if (mSeekable) {
1737                 for (size_t i = 0; i < mTracks.size(); ++i) {
1738                     TrackInfo *info = &mTracks.editItemAt(i);
1739 
1740                     postNormalPlayTimeMapping(
1741                             i,
1742                             info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1743                 }
1744             }
1745 
1746             mFirstAccessUnit = false;
1747         }
1748 
1749         TrackInfo *track = &mTracks.editItemAt(trackIndex);
1750 
1751         if (!mAllTracksHaveTime) {
1752             ALOGV("storing accessUnit, no time established yet");
1753             track->mPackets.push_back(accessUnit);
1754             return;
1755         }
1756 
1757         while (!track->mPackets.empty()) {
1758             sp<ABuffer> accessUnit = *track->mPackets.begin();
1759             track->mPackets.erase(track->mPackets.begin());
1760 
1761             if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1762                 postQueueAccessUnit(trackIndex, accessUnit);
1763             }
1764         }
1765 
1766         if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1767             postQueueAccessUnit(trackIndex, accessUnit);
1768         }
1769 
1770         if (track->mEOSReceived) {
1771             postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1772             track->mEOSReceived = false;
1773         }
1774     }
1775 
addMediaTimestampMyHandler1776     bool addMediaTimestamp(
1777             int32_t trackIndex, const TrackInfo *track,
1778             const sp<ABuffer> &accessUnit) {
1779         uint32_t rtpTime;
1780         CHECK(accessUnit->meta()->findInt32(
1781                     "rtp-time", (int32_t *)&rtpTime));
1782 
1783         int64_t relRtpTimeUs =
1784             (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1785                 / track->mTimeScale;
1786 
1787         int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1788 
1789         int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1790 
1791         if (mediaTimeUs > mLastMediaTimeUs) {
1792             mLastMediaTimeUs = mediaTimeUs;
1793         }
1794 
1795         if (mediaTimeUs < 0) {
1796             ALOGV("dropping early accessUnit.");
1797             return false;
1798         }
1799 
1800         ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
1801              trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
1802 
1803         accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1804 
1805         return true;
1806     }
1807 
postQueueAccessUnitMyHandler1808     void postQueueAccessUnit(
1809             size_t trackIndex, const sp<ABuffer> &accessUnit) {
1810         sp<AMessage> msg = mNotify->dup();
1811         msg->setInt32("what", kWhatAccessUnit);
1812         msg->setSize("trackIndex", trackIndex);
1813         msg->setBuffer("accessUnit", accessUnit);
1814         msg->post();
1815     }
1816 
postQueueEOSMyHandler1817     void postQueueEOS(size_t trackIndex, status_t finalResult) {
1818         sp<AMessage> msg = mNotify->dup();
1819         msg->setInt32("what", kWhatEOS);
1820         msg->setSize("trackIndex", trackIndex);
1821         msg->setInt32("finalResult", finalResult);
1822         msg->post();
1823     }
1824 
postQueueSeekDiscontinuityMyHandler1825     void postQueueSeekDiscontinuity(size_t trackIndex) {
1826         sp<AMessage> msg = mNotify->dup();
1827         msg->setInt32("what", kWhatSeekDiscontinuity);
1828         msg->setSize("trackIndex", trackIndex);
1829         msg->post();
1830     }
1831 
postNormalPlayTimeMappingMyHandler1832     void postNormalPlayTimeMapping(
1833             size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1834         sp<AMessage> msg = mNotify->dup();
1835         msg->setInt32("what", kWhatNormalPlayTimeMapping);
1836         msg->setSize("trackIndex", trackIndex);
1837         msg->setInt32("rtpTime", rtpTime);
1838         msg->setInt64("nptUs", nptUs);
1839         msg->post();
1840     }
1841 
1842     DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1843 };
1844 
1845 }  // namespace android
1846 
1847 #endif  // MY_HANDLER_H_
1848