1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef MY_HANDLER_H_
18
19 #define MY_HANDLER_H_
20
21 //#define LOG_NDEBUG 0
22
23 #ifndef LOG_TAG
24 #define LOG_TAG "MyHandler"
25 #endif
26
27 #include <utils/Log.h>
28
29 #include "APacketSource.h"
30 #include "ARTPConnection.h"
31 #include "ARTSPConnection.h"
32 #include "ASessionDescription.h"
33
34 #include <ctype.h>
35
36 #include <media/stagefright/foundation/ABuffer.h>
37 #include <media/stagefright/foundation/ADebug.h>
38 #include <media/stagefright/foundation/ALooper.h>
39 #include <media/stagefright/foundation/AMessage.h>
40 #include <media/stagefright/MediaErrors.h>
41 #include <media/stagefright/Utils.h>
42
43 #include <arpa/inet.h>
44 #include <sys/socket.h>
45 #include <netdb.h>
46
47 #include "HTTPBase.h"
48
49 #if LOG_NDEBUG
50 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
51 #else
52 #define UNUSED_UNLESS_VERBOSE(x)
53 #endif
54
// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
57 static int64_t kAccessUnitTimeoutUs = 10000000ll;
58
59 // If no access units arrive for the first 10 secs after starting the
60 // stream, assume none ever will and signal EOS or switch transports.
61 static int64_t kStartupTimeoutUs = 10000000ll;
62
63 static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;
64
65 static int64_t kPauseDelayUs = 3000000ll;
66
67 // The allowed maximum number of stale access units at the beginning of
68 // a new sequence.
69 static int32_t kMaxAllowedStaleAccessUnits = 20;
70
71 namespace android {
72
GetAttribute(const char * s,const char * key,AString * value)73 static bool GetAttribute(const char *s, const char *key, AString *value) {
74 value->clear();
75
76 size_t keyLen = strlen(key);
77
78 for (;;) {
79 while (isspace(*s)) {
80 ++s;
81 }
82
83 const char *colonPos = strchr(s, ';');
84
85 size_t len =
86 (colonPos == NULL) ? strlen(s) : colonPos - s;
87
88 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
89 value->setTo(&s[keyLen + 1], len - keyLen - 1);
90 return true;
91 }
92
93 if (colonPos == NULL) {
94 return false;
95 }
96
97 s = colonPos + 1;
98 }
99 }
100
101 struct MyHandler : public AHandler {
// Notification codes. In the code visible here, kWhatDisconnected,
// kWhatSeekPaused and kWhatSeekDone are stored in the "what" field of copies
// of the notify message that was passed to the constructor.
enum {
    kWhatConnected = 'conn',
    // Posted by the 'quit' handler together with a "result" field.
    kWhatDisconnected = 'disc',
    // Posted by the 'see0' handler together with "err" and "time" fields.
    kWhatSeekPaused = 'spau',
    // Posted (at least) when a seek request is ignored on a live stream.
    kWhatSeekDone = 'sdon',

    // NOTE(review): the code posting the following values is not part of
    // this section of the file; presumably they use the same "what" field.
    kWhatAccessUnit = 'accU',
    kWhatEOS = 'eos!',
    kWhatSeekDiscontinuity = 'seeD',
    kWhatNormalPlayTimeMapping = 'nptM',
};
113
114 MyHandler(
115 const char *url,
116 const sp<AMessage> ¬ify,
117 bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler118 : mNotify(notify),
119 mUIDValid(uidValid),
120 mUID(uid),
121 mNetLooper(new ALooper),
122 mConn(new ARTSPConnection(mUIDValid, mUID)),
123 mRTPConn(new ARTPConnection),
124 mOriginalSessionURL(url),
125 mSessionURL(url),
126 mSetupTracksSuccessful(false),
127 mSeekPending(false),
128 mFirstAccessUnit(true),
129 mAllTracksHaveTime(false),
130 mNTPAnchorUs(-1),
131 mMediaAnchorUs(-1),
132 mLastMediaTimeUs(0),
133 mNumAccessUnitsReceived(0),
134 mCheckPending(false),
135 mCheckGeneration(0),
136 mCheckTimeoutGeneration(0),
137 mTryTCPInterleaving(false),
138 mTryFakeRTCP(false),
139 mReceivedFirstRTCPPacket(false),
140 mReceivedFirstRTPPacket(false),
141 mSeekable(true),
142 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
143 mKeepAliveGeneration(0),
144 mPausing(false),
145 mPauseGeneration(0),
146 mPlayResponseParsed(false) {
147 mNetLooper->setName("rtsp net");
148 mNetLooper->start(false /* runOnCallingThread */,
149 false /* canCallJava */,
150 PRIORITY_HIGHEST);
151
152 // Strip any authentication info from the session url, we don't
153 // want to transmit user/pass in cleartext.
154 AString host, path, user, pass;
155 unsigned port;
156 CHECK(ARTSPConnection::ParseURL(
157 mSessionURL.c_str(), &host, &port, &path, &user, &pass));
158
159 if (user.size() > 0) {
160 mSessionURL.clear();
161 mSessionURL.append("rtsp://");
162 mSessionURL.append(host);
163 mSessionURL.append(":");
164 mSessionURL.append(AStringPrintf("%u", port));
165 mSessionURL.append(path);
166
167 ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
168 }
169
170 mSessionHost = host;
171 }
172
connectMyHandler173 void connect() {
174 looper()->registerHandler(mConn);
175 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
176
177 sp<AMessage> notify = new AMessage('biny', this);
178 mConn->observeBinaryData(notify);
179
180 sp<AMessage> reply = new AMessage('conn', this);
181 mConn->connect(mOriginalSessionURL.c_str(), reply);
182 }
183
loadSDPMyHandler184 void loadSDP(const sp<ASessionDescription>& desc) {
185 looper()->registerHandler(mConn);
186 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
187
188 sp<AMessage> notify = new AMessage('biny', this);
189 mConn->observeBinaryData(notify);
190
191 sp<AMessage> reply = new AMessage('sdpl', this);
192 reply->setObject("description", desc);
193 mConn->connect(mOriginalSessionURL.c_str(), reply);
194 }
195
getControlURLMyHandler196 AString getControlURL() {
197 AString sessionLevelControlURL;
198 if (mSessionDesc->findAttribute(
199 0,
200 "a=control",
201 &sessionLevelControlURL)) {
202 if (sessionLevelControlURL.compare("*") == 0) {
203 return mBaseURL;
204 } else {
205 AString controlURL;
206 CHECK(MakeURL(
207 mBaseURL.c_str(),
208 sessionLevelControlURL.c_str(),
209 &controlURL));
210 return controlURL;
211 }
212 } else {
213 return mSessionURL;
214 }
215 }
216
disconnectMyHandler217 void disconnect() {
218 (new AMessage('abor', this))->post();
219 }
220
seekMyHandler221 void seek(int64_t timeUs) {
222 sp<AMessage> msg = new AMessage('seek', this);
223 msg->setInt64("time", timeUs);
224 mPauseGeneration++;
225 msg->post();
226 }
227
continueSeekAfterPauseMyHandler228 void continueSeekAfterPause(int64_t timeUs) {
229 sp<AMessage> msg = new AMessage('see1', this);
230 msg->setInt64("time", timeUs);
231 msg->post();
232 }
233
isSeekableMyHandler234 bool isSeekable() const {
235 return mSeekable;
236 }
237
pauseMyHandler238 void pause() {
239 sp<AMessage> msg = new AMessage('paus', this);
240 mPauseGeneration++;
241 msg->setInt32("pausecheck", mPauseGeneration);
242 msg->post();
243 }
244
resumeMyHandler245 void resume() {
246 sp<AMessage> msg = new AMessage('resu', this);
247 mPauseGeneration++;
248 msg->post();
249 }
250
addRRMyHandler251 static void addRR(const sp<ABuffer> &buf) {
252 uint8_t *ptr = buf->data() + buf->size();
253 ptr[0] = 0x80 | 0;
254 ptr[1] = 201; // RR
255 ptr[2] = 0;
256 ptr[3] = 1;
257 ptr[4] = 0xde; // SSRC
258 ptr[5] = 0xad;
259 ptr[6] = 0xbe;
260 ptr[7] = 0xef;
261
262 buf->setRange(0, buf->size() + 8);
263 }
264
addSDESMyHandler265 static void addSDES(int s, const sp<ABuffer> &buffer) {
266 struct sockaddr_in addr;
267 socklen_t addrSize = sizeof(addr);
268 if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
269 inet_aton("0.0.0.0", &(addr.sin_addr));
270 }
271
272 uint8_t *data = buffer->data() + buffer->size();
273 data[0] = 0x80 | 1;
274 data[1] = 202; // SDES
275 data[4] = 0xde; // SSRC
276 data[5] = 0xad;
277 data[6] = 0xbe;
278 data[7] = 0xef;
279
280 size_t offset = 8;
281
282 data[offset++] = 1; // CNAME
283
284 AString cname = "stagefright@";
285 cname.append(inet_ntoa(addr.sin_addr));
286 data[offset++] = cname.size();
287
288 memcpy(&data[offset], cname.c_str(), cname.size());
289 offset += cname.size();
290
291 data[offset++] = 6; // TOOL
292
293 AString tool = MakeUserAgent();
294
295 data[offset++] = tool.size();
296
297 memcpy(&data[offset], tool.c_str(), tool.size());
298 offset += tool.size();
299
300 data[offset++] = 0;
301
302 if ((offset % 4) > 0) {
303 size_t count = 4 - (offset % 4);
304 switch (count) {
305 case 3:
306 data[offset++] = 0;
307 case 2:
308 data[offset++] = 0;
309 case 1:
310 data[offset++] = 0;
311 }
312 }
313
314 size_t numWords = (offset / 4) - 1;
315 data[2] = numWords >> 8;
316 data[3] = numWords & 0xff;
317
318 buffer->setRange(buffer->offset(), buffer->size() + offset);
319 }
320
321 // In case we're behind NAT, fire off two UDP packets to the remote
322 // rtp/rtcp ports to poke a hole into the firewall for future incoming
323 // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler324 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
325 struct sockaddr_in addr;
326 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
327 addr.sin_family = AF_INET;
328
329 AString source;
330 AString server_port;
331 if (!GetAttribute(transport.c_str(),
332 "source",
333 &source)) {
334 ALOGW("Missing 'source' field in Transport response. Using "
335 "RTSP endpoint address.");
336
337 struct hostent *ent = gethostbyname(mSessionHost.c_str());
338 if (ent == NULL) {
339 ALOGE("Failed to look up address of session host '%s'",
340 mSessionHost.c_str());
341
342 return false;
343 }
344
345 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
346 } else {
347 addr.sin_addr.s_addr = inet_addr(source.c_str());
348 }
349
350 if (!GetAttribute(transport.c_str(),
351 "server_port",
352 &server_port)) {
353 ALOGI("Missing 'server_port' field in Transport response.");
354 return false;
355 }
356
357 int rtpPort, rtcpPort;
358 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
359 || rtpPort <= 0 || rtpPort > 65535
360 || rtcpPort <=0 || rtcpPort > 65535
361 || rtcpPort != rtpPort + 1) {
362 ALOGE("Server picked invalid RTP/RTCP port pair %s,"
363 " RTP port must be even, RTCP port must be one higher.",
364 server_port.c_str());
365
366 return false;
367 }
368
369 if (rtpPort & 1) {
370 ALOGW("Server picked an odd RTP port, it should've picked an "
371 "even one, we'll let it pass for now, but this may break "
372 "in the future.");
373 }
374
375 if (addr.sin_addr.s_addr == INADDR_NONE) {
376 return true;
377 }
378
379 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
380 // No firewalls to traverse on the loopback interface.
381 return true;
382 }
383
384 // Make up an RR/SDES RTCP packet.
385 sp<ABuffer> buf = new ABuffer(65536);
386 buf->setRange(0, 0);
387 addRR(buf);
388 addSDES(rtpSocket, buf);
389
390 addr.sin_port = htons(rtpPort);
391
392 ssize_t n = sendto(
393 rtpSocket, buf->data(), buf->size(), 0,
394 (const sockaddr *)&addr, sizeof(addr));
395
396 if (n < (ssize_t)buf->size()) {
397 ALOGE("failed to poke a hole for RTP packets");
398 return false;
399 }
400
401 addr.sin_port = htons(rtcpPort);
402
403 n = sendto(
404 rtcpSocket, buf->data(), buf->size(), 0,
405 (const sockaddr *)&addr, sizeof(addr));
406
407 if (n < (ssize_t)buf->size()) {
408 ALOGE("failed to poke a hole for RTCP packets");
409 return false;
410 }
411
412 ALOGV("successfully poked holes.");
413
414 return true;
415 }
416
isLiveStreamMyHandler417 static bool isLiveStream(const sp<ASessionDescription> &desc) {
418 AString attrLiveStream;
419 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
420 ssize_t semicolonPos = attrLiveStream.find(";", 2);
421
422 const char* liveStreamValue;
423 if (semicolonPos < 0) {
424 liveStreamValue = attrLiveStream.c_str();
425 } else {
426 AString valString;
427 valString.setTo(attrLiveStream,
428 semicolonPos + 1,
429 attrLiveStream.size() - semicolonPos - 1);
430 liveStreamValue = valString.c_str();
431 }
432
433 uint32_t value = strtoul(liveStreamValue, NULL, 10);
434 if (value == 1) {
435 ALOGV("found live stream");
436 return true;
437 }
438 } else {
439 // It is a live stream if no duration is returned
440 int64_t durationUs;
441 if (!desc->getDurationUs(&durationUs)) {
442 ALOGV("No duration found, assume live stream");
443 return true;
444 }
445 }
446
447 return false;
448 }
449
onMessageReceivedMyHandler450 virtual void onMessageReceived(const sp<AMessage> &msg) {
451 switch (msg->what()) {
452 case 'conn':
453 {
454 int32_t result;
455 CHECK(msg->findInt32("result", &result));
456
457 ALOGI("connection request completed with result %d (%s)",
458 result, strerror(-result));
459
460 if (result == OK) {
461 AString request;
462 request = "DESCRIBE ";
463 request.append(mSessionURL);
464 request.append(" RTSP/1.0\r\n");
465 request.append("Accept: application/sdp\r\n");
466 request.append("\r\n");
467
468 sp<AMessage> reply = new AMessage('desc', this);
469 mConn->sendRequest(request.c_str(), reply);
470 } else {
471 (new AMessage('disc', this))->post();
472 }
473 break;
474 }
475
476 case 'disc':
477 {
478 ++mKeepAliveGeneration;
479
480 int32_t reconnect;
481 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
482 sp<AMessage> reply = new AMessage('conn', this);
483 mConn->connect(mOriginalSessionURL.c_str(), reply);
484 } else {
485 (new AMessage('quit', this))->post();
486 }
487 break;
488 }
489
490 case 'desc':
491 {
492 int32_t result;
493 CHECK(msg->findInt32("result", &result));
494
495 ALOGI("DESCRIBE completed with result %d (%s)",
496 result, strerror(-result));
497
498 if (result == OK) {
499 sp<RefBase> obj;
500 CHECK(msg->findObject("response", &obj));
501 sp<ARTSPResponse> response =
502 static_cast<ARTSPResponse *>(obj.get());
503
504 if (response->mStatusCode == 301 || response->mStatusCode == 302) {
505 ssize_t i = response->mHeaders.indexOfKey("location");
506 CHECK_GE(i, 0);
507
508 mOriginalSessionURL = response->mHeaders.valueAt(i);
509 mSessionURL = mOriginalSessionURL;
510
511 // Strip any authentication info from the session url, we don't
512 // want to transmit user/pass in cleartext.
513 AString host, path, user, pass;
514 unsigned port;
515 if (ARTSPConnection::ParseURL(
516 mSessionURL.c_str(), &host, &port, &path, &user, &pass)
517 && user.size() > 0) {
518 mSessionURL.clear();
519 mSessionURL.append("rtsp://");
520 mSessionURL.append(host);
521 mSessionURL.append(":");
522 mSessionURL.append(AStringPrintf("%u", port));
523 mSessionURL.append(path);
524
525 ALOGI("rewritten session url: '%s'", mSessionURL.c_str());
526 }
527
528 sp<AMessage> reply = new AMessage('conn', this);
529 mConn->connect(mOriginalSessionURL.c_str(), reply);
530 break;
531 }
532
533 if (response->mStatusCode != 200) {
534 result = UNKNOWN_ERROR;
535 } else if (response->mContent == NULL) {
536 result = ERROR_MALFORMED;
537 ALOGE("The response has no content.");
538 } else {
539 mSessionDesc = new ASessionDescription;
540
541 mSessionDesc->setTo(
542 response->mContent->data(),
543 response->mContent->size());
544
545 if (!mSessionDesc->isValid()) {
546 ALOGE("Failed to parse session description.");
547 result = ERROR_MALFORMED;
548 } else {
549 ssize_t i = response->mHeaders.indexOfKey("content-base");
550 if (i >= 0) {
551 mBaseURL = response->mHeaders.valueAt(i);
552 } else {
553 i = response->mHeaders.indexOfKey("content-location");
554 if (i >= 0) {
555 mBaseURL = response->mHeaders.valueAt(i);
556 } else {
557 mBaseURL = mSessionURL;
558 }
559 }
560
561 mSeekable = !isLiveStream(mSessionDesc);
562
563 if (!mBaseURL.startsWith("rtsp://")) {
564 // Some misbehaving servers specify a relative
565 // URL in one of the locations above, combine
566 // it with the absolute session URL to get
567 // something usable...
568
569 ALOGW("Server specified a non-absolute base URL"
570 ", combining it with the session URL to "
571 "get something usable...");
572
573 AString tmp;
574 CHECK(MakeURL(
575 mSessionURL.c_str(),
576 mBaseURL.c_str(),
577 &tmp));
578
579 mBaseURL = tmp;
580 }
581
582 mControlURL = getControlURL();
583
584 if (mSessionDesc->countTracks() < 2) {
585 // There's no actual tracks in this session.
586 // The first "track" is merely session meta
587 // data.
588
589 ALOGW("Session doesn't contain any playable "
590 "tracks. Aborting.");
591 result = ERROR_UNSUPPORTED;
592 } else {
593 setupTrack(1);
594 }
595 }
596 }
597 }
598
599 if (result != OK) {
600 sp<AMessage> reply = new AMessage('disc', this);
601 mConn->disconnect(reply);
602 }
603 break;
604 }
605
606 case 'sdpl':
607 {
608 int32_t result;
609 CHECK(msg->findInt32("result", &result));
610
611 ALOGI("SDP connection request completed with result %d (%s)",
612 result, strerror(-result));
613
614 if (result == OK) {
615 sp<RefBase> obj;
616 CHECK(msg->findObject("description", &obj));
617 mSessionDesc =
618 static_cast<ASessionDescription *>(obj.get());
619
620 if (!mSessionDesc->isValid()) {
621 ALOGE("Failed to parse session description.");
622 result = ERROR_MALFORMED;
623 } else {
624 mBaseURL = mSessionURL;
625
626 mSeekable = !isLiveStream(mSessionDesc);
627
628 mControlURL = getControlURL();
629
630 if (mSessionDesc->countTracks() < 2) {
631 // There's no actual tracks in this session.
632 // The first "track" is merely session meta
633 // data.
634
635 ALOGW("Session doesn't contain any playable "
636 "tracks. Aborting.");
637 result = ERROR_UNSUPPORTED;
638 } else {
639 setupTrack(1);
640 }
641 }
642 }
643
644 if (result != OK) {
645 sp<AMessage> reply = new AMessage('disc', this);
646 mConn->disconnect(reply);
647 }
648 break;
649 }
650
651 case 'setu':
652 {
653 size_t index;
654 CHECK(msg->findSize("index", &index));
655
656 TrackInfo *track = NULL;
657 size_t trackIndex;
658 if (msg->findSize("track-index", &trackIndex)) {
659 track = &mTracks.editItemAt(trackIndex);
660 }
661
662 int32_t result;
663 CHECK(msg->findInt32("result", &result));
664
665 ALOGI("SETUP(%zu) completed with result %d (%s)",
666 index, result, strerror(-result));
667
668 if (result == OK) {
669 CHECK(track != NULL);
670
671 sp<RefBase> obj;
672 CHECK(msg->findObject("response", &obj));
673 sp<ARTSPResponse> response =
674 static_cast<ARTSPResponse *>(obj.get());
675
676 if (response->mStatusCode != 200) {
677 result = UNKNOWN_ERROR;
678 } else {
679 ssize_t i = response->mHeaders.indexOfKey("session");
680 CHECK_GE(i, 0);
681
682 mSessionID = response->mHeaders.valueAt(i);
683
684 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
685 AString timeoutStr;
686 if (GetAttribute(
687 mSessionID.c_str(), "timeout", &timeoutStr)) {
688 char *end;
689 unsigned long timeoutSecs =
690 strtoul(timeoutStr.c_str(), &end, 10);
691
692 if (end == timeoutStr.c_str() || *end != '\0') {
693 ALOGW("server specified malformed timeout '%s'",
694 timeoutStr.c_str());
695
696 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
697 } else if (timeoutSecs < 15) {
698 ALOGW("server specified too short a timeout "
699 "(%lu secs), using default.",
700 timeoutSecs);
701
702 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
703 } else {
704 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
705
706 ALOGI("server specified timeout of %lu secs.",
707 timeoutSecs);
708 }
709 }
710
711 i = mSessionID.find(";");
712 if (i >= 0) {
713 // Remove options, i.e. ";timeout=90"
714 mSessionID.erase(i, mSessionID.size() - i);
715 }
716
717 sp<AMessage> notify = new AMessage('accu', this);
718 notify->setSize("track-index", trackIndex);
719
720 i = response->mHeaders.indexOfKey("transport");
721 CHECK_GE(i, 0);
722
723 if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
724 if (!track->mUsingInterleavedTCP) {
725 AString transport = response->mHeaders.valueAt(i);
726
727 // We are going to continue even if we were
728 // unable to poke a hole into the firewall...
729 pokeAHole(
730 track->mRTPSocket,
731 track->mRTCPSocket,
732 transport);
733 }
734
735 mRTPConn->addStream(
736 track->mRTPSocket, track->mRTCPSocket,
737 mSessionDesc, index,
738 notify, track->mUsingInterleavedTCP);
739
740 mSetupTracksSuccessful = true;
741 } else {
742 result = BAD_VALUE;
743 }
744 }
745 }
746
747 if (result != OK) {
748 if (track) {
749 if (!track->mUsingInterleavedTCP) {
750 // Clear the tag
751 if (mUIDValid) {
752 HTTPBase::UnRegisterSocketUserTag(track->mRTPSocket);
753 HTTPBase::UnRegisterSocketUserMark(track->mRTPSocket);
754 HTTPBase::UnRegisterSocketUserTag(track->mRTCPSocket);
755 HTTPBase::UnRegisterSocketUserMark(track->mRTCPSocket);
756 }
757
758 close(track->mRTPSocket);
759 close(track->mRTCPSocket);
760 }
761
762 mTracks.removeItemsAt(trackIndex);
763 }
764 }
765
766 ++index;
767 if (result == OK && index < mSessionDesc->countTracks()) {
768 setupTrack(index);
769 } else if (mSetupTracksSuccessful) {
770 ++mKeepAliveGeneration;
771 postKeepAlive();
772
773 AString request = "PLAY ";
774 request.append(mControlURL);
775 request.append(" RTSP/1.0\r\n");
776
777 request.append("Session: ");
778 request.append(mSessionID);
779 request.append("\r\n");
780
781 request.append("\r\n");
782
783 sp<AMessage> reply = new AMessage('play', this);
784 mConn->sendRequest(request.c_str(), reply);
785 } else {
786 sp<AMessage> reply = new AMessage('disc', this);
787 mConn->disconnect(reply);
788 }
789 break;
790 }
791
792 case 'play':
793 {
794 int32_t result;
795 CHECK(msg->findInt32("result", &result));
796
797 ALOGI("PLAY completed with result %d (%s)",
798 result, strerror(-result));
799
800 if (result == OK) {
801 sp<RefBase> obj;
802 CHECK(msg->findObject("response", &obj));
803 sp<ARTSPResponse> response =
804 static_cast<ARTSPResponse *>(obj.get());
805
806 if (response->mStatusCode != 200) {
807 result = UNKNOWN_ERROR;
808 } else {
809 parsePlayResponse(response);
810
811 sp<AMessage> timeout = new AMessage('tiou', this);
812 mCheckTimeoutGeneration++;
813 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
814 timeout->post(kStartupTimeoutUs);
815 }
816 }
817
818 if (result != OK) {
819 sp<AMessage> reply = new AMessage('disc', this);
820 mConn->disconnect(reply);
821 }
822
823 break;
824 }
825
826 case 'aliv':
827 {
828 int32_t generation;
829 CHECK(msg->findInt32("generation", &generation));
830
831 if (generation != mKeepAliveGeneration) {
832 // obsolete event.
833 break;
834 }
835
836 AString request;
837 request.append("OPTIONS ");
838 request.append(mSessionURL);
839 request.append(" RTSP/1.0\r\n");
840 request.append("Session: ");
841 request.append(mSessionID);
842 request.append("\r\n");
843 request.append("\r\n");
844
845 sp<AMessage> reply = new AMessage('opts', this);
846 reply->setInt32("generation", mKeepAliveGeneration);
847 mConn->sendRequest(request.c_str(), reply);
848 break;
849 }
850
851 case 'opts':
852 {
853 int32_t result;
854 CHECK(msg->findInt32("result", &result));
855
856 ALOGI("OPTIONS completed with result %d (%s)",
857 result, strerror(-result));
858
859 int32_t generation;
860 CHECK(msg->findInt32("generation", &generation));
861
862 if (generation != mKeepAliveGeneration) {
863 // obsolete event.
864 break;
865 }
866
867 postKeepAlive();
868 break;
869 }
870
871 case 'abor':
872 {
873 for (size_t i = 0; i < mTracks.size(); ++i) {
874 TrackInfo *info = &mTracks.editItemAt(i);
875
876 if (!mFirstAccessUnit) {
877 postQueueEOS(i, ERROR_END_OF_STREAM);
878 }
879
880 if (!info->mUsingInterleavedTCP) {
881 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
882
883 // Clear the tag
884 if (mUIDValid) {
885 HTTPBase::UnRegisterSocketUserTag(info->mRTPSocket);
886 HTTPBase::UnRegisterSocketUserMark(info->mRTPSocket);
887 HTTPBase::UnRegisterSocketUserTag(info->mRTCPSocket);
888 HTTPBase::UnRegisterSocketUserMark(info->mRTCPSocket);
889 }
890
891 close(info->mRTPSocket);
892 close(info->mRTCPSocket);
893 }
894 }
895 mTracks.clear();
896 mSetupTracksSuccessful = false;
897 mSeekPending = false;
898 mFirstAccessUnit = true;
899 mAllTracksHaveTime = false;
900 mNTPAnchorUs = -1;
901 mMediaAnchorUs = -1;
902 mNumAccessUnitsReceived = 0;
903 mReceivedFirstRTCPPacket = false;
904 mReceivedFirstRTPPacket = false;
905 mPausing = false;
906 mSeekable = true;
907
908 sp<AMessage> reply = new AMessage('tear', this);
909
910 int32_t reconnect;
911 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
912 reply->setInt32("reconnect", true);
913 }
914
915 AString request;
916 request = "TEARDOWN ";
917
918 // XXX should use aggregate url from SDP here...
919 request.append(mSessionURL);
920 request.append(" RTSP/1.0\r\n");
921
922 request.append("Session: ");
923 request.append(mSessionID);
924 request.append("\r\n");
925
926 request.append("\r\n");
927
928 mConn->sendRequest(request.c_str(), reply);
929 break;
930 }
931
932 case 'tear':
933 {
934 int32_t result;
935 CHECK(msg->findInt32("result", &result));
936
937 ALOGI("TEARDOWN completed with result %d (%s)",
938 result, strerror(-result));
939
940 sp<AMessage> reply = new AMessage('disc', this);
941
942 int32_t reconnect;
943 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
944 reply->setInt32("reconnect", true);
945 }
946
947 mConn->disconnect(reply);
948 break;
949 }
950
951 case 'quit':
952 {
953 sp<AMessage> msg = mNotify->dup();
954 msg->setInt32("what", kWhatDisconnected);
955 msg->setInt32("result", UNKNOWN_ERROR);
956 msg->post();
957 break;
958 }
959
960 case 'chek':
961 {
962 int32_t generation;
963 CHECK(msg->findInt32("generation", &generation));
964 if (generation != mCheckGeneration) {
965 // This is an outdated message. Ignore.
966 break;
967 }
968
969 if (mNumAccessUnitsReceived == 0) {
970 #if 1
971 ALOGI("stream ended? aborting.");
972 (new AMessage('abor', this))->post();
973 break;
974 #else
975 ALOGI("haven't seen an AU in a looong time.");
976 #endif
977 }
978
979 mNumAccessUnitsReceived = 0;
980 msg->post(kAccessUnitTimeoutUs);
981 break;
982 }
983
984 case 'accu':
985 {
986 if (mSeekPending) {
987 ALOGV("Stale access unit.");
988 break;
989 }
990
991 int32_t timeUpdate;
992 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
993 size_t trackIndex;
994 CHECK(msg->findSize("track-index", &trackIndex));
995
996 uint32_t rtpTime;
997 uint64_t ntpTime;
998 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
999 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
1000
1001 onTimeUpdate(trackIndex, rtpTime, ntpTime);
1002 break;
1003 }
1004
1005 int32_t first;
1006 if (msg->findInt32("first-rtcp", &first)) {
1007 mReceivedFirstRTCPPacket = true;
1008 break;
1009 }
1010
1011 if (msg->findInt32("first-rtp", &first)) {
1012 mReceivedFirstRTPPacket = true;
1013 break;
1014 }
1015
1016 ++mNumAccessUnitsReceived;
1017 postAccessUnitTimeoutCheck();
1018
1019 size_t trackIndex;
1020 CHECK(msg->findSize("track-index", &trackIndex));
1021
1022 if (trackIndex >= mTracks.size()) {
1023 ALOGV("late packets ignored.");
1024 break;
1025 }
1026
1027 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1028
1029 int32_t eos;
1030 if (msg->findInt32("eos", &eos)) {
1031 ALOGI("received BYE on track index %zu", trackIndex);
1032 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1033 ALOGI("No time established => fake existing data");
1034
1035 track->mEOSReceived = true;
1036 mTryFakeRTCP = true;
1037 mReceivedFirstRTCPPacket = true;
1038 fakeTimestamps();
1039 } else {
1040 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1041 }
1042 return;
1043 }
1044
1045 if (mSeekPending) {
1046 ALOGV("we're seeking, dropping stale packet.");
1047 break;
1048 }
1049
1050 sp<ABuffer> accessUnit;
1051 CHECK(msg->findBuffer("access-unit", &accessUnit));
1052 onAccessUnitComplete(trackIndex, accessUnit);
1053 break;
1054 }
1055
1056 case 'paus':
1057 {
1058 int32_t generation;
1059 CHECK(msg->findInt32("pausecheck", &generation));
1060 if (generation != mPauseGeneration) {
1061 ALOGV("Ignoring outdated pause message.");
1062 break;
1063 }
1064
1065 if (!mSeekable) {
1066 ALOGW("This is a live stream, ignoring pause request.");
1067 break;
1068 }
1069
1070 if (mPausing) {
1071 ALOGV("This stream is already paused.");
1072 break;
1073 }
1074
1075 mCheckPending = true;
1076 ++mCheckGeneration;
1077 mPausing = true;
1078
1079 AString request = "PAUSE ";
1080 request.append(mControlURL);
1081 request.append(" RTSP/1.0\r\n");
1082
1083 request.append("Session: ");
1084 request.append(mSessionID);
1085 request.append("\r\n");
1086
1087 request.append("\r\n");
1088
1089 sp<AMessage> reply = new AMessage('pau2', this);
1090 mConn->sendRequest(request.c_str(), reply);
1091 break;
1092 }
1093
1094 case 'pau2':
1095 {
1096 int32_t result;
1097 CHECK(msg->findInt32("result", &result));
1098 mCheckTimeoutGeneration++;
1099
1100 ALOGI("PAUSE completed with result %d (%s)",
1101 result, strerror(-result));
1102 break;
1103 }
1104
1105 case 'resu':
1106 {
1107 if (mPausing && mSeekPending) {
1108 // If seeking, Play will be sent from see1 instead
1109 break;
1110 }
1111
1112 if (!mPausing) {
1113 // Dont send PLAY if we have not paused
1114 break;
1115 }
1116 AString request = "PLAY ";
1117 request.append(mControlURL);
1118 request.append(" RTSP/1.0\r\n");
1119
1120 request.append("Session: ");
1121 request.append(mSessionID);
1122 request.append("\r\n");
1123
1124 request.append("\r\n");
1125
1126 sp<AMessage> reply = new AMessage('res2', this);
1127 mConn->sendRequest(request.c_str(), reply);
1128 break;
1129 }
1130
1131 case 'res2':
1132 {
1133 int32_t result;
1134 CHECK(msg->findInt32("result", &result));
1135
1136 ALOGI("PLAY (for resume) completed with result %d (%s)",
1137 result, strerror(-result));
1138
1139 mCheckPending = false;
1140 ++mCheckGeneration;
1141 postAccessUnitTimeoutCheck();
1142
1143 if (result == OK) {
1144 sp<RefBase> obj;
1145 CHECK(msg->findObject("response", &obj));
1146 sp<ARTSPResponse> response =
1147 static_cast<ARTSPResponse *>(obj.get());
1148
1149 if (response->mStatusCode != 200) {
1150 result = UNKNOWN_ERROR;
1151 } else {
1152 parsePlayResponse(response);
1153
1154 // Post new timeout in order to make sure to use
1155 // fake timestamps if no new Sender Reports arrive
1156 sp<AMessage> timeout = new AMessage('tiou', this);
1157 mCheckTimeoutGeneration++;
1158 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1159 timeout->post(kStartupTimeoutUs);
1160 }
1161 }
1162
1163 if (result != OK) {
1164 ALOGE("resume failed, aborting.");
1165 (new AMessage('abor', this))->post();
1166 }
1167
1168 mPausing = false;
1169 break;
1170 }
1171
1172 case 'seek':
1173 {
1174 if (!mSeekable) {
1175 ALOGW("This is a live stream, ignoring seek request.");
1176
1177 sp<AMessage> msg = mNotify->dup();
1178 msg->setInt32("what", kWhatSeekDone);
1179 msg->post();
1180 break;
1181 }
1182
1183 int64_t timeUs;
1184 CHECK(msg->findInt64("time", &timeUs));
1185
1186 mSeekPending = true;
1187
1188 // Disable the access unit timeout until we resumed
1189 // playback again.
1190 mCheckPending = true;
1191 ++mCheckGeneration;
1192
1193 sp<AMessage> reply = new AMessage('see0', this);
1194 reply->setInt64("time", timeUs);
1195
1196 if (mPausing) {
1197 // PAUSE already sent
1198 ALOGI("Pause already sent");
1199 reply->post();
1200 break;
1201 }
1202 AString request = "PAUSE ";
1203 request.append(mControlURL);
1204 request.append(" RTSP/1.0\r\n");
1205
1206 request.append("Session: ");
1207 request.append(mSessionID);
1208 request.append("\r\n");
1209
1210 request.append("\r\n");
1211
1212 mConn->sendRequest(request.c_str(), reply);
1213 break;
1214 }
1215
1216 case 'see0':
1217 {
1218 // Session is paused now.
1219 status_t err = OK;
1220 msg->findInt32("result", &err);
1221
1222 int64_t timeUs;
1223 CHECK(msg->findInt64("time", &timeUs));
1224
1225 sp<AMessage> notify = mNotify->dup();
1226 notify->setInt32("what", kWhatSeekPaused);
1227 notify->setInt32("err", err);
1228 notify->setInt64("time", timeUs);
1229 notify->post();
1230 break;
1231
1232 }
1233
1234 case 'see1':
1235 {
1236 for (size_t i = 0; i < mTracks.size(); ++i) {
1237 TrackInfo *info = &mTracks.editItemAt(i);
1238
1239 postQueueSeekDiscontinuity(i);
1240 info->mEOSReceived = false;
1241
1242 info->mRTPAnchor = 0;
1243 info->mNTPAnchorUs = -1;
1244 }
1245
1246 mAllTracksHaveTime = false;
1247 mNTPAnchorUs = -1;
1248
1249 // Start new timeoutgeneration to avoid getting timeout
1250 // before PLAY response arrive
1251 sp<AMessage> timeout = new AMessage('tiou', this);
1252 mCheckTimeoutGeneration++;
1253 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1254 timeout->post(kStartupTimeoutUs);
1255
1256 int64_t timeUs;
1257 CHECK(msg->findInt64("time", &timeUs));
1258
1259 AString request = "PLAY ";
1260 request.append(mControlURL);
1261 request.append(" RTSP/1.0\r\n");
1262
1263 request.append("Session: ");
1264 request.append(mSessionID);
1265 request.append("\r\n");
1266
1267 request.append(
1268 AStringPrintf(
1269 "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1270
1271 request.append("\r\n");
1272
1273 sp<AMessage> reply = new AMessage('see2', this);
1274 mConn->sendRequest(request.c_str(), reply);
1275 break;
1276 }
1277
1278 case 'see2':
1279 {
1280 if (mTracks.size() == 0) {
1281 // We have already hit abor, break
1282 break;
1283 }
1284
1285 int32_t result;
1286 CHECK(msg->findInt32("result", &result));
1287
1288 ALOGI("PLAY (for seek) completed with result %d (%s)",
1289 result, strerror(-result));
1290
1291 mCheckPending = false;
1292 ++mCheckGeneration;
1293 postAccessUnitTimeoutCheck();
1294
1295 if (result == OK) {
1296 sp<RefBase> obj;
1297 CHECK(msg->findObject("response", &obj));
1298 sp<ARTSPResponse> response =
1299 static_cast<ARTSPResponse *>(obj.get());
1300
1301 if (response->mStatusCode != 200) {
1302 result = UNKNOWN_ERROR;
1303 } else {
1304 parsePlayResponse(response);
1305
1306 // Post new timeout in order to make sure to use
1307 // fake timestamps if no new Sender Reports arrive
1308 sp<AMessage> timeout = new AMessage('tiou', this);
1309 mCheckTimeoutGeneration++;
1310 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1311 timeout->post(kStartupTimeoutUs);
1312
1313 ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1314 CHECK_GE(i, 0);
1315
1316 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1317
1318 ALOGI("seek completed.");
1319 }
1320 }
1321
1322 if (result != OK) {
1323 ALOGE("seek failed, aborting.");
1324 (new AMessage('abor', this))->post();
1325 }
1326
1327 mPausing = false;
1328 mSeekPending = false;
1329
1330 // Discard all stale access units.
1331 for (size_t i = 0; i < mTracks.size(); ++i) {
1332 TrackInfo *track = &mTracks.editItemAt(i);
1333 track->mPackets.clear();
1334 }
1335
1336 sp<AMessage> msg = mNotify->dup();
1337 msg->setInt32("what", kWhatSeekDone);
1338 msg->post();
1339 break;
1340 }
1341
1342 case 'biny':
1343 {
1344 sp<ABuffer> buffer;
1345 CHECK(msg->findBuffer("buffer", &buffer));
1346
1347 int32_t index;
1348 CHECK(buffer->meta()->findInt32("index", &index));
1349
1350 mRTPConn->injectPacket(index, buffer);
1351 break;
1352 }
1353
1354 case 'tiou':
1355 {
1356 int32_t timeoutGenerationCheck;
1357 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1358 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1359 // This is an outdated message. Ignore.
1360 // This typically happens if a lot of seeks are
1361 // performed, since new timeout messages now are
1362 // posted at seek as well.
1363 break;
1364 }
1365 if (!mReceivedFirstRTCPPacket) {
1366 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1367 ALOGW("We received RTP packets but no RTCP packets, "
1368 "using fake timestamps.");
1369
1370 mTryFakeRTCP = true;
1371
1372 mReceivedFirstRTCPPacket = true;
1373
1374 fakeTimestamps();
1375 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1376 ALOGW("Never received any data, switching transports.");
1377
1378 mTryTCPInterleaving = true;
1379
1380 sp<AMessage> msg = new AMessage('abor', this);
1381 msg->setInt32("reconnect", true);
1382 msg->post();
1383 } else {
1384 ALOGW("Never received any data, disconnecting.");
1385 (new AMessage('abor', this))->post();
1386 }
1387 } else {
1388 if (!mAllTracksHaveTime) {
1389 ALOGW("We received some RTCP packets, but time "
1390 "could not be established on all tracks, now "
1391 "using fake timestamps");
1392
1393 fakeTimestamps();
1394 }
1395 }
1396 break;
1397 }
1398
1399 default:
1400 TRESPASS();
1401 break;
1402 }
1403 }
1404
postKeepAliveMyHandler1405 void postKeepAlive() {
1406 sp<AMessage> msg = new AMessage('aliv', this);
1407 msg->setInt32("generation", mKeepAliveGeneration);
1408 msg->post((mKeepAliveTimeoutUs * 9) / 10);
1409 }
1410
postAccessUnitTimeoutCheckMyHandler1411 void postAccessUnitTimeoutCheck() {
1412 if (mCheckPending) {
1413 return;
1414 }
1415
1416 mCheckPending = true;
1417 sp<AMessage> check = new AMessage('chek', this);
1418 check->setInt32("generation", mCheckGeneration);
1419 check->post(kAccessUnitTimeoutUs);
1420 }
1421
SplitStringMyHandler1422 static void SplitString(
1423 const AString &s, const char *separator, List<AString> *items) {
1424 items->clear();
1425 size_t start = 0;
1426 while (start < s.size()) {
1427 ssize_t offset = s.find(separator, start);
1428
1429 if (offset < 0) {
1430 items->push_back(AString(s, start, s.size() - start));
1431 break;
1432 }
1433
1434 items->push_back(AString(s, start, offset - start));
1435 start = offset + strlen(separator);
1436 }
1437 }
1438
parsePlayResponseMyHandler1439 void parsePlayResponse(const sp<ARTSPResponse> &response) {
1440 mPlayResponseParsed = true;
1441 if (mTracks.size() == 0) {
1442 ALOGV("parsePlayResponse: late packets ignored.");
1443 return;
1444 }
1445
1446 ssize_t i = response->mHeaders.indexOfKey("range");
1447 if (i < 0) {
1448 // Server doesn't even tell use what range it is going to
1449 // play, therefore we won't support seeking.
1450 return;
1451 }
1452
1453 AString range = response->mHeaders.valueAt(i);
1454 ALOGV("Range: %s", range.c_str());
1455
1456 AString val;
1457 CHECK(GetAttribute(range.c_str(), "npt", &val));
1458
1459 float npt1, npt2;
1460 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1461 // This is a live stream and therefore not seekable.
1462
1463 ALOGI("This is a live stream");
1464 return;
1465 }
1466
1467 i = response->mHeaders.indexOfKey("rtp-info");
1468 CHECK_GE(i, 0);
1469
1470 AString rtpInfo = response->mHeaders.valueAt(i);
1471 List<AString> streamInfos;
1472 SplitString(rtpInfo, ",", &streamInfos);
1473
1474 int n = 1;
1475 for (List<AString>::iterator it = streamInfos.begin();
1476 it != streamInfos.end(); ++it) {
1477 (*it).trim();
1478 ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1479
1480 CHECK(GetAttribute((*it).c_str(), "url", &val));
1481
1482 size_t trackIndex = 0;
1483 while (trackIndex < mTracks.size()
1484 && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1485 ++trackIndex;
1486 }
1487 CHECK_LT(trackIndex, mTracks.size());
1488
1489 CHECK(GetAttribute((*it).c_str(), "seq", &val));
1490
1491 char *end;
1492 unsigned long seq = strtoul(val.c_str(), &end, 10);
1493
1494 TrackInfo *info = &mTracks.editItemAt(trackIndex);
1495 info->mFirstSeqNumInSegment = seq;
1496 info->mNewSegment = true;
1497 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1498
1499 CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1500
1501 uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1502
1503 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1504
1505 info->mNormalPlayTimeRTP = rtpTime;
1506 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1507
1508 if (!mFirstAccessUnit) {
1509 postNormalPlayTimeMapping(
1510 trackIndex,
1511 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1512 }
1513
1514 ++n;
1515 }
1516 }
1517
getTrackFormatMyHandler1518 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1519 CHECK_GE(index, 0u);
1520 CHECK_LT(index, mTracks.size());
1521
1522 const TrackInfo &info = mTracks.itemAt(index);
1523
1524 *timeScale = info.mTimeScale;
1525
1526 return info.mPacketSource->getFormat();
1527 }
1528
countTracksMyHandler1529 size_t countTracks() const {
1530 return mTracks.size();
1531 }
1532
1533 private:
1534 struct TrackInfo {
1535 AString mURL;
1536 int mRTPSocket;
1537 int mRTCPSocket;
1538 bool mUsingInterleavedTCP;
1539 uint32_t mFirstSeqNumInSegment;
1540 bool mNewSegment;
1541 int32_t mAllowedStaleAccessUnits;
1542
1543 uint32_t mRTPAnchor;
1544 int64_t mNTPAnchorUs;
1545 int32_t mTimeScale;
1546 bool mEOSReceived;
1547
1548 uint32_t mNormalPlayTimeRTP;
1549 int64_t mNormalPlayTimeUs;
1550
1551 sp<APacketSource> mPacketSource;
1552
1553 // Stores packets temporarily while no notion of time
1554 // has been established yet.
1555 List<sp<ABuffer> > mPackets;
1556 };
1557
1558 sp<AMessage> mNotify;
1559 bool mUIDValid;
1560 uid_t mUID;
1561 sp<ALooper> mNetLooper;
1562 sp<ARTSPConnection> mConn;
1563 sp<ARTPConnection> mRTPConn;
1564 sp<ASessionDescription> mSessionDesc;
1565 AString mOriginalSessionURL; // This one still has user:pass@
1566 AString mSessionURL;
1567 AString mSessionHost;
1568 AString mBaseURL;
1569 AString mControlURL;
1570 AString mSessionID;
1571 bool mSetupTracksSuccessful;
1572 bool mSeekPending;
1573 bool mFirstAccessUnit;
1574
1575 bool mAllTracksHaveTime;
1576 int64_t mNTPAnchorUs;
1577 int64_t mMediaAnchorUs;
1578 int64_t mLastMediaTimeUs;
1579
1580 int64_t mNumAccessUnitsReceived;
1581 bool mCheckPending;
1582 int32_t mCheckGeneration;
1583 int32_t mCheckTimeoutGeneration;
1584 bool mTryTCPInterleaving;
1585 bool mTryFakeRTCP;
1586 bool mReceivedFirstRTCPPacket;
1587 bool mReceivedFirstRTPPacket;
1588 bool mSeekable;
1589 int64_t mKeepAliveTimeoutUs;
1590 int32_t mKeepAliveGeneration;
1591 bool mPausing;
1592 int32_t mPauseGeneration;
1593
1594 Vector<TrackInfo> mTracks;
1595
1596 bool mPlayResponseParsed;
1597
setupTrackMyHandler1598 void setupTrack(size_t index) {
1599 sp<APacketSource> source =
1600 new APacketSource(mSessionDesc, index);
1601
1602 if (source->initCheck() != OK) {
1603 ALOGW("Unsupported format. Ignoring track #%zu.", index);
1604
1605 sp<AMessage> reply = new AMessage('setu', this);
1606 reply->setSize("index", index);
1607 reply->setInt32("result", ERROR_UNSUPPORTED);
1608 reply->post();
1609 return;
1610 }
1611
1612 AString url;
1613 CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1614
1615 AString trackURL;
1616 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1617
1618 mTracks.push(TrackInfo());
1619 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1620 info->mURL = trackURL;
1621 info->mPacketSource = source;
1622 info->mUsingInterleavedTCP = false;
1623 info->mFirstSeqNumInSegment = 0;
1624 info->mNewSegment = true;
1625 info->mAllowedStaleAccessUnits = kMaxAllowedStaleAccessUnits;
1626 info->mRTPSocket = -1;
1627 info->mRTCPSocket = -1;
1628 info->mRTPAnchor = 0;
1629 info->mNTPAnchorUs = -1;
1630 info->mNormalPlayTimeRTP = 0;
1631 info->mNormalPlayTimeUs = 0ll;
1632
1633 unsigned long PT;
1634 AString formatDesc;
1635 AString formatParams;
1636 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1637
1638 int32_t timescale;
1639 int32_t numChannels;
1640 ASessionDescription::ParseFormatDesc(
1641 formatDesc.c_str(), ×cale, &numChannels);
1642
1643 info->mTimeScale = timescale;
1644 info->mEOSReceived = false;
1645
1646 ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1647
1648 AString request = "SETUP ";
1649 request.append(trackURL);
1650 request.append(" RTSP/1.0\r\n");
1651
1652 if (mTryTCPInterleaving) {
1653 size_t interleaveIndex = 2 * (mTracks.size() - 1);
1654 info->mUsingInterleavedTCP = true;
1655 info->mRTPSocket = interleaveIndex;
1656 info->mRTCPSocket = interleaveIndex + 1;
1657
1658 request.append("Transport: RTP/AVP/TCP;interleaved=");
1659 request.append(interleaveIndex);
1660 request.append("-");
1661 request.append(interleaveIndex + 1);
1662 } else {
1663 unsigned rtpPort;
1664 ARTPConnection::MakePortPair(
1665 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1666
1667 if (mUIDValid) {
1668 HTTPBase::RegisterSocketUserTag(info->mRTPSocket, mUID,
1669 (uint32_t)*(uint32_t*) "RTP_");
1670 HTTPBase::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1671 (uint32_t)*(uint32_t*) "RTP_");
1672 HTTPBase::RegisterSocketUserMark(info->mRTPSocket, mUID);
1673 HTTPBase::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1674 }
1675
1676 request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1677 request.append(rtpPort);
1678 request.append("-");
1679 request.append(rtpPort + 1);
1680 }
1681
1682 request.append("\r\n");
1683
1684 if (index > 1) {
1685 request.append("Session: ");
1686 request.append(mSessionID);
1687 request.append("\r\n");
1688 }
1689
1690 request.append("\r\n");
1691
1692 sp<AMessage> reply = new AMessage('setu', this);
1693 reply->setSize("index", index);
1694 reply->setSize("track-index", mTracks.size() - 1);
1695 mConn->sendRequest(request.c_str(), reply);
1696 }
1697
MakeURLMyHandler1698 static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1699 out->clear();
1700
1701 if (strncasecmp("rtsp://", baseURL, 7)) {
1702 // Base URL must be absolute
1703 return false;
1704 }
1705
1706 if (!strncasecmp("rtsp://", url, 7)) {
1707 // "url" is already an absolute URL, ignore base URL.
1708 out->setTo(url);
1709 return true;
1710 }
1711
1712 size_t n = strlen(baseURL);
1713 out->setTo(baseURL);
1714 if (baseURL[n - 1] != '/') {
1715 out->append("/");
1716 }
1717 out->append(url);
1718
1719 return true;
1720 }
1721
fakeTimestampsMyHandler1722 void fakeTimestamps() {
1723 mNTPAnchorUs = -1ll;
1724 for (size_t i = 0; i < mTracks.size(); ++i) {
1725 onTimeUpdate(i, 0, 0ll);
1726 }
1727 }
1728
dataReceivedOnAllChannelsMyHandler1729 bool dataReceivedOnAllChannels() {
1730 TrackInfo *track;
1731 for (size_t i = 0; i < mTracks.size(); ++i) {
1732 track = &mTracks.editItemAt(i);
1733 if (track->mPackets.empty()) {
1734 return false;
1735 }
1736 }
1737 return true;
1738 }
1739
handleFirstAccessUnitMyHandler1740 void handleFirstAccessUnit() {
1741 if (mFirstAccessUnit) {
1742 sp<AMessage> msg = mNotify->dup();
1743 msg->setInt32("what", kWhatConnected);
1744 msg->post();
1745
1746 if (mSeekable) {
1747 for (size_t i = 0; i < mTracks.size(); ++i) {
1748 TrackInfo *info = &mTracks.editItemAt(i);
1749
1750 postNormalPlayTimeMapping(
1751 i,
1752 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1753 }
1754 }
1755
1756 mFirstAccessUnit = false;
1757 }
1758 }
1759
onTimeUpdateMyHandler1760 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1761 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1762 trackIndex, rtpTime, (long long)ntpTime);
1763
1764 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1765
1766 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1767
1768 track->mRTPAnchor = rtpTime;
1769 track->mNTPAnchorUs = ntpTimeUs;
1770
1771 if (mNTPAnchorUs < 0) {
1772 mNTPAnchorUs = ntpTimeUs;
1773 mMediaAnchorUs = mLastMediaTimeUs;
1774 }
1775
1776 if (!mAllTracksHaveTime) {
1777 bool allTracksHaveTime = (mTracks.size() > 0);
1778 for (size_t i = 0; i < mTracks.size(); ++i) {
1779 TrackInfo *track = &mTracks.editItemAt(i);
1780 if (track->mNTPAnchorUs < 0) {
1781 allTracksHaveTime = false;
1782 break;
1783 }
1784 }
1785 if (allTracksHaveTime) {
1786 mAllTracksHaveTime = true;
1787 ALOGI("Time now established for all tracks.");
1788 }
1789 }
1790 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1791 handleFirstAccessUnit();
1792
1793 // Time is now established, lets start timestamping immediately
1794 for (size_t i = 0; i < mTracks.size(); ++i) {
1795 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1796 while (!trackInfo->mPackets.empty()) {
1797 sp<ABuffer> accessUnit = *trackInfo->mPackets.begin();
1798 trackInfo->mPackets.erase(trackInfo->mPackets.begin());
1799
1800 if (addMediaTimestamp(i, trackInfo, accessUnit)) {
1801 postQueueAccessUnit(i, accessUnit);
1802 }
1803 }
1804 }
1805 for (size_t i = 0; i < mTracks.size(); ++i) {
1806 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1807 if (trackInfo->mEOSReceived) {
1808 postQueueEOS(i, ERROR_END_OF_STREAM);
1809 trackInfo->mEOSReceived = false;
1810 }
1811 }
1812 }
1813 }
1814
onAccessUnitCompleteMyHandler1815 void onAccessUnitComplete(
1816 int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1817 ALOGV("onAccessUnitComplete track %d", trackIndex);
1818
1819 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1820 if(!mPlayResponseParsed){
1821 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1822 ALOGI("play response is not parsed, storing accessunit %u", seqNum);
1823 track->mPackets.push_back(accessUnit);
1824 return;
1825 }
1826
1827 handleFirstAccessUnit();
1828
1829 if (!mAllTracksHaveTime) {
1830 ALOGV("storing accessUnit, no time established yet");
1831 track->mPackets.push_back(accessUnit);
1832 return;
1833 }
1834
1835 while (!track->mPackets.empty()) {
1836 sp<ABuffer> accessUnit = *track->mPackets.begin();
1837 track->mPackets.erase(track->mPackets.begin());
1838
1839 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1840 if (track->mNewSegment) {
1841 // The sequence number from RTP packet has only 16 bits and is extended
1842 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
1843 // RTSP "PLAY" command should be used to detect the first RTP packet
1844 // after seeking.
1845 if (track->mAllowedStaleAccessUnits > 0) {
1846 uint32_t seqNum16 = seqNum & 0xffff;
1847 uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
1848 if (seqNum16 > firstSeqNumInSegment16 + kMaxAllowedStaleAccessUnits
1849 || seqNum16 < firstSeqNumInSegment16) {
1850 // Not the first rtp packet of the stream after seeking, discarding.
1851 track->mAllowedStaleAccessUnits--;
1852 ALOGV("discarding stale access unit (0x%x : 0x%x)",
1853 seqNum, track->mFirstSeqNumInSegment);
1854 continue;
1855 }
1856 ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
1857 "Missing the first packet(%u), now take packet(%u) as first one",
1858 track->mFirstSeqNumInSegment, seqNum);
1859 } else { // track->mAllowedStaleAccessUnits <= 0
1860 mNumAccessUnitsReceived = 0;
1861 ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
1862 "Still no first rtp packet after %d stale ones",
1863 kMaxAllowedStaleAccessUnits);
1864 track->mAllowedStaleAccessUnits = -1;
1865 return;
1866 }
1867
1868 // Now found the first rtp packet of the stream after seeking.
1869 track->mFirstSeqNumInSegment = seqNum;
1870 track->mNewSegment = false;
1871 }
1872
1873 if (seqNum < track->mFirstSeqNumInSegment) {
1874 ALOGV("dropping stale access-unit (%d < %d)",
1875 seqNum, track->mFirstSeqNumInSegment);
1876 continue;
1877 }
1878
1879
1880 if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1881 postQueueAccessUnit(trackIndex, accessUnit);
1882 }
1883 }
1884
1885 if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1886 postQueueAccessUnit(trackIndex, accessUnit);
1887 }
1888
1889 if (track->mEOSReceived) {
1890 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1891 track->mEOSReceived = false;
1892 }
1893 }
1894
addMediaTimestampMyHandler1895 bool addMediaTimestamp(
1896 int32_t trackIndex, const TrackInfo *track,
1897 const sp<ABuffer> &accessUnit) {
1898 UNUSED_UNLESS_VERBOSE(trackIndex);
1899
1900 uint32_t rtpTime;
1901 CHECK(accessUnit->meta()->findInt32(
1902 "rtp-time", (int32_t *)&rtpTime));
1903
1904 int64_t relRtpTimeUs =
1905 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1906 / track->mTimeScale;
1907
1908 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1909
1910 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1911
1912 if (mediaTimeUs > mLastMediaTimeUs) {
1913 mLastMediaTimeUs = mediaTimeUs;
1914 }
1915
1916 if (mediaTimeUs < 0) {
1917 ALOGV("dropping early accessUnit.");
1918 return false;
1919 }
1920
1921 ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1922 trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1923
1924 accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1925
1926 return true;
1927 }
1928
postQueueAccessUnitMyHandler1929 void postQueueAccessUnit(
1930 size_t trackIndex, const sp<ABuffer> &accessUnit) {
1931 sp<AMessage> msg = mNotify->dup();
1932 msg->setInt32("what", kWhatAccessUnit);
1933 msg->setSize("trackIndex", trackIndex);
1934 msg->setBuffer("accessUnit", accessUnit);
1935 msg->post();
1936 }
1937
postQueueEOSMyHandler1938 void postQueueEOS(size_t trackIndex, status_t finalResult) {
1939 sp<AMessage> msg = mNotify->dup();
1940 msg->setInt32("what", kWhatEOS);
1941 msg->setSize("trackIndex", trackIndex);
1942 msg->setInt32("finalResult", finalResult);
1943 msg->post();
1944 }
1945
postQueueSeekDiscontinuityMyHandler1946 void postQueueSeekDiscontinuity(size_t trackIndex) {
1947 sp<AMessage> msg = mNotify->dup();
1948 msg->setInt32("what", kWhatSeekDiscontinuity);
1949 msg->setSize("trackIndex", trackIndex);
1950 msg->post();
1951 }
1952
postNormalPlayTimeMappingMyHandler1953 void postNormalPlayTimeMapping(
1954 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1955 sp<AMessage> msg = mNotify->dup();
1956 msg->setInt32("what", kWhatNormalPlayTimeMapping);
1957 msg->setSize("trackIndex", trackIndex);
1958 msg->setInt32("rtpTime", rtpTime);
1959 msg->setInt64("nptUs", nptUs);
1960 msg->post();
1961 }
1962
1963 DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
1964 };
1965
1966 } // namespace android
1967
1968 #endif // MY_HANDLER_H_
1969