• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "ARTPConnection"
19 #include <utils/Log.h>
20 
21 #include "ARTPConnection.h"
22 #include "ARTPSource.h"
23 #include "ASessionDescription.h"
24 
25 #include <media/stagefright/foundation/ABuffer.h>
26 #include <media/stagefright/foundation/ADebug.h>
27 #include <media/stagefright/foundation/AMessage.h>
28 #include <media/stagefright/foundation/AString.h>
29 #include <media/stagefright/foundation/hexdump.h>
30 
31 #include <android/multinetwork.h>
32 
33 #include <arpa/inet.h>
34 #include <sys/socket.h>
35 
36 namespace android {
37 
38 static const size_t kMaxUDPSize = 1500;
39 
u16at(const uint8_t * data)40 static uint16_t u16at(const uint8_t *data) {
41     return data[0] << 8 | data[1];
42 }
43 
u32at(const uint8_t * data)44 static uint32_t u32at(const uint8_t *data) {
45     return u16at(data) << 16 | u16at(&data[2]);
46 }
47 
u64at(const uint8_t * data)48 static uint64_t u64at(const uint8_t *data) {
49     return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
50 }
51 
52 // static
53 const int64_t ARTPConnection::kSelectTimeoutUs = 1000LL;
54 
55 struct ARTPConnection::StreamInfo {
56     bool isIPv6;
57     int mRTPSocket;
58     int mRTCPSocket;
59     sp<ASessionDescription> mSessionDesc;
60     size_t mIndex;
61     sp<AMessage> mNotifyMsg;
62     KeyedVector<uint32_t, sp<ARTPSource> > mSources;
63 
64     int64_t mNumRTCPPacketsReceived;
65     int64_t mNumRTPPacketsReceived;
66     struct sockaddr_in mRemoteRTCPAddr;
67     struct sockaddr_in6 mRemoteRTCPAddr6;
68 
69     bool mIsInjected;
70 
71     // A place to save time when it polls
72     int64_t mLastPollTimeUs;
73     // RTCP Extension for CVO
74     int mCVOExtMap; // will be set to 0 if cvo is not negotiated in sdp
75 };
76 
ARTPConnection(uint32_t flags)77 ARTPConnection::ARTPConnection(uint32_t flags)
78     : mFlags(flags),
79       mPollEventPending(false),
80       mLastReceiverReportTimeUs(-1),
81       mLastBitrateReportTimeUs(-1),
82       mTargetBitrate(-1),
83       mStaticJitterTimeMs(kStaticJitterTimeMs) {
84 }
85 
~ARTPConnection()86 ARTPConnection::~ARTPConnection() {
87 }
88 
addStream(int rtpSocket,int rtcpSocket,const sp<ASessionDescription> & sessionDesc,size_t index,const sp<AMessage> & notify,bool injected)89 void ARTPConnection::addStream(
90         int rtpSocket, int rtcpSocket,
91         const sp<ASessionDescription> &sessionDesc,
92         size_t index,
93         const sp<AMessage> &notify,
94         bool injected) {
95     sp<AMessage> msg = new AMessage(kWhatAddStream, this);
96     msg->setInt32("rtp-socket", rtpSocket);
97     msg->setInt32("rtcp-socket", rtcpSocket);
98     msg->setObject("session-desc", sessionDesc);
99     msg->setSize("index", index);
100     msg->setMessage("notify", notify);
101     msg->setInt32("injected", injected);
102     msg->post();
103 }
104 
seekStream()105 void ARTPConnection::seekStream() {
106     sp<AMessage> msg = new AMessage(kWhatSeekStream, this);
107     msg->post();
108 }
109 
removeStream(int rtpSocket,int rtcpSocket)110 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
111     sp<AMessage> msg = new AMessage(kWhatRemoveStream, this);
112     msg->setInt32("rtp-socket", rtpSocket);
113     msg->setInt32("rtcp-socket", rtcpSocket);
114     msg->post();
115 }
116 
bumpSocketBufferSize(int s)117 static void bumpSocketBufferSize(int s) {
118     int size = 256 * 1024;
119     CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
120 }
121 
122 // static
MakePortPair(int * rtpSocket,int * rtcpSocket,unsigned * rtpPort)123 void ARTPConnection::MakePortPair(
124         int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
125     *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
126     CHECK_GE(*rtpSocket, 0);
127 
128     bumpSocketBufferSize(*rtpSocket);
129 
130     *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
131     CHECK_GE(*rtcpSocket, 0);
132 
133     bumpSocketBufferSize(*rtcpSocket);
134 
135     /* rand() * 1000 may overflow int type, use long long */
136     unsigned start = (unsigned)((rand()* 1000LL)/RAND_MAX) + 15550;
137     start &= ~1;
138 
139     for (unsigned port = start; port < 65535; port += 2) {
140         struct sockaddr_in addr;
141         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
142         addr.sin_family = AF_INET;
143         addr.sin_addr.s_addr = htonl(INADDR_ANY);
144         addr.sin_port = htons(port);
145 
146         if (bind(*rtpSocket,
147                  (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
148             continue;
149         }
150 
151         addr.sin_port = htons(port + 1);
152 
153         if (bind(*rtcpSocket,
154                  (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
155             *rtpPort = port;
156             return;
157         } else {
158             // we should recreate a RTP socket to avoid bind other port in same RTP socket
159             close(*rtpSocket);
160 
161             *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
162             CHECK_GE(*rtpSocket, 0);
163             bumpSocketBufferSize(*rtpSocket);
164         }
165     }
166 
167     TRESPASS();
168 }
169 
170 // static
MakeRTPSocketPair(int * rtpSocket,int * rtcpSocket,const char * localIp,const char * remoteIp,unsigned localPort,unsigned remotePort,int64_t socketNetwork)171 void ARTPConnection::MakeRTPSocketPair(
172         int *rtpSocket, int *rtcpSocket, const char *localIp, const char *remoteIp,
173         unsigned localPort, unsigned remotePort, int64_t socketNetwork) {
174     bool isIPv6 = false;
175     if (strchr(localIp, ':') != NULL)
176         isIPv6 = true;
177 
178     *rtpSocket = socket(isIPv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
179     CHECK_GE(*rtpSocket, 0);
180 
181     bumpSocketBufferSize(*rtpSocket);
182 
183     *rtcpSocket = socket(isIPv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
184     CHECK_GE(*rtcpSocket, 0);
185 
186     if (socketNetwork != 0) {
187         ALOGD("trying to bind rtp socket(%d) to network(%llu).",
188                 *rtpSocket, (unsigned long long)socketNetwork);
189 
190         int result = android_setsocknetwork((net_handle_t)socketNetwork, *rtpSocket);
191         if (result != 0) {
192             ALOGW("failed(%d) to bind rtp socket(%d) to network(%llu)",
193                     result, *rtpSocket, (unsigned long long)socketNetwork);
194         }
195         result = android_setsocknetwork((net_handle_t)socketNetwork, *rtcpSocket);
196         if (result != 0) {
197             ALOGW("failed(%d) to bind rtcp socket(%d) to network(%llu)",
198                     result, *rtcpSocket, (unsigned long long)socketNetwork);
199         }
200     }
201 
202     bumpSocketBufferSize(*rtcpSocket);
203 
204     struct sockaddr *addr;
205     struct sockaddr_in addr4;
206     struct sockaddr_in6 addr6;
207 
208     if (isIPv6) {
209         addr = (struct sockaddr *)&addr6;
210         memset(&addr6, 0, sizeof(addr6));
211         addr6.sin6_family = AF_INET6;
212         inet_pton(AF_INET6, localIp, &addr6.sin6_addr);
213         addr6.sin6_port = htons((uint16_t)localPort);
214     } else {
215         addr = (struct sockaddr *)&addr4;
216         memset(&addr4, 0, sizeof(addr4));
217         addr4.sin_family = AF_INET;
218         addr4.sin_addr.s_addr = inet_addr(localIp);
219         addr4.sin_port = htons((uint16_t)localPort);
220     }
221 
222     int sockopt = 1;
223     setsockopt(*rtpSocket, SOL_SOCKET, SO_REUSEADDR, (int *)&sockopt, sizeof(sockopt));
224     setsockopt(*rtcpSocket, SOL_SOCKET, SO_REUSEADDR, (int *)&sockopt, sizeof(sockopt));
225 
226     int sizeSockSt = isIPv6 ? sizeof(addr6) : sizeof(addr4);
227 
228     if (bind(*rtpSocket, addr, sizeSockSt) == 0) {
229         ALOGI("rtp socket successfully binded. addr=%s:%d", localIp, localPort);
230     } else {
231         ALOGE("failed to bind rtp socket addr=%s:%d err=%s", localIp, localPort, strerror(errno));
232         return;
233     }
234 
235     if (isIPv6)
236         addr6.sin6_port = htons(localPort + 1);
237     else
238         addr4.sin_port = htons(localPort + 1);
239 
240     if (bind(*rtcpSocket, addr, sizeSockSt) == 0) {
241         ALOGI("rtcp socket successfully binded. addr=%s:%d", localIp, localPort + 1);
242     } else {
243         ALOGE("failed to bind rtcp socket addr=%s:%d err=%s", localIp,
244                 localPort + 1, strerror(errno));
245     }
246 
247     // Re uses addr variable as remote addr.
248     if (isIPv6) {
249         memset(&addr6, 0, sizeof(addr6));
250         addr6.sin6_family = AF_INET6;
251         inet_pton(AF_INET6, remoteIp, &addr6.sin6_addr);
252         addr6.sin6_port = htons((uint16_t)remotePort);
253     } else {
254         memset(&addr4, 0, sizeof(addr4));
255         addr4.sin_family = AF_INET;
256         addr4.sin_addr.s_addr = inet_addr(remoteIp);
257         addr4.sin_port = htons((uint16_t)remotePort);
258     }
259     if (connect(*rtpSocket, addr, sizeSockSt) == 0) {
260         ALOGI("rtp socket successfully connected to remote=%s:%d", remoteIp, remotePort);
261     } else {
262         ALOGE("failed to connect rtp socket to remote addr=%s:%d err=%s", remoteIp,
263                 remotePort, strerror(errno));
264         return;
265     }
266 
267     if (isIPv6)
268         addr6.sin6_port = htons(remotePort + 1);
269     else
270         addr4.sin_port = htons(remotePort + 1);
271 
272     if (connect(*rtcpSocket, addr, sizeSockSt) == 0) {
273         ALOGI("rtcp socket successfully connected to remote=%s:%d", remoteIp, remotePort + 1);
274     } else {
275         ALOGE("failed to connect rtcp socket addr=%s:%d err=%s", remoteIp,
276                 remotePort + 1, strerror(errno));
277         return;
278     }
279 }
280 
onMessageReceived(const sp<AMessage> & msg)281 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
282     switch (msg->what()) {
283         case kWhatAddStream:
284         {
285             onAddStream(msg);
286             break;
287         }
288 
289         case kWhatSeekStream:
290         {
291             onSeekStream(msg);
292             break;
293         }
294 
295         case kWhatRemoveStream:
296         {
297             onRemoveStream(msg);
298             break;
299         }
300 
301         case kWhatPollStreams:
302         {
303             onPollStreams();
304             break;
305         }
306 
307         case kWhatAlarmStream:
308         {
309             onAlarmStream(msg);
310             break;
311         }
312 
313         case kWhatInjectPacket:
314         {
315             onInjectPacket(msg);
316             break;
317         }
318 
319         default:
320         {
321             TRESPASS();
322             break;
323         }
324     }
325 }
326 
onAddStream(const sp<AMessage> & msg)327 void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
328     mStreams.push_back(StreamInfo());
329     StreamInfo *info = &*--mStreams.end();
330 
331     int32_t s;
332     CHECK(msg->findInt32("rtp-socket", &s));
333     info->mRTPSocket = s;
334     CHECK(msg->findInt32("rtcp-socket", &s));
335     info->mRTCPSocket = s;
336 
337     int32_t injected;
338     CHECK(msg->findInt32("injected", &injected));
339 
340     info->mIsInjected = injected;
341 
342     sp<RefBase> obj;
343     CHECK(msg->findObject("session-desc", &obj));
344     info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
345 
346     CHECK(msg->findSize("index", &info->mIndex));
347     CHECK(msg->findMessage("notify", &info->mNotifyMsg));
348 
349     info->mNumRTCPPacketsReceived = 0;
350     info->mNumRTPPacketsReceived = 0;
351     memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
352     memset(&info->mRemoteRTCPAddr6, 0, sizeof(info->mRemoteRTCPAddr6));
353 
354     sp<ASessionDescription> sessionDesc = info->mSessionDesc;
355     info->mCVOExtMap = 0;
356     for (size_t i = 1; i < sessionDesc->countTracks(); ++i) {
357         int32_t cvoExtMap;
358         if (sessionDesc->getCvoExtMap(i, &cvoExtMap)) {
359             info->mCVOExtMap = cvoExtMap;
360             ALOGI("urn:3gpp:video-orientation(cvo) found as extmap:%d", info->mCVOExtMap);
361         } else {
362             ALOGI("urn:3gpp:video-orientation(cvo) not found :%d", info->mCVOExtMap);
363         }
364     }
365 
366     if (!injected) {
367         postPollEvent();
368     }
369 }
370 
onSeekStream(const sp<AMessage> & msg)371 void ARTPConnection::onSeekStream(const sp<AMessage> &msg) {
372     (void)msg; // unused param as of now.
373     List<StreamInfo>::iterator it = mStreams.begin();
374     while (it != mStreams.end()) {
375         for (size_t i = 0; i < it->mSources.size(); ++i) {
376             sp<ARTPSource> source = it->mSources.valueAt(i);
377             source->timeReset();
378         }
379         ++it;
380     }
381 }
382 
onRemoveStream(const sp<AMessage> & msg)383 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
384     int32_t rtpSocket, rtcpSocket;
385     CHECK(msg->findInt32("rtp-socket", &rtpSocket));
386     CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
387 
388     List<StreamInfo>::iterator it = mStreams.begin();
389     while (it != mStreams.end()
390            && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
391         ++it;
392     }
393 
394     if (it == mStreams.end()) {
395         return;
396     }
397 
398     mStreams.erase(it);
399 }
400 
postPollEvent()401 void ARTPConnection::postPollEvent() {
402     if (mPollEventPending) {
403         return;
404     }
405 
406     sp<AMessage> msg = new AMessage(kWhatPollStreams, this);
407     msg->post();
408 
409     mPollEventPending = true;
410 }
411 
onPollStreams()412 void ARTPConnection::onPollStreams() {
413     mPollEventPending = false;
414 
415     if (mStreams.empty()) {
416         return;
417     }
418 
419     struct timeval tv;
420     tv.tv_sec = 0;
421     tv.tv_usec = kSelectTimeoutUs;
422 
423     fd_set rs;
424     FD_ZERO(&rs);
425 
426     int maxSocket = -1;
427     for (List<StreamInfo>::iterator it = mStreams.begin();
428          it != mStreams.end(); ++it) {
429         if ((*it).mIsInjected) {
430             continue;
431         }
432 
433         FD_SET(it->mRTPSocket, &rs);
434         FD_SET(it->mRTCPSocket, &rs);
435 
436         if (it->mRTPSocket > maxSocket) {
437             maxSocket = it->mRTPSocket;
438         }
439         if (it->mRTCPSocket > maxSocket) {
440             maxSocket = it->mRTCPSocket;
441         }
442     }
443 
444     if (maxSocket == -1) {
445         return;
446     }
447 
448     int64_t nowUs = ALooper::GetNowUs();
449     int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
450 
451     if (res > 0) {
452         List<StreamInfo>::iterator it = mStreams.begin();
453         while (it != mStreams.end()) {
454             if ((*it).mIsInjected) {
455                 ++it;
456                 continue;
457             }
458             it->mLastPollTimeUs = nowUs;
459 
460             status_t err = OK;
461             if (FD_ISSET(it->mRTPSocket, &rs)) {
462                 err = receive(&*it, true);
463             }
464             if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
465                 err = receive(&*it, false);
466             }
467 
468             if (err == -ECONNRESET) {
469                 // socket failure, this stream is dead, Jim.
470                 for (size_t i = 0; i < it->mSources.size(); ++i) {
471                     sp<AMessage> notify = it->mNotifyMsg->dup();
472                     notify->setInt32("rtcp-event", 1);
473                     notify->setInt32("payload-type", 400);
474                     notify->setInt32("feedback-type", 1);
475                     notify->setInt32("sender", it->mSources.valueAt(i)->getSelfID());
476                     notify->post();
477 
478                     ALOGW("failed to receive RTP/RTCP datagram.");
479                 }
480                 it = mStreams.erase(it);
481                 continue;
482             }
483 
484             // add NACK and FIR that needs to be sent immediately.
485             sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
486             for (size_t i = 0; i < it->mSources.size(); ++i) {
487                 buffer->setRange(0, 0);
488                 int cnt = it->mSources.valueAt(i)->addNACK(buffer);
489                 if (cnt > 0) {
490                     ALOGV("Send NACK for lost %d Packets", cnt);
491                     send(&*it, buffer);
492                 }
493 
494                 buffer->setRange(0, 0);
495                 it->mSources.valueAt(i)->addFIR(buffer);
496                 if (buffer->size() > 0) {
497                     ALOGD("Send FIR immediately for lost Packets");
498                     send(&*it, buffer);
499                 }
500 
501                 buffer->setRange(0, 0);
502                 it->mSources.valueAt(i)->addTMMBR(buffer, mTargetBitrate);
503                 mTargetBitrate = -1;
504                 if (buffer->size() > 0) {
505                     ALOGV("Sending TMMBR...");
506                     ssize_t n = send(&*it, buffer);
507 
508                     if (n != (ssize_t)buffer->size()) {
509                         ALOGW("failed to send RTCP TMMBR (%s).",
510                                 n >= 0 ? "connection gone" : strerror(errno));
511 
512                         it = mStreams.erase(it);
513                         continue;
514                     }
515                 }
516             }
517 
518             ++it;
519         }
520     }
521 
522     checkRxBitrate(nowUs);
523 
524     if (mLastReceiverReportTimeUs <= 0
525             || mLastReceiverReportTimeUs + 5000000LL <= nowUs) {
526         sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
527         List<StreamInfo>::iterator it = mStreams.begin();
528         while (it != mStreams.end()) {
529             StreamInfo *s = &*it;
530 
531             if (s->mIsInjected) {
532                 ++it;
533                 continue;
534             }
535 
536             if (s->mNumRTCPPacketsReceived == 0) {
537                 // We have never received any RTCP packets on this stream,
538                 // we don't even know where to send a report.
539                 ++it;
540                 continue;
541             }
542 
543             buffer->setRange(0, 0);
544 
545             for (size_t i = 0; i < s->mSources.size(); ++i) {
546                 sp<ARTPSource> source = s->mSources.valueAt(i);
547 
548                 source->addReceiverReport(buffer);
549 
550                 if (mFlags & kRegularlyRequestFIR) {
551                     source->addFIR(buffer);
552                 }
553             }
554 
555             if (buffer->size() > 0) {
556                 ALOGV("Sending RR...");
557 
558                 ssize_t n = send(s, buffer);
559 
560                 if (n != (ssize_t)buffer->size()) {
561                     ALOGW("failed to send RTCP receiver report (%s).",
562                             n >= 0 ? "connection gone" : strerror(errno));
563 
564                     it = mStreams.erase(it);
565                     continue;
566                 }
567 
568                 mLastReceiverReportTimeUs = nowUs;
569             }
570 
571             ++it;
572         }
573     }
574 
575     if (!mStreams.empty()) {
576         postPollEvent();
577     }
578 }
579 
onAlarmStream(const sp<AMessage> msg)580 void ARTPConnection::onAlarmStream(const sp<AMessage> msg) {
581     sp<ARTPSource> source = nullptr;
582     if (msg->findObject("source", (sp<android::RefBase>*)&source)) {
583         source->processRTPPacket();
584     }
585 }
586 
receive(StreamInfo * s,bool receiveRTP)587 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
588     ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
589 
590     CHECK(!s->mIsInjected);
591 
592     sp<ABuffer> buffer = new ABuffer(65536);
593 
594     struct sockaddr *pRemoteRTCPAddr;
595     int sizeSockSt;
596     if (s->isIPv6) {
597         pRemoteRTCPAddr = (struct sockaddr *)&s->mRemoteRTCPAddr6;
598         sizeSockSt = sizeof(struct sockaddr_in6);
599     } else {
600         pRemoteRTCPAddr = (struct sockaddr *)&s->mRemoteRTCPAddr;
601         sizeSockSt = sizeof(struct sockaddr_in);
602     }
603     socklen_t remoteAddrLen =
604         (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
605             ? sizeSockSt : 0;
606 
607     if (mFlags & kViLTEConnection) {
608         remoteAddrLen = 0;
609     }
610 
611     ssize_t nbytes;
612     do {
613         nbytes = recvfrom(
614             receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
615             buffer->data(),
616             buffer->capacity(),
617             0,
618             remoteAddrLen > 0 ? pRemoteRTCPAddr : NULL,
619             remoteAddrLen > 0 ? &remoteAddrLen : NULL);
620         mCumulativeBytes += nbytes;
621     } while (nbytes < 0 && errno == EINTR);
622 
623     if (nbytes <= 0) {
624         return -ECONNRESET;
625     }
626 
627     buffer->setRange(0, nbytes);
628 
629     // ALOGI("received %d bytes.", buffer->size());
630 
631     status_t err;
632     if (receiveRTP) {
633         err = parseRTP(s, buffer);
634     } else {
635         err = parseRTCP(s, buffer);
636     }
637 
638     return err;
639 }
640 
send(const StreamInfo * info,const sp<ABuffer> buffer)641 ssize_t ARTPConnection::send(const StreamInfo *info, const sp<ABuffer> buffer) {
642         struct sockaddr* pRemoteRTCPAddr;
643         int sizeSockSt;
644 
645         /* It seems this isIPv6 variable is useless.
646          * We should remove it to prevent confusion */
647         if (info->isIPv6) {
648             pRemoteRTCPAddr = (struct sockaddr *)&info->mRemoteRTCPAddr6;
649             sizeSockSt = sizeof(struct sockaddr_in6);
650         } else {
651             pRemoteRTCPAddr = (struct sockaddr *)&info->mRemoteRTCPAddr;
652             sizeSockSt = sizeof(struct sockaddr_in);
653         }
654 
655         if (mFlags & kViLTEConnection) {
656             ALOGV("ViLTE RTCP");
657             pRemoteRTCPAddr = NULL;
658             sizeSockSt = 0;
659         }
660 
661         ssize_t n;
662         do {
663             n = sendto(
664                     info->mRTCPSocket, buffer->data(), buffer->size(), 0,
665                     pRemoteRTCPAddr, sizeSockSt);
666         } while (n < 0 && errno == EINTR);
667 
668         return n;
669 }
670 
parseRTP(StreamInfo * s,const sp<ABuffer> & buffer)671 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
672     size_t size = buffer->size();
673 
674     if (size < 12) {
675         // Too short to be a valid RTP header.
676         return -1;
677     }
678 
679     const uint8_t *data = buffer->data();
680 
681     if ((data[0] >> 6) != 2) {
682         // Unsupported version.
683         return -1;
684     }
685 
686     if ((data[1] & 0x7f) == 20 /* decimal */) {
687         // Unassigned payload type
688         return -1;
689     }
690 
691     if (data[0] & 0x20) {
692         // Padding present.
693 
694         size_t paddingLength = data[size - 1];
695 
696         if (paddingLength + 12 > size) {
697             // If we removed this much padding we'd end up with something
698             // that's too short to be a valid RTP header.
699             return -1;
700         }
701 
702         size -= paddingLength;
703     }
704 
705     int numCSRCs = data[0] & 0x0f;
706 
707     size_t payloadOffset = 12 + 4 * numCSRCs;
708 
709     if (size < payloadOffset) {
710         // Not enough data to fit the basic header and all the CSRC entries.
711         return -1;
712     }
713 
714     int32_t cvoDegrees = -1;
715     if (data[0] & 0x10) {
716         // Header eXtension present.
717 
718         if (size < payloadOffset + 4) {
719             // Not enough data to fit the basic header, all CSRC entries
720             // and the first 4 bytes of the extension header.
721 
722             return -1;
723         }
724 
725         const uint8_t *extensionData = &data[payloadOffset];
726 
727         size_t extensionLength =
728             (4 * (extensionData[2] << 8 | extensionData[3])) + 4;
729 
730         if (size < payloadOffset + extensionLength) {
731             return -1;
732         }
733 
734         parseRTPExt(s, (const uint8_t *)extensionData, extensionLength, &cvoDegrees);
735         payloadOffset += extensionLength;
736     }
737 
738     uint32_t srcId = u32at(&data[8]);
739 
740     sp<ARTPSource> source = findSource(s, srcId);
741 
742     uint32_t rtpTime = u32at(&data[4]);
743 
744     sp<AMessage> meta = buffer->meta();
745     meta->setInt32("ssrc", srcId);
746     meta->setInt32("rtp-time", rtpTime);
747     meta->setInt32("PT", data[1] & 0x7f);
748     meta->setInt32("M", data[1] >> 7);
749     if (cvoDegrees >= 0) {
750         meta->setInt32("cvo", cvoDegrees);
751     }
752 
753     int32_t seq = u16at(&data[2]);
754     buffer->setInt32Data(seq);
755     buffer->setRange(payloadOffset, size - payloadOffset);
756 
757     if (s->mNumRTPPacketsReceived++ == 0) {
758         sp<AMessage> notify = s->mNotifyMsg->dup();
759         notify->setInt32("first-rtp", true);
760         notify->setInt32("rtcp-event", 1);
761         notify->setInt32("payload-type", ARTPSource::RTP_FIRST_PACKET);
762         notify->setInt32("rtp-time", (int32_t)rtpTime);
763         notify->setInt32("rtp-seq-num", seq);
764         notify->setInt64("recv-time-us", ALooper::GetNowUs());
765         notify->post();
766 
767         ALOGD("send first-rtp event to upper layer");
768     }
769 
770     source->processRTPPacket(buffer);
771 
772     return OK;
773 }
774 
parseRTPExt(StreamInfo * s,const uint8_t * extHeader,size_t extLen,int32_t * cvoDegrees)775 status_t ARTPConnection::parseRTPExt(StreamInfo *s,
776         const uint8_t *extHeader, size_t extLen, int32_t *cvoDegrees) {
777     if (extLen < 4)
778         return -1;
779 
780     uint16_t header = (extHeader[0] << 8) | (extHeader[1]);
781     bool isOnebyteHeader = false;
782 
783     if (header == 0xBEDE) {
784         isOnebyteHeader = true;
785     } else if (header == 0x1000) {
786         ALOGW("parseRTPExt: two-byte header is not implemented yet");
787         return -1;
788     } else {
789         ALOGW("parseRTPExt: can not recognize header");
790         return -1;
791     }
792 
793     const uint8_t *extPayload = extHeader + 4;
794     extLen -= 4;
795     size_t offset = 0; //start from first payload of rtp extension.
796     // one-byte header parser
797     while (isOnebyteHeader && offset < extLen) {
798         uint8_t extmapId = extPayload[offset] >> 4;
799         uint8_t length = (extPayload[offset] & 0xF) + 1;
800         offset++;
801 
802         // padding case
803         if (extmapId == 0)
804             continue;
805 
806         uint8_t data[16]; // maximum length value
807         for (uint8_t j = 0; offset + j <= extLen && j < length; j++) {
808             data[j] = extPayload[offset + j];
809         }
810 
811         offset += length;
812 
813         if (extmapId == s->mCVOExtMap) {
814             *cvoDegrees = (int32_t)data[0];
815             return OK;
816         }
817     }
818 
819     return BAD_VALUE;
820 }
821 
parseRTCP(StreamInfo * s,const sp<ABuffer> & buffer)822 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
823     if (s->mNumRTCPPacketsReceived++ == 0) {
824         sp<AMessage> notify = s->mNotifyMsg->dup();
825         notify->setInt32("first-rtcp", true);
826         notify->setInt32("rtcp-event", 1);
827         notify->setInt32("payload-type", ARTPSource::RTCP_FIRST_PACKET);
828         notify->setInt64("recv-time-us", ALooper::GetNowUs());
829         notify->post();
830 
831         ALOGD("send first-rtcp event to upper layer");
832     }
833 
834     const uint8_t *data = buffer->data();
835     size_t size = buffer->size();
836 
837     while (size > 0) {
838         if (size < 8) {
839             // Too short to be a valid RTCP header
840             return -1;
841         }
842 
843         if ((data[0] >> 6) != 2) {
844             // Unsupported version.
845             return -1;
846         }
847 
848         if (data[0] & 0x20) {
849             // Padding present.
850 
851             size_t paddingLength = data[size - 1];
852 
853             if (paddingLength + 12 > size) {
854                 // If we removed this much padding we'd end up with something
855                 // that's too short to be a valid RTP header.
856                 return -1;
857             }
858 
859             size -= paddingLength;
860         }
861 
862         size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
863 
864         if (size < headerLength) {
865             // Only received a partial packet?
866             return -1;
867         }
868 
869         switch (data[1]) {
870             case 200:
871             {
872                 parseSR(s, data, headerLength);
873                 break;
874             }
875 
876             case 201:  // RR
877             case 202:  // SDES
878             case 204:  // APP
879                 break;
880 
881             case 205:  // TSFB (transport layer specific feedback)
882                 parseTSFB(s, data, headerLength);
883                 break;
884             case 206:  // PSFB (payload specific feedback)
885                 // hexdump(data, headerLength);
886                 parsePSFB(s, data, headerLength);
887                 ALOGI("RTCP packet type %u of size %zu", (unsigned)data[1], headerLength);
888                 break;
889 
890             case 203:
891             {
892                 parseBYE(s, data, headerLength);
893                 break;
894             }
895 
896             default:
897             {
898                 ALOGW("Unknown RTCP packet type %u of size %zu",
899                      (unsigned)data[1], headerLength);
900                 break;
901             }
902         }
903 
904         data += headerLength;
905         size -= headerLength;
906     }
907 
908     return OK;
909 }
910 
parseBYE(StreamInfo * s,const uint8_t * data,size_t size)911 status_t ARTPConnection::parseBYE(
912         StreamInfo *s, const uint8_t *data, size_t size) {
913     size_t SC = data[0] & 0x3f;
914 
915     if (SC == 0 || size < (4 + SC * 4)) {
916         // Packet too short for the minimal BYE header.
917         return -1;
918     }
919 
920     uint32_t id = u32at(&data[4]);
921 
922     sp<ARTPSource> source = findSource(s, id);
923 
924     // Report a final stastics to be used for rtp data usage.
925     int64_t nowUs = ALooper::GetNowUs();
926     int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
927     int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
928     source->notifyPktInfo(bitrate, nowUs, true /* isRegular */);
929 
930     source->byeReceived();
931 
932     return OK;
933 }
934 
parseSR(StreamInfo * s,const uint8_t * data,size_t size)935 status_t ARTPConnection::parseSR(
936         StreamInfo *s, const uint8_t *data, size_t size) {
937     size_t RC = data[0] & 0x1f;
938 
939     if (size < (7 + RC * 6) * 4) {
940         // Packet too short for the minimal SR header.
941         return -1;
942     }
943 
944     uint32_t id = u32at(&data[4]);
945     uint64_t ntpTime = u64at(&data[8]);
946     uint32_t rtpTime = u32at(&data[16]);
947 
948 #if 0
949     ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
950          id,
951          rtpTime,
952          (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
953 #endif
954 
955     sp<ARTPSource> source = findSource(s, id);
956 
957     source->timeUpdate(rtpTime, ntpTime);
958 
959     return 0;
960 }
961 
parseTSFB(StreamInfo * s,const uint8_t * data,size_t size)962 status_t ARTPConnection::parseTSFB(
963         StreamInfo *s, const uint8_t *data, size_t size) {
964     if (size < 12) {
965         // broken packet
966         return -1;
967     }
968 
969     uint8_t msgType = data[0] & 0x1f;
970     uint32_t id = u32at(&data[4]);
971 
972     const uint8_t *ptr = &data[12];
973     size -= 12;
974 
975     using namespace std;
976     size_t FCISize;
977     switch(msgType) {
978         case 1:     // Generic NACK
979         {
980             FCISize = 4;
981             while (size >= FCISize) {
982                 uint16_t PID = u16at(&ptr[0]);  // lost packet RTP number
983                 uint16_t BLP = u16at(&ptr[2]);  // Bitmask of following Lost Packets
984 
985                 size -= FCISize;
986                 ptr += FCISize;
987 
988                 AString list_of_losts;
989                 list_of_losts.append(PID);
990                 for (int i=0 ; i<16 ; i++) {
991                     bool is_lost = BLP & (0x1 << i);
992                     if (is_lost) {
993                         list_of_losts.append(", ");
994                         list_of_losts.append(PID + i);
995                     }
996                 }
997                 ALOGI("Opponent losts packet of RTP %s", list_of_losts.c_str());
998             }
999             break;
1000         }
1001         case 3:     // TMMBR
1002         case 4:     // TMMBN
1003         {
1004             FCISize = 8;
1005             while (size >= FCISize) {
1006                 uint32_t MxTBR = u32at(&ptr[4]);
1007                 uint32_t MxTBRExp = MxTBR >> 26;
1008                 uint32_t MxTBRMantissa = (MxTBR >> 9) & 0x01FFFF;
1009                 uint32_t overhead = MxTBR & 0x01FF;
1010 
1011                 size -= FCISize;
1012                 ptr += FCISize;
1013 
1014                 uint32_t bitRate = (1 << MxTBRExp) * MxTBRMantissa;
1015 
1016                 if (msgType == 3)
1017                     ALOGI("Op -> UE Req Tx bitrate : %d X 2^%d = %d",
1018                         MxTBRMantissa, MxTBRExp, bitRate);
1019                 else if (msgType == 4)
1020                     ALOGI("OP -> UE Noti Rx bitrate : %d X 2^%d = %d",
1021                         MxTBRMantissa, MxTBRExp, bitRate);
1022 
1023                 sp<AMessage> notify = s->mNotifyMsg->dup();
1024                 notify->setInt32("rtcp-event", 1);
1025                 notify->setInt32("payload-type", 205);
1026                 notify->setInt32("feedback-type", msgType);
1027                 notify->setInt32("sender", id);
1028                 notify->setInt32("bit-rate", bitRate);
1029                 notify->post();
1030                 ALOGI("overhead : %d", overhead);
1031             }
1032             break;
1033         }
1034         default:
1035         {
1036             ALOGI("Not supported TSFB type %d", msgType);
1037             break;
1038         }
1039     }
1040 
1041     return 0;
1042 }
1043 
parsePSFB(StreamInfo * s,const uint8_t * data,size_t size)1044 status_t ARTPConnection::parsePSFB(
1045         StreamInfo *s, const uint8_t *data, size_t size) {
1046     if (size < 12) {
1047         // broken packet
1048         return -1;
1049     }
1050 
1051     uint8_t msgType = data[0] & 0x1f;
1052     uint32_t id = u32at(&data[4]);
1053 
1054     const uint8_t *ptr = &data[12];
1055     size -= 12;
1056 
1057     using namespace std;
1058     switch(msgType) {
1059         case 1:     // Picture Loss Indication (PLI)
1060         {
1061             if (size > 0) {
1062                 // PLI does not need parameters
1063                 break;
1064             };
1065             sp<AMessage> notify = s->mNotifyMsg->dup();
1066             notify->setInt32("rtcp-event", 1);
1067             notify->setInt32("payload-type", 206);
1068             notify->setInt32("feedback-type", msgType);
1069             notify->setInt32("sender", id);
1070             notify->post();
1071             ALOGI("PLI detected.");
1072             break;
1073         }
1074         case 4:     // Full Intra Request (FIR)
1075         {
1076             if (size < 4) {
1077                 break;
1078             }
1079             uint32_t requestedId = u32at(&ptr[0]);
1080             if (requestedId == (uint32_t)mSelfID) {
1081                 sp<AMessage> notify = s->mNotifyMsg->dup();
1082                 notify->setInt32("rtcp-event", 1);
1083                 notify->setInt32("payload-type", 206);
1084                 notify->setInt32("feedback-type", msgType);
1085                 notify->setInt32("sender", id);
1086                 notify->post();
1087                 ALOGI("FIR detected.");
1088             }
1089             break;
1090         }
1091         default:
1092         {
1093             ALOGI("Not supported PSFB type %d", msgType);
1094             break;
1095         }
1096     }
1097 
1098     return 0;
1099 }
findSource(StreamInfo * info,uint32_t srcId)1100 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
1101     sp<ARTPSource> source;
1102     ssize_t index = info->mSources.indexOfKey(srcId);
1103     if (index < 0) {
1104         index = info->mSources.size();
1105 
1106         source = new ARTPSource(
1107                 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
1108 
1109         if (mFlags & kViLTEConnection) {
1110             setStaticJitterTimeMs(50);
1111             source->setPeriodicFIR(false);
1112         }
1113 
1114         source->setSelfID(mSelfID);
1115         source->setStaticJitterTimeMs(mStaticJitterTimeMs);
1116         sp<AMessage> timer = new AMessage(kWhatAlarmStream, this);
1117         source->setJbTimer(timer);
1118         info->mSources.add(srcId, source);
1119     } else {
1120         source = info->mSources.valueAt(index);
1121     }
1122 
1123     return source;
1124 }
1125 
injectPacket(int index,const sp<ABuffer> & buffer)1126 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
1127     sp<AMessage> msg = new AMessage(kWhatInjectPacket, this);
1128     msg->setInt32("index", index);
1129     msg->setBuffer("buffer", buffer);
1130     msg->post();
1131 }
1132 
setSelfID(const uint32_t selfID)1133 void ARTPConnection::setSelfID(const uint32_t selfID) {
1134     mSelfID = selfID;
1135 }
1136 
setStaticJitterTimeMs(const uint32_t jbTimeMs)1137 void ARTPConnection::setStaticJitterTimeMs(const uint32_t jbTimeMs) {
1138     mStaticJitterTimeMs = jbTimeMs;
1139 }
1140 
setTargetBitrate(int32_t targetBitrate)1141 void ARTPConnection::setTargetBitrate(int32_t targetBitrate) {
1142     mTargetBitrate = targetBitrate;
1143 }
1144 
checkRxBitrate(int64_t nowUs)1145 void ARTPConnection::checkRxBitrate(int64_t nowUs) {
1146     if (mLastBitrateReportTimeUs <= 0) {
1147         mCumulativeBytes = 0;
1148         mLastBitrateReportTimeUs = nowUs;
1149     }
1150     else if (mLastEarlyNotifyTimeUs + 100000ll <= nowUs) {
1151         int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
1152         int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
1153         mLastEarlyNotifyTimeUs = nowUs;
1154 
1155         List<StreamInfo>::iterator it = mStreams.begin();
1156         while (it != mStreams.end()) {
1157             StreamInfo *s = &*it;
1158             if (s->mIsInjected) {
1159                 ++it;
1160                 continue;
1161             }
1162             for (size_t i = 0; i < s->mSources.size(); ++i) {
1163                 sp<ARTPSource> source = s->mSources.valueAt(i);
1164                 if (source->isNeedToEarlyNotify()) {
1165                     source->notifyPktInfo(bitrate, nowUs, false /* isRegular */);
1166                     mLastEarlyNotifyTimeUs = nowUs + (1000000ll * 3600 * 24); // after 1 day
1167                 }
1168             }
1169             ++it;
1170         }
1171     }
1172     else if (mLastBitrateReportTimeUs + 1000000ll <= nowUs) {
1173         int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
1174         int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
1175         ALOGI("Actual Rx bitrate : %d bits/sec", bitrate);
1176 
1177         sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
1178         List<StreamInfo>::iterator it = mStreams.begin();
1179         while (it != mStreams.end()) {
1180             StreamInfo *s = &*it;
1181             if (s->mIsInjected) {
1182                 ++it;
1183                 continue;
1184             }
1185 
1186             if (s->mNumRTCPPacketsReceived == 0) {
1187                 // We have never received any RTCP packets on this stream,
1188                 // we don't even know where to send a report.
1189                 ++it;
1190                 continue;
1191             }
1192 
1193             buffer->setRange(0, 0);
1194             for (size_t i = 0; i < s->mSources.size(); ++i) {
1195                 sp<ARTPSource> source = s->mSources.valueAt(i);
1196                 source->notifyPktInfo(bitrate, nowUs, true /* isRegular */);
1197             }
1198             ++it;
1199         }
1200         mCumulativeBytes = 0;
1201         mLastBitrateReportTimeUs = nowUs;
1202         mLastEarlyNotifyTimeUs = nowUs;
1203     }
1204 }
onInjectPacket(const sp<AMessage> & msg)1205 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
1206     int32_t index;
1207     CHECK(msg->findInt32("index", &index));
1208 
1209     sp<ABuffer> buffer;
1210     CHECK(msg->findBuffer("buffer", &buffer));
1211 
1212     List<StreamInfo>::iterator it = mStreams.begin();
1213     while (it != mStreams.end()
1214            && it->mRTPSocket != index && it->mRTCPSocket != index) {
1215         ++it;
1216     }
1217 
1218     if (it == mStreams.end()) {
1219         TRESPASS();
1220     }
1221 
1222     StreamInfo *s = &*it;
1223 
1224     if (it->mRTPSocket == index) {
1225         parseRTP(s, buffer);
1226     } else {
1227         parseRTCP(s, buffer);
1228     }
1229 }
1230 
1231 }  // namespace android
1232