1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef MY_HANDLER_H_
18
19 #define MY_HANDLER_H_
20
21 //#define LOG_NDEBUG 0
22 #define LOG_TAG "MyHandler"
23 #include <utils/Log.h>
24
25 #include "APacketSource.h"
26 #include "ARTPConnection.h"
27 #include "ARTSPConnection.h"
28 #include "ASessionDescription.h"
29
30 #include <ctype.h>
31
32 #include <media/stagefright/foundation/ABuffer.h>
33 #include <media/stagefright/foundation/ADebug.h>
34 #include <media/stagefright/foundation/ALooper.h>
35 #include <media/stagefright/foundation/AMessage.h>
36 #include <media/stagefright/MediaErrors.h>
37 #include <media/stagefright/Utils.h>
38
39 #include <arpa/inet.h>
40 #include <sys/socket.h>
41 #include <netdb.h>
42
43 #include "HTTPBase.h"
44
// All durations below are expressed in microseconds.

// Interval of the access unit watchdog: if no access units are received
// within 10 secs, assume that the rtp stream has ended and abort the session.
static int64_t kAccessUnitTimeoutUs = 10 * 1000000ll;

// If no access units arrive for the first 10 secs after starting the
// stream, assume none ever will and signal EOS or switch transports.
static int64_t kStartupTimeoutUs = 10 * 1000000ll;

// Keep-alive timeout used unless the server's "Session" header carries an
// acceptable "timeout" option.
static int64_t kDefaultKeepAliveTimeoutUs = 60 * 1000000ll;

// Delay with which pause() posts its 'paus' message, so that a seek() or
// resume() issued in the meantime can invalidate it.
static int64_t kPauseDelayUs = 3 * 1000000ll;
56
57 namespace android {
58
GetAttribute(const char * s,const char * key,AString * value)59 static bool GetAttribute(const char *s, const char *key, AString *value) {
60 value->clear();
61
62 size_t keyLen = strlen(key);
63
64 for (;;) {
65 while (isspace(*s)) {
66 ++s;
67 }
68
69 const char *colonPos = strchr(s, ';');
70
71 size_t len =
72 (colonPos == NULL) ? strlen(s) : colonPos - s;
73
74 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
75 value->setTo(&s[keyLen + 1], len - keyLen - 1);
76 return true;
77 }
78
79 if (colonPos == NULL) {
80 return false;
81 }
82
83 s = colonPos + 1;
84 }
85 }
86
87 struct MyHandler : public AHandler {
// Event codes. kWhatDisconnected and kWhatSeekDone are stored in the "what"
// field of a copy of mNotify before it is posted (see the 'quit', 'seek' and
// 'see2' cases of onMessageReceived).
// NOTE(review): the remaining values are not posted from the visible part of
// this file; presumably they are used the same way elsewhere - confirm.
88 enum {
// Same four-character code as the internal 'conn' reply message.
89 kWhatConnected = 'conn',
// Same four-character code as the internal 'disc' reply message; posted
// with "result" set to UNKNOWN_ERROR when the handler quits.
90 kWhatDisconnected = 'disc',
// Posted when a seek finished, failed, or was ignored for a live stream.
91 kWhatSeekDone = 'sdon',
92
93 kWhatAccessUnit = 'accU',
94 kWhatEOS = 'eos!',
95 kWhatSeekDiscontinuity = 'seeD',
96 kWhatNormalPlayTimeMapping = 'nptM',
97 };
98
99 MyHandler(
100 const char *url,
101 const sp<AMessage> ¬ify,
102 bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler103 : mNotify(notify),
104 mUIDValid(uidValid),
105 mUID(uid),
106 mNetLooper(new ALooper),
107 mConn(new ARTSPConnection(mUIDValid, mUID)),
108 mRTPConn(new ARTPConnection),
109 mOriginalSessionURL(url),
110 mSessionURL(url),
111 mSetupTracksSuccessful(false),
112 mSeekPending(false),
113 mFirstAccessUnit(true),
114 mAllTracksHaveTime(false),
115 mNTPAnchorUs(-1),
116 mMediaAnchorUs(-1),
117 mLastMediaTimeUs(0),
118 mNumAccessUnitsReceived(0),
119 mCheckPending(false),
120 mCheckGeneration(0),
121 mCheckTimeoutGeneration(0),
122 mTryTCPInterleaving(false),
123 mTryFakeRTCP(false),
124 mReceivedFirstRTCPPacket(false),
125 mReceivedFirstRTPPacket(false),
126 mSeekable(true),
127 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
128 mKeepAliveGeneration(0),
129 mPausing(false),
130 mPauseGeneration(0),
131 mPlayResponseParsed(false) {
132 mNetLooper->setName("rtsp net");
133 mNetLooper->start(false /* runOnCallingThread */,
134 false /* canCallJava */,
135 PRIORITY_HIGHEST);
136
137 // Strip any authentication info from the session url, we don't
138 // want to transmit user/pass in cleartext.
139 AString host, path, user, pass;
140 unsigned port;
141 CHECK(ARTSPConnection::ParseURL(
142 mSessionURL.c_str(), &host, &port, &path, &user, &pass));
143
144 if (user.size() > 0) {
145 mSessionURL.clear();
146 mSessionURL.append("rtsp://");
147 mSessionURL.append(host);
148 mSessionURL.append(":");
149 mSessionURL.append(StringPrintf("%u", port));
150 mSessionURL.append(path);
151
152 ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
153 }
154
155 mSessionHost = host;
156 }
157
connectMyHandler158 void connect() {
159 looper()->registerHandler(mConn);
160 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
161
162 sp<AMessage> notify = new AMessage('biny', id());
163 mConn->observeBinaryData(notify);
164
165 sp<AMessage> reply = new AMessage('conn', id());
166 mConn->connect(mOriginalSessionURL.c_str(), reply);
167 }
168
loadSDPMyHandler169 void loadSDP(const sp<ASessionDescription>& desc) {
170 looper()->registerHandler(mConn);
171 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
172
173 sp<AMessage> notify = new AMessage('biny', id());
174 mConn->observeBinaryData(notify);
175
176 sp<AMessage> reply = new AMessage('sdpl', id());
177 reply->setObject("description", desc);
178 mConn->connect(mOriginalSessionURL.c_str(), reply);
179 }
180
getControlURLMyHandler181 AString getControlURL(sp<ASessionDescription> desc) {
182 AString sessionLevelControlURL;
183 if (mSessionDesc->findAttribute(
184 0,
185 "a=control",
186 &sessionLevelControlURL)) {
187 if (sessionLevelControlURL.compare("*") == 0) {
188 return mBaseURL;
189 } else {
190 AString controlURL;
191 CHECK(MakeURL(
192 mBaseURL.c_str(),
193 sessionLevelControlURL.c_str(),
194 &controlURL));
195 return controlURL;
196 }
197 } else {
198 return mSessionURL;
199 }
200 }
201
disconnectMyHandler202 void disconnect() {
203 (new AMessage('abor', id()))->post();
204 }
205
seekMyHandler206 void seek(int64_t timeUs) {
207 sp<AMessage> msg = new AMessage('seek', id());
208 msg->setInt64("time", timeUs);
209 mPauseGeneration++;
210 msg->post();
211 }
212
isSeekableMyHandler213 bool isSeekable() const {
214 return mSeekable;
215 }
216
pauseMyHandler217 void pause() {
218 sp<AMessage> msg = new AMessage('paus', id());
219 mPauseGeneration++;
220 msg->setInt32("pausecheck", mPauseGeneration);
221 msg->post(kPauseDelayUs);
222 }
223
resumeMyHandler224 void resume() {
225 sp<AMessage> msg = new AMessage('resu', id());
226 mPauseGeneration++;
227 msg->post();
228 }
229
addRRMyHandler230 static void addRR(const sp<ABuffer> &buf) {
231 uint8_t *ptr = buf->data() + buf->size();
232 ptr[0] = 0x80 | 0;
233 ptr[1] = 201; // RR
234 ptr[2] = 0;
235 ptr[3] = 1;
236 ptr[4] = 0xde; // SSRC
237 ptr[5] = 0xad;
238 ptr[6] = 0xbe;
239 ptr[7] = 0xef;
240
241 buf->setRange(0, buf->size() + 8);
242 }
243
addSDESMyHandler244 static void addSDES(int s, const sp<ABuffer> &buffer) {
245 struct sockaddr_in addr;
246 socklen_t addrSize = sizeof(addr);
247 CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
248
249 uint8_t *data = buffer->data() + buffer->size();
250 data[0] = 0x80 | 1;
251 data[1] = 202; // SDES
252 data[4] = 0xde; // SSRC
253 data[5] = 0xad;
254 data[6] = 0xbe;
255 data[7] = 0xef;
256
257 size_t offset = 8;
258
259 data[offset++] = 1; // CNAME
260
261 AString cname = "stagefright@";
262 cname.append(inet_ntoa(addr.sin_addr));
263 data[offset++] = cname.size();
264
265 memcpy(&data[offset], cname.c_str(), cname.size());
266 offset += cname.size();
267
268 data[offset++] = 6; // TOOL
269
270 AString tool = MakeUserAgent();
271
272 data[offset++] = tool.size();
273
274 memcpy(&data[offset], tool.c_str(), tool.size());
275 offset += tool.size();
276
277 data[offset++] = 0;
278
279 if ((offset % 4) > 0) {
280 size_t count = 4 - (offset % 4);
281 switch (count) {
282 case 3:
283 data[offset++] = 0;
284 case 2:
285 data[offset++] = 0;
286 case 1:
287 data[offset++] = 0;
288 }
289 }
290
291 size_t numWords = (offset / 4) - 1;
292 data[2] = numWords >> 8;
293 data[3] = numWords & 0xff;
294
295 buffer->setRange(buffer->offset(), buffer->size() + offset);
296 }
297
298 // In case we're behind NAT, fire off two UDP packets to the remote
299 // rtp/rtcp ports to poke a hole into the firewall for future incoming
300 // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler301 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
302 struct sockaddr_in addr;
303 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
304 addr.sin_family = AF_INET;
305
306 AString source;
307 AString server_port;
308 if (!GetAttribute(transport.c_str(),
309 "source",
310 &source)) {
311 ALOGW("Missing 'source' field in Transport response. Using "
312 "RTSP endpoint address.");
313
314 struct hostent *ent = gethostbyname(mSessionHost.c_str());
315 if (ent == NULL) {
316 ALOGE("Failed to look up address of session host '%s'",
317 mSessionHost.c_str());
318
319 return false;
320 }
321
322 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
323 } else {
324 addr.sin_addr.s_addr = inet_addr(source.c_str());
325 }
326
327 if (!GetAttribute(transport.c_str(),
328 "server_port",
329 &server_port)) {
330 ALOGI("Missing 'server_port' field in Transport response.");
331 return false;
332 }
333
334 int rtpPort, rtcpPort;
335 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
336 || rtpPort <= 0 || rtpPort > 65535
337 || rtcpPort <=0 || rtcpPort > 65535
338 || rtcpPort != rtpPort + 1) {
339 ALOGE("Server picked invalid RTP/RTCP port pair %s,"
340 " RTP port must be even, RTCP port must be one higher.",
341 server_port.c_str());
342
343 return false;
344 }
345
346 if (rtpPort & 1) {
347 ALOGW("Server picked an odd RTP port, it should've picked an "
348 "even one, we'll let it pass for now, but this may break "
349 "in the future.");
350 }
351
352 if (addr.sin_addr.s_addr == INADDR_NONE) {
353 return true;
354 }
355
356 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
357 // No firewalls to traverse on the loopback interface.
358 return true;
359 }
360
361 // Make up an RR/SDES RTCP packet.
362 sp<ABuffer> buf = new ABuffer(65536);
363 buf->setRange(0, 0);
364 addRR(buf);
365 addSDES(rtpSocket, buf);
366
367 addr.sin_port = htons(rtpPort);
368
369 ssize_t n = sendto(
370 rtpSocket, buf->data(), buf->size(), 0,
371 (const sockaddr *)&addr, sizeof(addr));
372
373 if (n < (ssize_t)buf->size()) {
374 ALOGE("failed to poke a hole for RTP packets");
375 return false;
376 }
377
378 addr.sin_port = htons(rtcpPort);
379
380 n = sendto(
381 rtcpSocket, buf->data(), buf->size(), 0,
382 (const sockaddr *)&addr, sizeof(addr));
383
384 if (n < (ssize_t)buf->size()) {
385 ALOGE("failed to poke a hole for RTCP packets");
386 return false;
387 }
388
389 ALOGV("successfully poked holes.");
390
391 return true;
392 }
393
isLiveStreamMyHandler394 static bool isLiveStream(const sp<ASessionDescription> &desc) {
395 AString attrLiveStream;
396 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
397 ssize_t semicolonPos = attrLiveStream.find(";", 2);
398
399 const char* liveStreamValue;
400 if (semicolonPos < 0) {
401 liveStreamValue = attrLiveStream.c_str();
402 } else {
403 AString valString;
404 valString.setTo(attrLiveStream,
405 semicolonPos + 1,
406 attrLiveStream.size() - semicolonPos - 1);
407 liveStreamValue = valString.c_str();
408 }
409
410 uint32_t value = strtoul(liveStreamValue, NULL, 10);
411 if (value == 1) {
412 ALOGV("found live stream");
413 return true;
414 }
415 } else {
416 // It is a live stream if no duration is returned
417 int64_t durationUs;
418 if (!desc->getDurationUs(&durationUs)) {
419 ALOGV("No duration found, assume live stream");
420 return true;
421 }
422 }
423
424 return false;
425 }
426
onMessageReceivedMyHandler427 virtual void onMessageReceived(const sp<AMessage> &msg) {
428 switch (msg->what()) {
429 case 'conn':
430 {
431 int32_t result;
432 CHECK(msg->findInt32("result", &result));
433
434 ALOGI("connection request completed with result %d (%s)",
435 result, strerror(-result));
436
437 if (result == OK) {
438 AString request;
439 request = "DESCRIBE ";
440 request.append(mSessionURL);
441 request.append(" RTSP/1.0\r\n");
442 request.append("Accept: application/sdp\r\n");
443 request.append("\r\n");
444
445 sp<AMessage> reply = new AMessage('desc', id());
446 mConn->sendRequest(request.c_str(), reply);
447 } else {
448 (new AMessage('disc', id()))->post();
449 }
450 break;
451 }
452
453 case 'disc':
454 {
455 ++mKeepAliveGeneration;
456
457 int32_t reconnect;
458 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
459 sp<AMessage> reply = new AMessage('conn', id());
460 mConn->connect(mOriginalSessionURL.c_str(), reply);
461 } else {
462 (new AMessage('quit', id()))->post();
463 }
464 break;
465 }
466
467 case 'desc':
468 {
469 int32_t result;
470 CHECK(msg->findInt32("result", &result));
471
472 ALOGI("DESCRIBE completed with result %d (%s)",
473 result, strerror(-result));
474
475 if (result == OK) {
476 sp<RefBase> obj;
477 CHECK(msg->findObject("response", &obj));
478 sp<ARTSPResponse> response =
479 static_cast<ARTSPResponse *>(obj.get());
480
481 if (response->mStatusCode == 302) {
482 ssize_t i = response->mHeaders.indexOfKey("location");
483 CHECK_GE(i, 0);
484
485 mSessionURL = response->mHeaders.valueAt(i);
486
487 AString request;
488 request = "DESCRIBE ";
489 request.append(mSessionURL);
490 request.append(" RTSP/1.0\r\n");
491 request.append("Accept: application/sdp\r\n");
492 request.append("\r\n");
493
494 sp<AMessage> reply = new AMessage('desc', id());
495 mConn->sendRequest(request.c_str(), reply);
496 break;
497 }
498
499 if (response->mStatusCode != 200) {
500 result = UNKNOWN_ERROR;
501 } else if (response->mContent == NULL) {
502 result = ERROR_MALFORMED;
503 ALOGE("The response has no content.");
504 } else {
505 mSessionDesc = new ASessionDescription;
506
507 mSessionDesc->setTo(
508 response->mContent->data(),
509 response->mContent->size());
510
511 if (!mSessionDesc->isValid()) {
512 ALOGE("Failed to parse session description.");
513 result = ERROR_MALFORMED;
514 } else {
515 ssize_t i = response->mHeaders.indexOfKey("content-base");
516 if (i >= 0) {
517 mBaseURL = response->mHeaders.valueAt(i);
518 } else {
519 i = response->mHeaders.indexOfKey("content-location");
520 if (i >= 0) {
521 mBaseURL = response->mHeaders.valueAt(i);
522 } else {
523 mBaseURL = mSessionURL;
524 }
525 }
526
527 mSeekable = !isLiveStream(mSessionDesc);
528
529 if (!mBaseURL.startsWith("rtsp://")) {
530 // Some misbehaving servers specify a relative
531 // URL in one of the locations above, combine
532 // it with the absolute session URL to get
533 // something usable...
534
535 ALOGW("Server specified a non-absolute base URL"
536 ", combining it with the session URL to "
537 "get something usable...");
538
539 AString tmp;
540 CHECK(MakeURL(
541 mSessionURL.c_str(),
542 mBaseURL.c_str(),
543 &tmp));
544
545 mBaseURL = tmp;
546 }
547
548 mControlURL = getControlURL(mSessionDesc);
549
550 if (mSessionDesc->countTracks() < 2) {
551 // There's no actual tracks in this session.
552 // The first "track" is merely session meta
553 // data.
554
555 ALOGW("Session doesn't contain any playable "
556 "tracks. Aborting.");
557 result = ERROR_UNSUPPORTED;
558 } else {
559 setupTrack(1);
560 }
561 }
562 }
563 }
564
565 if (result != OK) {
566 sp<AMessage> reply = new AMessage('disc', id());
567 mConn->disconnect(reply);
568 }
569 break;
570 }
571
572 case 'sdpl':
573 {
574 int32_t result;
575 CHECK(msg->findInt32("result", &result));
576
577 ALOGI("SDP connection request completed with result %d (%s)",
578 result, strerror(-result));
579
580 if (result == OK) {
581 sp<RefBase> obj;
582 CHECK(msg->findObject("description", &obj));
583 mSessionDesc =
584 static_cast<ASessionDescription *>(obj.get());
585
586 if (!mSessionDesc->isValid()) {
587 ALOGE("Failed to parse session description.");
588 result = ERROR_MALFORMED;
589 } else {
590 mBaseURL = mSessionURL;
591
592 mSeekable = !isLiveStream(mSessionDesc);
593
594 mControlURL = getControlURL(mSessionDesc);
595
596 if (mSessionDesc->countTracks() < 2) {
597 // There's no actual tracks in this session.
598 // The first "track" is merely session meta
599 // data.
600
601 ALOGW("Session doesn't contain any playable "
602 "tracks. Aborting.");
603 result = ERROR_UNSUPPORTED;
604 } else {
605 setupTrack(1);
606 }
607 }
608 }
609
610 if (result != OK) {
611 sp<AMessage> reply = new AMessage('disc', id());
612 mConn->disconnect(reply);
613 }
614 break;
615 }
616
617 case 'setu':
618 {
619 size_t index;
620 CHECK(msg->findSize("index", &index));
621
622 TrackInfo *track = NULL;
623 size_t trackIndex;
624 if (msg->findSize("track-index", &trackIndex)) {
625 track = &mTracks.editItemAt(trackIndex);
626 }
627
628 int32_t result;
629 CHECK(msg->findInt32("result", &result));
630
631 ALOGI("SETUP(%d) completed with result %d (%s)",
632 index, result, strerror(-result));
633
634 if (result == OK) {
635 CHECK(track != NULL);
636
637 sp<RefBase> obj;
638 CHECK(msg->findObject("response", &obj));
639 sp<ARTSPResponse> response =
640 static_cast<ARTSPResponse *>(obj.get());
641
642 if (response->mStatusCode != 200) {
643 result = UNKNOWN_ERROR;
644 } else {
645 ssize_t i = response->mHeaders.indexOfKey("session");
646 CHECK_GE(i, 0);
647
648 mSessionID = response->mHeaders.valueAt(i);
649
650 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
651 AString timeoutStr;
652 if (GetAttribute(
653 mSessionID.c_str(), "timeout", &timeoutStr)) {
654 char *end;
655 unsigned long timeoutSecs =
656 strtoul(timeoutStr.c_str(), &end, 10);
657
658 if (end == timeoutStr.c_str() || *end != '\0') {
659 ALOGW("server specified malformed timeout '%s'",
660 timeoutStr.c_str());
661
662 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
663 } else if (timeoutSecs < 15) {
664 ALOGW("server specified too short a timeout "
665 "(%lu secs), using default.",
666 timeoutSecs);
667
668 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
669 } else {
670 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
671
672 ALOGI("server specified timeout of %lu secs.",
673 timeoutSecs);
674 }
675 }
676
677 i = mSessionID.find(";");
678 if (i >= 0) {
679 // Remove options, i.e. ";timeout=90"
680 mSessionID.erase(i, mSessionID.size() - i);
681 }
682
683 sp<AMessage> notify = new AMessage('accu', id());
684 notify->setSize("track-index", trackIndex);
685
686 i = response->mHeaders.indexOfKey("transport");
687 CHECK_GE(i, 0);
688
689 if (!track->mUsingInterleavedTCP) {
690 AString transport = response->mHeaders.valueAt(i);
691
692 // We are going to continue even if we were
693 // unable to poke a hole into the firewall...
694 pokeAHole(
695 track->mRTPSocket,
696 track->mRTCPSocket,
697 transport);
698 }
699
700 mRTPConn->addStream(
701 track->mRTPSocket, track->mRTCPSocket,
702 mSessionDesc, index,
703 notify, track->mUsingInterleavedTCP);
704
705 mSetupTracksSuccessful = true;
706 }
707 }
708
709 if (result != OK) {
710 if (track) {
711 if (!track->mUsingInterleavedTCP) {
712 // Clear the tag
713 if (mUIDValid) {
714 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
715 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
716 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
717 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
718 }
719
720 close(track->mRTPSocket);
721 close(track->mRTCPSocket);
722 }
723
724 mTracks.removeItemsAt(trackIndex);
725 }
726 }
727
728 ++index;
729 if (index < mSessionDesc->countTracks()) {
730 setupTrack(index);
731 } else if (mSetupTracksSuccessful) {
732 ++mKeepAliveGeneration;
733 postKeepAlive();
734
735 AString request = "PLAY ";
736 request.append(mControlURL);
737 request.append(" RTSP/1.0\r\n");
738
739 request.append("Session: ");
740 request.append(mSessionID);
741 request.append("\r\n");
742
743 request.append("\r\n");
744
745 sp<AMessage> reply = new AMessage('play', id());
746 mConn->sendRequest(request.c_str(), reply);
747 } else {
748 sp<AMessage> reply = new AMessage('disc', id());
749 mConn->disconnect(reply);
750 }
751 break;
752 }
753
754 case 'play':
755 {
756 int32_t result;
757 CHECK(msg->findInt32("result", &result));
758
759 ALOGI("PLAY completed with result %d (%s)",
760 result, strerror(-result));
761
762 if (result == OK) {
763 sp<RefBase> obj;
764 CHECK(msg->findObject("response", &obj));
765 sp<ARTSPResponse> response =
766 static_cast<ARTSPResponse *>(obj.get());
767
768 if (response->mStatusCode != 200) {
769 result = UNKNOWN_ERROR;
770 } else {
771 parsePlayResponse(response);
772
773 sp<AMessage> timeout = new AMessage('tiou', id());
774 mCheckTimeoutGeneration++;
775 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
776 timeout->post(kStartupTimeoutUs);
777 }
778 }
779
780 if (result != OK) {
781 sp<AMessage> reply = new AMessage('disc', id());
782 mConn->disconnect(reply);
783 }
784
785 break;
786 }
787
788 case 'aliv':
789 {
790 int32_t generation;
791 CHECK(msg->findInt32("generation", &generation));
792
793 if (generation != mKeepAliveGeneration) {
794 // obsolete event.
795 break;
796 }
797
798 AString request;
799 request.append("OPTIONS ");
800 request.append(mSessionURL);
801 request.append(" RTSP/1.0\r\n");
802 request.append("Session: ");
803 request.append(mSessionID);
804 request.append("\r\n");
805 request.append("\r\n");
806
807 sp<AMessage> reply = new AMessage('opts', id());
808 reply->setInt32("generation", mKeepAliveGeneration);
809 mConn->sendRequest(request.c_str(), reply);
810 break;
811 }
812
813 case 'opts':
814 {
815 int32_t result;
816 CHECK(msg->findInt32("result", &result));
817
818 ALOGI("OPTIONS completed with result %d (%s)",
819 result, strerror(-result));
820
821 int32_t generation;
822 CHECK(msg->findInt32("generation", &generation));
823
824 if (generation != mKeepAliveGeneration) {
825 // obsolete event.
826 break;
827 }
828
829 postKeepAlive();
830 break;
831 }
832
833 case 'abor':
834 {
835 for (size_t i = 0; i < mTracks.size(); ++i) {
836 TrackInfo *info = &mTracks.editItemAt(i);
837
838 if (!mFirstAccessUnit) {
839 postQueueEOS(i, ERROR_END_OF_STREAM);
840 }
841
842 if (!info->mUsingInterleavedTCP) {
843 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
844
845 // Clear the tag
846 if (mUIDValid) {
847 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
848 HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
849 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
850 HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
851 }
852
853 close(info->mRTPSocket);
854 close(info->mRTCPSocket);
855 }
856 }
857 mTracks.clear();
858 mSetupTracksSuccessful = false;
859 mSeekPending = false;
860 mFirstAccessUnit = true;
861 mAllTracksHaveTime = false;
862 mNTPAnchorUs = -1;
863 mMediaAnchorUs = -1;
864 mNumAccessUnitsReceived = 0;
865 mReceivedFirstRTCPPacket = false;
866 mReceivedFirstRTPPacket = false;
867 mPausing = false;
868 mSeekable = true;
869
870 sp<AMessage> reply = new AMessage('tear', id());
871
872 int32_t reconnect;
873 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
874 reply->setInt32("reconnect", true);
875 }
876
877 AString request;
878 request = "TEARDOWN ";
879
880 // XXX should use aggregate url from SDP here...
881 request.append(mSessionURL);
882 request.append(" RTSP/1.0\r\n");
883
884 request.append("Session: ");
885 request.append(mSessionID);
886 request.append("\r\n");
887
888 request.append("\r\n");
889
890 mConn->sendRequest(request.c_str(), reply);
891 break;
892 }
893
894 case 'tear':
895 {
896 int32_t result;
897 CHECK(msg->findInt32("result", &result));
898
899 ALOGI("TEARDOWN completed with result %d (%s)",
900 result, strerror(-result));
901
902 sp<AMessage> reply = new AMessage('disc', id());
903
904 int32_t reconnect;
905 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
906 reply->setInt32("reconnect", true);
907 }
908
909 mConn->disconnect(reply);
910 break;
911 }
912
913 case 'quit':
914 {
915 sp<AMessage> msg = mNotify->dup();
916 msg->setInt32("what", kWhatDisconnected);
917 msg->setInt32("result", UNKNOWN_ERROR);
918 msg->post();
919 break;
920 }
921
922 case 'chek':
923 {
924 int32_t generation;
925 CHECK(msg->findInt32("generation", &generation));
926 if (generation != mCheckGeneration) {
927 // This is an outdated message. Ignore.
928 break;
929 }
930
931 if (mNumAccessUnitsReceived == 0) {
932 #if 1
933 ALOGI("stream ended? aborting.");
934 (new AMessage('abor', id()))->post();
935 break;
936 #else
937 ALOGI("haven't seen an AU in a looong time.");
938 #endif
939 }
940
941 mNumAccessUnitsReceived = 0;
942 msg->post(kAccessUnitTimeoutUs);
943 break;
944 }
945
946 case 'accu':
947 {
948 int32_t timeUpdate;
949 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
950 size_t trackIndex;
951 CHECK(msg->findSize("track-index", &trackIndex));
952
953 uint32_t rtpTime;
954 uint64_t ntpTime;
955 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
956 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
957
958 onTimeUpdate(trackIndex, rtpTime, ntpTime);
959 break;
960 }
961
962 int32_t first;
963 if (msg->findInt32("first-rtcp", &first)) {
964 mReceivedFirstRTCPPacket = true;
965 break;
966 }
967
968 if (msg->findInt32("first-rtp", &first)) {
969 mReceivedFirstRTPPacket = true;
970 break;
971 }
972
973 ++mNumAccessUnitsReceived;
974 postAccessUnitTimeoutCheck();
975
976 size_t trackIndex;
977 CHECK(msg->findSize("track-index", &trackIndex));
978
979 if (trackIndex >= mTracks.size()) {
980 ALOGV("late packets ignored.");
981 break;
982 }
983
984 TrackInfo *track = &mTracks.editItemAt(trackIndex);
985
986 int32_t eos;
987 if (msg->findInt32("eos", &eos)) {
988 ALOGI("received BYE on track index %d", trackIndex);
989 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
990 ALOGI("No time established => fake existing data");
991
992 track->mEOSReceived = true;
993 mTryFakeRTCP = true;
994 mReceivedFirstRTCPPacket = true;
995 fakeTimestamps();
996 } else {
997 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
998 }
999 return;
1000 }
1001
1002 sp<ABuffer> accessUnit;
1003 CHECK(msg->findBuffer("access-unit", &accessUnit));
1004
1005 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1006
1007 if (mSeekPending) {
1008 ALOGV("we're seeking, dropping stale packet.");
1009 break;
1010 }
1011
1012 if (seqNum < track->mFirstSeqNumInSegment) {
1013 ALOGV("dropping stale access-unit (%d < %d)",
1014 seqNum, track->mFirstSeqNumInSegment);
1015 break;
1016 }
1017
1018 if (track->mNewSegment) {
1019 track->mNewSegment = false;
1020 }
1021
1022 onAccessUnitComplete(trackIndex, accessUnit);
1023 break;
1024 }
1025
1026 case 'paus':
1027 {
1028 int32_t generation;
1029 CHECK(msg->findInt32("pausecheck", &generation));
1030 if (generation != mPauseGeneration) {
1031 ALOGV("Ignoring outdated pause message.");
1032 break;
1033 }
1034
1035 if (!mSeekable) {
1036 ALOGW("This is a live stream, ignoring pause request.");
1037 break;
1038 }
1039 mCheckPending = true;
1040 ++mCheckGeneration;
1041 mPausing = true;
1042
1043 AString request = "PAUSE ";
1044 request.append(mControlURL);
1045 request.append(" RTSP/1.0\r\n");
1046
1047 request.append("Session: ");
1048 request.append(mSessionID);
1049 request.append("\r\n");
1050
1051 request.append("\r\n");
1052
1053 sp<AMessage> reply = new AMessage('pau2', id());
1054 mConn->sendRequest(request.c_str(), reply);
1055 break;
1056 }
1057
1058 case 'pau2':
1059 {
1060 int32_t result;
1061 CHECK(msg->findInt32("result", &result));
1062 mCheckTimeoutGeneration++;
1063
1064 ALOGI("PAUSE completed with result %d (%s)",
1065 result, strerror(-result));
1066 break;
1067 }
1068
1069 case 'resu':
1070 {
1071 if (mPausing && mSeekPending) {
1072 // If seeking, Play will be sent from see1 instead
1073 break;
1074 }
1075
1076 if (!mPausing) {
1077 // Dont send PLAY if we have not paused
1078 break;
1079 }
1080 AString request = "PLAY ";
1081 request.append(mControlURL);
1082 request.append(" RTSP/1.0\r\n");
1083
1084 request.append("Session: ");
1085 request.append(mSessionID);
1086 request.append("\r\n");
1087
1088 request.append("\r\n");
1089
1090 sp<AMessage> reply = new AMessage('res2', id());
1091 mConn->sendRequest(request.c_str(), reply);
1092 break;
1093 }
1094
1095 case 'res2':
1096 {
1097 int32_t result;
1098 CHECK(msg->findInt32("result", &result));
1099
1100 ALOGI("PLAY completed with result %d (%s)",
1101 result, strerror(-result));
1102
1103 mCheckPending = false;
1104 postAccessUnitTimeoutCheck();
1105
1106 if (result == OK) {
1107 sp<RefBase> obj;
1108 CHECK(msg->findObject("response", &obj));
1109 sp<ARTSPResponse> response =
1110 static_cast<ARTSPResponse *>(obj.get());
1111
1112 if (response->mStatusCode != 200) {
1113 result = UNKNOWN_ERROR;
1114 } else {
1115 parsePlayResponse(response);
1116
1117 // Post new timeout in order to make sure to use
1118 // fake timestamps if no new Sender Reports arrive
1119 sp<AMessage> timeout = new AMessage('tiou', id());
1120 mCheckTimeoutGeneration++;
1121 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1122 timeout->post(kStartupTimeoutUs);
1123 }
1124 }
1125
1126 if (result != OK) {
1127 ALOGE("resume failed, aborting.");
1128 (new AMessage('abor', id()))->post();
1129 }
1130
1131 mPausing = false;
1132 break;
1133 }
1134
1135 case 'seek':
1136 {
1137 if (!mSeekable) {
1138 ALOGW("This is a live stream, ignoring seek request.");
1139
1140 sp<AMessage> msg = mNotify->dup();
1141 msg->setInt32("what", kWhatSeekDone);
1142 msg->post();
1143 break;
1144 }
1145
1146 int64_t timeUs;
1147 CHECK(msg->findInt64("time", &timeUs));
1148
1149 mSeekPending = true;
1150
1151 // Disable the access unit timeout until we resumed
1152 // playback again.
1153 mCheckPending = true;
1154 ++mCheckGeneration;
1155
1156 sp<AMessage> reply = new AMessage('see1', id());
1157 reply->setInt64("time", timeUs);
1158
1159 if (mPausing) {
1160 // PAUSE already sent
1161 ALOGI("Pause already sent");
1162 reply->post();
1163 break;
1164 }
1165 AString request = "PAUSE ";
1166 request.append(mControlURL);
1167 request.append(" RTSP/1.0\r\n");
1168
1169 request.append("Session: ");
1170 request.append(mSessionID);
1171 request.append("\r\n");
1172
1173 request.append("\r\n");
1174
1175 mConn->sendRequest(request.c_str(), reply);
1176 break;
1177 }
1178
1179 case 'see1':
1180 {
1181 // Session is paused now.
1182 for (size_t i = 0; i < mTracks.size(); ++i) {
1183 TrackInfo *info = &mTracks.editItemAt(i);
1184
1185 postQueueSeekDiscontinuity(i);
1186 info->mEOSReceived = false;
1187
1188 info->mRTPAnchor = 0;
1189 info->mNTPAnchorUs = -1;
1190 }
1191
1192 mAllTracksHaveTime = false;
1193 mNTPAnchorUs = -1;
1194
1195 // Start new timeoutgeneration to avoid getting timeout
1196 // before PLAY response arrive
1197 sp<AMessage> timeout = new AMessage('tiou', id());
1198 mCheckTimeoutGeneration++;
1199 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1200 timeout->post(kStartupTimeoutUs);
1201
1202 int64_t timeUs;
1203 CHECK(msg->findInt64("time", &timeUs));
1204
1205 AString request = "PLAY ";
1206 request.append(mControlURL);
1207 request.append(" RTSP/1.0\r\n");
1208
1209 request.append("Session: ");
1210 request.append(mSessionID);
1211 request.append("\r\n");
1212
1213 request.append(
1214 StringPrintf(
1215 "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1216
1217 request.append("\r\n");
1218
1219 sp<AMessage> reply = new AMessage('see2', id());
1220 mConn->sendRequest(request.c_str(), reply);
1221 break;
1222 }
1223
1224 case 'see2':
1225 {
1226 if (mTracks.size() == 0) {
1227 // We have already hit abor, break
1228 break;
1229 }
1230
1231 int32_t result;
1232 CHECK(msg->findInt32("result", &result));
1233
1234 ALOGI("PLAY completed with result %d (%s)",
1235 result, strerror(-result));
1236
1237 mCheckPending = false;
1238 postAccessUnitTimeoutCheck();
1239
1240 if (result == OK) {
1241 sp<RefBase> obj;
1242 CHECK(msg->findObject("response", &obj));
1243 sp<ARTSPResponse> response =
1244 static_cast<ARTSPResponse *>(obj.get());
1245
1246 if (response->mStatusCode != 200) {
1247 result = UNKNOWN_ERROR;
1248 } else {
1249 parsePlayResponse(response);
1250
1251 // Post new timeout in order to make sure to use
1252 // fake timestamps if no new Sender Reports arrive
1253 sp<AMessage> timeout = new AMessage('tiou', id());
1254 mCheckTimeoutGeneration++;
1255 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1256 timeout->post(kStartupTimeoutUs);
1257
1258 ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1259 CHECK_GE(i, 0);
1260
1261 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1262
1263 ALOGI("seek completed.");
1264 }
1265 }
1266
1267 if (result != OK) {
1268 ALOGE("seek failed, aborting.");
1269 (new AMessage('abor', id()))->post();
1270 }
1271
1272 mPausing = false;
1273 mSeekPending = false;
1274
1275 sp<AMessage> msg = mNotify->dup();
1276 msg->setInt32("what", kWhatSeekDone);
1277 msg->post();
1278 break;
1279 }
1280
1281 case 'biny':
1282 {
1283 sp<ABuffer> buffer;
1284 CHECK(msg->findBuffer("buffer", &buffer));
1285
1286 int32_t index;
1287 CHECK(buffer->meta()->findInt32("index", &index));
1288
1289 mRTPConn->injectPacket(index, buffer);
1290 break;
1291 }
1292
1293 case 'tiou':
1294 {
1295 int32_t timeoutGenerationCheck;
1296 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1297 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1298 // This is an outdated message. Ignore.
1299 // This typically happens if a lot of seeks are
1300 // performed, since new timeout messages now are
1301 // posted at seek as well.
1302 break;
1303 }
1304 if (!mReceivedFirstRTCPPacket) {
1305 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1306 ALOGW("We received RTP packets but no RTCP packets, "
1307 "using fake timestamps.");
1308
1309 mTryFakeRTCP = true;
1310
1311 mReceivedFirstRTCPPacket = true;
1312
1313 fakeTimestamps();
1314 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1315 ALOGW("Never received any data, switching transports.");
1316
1317 mTryTCPInterleaving = true;
1318
1319 sp<AMessage> msg = new AMessage('abor', id());
1320 msg->setInt32("reconnect", true);
1321 msg->post();
1322 } else {
1323 ALOGW("Never received any data, disconnecting.");
1324 (new AMessage('abor', id()))->post();
1325 }
1326 } else {
1327 if (!mAllTracksHaveTime) {
1328 ALOGW("We received some RTCP packets, but time "
1329 "could not be established on all tracks, now "
1330 "using fake timestamps");
1331
1332 fakeTimestamps();
1333 }
1334 }
1335 break;
1336 }
1337
1338 default:
1339 TRESPASS();
1340 break;
1341 }
1342 }
1343
postKeepAliveMyHandler1344 void postKeepAlive() {
1345 sp<AMessage> msg = new AMessage('aliv', id());
1346 msg->setInt32("generation", mKeepAliveGeneration);
1347 msg->post((mKeepAliveTimeoutUs * 9) / 10);
1348 }
1349
postAccessUnitTimeoutCheckMyHandler1350 void postAccessUnitTimeoutCheck() {
1351 if (mCheckPending) {
1352 return;
1353 }
1354
1355 mCheckPending = true;
1356 sp<AMessage> check = new AMessage('chek', id());
1357 check->setInt32("generation", mCheckGeneration);
1358 check->post(kAccessUnitTimeoutUs);
1359 }
1360
SplitStringMyHandler1361 static void SplitString(
1362 const AString &s, const char *separator, List<AString> *items) {
1363 items->clear();
1364 size_t start = 0;
1365 while (start < s.size()) {
1366 ssize_t offset = s.find(separator, start);
1367
1368 if (offset < 0) {
1369 items->push_back(AString(s, start, s.size() - start));
1370 break;
1371 }
1372
1373 items->push_back(AString(s, start, offset - start));
1374 start = offset + strlen(separator);
1375 }
1376 }
1377
parsePlayResponseMyHandler1378 void parsePlayResponse(const sp<ARTSPResponse> &response) {
1379 mPlayResponseParsed = true;
1380 if (mTracks.size() == 0) {
1381 ALOGV("parsePlayResponse: late packets ignored.");
1382 return;
1383 }
1384
1385 ssize_t i = response->mHeaders.indexOfKey("range");
1386 if (i < 0) {
1387 // Server doesn't even tell use what range it is going to
1388 // play, therefore we won't support seeking.
1389 return;
1390 }
1391
1392 AString range = response->mHeaders.valueAt(i);
1393 ALOGV("Range: %s", range.c_str());
1394
1395 AString val;
1396 CHECK(GetAttribute(range.c_str(), "npt", &val));
1397
1398 float npt1, npt2;
1399 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1400 // This is a live stream and therefore not seekable.
1401
1402 ALOGI("This is a live stream");
1403 return;
1404 }
1405
1406 i = response->mHeaders.indexOfKey("rtp-info");
1407 CHECK_GE(i, 0);
1408
1409 AString rtpInfo = response->mHeaders.valueAt(i);
1410 List<AString> streamInfos;
1411 SplitString(rtpInfo, ",", &streamInfos);
1412
1413 int n = 1;
1414 for (List<AString>::iterator it = streamInfos.begin();
1415 it != streamInfos.end(); ++it) {
1416 (*it).trim();
1417 ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1418
1419 CHECK(GetAttribute((*it).c_str(), "url", &val));
1420
1421 size_t trackIndex = 0;
1422 while (trackIndex < mTracks.size()
1423 && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1424 ++trackIndex;
1425 }
1426 CHECK_LT(trackIndex, mTracks.size());
1427
1428 CHECK(GetAttribute((*it).c_str(), "seq", &val));
1429
1430 char *end;
1431 unsigned long seq = strtoul(val.c_str(), &end, 10);
1432
1433 TrackInfo *info = &mTracks.editItemAt(trackIndex);
1434 info->mFirstSeqNumInSegment = seq;
1435 info->mNewSegment = true;
1436
1437 CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1438
1439 uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1440
1441 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1442
1443 info->mNormalPlayTimeRTP = rtpTime;
1444 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1445
1446 if (!mFirstAccessUnit) {
1447 postNormalPlayTimeMapping(
1448 trackIndex,
1449 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1450 }
1451
1452 ++n;
1453 }
1454 }
1455
getTrackFormatMyHandler1456 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1457 CHECK_GE(index, 0u);
1458 CHECK_LT(index, mTracks.size());
1459
1460 const TrackInfo &info = mTracks.itemAt(index);
1461
1462 *timeScale = info.mTimeScale;
1463
1464 return info.mPacketSource->getFormat();
1465 }
1466
countTracksMyHandler1467 size_t countTracks() const {
1468 return mTracks.size();
1469 }
1470
1471 private:
// Per-track state; setupTrack() appends one entry to mTracks for every
// track it issues a SETUP request for.
1472 struct TrackInfo {
// Absolute control URL of the track. Sent in the SETUP request and
// compared against the "url" attribute of RTP-Info entries.
1473 AString mURL;
// For UDP transport these are filled in by ARTPConnection::MakePortPair().
// For interleaved TCP they hold the interleave channel numbers instead
// (2 * k and 2 * k + 1 for the k-th entry of mTracks).
1474 int mRTPSocket;
1475 int mRTCPSocket;
// True when the track was set up with "RTP/AVP/TCP;interleaved=...".
1476 bool mUsingInterleavedTCP;
// "seq" value of the most recent RTP-Info entry for this track
// (0 until a PLAY response supplies one).
1477 uint32_t mFirstSeqNumInSegment;
// Set when the track is created and whenever a PLAY response updates it.
// NOTE(review): the consumer of this flag is not visible in this part of
// the file.
1478 bool mNewSegment;
1479
// RTP timestamp and matching NTP time (microseconds) recorded by
// onTimeUpdate(). mNTPAnchorUs < 0 means no time has been established
// for this track yet.
1480 uint32_t mRTPAnchor;
1481 int64_t mNTPAnchorUs;
// Timescale parsed from the SDP format description; RTP timestamp deltas
// are divided by it when converting to microseconds.
1482 int32_t mTimeScale;
// While set, an EOS notification is posted for the track once its
// buffered access units have been flushed; the flag is then cleared.
1483 bool mEOSReceived;
1484
// "rtptime" of the latest RTP-Info entry and the start of the npt range
// (in microseconds) from the same PLAY response.
1485 uint32_t mNormalPlayTimeRTP;
1486 int64_t mNormalPlayTimeUs;
1487
// Packet source created from the session description for this track;
// getTrackFormat() returns its format.
1488 sp<APacketSource> mPacketSource;
1489
1490 // Stores packets temporarily while no notion of time
1491 // has been established yet.
1492 List<sp<ABuffer> > mPackets;
1493 };
1494
// Template message: every notification sent to the client is a dup() of
// this message with "what" (and further fields) filled in.
1495 sp<AMessage> mNotify;
// When mUIDValid is set, the RTP/RTCP sockets created in setupTrack() are
// tagged and marked with mUID.
1496 bool mUIDValid;
1497 uid_t mUID;
1498 sp<ALooper> mNetLooper;
// RTSP control connection; all requests are sent through it.
1499 sp<ARTSPConnection> mConn;
// RTP connection; interleaved binary packets are injected into it.
1500 sp<ARTPConnection> mRTPConn;
// Parsed session description; setupTrack() reads per-track attributes
// and format information from it.
1501 sp<ASessionDescription> mSessionDesc;
1502 AString mOriginalSessionURL; // This one still has user:pass@
1503 AString mSessionURL;
1504 AString mSessionHost;
// Base against which relative per-track control URLs are resolved
// (see MakeURL()).
1505 AString mBaseURL;
// URL used in session-level requests such as PLAY.
1506 AString mControlURL;
// Value sent in the "Session:" header of requests.
1507 AString mSessionID;
1508 bool mSetupTracksSuccessful;
// Cleared when the PLAY issued for a seek has completed.
1509 bool mSeekPending;
// True until handleFirstAccessUnit() has posted kWhatConnected.
1510 bool mFirstAccessUnit;
1511
// True once every track has a non-negative mNTPAnchorUs; reset on seek.
1512 bool mAllTracksHaveTime;
// Session-wide anchors used by addMediaTimestamp():
//   mediaTimeUs = mMediaAnchorUs + trackNtpTimeUs - mNTPAnchorUs
// mNTPAnchorUs < 0 means the anchor has not been set yet.
1513 int64_t mNTPAnchorUs;
1514 int64_t mMediaAnchorUs;
// Largest media time stamped so far; becomes mMediaAnchorUs when a new
// NTP anchor is taken.
1515 int64_t mLastMediaTimeUs;
1516
1517 int64_t mNumAccessUnitsReceived;
// True while a 'chek' message is outstanding
// (see postAccessUnitTimeoutCheck()).
1518 bool mCheckPending;
// Generation stamped onto 'chek' messages.
1519 int32_t mCheckGeneration;
// Generation stamped onto 'tiou' messages; a 'tiou' carrying a different
// value is ignored as outdated.
1520 int32_t mCheckTimeoutGeneration;
// When set, setupTrack() requests interleaved TCP transport instead of UDP.
1521 bool mTryTCPInterleaving;
// Set once the 'tiou' handler has fallen back to fake timestamps because
// RTP data arrived without any RTCP.
1522 bool mTryFakeRTCP;
1523 bool mReceivedFirstRTCPPacket;
1524 bool mReceivedFirstRTPPacket;
// When set, handleFirstAccessUnit() posts the normal-play-time mapping of
// every track.
1525 bool mSeekable;
// Keep-alive messages are posted with a delay of 90% of this value.
1526 int64_t mKeepAliveTimeoutUs;
// Generation stamped onto 'aliv' messages.
1527 int32_t mKeepAliveGeneration;
1528 bool mPausing;
1529 int32_t mPauseGeneration;
1530
// One entry per track for which a SETUP request was issued.
1531 Vector<TrackInfo> mTracks;
1532
// Set by parsePlayResponse(); until then onAccessUnitComplete() only
// buffers incoming access units.
1533 bool mPlayResponseParsed;
1534
setupTrackMyHandler1535 void setupTrack(size_t index) {
1536 sp<APacketSource> source =
1537 new APacketSource(mSessionDesc, index);
1538
1539 if (source->initCheck() != OK) {
1540 ALOGW("Unsupported format. Ignoring track #%d.", index);
1541
1542 sp<AMessage> reply = new AMessage('setu', id());
1543 reply->setSize("index", index);
1544 reply->setInt32("result", ERROR_UNSUPPORTED);
1545 reply->post();
1546 return;
1547 }
1548
1549 AString url;
1550 CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1551
1552 AString trackURL;
1553 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1554
1555 mTracks.push(TrackInfo());
1556 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1557 info->mURL = trackURL;
1558 info->mPacketSource = source;
1559 info->mUsingInterleavedTCP = false;
1560 info->mFirstSeqNumInSegment = 0;
1561 info->mNewSegment = true;
1562 info->mRTPAnchor = 0;
1563 info->mNTPAnchorUs = -1;
1564 info->mNormalPlayTimeRTP = 0;
1565 info->mNormalPlayTimeUs = 0ll;
1566
1567 unsigned long PT;
1568 AString formatDesc;
1569 AString formatParams;
1570 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1571
1572 int32_t timescale;
1573 int32_t numChannels;
1574 ASessionDescription::ParseFormatDesc(
1575 formatDesc.c_str(), ×cale, &numChannels);
1576
1577 info->mTimeScale = timescale;
1578 info->mEOSReceived = false;
1579
1580 ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1581
1582 AString request = "SETUP ";
1583 request.append(trackURL);
1584 request.append(" RTSP/1.0\r\n");
1585
1586 if (mTryTCPInterleaving) {
1587 size_t interleaveIndex = 2 * (mTracks.size() - 1);
1588 info->mUsingInterleavedTCP = true;
1589 info->mRTPSocket = interleaveIndex;
1590 info->mRTCPSocket = interleaveIndex + 1;
1591
1592 request.append("Transport: RTP/AVP/TCP;interleaved=");
1593 request.append(interleaveIndex);
1594 request.append("-");
1595 request.append(interleaveIndex + 1);
1596 } else {
1597 unsigned rtpPort;
1598 ARTPConnection::MakePortPair(
1599 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1600
1601 if (mUIDValid) {
1602 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1603 (uint32_t)*(uint32_t*) "RTP_");
1604 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1605 (uint32_t)*(uint32_t*) "RTP_");
1606 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
1607 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1608 }
1609
1610 request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1611 request.append(rtpPort);
1612 request.append("-");
1613 request.append(rtpPort + 1);
1614 }
1615
1616 request.append("\r\n");
1617
1618 if (index > 1) {
1619 request.append("Session: ");
1620 request.append(mSessionID);
1621 request.append("\r\n");
1622 }
1623
1624 request.append("\r\n");
1625
1626 sp<AMessage> reply = new AMessage('setu', id());
1627 reply->setSize("index", index);
1628 reply->setSize("track-index", mTracks.size() - 1);
1629 mConn->sendRequest(request.c_str(), reply);
1630 }
1631
MakeURLMyHandler1632 static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1633 out->clear();
1634
1635 if (strncasecmp("rtsp://", baseURL, 7)) {
1636 // Base URL must be absolute
1637 return false;
1638 }
1639
1640 if (!strncasecmp("rtsp://", url, 7)) {
1641 // "url" is already an absolute URL, ignore base URL.
1642 out->setTo(url);
1643 return true;
1644 }
1645
1646 size_t n = strlen(baseURL);
1647 if (baseURL[n - 1] == '/') {
1648 out->setTo(baseURL);
1649 out->append(url);
1650 } else {
1651 const char *slashPos = strrchr(baseURL, '/');
1652
1653 if (slashPos > &baseURL[6]) {
1654 out->setTo(baseURL, slashPos - baseURL);
1655 } else {
1656 out->setTo(baseURL);
1657 }
1658
1659 out->append("/");
1660 out->append(url);
1661 }
1662
1663 return true;
1664 }
1665
fakeTimestampsMyHandler1666 void fakeTimestamps() {
1667 mNTPAnchorUs = -1ll;
1668 for (size_t i = 0; i < mTracks.size(); ++i) {
1669 onTimeUpdate(i, 0, 0ll);
1670 }
1671 }
1672
dataReceivedOnAllChannelsMyHandler1673 bool dataReceivedOnAllChannels() {
1674 TrackInfo *track;
1675 for (size_t i = 0; i < mTracks.size(); ++i) {
1676 track = &mTracks.editItemAt(i);
1677 if (track->mPackets.empty()) {
1678 return false;
1679 }
1680 }
1681 return true;
1682 }
1683
handleFirstAccessUnitMyHandler1684 void handleFirstAccessUnit() {
1685 if (mFirstAccessUnit) {
1686 sp<AMessage> msg = mNotify->dup();
1687 msg->setInt32("what", kWhatConnected);
1688 msg->post();
1689
1690 if (mSeekable) {
1691 for (size_t i = 0; i < mTracks.size(); ++i) {
1692 TrackInfo *info = &mTracks.editItemAt(i);
1693
1694 postNormalPlayTimeMapping(
1695 i,
1696 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1697 }
1698 }
1699
1700 mFirstAccessUnit = false;
1701 }
1702 }
1703
onTimeUpdateMyHandler1704 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1705 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
1706 trackIndex, rtpTime, ntpTime);
1707
1708 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1709
1710 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1711
1712 track->mRTPAnchor = rtpTime;
1713 track->mNTPAnchorUs = ntpTimeUs;
1714
1715 if (mNTPAnchorUs < 0) {
1716 mNTPAnchorUs = ntpTimeUs;
1717 mMediaAnchorUs = mLastMediaTimeUs;
1718 }
1719
1720 if (!mAllTracksHaveTime) {
1721 bool allTracksHaveTime = true;
1722 for (size_t i = 0; i < mTracks.size(); ++i) {
1723 TrackInfo *track = &mTracks.editItemAt(i);
1724 if (track->mNTPAnchorUs < 0) {
1725 allTracksHaveTime = false;
1726 break;
1727 }
1728 }
1729 if (allTracksHaveTime) {
1730 mAllTracksHaveTime = true;
1731 ALOGI("Time now established for all tracks.");
1732 }
1733 }
1734 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1735 handleFirstAccessUnit();
1736
1737 // Time is now established, lets start timestamping immediately
1738 for (size_t i = 0; i < mTracks.size(); ++i) {
1739 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1740 while (!trackInfo->mPackets.empty()) {
1741 sp<ABuffer> accessUnit = *trackInfo->mPackets.begin();
1742 trackInfo->mPackets.erase(trackInfo->mPackets.begin());
1743
1744 if (addMediaTimestamp(i, trackInfo, accessUnit)) {
1745 postQueueAccessUnit(i, accessUnit);
1746 }
1747 }
1748 }
1749 for (size_t i = 0; i < mTracks.size(); ++i) {
1750 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1751 if (trackInfo->mEOSReceived) {
1752 postQueueEOS(i, ERROR_END_OF_STREAM);
1753 trackInfo->mEOSReceived = false;
1754 }
1755 }
1756 }
1757 }
1758
onAccessUnitCompleteMyHandler1759 void onAccessUnitComplete(
1760 int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1761 ALOGV("onAccessUnitComplete track %d", trackIndex);
1762
1763 if(!mPlayResponseParsed){
1764 ALOGI("play response is not parsed, storing accessunit");
1765 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1766 track->mPackets.push_back(accessUnit);
1767 return;
1768 }
1769
1770 handleFirstAccessUnit();
1771
1772 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1773
1774 if (!mAllTracksHaveTime) {
1775 ALOGV("storing accessUnit, no time established yet");
1776 track->mPackets.push_back(accessUnit);
1777 return;
1778 }
1779
1780 while (!track->mPackets.empty()) {
1781 sp<ABuffer> accessUnit = *track->mPackets.begin();
1782 track->mPackets.erase(track->mPackets.begin());
1783
1784 if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1785 postQueueAccessUnit(trackIndex, accessUnit);
1786 }
1787 }
1788
1789 if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1790 postQueueAccessUnit(trackIndex, accessUnit);
1791 }
1792
1793 if (track->mEOSReceived) {
1794 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1795 track->mEOSReceived = false;
1796 }
1797 }
1798
addMediaTimestampMyHandler1799 bool addMediaTimestamp(
1800 int32_t trackIndex, const TrackInfo *track,
1801 const sp<ABuffer> &accessUnit) {
1802 uint32_t rtpTime;
1803 CHECK(accessUnit->meta()->findInt32(
1804 "rtp-time", (int32_t *)&rtpTime));
1805
1806 int64_t relRtpTimeUs =
1807 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1808 / track->mTimeScale;
1809
1810 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1811
1812 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1813
1814 if (mediaTimeUs > mLastMediaTimeUs) {
1815 mLastMediaTimeUs = mediaTimeUs;
1816 }
1817
1818 if (mediaTimeUs < 0) {
1819 ALOGV("dropping early accessUnit.");
1820 return false;
1821 }
1822
1823 ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
1824 trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
1825
1826 accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1827
1828 return true;
1829 }
1830
postQueueAccessUnitMyHandler1831 void postQueueAccessUnit(
1832 size_t trackIndex, const sp<ABuffer> &accessUnit) {
1833 sp<AMessage> msg = mNotify->dup();
1834 msg->setInt32("what", kWhatAccessUnit);
1835 msg->setSize("trackIndex", trackIndex);
1836 msg->setBuffer("accessUnit", accessUnit);
1837 msg->post();
1838 }
1839
postQueueEOSMyHandler1840 void postQueueEOS(size_t trackIndex, status_t finalResult) {
1841 sp<AMessage> msg = mNotify->dup();
1842 msg->setInt32("what", kWhatEOS);
1843 msg->setSize("trackIndex", trackIndex);
1844 msg->setInt32("finalResult", finalResult);
1845 msg->post();
1846 }
1847
postQueueSeekDiscontinuityMyHandler1848 void postQueueSeekDiscontinuity(size_t trackIndex) {
1849 sp<AMessage> msg = mNotify->dup();
1850 msg->setInt32("what", kWhatSeekDiscontinuity);
1851 msg->setSize("trackIndex", trackIndex);
1852 msg->post();
1853 }
1854
postNormalPlayTimeMappingMyHandler1855 void postNormalPlayTimeMapping(
1856 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1857 sp<AMessage> msg = mNotify->dup();
1858 msg->setInt32("what", kWhatNormalPlayTimeMapping);
1859 msg->setSize("trackIndex", trackIndex);
1860 msg->setInt32("rtpTime", rtpTime);
1861 msg->setInt64("nptUs", nptUs);
1862 msg->post();
1863 }
1864
1865 DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1866 };
1867
1868 } // namespace android
1869
1870 #endif // MY_HANDLER_H_
1871