1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <algorithm>
17 #include "parameter.h"
18 #include "soundpool.h"
19 #include "media_log.h"
20 #include "media_errors.h"
21 #include "stream_id_manager.h"
22
23 namespace {
24 // audiorender max concurrency.
25 static const std::string THREAD_POOL_NAME = "StreamIDManagerThreadPool";
26 static const int32_t MAX_THREADS_NUM = std::thread::hardware_concurrency() >= 4 ? 2 : 1;
27 }
28
29 namespace OHOS {
30 namespace Media {
StreamIDManager(int32_t maxStreams,AudioStandard::AudioRendererInfo audioRenderInfo)31 StreamIDManager::StreamIDManager(int32_t maxStreams,
32 AudioStandard::AudioRendererInfo audioRenderInfo) : audioRendererInfo_(audioRenderInfo), maxStreams_(maxStreams)
33 {
34 MEDIA_INFO_LOG("Construction StreamIDManager.");
35 #ifdef SOUNDPOOL_SUPPORT_LOW_LATENCY
36 char hardwareName[10] = {0};
37 GetParameter("ohos.boot.hardware", "rk3568", hardwareName, sizeof(hardwareName));
38 if (strcmp(hardwareName, "baltimore") != 0) {
39 MEDIA_INFO_LOG("Device unsupport low-latency, force set to normal play.");
40 audioRendererInfo_.rendererFlags = 0;
41 } else {
42 MEDIA_INFO_LOG("Device support low-latency, set renderer by user.");
43 }
44 #else
45 // Force all play to normal.
46 audioRendererInfo_.rendererFlags = 0;
47 MEDIA_INFO_LOG("SoundPool unsupport low-latency.");
48 #endif
49
50 InitThreadPool();
51 }
52
~StreamIDManager()53 StreamIDManager::~StreamIDManager()
54 {
55 MEDIA_INFO_LOG("Destruction StreamIDManager");
56 if (callback_ != nullptr) {
57 callback_.reset();
58 }
59 for (auto cacheBuffer : cacheBuffers_) {
60 if (cacheBuffer.second != nullptr) {
61 cacheBuffer.second->Release();
62 }
63 }
64 cacheBuffers_.clear();
65 if (isStreamPlayingThreadPoolStarted_.load()) {
66 if (streamPlayingThreadPool_ != nullptr) {
67 streamPlayingThreadPool_->Stop();
68 }
69 isStreamPlayingThreadPoolStarted_.store(false);
70 }
71 }
72
InitThreadPool()73 int32_t StreamIDManager::InitThreadPool()
74 {
75 if (isStreamPlayingThreadPoolStarted_.load()) {
76 return MSERR_OK;
77 }
78 streamPlayingThreadPool_ = std::make_unique<ThreadPool>(THREAD_POOL_NAME);
79 CHECK_AND_RETURN_RET_LOG(streamPlayingThreadPool_ != nullptr, MSERR_INVALID_VAL,
80 "Failed to obtain playing ThreadPool");
81 if (maxStreams_ > MAX_PLAY_STREAMS_NUMBER || maxStreams_ <= 0) {
82 maxStreams_ = MAX_PLAY_STREAMS_NUMBER;
83 MEDIA_INFO_LOG("more than max play stream number, align to max play strem number.");
84 }
85 if (maxStreams_ < MIN_PLAY_STREAMS_NUMBER) {
86 maxStreams_ = MIN_PLAY_STREAMS_NUMBER;
87 MEDIA_INFO_LOG("less than min play stream number, align to min play strem number.");
88 }
89 MEDIA_INFO_LOG("stream playing thread pool maxStreams_:%{public}d", maxStreams_);
90 // For stream priority logic, thread num need align to task num.
91 streamPlayingThreadPool_->Start(maxStreams_);
92 streamPlayingThreadPool_->SetMaxTaskNum(maxStreams_);
93 isStreamPlayingThreadPoolStarted_.store(true);
94
95 return MSERR_OK;
96 }
97
Play(std::shared_ptr<SoundParser> soundParser,PlayParams playParameters)98 int32_t StreamIDManager::Play(std::shared_ptr<SoundParser> soundParser, PlayParams playParameters)
99 {
100 int32_t soundID = soundParser->GetSoundID();
101 int32_t streamID = GetFreshStreamID(soundID, playParameters);
102 {
103 std::lock_guard lock(streamIDManagerLock_);
104 if (streamID <= 0) {
105 do {
106 nextStreamID_ = nextStreamID_ == INT32_MAX ? 1 : nextStreamID_ + 1;
107 } while (FindCacheBuffer(nextStreamID_) != nullptr);
108 streamID = nextStreamID_;
109 std::deque<std::shared_ptr<AudioBufferEntry>> cacheData;
110 soundParser->GetSoundData(cacheData);
111 size_t cacheDataTotalSize = soundParser->GetSoundDataTotalSize();
112 auto cacheBuffer =
113 std::make_shared<CacheBuffer>(soundParser->GetSoundTrackFormat(), cacheData, cacheDataTotalSize,
114 soundID, streamID);
115 CHECK_AND_RETURN_RET_LOG(cacheBuffer != nullptr, -1, "failed to create cache buffer");
116 CHECK_AND_RETURN_RET_LOG(callback_ != nullptr, MSERR_INVALID_VAL, "Invalid callback.");
117 cacheBuffer->SetCallback(callback_);
118 cacheBufferCallback_ = std::make_shared<CacheBufferCallBack>(weak_from_this());
119 CHECK_AND_RETURN_RET_LOG(cacheBufferCallback_ != nullptr, MSERR_INVALID_VAL,
120 "Invalid cachebuffer callback");
121 cacheBuffer->SetCacheBufferCallback(cacheBufferCallback_);
122 cacheBuffers_.emplace(streamID, cacheBuffer);
123 }
124 }
125 SetPlay(soundID, streamID, playParameters);
126 return streamID;
127 }
128
SetPlay(const int32_t soundID,const int32_t streamID,const PlayParams playParameters)129 int32_t StreamIDManager::SetPlay(const int32_t soundID, const int32_t streamID, const PlayParams playParameters)
130 {
131 if (!isStreamPlayingThreadPoolStarted_.load()) {
132 InitThreadPool();
133 }
134 {
135 std::lock_guard lock(streamIDManagerLock_);
136 streamIDs_.emplace_back(streamID);
137 }
138
139 CHECK_AND_RETURN_RET_LOG(streamPlayingThreadPool_ != nullptr, MSERR_INVALID_VAL,
140 "Failed to obtain stream play threadpool.");
141 MEDIA_INFO_LOG("StreamIDManager cur task num:%{public}zu, maxStreams_:%{public}d",
142 currentTaskNum_, maxStreams_);
143 // CacheBuffer must prepare before play.
144 std::shared_ptr<CacheBuffer> freshCacheBuffer = FindCacheBuffer(streamID);
145 freshCacheBuffer->PreparePlay(streamID, audioRendererInfo_, playParameters);
146 int32_t tempMaxStream = maxStreams_;
147 if (currentTaskNum_ < static_cast<size_t>(tempMaxStream)) {
148 AddPlayTask(streamID, playParameters);
149 } else {
150 int32_t playingStreamID = playingStreamIDs_.back();
151 std::shared_ptr<CacheBuffer> playingCacheBuffer = FindCacheBuffer(playingStreamID);
152 MEDIA_INFO_LOG("StreamIDManager fresh sound priority:%{public}d, playing stream priority:%{public}d",
153 freshCacheBuffer->GetPriority(), playingCacheBuffer->GetPriority());
154 if (freshCacheBuffer->GetPriority() >= playingCacheBuffer->GetPriority()) {
155 MEDIA_INFO_LOG("StreamIDManager stop playing low priority sound:%{public}d", playingStreamID);
156 playingCacheBuffer->Stop(playingStreamID);
157 playingStreamIDs_.pop_back();
158 MEDIA_INFO_LOG("StreamIDManager to playing fresh sound:%{public}d.", streamID);
159 AddPlayTask(streamID, playParameters);
160 } else {
161 MEDIA_INFO_LOG("StreamIDManager queue will play streams, streamID:%{public}d.", streamID);
162 StreamIDAndPlayParamsInfo freshStreamIDAndPlayParamsInfo;
163 freshStreamIDAndPlayParamsInfo.streamID = streamID;
164 freshStreamIDAndPlayParamsInfo.playParameters = playParameters;
165 QueueAndSortWillPlayStreamID(freshStreamIDAndPlayParamsInfo);
166 }
167 }
168 for (size_t i = 0; i < playingStreamIDs_.size(); i++) {
169 int32_t playingStreamID = playingStreamIDs_[i];
170 MEDIA_DEBUG_LOG("StreamIDManager::SetPlay playingStreamID:%{public}d", playingStreamID);
171 }
172 for (size_t i = 0; i < willPlayStreamInfos_.size(); i++) {
173 StreamIDAndPlayParamsInfo willPlayInfo = willPlayStreamInfos_[i];
174 MEDIA_DEBUG_LOG("StreamIDManager::SetPlay willPlayStreamID:%{public}d", willPlayInfo.streamID);
175 }
176 return MSERR_OK;
177 }
178
179 // Sort in descending order
180 // 0 has the lowest priority, and the higher the value, the higher the priority
181 // The queue head has the highest value and priority
QueueAndSortPlayingStreamID(int32_t streamID)182 void StreamIDManager::QueueAndSortPlayingStreamID(int32_t streamID)
183 {
184 if (playingStreamIDs_.empty()) {
185 std::lock_guard lock(streamIDManagerLock_);
186 playingStreamIDs_.emplace_back(streamID);
187 } else {
188 bool shouldReCombinePlayingQueue = false;
189 for (size_t i = 0; i < playingStreamIDs_.size(); i++) {
190 int32_t playingStreamID = playingStreamIDs_[i];
191 std::shared_ptr<CacheBuffer> freshCacheBuffer = FindCacheBuffer(streamID);
192 std::shared_ptr<CacheBuffer> playingCacheBuffer = FindCacheBuffer(playingStreamID);
193 if (playingCacheBuffer == nullptr) {
194 std::lock_guard lock(streamIDManagerLock_);
195 playingStreamIDs_.erase(playingStreamIDs_.begin() + i);
196 shouldReCombinePlayingQueue = true;
197 break;
198 }
199 if (!playingCacheBuffer->IsRunning()) {
200 std::lock_guard lock(streamIDManagerLock_);
201 playingStreamIDs_.erase(playingStreamIDs_.begin() + i);
202 shouldReCombinePlayingQueue = true;
203 break;
204 }
205 if (freshCacheBuffer == nullptr) {
206 break;
207 }
208 if (freshCacheBuffer->GetPriority() >= playingCacheBuffer->GetPriority()) {
209 std::lock_guard lock(streamIDManagerLock_);
210 playingStreamIDs_.insert(playingStreamIDs_.begin() + i, streamID);
211 break;
212 }
213 if (i == playingStreamIDs_.size() - 1 &&
214 freshCacheBuffer->GetPriority() < playingCacheBuffer->GetPriority()) {
215 playingStreamIDs_.push_back(streamID);
216 break;
217 }
218 }
219 if (shouldReCombinePlayingQueue) {
220 QueueAndSortPlayingStreamID(streamID);
221 }
222 }
223 }
224
225 // Sort in descending order.
226 // 0 has the lowest priority, and the higher the value, the higher the priority
227 // The queue head has the highest value and priority
QueueAndSortWillPlayStreamID(StreamIDAndPlayParamsInfo freshStreamIDAndPlayParamsInfo)228 void StreamIDManager::QueueAndSortWillPlayStreamID(StreamIDAndPlayParamsInfo freshStreamIDAndPlayParamsInfo)
229 {
230 if (willPlayStreamInfos_.empty()) {
231 std::lock_guard lock(streamIDManagerLock_);
232 willPlayStreamInfos_.emplace_back(freshStreamIDAndPlayParamsInfo);
233 } else {
234 bool shouldReCombineWillPlayQueue = false;
235 for (size_t i = 0; i < willPlayStreamInfos_.size(); i++) {
236 std::shared_ptr<CacheBuffer> freshCacheBuffer = FindCacheBuffer(freshStreamIDAndPlayParamsInfo.streamID);
237 std::shared_ptr<CacheBuffer> willPlayCacheBuffer = FindCacheBuffer(willPlayStreamInfos_[i].streamID);
238 if (willPlayCacheBuffer == nullptr) {
239 std::lock_guard lock(streamIDManagerLock_);
240 willPlayStreamInfos_.erase(willPlayStreamInfos_.begin() + i);
241 shouldReCombineWillPlayQueue = true;
242 break;
243 }
244 if (freshCacheBuffer == nullptr) {
245 break;
246 }
247 if (freshCacheBuffer->GetPriority() >= willPlayCacheBuffer->GetPriority()) {
248 std::lock_guard lock(streamIDManagerLock_);
249 willPlayStreamInfos_.insert(willPlayStreamInfos_.begin() + i, freshStreamIDAndPlayParamsInfo);
250 break;
251 }
252 if (i == willPlayStreamInfos_.size() - 1 &&
253 freshCacheBuffer->GetPriority() < willPlayCacheBuffer->GetPriority()) {
254 willPlayStreamInfos_.push_back(freshStreamIDAndPlayParamsInfo);
255 break;
256 }
257 }
258 if (shouldReCombineWillPlayQueue) {
259 QueueAndSortWillPlayStreamID(freshStreamIDAndPlayParamsInfo);
260 }
261 }
262 }
263
AddPlayTask(const int32_t streamID,const PlayParams playParameters)264 int32_t StreamIDManager::AddPlayTask(const int32_t streamID, const PlayParams playParameters)
265 {
266 ThreadPool::Task streamPlayTask = std::bind(&StreamIDManager::DoPlay, this, streamID);
267 CHECK_AND_RETURN_RET_LOG(streamPlayTask != nullptr, MSERR_INVALID_VAL, "Failed to obtain stream play Task");
268 streamPlayingThreadPool_->AddTask(streamPlayTask);
269 currentTaskNum_++;
270 QueueAndSortPlayingStreamID(streamID);
271 return MSERR_OK;
272 }
273
DoPlay(const int32_t streamID)274 int32_t StreamIDManager::DoPlay(const int32_t streamID)
275 {
276 MEDIA_INFO_LOG("StreamIDManager streamID:%{public}d", streamID);
277 std::shared_ptr<CacheBuffer> cacheBuffer = FindCacheBuffer(streamID);
278 CHECK_AND_RETURN_RET_LOG(cacheBuffer.get() != nullptr, MSERR_INVALID_VAL, "cachebuffer invalid.");
279 if (cacheBuffer->DoPlay(streamID) == MSERR_OK) {
280 return MSERR_OK;
281 }
282 return MSERR_INVALID_VAL;
283 }
284
FindCacheBuffer(const int32_t streamID)285 std::shared_ptr<CacheBuffer> StreamIDManager::FindCacheBuffer(const int32_t streamID)
286 {
287 if (cacheBuffers_.empty()) {
288 MEDIA_INFO_LOG("StreamIDManager cacheBuffers_ empty");
289 return nullptr;
290 }
291 if (cacheBuffers_.find(streamID) != cacheBuffers_.end()) {
292 return cacheBuffers_.at(streamID);
293 }
294 return nullptr;
295 }
296
ReorderStream(int32_t streamID,int32_t priority)297 int32_t StreamIDManager::ReorderStream(int32_t streamID, int32_t priority)
298 {
299 int32_t playingSize = playingStreamIDs_.size();
300 for (int32_t i = 0; i < playingSize - 1; ++i) {
301 for (int32_t j = 0; j < playingSize - 1 - i; ++j) {
302 std::shared_ptr<CacheBuffer> left = FindCacheBuffer(playingStreamIDs_[j]);
303 std::shared_ptr<CacheBuffer> right = FindCacheBuffer(playingStreamIDs_[j + 1]);
304 if (left->GetPriority() < right->GetPriority()) {
305 int32_t streamIdTemp = playingStreamIDs_[j];
306 playingStreamIDs_[j] = playingStreamIDs_[j + 1];
307 playingStreamIDs_[j + 1] = streamIdTemp;
308 }
309 }
310 }
311 for (size_t i = 0; i < playingStreamIDs_.size(); i++) {
312 int32_t playingStreamID = playingStreamIDs_[i];
313 MEDIA_DEBUG_LOG("StreamIDManager::ReorderStream playingStreamID:%{public}d", playingStreamID);
314 }
315
316 int32_t willPlaySize = willPlayStreamInfos_.size();
317 for (int32_t i = 0; i < willPlaySize - 1; ++i) {
318 for (int32_t j = 0; j < willPlaySize - 1 - i; ++j) {
319 std::shared_ptr<CacheBuffer> left = FindCacheBuffer(willPlayStreamInfos_[j].streamID);
320 std::shared_ptr<CacheBuffer> right = FindCacheBuffer(willPlayStreamInfos_[j + 1].streamID);
321 if (left->GetPriority() < right->GetPriority()) {
322 StreamIDAndPlayParamsInfo willPlayInfoTemp = willPlayStreamInfos_[j];
323 willPlayStreamInfos_[j] = willPlayStreamInfos_[j + 1];
324 willPlayStreamInfos_[j + 1] = willPlayInfoTemp;
325 }
326 }
327 }
328 for (size_t i = 0; i < willPlayStreamInfos_.size(); i++) {
329 StreamIDAndPlayParamsInfo willPlayInfo = willPlayStreamInfos_[i];
330 MEDIA_DEBUG_LOG("StreamIDManager::ReorderStream willPlayStreamID:%{public}d", willPlayInfo.streamID);
331 }
332 return MSERR_OK;
333 }
334
GetFreshStreamID(const int32_t soundID,PlayParams playParameters)335 int32_t StreamIDManager::GetFreshStreamID(const int32_t soundID, PlayParams playParameters)
336 {
337 int32_t streamID = 0;
338 if (cacheBuffers_.empty()) {
339 MEDIA_INFO_LOG("StreamIDManager cacheBuffers_ empty");
340 return streamID;
341 }
342 for (auto cacheBuffer : cacheBuffers_) {
343 if (soundID == cacheBuffer.second->GetSoundID()) {
344 streamID = cacheBuffer.second->GetStreamID();
345 MEDIA_INFO_LOG("Have cache soundID:%{public}d, streamID:%{public}d", soundID, streamID);
346 break;
347 }
348 }
349 return streamID;
350 }
351
OnPlayFinished()352 void StreamIDManager::OnPlayFinished()
353 {
354 currentTaskNum_--;
355 if (!willPlayStreamInfos_.empty()) {
356 MEDIA_INFO_LOG("StreamIDManager OnPlayFinished will play streams non empty, get the front.");
357 StreamIDAndPlayParamsInfo willPlayStreamInfo = willPlayStreamInfos_.front();
358 AddPlayTask(willPlayStreamInfo.streamID, willPlayStreamInfo.playParameters);
359 std::lock_guard lock(streamIDManagerLock_);
360 willPlayStreamInfos_.pop_front();
361 }
362 }
363
SetCallback(const std::shared_ptr<ISoundPoolCallback> & callback)364 int32_t StreamIDManager::SetCallback(const std::shared_ptr<ISoundPoolCallback> &callback)
365 {
366 callback_ = callback;
367 return MSERR_OK;
368 }
369 } // namespace Media
370 } // namespace OHOS
371