1 /*
2 * Copyright (C) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <algorithm>
17 #include "parameter.h"
18 #include "soundpool.h"
19 #include "media_log.h"
20 #include "media_errors.h"
21 #include "parallel_stream_manager.h"
22 #include "audio_renderer_manager.h"
23
24 namespace {
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN_SOUNDPOOL, "ParallelStreamManager"};
26 static const std::string THREAD_POOL_NAME = "OS_PalStream";
27 static const std::string THREAD_POOL_NAME_STREAM = "OS_Stream";
28 static const int32_t MAX_PLAY_STREAMS_NUMBER = 32;
29 static const int32_t MIN_PLAY_STREAMS_NUMBER = 1;
30 static const int32_t STREAM_THREAD_NUMBER = 1;
31 static const int32_t ERROE_STREAM_ID = -1;
32 static const int32_t ERROE_GLOBAL_ID = -1;
33 }
34
35 namespace OHOS {
36 namespace Media {
ParallelStreamManager(int32_t maxStreams,AudioStandard::AudioRendererInfo audioRenderInfo)37 ParallelStreamManager::ParallelStreamManager(int32_t maxStreams,
38 AudioStandard::AudioRendererInfo audioRenderInfo) : audioRendererInfo_(audioRenderInfo), maxStreams_(maxStreams)
39 {
40 MEDIA_LOGI("Construction ParallelStreamManager");
41 }
42
~ParallelStreamManager()43 ParallelStreamManager::~ParallelStreamManager()
44 {
45 parallelStreamManagerLock_.lock();
46 MEDIA_LOGI("ParallelStreamManager::~ParallelStreamManager start");
47 if (callback_ != nullptr) {
48 callback_.reset();
49 }
50 if (frameWriteCallback_ != nullptr) {
51 frameWriteCallback_.reset();
52 }
53
54 waitingStream_.clear();
55 std::vector<std::shared_ptr<Stream>> vector;
56 for (auto it = playingStream_.begin(); it != playingStream_.end();) {
57 vector.push_back(it->second);
58 it = playingStream_.erase(it);
59 }
60 parallelStreamManagerLock_.unlock();
61 for (auto& item : vector) {
62 item->Stop();
63 }
64 vector.clear();
65 if (streamPlayThreadPool_ != nullptr) {
66 streamPlayThreadPool_->Stop();
67 }
68 if (streamStopThreadPool_ != nullptr) {
69 streamStopThreadPool_->Stop();
70 }
71 MEDIA_LOGI("ParallelStreamManager::~ParallelStreamManager end");
72 }
73
InitThreadPool()74 int32_t ParallelStreamManager::InitThreadPool()
75 {
76 if (maxStreams_ > MAX_PLAY_STREAMS_NUMBER) {
77 maxStreams_ = MAX_PLAY_STREAMS_NUMBER;
78 MEDIA_LOGI("more than max play stream number, align to max play strem number.");
79 }
80 if (maxStreams_ < MIN_PLAY_STREAMS_NUMBER) {
81 maxStreams_ = MIN_PLAY_STREAMS_NUMBER;
82 MEDIA_LOGI("less than min play stream number, align to min play strem number.");
83 }
84 MEDIA_LOGI("stream playing thread pool maxStreams_:%{public}d", maxStreams_);
85 streamPlayThreadPool_ = std::make_shared<ThreadPool>(THREAD_POOL_NAME);
86 CHECK_AND_RETURN_RET_LOG(streamPlayThreadPool_ != nullptr, MSERR_INVALID_VAL, "Parallel playThreadPool fail");
87 streamPlayThreadPool_->Start(maxStreams_);
88
89 streamStopThreadPool_ = std::make_shared<ThreadPool>(THREAD_POOL_NAME_STREAM);
90 CHECK_AND_RETURN_RET_LOG(streamStopThreadPool_ != nullptr, MSERR_INVALID_VAL, "Parallel stopThreadPool fail");
91 streamStopThreadPool_->Start(STREAM_THREAD_NUMBER);
92
93 AudioRendererManager::GetInstance().SetParallelManager(weak_from_this());
94 return MSERR_OK;
95 }
96
GetGlobalId(int32_t soundId)97 int32_t ParallelStreamManager::GetGlobalId(int32_t soundId)
98 {
99 std::lock_guard lock(globalIdMutex_);
100 for (auto it = globalIdVector_.begin(); it != globalIdVector_.end();) {
101 if (it->first == soundId) {
102 return it->second;
103 } else {
104 ++it;
105 }
106 }
107 return ERROE_GLOBAL_ID;
108 }
109
DelGlobalId(int32_t globalId)110 void ParallelStreamManager::DelGlobalId(int32_t globalId)
111 {
112 std::lock_guard lock(globalIdMutex_);
113 for (auto it = globalIdVector_.begin(); it != globalIdVector_.end();) {
114 if (it->second == globalId) {
115 globalIdVector_.erase(it);
116 break;
117 } else {
118 ++it;
119 }
120 }
121 }
122
SetGlobalId(int32_t soundId,int32_t globalId)123 void ParallelStreamManager::SetGlobalId(int32_t soundId, int32_t globalId)
124 {
125 std::lock_guard lock(globalIdMutex_);
126 globalIdVector_.push_back(std::make_pair(soundId, globalId));
127 }
128
DelSoundId(int32_t soundId)129 void ParallelStreamManager::DelSoundId(int32_t soundId)
130 {
131 std::lock_guard lock(globalIdMutex_);
132 for (auto it = globalIdVector_.begin(); it != globalIdVector_.end();) {
133 if (it->first == soundId) {
134 OHOS::Media::AudioRendererManager::GetInstance().DelAudioRenderer(it->second);
135 it = globalIdVector_.erase(it);
136 } else {
137 ++it;
138 }
139 }
140 }
141
Play(std::shared_ptr<SoundParser> soundParser,PlayParams & playParameters)142 int32_t ParallelStreamManager::Play(std::shared_ptr<SoundParser> soundParser, PlayParams &playParameters)
143 {
144 MediaTrace trace("ParallelStreamManager::Play");
145 CHECK_AND_RETURN_RET_LOG(soundParser != nullptr, -1, "Invalid soundParser.");
146 int32_t streamId;
147 std::shared_ptr<Stream> stream;
148 int32_t soundId = soundParser->GetSoundID();
149 {
150 std::lock_guard lock(parallelStreamManagerLock_);
151 do {
152 nextStreamId_ = nextStreamId_ == INT32_MAX ? 1 : nextStreamId_ + 1;
153 } while (FindStream(nextStreamId_) != nullptr);
154 streamId = nextStreamId_;
155 }
156 std::shared_ptr<AudioBufferEntry> cacheData;
157 soundParser->GetSoundData(cacheData);
158 size_t cacheDataTotalSize = soundParser->GetSoundDataTotalSize();
159 MEDIA_LOGI("ParallelStreamManager::Play cacheDataTotalSize:%{public}zu", cacheDataTotalSize);
160 stream = std::make_shared<Stream>(soundParser->GetSoundTrackFormat(), soundId, streamId, streamStopThreadPool_);
161 CHECK_AND_RETURN_RET_LOG(stream != nullptr, MSERR_INVALID_VAL, "failed to create stream");
162 stream->SetSoundData(cacheData, cacheDataTotalSize);
163 stream->SetPlayParamAndRendererInfo(playParameters, audioRendererInfo_);
164 stream->SetManager(weak_from_this());
165 CHECK_AND_RETURN_RET_LOG(callback_ != nullptr, MSERR_INVALID_VAL, "Invalid callback");
166 stream->SetCallback(callback_);
167 std::shared_ptr<ISoundPoolCallback> streamCallback_ = std::make_shared<StreamCallBack>(weak_from_this());
168 CHECK_AND_RETURN_RET_LOG(streamCallback_ != nullptr, MSERR_INVALID_VAL, "error stream callback");
169 stream->SetStreamCallback(streamCallback_);
170 if (frameWriteCallback_ != nullptr) {
171 stream->SetFrameWriteCallback(frameWriteCallback_);
172 }
173
174 int32_t ret = PreparePlay(stream, false);
175 CHECK_AND_RETURN_RET_LOG(ret == MSERR_OK, ERROE_STREAM_ID, "ParallelStreamManager PreparePlay fail");
176 return streamId;
177 }
178
PreparePlay(std::shared_ptr<Stream> stream,bool waitQueueFlag)179 int32_t ParallelStreamManager::PreparePlay(std::shared_ptr<Stream> stream, bool waitQueueFlag)
180 {
181 MediaTrace trace("ParallelStreamManager::PreparePlay");
182 std::shared_ptr<Stream> lastPlay;
183 bool lastPlayStopFlag = false;
184 {
185 std::lock_guard lock(parallelStreamManagerLock_);
186 CHECK_AND_RETURN_RET_LOG(stream != nullptr, MSERR_INVALID_VAL, "PreparePlay invaid stream");
187 if (waitQueueFlag) {
188 stream = waitingStream_.front().second;
189 MEDIA_LOGI("PreparePlay start from waitqueue streamID:%{public}d", stream->GetStreamID());
190 }
191 int32_t streamId = stream->GetStreamID();
192 MEDIA_LOGI("ParallelStreamManager playingDeque size:%{public}d, maxStreams_:%{public}d",
193 static_cast<int32_t>(playingStream_.size()), maxStreams_);
194 if (playingStream_.size() < static_cast<size_t>(maxStreams_)) {
195 DealQueueAndAddTask(streamId, stream, waitQueueFlag);
196 } else {
197 lastPlay = playingStream_.back().second;
198 if (stream->GetPriority() >= lastPlay->GetPriority()) {
199 MEDIA_LOGI("stop streamId:%{public}d, play streamId:%{public}d", lastPlay->GetStreamID(), streamId);
200 lastPlayStopFlag = true;
201 playingStream_.pop_back();
202 DealQueueAndAddTask(streamId, stream, waitQueueFlag);
203 } else {
204 MEDIA_LOGI("ParallelStreamManager to add waitingStream_");
205 if (!waitQueueFlag) {
206 AddToWaitingDeque(streamId, stream);
207 }
208 }
209 }
210 }
211 if (lastPlayStopFlag && lastPlay != nullptr) {
212 ThreadPool::Task streamStopTask = [weakThis = std::weak_ptr<Stream>(lastPlay)] {
213 if (auto thisPtr = weakThis.lock()) {
214 thisPtr->Stop();
215 } else {
216 MEDIA_LOGI("PreparePlay Stream object has been destroyed, skipping Stop");
217 }
218 };
219 streamStopThreadPool_->AddTask(streamStopTask);
220 }
221 return MSERR_OK;
222 }
223
DealQueueAndAddTask(int32_t streamId,std::shared_ptr<Stream> stream,bool waitQueueFlag)224 void ParallelStreamManager::DealQueueAndAddTask(int32_t streamId, std::shared_ptr<Stream> stream, bool waitQueueFlag)
225 {
226 AddToPlayingDeque(streamId, stream);
227 if (waitQueueFlag) {
228 RemoveFromWaitingDeque(streamId);
229 }
230 ThreadPool::Task streamPlayTask = [this, streamId] { this->DoPlay(streamId); };
231 streamPlayThreadPool_->AddTask(streamPlayTask);
232 }
233
RemoveFromWaitingDeque(int32_t streamId)234 void ParallelStreamManager::RemoveFromWaitingDeque(int32_t streamId)
235 {
236 for (auto it = waitingStream_.begin(); it != waitingStream_.end();) {
237 if (it->first == streamId) {
238 it = waitingStream_.erase(it);
239 break;
240 } else {
241 ++it;
242 }
243 }
244 }
245
AddToPlayingDeque(int32_t streamID,std::shared_ptr<Stream> stream)246 void ParallelStreamManager::AddToPlayingDeque(int32_t streamID, std::shared_ptr<Stream> stream)
247 {
248 if (playingStream_.empty()) {
249 playingStream_.push_front(std::make_pair(streamID, stream));
250 } else {
251 for (size_t i = 0; i < playingStream_.size(); i++) {
252 std::shared_ptr<Stream> playingStream = playingStream_[i].second;
253 if (stream->GetPriority() >= playingStream->GetPriority()) {
254 playingStream_.insert(playingStream_.begin() + i, std::make_pair(streamID, stream));
255 break;
256 }
257 if (playingStream_.size() >= 1 && i == playingStream_.size() - 1 &&
258 stream->GetPriority() < playingStream->GetPriority()) {
259 playingStream_.push_back(std::make_pair(streamID, stream));
260 break;
261 }
262 }
263 }
264 }
265
AddToWaitingDeque(int32_t streamID,std::shared_ptr<Stream> stream)266 void ParallelStreamManager::AddToWaitingDeque(int32_t streamID, std::shared_ptr<Stream> stream)
267 {
268 if (waitingStream_.empty()) {
269 waitingStream_.push_front(std::make_pair(streamID, stream));
270 } else {
271 for (size_t i = 0; i < waitingStream_.size(); i++) {
272 std::shared_ptr<Stream> waitingStream = waitingStream_[i].second;
273 if (stream->GetPriority() >= waitingStream->GetPriority()) {
274 waitingStream_.insert(waitingStream_.begin() + i, std::make_pair(streamID, stream));
275 break;
276 }
277 if (waitingStream_.size() >= 1 && i == waitingStream_.size() - 1 &&
278 stream->GetPriority() < waitingStream->GetPriority()) {
279 waitingStream_.push_back(std::make_pair(streamID, stream));
280 break;
281 }
282 }
283 }
284 }
285
DoPlay(int32_t streamID)286 int32_t ParallelStreamManager::DoPlay(int32_t streamID)
287 {
288 MediaTrace trace("ParallelStreamManager::DoPlay");
289 std::shared_ptr<Stream> stream;
290 {
291 std::lock_guard lock(parallelStreamManagerLock_);
292 MEDIA_LOGI("ParallelStreamManager::DoPlay start streamID:%{public}d", streamID);
293 stream = FindStream(streamID);
294 CHECK_AND_RETURN_RET_LOG(stream != nullptr, MSERR_INVALID_VAL, "doplay stream invalid.");
295 }
296
297 if (stream->DoPlay() == MSERR_OK) {
298 MEDIA_LOGI("ParallelStreamManager::DoPlay success streamID:%{public}d", streamID);
299 return MSERR_OK;
300 }
301
302 MEDIA_LOGE("ParallelStreamManager::DoPlay failed streamID:%{public}d", streamID);
303 {
304 std::lock_guard lock(parallelStreamManagerLock_);
305 for (auto it = playingStream_.begin(); it != playingStream_.end();) {
306 if (it->first == streamID) {
307 it = playingStream_.erase(it);
308 break;
309 } else {
310 ++it;
311 }
312 }
313 }
314 return MSERR_INVALID_VAL;
315 }
316
FindStream(const int32_t streamId)317 std::shared_ptr<Stream> ParallelStreamManager::FindStream(const int32_t streamId)
318 {
319 CHECK_AND_RETURN_RET_LOG(streamId >= 0, nullptr, "streamId invalid.");
320 for (auto it = playingStream_.begin(); it != playingStream_.end();) {
321 if (it->first == streamId) {
322 return it->second;
323 } else {
324 ++it;
325 }
326 }
327 for (auto it = waitingStream_.begin(); it != waitingStream_.end();) {
328 if (it->first == streamId) {
329 return it->second;
330 } else {
331 ++it;
332 }
333 }
334 return nullptr;
335 }
336
FindStreamLock(const int32_t streamId)337 std::shared_ptr<Stream> ParallelStreamManager::FindStreamLock(const int32_t streamId)
338 {
339 std::lock_guard lock(parallelStreamManagerLock_);
340 return FindStream(streamId);
341 }
342
UnloadStream(int32_t soundId)343 int32_t ParallelStreamManager::UnloadStream(int32_t soundId)
344 {
345 MediaTrace trace("ParallelStreamManager::UnloadStream");
346 parallelStreamManagerLock_.lock();
347 for (auto it = waitingStream_.begin(); it != waitingStream_.end();) {
348 if (it->second->GetSoundID() == soundId) {
349 it = waitingStream_.erase(it);
350 } else {
351 ++it;
352 }
353 }
354 std::vector<std::shared_ptr<Stream>> vector;
355 for (auto it = playingStream_.begin(); it != playingStream_.end();) {
356 if (it->second->GetSoundID() == soundId) {
357 vector.push_back(it->second);
358 it = playingStream_.erase(it);
359 } else {
360 ++it;
361 }
362 }
363 parallelStreamManagerLock_.unlock();
364 for (auto& item : vector) {
365 item->Stop();
366 }
367 DelSoundId(soundId);
368 return MSERR_OK;
369 }
370
ReorderStream()371 void ParallelStreamManager::ReorderStream()
372 {
373 std::lock_guard lock(parallelStreamManagerLock_);
374 int32_t playingSize = static_cast<int32_t>(playingStream_.size());
375 for (int32_t i = 0; i < playingSize - 1; ++i) {
376 for (int32_t j = 0; j < playingSize - 1 - i; ++j) {
377 std::shared_ptr<Stream> left = playingStream_[j].second;
378 std::shared_ptr<Stream> right = playingStream_[j + 1].second;
379 if (left != nullptr && right != nullptr && left->GetPriority() < right->GetPriority()) {
380 auto streamTemp = playingStream_[j];
381 playingStream_[j] = playingStream_[j + 1];
382 playingStream_[j + 1] = streamTemp;
383 }
384 }
385 }
386
387 int32_t willPlaySize = static_cast<int32_t>(waitingStream_.size());
388 for (int32_t i = 0; i < willPlaySize - 1; ++i) {
389 for (int32_t j = 0; j < willPlaySize - 1 - i; ++j) {
390 std::shared_ptr<Stream> left = waitingStream_[j].second;
391 std::shared_ptr<Stream> right = waitingStream_[j + 1].second;
392 if (left != nullptr && right != nullptr && left->GetPriority() < right->GetPriority()) {
393 auto willPlayTemp = waitingStream_[j];
394 waitingStream_[j] = waitingStream_[j + 1];
395 waitingStream_[j + 1] = willPlayTemp;
396 }
397 }
398 }
399 }
400
OnPlayFinished(int32_t streamID)401 void ParallelStreamManager::OnPlayFinished(int32_t streamID)
402 {
403 MediaTrace trace("ParallelStreamManager::OnPlayFinished");
404 std::lock_guard lock(parallelStreamManagerLock_);
405 MEDIA_LOGI("ParallelStreamManager::OnPlayFinished streamID:%{public}d", streamID);
406 for (auto it = playingStream_.begin(); it != playingStream_.end();) {
407 if (it->first == streamID) {
408 it = playingStream_.erase(it);
409 break;
410 } else {
411 ++it;
412 }
413 }
414 if (waitingStream_.size() > 0) {
415 MEDIA_LOGI("ParallelStreamManager::OnPlayFinished waitingStream_");
416 bool playFlag = true;
417 std::shared_ptr<Stream> waitFront = waitingStream_.front().second;
418 ThreadPool::Task streamPlayTask = [this, waitFront, playFlag] {
419 this->PreparePlay(waitFront, playFlag);
420 };
421 streamPlayThreadPool_->AddTask(streamPlayTask);
422 }
423 }
424
SetCallback(const std::shared_ptr<ISoundPoolCallback> & callback)425 int32_t ParallelStreamManager::SetCallback(const std::shared_ptr<ISoundPoolCallback> &callback)
426 {
427 callback_ = callback;
428 return MSERR_OK;
429 }
430
SetFrameWriteCallback(const std::shared_ptr<ISoundPoolFrameWriteCallback> & callback)431 int32_t ParallelStreamManager::SetFrameWriteCallback(const std::shared_ptr<ISoundPoolFrameWriteCallback> &callback)
432 {
433 frameWriteCallback_ = callback;
434 return MSERR_OK;
435 }
436
OnLoadCompleted(int32_t soundId)437 void ParallelStreamManager::StreamCallBack::OnLoadCompleted(int32_t soundId)
438 {
439 (void)soundId;
440 }
441
OnPlayFinished(int32_t streamId)442 void ParallelStreamManager::StreamCallBack::OnPlayFinished(int32_t streamId)
443 {
444 if (std::shared_ptr<ParallelStreamManager> ptr = parallelStreamManagerInner_.lock()) {
445 ptr->OnPlayFinished(streamId);
446 }
447 }
448
OnError(int32_t errorCode)449 void ParallelStreamManager::StreamCallBack::OnError(int32_t errorCode)
450 {
451 (void)errorCode;
452 }
453
454 } // namespace Media
455 } // namespace OHOS
456