1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef MY_HANDLER_H_
18
19 #define MY_HANDLER_H_
20
21 //#define LOG_NDEBUG 0
22 #define LOG_TAG "MyHandler"
23 #include <utils/Log.h>
24
25 #include "APacketSource.h"
26 #include "ARTPConnection.h"
27 #include "ARTSPConnection.h"
28 #include "ASessionDescription.h"
29
30 #include <ctype.h>
31
32 #include <media/stagefright/foundation/ABuffer.h>
33 #include <media/stagefright/foundation/ADebug.h>
34 #include <media/stagefright/foundation/ALooper.h>
35 #include <media/stagefright/foundation/AMessage.h>
36 #include <media/stagefright/MediaErrors.h>
37 #include <media/stagefright/Utils.h>
38
39 #include <arpa/inet.h>
40 #include <sys/socket.h>
41 #include <netdb.h>
42
43 #include "HTTPBase.h"
44
45 // If no access units are received within 5 secs, assume that the rtp
46 // stream has ended and signal end of stream.
47 static int64_t kAccessUnitTimeoutUs = 10000000ll;
48
49 // If no access units arrive for the first 10 secs after starting the
50 // stream, assume none ever will and signal EOS or switch transports.
51 static int64_t kStartupTimeoutUs = 10000000ll;
52
53 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
54
55 static int64_t kPauseDelayUs = 3000000ll;
56
57 namespace android {
58
GetAttribute(const char * s,const char * key,AString * value)59 static bool GetAttribute(const char *s, const char *key, AString *value) {
60 value->clear();
61
62 size_t keyLen = strlen(key);
63
64 for (;;) {
65 while (isspace(*s)) {
66 ++s;
67 }
68
69 const char *colonPos = strchr(s, ';');
70
71 size_t len =
72 (colonPos == NULL) ? strlen(s) : colonPos - s;
73
74 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
75 value->setTo(&s[keyLen + 1], len - keyLen - 1);
76 return true;
77 }
78
79 if (colonPos == NULL) {
80 return false;
81 }
82
83 s = colonPos + 1;
84 }
85 }
86
87 struct MyHandler : public AHandler {
88 enum {
89 kWhatConnected = 'conn',
90 kWhatDisconnected = 'disc',
91 kWhatSeekDone = 'sdon',
92
93 kWhatAccessUnit = 'accU',
94 kWhatEOS = 'eos!',
95 kWhatSeekDiscontinuity = 'seeD',
96 kWhatNormalPlayTimeMapping = 'nptM',
97 };
98
99 MyHandler(
100 const char *url,
101 const sp<AMessage> ¬ify,
102 bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler103 : mNotify(notify),
104 mUIDValid(uidValid),
105 mUID(uid),
106 mNetLooper(new ALooper),
107 mConn(new ARTSPConnection(mUIDValid, mUID)),
108 mRTPConn(new ARTPConnection),
109 mOriginalSessionURL(url),
110 mSessionURL(url),
111 mSetupTracksSuccessful(false),
112 mSeekPending(false),
113 mFirstAccessUnit(true),
114 mAllTracksHaveTime(false),
115 mNTPAnchorUs(-1),
116 mMediaAnchorUs(-1),
117 mLastMediaTimeUs(0),
118 mNumAccessUnitsReceived(0),
119 mCheckPending(false),
120 mCheckGeneration(0),
121 mCheckTimeoutGeneration(0),
122 mTryTCPInterleaving(false),
123 mTryFakeRTCP(false),
124 mReceivedFirstRTCPPacket(false),
125 mReceivedFirstRTPPacket(false),
126 mSeekable(true),
127 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
128 mKeepAliveGeneration(0),
129 mPausing(false),
130 mPauseGeneration(0) {
131 mNetLooper->setName("rtsp net");
132 mNetLooper->start(false /* runOnCallingThread */,
133 false /* canCallJava */,
134 PRIORITY_HIGHEST);
135
136 // Strip any authentication info from the session url, we don't
137 // want to transmit user/pass in cleartext.
138 AString host, path, user, pass;
139 unsigned port;
140 CHECK(ARTSPConnection::ParseURL(
141 mSessionURL.c_str(), &host, &port, &path, &user, &pass));
142
143 if (user.size() > 0) {
144 mSessionURL.clear();
145 mSessionURL.append("rtsp://");
146 mSessionURL.append(host);
147 mSessionURL.append(":");
148 mSessionURL.append(StringPrintf("%u", port));
149 mSessionURL.append(path);
150
151 ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
152 }
153
154 mSessionHost = host;
155 }
156
connectMyHandler157 void connect() {
158 looper()->registerHandler(mConn);
159 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
160
161 sp<AMessage> notify = new AMessage('biny', id());
162 mConn->observeBinaryData(notify);
163
164 sp<AMessage> reply = new AMessage('conn', id());
165 mConn->connect(mOriginalSessionURL.c_str(), reply);
166 }
167
loadSDPMyHandler168 void loadSDP(const sp<ASessionDescription>& desc) {
169 looper()->registerHandler(mConn);
170 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
171
172 sp<AMessage> notify = new AMessage('biny', id());
173 mConn->observeBinaryData(notify);
174
175 sp<AMessage> reply = new AMessage('sdpl', id());
176 reply->setObject("description", desc);
177 mConn->connect(mOriginalSessionURL.c_str(), reply);
178 }
179
getControlURLMyHandler180 AString getControlURL(sp<ASessionDescription> desc) {
181 AString sessionLevelControlURL;
182 if (mSessionDesc->findAttribute(
183 0,
184 "a=control",
185 &sessionLevelControlURL)) {
186 if (sessionLevelControlURL.compare("*") == 0) {
187 return mBaseURL;
188 } else {
189 AString controlURL;
190 CHECK(MakeURL(
191 mBaseURL.c_str(),
192 sessionLevelControlURL.c_str(),
193 &controlURL));
194 return controlURL;
195 }
196 } else {
197 return mSessionURL;
198 }
199 }
200
disconnectMyHandler201 void disconnect() {
202 (new AMessage('abor', id()))->post();
203 }
204
seekMyHandler205 void seek(int64_t timeUs) {
206 sp<AMessage> msg = new AMessage('seek', id());
207 msg->setInt64("time", timeUs);
208 mPauseGeneration++;
209 msg->post();
210 }
211
isSeekableMyHandler212 bool isSeekable() const {
213 return mSeekable;
214 }
215
pauseMyHandler216 void pause() {
217 sp<AMessage> msg = new AMessage('paus', id());
218 mPauseGeneration++;
219 msg->setInt32("pausecheck", mPauseGeneration);
220 msg->post(kPauseDelayUs);
221 }
222
resumeMyHandler223 void resume() {
224 sp<AMessage> msg = new AMessage('resu', id());
225 mPauseGeneration++;
226 msg->post();
227 }
228
addRRMyHandler229 static void addRR(const sp<ABuffer> &buf) {
230 uint8_t *ptr = buf->data() + buf->size();
231 ptr[0] = 0x80 | 0;
232 ptr[1] = 201; // RR
233 ptr[2] = 0;
234 ptr[3] = 1;
235 ptr[4] = 0xde; // SSRC
236 ptr[5] = 0xad;
237 ptr[6] = 0xbe;
238 ptr[7] = 0xef;
239
240 buf->setRange(0, buf->size() + 8);
241 }
242
addSDESMyHandler243 static void addSDES(int s, const sp<ABuffer> &buffer) {
244 struct sockaddr_in addr;
245 socklen_t addrSize = sizeof(addr);
246 CHECK_EQ(0, getsockname(s, (sockaddr *)&addr, &addrSize));
247
248 uint8_t *data = buffer->data() + buffer->size();
249 data[0] = 0x80 | 1;
250 data[1] = 202; // SDES
251 data[4] = 0xde; // SSRC
252 data[5] = 0xad;
253 data[6] = 0xbe;
254 data[7] = 0xef;
255
256 size_t offset = 8;
257
258 data[offset++] = 1; // CNAME
259
260 AString cname = "stagefright@";
261 cname.append(inet_ntoa(addr.sin_addr));
262 data[offset++] = cname.size();
263
264 memcpy(&data[offset], cname.c_str(), cname.size());
265 offset += cname.size();
266
267 data[offset++] = 6; // TOOL
268
269 AString tool = MakeUserAgent();
270
271 data[offset++] = tool.size();
272
273 memcpy(&data[offset], tool.c_str(), tool.size());
274 offset += tool.size();
275
276 data[offset++] = 0;
277
278 if ((offset % 4) > 0) {
279 size_t count = 4 - (offset % 4);
280 switch (count) {
281 case 3:
282 data[offset++] = 0;
283 case 2:
284 data[offset++] = 0;
285 case 1:
286 data[offset++] = 0;
287 }
288 }
289
290 size_t numWords = (offset / 4) - 1;
291 data[2] = numWords >> 8;
292 data[3] = numWords & 0xff;
293
294 buffer->setRange(buffer->offset(), buffer->size() + offset);
295 }
296
297 // In case we're behind NAT, fire off two UDP packets to the remote
298 // rtp/rtcp ports to poke a hole into the firewall for future incoming
299 // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler300 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
301 struct sockaddr_in addr;
302 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
303 addr.sin_family = AF_INET;
304
305 AString source;
306 AString server_port;
307 if (!GetAttribute(transport.c_str(),
308 "source",
309 &source)) {
310 ALOGW("Missing 'source' field in Transport response. Using "
311 "RTSP endpoint address.");
312
313 struct hostent *ent = gethostbyname(mSessionHost.c_str());
314 if (ent == NULL) {
315 ALOGE("Failed to look up address of session host '%s'",
316 mSessionHost.c_str());
317
318 return false;
319 }
320
321 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
322 } else {
323 addr.sin_addr.s_addr = inet_addr(source.c_str());
324 }
325
326 if (!GetAttribute(transport.c_str(),
327 "server_port",
328 &server_port)) {
329 ALOGI("Missing 'server_port' field in Transport response.");
330 return false;
331 }
332
333 int rtpPort, rtcpPort;
334 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
335 || rtpPort <= 0 || rtpPort > 65535
336 || rtcpPort <=0 || rtcpPort > 65535
337 || rtcpPort != rtpPort + 1) {
338 ALOGE("Server picked invalid RTP/RTCP port pair %s,"
339 " RTP port must be even, RTCP port must be one higher.",
340 server_port.c_str());
341
342 return false;
343 }
344
345 if (rtpPort & 1) {
346 ALOGW("Server picked an odd RTP port, it should've picked an "
347 "even one, we'll let it pass for now, but this may break "
348 "in the future.");
349 }
350
351 if (addr.sin_addr.s_addr == INADDR_NONE) {
352 return true;
353 }
354
355 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
356 // No firewalls to traverse on the loopback interface.
357 return true;
358 }
359
360 // Make up an RR/SDES RTCP packet.
361 sp<ABuffer> buf = new ABuffer(65536);
362 buf->setRange(0, 0);
363 addRR(buf);
364 addSDES(rtpSocket, buf);
365
366 addr.sin_port = htons(rtpPort);
367
368 ssize_t n = sendto(
369 rtpSocket, buf->data(), buf->size(), 0,
370 (const sockaddr *)&addr, sizeof(addr));
371
372 if (n < (ssize_t)buf->size()) {
373 ALOGE("failed to poke a hole for RTP packets");
374 return false;
375 }
376
377 addr.sin_port = htons(rtcpPort);
378
379 n = sendto(
380 rtcpSocket, buf->data(), buf->size(), 0,
381 (const sockaddr *)&addr, sizeof(addr));
382
383 if (n < (ssize_t)buf->size()) {
384 ALOGE("failed to poke a hole for RTCP packets");
385 return false;
386 }
387
388 ALOGV("successfully poked holes.");
389
390 return true;
391 }
392
isLiveStreamMyHandler393 static bool isLiveStream(const sp<ASessionDescription> &desc) {
394 AString attrLiveStream;
395 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
396 ssize_t semicolonPos = attrLiveStream.find(";", 2);
397
398 const char* liveStreamValue;
399 if (semicolonPos < 0) {
400 liveStreamValue = attrLiveStream.c_str();
401 } else {
402 AString valString;
403 valString.setTo(attrLiveStream,
404 semicolonPos + 1,
405 attrLiveStream.size() - semicolonPos - 1);
406 liveStreamValue = valString.c_str();
407 }
408
409 uint32_t value = strtoul(liveStreamValue, NULL, 10);
410 if (value == 1) {
411 ALOGV("found live stream");
412 return true;
413 }
414 } else {
415 // It is a live stream if no duration is returned
416 int64_t durationUs;
417 if (!desc->getDurationUs(&durationUs)) {
418 ALOGV("No duration found, assume live stream");
419 return true;
420 }
421 }
422
423 return false;
424 }
425
onMessageReceivedMyHandler426 virtual void onMessageReceived(const sp<AMessage> &msg) {
427 switch (msg->what()) {
428 case 'conn':
429 {
430 int32_t result;
431 CHECK(msg->findInt32("result", &result));
432
433 ALOGI("connection request completed with result %d (%s)",
434 result, strerror(-result));
435
436 if (result == OK) {
437 AString request;
438 request = "DESCRIBE ";
439 request.append(mSessionURL);
440 request.append(" RTSP/1.0\r\n");
441 request.append("Accept: application/sdp\r\n");
442 request.append("\r\n");
443
444 sp<AMessage> reply = new AMessage('desc', id());
445 mConn->sendRequest(request.c_str(), reply);
446 } else {
447 (new AMessage('disc', id()))->post();
448 }
449 break;
450 }
451
452 case 'disc':
453 {
454 ++mKeepAliveGeneration;
455
456 int32_t reconnect;
457 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
458 sp<AMessage> reply = new AMessage('conn', id());
459 mConn->connect(mOriginalSessionURL.c_str(), reply);
460 } else {
461 (new AMessage('quit', id()))->post();
462 }
463 break;
464 }
465
466 case 'desc':
467 {
468 int32_t result;
469 CHECK(msg->findInt32("result", &result));
470
471 ALOGI("DESCRIBE completed with result %d (%s)",
472 result, strerror(-result));
473
474 if (result == OK) {
475 sp<RefBase> obj;
476 CHECK(msg->findObject("response", &obj));
477 sp<ARTSPResponse> response =
478 static_cast<ARTSPResponse *>(obj.get());
479
480 if (response->mStatusCode == 302) {
481 ssize_t i = response->mHeaders.indexOfKey("location");
482 CHECK_GE(i, 0);
483
484 mSessionURL = response->mHeaders.valueAt(i);
485
486 AString request;
487 request = "DESCRIBE ";
488 request.append(mSessionURL);
489 request.append(" RTSP/1.0\r\n");
490 request.append("Accept: application/sdp\r\n");
491 request.append("\r\n");
492
493 sp<AMessage> reply = new AMessage('desc', id());
494 mConn->sendRequest(request.c_str(), reply);
495 break;
496 }
497
498 if (response->mStatusCode != 200) {
499 result = UNKNOWN_ERROR;
500 } else if (response->mContent == NULL) {
501 result = ERROR_MALFORMED;
502 ALOGE("The response has no content.");
503 } else {
504 mSessionDesc = new ASessionDescription;
505
506 mSessionDesc->setTo(
507 response->mContent->data(),
508 response->mContent->size());
509
510 if (!mSessionDesc->isValid()) {
511 ALOGE("Failed to parse session description.");
512 result = ERROR_MALFORMED;
513 } else {
514 ssize_t i = response->mHeaders.indexOfKey("content-base");
515 if (i >= 0) {
516 mBaseURL = response->mHeaders.valueAt(i);
517 } else {
518 i = response->mHeaders.indexOfKey("content-location");
519 if (i >= 0) {
520 mBaseURL = response->mHeaders.valueAt(i);
521 } else {
522 mBaseURL = mSessionURL;
523 }
524 }
525
526 mSeekable = !isLiveStream(mSessionDesc);
527
528 if (!mBaseURL.startsWith("rtsp://")) {
529 // Some misbehaving servers specify a relative
530 // URL in one of the locations above, combine
531 // it with the absolute session URL to get
532 // something usable...
533
534 ALOGW("Server specified a non-absolute base URL"
535 ", combining it with the session URL to "
536 "get something usable...");
537
538 AString tmp;
539 CHECK(MakeURL(
540 mSessionURL.c_str(),
541 mBaseURL.c_str(),
542 &tmp));
543
544 mBaseURL = tmp;
545 }
546
547 mControlURL = getControlURL(mSessionDesc);
548
549 if (mSessionDesc->countTracks() < 2) {
550 // There's no actual tracks in this session.
551 // The first "track" is merely session meta
552 // data.
553
554 ALOGW("Session doesn't contain any playable "
555 "tracks. Aborting.");
556 result = ERROR_UNSUPPORTED;
557 } else {
558 setupTrack(1);
559 }
560 }
561 }
562 }
563
564 if (result != OK) {
565 sp<AMessage> reply = new AMessage('disc', id());
566 mConn->disconnect(reply);
567 }
568 break;
569 }
570
571 case 'sdpl':
572 {
573 int32_t result;
574 CHECK(msg->findInt32("result", &result));
575
576 ALOGI("SDP connection request completed with result %d (%s)",
577 result, strerror(-result));
578
579 if (result == OK) {
580 sp<RefBase> obj;
581 CHECK(msg->findObject("description", &obj));
582 mSessionDesc =
583 static_cast<ASessionDescription *>(obj.get());
584
585 if (!mSessionDesc->isValid()) {
586 ALOGE("Failed to parse session description.");
587 result = ERROR_MALFORMED;
588 } else {
589 mBaseURL = mSessionURL;
590
591 mSeekable = !isLiveStream(mSessionDesc);
592
593 mControlURL = getControlURL(mSessionDesc);
594
595 if (mSessionDesc->countTracks() < 2) {
596 // There's no actual tracks in this session.
597 // The first "track" is merely session meta
598 // data.
599
600 ALOGW("Session doesn't contain any playable "
601 "tracks. Aborting.");
602 result = ERROR_UNSUPPORTED;
603 } else {
604 setupTrack(1);
605 }
606 }
607 }
608
609 if (result != OK) {
610 sp<AMessage> reply = new AMessage('disc', id());
611 mConn->disconnect(reply);
612 }
613 break;
614 }
615
616 case 'setu':
617 {
618 size_t index;
619 CHECK(msg->findSize("index", &index));
620
621 TrackInfo *track = NULL;
622 size_t trackIndex;
623 if (msg->findSize("track-index", &trackIndex)) {
624 track = &mTracks.editItemAt(trackIndex);
625 }
626
627 int32_t result;
628 CHECK(msg->findInt32("result", &result));
629
630 ALOGI("SETUP(%d) completed with result %d (%s)",
631 index, result, strerror(-result));
632
633 if (result == OK) {
634 CHECK(track != NULL);
635
636 sp<RefBase> obj;
637 CHECK(msg->findObject("response", &obj));
638 sp<ARTSPResponse> response =
639 static_cast<ARTSPResponse *>(obj.get());
640
641 if (response->mStatusCode != 200) {
642 result = UNKNOWN_ERROR;
643 } else {
644 ssize_t i = response->mHeaders.indexOfKey("session");
645 CHECK_GE(i, 0);
646
647 mSessionID = response->mHeaders.valueAt(i);
648
649 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
650 AString timeoutStr;
651 if (GetAttribute(
652 mSessionID.c_str(), "timeout", &timeoutStr)) {
653 char *end;
654 unsigned long timeoutSecs =
655 strtoul(timeoutStr.c_str(), &end, 10);
656
657 if (end == timeoutStr.c_str() || *end != '\0') {
658 ALOGW("server specified malformed timeout '%s'",
659 timeoutStr.c_str());
660
661 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
662 } else if (timeoutSecs < 15) {
663 ALOGW("server specified too short a timeout "
664 "(%lu secs), using default.",
665 timeoutSecs);
666
667 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
668 } else {
669 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
670
671 ALOGI("server specified timeout of %lu secs.",
672 timeoutSecs);
673 }
674 }
675
676 i = mSessionID.find(";");
677 if (i >= 0) {
678 // Remove options, i.e. ";timeout=90"
679 mSessionID.erase(i, mSessionID.size() - i);
680 }
681
682 sp<AMessage> notify = new AMessage('accu', id());
683 notify->setSize("track-index", trackIndex);
684
685 i = response->mHeaders.indexOfKey("transport");
686 CHECK_GE(i, 0);
687
688 if (!track->mUsingInterleavedTCP) {
689 AString transport = response->mHeaders.valueAt(i);
690
691 // We are going to continue even if we were
692 // unable to poke a hole into the firewall...
693 pokeAHole(
694 track->mRTPSocket,
695 track->mRTCPSocket,
696 transport);
697 }
698
699 mRTPConn->addStream(
700 track->mRTPSocket, track->mRTCPSocket,
701 mSessionDesc, index,
702 notify, track->mUsingInterleavedTCP);
703
704 mSetupTracksSuccessful = true;
705 }
706 }
707
708 if (result != OK) {
709 if (track) {
710 if (!track->mUsingInterleavedTCP) {
711 // Clear the tag
712 if (mUIDValid) {
713 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
714 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
715 }
716
717 close(track->mRTPSocket);
718 close(track->mRTCPSocket);
719 }
720
721 mTracks.removeItemsAt(trackIndex);
722 }
723 }
724
725 ++index;
726 if (index < mSessionDesc->countTracks()) {
727 setupTrack(index);
728 } else if (mSetupTracksSuccessful) {
729 ++mKeepAliveGeneration;
730 postKeepAlive();
731
732 AString request = "PLAY ";
733 request.append(mControlURL);
734 request.append(" RTSP/1.0\r\n");
735
736 request.append("Session: ");
737 request.append(mSessionID);
738 request.append("\r\n");
739
740 request.append("\r\n");
741
742 sp<AMessage> reply = new AMessage('play', id());
743 mConn->sendRequest(request.c_str(), reply);
744 } else {
745 sp<AMessage> reply = new AMessage('disc', id());
746 mConn->disconnect(reply);
747 }
748 break;
749 }
750
751 case 'play':
752 {
753 int32_t result;
754 CHECK(msg->findInt32("result", &result));
755
756 ALOGI("PLAY completed with result %d (%s)",
757 result, strerror(-result));
758
759 if (result == OK) {
760 sp<RefBase> obj;
761 CHECK(msg->findObject("response", &obj));
762 sp<ARTSPResponse> response =
763 static_cast<ARTSPResponse *>(obj.get());
764
765 if (response->mStatusCode != 200) {
766 result = UNKNOWN_ERROR;
767 } else {
768 parsePlayResponse(response);
769
770 sp<AMessage> timeout = new AMessage('tiou', id());
771 mCheckTimeoutGeneration++;
772 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
773 timeout->post(kStartupTimeoutUs);
774 }
775 }
776
777 if (result != OK) {
778 sp<AMessage> reply = new AMessage('disc', id());
779 mConn->disconnect(reply);
780 }
781
782 break;
783 }
784
785 case 'aliv':
786 {
787 int32_t generation;
788 CHECK(msg->findInt32("generation", &generation));
789
790 if (generation != mKeepAliveGeneration) {
791 // obsolete event.
792 break;
793 }
794
795 AString request;
796 request.append("OPTIONS ");
797 request.append(mSessionURL);
798 request.append(" RTSP/1.0\r\n");
799 request.append("Session: ");
800 request.append(mSessionID);
801 request.append("\r\n");
802 request.append("\r\n");
803
804 sp<AMessage> reply = new AMessage('opts', id());
805 reply->setInt32("generation", mKeepAliveGeneration);
806 mConn->sendRequest(request.c_str(), reply);
807 break;
808 }
809
810 case 'opts':
811 {
812 int32_t result;
813 CHECK(msg->findInt32("result", &result));
814
815 ALOGI("OPTIONS completed with result %d (%s)",
816 result, strerror(-result));
817
818 int32_t generation;
819 CHECK(msg->findInt32("generation", &generation));
820
821 if (generation != mKeepAliveGeneration) {
822 // obsolete event.
823 break;
824 }
825
826 postKeepAlive();
827 break;
828 }
829
830 case 'abor':
831 {
832 for (size_t i = 0; i < mTracks.size(); ++i) {
833 TrackInfo *info = &mTracks.editItemAt(i);
834
835 if (!mFirstAccessUnit) {
836 postQueueEOS(i, ERROR_END_OF_STREAM);
837 }
838
839 if (!info->mUsingInterleavedTCP) {
840 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
841
842 // Clear the tag
843 if (mUIDValid) {
844 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
845 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
846 }
847
848 close(info->mRTPSocket);
849 close(info->mRTCPSocket);
850 }
851 }
852 mTracks.clear();
853 mSetupTracksSuccessful = false;
854 mSeekPending = false;
855 mFirstAccessUnit = true;
856 mAllTracksHaveTime = false;
857 mNTPAnchorUs = -1;
858 mMediaAnchorUs = -1;
859 mNumAccessUnitsReceived = 0;
860 mReceivedFirstRTCPPacket = false;
861 mReceivedFirstRTPPacket = false;
862 mPausing = false;
863 mSeekable = true;
864
865 sp<AMessage> reply = new AMessage('tear', id());
866
867 int32_t reconnect;
868 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
869 reply->setInt32("reconnect", true);
870 }
871
872 AString request;
873 request = "TEARDOWN ";
874
875 // XXX should use aggregate url from SDP here...
876 request.append(mSessionURL);
877 request.append(" RTSP/1.0\r\n");
878
879 request.append("Session: ");
880 request.append(mSessionID);
881 request.append("\r\n");
882
883 request.append("\r\n");
884
885 mConn->sendRequest(request.c_str(), reply);
886 break;
887 }
888
889 case 'tear':
890 {
891 int32_t result;
892 CHECK(msg->findInt32("result", &result));
893
894 ALOGI("TEARDOWN completed with result %d (%s)",
895 result, strerror(-result));
896
897 sp<AMessage> reply = new AMessage('disc', id());
898
899 int32_t reconnect;
900 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
901 reply->setInt32("reconnect", true);
902 }
903
904 mConn->disconnect(reply);
905 break;
906 }
907
908 case 'quit':
909 {
910 sp<AMessage> msg = mNotify->dup();
911 msg->setInt32("what", kWhatDisconnected);
912 msg->setInt32("result", UNKNOWN_ERROR);
913 msg->post();
914 break;
915 }
916
917 case 'chek':
918 {
919 int32_t generation;
920 CHECK(msg->findInt32("generation", &generation));
921 if (generation != mCheckGeneration) {
922 // This is an outdated message. Ignore.
923 break;
924 }
925
926 if (mNumAccessUnitsReceived == 0) {
927 #if 1
928 ALOGI("stream ended? aborting.");
929 (new AMessage('abor', id()))->post();
930 break;
931 #else
932 ALOGI("haven't seen an AU in a looong time.");
933 #endif
934 }
935
936 mNumAccessUnitsReceived = 0;
937 msg->post(kAccessUnitTimeoutUs);
938 break;
939 }
940
941 case 'accu':
942 {
943 int32_t timeUpdate;
944 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
945 size_t trackIndex;
946 CHECK(msg->findSize("track-index", &trackIndex));
947
948 uint32_t rtpTime;
949 uint64_t ntpTime;
950 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
951 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
952
953 onTimeUpdate(trackIndex, rtpTime, ntpTime);
954 break;
955 }
956
957 int32_t first;
958 if (msg->findInt32("first-rtcp", &first)) {
959 mReceivedFirstRTCPPacket = true;
960 break;
961 }
962
963 if (msg->findInt32("first-rtp", &first)) {
964 mReceivedFirstRTPPacket = true;
965 break;
966 }
967
968 ++mNumAccessUnitsReceived;
969 postAccessUnitTimeoutCheck();
970
971 size_t trackIndex;
972 CHECK(msg->findSize("track-index", &trackIndex));
973
974 if (trackIndex >= mTracks.size()) {
975 ALOGV("late packets ignored.");
976 break;
977 }
978
979 TrackInfo *track = &mTracks.editItemAt(trackIndex);
980
981 int32_t eos;
982 if (msg->findInt32("eos", &eos)) {
983 ALOGI("received BYE on track index %d", trackIndex);
984 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
985 ALOGI("No time established => fake existing data");
986
987 track->mEOSReceived = true;
988 mTryFakeRTCP = true;
989 mReceivedFirstRTCPPacket = true;
990 fakeTimestamps();
991 } else {
992 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
993 }
994 return;
995 }
996
997 sp<ABuffer> accessUnit;
998 CHECK(msg->findBuffer("access-unit", &accessUnit));
999
1000 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1001
1002 if (mSeekPending) {
1003 ALOGV("we're seeking, dropping stale packet.");
1004 break;
1005 }
1006
1007 if (seqNum < track->mFirstSeqNumInSegment) {
1008 ALOGV("dropping stale access-unit (%d < %d)",
1009 seqNum, track->mFirstSeqNumInSegment);
1010 break;
1011 }
1012
1013 if (track->mNewSegment) {
1014 track->mNewSegment = false;
1015 }
1016
1017 onAccessUnitComplete(trackIndex, accessUnit);
1018 break;
1019 }
1020
1021 case 'paus':
1022 {
1023 int32_t generation;
1024 CHECK(msg->findInt32("pausecheck", &generation));
1025 if (generation != mPauseGeneration) {
1026 ALOGV("Ignoring outdated pause message.");
1027 break;
1028 }
1029
1030 if (!mSeekable) {
1031 ALOGW("This is a live stream, ignoring pause request.");
1032 break;
1033 }
1034 mCheckPending = true;
1035 ++mCheckGeneration;
1036 mPausing = true;
1037
1038 AString request = "PAUSE ";
1039 request.append(mControlURL);
1040 request.append(" RTSP/1.0\r\n");
1041
1042 request.append("Session: ");
1043 request.append(mSessionID);
1044 request.append("\r\n");
1045
1046 request.append("\r\n");
1047
1048 sp<AMessage> reply = new AMessage('pau2', id());
1049 mConn->sendRequest(request.c_str(), reply);
1050 break;
1051 }
1052
1053 case 'pau2':
1054 {
1055 int32_t result;
1056 CHECK(msg->findInt32("result", &result));
1057 mCheckTimeoutGeneration++;
1058
1059 ALOGI("PAUSE completed with result %d (%s)",
1060 result, strerror(-result));
1061 break;
1062 }
1063
1064 case 'resu':
1065 {
1066 if (mPausing && mSeekPending) {
1067 // If seeking, Play will be sent from see1 instead
1068 break;
1069 }
1070
1071 if (!mPausing) {
1072 // Dont send PLAY if we have not paused
1073 break;
1074 }
1075 AString request = "PLAY ";
1076 request.append(mControlURL);
1077 request.append(" RTSP/1.0\r\n");
1078
1079 request.append("Session: ");
1080 request.append(mSessionID);
1081 request.append("\r\n");
1082
1083 request.append("\r\n");
1084
1085 sp<AMessage> reply = new AMessage('res2', id());
1086 mConn->sendRequest(request.c_str(), reply);
1087 break;
1088 }
1089
1090 case 'res2':
1091 {
1092 int32_t result;
1093 CHECK(msg->findInt32("result", &result));
1094
1095 ALOGI("PLAY completed with result %d (%s)",
1096 result, strerror(-result));
1097
1098 mCheckPending = false;
1099 postAccessUnitTimeoutCheck();
1100
1101 if (result == OK) {
1102 sp<RefBase> obj;
1103 CHECK(msg->findObject("response", &obj));
1104 sp<ARTSPResponse> response =
1105 static_cast<ARTSPResponse *>(obj.get());
1106
1107 if (response->mStatusCode != 200) {
1108 result = UNKNOWN_ERROR;
1109 } else {
1110 parsePlayResponse(response);
1111
1112 // Post new timeout in order to make sure to use
1113 // fake timestamps if no new Sender Reports arrive
1114 sp<AMessage> timeout = new AMessage('tiou', id());
1115 mCheckTimeoutGeneration++;
1116 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1117 timeout->post(kStartupTimeoutUs);
1118 }
1119 }
1120
1121 if (result != OK) {
1122 ALOGE("resume failed, aborting.");
1123 (new AMessage('abor', id()))->post();
1124 }
1125
1126 mPausing = false;
1127 break;
1128 }
1129
1130 case 'seek':
1131 {
1132 if (!mSeekable) {
1133 ALOGW("This is a live stream, ignoring seek request.");
1134
1135 sp<AMessage> msg = mNotify->dup();
1136 msg->setInt32("what", kWhatSeekDone);
1137 msg->post();
1138 break;
1139 }
1140
1141 int64_t timeUs;
1142 CHECK(msg->findInt64("time", &timeUs));
1143
1144 mSeekPending = true;
1145
1146 // Disable the access unit timeout until we resumed
1147 // playback again.
1148 mCheckPending = true;
1149 ++mCheckGeneration;
1150
1151 sp<AMessage> reply = new AMessage('see1', id());
1152 reply->setInt64("time", timeUs);
1153
1154 if (mPausing) {
1155 // PAUSE already sent
1156 ALOGI("Pause already sent");
1157 reply->post();
1158 break;
1159 }
1160 AString request = "PAUSE ";
1161 request.append(mControlURL);
1162 request.append(" RTSP/1.0\r\n");
1163
1164 request.append("Session: ");
1165 request.append(mSessionID);
1166 request.append("\r\n");
1167
1168 request.append("\r\n");
1169
1170 mConn->sendRequest(request.c_str(), reply);
1171 break;
1172 }
1173
1174 case 'see1':
1175 {
1176 // Session is paused now.
1177 for (size_t i = 0; i < mTracks.size(); ++i) {
1178 TrackInfo *info = &mTracks.editItemAt(i);
1179
1180 postQueueSeekDiscontinuity(i);
1181 info->mEOSReceived = false;
1182
1183 info->mRTPAnchor = 0;
1184 info->mNTPAnchorUs = -1;
1185 }
1186
1187 mAllTracksHaveTime = false;
1188 mNTPAnchorUs = -1;
1189
1190 // Start new timeoutgeneration to avoid getting timeout
1191 // before PLAY response arrive
1192 sp<AMessage> timeout = new AMessage('tiou', id());
1193 mCheckTimeoutGeneration++;
1194 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1195 timeout->post(kStartupTimeoutUs);
1196
1197 int64_t timeUs;
1198 CHECK(msg->findInt64("time", &timeUs));
1199
1200 AString request = "PLAY ";
1201 request.append(mControlURL);
1202 request.append(" RTSP/1.0\r\n");
1203
1204 request.append("Session: ");
1205 request.append(mSessionID);
1206 request.append("\r\n");
1207
1208 request.append(
1209 StringPrintf(
1210 "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1211
1212 request.append("\r\n");
1213
1214 sp<AMessage> reply = new AMessage('see2', id());
1215 mConn->sendRequest(request.c_str(), reply);
1216 break;
1217 }
1218
1219 case 'see2':
1220 {
1221 if (mTracks.size() == 0) {
1222 // We have already hit abor, break
1223 break;
1224 }
1225
1226 int32_t result;
1227 CHECK(msg->findInt32("result", &result));
1228
1229 ALOGI("PLAY completed with result %d (%s)",
1230 result, strerror(-result));
1231
1232 mCheckPending = false;
1233 postAccessUnitTimeoutCheck();
1234
1235 if (result == OK) {
1236 sp<RefBase> obj;
1237 CHECK(msg->findObject("response", &obj));
1238 sp<ARTSPResponse> response =
1239 static_cast<ARTSPResponse *>(obj.get());
1240
1241 if (response->mStatusCode != 200) {
1242 result = UNKNOWN_ERROR;
1243 } else {
1244 parsePlayResponse(response);
1245
1246 // Post new timeout in order to make sure to use
1247 // fake timestamps if no new Sender Reports arrive
1248 sp<AMessage> timeout = new AMessage('tiou', id());
1249 mCheckTimeoutGeneration++;
1250 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1251 timeout->post(kStartupTimeoutUs);
1252
1253 ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1254 CHECK_GE(i, 0);
1255
1256 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1257
1258 ALOGI("seek completed.");
1259 }
1260 }
1261
1262 if (result != OK) {
1263 ALOGE("seek failed, aborting.");
1264 (new AMessage('abor', id()))->post();
1265 }
1266
1267 mPausing = false;
1268 mSeekPending = false;
1269
1270 sp<AMessage> msg = mNotify->dup();
1271 msg->setInt32("what", kWhatSeekDone);
1272 msg->post();
1273 break;
1274 }
1275
1276 case 'biny':
1277 {
1278 sp<ABuffer> buffer;
1279 CHECK(msg->findBuffer("buffer", &buffer));
1280
1281 int32_t index;
1282 CHECK(buffer->meta()->findInt32("index", &index));
1283
1284 mRTPConn->injectPacket(index, buffer);
1285 break;
1286 }
1287
1288 case 'tiou':
1289 {
1290 int32_t timeoutGenerationCheck;
1291 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1292 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1293 // This is an outdated message. Ignore.
1294 // This typically happens if a lot of seeks are
1295 // performed, since new timeout messages now are
1296 // posted at seek as well.
1297 break;
1298 }
1299 if (!mReceivedFirstRTCPPacket) {
1300 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1301 ALOGW("We received RTP packets but no RTCP packets, "
1302 "using fake timestamps.");
1303
1304 mTryFakeRTCP = true;
1305
1306 mReceivedFirstRTCPPacket = true;
1307
1308 fakeTimestamps();
1309 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1310 ALOGW("Never received any data, switching transports.");
1311
1312 mTryTCPInterleaving = true;
1313
1314 sp<AMessage> msg = new AMessage('abor', id());
1315 msg->setInt32("reconnect", true);
1316 msg->post();
1317 } else {
1318 ALOGW("Never received any data, disconnecting.");
1319 (new AMessage('abor', id()))->post();
1320 }
1321 } else {
1322 if (!mAllTracksHaveTime) {
1323 ALOGW("We received some RTCP packets, but time "
1324 "could not be established on all tracks, now "
1325 "using fake timestamps");
1326
1327 fakeTimestamps();
1328 }
1329 }
1330 break;
1331 }
1332
1333 default:
1334 TRESPASS();
1335 break;
1336 }
1337 }
1338
postKeepAliveMyHandler1339 void postKeepAlive() {
1340 sp<AMessage> msg = new AMessage('aliv', id());
1341 msg->setInt32("generation", mKeepAliveGeneration);
1342 msg->post((mKeepAliveTimeoutUs * 9) / 10);
1343 }
1344
postAccessUnitTimeoutCheckMyHandler1345 void postAccessUnitTimeoutCheck() {
1346 if (mCheckPending) {
1347 return;
1348 }
1349
1350 mCheckPending = true;
1351 sp<AMessage> check = new AMessage('chek', id());
1352 check->setInt32("generation", mCheckGeneration);
1353 check->post(kAccessUnitTimeoutUs);
1354 }
1355
SplitStringMyHandler1356 static void SplitString(
1357 const AString &s, const char *separator, List<AString> *items) {
1358 items->clear();
1359 size_t start = 0;
1360 while (start < s.size()) {
1361 ssize_t offset = s.find(separator, start);
1362
1363 if (offset < 0) {
1364 items->push_back(AString(s, start, s.size() - start));
1365 break;
1366 }
1367
1368 items->push_back(AString(s, start, offset - start));
1369 start = offset + strlen(separator);
1370 }
1371 }
1372
parsePlayResponseMyHandler1373 void parsePlayResponse(const sp<ARTSPResponse> &response) {
1374 if (mTracks.size() == 0) {
1375 ALOGV("parsePlayResponse: late packets ignored.");
1376 return;
1377 }
1378
1379 ssize_t i = response->mHeaders.indexOfKey("range");
1380 if (i < 0) {
1381 // Server doesn't even tell use what range it is going to
1382 // play, therefore we won't support seeking.
1383 return;
1384 }
1385
1386 AString range = response->mHeaders.valueAt(i);
1387 ALOGV("Range: %s", range.c_str());
1388
1389 AString val;
1390 CHECK(GetAttribute(range.c_str(), "npt", &val));
1391
1392 float npt1, npt2;
1393 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1394 // This is a live stream and therefore not seekable.
1395
1396 ALOGI("This is a live stream");
1397 return;
1398 }
1399
1400 i = response->mHeaders.indexOfKey("rtp-info");
1401 CHECK_GE(i, 0);
1402
1403 AString rtpInfo = response->mHeaders.valueAt(i);
1404 List<AString> streamInfos;
1405 SplitString(rtpInfo, ",", &streamInfos);
1406
1407 int n = 1;
1408 for (List<AString>::iterator it = streamInfos.begin();
1409 it != streamInfos.end(); ++it) {
1410 (*it).trim();
1411 ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1412
1413 CHECK(GetAttribute((*it).c_str(), "url", &val));
1414
1415 size_t trackIndex = 0;
1416 while (trackIndex < mTracks.size()
1417 && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1418 ++trackIndex;
1419 }
1420 CHECK_LT(trackIndex, mTracks.size());
1421
1422 CHECK(GetAttribute((*it).c_str(), "seq", &val));
1423
1424 char *end;
1425 unsigned long seq = strtoul(val.c_str(), &end, 10);
1426
1427 TrackInfo *info = &mTracks.editItemAt(trackIndex);
1428 info->mFirstSeqNumInSegment = seq;
1429 info->mNewSegment = true;
1430
1431 CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1432
1433 uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1434
1435 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1436
1437 info->mNormalPlayTimeRTP = rtpTime;
1438 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1439
1440 if (!mFirstAccessUnit) {
1441 postNormalPlayTimeMapping(
1442 trackIndex,
1443 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1444 }
1445
1446 ++n;
1447 }
1448 }
1449
getTrackFormatMyHandler1450 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1451 CHECK_GE(index, 0u);
1452 CHECK_LT(index, mTracks.size());
1453
1454 const TrackInfo &info = mTracks.itemAt(index);
1455
1456 *timeScale = info.mTimeScale;
1457
1458 return info.mPacketSource->getFormat();
1459 }
1460
countTracksMyHandler1461 size_t countTracks() const {
1462 return mTracks.size();
1463 }
1464
1465 private:
1466 struct TrackInfo {
1467 AString mURL;
1468 int mRTPSocket;
1469 int mRTCPSocket;
1470 bool mUsingInterleavedTCP;
1471 uint32_t mFirstSeqNumInSegment;
1472 bool mNewSegment;
1473
1474 uint32_t mRTPAnchor;
1475 int64_t mNTPAnchorUs;
1476 int32_t mTimeScale;
1477 bool mEOSReceived;
1478
1479 uint32_t mNormalPlayTimeRTP;
1480 int64_t mNormalPlayTimeUs;
1481
1482 sp<APacketSource> mPacketSource;
1483
1484 // Stores packets temporarily while no notion of time
1485 // has been established yet.
1486 List<sp<ABuffer> > mPackets;
1487 };
1488
1489 sp<AMessage> mNotify;
1490 bool mUIDValid;
1491 uid_t mUID;
1492 sp<ALooper> mNetLooper;
1493 sp<ARTSPConnection> mConn;
1494 sp<ARTPConnection> mRTPConn;
1495 sp<ASessionDescription> mSessionDesc;
1496 AString mOriginalSessionURL; // This one still has user:pass@
1497 AString mSessionURL;
1498 AString mSessionHost;
1499 AString mBaseURL;
1500 AString mControlURL;
1501 AString mSessionID;
1502 bool mSetupTracksSuccessful;
1503 bool mSeekPending;
1504 bool mFirstAccessUnit;
1505
1506 bool mAllTracksHaveTime;
1507 int64_t mNTPAnchorUs;
1508 int64_t mMediaAnchorUs;
1509 int64_t mLastMediaTimeUs;
1510
1511 int64_t mNumAccessUnitsReceived;
1512 bool mCheckPending;
1513 int32_t mCheckGeneration;
1514 int32_t mCheckTimeoutGeneration;
1515 bool mTryTCPInterleaving;
1516 bool mTryFakeRTCP;
1517 bool mReceivedFirstRTCPPacket;
1518 bool mReceivedFirstRTPPacket;
1519 bool mSeekable;
1520 int64_t mKeepAliveTimeoutUs;
1521 int32_t mKeepAliveGeneration;
1522 bool mPausing;
1523 int32_t mPauseGeneration;
1524
1525 Vector<TrackInfo> mTracks;
1526
setupTrackMyHandler1527 void setupTrack(size_t index) {
1528 sp<APacketSource> source =
1529 new APacketSource(mSessionDesc, index);
1530
1531 if (source->initCheck() != OK) {
1532 ALOGW("Unsupported format. Ignoring track #%d.", index);
1533
1534 sp<AMessage> reply = new AMessage('setu', id());
1535 reply->setSize("index", index);
1536 reply->setInt32("result", ERROR_UNSUPPORTED);
1537 reply->post();
1538 return;
1539 }
1540
1541 AString url;
1542 CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1543
1544 AString trackURL;
1545 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1546
1547 mTracks.push(TrackInfo());
1548 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1549 info->mURL = trackURL;
1550 info->mPacketSource = source;
1551 info->mUsingInterleavedTCP = false;
1552 info->mFirstSeqNumInSegment = 0;
1553 info->mNewSegment = true;
1554 info->mRTPAnchor = 0;
1555 info->mNTPAnchorUs = -1;
1556 info->mNormalPlayTimeRTP = 0;
1557 info->mNormalPlayTimeUs = 0ll;
1558
1559 unsigned long PT;
1560 AString formatDesc;
1561 AString formatParams;
1562 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1563
1564 int32_t timescale;
1565 int32_t numChannels;
1566 ASessionDescription::ParseFormatDesc(
1567 formatDesc.c_str(), ×cale, &numChannels);
1568
1569 info->mTimeScale = timescale;
1570 info->mEOSReceived = false;
1571
1572 ALOGV("track #%d URL=%s", mTracks.size(), trackURL.c_str());
1573
1574 AString request = "SETUP ";
1575 request.append(trackURL);
1576 request.append(" RTSP/1.0\r\n");
1577
1578 if (mTryTCPInterleaving) {
1579 size_t interleaveIndex = 2 * (mTracks.size() - 1);
1580 info->mUsingInterleavedTCP = true;
1581 info->mRTPSocket = interleaveIndex;
1582 info->mRTCPSocket = interleaveIndex + 1;
1583
1584 request.append("Transport: RTP/AVP/TCP;interleaved=");
1585 request.append(interleaveIndex);
1586 request.append("-");
1587 request.append(interleaveIndex + 1);
1588 } else {
1589 unsigned rtpPort;
1590 ARTPConnection::MakePortPair(
1591 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1592
1593 if (mUIDValid) {
1594 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1595 (uint32_t)*(uint32_t*) "RTP_");
1596 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1597 (uint32_t)*(uint32_t*) "RTP_");
1598 }
1599
1600 request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1601 request.append(rtpPort);
1602 request.append("-");
1603 request.append(rtpPort + 1);
1604 }
1605
1606 request.append("\r\n");
1607
1608 if (index > 1) {
1609 request.append("Session: ");
1610 request.append(mSessionID);
1611 request.append("\r\n");
1612 }
1613
1614 request.append("\r\n");
1615
1616 sp<AMessage> reply = new AMessage('setu', id());
1617 reply->setSize("index", index);
1618 reply->setSize("track-index", mTracks.size() - 1);
1619 mConn->sendRequest(request.c_str(), reply);
1620 }
1621
MakeURLMyHandler1622 static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1623 out->clear();
1624
1625 if (strncasecmp("rtsp://", baseURL, 7)) {
1626 // Base URL must be absolute
1627 return false;
1628 }
1629
1630 if (!strncasecmp("rtsp://", url, 7)) {
1631 // "url" is already an absolute URL, ignore base URL.
1632 out->setTo(url);
1633 return true;
1634 }
1635
1636 size_t n = strlen(baseURL);
1637 if (baseURL[n - 1] == '/') {
1638 out->setTo(baseURL);
1639 out->append(url);
1640 } else {
1641 const char *slashPos = strrchr(baseURL, '/');
1642
1643 if (slashPos > &baseURL[6]) {
1644 out->setTo(baseURL, slashPos - baseURL);
1645 } else {
1646 out->setTo(baseURL);
1647 }
1648
1649 out->append("/");
1650 out->append(url);
1651 }
1652
1653 return true;
1654 }
1655
fakeTimestampsMyHandler1656 void fakeTimestamps() {
1657 mNTPAnchorUs = -1ll;
1658 for (size_t i = 0; i < mTracks.size(); ++i) {
1659 onTimeUpdate(i, 0, 0ll);
1660 }
1661 }
1662
dataReceivedOnAllChannelsMyHandler1663 bool dataReceivedOnAllChannels() {
1664 TrackInfo *track;
1665 for (size_t i = 0; i < mTracks.size(); ++i) {
1666 track = &mTracks.editItemAt(i);
1667 if (track->mPackets.empty()) {
1668 return false;
1669 }
1670 }
1671 return true;
1672 }
1673
onTimeUpdateMyHandler1674 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1675 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = 0x%016llx",
1676 trackIndex, rtpTime, ntpTime);
1677
1678 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1679
1680 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1681
1682 track->mRTPAnchor = rtpTime;
1683 track->mNTPAnchorUs = ntpTimeUs;
1684
1685 if (mNTPAnchorUs < 0) {
1686 mNTPAnchorUs = ntpTimeUs;
1687 mMediaAnchorUs = mLastMediaTimeUs;
1688 }
1689
1690 if (!mAllTracksHaveTime) {
1691 bool allTracksHaveTime = true;
1692 for (size_t i = 0; i < mTracks.size(); ++i) {
1693 TrackInfo *track = &mTracks.editItemAt(i);
1694 if (track->mNTPAnchorUs < 0) {
1695 allTracksHaveTime = false;
1696 break;
1697 }
1698 }
1699 if (allTracksHaveTime) {
1700 mAllTracksHaveTime = true;
1701 ALOGI("Time now established for all tracks.");
1702 }
1703 }
1704 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1705 // Time is now established, lets start timestamping immediately
1706 for (size_t i = 0; i < mTracks.size(); ++i) {
1707 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1708 while (!trackInfo->mPackets.empty()) {
1709 sp<ABuffer> accessUnit = *trackInfo->mPackets.begin();
1710 trackInfo->mPackets.erase(trackInfo->mPackets.begin());
1711
1712 if (addMediaTimestamp(i, trackInfo, accessUnit)) {
1713 postQueueAccessUnit(i, accessUnit);
1714 }
1715 }
1716 }
1717 for (size_t i = 0; i < mTracks.size(); ++i) {
1718 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1719 if (trackInfo->mEOSReceived) {
1720 postQueueEOS(i, ERROR_END_OF_STREAM);
1721 trackInfo->mEOSReceived = false;
1722 }
1723 }
1724 }
1725 }
1726
onAccessUnitCompleteMyHandler1727 void onAccessUnitComplete(
1728 int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1729 ALOGV("onAccessUnitComplete track %d", trackIndex);
1730
1731 if (mFirstAccessUnit) {
1732 sp<AMessage> msg = mNotify->dup();
1733 msg->setInt32("what", kWhatConnected);
1734 msg->post();
1735
1736 if (mSeekable) {
1737 for (size_t i = 0; i < mTracks.size(); ++i) {
1738 TrackInfo *info = &mTracks.editItemAt(i);
1739
1740 postNormalPlayTimeMapping(
1741 i,
1742 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1743 }
1744 }
1745
1746 mFirstAccessUnit = false;
1747 }
1748
1749 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1750
1751 if (!mAllTracksHaveTime) {
1752 ALOGV("storing accessUnit, no time established yet");
1753 track->mPackets.push_back(accessUnit);
1754 return;
1755 }
1756
1757 while (!track->mPackets.empty()) {
1758 sp<ABuffer> accessUnit = *track->mPackets.begin();
1759 track->mPackets.erase(track->mPackets.begin());
1760
1761 if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1762 postQueueAccessUnit(trackIndex, accessUnit);
1763 }
1764 }
1765
1766 if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1767 postQueueAccessUnit(trackIndex, accessUnit);
1768 }
1769
1770 if (track->mEOSReceived) {
1771 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1772 track->mEOSReceived = false;
1773 }
1774 }
1775
addMediaTimestampMyHandler1776 bool addMediaTimestamp(
1777 int32_t trackIndex, const TrackInfo *track,
1778 const sp<ABuffer> &accessUnit) {
1779 uint32_t rtpTime;
1780 CHECK(accessUnit->meta()->findInt32(
1781 "rtp-time", (int32_t *)&rtpTime));
1782
1783 int64_t relRtpTimeUs =
1784 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1785 / track->mTimeScale;
1786
1787 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1788
1789 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1790
1791 if (mediaTimeUs > mLastMediaTimeUs) {
1792 mLastMediaTimeUs = mediaTimeUs;
1793 }
1794
1795 if (mediaTimeUs < 0) {
1796 ALOGV("dropping early accessUnit.");
1797 return false;
1798 }
1799
1800 ALOGV("track %d rtpTime=%d mediaTimeUs = %lld us (%.2f secs)",
1801 trackIndex, rtpTime, mediaTimeUs, mediaTimeUs / 1E6);
1802
1803 accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1804
1805 return true;
1806 }
1807
postQueueAccessUnitMyHandler1808 void postQueueAccessUnit(
1809 size_t trackIndex, const sp<ABuffer> &accessUnit) {
1810 sp<AMessage> msg = mNotify->dup();
1811 msg->setInt32("what", kWhatAccessUnit);
1812 msg->setSize("trackIndex", trackIndex);
1813 msg->setBuffer("accessUnit", accessUnit);
1814 msg->post();
1815 }
1816
postQueueEOSMyHandler1817 void postQueueEOS(size_t trackIndex, status_t finalResult) {
1818 sp<AMessage> msg = mNotify->dup();
1819 msg->setInt32("what", kWhatEOS);
1820 msg->setSize("trackIndex", trackIndex);
1821 msg->setInt32("finalResult", finalResult);
1822 msg->post();
1823 }
1824
postQueueSeekDiscontinuityMyHandler1825 void postQueueSeekDiscontinuity(size_t trackIndex) {
1826 sp<AMessage> msg = mNotify->dup();
1827 msg->setInt32("what", kWhatSeekDiscontinuity);
1828 msg->setSize("trackIndex", trackIndex);
1829 msg->post();
1830 }
1831
postNormalPlayTimeMappingMyHandler1832 void postNormalPlayTimeMapping(
1833 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1834 sp<AMessage> msg = mNotify->dup();
1835 msg->setInt32("what", kWhatNormalPlayTimeMapping);
1836 msg->setSize("trackIndex", trackIndex);
1837 msg->setInt32("rtpTime", rtpTime);
1838 msg->setInt64("nptUs", nptUs);
1839 msg->post();
1840 }
1841
1842 DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1843 };
1844
1845 } // namespace android
1846
1847 #endif // MY_HANDLER_H_
1848