• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2013, The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "RTPSender"
19 #include <utils/Log.h>
20 
21 #include "RTPSender.h"
22 
23 #include <media/stagefright/foundation/ABuffer.h>
24 #include <media/stagefright/foundation/ADebug.h>
25 #include <media/stagefright/foundation/AMessage.h>
26 #include <media/stagefright/foundation/ANetworkSession.h>
27 #include <media/stagefright/foundation/hexdump.h>
28 #include <media/stagefright/MediaErrors.h>
29 #include <media/stagefright/Utils.h>
30 
31 #include "include/avc_utils.h"
32 
33 namespace android {
34 
RTPSender(const sp<ANetworkSession> & netSession,const sp<AMessage> & notify)35 RTPSender::RTPSender(
36         const sp<ANetworkSession> &netSession,
37         const sp<AMessage> &notify)
38     : mNetSession(netSession),
39       mNotify(notify),
40       mRTPMode(TRANSPORT_UNDEFINED),
41       mRTCPMode(TRANSPORT_UNDEFINED),
42       mRTPSessionID(0),
43       mRTCPSessionID(0),
44       mRTPConnected(false),
45       mRTCPConnected(false),
46       mLastNTPTime(0),
47       mLastRTPTime(0),
48       mNumRTPSent(0),
49       mNumRTPOctetsSent(0),
50       mNumSRsSent(0),
51       mRTPSeqNo(0),
52       mHistorySize(0) {
53 }
54 
~RTPSender()55 RTPSender::~RTPSender() {
56     if (mRTCPSessionID != 0) {
57         mNetSession->destroySession(mRTCPSessionID);
58         mRTCPSessionID = 0;
59     }
60 
61     if (mRTPSessionID != 0) {
62         mNetSession->destroySession(mRTPSessionID);
63         mRTPSessionID = 0;
64     }
65 }
66 
67 // static
PickRandomRTPPort()68 int32_t RTPBase::PickRandomRTPPort() {
69     // Pick an even integer in range [1024, 65534)
70 
71     static const size_t kRange = (65534 - 1024) / 2;
72 
73     return (int32_t)(((float)(kRange + 1) * rand()) / RAND_MAX) * 2 + 1024;
74 }
75 
initAsync(const char * remoteHost,int32_t remoteRTPPort,TransportMode rtpMode,int32_t remoteRTCPPort,TransportMode rtcpMode,int32_t * outLocalRTPPort)76 status_t RTPSender::initAsync(
77         const char *remoteHost,
78         int32_t remoteRTPPort,
79         TransportMode rtpMode,
80         int32_t remoteRTCPPort,
81         TransportMode rtcpMode,
82         int32_t *outLocalRTPPort) {
83     if (mRTPMode != TRANSPORT_UNDEFINED
84             || rtpMode == TRANSPORT_UNDEFINED
85             || rtpMode == TRANSPORT_NONE
86             || rtcpMode == TRANSPORT_UNDEFINED) {
87         return INVALID_OPERATION;
88     }
89 
90     CHECK_NE(rtpMode, TRANSPORT_TCP_INTERLEAVED);
91     CHECK_NE(rtcpMode, TRANSPORT_TCP_INTERLEAVED);
92 
93     if ((rtcpMode == TRANSPORT_NONE && remoteRTCPPort >= 0)
94             || (rtcpMode != TRANSPORT_NONE && remoteRTCPPort < 0)) {
95         return INVALID_OPERATION;
96     }
97 
98     sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, this);
99 
100     sp<AMessage> rtcpNotify;
101     if (remoteRTCPPort >= 0) {
102         rtcpNotify = new AMessage(kWhatRTCPNotify, this);
103     }
104 
105     CHECK_EQ(mRTPSessionID, 0);
106     CHECK_EQ(mRTCPSessionID, 0);
107 
108     int32_t localRTPPort;
109 
110     for (;;) {
111         localRTPPort = PickRandomRTPPort();
112 
113         status_t err;
114         if (rtpMode == TRANSPORT_UDP) {
115             err = mNetSession->createUDPSession(
116                     localRTPPort,
117                     remoteHost,
118                     remoteRTPPort,
119                     rtpNotify,
120                     &mRTPSessionID);
121         } else {
122             CHECK_EQ(rtpMode, TRANSPORT_TCP);
123             err = mNetSession->createTCPDatagramSession(
124                     localRTPPort,
125                     remoteHost,
126                     remoteRTPPort,
127                     rtpNotify,
128                     &mRTPSessionID);
129         }
130 
131         if (err != OK) {
132             continue;
133         }
134 
135         if (remoteRTCPPort < 0) {
136             break;
137         }
138 
139         if (rtcpMode == TRANSPORT_UDP) {
140             err = mNetSession->createUDPSession(
141                     localRTPPort + 1,
142                     remoteHost,
143                     remoteRTCPPort,
144                     rtcpNotify,
145                     &mRTCPSessionID);
146         } else {
147             CHECK_EQ(rtcpMode, TRANSPORT_TCP);
148             err = mNetSession->createTCPDatagramSession(
149                     localRTPPort + 1,
150                     remoteHost,
151                     remoteRTCPPort,
152                     rtcpNotify,
153                     &mRTCPSessionID);
154         }
155 
156         if (err == OK) {
157             break;
158         }
159 
160         mNetSession->destroySession(mRTPSessionID);
161         mRTPSessionID = 0;
162     }
163 
164     if (rtpMode == TRANSPORT_UDP) {
165         mRTPConnected = true;
166     }
167 
168     if (rtcpMode == TRANSPORT_UDP) {
169         mRTCPConnected = true;
170     }
171 
172     mRTPMode = rtpMode;
173     mRTCPMode = rtcpMode;
174     *outLocalRTPPort = localRTPPort;
175 
176     if (mRTPMode == TRANSPORT_UDP
177             && (mRTCPMode == TRANSPORT_UDP || mRTCPMode == TRANSPORT_NONE)) {
178         notifyInitDone(OK);
179     }
180 
181     return OK;
182 }
183 
queueBuffer(const sp<ABuffer> & buffer,uint8_t packetType,PacketizationMode mode)184 status_t RTPSender::queueBuffer(
185         const sp<ABuffer> &buffer, uint8_t packetType, PacketizationMode mode) {
186     status_t err;
187 
188     switch (mode) {
189         case PACKETIZATION_NONE:
190             err = queueRawPacket(buffer, packetType);
191             break;
192 
193         case PACKETIZATION_TRANSPORT_STREAM:
194             err = queueTSPackets(buffer, packetType);
195             break;
196 
197         case PACKETIZATION_H264:
198             err  = queueAVCBuffer(buffer, packetType);
199             break;
200 
201         default:
202             TRESPASS();
203     }
204 
205     return err;
206 }
207 
queueRawPacket(const sp<ABuffer> & packet,uint8_t packetType)208 status_t RTPSender::queueRawPacket(
209         const sp<ABuffer> &packet, uint8_t packetType) {
210     CHECK_LE(packet->size(), kMaxUDPPacketSize - 12);
211 
212     int64_t timeUs;
213     CHECK(packet->meta()->findInt64("timeUs", &timeUs));
214 
215     sp<ABuffer> udpPacket = new ABuffer(12 + packet->size());
216 
217     udpPacket->setInt32Data(mRTPSeqNo);
218 
219     uint8_t *rtp = udpPacket->data();
220     rtp[0] = 0x80;
221     rtp[1] = packetType;
222 
223     rtp[2] = (mRTPSeqNo >> 8) & 0xff;
224     rtp[3] = mRTPSeqNo & 0xff;
225     ++mRTPSeqNo;
226 
227     uint32_t rtpTime = (timeUs * 9) / 100ll;
228 
229     rtp[4] = rtpTime >> 24;
230     rtp[5] = (rtpTime >> 16) & 0xff;
231     rtp[6] = (rtpTime >> 8) & 0xff;
232     rtp[7] = rtpTime & 0xff;
233 
234     rtp[8] = kSourceID >> 24;
235     rtp[9] = (kSourceID >> 16) & 0xff;
236     rtp[10] = (kSourceID >> 8) & 0xff;
237     rtp[11] = kSourceID & 0xff;
238 
239     memcpy(&rtp[12], packet->data(), packet->size());
240 
241     return sendRTPPacket(
242             udpPacket,
243             true /* storeInHistory */,
244             true /* timeValid */,
245             ALooper::GetNowUs());
246 }
247 
queueTSPackets(const sp<ABuffer> & tsPackets,uint8_t packetType)248 status_t RTPSender::queueTSPackets(
249         const sp<ABuffer> &tsPackets, uint8_t packetType) {
250     CHECK_EQ(0, tsPackets->size() % 188);
251 
252     int64_t timeUs;
253     CHECK(tsPackets->meta()->findInt64("timeUs", &timeUs));
254 
255     size_t srcOffset = 0;
256     while (srcOffset < tsPackets->size()) {
257         sp<ABuffer> udpPacket =
258             new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188);
259 
260         udpPacket->setInt32Data(mRTPSeqNo);
261 
262         uint8_t *rtp = udpPacket->data();
263         rtp[0] = 0x80;
264         rtp[1] = packetType;
265 
266         rtp[2] = (mRTPSeqNo >> 8) & 0xff;
267         rtp[3] = mRTPSeqNo & 0xff;
268         ++mRTPSeqNo;
269 
270         int64_t nowUs = ALooper::GetNowUs();
271         uint32_t rtpTime = (nowUs * 9) / 100ll;
272 
273         rtp[4] = rtpTime >> 24;
274         rtp[5] = (rtpTime >> 16) & 0xff;
275         rtp[6] = (rtpTime >> 8) & 0xff;
276         rtp[7] = rtpTime & 0xff;
277 
278         rtp[8] = kSourceID >> 24;
279         rtp[9] = (kSourceID >> 16) & 0xff;
280         rtp[10] = (kSourceID >> 8) & 0xff;
281         rtp[11] = kSourceID & 0xff;
282 
283         size_t numTSPackets = (tsPackets->size() - srcOffset) / 188;
284         if (numTSPackets > kMaxNumTSPacketsPerRTPPacket) {
285             numTSPackets = kMaxNumTSPacketsPerRTPPacket;
286         }
287 
288         memcpy(&rtp[12], tsPackets->data() + srcOffset, numTSPackets * 188);
289 
290         udpPacket->setRange(0, 12 + numTSPackets * 188);
291 
292         srcOffset += numTSPackets * 188;
293         bool isLastPacket = (srcOffset == tsPackets->size());
294 
295         status_t err = sendRTPPacket(
296                 udpPacket,
297                 true /* storeInHistory */,
298                 isLastPacket /* timeValid */,
299                 timeUs);
300 
301         if (err != OK) {
302             return err;
303         }
304     }
305 
306     return OK;
307 }
308 
queueAVCBuffer(const sp<ABuffer> & accessUnit,uint8_t packetType)309 status_t RTPSender::queueAVCBuffer(
310         const sp<ABuffer> &accessUnit, uint8_t packetType) {
311     int64_t timeUs;
312     CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
313 
314     uint32_t rtpTime = (timeUs * 9 / 100ll);
315 
316     List<sp<ABuffer> > packets;
317 
318     sp<ABuffer> out = new ABuffer(kMaxUDPPacketSize);
319     size_t outBytesUsed = 12;  // Placeholder for RTP header.
320 
321     const uint8_t *data = accessUnit->data();
322     size_t size = accessUnit->size();
323     const uint8_t *nalStart;
324     size_t nalSize;
325     while (getNextNALUnit(
326                 &data, &size, &nalStart, &nalSize,
327                 true /* startCodeFollows */) == OK) {
328         size_t bytesNeeded = nalSize + 2;
329         if (outBytesUsed == 12) {
330             ++bytesNeeded;
331         }
332 
333         if (outBytesUsed + bytesNeeded > out->capacity()) {
334             bool emitSingleNALPacket = false;
335 
336             if (outBytesUsed == 12
337                     && outBytesUsed + nalSize <= out->capacity()) {
338                 // We haven't emitted anything into the current packet yet and
339                 // this NAL unit fits into a single-NAL-unit-packet while
340                 // it wouldn't have fit as part of a STAP-A packet.
341 
342                 memcpy(out->data() + outBytesUsed, nalStart, nalSize);
343                 outBytesUsed += nalSize;
344 
345                 emitSingleNALPacket = true;
346             }
347 
348             if (outBytesUsed > 12) {
349                 out->setRange(0, outBytesUsed);
350                 packets.push_back(out);
351                 out = new ABuffer(kMaxUDPPacketSize);
352                 outBytesUsed = 12;  // Placeholder for RTP header
353             }
354 
355             if (emitSingleNALPacket) {
356                 continue;
357             }
358         }
359 
360         if (outBytesUsed + bytesNeeded <= out->capacity()) {
361             uint8_t *dst = out->data() + outBytesUsed;
362 
363             if (outBytesUsed == 12) {
364                 *dst++ = 24;  // STAP-A header
365             }
366 
367             *dst++ = (nalSize >> 8) & 0xff;
368             *dst++ = nalSize & 0xff;
369             memcpy(dst, nalStart, nalSize);
370 
371             outBytesUsed += bytesNeeded;
372             continue;
373         }
374 
375         // This single NAL unit does not fit into a single RTP packet,
376         // we need to emit an FU-A.
377 
378         CHECK_EQ(outBytesUsed, 12u);
379 
380         uint8_t nalType = nalStart[0] & 0x1f;
381         uint8_t nri = (nalStart[0] >> 5) & 3;
382 
383         size_t srcOffset = 1;
384         while (srcOffset < nalSize) {
385             size_t copy = out->capacity() - outBytesUsed - 2;
386             if (copy > nalSize - srcOffset) {
387                 copy = nalSize - srcOffset;
388             }
389 
390             uint8_t *dst = out->data() + outBytesUsed;
391             dst[0] = (nri << 5) | 28;
392 
393             dst[1] = nalType;
394 
395             if (srcOffset == 1) {
396                 dst[1] |= 0x80;
397             }
398 
399             if (srcOffset + copy == nalSize) {
400                 dst[1] |= 0x40;
401             }
402 
403             memcpy(&dst[2], nalStart + srcOffset, copy);
404             srcOffset += copy;
405 
406             out->setRange(0, outBytesUsed + copy + 2);
407 
408             packets.push_back(out);
409             out = new ABuffer(kMaxUDPPacketSize);
410             outBytesUsed = 12;  // Placeholder for RTP header
411         }
412     }
413 
414     if (outBytesUsed > 12) {
415         out->setRange(0, outBytesUsed);
416         packets.push_back(out);
417     }
418 
419     while (!packets.empty()) {
420         sp<ABuffer> out = *packets.begin();
421         packets.erase(packets.begin());
422 
423         out->setInt32Data(mRTPSeqNo);
424 
425         bool last = packets.empty();
426 
427         uint8_t *dst = out->data();
428 
429         dst[0] = 0x80;
430 
431         dst[1] = packetType;
432         if (last) {
433             dst[1] |= 1 << 7;  // M-bit
434         }
435 
436         dst[2] = (mRTPSeqNo >> 8) & 0xff;
437         dst[3] = mRTPSeqNo & 0xff;
438         ++mRTPSeqNo;
439 
440         dst[4] = rtpTime >> 24;
441         dst[5] = (rtpTime >> 16) & 0xff;
442         dst[6] = (rtpTime >> 8) & 0xff;
443         dst[7] = rtpTime & 0xff;
444         dst[8] = kSourceID >> 24;
445         dst[9] = (kSourceID >> 16) & 0xff;
446         dst[10] = (kSourceID >> 8) & 0xff;
447         dst[11] = kSourceID & 0xff;
448 
449         status_t err = sendRTPPacket(out, true /* storeInHistory */);
450 
451         if (err != OK) {
452             return err;
453         }
454     }
455 
456     return OK;
457 }
458 
sendRTPPacket(const sp<ABuffer> & buffer,bool storeInHistory,bool timeValid,int64_t timeUs)459 status_t RTPSender::sendRTPPacket(
460         const sp<ABuffer> &buffer, bool storeInHistory,
461         bool timeValid, int64_t timeUs) {
462     CHECK(mRTPConnected);
463 
464     status_t err = mNetSession->sendRequest(
465             mRTPSessionID, buffer->data(), buffer->size(),
466             timeValid, timeUs);
467 
468     if (err != OK) {
469         return err;
470     }
471 
472     mLastNTPTime = GetNowNTP();
473     mLastRTPTime = U32_AT(buffer->data() + 4);
474 
475     ++mNumRTPSent;
476     mNumRTPOctetsSent += buffer->size() - 12;
477 
478     if (storeInHistory) {
479         if (mHistorySize == kMaxHistorySize) {
480             mHistory.erase(mHistory.begin());
481         } else {
482             ++mHistorySize;
483         }
484         mHistory.push_back(buffer);
485     }
486 
487     return OK;
488 }
489 
490 // static
GetNowNTP()491 uint64_t RTPSender::GetNowNTP() {
492     struct timeval tv;
493     gettimeofday(&tv, NULL /* timezone */);
494 
495     uint64_t nowUs = tv.tv_sec * 1000000ll + tv.tv_usec;
496 
497     nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll;
498 
499     uint64_t hi = nowUs / 1000000ll;
500     uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll;
501 
502     return (hi << 32) | lo;
503 }
504 
onMessageReceived(const sp<AMessage> & msg)505 void RTPSender::onMessageReceived(const sp<AMessage> &msg) {
506     switch (msg->what()) {
507         case kWhatRTPNotify:
508         case kWhatRTCPNotify:
509             onNetNotify(msg->what() == kWhatRTPNotify, msg);
510             break;
511 
512         default:
513             TRESPASS();
514     }
515 }
516 
onNetNotify(bool isRTP,const sp<AMessage> & msg)517 void RTPSender::onNetNotify(bool isRTP, const sp<AMessage> &msg) {
518     int32_t reason;
519     CHECK(msg->findInt32("reason", &reason));
520 
521     switch (reason) {
522         case ANetworkSession::kWhatError:
523         {
524             int32_t sessionID;
525             CHECK(msg->findInt32("sessionID", &sessionID));
526 
527             int32_t err;
528             CHECK(msg->findInt32("err", &err));
529 
530             int32_t errorOccuredDuringSend;
531             CHECK(msg->findInt32("send", &errorOccuredDuringSend));
532 
533             AString detail;
534             CHECK(msg->findString("detail", &detail));
535 
536             ALOGE("An error occurred during %s in session %d "
537                   "(%d, '%s' (%s)).",
538                   errorOccuredDuringSend ? "send" : "receive",
539                   sessionID,
540                   err,
541                   detail.c_str(),
542                   strerror(-err));
543 
544             mNetSession->destroySession(sessionID);
545 
546             if (sessionID == mRTPSessionID) {
547                 mRTPSessionID = 0;
548             } else if (sessionID == mRTCPSessionID) {
549                 mRTCPSessionID = 0;
550             }
551 
552             if (!mRTPConnected
553                     || (mRTPMode != TRANSPORT_NONE && !mRTCPConnected)) {
554                 // We haven't completed initialization, attach the error
555                 // to the notification instead.
556                 notifyInitDone(err);
557                 break;
558             }
559 
560             notifyError(err);
561             break;
562         }
563 
564         case ANetworkSession::kWhatDatagram:
565         {
566             sp<ABuffer> data;
567             CHECK(msg->findBuffer("data", &data));
568 
569             if (isRTP) {
570                 ALOGW("Huh? Received data on RTP connection...");
571             } else {
572                 onRTCPData(data);
573             }
574             break;
575         }
576 
577         case ANetworkSession::kWhatConnected:
578         {
579             int32_t sessionID;
580             CHECK(msg->findInt32("sessionID", &sessionID));
581 
582             if  (isRTP) {
583                 CHECK_EQ(mRTPMode, TRANSPORT_TCP);
584                 CHECK_EQ(sessionID, mRTPSessionID);
585                 mRTPConnected = true;
586             } else {
587                 CHECK_EQ(mRTCPMode, TRANSPORT_TCP);
588                 CHECK_EQ(sessionID, mRTCPSessionID);
589                 mRTCPConnected = true;
590             }
591 
592             if (mRTPConnected
593                     && (mRTCPMode == TRANSPORT_NONE || mRTCPConnected)) {
594                 notifyInitDone(OK);
595             }
596             break;
597         }
598 
599         case ANetworkSession::kWhatNetworkStall:
600         {
601             size_t numBytesQueued;
602             CHECK(msg->findSize("numBytesQueued", &numBytesQueued));
603 
604             notifyNetworkStall(numBytesQueued);
605             break;
606         }
607 
608         default:
609             TRESPASS();
610     }
611 }
612 
onRTCPData(const sp<ABuffer> & buffer)613 status_t RTPSender::onRTCPData(const sp<ABuffer> &buffer) {
614     const uint8_t *data = buffer->data();
615     size_t size = buffer->size();
616 
617     while (size > 0) {
618         if (size < 8) {
619             // Too short to be a valid RTCP header
620             return ERROR_MALFORMED;
621         }
622 
623         if ((data[0] >> 6) != 2) {
624             // Unsupported version.
625             return ERROR_UNSUPPORTED;
626         }
627 
628         if (data[0] & 0x20) {
629             // Padding present.
630 
631             size_t paddingLength = data[size - 1];
632 
633             if (paddingLength + 12 > size) {
634                 // If we removed this much padding we'd end up with something
635                 // that's too short to be a valid RTP header.
636                 return ERROR_MALFORMED;
637             }
638 
639             size -= paddingLength;
640         }
641 
642         size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
643 
644         if (size < headerLength) {
645             // Only received a partial packet?
646             return ERROR_MALFORMED;
647         }
648 
649         switch (data[1]) {
650             case 200:
651             case 201:  // RR
652                 parseReceiverReport(data, headerLength);
653                 break;
654 
655             case 202:  // SDES
656             case 203:
657                 break;
658 
659             case 204:  // APP
660                 parseAPP(data, headerLength);
661                 break;
662 
663             case 205:  // TSFB (transport layer specific feedback)
664                 parseTSFB(data, headerLength);
665                 break;
666 
667             case 206:  // PSFB (payload specific feedback)
668                 // hexdump(data, headerLength);
669                 break;
670 
671             default:
672             {
673                 ALOGW("Unknown RTCP packet type %u of size %zu",
674                         (unsigned)data[1], headerLength);
675                 break;
676             }
677         }
678 
679         data += headerLength;
680         size -= headerLength;
681     }
682 
683     return OK;
684 }
685 
parseReceiverReport(const uint8_t * data,size_t)686 status_t RTPSender::parseReceiverReport(
687         const uint8_t *data, size_t /* size */) {
688     float fractionLost = data[12] / 256.0f;
689 
690     ALOGI("lost %.2f %% of packets during report interval.",
691           100.0f * fractionLost);
692 
693     return OK;
694 }
695 
parseTSFB(const uint8_t * data,size_t size)696 status_t RTPSender::parseTSFB(const uint8_t *data, size_t size) {
697     if ((data[0] & 0x1f) != 1) {
698         return ERROR_UNSUPPORTED;  // We only support NACK for now.
699     }
700 
701     uint32_t srcId = U32_AT(&data[8]);
702     if (srcId != kSourceID) {
703         return ERROR_MALFORMED;
704     }
705 
706     for (size_t i = 12; i < size; i += 4) {
707         uint16_t seqNo = U16_AT(&data[i]);
708         uint16_t blp = U16_AT(&data[i + 2]);
709 
710         List<sp<ABuffer> >::iterator it = mHistory.begin();
711         bool foundSeqNo = false;
712         while (it != mHistory.end()) {
713             const sp<ABuffer> &buffer = *it;
714 
715             uint16_t bufferSeqNo = buffer->int32Data() & 0xffff;
716 
717             bool retransmit = false;
718             if (bufferSeqNo == seqNo) {
719                 retransmit = true;
720             } else if (blp != 0) {
721                 for (size_t i = 0; i < 16; ++i) {
722                     if ((blp & (1 << i))
723                         && (bufferSeqNo == ((seqNo + i + 1) & 0xffff))) {
724                         blp &= ~(1 << i);
725                         retransmit = true;
726                     }
727                 }
728             }
729 
730             if (retransmit) {
731                 ALOGV("retransmitting seqNo %d", bufferSeqNo);
732 
733                 CHECK_EQ((status_t)OK,
734                          sendRTPPacket(buffer, false /* storeInHistory */));
735 
736                 if (bufferSeqNo == seqNo) {
737                     foundSeqNo = true;
738                 }
739 
740                 if (foundSeqNo && blp == 0) {
741                     break;
742                 }
743             }
744 
745             ++it;
746         }
747 
748         if (!foundSeqNo || blp != 0) {
749             ALOGI("Some sequence numbers were no longer available for "
750                   "retransmission (seqNo = %d, foundSeqNo = %d, blp = 0x%04x)",
751                   seqNo, foundSeqNo, blp);
752 
753             if (!mHistory.empty()) {
754                 int32_t earliest = (*mHistory.begin())->int32Data() & 0xffff;
755                 int32_t latest = (*--mHistory.end())->int32Data() & 0xffff;
756 
757                 ALOGI("have seq numbers from %d - %d", earliest, latest);
758             }
759         }
760     }
761 
762     return OK;
763 }
764 
parseAPP(const uint8_t * data,size_t size)765 status_t RTPSender::parseAPP(const uint8_t *data, size_t size) {
766     static const size_t late_offset = 8;
767     static const char late_string[] = "late";
768     static const size_t avgLatencyUs_offset = late_offset + sizeof(late_string) - 1;
769     static const size_t maxLatencyUs_offset = avgLatencyUs_offset + sizeof(int64_t);
770 
771     if ((size >= (maxLatencyUs_offset + sizeof(int64_t)))
772             && !memcmp(late_string, &data[late_offset], sizeof(late_string) - 1)) {
773         int64_t avgLatencyUs = (int64_t)U64_AT(&data[avgLatencyUs_offset]);
774         int64_t maxLatencyUs = (int64_t)U64_AT(&data[maxLatencyUs_offset]);
775 
776         sp<AMessage> notify = mNotify->dup();
777         notify->setInt32("what", kWhatInformSender);
778         notify->setInt64("avgLatencyUs", avgLatencyUs);
779         notify->setInt64("maxLatencyUs", maxLatencyUs);
780         notify->post();
781     }
782 
783     return OK;
784 }
785 
notifyInitDone(status_t err)786 void RTPSender::notifyInitDone(status_t err) {
787     sp<AMessage> notify = mNotify->dup();
788     notify->setInt32("what", kWhatInitDone);
789     notify->setInt32("err", err);
790     notify->post();
791 }
792 
notifyError(status_t err)793 void RTPSender::notifyError(status_t err) {
794     sp<AMessage> notify = mNotify->dup();
795     notify->setInt32("what", kWhatError);
796     notify->setInt32("err", err);
797     notify->post();
798 }
799 
notifyNetworkStall(size_t numBytesQueued)800 void RTPSender::notifyNetworkStall(size_t numBytesQueued) {
801     sp<AMessage> notify = mNotify->dup();
802     notify->setInt32("what", kWhatNetworkStall);
803     notify->setSize("numBytesQueued", numBytesQueued);
804     notify->post();
805 }
806 
807 }  // namespace android
808 
809