• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "ARTPConnection"
19 #include <utils/Log.h>
20 
21 #include "ARTPAssembler.h"
22 #include "ARTPConnection.h"
23 
24 #include "ARTPSource.h"
25 #include "ASessionDescription.h"
26 
27 #include <media/stagefright/foundation/ABuffer.h>
28 #include <media/stagefright/foundation/ADebug.h>
29 #include <media/stagefright/foundation/AMessage.h>
30 #include <media/stagefright/foundation/AString.h>
31 #include <media/stagefright/foundation/hexdump.h>
32 
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
35 
36 namespace android {
37 
// Capacity, in bytes, of the scratch buffer in which outgoing RTCP receiver
// reports are assembled before being sent.
static const size_t kMaxUDPSize = 1500;
39 
// Reads a big-endian 16-bit unsigned integer from the two bytes starting at
// |data|.
static uint16_t u16at(const uint8_t *data) {
    const uint16_t high = data[0];
    const uint16_t low = data[1];
    return (high << 8) | low;
}
43 
// Reads a big-endian 32-bit unsigned integer from the four bytes starting at
// |data|.
//
// Every byte is widened to uint32_t before it is shifted, so a set top bit is
// never shifted into the sign bit of a promoted (signed) int.
static uint32_t u32at(const uint8_t *data) {
    return (uint32_t)data[0] << 24
            | (uint32_t)data[1] << 16
            | (uint32_t)data[2] << 8
            | (uint32_t)data[3];
}
47 
u64at(const uint8_t * data)48 static uint64_t u64at(const uint8_t *data) {
49     return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
50 }
51 
52 // static
53 const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
54 
55 struct ARTPConnection::StreamInfo {
56     int mRTPSocket;
57     int mRTCPSocket;
58     sp<ASessionDescription> mSessionDesc;
59     size_t mIndex;
60     sp<AMessage> mNotifyMsg;
61     KeyedVector<uint32_t, sp<ARTPSource> > mSources;
62 
63     int64_t mNumRTCPPacketsReceived;
64     int64_t mNumRTPPacketsReceived;
65     struct sockaddr_in mRemoteRTCPAddr;
66 
67     bool mIsInjected;
68 };
69 
ARTPConnection(uint32_t flags)70 ARTPConnection::ARTPConnection(uint32_t flags)
71     : mFlags(flags),
72       mPollEventPending(false),
73       mLastReceiverReportTimeUs(-1) {
74 }
75 
~ARTPConnection()76 ARTPConnection::~ARTPConnection() {
77 }
78 
addStream(int rtpSocket,int rtcpSocket,const sp<ASessionDescription> & sessionDesc,size_t index,const sp<AMessage> & notify,bool injected)79 void ARTPConnection::addStream(
80         int rtpSocket, int rtcpSocket,
81         const sp<ASessionDescription> &sessionDesc,
82         size_t index,
83         const sp<AMessage> &notify,
84         bool injected) {
85     sp<AMessage> msg = new AMessage(kWhatAddStream, id());
86     msg->setInt32("rtp-socket", rtpSocket);
87     msg->setInt32("rtcp-socket", rtcpSocket);
88     msg->setObject("session-desc", sessionDesc);
89     msg->setSize("index", index);
90     msg->setMessage("notify", notify);
91     msg->setInt32("injected", injected);
92     msg->post();
93 }
94 
removeStream(int rtpSocket,int rtcpSocket)95 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
96     sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
97     msg->setInt32("rtp-socket", rtpSocket);
98     msg->setInt32("rtcp-socket", rtcpSocket);
99     msg->post();
100 }
101 
bumpSocketBufferSize(int s)102 static void bumpSocketBufferSize(int s) {
103     int size = 256 * 1024;
104     CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
105 }
106 
107 // static
MakePortPair(int * rtpSocket,int * rtcpSocket,unsigned * rtpPort)108 void ARTPConnection::MakePortPair(
109         int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
110     *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
111     CHECK_GE(*rtpSocket, 0);
112 
113     bumpSocketBufferSize(*rtpSocket);
114 
115     *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
116     CHECK_GE(*rtcpSocket, 0);
117 
118     bumpSocketBufferSize(*rtcpSocket);
119 
120     unsigned start = (rand() * 1000)/ RAND_MAX + 15550;
121     start &= ~1;
122 
123     for (unsigned port = start; port < 65536; port += 2) {
124         struct sockaddr_in addr;
125         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
126         addr.sin_family = AF_INET;
127         addr.sin_addr.s_addr = htonl(INADDR_ANY);
128         addr.sin_port = htons(port);
129 
130         if (bind(*rtpSocket,
131                  (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
132             continue;
133         }
134 
135         addr.sin_port = htons(port + 1);
136 
137         if (bind(*rtcpSocket,
138                  (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
139             *rtpPort = port;
140             return;
141         }
142     }
143 
144     TRESPASS();
145 }
146 
onMessageReceived(const sp<AMessage> & msg)147 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
148     switch (msg->what()) {
149         case kWhatAddStream:
150         {
151             onAddStream(msg);
152             break;
153         }
154 
155         case kWhatRemoveStream:
156         {
157             onRemoveStream(msg);
158             break;
159         }
160 
161         case kWhatPollStreams:
162         {
163             onPollStreams();
164             break;
165         }
166 
167         case kWhatInjectPacket:
168         {
169             onInjectPacket(msg);
170             break;
171         }
172 
173         default:
174         {
175             TRESPASS();
176             break;
177         }
178     }
179 }
180 
onAddStream(const sp<AMessage> & msg)181 void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
182     mStreams.push_back(StreamInfo());
183     StreamInfo *info = &*--mStreams.end();
184 
185     int32_t s;
186     CHECK(msg->findInt32("rtp-socket", &s));
187     info->mRTPSocket = s;
188     CHECK(msg->findInt32("rtcp-socket", &s));
189     info->mRTCPSocket = s;
190 
191     int32_t injected;
192     CHECK(msg->findInt32("injected", &injected));
193 
194     info->mIsInjected = injected;
195 
196     sp<RefBase> obj;
197     CHECK(msg->findObject("session-desc", &obj));
198     info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
199 
200     CHECK(msg->findSize("index", &info->mIndex));
201     CHECK(msg->findMessage("notify", &info->mNotifyMsg));
202 
203     info->mNumRTCPPacketsReceived = 0;
204     info->mNumRTPPacketsReceived = 0;
205     memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
206 
207     if (!injected) {
208         postPollEvent();
209     }
210 }
211 
onRemoveStream(const sp<AMessage> & msg)212 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
213     int32_t rtpSocket, rtcpSocket;
214     CHECK(msg->findInt32("rtp-socket", &rtpSocket));
215     CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
216 
217     List<StreamInfo>::iterator it = mStreams.begin();
218     while (it != mStreams.end()
219            && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
220         ++it;
221     }
222 
223     if (it == mStreams.end()) {
224         return;
225     }
226 
227     mStreams.erase(it);
228 }
229 
postPollEvent()230 void ARTPConnection::postPollEvent() {
231     if (mPollEventPending) {
232         return;
233     }
234 
235     sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
236     msg->post();
237 
238     mPollEventPending = true;
239 }
240 
onPollStreams()241 void ARTPConnection::onPollStreams() {
242     mPollEventPending = false;
243 
244     if (mStreams.empty()) {
245         return;
246     }
247 
248     struct timeval tv;
249     tv.tv_sec = 0;
250     tv.tv_usec = kSelectTimeoutUs;
251 
252     fd_set rs;
253     FD_ZERO(&rs);
254 
255     int maxSocket = -1;
256     for (List<StreamInfo>::iterator it = mStreams.begin();
257          it != mStreams.end(); ++it) {
258         if ((*it).mIsInjected) {
259             continue;
260         }
261 
262         FD_SET(it->mRTPSocket, &rs);
263         FD_SET(it->mRTCPSocket, &rs);
264 
265         if (it->mRTPSocket > maxSocket) {
266             maxSocket = it->mRTPSocket;
267         }
268         if (it->mRTCPSocket > maxSocket) {
269             maxSocket = it->mRTCPSocket;
270         }
271     }
272 
273     if (maxSocket == -1) {
274         return;
275     }
276 
277     int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
278 
279     if (res > 0) {
280         List<StreamInfo>::iterator it = mStreams.begin();
281         while (it != mStreams.end()) {
282             if ((*it).mIsInjected) {
283                 ++it;
284                 continue;
285             }
286 
287             status_t err = OK;
288             if (FD_ISSET(it->mRTPSocket, &rs)) {
289                 err = receive(&*it, true);
290             }
291             if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
292                 err = receive(&*it, false);
293             }
294 
295             if (err == -ECONNRESET) {
296                 // socket failure, this stream is dead, Jim.
297 
298                 ALOGW("failed to receive RTP/RTCP datagram.");
299                 it = mStreams.erase(it);
300                 continue;
301             }
302 
303             ++it;
304         }
305     }
306 
307     int64_t nowUs = ALooper::GetNowUs();
308     if (mLastReceiverReportTimeUs <= 0
309             || mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
310         sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
311         List<StreamInfo>::iterator it = mStreams.begin();
312         while (it != mStreams.end()) {
313             StreamInfo *s = &*it;
314 
315             if (s->mIsInjected) {
316                 ++it;
317                 continue;
318             }
319 
320             if (s->mNumRTCPPacketsReceived == 0) {
321                 // We have never received any RTCP packets on this stream,
322                 // we don't even know where to send a report.
323                 ++it;
324                 continue;
325             }
326 
327             buffer->setRange(0, 0);
328 
329             for (size_t i = 0; i < s->mSources.size(); ++i) {
330                 sp<ARTPSource> source = s->mSources.valueAt(i);
331 
332                 source->addReceiverReport(buffer);
333 
334                 if (mFlags & kRegularlyRequestFIR) {
335                     source->addFIR(buffer);
336                 }
337             }
338 
339             if (buffer->size() > 0) {
340                 ALOGV("Sending RR...");
341 
342                 ssize_t n;
343                 do {
344                     n = sendto(
345                         s->mRTCPSocket, buffer->data(), buffer->size(), 0,
346                         (const struct sockaddr *)&s->mRemoteRTCPAddr,
347                         sizeof(s->mRemoteRTCPAddr));
348                 } while (n < 0 && errno == EINTR);
349 
350                 if (n <= 0) {
351                     ALOGW("failed to send RTCP receiver report (%s).",
352                          n == 0 ? "connection gone" : strerror(errno));
353 
354                     it = mStreams.erase(it);
355                     continue;
356                 }
357 
358                 CHECK_EQ(n, (ssize_t)buffer->size());
359 
360                 mLastReceiverReportTimeUs = nowUs;
361             }
362 
363             ++it;
364         }
365     }
366 
367     if (!mStreams.empty()) {
368         postPollEvent();
369     }
370 }
371 
receive(StreamInfo * s,bool receiveRTP)372 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
373     ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
374 
375     CHECK(!s->mIsInjected);
376 
377     sp<ABuffer> buffer = new ABuffer(65536);
378 
379     socklen_t remoteAddrLen =
380         (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
381             ? sizeof(s->mRemoteRTCPAddr) : 0;
382 
383     ssize_t nbytes;
384     do {
385         nbytes = recvfrom(
386             receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
387             buffer->data(),
388             buffer->capacity(),
389             0,
390             remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
391             remoteAddrLen > 0 ? &remoteAddrLen : NULL);
392     } while (nbytes < 0 && errno == EINTR);
393 
394     if (nbytes <= 0) {
395         return -ECONNRESET;
396     }
397 
398     buffer->setRange(0, nbytes);
399 
400     // ALOGI("received %d bytes.", buffer->size());
401 
402     status_t err;
403     if (receiveRTP) {
404         err = parseRTP(s, buffer);
405     } else {
406         err = parseRTCP(s, buffer);
407     }
408 
409     return err;
410 }
411 
parseRTP(StreamInfo * s,const sp<ABuffer> & buffer)412 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
413     if (s->mNumRTPPacketsReceived++ == 0) {
414         sp<AMessage> notify = s->mNotifyMsg->dup();
415         notify->setInt32("first-rtp", true);
416         notify->post();
417     }
418 
419     size_t size = buffer->size();
420 
421     if (size < 12) {
422         // Too short to be a valid RTP header.
423         return -1;
424     }
425 
426     const uint8_t *data = buffer->data();
427 
428     if ((data[0] >> 6) != 2) {
429         // Unsupported version.
430         return -1;
431     }
432 
433     if (data[0] & 0x20) {
434         // Padding present.
435 
436         size_t paddingLength = data[size - 1];
437 
438         if (paddingLength + 12 > size) {
439             // If we removed this much padding we'd end up with something
440             // that's too short to be a valid RTP header.
441             return -1;
442         }
443 
444         size -= paddingLength;
445     }
446 
447     int numCSRCs = data[0] & 0x0f;
448 
449     size_t payloadOffset = 12 + 4 * numCSRCs;
450 
451     if (size < payloadOffset) {
452         // Not enough data to fit the basic header and all the CSRC entries.
453         return -1;
454     }
455 
456     if (data[0] & 0x10) {
457         // Header eXtension present.
458 
459         if (size < payloadOffset + 4) {
460             // Not enough data to fit the basic header, all CSRC entries
461             // and the first 4 bytes of the extension header.
462 
463             return -1;
464         }
465 
466         const uint8_t *extensionData = &data[payloadOffset];
467 
468         size_t extensionLength =
469             4 * (extensionData[2] << 8 | extensionData[3]);
470 
471         if (size < payloadOffset + 4 + extensionLength) {
472             return -1;
473         }
474 
475         payloadOffset += 4 + extensionLength;
476     }
477 
478     uint32_t srcId = u32at(&data[8]);
479 
480     sp<ARTPSource> source = findSource(s, srcId);
481 
482     uint32_t rtpTime = u32at(&data[4]);
483 
484     sp<AMessage> meta = buffer->meta();
485     meta->setInt32("ssrc", srcId);
486     meta->setInt32("rtp-time", rtpTime);
487     meta->setInt32("PT", data[1] & 0x7f);
488     meta->setInt32("M", data[1] >> 7);
489 
490     buffer->setInt32Data(u16at(&data[2]));
491     buffer->setRange(payloadOffset, size - payloadOffset);
492 
493     source->processRTPPacket(buffer);
494 
495     return OK;
496 }
497 
parseRTCP(StreamInfo * s,const sp<ABuffer> & buffer)498 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
499     if (s->mNumRTCPPacketsReceived++ == 0) {
500         sp<AMessage> notify = s->mNotifyMsg->dup();
501         notify->setInt32("first-rtcp", true);
502         notify->post();
503     }
504 
505     const uint8_t *data = buffer->data();
506     size_t size = buffer->size();
507 
508     while (size > 0) {
509         if (size < 8) {
510             // Too short to be a valid RTCP header
511             return -1;
512         }
513 
514         if ((data[0] >> 6) != 2) {
515             // Unsupported version.
516             return -1;
517         }
518 
519         if (data[0] & 0x20) {
520             // Padding present.
521 
522             size_t paddingLength = data[size - 1];
523 
524             if (paddingLength + 12 > size) {
525                 // If we removed this much padding we'd end up with something
526                 // that's too short to be a valid RTP header.
527                 return -1;
528             }
529 
530             size -= paddingLength;
531         }
532 
533         size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
534 
535         if (size < headerLength) {
536             // Only received a partial packet?
537             return -1;
538         }
539 
540         switch (data[1]) {
541             case 200:
542             {
543                 parseSR(s, data, headerLength);
544                 break;
545             }
546 
547             case 201:  // RR
548             case 202:  // SDES
549             case 204:  // APP
550                 break;
551 
552             case 205:  // TSFB (transport layer specific feedback)
553             case 206:  // PSFB (payload specific feedback)
554                 // hexdump(data, headerLength);
555                 break;
556 
557             case 203:
558             {
559                 parseBYE(s, data, headerLength);
560                 break;
561             }
562 
563             default:
564             {
565                 ALOGW("Unknown RTCP packet type %u of size %d",
566                      (unsigned)data[1], headerLength);
567                 break;
568             }
569         }
570 
571         data += headerLength;
572         size -= headerLength;
573     }
574 
575     return OK;
576 }
577 
parseBYE(StreamInfo * s,const uint8_t * data,size_t size)578 status_t ARTPConnection::parseBYE(
579         StreamInfo *s, const uint8_t *data, size_t size) {
580     size_t SC = data[0] & 0x3f;
581 
582     if (SC == 0 || size < (4 + SC * 4)) {
583         // Packet too short for the minimal BYE header.
584         return -1;
585     }
586 
587     uint32_t id = u32at(&data[4]);
588 
589     sp<ARTPSource> source = findSource(s, id);
590 
591     source->byeReceived();
592 
593     return OK;
594 }
595 
parseSR(StreamInfo * s,const uint8_t * data,size_t size)596 status_t ARTPConnection::parseSR(
597         StreamInfo *s, const uint8_t *data, size_t size) {
598     size_t RC = data[0] & 0x1f;
599 
600     if (size < (7 + RC * 6) * 4) {
601         // Packet too short for the minimal SR header.
602         return -1;
603     }
604 
605     uint32_t id = u32at(&data[4]);
606     uint64_t ntpTime = u64at(&data[8]);
607     uint32_t rtpTime = u32at(&data[16]);
608 
609 #if 0
610     ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
611          id,
612          rtpTime,
613          (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
614 #endif
615 
616     sp<ARTPSource> source = findSource(s, id);
617 
618     source->timeUpdate(rtpTime, ntpTime);
619 
620     return 0;
621 }
622 
findSource(StreamInfo * info,uint32_t srcId)623 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
624     sp<ARTPSource> source;
625     ssize_t index = info->mSources.indexOfKey(srcId);
626     if (index < 0) {
627         index = info->mSources.size();
628 
629         source = new ARTPSource(
630                 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
631 
632         info->mSources.add(srcId, source);
633     } else {
634         source = info->mSources.valueAt(index);
635     }
636 
637     return source;
638 }
639 
injectPacket(int index,const sp<ABuffer> & buffer)640 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
641     sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
642     msg->setInt32("index", index);
643     msg->setBuffer("buffer", buffer);
644     msg->post();
645 }
646 
onInjectPacket(const sp<AMessage> & msg)647 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
648     int32_t index;
649     CHECK(msg->findInt32("index", &index));
650 
651     sp<ABuffer> buffer;
652     CHECK(msg->findBuffer("buffer", &buffer));
653 
654     List<StreamInfo>::iterator it = mStreams.begin();
655     while (it != mStreams.end()
656            && it->mRTPSocket != index && it->mRTCPSocket != index) {
657         ++it;
658     }
659 
660     if (it == mStreams.end()) {
661         TRESPASS();
662     }
663 
664     StreamInfo *s = &*it;
665 
666     status_t err;
667     if (it->mRTPSocket == index) {
668         err = parseRTP(s, buffer);
669     } else {
670         err = parseRTCP(s, buffer);
671     }
672 }
673 
674 }  // namespace android
675 
676