1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "ARTPConnection"
19 #include <utils/Log.h>
20
21 #include "ARTPConnection.h"
22
23 #include "ARTPSource.h"
24 #include "ASessionDescription.h"
25
26 #include <media/stagefright/foundation/ABuffer.h>
27 #include <media/stagefright/foundation/ADebug.h>
28 #include <media/stagefright/foundation/AMessage.h>
29 #include <media/stagefright/foundation/AString.h>
30 #include <media/stagefright/foundation/hexdump.h>
31
32 #include <arpa/inet.h>
33 #include <sys/socket.h>
34
35 namespace android {
36
// Capacity, in bytes, of the scratch buffer that onPollStreams() allocates
// for assembling outgoing RTCP reports.
static const size_t kMaxUDPSize = 1500;
38
// Reads a 16-bit big-endian value from data[0..1]; data[0] is the most
// significant byte.
static uint16_t u16at(const uint8_t *data) {
    const unsigned hi = data[0];
    const unsigned lo = data[1];

    return (uint16_t)((hi << 8) | lo);
}
42
// Reads a 32-bit big-endian value from data[0..3]; data[0] is the most
// significant byte.
//
// Each byte is widened to uint32_t before shifting. Shifting a promoted
// (signed) int left by 16 or more overflows when the top bit of the result
// would be set, which is undefined behaviour.
static uint32_t u32at(const uint8_t *data) {
    return (uint32_t)data[0] << 24
            | (uint32_t)data[1] << 16
            | (uint32_t)data[2] << 8
            | (uint32_t)data[3];
}
46
u64at(const uint8_t * data)47 static uint64_t u64at(const uint8_t *data) {
48 return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
49 }
50
// static
// Timeout, in microseconds, given to select() in onPollStreams() (it is
// assigned to the timeval's tv_usec field there).
const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
53
// Per-stream bookkeeping. One entry is appended to mStreams for every
// kWhatAddStream message and filled in by onAddStream().
struct ARTPConnection::StreamInfo {
    // Socket RTP datagrams are read from. For injected packets the "index"
    // value is compared against this to select RTP parsing.
    int mRTPSocket;
    // Socket RTCP datagrams are read from and receiver reports are sent on.
    int mRTCPSocket;
    // Session description handed to every ARTPSource created for the stream.
    sp<ASessionDescription> mSessionDesc;
    // Index value supplied by the caller of addStream(); also handed to every
    // ARTPSource created for the stream.
    size_t mIndex;
    // Template message: dup()'ed to post "first-rtp" / "first-rtcp", and
    // passed to each ARTPSource on construction.
    sp<AMessage> mNotifyMsg;
    // Sources seen on this stream, keyed by the source id read from packets.
    KeyedVector<uint32_t, sp<ARTPSource> > mSources;

    // Number of buffers handed to parseRTCP() / parseRTP() so far
    // (incremented before the packet is validated).
    int64_t mNumRTCPPacketsReceived;
    int64_t mNumRTPPacketsReceived;
    // Sender address captured by receive() while no RTCP packet has been
    // counted yet; used as the destination for receiver reports.
    struct sockaddr_in mRemoteRTCPAddr;

    // True if the stream was added with injected=true; such streams are
    // skipped by the socket polling and reporting loops.
    bool mIsInjected;
};
68
ARTPConnection(uint32_t flags)69 ARTPConnection::ARTPConnection(uint32_t flags)
70 : mFlags(flags),
71 mPollEventPending(false),
72 mLastReceiverReportTimeUs(-1) {
73 }
74
~ARTPConnection()75 ARTPConnection::~ARTPConnection() {
76 }
77
addStream(int rtpSocket,int rtcpSocket,const sp<ASessionDescription> & sessionDesc,size_t index,const sp<AMessage> & notify,bool injected)78 void ARTPConnection::addStream(
79 int rtpSocket, int rtcpSocket,
80 const sp<ASessionDescription> &sessionDesc,
81 size_t index,
82 const sp<AMessage> ¬ify,
83 bool injected) {
84 sp<AMessage> msg = new AMessage(kWhatAddStream, id());
85 msg->setInt32("rtp-socket", rtpSocket);
86 msg->setInt32("rtcp-socket", rtcpSocket);
87 msg->setObject("session-desc", sessionDesc);
88 msg->setSize("index", index);
89 msg->setMessage("notify", notify);
90 msg->setInt32("injected", injected);
91 msg->post();
92 }
93
removeStream(int rtpSocket,int rtcpSocket)94 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
95 sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
96 msg->setInt32("rtp-socket", rtpSocket);
97 msg->setInt32("rtcp-socket", rtcpSocket);
98 msg->post();
99 }
100
bumpSocketBufferSize(int s)101 static void bumpSocketBufferSize(int s) {
102 int size = 256 * 1024;
103 CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
104 }
105
106 // static
MakePortPair(int * rtpSocket,int * rtcpSocket,unsigned * rtpPort)107 void ARTPConnection::MakePortPair(
108 int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
109 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
110 CHECK_GE(*rtpSocket, 0);
111
112 bumpSocketBufferSize(*rtpSocket);
113
114 *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
115 CHECK_GE(*rtcpSocket, 0);
116
117 bumpSocketBufferSize(*rtcpSocket);
118
119 unsigned start = (rand() * 1000)/ RAND_MAX + 15550;
120 start &= ~1;
121
122 for (unsigned port = start; port < 65536; port += 2) {
123 struct sockaddr_in addr;
124 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
125 addr.sin_family = AF_INET;
126 addr.sin_addr.s_addr = htonl(INADDR_ANY);
127 addr.sin_port = htons(port);
128
129 if (bind(*rtpSocket,
130 (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
131 continue;
132 }
133
134 addr.sin_port = htons(port + 1);
135
136 if (bind(*rtcpSocket,
137 (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
138 *rtpPort = port;
139 return;
140 }
141 }
142
143 TRESPASS();
144 }
145
onMessageReceived(const sp<AMessage> & msg)146 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
147 switch (msg->what()) {
148 case kWhatAddStream:
149 {
150 onAddStream(msg);
151 break;
152 }
153
154 case kWhatRemoveStream:
155 {
156 onRemoveStream(msg);
157 break;
158 }
159
160 case kWhatPollStreams:
161 {
162 onPollStreams();
163 break;
164 }
165
166 case kWhatInjectPacket:
167 {
168 onInjectPacket(msg);
169 break;
170 }
171
172 default:
173 {
174 TRESPASS();
175 break;
176 }
177 }
178 }
179
onAddStream(const sp<AMessage> & msg)180 void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
181 mStreams.push_back(StreamInfo());
182 StreamInfo *info = &*--mStreams.end();
183
184 int32_t s;
185 CHECK(msg->findInt32("rtp-socket", &s));
186 info->mRTPSocket = s;
187 CHECK(msg->findInt32("rtcp-socket", &s));
188 info->mRTCPSocket = s;
189
190 int32_t injected;
191 CHECK(msg->findInt32("injected", &injected));
192
193 info->mIsInjected = injected;
194
195 sp<RefBase> obj;
196 CHECK(msg->findObject("session-desc", &obj));
197 info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
198
199 CHECK(msg->findSize("index", &info->mIndex));
200 CHECK(msg->findMessage("notify", &info->mNotifyMsg));
201
202 info->mNumRTCPPacketsReceived = 0;
203 info->mNumRTPPacketsReceived = 0;
204 memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
205
206 if (!injected) {
207 postPollEvent();
208 }
209 }
210
onRemoveStream(const sp<AMessage> & msg)211 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
212 int32_t rtpSocket, rtcpSocket;
213 CHECK(msg->findInt32("rtp-socket", &rtpSocket));
214 CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
215
216 List<StreamInfo>::iterator it = mStreams.begin();
217 while (it != mStreams.end()
218 && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
219 ++it;
220 }
221
222 if (it == mStreams.end()) {
223 return;
224 }
225
226 mStreams.erase(it);
227 }
228
postPollEvent()229 void ARTPConnection::postPollEvent() {
230 if (mPollEventPending) {
231 return;
232 }
233
234 sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
235 msg->post();
236
237 mPollEventPending = true;
238 }
239
onPollStreams()240 void ARTPConnection::onPollStreams() {
241 mPollEventPending = false;
242
243 if (mStreams.empty()) {
244 return;
245 }
246
247 struct timeval tv;
248 tv.tv_sec = 0;
249 tv.tv_usec = kSelectTimeoutUs;
250
251 fd_set rs;
252 FD_ZERO(&rs);
253
254 int maxSocket = -1;
255 for (List<StreamInfo>::iterator it = mStreams.begin();
256 it != mStreams.end(); ++it) {
257 if ((*it).mIsInjected) {
258 continue;
259 }
260
261 FD_SET(it->mRTPSocket, &rs);
262 FD_SET(it->mRTCPSocket, &rs);
263
264 if (it->mRTPSocket > maxSocket) {
265 maxSocket = it->mRTPSocket;
266 }
267 if (it->mRTCPSocket > maxSocket) {
268 maxSocket = it->mRTCPSocket;
269 }
270 }
271
272 if (maxSocket == -1) {
273 return;
274 }
275
276 int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
277
278 if (res > 0) {
279 List<StreamInfo>::iterator it = mStreams.begin();
280 while (it != mStreams.end()) {
281 if ((*it).mIsInjected) {
282 ++it;
283 continue;
284 }
285
286 status_t err = OK;
287 if (FD_ISSET(it->mRTPSocket, &rs)) {
288 err = receive(&*it, true);
289 }
290 if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
291 err = receive(&*it, false);
292 }
293
294 if (err == -ECONNRESET) {
295 // socket failure, this stream is dead, Jim.
296
297 ALOGW("failed to receive RTP/RTCP datagram.");
298 it = mStreams.erase(it);
299 continue;
300 }
301
302 ++it;
303 }
304 }
305
306 int64_t nowUs = ALooper::GetNowUs();
307 if (mLastReceiverReportTimeUs <= 0
308 || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
309 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
310 List<StreamInfo>::iterator it = mStreams.begin();
311 while (it != mStreams.end()) {
312 StreamInfo *s = &*it;
313
314 if (s->mIsInjected) {
315 ++it;
316 continue;
317 }
318
319 if (s->mNumRTCPPacketsReceived == 0) {
320 // We have never received any RTCP packets on this stream,
321 // we don't even know where to send a report.
322 ++it;
323 continue;
324 }
325
326 buffer->setRange(0, 0);
327
328 for (size_t i = 0; i < s->mSources.size(); ++i) {
329 sp<ARTPSource> source = s->mSources.valueAt(i);
330
331 source->addReceiverReport(buffer);
332
333 if (mFlags & kRegularlyRequestFIR) {
334 source->addFIR(buffer);
335 }
336 }
337
338 if (buffer->size() > 0) {
339 ALOGV("Sending RR...");
340
341 ssize_t n;
342 do {
343 n = sendto(
344 s->mRTCPSocket, buffer->data(), buffer->size(), 0,
345 (const struct sockaddr *)&s->mRemoteRTCPAddr,
346 sizeof(s->mRemoteRTCPAddr));
347 } while (n < 0 && errno == EINTR);
348
349 if (n <= 0) {
350 ALOGW("failed to send RTCP receiver report (%s).",
351 n == 0 ? "connection gone" : strerror(errno));
352
353 it = mStreams.erase(it);
354 continue;
355 }
356
357 CHECK_EQ(n, (ssize_t)buffer->size());
358
359 mLastReceiverReportTimeUs = nowUs;
360 }
361
362 ++it;
363 }
364 }
365
366 if (!mStreams.empty()) {
367 postPollEvent();
368 }
369 }
370
receive(StreamInfo * s,bool receiveRTP)371 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
372 ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
373
374 CHECK(!s->mIsInjected);
375
376 sp<ABuffer> buffer = new ABuffer(65536);
377
378 socklen_t remoteAddrLen =
379 (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
380 ? sizeof(s->mRemoteRTCPAddr) : 0;
381
382 ssize_t nbytes;
383 do {
384 nbytes = recvfrom(
385 receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
386 buffer->data(),
387 buffer->capacity(),
388 0,
389 remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
390 remoteAddrLen > 0 ? &remoteAddrLen : NULL);
391 } while (nbytes < 0 && errno == EINTR);
392
393 if (nbytes <= 0) {
394 return -ECONNRESET;
395 }
396
397 buffer->setRange(0, nbytes);
398
399 // ALOGI("received %d bytes.", buffer->size());
400
401 status_t err;
402 if (receiveRTP) {
403 err = parseRTP(s, buffer);
404 } else {
405 err = parseRTCP(s, buffer);
406 }
407
408 return err;
409 }
410
parseRTP(StreamInfo * s,const sp<ABuffer> & buffer)411 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
412 if (s->mNumRTPPacketsReceived++ == 0) {
413 sp<AMessage> notify = s->mNotifyMsg->dup();
414 notify->setInt32("first-rtp", true);
415 notify->post();
416 }
417
418 size_t size = buffer->size();
419
420 if (size < 12) {
421 // Too short to be a valid RTP header.
422 return -1;
423 }
424
425 const uint8_t *data = buffer->data();
426
427 if ((data[0] >> 6) != 2) {
428 // Unsupported version.
429 return -1;
430 }
431
432 if (data[0] & 0x20) {
433 // Padding present.
434
435 size_t paddingLength = data[size - 1];
436
437 if (paddingLength + 12 > size) {
438 // If we removed this much padding we'd end up with something
439 // that's too short to be a valid RTP header.
440 return -1;
441 }
442
443 size -= paddingLength;
444 }
445
446 int numCSRCs = data[0] & 0x0f;
447
448 size_t payloadOffset = 12 + 4 * numCSRCs;
449
450 if (size < payloadOffset) {
451 // Not enough data to fit the basic header and all the CSRC entries.
452 return -1;
453 }
454
455 if (data[0] & 0x10) {
456 // Header eXtension present.
457
458 if (size < payloadOffset + 4) {
459 // Not enough data to fit the basic header, all CSRC entries
460 // and the first 4 bytes of the extension header.
461
462 return -1;
463 }
464
465 const uint8_t *extensionData = &data[payloadOffset];
466
467 size_t extensionLength =
468 4 * (extensionData[2] << 8 | extensionData[3]);
469
470 if (size < payloadOffset + 4 + extensionLength) {
471 return -1;
472 }
473
474 payloadOffset += 4 + extensionLength;
475 }
476
477 uint32_t srcId = u32at(&data[8]);
478
479 sp<ARTPSource> source = findSource(s, srcId);
480
481 uint32_t rtpTime = u32at(&data[4]);
482
483 sp<AMessage> meta = buffer->meta();
484 meta->setInt32("ssrc", srcId);
485 meta->setInt32("rtp-time", rtpTime);
486 meta->setInt32("PT", data[1] & 0x7f);
487 meta->setInt32("M", data[1] >> 7);
488
489 buffer->setInt32Data(u16at(&data[2]));
490 buffer->setRange(payloadOffset, size - payloadOffset);
491
492 source->processRTPPacket(buffer);
493
494 return OK;
495 }
496
parseRTCP(StreamInfo * s,const sp<ABuffer> & buffer)497 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
498 if (s->mNumRTCPPacketsReceived++ == 0) {
499 sp<AMessage> notify = s->mNotifyMsg->dup();
500 notify->setInt32("first-rtcp", true);
501 notify->post();
502 }
503
504 const uint8_t *data = buffer->data();
505 size_t size = buffer->size();
506
507 while (size > 0) {
508 if (size < 8) {
509 // Too short to be a valid RTCP header
510 return -1;
511 }
512
513 if ((data[0] >> 6) != 2) {
514 // Unsupported version.
515 return -1;
516 }
517
518 if (data[0] & 0x20) {
519 // Padding present.
520
521 size_t paddingLength = data[size - 1];
522
523 if (paddingLength + 12 > size) {
524 // If we removed this much padding we'd end up with something
525 // that's too short to be a valid RTP header.
526 return -1;
527 }
528
529 size -= paddingLength;
530 }
531
532 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
533
534 if (size < headerLength) {
535 // Only received a partial packet?
536 return -1;
537 }
538
539 switch (data[1]) {
540 case 200:
541 {
542 parseSR(s, data, headerLength);
543 break;
544 }
545
546 case 201: // RR
547 case 202: // SDES
548 case 204: // APP
549 break;
550
551 case 205: // TSFB (transport layer specific feedback)
552 case 206: // PSFB (payload specific feedback)
553 // hexdump(data, headerLength);
554 break;
555
556 case 203:
557 {
558 parseBYE(s, data, headerLength);
559 break;
560 }
561
562 default:
563 {
564 ALOGW("Unknown RTCP packet type %u of size %d",
565 (unsigned)data[1], headerLength);
566 break;
567 }
568 }
569
570 data += headerLength;
571 size -= headerLength;
572 }
573
574 return OK;
575 }
576
parseBYE(StreamInfo * s,const uint8_t * data,size_t size)577 status_t ARTPConnection::parseBYE(
578 StreamInfo *s, const uint8_t *data, size_t size) {
579 size_t SC = data[0] & 0x3f;
580
581 if (SC == 0 || size < (4 + SC * 4)) {
582 // Packet too short for the minimal BYE header.
583 return -1;
584 }
585
586 uint32_t id = u32at(&data[4]);
587
588 sp<ARTPSource> source = findSource(s, id);
589
590 source->byeReceived();
591
592 return OK;
593 }
594
parseSR(StreamInfo * s,const uint8_t * data,size_t size)595 status_t ARTPConnection::parseSR(
596 StreamInfo *s, const uint8_t *data, size_t size) {
597 size_t RC = data[0] & 0x1f;
598
599 if (size < (7 + RC * 6) * 4) {
600 // Packet too short for the minimal SR header.
601 return -1;
602 }
603
604 uint32_t id = u32at(&data[4]);
605 uint64_t ntpTime = u64at(&data[8]);
606 uint32_t rtpTime = u32at(&data[16]);
607
608 #if 0
609 ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
610 id,
611 rtpTime,
612 (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
613 #endif
614
615 sp<ARTPSource> source = findSource(s, id);
616
617 source->timeUpdate(rtpTime, ntpTime);
618
619 return 0;
620 }
621
findSource(StreamInfo * info,uint32_t srcId)622 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
623 sp<ARTPSource> source;
624 ssize_t index = info->mSources.indexOfKey(srcId);
625 if (index < 0) {
626 index = info->mSources.size();
627
628 source = new ARTPSource(
629 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
630
631 info->mSources.add(srcId, source);
632 } else {
633 source = info->mSources.valueAt(index);
634 }
635
636 return source;
637 }
638
injectPacket(int index,const sp<ABuffer> & buffer)639 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
640 sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
641 msg->setInt32("index", index);
642 msg->setBuffer("buffer", buffer);
643 msg->post();
644 }
645
onInjectPacket(const sp<AMessage> & msg)646 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
647 int32_t index;
648 CHECK(msg->findInt32("index", &index));
649
650 sp<ABuffer> buffer;
651 CHECK(msg->findBuffer("buffer", &buffer));
652
653 List<StreamInfo>::iterator it = mStreams.begin();
654 while (it != mStreams.end()
655 && it->mRTPSocket != index && it->mRTCPSocket != index) {
656 ++it;
657 }
658
659 if (it == mStreams.end()) {
660 TRESPASS();
661 }
662
663 StreamInfo *s = &*it;
664
665 status_t err;
666 if (it->mRTPSocket == index) {
667 err = parseRTP(s, buffer);
668 } else {
669 err = parseRTCP(s, buffer);
670 }
671 }
672
673 } // namespace android
674
675