1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef MY_HANDLER_H_
18
19 #define MY_HANDLER_H_
20
21 //#define LOG_NDEBUG 0
22
23 #ifndef LOG_TAG
24 #define LOG_TAG "MyHandler"
25 #endif
26
27 #include <utils/Log.h>
28 #include <cutils/properties.h> // for property_get
29
30 #include "APacketSource.h"
31 #include "ARTPConnection.h"
32 #include "ARTSPConnection.h"
33 #include "ASessionDescription.h"
34 #include "NetworkUtils.h"
35
36 #include <ctype.h>
37 #include <cutils/properties.h>
38
39 #include <datasource/HTTPBase.h>
40 #include <media/stagefright/foundation/ABuffer.h>
41 #include <media/stagefright/foundation/ADebug.h>
42 #include <media/stagefright/foundation/ALooper.h>
43 #include <media/stagefright/foundation/AMessage.h>
44 #include <media/stagefright/MediaErrors.h>
45 #include <media/stagefright/Utils.h>
46 #include <media/stagefright/FoundationUtils.h>
47
48 #include <arpa/inet.h>
49 #include <sys/socket.h>
50 #include <netdb.h>
51
52
53 #if LOG_NDEBUG
54 #define UNUSED_UNLESS_VERBOSE(x) (void)(x)
55 #else
56 #define UNUSED_UNLESS_VERBOSE(x)
57 #endif
58
59 #ifndef FALLTHROUGH_INTENDED
60 #define FALLTHROUGH_INTENDED [[clang::fallthrough]] // NOLINT
61 #endif
62
// If no access units are received within 10 secs, assume that the rtp
// stream has ended and signal end of stream.
static int64_t kAccessUnitTimeoutUs = 10000000ll;

// If no access units arrive for the first 10 secs after starting the
// stream, assume none ever will and signal EOS or switch transports.
static int64_t kStartupTimeoutUs = 10000000ll;

// Default for mKeepAliveTimeoutUs (60 secs). It stays in effect when the
// Session header of a SETUP response carries no "timeout" value, a malformed
// one, or one below 15 secs.
static int64_t kDefaultKeepAliveTimeoutUs = 60000000ll;

// 3 secs. NOTE(review): not referenced in the part of the file visible here;
// presumably a delay applied to pause requests - confirm against the caller.
static int64_t kPauseDelayUs = 3000000ll;

// The allowed maximum number of stale access units at the beginning of
// a new sequence.
static int32_t kMaxAllowedStaleAudioAccessUnits = 20;
static int32_t kMaxAllowedStaleVideoAccessUnits = 400;

// Delay (3 secs) after which a 'tear' message with result -ECONNABORTED is
// delivered when a non-reconnecting TEARDOWN has been sent, so that shutdown
// does not depend on the server answering.
static int64_t kTearDownTimeoutUs = 3000000ll;
81
82 namespace android {
83
GetAttribute(const char * s,const char * key,AString * value)84 static bool GetAttribute(const char *s, const char *key, AString *value) {
85 value->clear();
86
87 size_t keyLen = strlen(key);
88
89 for (;;) {
90 while (isspace(*s)) {
91 ++s;
92 }
93
94 const char *colonPos = strchr(s, ';');
95
96 size_t len =
97 (colonPos == NULL) ? strlen(s) : colonPos - s;
98
99 if (len >= keyLen + 1 && s[keyLen] == '=' && !strncmp(s, key, keyLen)) {
100 value->setTo(&s[keyLen + 1], len - keyLen - 1);
101 return true;
102 }
103
104 if (colonPos == NULL) {
105 return false;
106 }
107
108 s = colonPos + 1;
109 }
110 }
111
GetMaxAllowedStaleCount(bool isVideo)112 static int32_t GetMaxAllowedStaleCount(bool isVideo) {
113 return isVideo ? kMaxAllowedStaleVideoAccessUnits : kMaxAllowedStaleAudioAccessUnits;
114 }
115
116 struct MyHandler : public AHandler {
// Notification codes. Where they are used in the code visible here, they are
// stored in the "what" field of a copy of mNotify before that copy is posted.
// NOTE(review): kWhatConnected, kWhatAccessUnit, kWhatEOS,
// kWhatSeekDiscontinuity and kWhatNormalPlayTimeMapping are not referenced in
// the visible part of the file; their meaning is presumably what the names
// suggest - confirm against the rest of the file.
enum {
    kWhatConnected = 'conn',
    // Posted by the 'quit' handler together with result UNKNOWN_ERROR.
    kWhatDisconnected = 'disc',
    // Posted by the 'see0' handler together with "err" and "time".
    kWhatSeekPaused = 'spau',
    // Posted right away when a seek is requested on a non-seekable stream.
    kWhatSeekDone = 'sdon',

    kWhatAccessUnit = 'accU',
    kWhatEOS = 'eos!',
    kWhatSeekDiscontinuity = 'seeD',
    kWhatNormalPlayTimeMapping = 'nptM',
};
128
129 MyHandler(
130 const char *url,
131 const sp<AMessage> ¬ify,
132 bool uidValid = false, uid_t uid = 0)
mNotifyMyHandler133 : mNotify(notify),
134 mUIDValid(uidValid),
135 mUID(uid),
136 mNetLooper(new ALooper),
137 mConn(new ARTSPConnection(mUIDValid, mUID)),
138 mRTPConn(new ARTPConnection),
139 mOriginalSessionURL(url),
140 mSessionURL(url),
141 mSetupTracksSuccessful(false),
142 mSeekPending(false),
143 mFirstAccessUnit(true),
144 mAllTracksHaveTime(false),
145 mNTPAnchorUs(-1),
146 mMediaAnchorUs(-1),
147 mLastMediaTimeUs(0),
148 mNumAccessUnitsReceived(0),
149 mCheckPending(false),
150 mCheckGeneration(0),
151 mCheckTimeoutGeneration(0),
152 mTryTCPInterleaving(property_get_bool("rtp.transport.TCP", false)),
153 mTryFakeRTCP(false),
154 mReceivedFirstRTCPPacket(false),
155 mReceivedFirstRTPPacket(false),
156 mSeekable(true),
157 mKeepAliveTimeoutUs(kDefaultKeepAliveTimeoutUs),
158 mKeepAliveGeneration(0),
159 mPausing(false),
160 mPauseGeneration(0),
161 mPlayResponseParsed(false) {
162 mNetLooper->setName("rtsp net");
163 mNetLooper->start(false /* runOnCallingThread */,
164 false /* canCallJava */,
165 PRIORITY_HIGHEST);
166
167 // Strip any authentication info from the session url, we don't
168 // want to transmit user/pass in cleartext.
169 AString host, path, user, pass;
170 unsigned port;
171 CHECK(ARTSPConnection::ParseURL(
172 mSessionURL.c_str(), &host, &port, &path, &user, &pass));
173
174 if (user.size() > 0) {
175 mSessionURL.clear();
176 mSessionURL.append("rtsp://");
177 mSessionURL.append(host);
178 mSessionURL.append(":");
179 mSessionURL.append(AStringPrintf("%u", port));
180 mSessionURL.append(path);
181
182 ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
183 }
184
185 mSessionHost = host;
186 }
187
connectMyHandler188 void connect() {
189 looper()->registerHandler(mConn);
190 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
191
192 sp<AMessage> notify = new AMessage('biny', this);
193 mConn->observeBinaryData(notify);
194
195 sp<AMessage> reply = new AMessage('conn', this);
196 mConn->connect(mOriginalSessionURL.c_str(), reply);
197 }
198
loadSDPMyHandler199 void loadSDP(const sp<ASessionDescription>& desc) {
200 looper()->registerHandler(mConn);
201 (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
202
203 sp<AMessage> notify = new AMessage('biny', this);
204 mConn->observeBinaryData(notify);
205
206 sp<AMessage> reply = new AMessage('sdpl', this);
207 reply->setObject("description", desc);
208 mConn->connect(mOriginalSessionURL.c_str(), reply);
209 }
210
getControlURLMyHandler211 AString getControlURL() {
212 AString sessionLevelControlURL;
213 if (mSessionDesc->findAttribute(
214 0,
215 "a=control",
216 &sessionLevelControlURL)) {
217 if (sessionLevelControlURL.compare("*") == 0) {
218 return mBaseURL;
219 } else {
220 AString controlURL;
221 CHECK(MakeURL(
222 mBaseURL.c_str(),
223 sessionLevelControlURL.c_str(),
224 &controlURL));
225 return controlURL;
226 }
227 } else {
228 return mSessionURL;
229 }
230 }
231
disconnectMyHandler232 void disconnect() {
233 (new AMessage('abor', this))->post();
234 }
235
seekMyHandler236 void seek(int64_t timeUs) {
237 sp<AMessage> msg = new AMessage('seek', this);
238 msg->setInt64("time", timeUs);
239 mPauseGeneration++;
240 msg->post();
241 }
242
continueSeekAfterPauseMyHandler243 void continueSeekAfterPause(int64_t timeUs) {
244 sp<AMessage> msg = new AMessage('see1', this);
245 msg->setInt64("time", timeUs);
246 msg->post();
247 }
248
isSeekableMyHandler249 bool isSeekable() const {
250 return mSeekable;
251 }
252
pauseMyHandler253 void pause() {
254 sp<AMessage> msg = new AMessage('paus', this);
255 mPauseGeneration++;
256 msg->setInt32("pausecheck", mPauseGeneration);
257 msg->post();
258 }
259
resumeMyHandler260 void resume() {
261 sp<AMessage> msg = new AMessage('resu', this);
262 mPauseGeneration++;
263 msg->post();
264 }
265
getARTSPConnectionMyHandler266 sp<ARTSPConnection> getARTSPConnection() {
267 return mConn;
268 }
269
addRRMyHandler270 static void addRR(const sp<ABuffer> &buf) {
271 uint8_t *ptr = buf->data() + buf->size();
272 ptr[0] = 0x80 | 0;
273 ptr[1] = 201; // RR
274 ptr[2] = 0;
275 ptr[3] = 1;
276 ptr[4] = 0xde; // SSRC
277 ptr[5] = 0xad;
278 ptr[6] = 0xbe;
279 ptr[7] = 0xef;
280
281 buf->setRange(0, buf->size() + 8);
282 }
283
addSDESMyHandler284 static void addSDES(int s, const sp<ABuffer> &buffer) {
285 struct sockaddr_in addr;
286 socklen_t addrSize = sizeof(addr);
287 if (getsockname(s, (sockaddr *)&addr, &addrSize) != 0) {
288 inet_aton("0.0.0.0", &(addr.sin_addr));
289 }
290
291 uint8_t *data = buffer->data() + buffer->size();
292 data[0] = 0x80 | 1;
293 data[1] = 202; // SDES
294 data[4] = 0xde; // SSRC
295 data[5] = 0xad;
296 data[6] = 0xbe;
297 data[7] = 0xef;
298
299 size_t offset = 8;
300
301 data[offset++] = 1; // CNAME
302
303 AString cname = "stagefright@";
304 cname.append(inet_ntoa(addr.sin_addr));
305 data[offset++] = cname.size();
306
307 memcpy(&data[offset], cname.c_str(), cname.size());
308 offset += cname.size();
309
310 data[offset++] = 6; // TOOL
311
312 AString tool = MakeUserAgent();
313
314 data[offset++] = tool.size();
315
316 memcpy(&data[offset], tool.c_str(), tool.size());
317 offset += tool.size();
318
319 data[offset++] = 0;
320
321 if ((offset % 4) > 0) {
322 size_t count = 4 - (offset % 4);
323 switch (count) {
324 case 3:
325 data[offset++] = 0;
326 FALLTHROUGH_INTENDED;
327 case 2:
328 data[offset++] = 0;
329 FALLTHROUGH_INTENDED;
330 case 1:
331 data[offset++] = 0;
332 }
333 }
334
335 size_t numWords = (offset / 4) - 1;
336 data[2] = numWords >> 8;
337 data[3] = numWords & 0xff;
338
339 buffer->setRange(buffer->offset(), buffer->size() + offset);
340 }
341
342 // In case we're behind NAT, fire off two UDP packets to the remote
343 // rtp/rtcp ports to poke a hole into the firewall for future incoming
344 // packets. We're going to send an RR/SDES RTCP packet to both of them.
pokeAHoleMyHandler345 bool pokeAHole(int rtpSocket, int rtcpSocket, const AString &transport) {
346 struct sockaddr_in addr;
347 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
348 addr.sin_family = AF_INET;
349
350 AString source;
351 AString server_port;
352 if (!GetAttribute(transport.c_str(),
353 "source",
354 &source)) {
355 ALOGW("Missing 'source' field in Transport response. Using "
356 "RTSP endpoint address.");
357
358 struct hostent *ent = gethostbyname(mSessionHost.c_str());
359 if (ent == NULL) {
360 ALOGE("Failed to look up address of session host");
361
362 return false;
363 }
364
365 addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
366 } else {
367 addr.sin_addr.s_addr = inet_addr(source.c_str());
368 }
369
370 if (!GetAttribute(transport.c_str(),
371 "server_port",
372 &server_port)) {
373 ALOGI("Missing 'server_port' field in Transport response.");
374 return false;
375 }
376
377 int rtpPort, rtcpPort;
378 if (sscanf(server_port.c_str(), "%d-%d", &rtpPort, &rtcpPort) != 2
379 || rtpPort <= 0 || rtpPort > 65535
380 || rtcpPort <=0 || rtcpPort > 65535
381 || rtcpPort != rtpPort + 1) {
382 ALOGE("Server picked invalid RTP/RTCP port pair %s,"
383 " RTP port must be even, RTCP port must be one higher.",
384 server_port.c_str());
385
386 return false;
387 }
388
389 if (rtpPort & 1) {
390 ALOGW("Server picked an odd RTP port, it should've picked an "
391 "even one, we'll let it pass for now, but this may break "
392 "in the future.");
393 }
394
395 if (addr.sin_addr.s_addr == INADDR_NONE) {
396 return true;
397 }
398
399 if (IN_LOOPBACK(ntohl(addr.sin_addr.s_addr))) {
400 // No firewalls to traverse on the loopback interface.
401 return true;
402 }
403
404 // Make up an RR/SDES RTCP packet.
405 sp<ABuffer> buf = new ABuffer(65536);
406 buf->setRange(0, 0);
407 addRR(buf);
408 addSDES(rtpSocket, buf);
409
410 addr.sin_port = htons(rtpPort);
411
412 ssize_t n = sendto(
413 rtpSocket, buf->data(), buf->size(), 0,
414 (const sockaddr *)&addr, sizeof(addr));
415
416 if (n < (ssize_t)buf->size()) {
417 ALOGE("failed to poke a hole for RTP packets");
418 return false;
419 }
420
421 addr.sin_port = htons(rtcpPort);
422
423 n = sendto(
424 rtcpSocket, buf->data(), buf->size(), 0,
425 (const sockaddr *)&addr, sizeof(addr));
426
427 if (n < (ssize_t)buf->size()) {
428 ALOGE("failed to poke a hole for RTCP packets");
429 return false;
430 }
431
432 ALOGV("successfully poked holes.");
433
434 return true;
435 }
436
isLiveStreamMyHandler437 static bool isLiveStream(const sp<ASessionDescription> &desc) {
438 AString attrLiveStream;
439 if (desc->findAttribute(0, "a=LiveStream", &attrLiveStream)) {
440 ssize_t semicolonPos = attrLiveStream.find(";", 2);
441
442 const char* liveStreamValue;
443 if (semicolonPos < 0) {
444 liveStreamValue = attrLiveStream.c_str();
445 } else {
446 AString valString;
447 valString.setTo(attrLiveStream,
448 semicolonPos + 1,
449 attrLiveStream.size() - semicolonPos - 1);
450 liveStreamValue = valString.c_str();
451 }
452
453 uint32_t value = strtoul(liveStreamValue, NULL, 10);
454 if (value == 1) {
455 ALOGV("found live stream");
456 return true;
457 }
458 } else {
459 // It is a live stream if no duration is returned
460 int64_t durationUs;
461 if (!desc->getDurationUs(&durationUs)) {
462 ALOGV("No duration found, assume live stream");
463 return true;
464 }
465 }
466
467 return false;
468 }
469
onMessageReceivedMyHandler470 virtual void onMessageReceived(const sp<AMessage> &msg) {
471 switch (msg->what()) {
472 case 'conn':
473 {
474 int32_t result;
475 CHECK(msg->findInt32("result", &result));
476
477 ALOGI("connection request completed with result %d (%s)",
478 result, strerror(-result));
479
480 if (result == OK) {
481 AString request;
482 request = "DESCRIBE ";
483 request.append(mSessionURL);
484 request.append(" RTSP/1.0\r\n");
485 request.append("Accept: application/sdp\r\n");
486 request.append("\r\n");
487
488 sp<AMessage> reply = new AMessage('desc', this);
489 mConn->sendRequest(request.c_str(), reply);
490 } else {
491 (new AMessage('disc', this))->post();
492 }
493 break;
494 }
495
496 case 'disc':
497 {
498 ++mKeepAliveGeneration;
499
500 int32_t reconnect;
501 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
502 sp<AMessage> reply = new AMessage('conn', this);
503 mConn->connect(mOriginalSessionURL.c_str(), reply);
504 } else {
505 (new AMessage('quit', this))->post();
506 }
507 break;
508 }
509
510 case 'desc':
511 {
512 int32_t result;
513 CHECK(msg->findInt32("result", &result));
514
515 ALOGI("DESCRIBE completed with result %d (%s)",
516 result, strerror(-result));
517
518 if (result == OK) {
519 sp<RefBase> obj;
520 CHECK(msg->findObject("response", &obj));
521 sp<ARTSPResponse> response =
522 static_cast<ARTSPResponse *>(obj.get());
523
524 if (response->mStatusCode == 301 || response->mStatusCode == 302) {
525 ssize_t i = response->mHeaders.indexOfKey("location");
526 CHECK_GE(i, 0);
527
528 mOriginalSessionURL = response->mHeaders.valueAt(i);
529 mSessionURL = mOriginalSessionURL;
530
531 // Strip any authentication info from the session url, we don't
532 // want to transmit user/pass in cleartext.
533 AString host, path, user, pass;
534 unsigned port;
535 if (ARTSPConnection::ParseURL(
536 mSessionURL.c_str(), &host, &port, &path, &user, &pass)
537 && user.size() > 0) {
538 mSessionURL.clear();
539 mSessionURL.append("rtsp://");
540 mSessionURL.append(host);
541 mSessionURL.append(":");
542 mSessionURL.append(AStringPrintf("%u", port));
543 mSessionURL.append(path);
544
545 ALOGV("rewritten session url: '%s'", mSessionURL.c_str());
546 }
547
548 sp<AMessage> reply = new AMessage('conn', this);
549 mConn->connect(mOriginalSessionURL.c_str(), reply);
550 break;
551 }
552
553 if (response->mStatusCode != 200) {
554 result = UNKNOWN_ERROR;
555 } else if (response->mContent == NULL) {
556 result = ERROR_MALFORMED;
557 ALOGE("The response has no content.");
558 } else {
559 mSessionDesc = new ASessionDescription;
560
561 mSessionDesc->setTo(
562 response->mContent->data(),
563 response->mContent->size());
564
565 if (!mSessionDesc->isValid()) {
566 ALOGE("Failed to parse session description.");
567 result = ERROR_MALFORMED;
568 } else {
569 ssize_t i = response->mHeaders.indexOfKey("content-base");
570 if (i >= 0) {
571 mBaseURL = response->mHeaders.valueAt(i);
572 } else {
573 i = response->mHeaders.indexOfKey("content-location");
574 if (i >= 0) {
575 mBaseURL = response->mHeaders.valueAt(i);
576 } else {
577 mBaseURL = mSessionURL;
578 }
579 }
580
581 mSeekable = !isLiveStream(mSessionDesc);
582
583 if (!mBaseURL.startsWith("rtsp://")) {
584 // Some misbehaving servers specify a relative
585 // URL in one of the locations above, combine
586 // it with the absolute session URL to get
587 // something usable...
588
589 ALOGW("Server specified a non-absolute base URL"
590 ", combining it with the session URL to "
591 "get something usable...");
592
593 AString tmp;
594 CHECK(MakeURL(
595 mSessionURL.c_str(),
596 mBaseURL.c_str(),
597 &tmp));
598
599 mBaseURL = tmp;
600 }
601
602 mControlURL = getControlURL();
603
604 if (mSessionDesc->countTracks() < 2) {
605 // There's no actual tracks in this session.
606 // The first "track" is merely session meta
607 // data.
608
609 ALOGW("Session doesn't contain any playable "
610 "tracks. Aborting.");
611 result = ERROR_UNSUPPORTED;
612 } else {
613 setupTrack(1);
614 }
615 }
616 }
617 }
618
619 if (result != OK) {
620 sp<AMessage> reply = new AMessage('disc', this);
621 mConn->disconnect(reply);
622 }
623 break;
624 }
625
626 case 'sdpl':
627 {
628 int32_t result;
629 CHECK(msg->findInt32("result", &result));
630
631 ALOGI("SDP connection request completed with result %d (%s)",
632 result, strerror(-result));
633
634 if (result == OK) {
635 sp<RefBase> obj;
636 CHECK(msg->findObject("description", &obj));
637 mSessionDesc =
638 static_cast<ASessionDescription *>(obj.get());
639
640 if (!mSessionDesc->isValid()) {
641 ALOGE("Failed to parse session description.");
642 result = ERROR_MALFORMED;
643 } else {
644 mBaseURL = mSessionURL;
645
646 mSeekable = !isLiveStream(mSessionDesc);
647
648 mControlURL = getControlURL();
649
650 if (mSessionDesc->countTracks() < 2) {
651 // There's no actual tracks in this session.
652 // The first "track" is merely session meta
653 // data.
654
655 ALOGW("Session doesn't contain any playable "
656 "tracks. Aborting.");
657 result = ERROR_UNSUPPORTED;
658 } else {
659 setupTrack(1);
660 }
661 }
662 }
663
664 if (result != OK) {
665 sp<AMessage> reply = new AMessage('disc', this);
666 mConn->disconnect(reply);
667 }
668 break;
669 }
670
671 case 'setu':
672 {
673 size_t index;
674 CHECK(msg->findSize("index", &index));
675
676 TrackInfo *track = NULL;
677 size_t trackIndex;
678 if (msg->findSize("track-index", &trackIndex)) {
679 track = &mTracks.editItemAt(trackIndex);
680 }
681
682 int32_t result;
683 CHECK(msg->findInt32("result", &result));
684
685 ALOGI("SETUP(%zu) completed with result %d (%s)",
686 index, result, strerror(-result));
687
688 if (result == OK) {
689 CHECK(track != NULL);
690
691 sp<RefBase> obj;
692 CHECK(msg->findObject("response", &obj));
693 sp<ARTSPResponse> response =
694 static_cast<ARTSPResponse *>(obj.get());
695
696 if (response->mStatusCode != 200) {
697 result = UNKNOWN_ERROR;
698 } else {
699 ssize_t i = response->mHeaders.indexOfKey("session");
700 CHECK_GE(i, 0);
701
702 mSessionID = response->mHeaders.valueAt(i);
703
704 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
705 AString timeoutStr;
706 if (GetAttribute(
707 mSessionID.c_str(), "timeout", &timeoutStr)) {
708 char *end;
709 unsigned long timeoutSecs =
710 strtoul(timeoutStr.c_str(), &end, 10);
711
712 if (end == timeoutStr.c_str() || *end != '\0') {
713 ALOGW("server specified malformed timeout '%s'",
714 timeoutStr.c_str());
715
716 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
717 } else if (timeoutSecs < 15) {
718 ALOGW("server specified too short a timeout "
719 "(%lu secs), using default.",
720 timeoutSecs);
721
722 mKeepAliveTimeoutUs = kDefaultKeepAliveTimeoutUs;
723 } else {
724 mKeepAliveTimeoutUs = timeoutSecs * 1000000ll;
725
726 ALOGI("server specified timeout of %lu secs.",
727 timeoutSecs);
728 }
729 }
730
731 i = mSessionID.find(";");
732 if (i >= 0) {
733 // Remove options, i.e. ";timeout=90"
734 mSessionID.erase(i, mSessionID.size() - i);
735 }
736
737 sp<AMessage> notify = new AMessage('accu', this);
738 notify->setSize("track-index", trackIndex);
739
740 i = response->mHeaders.indexOfKey("transport");
741 CHECK_GE(i, 0);
742
743 if (track->mRTPSocket != -1 && track->mRTCPSocket != -1) {
744 if (!track->mUsingInterleavedTCP) {
745 AString transport = response->mHeaders.valueAt(i);
746
747 // We are going to continue even if we were
748 // unable to poke a hole into the firewall...
749 pokeAHole(
750 track->mRTPSocket,
751 track->mRTCPSocket,
752 transport);
753 }
754
755 mRTPConn->addStream(
756 track->mRTPSocket, track->mRTCPSocket,
757 mSessionDesc, index,
758 notify, track->mUsingInterleavedTCP);
759
760 mSetupTracksSuccessful = true;
761 } else {
762 result = BAD_VALUE;
763 }
764 }
765 }
766
767 if (result != OK) {
768 if (track) {
769 if (!track->mUsingInterleavedTCP) {
770 // Clear the tag
771 if (mUIDValid) {
772 NetworkUtils::UnRegisterSocketUserTag(track->mRTPSocket);
773 NetworkUtils::UnRegisterSocketUserMark(track->mRTPSocket);
774 NetworkUtils::UnRegisterSocketUserTag(track->mRTCPSocket);
775 NetworkUtils::UnRegisterSocketUserMark(track->mRTCPSocket);
776 }
777
778 close(track->mRTPSocket);
779 close(track->mRTCPSocket);
780 }
781
782 mTracks.removeItemsAt(trackIndex);
783 }
784 }
785
786 ++index;
787 if (result == OK && index < mSessionDesc->countTracks()) {
788 setupTrack(index);
789 } else if (mSetupTracksSuccessful) {
790 ++mKeepAliveGeneration;
791 postKeepAlive();
792
793 AString request = "PLAY ";
794 request.append(mControlURL);
795 request.append(" RTSP/1.0\r\n");
796
797 request.append("Session: ");
798 request.append(mSessionID);
799 request.append("\r\n");
800
801 request.append("\r\n");
802
803 sp<AMessage> reply = new AMessage('play', this);
804 mConn->sendRequest(request.c_str(), reply);
805 } else {
806 sp<AMessage> reply = new AMessage('disc', this);
807 mConn->disconnect(reply);
808 }
809 break;
810 }
811
812 case 'play':
813 {
814 int32_t result;
815 CHECK(msg->findInt32("result", &result));
816
817 ALOGI("PLAY completed with result %d (%s)",
818 result, strerror(-result));
819
820 if (result == OK) {
821 sp<RefBase> obj;
822 CHECK(msg->findObject("response", &obj));
823 sp<ARTSPResponse> response =
824 static_cast<ARTSPResponse *>(obj.get());
825
826 if (response->mStatusCode != 200) {
827 result = UNKNOWN_ERROR;
828 } else {
829 parsePlayResponse(response);
830 postTimeout();
831 }
832 }
833
834 if (result != OK) {
835 sp<AMessage> reply = new AMessage('disc', this);
836 mConn->disconnect(reply);
837 }
838
839 break;
840 }
841
842 case 'aliv':
843 {
844 int32_t generation;
845 CHECK(msg->findInt32("generation", &generation));
846
847 if (generation != mKeepAliveGeneration) {
848 // obsolete event.
849 break;
850 }
851
852 AString request;
853 request.append("OPTIONS ");
854 request.append(mSessionURL);
855 request.append(" RTSP/1.0\r\n");
856 request.append("Session: ");
857 request.append(mSessionID);
858 request.append("\r\n");
859 request.append("\r\n");
860
861 sp<AMessage> reply = new AMessage('opts', this);
862 reply->setInt32("generation", mKeepAliveGeneration);
863 mConn->sendRequest(request.c_str(), reply);
864 break;
865 }
866
867 case 'opts':
868 {
869 int32_t result;
870 CHECK(msg->findInt32("result", &result));
871
872 ALOGI("OPTIONS completed with result %d (%s)",
873 result, strerror(-result));
874
875 int32_t generation;
876 CHECK(msg->findInt32("generation", &generation));
877
878 if (generation != mKeepAliveGeneration) {
879 // obsolete event.
880 break;
881 }
882
883 postKeepAlive();
884 break;
885 }
886
887 case 'abor':
888 {
889 for (size_t i = 0; i < mTracks.size(); ++i) {
890 TrackInfo *info = &mTracks.editItemAt(i);
891
892 if (!mFirstAccessUnit) {
893 postQueueEOS(i, ERROR_END_OF_STREAM);
894 }
895
896 if (!info->mUsingInterleavedTCP) {
897 mRTPConn->removeStream(info->mRTPSocket, info->mRTCPSocket);
898
899 // Clear the tag
900 if (mUIDValid) {
901 NetworkUtils::UnRegisterSocketUserTag(info->mRTPSocket);
902 NetworkUtils::UnRegisterSocketUserMark(info->mRTPSocket);
903 NetworkUtils::UnRegisterSocketUserTag(info->mRTCPSocket);
904 NetworkUtils::UnRegisterSocketUserMark(info->mRTCPSocket);
905 }
906
907 close(info->mRTPSocket);
908 close(info->mRTCPSocket);
909 }
910 }
911 mTracks.clear();
912 mSetupTracksSuccessful = false;
913 mSeekPending = false;
914 mFirstAccessUnit = true;
915 mAllTracksHaveTime = false;
916 mNTPAnchorUs = -1;
917 mMediaAnchorUs = -1;
918 mNumAccessUnitsReceived = 0;
919 mReceivedFirstRTCPPacket = false;
920 mReceivedFirstRTPPacket = false;
921 mPausing = false;
922 mSeekable = true;
923
924 sp<AMessage> reply = new AMessage('tear', this);
925
926 int32_t reconnect;
927 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
928 reply->setInt32("reconnect", true);
929 }
930
931 AString request;
932 request = "TEARDOWN ";
933
934 // XXX should use aggregate url from SDP here...
935 request.append(mSessionURL);
936 request.append(" RTSP/1.0\r\n");
937
938 request.append("Session: ");
939 request.append(mSessionID);
940 request.append("\r\n");
941
942 request.append("\r\n");
943
944 mConn->sendRequest(request.c_str(), reply);
945
946 // If the response of teardown hasn't been received in 3 seconds,
947 // post 'tear' message to avoid ANR.
948 if (!msg->findInt32("reconnect", &reconnect) || !reconnect) {
949 sp<AMessage> teardown = reply->dup();
950 teardown->setInt32("result", -ECONNABORTED);
951 teardown->post(kTearDownTimeoutUs);
952 }
953 break;
954 }
955
956 case 'tear':
957 {
958 int32_t result;
959 CHECK(msg->findInt32("result", &result));
960
961 ALOGI("TEARDOWN completed with result %d (%s)",
962 result, strerror(-result));
963
964 sp<AMessage> reply = new AMessage('disc', this);
965
966 int32_t reconnect;
967 if (msg->findInt32("reconnect", &reconnect) && reconnect) {
968 reply->setInt32("reconnect", true);
969 }
970
971 mConn->disconnect(reply);
972 break;
973 }
974
975 case 'quit':
976 {
977 sp<AMessage> msg = mNotify->dup();
978 msg->setInt32("what", kWhatDisconnected);
979 msg->setInt32("result", UNKNOWN_ERROR);
980 msg->post();
981 break;
982 }
983
984 case 'chek':
985 {
986 int32_t generation;
987 CHECK(msg->findInt32("generation", &generation));
988 if (generation != mCheckGeneration) {
989 // This is an outdated message. Ignore.
990 break;
991 }
992
993 if (mNumAccessUnitsReceived == 0) {
994 #if 1
995 ALOGI("stream ended? aborting.");
996 (new AMessage('abor', this))->post();
997 break;
998 #else
999 ALOGI("haven't seen an AU in a looong time.");
1000 #endif
1001 }
1002
1003 mNumAccessUnitsReceived = 0;
1004 msg->post(kAccessUnitTimeoutUs);
1005 break;
1006 }
1007
1008 case 'accu':
1009 {
1010 if (mSeekPending) {
1011 ALOGV("Stale access unit.");
1012 break;
1013 }
1014
1015 int32_t timeUpdate;
1016 if (msg->findInt32("time-update", &timeUpdate) && timeUpdate) {
1017 size_t trackIndex;
1018 CHECK(msg->findSize("track-index", &trackIndex));
1019
1020 uint32_t rtpTime;
1021 uint64_t ntpTime;
1022 CHECK(msg->findInt32("rtp-time", (int32_t *)&rtpTime));
1023 CHECK(msg->findInt64("ntp-time", (int64_t *)&ntpTime));
1024
1025 onTimeUpdate(trackIndex, rtpTime, ntpTime);
1026 break;
1027 }
1028
1029 int32_t first;
1030 if (msg->findInt32("first-rtcp", &first)) {
1031 mReceivedFirstRTCPPacket = true;
1032 break;
1033 }
1034
1035 if (msg->findInt32("first-rtp", &first)) {
1036 mReceivedFirstRTPPacket = true;
1037 break;
1038 }
1039
1040 int32_t rtcpEvent;
1041 if (msg->findInt32("rtcp-event", &rtcpEvent)) {
1042 break;
1043 }
1044
1045 ++mNumAccessUnitsReceived;
1046 postAccessUnitTimeoutCheck();
1047
1048 size_t trackIndex;
1049 CHECK(msg->findSize("track-index", &trackIndex));
1050
1051 if (trackIndex >= mTracks.size()) {
1052 ALOGV("late packets ignored.");
1053 break;
1054 }
1055
1056 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1057
1058 int32_t eos;
1059 if (msg->findInt32("eos", &eos)) {
1060 ALOGI("received BYE on track index %zu", trackIndex);
1061 if (!mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1062 ALOGI("No time established => fake existing data");
1063
1064 track->mEOSReceived = true;
1065 mTryFakeRTCP = true;
1066 mReceivedFirstRTCPPacket = true;
1067 fakeTimestamps();
1068 } else {
1069 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1070 }
1071 return;
1072 }
1073
1074 if (mSeekPending) {
1075 ALOGV("we're seeking, dropping stale packet.");
1076 break;
1077 }
1078
1079 sp<ABuffer> accessUnit;
1080 CHECK(msg->findBuffer("access-unit", &accessUnit));
1081 onAccessUnitComplete(trackIndex, accessUnit);
1082 break;
1083 }
1084
1085 case 'paus':
1086 {
1087 int32_t generation;
1088 CHECK(msg->findInt32("pausecheck", &generation));
1089 if (generation != mPauseGeneration) {
1090 ALOGV("Ignoring outdated pause message.");
1091 break;
1092 }
1093
1094 if (!mSeekable) {
1095 ALOGW("This is a live stream, ignoring pause request.");
1096 break;
1097 }
1098
1099 if (mPausing) {
1100 ALOGV("This stream is already paused.");
1101 break;
1102 }
1103
1104 mCheckPending = true;
1105 ++mCheckGeneration;
1106 mPausing = true;
1107
1108 AString request = "PAUSE ";
1109 request.append(mControlURL);
1110 request.append(" RTSP/1.0\r\n");
1111
1112 request.append("Session: ");
1113 request.append(mSessionID);
1114 request.append("\r\n");
1115
1116 request.append("\r\n");
1117
1118 sp<AMessage> reply = new AMessage('pau2', this);
1119 mConn->sendRequest(request.c_str(), reply);
1120 break;
1121 }
1122
1123 case 'pau2':
1124 {
1125 int32_t result;
1126 CHECK(msg->findInt32("result", &result));
1127 mCheckTimeoutGeneration++;
1128
1129 ALOGI("PAUSE completed with result %d (%s)",
1130 result, strerror(-result));
1131 break;
1132 }
1133
1134 case 'resu':
1135 {
1136 if (mPausing && mSeekPending) {
1137 // If seeking, Play will be sent from see1 instead
1138 break;
1139 }
1140
1141 if (!mPausing) {
1142 // Dont send PLAY if we have not paused
1143 break;
1144 }
1145 AString request = "PLAY ";
1146 request.append(mControlURL);
1147 request.append(" RTSP/1.0\r\n");
1148
1149 request.append("Session: ");
1150 request.append(mSessionID);
1151 request.append("\r\n");
1152
1153 request.append("\r\n");
1154
1155 sp<AMessage> reply = new AMessage('res2', this);
1156 mConn->sendRequest(request.c_str(), reply);
1157 break;
1158 }
1159
1160 case 'res2':
1161 {
1162 int32_t result;
1163 CHECK(msg->findInt32("result", &result));
1164
1165 ALOGI("PLAY (for resume) completed with result %d (%s)",
1166 result, strerror(-result));
1167
1168 mCheckPending = false;
1169 ++mCheckGeneration;
1170 postAccessUnitTimeoutCheck();
1171
1172 if (result == OK) {
1173 sp<RefBase> obj;
1174 CHECK(msg->findObject("response", &obj));
1175 sp<ARTSPResponse> response =
1176 static_cast<ARTSPResponse *>(obj.get());
1177
1178 if (response->mStatusCode != 200) {
1179 result = UNKNOWN_ERROR;
1180 } else {
1181 parsePlayResponse(response);
1182
1183 // Post new timeout in order to make sure to use
1184 // fake timestamps if no new Sender Reports arrive
1185 postTimeout();
1186 }
1187 }
1188
1189 if (result != OK) {
1190 ALOGE("resume failed, aborting.");
1191 (new AMessage('abor', this))->post();
1192 }
1193
1194 mPausing = false;
1195 break;
1196 }
1197
1198 case 'seek':
1199 {
1200 if (!mSeekable) {
1201 ALOGW("This is a live stream, ignoring seek request.");
1202
1203 sp<AMessage> msg = mNotify->dup();
1204 msg->setInt32("what", kWhatSeekDone);
1205 msg->post();
1206 break;
1207 }
1208
1209 int64_t timeUs;
1210 CHECK(msg->findInt64("time", &timeUs));
1211
1212 mSeekPending = true;
1213
1214 // Disable the access unit timeout until we resumed
1215 // playback again.
1216 mCheckPending = true;
1217 ++mCheckGeneration;
1218
1219 sp<AMessage> reply = new AMessage('see0', this);
1220 reply->setInt64("time", timeUs);
1221
1222 if (mPausing) {
1223 // PAUSE already sent
1224 ALOGI("Pause already sent");
1225 reply->post();
1226 break;
1227 }
1228 AString request = "PAUSE ";
1229 request.append(mControlURL);
1230 request.append(" RTSP/1.0\r\n");
1231
1232 request.append("Session: ");
1233 request.append(mSessionID);
1234 request.append("\r\n");
1235
1236 request.append("\r\n");
1237
1238 mConn->sendRequest(request.c_str(), reply);
1239 break;
1240 }
1241
1242 case 'see0':
1243 {
1244 // Session is paused now.
1245 status_t err = OK;
1246 msg->findInt32("result", &err);
1247
1248 int64_t timeUs;
1249 CHECK(msg->findInt64("time", &timeUs));
1250
1251 sp<AMessage> notify = mNotify->dup();
1252 notify->setInt32("what", kWhatSeekPaused);
1253 notify->setInt32("err", err);
1254 notify->setInt64("time", timeUs);
1255 notify->post();
1256 break;
1257
1258 }
1259
1260 case 'see1':
1261 {
1262 for (size_t i = 0; i < mTracks.size(); ++i) {
1263 TrackInfo *info = &mTracks.editItemAt(i);
1264
1265 postQueueSeekDiscontinuity(i);
1266 info->mEOSReceived = false;
1267
1268 info->mRTPAnchor = 0;
1269 info->mNTPAnchorUs = -1;
1270 }
1271
1272 mAllTracksHaveTime = false;
1273 mNTPAnchorUs = -1;
1274
1275 // Start new timeoutgeneration to avoid getting timeout
1276 // before PLAY response arrive
1277 postTimeout();
1278
1279 int64_t timeUs;
1280 CHECK(msg->findInt64("time", &timeUs));
1281
1282 AString request = "PLAY ";
1283 request.append(mControlURL);
1284 request.append(" RTSP/1.0\r\n");
1285
1286 request.append("Session: ");
1287 request.append(mSessionID);
1288 request.append("\r\n");
1289
1290 request.append(
1291 AStringPrintf(
1292 "Range: npt=%lld-\r\n", timeUs / 1000000ll));
1293
1294 request.append("\r\n");
1295
1296 sp<AMessage> reply = new AMessage('see2', this);
1297 mConn->sendRequest(request.c_str(), reply);
1298 break;
1299 }
1300
1301 case 'see2':
1302 {
1303 if (mTracks.size() == 0) {
1304 // We have already hit abor, break
1305 break;
1306 }
1307
1308 int32_t result;
1309 CHECK(msg->findInt32("result", &result));
1310
1311 ALOGI("PLAY (for seek) completed with result %d (%s)",
1312 result, strerror(-result));
1313
1314 mCheckPending = false;
1315 ++mCheckGeneration;
1316 postAccessUnitTimeoutCheck();
1317
1318 if (result == OK) {
1319 sp<RefBase> obj;
1320 CHECK(msg->findObject("response", &obj));
1321 sp<ARTSPResponse> response =
1322 static_cast<ARTSPResponse *>(obj.get());
1323
1324 if (response->mStatusCode != 200) {
1325 result = UNKNOWN_ERROR;
1326 } else {
1327 parsePlayResponse(response);
1328
1329 // Post new timeout in order to make sure to use
1330 // fake timestamps if no new Sender Reports arrive
1331 postTimeout();
1332
1333 ssize_t i = response->mHeaders.indexOfKey("rtp-info");
1334 CHECK_GE(i, 0);
1335
1336 ALOGV("rtp-info: %s", response->mHeaders.valueAt(i).c_str());
1337
1338 mRTPConn->seekStream();
1339
1340 ALOGI("seek completed.");
1341 }
1342 }
1343
1344 if (result != OK) {
1345 ALOGE("seek failed, aborting.");
1346 (new AMessage('abor', this))->post();
1347 }
1348
1349 mPausing = false;
1350 mSeekPending = false;
1351
1352 // Discard all stale access units.
1353 for (size_t i = 0; i < mTracks.size(); ++i) {
1354 TrackInfo *track = &mTracks.editItemAt(i);
1355 track->mPackets.clear();
1356 }
1357
1358 sp<AMessage> msg = mNotify->dup();
1359 msg->setInt32("what", kWhatSeekDone);
1360 msg->post();
1361 break;
1362 }
1363
1364 case 'biny':
1365 {
1366 sp<ABuffer> buffer;
1367 CHECK(msg->findBuffer("buffer", &buffer));
1368
1369 int32_t index;
1370 CHECK(buffer->meta()->findInt32("index", &index));
1371
1372 mRTPConn->injectPacket(index, buffer);
1373 break;
1374 }
1375
1376 case 'tiou':
1377 {
1378 int32_t timeoutGenerationCheck;
1379 CHECK(msg->findInt32("tioucheck", &timeoutGenerationCheck));
1380 if (timeoutGenerationCheck != mCheckTimeoutGeneration) {
1381 // This is an outdated message. Ignore.
1382 // This typically happens if a lot of seeks are
1383 // performed, since new timeout messages now are
1384 // posted at seek as well.
1385 break;
1386 }
1387 if (!mReceivedFirstRTCPPacket) {
1388 if (dataReceivedOnAllChannels() && !mTryFakeRTCP) {
1389 ALOGW("We received RTP packets but no RTCP packets, "
1390 "using fake timestamps.");
1391
1392 mTryFakeRTCP = true;
1393
1394 mReceivedFirstRTCPPacket = true;
1395
1396 fakeTimestamps();
1397 } else if (!mReceivedFirstRTPPacket && !mTryTCPInterleaving) {
1398 ALOGW("Never received any data, switching transports.");
1399
1400 mTryTCPInterleaving = true;
1401
1402 sp<AMessage> msg = new AMessage('abor', this);
1403 msg->setInt32("reconnect", true);
1404 msg->post();
1405 } else {
1406 ALOGW("Never received any data, disconnecting.");
1407 (new AMessage('abor', this))->post();
1408 }
1409 } else {
1410 if (!mAllTracksHaveTime) {
1411 ALOGW("We received some RTCP packets, but time "
1412 "could not be established on all tracks, now "
1413 "using fake timestamps");
1414
1415 fakeTimestamps();
1416 }
1417 }
1418 break;
1419 }
1420
1421 default:
1422 TRESPASS();
1423 break;
1424 }
1425 }
1426
postKeepAliveMyHandler1427 void postKeepAlive() {
1428 sp<AMessage> msg = new AMessage('aliv', this);
1429 msg->setInt32("generation", mKeepAliveGeneration);
1430 msg->post((mKeepAliveTimeoutUs * 9) / 10);
1431 }
1432
cancelAccessUnitTimeoutCheckMyHandler1433 void cancelAccessUnitTimeoutCheck() {
1434 ALOGV("cancelAccessUnitTimeoutCheck");
1435 ++mCheckGeneration;
1436 }
1437
postAccessUnitTimeoutCheckMyHandler1438 void postAccessUnitTimeoutCheck() {
1439 if (mCheckPending) {
1440 return;
1441 }
1442
1443 mCheckPending = true;
1444 sp<AMessage> check = new AMessage('chek', this);
1445 check->setInt32("generation", mCheckGeneration);
1446 check->post(kAccessUnitTimeoutUs);
1447 }
1448
SplitStringMyHandler1449 static void SplitString(
1450 const AString &s, const char *separator, List<AString> *items) {
1451 items->clear();
1452 size_t start = 0;
1453 while (start < s.size()) {
1454 ssize_t offset = s.find(separator, start);
1455
1456 if (offset < 0) {
1457 items->push_back(AString(s, start, s.size() - start));
1458 break;
1459 }
1460
1461 items->push_back(AString(s, start, offset - start));
1462 start = offset + strlen(separator);
1463 }
1464 }
1465
parsePlayResponseMyHandler1466 void parsePlayResponse(const sp<ARTSPResponse> &response) {
1467 mPlayResponseParsed = true;
1468 if (mTracks.size() == 0) {
1469 ALOGV("parsePlayResponse: late packets ignored.");
1470 return;
1471 }
1472
1473 ssize_t i = response->mHeaders.indexOfKey("range");
1474 if (i < 0) {
1475 // Server doesn't even tell use what range it is going to
1476 // play, therefore we won't support seeking.
1477 return;
1478 }
1479
1480 AString range = response->mHeaders.valueAt(i);
1481 ALOGV("Range: %s", range.c_str());
1482
1483 AString val;
1484 CHECK(GetAttribute(range.c_str(), "npt", &val));
1485
1486 float npt1, npt2;
1487 if (!ASessionDescription::parseNTPRange(val.c_str(), &npt1, &npt2)) {
1488 // This is a live stream and therefore not seekable.
1489
1490 ALOGI("This is a live stream");
1491 return;
1492 }
1493
1494 i = response->mHeaders.indexOfKey("rtp-info");
1495 CHECK_GE(i, 0);
1496
1497 AString rtpInfo = response->mHeaders.valueAt(i);
1498 List<AString> streamInfos;
1499 SplitString(rtpInfo, ",", &streamInfos);
1500
1501 int n = 1;
1502 for (List<AString>::iterator it = streamInfos.begin();
1503 it != streamInfos.end(); ++it) {
1504 (*it).trim();
1505 ALOGV("streamInfo[%d] = %s", n, (*it).c_str());
1506
1507 CHECK(GetAttribute((*it).c_str(), "url", &val));
1508
1509 size_t trackIndex = 0;
1510 while (trackIndex < mTracks.size()
1511 && !(val == mTracks.editItemAt(trackIndex).mURL)) {
1512 ++trackIndex;
1513 }
1514 CHECK_LT(trackIndex, mTracks.size());
1515
1516 CHECK(GetAttribute((*it).c_str(), "seq", &val));
1517
1518 char *end;
1519 unsigned long seq = strtoul(val.c_str(), &end, 10);
1520
1521 TrackInfo *info = &mTracks.editItemAt(trackIndex);
1522 info->mFirstSeqNumInSegment = seq;
1523 info->mNewSegment = true;
1524 info->mAllowedStaleAccessUnits = GetMaxAllowedStaleCount(info->mIsVideo);
1525
1526 CHECK(GetAttribute((*it).c_str(), "rtptime", &val));
1527
1528 uint32_t rtpTime = strtoul(val.c_str(), &end, 10);
1529
1530 ALOGV("track #%d: rtpTime=%u <=> npt=%.2f", n, rtpTime, npt1);
1531
1532 info->mNormalPlayTimeRTP = rtpTime;
1533 info->mNormalPlayTimeUs = (int64_t)(npt1 * 1E6);
1534
1535 if (!mFirstAccessUnit) {
1536 postNormalPlayTimeMapping(
1537 trackIndex,
1538 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1539 }
1540
1541 ++n;
1542 }
1543 }
1544
getTrackFormatMyHandler1545 sp<MetaData> getTrackFormat(size_t index, int32_t *timeScale) {
1546 CHECK_GE(index, 0u);
1547 CHECK_LT(index, mTracks.size());
1548
1549 const TrackInfo &info = mTracks.itemAt(index);
1550
1551 *timeScale = info.mTimeScale;
1552
1553 return info.mPacketSource->getFormat();
1554 }
1555
countTracksMyHandler1556 size_t countTracks() const {
1557 return mTracks.size();
1558 }
1559
1560 private:
// Per-track state. One entry is appended to mTracks by setupTrack() for
// every track whose packet source initializes successfully.
struct TrackInfo {
    // Absolute control URL of the track; parsePlayResponse() matches the
    // "url" attribute of each RTP-Info entry against it.
    AString mURL;
    // Values filled in by ARTPConnection::MakePortPair() for UDP transport,
    // or the interleaved channel numbers (2k / 2k+1) when
    // mUsingInterleavedTCP is set.
    int mRTPSocket;
    int mRTCPSocket;
    bool mUsingInterleavedTCP;
    // Result of APacketSource::isVideo(); selects the stale-unit budget.
    bool mIsVideo;
    // Sequence number announced in RTP-Info ("seq"); replaced by the
    // sequence number of the first access unit actually accepted for the
    // segment.
    uint32_t mFirstSeqNumInSegment;
    // True until the first access unit of the current segment has been
    // accepted by processAccessUnitQueue().
    bool mNewSegment;
    // Remaining number of stale access units that may be discarded while
    // waiting for the segment start; set to -1 once the budget is used up.
    int32_t mAllowedStaleAccessUnits;

    // RTP timestamp / NTP time (in us) pair recorded by onTimeUpdate().
    // mNTPAnchorUs < 0 means no time has been established for this track.
    uint32_t mRTPAnchor;
    int64_t mNTPAnchorUs;
    // Value obtained from ASessionDescription::ParseFormatDesc(); used as
    // the divisor when converting RTP timestamp differences to microseconds.
    int32_t mTimeScale;
    // When set, kWhatEOS is posted for the track after its queued packets
    // have been processed, and the flag is cleared again.
    bool mEOSReceived;

    // RTP time ("rtptime") and start of the npt range (in us) taken from
    // the most recent PLAY response.
    uint32_t mNormalPlayTimeRTP;
    int64_t mNormalPlayTimeUs;

    sp<APacketSource> mPacketSource;

    // Stores packets temporarily while no notion of time
    // has been established yet.
    List<sp<ABuffer> > mPackets;
};
1585
// Template message; it is dup()'ed and filled in for every notification
// posted to the observer (kWhatAccessUnit, kWhatEOS, kWhatSeekDone, ...).
sp<AMessage> mNotify;
// When mUIDValid is set, the UDP sockets created in setupTrack() are
// tagged and marked with mUID.
bool mUIDValid;
uid_t mUID;
sp<ALooper> mNetLooper;
// Connection used to send the RTSP requests (SETUP, PLAY, PAUSE, ...).
sp<ARTSPConnection> mConn;
sp<ARTPConnection> mRTPConn;
sp<ASessionDescription> mSessionDesc;
AString mOriginalSessionURL;  // This one still has user:pass@
AString mSessionURL;
AString mSessionHost;
// Base against which relative "a=control" track URLs are resolved.
AString mBaseURL;
// Request URL used for the session-level PLAY and PAUSE requests.
AString mControlURL;
// Value sent in the "Session:" request header.
AString mSessionID;
bool mSetupTracksSuccessful;
// Set when a seek is started ('seek') and cleared when the PLAY response
// for that seek has been handled ('see2').
bool mSeekPending;
// True until handleFirstAccessUnit() has posted kWhatConnected.
bool mFirstAccessUnit;

// True once every track has a non-negative mNTPAnchorUs.
bool mAllTracksHaveTime;
// Session-wide anchors used by addMediaTimestamp():
//   mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs.
// mNTPAnchorUs < 0 means the anchor is taken from the next time update.
int64_t mNTPAnchorUs;
int64_t mMediaAnchorUs;
// Largest media time computed by addMediaTimestamp() so far.
int64_t mLastMediaTimeUs;

int64_t mNumAccessUnitsReceived;
// Access-unit timeout ('chek') bookkeeping: a check is only posted while
// mCheckPending is false, and each message carries mCheckGeneration.
bool mCheckPending;
int32_t mCheckGeneration;
// Generation counter for 'tiou' messages; incremented by postTimeout() so
// that older timeout messages are ignored.
int32_t mCheckTimeoutGeneration;
// When set, setupTrack() requests RTP/AVP/TCP interleaved transport.
bool mTryTCPInterleaving;
bool mTryFakeRTCP;
bool mReceivedFirstRTCPPacket;
bool mReceivedFirstRTPPacket;
bool mSeekable;
// postKeepAlive() schedules the next keep-alive after 90% of this time.
int64_t mKeepAliveTimeoutUs;
int32_t mKeepAliveGeneration;
bool mPausing;
int32_t mPauseGeneration;

Vector<TrackInfo> mTracks;

// Set by parsePlayResponse(); until then onAccessUnitComplete() only
// queues incoming access units.
bool mPlayResponseParsed;
1625
setupTrackMyHandler1626 void setupTrack(size_t index) {
1627 sp<APacketSource> source =
1628 new APacketSource(mSessionDesc, index);
1629
1630 if (source->initCheck() != OK) {
1631 ALOGW("Unsupported format. Ignoring track #%zu.", index);
1632
1633 sp<AMessage> reply = new AMessage('setu', this);
1634 reply->setSize("index", index);
1635 reply->setInt32("result", ERROR_UNSUPPORTED);
1636 reply->post();
1637 return;
1638 }
1639
1640 AString url;
1641 CHECK(mSessionDesc->findAttribute(index, "a=control", &url));
1642
1643 AString trackURL;
1644 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL));
1645
1646 mTracks.push(TrackInfo());
1647 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
1648 info->mURL = trackURL;
1649 info->mPacketSource = source;
1650 info->mUsingInterleavedTCP = false;
1651 info->mIsVideo = source->isVideo();
1652 info->mFirstSeqNumInSegment = 0;
1653 info->mNewSegment = true;
1654 info->mAllowedStaleAccessUnits = GetMaxAllowedStaleCount(info->mIsVideo);
1655 info->mRTPSocket = -1;
1656 info->mRTCPSocket = -1;
1657 info->mRTPAnchor = 0;
1658 info->mNTPAnchorUs = -1;
1659 info->mNormalPlayTimeRTP = 0;
1660 info->mNormalPlayTimeUs = 0ll;
1661
1662 unsigned long PT;
1663 AString formatDesc;
1664 AString formatParams;
1665 mSessionDesc->getFormatType(index, &PT, &formatDesc, &formatParams);
1666
1667 int32_t timescale;
1668 int32_t numChannels;
1669 ASessionDescription::ParseFormatDesc(
1670 formatDesc.c_str(), ×cale, &numChannels);
1671
1672 info->mTimeScale = timescale;
1673 info->mEOSReceived = false;
1674
1675 ALOGV("track #%zu URL=%s", mTracks.size(), trackURL.c_str());
1676
1677 AString request = "SETUP ";
1678 request.append(trackURL);
1679 request.append(" RTSP/1.0\r\n");
1680
1681 if (mTryTCPInterleaving) {
1682 size_t interleaveIndex = 2 * (mTracks.size() - 1);
1683 info->mUsingInterleavedTCP = true;
1684 info->mRTPSocket = interleaveIndex;
1685 info->mRTCPSocket = interleaveIndex + 1;
1686
1687 request.append("Transport: RTP/AVP/TCP;interleaved=");
1688 request.append(interleaveIndex);
1689 request.append("-");
1690 request.append(interleaveIndex + 1);
1691 } else {
1692 unsigned rtpPort;
1693 ARTPConnection::MakePortPair(
1694 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
1695
1696 if (mUIDValid) {
1697 NetworkUtils::RegisterSocketUserTag(info->mRTPSocket, mUID,
1698 (uint32_t)*(uint32_t*) "RTP_");
1699 NetworkUtils::RegisterSocketUserTag(info->mRTCPSocket, mUID,
1700 (uint32_t)*(uint32_t*) "RTP_");
1701 NetworkUtils::RegisterSocketUserMark(info->mRTPSocket, mUID);
1702 NetworkUtils::RegisterSocketUserMark(info->mRTCPSocket, mUID);
1703 }
1704
1705 request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
1706 request.append(rtpPort);
1707 request.append("-");
1708 request.append(rtpPort + 1);
1709 }
1710
1711 request.append("\r\n");
1712
1713 if (index > 1) {
1714 request.append("Session: ");
1715 request.append(mSessionID);
1716 request.append("\r\n");
1717 }
1718
1719 request.append("\r\n");
1720
1721 sp<AMessage> reply = new AMessage('setu', this);
1722 reply->setSize("index", index);
1723 reply->setSize("track-index", mTracks.size() - 1);
1724 mConn->sendRequest(request.c_str(), reply);
1725 }
1726
MakeURLMyHandler1727 static bool MakeURL(const char *baseURL, const char *url, AString *out) {
1728 out->clear();
1729
1730 if (strncasecmp("rtsp://", baseURL, 7)) {
1731 // Base URL must be absolute
1732 return false;
1733 }
1734
1735 if (!strncasecmp("rtsp://", url, 7)) {
1736 // "url" is already an absolute URL, ignore base URL.
1737 out->setTo(url);
1738 return true;
1739 }
1740
1741 size_t n = strlen(baseURL);
1742 out->setTo(baseURL);
1743 if (baseURL[n - 1] != '/') {
1744 out->append("/");
1745 }
1746 out->append(url);
1747
1748 return true;
1749 }
1750
fakeTimestampsMyHandler1751 void fakeTimestamps() {
1752 mNTPAnchorUs = -1ll;
1753 for (size_t i = 0; i < mTracks.size(); ++i) {
1754 onTimeUpdate(i, 0, 0ll);
1755 }
1756 }
1757
dataReceivedOnAllChannelsMyHandler1758 bool dataReceivedOnAllChannels() {
1759 TrackInfo *track;
1760 for (size_t i = 0; i < mTracks.size(); ++i) {
1761 track = &mTracks.editItemAt(i);
1762 if (track->mPackets.empty()) {
1763 return false;
1764 }
1765 }
1766 return true;
1767 }
1768
handleFirstAccessUnitMyHandler1769 void handleFirstAccessUnit() {
1770 if (mFirstAccessUnit) {
1771 sp<AMessage> msg = mNotify->dup();
1772 msg->setInt32("what", kWhatConnected);
1773 msg->post();
1774
1775 if (mSeekable) {
1776 for (size_t i = 0; i < mTracks.size(); ++i) {
1777 TrackInfo *info = &mTracks.editItemAt(i);
1778
1779 postNormalPlayTimeMapping(
1780 i,
1781 info->mNormalPlayTimeRTP, info->mNormalPlayTimeUs);
1782 }
1783 }
1784
1785 mFirstAccessUnit = false;
1786 }
1787 }
1788
onTimeUpdateMyHandler1789 void onTimeUpdate(int32_t trackIndex, uint32_t rtpTime, uint64_t ntpTime) {
1790 ALOGV("onTimeUpdate track %d, rtpTime = 0x%08x, ntpTime = %#016llx",
1791 trackIndex, rtpTime, (long long)ntpTime);
1792
1793 int64_t ntpTimeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32));
1794
1795 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1796
1797 track->mRTPAnchor = rtpTime;
1798 track->mNTPAnchorUs = ntpTimeUs;
1799
1800 if (mNTPAnchorUs < 0) {
1801 mNTPAnchorUs = ntpTimeUs;
1802 mMediaAnchorUs = mLastMediaTimeUs;
1803 }
1804
1805 if (!mAllTracksHaveTime) {
1806 bool allTracksHaveTime = (mTracks.size() > 0);
1807 for (size_t i = 0; i < mTracks.size(); ++i) {
1808 TrackInfo *track = &mTracks.editItemAt(i);
1809 if (track->mNTPAnchorUs < 0) {
1810 allTracksHaveTime = false;
1811 break;
1812 }
1813 }
1814 if (allTracksHaveTime) {
1815 mAllTracksHaveTime = true;
1816 ALOGI("Time now established for all tracks.");
1817 }
1818 }
1819 if (mAllTracksHaveTime && dataReceivedOnAllChannels()) {
1820 handleFirstAccessUnit();
1821
1822 // Time is now established, lets start timestamping immediately
1823 for (size_t i = 0; i < mTracks.size(); ++i) {
1824 if (OK != processAccessUnitQueue(i)) {
1825 return;
1826 }
1827 }
1828 for (size_t i = 0; i < mTracks.size(); ++i) {
1829 TrackInfo *trackInfo = &mTracks.editItemAt(i);
1830 if (trackInfo->mEOSReceived) {
1831 postQueueEOS(i, ERROR_END_OF_STREAM);
1832 trackInfo->mEOSReceived = false;
1833 }
1834 }
1835 }
1836 }
1837
processAccessUnitQueueMyHandler1838 status_t processAccessUnitQueue(int32_t trackIndex) {
1839 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1840 while (!track->mPackets.empty()) {
1841 sp<ABuffer> accessUnit = *track->mPackets.begin();
1842 track->mPackets.erase(track->mPackets.begin());
1843
1844 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1845 if (track->mNewSegment) {
1846 // The sequence number from RTP packet has only 16 bits and is extended
1847 // by ARTPSource. Only the low 16 bits of seq in RTP-Info of reply of
1848 // RTSP "PLAY" command should be used to detect the first RTP packet
1849 // after seeking.
1850 int32_t maxAllowedStaleAccessUnits = GetMaxAllowedStaleCount(track->mIsVideo);
1851 if (mSeekable) {
1852 if (track->mAllowedStaleAccessUnits > 0) {
1853 uint32_t seqNum16 = seqNum & 0xffff;
1854 uint32_t firstSeqNumInSegment16 = track->mFirstSeqNumInSegment & 0xffff;
1855 if (seqNum16 > firstSeqNumInSegment16 + maxAllowedStaleAccessUnits
1856 || seqNum16 < firstSeqNumInSegment16) {
1857 // Not the first rtp packet of the stream after seeking, discarding.
1858 track->mAllowedStaleAccessUnits--;
1859 ALOGV("discarding stale access unit (0x%x : 0x%x)",
1860 seqNum, track->mFirstSeqNumInSegment);
1861 continue;
1862 }
1863 ALOGW_IF(seqNum16 != firstSeqNumInSegment16,
1864 "Missing the first packet(%u), now take packet(%u) as first one",
1865 track->mFirstSeqNumInSegment, seqNum);
1866 } else { // track->mAllowedStaleAccessUnits <= 0
1867 mNumAccessUnitsReceived = 0;
1868 ALOGW_IF(track->mAllowedStaleAccessUnits == 0,
1869 "Still no first rtp packet after %d stale ones",
1870 maxAllowedStaleAccessUnits);
1871 track->mAllowedStaleAccessUnits = -1;
1872 return UNKNOWN_ERROR;
1873 }
1874 }
1875
1876 // Now found the first rtp packet of the stream after seeking.
1877 track->mFirstSeqNumInSegment = seqNum;
1878 track->mNewSegment = false;
1879 }
1880
1881 if (seqNum < track->mFirstSeqNumInSegment) {
1882 ALOGV("dropping stale access-unit (%d < %d)",
1883 seqNum, track->mFirstSeqNumInSegment);
1884 continue;
1885 }
1886
1887 if (addMediaTimestamp(trackIndex, track, accessUnit)) {
1888 postQueueAccessUnit(trackIndex, accessUnit);
1889 }
1890 }
1891 return OK;
1892 }
1893
onAccessUnitCompleteMyHandler1894 void onAccessUnitComplete(
1895 int32_t trackIndex, const sp<ABuffer> &accessUnit) {
1896 TrackInfo *track = &mTracks.editItemAt(trackIndex);
1897 track->mPackets.push_back(accessUnit);
1898
1899 uint32_t seqNum = (uint32_t)accessUnit->int32Data();
1900 ALOGV("onAccessUnitComplete track %d storing accessunit %u", trackIndex, seqNum);
1901
1902 if(!mPlayResponseParsed){
1903 ALOGV("play response is not parsed");
1904 return;
1905 }
1906
1907 handleFirstAccessUnit();
1908
1909 if (!mAllTracksHaveTime) {
1910 ALOGV("storing accessUnit, no time established yet");
1911 return;
1912 }
1913
1914 if (OK != processAccessUnitQueue(trackIndex)) {
1915 return;
1916 }
1917
1918 if (track->mEOSReceived) {
1919 postQueueEOS(trackIndex, ERROR_END_OF_STREAM);
1920 track->mEOSReceived = false;
1921 }
1922 }
1923
addMediaTimestampMyHandler1924 bool addMediaTimestamp(
1925 int32_t trackIndex, const TrackInfo *track,
1926 const sp<ABuffer> &accessUnit) {
1927 UNUSED_UNLESS_VERBOSE(trackIndex);
1928
1929 uint32_t rtpTime;
1930 CHECK(accessUnit->meta()->findInt32(
1931 "rtp-time", (int32_t *)&rtpTime));
1932
1933 int64_t relRtpTimeUs =
1934 (((int64_t)rtpTime - (int64_t)track->mRTPAnchor) * 1000000ll)
1935 / track->mTimeScale;
1936
1937 int64_t ntpTimeUs = track->mNTPAnchorUs + relRtpTimeUs;
1938
1939 int64_t mediaTimeUs = mMediaAnchorUs + ntpTimeUs - mNTPAnchorUs;
1940
1941 if (mediaTimeUs > mLastMediaTimeUs) {
1942 mLastMediaTimeUs = mediaTimeUs;
1943 }
1944
1945 if (mediaTimeUs < 0 && !mSeekable) {
1946 ALOGV("dropping early accessUnit.");
1947 return false;
1948 }
1949
1950 ALOGV("track %d rtpTime=%u mediaTimeUs = %lld us (%.2f secs)",
1951 trackIndex, rtpTime, (long long)mediaTimeUs, mediaTimeUs / 1E6);
1952
1953 accessUnit->meta()->setInt64("timeUs", mediaTimeUs);
1954
1955 return true;
1956 }
1957
postQueueAccessUnitMyHandler1958 void postQueueAccessUnit(
1959 size_t trackIndex, const sp<ABuffer> &accessUnit) {
1960 sp<AMessage> msg = mNotify->dup();
1961 msg->setInt32("what", kWhatAccessUnit);
1962 msg->setSize("trackIndex", trackIndex);
1963 msg->setBuffer("accessUnit", accessUnit);
1964 msg->post();
1965 }
1966
postQueueEOSMyHandler1967 void postQueueEOS(size_t trackIndex, status_t finalResult) {
1968 sp<AMessage> msg = mNotify->dup();
1969 msg->setInt32("what", kWhatEOS);
1970 msg->setSize("trackIndex", trackIndex);
1971 msg->setInt32("finalResult", finalResult);
1972 msg->post();
1973 }
1974
postQueueSeekDiscontinuityMyHandler1975 void postQueueSeekDiscontinuity(size_t trackIndex) {
1976 sp<AMessage> msg = mNotify->dup();
1977 msg->setInt32("what", kWhatSeekDiscontinuity);
1978 msg->setSize("trackIndex", trackIndex);
1979 msg->post();
1980 }
1981
postNormalPlayTimeMappingMyHandler1982 void postNormalPlayTimeMapping(
1983 size_t trackIndex, uint32_t rtpTime, int64_t nptUs) {
1984 sp<AMessage> msg = mNotify->dup();
1985 msg->setInt32("what", kWhatNormalPlayTimeMapping);
1986 msg->setSize("trackIndex", trackIndex);
1987 msg->setInt32("rtpTime", rtpTime);
1988 msg->setInt64("nptUs", nptUs);
1989 msg->post();
1990 }
1991
postTimeoutMyHandler1992 void postTimeout() {
1993 sp<AMessage> timeout = new AMessage('tiou', this);
1994 mCheckTimeoutGeneration++;
1995 timeout->setInt32("tioucheck", mCheckTimeoutGeneration);
1996
1997 int64_t startupTimeoutUs;
1998 startupTimeoutUs = property_get_int64("media.rtsp.timeout-us", kStartupTimeoutUs);
1999 timeout->post(startupTimeoutUs);
2000 }
2001
2002 DISALLOW_EVIL_CONSTRUCTORS(MyHandler);
2003 };
2004
2005 } // namespace android
2006
2007 #endif // MY_HANDLER_H_
2008