• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "ARTPConnection"
19 #include <utils/Log.h>
20 
21 #include "ARTPConnection.h"
22 
23 #include "ARTPSource.h"
24 #include "ASessionDescription.h"
25 
26 #include <media/stagefright/foundation/ABuffer.h>
27 #include <media/stagefright/foundation/ADebug.h>
28 #include <media/stagefright/foundation/AMessage.h>
29 #include <media/stagefright/foundation/AString.h>
30 #include <media/stagefright/foundation/hexdump.h>
31 
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
34 
35 namespace android {
36 
// Capacity, in bytes, of the buffer onPollStreams() allocates for outgoing
// RTCP receiver reports.
static const size_t kMaxUDPSize = 1500;
38 
// Reads a 16-bit value stored most-significant byte first at data[0..1].
static uint16_t u16at(const uint8_t *data) {
    const unsigned hi = data[0];
    const unsigned lo = data[1];

    return static_cast<uint16_t>((hi << 8) | lo);
}
42 
// Reads a 32-bit value stored most-significant byte first at data[0..3].
//
// Every byte is widened to uint32_t before shifting so that a top byte of
// 0x80 or above is never shifted into the sign bit of a promoted int.
static uint32_t u32at(const uint8_t *data) {
    return (uint32_t)data[0] << 24
            | (uint32_t)data[1] << 16
            | (uint32_t)data[2] << 8
            | (uint32_t)data[3];
}
46 
u64at(const uint8_t * data)47 static uint64_t u64at(const uint8_t *data) {
48     return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
49 }
50 
51 // static
52 const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
53 
54 struct ARTPConnection::StreamInfo {
55     int mRTPSocket;
56     int mRTCPSocket;
57     sp<ASessionDescription> mSessionDesc;
58     size_t mIndex;
59     sp<AMessage> mNotifyMsg;
60     KeyedVector<uint32_t, sp<ARTPSource> > mSources;
61 
62     int64_t mNumRTCPPacketsReceived;
63     int64_t mNumRTPPacketsReceived;
64     struct sockaddr_in mRemoteRTCPAddr;
65 
66     bool mIsInjected;
67 };
68 
ARTPConnection(uint32_t flags)69 ARTPConnection::ARTPConnection(uint32_t flags)
70     : mFlags(flags),
71       mPollEventPending(false),
72       mLastReceiverReportTimeUs(-1) {
73 }
74 
~ARTPConnection()75 ARTPConnection::~ARTPConnection() {
76 }
77 
addStream(int rtpSocket,int rtcpSocket,const sp<ASessionDescription> & sessionDesc,size_t index,const sp<AMessage> & notify,bool injected)78 void ARTPConnection::addStream(
79         int rtpSocket, int rtcpSocket,
80         const sp<ASessionDescription> &sessionDesc,
81         size_t index,
82         const sp<AMessage> &notify,
83         bool injected) {
84     sp<AMessage> msg = new AMessage(kWhatAddStream, id());
85     msg->setInt32("rtp-socket", rtpSocket);
86     msg->setInt32("rtcp-socket", rtcpSocket);
87     msg->setObject("session-desc", sessionDesc);
88     msg->setSize("index", index);
89     msg->setMessage("notify", notify);
90     msg->setInt32("injected", injected);
91     msg->post();
92 }
93 
removeStream(int rtpSocket,int rtcpSocket)94 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
95     sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
96     msg->setInt32("rtp-socket", rtpSocket);
97     msg->setInt32("rtcp-socket", rtcpSocket);
98     msg->post();
99 }
100 
bumpSocketBufferSize(int s)101 static void bumpSocketBufferSize(int s) {
102     int size = 256 * 1024;
103     CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
104 }
105 
106 // static
MakePortPair(int * rtpSocket,int * rtcpSocket,unsigned * rtpPort)107 void ARTPConnection::MakePortPair(
108         int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
109     *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
110     CHECK_GE(*rtpSocket, 0);
111 
112     bumpSocketBufferSize(*rtpSocket);
113 
114     *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
115     CHECK_GE(*rtcpSocket, 0);
116 
117     bumpSocketBufferSize(*rtcpSocket);
118 
119     unsigned start = (rand() * 1000)/ RAND_MAX + 15550;
120     start &= ~1;
121 
122     for (unsigned port = start; port < 65536; port += 2) {
123         struct sockaddr_in addr;
124         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
125         addr.sin_family = AF_INET;
126         addr.sin_addr.s_addr = htonl(INADDR_ANY);
127         addr.sin_port = htons(port);
128 
129         if (bind(*rtpSocket,
130                  (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
131             continue;
132         }
133 
134         addr.sin_port = htons(port + 1);
135 
136         if (bind(*rtcpSocket,
137                  (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
138             *rtpPort = port;
139             return;
140         }
141     }
142 
143     TRESPASS();
144 }
145 
onMessageReceived(const sp<AMessage> & msg)146 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
147     switch (msg->what()) {
148         case kWhatAddStream:
149         {
150             onAddStream(msg);
151             break;
152         }
153 
154         case kWhatRemoveStream:
155         {
156             onRemoveStream(msg);
157             break;
158         }
159 
160         case kWhatPollStreams:
161         {
162             onPollStreams();
163             break;
164         }
165 
166         case kWhatInjectPacket:
167         {
168             onInjectPacket(msg);
169             break;
170         }
171 
172         default:
173         {
174             TRESPASS();
175             break;
176         }
177     }
178 }
179 
onAddStream(const sp<AMessage> & msg)180 void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
181     mStreams.push_back(StreamInfo());
182     StreamInfo *info = &*--mStreams.end();
183 
184     int32_t s;
185     CHECK(msg->findInt32("rtp-socket", &s));
186     info->mRTPSocket = s;
187     CHECK(msg->findInt32("rtcp-socket", &s));
188     info->mRTCPSocket = s;
189 
190     int32_t injected;
191     CHECK(msg->findInt32("injected", &injected));
192 
193     info->mIsInjected = injected;
194 
195     sp<RefBase> obj;
196     CHECK(msg->findObject("session-desc", &obj));
197     info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
198 
199     CHECK(msg->findSize("index", &info->mIndex));
200     CHECK(msg->findMessage("notify", &info->mNotifyMsg));
201 
202     info->mNumRTCPPacketsReceived = 0;
203     info->mNumRTPPacketsReceived = 0;
204     memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
205 
206     if (!injected) {
207         postPollEvent();
208     }
209 }
210 
onRemoveStream(const sp<AMessage> & msg)211 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
212     int32_t rtpSocket, rtcpSocket;
213     CHECK(msg->findInt32("rtp-socket", &rtpSocket));
214     CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
215 
216     List<StreamInfo>::iterator it = mStreams.begin();
217     while (it != mStreams.end()
218            && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
219         ++it;
220     }
221 
222     if (it == mStreams.end()) {
223         return;
224     }
225 
226     mStreams.erase(it);
227 }
228 
postPollEvent()229 void ARTPConnection::postPollEvent() {
230     if (mPollEventPending) {
231         return;
232     }
233 
234     sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
235     msg->post();
236 
237     mPollEventPending = true;
238 }
239 
onPollStreams()240 void ARTPConnection::onPollStreams() {
241     mPollEventPending = false;
242 
243     if (mStreams.empty()) {
244         return;
245     }
246 
247     struct timeval tv;
248     tv.tv_sec = 0;
249     tv.tv_usec = kSelectTimeoutUs;
250 
251     fd_set rs;
252     FD_ZERO(&rs);
253 
254     int maxSocket = -1;
255     for (List<StreamInfo>::iterator it = mStreams.begin();
256          it != mStreams.end(); ++it) {
257         if ((*it).mIsInjected) {
258             continue;
259         }
260 
261         FD_SET(it->mRTPSocket, &rs);
262         FD_SET(it->mRTCPSocket, &rs);
263 
264         if (it->mRTPSocket > maxSocket) {
265             maxSocket = it->mRTPSocket;
266         }
267         if (it->mRTCPSocket > maxSocket) {
268             maxSocket = it->mRTCPSocket;
269         }
270     }
271 
272     if (maxSocket == -1) {
273         return;
274     }
275 
276     int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
277 
278     if (res > 0) {
279         List<StreamInfo>::iterator it = mStreams.begin();
280         while (it != mStreams.end()) {
281             if ((*it).mIsInjected) {
282                 ++it;
283                 continue;
284             }
285 
286             status_t err = OK;
287             if (FD_ISSET(it->mRTPSocket, &rs)) {
288                 err = receive(&*it, true);
289             }
290             if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
291                 err = receive(&*it, false);
292             }
293 
294             if (err == -ECONNRESET) {
295                 // socket failure, this stream is dead, Jim.
296 
297                 ALOGW("failed to receive RTP/RTCP datagram.");
298                 it = mStreams.erase(it);
299                 continue;
300             }
301 
302             ++it;
303         }
304     }
305 
306     int64_t nowUs = ALooper::GetNowUs();
307     if (mLastReceiverReportTimeUs <= 0
308             || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
309         sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
310         List<StreamInfo>::iterator it = mStreams.begin();
311         while (it != mStreams.end()) {
312             StreamInfo *s = &*it;
313 
314             if (s->mIsInjected) {
315                 ++it;
316                 continue;
317             }
318 
319             if (s->mNumRTCPPacketsReceived == 0) {
320                 // We have never received any RTCP packets on this stream,
321                 // we don't even know where to send a report.
322                 ++it;
323                 continue;
324             }
325 
326             buffer->setRange(0, 0);
327 
328             for (size_t i = 0; i < s->mSources.size(); ++i) {
329                 sp<ARTPSource> source = s->mSources.valueAt(i);
330 
331                 source->addReceiverReport(buffer);
332 
333                 if (mFlags & kRegularlyRequestFIR) {
334                     source->addFIR(buffer);
335                 }
336             }
337 
338             if (buffer->size() > 0) {
339                 ALOGV("Sending RR...");
340 
341                 ssize_t n;
342                 do {
343                     n = sendto(
344                         s->mRTCPSocket, buffer->data(), buffer->size(), 0,
345                         (const struct sockaddr *)&s->mRemoteRTCPAddr,
346                         sizeof(s->mRemoteRTCPAddr));
347                 } while (n < 0 && errno == EINTR);
348 
349                 if (n <= 0) {
350                     ALOGW("failed to send RTCP receiver report (%s).",
351                          n == 0 ? "connection gone" : strerror(errno));
352 
353                     it = mStreams.erase(it);
354                     continue;
355                 }
356 
357                 CHECK_EQ(n, (ssize_t)buffer->size());
358 
359                 mLastReceiverReportTimeUs = nowUs;
360             }
361 
362             ++it;
363         }
364     }
365 
366     if (!mStreams.empty()) {
367         postPollEvent();
368     }
369 }
370 
receive(StreamInfo * s,bool receiveRTP)371 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
372     ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
373 
374     CHECK(!s->mIsInjected);
375 
376     sp<ABuffer> buffer = new ABuffer(65536);
377 
378     socklen_t remoteAddrLen =
379         (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
380             ? sizeof(s->mRemoteRTCPAddr) : 0;
381 
382     ssize_t nbytes;
383     do {
384         nbytes = recvfrom(
385             receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
386             buffer->data(),
387             buffer->capacity(),
388             0,
389             remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
390             remoteAddrLen > 0 ? &remoteAddrLen : NULL);
391     } while (nbytes < 0 && errno == EINTR);
392 
393     if (nbytes <= 0) {
394         return -ECONNRESET;
395     }
396 
397     buffer->setRange(0, nbytes);
398 
399     // ALOGI("received %d bytes.", buffer->size());
400 
401     status_t err;
402     if (receiveRTP) {
403         err = parseRTP(s, buffer);
404     } else {
405         err = parseRTCP(s, buffer);
406     }
407 
408     return err;
409 }
410 
parseRTP(StreamInfo * s,const sp<ABuffer> & buffer)411 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
412     if (s->mNumRTPPacketsReceived++ == 0) {
413         sp<AMessage> notify = s->mNotifyMsg->dup();
414         notify->setInt32("first-rtp", true);
415         notify->post();
416     }
417 
418     size_t size = buffer->size();
419 
420     if (size < 12) {
421         // Too short to be a valid RTP header.
422         return -1;
423     }
424 
425     const uint8_t *data = buffer->data();
426 
427     if ((data[0] >> 6) != 2) {
428         // Unsupported version.
429         return -1;
430     }
431 
432     if (data[0] & 0x20) {
433         // Padding present.
434 
435         size_t paddingLength = data[size - 1];
436 
437         if (paddingLength + 12 > size) {
438             // If we removed this much padding we'd end up with something
439             // that's too short to be a valid RTP header.
440             return -1;
441         }
442 
443         size -= paddingLength;
444     }
445 
446     int numCSRCs = data[0] & 0x0f;
447 
448     size_t payloadOffset = 12 + 4 * numCSRCs;
449 
450     if (size < payloadOffset) {
451         // Not enough data to fit the basic header and all the CSRC entries.
452         return -1;
453     }
454 
455     if (data[0] & 0x10) {
456         // Header eXtension present.
457 
458         if (size < payloadOffset + 4) {
459             // Not enough data to fit the basic header, all CSRC entries
460             // and the first 4 bytes of the extension header.
461 
462             return -1;
463         }
464 
465         const uint8_t *extensionData = &data[payloadOffset];
466 
467         size_t extensionLength =
468             4 * (extensionData[2] << 8 | extensionData[3]);
469 
470         if (size < payloadOffset + 4 + extensionLength) {
471             return -1;
472         }
473 
474         payloadOffset += 4 + extensionLength;
475     }
476 
477     uint32_t srcId = u32at(&data[8]);
478 
479     sp<ARTPSource> source = findSource(s, srcId);
480 
481     uint32_t rtpTime = u32at(&data[4]);
482 
483     sp<AMessage> meta = buffer->meta();
484     meta->setInt32("ssrc", srcId);
485     meta->setInt32("rtp-time", rtpTime);
486     meta->setInt32("PT", data[1] & 0x7f);
487     meta->setInt32("M", data[1] >> 7);
488 
489     buffer->setInt32Data(u16at(&data[2]));
490     buffer->setRange(payloadOffset, size - payloadOffset);
491 
492     source->processRTPPacket(buffer);
493 
494     return OK;
495 }
496 
parseRTCP(StreamInfo * s,const sp<ABuffer> & buffer)497 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
498     if (s->mNumRTCPPacketsReceived++ == 0) {
499         sp<AMessage> notify = s->mNotifyMsg->dup();
500         notify->setInt32("first-rtcp", true);
501         notify->post();
502     }
503 
504     const uint8_t *data = buffer->data();
505     size_t size = buffer->size();
506 
507     while (size > 0) {
508         if (size < 8) {
509             // Too short to be a valid RTCP header
510             return -1;
511         }
512 
513         if ((data[0] >> 6) != 2) {
514             // Unsupported version.
515             return -1;
516         }
517 
518         if (data[0] & 0x20) {
519             // Padding present.
520 
521             size_t paddingLength = data[size - 1];
522 
523             if (paddingLength + 12 > size) {
524                 // If we removed this much padding we'd end up with something
525                 // that's too short to be a valid RTP header.
526                 return -1;
527             }
528 
529             size -= paddingLength;
530         }
531 
532         size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
533 
534         if (size < headerLength) {
535             // Only received a partial packet?
536             return -1;
537         }
538 
539         switch (data[1]) {
540             case 200:
541             {
542                 parseSR(s, data, headerLength);
543                 break;
544             }
545 
546             case 201:  // RR
547             case 202:  // SDES
548             case 204:  // APP
549                 break;
550 
551             case 205:  // TSFB (transport layer specific feedback)
552             case 206:  // PSFB (payload specific feedback)
553                 // hexdump(data, headerLength);
554                 break;
555 
556             case 203:
557             {
558                 parseBYE(s, data, headerLength);
559                 break;
560             }
561 
562             default:
563             {
564                 ALOGW("Unknown RTCP packet type %u of size %d",
565                      (unsigned)data[1], headerLength);
566                 break;
567             }
568         }
569 
570         data += headerLength;
571         size -= headerLength;
572     }
573 
574     return OK;
575 }
576 
parseBYE(StreamInfo * s,const uint8_t * data,size_t size)577 status_t ARTPConnection::parseBYE(
578         StreamInfo *s, const uint8_t *data, size_t size) {
579     size_t SC = data[0] & 0x3f;
580 
581     if (SC == 0 || size < (4 + SC * 4)) {
582         // Packet too short for the minimal BYE header.
583         return -1;
584     }
585 
586     uint32_t id = u32at(&data[4]);
587 
588     sp<ARTPSource> source = findSource(s, id);
589 
590     source->byeReceived();
591 
592     return OK;
593 }
594 
parseSR(StreamInfo * s,const uint8_t * data,size_t size)595 status_t ARTPConnection::parseSR(
596         StreamInfo *s, const uint8_t *data, size_t size) {
597     size_t RC = data[0] & 0x1f;
598 
599     if (size < (7 + RC * 6) * 4) {
600         // Packet too short for the minimal SR header.
601         return -1;
602     }
603 
604     uint32_t id = u32at(&data[4]);
605     uint64_t ntpTime = u64at(&data[8]);
606     uint32_t rtpTime = u32at(&data[16]);
607 
608 #if 0
609     ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
610          id,
611          rtpTime,
612          (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
613 #endif
614 
615     sp<ARTPSource> source = findSource(s, id);
616 
617     source->timeUpdate(rtpTime, ntpTime);
618 
619     return 0;
620 }
621 
findSource(StreamInfo * info,uint32_t srcId)622 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
623     sp<ARTPSource> source;
624     ssize_t index = info->mSources.indexOfKey(srcId);
625     if (index < 0) {
626         index = info->mSources.size();
627 
628         source = new ARTPSource(
629                 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
630 
631         info->mSources.add(srcId, source);
632     } else {
633         source = info->mSources.valueAt(index);
634     }
635 
636     return source;
637 }
638 
injectPacket(int index,const sp<ABuffer> & buffer)639 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
640     sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
641     msg->setInt32("index", index);
642     msg->setBuffer("buffer", buffer);
643     msg->post();
644 }
645 
onInjectPacket(const sp<AMessage> & msg)646 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
647     int32_t index;
648     CHECK(msg->findInt32("index", &index));
649 
650     sp<ABuffer> buffer;
651     CHECK(msg->findBuffer("buffer", &buffer));
652 
653     List<StreamInfo>::iterator it = mStreams.begin();
654     while (it != mStreams.end()
655            && it->mRTPSocket != index && it->mRTCPSocket != index) {
656         ++it;
657     }
658 
659     if (it == mStreams.end()) {
660         TRESPASS();
661     }
662 
663     StreamInfo *s = &*it;
664 
665     status_t err;
666     if (it->mRTPSocket == index) {
667         err = parseRTP(s, buffer);
668     } else {
669         err = parseRTCP(s, buffer);
670     }
671 }
672 
673 }  // namespace android
674 
675