• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "buffer_dispatcher.h"
17 #include <cmath>
18 #include <cstdarg>
19 #include <cstdint>
20 #include "common/common_macro.h"
21 #include "media_channel_def.h"
22 
23 namespace OHOS {
24 namespace Sharing {
25 
26 constexpr int32_t WRITING_TIMTOUT = 30;
27 constexpr int32_t FIX_OFFSET_TWO = 2;
28 constexpr int32_t FIX_OFFSET_ONE = 1;
29 
SetSource(IBufferReader::Ptr dataReader)30 void BufferReceiver::SetSource(IBufferReader::Ptr dataReader)
31 {
32     SHARING_LOGD("trace.");
33     bufferReader_ = dataReader;
34 }
35 
OnMediaDataNotify()36 int32_t BufferReceiver::OnMediaDataNotify()
37 {
38     SHARING_LOGD("BufferReceiver Media notified.");
39     dataReady_ = true;
40     notifyData_.notify_one();
41     return 0;
42 }
43 
OnAudioDataNotify()44 int32_t BufferReceiver::OnAudioDataNotify()
45 {
46     MEDIA_LOGD("BufferReceiver Audio notified.");
47     nonBlockAudio_ = true;
48     notifyAudio_.notify_one();
49     return 0;
50 }
51 
OnVideoDataNotify()52 int32_t BufferReceiver::OnVideoDataNotify()
53 {
54     MEDIA_LOGD("BufferReceiver Video notified.");
55     nonBlockVideo_ = true;
56     notifyVideo_.notify_one();
57     return 0;
58 }
59 
IsMixedReceiver()60 bool BufferReceiver::IsMixedReceiver()
61 {
62     MEDIA_LOGD("trace.");
63     return mixed_;
64 }
65 
RequestRead(MediaType type,std::function<void (const MediaData::Ptr & data)> cb)66 int32_t BufferReceiver::RequestRead(MediaType type, std::function<void(const MediaData::Ptr &data)> cb)
67 {
68     MEDIA_LOGD("trace.");
69     if (bufferReader_ == nullptr) {
70         SHARING_LOGE("BufferReceiver read failed null dispatcher.");
71         return -1;
72     }
73 
74     if (firstMRead_ && type == MEDIA_TYPE_AV) {
75         bufferReader_->NotifyReadReady(GetReceiverId(), type);
76         mixed_ = true;
77         firstMRead_ = false;
78     } else if (firstARead_ && type == MEDIA_TYPE_AUDIO) {
79         bufferReader_->NotifyReadReady(GetReceiverId(), type);
80         firstARead_ = false;
81     } else if (firstVRead_ && type == MEDIA_TYPE_VIDEO) {
82         bufferReader_->NotifyReadReady(GetReceiverId(), type);
83         firstVRead_ = false;
84     }
85     std::unique_lock<std::mutex> locker(mutex_);
86     MEDIA_LOGD("BufferDispatcher NotifyThreadWorker before wait, receiverId: %{public}u.", GetReceiverId());
87     switch (type) {
88         /*  cv will waiting if pred is false;
89             so set waiting audio pred (type != MEDIA_TYPE_AUDIO) to NOT block other type.*/
90         case MEDIA_TYPE_AUDIO:
91             MEDIA_LOGD("wait Audio, receiverId: %{public}u.", GetReceiverId());
92             notifyAudio_.wait(locker, [=]() { return nonBlockAudio_ || type != MEDIA_TYPE_AUDIO; });
93             nonBlockAudio_ = false;
94             break;
95         case MEDIA_TYPE_VIDEO:
96             MEDIA_LOGD("wait Video, receiverId: %{public}u.", GetReceiverId());
97             notifyVideo_.wait(locker, [=]() { return nonBlockVideo_ || type != MEDIA_TYPE_VIDEO; });
98             nonBlockVideo_ = false;
99             break;
100         case MEDIA_TYPE_AV:
101             MEDIA_LOGD("wait Mixed, receiverId: %{public}u.", GetReceiverId());
102             notifyData_.wait(locker, [=]() { return dataReady_ || type != MEDIA_TYPE_AV; });
103             dataReady_ = false;
104             break;
105         default:
106             return 0;
107             break;
108     }
109 
110     bufferReader_->ClearDataBit(GetReceiverId(), type);
111     bufferReader_->ClearReadBit(GetReceiverId(), type);
112     MEDIA_LOGD("BufferDispatcher NotifyThreadWorker after wait  start read, receiverId: %{public}u.", GetReceiverId());
113     int32_t ret = bufferReader_->ReadBufferData(GetReceiverId(), type, cb);
114     bufferReader_->NotifyReadReady(GetReceiverId(), type);
115     dataReady_ = false;
116 
117     return ret;
118 }
119 
NotifyReadStart()120 void BufferReceiver::NotifyReadStart()
121 {
122     SHARING_LOGD("receiverId: %{public}u notify start read.", GetReceiverId());
123     firstARead_ = true;
124     firstVRead_ = true;
125     firstMRead_ = true;
126 }
127 
GetReceiverId()128 uint32_t BufferReceiver::GetReceiverId()
129 {
130     MEDIA_LOGD("trace.");
131     return GetId();
132 }
133 
GetDispatcherId()134 uint32_t BufferReceiver::GetDispatcherId()
135 {
136     SHARING_LOGD("trace.");
137     if (bufferReader_) {
138         return bufferReader_->GetDispatcherId();
139     }
140 
141     return 0;
142 }
143 
NotifyReadStop()144 void BufferReceiver::NotifyReadStop()
145 {
146     SHARING_LOGD("receiverId: %{public}u notify stop read.", GetReceiverId());
147     nonBlockAudio_ = true;
148     nonBlockVideo_ = true;
149     dataReady_ = true;
150     notifyAudio_.notify_all();
151     notifyVideo_.notify_all();
152     notifyData_.notify_all();
153 }
154 
EnableKeyMode(bool enable)155 void BufferReceiver::EnableKeyMode(bool enable)
156 {
157     SHARING_LOGD("bufferReceiver id %{public}u SetKeyOnlyMode %{public}d.", GetReceiverId(), enable);
158     if (keyOnly_ == true && enable == false) {
159         SHARING_LOGD("Set KeyOnlyMode false, need report fast read over.");
160         accelerationDone_ = true;
161     }
162 
163     keyOnly_ = enable;
164     if (bufferReader_ && enable) {
165         bufferReader_->ClearDataBit(GetReceiverId(), MEDIA_TYPE_VIDEO);
166     }
167 
168     auto listener = listener_.lock();
169     if (listener) {
170         listener->OnKeyModeNotify(enable);
171     }
172 }
173 
IsKeyMode()174 bool BufferReceiver::IsKeyMode()
175 {
176     MEDIA_LOGD("trace.");
177     return keyOnly_;
178 }
179 
IsKeyRedirect()180 bool BufferReceiver::IsKeyRedirect()
181 {
182     SHARING_LOGD("trace.");
183     return keyRedirect_;
184 }
185 
GetSPS()186 const MediaData::Ptr BufferReceiver::GetSPS()
187 {
188     MEDIA_LOGD("trace.");
189     if (bufferReader_) {
190         return bufferReader_->GetSPS();
191     }
192 
193     return nullptr;
194 }
195 
GetPPS()196 const MediaData::Ptr BufferReceiver::GetPPS()
197 {
198     MEDIA_LOGD("trace.");
199     if (bufferReader_) {
200         return bufferReader_->GetPPS();
201     }
202 
203     return nullptr;
204 }
205 
NeedAcceleration()206 bool BufferReceiver::NeedAcceleration()
207 {
208     MEDIA_LOGD("trace.");
209     return accelerationDone_;
210 }
211 
DisableAcceleration()212 void BufferReceiver::DisableAcceleration()
213 {
214     SHARING_LOGD("trace.");
215     accelerationDone_ = false;
216 }
217 
SendAccelerationDone()218 void BufferReceiver::SendAccelerationDone()
219 {
220     SHARING_LOGD("trace.");
221     auto listener = listener_.lock();
222     if (listener) {
223         listener->OnAccelerationDoneNotify();
224     }
225 }
226 
EnableKeyRedirect(bool enable)227 void BufferReceiver::EnableKeyRedirect(bool enable)
228 {
229     SHARING_LOGD("trace.");
230     if (bufferReader_ && enable) {
231         bufferReader_->EnableKeyRedirect(enable);
232     }
233     keyRedirect_ = enable;
234 }
235 
SetBufferReceiverListener(std::weak_ptr<IBufferReceiverListener> listener)236 void BufferReceiver::SetBufferReceiverListener(std::weak_ptr<IBufferReceiverListener> listener)
237 {
238     SHARING_LOGD("trace.");
239     listener_ = listener;
240 }
241 
242 using DataNotifier = BufferDispatcher::DataNotifier;
243 
NotifyDataReceiver(MediaType type)244 void DataNotifier::NotifyDataReceiver(MediaType type)
245 {
246     MEDIA_LOGD("trace.");
247     if (receiver_.lock() == nullptr) {
248         SHARING_LOGE("target receiver NOT exist.");
249         return;
250     }
251 
252     if (block_) {
253         return;
254     }
255 
256     MEDIA_LOGD("notify target type %{public}d.", type);
257     switch (type) {
258         case MEDIA_TYPE_AUDIO:
259             GetBufferReceiver()->OnAudioDataNotify();
260             break;
261         case MEDIA_TYPE_VIDEO:
262             GetBufferReceiver()->OnVideoDataNotify();
263             break;
264         case MEDIA_TYPE_AV:
265             GetBufferReceiver()->OnMediaDataNotify();
266             break;
267         default:
268             SHARING_LOGI("none process case.");
269             break;
270     }
271 }
272 
GetBufferReceiver()273 BufferReceiver::Ptr DataNotifier::GetBufferReceiver()
274 {
275     MEDIA_LOGD("trace.");
276     return receiver_.lock();
277 }
278 
GetReceiverId()279 uint32_t DataNotifier::GetReceiverId()
280 {
281     MEDIA_LOGD("trace.");
282     auto receiver = receiver_.lock();
283     if (receiver == nullptr) {
284         SHARING_LOGE("target receiver NOT exist.");
285         return INVALID_INDEX;
286     }
287 
288     return receiver->GetReceiverId();
289 }
290 
SetListenDispatcher(IBufferReader::Ptr dispatcher)291 void DataNotifier::SetListenDispatcher(IBufferReader::Ptr dispatcher)
292 {
293     SHARING_LOGD("trace.");
294     dispatcher_ = dispatcher;
295 }
296 
SetNotifyReceiver(BufferReceiver::Ptr receiver)297 void DataNotifier::SetNotifyReceiver(BufferReceiver::Ptr receiver)
298 {
299     SHARING_LOGD("trace.");
300     receiver_ = receiver;
301 }
302 
SetBlock()303 void DataNotifier::SetBlock()
304 {
305     SHARING_LOGD("trace.");
306     block_ = true;
307 }
308 
SetNeedUpdate(bool enable,MediaType type)309 void DataNotifier::SetNeedUpdate(bool enable, MediaType type)
310 {
311     MEDIA_LOGD("trace.");
312     if (type == MEDIA_TYPE_AUDIO) {
313         needUpdateAIndex = enable;
314     } else {
315         needUpdateVIndex = enable;
316     }
317 }
318 
DataAvailable(MediaType type)319 bool DataNotifier::DataAvailable(MediaType type)
320 {
321     MEDIA_LOGD("trace.");
322     auto dispatcher = dispatcher_.lock();
323     if (dispatcher == nullptr) {
324         SHARING_LOGE("target dispatcher NOT exist.");
325         return false;
326     }
327 
328     if (type == MEDIA_TYPE_AUDIO) {
329         return audioIndex != INVALID_INDEX &&
330                (audioIndex < dispatcher->GetLatestAudioIndex() || !dispatcher->IsRead(GetReceiverId(), audioIndex + 1));
331     } else if (type == MEDIA_TYPE_VIDEO) {
332         return videoIndex != INVALID_INDEX &&
333                (videoIndex < dispatcher->GetLatestVideoIndex() || !dispatcher->IsRead(GetReceiverId(), videoIndex + 1));
334     } else {
335         return videoIndex != INVALID_INDEX &&
336                (videoIndex < dispatcher->GetBufferSize() - 1 || !dispatcher->IsRead(GetReceiverId(), videoIndex + 1));
337     }
338 
339     return false;
340 }
341 
IsMixedReceiver()342 bool DataNotifier::IsMixedReceiver()
343 {
344     MEDIA_LOGD("trace.");
345     auto receiver = receiver_.lock();
346     if (receiver == nullptr) {
347         SHARING_LOGE("target receiver NOT exist.");
348         return false;
349     }
350 
351     return receiver->IsMixedReceiver();
352 }
353 
GetReceiverReadIndex(MediaType type)354 uint32_t DataNotifier::GetReceiverReadIndex(MediaType type)
355 {
356     MEDIA_LOGD("trace.");
357     switch (type) {
358         case MEDIA_TYPE_VIDEO:
359             MEDIA_LOGD("Video Recvid:%{public}d index: %{public}d.", GetReceiverId(), videoIndex);
360             return videoIndex;
361             break;
362         case MEDIA_TYPE_AUDIO:
363             MEDIA_LOGD("Audio Recvid:%{public}d index: %{public}d.", GetReceiverId(), audioIndex);
364             return audioIndex;
365             break;
366         case MEDIA_TYPE_AV:
367             MEDIA_LOGD("Mixed Recvid:%{public}d vindex: %{public}d  aindex: %{public}d.", GetReceiverId(), videoIndex,
368                        audioIndex);
369             if (audioIndex != INVALID_INDEX && videoIndex != INVALID_INDEX) {
370                 return audioIndex <= videoIndex ? audioIndex : videoIndex;
371             } else if (audioIndex == INVALID_INDEX && videoIndex == INVALID_INDEX) {
372                 return INVALID_INDEX;
373             } else {
374                 return audioIndex == INVALID_INDEX ? videoIndex : audioIndex;
375             }
376             break;
377         default:
378             return INVALID_INDEX;
379             break;
380     }
381 }
382 
IsKeyModeReceiver()383 bool DataNotifier::IsKeyModeReceiver()
384 {
385     MEDIA_LOGD("trace.");
386     auto receiver = receiver_.lock();
387     if (receiver) {
388         return receiver->IsKeyMode();
389     }
390 
391     return false;
392 }
393 
IsKeyRedirectReceiver()394 bool DataNotifier::IsKeyRedirectReceiver()
395 {
396     SHARING_LOGD("trace.");
397     auto receiver = receiver_.lock();
398     if (receiver) {
399         return receiver->IsKeyRedirect();
400     }
401 
402     return false;
403 }
404 
NeedAcceleration()405 bool DataNotifier::NeedAcceleration()
406 {
407     MEDIA_LOGD("trace.");
408     auto receiver = receiver_.lock();
409     if (receiver == nullptr) {
410         SHARING_LOGE("target receiver NOT exist.");
411         return false;
412     }
413 
414     return receiver->NeedAcceleration();
415 }
416 
SendAccelerationDone()417 void DataNotifier::SendAccelerationDone()
418 {
419     SHARING_LOGD("trace.");
420     auto receiver = receiver_.lock();
421     if (receiver == nullptr) {
422         SHARING_LOGE("target receiver NOT exist.");
423         return;
424     }
425 
426     receiver->SendAccelerationDone();
427     receiver->DisableAcceleration();
428 }
429 
BufferDispatcher(uint32_t maxCapacity,uint32_t capacityIncrement)430 BufferDispatcher::BufferDispatcher(uint32_t maxCapacity, uint32_t capacityIncrement)
431 {
432     SHARING_LOGD("BufferDispatcher ctor, set capacity: %{public}u.", maxCapacity);
433     maxBufferCapacity_ = maxCapacity;
434     bufferCapacityIncrement_ = capacityIncrement;
435     {
436         std::lock_guard<std::mutex> lock(idleMutex_);
437         idleAudioBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
438         idleVideoBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
439         for (size_t i = 0; i < INITIAL_BUFFER_CAPACITY; i++) {
440             MediaData::Ptr adata = std::make_shared<MediaData>();
441             MediaData::Ptr vdata = std::make_shared<MediaData>();
442             adata->buff = std::make_shared<DataBuffer>();
443             vdata->buff = std::make_shared<DataBuffer>();
444             idleAudioBuffer_.push_back(adata);
445             idleVideoBuffer_.push_back(vdata);
446         }
447     }
448 
449     writingTimer_ = std::make_unique<TimeoutTimer>("dispatcher-writing-timer");
450 
451     std::unique_lock<std::shared_mutex> locker(bufferMutex_);
452     circularBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
453     StartDispatch();
454 }
455 
~BufferDispatcher()456 BufferDispatcher::~BufferDispatcher()
457 {
458     SHARING_LOGI("BufferDispatcher dtor.");
459     running_ = false;
460     StopDispatch();
461     FlushBuffer();
462     ReleaseIdleBuffer();
463     ReleaseAllReceiver();
464 }
465 
StartDispatch()466 void BufferDispatcher::StartDispatch()
467 {
468     SHARING_LOGD("trace.");
469     running_ = true;
470     notifyThread_ = std::thread([this] {
471         this->NotifyThreadWorker(this);
472     });
473     std::string name = "notifyThread";
474     pthread_setname_np(notifyThread_.native_handle(), name.c_str());
475 }
476 
StopDispatch()477 void BufferDispatcher::StopDispatch()
478 {
479     SHARING_LOGD("trace.");
480     running_ = false;
481     continueNotify_ = true;
482 
483     if (writingTimer_) {
484         writingTimer_.reset();
485     }
486 
487     dataCV_.notify_all();
488     if (notifyThread_.joinable()) {
489         notifyThread_.join();
490     }
491 }
492 
SetBufferCapacity(size_t capacity)493 void BufferDispatcher::SetBufferCapacity(size_t capacity)
494 {
495     SHARING_LOGD("trace.");
496     std::unique_lock<std::shared_mutex> locker(bufferMutex_);
497     circularBuffer_.set_capacity(capacity);
498 }
499 
SetDataMode(MediaDispacherMode dataMode)500 void BufferDispatcher::SetDataMode(MediaDispacherMode dataMode)
501 {
502     SHARING_LOGD("trace.");
503     dataMode_ = dataMode;
504 }
505 
ReleaseIdleBuffer()506 void BufferDispatcher::ReleaseIdleBuffer()
507 {
508     SHARING_LOGD("BufferDispatcher idle Release Start.");
509     std::unique_lock<std::mutex> locker(idleMutex_);
510     for (auto &data : idleAudioBuffer_) {
511         if (data != nullptr && data->buff != nullptr) {
512             data->buff.reset();
513         }
514     }
515 
516     idleAudioBuffer_.clear();
517     for (auto &data : idleVideoBuffer_) {
518         if (data != nullptr && data->buff != nullptr) {
519             data->buff.reset();
520         }
521     }
522 
523     idleVideoBuffer_.clear();
524     SHARING_LOGD("BufferDispatcher idle Release End.");
525 }
526 
FlushBuffer()527 void BufferDispatcher::FlushBuffer()
528 {
529     SHARING_LOGI("BufferDispatcher Start flushing, dispatcherId: %{public}u.", GetDispatcherId());
530     {
531         std::lock_guard<std::mutex> lock(idleMutex_);
532         idleAudioBuffer_.clear();
533         idleVideoBuffer_.clear();
534         for (size_t i = 0; i < INITIAL_BUFFER_CAPACITY; i++) {
535             MediaData::Ptr adata = std::make_shared<MediaData>();
536             MediaData::Ptr vdata = std::make_shared<MediaData>();
537             adata->buff = std::make_shared<DataBuffer>();
538             vdata->buff = std::make_shared<DataBuffer>();
539             idleAudioBuffer_.push_back(adata);
540             idleVideoBuffer_.push_back(vdata);
541         }
542     }
543 
544     std::unique_lock<std::shared_mutex> locker(bufferMutex_);
545     for (auto &data : circularBuffer_) {
546         if (data->mediaData != nullptr && data->mediaData->buff != nullptr) {
547             data->mediaData->buff.reset();
548         }
549     }
550 
551     circularBuffer_.clear();
552     waitingKey_ = true;
553     gop_ = 0;
554     audioFrameCnt_ = 0;
555     videoFrameCnt_ = 0;
556     ResetAllIndex();
557     SHARING_LOGD("BufferDispatcher Dispatcher flushing end, dispatcherId: %{public}u.", GetDispatcherId());
558 }
559 
RequestDataBuffer(MediaType type,uint32_t size)560 MediaData::Ptr BufferDispatcher::RequestDataBuffer(MediaType type, uint32_t size)
561 {
562     SHARING_LOGD("trace.");
563     std::lock_guard<std::mutex> lock(idleMutex_);
564     if (size <= 0) {
565         SHARING_LOGE("Size invalid.");
566         return nullptr;
567     }
568 
569     MediaData::Ptr retData;
570     if (type == MEDIA_TYPE_VIDEO) {
571         if (!idleVideoBuffer_.empty()) {
572             SHARING_LOGD("video From idle.");
573             retData = idleVideoBuffer_.front();
574             idleVideoBuffer_.pop_front();
575             if (retData == nullptr) {
576                 MEDIA_LOGW("video From alloc when idle nullptr.");
577                 retData = std::make_shared<MediaData>();
578             }
579             return retData;
580         }
581     } else {
582         if (!idleAudioBuffer_.empty()) {
583             SHARING_LOGD("Audio From idle.");
584             retData = idleAudioBuffer_.front();
585             idleAudioBuffer_.pop_front();
586             if (retData == nullptr) {
587                 MEDIA_LOGW("Audio From alloc when idle nullptr.");
588                 retData = std::make_shared<MediaData>();
589             }
590             return retData;
591         }
592     }
593 
594     SHARING_LOGD("Audio/video from alloc.");
595     retData = std::make_shared<MediaData>();
596     return retData;
597 }
598 
ReturnIdleBuffer(DataSpec::Ptr & data)599 void BufferDispatcher::ReturnIdleBuffer(DataSpec::Ptr &data)
600 {
601     MEDIA_LOGD("trace.");
602     std::lock_guard<std::mutex> lock(idleMutex_);
603     if (data == nullptr || data->mediaData == nullptr) {
604         return;
605     }
606     if (data->mediaData->mediaType == MEDIA_TYPE_VIDEO) {
607         if (idleVideoBuffer_.size() < INITIAL_BUFFER_CAPACITY) {
608             idleVideoBuffer_.push_back(data->mediaData);
609             MEDIA_LOGD("data: push_back in idleVideoBuffer_, size: %{public}zu.", idleVideoBuffer_.size());
610         }
611     } else {
612         if (idleAudioBuffer_.size() < INITIAL_BUFFER_CAPACITY) {
613             idleAudioBuffer_.push_back(data->mediaData);
614             MEDIA_LOGD("data: push_back in idleAudioBuffer_, size: %{public}zu.", idleAudioBuffer_.size());
615         }
616     }
617 
618     data.reset();
619 }
620 
GetBufferSize()621 size_t BufferDispatcher::GetBufferSize()
622 {
623     SHARING_LOGD("trace.");
624     return circularBuffer_.size();
625 }
626 
FindReceiverIndex(uint32_t receiverId)627 uint32_t BufferDispatcher::FindReceiverIndex(uint32_t receiverId)
628 {
629     MEDIA_LOGD("trace.");
630     if (notifiers_.find(receiverId) != notifiers_.end()) {
631         return notifiers_[receiverId]->GetReadIndex();
632     }
633 
634     return INVALID_INDEX;
635 }
636 
IsRecevierExist(uint32_t receiverId)637 bool BufferDispatcher::IsRecevierExist(uint32_t receiverId)
638 {
639     SHARING_LOGD("trace.");
640     auto notifier = GetNotifierByReceiverId(receiverId);
641     if (notifier == nullptr) {
642         return false;
643     }
644 
645     return true;
646 }
647 
EnableKeyMode(bool enable)648 void BufferDispatcher::EnableKeyMode(bool enable)
649 {
650     SHARING_LOGD("trace.");
651     keyOnly_ = enable;
652 }
653 
AttachReceiver(BufferReceiver::Ptr receiver)654 int32_t BufferDispatcher::AttachReceiver(BufferReceiver::Ptr receiver)
655 {
656     SHARING_LOGD("trace.");
657     if (receiver == nullptr) {
658         return -1;
659     }
660 
661     if (IsRecevierExist(receiver->GetReceiverId())) {
662         SHARING_LOGE("Exist.");
663         return 0;
664     }
665 
666     receiver->NotifyReadStart();
667     std::lock_guard<std::mutex> locker(notifyMutex_);
668     if (readRefFlag_ == 0xFFFF) {
669         SHARING_LOGE("readRefFlag limited.");
670         return -1;
671     }
672 
673     DataNotifier::Ptr notifier = std::make_shared<DataNotifier>();
674     notifier->SetListenDispatcher(shared_from_this());
675     notifier->SetNotifyReceiver(receiver);
676 
677     auto usableRef = ~readRefFlag_ & (-(~readRefFlag_));
678 
679     if ((static_cast<uint32_t>(usableRef) & static_cast<uint32_t>(usableRef - 1)) != 0) {
680         SHARING_LOGE("usableRef: %{public}d invalid.", usableRef);
681         return -1;
682     }
683 
684     readRefFlag_ |= static_cast<uint32_t>(usableRef);
685     notifier->SetReadIndex(static_cast<uint32_t>(log2(usableRef)));
686     SHARING_LOGI("receiverId: %{public}d, readIndex: %{public}d, usableRef: %{public}d, readRefFlag_: %{public}d.",
687                  receiver->GetReceiverId(), notifier->GetReadIndex(), usableRef, readRefFlag_);
688     receiver->SetSource(shared_from_this());
689     notifiers_.emplace(receiver->GetReceiverId(), notifier);
690 
691     if (circularBuffer_.empty()) {
692         notifier->audioIndex = INVALID_INDEX;
693         notifier->videoIndex = INVALID_INDEX;
694         SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, false);
695         SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
696         SHARING_LOGD("BufferDispatcher Attach when buffer empty  RecvId: %{public}d.", receiver->GetReceiverId());
697         videoNeedActivate_ = true;
698         audioNeedActivate_ = true;
699         return 0;
700     }
701 
702     if (dataMode_ == MEDIA_AUDIO_ONLY) {
703         notifier->audioIndex = circularBuffer_.size() - 1;
704         SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, true);
705         notifier->videoIndex = INVALID_INDEX;
706         SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
707         SHARING_LOGD("BufferDispatcher Attach when Keyindex List empty  RecvId: %{public}d.",
708                      receiver->GetReceiverId());
709     } else {
710         if (!keyIndexList_.empty()) {
711             SHARING_LOGD("BufferDispatcher Attach with Keyindex  RecvId: %{public}d  KeyIndex:%{public}d.",
712                          receiver->GetReceiverId(), keyIndexList_.back());
713             uint32_t tempIndex = FindNextIndex(keyIndexList_.back(), MEDIA_TYPE_AUDIO);
714             notifier->audioIndex = tempIndex == keyIndexList_.back() ? INVALID_INDEX : tempIndex;
715             notifier->videoIndex = keyIndexList_.back();
716             bool isAudioReady = tempIndex != INVALID_INDEX ? true : false;
717             SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, isAudioReady);
718             SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, true);
719             if (lastAudioIndex_ == INVALID_INDEX) {
720                 audioNeedActivate_ = true;
721             }
722         } else {
723             SHARING_LOGD("BufferDispatcher Attach with Non Keyindex Exist RecvId: %{public}d.",
724                          receiver->GetReceiverId());
725             uint32_t tempIndex = FindLastIndex(MEDIA_TYPE_AUDIO);
726             notifier->audioIndex = tempIndex;
727             notifier->videoIndex = INVALID_INDEX;
728             SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, true);
729             SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
730             if (lastAudioIndex_ == INVALID_INDEX) {
731                 audioNeedActivate_ = true;
732             }
733         }
734     }
735 
736     return 0;
737 }
738 
DetachReceiver(BufferReceiver::Ptr receiver)739 int32_t BufferDispatcher::DetachReceiver(BufferReceiver::Ptr receiver)
740 {
741     SHARING_LOGI("buffer dispatcher: Detach receiver in.");
742     if (receiver == nullptr) {
743         SHARING_LOGE("buffer dispatcher: Detach receiver failed - null receiver.");
744         return -1;
745     }
746 
747     if (!IsRecevierExist(receiver->GetReceiverId())) {
748         SHARING_LOGE("BufferDispatcher AttachReceiver No Vaild Recevier Exist.");
749         return 0;
750     }
751 
752     auto notifier = GetNotifierByReceiverPtr(receiver);
753     if (notifier == nullptr) {
754         SHARING_LOGE("buffer dispatcher: Detach receiver failed - no find receiver in notifiers.");
755         return -1;
756     }
757 
758     std::lock_guard<std::mutex> locker(notifyMutex_);
759     notifier->SetBlock();
760     SetReceiverReadRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, false);
761     SetReceiverReadRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
762 
763     readRefFlag_ &= ~(RECV_FLAG_BASE << notifier->GetReadIndex());
764     notifiers_.erase(receiver->GetReceiverId());
765     SHARING_LOGI("now refFlag: %{public}d.", readRefFlag_);
766     return 0;
767 }
768 
DetachReceiver(uint32_t receiverId,DataNotifier::Ptr notifier)769 int32_t BufferDispatcher::DetachReceiver(uint32_t receiverId, DataNotifier::Ptr notifier)
770 {
771     SHARING_LOGI("buffer dispatcher: Detach notifier in.");
772     if (notifier == nullptr) {
773         SHARING_LOGE("buffer dispatcher: Detach receiver failed - null notifier.");
774         return -1;
775     }
776     notifier->SetBlock();
777     SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, false);
778     SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, false);
779 
780     readRefFlag_ &= ~(RECV_FLAG_BASE << notifier->GetReadIndex());
781     notifiers_.erase(receiverId);
782     SHARING_LOGI("now refFlag: %{public}d.", readRefFlag_);
783     return 0;
784 }
785 
ReleaseAllReceiver()786 void BufferDispatcher::ReleaseAllReceiver()
787 {
788     SHARING_LOGD("trace.");
789     std::lock_guard<std::mutex> locker(notifyMutex_);
790     for (auto it = notifiers_.begin(); it != notifiers_.end();) {
791         auto notifier = it->second;
792         if (notifier == nullptr) {
793             ++it;
794             continue;
795         }
796 
797         auto receiver = notifier->GetBufferReceiver();
798         if (receiver == nullptr) {
799             ++it;
800             continue;
801         }
802 
803         auto receiverId = receiver->GetReceiverId();
804         if (notifiers_.find(receiverId) != notifiers_.end()) {
805             auto notifierFind = notifiers_[receiverId];
806             ++it;
807             DetachReceiver(receiverId, notifierFind);
808         } else {
809             ++it;
810             SHARING_LOGE("buffer dispatcher: Detach receiver failed - no find receiver in notifiers.");
811         }
812     }
813 
814     notifiers_.clear();
815     SHARING_LOGD("release all receiver out.");
816 }
817 
SetBufferDispatcherListener(BufferDispatcherListener::Ptr listener)818 void BufferDispatcher::SetBufferDispatcherListener(BufferDispatcherListener::Ptr listener)
819 {
820     SHARING_LOGD("trace.");
821     listener_ = listener;
822     RETURN_IF_NULL(writingTimer_);
823     writingTimer_->StartTimer(
824         WRITING_TIMTOUT, "waiting for continuous data inputs",
825         [this]() {
826             if (!writing_) {
827                 SHARING_LOGI("writing timeout");
828                 auto listener = listener_.lock();
829                 if (listener) {
830                     listener->OnWriteTimeout();
831                 }
832             } else {
833                 SHARING_LOGI("restart timer");
834                 writing_ = false;
835             }
836         },
837         true);
838 }
839 
GetNotifierByReceiverPtr(BufferReceiver::Ptr receiver)840 DataNotifier::Ptr BufferDispatcher::GetNotifierByReceiverPtr(BufferReceiver::Ptr receiver)
841 {
842     SHARING_LOGD("trace.");
843 
844     return GetNotifierByReceiverId(receiver->GetReceiverId());
845 }
846 
GetNotifierByReceiverId(uint32_t receiverId)847 DataNotifier::Ptr BufferDispatcher::GetNotifierByReceiverId(uint32_t receiverId)
848 {
849     MEDIA_LOGD("trace.");
850     std::lock_guard<std::mutex> locker(notifyMutex_);
851     if (notifiers_.find(receiverId) != notifiers_.end()) {
852         return notifiers_[receiverId];
853     }
854 
855     return nullptr;
856 }
857 
ReadBufferData(uint32_t receiverId,MediaType type,std::function<void (const MediaData::Ptr & data)> cb)858 int32_t BufferDispatcher::ReadBufferData(uint32_t receiverId, MediaType type,
859                                          std::function<void(const MediaData::Ptr &data)> cb)
860 {
861     MEDIA_LOGD("in, receiverId: %{public}u.", receiverId);
862     auto notifier = GetNotifierByReceiverId(receiverId);
863     if (notifier == nullptr) {
864         SHARING_LOGE("notifier is nullptr.");
865         return -1;
866     }
867 
868     std::shared_lock<std::shared_mutex> locker(bufferMutex_);
869     uint32_t readIndex = notifier->GetReceiverReadIndex(type);
870     if (readIndex >= circularBuffer_.size()) {
871         SHARING_LOGE("Read wrong index exceed size.");
872         return -1;
873     }
874 
875     if ((keyOnly_ || notifier->IsKeyModeReceiver()) && type == MEDIA_TYPE_VIDEO &&
876         !IsKeyVideoFrame(circularBuffer_.at(readIndex))) {
877         UpdateReceiverReadIndex(receiverId, readIndex, type);
878         SHARING_LOGE("Read Non Key Video in KeyOnly Mode index: %{public}u.", readIndex);
879         return -1;
880     }
881 
882     if (IsDataReaded(receiverId, circularBuffer_.at(readIndex))) {
883         UpdateReceiverReadIndex(receiverId, readIndex, type);
884         return -1;
885     }
886 
887     readIndex = notifier->GetReceiverReadIndex(type);
888     if (readIndex >= circularBuffer_.size()) {
889         return -1;
890     }
891 
892     auto data = circularBuffer_.at(readIndex);
893     if (data == nullptr) {
894         SHARING_LOGE("BufferDispatcher Read data nullptr.");
895         return -1;
896     }
897 
898     if (IsKeyVideoFrame(data)) {
899         int32_t bufferVideoCacheCnt = 0;
900         for (size_t i = (size_t)readIndex + 1; i < circularBuffer_.size(); i++) {
901             if (circularBuffer_[static_cast<int32_t>(i)]->mediaData->mediaType == MEDIA_TYPE_VIDEO)
902                 bufferVideoCacheCnt++;
903         }
904         MEDIA_LOGD("TEST STATISTIC:interval: buffer cache %{public}d frames.", bufferVideoCacheCnt);
905     }
906 
907     SetReceiverReadFlag(receiverId, data);
908     if (cb != nullptr) {
909         cb(data->mediaData);
910     }
911 
912     MEDIA_LOGD("Current data readed, Recvid:%{public}d, remain %{public}zu data, readIndex: %{public}u, "
913                "readtype: %{public}d, diff: %{public}zu.",
914                receiverId, circularBuffer_.size(), readIndex, int32_t(type), circularBuffer_.size() - readIndex);
915     UpdateReceiverReadIndex(receiverId, readIndex, type);
916     return 0;
917 }
918 
InputData(const MediaData::Ptr & data)919 int32_t BufferDispatcher::InputData(const MediaData::Ptr &data)
920 {
921     if (data == nullptr || data->buff == nullptr) {
922         SHARING_LOGE("data nullptr.");
923         return -1;
924     }
925     MEDIA_LOGD("inputmediatype: %{public}d, keyFrame: %{public}d, pts: %{public}" PRIu64 ".", data->mediaType,
926                data->keyFrame, data->pts);
927 
928     if (!writing_) {
929         writing_ = true;
930     }
931 
932     DataSpec::Ptr dataSpec = std::make_shared<DataSpec>();
933     dataSpec->mediaData = data;
934     if (dataMode_ == MEDIA_AUDIO_ONLY) {
935         WriteDataIntoBuffer(dataSpec);
936     } else {
937         PreProcessDataSpec(dataSpec);
938     }
939 
940     if (circularBuffer_.size() > 0) {
941         MEDIA_LOGD("inputmediatype: %{public}d, keyFrame: %{public}d, pts: %{public}" PRIu64 ".",
942                    circularBuffer_[circularBuffer_.size() - 1]->mediaData->mediaType,
943                    circularBuffer_[circularBuffer_.size() - 1]->mediaData->keyFrame,
944                    circularBuffer_[circularBuffer_.size() - 1]->mediaData->pts);
945     }
946 
947     if (data->keyFrame) {
948         MEDIA_LOGD("dispatcherId: %{public}u, after InputData, current circularBuffer_ size: %{public}zu, "
949                    "idleVideoBuffer_ size: %{public}zu, idle_audioBuffer_ size: %{public}zu, "
950                    "keyFrame: %{public}s, data size: %{public}d, adataCount:%{public}d.",
951                    GetDispatcherId(), circularBuffer_.size(), idleVideoBuffer_.size(), idleAudioBuffer_.size(),
952                    data->keyFrame ? "true" : "false", data->buff->Size(), audioFrameCnt_);
953     }
954 
955     return 0;
956 }
957 
PreProcessDataSpec(const DataSpec::Ptr & dataSpec)958 void BufferDispatcher::PreProcessDataSpec(const DataSpec::Ptr &dataSpec)
959 {
960     MEDIA_LOGD("trace.");
961     if (waitingKey_) {
962         if (IsAudioData(dataSpec)) {
963         } else if (!IsKeyVideoFrame(dataSpec)) {
964             SHARING_LOGD("BufferDispatcher Waiting First Key Video Frame.");
965             return;
966         } else {
967             SHARING_LOGD("BufferDispatcher received first key video frame and restore from uncontinuous...Flushing.");
968             FlushBuffer();
969             baseCounter_++;
970             capacityEvaluating_ = true;
971             waitingKey_ = false;
972         }
973     } else {
974         if (capacityEvaluating_) {
975             ReCalculateCapacity(IsKeyVideoFrame(dataSpec));
976         }
977     }
978 
979     WriteDataIntoBuffer(dataSpec);
980 }
981 
WriteDataIntoBuffer(const DataSpec::Ptr & data)982 int32_t BufferDispatcher::WriteDataIntoBuffer(const DataSpec::Ptr &data)
983 {
984     MEDIA_LOGD("trace.");
985     if (data->mediaData == nullptr || data->mediaData->buff == nullptr) {
986         SHARING_LOGE("null data.");
987         return -1;
988     }
989 
990     if (NeedExtendToDBCapacity()) {
991         SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d  CRTL_SIZE.", doubleBufferCapacity_);
992         SetBufferCapacity(doubleBufferCapacity_);
993     }
994 
995     if (NeedRestoreToNormalCapacity()) {
996         std::unique_lock<std::shared_mutex> locker(bufferMutex_);
997         int32_t popSize = (int32_t)circularBuffer_.size() - (int32_t)INITIAL_BUFFER_CAPACITY;
998         for (int32_t i = 0; i < popSize; i++) {
999             if (HeadFrameNeedReserve()) {
1000                 MEDIA_LOGW("dispatcherId: %{public}u, need reserve but pop, mediaType: "
1001                            "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64 ".",
1002                            GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1003                            circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
1004                            circularBuffer_.front()->mediaData->pts);
1005             }
1006 
1007             MEDIA_LOGW("dispatcherId: %{public}u, delete data, mediaType: %{public}d, keyFrame: "
1008                        "%{public}s, pts: %{public}" PRIu64 ", reserveFlag: %{public}x.",
1009                        GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1010                        circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
1011                        circularBuffer_.front()->mediaData->pts, circularBuffer_.front()->reserveFlag.load());
1012             circularBuffer_.pop_front();
1013             audioFrameCnt_--;
1014             UpdateIndex();
1015         }
1016 
1017         baseBufferCapacity_ = INITIAL_BUFFER_CAPACITY;
1018         doubleBufferCapacity_ = INITIAL_BUFFER_CAPACITY * 2; // 2 : increasement
1019         SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d  NORMALSIZE.", baseBufferCapacity_);
1020         circularBuffer_.set_capacity(baseBufferCapacity_);
1021     }
1022 
1023     if (IsKeyVideoFrame(data) && !keyOnly_) {
1024         EraseOldGopDatas();
1025     }
1026 
1027     bool updateIndexFlag = false;
1028     std::unique_lock<std::shared_mutex> locker(bufferMutex_);
1029     if (circularBuffer_.size() >= circularBuffer_.capacity()) {
1030         updateIndexFlag = true;
1031     }
1032 
1033     if (updateIndexFlag) {
1034         uint32_t nextDeleteIndex = 1;
1035         if (IsVideoData(data)) {
1036             nextDeleteIndex = FindNextDeleteVideoIndex();
1037         }
1038 
1039         for (size_t i = 0; i <= nextDeleteIndex; i++) {
1040             MediaType headType = circularBuffer_.front()->mediaData->mediaType;
1041             DataSpec::Ptr retBuff = circularBuffer_.front();
1042             if (HeadFrameNeedReserve()) {
1043                 MEDIA_LOGW("dispatcherId: %{public}u, need reserve but pop, mediaType: "
1044                            "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64 ".",
1045                            GetDispatcherId(), int32_t(retBuff->mediaData->mediaType),
1046                            retBuff->mediaData->keyFrame ? "true" : "false", retBuff->mediaData->pts);
1047             }
1048 
1049             MEDIA_LOGW("dispatcherId: %{public}u, delete data, mediaType: %{public}d, "
1050                        "keyFrame: %{public}s, pts: %{public}" PRIu64 ", reserveFlag: %{public}x.",
1051                        GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1052                        circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
1053                        circularBuffer_.front()->mediaData->pts, circularBuffer_.front()->reserveFlag.load());
1054 
1055             circularBuffer_.pop_front();
1056             ReturnIdleBuffer(retBuff);
1057             headType == MEDIA_TYPE_AUDIO ? audioFrameCnt_-- : videoFrameCnt_--;
1058             UpdateIndex();
1059         }
1060     }
1061 
1062     data->reserveFlag = 0;
1063     MEDIA_LOGD("WriteDataIntoBuffer data type: %{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64
1064                ", cur_size: %{public}zu, capacity: %{public}zu dispatcher[%{public}u].",
1065                int32_t(data->mediaData->mediaType), data->mediaData->keyFrame ? "true" : "false", data->mediaData->pts,
1066                circularBuffer_.size(), circularBuffer_.capacity(), GetDispatcherId());
1067     circularBuffer_.push_back(data);
1068     if (IsAudioData(data)) {
1069         lastAudioIndex_ = circularBuffer_.size() - 1;
1070         ActiveDataRef(MEDIA_TYPE_AUDIO, false);
1071         audioFrameCnt_++;
1072     } else {
1073         lastVideoIndex_ = circularBuffer_.size() - 1;
1074         if (!keyOnly_ || IsKeyVideoFrame(data)) {
1075             ActiveDataRef(MEDIA_TYPE_VIDEO, IsKeyVideoFrame(data));
1076         }
1077         videoFrameCnt_++;
1078     }
1079 
1080     if (audioNeedActivate_ && IsAudioData(data)) {
1081         MEDIA_LOGD("BufferDispatcher ActivateReceiverIndex By AudioData.");
1082         ActivateReceiverIndex(circularBuffer_.size() - 1, MEDIA_TYPE_AUDIO);
1083     }
1084 
1085     if (IsKeyVideoFrame(data)) {
1086         uint32_t keyIndex = circularBuffer_.size() - 1;
1087         {
1088             std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1089             keyIndexList_.push_back(keyIndex);
1090         }
1091         if (videoNeedActivate_) {
1092             MEDIA_LOGD("BufferDispatcher ActivateReceiverIndex By KeyVideo Frame index: %{public}d.", keyIndex);
1093             ActivateReceiverIndex(keyIndex, MEDIA_TYPE_VIDEO);
1094         }
1095         if (keyRedirect_) {
1096             OnKeyRedirect();
1097             EnableKeyRedirect(false);
1098         }
1099     }
1100 
1101     continueNotify_ = true;
1102     dataCV_.notify_one();
1103     return 0;
1104 }
1105 
EraseOldGopDatas()1106 void BufferDispatcher::EraseOldGopDatas()
1107 {
1108     MEDIA_LOGD("BufferDispatcher Delete old datas In.");
1109     if (dataMode_ == MEDIA_AUDIO_ONLY) {
1110         FlushBuffer();
1111         return;
1112     }
1113 
1114     std::unique_lock<std::shared_mutex> locker(bufferMutex_);
1115     uint32_t nextKey = 0;
1116     {
1117         std::lock_guard<std::mutex> lock(notifyMutex_);
1118         if (!keyIndexList_.empty() && keyIndexList_.back() > 0) {
1119             MEDIA_LOGD("find next key listsize %{public}zu, back:%{public}d.", keyIndexList_.size(),
1120                        keyIndexList_.back());
1121             nextKey = keyIndexList_.back();
1122             keyIndexList_.clear();
1123             keyIndexList_.push_back(nextKey);
1124         }
1125     }
1126 
1127     MEDIA_LOGD("erase between 0 to next Video Frame %{public}d.", nextKey);
1128     DeleteHeadDatas(nextKey, false);
1129     nextKey = FindNextDeleteVideoIndex();
1130     DeleteHeadDatas(nextKey, true);
1131     std::string indexs;
1132 
1133     MEDIA_LOGD("circularBuffer_ size: %{public}zu.", circularBuffer_.size());
1134     for (auto &keyIndex : keyIndexList_) {
1135         indexs += std::to_string(keyIndex) + ", ";
1136         MEDIA_LOGD("keyIndex update to %{public}d.", keyIndex);
1137     }
1138 
1139     MEDIA_LOGD("current keyIndex: %{public}s.", indexs.c_str());
1140 }
1141 
DeleteHeadDatas(uint32_t size,bool forceDelete)1142 void BufferDispatcher::DeleteHeadDatas(uint32_t size, bool forceDelete)
1143 {
1144     SHARING_LOGI("%{public}s, size %{public}d.", __FUNCTION__, size);
1145     if (size <= 0) {
1146         MEDIA_LOGW("invalid Size, dispatcherId: %{public}u!", GetDispatcherId());
1147         return;
1148     }
1149 
1150     for (size_t i = 0; i < size; i++) {
1151         if (HeadFrameNeedReserve() && !forceDelete) {
1152             MEDIA_LOGD("index %{public}zu need reserve.", i);
1153             break;
1154         }
1155         if (circularBuffer_.empty()) {
1156             return;
1157         }
1158         DataSpec::Ptr retBuff = circularBuffer_.front();
1159         MEDIA_LOGD("BufferDispatcher pop out headtype  %{public}d.", retBuff->mediaData->mediaType);
1160         retBuff->mediaData->mediaType == MEDIA_TYPE_AUDIO ? audioFrameCnt_-- : videoFrameCnt_--;
1161         if (HeadFrameNeedReserve()) {
1162             MEDIA_LOGW("dispatcherId: %{public}u, need reserve but pop, mediaType: "
1163                        "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64 ".",
1164                        GetDispatcherId(), int32_t(retBuff->mediaData->mediaType),
1165                        retBuff->mediaData->keyFrame ? "true" : "false", retBuff->mediaData->pts);
1166         }
1167 
1168         MEDIA_LOGD(
1169             "dispatcherId: %{public}u, delete data, mediaType: %{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64
1170             ", reserveFlag: %{public}x.",
1171             GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1172             circularBuffer_.front()->mediaData->keyFrame ? "true" : "false", circularBuffer_.front()->mediaData->pts,
1173             circularBuffer_.front()->reserveFlag.load());
1174         circularBuffer_.pop_front();
1175         ReturnIdleBuffer(retBuff);
1176         UpdateIndex();
1177     }
1178 
1179     if (circularBuffer_.size() < baseBufferCapacity_ && circularBuffer_.capacity() > baseBufferCapacity_) {
1180         MEDIA_LOGE("capacity return to base %{public}d.", baseBufferCapacity_);
1181         circularBuffer_.set_capacity(baseBufferCapacity_);
1182     }
1183 }
1184 
UpdateIndex()1185 void BufferDispatcher::UpdateIndex()
1186 {
1187     MEDIA_LOGD("trace.");
1188     std::lock_guard<std::mutex> locker(notifyMutex_);
1189     if (!keyIndexList_.empty() && keyIndexList_.front() == 0) {
1190         keyIndexList_.pop_front();
1191         MEDIA_LOGD("BufferDispatcher pop out first  0 keyIndex after listsize %{public}zu.", keyIndexList_.size());
1192     }
1193 
1194     for (auto &keyIndex : keyIndexList_) {
1195         if (keyIndex > 0) {
1196             keyIndex--;
1197         }
1198     }
1199 
1200     for (auto &[recvId, notifier] : notifiers_) {
1201         if (notifier->videoIndex > 0 && notifier->videoIndex != INVALID_INDEX) {
1202             notifier->videoIndex--;
1203         }
1204         if (notifier->audioIndex > 0 && notifier->audioIndex != INVALID_INDEX) {
1205             notifier->audioIndex--;
1206         }
1207     }
1208 
1209     if (lastVideoIndex_ > 0 && lastVideoIndex_ != INVALID_INDEX) {
1210         lastVideoIndex_--;
1211     }
1212 
1213     if (lastAudioIndex_ > 0 && lastAudioIndex_ != INVALID_INDEX) {
1214         lastAudioIndex_--;
1215     }
1216 }
1217 
FindNextDeleteVideoIndex()1218 uint32_t BufferDispatcher::FindNextDeleteVideoIndex()
1219 {
1220     MEDIA_LOGD("trace.");
1221     for (size_t i = 0; i < circularBuffer_.size(); i++) {
1222         if (circularBuffer_[i]->mediaData != nullptr && circularBuffer_[i]->mediaData->mediaType == MEDIA_TYPE_VIDEO) {
1223             return i;
1224         }
1225     }
1226 
1227     return 0;
1228 }
1229 
FindLastIndex(MediaType type)1230 uint32_t BufferDispatcher::FindLastIndex(MediaType type)
1231 {
1232     SHARING_LOGD("trace.");
1233     if (circularBuffer_.empty()) {
1234         return INVALID_INDEX;
1235     }
1236 
1237     return type == MEDIA_TYPE_AUDIO ? lastAudioIndex_ : lastVideoIndex_;
1238 }
1239 
UpdateReceiverReadIndex(uint32_t receiverId,const uint32_t readIndex,MediaType type)1240 void BufferDispatcher::UpdateReceiverReadIndex(uint32_t receiverId, const uint32_t readIndex, MediaType type)
1241 {
1242     MEDIA_LOGD("trace.");
1243     uint32_t nextIndex = FindNextIndex(readIndex, type, receiverId);
1244     bool noAvaliableData = false;
1245     if (nextIndex == readIndex) {
1246         noAvaliableData = true;
1247     }
1248 
1249     auto notifier = GetNotifierByReceiverId(receiverId);
1250     if (notifier == nullptr) {
1251         SHARING_LOGE("notifier is nullptr.");
1252         return;
1253     }
1254 
1255     bool readOver = circularBuffer_.size() - readIndex < 3;
1256     if (readOver && notifier->NeedAcceleration() && type == MEDIA_TYPE_VIDEO) {
1257         SHARING_LOGD("BufferDispatcher SendAccelerationDone.");
1258         notifier->SendAccelerationDone();
1259     }
1260 
1261     notifier->SetNeedUpdate(noAvaliableData, type);
1262 
1263     if (type == MEDIA_TYPE_VIDEO) {
1264         notifier->videoIndex = nextIndex;
1265     } else if (type == MEDIA_TYPE_AUDIO) {
1266         notifier->audioIndex = nextIndex;
1267     } else {
1268         notifier->videoIndex = nextIndex;
1269         notifier->audioIndex = nextIndex;
1270     }
1271 
1272     MEDIA_LOGD("After UpdateReceiverReadIndex  type %{public}d, aindex %{public}d, vindex %{public}d.", type,
1273                notifier->audioIndex, notifier->videoIndex);
1274 }
1275 
FindNextIndex(uint32_t index,MediaType type)1276 uint32_t BufferDispatcher::FindNextIndex(uint32_t index, MediaType type)
1277 {
1278     MEDIA_LOGD("trace.");
1279     if ((uint64_t)index + 1 >= circularBuffer_.size() || index == INVALID_INDEX) {
1280         return index;
1281     }
1282 
1283     if (type == MEDIA_TYPE_AV) {
1284         return index + 1;
1285     }
1286 
1287     for (size_t i = index + 1; i < circularBuffer_.size(); i++) {
1288         if (circularBuffer_[i] && circularBuffer_[i]->mediaData && circularBuffer_[i]->mediaData->mediaType == type) {
1289             if (keyOnly_ && type == MEDIA_TYPE_VIDEO && !IsKeyVideoFrame(circularBuffer_[i])) {
1290                 continue;
1291             } else {
1292                 return i;
1293             }
1294         }
1295     }
1296 
1297     return index;
1298 }
1299 
FindNextIndex(uint32_t index,MediaType type,uint32_t receiverId)1300 uint32_t BufferDispatcher::FindNextIndex(uint32_t index, MediaType type, uint32_t receiverId)
1301 {
1302     MEDIA_LOGD("trace.");
1303     if (index + 1 >= circularBuffer_.size() || index == INVALID_INDEX) {
1304         return index;
1305     }
1306 
1307     if (type == MEDIA_TYPE_AV) {
1308         return index + 1;
1309     }
1310 
1311     auto notifier = GetNotifierByReceiverId(receiverId);
1312     if (notifier == nullptr) {
1313         SHARING_LOGE("FindNextIndex GetNotifier nullptr.");
1314         return INVALID_INDEX;
1315     }
1316 
1317     bool keyModeReceiver = notifier->IsKeyModeReceiver();
1318     for (size_t i = index + 1; i < circularBuffer_.size(); i++) {
1319         if (circularBuffer_[i] && circularBuffer_[i]->mediaData && circularBuffer_[i]->mediaData->mediaType == type) {
1320             if ((keyOnly_ || keyModeReceiver) && type == MEDIA_TYPE_VIDEO) {
1321                 if (!IsKeyVideoFrame(circularBuffer_[i])) {
1322                     continue;
1323                 } else {
1324                     for (size_t bIndex = index + 1; bIndex < i; bIndex++) {
1325                         SetReceiverReadFlag(receiverId, circularBuffer_[bIndex]);
1326                     }
1327                     return i;
1328                 }
1329             } else {
1330                 return i;
1331             }
1332         }
1333     }
1334 
1335     return index;
1336 }
1337 
ResetAllIndex()1338 void BufferDispatcher::ResetAllIndex()
1339 {
1340     SHARING_LOGD("trace.");
1341     std::lock_guard<std::mutex> locker(notifyMutex_);
1342     keyIndexList_.clear();
1343     for (auto &[recvId, notifier] : notifiers_) {
1344         notifier->videoIndex = INVALID_INDEX;
1345         notifier->audioIndex = INVALID_INDEX;
1346     }
1347 
1348     videoNeedActivate_ = true;
1349     audioNeedActivate_ = true;
1350     lastAudioIndex_ = INVALID_INDEX;
1351     lastVideoIndex_ = INVALID_INDEX;
1352 }
1353 
IsDataReaded(uint32_t receiverId,DataSpec::Ptr & dataSpec)1354 bool BufferDispatcher::IsDataReaded(uint32_t receiverId, DataSpec::Ptr &dataSpec)
1355 {
1356     MEDIA_LOGD("trace.");
1357     uint32_t index = FindReceiverIndex(receiverId);
1358     if (index == INVALID_INDEX) {
1359         return false;
1360     }
1361 
1362     return dataSpec->reserveFlag & RECV_FLAG_BASE << index;
1363 }
1364 
IsVideoData(const DataSpec::Ptr & dataSpec)1365 bool BufferDispatcher::IsVideoData(const DataSpec::Ptr &dataSpec)
1366 {
1367     MEDIA_LOGD("trace.");
1368     if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1369         SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1370         return false;
1371     }
1372 
1373     return dataSpec->mediaData->mediaType == MEDIA_TYPE_VIDEO;
1374 }
1375 
IsAudioData(const DataSpec::Ptr & dataSpec)1376 bool BufferDispatcher::IsAudioData(const DataSpec::Ptr &dataSpec)
1377 {
1378     MEDIA_LOGD("trace.");
1379     if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1380         SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1381         return false;
1382     }
1383 
1384     return dataSpec->mediaData->mediaType == MEDIA_TYPE_AUDIO;
1385 }
1386 
IsKeyVideoFrame(const DataSpec::Ptr & dataSpec)1387 bool BufferDispatcher::IsKeyVideoFrame(const DataSpec::Ptr &dataSpec)
1388 {
1389     MEDIA_LOGD("trace.");
1390     if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1391         SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1392         return false;
1393     }
1394 
1395     return IsVideoData(dataSpec) && dataSpec->mediaData->keyFrame;
1396 }
1397 
HeadFrameNeedReserve()1398 bool BufferDispatcher::HeadFrameNeedReserve()
1399 {
1400     MEDIA_LOGD("trace.");
1401     if (!circularBuffer_.empty()) {
1402         uint8_t temp = readRefFlag_;
1403         MEDIA_LOGD("IsHeadFrameNeedReserve Head reserveFlag %{public}d readRefFlag_ %{public}d.",
1404                    circularBuffer_.front()->reserveFlag.load(), readRefFlag_);
1405         return temp ^ circularBuffer_.front()->reserveFlag;
1406     }
1407 
1408     return false;
1409 }
1410 
NeedExtendToDBCapacity()1411 bool BufferDispatcher::NeedExtendToDBCapacity()
1412 {
1413     MEDIA_LOGD("trace.");
1414     std::shared_lock<std::shared_mutex> locker(bufferMutex_);
1415     return (circularBuffer_.size() >= circularBuffer_.capacity() &&
1416             circularBuffer_.capacity() < doubleBufferCapacity_ && HeadFrameNeedReserve());
1417 }
1418 
NeedRestoreToNormalCapacity()1419 bool BufferDispatcher::NeedRestoreToNormalCapacity()
1420 {
1421     MEDIA_LOGD("trace.");
1422     std::shared_lock<std::shared_mutex> locker(bufferMutex_);
1423     return audioFrameCnt_ >= circularBuffer_.capacity() && circularBuffer_.capacity() != INITIAL_BUFFER_CAPACITY;
1424 }
1425 
ReCalculateCapacity(bool keyFrame)1426 void BufferDispatcher::ReCalculateCapacity(bool keyFrame)
1427 {
1428     MEDIA_LOGD("trace.");
1429     baseCounter_++;
1430     if (baseCounter_ >= maxBufferCapacity_) {
1431         SHARING_LOGE("BufferDispatcher too many Audiodata need Set Capacity to default.");
1432     }
1433 
1434     if (baseCounter_ >= circularBuffer_.capacity() && !keyFrame) {
1435         uint32_t tmpSize = circularBuffer_.capacity() + bufferCapacityIncrement_ < maxBufferCapacity_
1436                                ? circularBuffer_.capacity() + bufferCapacityIncrement_
1437                                : maxBufferCapacity_;
1438         doubleBufferCapacity_ = tmpSize * 2 < maxBufferCapacity_ ? tmpSize * 2 : maxBufferCapacity_; // 2: increasement
1439         SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d in adaptive capacity calculating.", tmpSize);
1440         SetBufferCapacity(tmpSize);
1441         return;
1442     }
1443 
1444     if (keyFrame) {
1445         baseBufferCapacity_ = baseCounter_ + bufferCapacityIncrement_ < maxBufferCapacity_
1446                                   ? baseCounter_ + bufferCapacityIncrement_
1447                                   : maxBufferCapacity_;
1448         if (baseBufferCapacity_ < INITIAL_BUFFER_CAPACITY) {
1449             baseBufferCapacity_ = INITIAL_BUFFER_CAPACITY;
1450         }
1451         doubleBufferCapacity_ = baseBufferCapacity_ * 2 < maxBufferCapacity_ // 2: increasement
1452                                     ? baseBufferCapacity_ * 2                // 2: increasement
1453                                     : maxBufferCapacity_;
1454         gop_ = baseCounter_;
1455         capacityEvaluating_ = gop_ > 0 ? false : true;
1456         SetBufferCapacity(baseBufferCapacity_);
1457         baseCounter_ = 0;
1458         SHARING_LOGI(
1459             "The gop is %{public}d and BufferDispatcher buffer Extended to %{public}d on base capacity confirmed.",
1460             GetCurrentGop(), baseBufferCapacity_);
1461     }
1462 }
1463 
NotifyThreadWorker(void * userParam)1464 int32_t BufferDispatcher::NotifyThreadWorker(void *userParam)
1465 {
1466     SHARING_LOGI("BufferDispatcher DataNotifier thread in.");
1467     RETURN_INVALID_IF_NULL(userParam);
1468     BufferDispatcher *dispatcher = (BufferDispatcher *)userParam;
1469     while (dispatcher->running_) {
1470         std::unique_lock<std::mutex> locker(dispatcher->notifyMutex_);
1471         uint32_t notifyRef = dispatcher->dataBitRef_ & dispatcher->recvBitRef_;
1472         MEDIA_LOGD("DataBitRef %{public}u   recvBitRef_ %{public}d   notifyRef_ %{public}d.",
1473                    dispatcher->dataBitRef_.load(), dispatcher->recvBitRef_.load(), notifyRef);
1474 
1475         for (auto &[recvId, notifier] : dispatcher->notifiers_) {
1476             auto index = notifier->GetReadIndex();
1477             if (((RECV_FLAG_BASE << (index * FIX_OFFSET_TWO)) & notifyRef) ||
1478                 ((RECV_FLAG_BASE << (index * FIX_OFFSET_TWO + FIX_OFFSET_ONE)) & notifyRef)) {
1479                 MediaType notifyType;
1480                 if (notifier->IsMixedReceiver()) {
1481                     notifyType = MEDIA_TYPE_AV;
1482                 } else {
1483                     notifyType = (((RECV_FLAG_BASE << (index * FIX_OFFSET_TWO)) & notifyRef) ? MEDIA_TYPE_AUDIO
1484                                                                                              : MEDIA_TYPE_VIDEO);
1485                 }
1486                 MEDIA_LOGD("notify the receiveId: %{public}d, notifyType: %{public}d, notifyRef: %{public}x.", recvId,
1487                            notifyType, notifyRef);
1488                 notifier->NotifyDataReceiver(notifyType);
1489             }
1490         }
1491 
1492         dispatcher->dataCV_.wait(locker, [&dispatcher]() { return dispatcher->continueNotify_.load(); });
1493         dispatcher->continueNotify_ = false;
1494     }
1495 
1496     return 0;
1497 }
1498 
NotifyReadReady(uint32_t receiverId,MediaType type)1499 void BufferDispatcher::NotifyReadReady(uint32_t receiverId, MediaType type)
1500 {
1501     MEDIA_LOGD("trace.");
1502     auto notifier = GetNotifierByReceiverId(receiverId);
1503     if (notifier == nullptr) {
1504         SHARING_LOGE("notifier is nullptr.");
1505         return;
1506     }
1507 
1508     std::shared_lock<std::shared_mutex> lock(bufferMutex_);
1509     std::unique_lock<std::mutex> locker(notifyMutex_);
1510     if (type == MEDIA_TYPE_AV) {
1511         SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, true);
1512         SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, true);
1513     } else {
1514         SetReceiverReadRef(receiverId, type, true);
1515     }
1516 
1517     bool dataAvaliable = notifier->DataAvailable(type);
1518     MEDIA_LOGD("receiverId %{public}d  MediaType %{public}d  dataAvaliable %{public}d.", receiverId, type,
1519                dataAvaliable);
1520     if (type == MEDIA_TYPE_AV) {
1521         SetReceiverDataRef(receiverId, MEDIA_TYPE_VIDEO, dataAvaliable);
1522         SetReceiverDataRef(receiverId, MEDIA_TYPE_AUDIO, dataAvaliable);
1523     } else {
1524         SetReceiverDataRef(receiverId, type, dataAvaliable);
1525     }
1526 
1527     if (!dataAvaliable) {
1528         return;
1529     }
1530 
1531     continueNotify_ = true;
1532     dataCV_.notify_one();
1533 }
1534 
SetDataRef(uint32_t bitref)1535 void BufferDispatcher::SetDataRef(uint32_t bitref)
1536 {
1537     SHARING_LOGD("trace.");
1538     dataBitRef_ &= bitref;
1539 }
1540 
GetDataRef()1541 uint32_t BufferDispatcher::GetDataRef()
1542 {
1543     SHARING_LOGD("trace.");
1544     return dataBitRef_;
1545 }
1546 
SetReadRef(uint32_t bitref)1547 void BufferDispatcher::SetReadRef(uint32_t bitref)
1548 {
1549     SHARING_LOGD("trace.");
1550     recvBitRef_ &= bitref;
1551 }
1552 
GetReadRef()1553 uint32_t BufferDispatcher::GetReadRef()
1554 {
1555     SHARING_LOGD("trace.");
1556     return recvBitRef_;
1557 }
1558 
ActiveDataRef(MediaType type,bool keyFrame)1559 void BufferDispatcher::ActiveDataRef(MediaType type, bool keyFrame)
1560 {
1561     MEDIA_LOGD("trace.");
1562     std::unique_lock<std::mutex> locker(notifyMutex_);
1563     uint32_t bitRef = 0x0000;
1564     for (auto &[recvId, notifier] : notifiers_) {
1565         auto index = notifier->GetReadIndex();
1566         if (type == MEDIA_TYPE_AUDIO) {
1567             bitRef |= RECV_FLAG_BASE << (index * 2); // 2: fix offset, get audio notifyer id
1568             continue;
1569         }
1570         bool keyModeReceiver = false;
1571         keyModeReceiver = notifier->IsKeyModeReceiver();
1572         if (keyFrame && keyModeReceiver && keyIndexList_.empty()) {
1573             notifier->videoIndex = circularBuffer_.size() - 1;
1574         }
1575         if (!keyModeReceiver || keyFrame) {
1576             if (index != INVALID_INDEX) {
1577                 bitRef |= RECV_FLAG_BASE << (index * 2 + 1); // 2: fix offset, get video notifyer id
1578             }
1579         }
1580     }
1581 
1582     dataBitRef_ |= bitRef;
1583 }
1584 
SetReceiverDataRef(uint32_t receiverId,MediaType type,bool ready)1585 void BufferDispatcher::SetReceiverDataRef(uint32_t receiverId, MediaType type, bool ready)
1586 {
1587     MEDIA_LOGD("trace.");
1588     uint32_t index = FindReceiverIndex(receiverId);
1589     if (index == INVALID_INDEX) {
1590         return;
1591     }
1592 
1593     if (type == MEDIA_TYPE_AUDIO) {
1594         uint32_t audioBit = index * 2;
1595         uint32_t bitRef = RECV_FLAG_BASE << audioBit;
1596         MEDIA_LOGD("Audio recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1597         if (!ready) {
1598             bitRef = ~bitRef;
1599             dataBitRef_ &= bitRef;
1600         } else {
1601             dataBitRef_ |= bitRef;
1602         }
1603     } else if (type == MEDIA_TYPE_VIDEO) {
1604         uint32_t videoBit = index * 2 + 1;
1605         uint32_t bitRef = RECV_FLAG_BASE << videoBit;
1606         MEDIA_LOGD("Video recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1607         if (!ready) {
1608             bitRef = ~bitRef;
1609             dataBitRef_ &= bitRef;
1610         } else {
1611             dataBitRef_ |= bitRef;
1612         }
1613     }
1614 }
1615 
SetReceiverReadRef(uint32_t receiverId,MediaType type,bool ready)1616 void BufferDispatcher::SetReceiverReadRef(uint32_t receiverId, MediaType type, bool ready)
1617 {
1618     MEDIA_LOGD("trace.");
1619     uint32_t index = FindReceiverIndex(receiverId);
1620     if (index == INVALID_INDEX) {
1621         return;
1622     }
1623 
1624     if (type == MEDIA_TYPE_AUDIO) {
1625         uint32_t audioBit = index * 2;
1626         uint32_t bitRef = RECV_FLAG_BASE << audioBit;
1627         MEDIA_LOGD("Audio recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1628 
1629         if (!ready) {
1630             bitRef = ~bitRef;
1631             recvBitRef_ &= bitRef;
1632         } else {
1633             recvBitRef_ |= bitRef;
1634         }
1635     } else if (type == MEDIA_TYPE_VIDEO) {
1636         uint32_t videoBit = index * 2 + 1;
1637         uint32_t bitRef = RECV_FLAG_BASE << videoBit;
1638         MEDIA_LOGD("Video recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1639 
1640         if (!ready) {
1641             bitRef = ~bitRef;
1642             recvBitRef_ &= bitRef;
1643         } else {
1644             recvBitRef_ |= bitRef;
1645         }
1646     }
1647 }
1648 
GetReceiverDataRef(uint32_t receiverId)1649 uint32_t BufferDispatcher::GetReceiverDataRef(uint32_t receiverId)
1650 {
1651     SHARING_LOGD("trace.");
1652     return 0;
1653 }
1654 
GetReceiverReadRef(uint32_t receiverId)1655 uint32_t BufferDispatcher::GetReceiverReadRef(uint32_t receiverId)
1656 {
1657     SHARING_LOGD("trace.");
1658     uint32_t retBitRef = GetReceiverIndexRef(receiverId);
1659     retBitRef &= recvBitRef_;
1660     return retBitRef;
1661 }
1662 
GetReceiverIndexRef(uint32_t receiverId)1663 uint32_t BufferDispatcher::GetReceiverIndexRef(uint32_t receiverId)
1664 {
1665     SHARING_LOGD("trace.");
1666     uint32_t index = FindReceiverIndex(receiverId);
1667     uint32_t audioBit = index * 2;
1668     uint32_t videoBit = index * 2 + 1;
1669     uint32_t retBitRef = 0x0000;
1670     if (videoBit < 32) { // 32:bit width
1671         retBitRef |= RECV_FLAG_BASE << audioBit;
1672         retBitRef |= RECV_FLAG_BASE << videoBit;
1673     }
1674     return retBitRef;
1675 }
1676 
ClearReadBit(uint32_t receiverId,MediaType type)1677 void BufferDispatcher::ClearReadBit(uint32_t receiverId, MediaType type)
1678 {
1679     MEDIA_LOGD("trace.");
1680     std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1681     if (type != MEDIA_TYPE_AV) {
1682         SetReceiverReadRef(receiverId, type, false);
1683     } else {
1684         SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, false);
1685         SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, false);
1686     }
1687 }
1688 
ClearDataBit(uint32_t receiverId,MediaType type)1689 void BufferDispatcher::ClearDataBit(uint32_t receiverId, MediaType type)
1690 {
1691     MEDIA_LOGD("trace.");
1692     std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1693     if (type != MEDIA_TYPE_AV) {
1694         SetReceiverDataRef(receiverId, type, false);
1695     } else {
1696         SetReceiverDataRef(receiverId, MEDIA_TYPE_VIDEO, false);
1697         SetReceiverDataRef(receiverId, MEDIA_TYPE_AUDIO, false);
1698     }
1699 }
1700 
IsRead(uint32_t receiverId,uint32_t index)1701 bool BufferDispatcher::IsRead(uint32_t receiverId, uint32_t index)
1702 {
1703     MEDIA_LOGD("trace.");
1704     if (index >= circularBuffer_.size()) {
1705         return true;
1706     } else {
1707         return IsDataReaded(receiverId, circularBuffer_.at(index));
1708     }
1709 }
1710 
ActivateReceiverIndex(uint32_t index,MediaType type)1711 void BufferDispatcher::ActivateReceiverIndex(uint32_t index, MediaType type)
1712 {
1713     MEDIA_LOGD("trace.");
1714     std::unique_lock<std::mutex> lock(notifyMutex_);
1715     for (auto &[recvId, notifier] : notifiers_) {
1716         if (type == MEDIA_TYPE_VIDEO) {
1717             if (notifier->videoIndex == INVALID_INDEX) {
1718                 notifier->videoIndex = index;
1719                 SHARING_LOGD("RecvId %{public}d Activate %{public}d.", notifier->GetReceiverId(), notifier->videoIndex);
1720                 videoNeedActivate_ = false;
1721             }
1722         } else {
1723             if (notifier->audioIndex == INVALID_INDEX) {
1724                 notifier->audioIndex = index;
1725                 SHARING_LOGD("RecvId %{public}d Activate %{public}d.", notifier->GetReceiverId(), notifier->audioIndex);
1726                 audioNeedActivate_ = false;
1727             }
1728         }
1729     }
1730 }
UnlockWaitingReceiverIndex(MediaType type)1731 void BufferDispatcher::UnlockWaitingReceiverIndex(MediaType type)
1732 {
1733     SHARING_LOGD("trace.");
1734 }
1735 
SetReceiverReadFlag(uint32_t receiverId,DataSpec::Ptr & dataSpec)1736 void BufferDispatcher::SetReceiverReadFlag(uint32_t receiverId, DataSpec::Ptr &dataSpec)
1737 {
1738     MEDIA_LOGD("trace.");
1739     RETURN_IF_NULL(dataSpec);
1740     uint32_t index = FindReceiverIndex(receiverId);
1741     if (index != INVALID_INDEX) {
1742         dataSpec->reserveFlag |= RECV_FLAG_BASE << index;
1743         MEDIA_LOGD("mediaType: %{public}d, pts: %{public}" PRIu64
1744                    ", reserveFlag: %{public}x, receiverId: %{public}d, index: %{public}d.",
1745                    dataSpec->mediaData->mediaType, dataSpec->mediaData->pts, dataSpec->reserveFlag.load(), receiverId,
1746                    index);
1747     }
1748 }
1749 
CancelReserve()1750 void BufferDispatcher::CancelReserve()
1751 {
1752     SHARING_LOGD("trace.");
1753     for (auto &data : circularBuffer_) {
1754         data->reserveFlag = 0xff;
1755     }
1756 }
1757 
SetSpsNalu(MediaData::Ptr spsbuf)1758 void BufferDispatcher::SetSpsNalu(MediaData::Ptr spsbuf)
1759 {
1760     SHARING_LOGD("trace.");
1761     spsBuf_ = spsbuf;
1762 }
1763 
GetSPS()1764 const MediaData::Ptr BufferDispatcher::GetSPS()
1765 {
1766     MEDIA_LOGD("trace.");
1767     return spsBuf_;
1768 }
1769 
SetPpsNalu(MediaData::Ptr ppsbuf)1770 void BufferDispatcher::SetPpsNalu(MediaData::Ptr ppsbuf)
1771 {
1772     SHARING_LOGD("trace.");
1773     ppsBuf_ = ppsbuf;
1774 }
1775 
GetPPS()1776 const MediaData::Ptr BufferDispatcher::GetPPS()
1777 {
1778     MEDIA_LOGD("trace.");
1779     return ppsBuf_;
1780 }
1781 
GetCurrentGop()1782 uint32_t BufferDispatcher::GetCurrentGop()
1783 {
1784     SHARING_LOGD("trace.");
1785     return gop_;
1786 }
1787 
OnKeyRedirect()1788 void BufferDispatcher::OnKeyRedirect()
1789 {
1790     SHARING_LOGD("trace.");
1791     std::unique_lock<std::mutex> lock(notifyMutex_);
1792     if (keyIndexList_.empty()) {
1793         return;
1794     }
1795 
1796     auto nextIndex = keyIndexList_.back();
1797 
1798     for (auto &[recvId, notifier] : notifiers_) {
1799         if (notifier->IsKeyRedirectReceiver()) {
1800             SHARING_LOGD("receiverId: %{public}u, videoIndex: %{public}d, nextIndex: %{public}d.",
1801                          notifier->GetReceiverId(), notifier->videoIndex, nextIndex);
1802             auto curIndex = notifier->videoIndex;
1803             notifier->videoIndex = nextIndex;
1804             for (auto i = curIndex; i < nextIndex; i++) {
1805                 SetReceiverReadFlag(notifier->GetReceiverId(), circularBuffer_[i]);
1806             }
1807             if (!rapidMode_) {
1808                 auto receiver = notifier->GetBufferReceiver();
1809                 if (receiver != nullptr) {
1810                     receiver->EnableKeyRedirect(false);
1811                 }
1812             }
1813         }
1814     }
1815 }
1816 
EnableRapidMode(bool enable)1817 void BufferDispatcher::EnableRapidMode(bool enable)
1818 {
1819     SHARING_LOGD("trace.");
1820     rapidMode_ = enable;
1821 }
1822 
1823 } // namespace Sharing
1824 } // namespace OHOS
1825