• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "buffer_dispatcher.h"
17 #include <cmath>
18 #include <cstdarg>
19 #include <cstdint>
20 #include "media_channel_def.h"
21 namespace OHOS {
22 namespace Sharing {
23 
// Interval handed to writingTimer_->StartTimer() in SetBufferDispatcherListener.
// NOTE(review): the unit is defined by TimeoutTimer and is not visible here (presumably seconds — confirm).
// The identifier's spelling ("TIMTOUT") is kept because it is referenced by that name below.
constexpr int32_t WRITING_TIMTOUT = 30;
SetSource(IBufferReader::Ptr dataReader)25 void BufferReceiver::SetSource(IBufferReader::Ptr dataReader)
26 {
27     SHARING_LOGD("trace.");
28     bufferReader_ = dataReader;
29 }
30 
OnMediaDataNotify()31 int32_t BufferReceiver::OnMediaDataNotify()
32 {
33     SHARING_LOGD("BufferReceiver Media notified.");
34     dataReady_ = true;
35     notifyData_.notify_one();
36     return 0;
37 }
38 
OnAudioDataNotify()39 int32_t BufferReceiver::OnAudioDataNotify()
40 {
41     MEDIA_LOGD("BufferReceiver Audio notified.");
42     nonBlockAudio_ = true;
43     notifyAudio_.notify_one();
44     return 0;
45 }
46 
OnVideoDataNotify()47 int32_t BufferReceiver::OnVideoDataNotify()
48 {
49     MEDIA_LOGD("BufferReceiver Video notified.");
50     nonBlockVideo_ = true;
51     notifyVideo_.notify_one();
52     return 0;
53 }
54 
IsMixedReceiver()55 bool BufferReceiver::IsMixedReceiver()
56 {
57     MEDIA_LOGD("trace.");
58     return mixed_;
59 }
60 
RequestRead(MediaType type,std::function<void (const MediaData::Ptr & data)> cb)61 int32_t BufferReceiver::RequestRead(MediaType type, std::function<void(const MediaData::Ptr &data)> cb)
62 {
63     MEDIA_LOGD("trace.");
64     if (bufferReader_ == nullptr) {
65         SHARING_LOGE("BufferReceiver read failed null dispatcher.");
66         return -1;
67     }
68 
69     if (firstMRead_ && type == MEDIA_TYPE_AV) {
70         bufferReader_->NotifyReadReady(GetReceiverId(), type);
71         mixed_ = true;
72         firstMRead_ = false;
73     } else if (firstARead_ && type == MEDIA_TYPE_AUDIO) {
74         bufferReader_->NotifyReadReady(GetReceiverId(), type);
75         firstARead_ = false;
76     } else if (firstVRead_ && type == MEDIA_TYPE_VIDEO) {
77         bufferReader_->NotifyReadReady(GetReceiverId(), type);
78         firstVRead_ = false;
79     }
80     std::unique_lock<std::mutex> locker(mutex_);
81     MEDIA_LOGD("BufferDispatcher NotifyThreadWorker before wait, receiverId: %{public}u.", GetReceiverId());
82     switch (type) {
83         /*  cv will waiting if pred is false;
84             so set waiting audio pred (type != MEDIA_TYPE_AUDIO) to NOT block other type.*/
85         case MEDIA_TYPE_AUDIO:
86             MEDIA_LOGD("wait Audio, receiverId: %{public}u.", GetReceiverId());
87             notifyAudio_.wait(locker, [=]() { return nonBlockAudio_ || type != MEDIA_TYPE_AUDIO; });
88             nonBlockAudio_ = false;
89             break;
90         case MEDIA_TYPE_VIDEO:
91             MEDIA_LOGD("wait Video, receiverId: %{public}u.", GetReceiverId());
92             notifyVideo_.wait(locker, [=]() { return nonBlockVideo_ || type != MEDIA_TYPE_VIDEO; });
93             nonBlockVideo_ = false;
94             break;
95         case MEDIA_TYPE_AV:
96             MEDIA_LOGD("wait Mixed, receiverId: %{public}u.", GetReceiverId());
97             notifyData_.wait(locker, [=]() { return dataReady_ || type != MEDIA_TYPE_AV; });
98             dataReady_ = false;
99             break;
100         default:
101             return 0;
102             break;
103     }
104 
105     bufferReader_->ClearDataBit(GetReceiverId(), type);
106     bufferReader_->ClearReadBit(GetReceiverId(), type);
107     MEDIA_LOGD("BufferDispatcher NotifyThreadWorker after wait  start read, receiverId: %{public}u.", GetReceiverId());
108     int32_t ret = bufferReader_->ReadBufferData(GetReceiverId(), type, cb);
109     bufferReader_->NotifyReadReady(GetReceiverId(), type);
110     dataReady_ = false;
111 
112     return ret;
113 }
114 
NotifyReadStart()115 void BufferReceiver::NotifyReadStart()
116 {
117     SHARING_LOGD("receiverId: %{public}u notify start read.", GetReceiverId());
118     firstARead_ = true;
119     firstVRead_ = true;
120     firstMRead_ = true;
121 }
122 
GetReceiverId()123 uint32_t BufferReceiver::GetReceiverId()
124 {
125     MEDIA_LOGD("trace.");
126     return GetId();
127 }
128 
GetDispatcherId()129 uint32_t BufferReceiver::GetDispatcherId()
130 {
131     SHARING_LOGD("trace.");
132     if (bufferReader_) {
133         return bufferReader_->GetDispatcherId();
134     }
135 
136     return 0;
137 }
138 
NotifyReadStop()139 void BufferReceiver::NotifyReadStop()
140 {
141     SHARING_LOGD("receiverId: %{public}u notify stop read.", GetReceiverId());
142     nonBlockAudio_ = true;
143     nonBlockVideo_ = true;
144     dataReady_ = true;
145     notifyAudio_.notify_all();
146     notifyVideo_.notify_all();
147     notifyData_.notify_all();
148 }
149 
EnableKeyMode(bool enable)150 void BufferReceiver::EnableKeyMode(bool enable)
151 {
152     SHARING_LOGD("bufferReceiver id %{public}u SetKeyOnlyMode %{public}d.", GetReceiverId(), enable);
153     if (keyOnly_ == true && enable == false) {
154         SHARING_LOGD("Set KeyOnlyMode false, need report fast read over.");
155         accelerationDone_ = true;
156     }
157 
158     keyOnly_ = enable;
159     if (bufferReader_ && enable) {
160         bufferReader_->ClearDataBit(GetReceiverId(), MEDIA_TYPE_VIDEO);
161     }
162 
163     auto listener = listener_.lock();
164     if (listener) {
165         listener->OnKeyModeNotify(enable);
166     }
167 }
168 
IsKeyMode()169 bool BufferReceiver::IsKeyMode()
170 {
171     MEDIA_LOGD("trace.");
172     return keyOnly_;
173 }
174 
IsKeyRedirect()175 bool BufferReceiver::IsKeyRedirect()
176 {
177     SHARING_LOGD("trace.");
178     return keyRedirect_;
179 }
180 
GetSPS()181 const MediaData::Ptr BufferReceiver::GetSPS()
182 {
183     MEDIA_LOGD("trace.");
184     if (bufferReader_) {
185         return bufferReader_->GetSPS();
186     }
187 
188     return nullptr;
189 }
190 
GetPPS()191 const MediaData::Ptr BufferReceiver::GetPPS()
192 {
193     MEDIA_LOGD("trace.");
194     if (bufferReader_) {
195         return bufferReader_->GetPPS();
196     }
197 
198     return nullptr;
199 }
200 
NeedAcceleration()201 bool BufferReceiver::NeedAcceleration()
202 {
203     MEDIA_LOGD("trace.");
204     return accelerationDone_;
205 }
206 
DisableAcceleration()207 void BufferReceiver::DisableAcceleration()
208 {
209     SHARING_LOGD("trace.");
210     accelerationDone_ = false;
211 }
212 
SendAccelerationDone()213 void BufferReceiver::SendAccelerationDone()
214 {
215     SHARING_LOGD("trace.");
216     auto listener = listener_.lock();
217     if (listener) {
218         listener->OnAccelerationDoneNotify();
219     }
220 }
221 
EnableKeyRedirect(bool enable)222 void BufferReceiver::EnableKeyRedirect(bool enable)
223 {
224     SHARING_LOGD("trace.");
225     if (bufferReader_ && enable) {
226         bufferReader_->EnableKeyRedirect(enable);
227     }
228     keyRedirect_ = enable;
229 }
230 
SetBufferReceiverListener(std::weak_ptr<IBufferReceiverListener> listener)231 void BufferReceiver::SetBufferReceiverListener(std::weak_ptr<IBufferReceiverListener> listener)
232 {
233     SHARING_LOGD("trace.");
234     listener_ = listener;
235 }
236 
// Alias for the nested type so its member definitions below can be written as
// DataNotifier::... instead of BufferDispatcher::DataNotifier::...
using DataNotifier = BufferDispatcher::DataNotifier;
238 
NotifyDataReceiver(MediaType type)239 void DataNotifier::NotifyDataReceiver(MediaType type)
240 {
241     MEDIA_LOGD("trace.");
242     if (receiver_.lock() == nullptr) {
243         SHARING_LOGE("target receiver NOT exist.");
244         return;
245     }
246 
247     if (block_) {
248         return;
249     }
250 
251     MEDIA_LOGD("notify target type %{public}d.", type);
252     switch (type) {
253         case MEDIA_TYPE_AUDIO:
254             GetBufferReceiver()->OnAudioDataNotify();
255             break;
256         case MEDIA_TYPE_VIDEO:
257             GetBufferReceiver()->OnVideoDataNotify();
258             break;
259         case MEDIA_TYPE_AV:
260             GetBufferReceiver()->OnMediaDataNotify();
261             break;
262         default:
263             SHARING_LOGI("none process case.");
264             break;
265     }
266 }
267 
GetBufferReceiver()268 BufferReceiver::Ptr DataNotifier::GetBufferReceiver()
269 {
270     MEDIA_LOGD("trace.");
271     return receiver_.lock();
272 }
273 
GetReceiverId()274 uint32_t DataNotifier::GetReceiverId()
275 {
276     MEDIA_LOGD("trace.");
277     auto receiver = receiver_.lock();
278     if (receiver == nullptr) {
279         SHARING_LOGE("target receiver NOT exist.");
280         return INVALID_INDEX;
281     }
282 
283     return receiver->GetReceiverId();
284 }
285 
SetListenDispatcher(IBufferReader::Ptr dispatcher)286 void DataNotifier::SetListenDispatcher(IBufferReader::Ptr dispatcher)
287 {
288     SHARING_LOGD("trace.");
289     dispatcher_ = dispatcher;
290 }
291 
SetNotifyReceiver(BufferReceiver::Ptr receiver)292 void DataNotifier::SetNotifyReceiver(BufferReceiver::Ptr receiver)
293 {
294     SHARING_LOGD("trace.");
295     receiver_ = receiver;
296 }
297 
SetBlock()298 void DataNotifier::SetBlock()
299 {
300     SHARING_LOGD("trace.");
301     block_ = true;
302 }
303 
SetNeedUpdate(bool enable,MediaType type)304 void DataNotifier::SetNeedUpdate(bool enable, MediaType type)
305 {
306     MEDIA_LOGD("trace.");
307     if (type == MEDIA_TYPE_AUDIO) {
308         needUpdateAIndex = enable;
309     } else {
310         needUpdateVIndex = enable;
311     }
312 }
313 
DataAvailable(MediaType type)314 bool DataNotifier::DataAvailable(MediaType type)
315 {
316     MEDIA_LOGD("trace.");
317     auto dispatcher = dispatcher_.lock();
318     if (dispatcher == nullptr) {
319         SHARING_LOGE("target dispatcher NOT exist.");
320         return false;
321     }
322 
323     if (type == MEDIA_TYPE_AUDIO) {
324         return audioIndex != INVALID_INDEX &&
325                (audioIndex < dispatcher->GetLatestAudioIndex() || !dispatcher->IsRead(GetReceiverId(), audioIndex + 1));
326     } else if (type == MEDIA_TYPE_VIDEO) {
327         return videoIndex != INVALID_INDEX &&
328                (videoIndex < dispatcher->GetLatestVideoIndex() || !dispatcher->IsRead(GetReceiverId(), videoIndex + 1));
329     } else {
330         return videoIndex != INVALID_INDEX &&
331                (videoIndex < dispatcher->GetBufferSize() - 1 || !dispatcher->IsRead(GetReceiverId(), videoIndex + 1));
332     }
333 
334     return false;
335 }
336 
IsMixedReceiver()337 bool DataNotifier::IsMixedReceiver()
338 {
339     MEDIA_LOGD("trace.");
340     auto receiver = receiver_.lock();
341     if (receiver == nullptr) {
342         SHARING_LOGE("target receiver NOT exist.");
343         return false;
344     }
345 
346     return receiver->IsMixedReceiver();
347 }
348 
GetReceiverReadIndex(MediaType type)349 uint32_t DataNotifier::GetReceiverReadIndex(MediaType type)
350 {
351     MEDIA_LOGD("trace.");
352     switch (type) {
353         case MEDIA_TYPE_VIDEO:
354             MEDIA_LOGD("Video Recvid:%{public}d index: %{public}d.", GetReceiverId(), videoIndex);
355             return videoIndex;
356             break;
357         case MEDIA_TYPE_AUDIO:
358             MEDIA_LOGD("Audio Recvid:%{public}d index: %{public}d.", GetReceiverId(), audioIndex);
359             return audioIndex;
360             break;
361         case MEDIA_TYPE_AV:
362             MEDIA_LOGD("Mixed Recvid:%{public}d vindex: %{public}d  aindex: %{public}d.", GetReceiverId(), videoIndex,
363                        audioIndex);
364             if (audioIndex != INVALID_INDEX && videoIndex != INVALID_INDEX) {
365                 return audioIndex <= videoIndex ? audioIndex : videoIndex;
366             } else if (audioIndex == INVALID_INDEX && videoIndex == INVALID_INDEX) {
367                 return INVALID_INDEX;
368             } else {
369                 return audioIndex == INVALID_INDEX ? videoIndex : audioIndex;
370             }
371             break;
372         default:
373             return INVALID_INDEX;
374             break;
375     }
376 }
377 
IsKeyModeReceiver()378 bool DataNotifier::IsKeyModeReceiver()
379 {
380     MEDIA_LOGD("trace.");
381     auto receiver = receiver_.lock();
382     if (receiver) {
383         return receiver->IsKeyMode();
384     }
385 
386     return false;
387 }
388 
IsKeyRedirectReceiver()389 bool DataNotifier::IsKeyRedirectReceiver()
390 {
391     SHARING_LOGD("trace.");
392     auto receiver = receiver_.lock();
393     if (receiver) {
394         return receiver->IsKeyRedirect();
395     }
396 
397     return false;
398 }
399 
NeedAcceleration()400 bool DataNotifier::NeedAcceleration()
401 {
402     MEDIA_LOGD("trace.");
403     auto receiver = receiver_.lock();
404     if (receiver == nullptr) {
405         SHARING_LOGE("target receiver NOT exist.");
406         return false;
407     }
408 
409     return receiver->NeedAcceleration();
410 }
411 
SendAccelerationDone()412 void DataNotifier::SendAccelerationDone()
413 {
414     SHARING_LOGD("trace.");
415     auto receiver = receiver_.lock();
416     if (receiver == nullptr) {
417         SHARING_LOGE("target receiver NOT exist.");
418         return;
419     }
420 
421     receiver->SendAccelerationDone();
422     receiver->DisableAcceleration();
423 }
424 
BufferDispatcher(uint32_t maxCapacity,uint32_t capacityIncrement)425 BufferDispatcher::BufferDispatcher(uint32_t maxCapacity, uint32_t capacityIncrement)
426 {
427     SHARING_LOGD("BufferDispatcher ctor, set capacity: %{public}u.", maxCapacity);
428     maxBufferCapacity_ = maxCapacity;
429     bufferCapacityIncrement_ = capacityIncrement;
430     {
431         std::lock_guard<std::mutex> lock(idleMutex_);
432         idleAudioBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
433         idleVideoBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
434         for (size_t i = 0; i < INITIAL_BUFFER_CAPACITY; i++) {
435             MediaData::Ptr adata = std::make_shared<MediaData>();
436             MediaData::Ptr vdata = std::make_shared<MediaData>();
437             adata->buff = std::make_shared<DataBuffer>();
438             vdata->buff = std::make_shared<DataBuffer>();
439             idleAudioBuffer_.push_back(adata);
440             idleVideoBuffer_.push_back(vdata);
441         }
442     }
443 
444     writingTimer_ = std::make_unique<TimeoutTimer>("dispatcher-writing-timer");
445 
446     std::unique_lock<std::shared_mutex> locker(bufferMutex_);
447     circularBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
448     StartDispatch();
449 }
450 
~BufferDispatcher()451 BufferDispatcher::~BufferDispatcher()
452 {
453     SHARING_LOGI("BufferDispatcher dtor.");
454     running_ = false;
455     StopDispatch();
456     FlushBuffer();
457     ReleaseIdleBuffer();
458     ReleaseAllReceiver();
459 }
460 
StartDispatch()461 void BufferDispatcher::StartDispatch()
462 {
463     SHARING_LOGD("trace.");
464     running_ = true;
465     notifyThread_ = std::thread(&BufferDispatcher::NotifyThreadWorker, this);
466     std::string name = "notifyThread";
467     pthread_setname_np(notifyThread_.native_handle(), name.c_str());
468 }
469 
StopDispatch()470 void BufferDispatcher::StopDispatch()
471 {
472     SHARING_LOGD("trace.");
473     running_ = false;
474     continueNotify_ = true;
475 
476     if (writingTimer_) {
477         writingTimer_.reset();
478     }
479 
480     dataCV_.notify_all();
481     if (notifyThread_.joinable()) {
482         notifyThread_.join();
483     }
484 }
485 
SetBufferCapacity(size_t capacity)486 void BufferDispatcher::SetBufferCapacity(size_t capacity)
487 {
488     SHARING_LOGD("trace.");
489     std::unique_lock<std::shared_mutex> locker(bufferMutex_);
490     circularBuffer_.set_capacity(capacity);
491 }
492 
SetDataMode(MediaDispacherMode dataMode)493 void BufferDispatcher::SetDataMode(MediaDispacherMode dataMode)
494 {
495     SHARING_LOGD("trace.");
496     dataMode_ = dataMode;
497 }
498 
ReleaseIdleBuffer()499 void BufferDispatcher::ReleaseIdleBuffer()
500 {
501     SHARING_LOGD("BufferDispatcher idle Release Start.");
502     std::unique_lock<std::mutex> locker(idleMutex_);
503     for (auto &data : idleAudioBuffer_) {
504         if (data != nullptr || data->buff != nullptr) {
505             data->buff.reset();
506         }
507     }
508 
509     idleAudioBuffer_.clear();
510     for (auto &data : idleVideoBuffer_) {
511         if (data != nullptr || data->buff != nullptr) {
512             data->buff.reset();
513         }
514     }
515 
516     idleVideoBuffer_.clear();
517     SHARING_LOGD("BufferDispatcher idle Release End.");
518 }
519 
FlushBuffer()520 void BufferDispatcher::FlushBuffer()
521 {
522     SHARING_LOGD("BufferDispatcher Start flushing, dispatcherId: %{public}u.", GetDispatcherId());
523     {
524         std::lock_guard<std::mutex> lock(idleMutex_);
525         idleAudioBuffer_.clear();
526         idleVideoBuffer_.clear();
527         for (size_t i = 0; i < INITIAL_BUFFER_CAPACITY; i++) {
528             MediaData::Ptr adata = std::make_shared<MediaData>();
529             MediaData::Ptr vdata = std::make_shared<MediaData>();
530             adata->buff = std::make_shared<DataBuffer>();
531             vdata->buff = std::make_shared<DataBuffer>();
532             idleAudioBuffer_.push_back(adata);
533             idleVideoBuffer_.push_back(vdata);
534         }
535     }
536 
537     std::unique_lock<std::shared_mutex> locker(bufferMutex_);
538     for (auto &data : circularBuffer_) {
539         if (data->mediaData != nullptr && data->mediaData->buff != nullptr) {
540             data->mediaData->buff.reset();
541         }
542     }
543 
544     circularBuffer_.clear();
545     waitingKey_ = true;
546     gop_ = 0;
547     audioFrameCnt_ = 0;
548     videoFrameCnt_ = 0;
549     ResetAllIndex();
550     SHARING_LOGD("BufferDispatcher Dispatcher flushing end, dispatcherId: %{public}u.", GetDispatcherId());
551 }
552 
RequestDataBuffer(MediaType type,uint32_t size)553 MediaData::Ptr BufferDispatcher::RequestDataBuffer(MediaType type, uint32_t size)
554 {
555     SHARING_LOGD("trace.");
556     std::lock_guard<std::mutex> lock(idleMutex_);
557     if (size <= 0) {
558         SHARING_LOGE("Size invalid.");
559         return nullptr;
560     }
561 
562     MediaData::Ptr retData;
563     if (type == MEDIA_TYPE_VIDEO) {
564         if (!idleVideoBuffer_.empty()) {
565             SHARING_LOGD("video From idle.");
566             retData = idleVideoBuffer_.front();
567             idleVideoBuffer_.pop_front();
568             if (retData == nullptr || retData->buff == nullptr) {
569                 MEDIA_LOGW("video From alloc when idle nullptr.");
570                 retData = std::make_shared<MediaData>();
571                 retData->buff = std::make_shared<DataBuffer>();
572                 retData->buff->Resize(size);
573 
574                 return retData;
575             }
576             if (retData->buff->Size() < static_cast<int>(size)) {
577                 retData->buff->Resize(size);
578             }
579             return retData;
580         }
581     } else {
582         if (!idleAudioBuffer_.empty()) {
583             SHARING_LOGD("Audio From idle.");
584             retData = idleAudioBuffer_.front();
585             idleAudioBuffer_.pop_front();
586             if (retData == nullptr || retData->buff == nullptr) {
587                 MEDIA_LOGW("Audio From alloc when idle nullptr.");
588                 retData = std::make_shared<MediaData>();
589                 retData->buff = std::make_shared<DataBuffer>();
590                 retData->buff->Resize(size);
591 
592                 return retData;
593             }
594             if (retData->buff->Size() < size) {
595                 retData->buff->Resize(size);
596             }
597             return retData;
598         }
599     }
600 
601     SHARING_LOGD("Audio From alloc.");
602     retData = std::make_shared<MediaData>();
603     retData->buff = std::make_shared<DataBuffer>();
604     retData->buff->Resize(size);
605     return retData;
606 }
607 
ReturnIdleBuffer(DataSpec::Ptr & data)608 void BufferDispatcher::ReturnIdleBuffer(DataSpec::Ptr &data)
609 {
610     MEDIA_LOGD("trace.");
611     std::lock_guard<std::mutex> lock(idleMutex_);
612     if (data == nullptr || data->mediaData == nullptr) {
613         return;
614     }
615     if (data->mediaData->mediaType == MEDIA_TYPE_VIDEO) {
616         if (idleVideoBuffer_.size() < INITIAL_BUFFER_CAPACITY) {
617             idleVideoBuffer_.push_back(data->mediaData);
618             MEDIA_LOGD("data: push_back in idleVideoBuffer_, size: %{public}zu.", idleVideoBuffer_.size());
619         }
620     } else {
621         if (idleAudioBuffer_.size() < INITIAL_BUFFER_CAPACITY) {
622             idleAudioBuffer_.push_back(data->mediaData);
623             MEDIA_LOGD("data: push_back in idleAudioBuffer_, size: %{public}zu.", idleAudioBuffer_.size());
624         }
625     }
626 
627     data.reset();
628 }
629 
GetBufferSize()630 size_t BufferDispatcher::GetBufferSize()
631 {
632     SHARING_LOGD("trace.");
633     return circularBuffer_.size();
634 }
635 
FindReceiverIndex(uint32_t receiverId)636 uint32_t BufferDispatcher::FindReceiverIndex(uint32_t receiverId)
637 {
638     MEDIA_LOGD("trace.");
639     if (notifiers_.find(receiverId) != notifiers_.end()) {
640         return notifiers_[receiverId]->GetReadIndex();
641     }
642 
643     return INVALID_INDEX;
644 }
645 
IsRecevierExist(uint32_t receiverId)646 bool BufferDispatcher::IsRecevierExist(uint32_t receiverId)
647 {
648     SHARING_LOGD("trace.");
649     auto notifier = GetNotifierByReceiverId(receiverId);
650     if (notifier == nullptr) {
651         return false;
652     }
653 
654     return true;
655 }
656 
EnableKeyMode(bool enable)657 void BufferDispatcher::EnableKeyMode(bool enable)
658 {
659     SHARING_LOGD("trace.");
660     keyOnly_ = enable;
661 }
662 
AttachReceiver(BufferReceiver::Ptr receiver)663 int32_t BufferDispatcher::AttachReceiver(BufferReceiver::Ptr receiver)
664 {
665     SHARING_LOGD("trace.");
666     if (receiver == nullptr) {
667         return -1;
668     }
669 
670     if (IsRecevierExist(receiver->GetReceiverId())) {
671         SHARING_LOGE("Exist.");
672         return 0;
673     }
674 
675     receiver->NotifyReadStart();
676     std::lock_guard<std::mutex> locker(notifyMutex_);
677     if (readRefFlag_ == 0xFFFF) {
678         SHARING_LOGE("readRefFlag limited.");
679         return -1;
680     }
681 
682     DataNotifier::Ptr notifier = std::make_shared<DataNotifier>();
683     notifier->SetListenDispatcher(shared_from_this());
684     notifier->SetNotifyReceiver(receiver);
685 
686     auto usableRef = ~readRefFlag_ & (-(~readRefFlag_));
687 
688     if ((usableRef & (usableRef - 1)) != 0) {
689         SHARING_LOGE("usableRef: %{public}d invalid.", usableRef);
690         return -1;
691     }
692 
693     readRefFlag_ |= usableRef;
694     notifier->SetReadIndex(static_cast<uint32_t>(log2(usableRef)));
695     SHARING_LOGI("receiverId: %{public}d, readIndex: %{public}d, usableRef: %{public}d, readRefFlag_: %{public}d.",
696                  receiver->GetReceiverId(), notifier->GetReadIndex(), usableRef, readRefFlag_);
697     receiver->SetSource(shared_from_this());
698     notifiers_.emplace(receiver->GetReceiverId(), notifier);
699 
700     if (circularBuffer_.empty()) {
701         notifier->audioIndex = INVALID_INDEX;
702         notifier->videoIndex = INVALID_INDEX;
703         SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, false);
704         SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
705         SHARING_LOGD("BufferDispatcher Attach when buffer empty  RecvId: %{public}d.", receiver->GetReceiverId());
706         videoNeedActivate_ = true;
707         audioNeedActivate_ = true;
708         return 0;
709     }
710 
711     if (dataMode_ == MEDIA_AUDIO_ONLY) {
712         notifier->audioIndex = circularBuffer_.size() - 1;
713         SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, true);
714         notifier->videoIndex = INVALID_INDEX;
715         SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
716         SHARING_LOGD("BufferDispatcher Attach when Keyindex List empty  RecvId: %{public}d.",
717                      receiver->GetReceiverId());
718     } else {
719         if (!keyIndexList_.empty()) {
720             SHARING_LOGD("BufferDispatcher Attach with Keyindex  RecvId: %{public}d  KeyIndex:%{public}d.",
721                          receiver->GetReceiverId(), keyIndexList_.back());
722             uint32_t tempIndex = FindNextIndex(keyIndexList_.back(), MEDIA_TYPE_AUDIO);
723             notifier->audioIndex = tempIndex == keyIndexList_.back() ? INVALID_INDEX : tempIndex;
724             notifier->videoIndex = keyIndexList_.back();
725             bool isAudioReady = tempIndex != INVALID_INDEX ? true : false;
726             SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, isAudioReady);
727             SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, true);
728             if (lastAudioIndex_ == INVALID_INDEX) {
729                 audioNeedActivate_ = true;
730             }
731         } else {
732             SHARING_LOGD("BufferDispatcher Attach with Non Keyindex Exist RecvId: %{public}d.",
733                          receiver->GetReceiverId());
734             uint32_t tempIndex = FindLastIndex(MEDIA_TYPE_AUDIO);
735             notifier->audioIndex = tempIndex;
736             notifier->videoIndex = INVALID_INDEX;
737             SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, true);
738             SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
739             if (lastAudioIndex_ == INVALID_INDEX) {
740                 audioNeedActivate_ = true;
741             }
742         }
743     }
744 
745     return 0;
746 }
747 
DetachReceiver(BufferReceiver::Ptr receiver)748 int32_t BufferDispatcher::DetachReceiver(BufferReceiver::Ptr receiver)
749 {
750     SHARING_LOGI("buffer dispatcher: Detach receiver in.");
751     if (receiver == nullptr) {
752         SHARING_LOGE("buffer dispatcher: Detach receiver failed - null receiver.");
753         return -1;
754     }
755 
756     if (!IsRecevierExist(receiver->GetReceiverId())) {
757         SHARING_LOGE("BufferDispatcher AttachReceiver No Vaild Recevier Exist.");
758         return 0;
759     }
760 
761     auto notifier = GetNotifierByReceiverPtr(receiver);
762     if (notifier == nullptr) {
763         SHARING_LOGE("buffer dispatcher: Detach receiver failed - no find receiver in notifiers.");
764         return -1;
765     }
766 
767     std::lock_guard<std::mutex> locker(notifyMutex_);
768     notifier->SetBlock();
769     SetReceiverReadRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, false);
770     SetReceiverReadRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
771 
772     readRefFlag_ &= ~(RECV_FLAG_BASE << notifier->GetReadIndex());
773     notifiers_.erase(receiver->GetReceiverId());
774     SHARING_LOGI("now refFlag: %{public}d.", readRefFlag_);
775     return 0;
776 }
777 
ReleaseAllReceiver()778 void BufferDispatcher::ReleaseAllReceiver()
779 {
780     SHARING_LOGD("trace.");
781     std::lock_guard<std::mutex> locker(notifyMutex_);
782     for (auto &[recvId, notifier] : notifiers_) {
783         if (notifier != nullptr && notifier->GetBufferReceiver() != nullptr) {
784             DetachReceiver(notifier->GetBufferReceiver());
785         }
786     }
787 
788     notifiers_.clear();
789     SHARING_LOGD("release all receiver out.");
790 }
791 
SetBufferDispatcherListener(BufferDispatcherListener::Ptr listener)792 void BufferDispatcher::SetBufferDispatcherListener(BufferDispatcherListener::Ptr listener)
793 {
794     SHARING_LOGD("trace.");
795     listener_ = listener;
796     writingTimer_->StartTimer(WRITING_TIMTOUT, "waiting for continuous data inputs", [this]() {
797         if (!writing_) {
798             SHARING_LOGI("writing timeout");
799             auto listener = listener_.lock();
800             if (listener) {
801                 listener->OnWriteTimeout();
802             }
803         } else {
804             SHARING_LOGI("restart timer");
805             writing_ = false;
806         }
807     }, true);
808 }
809 
GetNotifierByReceiverPtr(BufferReceiver::Ptr receiver)810 DataNotifier::Ptr BufferDispatcher::GetNotifierByReceiverPtr(BufferReceiver::Ptr receiver)
811 {
812     SHARING_LOGD("trace.");
813     return GetNotifierByReceiverId(receiver->GetReceiverId());
814 }
815 
GetNotifierByReceiverId(uint32_t receiverId)816 DataNotifier::Ptr BufferDispatcher::GetNotifierByReceiverId(uint32_t receiverId)
817 {
818     MEDIA_LOGD("trace.");
819     std::lock_guard<std::mutex> locker(notifyMutex_);
820     if (notifiers_.find(receiverId) != notifiers_.end()) {
821         return notifiers_[receiverId];
822     }
823 
824     return nullptr;
825 }
826 
ReadBufferData(uint32_t receiverId,MediaType type,std::function<void (const MediaData::Ptr & data)> cb)827 int32_t BufferDispatcher::ReadBufferData(uint32_t receiverId, MediaType type,
828                                          std::function<void(const MediaData::Ptr &data)> cb)
829 {
830     MEDIA_LOGD("in, receiverId: %{public}u.", receiverId);
831     auto notifier = GetNotifierByReceiverId(receiverId);
832     if (notifier == nullptr) {
833         SHARING_LOGE("notifier is nullptr.");
834         return -1;
835     }
836 
837     std::shared_lock<std::shared_mutex> locker(bufferMutex_);
838     uint32_t readIndex = notifier->GetReceiverReadIndex(type);
839     if (readIndex >= circularBuffer_.size()) {
840         SHARING_LOGE("Read wrong index exceed size.");
841         return -1;
842     }
843 
844     if ((keyOnly_ || notifier->IsKeyModeReceiver()) && type == MEDIA_TYPE_VIDEO &&
845         !IsKeyVideoFrame(circularBuffer_.at(readIndex))) {
846         UpdateReceiverReadIndex(receiverId, readIndex, type);
847         SHARING_LOGE("Read Non Key Video in KeyOnly Mode index: %{public}u.", readIndex);
848         return -1;
849     }
850 
851     if (IsDataReaded(receiverId, circularBuffer_.at(readIndex))) {
852         UpdateReceiverReadIndex(receiverId, readIndex, type);
853         return -1;
854     }
855 
856     readIndex = notifier->GetReceiverReadIndex(type);
857     if (readIndex >= circularBuffer_.size()) {
858         return -1;
859     }
860 
861     auto data = circularBuffer_.at(readIndex);
862     if (data == nullptr) {
863         SHARING_LOGE("BufferDispatcher Read data nullptr.");
864         return -1;
865     }
866 
867     if (IsKeyVideoFrame(data)) {
868         int32_t bufferVideoCacheCnt = 0;
869         for (size_t i = readIndex + 1; i < circularBuffer_.size(); i++) {
870             if (circularBuffer_[i]->mediaData->mediaType == MEDIA_TYPE_VIDEO)
871                 bufferVideoCacheCnt++;
872         }
873         MEDIA_LOGD("TEST STATISTIC:interval: buffer cache %{public}d frames.", bufferVideoCacheCnt);
874     }
875 
876     SetReceiverReadFlag(receiverId, data);
877     if (cb != nullptr) {
878         cb(data->mediaData);
879     }
880 
881     MEDIA_LOGD(
882         "Current data readed, Recvid:%{public}d, remain %{public}zu data, readIndex: %{public}u, "
883         "readtype: %{public}d, diff: %{public}zu.",
884         receiverId, circularBuffer_.size(), readIndex, int32_t(type), circularBuffer_.size() - readIndex);
885     UpdateReceiverReadIndex(receiverId, readIndex, type);
886     return 0;
887 }
888 
InputData(const MediaData::Ptr & data)889 int32_t BufferDispatcher::InputData(const MediaData::Ptr &data)
890 {
891     MEDIA_LOGD("inputmediatype: %{public}d, keyFrame: %{public}d, pts: %{public}" PRIu64 ".", data->mediaType,
892                data->keyFrame, data->pts);
893     if (data == nullptr || data->buff == nullptr) {
894         SHARING_LOGE("data nullptr.");
895         return -1;
896     }
897 
898     if (!writing_) {
899         writing_ = true;
900     }
901 
902     DataSpec::Ptr dataSpec = std::make_shared<DataSpec>();
903     dataSpec->mediaData = data;
904     if (dataMode_ == MEDIA_AUDIO_ONLY) {
905         WriteDataIntoBuffer(dataSpec);
906     } else {
907         PreProcessDataSpec(dataSpec);
908     }
909 
910     if (circularBuffer_.size() > 0) {
911         MEDIA_LOGD("inputmediatype: %{public}d, keyFrame: %{public}d, pts: %{public}" PRIu64 ".",
912                    circularBuffer_[circularBuffer_.size() - 1]->mediaData->mediaType,
913                    circularBuffer_[circularBuffer_.size() - 1]->mediaData->keyFrame,
914                    circularBuffer_[circularBuffer_.size() - 1]->mediaData->pts);
915     }
916 
917     MEDIA_LOGD(
918         "dispatcherId: %{public}u, after InputData, current circularBuffer_ size: %{public}zu, "
919         "idleVideoBuffer_ size: %{public}zu, idle_audioBuffer_ size: %{public}zu, "
920         "keyFrame: %{public}s, data size: %{public}d, adataCount:%{public}d.",
921         GetDispatcherId(), circularBuffer_.size(), idleVideoBuffer_.size(), idleAudioBuffer_.size(),
922         data->keyFrame ? "true" : "false", data->buff->Size(), audioFrameCnt_);
923     return 0;
924 }
925 
PreProcessDataSpec(const DataSpec::Ptr & dataSpec)926 void BufferDispatcher::PreProcessDataSpec(const DataSpec::Ptr &dataSpec)
927 {
928     MEDIA_LOGD("trace.");
929     if (waitingKey_) {
930         if (IsAudioData(dataSpec)) {
931         } else if (!IsKeyVideoFrame(dataSpec)) {
932             SHARING_LOGD("BufferDispatcher Waiting First Key Video Frame.");
933             return;
934         } else {
935             SHARING_LOGD("BufferDispatcher received first key video frame and restore from uncontinuous...Flushing.");
936             FlushBuffer();
937             baseCounter_++;
938             capacityEvaluating_ = true;
939             waitingKey_ = false;
940         }
941     } else {
942         if (capacityEvaluating_) {
943             ReCalculateCapacity(IsKeyVideoFrame(dataSpec));
944         }
945     }
946 
947     WriteDataIntoBuffer(dataSpec);
948 }
949 
WriteDataIntoBuffer(const DataSpec::Ptr & data)950 int32_t BufferDispatcher::WriteDataIntoBuffer(const DataSpec::Ptr &data)
951 {
952     MEDIA_LOGD("trace.");
953     if (data->mediaData == nullptr || data->mediaData->buff == nullptr) {
954         SHARING_LOGE("null data.");
955         return -1;
956     }
957 
958     if (NeedExtendToDBCapacity()) {
959         SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d  CRTL_SIZE.", doubleBufferCapacity_);
960         SetBufferCapacity(doubleBufferCapacity_);
961     }
962 
963     if (NeedRestoreToNormalCapacity()) {
964         std::unique_lock<std::shared_mutex> locker(bufferMutex_);
965         int32_t popSize = circularBuffer_.size() - INITIAL_BUFFER_CAPACITY;
966         for (int32_t i = 0; i < popSize; i++) {
967             if (HeadFrameNeedReserve()) {
968                 MEDIA_LOGW(
969                     "dispatcherId: %{public}u, need reserve but pop, mediaType: "
970                     "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64".",
971                     GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
972                     circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
973                     circularBuffer_.front()->mediaData->pts);
974             }
975 
976             MEDIA_LOGW(
977                 "dispatcherId: %{public}u, delete data, mediaType: %{public}d, keyFrame: "
978                 "%{public}s, pts: %{public}" PRIu64", reserveFlag: %{public}x.",
979                 GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
980                 circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
981                 circularBuffer_.front()->mediaData->pts, circularBuffer_.front()->reserveFlag.load());
982             circularBuffer_.pop_front();
983             audioFrameCnt_--;
984             UpdateIndex();
985         }
986 
987         baseBufferCapacity_ = INITIAL_BUFFER_CAPACITY;
988         doubleBufferCapacity_ = INITIAL_BUFFER_CAPACITY * 2; //2 : increasement
989         SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d  NORMALSIZE.", baseBufferCapacity_);
990         circularBuffer_.set_capacity(baseBufferCapacity_);
991     }
992 
993     if (IsKeyVideoFrame(data) && !keyOnly_) {
994         EraseOldGopDatas();
995     }
996 
997     bool updateIndexFlag = false;
998     std::unique_lock<std::shared_mutex> locker(bufferMutex_);
999     if (circularBuffer_.size() >= circularBuffer_.capacity()) {
1000         updateIndexFlag = true;
1001     }
1002 
1003     if (updateIndexFlag) {
1004         uint32_t nextDeleteIndex = 1;
1005         if (IsVideoData(data)) {
1006             nextDeleteIndex = FindNextDeleteVideoIndex();
1007         }
1008 
1009         for (size_t i = 0; i <= nextDeleteIndex; i++) {
1010             MediaType headType = circularBuffer_.front()->mediaData->mediaType;
1011             DataSpec::Ptr retBuff = circularBuffer_.front();
1012             if (HeadFrameNeedReserve()) {
1013                 MEDIA_LOGW(
1014                     "dispatcherId: %{public}u, need reserve but pop, mediaType: "
1015                     "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64".",
1016                     GetDispatcherId(), int32_t(retBuff->mediaData->mediaType),
1017                     retBuff->mediaData->keyFrame ? "true" : "false", retBuff->mediaData->pts);
1018             }
1019 
1020             MEDIA_LOGW(
1021                 "dispatcherId: %{public}u, delete data, mediaType: %{public}d, "
1022                 "keyFrame: %{public}s, pts: %{public}" PRIu64", reserveFlag: %{public}x.",
1023                 GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1024                 circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
1025                 circularBuffer_.front()->mediaData->pts, circularBuffer_.front()->reserveFlag.load());
1026 
1027             circularBuffer_.pop_front();
1028             ReturnIdleBuffer(retBuff);
1029             headType == MEDIA_TYPE_AUDIO ? audioFrameCnt_-- : videoFrameCnt_--;
1030             UpdateIndex();
1031         }
1032     }
1033 
1034     data->reserveFlag = 0;
1035     MEDIA_LOGD("WriteDataIntoBuffer data type: %{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64
1036                ", cur_size: %{public}zu, capacity: %{public}zu dispatcher[%{public}u].",
1037                int32_t(data->mediaData->mediaType), data->mediaData->keyFrame ? "true" : "false",
1038                data->mediaData->pts, circularBuffer_.size(), circularBuffer_.capacity(), GetDispatcherId());
1039     circularBuffer_.push_back(data);
1040     if (IsAudioData(data)) {
1041         lastAudioIndex_ = circularBuffer_.size() - 1;
1042         ActiveDataRef(MEDIA_TYPE_AUDIO, false);
1043         audioFrameCnt_++;
1044     } else {
1045         lastVideoIndex_ = circularBuffer_.size() - 1;
1046         if (!keyOnly_ || (keyOnly_ && IsKeyVideoFrame(data))) {
1047             ActiveDataRef(MEDIA_TYPE_VIDEO, IsKeyVideoFrame(data));
1048         }
1049         videoFrameCnt_++;
1050     }
1051 
1052     if (audioNeedActivate_ && IsAudioData(data)) {
1053         MEDIA_LOGD("BufferDispatcher ActivateReceiverIndex By AudioData.");
1054         ActivateReceiverIndex(circularBuffer_.size() - 1, MEDIA_TYPE_AUDIO);
1055     }
1056 
1057     if (IsKeyVideoFrame(data)) {
1058         uint32_t keyIndex = circularBuffer_.size() - 1;
1059         {
1060             std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1061             keyIndexList_.push_back(keyIndex);
1062         }
1063         if (videoNeedActivate_) {
1064             MEDIA_LOGD("BufferDispatcher ActivateReceiverIndex By KeyVideo Frame index: %{public}d.", keyIndex);
1065             ActivateReceiverIndex(keyIndex, MEDIA_TYPE_VIDEO);
1066         }
1067         if (keyRedirect_) {
1068             OnKeyRedirect();
1069             EnableKeyRedirect(false);
1070         }
1071     }
1072 
1073     continueNotify_ = true;
1074     dataCV_.notify_one();
1075     return 0;
1076 }
1077 
EraseOldGopDatas()1078 void BufferDispatcher::EraseOldGopDatas()
1079 {
1080     MEDIA_LOGD("BufferDispatcher Delete old datas In.");
1081     if (dataMode_ == MEDIA_AUDIO_ONLY) {
1082         FlushBuffer();
1083         return;
1084     }
1085 
1086     std::unique_lock<std::shared_mutex> locker(bufferMutex_);
1087     uint32_t nextKey = 0;
1088     {
1089         std::lock_guard<std::mutex> lock(notifyMutex_);
1090         if (!keyIndexList_.empty() && keyIndexList_.back() > 0) {
1091             MEDIA_LOGD("find next key listsize %{public}zu, back:%{public}d.", keyIndexList_.size(),
1092                        keyIndexList_.back());
1093             nextKey = keyIndexList_.back();
1094             keyIndexList_.clear();
1095             keyIndexList_.push_back(nextKey);
1096         }
1097     }
1098 
1099     MEDIA_LOGD("erase between 0 to next Video Frame %{public}d.", nextKey);
1100     DeleteHeadDatas(nextKey, false);
1101     nextKey = FindNextDeleteVideoIndex();
1102     DeleteHeadDatas(nextKey, true);
1103     std::string indexs;
1104 
1105     MEDIA_LOGD("circularBuffer_ size: %{public}zu.", circularBuffer_.size());
1106     for (auto &keyIndex : keyIndexList_) {
1107         indexs += std::to_string(keyIndex) + ", ";
1108         MEDIA_LOGD("keyIndex update to %{public}d.", keyIndex);
1109     }
1110 
1111     MEDIA_LOGD("current keyIndex: %{public}s.", indexs.c_str());
1112 }
1113 
DeleteHeadDatas(uint32_t size,bool forceDelete)1114 void BufferDispatcher::DeleteHeadDatas(uint32_t size, bool forceDelete)
1115 {
1116     MEDIA_LOGD("trace.");
1117     if (size <= 0) {
1118         MEDIA_LOGW("invalid Size, dispatcherId: %{public}u!", GetDispatcherId());
1119         return;
1120     }
1121 
1122     for (size_t i = 0; i < size; i++) {
1123         if (HeadFrameNeedReserve() && !forceDelete) {
1124             MEDIA_LOGD("index %{public}zu need reserve.", i);
1125             break;
1126         }
1127         if (circularBuffer_.empty()) {
1128             return;
1129         }
1130         DataSpec::Ptr retBuff = circularBuffer_.front();
1131         MEDIA_LOGD("BufferDispatcher pop out headtype  %{public}d.", retBuff->mediaData->mediaType);
1132         retBuff->mediaData->mediaType == MEDIA_TYPE_AUDIO ? audioFrameCnt_-- : videoFrameCnt_--;
1133         if (HeadFrameNeedReserve()) {
1134             MEDIA_LOGW("dispatcherId: %{public}u, need reserve but pop, mediaType: "
1135                  "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64 ".",
1136                  GetDispatcherId(), int32_t(retBuff->mediaData->mediaType),
1137                  retBuff->mediaData->keyFrame ? "true" : "false",
1138                  retBuff->mediaData->pts);
1139         }
1140 
1141         MEDIA_LOGW(
1142             "dispatcherId: %{public}u, delete data, mediaType: %{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64
1143             ", reserveFlag: %{public}x.",
1144             GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1145             circularBuffer_.front()->mediaData->keyFrame ? "true" : "false", circularBuffer_.front()->mediaData->pts,
1146             circularBuffer_.front()->reserveFlag.load());
1147         circularBuffer_.pop_front();
1148         ReturnIdleBuffer(retBuff);
1149         UpdateIndex();
1150     }
1151 
1152     if (circularBuffer_.size() < baseBufferCapacity_ && circularBuffer_.capacity() > baseBufferCapacity_) {
1153         MEDIA_LOGD("capacity return to base %{public}d.", baseBufferCapacity_);
1154         circularBuffer_.set_capacity(baseBufferCapacity_);
1155     }
1156 }
1157 
UpdateIndex()1158 void BufferDispatcher::UpdateIndex()
1159 {
1160     MEDIA_LOGD("trace.");
1161     std::lock_guard<std::mutex> locker(notifyMutex_);
1162     if (!keyIndexList_.empty() && keyIndexList_.front() == 0) {
1163         keyIndexList_.pop_front();
1164         MEDIA_LOGD("BufferDispatcher pop out first  0 keyIndex after listsize %{public}zu.", keyIndexList_.size());
1165     }
1166 
1167     for (auto &keyIndex : keyIndexList_) {
1168         if (keyIndex > 0) {
1169             keyIndex--;
1170         }
1171     }
1172 
1173     for (auto &[recvId, notifier] : notifiers_) {
1174         if (notifier->videoIndex > 0 && notifier->videoIndex != INVALID_INDEX) {
1175             notifier->videoIndex--;
1176         }
1177         if (notifier->audioIndex > 0 && notifier->audioIndex != INVALID_INDEX) {
1178             notifier->audioIndex--;
1179         }
1180     }
1181 
1182     if (lastVideoIndex_ > 0 && lastVideoIndex_ != INVALID_INDEX) {
1183         lastVideoIndex_--;
1184     }
1185 
1186     if (lastAudioIndex_ > 0 && lastAudioIndex_ != INVALID_INDEX) {
1187         lastAudioIndex_--;
1188     }
1189 }
1190 
FindNextDeleteVideoIndex()1191 uint32_t BufferDispatcher::FindNextDeleteVideoIndex()
1192 {
1193     MEDIA_LOGD("trace.");
1194     for (size_t i = 0; i < circularBuffer_.size(); i++) {
1195         if (circularBuffer_[i]->mediaData->mediaType == MEDIA_TYPE_VIDEO) {
1196             return i;
1197         }
1198     }
1199 
1200     return 0;
1201 }
1202 
FindLastIndex(MediaType type)1203 uint32_t BufferDispatcher::FindLastIndex(MediaType type)
1204 {
1205     SHARING_LOGD("trace.");
1206     if (circularBuffer_.empty()) {
1207         return INVALID_INDEX;
1208     }
1209 
1210     return type == MEDIA_TYPE_AUDIO ? lastAudioIndex_ : lastVideoIndex_;
1211 }
1212 
UpdateReceiverReadIndex(uint32_t receiverId,const uint32_t readIndex,MediaType type)1213 void BufferDispatcher::UpdateReceiverReadIndex(uint32_t receiverId, const uint32_t readIndex, MediaType type)
1214 {
1215     MEDIA_LOGD("trace.");
1216     uint32_t nextIndex = FindNextIndex(readIndex, type, receiverId);
1217     bool noAvaliableData = false;
1218     if (nextIndex == readIndex) {
1219         noAvaliableData = true;
1220     }
1221 
1222     auto notifier = GetNotifierByReceiverId(receiverId);
1223     if (notifier == nullptr) {
1224         SHARING_LOGE("notifier is nullptr.");
1225         return;
1226     }
1227 
1228     bool readOver = circularBuffer_.size() - readIndex < 3;
1229     if (readOver && notifier->NeedAcceleration() && type == MEDIA_TYPE_VIDEO) {
1230         SHARING_LOGD("BufferDispatcher SendAccelerationDone.");
1231         notifier->SendAccelerationDone();
1232     }
1233 
1234     notifier->SetNeedUpdate(noAvaliableData, type);
1235 
1236     if (type == MEDIA_TYPE_VIDEO) {
1237         notifier->videoIndex = nextIndex;
1238     } else if (type == MEDIA_TYPE_AUDIO) {
1239         notifier->audioIndex = nextIndex;
1240     } else {
1241         notifier->videoIndex = nextIndex;
1242         notifier->audioIndex = nextIndex;
1243     }
1244 
1245     MEDIA_LOGD("After UpdateReceiverReadIndex  type %{public}d, aindex %{public}d, vindex %{public}d.", type,
1246                notifier->audioIndex, notifier->videoIndex);
1247 }
1248 
FindNextIndex(uint32_t index,MediaType type)1249 uint32_t BufferDispatcher::FindNextIndex(uint32_t index, MediaType type)
1250 {
1251     MEDIA_LOGD("trace.");
1252     if (index + 1 >= circularBuffer_.size() || index == INVALID_INDEX) {
1253         return index;
1254     }
1255 
1256     if (type == MEDIA_TYPE_AV) {
1257         return index + 1;
1258     }
1259 
1260     for (size_t i = index + 1; i < circularBuffer_.size(); i++) {
1261         if (circularBuffer_[i] && circularBuffer_[i]->mediaData && circularBuffer_[i]->mediaData->mediaType == type) {
1262             if (keyOnly_ && type == MEDIA_TYPE_VIDEO && !IsKeyVideoFrame(circularBuffer_[i])) {
1263                 continue;
1264             } else {
1265                 return i;
1266             }
1267         }
1268     }
1269 
1270     return index;
1271 }
1272 
FindNextIndex(uint32_t index,MediaType type,uint32_t receiverId)1273 uint32_t BufferDispatcher::FindNextIndex(uint32_t index, MediaType type, uint32_t receiverId)
1274 {
1275     MEDIA_LOGD("trace.");
1276     if (index + 1 >= circularBuffer_.size() || index == INVALID_INDEX) {
1277         return index;
1278     }
1279 
1280     if (type == MEDIA_TYPE_AV) {
1281         return index + 1;
1282     }
1283 
1284     auto notifier = GetNotifierByReceiverId(receiverId);
1285     if (notifier == nullptr) {
1286         SHARING_LOGE("FindNextIndex GetNotifier nullptr.");
1287         return INVALID_INDEX;
1288     }
1289 
1290     bool keyModeReceiver = notifier->IsKeyModeReceiver();
1291     for (size_t i = index + 1; i < circularBuffer_.size(); i++) {
1292         if (circularBuffer_[i] && circularBuffer_[i]->mediaData && circularBuffer_[i]->mediaData->mediaType == type) {
1293             if ((keyOnly_ || keyModeReceiver) && type == MEDIA_TYPE_VIDEO) {
1294                 if (!IsKeyVideoFrame(circularBuffer_[i])) {
1295                     continue;
1296                 } else {
1297                     for (size_t bIndex = index + 1; bIndex < i; bIndex++) {
1298                         SetReceiverReadFlag(receiverId, circularBuffer_[bIndex]);
1299                     }
1300                     return i;
1301                 }
1302             } else {
1303                 return i;
1304             }
1305         }
1306     }
1307 
1308     return index;
1309 }
1310 
ResetAllIndex()1311 void BufferDispatcher::ResetAllIndex()
1312 {
1313     SHARING_LOGD("trace.");
1314     std::lock_guard<std::mutex> locker(notifyMutex_);
1315     keyIndexList_.clear();
1316     for (auto &[recvId, notifier] : notifiers_) {
1317         notifier->videoIndex = INVALID_INDEX;
1318         notifier->audioIndex = INVALID_INDEX;
1319     }
1320 
1321     videoNeedActivate_ = true;
1322     audioNeedActivate_ = true;
1323     lastAudioIndex_ = INVALID_INDEX;
1324     lastVideoIndex_ = INVALID_INDEX;
1325 }
1326 
IsDataReaded(uint32_t receiverId,DataSpec::Ptr & dataSpec)1327 bool BufferDispatcher::IsDataReaded(uint32_t receiverId, DataSpec::Ptr &dataSpec)
1328 {
1329     MEDIA_LOGD("trace.");
1330     uint32_t index = FindReceiverIndex(receiverId);
1331     if (index == INVALID_INDEX) {
1332         return false;
1333     }
1334 
1335     return dataSpec->reserveFlag & RECV_FLAG_BASE << index;
1336 }
1337 
IsVideoData(const DataSpec::Ptr & dataSpec)1338 bool BufferDispatcher::IsVideoData(const DataSpec::Ptr &dataSpec)
1339 {
1340     MEDIA_LOGD("trace.");
1341     if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1342         SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1343         return false;
1344     }
1345 
1346     return dataSpec->mediaData->mediaType == MEDIA_TYPE_VIDEO;
1347 }
1348 
IsAudioData(const DataSpec::Ptr & dataSpec)1349 bool BufferDispatcher::IsAudioData(const DataSpec::Ptr &dataSpec)
1350 {
1351     MEDIA_LOGD("trace.");
1352     if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1353         SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1354         return false;
1355     }
1356 
1357     return dataSpec->mediaData->mediaType == MEDIA_TYPE_AUDIO;
1358 }
1359 
IsKeyVideoFrame(const DataSpec::Ptr & dataSpec)1360 bool BufferDispatcher::IsKeyVideoFrame(const DataSpec::Ptr &dataSpec)
1361 {
1362     MEDIA_LOGD("trace.");
1363     if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1364         SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1365         return false;
1366     }
1367 
1368     return IsVideoData(dataSpec) && dataSpec->mediaData->keyFrame;
1369 }
1370 
HeadFrameNeedReserve()1371 bool BufferDispatcher::HeadFrameNeedReserve()
1372 {
1373     MEDIA_LOGD("trace.");
1374     if (!circularBuffer_.empty()) {
1375         uint8_t temp = readRefFlag_;
1376         MEDIA_LOGD("IsHeadFrameNeedReserve Head reserveFlag %{public}d readRefFlag_ %{public}d.",
1377                    circularBuffer_.front()->reserveFlag.load(), readRefFlag_);
1378         return temp ^ circularBuffer_.front()->reserveFlag;
1379     }
1380 
1381     return false;
1382 }
1383 
NeedExtendToDBCapacity()1384 bool BufferDispatcher::NeedExtendToDBCapacity()
1385 {
1386     MEDIA_LOGD("trace.");
1387     std::shared_lock<std::shared_mutex> locker(bufferMutex_);
1388     return (circularBuffer_.size() >= circularBuffer_.capacity() &&
1389             circularBuffer_.capacity() < doubleBufferCapacity_ && HeadFrameNeedReserve());
1390 }
1391 
NeedRestoreToNormalCapacity()1392 bool BufferDispatcher::NeedRestoreToNormalCapacity()
1393 {
1394     MEDIA_LOGD("trace.");
1395     std::shared_lock<std::shared_mutex> locker(bufferMutex_);
1396     return audioFrameCnt_ >= circularBuffer_.capacity() && circularBuffer_.capacity() != INITIAL_BUFFER_CAPACITY;
1397 }
1398 
ReCalculateCapacity(bool keyFrame)1399 void BufferDispatcher::ReCalculateCapacity(bool keyFrame)
1400 {
1401     MEDIA_LOGD("trace.");
1402     baseCounter_++;
1403     if (baseCounter_ >= maxBufferCapacity_) {
1404         SHARING_LOGE("BufferDispatcher too many Audiodata need Set Capacity to default.");
1405     }
1406 
1407     if (baseCounter_ >= circularBuffer_.capacity() && !keyFrame) {
1408         uint32_t tmpSize = circularBuffer_.capacity() + bufferCapacityIncrement_ < maxBufferCapacity_
1409                                ? circularBuffer_.capacity() + bufferCapacityIncrement_
1410                                : maxBufferCapacity_;
1411         doubleBufferCapacity_ = tmpSize * 2 < maxBufferCapacity_ ? tmpSize * 2 : maxBufferCapacity_; // 2: increasement
1412         SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d in adaptive capacity calculating.", tmpSize);
1413         SetBufferCapacity(tmpSize);
1414         return;
1415     }
1416 
1417     if (keyFrame) {
1418         baseBufferCapacity_ = baseCounter_ + bufferCapacityIncrement_ < maxBufferCapacity_
1419                                   ? baseCounter_ + bufferCapacityIncrement_
1420                                   : maxBufferCapacity_;
1421         if (baseBufferCapacity_ < INITIAL_BUFFER_CAPACITY) {
1422             baseBufferCapacity_ = INITIAL_BUFFER_CAPACITY;
1423         }
1424         doubleBufferCapacity_ = baseBufferCapacity_ * 2 < maxBufferCapacity_ // 2: increasement
1425                                 ? baseBufferCapacity_ * 2 // 2: increasement
1426                                 : maxBufferCapacity_;
1427         gop_ = baseCounter_;
1428         capacityEvaluating_ = gop_ > 0 ? false : true;
1429         SetBufferCapacity(baseBufferCapacity_);
1430         baseCounter_ = 0;
1431         SHARING_LOGD(
1432             "The gop is %{public}d and BufferDispatcher buffer Extended to %{public}d on base capacity confirmed.",
1433             GetCurrentGop(), baseBufferCapacity_);
1434     }
1435 }
1436 
NotifyThreadWorker(void * userParam)1437 int32_t BufferDispatcher::NotifyThreadWorker(void *userParam)
1438 {
1439     SHARING_LOGI("BufferDispatcher DataNotifier thread in.");
1440     BufferDispatcher *dispatcher = (BufferDispatcher *)userParam;
1441     while (dispatcher->running_) {
1442         std::unique_lock<std::mutex> locker(dispatcher->notifyMutex_);
1443         uint32_t notifyRef = dispatcher->dataBitRef_ & dispatcher->recvBitRef_;
1444         MEDIA_LOGD("DataBitRef %{public}u   recvBitRef_ %{public}d   notifyRef_ %{public}d.",
1445                    dispatcher->dataBitRef_.load(), dispatcher->recvBitRef_.load(), notifyRef);
1446 
1447         for (auto &[recvId, notifier] : dispatcher->notifiers_) {
1448             auto index = notifier->GetReadIndex();
1449             if (((RECV_FLAG_BASE << (index * 2)) & notifyRef) || // 2: fix offset, get data type
1450                 ((RECV_FLAG_BASE << (index * 2 + 1)) & notifyRef)) { // 2: fix offset, get data type
1451                 MediaType notifyType;
1452                 if (notifier->IsMixedReceiver()) {
1453                     notifyType = MEDIA_TYPE_AV;
1454                 } else {
1455                     notifyType = (((RECV_FLAG_BASE << (index * 2)) & notifyRef) ? // 2: fix offset, get data type
1456                                     MEDIA_TYPE_AUDIO : MEDIA_TYPE_VIDEO); // 2: fix offset, get data type
1457                 }
1458                 MEDIA_LOGD("notify the receiveId: %{public}d, notifyType: %{public}d, notifyRef: %{public}x.", recvId,
1459                            notifyType, notifyRef);
1460                 notifier->NotifyDataReceiver(notifyType);
1461             }
1462         }
1463 
1464         dispatcher->dataCV_.wait(locker, [&dispatcher]() { return dispatcher->continueNotify_.load(); });
1465         dispatcher->continueNotify_ = false;
1466     }
1467 
1468     return 0;
1469 }
1470 
NotifyReadReady(uint32_t receiverId,MediaType type)1471 void BufferDispatcher::NotifyReadReady(uint32_t receiverId, MediaType type)
1472 {
1473     MEDIA_LOGD("trace.");
1474     auto notifier = GetNotifierByReceiverId(receiverId);
1475     if (notifier == nullptr) {
1476         SHARING_LOGE("notifier is nullptr.");
1477         return;
1478     }
1479 
1480     std::shared_lock<std::shared_mutex> lock(bufferMutex_);
1481     std::unique_lock<std::mutex> locker(notifyMutex_);
1482     if (type == MEDIA_TYPE_AV) {
1483         SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, true);
1484         SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, true);
1485     } else {
1486         SetReceiverReadRef(receiverId, type, true);
1487     }
1488 
1489     bool dataAvaliable = notifier->DataAvailable(type);
1490     MEDIA_LOGW("receiverId %{public}d  MediaType %{public}d  dataAvaliable %{public}d.", receiverId, type,
1491                dataAvaliable);
1492     if (type == MEDIA_TYPE_AV) {
1493         SetReceiverDataRef(receiverId, MEDIA_TYPE_VIDEO, dataAvaliable);
1494         SetReceiverDataRef(receiverId, MEDIA_TYPE_AUDIO, dataAvaliable);
1495     } else {
1496         SetReceiverDataRef(receiverId, type, dataAvaliable);
1497     }
1498 
1499     if (!dataAvaliable) {
1500         return;
1501     }
1502 
1503     continueNotify_ = true;
1504     dataCV_.notify_one();
1505 }
1506 
SetDataRef(uint32_t bitref)1507 void BufferDispatcher::SetDataRef(uint32_t bitref)
1508 {
1509     SHARING_LOGD("trace.");
1510     dataBitRef_ &= bitref;
1511 }
1512 
GetDataRef()1513 uint32_t BufferDispatcher::GetDataRef()
1514 {
1515     SHARING_LOGD("trace.");
1516     return dataBitRef_;
1517 }
1518 
SetReadRef(uint32_t bitref)1519 void BufferDispatcher::SetReadRef(uint32_t bitref)
1520 {
1521     SHARING_LOGD("trace.");
1522     recvBitRef_ &= bitref;
1523 }
1524 
GetReadRef()1525 uint32_t BufferDispatcher::GetReadRef()
1526 {
1527     SHARING_LOGD("trace.");
1528     return recvBitRef_;
1529 }
1530 
ActiveDataRef(MediaType type,bool keyFrame)1531 void BufferDispatcher::ActiveDataRef(MediaType type, bool keyFrame)
1532 {
1533     MEDIA_LOGD("trace.");
1534     std::unique_lock<std::mutex> locker(notifyMutex_);
1535     uint32_t bitRef = 0x0000;
1536     for (auto &[recvId, notifier] : notifiers_) {
1537         auto index = notifier->GetReadIndex();
1538         if (type == MEDIA_TYPE_AUDIO) {
1539             bitRef |= RECV_FLAG_BASE << (index * 2); // 2: fix offset, get audio notifyer id
1540             continue;
1541         }
1542         bool keyModeReceiver = false;
1543         keyModeReceiver = notifier->IsKeyModeReceiver();
1544         if (keyFrame && keyModeReceiver && keyIndexList_.empty()) {
1545             notifier->videoIndex = circularBuffer_.size() - 1;
1546         }
1547         if ((!keyModeReceiver || (keyModeReceiver && keyFrame))) {
1548             if (index != INVALID_INDEX) {
1549                 bitRef |= RECV_FLAG_BASE << (index * 2 + 1); // 2: fix offset, get video notifyer id
1550             }
1551         }
1552     }
1553 
1554     dataBitRef_ |= bitRef;
1555 }
1556 
SetReceiverDataRef(uint32_t receiverId,MediaType type,bool ready)1557 void BufferDispatcher::SetReceiverDataRef(uint32_t receiverId, MediaType type, bool ready)
1558 {
1559     MEDIA_LOGD("trace.");
1560     uint32_t index = FindReceiverIndex(receiverId);
1561     if (index == INVALID_INDEX) {
1562         return;
1563     }
1564 
1565     if (type == MEDIA_TYPE_AUDIO) {
1566         uint32_t audioBit = index * 2;
1567         uint32_t bitRef = RECV_FLAG_BASE << audioBit;
1568         MEDIA_LOGD("Audio recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1569         if (!ready) {
1570             bitRef = ~bitRef;
1571             dataBitRef_ &= bitRef;
1572         } else {
1573             dataBitRef_ |= bitRef;
1574         }
1575     } else if (type == MEDIA_TYPE_VIDEO) {
1576         uint32_t videoBit = index * 2 + 1;
1577         uint32_t bitRef = RECV_FLAG_BASE << videoBit;
1578         MEDIA_LOGD("Video recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1579         if (!ready) {
1580             bitRef = ~bitRef;
1581             dataBitRef_ &= bitRef;
1582         } else {
1583             dataBitRef_ |= bitRef;
1584         }
1585     }
1586 }
1587 
SetReceiverReadRef(uint32_t receiverId,MediaType type,bool ready)1588 void BufferDispatcher::SetReceiverReadRef(uint32_t receiverId, MediaType type, bool ready)
1589 {
1590     MEDIA_LOGD("trace.");
1591     uint32_t index = FindReceiverIndex(receiverId);
1592     if (index == INVALID_INDEX) {
1593         return;
1594     }
1595 
1596     if (type == MEDIA_TYPE_AUDIO) {
1597         uint32_t audioBit = index * 2;
1598         uint32_t bitRef = RECV_FLAG_BASE << audioBit;
1599         MEDIA_LOGD("Audio recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1600 
1601         if (!ready) {
1602             bitRef = ~bitRef;
1603             recvBitRef_ &= bitRef;
1604         } else {
1605             recvBitRef_ |= bitRef;
1606         }
1607     } else if (type == MEDIA_TYPE_VIDEO) {
1608         uint32_t videoBit = index * 2 + 1;
1609         uint32_t bitRef = RECV_FLAG_BASE << videoBit;
1610         MEDIA_LOGD("Video recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1611 
1612         if (!ready) {
1613             bitRef = ~bitRef;
1614             recvBitRef_ &= bitRef;
1615         } else {
1616             recvBitRef_ |= bitRef;
1617         }
1618     }
1619 }
1620 
GetReceiverDataRef(uint32_t receiverId)1621 uint32_t BufferDispatcher::GetReceiverDataRef(uint32_t receiverId)
1622 {
1623     SHARING_LOGD("trace.");
1624     return 0;
1625 }
1626 
GetReceiverReadRef(uint32_t receiverId)1627 uint32_t BufferDispatcher::GetReceiverReadRef(uint32_t receiverId)
1628 {
1629     SHARING_LOGD("trace.");
1630     uint32_t retBitRef = GetReceiverIndexRef(receiverId);
1631     retBitRef &= recvBitRef_;
1632     return retBitRef;
1633 }
1634 
GetReceiverIndexRef(uint32_t receiverId)1635 uint32_t BufferDispatcher::GetReceiverIndexRef(uint32_t receiverId)
1636 {
1637     SHARING_LOGD("trace.");
1638     uint32_t index = FindReceiverIndex(receiverId);
1639     uint32_t audioBit = index * 2;
1640     uint32_t videoBit = index * 2 + 1;
1641     uint32_t retBitRef = 0x0000;
1642     retBitRef |= RECV_FLAG_BASE << audioBit;
1643     retBitRef |= RECV_FLAG_BASE << videoBit;
1644     return retBitRef;
1645 }
1646 
ClearReadBit(uint32_t receiverId,MediaType type)1647 void BufferDispatcher::ClearReadBit(uint32_t receiverId, MediaType type)
1648 {
1649     MEDIA_LOGD("trace.");
1650     std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1651     if (type != MEDIA_TYPE_AV) {
1652         SetReceiverReadRef(receiverId, type, false);
1653     } else {
1654         SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, false);
1655         SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, false);
1656     }
1657 }
1658 
ClearDataBit(uint32_t receiverId,MediaType type)1659 void BufferDispatcher::ClearDataBit(uint32_t receiverId, MediaType type)
1660 {
1661     MEDIA_LOGD("trace.");
1662     std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1663     if (type != MEDIA_TYPE_AV) {
1664         SetReceiverDataRef(receiverId, type, false);
1665     } else {
1666         SetReceiverDataRef(receiverId, MEDIA_TYPE_VIDEO, false);
1667         SetReceiverDataRef(receiverId, MEDIA_TYPE_AUDIO, false);
1668     }
1669 }
1670 
IsRead(uint32_t receiverId,uint32_t index)1671 bool BufferDispatcher::IsRead(uint32_t receiverId, uint32_t index)
1672 {
1673     MEDIA_LOGD("trace.");
1674     if (index >= circularBuffer_.size()) {
1675         return true;
1676     } else {
1677         return IsDataReaded(receiverId, circularBuffer_.at(index));
1678     }
1679 }
1680 
ActivateReceiverIndex(uint32_t index,MediaType type)1681 void BufferDispatcher::ActivateReceiverIndex(uint32_t index, MediaType type)
1682 {
1683     MEDIA_LOGD("trace.");
1684     std::unique_lock<std::mutex> lock(notifyMutex_);
1685     for (auto &[recvId, notifier] : notifiers_) {
1686         if (type == MEDIA_TYPE_VIDEO) {
1687             if (notifier->videoIndex == INVALID_INDEX) {
1688                 notifier->videoIndex = index;
1689                 SHARING_LOGD("RecvId %{public}d Activate %{public}d.", notifier->GetReceiverId(), notifier->videoIndex);
1690                 videoNeedActivate_ = false;
1691             }
1692         } else {
1693             if (notifier->audioIndex == INVALID_INDEX) {
1694                 notifier->audioIndex = index;
1695                 SHARING_LOGD("RecvId %{public}d Activate %{public}d.", notifier->GetReceiverId(), notifier->audioIndex);
1696                 audioNeedActivate_ = false;
1697             }
1698         }
1699     }
1700 }
UnlockWaitingReceiverIndex(MediaType type)1701 void BufferDispatcher::UnlockWaitingReceiverIndex(MediaType type)
1702 {
1703     SHARING_LOGD("trace.");
1704 }
1705 
SetReceiverReadFlag(uint32_t receiverId,DataSpec::Ptr & dataSpec)1706 void BufferDispatcher::SetReceiverReadFlag(uint32_t receiverId, DataSpec::Ptr &dataSpec)
1707 {
1708     MEDIA_LOGD("trace.");
1709     uint32_t index = FindReceiverIndex(receiverId);
1710     if (index != INVALID_INDEX) {
1711         dataSpec->reserveFlag |= RECV_FLAG_BASE << index;
1712         MEDIA_LOGW("mediaType: %{public}d, pts: %{public}" PRIu64
1713                    ", reserveFlag: %{public}x, receiverId: %{public}d, index: %{public}d.",
1714                    dataSpec->mediaData->mediaType, dataSpec->mediaData->pts, dataSpec->reserveFlag.load(), receiverId,
1715                    index);
1716     }
1717 }
1718 
CancelReserve()1719 void BufferDispatcher::CancelReserve()
1720 {
1721     SHARING_LOGD("trace.");
1722     for (auto &data : circularBuffer_) {
1723         data->reserveFlag = 0xff;
1724     }
1725 }
1726 
SetSpsNalu(MediaData::Ptr spsbuf)1727 void BufferDispatcher::SetSpsNalu(MediaData::Ptr spsbuf)
1728 {
1729     SHARING_LOGD("trace.");
1730     spsBuf_ = spsbuf;
1731 }
1732 
GetSPS()1733 const MediaData::Ptr BufferDispatcher::GetSPS()
1734 {
1735     MEDIA_LOGD("trace.");
1736     return spsBuf_;
1737 }
1738 
SetPpsNalu(MediaData::Ptr ppsbuf)1739 void BufferDispatcher::SetPpsNalu(MediaData::Ptr ppsbuf)
1740 {
1741     SHARING_LOGD("trace.");
1742     ppsBuf_ = ppsbuf;
1743 }
1744 
GetPPS()1745 const MediaData::Ptr BufferDispatcher::GetPPS()
1746 {
1747     MEDIA_LOGD("trace.");
1748     return ppsBuf_;
1749 }
1750 
GetCurrentGop()1751 uint32_t BufferDispatcher::GetCurrentGop()
1752 {
1753     SHARING_LOGD("trace.");
1754     return gop_;
1755 }
1756 
OnKeyRedirect()1757 void BufferDispatcher::OnKeyRedirect()
1758 {
1759     SHARING_LOGD("trace.");
1760     std::unique_lock<std::mutex> lock(notifyMutex_);
1761     if (keyIndexList_.empty()) {
1762         return;
1763     }
1764 
1765     auto nextIndex = keyIndexList_.back();
1766 
1767     for (auto &[recvId, notifier] : notifiers_) {
1768         if (notifier->IsKeyRedirectReceiver()) {
1769             SHARING_LOGD("receiverId: %{public}u, videoIndex: %{public}d, nextIndex: %{public}d.",
1770                          notifier->GetReceiverId(), notifier->videoIndex, nextIndex);
1771             auto curIndex = notifier->videoIndex;
1772             notifier->videoIndex = nextIndex;
1773             for (auto i = curIndex; i < nextIndex; i++) {
1774                 SetReceiverReadFlag(notifier->GetReceiverId(), circularBuffer_[i]);
1775             }
1776             if (!rapidMode_) {
1777                 auto receiver = notifier->GetBufferReceiver();
1778                 if (receiver != nullptr) {
1779                     receiver->EnableKeyRedirect(false);
1780                 }
1781             }
1782         }
1783     }
1784 }
1785 
EnableRapidMode(bool enable)1786 void BufferDispatcher::EnableRapidMode(bool enable)
1787 {
1788     SHARING_LOGD("trace.");
1789     rapidMode_ = enable;
1790 }
1791 
1792 } // namespace Sharing
1793 } // namespace OHOS
1794