1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "BlockQueuePool"
17
18 #include "common/log.h"
19 #include "block_queue_pool.h"
20
21 namespace {
22 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_DEMUXER, "BlockQueuePool" };
23 }
24
25 namespace OHOS {
26 namespace Media {
27
~BlockQueuePool()28 BlockQueuePool::~BlockQueuePool()
29 {
30 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S, name_.c_str());
31 for (auto que : quePool_) {
32 FreeQueue(que.first);
33 }
34 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S, name_.c_str());
35 }
36
AddTrackQueue(uint32_t trackIndex)37 Status BlockQueuePool::AddTrackQueue(uint32_t trackIndex)
38 {
39 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
40 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
41 if (!HasQueue(trackIndex)) {
42 uint32_t queIndex = GetValidQueue();
43 queMap_[trackIndex] = std::vector<uint32_t>({ queIndex });
44 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", add track " PUBLIC_LOG_U32
45 ", get queue " PUBLIC_LOG_U32, name_.c_str(), trackIndex, queIndex);
46 sizeMap_[trackIndex] = 0;
47 } else {
48 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32 " is already in queue",
49 name_.c_str(), trackIndex);
50 }
51 return Status::OK;
52 }
53
RemoveTrackQueue(uint32_t trackIndex)54 Status BlockQueuePool::RemoveTrackQueue(uint32_t trackIndex)
55 {
56 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
57 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
58 if (!HasQueue(trackIndex)) {
59 MEDIA_LOG_D("Block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32 " not in queue", name_.c_str(), trackIndex);
60 } else {
61 for (auto queIndex : queMap_[trackIndex]) {
62 ResetQueue(queIndex);
63 }
64 queMap_[trackIndex].clear();
65 queMap_.erase(trackIndex);
66 sizeMap_.erase(trackIndex);
67 }
68 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S, name_.c_str());
69 return Status::OK;
70 }
71
GetCacheSize(uint32_t trackIndex)72 size_t BlockQueuePool::GetCacheSize(uint32_t trackIndex)
73 {
74 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
75 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
76 size_t size = 0;
77 FALSE_RETURN_V_NOLOG(HasQueue(trackIndex), size);
78 for (auto queIndex : queMap_[trackIndex]) {
79 if (quePool_[queIndex].blockQue == nullptr) {
80 MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
81 continue;
82 }
83 if (quePool_[queIndex].blockQue->Size() > 0) {
84 MEDIA_LOG_D("Block queue " PUBLIC_LOG_S " has cache", name_.c_str());
85 size += quePool_[queIndex].blockQue->Size();
86 }
87 }
88 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", size = " PUBLIC_LOG_ZU, name_.c_str(), size);
89 return size;
90 }
91
GetCacheDataSize(uint32_t trackIndex)92 uint32_t BlockQueuePool::GetCacheDataSize(uint32_t trackIndex)
93 {
94 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
95 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
96 uint32_t dataSize = 0;
97 FALSE_RETURN_V_NOLOG(HasQueue(trackIndex), dataSize);
98 for (auto queIndex : queMap_[trackIndex]) {
99 if (static_cast<uint64_t>(dataSize) + static_cast<uint64_t>(quePool_[queIndex].dataSize) > UINT32_MAX) {
100 MEDIA_LOG_D("DataSize(" PUBLIC_LOG_U64 ") is out of uint32",
101 static_cast<uint64_t>(dataSize) + static_cast<uint64_t>(quePool_[queIndex].dataSize));
102 return UINT32_MAX;
103 }
104 dataSize += quePool_[queIndex].dataSize;
105 }
106 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32 " cache size = " PUBLIC_LOG_U32,
107 name_.c_str(), trackIndex, dataSize);
108 return dataSize;
109 }
110
HasCache(uint32_t trackIndex)111 bool BlockQueuePool::HasCache(uint32_t trackIndex)
112 {
113 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
114 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
115 FALSE_RETURN_V_NOLOG(HasQueue(trackIndex), false);
116 for (auto queIndex : queMap_[trackIndex]) {
117 if (quePool_[queIndex].blockQue == nullptr) {
118 MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
119 continue;
120 }
121 if (quePool_[queIndex].blockQue->Size() > 0) {
122 MEDIA_LOG_D("Block queue " PUBLIC_LOG_S " have cache", name_.c_str());
123 return true;
124 }
125 }
126 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S " don't have cache", name_.c_str());
127 return false;
128 }
129
ResetQueue(uint32_t queueIndex)130 void BlockQueuePool::ResetQueue(uint32_t queueIndex)
131 {
132 if (quePool_.count(queueIndex) == 0) {
133 MEDIA_LOG_D("Error queueIndex");
134 return;
135 }
136 auto blockQue = quePool_[queueIndex].blockQue;
137 if (blockQue == nullptr) {
138 return;
139 }
140 blockQue->Clear();
141 quePool_[queueIndex].dataSize = 0;
142 quePool_[queueIndex].isValid = true;
143 quePool_[queueIndex].maxPts = INT64_MIN;
144 return;
145 }
146
ResetInfo(std::shared_ptr<SamplePacket> block)147 bool BlockQueuePool::ResetInfo(std::shared_ptr<SamplePacket> block)
148 {
149 FALSE_RETURN_V_MSG_E(block != nullptr, false, "Block is nullptr");
150 MEDIA_LOG_D("Reset for block " PUBLIC_LOG_U32, block->queueIndex);
151 uint32_t queIndex = block->queueIndex;
152 FALSE_RETURN_V_MSG_E(quePool_.count(queIndex) > 0, false, "Index is invalid");
153 for (auto pkt : block->pkts) {
154 if (pkt == nullptr) {
155 MEDIA_LOG_D("Pkt is nullptr, will find next");
156 continue;
157 }
158 uint32_t pktSize = static_cast<uint32_t>(pkt->size);
159 if (quePool_[queIndex].dataSize >= pktSize) {
160 quePool_[queIndex].dataSize -= pktSize;
161 } else {
162 quePool_[queIndex].dataSize = 0;
163 }
164 }
165 return true;
166 }
167
SetInfo(std::shared_ptr<SamplePacket> block)168 bool BlockQueuePool::SetInfo(std::shared_ptr<SamplePacket> block)
169 {
170 FALSE_RETURN_V_MSG_E(block != nullptr, false, "Block is nullptr");
171 MEDIA_LOG_D("Set for block " PUBLIC_LOG_U32, block->queueIndex);
172 uint32_t queIndex = block->queueIndex;
173 FALSE_RETURN_V_MSG_E(quePool_.count(queIndex) > 0, false, "Index is invalid");
174 for (auto pkt : block->pkts) {
175 if (pkt == nullptr) {
176 MEDIA_LOG_D("Pkt is nullptr, will find next");
177 continue;
178 }
179 uint32_t pktSize = static_cast<uint32_t>(pkt->size);
180 if (quePool_[queIndex].dataSize <= UINT32_MAX - pktSize) {
181 quePool_[queIndex].dataSize += pktSize;
182 } else {
183 quePool_[queIndex].dataSize = UINT32_MAX;
184 }
185 }
186 return true;
187 }
188
FreeQueue(uint32_t queueIndex)189 void BlockQueuePool::FreeQueue(uint32_t queueIndex)
190 {
191 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
192 if (quePool_.count(queueIndex) == 0) {
193 return;
194 }
195 ResetQueue(queueIndex);
196 quePool_[queueIndex].blockQue = nullptr;
197 }
198
Push(uint32_t trackIndex,std::shared_ptr<SamplePacket> block)199 bool BlockQueuePool::Push(uint32_t trackIndex, std::shared_ptr<SamplePacket> block)
200 {
201 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
202 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
203 if (!HasQueue(trackIndex)) {
204 Status ret = AddTrackQueue(trackIndex);
205 FALSE_RETURN_V_MSG_E(ret == Status::OK, false, "Add new queue error: " PUBLIC_LOG_D32, ret);
206 }
207 auto& queVector = queMap_[trackIndex];
208 uint32_t pushIndex;
209 if (queVector.size() > 0) {
210 pushIndex = queVector[queVector.size() - 1];
211 } else {
212 pushIndex = GetValidQueue();
213 queMap_[trackIndex].push_back(pushIndex);
214 MEDIA_LOG_D("Track has no queue, will request " PUBLIC_LOG_D32 " from pool", pushIndex);
215 }
216 if (InnerQueueIsFull(pushIndex)) {
217 pushIndex = GetValidQueue();
218 queMap_[trackIndex].push_back(pushIndex);
219 MEDIA_LOG_D("Track que is full, will request " PUBLIC_LOG_D32 " from pool", pushIndex);
220 }
221 if (quePool_[pushIndex].blockQue == nullptr) {
222 MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, failed to push data", pushIndex);
223 return false;
224 }
225 sizeMap_[trackIndex] += 1;
226 for (auto pkt : block->pkts) {
227 if (pkt == nullptr) {
228 continue;
229 }
230 quePool_[pushIndex].dataSize += static_cast<uint32_t>(pkt->size);
231 if (pkt->pts != AV_NOPTS_VALUE && pkt->pts > quePool_[pushIndex].maxPts) {
232 quePool_[pushIndex].maxPts = pkt->pts;
233 }
234 }
235 block->queueIndex = pushIndex;
236 return quePool_[pushIndex].blockQue->Push(block);
237 }
238
Pop(uint32_t trackIndex)239 std::shared_ptr<SamplePacket> BlockQueuePool::Pop(uint32_t trackIndex)
240 {
241 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
242 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
243 if (!HasQueue(trackIndex)) {
244 MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache queue", trackIndex);
245 return nullptr;
246 }
247 auto& queVector = queMap_[trackIndex];
248 for (auto it = queVector.begin(); it != queVector.end(); ++it) {
249 int32_t index = std::distance(queVector.begin(), it);
250 auto queIndex = queVector[index];
251 if (quePool_[queIndex].blockQue == nullptr) {
252 MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
253 continue;
254 }
255 if (quePool_[queIndex].blockQue->Size() == 0) {
256 continue;
257 }
258 auto block = quePool_[queIndex].blockQue->Pop();
259 if (block == nullptr) {
260 MEDIA_LOG_D("Block is nullptr");
261 continue;
262 }
263 for (auto pkt : block->pkts) {
264 if (pkt == nullptr) {
265 MEDIA_LOG_D("Pkt is nullptr, will find next");
266 continue;
267 }
268 uint32_t pktSize = static_cast<uint32_t>(pkt->size);
269 quePool_[queIndex].dataSize =
270 quePool_[queIndex].dataSize >= pktSize ? quePool_[queIndex].dataSize -= pktSize : 0;
271 }
272 if (quePool_[queIndex].blockQue->Empty()) {
273 ResetQueue(queIndex);
274 quePool_[queIndex].maxPts = INT64_MIN;
275 MEDIA_LOG_D("Track " PUBLIC_LOG_U32 " queue " PUBLIC_LOG_D32 " is empty, will return to pool",
276 trackIndex, queIndex);
277 queVector.erase(queVector.begin() + index);
278 }
279 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S " for track " PUBLIC_LOG_U32,
280 name_.c_str(), trackIndex);
281 if (sizeMap_[trackIndex] > 0) {
282 sizeMap_[trackIndex] -= 1;
283 }
284 return block;
285 }
286 MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache data", trackIndex);
287 return nullptr;
288 }
289
Front(uint32_t trackIndex)290 std::shared_ptr<SamplePacket> BlockQueuePool::Front(uint32_t trackIndex)
291 {
292 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
293 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
294 if (!HasQueue(trackIndex)) {
295 MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache queue", trackIndex);
296 return nullptr;
297 }
298 auto queVector = queMap_[trackIndex];
299 for (int i = 0; i < static_cast<int32_t>(queVector.size()); ++i) {
300 auto queIndex = queVector[i];
301 if (quePool_[queIndex].blockQue == nullptr) {
302 MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
303 continue;
304 }
305 if (quePool_[queIndex].blockQue->Size() > 0) {
306 auto block = quePool_[queIndex].blockQue->Front();
307 return block;
308 }
309 }
310 MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache data", trackIndex);
311 return nullptr;
312 }
313
Back(uint32_t trackIndex)314 std::shared_ptr<SamplePacket> BlockQueuePool::Back(uint32_t trackIndex)
315 {
316 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
317 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
318 if (!HasQueue(trackIndex)) {
319 MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache queue", trackIndex);
320 return nullptr;
321 }
322 auto queVector = queMap_[trackIndex];
323 if (queVector.size() > 0) {
324 auto lastQueIndex = queVector[queVector.size() - 1];
325 if (quePool_[lastQueIndex].blockQue != nullptr && quePool_[lastQueIndex].blockQue->Size() > 0) {
326 auto block = quePool_[lastQueIndex].blockQue->Back();
327 return block;
328 }
329 }
330 MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache data", trackIndex);
331 return nullptr;
332 }
333
GetValidQueue()334 uint32_t BlockQueuePool::GetValidQueue()
335 {
336 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
337 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S, name_.c_str());
338 for (auto pair : quePool_) {
339 if (pair.second.isValid && pair.second.blockQue != nullptr && pair.second.blockQue->Empty()) {
340 quePool_[pair.first].isValid = false;
341 return pair.first;
342 }
343 }
344 quePool_[queCount_] = {
345 false,
346 0,
347 std::make_shared<BlockQueue<std::shared_ptr<SamplePacket>>>("source_que_" + std::to_string(queCount_),
348 singleQueSize_)
349 };
350 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", valid queue index: " PUBLIC_LOG_U32,
351 name_.c_str(), queCount_);
352 queCount_++;
353 return (queCount_ - 1);
354 }
355
InnerQueueIsFull(uint32_t queueIndex)356 bool BlockQueuePool::InnerQueueIsFull(uint32_t queueIndex)
357 {
358 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
359 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", queue " PUBLIC_LOG_U32, name_.c_str(), queueIndex);
360 if (quePool_.count(queueIndex) <= 0 || quePool_[queueIndex].blockQue == nullptr) {
361 MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_D32 " is nullptr", queueIndex);
362 return true;
363 }
364 return quePool_[queueIndex].blockQue->Size() >= quePool_[queueIndex].blockQue->Capacity();
365 }
366
HasQueue(uint32_t trackIndex)367 bool BlockQueuePool::HasQueue(uint32_t trackIndex)
368 {
369 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
370 return queMap_.count(trackIndex) > 0;
371 }
372
GetLastPTSByTrackId(uint32_t trackIndex,int64_t & maxPts)373 Status BlockQueuePool::GetLastPTSByTrackId(uint32_t trackIndex, int64_t& maxPts)
374 {
375 std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
376 maxPts = INT64_MIN;
377 MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
378 if (!HasCache(trackIndex)) {
379 MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache queue", trackIndex);
380 return Status::ERROR_NOT_EXISTED;
381 }
382 auto queVector = queMap_[trackIndex];
383 for (auto queIndex : queVector) {
384 if (quePool_[queIndex].blockQue == nullptr) {
385 MEDIA_LOG_D("Block queue " PUBLIC_LOG_U32 " is nullptr, will find next", queIndex);
386 continue;
387 }
388 maxPts = quePool_[queIndex].maxPts;
389 return Status::OK;
390 }
391 MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache data", trackIndex);
392 return Status::ERROR_NOT_EXISTED;
393 }
394 } // namespace Media
395 } // namespace OHOS