1 /*
2 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "buffer_dispatcher.h"
17 #include <cmath>
18 #include <cstdarg>
19 #include <cstdint>
20 #include "common/common_macro.h"
21 #include "media_channel_def.h"
22
23 namespace OHOS {
24 namespace Sharing {
25
// Period handed to the writing-timeout timer in SetBufferDispatcherListener().
// NOTE(review): the unit is defined by TimeoutTimer::StartTimer, presumably seconds — confirm.
constexpr int32_t WRITING_TIMTOUT = 30;
// Fixed index offsets; their use sites are not in the visible part of this file.
constexpr int32_t FIX_OFFSET_TWO = 2;
constexpr int32_t FIX_OFFSET_ONE = 1;
29
SetSource(IBufferReader::Ptr dataReader)30 void BufferReceiver::SetSource(IBufferReader::Ptr dataReader)
31 {
32 SHARING_LOGD("trace.");
33 bufferReader_ = dataReader;
34 }
35
OnMediaDataNotify()36 int32_t BufferReceiver::OnMediaDataNotify()
37 {
38 SHARING_LOGD("BufferReceiver Media notified.");
39 dataReady_ = true;
40 notifyData_.notify_one();
41 return 0;
42 }
43
OnAudioDataNotify()44 int32_t BufferReceiver::OnAudioDataNotify()
45 {
46 MEDIA_LOGD("BufferReceiver Audio notified.");
47 nonBlockAudio_ = true;
48 notifyAudio_.notify_one();
49 return 0;
50 }
51
OnVideoDataNotify()52 int32_t BufferReceiver::OnVideoDataNotify()
53 {
54 MEDIA_LOGD("BufferReceiver Video notified.");
55 nonBlockVideo_ = true;
56 notifyVideo_.notify_one();
57 return 0;
58 }
59
IsMixedReceiver()60 bool BufferReceiver::IsMixedReceiver()
61 {
62 MEDIA_LOGD("trace.");
63 return mixed_;
64 }
65
RequestRead(MediaType type,std::function<void (const MediaData::Ptr & data)> cb)66 int32_t BufferReceiver::RequestRead(MediaType type, std::function<void(const MediaData::Ptr &data)> cb)
67 {
68 MEDIA_LOGD("trace.");
69 if (bufferReader_ == nullptr) {
70 SHARING_LOGE("BufferReceiver read failed null dispatcher.");
71 return -1;
72 }
73
74 if (firstMRead_ && type == MEDIA_TYPE_AV) {
75 bufferReader_->NotifyReadReady(GetReceiverId(), type);
76 mixed_ = true;
77 firstMRead_ = false;
78 } else if (firstARead_ && type == MEDIA_TYPE_AUDIO) {
79 bufferReader_->NotifyReadReady(GetReceiverId(), type);
80 firstARead_ = false;
81 } else if (firstVRead_ && type == MEDIA_TYPE_VIDEO) {
82 bufferReader_->NotifyReadReady(GetReceiverId(), type);
83 firstVRead_ = false;
84 }
85 std::unique_lock<std::mutex> locker(mutex_);
86 MEDIA_LOGD("BufferDispatcher NotifyThreadWorker before wait, receiverId: %{public}u.", GetReceiverId());
87 switch (type) {
88 /* cv will waiting if pred is false;
89 so set waiting audio pred (type != MEDIA_TYPE_AUDIO) to NOT block other type.*/
90 case MEDIA_TYPE_AUDIO:
91 MEDIA_LOGD("wait Audio, receiverId: %{public}u.", GetReceiverId());
92 notifyAudio_.wait(locker, [=]() { return nonBlockAudio_ || type != MEDIA_TYPE_AUDIO; });
93 nonBlockAudio_ = false;
94 break;
95 case MEDIA_TYPE_VIDEO:
96 MEDIA_LOGD("wait Video, receiverId: %{public}u.", GetReceiverId());
97 notifyVideo_.wait(locker, [=]() { return nonBlockVideo_ || type != MEDIA_TYPE_VIDEO; });
98 nonBlockVideo_ = false;
99 break;
100 case MEDIA_TYPE_AV:
101 MEDIA_LOGD("wait Mixed, receiverId: %{public}u.", GetReceiverId());
102 notifyData_.wait(locker, [=]() { return dataReady_ || type != MEDIA_TYPE_AV; });
103 dataReady_ = false;
104 break;
105 default:
106 return 0;
107 break;
108 }
109
110 bufferReader_->ClearDataBit(GetReceiverId(), type);
111 bufferReader_->ClearReadBit(GetReceiverId(), type);
112 MEDIA_LOGD("BufferDispatcher NotifyThreadWorker after wait start read, receiverId: %{public}u.", GetReceiverId());
113 int32_t ret = bufferReader_->ReadBufferData(GetReceiverId(), type, cb);
114 bufferReader_->NotifyReadReady(GetReceiverId(), type);
115 dataReady_ = false;
116
117 return ret;
118 }
119
NotifyReadStart()120 void BufferReceiver::NotifyReadStart()
121 {
122 SHARING_LOGD("receiverId: %{public}u notify start read.", GetReceiverId());
123 firstARead_ = true;
124 firstVRead_ = true;
125 firstMRead_ = true;
126 }
127
GetReceiverId()128 uint32_t BufferReceiver::GetReceiverId()
129 {
130 MEDIA_LOGD("trace.");
131 return GetId();
132 }
133
GetDispatcherId()134 uint32_t BufferReceiver::GetDispatcherId()
135 {
136 SHARING_LOGD("trace.");
137 if (bufferReader_) {
138 return bufferReader_->GetDispatcherId();
139 }
140
141 return 0;
142 }
143
NotifyReadStop()144 void BufferReceiver::NotifyReadStop()
145 {
146 SHARING_LOGD("receiverId: %{public}u notify stop read.", GetReceiverId());
147 nonBlockAudio_ = true;
148 nonBlockVideo_ = true;
149 dataReady_ = true;
150 notifyAudio_.notify_all();
151 notifyVideo_.notify_all();
152 notifyData_.notify_all();
153 }
154
EnableKeyMode(bool enable)155 void BufferReceiver::EnableKeyMode(bool enable)
156 {
157 SHARING_LOGD("bufferReceiver id %{public}u SetKeyOnlyMode %{public}d.", GetReceiverId(), enable);
158 if (keyOnly_ == true && enable == false) {
159 SHARING_LOGD("Set KeyOnlyMode false, need report fast read over.");
160 accelerationDone_ = true;
161 }
162
163 keyOnly_ = enable;
164 if (bufferReader_ && enable) {
165 bufferReader_->ClearDataBit(GetReceiverId(), MEDIA_TYPE_VIDEO);
166 }
167
168 auto listener = listener_.lock();
169 if (listener) {
170 listener->OnKeyModeNotify(enable);
171 }
172 }
173
IsKeyMode()174 bool BufferReceiver::IsKeyMode()
175 {
176 MEDIA_LOGD("trace.");
177 return keyOnly_;
178 }
179
IsKeyRedirect()180 bool BufferReceiver::IsKeyRedirect()
181 {
182 SHARING_LOGD("trace.");
183 return keyRedirect_;
184 }
185
GetSPS()186 const MediaData::Ptr BufferReceiver::GetSPS()
187 {
188 MEDIA_LOGD("trace.");
189 if (bufferReader_) {
190 return bufferReader_->GetSPS();
191 }
192
193 return nullptr;
194 }
195
GetPPS()196 const MediaData::Ptr BufferReceiver::GetPPS()
197 {
198 MEDIA_LOGD("trace.");
199 if (bufferReader_) {
200 return bufferReader_->GetPPS();
201 }
202
203 return nullptr;
204 }
205
NeedAcceleration()206 bool BufferReceiver::NeedAcceleration()
207 {
208 MEDIA_LOGD("trace.");
209 return accelerationDone_;
210 }
211
DisableAcceleration()212 void BufferReceiver::DisableAcceleration()
213 {
214 SHARING_LOGD("trace.");
215 accelerationDone_ = false;
216 }
217
SendAccelerationDone()218 void BufferReceiver::SendAccelerationDone()
219 {
220 SHARING_LOGD("trace.");
221 auto listener = listener_.lock();
222 if (listener) {
223 listener->OnAccelerationDoneNotify();
224 }
225 }
226
EnableKeyRedirect(bool enable)227 void BufferReceiver::EnableKeyRedirect(bool enable)
228 {
229 SHARING_LOGD("trace.");
230 if (bufferReader_ && enable) {
231 bufferReader_->EnableKeyRedirect(enable);
232 }
233 keyRedirect_ = enable;
234 }
235
SetBufferReceiverListener(std::weak_ptr<IBufferReceiverListener> listener)236 void BufferReceiver::SetBufferReceiverListener(std::weak_ptr<IBufferReceiverListener> listener)
237 {
238 SHARING_LOGD("trace.");
239 listener_ = listener;
240 }
241
// Shorthand so the DataNotifier member definitions below can omit the BufferDispatcher:: qualifier.
using DataNotifier = BufferDispatcher::DataNotifier;
243
NotifyDataReceiver(MediaType type)244 void DataNotifier::NotifyDataReceiver(MediaType type)
245 {
246 MEDIA_LOGD("trace.");
247 if (receiver_.lock() == nullptr) {
248 SHARING_LOGE("target receiver NOT exist.");
249 return;
250 }
251
252 if (block_) {
253 return;
254 }
255
256 MEDIA_LOGD("notify target type %{public}d.", type);
257 switch (type) {
258 case MEDIA_TYPE_AUDIO:
259 GetBufferReceiver()->OnAudioDataNotify();
260 break;
261 case MEDIA_TYPE_VIDEO:
262 GetBufferReceiver()->OnVideoDataNotify();
263 break;
264 case MEDIA_TYPE_AV:
265 GetBufferReceiver()->OnMediaDataNotify();
266 break;
267 default:
268 SHARING_LOGI("none process case.");
269 break;
270 }
271 }
272
GetBufferReceiver()273 BufferReceiver::Ptr DataNotifier::GetBufferReceiver()
274 {
275 MEDIA_LOGD("trace.");
276 return receiver_.lock();
277 }
278
GetReceiverId()279 uint32_t DataNotifier::GetReceiverId()
280 {
281 MEDIA_LOGD("trace.");
282 auto receiver = receiver_.lock();
283 if (receiver == nullptr) {
284 SHARING_LOGE("target receiver NOT exist.");
285 return INVALID_INDEX;
286 }
287
288 return receiver->GetReceiverId();
289 }
290
SetListenDispatcher(IBufferReader::Ptr dispatcher)291 void DataNotifier::SetListenDispatcher(IBufferReader::Ptr dispatcher)
292 {
293 SHARING_LOGD("trace.");
294 dispatcher_ = dispatcher;
295 }
296
SetNotifyReceiver(BufferReceiver::Ptr receiver)297 void DataNotifier::SetNotifyReceiver(BufferReceiver::Ptr receiver)
298 {
299 SHARING_LOGD("trace.");
300 receiver_ = receiver;
301 }
302
SetBlock()303 void DataNotifier::SetBlock()
304 {
305 SHARING_LOGD("trace.");
306 block_ = true;
307 }
308
SetNeedUpdate(bool enable,MediaType type)309 void DataNotifier::SetNeedUpdate(bool enable, MediaType type)
310 {
311 MEDIA_LOGD("trace.");
312 if (type == MEDIA_TYPE_AUDIO) {
313 needUpdateAIndex = enable;
314 } else {
315 needUpdateVIndex = enable;
316 }
317 }
318
DataAvailable(MediaType type)319 bool DataNotifier::DataAvailable(MediaType type)
320 {
321 MEDIA_LOGD("trace.");
322 auto dispatcher = dispatcher_.lock();
323 if (dispatcher == nullptr) {
324 SHARING_LOGE("target dispatcher NOT exist.");
325 return false;
326 }
327
328 if (type == MEDIA_TYPE_AUDIO) {
329 return audioIndex != INVALID_INDEX &&
330 (audioIndex < dispatcher->GetLatestAudioIndex() || !dispatcher->IsRead(GetReceiverId(), audioIndex + 1));
331 } else if (type == MEDIA_TYPE_VIDEO) {
332 return videoIndex != INVALID_INDEX &&
333 (videoIndex < dispatcher->GetLatestVideoIndex() || !dispatcher->IsRead(GetReceiverId(), videoIndex + 1));
334 } else {
335 return videoIndex != INVALID_INDEX &&
336 (videoIndex < dispatcher->GetBufferSize() - 1 || !dispatcher->IsRead(GetReceiverId(), videoIndex + 1));
337 }
338
339 return false;
340 }
341
IsMixedReceiver()342 bool DataNotifier::IsMixedReceiver()
343 {
344 MEDIA_LOGD("trace.");
345 auto receiver = receiver_.lock();
346 if (receiver == nullptr) {
347 SHARING_LOGE("target receiver NOT exist.");
348 return false;
349 }
350
351 return receiver->IsMixedReceiver();
352 }
353
GetReceiverReadIndex(MediaType type)354 uint32_t DataNotifier::GetReceiverReadIndex(MediaType type)
355 {
356 MEDIA_LOGD("trace.");
357 switch (type) {
358 case MEDIA_TYPE_VIDEO:
359 MEDIA_LOGD("Video Recvid:%{public}d index: %{public}d.", GetReceiverId(), videoIndex);
360 return videoIndex;
361 break;
362 case MEDIA_TYPE_AUDIO:
363 MEDIA_LOGD("Audio Recvid:%{public}d index: %{public}d.", GetReceiverId(), audioIndex);
364 return audioIndex;
365 break;
366 case MEDIA_TYPE_AV:
367 MEDIA_LOGD("Mixed Recvid:%{public}d vindex: %{public}d aindex: %{public}d.", GetReceiverId(), videoIndex,
368 audioIndex);
369 if (audioIndex != INVALID_INDEX && videoIndex != INVALID_INDEX) {
370 return audioIndex <= videoIndex ? audioIndex : videoIndex;
371 } else if (audioIndex == INVALID_INDEX && videoIndex == INVALID_INDEX) {
372 return INVALID_INDEX;
373 } else {
374 return audioIndex == INVALID_INDEX ? videoIndex : audioIndex;
375 }
376 break;
377 default:
378 return INVALID_INDEX;
379 break;
380 }
381 }
382
IsKeyModeReceiver()383 bool DataNotifier::IsKeyModeReceiver()
384 {
385 MEDIA_LOGD("trace.");
386 auto receiver = receiver_.lock();
387 if (receiver) {
388 return receiver->IsKeyMode();
389 }
390
391 return false;
392 }
393
IsKeyRedirectReceiver()394 bool DataNotifier::IsKeyRedirectReceiver()
395 {
396 SHARING_LOGD("trace.");
397 auto receiver = receiver_.lock();
398 if (receiver) {
399 return receiver->IsKeyRedirect();
400 }
401
402 return false;
403 }
404
NeedAcceleration()405 bool DataNotifier::NeedAcceleration()
406 {
407 MEDIA_LOGD("trace.");
408 auto receiver = receiver_.lock();
409 if (receiver == nullptr) {
410 SHARING_LOGE("target receiver NOT exist.");
411 return false;
412 }
413
414 return receiver->NeedAcceleration();
415 }
416
SendAccelerationDone()417 void DataNotifier::SendAccelerationDone()
418 {
419 SHARING_LOGD("trace.");
420 auto receiver = receiver_.lock();
421 if (receiver == nullptr) {
422 SHARING_LOGE("target receiver NOT exist.");
423 return;
424 }
425
426 receiver->SendAccelerationDone();
427 receiver->DisableAcceleration();
428 }
429
BufferDispatcher(uint32_t maxCapacity,uint32_t capacityIncrement)430 BufferDispatcher::BufferDispatcher(uint32_t maxCapacity, uint32_t capacityIncrement)
431 {
432 SHARING_LOGD("BufferDispatcher ctor, set capacity: %{public}u.", maxCapacity);
433 maxBufferCapacity_ = maxCapacity;
434 bufferCapacityIncrement_ = capacityIncrement;
435 {
436 std::lock_guard<std::mutex> lock(idleMutex_);
437 idleAudioBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
438 idleVideoBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
439 for (size_t i = 0; i < INITIAL_BUFFER_CAPACITY; i++) {
440 MediaData::Ptr adata = std::make_shared<MediaData>();
441 MediaData::Ptr vdata = std::make_shared<MediaData>();
442 adata->buff = std::make_shared<DataBuffer>();
443 vdata->buff = std::make_shared<DataBuffer>();
444 idleAudioBuffer_.push_back(adata);
445 idleVideoBuffer_.push_back(vdata);
446 }
447 }
448
449 writingTimer_ = std::make_unique<TimeoutTimer>("dispatcher-writing-timer");
450
451 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
452 circularBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
453 StartDispatch();
454 }
455
~BufferDispatcher()456 BufferDispatcher::~BufferDispatcher()
457 {
458 SHARING_LOGI("BufferDispatcher dtor.");
459 running_ = false;
460 StopDispatch();
461 FlushBuffer();
462 ReleaseIdleBuffer();
463 ReleaseAllReceiver();
464 }
465
StartDispatch()466 void BufferDispatcher::StartDispatch()
467 {
468 SHARING_LOGD("trace.");
469 running_ = true;
470 notifyThread_ = std::thread([this] {
471 this->NotifyThreadWorker(this);
472 });
473 std::string name = "notifyThread";
474 pthread_setname_np(notifyThread_.native_handle(), name.c_str());
475 }
476
StopDispatch()477 void BufferDispatcher::StopDispatch()
478 {
479 SHARING_LOGD("trace.");
480 running_ = false;
481 continueNotify_ = true;
482
483 if (writingTimer_) {
484 writingTimer_.reset();
485 }
486
487 dataCV_.notify_all();
488 if (notifyThread_.joinable()) {
489 notifyThread_.join();
490 }
491 }
492
SetBufferCapacity(size_t capacity)493 void BufferDispatcher::SetBufferCapacity(size_t capacity)
494 {
495 SHARING_LOGD("trace.");
496 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
497 circularBuffer_.set_capacity(capacity);
498 }
499
SetDataMode(MediaDispacherMode dataMode)500 void BufferDispatcher::SetDataMode(MediaDispacherMode dataMode)
501 {
502 SHARING_LOGD("trace.");
503 dataMode_ = dataMode;
504 }
505
ReleaseIdleBuffer()506 void BufferDispatcher::ReleaseIdleBuffer()
507 {
508 SHARING_LOGD("BufferDispatcher idle Release Start.");
509 std::unique_lock<std::mutex> locker(idleMutex_);
510 for (auto &data : idleAudioBuffer_) {
511 if (data != nullptr && data->buff != nullptr) {
512 data->buff.reset();
513 }
514 }
515
516 idleAudioBuffer_.clear();
517 for (auto &data : idleVideoBuffer_) {
518 if (data != nullptr && data->buff != nullptr) {
519 data->buff.reset();
520 }
521 }
522
523 idleVideoBuffer_.clear();
524 SHARING_LOGD("BufferDispatcher idle Release End.");
525 }
526
FlushBuffer()527 void BufferDispatcher::FlushBuffer()
528 {
529 SHARING_LOGI("BufferDispatcher Start flushing, dispatcherId: %{public}u.", GetDispatcherId());
530 {
531 std::lock_guard<std::mutex> lock(idleMutex_);
532 idleAudioBuffer_.clear();
533 idleVideoBuffer_.clear();
534 for (size_t i = 0; i < INITIAL_BUFFER_CAPACITY; i++) {
535 MediaData::Ptr adata = std::make_shared<MediaData>();
536 MediaData::Ptr vdata = std::make_shared<MediaData>();
537 adata->buff = std::make_shared<DataBuffer>();
538 vdata->buff = std::make_shared<DataBuffer>();
539 idleAudioBuffer_.push_back(adata);
540 idleVideoBuffer_.push_back(vdata);
541 }
542 }
543
544 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
545 for (auto &data : circularBuffer_) {
546 if (data->mediaData != nullptr && data->mediaData->buff != nullptr) {
547 data->mediaData->buff.reset();
548 }
549 }
550
551 circularBuffer_.clear();
552 waitingKey_ = true;
553 gop_ = 0;
554 audioFrameCnt_ = 0;
555 videoFrameCnt_ = 0;
556 ResetAllIndex();
557 SHARING_LOGD("BufferDispatcher Dispatcher flushing end, dispatcherId: %{public}u.", GetDispatcherId());
558 }
559
RequestDataBuffer(MediaType type,uint32_t size)560 MediaData::Ptr BufferDispatcher::RequestDataBuffer(MediaType type, uint32_t size)
561 {
562 SHARING_LOGD("trace.");
563 std::lock_guard<std::mutex> lock(idleMutex_);
564 if (size <= 0) {
565 SHARING_LOGE("Size invalid.");
566 return nullptr;
567 }
568
569 MediaData::Ptr retData;
570 if (type == MEDIA_TYPE_VIDEO) {
571 if (!idleVideoBuffer_.empty()) {
572 SHARING_LOGD("video From idle.");
573 retData = idleVideoBuffer_.front();
574 idleVideoBuffer_.pop_front();
575 if (retData == nullptr) {
576 MEDIA_LOGW("video From alloc when idle nullptr.");
577 retData = std::make_shared<MediaData>();
578 }
579 return retData;
580 }
581 } else {
582 if (!idleAudioBuffer_.empty()) {
583 SHARING_LOGD("Audio From idle.");
584 retData = idleAudioBuffer_.front();
585 idleAudioBuffer_.pop_front();
586 if (retData == nullptr) {
587 MEDIA_LOGW("Audio From alloc when idle nullptr.");
588 retData = std::make_shared<MediaData>();
589 }
590 return retData;
591 }
592 }
593
594 SHARING_LOGD("Audio/video from alloc.");
595 retData = std::make_shared<MediaData>();
596 return retData;
597 }
598
ReturnIdleBuffer(DataSpec::Ptr & data)599 void BufferDispatcher::ReturnIdleBuffer(DataSpec::Ptr &data)
600 {
601 MEDIA_LOGD("trace.");
602 std::lock_guard<std::mutex> lock(idleMutex_);
603 if (data == nullptr || data->mediaData == nullptr) {
604 return;
605 }
606 if (data->mediaData->mediaType == MEDIA_TYPE_VIDEO) {
607 if (idleVideoBuffer_.size() < INITIAL_BUFFER_CAPACITY) {
608 idleVideoBuffer_.push_back(data->mediaData);
609 MEDIA_LOGD("data: push_back in idleVideoBuffer_, size: %{public}zu.", idleVideoBuffer_.size());
610 }
611 } else {
612 if (idleAudioBuffer_.size() < INITIAL_BUFFER_CAPACITY) {
613 idleAudioBuffer_.push_back(data->mediaData);
614 MEDIA_LOGD("data: push_back in idleAudioBuffer_, size: %{public}zu.", idleAudioBuffer_.size());
615 }
616 }
617
618 data.reset();
619 }
620
GetBufferSize()621 size_t BufferDispatcher::GetBufferSize()
622 {
623 SHARING_LOGD("trace.");
624 return circularBuffer_.size();
625 }
626
FindReceiverIndex(uint32_t receiverId)627 uint32_t BufferDispatcher::FindReceiverIndex(uint32_t receiverId)
628 {
629 MEDIA_LOGD("trace.");
630 if (notifiers_.find(receiverId) != notifiers_.end()) {
631 return notifiers_[receiverId]->GetReadIndex();
632 }
633
634 return INVALID_INDEX;
635 }
636
IsRecevierExist(uint32_t receiverId)637 bool BufferDispatcher::IsRecevierExist(uint32_t receiverId)
638 {
639 SHARING_LOGD("trace.");
640 auto notifier = GetNotifierByReceiverId(receiverId);
641 if (notifier == nullptr) {
642 return false;
643 }
644
645 return true;
646 }
647
EnableKeyMode(bool enable)648 void BufferDispatcher::EnableKeyMode(bool enable)
649 {
650 SHARING_LOGD("trace.");
651 keyOnly_ = enable;
652 }
653
AttachReceiver(BufferReceiver::Ptr receiver)654 int32_t BufferDispatcher::AttachReceiver(BufferReceiver::Ptr receiver)
655 {
656 SHARING_LOGD("trace.");
657 if (receiver == nullptr) {
658 return -1;
659 }
660
661 if (IsRecevierExist(receiver->GetReceiverId())) {
662 SHARING_LOGE("Exist.");
663 return 0;
664 }
665
666 receiver->NotifyReadStart();
667 std::lock_guard<std::mutex> locker(notifyMutex_);
668 if (readRefFlag_ == 0xFFFF) {
669 SHARING_LOGE("readRefFlag limited.");
670 return -1;
671 }
672
673 DataNotifier::Ptr notifier = std::make_shared<DataNotifier>();
674 notifier->SetListenDispatcher(shared_from_this());
675 notifier->SetNotifyReceiver(receiver);
676
677 auto usableRef = ~readRefFlag_ & (-(~readRefFlag_));
678
679 if ((static_cast<uint32_t>(usableRef) & static_cast<uint32_t>(usableRef - 1)) != 0) {
680 SHARING_LOGE("usableRef: %{public}d invalid.", usableRef);
681 return -1;
682 }
683
684 readRefFlag_ |= static_cast<uint32_t>(usableRef);
685 notifier->SetReadIndex(static_cast<uint32_t>(log2(usableRef)));
686 SHARING_LOGI("receiverId: %{public}d, readIndex: %{public}d, usableRef: %{public}d, readRefFlag_: %{public}d.",
687 receiver->GetReceiverId(), notifier->GetReadIndex(), usableRef, readRefFlag_);
688 receiver->SetSource(shared_from_this());
689 notifiers_.emplace(receiver->GetReceiverId(), notifier);
690
691 if (circularBuffer_.empty()) {
692 notifier->audioIndex = INVALID_INDEX;
693 notifier->videoIndex = INVALID_INDEX;
694 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, false);
695 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
696 SHARING_LOGD("BufferDispatcher Attach when buffer empty RecvId: %{public}d.", receiver->GetReceiverId());
697 videoNeedActivate_ = true;
698 audioNeedActivate_ = true;
699 return 0;
700 }
701
702 if (dataMode_ == MEDIA_AUDIO_ONLY) {
703 notifier->audioIndex = circularBuffer_.size() - 1;
704 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, true);
705 notifier->videoIndex = INVALID_INDEX;
706 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
707 SHARING_LOGD("BufferDispatcher Attach when Keyindex List empty RecvId: %{public}d.",
708 receiver->GetReceiverId());
709 } else {
710 if (!keyIndexList_.empty()) {
711 SHARING_LOGD("BufferDispatcher Attach with Keyindex RecvId: %{public}d KeyIndex:%{public}d.",
712 receiver->GetReceiverId(), keyIndexList_.back());
713 uint32_t tempIndex = FindNextIndex(keyIndexList_.back(), MEDIA_TYPE_AUDIO);
714 notifier->audioIndex = tempIndex == keyIndexList_.back() ? INVALID_INDEX : tempIndex;
715 notifier->videoIndex = keyIndexList_.back();
716 bool isAudioReady = tempIndex != INVALID_INDEX ? true : false;
717 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, isAudioReady);
718 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, true);
719 if (lastAudioIndex_ == INVALID_INDEX) {
720 audioNeedActivate_ = true;
721 }
722 } else {
723 SHARING_LOGD("BufferDispatcher Attach with Non Keyindex Exist RecvId: %{public}d.",
724 receiver->GetReceiverId());
725 uint32_t tempIndex = FindLastIndex(MEDIA_TYPE_AUDIO);
726 notifier->audioIndex = tempIndex;
727 notifier->videoIndex = INVALID_INDEX;
728 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, true);
729 SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
730 if (lastAudioIndex_ == INVALID_INDEX) {
731 audioNeedActivate_ = true;
732 }
733 }
734 }
735
736 return 0;
737 }
738
DetachReceiver(BufferReceiver::Ptr receiver)739 int32_t BufferDispatcher::DetachReceiver(BufferReceiver::Ptr receiver)
740 {
741 SHARING_LOGI("buffer dispatcher: Detach receiver in.");
742 if (receiver == nullptr) {
743 SHARING_LOGE("buffer dispatcher: Detach receiver failed - null receiver.");
744 return -1;
745 }
746
747 if (!IsRecevierExist(receiver->GetReceiverId())) {
748 SHARING_LOGE("BufferDispatcher AttachReceiver No Vaild Recevier Exist.");
749 return 0;
750 }
751
752 auto notifier = GetNotifierByReceiverPtr(receiver);
753 if (notifier == nullptr) {
754 SHARING_LOGE("buffer dispatcher: Detach receiver failed - no find receiver in notifiers.");
755 return -1;
756 }
757
758 std::lock_guard<std::mutex> locker(notifyMutex_);
759 notifier->SetBlock();
760 SetReceiverReadRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, false);
761 SetReceiverReadRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
762
763 readRefFlag_ &= ~(RECV_FLAG_BASE << notifier->GetReadIndex());
764 notifiers_.erase(receiver->GetReceiverId());
765 SHARING_LOGI("now refFlag: %{public}d.", readRefFlag_);
766 return 0;
767 }
768
DetachReceiver(uint32_t receiverId,DataNotifier::Ptr notifier)769 int32_t BufferDispatcher::DetachReceiver(uint32_t receiverId, DataNotifier::Ptr notifier)
770 {
771 SHARING_LOGI("buffer dispatcher: Detach notifier in.");
772 if (notifier == nullptr) {
773 SHARING_LOGE("buffer dispatcher: Detach receiver failed - null notifier.");
774 return -1;
775 }
776 notifier->SetBlock();
777 SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, false);
778 SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, false);
779
780 readRefFlag_ &= ~(RECV_FLAG_BASE << notifier->GetReadIndex());
781 notifiers_.erase(receiverId);
782 SHARING_LOGI("now refFlag: %{public}d.", readRefFlag_);
783 return 0;
784 }
785
ReleaseAllReceiver()786 void BufferDispatcher::ReleaseAllReceiver()
787 {
788 SHARING_LOGD("trace.");
789 std::lock_guard<std::mutex> locker(notifyMutex_);
790 for (auto it = notifiers_.begin(); it != notifiers_.end();) {
791 auto notifier = it->second;
792 if (notifier == nullptr) {
793 ++it;
794 continue;
795 }
796
797 auto receiver = notifier->GetBufferReceiver();
798 if (receiver == nullptr) {
799 ++it;
800 continue;
801 }
802
803 auto receiverId = receiver->GetReceiverId();
804 if (notifiers_.find(receiverId) != notifiers_.end()) {
805 auto notifierFind = notifiers_[receiverId];
806 ++it;
807 DetachReceiver(receiverId, notifierFind);
808 } else {
809 ++it;
810 SHARING_LOGE("buffer dispatcher: Detach receiver failed - no find receiver in notifiers.");
811 }
812 }
813
814 notifiers_.clear();
815 SHARING_LOGD("release all receiver out.");
816 }
817
SetBufferDispatcherListener(BufferDispatcherListener::Ptr listener)818 void BufferDispatcher::SetBufferDispatcherListener(BufferDispatcherListener::Ptr listener)
819 {
820 SHARING_LOGD("trace.");
821 listener_ = listener;
822 RETURN_IF_NULL(writingTimer_);
823 writingTimer_->StartTimer(
824 WRITING_TIMTOUT, "waiting for continuous data inputs",
825 [this]() {
826 if (!writing_) {
827 SHARING_LOGI("writing timeout");
828 auto listener = listener_.lock();
829 if (listener) {
830 listener->OnWriteTimeout();
831 }
832 } else {
833 SHARING_LOGI("restart timer");
834 writing_ = false;
835 }
836 },
837 true);
838 }
839
GetNotifierByReceiverPtr(BufferReceiver::Ptr receiver)840 DataNotifier::Ptr BufferDispatcher::GetNotifierByReceiverPtr(BufferReceiver::Ptr receiver)
841 {
842 SHARING_LOGD("trace.");
843
844 return GetNotifierByReceiverId(receiver->GetReceiverId());
845 }
846
GetNotifierByReceiverId(uint32_t receiverId)847 DataNotifier::Ptr BufferDispatcher::GetNotifierByReceiverId(uint32_t receiverId)
848 {
849 MEDIA_LOGD("trace.");
850 std::lock_guard<std::mutex> locker(notifyMutex_);
851 if (notifiers_.find(receiverId) != notifiers_.end()) {
852 return notifiers_[receiverId];
853 }
854
855 return nullptr;
856 }
857
ReadBufferData(uint32_t receiverId,MediaType type,std::function<void (const MediaData::Ptr & data)> cb)858 int32_t BufferDispatcher::ReadBufferData(uint32_t receiverId, MediaType type,
859 std::function<void(const MediaData::Ptr &data)> cb)
860 {
861 MEDIA_LOGD("in, receiverId: %{public}u.", receiverId);
862 auto notifier = GetNotifierByReceiverId(receiverId);
863 if (notifier == nullptr) {
864 SHARING_LOGE("notifier is nullptr.");
865 return -1;
866 }
867
868 std::shared_lock<std::shared_mutex> locker(bufferMutex_);
869 uint32_t readIndex = notifier->GetReceiverReadIndex(type);
870 if (readIndex >= circularBuffer_.size()) {
871 SHARING_LOGE("Read wrong index exceed size.");
872 return -1;
873 }
874
875 if ((keyOnly_ || notifier->IsKeyModeReceiver()) && type == MEDIA_TYPE_VIDEO &&
876 !IsKeyVideoFrame(circularBuffer_.at(readIndex))) {
877 UpdateReceiverReadIndex(receiverId, readIndex, type);
878 SHARING_LOGE("Read Non Key Video in KeyOnly Mode index: %{public}u.", readIndex);
879 return -1;
880 }
881
882 if (IsDataReaded(receiverId, circularBuffer_.at(readIndex))) {
883 UpdateReceiverReadIndex(receiverId, readIndex, type);
884 return -1;
885 }
886
887 readIndex = notifier->GetReceiverReadIndex(type);
888 if (readIndex >= circularBuffer_.size()) {
889 return -1;
890 }
891
892 auto data = circularBuffer_.at(readIndex);
893 if (data == nullptr) {
894 SHARING_LOGE("BufferDispatcher Read data nullptr.");
895 return -1;
896 }
897
898 if (IsKeyVideoFrame(data)) {
899 int32_t bufferVideoCacheCnt = 0;
900 for (size_t i = (size_t)readIndex + 1; i < circularBuffer_.size(); i++) {
901 if (circularBuffer_[static_cast<int32_t>(i)]->mediaData->mediaType == MEDIA_TYPE_VIDEO)
902 bufferVideoCacheCnt++;
903 }
904 MEDIA_LOGD("TEST STATISTIC:interval: buffer cache %{public}d frames.", bufferVideoCacheCnt);
905 }
906
907 SetReceiverReadFlag(receiverId, data);
908 if (cb != nullptr) {
909 cb(data->mediaData);
910 }
911
912 MEDIA_LOGD("Current data readed, Recvid:%{public}d, remain %{public}zu data, readIndex: %{public}u, "
913 "readtype: %{public}d, diff: %{public}zu.",
914 receiverId, circularBuffer_.size(), readIndex, int32_t(type), circularBuffer_.size() - readIndex);
915 UpdateReceiverReadIndex(receiverId, readIndex, type);
916 return 0;
917 }
918
InputData(const MediaData::Ptr & data)919 int32_t BufferDispatcher::InputData(const MediaData::Ptr &data)
920 {
921 if (data == nullptr || data->buff == nullptr) {
922 SHARING_LOGE("data nullptr.");
923 return -1;
924 }
925 MEDIA_LOGD("inputmediatype: %{public}d, keyFrame: %{public}d, pts: %{public}" PRIu64 ".", data->mediaType,
926 data->keyFrame, data->pts);
927
928 if (!writing_) {
929 writing_ = true;
930 }
931
932 DataSpec::Ptr dataSpec = std::make_shared<DataSpec>();
933 dataSpec->mediaData = data;
934 if (dataMode_ == MEDIA_AUDIO_ONLY) {
935 WriteDataIntoBuffer(dataSpec);
936 } else {
937 PreProcessDataSpec(dataSpec);
938 }
939
940 if (circularBuffer_.size() > 0) {
941 MEDIA_LOGD("inputmediatype: %{public}d, keyFrame: %{public}d, pts: %{public}" PRIu64 ".",
942 circularBuffer_[circularBuffer_.size() - 1]->mediaData->mediaType,
943 circularBuffer_[circularBuffer_.size() - 1]->mediaData->keyFrame,
944 circularBuffer_[circularBuffer_.size() - 1]->mediaData->pts);
945 }
946
947 if (data->keyFrame) {
948 MEDIA_LOGD("dispatcherId: %{public}u, after InputData, current circularBuffer_ size: %{public}zu, "
949 "idleVideoBuffer_ size: %{public}zu, idle_audioBuffer_ size: %{public}zu, "
950 "keyFrame: %{public}s, data size: %{public}d, adataCount:%{public}d.",
951 GetDispatcherId(), circularBuffer_.size(), idleVideoBuffer_.size(), idleAudioBuffer_.size(),
952 data->keyFrame ? "true" : "false", data->buff->Size(), audioFrameCnt_);
953 }
954
955 return 0;
956 }
957
PreProcessDataSpec(const DataSpec::Ptr & dataSpec)958 void BufferDispatcher::PreProcessDataSpec(const DataSpec::Ptr &dataSpec)
959 {
960 MEDIA_LOGD("trace.");
961 if (waitingKey_) {
962 if (IsAudioData(dataSpec)) {
963 } else if (!IsKeyVideoFrame(dataSpec)) {
964 SHARING_LOGD("BufferDispatcher Waiting First Key Video Frame.");
965 return;
966 } else {
967 SHARING_LOGD("BufferDispatcher received first key video frame and restore from uncontinuous...Flushing.");
968 FlushBuffer();
969 baseCounter_++;
970 capacityEvaluating_ = true;
971 waitingKey_ = false;
972 }
973 } else {
974 if (capacityEvaluating_) {
975 ReCalculateCapacity(IsKeyVideoFrame(dataSpec));
976 }
977 }
978
979 WriteDataIntoBuffer(dataSpec);
980 }
981
WriteDataIntoBuffer(const DataSpec::Ptr & data)982 int32_t BufferDispatcher::WriteDataIntoBuffer(const DataSpec::Ptr &data)
983 {
984 MEDIA_LOGD("trace.");
985 if (data->mediaData == nullptr || data->mediaData->buff == nullptr) {
986 SHARING_LOGE("null data.");
987 return -1;
988 }
989
990 if (NeedExtendToDBCapacity()) {
991 SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d CRTL_SIZE.", doubleBufferCapacity_);
992 SetBufferCapacity(doubleBufferCapacity_);
993 }
994
995 if (NeedRestoreToNormalCapacity()) {
996 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
997 int32_t popSize = (int32_t)circularBuffer_.size() - (int32_t)INITIAL_BUFFER_CAPACITY;
998 for (int32_t i = 0; i < popSize; i++) {
999 if (HeadFrameNeedReserve()) {
1000 MEDIA_LOGW("dispatcherId: %{public}u, need reserve but pop, mediaType: "
1001 "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64 ".",
1002 GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1003 circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
1004 circularBuffer_.front()->mediaData->pts);
1005 }
1006
1007 MEDIA_LOGW("dispatcherId: %{public}u, delete data, mediaType: %{public}d, keyFrame: "
1008 "%{public}s, pts: %{public}" PRIu64 ", reserveFlag: %{public}x.",
1009 GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1010 circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
1011 circularBuffer_.front()->mediaData->pts, circularBuffer_.front()->reserveFlag.load());
1012 circularBuffer_.pop_front();
1013 audioFrameCnt_--;
1014 UpdateIndex();
1015 }
1016
1017 baseBufferCapacity_ = INITIAL_BUFFER_CAPACITY;
1018 doubleBufferCapacity_ = INITIAL_BUFFER_CAPACITY * 2; // 2 : increasement
1019 SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d NORMALSIZE.", baseBufferCapacity_);
1020 circularBuffer_.set_capacity(baseBufferCapacity_);
1021 }
1022
1023 if (IsKeyVideoFrame(data) && !keyOnly_) {
1024 EraseOldGopDatas();
1025 }
1026
1027 bool updateIndexFlag = false;
1028 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
1029 if (circularBuffer_.size() >= circularBuffer_.capacity()) {
1030 updateIndexFlag = true;
1031 }
1032
1033 if (updateIndexFlag) {
1034 uint32_t nextDeleteIndex = 1;
1035 if (IsVideoData(data)) {
1036 nextDeleteIndex = FindNextDeleteVideoIndex();
1037 }
1038
1039 for (size_t i = 0; i <= nextDeleteIndex; i++) {
1040 MediaType headType = circularBuffer_.front()->mediaData->mediaType;
1041 DataSpec::Ptr retBuff = circularBuffer_.front();
1042 if (HeadFrameNeedReserve()) {
1043 MEDIA_LOGW("dispatcherId: %{public}u, need reserve but pop, mediaType: "
1044 "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64 ".",
1045 GetDispatcherId(), int32_t(retBuff->mediaData->mediaType),
1046 retBuff->mediaData->keyFrame ? "true" : "false", retBuff->mediaData->pts);
1047 }
1048
1049 MEDIA_LOGW("dispatcherId: %{public}u, delete data, mediaType: %{public}d, "
1050 "keyFrame: %{public}s, pts: %{public}" PRIu64 ", reserveFlag: %{public}x.",
1051 GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1052 circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
1053 circularBuffer_.front()->mediaData->pts, circularBuffer_.front()->reserveFlag.load());
1054
1055 circularBuffer_.pop_front();
1056 ReturnIdleBuffer(retBuff);
1057 headType == MEDIA_TYPE_AUDIO ? audioFrameCnt_-- : videoFrameCnt_--;
1058 UpdateIndex();
1059 }
1060 }
1061
1062 data->reserveFlag = 0;
1063 MEDIA_LOGD("WriteDataIntoBuffer data type: %{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64
1064 ", cur_size: %{public}zu, capacity: %{public}zu dispatcher[%{public}u].",
1065 int32_t(data->mediaData->mediaType), data->mediaData->keyFrame ? "true" : "false", data->mediaData->pts,
1066 circularBuffer_.size(), circularBuffer_.capacity(), GetDispatcherId());
1067 circularBuffer_.push_back(data);
1068 if (IsAudioData(data)) {
1069 lastAudioIndex_ = circularBuffer_.size() - 1;
1070 ActiveDataRef(MEDIA_TYPE_AUDIO, false);
1071 audioFrameCnt_++;
1072 } else {
1073 lastVideoIndex_ = circularBuffer_.size() - 1;
1074 if (!keyOnly_ || IsKeyVideoFrame(data)) {
1075 ActiveDataRef(MEDIA_TYPE_VIDEO, IsKeyVideoFrame(data));
1076 }
1077 videoFrameCnt_++;
1078 }
1079
1080 if (audioNeedActivate_ && IsAudioData(data)) {
1081 MEDIA_LOGD("BufferDispatcher ActivateReceiverIndex By AudioData.");
1082 ActivateReceiverIndex(circularBuffer_.size() - 1, MEDIA_TYPE_AUDIO);
1083 }
1084
1085 if (IsKeyVideoFrame(data)) {
1086 uint32_t keyIndex = circularBuffer_.size() - 1;
1087 {
1088 std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1089 keyIndexList_.push_back(keyIndex);
1090 }
1091 if (videoNeedActivate_) {
1092 MEDIA_LOGD("BufferDispatcher ActivateReceiverIndex By KeyVideo Frame index: %{public}d.", keyIndex);
1093 ActivateReceiverIndex(keyIndex, MEDIA_TYPE_VIDEO);
1094 }
1095 if (keyRedirect_) {
1096 OnKeyRedirect();
1097 EnableKeyRedirect(false);
1098 }
1099 }
1100
1101 continueNotify_ = true;
1102 dataCV_.notify_one();
1103 return 0;
1104 }
1105
EraseOldGopDatas()1106 void BufferDispatcher::EraseOldGopDatas()
1107 {
1108 MEDIA_LOGD("BufferDispatcher Delete old datas In.");
1109 if (dataMode_ == MEDIA_AUDIO_ONLY) {
1110 FlushBuffer();
1111 return;
1112 }
1113
1114 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
1115 uint32_t nextKey = 0;
1116 {
1117 std::lock_guard<std::mutex> lock(notifyMutex_);
1118 if (!keyIndexList_.empty() && keyIndexList_.back() > 0) {
1119 MEDIA_LOGD("find next key listsize %{public}zu, back:%{public}d.", keyIndexList_.size(),
1120 keyIndexList_.back());
1121 nextKey = keyIndexList_.back();
1122 keyIndexList_.clear();
1123 keyIndexList_.push_back(nextKey);
1124 }
1125 }
1126
1127 MEDIA_LOGD("erase between 0 to next Video Frame %{public}d.", nextKey);
1128 DeleteHeadDatas(nextKey, false);
1129 nextKey = FindNextDeleteVideoIndex();
1130 DeleteHeadDatas(nextKey, true);
1131 std::string indexs;
1132
1133 MEDIA_LOGD("circularBuffer_ size: %{public}zu.", circularBuffer_.size());
1134 for (auto &keyIndex : keyIndexList_) {
1135 indexs += std::to_string(keyIndex) + ", ";
1136 MEDIA_LOGD("keyIndex update to %{public}d.", keyIndex);
1137 }
1138
1139 MEDIA_LOGD("current keyIndex: %{public}s.", indexs.c_str());
1140 }
1141
DeleteHeadDatas(uint32_t size,bool forceDelete)1142 void BufferDispatcher::DeleteHeadDatas(uint32_t size, bool forceDelete)
1143 {
1144 SHARING_LOGI("%{public}s, size %{public}d.", __FUNCTION__, size);
1145 if (size <= 0) {
1146 MEDIA_LOGW("invalid Size, dispatcherId: %{public}u!", GetDispatcherId());
1147 return;
1148 }
1149
1150 for (size_t i = 0; i < size; i++) {
1151 if (HeadFrameNeedReserve() && !forceDelete) {
1152 MEDIA_LOGD("index %{public}zu need reserve.", i);
1153 break;
1154 }
1155 if (circularBuffer_.empty()) {
1156 return;
1157 }
1158 DataSpec::Ptr retBuff = circularBuffer_.front();
1159 MEDIA_LOGD("BufferDispatcher pop out headtype %{public}d.", retBuff->mediaData->mediaType);
1160 retBuff->mediaData->mediaType == MEDIA_TYPE_AUDIO ? audioFrameCnt_-- : videoFrameCnt_--;
1161 if (HeadFrameNeedReserve()) {
1162 MEDIA_LOGW("dispatcherId: %{public}u, need reserve but pop, mediaType: "
1163 "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64 ".",
1164 GetDispatcherId(), int32_t(retBuff->mediaData->mediaType),
1165 retBuff->mediaData->keyFrame ? "true" : "false", retBuff->mediaData->pts);
1166 }
1167
1168 MEDIA_LOGD(
1169 "dispatcherId: %{public}u, delete data, mediaType: %{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64
1170 ", reserveFlag: %{public}x.",
1171 GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1172 circularBuffer_.front()->mediaData->keyFrame ? "true" : "false", circularBuffer_.front()->mediaData->pts,
1173 circularBuffer_.front()->reserveFlag.load());
1174 circularBuffer_.pop_front();
1175 ReturnIdleBuffer(retBuff);
1176 UpdateIndex();
1177 }
1178
1179 if (circularBuffer_.size() < baseBufferCapacity_ && circularBuffer_.capacity() > baseBufferCapacity_) {
1180 MEDIA_LOGE("capacity return to base %{public}d.", baseBufferCapacity_);
1181 circularBuffer_.set_capacity(baseBufferCapacity_);
1182 }
1183 }
1184
UpdateIndex()1185 void BufferDispatcher::UpdateIndex()
1186 {
1187 MEDIA_LOGD("trace.");
1188 std::lock_guard<std::mutex> locker(notifyMutex_);
1189 if (!keyIndexList_.empty() && keyIndexList_.front() == 0) {
1190 keyIndexList_.pop_front();
1191 MEDIA_LOGD("BufferDispatcher pop out first 0 keyIndex after listsize %{public}zu.", keyIndexList_.size());
1192 }
1193
1194 for (auto &keyIndex : keyIndexList_) {
1195 if (keyIndex > 0) {
1196 keyIndex--;
1197 }
1198 }
1199
1200 for (auto &[recvId, notifier] : notifiers_) {
1201 if (notifier->videoIndex > 0 && notifier->videoIndex != INVALID_INDEX) {
1202 notifier->videoIndex--;
1203 }
1204 if (notifier->audioIndex > 0 && notifier->audioIndex != INVALID_INDEX) {
1205 notifier->audioIndex--;
1206 }
1207 }
1208
1209 if (lastVideoIndex_ > 0 && lastVideoIndex_ != INVALID_INDEX) {
1210 lastVideoIndex_--;
1211 }
1212
1213 if (lastAudioIndex_ > 0 && lastAudioIndex_ != INVALID_INDEX) {
1214 lastAudioIndex_--;
1215 }
1216 }
1217
FindNextDeleteVideoIndex()1218 uint32_t BufferDispatcher::FindNextDeleteVideoIndex()
1219 {
1220 MEDIA_LOGD("trace.");
1221 for (size_t i = 0; i < circularBuffer_.size(); i++) {
1222 if (circularBuffer_[i]->mediaData != nullptr && circularBuffer_[i]->mediaData->mediaType == MEDIA_TYPE_VIDEO) {
1223 return i;
1224 }
1225 }
1226
1227 return 0;
1228 }
1229
FindLastIndex(MediaType type)1230 uint32_t BufferDispatcher::FindLastIndex(MediaType type)
1231 {
1232 SHARING_LOGD("trace.");
1233 if (circularBuffer_.empty()) {
1234 return INVALID_INDEX;
1235 }
1236
1237 return type == MEDIA_TYPE_AUDIO ? lastAudioIndex_ : lastVideoIndex_;
1238 }
1239
UpdateReceiverReadIndex(uint32_t receiverId,const uint32_t readIndex,MediaType type)1240 void BufferDispatcher::UpdateReceiverReadIndex(uint32_t receiverId, const uint32_t readIndex, MediaType type)
1241 {
1242 MEDIA_LOGD("trace.");
1243 uint32_t nextIndex = FindNextIndex(readIndex, type, receiverId);
1244 bool noAvaliableData = false;
1245 if (nextIndex == readIndex) {
1246 noAvaliableData = true;
1247 }
1248
1249 auto notifier = GetNotifierByReceiverId(receiverId);
1250 if (notifier == nullptr) {
1251 SHARING_LOGE("notifier is nullptr.");
1252 return;
1253 }
1254
1255 bool readOver = circularBuffer_.size() - readIndex < 3;
1256 if (readOver && notifier->NeedAcceleration() && type == MEDIA_TYPE_VIDEO) {
1257 SHARING_LOGD("BufferDispatcher SendAccelerationDone.");
1258 notifier->SendAccelerationDone();
1259 }
1260
1261 notifier->SetNeedUpdate(noAvaliableData, type);
1262
1263 if (type == MEDIA_TYPE_VIDEO) {
1264 notifier->videoIndex = nextIndex;
1265 } else if (type == MEDIA_TYPE_AUDIO) {
1266 notifier->audioIndex = nextIndex;
1267 } else {
1268 notifier->videoIndex = nextIndex;
1269 notifier->audioIndex = nextIndex;
1270 }
1271
1272 MEDIA_LOGD("After UpdateReceiverReadIndex type %{public}d, aindex %{public}d, vindex %{public}d.", type,
1273 notifier->audioIndex, notifier->videoIndex);
1274 }
1275
FindNextIndex(uint32_t index,MediaType type)1276 uint32_t BufferDispatcher::FindNextIndex(uint32_t index, MediaType type)
1277 {
1278 MEDIA_LOGD("trace.");
1279 if ((uint64_t)index + 1 >= circularBuffer_.size() || index == INVALID_INDEX) {
1280 return index;
1281 }
1282
1283 if (type == MEDIA_TYPE_AV) {
1284 return index + 1;
1285 }
1286
1287 for (size_t i = index + 1; i < circularBuffer_.size(); i++) {
1288 if (circularBuffer_[i] && circularBuffer_[i]->mediaData && circularBuffer_[i]->mediaData->mediaType == type) {
1289 if (keyOnly_ && type == MEDIA_TYPE_VIDEO && !IsKeyVideoFrame(circularBuffer_[i])) {
1290 continue;
1291 } else {
1292 return i;
1293 }
1294 }
1295 }
1296
1297 return index;
1298 }
1299
FindNextIndex(uint32_t index,MediaType type,uint32_t receiverId)1300 uint32_t BufferDispatcher::FindNextIndex(uint32_t index, MediaType type, uint32_t receiverId)
1301 {
1302 MEDIA_LOGD("trace.");
1303 if (index + 1 >= circularBuffer_.size() || index == INVALID_INDEX) {
1304 return index;
1305 }
1306
1307 if (type == MEDIA_TYPE_AV) {
1308 return index + 1;
1309 }
1310
1311 auto notifier = GetNotifierByReceiverId(receiverId);
1312 if (notifier == nullptr) {
1313 SHARING_LOGE("FindNextIndex GetNotifier nullptr.");
1314 return INVALID_INDEX;
1315 }
1316
1317 bool keyModeReceiver = notifier->IsKeyModeReceiver();
1318 for (size_t i = index + 1; i < circularBuffer_.size(); i++) {
1319 if (circularBuffer_[i] && circularBuffer_[i]->mediaData && circularBuffer_[i]->mediaData->mediaType == type) {
1320 if ((keyOnly_ || keyModeReceiver) && type == MEDIA_TYPE_VIDEO) {
1321 if (!IsKeyVideoFrame(circularBuffer_[i])) {
1322 continue;
1323 } else {
1324 for (size_t bIndex = index + 1; bIndex < i; bIndex++) {
1325 SetReceiverReadFlag(receiverId, circularBuffer_[bIndex]);
1326 }
1327 return i;
1328 }
1329 } else {
1330 return i;
1331 }
1332 }
1333 }
1334
1335 return index;
1336 }
1337
ResetAllIndex()1338 void BufferDispatcher::ResetAllIndex()
1339 {
1340 SHARING_LOGD("trace.");
1341 std::lock_guard<std::mutex> locker(notifyMutex_);
1342 keyIndexList_.clear();
1343 for (auto &[recvId, notifier] : notifiers_) {
1344 notifier->videoIndex = INVALID_INDEX;
1345 notifier->audioIndex = INVALID_INDEX;
1346 }
1347
1348 videoNeedActivate_ = true;
1349 audioNeedActivate_ = true;
1350 lastAudioIndex_ = INVALID_INDEX;
1351 lastVideoIndex_ = INVALID_INDEX;
1352 }
1353
IsDataReaded(uint32_t receiverId,DataSpec::Ptr & dataSpec)1354 bool BufferDispatcher::IsDataReaded(uint32_t receiverId, DataSpec::Ptr &dataSpec)
1355 {
1356 MEDIA_LOGD("trace.");
1357 uint32_t index = FindReceiverIndex(receiverId);
1358 if (index == INVALID_INDEX) {
1359 return false;
1360 }
1361
1362 return dataSpec->reserveFlag & RECV_FLAG_BASE << index;
1363 }
1364
IsVideoData(const DataSpec::Ptr & dataSpec)1365 bool BufferDispatcher::IsVideoData(const DataSpec::Ptr &dataSpec)
1366 {
1367 MEDIA_LOGD("trace.");
1368 if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1369 SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1370 return false;
1371 }
1372
1373 return dataSpec->mediaData->mediaType == MEDIA_TYPE_VIDEO;
1374 }
1375
IsAudioData(const DataSpec::Ptr & dataSpec)1376 bool BufferDispatcher::IsAudioData(const DataSpec::Ptr &dataSpec)
1377 {
1378 MEDIA_LOGD("trace.");
1379 if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1380 SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1381 return false;
1382 }
1383
1384 return dataSpec->mediaData->mediaType == MEDIA_TYPE_AUDIO;
1385 }
1386
IsKeyVideoFrame(const DataSpec::Ptr & dataSpec)1387 bool BufferDispatcher::IsKeyVideoFrame(const DataSpec::Ptr &dataSpec)
1388 {
1389 MEDIA_LOGD("trace.");
1390 if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1391 SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1392 return false;
1393 }
1394
1395 return IsVideoData(dataSpec) && dataSpec->mediaData->keyFrame;
1396 }
1397
HeadFrameNeedReserve()1398 bool BufferDispatcher::HeadFrameNeedReserve()
1399 {
1400 MEDIA_LOGD("trace.");
1401 if (!circularBuffer_.empty()) {
1402 uint8_t temp = readRefFlag_;
1403 MEDIA_LOGD("IsHeadFrameNeedReserve Head reserveFlag %{public}d readRefFlag_ %{public}d.",
1404 circularBuffer_.front()->reserveFlag.load(), readRefFlag_);
1405 return temp ^ circularBuffer_.front()->reserveFlag;
1406 }
1407
1408 return false;
1409 }
1410
NeedExtendToDBCapacity()1411 bool BufferDispatcher::NeedExtendToDBCapacity()
1412 {
1413 MEDIA_LOGD("trace.");
1414 std::shared_lock<std::shared_mutex> locker(bufferMutex_);
1415 return (circularBuffer_.size() >= circularBuffer_.capacity() &&
1416 circularBuffer_.capacity() < doubleBufferCapacity_ && HeadFrameNeedReserve());
1417 }
1418
NeedRestoreToNormalCapacity()1419 bool BufferDispatcher::NeedRestoreToNormalCapacity()
1420 {
1421 MEDIA_LOGD("trace.");
1422 std::shared_lock<std::shared_mutex> locker(bufferMutex_);
1423 return audioFrameCnt_ >= circularBuffer_.capacity() && circularBuffer_.capacity() != INITIAL_BUFFER_CAPACITY;
1424 }
1425
ReCalculateCapacity(bool keyFrame)1426 void BufferDispatcher::ReCalculateCapacity(bool keyFrame)
1427 {
1428 MEDIA_LOGD("trace.");
1429 baseCounter_++;
1430 if (baseCounter_ >= maxBufferCapacity_) {
1431 SHARING_LOGE("BufferDispatcher too many Audiodata need Set Capacity to default.");
1432 }
1433
1434 if (baseCounter_ >= circularBuffer_.capacity() && !keyFrame) {
1435 uint32_t tmpSize = circularBuffer_.capacity() + bufferCapacityIncrement_ < maxBufferCapacity_
1436 ? circularBuffer_.capacity() + bufferCapacityIncrement_
1437 : maxBufferCapacity_;
1438 doubleBufferCapacity_ = tmpSize * 2 < maxBufferCapacity_ ? tmpSize * 2 : maxBufferCapacity_; // 2: increasement
1439 SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d in adaptive capacity calculating.", tmpSize);
1440 SetBufferCapacity(tmpSize);
1441 return;
1442 }
1443
1444 if (keyFrame) {
1445 baseBufferCapacity_ = baseCounter_ + bufferCapacityIncrement_ < maxBufferCapacity_
1446 ? baseCounter_ + bufferCapacityIncrement_
1447 : maxBufferCapacity_;
1448 if (baseBufferCapacity_ < INITIAL_BUFFER_CAPACITY) {
1449 baseBufferCapacity_ = INITIAL_BUFFER_CAPACITY;
1450 }
1451 doubleBufferCapacity_ = baseBufferCapacity_ * 2 < maxBufferCapacity_ // 2: increasement
1452 ? baseBufferCapacity_ * 2 // 2: increasement
1453 : maxBufferCapacity_;
1454 gop_ = baseCounter_;
1455 capacityEvaluating_ = gop_ > 0 ? false : true;
1456 SetBufferCapacity(baseBufferCapacity_);
1457 baseCounter_ = 0;
1458 SHARING_LOGI(
1459 "The gop is %{public}d and BufferDispatcher buffer Extended to %{public}d on base capacity confirmed.",
1460 GetCurrentGop(), baseBufferCapacity_);
1461 }
1462 }
1463
NotifyThreadWorker(void * userParam)1464 int32_t BufferDispatcher::NotifyThreadWorker(void *userParam)
1465 {
1466 SHARING_LOGI("BufferDispatcher DataNotifier thread in.");
1467 RETURN_INVALID_IF_NULL(userParam);
1468 BufferDispatcher *dispatcher = (BufferDispatcher *)userParam;
1469 while (dispatcher->running_) {
1470 std::unique_lock<std::mutex> locker(dispatcher->notifyMutex_);
1471 uint32_t notifyRef = dispatcher->dataBitRef_ & dispatcher->recvBitRef_;
1472 MEDIA_LOGD("DataBitRef %{public}u recvBitRef_ %{public}d notifyRef_ %{public}d.",
1473 dispatcher->dataBitRef_.load(), dispatcher->recvBitRef_.load(), notifyRef);
1474
1475 for (auto &[recvId, notifier] : dispatcher->notifiers_) {
1476 auto index = notifier->GetReadIndex();
1477 if (((RECV_FLAG_BASE << (index * FIX_OFFSET_TWO)) & notifyRef) ||
1478 ((RECV_FLAG_BASE << (index * FIX_OFFSET_TWO + FIX_OFFSET_ONE)) & notifyRef)) {
1479 MediaType notifyType;
1480 if (notifier->IsMixedReceiver()) {
1481 notifyType = MEDIA_TYPE_AV;
1482 } else {
1483 notifyType = (((RECV_FLAG_BASE << (index * FIX_OFFSET_TWO)) & notifyRef) ? MEDIA_TYPE_AUDIO
1484 : MEDIA_TYPE_VIDEO);
1485 }
1486 MEDIA_LOGD("notify the receiveId: %{public}d, notifyType: %{public}d, notifyRef: %{public}x.", recvId,
1487 notifyType, notifyRef);
1488 notifier->NotifyDataReceiver(notifyType);
1489 }
1490 }
1491
1492 dispatcher->dataCV_.wait(locker, [&dispatcher]() { return dispatcher->continueNotify_.load(); });
1493 dispatcher->continueNotify_ = false;
1494 }
1495
1496 return 0;
1497 }
1498
NotifyReadReady(uint32_t receiverId,MediaType type)1499 void BufferDispatcher::NotifyReadReady(uint32_t receiverId, MediaType type)
1500 {
1501 MEDIA_LOGD("trace.");
1502 auto notifier = GetNotifierByReceiverId(receiverId);
1503 if (notifier == nullptr) {
1504 SHARING_LOGE("notifier is nullptr.");
1505 return;
1506 }
1507
1508 std::shared_lock<std::shared_mutex> lock(bufferMutex_);
1509 std::unique_lock<std::mutex> locker(notifyMutex_);
1510 if (type == MEDIA_TYPE_AV) {
1511 SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, true);
1512 SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, true);
1513 } else {
1514 SetReceiverReadRef(receiverId, type, true);
1515 }
1516
1517 bool dataAvaliable = notifier->DataAvailable(type);
1518 MEDIA_LOGD("receiverId %{public}d MediaType %{public}d dataAvaliable %{public}d.", receiverId, type,
1519 dataAvaliable);
1520 if (type == MEDIA_TYPE_AV) {
1521 SetReceiverDataRef(receiverId, MEDIA_TYPE_VIDEO, dataAvaliable);
1522 SetReceiverDataRef(receiverId, MEDIA_TYPE_AUDIO, dataAvaliable);
1523 } else {
1524 SetReceiverDataRef(receiverId, type, dataAvaliable);
1525 }
1526
1527 if (!dataAvaliable) {
1528 return;
1529 }
1530
1531 continueNotify_ = true;
1532 dataCV_.notify_one();
1533 }
1534
SetDataRef(uint32_t bitref)1535 void BufferDispatcher::SetDataRef(uint32_t bitref)
1536 {
1537 SHARING_LOGD("trace.");
1538 dataBitRef_ &= bitref;
1539 }
1540
GetDataRef()1541 uint32_t BufferDispatcher::GetDataRef()
1542 {
1543 SHARING_LOGD("trace.");
1544 return dataBitRef_;
1545 }
1546
SetReadRef(uint32_t bitref)1547 void BufferDispatcher::SetReadRef(uint32_t bitref)
1548 {
1549 SHARING_LOGD("trace.");
1550 recvBitRef_ &= bitref;
1551 }
1552
GetReadRef()1553 uint32_t BufferDispatcher::GetReadRef()
1554 {
1555 SHARING_LOGD("trace.");
1556 return recvBitRef_;
1557 }
1558
ActiveDataRef(MediaType type,bool keyFrame)1559 void BufferDispatcher::ActiveDataRef(MediaType type, bool keyFrame)
1560 {
1561 MEDIA_LOGD("trace.");
1562 std::unique_lock<std::mutex> locker(notifyMutex_);
1563 uint32_t bitRef = 0x0000;
1564 for (auto &[recvId, notifier] : notifiers_) {
1565 auto index = notifier->GetReadIndex();
1566 if (type == MEDIA_TYPE_AUDIO) {
1567 bitRef |= RECV_FLAG_BASE << (index * 2); // 2: fix offset, get audio notifyer id
1568 continue;
1569 }
1570 bool keyModeReceiver = false;
1571 keyModeReceiver = notifier->IsKeyModeReceiver();
1572 if (keyFrame && keyModeReceiver && keyIndexList_.empty()) {
1573 notifier->videoIndex = circularBuffer_.size() - 1;
1574 }
1575 if (!keyModeReceiver || keyFrame) {
1576 if (index != INVALID_INDEX) {
1577 bitRef |= RECV_FLAG_BASE << (index * 2 + 1); // 2: fix offset, get video notifyer id
1578 }
1579 }
1580 }
1581
1582 dataBitRef_ |= bitRef;
1583 }
1584
SetReceiverDataRef(uint32_t receiverId,MediaType type,bool ready)1585 void BufferDispatcher::SetReceiverDataRef(uint32_t receiverId, MediaType type, bool ready)
1586 {
1587 MEDIA_LOGD("trace.");
1588 uint32_t index = FindReceiverIndex(receiverId);
1589 if (index == INVALID_INDEX) {
1590 return;
1591 }
1592
1593 if (type == MEDIA_TYPE_AUDIO) {
1594 uint32_t audioBit = index * 2;
1595 uint32_t bitRef = RECV_FLAG_BASE << audioBit;
1596 MEDIA_LOGD("Audio recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1597 if (!ready) {
1598 bitRef = ~bitRef;
1599 dataBitRef_ &= bitRef;
1600 } else {
1601 dataBitRef_ |= bitRef;
1602 }
1603 } else if (type == MEDIA_TYPE_VIDEO) {
1604 uint32_t videoBit = index * 2 + 1;
1605 uint32_t bitRef = RECV_FLAG_BASE << videoBit;
1606 MEDIA_LOGD("Video recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1607 if (!ready) {
1608 bitRef = ~bitRef;
1609 dataBitRef_ &= bitRef;
1610 } else {
1611 dataBitRef_ |= bitRef;
1612 }
1613 }
1614 }
1615
SetReceiverReadRef(uint32_t receiverId,MediaType type,bool ready)1616 void BufferDispatcher::SetReceiverReadRef(uint32_t receiverId, MediaType type, bool ready)
1617 {
1618 MEDIA_LOGD("trace.");
1619 uint32_t index = FindReceiverIndex(receiverId);
1620 if (index == INVALID_INDEX) {
1621 return;
1622 }
1623
1624 if (type == MEDIA_TYPE_AUDIO) {
1625 uint32_t audioBit = index * 2;
1626 uint32_t bitRef = RECV_FLAG_BASE << audioBit;
1627 MEDIA_LOGD("Audio recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1628
1629 if (!ready) {
1630 bitRef = ~bitRef;
1631 recvBitRef_ &= bitRef;
1632 } else {
1633 recvBitRef_ |= bitRef;
1634 }
1635 } else if (type == MEDIA_TYPE_VIDEO) {
1636 uint32_t videoBit = index * 2 + 1;
1637 uint32_t bitRef = RECV_FLAG_BASE << videoBit;
1638 MEDIA_LOGD("Video recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1639
1640 if (!ready) {
1641 bitRef = ~bitRef;
1642 recvBitRef_ &= bitRef;
1643 } else {
1644 recvBitRef_ |= bitRef;
1645 }
1646 }
1647 }
1648
GetReceiverDataRef(uint32_t receiverId)1649 uint32_t BufferDispatcher::GetReceiverDataRef(uint32_t receiverId)
1650 {
1651 SHARING_LOGD("trace.");
1652 return 0;
1653 }
1654
GetReceiverReadRef(uint32_t receiverId)1655 uint32_t BufferDispatcher::GetReceiverReadRef(uint32_t receiverId)
1656 {
1657 SHARING_LOGD("trace.");
1658 uint32_t retBitRef = GetReceiverIndexRef(receiverId);
1659 retBitRef &= recvBitRef_;
1660 return retBitRef;
1661 }
1662
GetReceiverIndexRef(uint32_t receiverId)1663 uint32_t BufferDispatcher::GetReceiverIndexRef(uint32_t receiverId)
1664 {
1665 SHARING_LOGD("trace.");
1666 uint32_t index = FindReceiverIndex(receiverId);
1667 uint32_t audioBit = index * 2;
1668 uint32_t videoBit = index * 2 + 1;
1669 uint32_t retBitRef = 0x0000;
1670 if (videoBit < 32) { // 32:bit width
1671 retBitRef |= RECV_FLAG_BASE << audioBit;
1672 retBitRef |= RECV_FLAG_BASE << videoBit;
1673 }
1674 return retBitRef;
1675 }
1676
ClearReadBit(uint32_t receiverId,MediaType type)1677 void BufferDispatcher::ClearReadBit(uint32_t receiverId, MediaType type)
1678 {
1679 MEDIA_LOGD("trace.");
1680 std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1681 if (type != MEDIA_TYPE_AV) {
1682 SetReceiverReadRef(receiverId, type, false);
1683 } else {
1684 SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, false);
1685 SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, false);
1686 }
1687 }
1688
ClearDataBit(uint32_t receiverId,MediaType type)1689 void BufferDispatcher::ClearDataBit(uint32_t receiverId, MediaType type)
1690 {
1691 MEDIA_LOGD("trace.");
1692 std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1693 if (type != MEDIA_TYPE_AV) {
1694 SetReceiverDataRef(receiverId, type, false);
1695 } else {
1696 SetReceiverDataRef(receiverId, MEDIA_TYPE_VIDEO, false);
1697 SetReceiverDataRef(receiverId, MEDIA_TYPE_AUDIO, false);
1698 }
1699 }
1700
IsRead(uint32_t receiverId,uint32_t index)1701 bool BufferDispatcher::IsRead(uint32_t receiverId, uint32_t index)
1702 {
1703 MEDIA_LOGD("trace.");
1704 if (index >= circularBuffer_.size()) {
1705 return true;
1706 } else {
1707 return IsDataReaded(receiverId, circularBuffer_.at(index));
1708 }
1709 }
1710
ActivateReceiverIndex(uint32_t index,MediaType type)1711 void BufferDispatcher::ActivateReceiverIndex(uint32_t index, MediaType type)
1712 {
1713 MEDIA_LOGD("trace.");
1714 std::unique_lock<std::mutex> lock(notifyMutex_);
1715 for (auto &[recvId, notifier] : notifiers_) {
1716 if (type == MEDIA_TYPE_VIDEO) {
1717 if (notifier->videoIndex == INVALID_INDEX) {
1718 notifier->videoIndex = index;
1719 SHARING_LOGD("RecvId %{public}d Activate %{public}d.", notifier->GetReceiverId(), notifier->videoIndex);
1720 videoNeedActivate_ = false;
1721 }
1722 } else {
1723 if (notifier->audioIndex == INVALID_INDEX) {
1724 notifier->audioIndex = index;
1725 SHARING_LOGD("RecvId %{public}d Activate %{public}d.", notifier->GetReceiverId(), notifier->audioIndex);
1726 audioNeedActivate_ = false;
1727 }
1728 }
1729 }
1730 }
UnlockWaitingReceiverIndex(MediaType type)1731 void BufferDispatcher::UnlockWaitingReceiverIndex(MediaType type)
1732 {
1733 SHARING_LOGD("trace.");
1734 }
1735
SetReceiverReadFlag(uint32_t receiverId,DataSpec::Ptr & dataSpec)1736 void BufferDispatcher::SetReceiverReadFlag(uint32_t receiverId, DataSpec::Ptr &dataSpec)
1737 {
1738 MEDIA_LOGD("trace.");
1739 RETURN_IF_NULL(dataSpec);
1740 uint32_t index = FindReceiverIndex(receiverId);
1741 if (index != INVALID_INDEX) {
1742 dataSpec->reserveFlag |= RECV_FLAG_BASE << index;
1743 MEDIA_LOGD("mediaType: %{public}d, pts: %{public}" PRIu64
1744 ", reserveFlag: %{public}x, receiverId: %{public}d, index: %{public}d.",
1745 dataSpec->mediaData->mediaType, dataSpec->mediaData->pts, dataSpec->reserveFlag.load(), receiverId,
1746 index);
1747 }
1748 }
1749
CancelReserve()1750 void BufferDispatcher::CancelReserve()
1751 {
1752 SHARING_LOGD("trace.");
1753 for (auto &data : circularBuffer_) {
1754 data->reserveFlag = 0xff;
1755 }
1756 }
1757
SetSpsNalu(MediaData::Ptr spsbuf)1758 void BufferDispatcher::SetSpsNalu(MediaData::Ptr spsbuf)
1759 {
1760 SHARING_LOGD("trace.");
1761 spsBuf_ = spsbuf;
1762 }
1763
GetSPS()1764 const MediaData::Ptr BufferDispatcher::GetSPS()
1765 {
1766 MEDIA_LOGD("trace.");
1767 return spsBuf_;
1768 }
1769
SetPpsNalu(MediaData::Ptr ppsbuf)1770 void BufferDispatcher::SetPpsNalu(MediaData::Ptr ppsbuf)
1771 {
1772 SHARING_LOGD("trace.");
1773 ppsBuf_ = ppsbuf;
1774 }
1775
GetPPS()1776 const MediaData::Ptr BufferDispatcher::GetPPS()
1777 {
1778 MEDIA_LOGD("trace.");
1779 return ppsBuf_;
1780 }
1781
GetCurrentGop()1782 uint32_t BufferDispatcher::GetCurrentGop()
1783 {
1784 SHARING_LOGD("trace.");
1785 return gop_;
1786 }
1787
OnKeyRedirect()1788 void BufferDispatcher::OnKeyRedirect()
1789 {
1790 SHARING_LOGD("trace.");
1791 std::unique_lock<std::mutex> lock(notifyMutex_);
1792 if (keyIndexList_.empty()) {
1793 return;
1794 }
1795
1796 auto nextIndex = keyIndexList_.back();
1797
1798 for (auto &[recvId, notifier] : notifiers_) {
1799 if (notifier->IsKeyRedirectReceiver()) {
1800 SHARING_LOGD("receiverId: %{public}u, videoIndex: %{public}d, nextIndex: %{public}d.",
1801 notifier->GetReceiverId(), notifier->videoIndex, nextIndex);
1802 auto curIndex = notifier->videoIndex;
1803 notifier->videoIndex = nextIndex;
1804 for (auto i = curIndex; i < nextIndex; i++) {
1805 SetReceiverReadFlag(notifier->GetReceiverId(), circularBuffer_[i]);
1806 }
1807 if (!rapidMode_) {
1808 auto receiver = notifier->GetBufferReceiver();
1809 if (receiver != nullptr) {
1810 receiver->EnableKeyRedirect(false);
1811 }
1812 }
1813 }
1814 }
1815 }
1816
EnableRapidMode(bool enable)1817 void BufferDispatcher::EnableRapidMode(bool enable)
1818 {
1819 SHARING_LOGD("trace.");
1820 rapidMode_ = enable;
1821 }
1822
1823 } // namespace Sharing
1824 } // namespace OHOS
1825