1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "ARTPConnection"
19 #include <utils/Log.h>
20
21 #include "ARTPAssembler.h"
22 #include "ARTPConnection.h"
23
24 #include "ARTPSource.h"
25 #include "ASessionDescription.h"
26
27 #include <media/stagefright/foundation/ABuffer.h>
28 #include <media/stagefright/foundation/ADebug.h>
29 #include <media/stagefright/foundation/AMessage.h>
30 #include <media/stagefright/foundation/AString.h>
31 #include <media/stagefright/foundation/hexdump.h>
32
33 #include <arpa/inet.h>
34 #include <sys/socket.h>
35
36 namespace android {
37
38 static const size_t kMaxUDPSize = 1500;
39
u16at(const uint8_t * data)40 static uint16_t u16at(const uint8_t *data) {
41 return data[0] << 8 | data[1];
42 }
43
u32at(const uint8_t * data)44 static uint32_t u32at(const uint8_t *data) {
45 return u16at(data) << 16 | u16at(&data[2]);
46 }
47
u64at(const uint8_t * data)48 static uint64_t u64at(const uint8_t *data) {
49 return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
50 }
51
52 // static
53 const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
54
55 struct ARTPConnection::StreamInfo {
56 int mRTPSocket;
57 int mRTCPSocket;
58 sp<ASessionDescription> mSessionDesc;
59 size_t mIndex;
60 sp<AMessage> mNotifyMsg;
61 KeyedVector<uint32_t, sp<ARTPSource> > mSources;
62
63 int64_t mNumRTCPPacketsReceived;
64 int64_t mNumRTPPacketsReceived;
65 struct sockaddr_in mRemoteRTCPAddr;
66
67 bool mIsInjected;
68 };
69
ARTPConnection(uint32_t flags)70 ARTPConnection::ARTPConnection(uint32_t flags)
71 : mFlags(flags),
72 mPollEventPending(false),
73 mLastReceiverReportTimeUs(-1) {
74 }
75
~ARTPConnection()76 ARTPConnection::~ARTPConnection() {
77 }
78
addStream(int rtpSocket,int rtcpSocket,const sp<ASessionDescription> & sessionDesc,size_t index,const sp<AMessage> & notify,bool injected)79 void ARTPConnection::addStream(
80 int rtpSocket, int rtcpSocket,
81 const sp<ASessionDescription> &sessionDesc,
82 size_t index,
83 const sp<AMessage> ¬ify,
84 bool injected) {
85 sp<AMessage> msg = new AMessage(kWhatAddStream, id());
86 msg->setInt32("rtp-socket", rtpSocket);
87 msg->setInt32("rtcp-socket", rtcpSocket);
88 msg->setObject("session-desc", sessionDesc);
89 msg->setSize("index", index);
90 msg->setMessage("notify", notify);
91 msg->setInt32("injected", injected);
92 msg->post();
93 }
94
removeStream(int rtpSocket,int rtcpSocket)95 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
96 sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
97 msg->setInt32("rtp-socket", rtpSocket);
98 msg->setInt32("rtcp-socket", rtcpSocket);
99 msg->post();
100 }
101
bumpSocketBufferSize(int s)102 static void bumpSocketBufferSize(int s) {
103 int size = 256 * 1024;
104 CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
105 }
106
107 // static
MakePortPair(int * rtpSocket,int * rtcpSocket,unsigned * rtpPort)108 void ARTPConnection::MakePortPair(
109 int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
110 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
111 CHECK_GE(*rtpSocket, 0);
112
113 bumpSocketBufferSize(*rtpSocket);
114
115 *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
116 CHECK_GE(*rtcpSocket, 0);
117
118 bumpSocketBufferSize(*rtcpSocket);
119
120 unsigned start = (rand() * 1000)/ RAND_MAX + 15550;
121 start &= ~1;
122
123 for (unsigned port = start; port < 65536; port += 2) {
124 struct sockaddr_in addr;
125 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
126 addr.sin_family = AF_INET;
127 addr.sin_addr.s_addr = htonl(INADDR_ANY);
128 addr.sin_port = htons(port);
129
130 if (bind(*rtpSocket,
131 (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
132 continue;
133 }
134
135 addr.sin_port = htons(port + 1);
136
137 if (bind(*rtcpSocket,
138 (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
139 *rtpPort = port;
140 return;
141 }
142 }
143
144 TRESPASS();
145 }
146
onMessageReceived(const sp<AMessage> & msg)147 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
148 switch (msg->what()) {
149 case kWhatAddStream:
150 {
151 onAddStream(msg);
152 break;
153 }
154
155 case kWhatRemoveStream:
156 {
157 onRemoveStream(msg);
158 break;
159 }
160
161 case kWhatPollStreams:
162 {
163 onPollStreams();
164 break;
165 }
166
167 case kWhatInjectPacket:
168 {
169 onInjectPacket(msg);
170 break;
171 }
172
173 default:
174 {
175 TRESPASS();
176 break;
177 }
178 }
179 }
180
onAddStream(const sp<AMessage> & msg)181 void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
182 mStreams.push_back(StreamInfo());
183 StreamInfo *info = &*--mStreams.end();
184
185 int32_t s;
186 CHECK(msg->findInt32("rtp-socket", &s));
187 info->mRTPSocket = s;
188 CHECK(msg->findInt32("rtcp-socket", &s));
189 info->mRTCPSocket = s;
190
191 int32_t injected;
192 CHECK(msg->findInt32("injected", &injected));
193
194 info->mIsInjected = injected;
195
196 sp<RefBase> obj;
197 CHECK(msg->findObject("session-desc", &obj));
198 info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
199
200 CHECK(msg->findSize("index", &info->mIndex));
201 CHECK(msg->findMessage("notify", &info->mNotifyMsg));
202
203 info->mNumRTCPPacketsReceived = 0;
204 info->mNumRTPPacketsReceived = 0;
205 memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
206
207 if (!injected) {
208 postPollEvent();
209 }
210 }
211
onRemoveStream(const sp<AMessage> & msg)212 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
213 int32_t rtpSocket, rtcpSocket;
214 CHECK(msg->findInt32("rtp-socket", &rtpSocket));
215 CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
216
217 List<StreamInfo>::iterator it = mStreams.begin();
218 while (it != mStreams.end()
219 && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
220 ++it;
221 }
222
223 if (it == mStreams.end()) {
224 return;
225 }
226
227 mStreams.erase(it);
228 }
229
postPollEvent()230 void ARTPConnection::postPollEvent() {
231 if (mPollEventPending) {
232 return;
233 }
234
235 sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
236 msg->post();
237
238 mPollEventPending = true;
239 }
240
onPollStreams()241 void ARTPConnection::onPollStreams() {
242 mPollEventPending = false;
243
244 if (mStreams.empty()) {
245 return;
246 }
247
248 struct timeval tv;
249 tv.tv_sec = 0;
250 tv.tv_usec = kSelectTimeoutUs;
251
252 fd_set rs;
253 FD_ZERO(&rs);
254
255 int maxSocket = -1;
256 for (List<StreamInfo>::iterator it = mStreams.begin();
257 it != mStreams.end(); ++it) {
258 if ((*it).mIsInjected) {
259 continue;
260 }
261
262 FD_SET(it->mRTPSocket, &rs);
263 FD_SET(it->mRTCPSocket, &rs);
264
265 if (it->mRTPSocket > maxSocket) {
266 maxSocket = it->mRTPSocket;
267 }
268 if (it->mRTCPSocket > maxSocket) {
269 maxSocket = it->mRTCPSocket;
270 }
271 }
272
273 if (maxSocket == -1) {
274 return;
275 }
276
277 int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
278
279 if (res > 0) {
280 List<StreamInfo>::iterator it = mStreams.begin();
281 while (it != mStreams.end()) {
282 if ((*it).mIsInjected) {
283 ++it;
284 continue;
285 }
286
287 status_t err = OK;
288 if (FD_ISSET(it->mRTPSocket, &rs)) {
289 err = receive(&*it, true);
290 }
291 if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
292 err = receive(&*it, false);
293 }
294
295 if (err == -ECONNRESET) {
296 // socket failure, this stream is dead, Jim.
297
298 ALOGW("failed to receive RTP/RTCP datagram.");
299 it = mStreams.erase(it);
300 continue;
301 }
302
303 ++it;
304 }
305 }
306
307 int64_t nowUs = ALooper::GetNowUs();
308 if (mLastReceiverReportTimeUs <= 0
309 || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
310 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
311 List<StreamInfo>::iterator it = mStreams.begin();
312 while (it != mStreams.end()) {
313 StreamInfo *s = &*it;
314
315 if (s->mIsInjected) {
316 ++it;
317 continue;
318 }
319
320 if (s->mNumRTCPPacketsReceived == 0) {
321 // We have never received any RTCP packets on this stream,
322 // we don't even know where to send a report.
323 ++it;
324 continue;
325 }
326
327 buffer->setRange(0, 0);
328
329 for (size_t i = 0; i < s->mSources.size(); ++i) {
330 sp<ARTPSource> source = s->mSources.valueAt(i);
331
332 source->addReceiverReport(buffer);
333
334 if (mFlags & kRegularlyRequestFIR) {
335 source->addFIR(buffer);
336 }
337 }
338
339 if (buffer->size() > 0) {
340 ALOGV("Sending RR...");
341
342 ssize_t n;
343 do {
344 n = sendto(
345 s->mRTCPSocket, buffer->data(), buffer->size(), 0,
346 (const struct sockaddr *)&s->mRemoteRTCPAddr,
347 sizeof(s->mRemoteRTCPAddr));
348 } while (n < 0 && errno == EINTR);
349
350 if (n <= 0) {
351 ALOGW("failed to send RTCP receiver report (%s).",
352 n == 0 ? "connection gone" : strerror(errno));
353
354 it = mStreams.erase(it);
355 continue;
356 }
357
358 CHECK_EQ(n, (ssize_t)buffer->size());
359
360 mLastReceiverReportTimeUs = nowUs;
361 }
362
363 ++it;
364 }
365 }
366
367 if (!mStreams.empty()) {
368 postPollEvent();
369 }
370 }
371
receive(StreamInfo * s,bool receiveRTP)372 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
373 ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
374
375 CHECK(!s->mIsInjected);
376
377 sp<ABuffer> buffer = new ABuffer(65536);
378
379 socklen_t remoteAddrLen =
380 (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
381 ? sizeof(s->mRemoteRTCPAddr) : 0;
382
383 ssize_t nbytes;
384 do {
385 nbytes = recvfrom(
386 receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
387 buffer->data(),
388 buffer->capacity(),
389 0,
390 remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
391 remoteAddrLen > 0 ? &remoteAddrLen : NULL);
392 } while (nbytes < 0 && errno == EINTR);
393
394 if (nbytes <= 0) {
395 return -ECONNRESET;
396 }
397
398 buffer->setRange(0, nbytes);
399
400 // ALOGI("received %d bytes.", buffer->size());
401
402 status_t err;
403 if (receiveRTP) {
404 err = parseRTP(s, buffer);
405 } else {
406 err = parseRTCP(s, buffer);
407 }
408
409 return err;
410 }
411
parseRTP(StreamInfo * s,const sp<ABuffer> & buffer)412 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
413 if (s->mNumRTPPacketsReceived++ == 0) {
414 sp<AMessage> notify = s->mNotifyMsg->dup();
415 notify->setInt32("first-rtp", true);
416 notify->post();
417 }
418
419 size_t size = buffer->size();
420
421 if (size < 12) {
422 // Too short to be a valid RTP header.
423 return -1;
424 }
425
426 const uint8_t *data = buffer->data();
427
428 if ((data[0] >> 6) != 2) {
429 // Unsupported version.
430 return -1;
431 }
432
433 if (data[0] & 0x20) {
434 // Padding present.
435
436 size_t paddingLength = data[size - 1];
437
438 if (paddingLength + 12 > size) {
439 // If we removed this much padding we'd end up with something
440 // that's too short to be a valid RTP header.
441 return -1;
442 }
443
444 size -= paddingLength;
445 }
446
447 int numCSRCs = data[0] & 0x0f;
448
449 size_t payloadOffset = 12 + 4 * numCSRCs;
450
451 if (size < payloadOffset) {
452 // Not enough data to fit the basic header and all the CSRC entries.
453 return -1;
454 }
455
456 if (data[0] & 0x10) {
457 // Header eXtension present.
458
459 if (size < payloadOffset + 4) {
460 // Not enough data to fit the basic header, all CSRC entries
461 // and the first 4 bytes of the extension header.
462
463 return -1;
464 }
465
466 const uint8_t *extensionData = &data[payloadOffset];
467
468 size_t extensionLength =
469 4 * (extensionData[2] << 8 | extensionData[3]);
470
471 if (size < payloadOffset + 4 + extensionLength) {
472 return -1;
473 }
474
475 payloadOffset += 4 + extensionLength;
476 }
477
478 uint32_t srcId = u32at(&data[8]);
479
480 sp<ARTPSource> source = findSource(s, srcId);
481
482 uint32_t rtpTime = u32at(&data[4]);
483
484 sp<AMessage> meta = buffer->meta();
485 meta->setInt32("ssrc", srcId);
486 meta->setInt32("rtp-time", rtpTime);
487 meta->setInt32("PT", data[1] & 0x7f);
488 meta->setInt32("M", data[1] >> 7);
489
490 buffer->setInt32Data(u16at(&data[2]));
491 buffer->setRange(payloadOffset, size - payloadOffset);
492
493 source->processRTPPacket(buffer);
494
495 return OK;
496 }
497
parseRTCP(StreamInfo * s,const sp<ABuffer> & buffer)498 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
499 if (s->mNumRTCPPacketsReceived++ == 0) {
500 sp<AMessage> notify = s->mNotifyMsg->dup();
501 notify->setInt32("first-rtcp", true);
502 notify->post();
503 }
504
505 const uint8_t *data = buffer->data();
506 size_t size = buffer->size();
507
508 while (size > 0) {
509 if (size < 8) {
510 // Too short to be a valid RTCP header
511 return -1;
512 }
513
514 if ((data[0] >> 6) != 2) {
515 // Unsupported version.
516 return -1;
517 }
518
519 if (data[0] & 0x20) {
520 // Padding present.
521
522 size_t paddingLength = data[size - 1];
523
524 if (paddingLength + 12 > size) {
525 // If we removed this much padding we'd end up with something
526 // that's too short to be a valid RTP header.
527 return -1;
528 }
529
530 size -= paddingLength;
531 }
532
533 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
534
535 if (size < headerLength) {
536 // Only received a partial packet?
537 return -1;
538 }
539
540 switch (data[1]) {
541 case 200:
542 {
543 parseSR(s, data, headerLength);
544 break;
545 }
546
547 case 201: // RR
548 case 202: // SDES
549 case 204: // APP
550 break;
551
552 case 205: // TSFB (transport layer specific feedback)
553 case 206: // PSFB (payload specific feedback)
554 // hexdump(data, headerLength);
555 break;
556
557 case 203:
558 {
559 parseBYE(s, data, headerLength);
560 break;
561 }
562
563 default:
564 {
565 ALOGW("Unknown RTCP packet type %u of size %d",
566 (unsigned)data[1], headerLength);
567 break;
568 }
569 }
570
571 data += headerLength;
572 size -= headerLength;
573 }
574
575 return OK;
576 }
577
parseBYE(StreamInfo * s,const uint8_t * data,size_t size)578 status_t ARTPConnection::parseBYE(
579 StreamInfo *s, const uint8_t *data, size_t size) {
580 size_t SC = data[0] & 0x3f;
581
582 if (SC == 0 || size < (4 + SC * 4)) {
583 // Packet too short for the minimal BYE header.
584 return -1;
585 }
586
587 uint32_t id = u32at(&data[4]);
588
589 sp<ARTPSource> source = findSource(s, id);
590
591 source->byeReceived();
592
593 return OK;
594 }
595
parseSR(StreamInfo * s,const uint8_t * data,size_t size)596 status_t ARTPConnection::parseSR(
597 StreamInfo *s, const uint8_t *data, size_t size) {
598 size_t RC = data[0] & 0x1f;
599
600 if (size < (7 + RC * 6) * 4) {
601 // Packet too short for the minimal SR header.
602 return -1;
603 }
604
605 uint32_t id = u32at(&data[4]);
606 uint64_t ntpTime = u64at(&data[8]);
607 uint32_t rtpTime = u32at(&data[16]);
608
609 #if 0
610 ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
611 id,
612 rtpTime,
613 (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
614 #endif
615
616 sp<ARTPSource> source = findSource(s, id);
617
618 source->timeUpdate(rtpTime, ntpTime);
619
620 return 0;
621 }
622
findSource(StreamInfo * info,uint32_t srcId)623 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
624 sp<ARTPSource> source;
625 ssize_t index = info->mSources.indexOfKey(srcId);
626 if (index < 0) {
627 index = info->mSources.size();
628
629 source = new ARTPSource(
630 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
631
632 info->mSources.add(srcId, source);
633 } else {
634 source = info->mSources.valueAt(index);
635 }
636
637 return source;
638 }
639
injectPacket(int index,const sp<ABuffer> & buffer)640 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
641 sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
642 msg->setInt32("index", index);
643 msg->setBuffer("buffer", buffer);
644 msg->post();
645 }
646
onInjectPacket(const sp<AMessage> & msg)647 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
648 int32_t index;
649 CHECK(msg->findInt32("index", &index));
650
651 sp<ABuffer> buffer;
652 CHECK(msg->findBuffer("buffer", &buffer));
653
654 List<StreamInfo>::iterator it = mStreams.begin();
655 while (it != mStreams.end()
656 && it->mRTPSocket != index && it->mRTCPSocket != index) {
657 ++it;
658 }
659
660 if (it == mStreams.end()) {
661 TRESPASS();
662 }
663
664 StreamInfo *s = &*it;
665
666 status_t err;
667 if (it->mRTPSocket == index) {
668 err = parseRTP(s, buffer);
669 } else {
670 err = parseRTCP(s, buffer);
671 }
672 }
673
674 } // namespace android
675
676