• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "rtp_queue.h"
17 #include <arpa/inet.h>
18 #include <iostream>
19 #include <limits>
20 #include <securec.h>
21 #include "common/common_macro.h"
22 #include "common/media_log.h"
23 
24 namespace OHOS {
25 namespace Sharing {
RtpPacketSortor(int32_t sampleRate,size_t kMax,size_t kMin)26 RtpPacketSortor::RtpPacketSortor(int32_t sampleRate, size_t kMax, size_t kMin)
27     : sampleRate_(sampleRate), kMin_(kMin), kMax_(kMax)
28 {}
29 
InputRtp(TrackType type,uint8_t * ptr,size_t len)30 void RtpPacketSortor::InputRtp(TrackType type, uint8_t *ptr, size_t len)
31 {
32     RETURN_IF_NULL(ptr);
33     if (len < RtpPacket::RTP_HEADER_SIZE) {
34         // hilog rtp packet is too small
35         return;
36     }
37     // todo jduge rtp size
38     if (!sampleRate_) {
39         return;
40     }
41     RtpHeader *header = (RtpHeader *)ptr;
42     if (header->version_ != RtpPacket::RTP_VERSION) {
43         // hilog error version error
44         return;
45     }
46     if (!header->GetPayloadSize(len)) {
47         return;
48     }
49 
50     auto rtp = std::make_shared<RtpPacket>();
51     RETURN_IF_NULL(rtp);
52     rtp->ReplaceData(reinterpret_cast<char*>(ptr), len);
53     rtp->sampleRate_ = (uint32_t)sampleRate_;
54     rtp->type_ = type;
55 
56     if ((size_t)rtp->Size() != len) {
57         return;
58     }
59     MEDIA_LOGD("rtp payload size: %{public}zu.", rtp->GetPayloadSize());
60 
61     if (ssrc_ != rtp->GetSSRC() || rtp->GetSeq() == 0) {
62         ssrc_ = rtp->GetSSRC();
63         Flush();
64         Clear();
65         nextSeqOut_ = rtp->GetSeq();
66     }
67 
68     SortPacket(rtp->GetSeq(), rtp);
69     return;
70 }
71 
SetOnSort(const OnSort & cb)72 void RtpPacketSortor::SetOnSort(const OnSort &cb)
73 {
74     onSort_ = std::move(cb);
75 }
76 
Clear()77 void RtpPacketSortor::Clear()
78 {
79     seqCycleCount_ = 0;
80     pktSortCacheMap_.clear();
81     nextSeqOut_ = 0;
82     maxSortSize_ = kMin_;
83 }
84 
GetJitterSize() const85 size_t RtpPacketSortor::GetJitterSize() const
86 {
87     return pktSortCacheMap_.size();
88 }
89 
GetCycleCount() const90 size_t RtpPacketSortor::GetCycleCount() const
91 {
92     return seqCycleCount_;
93 }
94 
SortPacket(uint16_t seq,RtpPacket::Ptr packet)95 void RtpPacketSortor::SortPacket(uint16_t seq, RtpPacket::Ptr packet)
96 {
97     RETURN_IF_NULL(packet);
98     if (seq < nextSeqOut_) {
99         if (nextSeqOut_ < (uint32_t)seq + kMax_) {
100             return;
101         }
102     } else if (nextSeqOut_ && seq - nextSeqOut_ > ((std::numeric_limits<uint16_t>::max)() >> 1)) {
103         MEDIA_LOGD("nextSeqOut_ && seq - nextSeqOut_ > ((std::numeric_limits<uint16_t>::max)() >> 1)");
104         return;
105     }
106 
107     pktSortCacheMap_.emplace(seq, std::move(packet));
108 
109     TryPopPacket();
110 }
111 
Flush()112 void RtpPacketSortor::Flush()
113 {
114     while (!pktSortCacheMap_.empty()) {
115         PopIterator(pktSortCacheMap_.begin());
116     }
117 }
118 
GetSSRC() const119 uint32_t RtpPacketSortor::GetSSRC() const
120 {
121     return ssrc_;
122 }
123 
PopPacket()124 void RtpPacketSortor::PopPacket()
125 {
126     if (pktSortCacheMap_.empty()) {
127         return;
128     }
129 
130     auto it = pktSortCacheMap_.begin();
131     if (it->first >= nextSeqOut_) {
132         PopIterator(it);
133         return;
134     }
135 
136     if (nextSeqOut_ - it->first > (0xFFFF >> 1)) {
137         if (pktSortCacheMap_.size() < 2 * kMin_) { // 2:fixed size
138             return;
139         }
140         ++seqCycleCount_;
141 
142         auto hit = pktSortCacheMap_.upper_bound((nextSeqOut_ - pktSortCacheMap_.size()));
143         while (hit != pktSortCacheMap_.end()) {
144             if (onSort_) {
145                 onSort_(hit->first, hit->second);
146             }
147             hit = pktSortCacheMap_.erase(hit);
148         }
149 
150         PopIterator(pktSortCacheMap_.begin());
151         return;
152     }
153 
154     pktSortCacheMap_.erase(it);
155 }
156 
PopIterator(std::map<uint16_t,RtpPacket::Ptr>::iterator it)157 void RtpPacketSortor::PopIterator(std::map<uint16_t, RtpPacket::Ptr>::iterator it)
158 {
159     auto seq = it->first;
160     auto data = std::move(it->second);
161     pktSortCacheMap_.erase(it);
162     nextSeqOut_ = seq + 1;
163     if (onSort_) {
164         onSort_(seq, data);
165     }
166 }
167 
TryPopPacket()168 void RtpPacketSortor::TryPopPacket()
169 {
170     int32_t count = 0;
171     while ((!pktSortCacheMap_.empty() && pktSortCacheMap_.begin()->first == nextSeqOut_)) {
172         PopPacket();
173         ++count;
174     }
175 
176     if (count) {
177         SetSortSize();
178     } else if (pktSortCacheMap_.size() > maxSortSize_) {
179         PopPacket();
180         SetSortSize();
181     }
182 }
183 
SetSortSize()184 void RtpPacketSortor::SetSortSize()
185 {
186     maxSortSize_ = kMin_ + pktSortCacheMap_.size();
187     if (maxSortSize_ > kMax_) {
188         maxSortSize_ = kMax_;
189     }
190 }
191 } // namespace Sharing
192 } // namespace OHOS