• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "buffer_dispatcher.h"
17 #include <cmath>
18 #include <cstdarg>
19 #include <cstdint>
20 #include "common/common_macro.h"
21 #include "media_channel_def.h"
22 
23 namespace OHOS {
24 namespace Sharing {
25 
// Timeout handed to writingTimer_->StartTimer() in SetBufferDispatcherListener().
// NOTE(review): the unit is not visible in this file - presumably seconds, confirm against TimeoutTimer.
26 constexpr int32_t WRITING_TIMTOUT = 30;
// Fixed index offsets; not referenced in the visible part of this file - usage to be confirmed.
27 constexpr int32_t FIX_OFFSET_TWO = 2;
28 constexpr int32_t FIX_OFFSET_ONE = 1;
29 
SetSource(IBufferReader::Ptr dataReader)30 void BufferReceiver::SetSource(IBufferReader::Ptr dataReader)
31 {
32     SHARING_LOGD("trace.");
33     bufferReader_ = dataReader;
34 }
35 
OnMediaDataNotify()36 int32_t BufferReceiver::OnMediaDataNotify()
37 {
38     SHARING_LOGD("BufferReceiver Media notified.");
39     dataReady_ = true;
40     notifyData_.notify_one();
41     return 0;
42 }
43 
OnAudioDataNotify()44 int32_t BufferReceiver::OnAudioDataNotify()
45 {
46     MEDIA_LOGD("BufferReceiver Audio notified.");
47     nonBlockAudio_ = true;
48     notifyAudio_.notify_one();
49     return 0;
50 }
51 
OnVideoDataNotify()52 int32_t BufferReceiver::OnVideoDataNotify()
53 {
54     MEDIA_LOGD("BufferReceiver Video notified.");
55     nonBlockVideo_ = true;
56     notifyVideo_.notify_one();
57     return 0;
58 }
59 
IsMixedReceiver()60 bool BufferReceiver::IsMixedReceiver()
61 {
62     MEDIA_LOGD("trace.");
63     return mixed_;
64 }
65 
RequestRead(MediaType type,std::function<void (const MediaData::Ptr & data)> cb)66 int32_t BufferReceiver::RequestRead(MediaType type, std::function<void(const MediaData::Ptr &data)> cb)
67 {
68     MEDIA_LOGD("trace.");
69     if (bufferReader_ == nullptr) {
70         SHARING_LOGE("BufferReceiver read failed null dispatcher.");
71         return -1;
72     }
73 
74     if (firstMRead_ && type == MEDIA_TYPE_AV) {
75         bufferReader_->NotifyReadReady(GetReceiverId(), type);
76         mixed_ = true;
77         firstMRead_ = false;
78     } else if (firstARead_ && type == MEDIA_TYPE_AUDIO) {
79         bufferReader_->NotifyReadReady(GetReceiverId(), type);
80         firstARead_ = false;
81     } else if (firstVRead_ && type == MEDIA_TYPE_VIDEO) {
82         bufferReader_->NotifyReadReady(GetReceiverId(), type);
83         firstVRead_ = false;
84     }
85     std::unique_lock<std::mutex> locker(mutex_);
86     MEDIA_LOGD("BufferDispatcher NotifyThreadWorker before wait, receiverId: %{public}u.", GetReceiverId());
87     switch (type) {
88         /*  cv will waiting if pred is false;
89             so set waiting audio pred (type != MEDIA_TYPE_AUDIO) to NOT block other type.*/
90         case MEDIA_TYPE_AUDIO:
91             MEDIA_LOGD("wait Audio, receiverId: %{public}u.", GetReceiverId());
92             notifyAudio_.wait(locker, [=]() { return nonBlockAudio_ || type != MEDIA_TYPE_AUDIO; });
93             nonBlockAudio_ = false;
94             break;
95         case MEDIA_TYPE_VIDEO:
96             MEDIA_LOGD("wait Video, receiverId: %{public}u.", GetReceiverId());
97             notifyVideo_.wait(locker, [=]() { return nonBlockVideo_ || type != MEDIA_TYPE_VIDEO; });
98             nonBlockVideo_ = false;
99             break;
100         case MEDIA_TYPE_AV:
101             MEDIA_LOGD("wait Mixed, receiverId: %{public}u.", GetReceiverId());
102             notifyData_.wait(locker, [=]() { return dataReady_ || type != MEDIA_TYPE_AV; });
103             dataReady_ = false;
104             break;
105         default:
106             return 0;
107             break;
108     }
109 
110     bufferReader_->ClearDataBit(GetReceiverId(), type);
111     bufferReader_->ClearReadBit(GetReceiverId(), type);
112     MEDIA_LOGD("BufferDispatcher NotifyThreadWorker after wait  start read, receiverId: %{public}u.", GetReceiverId());
113     int32_t ret = bufferReader_->ReadBufferData(GetReceiverId(), type, cb);
114     bufferReader_->NotifyReadReady(GetReceiverId(), type);
115     dataReady_ = false;
116 
117     return ret;
118 }
119 
NotifyReadStart()120 void BufferReceiver::NotifyReadStart()
121 {
122     SHARING_LOGD("receiverId: %{public}u notify start read.", GetReceiverId());
123     firstARead_ = true;
124     firstVRead_ = true;
125     firstMRead_ = true;
126 }
127 
GetReceiverId()128 uint32_t BufferReceiver::GetReceiverId()
129 {
130     MEDIA_LOGD("trace.");
131     return GetId();
132 }
133 
GetDispatcherId()134 uint32_t BufferReceiver::GetDispatcherId()
135 {
136     SHARING_LOGD("trace.");
137     if (bufferReader_) {
138         return bufferReader_->GetDispatcherId();
139     }
140 
141     return 0;
142 }
143 
NotifyReadStop()144 void BufferReceiver::NotifyReadStop()
145 {
146     SHARING_LOGD("receiverId: %{public}u notify stop read.", GetReceiverId());
147     nonBlockAudio_ = true;
148     nonBlockVideo_ = true;
149     dataReady_ = true;
150     notifyAudio_.notify_all();
151     notifyVideo_.notify_all();
152     notifyData_.notify_all();
153 }
154 
EnableKeyMode(bool enable)155 void BufferReceiver::EnableKeyMode(bool enable)
156 {
157     SHARING_LOGD("bufferReceiver id %{public}u SetKeyOnlyMode %{public}d.", GetReceiverId(), enable);
158     if (keyOnly_ == true && enable == false) {
159         SHARING_LOGD("Set KeyOnlyMode false, need report fast read over.");
160         accelerationDone_ = true;
161     }
162 
163     keyOnly_ = enable;
164     if (bufferReader_ && enable) {
165         bufferReader_->ClearDataBit(GetReceiverId(), MEDIA_TYPE_VIDEO);
166     }
167 
168     auto listener = listener_.lock();
169     if (listener) {
170         listener->OnKeyModeNotify(enable);
171     }
172 }
173 
IsKeyMode()174 bool BufferReceiver::IsKeyMode()
175 {
176     MEDIA_LOGD("trace.");
177     return keyOnly_;
178 }
179 
IsKeyRedirect()180 bool BufferReceiver::IsKeyRedirect()
181 {
182     SHARING_LOGD("trace.");
183     return keyRedirect_;
184 }
185 
GetSPS()186 const MediaData::Ptr BufferReceiver::GetSPS()
187 {
188     MEDIA_LOGD("trace.");
189     if (bufferReader_) {
190         return bufferReader_->GetSPS();
191     }
192 
193     return nullptr;
194 }
195 
GetPPS()196 const MediaData::Ptr BufferReceiver::GetPPS()
197 {
198     MEDIA_LOGD("trace.");
199     if (bufferReader_) {
200         return bufferReader_->GetPPS();
201     }
202 
203     return nullptr;
204 }
205 
NeedAcceleration()206 bool BufferReceiver::NeedAcceleration()
207 {
208     MEDIA_LOGD("trace.");
209     return accelerationDone_;
210 }
211 
DisableAcceleration()212 void BufferReceiver::DisableAcceleration()
213 {
214     SHARING_LOGD("trace.");
215     accelerationDone_ = false;
216 }
217 
SendAccelerationDone()218 void BufferReceiver::SendAccelerationDone()
219 {
220     SHARING_LOGD("trace.");
221     auto listener = listener_.lock();
222     if (listener) {
223         listener->OnAccelerationDoneNotify();
224     }
225 }
226 
EnableKeyRedirect(bool enable)227 void BufferReceiver::EnableKeyRedirect(bool enable)
228 {
229     SHARING_LOGD("trace.");
230     if (bufferReader_ && enable) {
231         bufferReader_->EnableKeyRedirect(enable);
232     }
233     keyRedirect_ = enable;
234 }
235 
SetBufferReceiverListener(std::weak_ptr<IBufferReceiverListener> listener)236 void BufferReceiver::SetBufferReceiverListener(std::weak_ptr<IBufferReceiverListener> listener)
237 {
238     SHARING_LOGD("trace.");
239     listener_ = listener;
240 }
241 
242 using DataNotifier = BufferDispatcher::DataNotifier;
243 
NotifyDataReceiver(MediaType type)244 void DataNotifier::NotifyDataReceiver(MediaType type)
245 {
246     MEDIA_LOGD("trace.");
247     if (receiver_.lock() == nullptr) {
248         SHARING_LOGE("target receiver NOT exist.");
249         return;
250     }
251 
252     if (block_) {
253         return;
254     }
255 
256     MEDIA_LOGD("notify target type %{public}d.", type);
257     switch (type) {
258         case MEDIA_TYPE_AUDIO:
259             GetBufferReceiver()->OnAudioDataNotify();
260             break;
261         case MEDIA_TYPE_VIDEO:
262             GetBufferReceiver()->OnVideoDataNotify();
263             break;
264         case MEDIA_TYPE_AV:
265             GetBufferReceiver()->OnMediaDataNotify();
266             break;
267         default:
268             SHARING_LOGI("none process case.");
269             break;
270     }
271 }
272 
GetBufferReceiver()273 BufferReceiver::Ptr DataNotifier::GetBufferReceiver()
274 {
275     MEDIA_LOGD("trace.");
276     return receiver_.lock();
277 }
278 
GetReceiverId()279 uint32_t DataNotifier::GetReceiverId()
280 {
281     MEDIA_LOGD("trace.");
282     auto receiver = receiver_.lock();
283     if (receiver == nullptr) {
284         SHARING_LOGE("target receiver NOT exist.");
285         return INVALID_INDEX;
286     }
287 
288     return receiver->GetReceiverId();
289 }
290 
SetListenDispatcher(IBufferReader::Ptr dispatcher)291 void DataNotifier::SetListenDispatcher(IBufferReader::Ptr dispatcher)
292 {
293     SHARING_LOGD("trace.");
294     dispatcher_ = dispatcher;
295 }
296 
SetNotifyReceiver(BufferReceiver::Ptr receiver)297 void DataNotifier::SetNotifyReceiver(BufferReceiver::Ptr receiver)
298 {
299     SHARING_LOGD("trace.");
300     receiver_ = receiver;
301 }
302 
SetBlock()303 void DataNotifier::SetBlock()
304 {
305     SHARING_LOGD("trace.");
306     block_ = true;
307 }
308 
SetNeedUpdate(bool enable,MediaType type)309 void DataNotifier::SetNeedUpdate(bool enable, MediaType type)
310 {
311     MEDIA_LOGD("trace.");
312     if (type == MEDIA_TYPE_AUDIO) {
313         needUpdateAIndex = enable;
314     } else {
315         needUpdateVIndex = enable;
316     }
317 }
318 
DataAvailable(MediaType type)319 bool DataNotifier::DataAvailable(MediaType type)
320 {
321     MEDIA_LOGD("trace.");
322     auto dispatcher = dispatcher_.lock();
323     if (dispatcher == nullptr) {
324         SHARING_LOGE("target dispatcher NOT exist.");
325         return false;
326     }
327 
328     if (type == MEDIA_TYPE_AUDIO) {
329         return audioIndex != INVALID_INDEX &&
330                (audioIndex < dispatcher->GetLatestAudioIndex() || !dispatcher->IsRead(GetReceiverId(), audioIndex + 1));
331     } else if (type == MEDIA_TYPE_VIDEO) {
332         return videoIndex != INVALID_INDEX &&
333                (videoIndex < dispatcher->GetLatestVideoIndex() || !dispatcher->IsRead(GetReceiverId(), videoIndex + 1));
334     } else {
335         return videoIndex != INVALID_INDEX &&
336                (videoIndex < dispatcher->GetBufferSize() - 1 || !dispatcher->IsRead(GetReceiverId(), videoIndex + 1));
337     }
338 
339     return false;
340 }
341 
IsMixedReceiver()342 bool DataNotifier::IsMixedReceiver()
343 {
344     MEDIA_LOGD("trace.");
345     auto receiver = receiver_.lock();
346     if (receiver == nullptr) {
347         SHARING_LOGE("target receiver NOT exist.");
348         return false;
349     }
350 
351     return receiver->IsMixedReceiver();
352 }
353 
GetReceiverReadIndex(MediaType type)354 uint32_t DataNotifier::GetReceiverReadIndex(MediaType type)
355 {
356     MEDIA_LOGD("trace.");
357     switch (type) {
358         case MEDIA_TYPE_VIDEO:
359             MEDIA_LOGD("Video Recvid:%{public}d index: %{public}d.", GetReceiverId(), videoIndex);
360             return videoIndex;
361             break;
362         case MEDIA_TYPE_AUDIO:
363             MEDIA_LOGD("Audio Recvid:%{public}d index: %{public}d.", GetReceiverId(), audioIndex);
364             return audioIndex;
365             break;
366         case MEDIA_TYPE_AV:
367             MEDIA_LOGD("Mixed Recvid:%{public}d vindex: %{public}d  aindex: %{public}d.", GetReceiverId(), videoIndex,
368                        audioIndex);
369             if (audioIndex != INVALID_INDEX && videoIndex != INVALID_INDEX) {
370                 return audioIndex <= videoIndex ? audioIndex : videoIndex;
371             } else if (audioIndex == INVALID_INDEX && videoIndex == INVALID_INDEX) {
372                 return INVALID_INDEX;
373             } else {
374                 return audioIndex == INVALID_INDEX ? videoIndex : audioIndex;
375             }
376             break;
377         default:
378             return INVALID_INDEX;
379             break;
380     }
381 }
382 
IsKeyModeReceiver()383 bool DataNotifier::IsKeyModeReceiver()
384 {
385     MEDIA_LOGD("trace.");
386     auto receiver = receiver_.lock();
387     if (receiver) {
388         return receiver->IsKeyMode();
389     }
390 
391     return false;
392 }
393 
IsKeyRedirectReceiver()394 bool DataNotifier::IsKeyRedirectReceiver()
395 {
396     SHARING_LOGD("trace.");
397     auto receiver = receiver_.lock();
398     if (receiver) {
399         return receiver->IsKeyRedirect();
400     }
401 
402     return false;
403 }
404 
NeedAcceleration()405 bool DataNotifier::NeedAcceleration()
406 {
407     MEDIA_LOGD("trace.");
408     auto receiver = receiver_.lock();
409     if (receiver == nullptr) {
410         SHARING_LOGE("target receiver NOT exist.");
411         return false;
412     }
413 
414     return receiver->NeedAcceleration();
415 }
416 
SendAccelerationDone()417 void DataNotifier::SendAccelerationDone()
418 {
419     SHARING_LOGD("trace.");
420     auto receiver = receiver_.lock();
421     if (receiver == nullptr) {
422         SHARING_LOGE("target receiver NOT exist.");
423         return;
424     }
425 
426     receiver->SendAccelerationDone();
427     receiver->DisableAcceleration();
428 }
429 
BufferDispatcher(uint32_t maxCapacity,uint32_t capacityIncrement)430 BufferDispatcher::BufferDispatcher(uint32_t maxCapacity, uint32_t capacityIncrement)
431 {
432     SHARING_LOGD("BufferDispatcher ctor, set capacity: %{public}u.", maxCapacity);
433     maxBufferCapacity_ = maxCapacity;
434     bufferCapacityIncrement_ = capacityIncrement;
435     {
436         std::lock_guard<std::mutex> lock(idleMutex_);
437         idleAudioBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
438         idleVideoBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
439         for (size_t i = 0; i < INITIAL_BUFFER_CAPACITY; i++) {
440             MediaData::Ptr adata = std::make_shared<MediaData>();
441             MediaData::Ptr vdata = std::make_shared<MediaData>();
442             adata->buff = std::make_shared<DataBuffer>();
443             vdata->buff = std::make_shared<DataBuffer>();
444             idleAudioBuffer_.push_back(adata);
445             idleVideoBuffer_.push_back(vdata);
446         }
447     }
448 
449     writingTimer_ = std::make_unique<TimeoutTimer>("dispatcher-writing-timer");
450 
451     std::unique_lock<std::shared_mutex> locker(bufferMutex_);
452     circularBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
453     StartDispatch();
454 }
455 
~BufferDispatcher()456 BufferDispatcher::~BufferDispatcher()
457 {
458     SHARING_LOGI("BufferDispatcher dtor.");
459     running_ = false;
460     StopDispatch();
461     FlushBuffer();
462     ReleaseIdleBuffer();
463     ReleaseAllReceiver();
464 }
465 
StartDispatch()466 void BufferDispatcher::StartDispatch()
467 {
468     SHARING_LOGD("trace.");
469     running_ = true;
470     notifyThread_ = std::thread(&BufferDispatcher::NotifyThreadWorker, this);
471     std::string name = "notifyThread";
472     pthread_setname_np(notifyThread_.native_handle(), name.c_str());
473 }
474 
StopDispatch()475 void BufferDispatcher::StopDispatch()
476 {
477     SHARING_LOGD("trace.");
478     running_ = false;
479     continueNotify_ = true;
480 
481     if (writingTimer_) {
482         writingTimer_.reset();
483     }
484 
485     dataCV_.notify_all();
486     if (notifyThread_.joinable()) {
487         notifyThread_.join();
488     }
489 }
490 
SetBufferCapacity(size_t capacity)491 void BufferDispatcher::SetBufferCapacity(size_t capacity)
492 {
493     SHARING_LOGD("trace.");
494     std::unique_lock<std::shared_mutex> locker(bufferMutex_);
495     circularBuffer_.set_capacity(capacity);
496 }
497 
SetDataMode(MediaDispacherMode dataMode)498 void BufferDispatcher::SetDataMode(MediaDispacherMode dataMode)
499 {
500     SHARING_LOGD("trace.");
501     dataMode_ = dataMode;
502 }
503 
ReleaseIdleBuffer()504 void BufferDispatcher::ReleaseIdleBuffer()
505 {
506     SHARING_LOGD("BufferDispatcher idle Release Start.");
507     std::unique_lock<std::mutex> locker(idleMutex_);
508     for (auto &data : idleAudioBuffer_) {
509         if (data != nullptr || data->buff != nullptr) {
510             data->buff.reset();
511         }
512     }
513 
514     idleAudioBuffer_.clear();
515     for (auto &data : idleVideoBuffer_) {
516         if (data != nullptr || data->buff != nullptr) {
517             data->buff.reset();
518         }
519     }
520 
521     idleVideoBuffer_.clear();
522     SHARING_LOGD("BufferDispatcher idle Release End.");
523 }
524 
FlushBuffer()525 void BufferDispatcher::FlushBuffer()
526 {
527     SHARING_LOGD("BufferDispatcher Start flushing, dispatcherId: %{public}u.", GetDispatcherId());
528     {
529         std::lock_guard<std::mutex> lock(idleMutex_);
530         idleAudioBuffer_.clear();
531         idleVideoBuffer_.clear();
532         for (size_t i = 0; i < INITIAL_BUFFER_CAPACITY; i++) {
533             MediaData::Ptr adata = std::make_shared<MediaData>();
534             MediaData::Ptr vdata = std::make_shared<MediaData>();
535             adata->buff = std::make_shared<DataBuffer>();
536             vdata->buff = std::make_shared<DataBuffer>();
537             idleAudioBuffer_.push_back(adata);
538             idleVideoBuffer_.push_back(vdata);
539         }
540     }
541 
542     std::unique_lock<std::shared_mutex> locker(bufferMutex_);
543     for (auto &data : circularBuffer_) {
544         if (data->mediaData != nullptr && data->mediaData->buff != nullptr) {
545             data->mediaData->buff.reset();
546         }
547     }
548 
549     circularBuffer_.clear();
550     waitingKey_ = true;
551     gop_ = 0;
552     audioFrameCnt_ = 0;
553     videoFrameCnt_ = 0;
554     ResetAllIndex();
555     SHARING_LOGD("BufferDispatcher Dispatcher flushing end, dispatcherId: %{public}u.", GetDispatcherId());
556 }
557 
RequestDataBuffer(MediaType type,uint32_t size)558 MediaData::Ptr BufferDispatcher::RequestDataBuffer(MediaType type, uint32_t size)
559 {
560     SHARING_LOGD("trace.");
561     std::lock_guard<std::mutex> lock(idleMutex_);
562     if (size <= 0) {
563         SHARING_LOGE("Size invalid.");
564         return nullptr;
565     }
566 
567     MediaData::Ptr retData;
568     if (type == MEDIA_TYPE_VIDEO) {
569         if (!idleVideoBuffer_.empty()) {
570             SHARING_LOGD("video From idle.");
571             retData = idleVideoBuffer_.front();
572             idleVideoBuffer_.pop_front();
573             if (retData == nullptr) {
574                 MEDIA_LOGW("video From alloc when idle nullptr.");
575                 retData = std::make_shared<MediaData>();
576             }
577             return retData;
578         }
579     } else {
580         if (!idleAudioBuffer_.empty()) {
581             SHARING_LOGD("Audio From idle.");
582             retData = idleAudioBuffer_.front();
583             idleAudioBuffer_.pop_front();
584             if (retData == nullptr) {
585                 MEDIA_LOGW("Audio From alloc when idle nullptr.");
586                 retData = std::make_shared<MediaData>();
587             }
588             return retData;
589         }
590     }
591 
592     SHARING_LOGD("Audio/Video From alloc.");
593     retData = std::make_shared<MediaData>();
594     return retData;
595 }
596 
ReturnIdleBuffer(DataSpec::Ptr & data)597 void BufferDispatcher::ReturnIdleBuffer(DataSpec::Ptr &data)
598 {
599     MEDIA_LOGD("trace.");
600     std::lock_guard<std::mutex> lock(idleMutex_);
601     if (data == nullptr || data->mediaData == nullptr) {
602         return;
603     }
604     if (data->mediaData->mediaType == MEDIA_TYPE_VIDEO) {
605         if (idleVideoBuffer_.size() < INITIAL_BUFFER_CAPACITY) {
606             idleVideoBuffer_.push_back(data->mediaData);
607             MEDIA_LOGD("data: push_back in idleVideoBuffer_, size: %{public}zu.", idleVideoBuffer_.size());
608         }
609     } else {
610         if (idleAudioBuffer_.size() < INITIAL_BUFFER_CAPACITY) {
611             idleAudioBuffer_.push_back(data->mediaData);
612             MEDIA_LOGD("data: push_back in idleAudioBuffer_, size: %{public}zu.", idleAudioBuffer_.size());
613         }
614     }
615 
616     data.reset();
617 }
618 
GetBufferSize()619 size_t BufferDispatcher::GetBufferSize()
620 {
621     SHARING_LOGD("trace.");
622     return circularBuffer_.size();
623 }
624 
FindReceiverIndex(uint32_t receiverId)625 uint32_t BufferDispatcher::FindReceiverIndex(uint32_t receiverId)
626 {
627     MEDIA_LOGD("trace.");
628     if (notifiers_.find(receiverId) != notifiers_.end()) {
629         return notifiers_[receiverId]->GetReadIndex();
630     }
631 
632     return INVALID_INDEX;
633 }
634 
IsRecevierExist(uint32_t receiverId)635 bool BufferDispatcher::IsRecevierExist(uint32_t receiverId)
636 {
637     SHARING_LOGD("trace.");
638     auto notifier = GetNotifierByReceiverId(receiverId);
639     if (notifier == nullptr) {
640         return false;
641     }
642 
643     return true;
644 }
645 
EnableKeyMode(bool enable)646 void BufferDispatcher::EnableKeyMode(bool enable)
647 {
648     SHARING_LOGD("trace.");
649     keyOnly_ = enable;
650 }
651 
AttachReceiver(BufferReceiver::Ptr receiver)652 int32_t BufferDispatcher::AttachReceiver(BufferReceiver::Ptr receiver)
653 {
654     SHARING_LOGD("trace.");
655     if (receiver == nullptr) {
656         return -1;
657     }
658 
659     if (IsRecevierExist(receiver->GetReceiverId())) {
660         SHARING_LOGE("Exist.");
661         return 0;
662     }
663 
664     receiver->NotifyReadStart();
665     std::lock_guard<std::mutex> locker(notifyMutex_);
666     if (readRefFlag_ == 0xFFFF) {
667         SHARING_LOGE("readRefFlag limited.");
668         return -1;
669     }
670 
671     DataNotifier::Ptr notifier = std::make_shared<DataNotifier>();
672     notifier->SetListenDispatcher(shared_from_this());
673     notifier->SetNotifyReceiver(receiver);
674 
675     auto usableRef = ~readRefFlag_ & (-(~readRefFlag_));
676 
677     if ((usableRef & (usableRef - 1)) != 0) {
678         SHARING_LOGE("usableRef: %{public}d invalid.", usableRef);
679         return -1;
680     }
681 
682     readRefFlag_ |= usableRef;
683     notifier->SetReadIndex(static_cast<uint32_t>(log2(usableRef)));
684     SHARING_LOGI("receiverId: %{public}d, readIndex: %{public}d, usableRef: %{public}d, readRefFlag_: %{public}d.",
685                  receiver->GetReceiverId(), notifier->GetReadIndex(), usableRef, readRefFlag_);
686     receiver->SetSource(shared_from_this());
687     notifiers_.emplace(receiver->GetReceiverId(), notifier);
688 
689     if (circularBuffer_.empty()) {
690         notifier->audioIndex = INVALID_INDEX;
691         notifier->videoIndex = INVALID_INDEX;
692         SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, false);
693         SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
694         SHARING_LOGD("BufferDispatcher Attach when buffer empty  RecvId: %{public}d.", receiver->GetReceiverId());
695         videoNeedActivate_ = true;
696         audioNeedActivate_ = true;
697         return 0;
698     }
699 
700     if (dataMode_ == MEDIA_AUDIO_ONLY) {
701         notifier->audioIndex = circularBuffer_.size() - 1;
702         SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, true);
703         notifier->videoIndex = INVALID_INDEX;
704         SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
705         SHARING_LOGD("BufferDispatcher Attach when Keyindex List empty  RecvId: %{public}d.",
706                      receiver->GetReceiverId());
707     } else {
708         if (!keyIndexList_.empty()) {
709             SHARING_LOGD("BufferDispatcher Attach with Keyindex  RecvId: %{public}d  KeyIndex:%{public}d.",
710                          receiver->GetReceiverId(), keyIndexList_.back());
711             uint32_t tempIndex = FindNextIndex(keyIndexList_.back(), MEDIA_TYPE_AUDIO);
712             notifier->audioIndex = tempIndex == keyIndexList_.back() ? INVALID_INDEX : tempIndex;
713             notifier->videoIndex = keyIndexList_.back();
714             bool isAudioReady = tempIndex != INVALID_INDEX ? true : false;
715             SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, isAudioReady);
716             SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, true);
717             if (lastAudioIndex_ == INVALID_INDEX) {
718                 audioNeedActivate_ = true;
719             }
720         } else {
721             SHARING_LOGD("BufferDispatcher Attach with Non Keyindex Exist RecvId: %{public}d.",
722                          receiver->GetReceiverId());
723             uint32_t tempIndex = FindLastIndex(MEDIA_TYPE_AUDIO);
724             notifier->audioIndex = tempIndex;
725             notifier->videoIndex = INVALID_INDEX;
726             SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, true);
727             SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
728             if (lastAudioIndex_ == INVALID_INDEX) {
729                 audioNeedActivate_ = true;
730             }
731         }
732     }
733 
734     return 0;
735 }
736 
DetachReceiver(BufferReceiver::Ptr receiver)737 int32_t BufferDispatcher::DetachReceiver(BufferReceiver::Ptr receiver)
738 {
739     SHARING_LOGI("buffer dispatcher: Detach receiver in.");
740     if (receiver == nullptr) {
741         SHARING_LOGE("buffer dispatcher: Detach receiver failed - null receiver.");
742         return -1;
743     }
744 
745     if (!IsRecevierExist(receiver->GetReceiverId())) {
746         SHARING_LOGE("BufferDispatcher AttachReceiver No Vaild Recevier Exist.");
747         return 0;
748     }
749 
750     auto notifier = GetNotifierByReceiverPtr(receiver);
751     if (notifier == nullptr) {
752         SHARING_LOGE("buffer dispatcher: Detach receiver failed - no find receiver in notifiers.");
753         return -1;
754     }
755 
756     std::lock_guard<std::mutex> locker(notifyMutex_);
757     notifier->SetBlock();
758     SetReceiverReadRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, false);
759     SetReceiverReadRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
760 
761     readRefFlag_ &= ~(RECV_FLAG_BASE << notifier->GetReadIndex());
762     notifiers_.erase(receiver->GetReceiverId());
763     SHARING_LOGI("now refFlag: %{public}d.", readRefFlag_);
764     return 0;
765 }
766 
DetachReceiver(uint32_t receiverId,DataNotifier::Ptr notifier)767 int32_t BufferDispatcher::DetachReceiver(uint32_t receiverId, DataNotifier::Ptr notifier)
768 {
769     SHARING_LOGI("buffer dispatcher: Detach notifier in.");
770     if (notifier == nullptr) {
771         SHARING_LOGE("buffer dispatcher: Detach receiver failed - null notifier.");
772         return -1;
773     }
774     notifier->SetBlock();
775     SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, false);
776     SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, false);
777 
778     readRefFlag_ &= ~(RECV_FLAG_BASE << notifier->GetReadIndex());
779     notifiers_.erase(receiverId);
780     SHARING_LOGI("now refFlag: %{public}d.", readRefFlag_);
781     return 0;
782 }
783 
ReleaseAllReceiver()784 void BufferDispatcher::ReleaseAllReceiver()
785 {
786     SHARING_LOGD("trace.");
787     std::lock_guard<std::mutex> locker(notifyMutex_);
788     for (auto it = notifiers_.begin(); it != notifiers_.end();) {
789         auto notifier = it->second;
790         if (notifier == nullptr) {
791             ++it;
792             continue;
793         }
794 
795         auto receiver = notifier->GetBufferReceiver();
796         if (receiver == nullptr) {
797             ++it;
798             continue;
799         }
800 
801         auto receiverId = receiver->GetReceiverId();
802         if (notifiers_.find(receiverId) != notifiers_.end()) {
803             auto notifierFind = notifiers_[receiverId];
804             ++it;
805             DetachReceiver(receiverId, notifierFind);
806         } else {
807             ++it;
808             SHARING_LOGE("buffer dispatcher: Detach receiver failed - no find receiver in notifiers.");
809         }
810     }
811 
812     notifiers_.clear();
813     SHARING_LOGD("release all receiver out.");
814 }
815 
SetBufferDispatcherListener(BufferDispatcherListener::Ptr listener)816 void BufferDispatcher::SetBufferDispatcherListener(BufferDispatcherListener::Ptr listener)
817 {
818     SHARING_LOGD("trace.");
819     listener_ = listener;
820     RETURN_IF_NULL(writingTimer_);
821     writingTimer_->StartTimer(
822         WRITING_TIMTOUT, "waiting for continuous data inputs",
823         [this]() {
824             if (!writing_) {
825                 SHARING_LOGI("writing timeout");
826                 auto listener = listener_.lock();
827                 if (listener) {
828                     listener->OnWriteTimeout();
829                 }
830             } else {
831                 SHARING_LOGI("restart timer");
832                 writing_ = false;
833             }
834         },
835         true);
836 }
837 
GetNotifierByReceiverPtr(BufferReceiver::Ptr receiver)838 DataNotifier::Ptr BufferDispatcher::GetNotifierByReceiverPtr(BufferReceiver::Ptr receiver)
839 {
840     SHARING_LOGD("trace.");
841     if (receiver == nullptr) {
842         return nullptr;
843     }
844     return GetNotifierByReceiverId(receiver->GetReceiverId());
845 }
846 
GetNotifierByReceiverId(uint32_t receiverId)847 DataNotifier::Ptr BufferDispatcher::GetNotifierByReceiverId(uint32_t receiverId)
848 {
849     MEDIA_LOGD("trace.");
850     std::lock_guard<std::mutex> locker(notifyMutex_);
851     if (notifiers_.find(receiverId) != notifiers_.end()) {
852         return notifiers_[receiverId];
853     }
854 
855     return nullptr;
856 }
857 
ReadBufferData(uint32_t receiverId,MediaType type,std::function<void (const MediaData::Ptr & data)> cb)858 int32_t BufferDispatcher::ReadBufferData(uint32_t receiverId, MediaType type,
859                                          std::function<void(const MediaData::Ptr &data)> cb)
860 {
861     MEDIA_LOGD("in, receiverId: %{public}u.", receiverId);
862     auto notifier = GetNotifierByReceiverId(receiverId);
863     if (notifier == nullptr) {
864         SHARING_LOGE("notifier is nullptr.");
865         return -1;
866     }
867 
868     std::shared_lock<std::shared_mutex> locker(bufferMutex_);
869     uint32_t readIndex = notifier->GetReceiverReadIndex(type);
870     if (readIndex >= circularBuffer_.size()) {
871         SHARING_LOGE("Read wrong index exceed size.");
872         return -1;
873     }
874 
875     if ((keyOnly_ || notifier->IsKeyModeReceiver()) && type == MEDIA_TYPE_VIDEO &&
876         !IsKeyVideoFrame(circularBuffer_.at(readIndex))) {
877         UpdateReceiverReadIndex(receiverId, readIndex, type);
878         SHARING_LOGE("Read Non Key Video in KeyOnly Mode index: %{public}u.", readIndex);
879         return -1;
880     }
881 
882     if (IsDataReaded(receiverId, circularBuffer_.at(readIndex))) {
883         UpdateReceiverReadIndex(receiverId, readIndex, type);
884         return -1;
885     }
886 
887     readIndex = notifier->GetReceiverReadIndex(type);
888     if (readIndex >= circularBuffer_.size()) {
889         return -1;
890     }
891 
892     auto data = circularBuffer_.at(readIndex);
893     if (data == nullptr) {
894         SHARING_LOGE("BufferDispatcher Read data nullptr.");
895         return -1;
896     }
897 
898     if (IsKeyVideoFrame(data)) {
899         int32_t bufferVideoCacheCnt = 0;
900         for (size_t i = readIndex + 1; i < circularBuffer_.size(); i++) {
901             if (circularBuffer_[i]->mediaData->mediaType == MEDIA_TYPE_VIDEO)
902                 bufferVideoCacheCnt++;
903         }
904         MEDIA_LOGD("TEST STATISTIC:interval: buffer cache %{public}d frames.", bufferVideoCacheCnt);
905     }
906 
907     SetReceiverReadFlag(receiverId, data);
908     if (cb != nullptr) {
909         cb(data->mediaData);
910     }
911 
912     MEDIA_LOGD("Current data readed, Recvid:%{public}d, remain %{public}zu data, readIndex: %{public}u, "
913                "readtype: %{public}d, diff: %{public}zu.",
914                receiverId, circularBuffer_.size(), readIndex, int32_t(type), circularBuffer_.size() - readIndex);
915     UpdateReceiverReadIndex(receiverId, readIndex, type);
916     return 0;
917 }
918 
InputData(const MediaData::Ptr & data)919 int32_t BufferDispatcher::InputData(const MediaData::Ptr &data)
920 {
921     if (data == nullptr || data->buff == nullptr) {
922         SHARING_LOGE("data nullptr.");
923         return -1;
924     }
925     MEDIA_LOGD("inputmediatype: %{public}d, keyFrame: %{public}d, pts: %{public}" PRIu64 ".", data->mediaType,
926                data->keyFrame, data->pts);
927 
928     if (!writing_) {
929         writing_ = true;
930     }
931 
932     DataSpec::Ptr dataSpec = std::make_shared<DataSpec>();
933     dataSpec->mediaData = data;
934     if (dataMode_ == MEDIA_AUDIO_ONLY) {
935         WriteDataIntoBuffer(dataSpec);
936     } else {
937         PreProcessDataSpec(dataSpec);
938     }
939 
940     if (circularBuffer_.size() > 0) {
941         MEDIA_LOGD("inputmediatype: %{public}d, keyFrame: %{public}d, pts: %{public}" PRIu64 ".",
942                    circularBuffer_[circularBuffer_.size() - 1]->mediaData->mediaType,
943                    circularBuffer_[circularBuffer_.size() - 1]->mediaData->keyFrame,
944                    circularBuffer_[circularBuffer_.size() - 1]->mediaData->pts);
945     }
946 
947     MEDIA_LOGD("dispatcherId: %{public}u, after InputData, current circularBuffer_ size: %{public}zu, "
948                "idleVideoBuffer_ size: %{public}zu, idle_audioBuffer_ size: %{public}zu, "
949                "keyFrame: %{public}s, data size: %{public}d, adataCount:%{public}d.",
950                GetDispatcherId(), circularBuffer_.size(), idleVideoBuffer_.size(), idleAudioBuffer_.size(),
951                data->keyFrame ? "true" : "false", data->buff->Size(), audioFrameCnt_);
952     return 0;
953 }
954 
PreProcessDataSpec(const DataSpec::Ptr & dataSpec)955 void BufferDispatcher::PreProcessDataSpec(const DataSpec::Ptr &dataSpec)
956 {
957     MEDIA_LOGD("trace.");
958     if (waitingKey_) {
959         if (IsAudioData(dataSpec)) {
960         } else if (!IsKeyVideoFrame(dataSpec)) {
961             SHARING_LOGD("BufferDispatcher Waiting First Key Video Frame.");
962             return;
963         } else {
964             SHARING_LOGD("BufferDispatcher received first key video frame and restore from uncontinuous...Flushing.");
965             FlushBuffer();
966             baseCounter_++;
967             capacityEvaluating_ = true;
968             waitingKey_ = false;
969         }
970     } else {
971         if (capacityEvaluating_) {
972             ReCalculateCapacity(IsKeyVideoFrame(dataSpec));
973         }
974     }
975 
976     WriteDataIntoBuffer(dataSpec);
977 }
978 
WriteDataIntoBuffer(const DataSpec::Ptr & data)979 int32_t BufferDispatcher::WriteDataIntoBuffer(const DataSpec::Ptr &data)
980 {
981     MEDIA_LOGD("trace.");
982     if (data->mediaData == nullptr || data->mediaData->buff == nullptr) {
983         SHARING_LOGE("null data.");
984         return -1;
985     }
986 
987     if (NeedExtendToDBCapacity()) {
988         SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d  CRTL_SIZE.", doubleBufferCapacity_);
989         SetBufferCapacity(doubleBufferCapacity_);
990     }
991 
992     if (NeedRestoreToNormalCapacity()) {
993         std::unique_lock<std::shared_mutex> locker(bufferMutex_);
994         int32_t popSize = circularBuffer_.size() - INITIAL_BUFFER_CAPACITY;
995         for (int32_t i = 0; i < popSize; i++) {
996             if (HeadFrameNeedReserve()) {
997                 MEDIA_LOGW("dispatcherId: %{public}u, need reserve but pop, mediaType: "
998                            "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64 ".",
999                            GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1000                            circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
1001                            circularBuffer_.front()->mediaData->pts);
1002             }
1003 
1004             MEDIA_LOGW("dispatcherId: %{public}u, delete data, mediaType: %{public}d, keyFrame: "
1005                        "%{public}s, pts: %{public}" PRIu64 ", reserveFlag: %{public}x.",
1006                        GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1007                        circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
1008                        circularBuffer_.front()->mediaData->pts, circularBuffer_.front()->reserveFlag.load());
1009             circularBuffer_.pop_front();
1010             audioFrameCnt_--;
1011             UpdateIndex();
1012         }
1013 
1014         baseBufferCapacity_ = INITIAL_BUFFER_CAPACITY;
1015         doubleBufferCapacity_ = INITIAL_BUFFER_CAPACITY * 2; // 2 : increasement
1016         SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d  NORMALSIZE.", baseBufferCapacity_);
1017         circularBuffer_.set_capacity(baseBufferCapacity_);
1018     }
1019 
1020     if (IsKeyVideoFrame(data) && !keyOnly_) {
1021         EraseOldGopDatas();
1022     }
1023 
1024     bool updateIndexFlag = false;
1025     std::unique_lock<std::shared_mutex> locker(bufferMutex_);
1026     if (circularBuffer_.size() >= circularBuffer_.capacity()) {
1027         updateIndexFlag = true;
1028     }
1029 
1030     if (updateIndexFlag) {
1031         uint32_t nextDeleteIndex = 1;
1032         if (IsVideoData(data)) {
1033             nextDeleteIndex = FindNextDeleteVideoIndex();
1034         }
1035 
1036         for (size_t i = 0; i <= nextDeleteIndex; i++) {
1037             MediaType headType = circularBuffer_.front()->mediaData->mediaType;
1038             DataSpec::Ptr retBuff = circularBuffer_.front();
1039             if (HeadFrameNeedReserve()) {
1040                 MEDIA_LOGW("dispatcherId: %{public}u, need reserve but pop, mediaType: "
1041                            "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64 ".",
1042                            GetDispatcherId(), int32_t(retBuff->mediaData->mediaType),
1043                            retBuff->mediaData->keyFrame ? "true" : "false", retBuff->mediaData->pts);
1044             }
1045 
1046             MEDIA_LOGW("dispatcherId: %{public}u, delete data, mediaType: %{public}d, "
1047                        "keyFrame: %{public}s, pts: %{public}" PRIu64 ", reserveFlag: %{public}x.",
1048                        GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1049                        circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
1050                        circularBuffer_.front()->mediaData->pts, circularBuffer_.front()->reserveFlag.load());
1051 
1052             circularBuffer_.pop_front();
1053             ReturnIdleBuffer(retBuff);
1054             headType == MEDIA_TYPE_AUDIO ? audioFrameCnt_-- : videoFrameCnt_--;
1055             UpdateIndex();
1056         }
1057     }
1058 
1059     data->reserveFlag = 0;
1060     MEDIA_LOGD("WriteDataIntoBuffer data type: %{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64
1061                ", cur_size: %{public}zu, capacity: %{public}zu dispatcher[%{public}u].",
1062                int32_t(data->mediaData->mediaType), data->mediaData->keyFrame ? "true" : "false", data->mediaData->pts,
1063                circularBuffer_.size(), circularBuffer_.capacity(), GetDispatcherId());
1064     circularBuffer_.push_back(data);
1065     if (IsAudioData(data)) {
1066         lastAudioIndex_ = circularBuffer_.size() - 1;
1067         ActiveDataRef(MEDIA_TYPE_AUDIO, false);
1068         audioFrameCnt_++;
1069     } else {
1070         lastVideoIndex_ = circularBuffer_.size() - 1;
1071         if (!keyOnly_ || (keyOnly_ && IsKeyVideoFrame(data))) {
1072             ActiveDataRef(MEDIA_TYPE_VIDEO, IsKeyVideoFrame(data));
1073         }
1074         videoFrameCnt_++;
1075     }
1076 
1077     if (audioNeedActivate_ && IsAudioData(data)) {
1078         MEDIA_LOGD("BufferDispatcher ActivateReceiverIndex By AudioData.");
1079         ActivateReceiverIndex(circularBuffer_.size() - 1, MEDIA_TYPE_AUDIO);
1080     }
1081 
1082     if (IsKeyVideoFrame(data)) {
1083         uint32_t keyIndex = circularBuffer_.size() - 1;
1084         {
1085             std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1086             keyIndexList_.push_back(keyIndex);
1087         }
1088         if (videoNeedActivate_) {
1089             MEDIA_LOGD("BufferDispatcher ActivateReceiverIndex By KeyVideo Frame index: %{public}d.", keyIndex);
1090             ActivateReceiverIndex(keyIndex, MEDIA_TYPE_VIDEO);
1091         }
1092         if (keyRedirect_) {
1093             OnKeyRedirect();
1094             EnableKeyRedirect(false);
1095         }
1096     }
1097 
1098     continueNotify_ = true;
1099     dataCV_.notify_one();
1100     return 0;
1101 }
1102 
EraseOldGopDatas()1103 void BufferDispatcher::EraseOldGopDatas()
1104 {
1105     MEDIA_LOGD("BufferDispatcher Delete old datas In.");
1106     if (dataMode_ == MEDIA_AUDIO_ONLY) {
1107         FlushBuffer();
1108         return;
1109     }
1110 
1111     std::unique_lock<std::shared_mutex> locker(bufferMutex_);
1112     uint32_t nextKey = 0;
1113     {
1114         std::lock_guard<std::mutex> lock(notifyMutex_);
1115         if (!keyIndexList_.empty() && keyIndexList_.back() > 0) {
1116             MEDIA_LOGD("find next key listsize %{public}zu, back:%{public}d.", keyIndexList_.size(),
1117                        keyIndexList_.back());
1118             nextKey = keyIndexList_.back();
1119             keyIndexList_.clear();
1120             keyIndexList_.push_back(nextKey);
1121         }
1122     }
1123 
1124     MEDIA_LOGD("erase between 0 to next Video Frame %{public}d.", nextKey);
1125     DeleteHeadDatas(nextKey, false);
1126     nextKey = FindNextDeleteVideoIndex();
1127     DeleteHeadDatas(nextKey, true);
1128     std::string indexs;
1129 
1130     MEDIA_LOGD("circularBuffer_ size: %{public}zu.", circularBuffer_.size());
1131     for (auto &keyIndex : keyIndexList_) {
1132         indexs += std::to_string(keyIndex) + ", ";
1133         MEDIA_LOGD("keyIndex update to %{public}d.", keyIndex);
1134     }
1135 
1136     MEDIA_LOGD("current keyIndex: %{public}s.", indexs.c_str());
1137 }
1138 
DeleteHeadDatas(uint32_t size,bool forceDelete)1139 void BufferDispatcher::DeleteHeadDatas(uint32_t size, bool forceDelete)
1140 {
1141     MEDIA_LOGD("trace.");
1142     if (size <= 0) {
1143         MEDIA_LOGW("invalid Size, dispatcherId: %{public}u!", GetDispatcherId());
1144         return;
1145     }
1146 
1147     for (size_t i = 0; i < size; i++) {
1148         if (HeadFrameNeedReserve() && !forceDelete) {
1149             MEDIA_LOGD("index %{public}zu need reserve.", i);
1150             break;
1151         }
1152         if (circularBuffer_.empty()) {
1153             return;
1154         }
1155         DataSpec::Ptr retBuff = circularBuffer_.front();
1156         MEDIA_LOGD("BufferDispatcher pop out headtype  %{public}d.", retBuff->mediaData->mediaType);
1157         retBuff->mediaData->mediaType == MEDIA_TYPE_AUDIO ? audioFrameCnt_-- : videoFrameCnt_--;
1158         if (HeadFrameNeedReserve()) {
1159             MEDIA_LOGW("dispatcherId: %{public}u, need reserve but pop, mediaType: "
1160                        "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64 ".",
1161                        GetDispatcherId(), int32_t(retBuff->mediaData->mediaType),
1162                        retBuff->mediaData->keyFrame ? "true" : "false", retBuff->mediaData->pts);
1163         }
1164 
1165         MEDIA_LOGW(
1166             "dispatcherId: %{public}u, delete data, mediaType: %{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64
1167             ", reserveFlag: %{public}x.",
1168             GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1169             circularBuffer_.front()->mediaData->keyFrame ? "true" : "false", circularBuffer_.front()->mediaData->pts,
1170             circularBuffer_.front()->reserveFlag.load());
1171         circularBuffer_.pop_front();
1172         ReturnIdleBuffer(retBuff);
1173         UpdateIndex();
1174     }
1175 
1176     if (circularBuffer_.size() < baseBufferCapacity_ && circularBuffer_.capacity() > baseBufferCapacity_) {
1177         MEDIA_LOGD("capacity return to base %{public}d.", baseBufferCapacity_);
1178         circularBuffer_.set_capacity(baseBufferCapacity_);
1179     }
1180 }
1181 
UpdateIndex()1182 void BufferDispatcher::UpdateIndex()
1183 {
1184     MEDIA_LOGD("trace.");
1185     std::lock_guard<std::mutex> locker(notifyMutex_);
1186     if (!keyIndexList_.empty() && keyIndexList_.front() == 0) {
1187         keyIndexList_.pop_front();
1188         MEDIA_LOGD("BufferDispatcher pop out first  0 keyIndex after listsize %{public}zu.", keyIndexList_.size());
1189     }
1190 
1191     for (auto &keyIndex : keyIndexList_) {
1192         if (keyIndex > 0) {
1193             keyIndex--;
1194         }
1195     }
1196 
1197     for (auto &[recvId, notifier] : notifiers_) {
1198         if (notifier->videoIndex > 0 && notifier->videoIndex != INVALID_INDEX) {
1199             notifier->videoIndex--;
1200         }
1201         if (notifier->audioIndex > 0 && notifier->audioIndex != INVALID_INDEX) {
1202             notifier->audioIndex--;
1203         }
1204     }
1205 
1206     if (lastVideoIndex_ > 0 && lastVideoIndex_ != INVALID_INDEX) {
1207         lastVideoIndex_--;
1208     }
1209 
1210     if (lastAudioIndex_ > 0 && lastAudioIndex_ != INVALID_INDEX) {
1211         lastAudioIndex_--;
1212     }
1213 }
1214 
FindNextDeleteVideoIndex()1215 uint32_t BufferDispatcher::FindNextDeleteVideoIndex()
1216 {
1217     MEDIA_LOGD("trace.");
1218     for (size_t i = 0; i < circularBuffer_.size(); i++) {
1219         if (circularBuffer_[i]->mediaData != nullptr && circularBuffer_[i]->mediaData->mediaType == MEDIA_TYPE_VIDEO) {
1220             return i;
1221         }
1222     }
1223 
1224     return 0;
1225 }
1226 
FindLastIndex(MediaType type)1227 uint32_t BufferDispatcher::FindLastIndex(MediaType type)
1228 {
1229     SHARING_LOGD("trace.");
1230     if (circularBuffer_.empty()) {
1231         return INVALID_INDEX;
1232     }
1233 
1234     return type == MEDIA_TYPE_AUDIO ? lastAudioIndex_ : lastVideoIndex_;
1235 }
1236 
UpdateReceiverReadIndex(uint32_t receiverId,const uint32_t readIndex,MediaType type)1237 void BufferDispatcher::UpdateReceiverReadIndex(uint32_t receiverId, const uint32_t readIndex, MediaType type)
1238 {
1239     MEDIA_LOGD("trace.");
1240     uint32_t nextIndex = FindNextIndex(readIndex, type, receiverId);
1241     bool noAvaliableData = false;
1242     if (nextIndex == readIndex) {
1243         noAvaliableData = true;
1244     }
1245 
1246     auto notifier = GetNotifierByReceiverId(receiverId);
1247     if (notifier == nullptr) {
1248         SHARING_LOGE("notifier is nullptr.");
1249         return;
1250     }
1251 
1252     bool readOver = circularBuffer_.size() - readIndex < 3;
1253     if (readOver && notifier->NeedAcceleration() && type == MEDIA_TYPE_VIDEO) {
1254         SHARING_LOGD("BufferDispatcher SendAccelerationDone.");
1255         notifier->SendAccelerationDone();
1256     }
1257 
1258     notifier->SetNeedUpdate(noAvaliableData, type);
1259 
1260     if (type == MEDIA_TYPE_VIDEO) {
1261         notifier->videoIndex = nextIndex;
1262     } else if (type == MEDIA_TYPE_AUDIO) {
1263         notifier->audioIndex = nextIndex;
1264     } else {
1265         notifier->videoIndex = nextIndex;
1266         notifier->audioIndex = nextIndex;
1267     }
1268 
1269     MEDIA_LOGD("After UpdateReceiverReadIndex  type %{public}d, aindex %{public}d, vindex %{public}d.", type,
1270                notifier->audioIndex, notifier->videoIndex);
1271 }
1272 
FindNextIndex(uint32_t index,MediaType type)1273 uint32_t BufferDispatcher::FindNextIndex(uint32_t index, MediaType type)
1274 {
1275     MEDIA_LOGD("trace.");
1276     if (index + 1 >= circularBuffer_.size() || index == INVALID_INDEX) {
1277         return index;
1278     }
1279 
1280     if (type == MEDIA_TYPE_AV) {
1281         return index + 1;
1282     }
1283 
1284     for (size_t i = index + 1; i < circularBuffer_.size(); i++) {
1285         if (circularBuffer_[i] && circularBuffer_[i]->mediaData && circularBuffer_[i]->mediaData->mediaType == type) {
1286             if (keyOnly_ && type == MEDIA_TYPE_VIDEO && !IsKeyVideoFrame(circularBuffer_[i])) {
1287                 continue;
1288             } else {
1289                 return i;
1290             }
1291         }
1292     }
1293 
1294     return index;
1295 }
1296 
FindNextIndex(uint32_t index,MediaType type,uint32_t receiverId)1297 uint32_t BufferDispatcher::FindNextIndex(uint32_t index, MediaType type, uint32_t receiverId)
1298 {
1299     MEDIA_LOGD("trace.");
1300     if (index + 1 >= circularBuffer_.size() || index == INVALID_INDEX) {
1301         return index;
1302     }
1303 
1304     if (type == MEDIA_TYPE_AV) {
1305         return index + 1;
1306     }
1307 
1308     auto notifier = GetNotifierByReceiverId(receiverId);
1309     if (notifier == nullptr) {
1310         SHARING_LOGE("FindNextIndex GetNotifier nullptr.");
1311         return INVALID_INDEX;
1312     }
1313 
1314     bool keyModeReceiver = notifier->IsKeyModeReceiver();
1315     for (size_t i = index + 1; i < circularBuffer_.size(); i++) {
1316         if (circularBuffer_[i] && circularBuffer_[i]->mediaData && circularBuffer_[i]->mediaData->mediaType == type) {
1317             if ((keyOnly_ || keyModeReceiver) && type == MEDIA_TYPE_VIDEO) {
1318                 if (!IsKeyVideoFrame(circularBuffer_[i])) {
1319                     continue;
1320                 } else {
1321                     for (size_t bIndex = index + 1; bIndex < i; bIndex++) {
1322                         SetReceiverReadFlag(receiverId, circularBuffer_[bIndex]);
1323                     }
1324                     return i;
1325                 }
1326             } else {
1327                 return i;
1328             }
1329         }
1330     }
1331 
1332     return index;
1333 }
1334 
ResetAllIndex()1335 void BufferDispatcher::ResetAllIndex()
1336 {
1337     SHARING_LOGD("trace.");
1338     std::lock_guard<std::mutex> locker(notifyMutex_);
1339     keyIndexList_.clear();
1340     for (auto &[recvId, notifier] : notifiers_) {
1341         notifier->videoIndex = INVALID_INDEX;
1342         notifier->audioIndex = INVALID_INDEX;
1343     }
1344 
1345     videoNeedActivate_ = true;
1346     audioNeedActivate_ = true;
1347     lastAudioIndex_ = INVALID_INDEX;
1348     lastVideoIndex_ = INVALID_INDEX;
1349 }
1350 
IsDataReaded(uint32_t receiverId,DataSpec::Ptr & dataSpec)1351 bool BufferDispatcher::IsDataReaded(uint32_t receiverId, DataSpec::Ptr &dataSpec)
1352 {
1353     MEDIA_LOGD("trace.");
1354     uint32_t index = FindReceiverIndex(receiverId);
1355     if (index == INVALID_INDEX) {
1356         return false;
1357     }
1358 
1359     return dataSpec->reserveFlag & RECV_FLAG_BASE << index;
1360 }
1361 
IsVideoData(const DataSpec::Ptr & dataSpec)1362 bool BufferDispatcher::IsVideoData(const DataSpec::Ptr &dataSpec)
1363 {
1364     MEDIA_LOGD("trace.");
1365     if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1366         SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1367         return false;
1368     }
1369 
1370     return dataSpec->mediaData->mediaType == MEDIA_TYPE_VIDEO;
1371 }
1372 
IsAudioData(const DataSpec::Ptr & dataSpec)1373 bool BufferDispatcher::IsAudioData(const DataSpec::Ptr &dataSpec)
1374 {
1375     MEDIA_LOGD("trace.");
1376     if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1377         SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1378         return false;
1379     }
1380 
1381     return dataSpec->mediaData->mediaType == MEDIA_TYPE_AUDIO;
1382 }
1383 
IsKeyVideoFrame(const DataSpec::Ptr & dataSpec)1384 bool BufferDispatcher::IsKeyVideoFrame(const DataSpec::Ptr &dataSpec)
1385 {
1386     MEDIA_LOGD("trace.");
1387     if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1388         SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1389         return false;
1390     }
1391 
1392     return IsVideoData(dataSpec) && dataSpec->mediaData->keyFrame;
1393 }
1394 
HeadFrameNeedReserve()1395 bool BufferDispatcher::HeadFrameNeedReserve()
1396 {
1397     MEDIA_LOGD("trace.");
1398     if (!circularBuffer_.empty()) {
1399         uint8_t temp = readRefFlag_;
1400         MEDIA_LOGD("IsHeadFrameNeedReserve Head reserveFlag %{public}d readRefFlag_ %{public}d.",
1401                    circularBuffer_.front()->reserveFlag.load(), readRefFlag_);
1402         return temp ^ circularBuffer_.front()->reserveFlag;
1403     }
1404 
1405     return false;
1406 }
1407 
NeedExtendToDBCapacity()1408 bool BufferDispatcher::NeedExtendToDBCapacity()
1409 {
1410     MEDIA_LOGD("trace.");
1411     std::shared_lock<std::shared_mutex> locker(bufferMutex_);
1412     return (circularBuffer_.size() >= circularBuffer_.capacity() &&
1413             circularBuffer_.capacity() < doubleBufferCapacity_ && HeadFrameNeedReserve());
1414 }
1415 
NeedRestoreToNormalCapacity()1416 bool BufferDispatcher::NeedRestoreToNormalCapacity()
1417 {
1418     MEDIA_LOGD("trace.");
1419     std::shared_lock<std::shared_mutex> locker(bufferMutex_);
1420     return audioFrameCnt_ >= circularBuffer_.capacity() && circularBuffer_.capacity() != INITIAL_BUFFER_CAPACITY;
1421 }
1422 
ReCalculateCapacity(bool keyFrame)1423 void BufferDispatcher::ReCalculateCapacity(bool keyFrame)
1424 {
1425     MEDIA_LOGD("trace.");
1426     baseCounter_++;
1427     if (baseCounter_ >= maxBufferCapacity_) {
1428         SHARING_LOGE("BufferDispatcher too many Audiodata need Set Capacity to default.");
1429     }
1430 
1431     if (baseCounter_ >= circularBuffer_.capacity() && !keyFrame) {
1432         uint32_t tmpSize = circularBuffer_.capacity() + bufferCapacityIncrement_ < maxBufferCapacity_
1433                                ? circularBuffer_.capacity() + bufferCapacityIncrement_
1434                                : maxBufferCapacity_;
1435         doubleBufferCapacity_ = tmpSize * 2 < maxBufferCapacity_ ? tmpSize * 2 : maxBufferCapacity_; // 2: increasement
1436         SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d in adaptive capacity calculating.", tmpSize);
1437         SetBufferCapacity(tmpSize);
1438         return;
1439     }
1440 
1441     if (keyFrame) {
1442         baseBufferCapacity_ = baseCounter_ + bufferCapacityIncrement_ < maxBufferCapacity_
1443                                   ? baseCounter_ + bufferCapacityIncrement_
1444                                   : maxBufferCapacity_;
1445         if (baseBufferCapacity_ < INITIAL_BUFFER_CAPACITY) {
1446             baseBufferCapacity_ = INITIAL_BUFFER_CAPACITY;
1447         }
1448         doubleBufferCapacity_ = baseBufferCapacity_ * 2 < maxBufferCapacity_ // 2: increasement
1449                                     ? baseBufferCapacity_ * 2                // 2: increasement
1450                                     : maxBufferCapacity_;
1451         gop_ = baseCounter_;
1452         capacityEvaluating_ = gop_ > 0 ? false : true;
1453         SetBufferCapacity(baseBufferCapacity_);
1454         baseCounter_ = 0;
1455         SHARING_LOGD(
1456             "The gop is %{public}d and BufferDispatcher buffer Extended to %{public}d on base capacity confirmed.",
1457             GetCurrentGop(), baseBufferCapacity_);
1458     }
1459 }
1460 
NotifyThreadWorker(void * userParam)1461 int32_t BufferDispatcher::NotifyThreadWorker(void *userParam)
1462 {
1463     SHARING_LOGI("BufferDispatcher DataNotifier thread in.");
1464     RETURN_INVALID_IF_NULL(userParam);
1465     BufferDispatcher *dispatcher = (BufferDispatcher *)userParam;
1466     while (dispatcher->running_) {
1467         std::unique_lock<std::mutex> locker(dispatcher->notifyMutex_);
1468         uint32_t notifyRef = dispatcher->dataBitRef_ & dispatcher->recvBitRef_;
1469         MEDIA_LOGD("DataBitRef %{public}u   recvBitRef_ %{public}d   notifyRef_ %{public}d.",
1470                    dispatcher->dataBitRef_.load(), dispatcher->recvBitRef_.load(), notifyRef);
1471 
1472         for (auto &[recvId, notifier] : dispatcher->notifiers_) {
1473             auto index = notifier->GetReadIndex();
1474             if (((RECV_FLAG_BASE << (index * FIX_OFFSET_TWO)) & notifyRef) ||
1475                 ((RECV_FLAG_BASE << (index * FIX_OFFSET_TWO + FIX_OFFSET_ONE)) & notifyRef)) {
1476                 MediaType notifyType;
1477                 if (notifier->IsMixedReceiver()) {
1478                     notifyType = MEDIA_TYPE_AV;
1479                 } else {
1480                     notifyType = (((RECV_FLAG_BASE << (index * FIX_OFFSET_TWO)) & notifyRef) ? MEDIA_TYPE_AUDIO
1481                                                                                              : MEDIA_TYPE_VIDEO);
1482                 }
1483                 MEDIA_LOGD("notify the receiveId: %{public}d, notifyType: %{public}d, notifyRef: %{public}x.", recvId,
1484                            notifyType, notifyRef);
1485                 notifier->NotifyDataReceiver(notifyType);
1486             }
1487         }
1488 
1489         dispatcher->dataCV_.wait(locker, [&dispatcher]() { return dispatcher->continueNotify_.load(); });
1490         dispatcher->continueNotify_ = false;
1491     }
1492 
1493     return 0;
1494 }
1495 
NotifyReadReady(uint32_t receiverId,MediaType type)1496 void BufferDispatcher::NotifyReadReady(uint32_t receiverId, MediaType type)
1497 {
1498     MEDIA_LOGD("trace.");
1499     auto notifier = GetNotifierByReceiverId(receiverId);
1500     if (notifier == nullptr) {
1501         SHARING_LOGE("notifier is nullptr.");
1502         return;
1503     }
1504 
1505     std::shared_lock<std::shared_mutex> lock(bufferMutex_);
1506     std::unique_lock<std::mutex> locker(notifyMutex_);
1507     if (type == MEDIA_TYPE_AV) {
1508         SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, true);
1509         SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, true);
1510     } else {
1511         SetReceiverReadRef(receiverId, type, true);
1512     }
1513 
1514     bool dataAvaliable = notifier->DataAvailable(type);
1515     MEDIA_LOGW("receiverId %{public}d  MediaType %{public}d  dataAvaliable %{public}d.", receiverId, type,
1516                dataAvaliable);
1517     if (type == MEDIA_TYPE_AV) {
1518         SetReceiverDataRef(receiverId, MEDIA_TYPE_VIDEO, dataAvaliable);
1519         SetReceiverDataRef(receiverId, MEDIA_TYPE_AUDIO, dataAvaliable);
1520     } else {
1521         SetReceiverDataRef(receiverId, type, dataAvaliable);
1522     }
1523 
1524     if (!dataAvaliable) {
1525         return;
1526     }
1527 
1528     continueNotify_ = true;
1529     dataCV_.notify_one();
1530 }
1531 
SetDataRef(uint32_t bitref)1532 void BufferDispatcher::SetDataRef(uint32_t bitref)
1533 {
1534     SHARING_LOGD("trace.");
1535     dataBitRef_ &= bitref;
1536 }
1537 
GetDataRef()1538 uint32_t BufferDispatcher::GetDataRef()
1539 {
1540     SHARING_LOGD("trace.");
1541     return dataBitRef_;
1542 }
1543 
SetReadRef(uint32_t bitref)1544 void BufferDispatcher::SetReadRef(uint32_t bitref)
1545 {
1546     SHARING_LOGD("trace.");
1547     recvBitRef_ &= bitref;
1548 }
1549 
GetReadRef()1550 uint32_t BufferDispatcher::GetReadRef()
1551 {
1552     SHARING_LOGD("trace.");
1553     return recvBitRef_;
1554 }
1555 
ActiveDataRef(MediaType type,bool keyFrame)1556 void BufferDispatcher::ActiveDataRef(MediaType type, bool keyFrame)
1557 {
1558     MEDIA_LOGD("trace.");
1559     std::unique_lock<std::mutex> locker(notifyMutex_);
1560     uint32_t bitRef = 0x0000;
1561     for (auto &[recvId, notifier] : notifiers_) {
1562         auto index = notifier->GetReadIndex();
1563         if (type == MEDIA_TYPE_AUDIO) {
1564             bitRef |= RECV_FLAG_BASE << (index * 2); // 2: fix offset, get audio notifyer id
1565             continue;
1566         }
1567         bool keyModeReceiver = false;
1568         keyModeReceiver = notifier->IsKeyModeReceiver();
1569         if (keyFrame && keyModeReceiver && keyIndexList_.empty()) {
1570             notifier->videoIndex = circularBuffer_.size() - 1;
1571         }
1572         if ((!keyModeReceiver || (keyModeReceiver && keyFrame))) {
1573             if (index != INVALID_INDEX) {
1574                 bitRef |= RECV_FLAG_BASE << (index * 2 + 1); // 2: fix offset, get video notifyer id
1575             }
1576         }
1577     }
1578 
1579     dataBitRef_ |= bitRef;
1580 }
1581 
SetReceiverDataRef(uint32_t receiverId,MediaType type,bool ready)1582 void BufferDispatcher::SetReceiverDataRef(uint32_t receiverId, MediaType type, bool ready)
1583 {
1584     MEDIA_LOGD("trace.");
1585     uint32_t index = FindReceiverIndex(receiverId);
1586     if (index == INVALID_INDEX) {
1587         return;
1588     }
1589 
1590     if (type == MEDIA_TYPE_AUDIO) {
1591         uint32_t audioBit = index * 2;
1592         uint32_t bitRef = RECV_FLAG_BASE << audioBit;
1593         MEDIA_LOGD("Audio recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1594         if (!ready) {
1595             bitRef = ~bitRef;
1596             dataBitRef_ &= bitRef;
1597         } else {
1598             dataBitRef_ |= bitRef;
1599         }
1600     } else if (type == MEDIA_TYPE_VIDEO) {
1601         uint32_t videoBit = index * 2 + 1;
1602         uint32_t bitRef = RECV_FLAG_BASE << videoBit;
1603         MEDIA_LOGD("Video recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1604         if (!ready) {
1605             bitRef = ~bitRef;
1606             dataBitRef_ &= bitRef;
1607         } else {
1608             dataBitRef_ |= bitRef;
1609         }
1610     }
1611 }
1612 
SetReceiverReadRef(uint32_t receiverId,MediaType type,bool ready)1613 void BufferDispatcher::SetReceiverReadRef(uint32_t receiverId, MediaType type, bool ready)
1614 {
1615     MEDIA_LOGD("trace.");
1616     uint32_t index = FindReceiverIndex(receiverId);
1617     if (index == INVALID_INDEX) {
1618         return;
1619     }
1620 
1621     if (type == MEDIA_TYPE_AUDIO) {
1622         uint32_t audioBit = index * 2;
1623         uint32_t bitRef = RECV_FLAG_BASE << audioBit;
1624         MEDIA_LOGD("Audio recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1625 
1626         if (!ready) {
1627             bitRef = ~bitRef;
1628             recvBitRef_ &= bitRef;
1629         } else {
1630             recvBitRef_ |= bitRef;
1631         }
1632     } else if (type == MEDIA_TYPE_VIDEO) {
1633         uint32_t videoBit = index * 2 + 1;
1634         uint32_t bitRef = RECV_FLAG_BASE << videoBit;
1635         MEDIA_LOGD("Video recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1636 
1637         if (!ready) {
1638             bitRef = ~bitRef;
1639             recvBitRef_ &= bitRef;
1640         } else {
1641             recvBitRef_ |= bitRef;
1642         }
1643     }
1644 }
1645 
GetReceiverDataRef(uint32_t receiverId)1646 uint32_t BufferDispatcher::GetReceiverDataRef(uint32_t receiverId)
1647 {
1648     SHARING_LOGD("trace.");
1649     return 0;
1650 }
1651 
GetReceiverReadRef(uint32_t receiverId)1652 uint32_t BufferDispatcher::GetReceiverReadRef(uint32_t receiverId)
1653 {
1654     SHARING_LOGD("trace.");
1655     uint32_t retBitRef = GetReceiverIndexRef(receiverId);
1656     retBitRef &= recvBitRef_;
1657     return retBitRef;
1658 }
1659 
GetReceiverIndexRef(uint32_t receiverId)1660 uint32_t BufferDispatcher::GetReceiverIndexRef(uint32_t receiverId)
1661 {
1662     SHARING_LOGD("trace.");
1663     uint32_t index = FindReceiverIndex(receiverId);
1664     uint32_t audioBit = index * 2;
1665     uint32_t videoBit = index * 2 + 1;
1666     uint32_t retBitRef = 0x0000;
1667     retBitRef |= RECV_FLAG_BASE << audioBit;
1668     retBitRef |= RECV_FLAG_BASE << videoBit;
1669     return retBitRef;
1670 }
1671 
ClearReadBit(uint32_t receiverId,MediaType type)1672 void BufferDispatcher::ClearReadBit(uint32_t receiverId, MediaType type)
1673 {
1674     MEDIA_LOGD("trace.");
1675     std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1676     if (type != MEDIA_TYPE_AV) {
1677         SetReceiverReadRef(receiverId, type, false);
1678     } else {
1679         SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, false);
1680         SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, false);
1681     }
1682 }
1683 
ClearDataBit(uint32_t receiverId,MediaType type)1684 void BufferDispatcher::ClearDataBit(uint32_t receiverId, MediaType type)
1685 {
1686     MEDIA_LOGD("trace.");
1687     std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1688     if (type != MEDIA_TYPE_AV) {
1689         SetReceiverDataRef(receiverId, type, false);
1690     } else {
1691         SetReceiverDataRef(receiverId, MEDIA_TYPE_VIDEO, false);
1692         SetReceiverDataRef(receiverId, MEDIA_TYPE_AUDIO, false);
1693     }
1694 }
1695 
IsRead(uint32_t receiverId,uint32_t index)1696 bool BufferDispatcher::IsRead(uint32_t receiverId, uint32_t index)
1697 {
1698     MEDIA_LOGD("trace.");
1699     if (index >= circularBuffer_.size()) {
1700         return true;
1701     } else {
1702         return IsDataReaded(receiverId, circularBuffer_.at(index));
1703     }
1704 }
1705 
ActivateReceiverIndex(uint32_t index,MediaType type)1706 void BufferDispatcher::ActivateReceiverIndex(uint32_t index, MediaType type)
1707 {
1708     MEDIA_LOGD("trace.");
1709     std::unique_lock<std::mutex> lock(notifyMutex_);
1710     for (auto &[recvId, notifier] : notifiers_) {
1711         if (type == MEDIA_TYPE_VIDEO) {
1712             if (notifier->videoIndex == INVALID_INDEX) {
1713                 notifier->videoIndex = index;
1714                 SHARING_LOGD("RecvId %{public}d Activate %{public}d.", notifier->GetReceiverId(), notifier->videoIndex);
1715                 videoNeedActivate_ = false;
1716             }
1717         } else {
1718             if (notifier->audioIndex == INVALID_INDEX) {
1719                 notifier->audioIndex = index;
1720                 SHARING_LOGD("RecvId %{public}d Activate %{public}d.", notifier->GetReceiverId(), notifier->audioIndex);
1721                 audioNeedActivate_ = false;
1722             }
1723         }
1724     }
1725 }
UnlockWaitingReceiverIndex(MediaType type)1726 void BufferDispatcher::UnlockWaitingReceiverIndex(MediaType type)
1727 {
1728     SHARING_LOGD("trace.");
1729 }
1730 
SetReceiverReadFlag(uint32_t receiverId,DataSpec::Ptr & dataSpec)1731 void BufferDispatcher::SetReceiverReadFlag(uint32_t receiverId, DataSpec::Ptr &dataSpec)
1732 {
1733     MEDIA_LOGD("trace.");
1734     RETURN_IF_NULL(dataSpec);
1735     uint32_t index = FindReceiverIndex(receiverId);
1736     if (index != INVALID_INDEX) {
1737         dataSpec->reserveFlag |= RECV_FLAG_BASE << index;
1738         MEDIA_LOGW("mediaType: %{public}d, pts: %{public}" PRIu64
1739                    ", reserveFlag: %{public}x, receiverId: %{public}d, index: %{public}d.",
1740                    dataSpec->mediaData->mediaType, dataSpec->mediaData->pts, dataSpec->reserveFlag.load(), receiverId,
1741                    index);
1742     }
1743 }
1744 
CancelReserve()1745 void BufferDispatcher::CancelReserve()
1746 {
1747     SHARING_LOGD("trace.");
1748     for (auto &data : circularBuffer_) {
1749         data->reserveFlag = 0xff;
1750     }
1751 }
1752 
SetSpsNalu(MediaData::Ptr spsbuf)1753 void BufferDispatcher::SetSpsNalu(MediaData::Ptr spsbuf)
1754 {
1755     SHARING_LOGD("trace.");
1756     spsBuf_ = spsbuf;
1757 }
1758 
GetSPS()1759 const MediaData::Ptr BufferDispatcher::GetSPS()
1760 {
1761     MEDIA_LOGD("trace.");
1762     return spsBuf_;
1763 }
1764 
SetPpsNalu(MediaData::Ptr ppsbuf)1765 void BufferDispatcher::SetPpsNalu(MediaData::Ptr ppsbuf)
1766 {
1767     SHARING_LOGD("trace.");
1768     ppsBuf_ = ppsbuf;
1769 }
1770 
GetPPS()1771 const MediaData::Ptr BufferDispatcher::GetPPS()
1772 {
1773     MEDIA_LOGD("trace.");
1774     return ppsBuf_;
1775 }
1776 
GetCurrentGop()1777 uint32_t BufferDispatcher::GetCurrentGop()
1778 {
1779     SHARING_LOGD("trace.");
1780     return gop_;
1781 }
1782 
OnKeyRedirect()1783 void BufferDispatcher::OnKeyRedirect()
1784 {
1785     SHARING_LOGD("trace.");
1786     std::unique_lock<std::mutex> lock(notifyMutex_);
1787     if (keyIndexList_.empty()) {
1788         return;
1789     }
1790 
1791     auto nextIndex = keyIndexList_.back();
1792 
1793     for (auto &[recvId, notifier] : notifiers_) {
1794         if (notifier->IsKeyRedirectReceiver()) {
1795             SHARING_LOGD("receiverId: %{public}u, videoIndex: %{public}d, nextIndex: %{public}d.",
1796                          notifier->GetReceiverId(), notifier->videoIndex, nextIndex);
1797             auto curIndex = notifier->videoIndex;
1798             notifier->videoIndex = nextIndex;
1799             for (auto i = curIndex; i < nextIndex; i++) {
1800                 SetReceiverReadFlag(notifier->GetReceiverId(), circularBuffer_[i]);
1801             }
1802             if (!rapidMode_) {
1803                 auto receiver = notifier->GetBufferReceiver();
1804                 if (receiver != nullptr) {
1805                     receiver->EnableKeyRedirect(false);
1806                 }
1807             }
1808         }
1809     }
1810 }
1811 
EnableRapidMode(bool enable)1812 void BufferDispatcher::EnableRapidMode(bool enable)
1813 {
1814     SHARING_LOGD("trace.");
1815     rapidMode_ = enable;
1816 }
1817 
1818 } // namespace Sharing
1819 } // namespace OHOS
1820