• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define HST_LOG_TAG "BlockQueuePool"
17 
18 #include "common/log.h"
19 #include "block_queue_pool.h"
20 
21 namespace {
22 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_DEMUXER, "BlockQueuePool" };
23 }
24 
25 namespace OHOS {
26 namespace Media {
27 
~BlockQueuePool()28 BlockQueuePool::~BlockQueuePool()
29 {
30     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S, name_.c_str());
31     for (auto que : quePool_) {
32         FreeQueue(que.first);
33     }
34     MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S, name_.c_str());
35 }
36 
AddTrackQueue(uint32_t trackIndex)37 Status BlockQueuePool::AddTrackQueue(uint32_t trackIndex)
38 {
39     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
40     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
41     if (!HasQueue(trackIndex)) {
42         uint32_t queIndex = GetValidQueue();
43         queMap_[trackIndex] = std::vector<uint32_t>({ queIndex });
44         MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", add track " PUBLIC_LOG_U32
45             ", get queue " PUBLIC_LOG_U32, name_.c_str(), trackIndex, queIndex);
46         sizeMap_[trackIndex] = 0;
47     } else {
48         MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32 " is already in queue",
49             name_.c_str(), trackIndex);
50     }
51     return Status::OK;
52 }
53 
RemoveTrackQueue(uint32_t trackIndex)54 Status BlockQueuePool::RemoveTrackQueue(uint32_t trackIndex)
55 {
56     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
57     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
58     if (!HasQueue(trackIndex)) {
59         MEDIA_LOG_D("Block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32 " not in queue", name_.c_str(), trackIndex);
60     } else {
61         for (auto queIndex : queMap_[trackIndex]) {
62             ResetQueue(queIndex);
63         }
64         queMap_[trackIndex].clear();
65         queMap_.erase(trackIndex);
66         sizeMap_.erase(trackIndex);
67     }
68     MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S, name_.c_str());
69     return Status::OK;
70 }
71 
GetCacheSize(uint32_t trackIndex)72 size_t BlockQueuePool::GetCacheSize(uint32_t trackIndex)
73 {
74     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
75     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
76     size_t size = 0;
77     FALSE_RETURN_V_NOLOG(HasQueue(trackIndex), size);
78     for (auto queIndex : queMap_[trackIndex]) {
79         if (quePool_[queIndex].blockQue == nullptr) {
80             MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
81             continue;
82         }
83         if (quePool_[queIndex].blockQue->Size() > 0) {
84             MEDIA_LOG_D("Block queue " PUBLIC_LOG_S " has cache", name_.c_str());
85             size += quePool_[queIndex].blockQue->Size();
86         }
87     }
88     MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", size = " PUBLIC_LOG_ZU, name_.c_str(), size);
89     return size;
90 }
91 
GetCacheDataSize(uint32_t trackIndex)92 uint32_t BlockQueuePool::GetCacheDataSize(uint32_t trackIndex)
93 {
94     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
95     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
96     uint32_t dataSize = 0;
97     FALSE_RETURN_V_NOLOG(HasQueue(trackIndex), dataSize);
98     for (auto queIndex : queMap_[trackIndex]) {
99         if (static_cast<uint64_t>(dataSize) + static_cast<uint64_t>(quePool_[queIndex].dataSize) > UINT32_MAX) {
100             MEDIA_LOG_D("DataSize(" PUBLIC_LOG_U64 ") is out of uint32",
101                 static_cast<uint64_t>(dataSize) + static_cast<uint64_t>(quePool_[queIndex].dataSize));
102             return UINT32_MAX;
103         }
104         dataSize += quePool_[queIndex].dataSize;
105     }
106     MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32 " cache size = " PUBLIC_LOG_U32,
107         name_.c_str(), trackIndex, dataSize);
108     return dataSize;
109 }
110 
HasCache(uint32_t trackIndex)111 bool BlockQueuePool::HasCache(uint32_t trackIndex)
112 {
113     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
114     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
115     FALSE_RETURN_V_NOLOG(HasQueue(trackIndex), false);
116     for (auto queIndex : queMap_[trackIndex]) {
117         if (quePool_[queIndex].blockQue == nullptr) {
118             MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
119             continue;
120         }
121         if (quePool_[queIndex].blockQue->Size() > 0) {
122             MEDIA_LOG_D("Block queue " PUBLIC_LOG_S " have cache", name_.c_str());
123             return true;
124         }
125     }
126     MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S " don't have cache", name_.c_str());
127     return false;
128 }
129 
ResetQueue(uint32_t queueIndex)130 void BlockQueuePool::ResetQueue(uint32_t queueIndex)
131 {
132     if (quePool_.count(queueIndex) == 0) {
133         MEDIA_LOG_D("Error queueIndex");
134         return;
135     }
136     auto blockQue = quePool_[queueIndex].blockQue;
137     if (blockQue == nullptr) {
138         return;
139     }
140     blockQue->Clear();
141     quePool_[queueIndex].dataSize = 0;
142     quePool_[queueIndex].isValid = true;
143     quePool_[queueIndex].maxPts = INT64_MIN;
144     return;
145 }
146 
ResetInfo(std::shared_ptr<SamplePacket> block)147 bool BlockQueuePool::ResetInfo(std::shared_ptr<SamplePacket> block)
148 {
149     FALSE_RETURN_V_MSG_E(block != nullptr, false, "Block is nullptr");
150     MEDIA_LOG_D("Reset for block " PUBLIC_LOG_U32, block->queueIndex);
151     uint32_t queIndex = block->queueIndex;
152     FALSE_RETURN_V_MSG_E(quePool_.count(queIndex) > 0, false, "Index is invalid");
153     for (auto pkt : block->pkts) {
154         if (pkt == nullptr) {
155             MEDIA_LOG_D("Pkt is nullptr, will find next");
156             continue;
157         }
158         uint32_t pktSize = static_cast<uint32_t>(pkt->size);
159         if (quePool_[queIndex].dataSize >= pktSize) {
160             quePool_[queIndex].dataSize -= pktSize;
161         } else {
162             quePool_[queIndex].dataSize = 0;
163         }
164     }
165     return true;
166 }
167 
SetInfo(std::shared_ptr<SamplePacket> block)168 bool BlockQueuePool::SetInfo(std::shared_ptr<SamplePacket> block)
169 {
170     FALSE_RETURN_V_MSG_E(block != nullptr, false, "Block is nullptr");
171     MEDIA_LOG_D("Set for block " PUBLIC_LOG_U32, block->queueIndex);
172     uint32_t queIndex = block->queueIndex;
173     FALSE_RETURN_V_MSG_E(quePool_.count(queIndex) > 0, false, "Index is invalid");
174     for (auto pkt : block->pkts) {
175         if (pkt == nullptr) {
176             MEDIA_LOG_D("Pkt is nullptr, will find next");
177             continue;
178         }
179         uint32_t pktSize = static_cast<uint32_t>(pkt->size);
180         if (quePool_[queIndex].dataSize <= UINT32_MAX - pktSize) {
181             quePool_[queIndex].dataSize += pktSize;
182         } else {
183             quePool_[queIndex].dataSize = UINT32_MAX;
184         }
185     }
186     return true;
187 }
188 
FreeQueue(uint32_t queueIndex)189 void BlockQueuePool::FreeQueue(uint32_t queueIndex)
190 {
191     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
192     if (quePool_.count(queueIndex) == 0) {
193         return;
194     }
195     ResetQueue(queueIndex);
196     quePool_[queueIndex].blockQue = nullptr;
197 }
198 
Push(uint32_t trackIndex,std::shared_ptr<SamplePacket> block)199 bool BlockQueuePool::Push(uint32_t trackIndex, std::shared_ptr<SamplePacket> block)
200 {
201     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
202     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
203     if (!HasQueue(trackIndex)) {
204         Status ret = AddTrackQueue(trackIndex);
205         FALSE_RETURN_V_MSG_E(ret == Status::OK, false, "Add new queue error: " PUBLIC_LOG_D32, ret);
206     }
207     auto& queVector = queMap_[trackIndex];
208     uint32_t pushIndex;
209     if (queVector.size() > 0) {
210         pushIndex = queVector[queVector.size() - 1];
211     } else {
212         pushIndex = GetValidQueue();
213         queMap_[trackIndex].push_back(pushIndex);
214         MEDIA_LOG_D("Track has no queue, will request " PUBLIC_LOG_D32 " from pool", pushIndex);
215     }
216     if (InnerQueueIsFull(pushIndex)) {
217         pushIndex = GetValidQueue();
218         queMap_[trackIndex].push_back(pushIndex);
219         MEDIA_LOG_D("Track que is full, will request " PUBLIC_LOG_D32 " from pool", pushIndex);
220     }
221     if (quePool_[pushIndex].blockQue == nullptr) {
222         MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, failed to push data", pushIndex);
223         return false;
224     }
225     sizeMap_[trackIndex] += 1;
226     for (auto pkt : block->pkts) {
227         if (pkt == nullptr) {
228             continue;
229         }
230         quePool_[pushIndex].dataSize += static_cast<uint32_t>(pkt->size);
231         if (pkt->pts != AV_NOPTS_VALUE && pkt->pts > quePool_[pushIndex].maxPts) {
232             quePool_[pushIndex].maxPts = pkt->pts;
233         }
234     }
235     block->queueIndex = pushIndex;
236     return quePool_[pushIndex].blockQue->Push(block);
237 }
238 
Pop(uint32_t trackIndex)239 std::shared_ptr<SamplePacket> BlockQueuePool::Pop(uint32_t trackIndex)
240 {
241     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
242     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
243     if (!HasQueue(trackIndex)) {
244         MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache queue", trackIndex);
245         return nullptr;
246     }
247     auto& queVector = queMap_[trackIndex];
248     for (auto it = queVector.begin(); it != queVector.end(); ++it) {
249         int32_t index = std::distance(queVector.begin(), it);
250         auto queIndex = queVector[index];
251         if (quePool_[queIndex].blockQue == nullptr) {
252             MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
253             continue;
254         }
255         if (quePool_[queIndex].blockQue->Size() == 0) {
256             continue;
257         }
258         auto block = quePool_[queIndex].blockQue->Pop();
259         if (block == nullptr) {
260             MEDIA_LOG_D("Block is nullptr");
261             continue;
262         }
263         for (auto pkt : block->pkts) {
264             if (pkt == nullptr) {
265                 MEDIA_LOG_D("Pkt is nullptr, will find next");
266                 continue;
267             }
268             uint32_t pktSize = static_cast<uint32_t>(pkt->size);
269             quePool_[queIndex].dataSize =
270                 quePool_[queIndex].dataSize >= pktSize ? quePool_[queIndex].dataSize -= pktSize : 0;
271         }
272         if (quePool_[queIndex].blockQue->Empty()) {
273             ResetQueue(queIndex);
274             quePool_[queIndex].maxPts = INT64_MIN;
275             MEDIA_LOG_D("Track " PUBLIC_LOG_U32 " queue " PUBLIC_LOG_D32 " is empty, will return to pool",
276                 trackIndex, queIndex);
277             queVector.erase(queVector.begin() + index);
278         }
279         MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S " for track " PUBLIC_LOG_U32,
280             name_.c_str(), trackIndex);
281         if (sizeMap_[trackIndex] > 0) {
282             sizeMap_[trackIndex] -= 1;
283         }
284         return block;
285     }
286     MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache data", trackIndex);
287     return nullptr;
288 }
289 
Front(uint32_t trackIndex)290 std::shared_ptr<SamplePacket> BlockQueuePool::Front(uint32_t trackIndex)
291 {
292     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
293     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
294     if (!HasQueue(trackIndex)) {
295         MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache queue", trackIndex);
296         return nullptr;
297     }
298     auto queVector = queMap_[trackIndex];
299     for (int i = 0; i < static_cast<int32_t>(queVector.size()); ++i) {
300         auto queIndex = queVector[i];
301         if (quePool_[queIndex].blockQue == nullptr) {
302             MEDIA_LOG_D("Block queue " PUBLIC_LOG_D32 " is nullptr, will find next", queIndex);
303             continue;
304         }
305         if (quePool_[queIndex].blockQue->Size() > 0) {
306             auto block = quePool_[queIndex].blockQue->Front();
307             return block;
308         }
309     }
310     MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache data", trackIndex);
311     return nullptr;
312 }
313 
Back(uint32_t trackIndex)314 std::shared_ptr<SamplePacket> BlockQueuePool::Back(uint32_t trackIndex)
315 {
316     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
317     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
318     if (!HasQueue(trackIndex)) {
319         MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache queue", trackIndex);
320         return nullptr;
321     }
322     auto queVector = queMap_[trackIndex];
323     if (queVector.size() > 0) {
324         auto lastQueIndex = queVector[queVector.size() - 1];
325         if (quePool_[lastQueIndex].blockQue != nullptr && quePool_[lastQueIndex].blockQue->Size() > 0) {
326             auto block = quePool_[lastQueIndex].blockQue->Back();
327             return block;
328         }
329     }
330     MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache data", trackIndex);
331     return nullptr;
332 }
333 
GetValidQueue()334 uint32_t BlockQueuePool::GetValidQueue()
335 {
336     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
337     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S, name_.c_str());
338     for (auto pair : quePool_) {
339         if (pair.second.isValid && pair.second.blockQue != nullptr && pair.second.blockQue->Empty()) {
340             quePool_[pair.first].isValid = false;
341             return pair.first;
342         }
343     }
344     quePool_[queCount_] = {
345         false,
346         0,
347         std::make_shared<BlockQueue<std::shared_ptr<SamplePacket>>>("source_que_" + std::to_string(queCount_),
348             singleQueSize_)
349     };
350     MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_S ", valid queue index: " PUBLIC_LOG_U32,
351                  name_.c_str(), queCount_);
352     queCount_++;
353     return (queCount_ - 1);
354 }
355 
InnerQueueIsFull(uint32_t queueIndex)356 bool BlockQueuePool::InnerQueueIsFull(uint32_t queueIndex)
357 {
358     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
359     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", queue " PUBLIC_LOG_U32, name_.c_str(), queueIndex);
360     if (quePool_.count(queueIndex) <= 0 || quePool_[queueIndex].blockQue == nullptr) {
361         MEDIA_LOG_D("Out, block queue " PUBLIC_LOG_D32 " is nullptr", queueIndex);
362         return true;
363     }
364     return quePool_[queueIndex].blockQue->Size() >= quePool_[queueIndex].blockQue->Capacity();
365 }
366 
HasQueue(uint32_t trackIndex)367 bool BlockQueuePool::HasQueue(uint32_t trackIndex)
368 {
369     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
370     return queMap_.count(trackIndex) > 0;
371 }
372 
GetLastPTSByTrackId(uint32_t trackIndex,int64_t & maxPts)373 Status BlockQueuePool::GetLastPTSByTrackId(uint32_t trackIndex, int64_t& maxPts)
374 {
375     std::unique_lock<std::recursive_mutex> lockCacheQ(mutextCacheQ_);
376     maxPts = INT64_MIN;
377     MEDIA_LOG_D("In, block queue " PUBLIC_LOG_S ", track " PUBLIC_LOG_U32, name_.c_str(), trackIndex);
378     if (!HasCache(trackIndex)) {
379         MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache queue", trackIndex);
380         return Status::ERROR_NOT_EXISTED;
381     }
382     auto queVector = queMap_[trackIndex];
383     for (auto queIndex : queVector) {
384         if (quePool_[queIndex].blockQue == nullptr) {
385             MEDIA_LOG_D("Block queue " PUBLIC_LOG_U32 " is nullptr, will find next", queIndex);
386             continue;
387         }
388         maxPts = quePool_[queIndex].maxPts;
389         return Status::OK;
390     }
391     MEDIA_LOG_E("Track " PUBLIC_LOG_U32 " has not cache data", trackIndex);
392     return Status::ERROR_NOT_EXISTED;
393 }
394 } // namespace Media
395 } // namespace OHOS