1 /*
2 * Copyright 2013, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "RTPSender"
19 #include <utils/Log.h>
20
21 #include "RTPSender.h"
22
23 #include <media/stagefright/foundation/ABuffer.h>
24 #include <media/stagefright/foundation/ADebug.h>
25 #include <media/stagefright/foundation/AMessage.h>
26 #include <media/stagefright/foundation/ANetworkSession.h>
27 #include <media/stagefright/foundation/hexdump.h>
28 #include <media/stagefright/MediaErrors.h>
29 #include <media/stagefright/Utils.h>
30
31 #include "include/avc_utils.h"
32
33 namespace android {
34
RTPSender(const sp<ANetworkSession> & netSession,const sp<AMessage> & notify)35 RTPSender::RTPSender(
36 const sp<ANetworkSession> &netSession,
37 const sp<AMessage> ¬ify)
38 : mNetSession(netSession),
39 mNotify(notify),
40 mRTPMode(TRANSPORT_UNDEFINED),
41 mRTCPMode(TRANSPORT_UNDEFINED),
42 mRTPSessionID(0),
43 mRTCPSessionID(0),
44 mRTPConnected(false),
45 mRTCPConnected(false),
46 mLastNTPTime(0),
47 mLastRTPTime(0),
48 mNumRTPSent(0),
49 mNumRTPOctetsSent(0),
50 mNumSRsSent(0),
51 mRTPSeqNo(0),
52 mHistorySize(0) {
53 }
54
~RTPSender()55 RTPSender::~RTPSender() {
56 if (mRTCPSessionID != 0) {
57 mNetSession->destroySession(mRTCPSessionID);
58 mRTCPSessionID = 0;
59 }
60
61 if (mRTPSessionID != 0) {
62 mNetSession->destroySession(mRTPSessionID);
63 mRTPSessionID = 0;
64 }
65 }
66
67 // static
PickRandomRTPPort()68 int32_t RTPBase::PickRandomRTPPort() {
69 // Pick an even integer in range [1024, 65534)
70
71 static const size_t kRange = (65534 - 1024) / 2;
72
73 return (int32_t)(((float)(kRange + 1) * rand()) / RAND_MAX) * 2 + 1024;
74 }
75
initAsync(const char * remoteHost,int32_t remoteRTPPort,TransportMode rtpMode,int32_t remoteRTCPPort,TransportMode rtcpMode,int32_t * outLocalRTPPort)76 status_t RTPSender::initAsync(
77 const char *remoteHost,
78 int32_t remoteRTPPort,
79 TransportMode rtpMode,
80 int32_t remoteRTCPPort,
81 TransportMode rtcpMode,
82 int32_t *outLocalRTPPort) {
83 if (mRTPMode != TRANSPORT_UNDEFINED
84 || rtpMode == TRANSPORT_UNDEFINED
85 || rtpMode == TRANSPORT_NONE
86 || rtcpMode == TRANSPORT_UNDEFINED) {
87 return INVALID_OPERATION;
88 }
89
90 CHECK_NE(rtpMode, TRANSPORT_TCP_INTERLEAVED);
91 CHECK_NE(rtcpMode, TRANSPORT_TCP_INTERLEAVED);
92
93 if ((rtcpMode == TRANSPORT_NONE && remoteRTCPPort >= 0)
94 || (rtcpMode != TRANSPORT_NONE && remoteRTCPPort < 0)) {
95 return INVALID_OPERATION;
96 }
97
98 sp<AMessage> rtpNotify = new AMessage(kWhatRTPNotify, this);
99
100 sp<AMessage> rtcpNotify;
101 if (remoteRTCPPort >= 0) {
102 rtcpNotify = new AMessage(kWhatRTCPNotify, this);
103 }
104
105 CHECK_EQ(mRTPSessionID, 0);
106 CHECK_EQ(mRTCPSessionID, 0);
107
108 int32_t localRTPPort;
109
110 for (;;) {
111 localRTPPort = PickRandomRTPPort();
112
113 status_t err;
114 if (rtpMode == TRANSPORT_UDP) {
115 err = mNetSession->createUDPSession(
116 localRTPPort,
117 remoteHost,
118 remoteRTPPort,
119 rtpNotify,
120 &mRTPSessionID);
121 } else {
122 CHECK_EQ(rtpMode, TRANSPORT_TCP);
123 err = mNetSession->createTCPDatagramSession(
124 localRTPPort,
125 remoteHost,
126 remoteRTPPort,
127 rtpNotify,
128 &mRTPSessionID);
129 }
130
131 if (err != OK) {
132 continue;
133 }
134
135 if (remoteRTCPPort < 0) {
136 break;
137 }
138
139 if (rtcpMode == TRANSPORT_UDP) {
140 err = mNetSession->createUDPSession(
141 localRTPPort + 1,
142 remoteHost,
143 remoteRTCPPort,
144 rtcpNotify,
145 &mRTCPSessionID);
146 } else {
147 CHECK_EQ(rtcpMode, TRANSPORT_TCP);
148 err = mNetSession->createTCPDatagramSession(
149 localRTPPort + 1,
150 remoteHost,
151 remoteRTCPPort,
152 rtcpNotify,
153 &mRTCPSessionID);
154 }
155
156 if (err == OK) {
157 break;
158 }
159
160 mNetSession->destroySession(mRTPSessionID);
161 mRTPSessionID = 0;
162 }
163
164 if (rtpMode == TRANSPORT_UDP) {
165 mRTPConnected = true;
166 }
167
168 if (rtcpMode == TRANSPORT_UDP) {
169 mRTCPConnected = true;
170 }
171
172 mRTPMode = rtpMode;
173 mRTCPMode = rtcpMode;
174 *outLocalRTPPort = localRTPPort;
175
176 if (mRTPMode == TRANSPORT_UDP
177 && (mRTCPMode == TRANSPORT_UDP || mRTCPMode == TRANSPORT_NONE)) {
178 notifyInitDone(OK);
179 }
180
181 return OK;
182 }
183
queueBuffer(const sp<ABuffer> & buffer,uint8_t packetType,PacketizationMode mode)184 status_t RTPSender::queueBuffer(
185 const sp<ABuffer> &buffer, uint8_t packetType, PacketizationMode mode) {
186 status_t err;
187
188 switch (mode) {
189 case PACKETIZATION_NONE:
190 err = queueRawPacket(buffer, packetType);
191 break;
192
193 case PACKETIZATION_TRANSPORT_STREAM:
194 err = queueTSPackets(buffer, packetType);
195 break;
196
197 case PACKETIZATION_H264:
198 err = queueAVCBuffer(buffer, packetType);
199 break;
200
201 default:
202 TRESPASS();
203 }
204
205 return err;
206 }
207
queueRawPacket(const sp<ABuffer> & packet,uint8_t packetType)208 status_t RTPSender::queueRawPacket(
209 const sp<ABuffer> &packet, uint8_t packetType) {
210 CHECK_LE(packet->size(), kMaxUDPPacketSize - 12);
211
212 int64_t timeUs;
213 CHECK(packet->meta()->findInt64("timeUs", &timeUs));
214
215 sp<ABuffer> udpPacket = new ABuffer(12 + packet->size());
216
217 udpPacket->setInt32Data(mRTPSeqNo);
218
219 uint8_t *rtp = udpPacket->data();
220 rtp[0] = 0x80;
221 rtp[1] = packetType;
222
223 rtp[2] = (mRTPSeqNo >> 8) & 0xff;
224 rtp[3] = mRTPSeqNo & 0xff;
225 ++mRTPSeqNo;
226
227 uint32_t rtpTime = (timeUs * 9) / 100ll;
228
229 rtp[4] = rtpTime >> 24;
230 rtp[5] = (rtpTime >> 16) & 0xff;
231 rtp[6] = (rtpTime >> 8) & 0xff;
232 rtp[7] = rtpTime & 0xff;
233
234 rtp[8] = kSourceID >> 24;
235 rtp[9] = (kSourceID >> 16) & 0xff;
236 rtp[10] = (kSourceID >> 8) & 0xff;
237 rtp[11] = kSourceID & 0xff;
238
239 memcpy(&rtp[12], packet->data(), packet->size());
240
241 return sendRTPPacket(
242 udpPacket,
243 true /* storeInHistory */,
244 true /* timeValid */,
245 ALooper::GetNowUs());
246 }
247
queueTSPackets(const sp<ABuffer> & tsPackets,uint8_t packetType)248 status_t RTPSender::queueTSPackets(
249 const sp<ABuffer> &tsPackets, uint8_t packetType) {
250 CHECK_EQ(0, tsPackets->size() % 188);
251
252 int64_t timeUs;
253 CHECK(tsPackets->meta()->findInt64("timeUs", &timeUs));
254
255 size_t srcOffset = 0;
256 while (srcOffset < tsPackets->size()) {
257 sp<ABuffer> udpPacket =
258 new ABuffer(12 + kMaxNumTSPacketsPerRTPPacket * 188);
259
260 udpPacket->setInt32Data(mRTPSeqNo);
261
262 uint8_t *rtp = udpPacket->data();
263 rtp[0] = 0x80;
264 rtp[1] = packetType;
265
266 rtp[2] = (mRTPSeqNo >> 8) & 0xff;
267 rtp[3] = mRTPSeqNo & 0xff;
268 ++mRTPSeqNo;
269
270 int64_t nowUs = ALooper::GetNowUs();
271 uint32_t rtpTime = (nowUs * 9) / 100ll;
272
273 rtp[4] = rtpTime >> 24;
274 rtp[5] = (rtpTime >> 16) & 0xff;
275 rtp[6] = (rtpTime >> 8) & 0xff;
276 rtp[7] = rtpTime & 0xff;
277
278 rtp[8] = kSourceID >> 24;
279 rtp[9] = (kSourceID >> 16) & 0xff;
280 rtp[10] = (kSourceID >> 8) & 0xff;
281 rtp[11] = kSourceID & 0xff;
282
283 size_t numTSPackets = (tsPackets->size() - srcOffset) / 188;
284 if (numTSPackets > kMaxNumTSPacketsPerRTPPacket) {
285 numTSPackets = kMaxNumTSPacketsPerRTPPacket;
286 }
287
288 memcpy(&rtp[12], tsPackets->data() + srcOffset, numTSPackets * 188);
289
290 udpPacket->setRange(0, 12 + numTSPackets * 188);
291
292 srcOffset += numTSPackets * 188;
293 bool isLastPacket = (srcOffset == tsPackets->size());
294
295 status_t err = sendRTPPacket(
296 udpPacket,
297 true /* storeInHistory */,
298 isLastPacket /* timeValid */,
299 timeUs);
300
301 if (err != OK) {
302 return err;
303 }
304 }
305
306 return OK;
307 }
308
queueAVCBuffer(const sp<ABuffer> & accessUnit,uint8_t packetType)309 status_t RTPSender::queueAVCBuffer(
310 const sp<ABuffer> &accessUnit, uint8_t packetType) {
311 int64_t timeUs;
312 CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
313
314 uint32_t rtpTime = (timeUs * 9 / 100ll);
315
316 List<sp<ABuffer> > packets;
317
318 sp<ABuffer> out = new ABuffer(kMaxUDPPacketSize);
319 size_t outBytesUsed = 12; // Placeholder for RTP header.
320
321 const uint8_t *data = accessUnit->data();
322 size_t size = accessUnit->size();
323 const uint8_t *nalStart;
324 size_t nalSize;
325 while (getNextNALUnit(
326 &data, &size, &nalStart, &nalSize,
327 true /* startCodeFollows */) == OK) {
328 size_t bytesNeeded = nalSize + 2;
329 if (outBytesUsed == 12) {
330 ++bytesNeeded;
331 }
332
333 if (outBytesUsed + bytesNeeded > out->capacity()) {
334 bool emitSingleNALPacket = false;
335
336 if (outBytesUsed == 12
337 && outBytesUsed + nalSize <= out->capacity()) {
338 // We haven't emitted anything into the current packet yet and
339 // this NAL unit fits into a single-NAL-unit-packet while
340 // it wouldn't have fit as part of a STAP-A packet.
341
342 memcpy(out->data() + outBytesUsed, nalStart, nalSize);
343 outBytesUsed += nalSize;
344
345 emitSingleNALPacket = true;
346 }
347
348 if (outBytesUsed > 12) {
349 out->setRange(0, outBytesUsed);
350 packets.push_back(out);
351 out = new ABuffer(kMaxUDPPacketSize);
352 outBytesUsed = 12; // Placeholder for RTP header
353 }
354
355 if (emitSingleNALPacket) {
356 continue;
357 }
358 }
359
360 if (outBytesUsed + bytesNeeded <= out->capacity()) {
361 uint8_t *dst = out->data() + outBytesUsed;
362
363 if (outBytesUsed == 12) {
364 *dst++ = 24; // STAP-A header
365 }
366
367 *dst++ = (nalSize >> 8) & 0xff;
368 *dst++ = nalSize & 0xff;
369 memcpy(dst, nalStart, nalSize);
370
371 outBytesUsed += bytesNeeded;
372 continue;
373 }
374
375 // This single NAL unit does not fit into a single RTP packet,
376 // we need to emit an FU-A.
377
378 CHECK_EQ(outBytesUsed, 12u);
379
380 uint8_t nalType = nalStart[0] & 0x1f;
381 uint8_t nri = (nalStart[0] >> 5) & 3;
382
383 size_t srcOffset = 1;
384 while (srcOffset < nalSize) {
385 size_t copy = out->capacity() - outBytesUsed - 2;
386 if (copy > nalSize - srcOffset) {
387 copy = nalSize - srcOffset;
388 }
389
390 uint8_t *dst = out->data() + outBytesUsed;
391 dst[0] = (nri << 5) | 28;
392
393 dst[1] = nalType;
394
395 if (srcOffset == 1) {
396 dst[1] |= 0x80;
397 }
398
399 if (srcOffset + copy == nalSize) {
400 dst[1] |= 0x40;
401 }
402
403 memcpy(&dst[2], nalStart + srcOffset, copy);
404 srcOffset += copy;
405
406 out->setRange(0, outBytesUsed + copy + 2);
407
408 packets.push_back(out);
409 out = new ABuffer(kMaxUDPPacketSize);
410 outBytesUsed = 12; // Placeholder for RTP header
411 }
412 }
413
414 if (outBytesUsed > 12) {
415 out->setRange(0, outBytesUsed);
416 packets.push_back(out);
417 }
418
419 while (!packets.empty()) {
420 sp<ABuffer> out = *packets.begin();
421 packets.erase(packets.begin());
422
423 out->setInt32Data(mRTPSeqNo);
424
425 bool last = packets.empty();
426
427 uint8_t *dst = out->data();
428
429 dst[0] = 0x80;
430
431 dst[1] = packetType;
432 if (last) {
433 dst[1] |= 1 << 7; // M-bit
434 }
435
436 dst[2] = (mRTPSeqNo >> 8) & 0xff;
437 dst[3] = mRTPSeqNo & 0xff;
438 ++mRTPSeqNo;
439
440 dst[4] = rtpTime >> 24;
441 dst[5] = (rtpTime >> 16) & 0xff;
442 dst[6] = (rtpTime >> 8) & 0xff;
443 dst[7] = rtpTime & 0xff;
444 dst[8] = kSourceID >> 24;
445 dst[9] = (kSourceID >> 16) & 0xff;
446 dst[10] = (kSourceID >> 8) & 0xff;
447 dst[11] = kSourceID & 0xff;
448
449 status_t err = sendRTPPacket(out, true /* storeInHistory */);
450
451 if (err != OK) {
452 return err;
453 }
454 }
455
456 return OK;
457 }
458
sendRTPPacket(const sp<ABuffer> & buffer,bool storeInHistory,bool timeValid,int64_t timeUs)459 status_t RTPSender::sendRTPPacket(
460 const sp<ABuffer> &buffer, bool storeInHistory,
461 bool timeValid, int64_t timeUs) {
462 CHECK(mRTPConnected);
463
464 status_t err = mNetSession->sendRequest(
465 mRTPSessionID, buffer->data(), buffer->size(),
466 timeValid, timeUs);
467
468 if (err != OK) {
469 return err;
470 }
471
472 mLastNTPTime = GetNowNTP();
473 mLastRTPTime = U32_AT(buffer->data() + 4);
474
475 ++mNumRTPSent;
476 mNumRTPOctetsSent += buffer->size() - 12;
477
478 if (storeInHistory) {
479 if (mHistorySize == kMaxHistorySize) {
480 mHistory.erase(mHistory.begin());
481 } else {
482 ++mHistorySize;
483 }
484 mHistory.push_back(buffer);
485 }
486
487 return OK;
488 }
489
490 // static
GetNowNTP()491 uint64_t RTPSender::GetNowNTP() {
492 struct timeval tv;
493 gettimeofday(&tv, NULL /* timezone */);
494
495 uint64_t nowUs = tv.tv_sec * 1000000ll + tv.tv_usec;
496
497 nowUs += ((70ll * 365 + 17) * 24) * 60 * 60 * 1000000ll;
498
499 uint64_t hi = nowUs / 1000000ll;
500 uint64_t lo = ((1ll << 32) * (nowUs % 1000000ll)) / 1000000ll;
501
502 return (hi << 32) | lo;
503 }
504
onMessageReceived(const sp<AMessage> & msg)505 void RTPSender::onMessageReceived(const sp<AMessage> &msg) {
506 switch (msg->what()) {
507 case kWhatRTPNotify:
508 case kWhatRTCPNotify:
509 onNetNotify(msg->what() == kWhatRTPNotify, msg);
510 break;
511
512 default:
513 TRESPASS();
514 }
515 }
516
onNetNotify(bool isRTP,const sp<AMessage> & msg)517 void RTPSender::onNetNotify(bool isRTP, const sp<AMessage> &msg) {
518 int32_t reason;
519 CHECK(msg->findInt32("reason", &reason));
520
521 switch (reason) {
522 case ANetworkSession::kWhatError:
523 {
524 int32_t sessionID;
525 CHECK(msg->findInt32("sessionID", &sessionID));
526
527 int32_t err;
528 CHECK(msg->findInt32("err", &err));
529
530 int32_t errorOccuredDuringSend;
531 CHECK(msg->findInt32("send", &errorOccuredDuringSend));
532
533 AString detail;
534 CHECK(msg->findString("detail", &detail));
535
536 ALOGE("An error occurred during %s in session %d "
537 "(%d, '%s' (%s)).",
538 errorOccuredDuringSend ? "send" : "receive",
539 sessionID,
540 err,
541 detail.c_str(),
542 strerror(-err));
543
544 mNetSession->destroySession(sessionID);
545
546 if (sessionID == mRTPSessionID) {
547 mRTPSessionID = 0;
548 } else if (sessionID == mRTCPSessionID) {
549 mRTCPSessionID = 0;
550 }
551
552 if (!mRTPConnected
553 || (mRTPMode != TRANSPORT_NONE && !mRTCPConnected)) {
554 // We haven't completed initialization, attach the error
555 // to the notification instead.
556 notifyInitDone(err);
557 break;
558 }
559
560 notifyError(err);
561 break;
562 }
563
564 case ANetworkSession::kWhatDatagram:
565 {
566 sp<ABuffer> data;
567 CHECK(msg->findBuffer("data", &data));
568
569 if (isRTP) {
570 ALOGW("Huh? Received data on RTP connection...");
571 } else {
572 onRTCPData(data);
573 }
574 break;
575 }
576
577 case ANetworkSession::kWhatConnected:
578 {
579 int32_t sessionID;
580 CHECK(msg->findInt32("sessionID", &sessionID));
581
582 if (isRTP) {
583 CHECK_EQ(mRTPMode, TRANSPORT_TCP);
584 CHECK_EQ(sessionID, mRTPSessionID);
585 mRTPConnected = true;
586 } else {
587 CHECK_EQ(mRTCPMode, TRANSPORT_TCP);
588 CHECK_EQ(sessionID, mRTCPSessionID);
589 mRTCPConnected = true;
590 }
591
592 if (mRTPConnected
593 && (mRTCPMode == TRANSPORT_NONE || mRTCPConnected)) {
594 notifyInitDone(OK);
595 }
596 break;
597 }
598
599 case ANetworkSession::kWhatNetworkStall:
600 {
601 size_t numBytesQueued;
602 CHECK(msg->findSize("numBytesQueued", &numBytesQueued));
603
604 notifyNetworkStall(numBytesQueued);
605 break;
606 }
607
608 default:
609 TRESPASS();
610 }
611 }
612
onRTCPData(const sp<ABuffer> & buffer)613 status_t RTPSender::onRTCPData(const sp<ABuffer> &buffer) {
614 const uint8_t *data = buffer->data();
615 size_t size = buffer->size();
616
617 while (size > 0) {
618 if (size < 8) {
619 // Too short to be a valid RTCP header
620 return ERROR_MALFORMED;
621 }
622
623 if ((data[0] >> 6) != 2) {
624 // Unsupported version.
625 return ERROR_UNSUPPORTED;
626 }
627
628 if (data[0] & 0x20) {
629 // Padding present.
630
631 size_t paddingLength = data[size - 1];
632
633 if (paddingLength + 12 > size) {
634 // If we removed this much padding we'd end up with something
635 // that's too short to be a valid RTP header.
636 return ERROR_MALFORMED;
637 }
638
639 size -= paddingLength;
640 }
641
642 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
643
644 if (size < headerLength) {
645 // Only received a partial packet?
646 return ERROR_MALFORMED;
647 }
648
649 switch (data[1]) {
650 case 200:
651 case 201: // RR
652 parseReceiverReport(data, headerLength);
653 break;
654
655 case 202: // SDES
656 case 203:
657 break;
658
659 case 204: // APP
660 parseAPP(data, headerLength);
661 break;
662
663 case 205: // TSFB (transport layer specific feedback)
664 parseTSFB(data, headerLength);
665 break;
666
667 case 206: // PSFB (payload specific feedback)
668 // hexdump(data, headerLength);
669 break;
670
671 default:
672 {
673 ALOGW("Unknown RTCP packet type %u of size %zu",
674 (unsigned)data[1], headerLength);
675 break;
676 }
677 }
678
679 data += headerLength;
680 size -= headerLength;
681 }
682
683 return OK;
684 }
685
parseReceiverReport(const uint8_t * data,size_t)686 status_t RTPSender::parseReceiverReport(
687 const uint8_t *data, size_t /* size */) {
688 float fractionLost = data[12] / 256.0f;
689
690 ALOGI("lost %.2f %% of packets during report interval.",
691 100.0f * fractionLost);
692
693 return OK;
694 }
695
parseTSFB(const uint8_t * data,size_t size)696 status_t RTPSender::parseTSFB(const uint8_t *data, size_t size) {
697 if ((data[0] & 0x1f) != 1) {
698 return ERROR_UNSUPPORTED; // We only support NACK for now.
699 }
700
701 uint32_t srcId = U32_AT(&data[8]);
702 if (srcId != kSourceID) {
703 return ERROR_MALFORMED;
704 }
705
706 for (size_t i = 12; i < size; i += 4) {
707 uint16_t seqNo = U16_AT(&data[i]);
708 uint16_t blp = U16_AT(&data[i + 2]);
709
710 List<sp<ABuffer> >::iterator it = mHistory.begin();
711 bool foundSeqNo = false;
712 while (it != mHistory.end()) {
713 const sp<ABuffer> &buffer = *it;
714
715 uint16_t bufferSeqNo = buffer->int32Data() & 0xffff;
716
717 bool retransmit = false;
718 if (bufferSeqNo == seqNo) {
719 retransmit = true;
720 } else if (blp != 0) {
721 for (size_t i = 0; i < 16; ++i) {
722 if ((blp & (1 << i))
723 && (bufferSeqNo == ((seqNo + i + 1) & 0xffff))) {
724 blp &= ~(1 << i);
725 retransmit = true;
726 }
727 }
728 }
729
730 if (retransmit) {
731 ALOGV("retransmitting seqNo %d", bufferSeqNo);
732
733 CHECK_EQ((status_t)OK,
734 sendRTPPacket(buffer, false /* storeInHistory */));
735
736 if (bufferSeqNo == seqNo) {
737 foundSeqNo = true;
738 }
739
740 if (foundSeqNo && blp == 0) {
741 break;
742 }
743 }
744
745 ++it;
746 }
747
748 if (!foundSeqNo || blp != 0) {
749 ALOGI("Some sequence numbers were no longer available for "
750 "retransmission (seqNo = %d, foundSeqNo = %d, blp = 0x%04x)",
751 seqNo, foundSeqNo, blp);
752
753 if (!mHistory.empty()) {
754 int32_t earliest = (*mHistory.begin())->int32Data() & 0xffff;
755 int32_t latest = (*--mHistory.end())->int32Data() & 0xffff;
756
757 ALOGI("have seq numbers from %d - %d", earliest, latest);
758 }
759 }
760 }
761
762 return OK;
763 }
764
parseAPP(const uint8_t * data,size_t size)765 status_t RTPSender::parseAPP(const uint8_t *data, size_t size) {
766 static const size_t late_offset = 8;
767 static const char late_string[] = "late";
768 static const size_t avgLatencyUs_offset = late_offset + sizeof(late_string) - 1;
769 static const size_t maxLatencyUs_offset = avgLatencyUs_offset + sizeof(int64_t);
770
771 if ((size >= (maxLatencyUs_offset + sizeof(int64_t)))
772 && !memcmp(late_string, &data[late_offset], sizeof(late_string) - 1)) {
773 int64_t avgLatencyUs = (int64_t)U64_AT(&data[avgLatencyUs_offset]);
774 int64_t maxLatencyUs = (int64_t)U64_AT(&data[maxLatencyUs_offset]);
775
776 sp<AMessage> notify = mNotify->dup();
777 notify->setInt32("what", kWhatInformSender);
778 notify->setInt64("avgLatencyUs", avgLatencyUs);
779 notify->setInt64("maxLatencyUs", maxLatencyUs);
780 notify->post();
781 }
782
783 return OK;
784 }
785
notifyInitDone(status_t err)786 void RTPSender::notifyInitDone(status_t err) {
787 sp<AMessage> notify = mNotify->dup();
788 notify->setInt32("what", kWhatInitDone);
789 notify->setInt32("err", err);
790 notify->post();
791 }
792
notifyError(status_t err)793 void RTPSender::notifyError(status_t err) {
794 sp<AMessage> notify = mNotify->dup();
795 notify->setInt32("what", kWhatError);
796 notify->setInt32("err", err);
797 notify->post();
798 }
799
notifyNetworkStall(size_t numBytesQueued)800 void RTPSender::notifyNetworkStall(size_t numBytesQueued) {
801 sp<AMessage> notify = mNotify->dup();
802 notify->setInt32("what", kWhatNetworkStall);
803 notify->setSize("numBytesQueued", numBytesQueued);
804 notify->post();
805 }
806
807 } // namespace android
808
809