1 /*
2 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "buffer_dispatcher.h"
17 #include <cmath>
18 #include <cstdarg>
19 #include <cstdint>
20 #include "media_channel_def.h"
21 namespace OHOS {
22 namespace Sharing {
23
// Timeout value handed to writingTimer_->StartTimer() for the "waiting for continuous data inputs"
// watchdog armed in SetBufferDispatcherListener().
// NOTE(review): the unit is not visible in this file (presumably seconds) — confirm against TimeoutTimer.
constexpr int32_t WRITING_TIMTOUT = 30;
// Stores the reader that this receiver will query in RequestRead(), GetDispatcherId(), GetSPS(), etc.
// @param dataReader  Reader to attach; BufferDispatcher::AttachReceiver() passes itself here.
void BufferReceiver::SetSource(IBufferReader::Ptr dataReader)
{
    SHARING_LOGD("trace.");
    bufferReader_ = dataReader;
}
30
// Signals that mixed (MEDIA_TYPE_AV) data is available: sets dataReady_ and wakes one thread
// waiting on notifyData_ (see RequestRead()).
// @return Always 0.
int32_t BufferReceiver::OnMediaDataNotify()
{
    SHARING_LOGD("BufferReceiver Media notified.");
    dataReady_ = true;
    notifyData_.notify_one();
    return 0;
}
38
// Signals that audio data is available: sets nonBlockAudio_ and wakes one thread waiting on
// notifyAudio_ (see RequestRead()).
// @return Always 0.
int32_t BufferReceiver::OnAudioDataNotify()
{
    MEDIA_LOGD("BufferReceiver Audio notified.");
    nonBlockAudio_ = true;
    notifyAudio_.notify_one();
    return 0;
}
46
// Signals that video data is available: sets nonBlockVideo_ and wakes one thread waiting on
// notifyVideo_ (see RequestRead()).
// @return Always 0.
int32_t BufferReceiver::OnVideoDataNotify()
{
    MEDIA_LOGD("BufferReceiver Video notified.");
    nonBlockVideo_ = true;
    notifyVideo_.notify_one();
    return 0;
}
54
// Returns mixed_, which RequestRead() sets on the first MEDIA_TYPE_AV request after NotifyReadStart().
bool BufferReceiver::IsMixedReceiver()
{
    MEDIA_LOGD("trace.");
    return mixed_;
}
60
// Waits until data of the requested media type has been signalled, then reads one entry from the
// attached reader and hands it to cb.
//
// @param type  MEDIA_TYPE_AUDIO, MEDIA_TYPE_VIDEO or MEDIA_TYPE_AV; any other value returns 0 without reading.
// @param cb    Forwarded unchanged to IBufferReader::ReadBufferData().
// @return -1 when no reader is attached, 0 for an unhandled type, otherwise the result of ReadBufferData().
int32_t BufferReceiver::RequestRead(MediaType type, std::function<void(const MediaData::Ptr &data)> cb)
{
    MEDIA_LOGD("trace.");
    if (bufferReader_ == nullptr) {
        SHARING_LOGE("BufferReceiver read failed null dispatcher.");
        return -1;
    }

    // First request of each kind after NotifyReadStart(): announce readiness to the reader once.
    // A first MEDIA_TYPE_AV request additionally marks this receiver as mixed.
    if (firstMRead_ && type == MEDIA_TYPE_AV) {
        bufferReader_->NotifyReadReady(GetReceiverId(), type);
        mixed_ = true;
        firstMRead_ = false;
    } else if (firstARead_ && type == MEDIA_TYPE_AUDIO) {
        bufferReader_->NotifyReadReady(GetReceiverId(), type);
        firstARead_ = false;
    } else if (firstVRead_ && type == MEDIA_TYPE_VIDEO) {
        bufferReader_->NotifyReadReady(GetReceiverId(), type);
        firstVRead_ = false;
    }
    // mutex_ stays locked from here until the function returns, i.e. also while ReadBufferData() runs.
    std::unique_lock<std::mutex> locker(mutex_);
    MEDIA_LOGD("BufferDispatcher NotifyThreadWorker before wait, receiverId: %{public}u.", GetReceiverId());
    switch (type) {
        /* The condition variable keeps waiting while its predicate is false. Each predicate also
           accepts "type differs from this case's type"; inside its own case that term is always
           false, so the wait effectively depends only on the per-type ready flag. */
        case MEDIA_TYPE_AUDIO:
            MEDIA_LOGD("wait Audio, receiverId: %{public}u.", GetReceiverId());
            notifyAudio_.wait(locker, [=]() { return nonBlockAudio_ || type != MEDIA_TYPE_AUDIO; });
            nonBlockAudio_ = false;
            break;
        case MEDIA_TYPE_VIDEO:
            MEDIA_LOGD("wait Video, receiverId: %{public}u.", GetReceiverId());
            notifyVideo_.wait(locker, [=]() { return nonBlockVideo_ || type != MEDIA_TYPE_VIDEO; });
            nonBlockVideo_ = false;
            break;
        case MEDIA_TYPE_AV:
            MEDIA_LOGD("wait Mixed, receiverId: %{public}u.", GetReceiverId());
            notifyData_.wait(locker, [=]() { return dataReady_ || type != MEDIA_TYPE_AV; });
            dataReady_ = false;
            break;
        default:
            return 0;
            break;
    }

    // Clear this receiver's data/read bits for the type before reading, then re-announce readiness.
    bufferReader_->ClearDataBit(GetReceiverId(), type);
    bufferReader_->ClearReadBit(GetReceiverId(), type);
    MEDIA_LOGD("BufferDispatcher NotifyThreadWorker after wait start read, receiverId: %{public}u.", GetReceiverId());
    int32_t ret = bufferReader_->ReadBufferData(GetReceiverId(), type, cb);
    bufferReader_->NotifyReadReady(GetReceiverId(), type);
    // NOTE(review): dataReady_ is cleared for every media type here, not only MEDIA_TYPE_AV — confirm intended.
    dataReady_ = false;

    return ret;
}
114
// Re-arms the three "first read" flags so that the next RequestRead() of each media type
// calls NotifyReadReady() on the reader again. Called by BufferDispatcher::AttachReceiver().
void BufferReceiver::NotifyReadStart()
{
    SHARING_LOGD("receiverId: %{public}u notify start read.", GetReceiverId());
    firstARead_ = true;
    firstVRead_ = true;
    firstMRead_ = true;
}
122
// Returns this receiver's identifier, which is whatever GetId() reports.
uint32_t BufferReceiver::GetReceiverId()
{
    MEDIA_LOGD("trace.");
    return GetId();
}
128
GetDispatcherId()129 uint32_t BufferReceiver::GetDispatcherId()
130 {
131 SHARING_LOGD("trace.");
132 if (bufferReader_) {
133 return bufferReader_->GetDispatcherId();
134 }
135
136 return 0;
137 }
138
// Releases every thread blocked in RequestRead(): all three ready flags are raised first so the
// wait predicates pass, then all three condition variables are broadcast.
void BufferReceiver::NotifyReadStop()
{
    SHARING_LOGD("receiverId: %{public}u notify stop read.", GetReceiverId());
    nonBlockAudio_ = true;
    nonBlockVideo_ = true;
    dataReady_ = true;
    notifyAudio_.notify_all();
    notifyVideo_.notify_all();
    notifyData_.notify_all();
}
149
EnableKeyMode(bool enable)150 void BufferReceiver::EnableKeyMode(bool enable)
151 {
152 SHARING_LOGD("bufferReceiver id %{public}u SetKeyOnlyMode %{public}d.", GetReceiverId(), enable);
153 if (keyOnly_ == true && enable == false) {
154 SHARING_LOGD("Set KeyOnlyMode false, need report fast read over.");
155 accelerationDone_ = true;
156 }
157
158 keyOnly_ = enable;
159 if (bufferReader_ && enable) {
160 bufferReader_->ClearDataBit(GetReceiverId(), MEDIA_TYPE_VIDEO);
161 }
162
163 auto listener = listener_.lock();
164 if (listener) {
165 listener->OnKeyModeNotify(enable);
166 }
167 }
168
// Returns the key-only flag last stored by EnableKeyMode().
bool BufferReceiver::IsKeyMode()
{
    MEDIA_LOGD("trace.");
    return keyOnly_;
}
174
// Returns the key-redirect flag last stored by EnableKeyRedirect().
bool BufferReceiver::IsKeyRedirect()
{
    SHARING_LOGD("trace.");
    return keyRedirect_;
}
180
GetSPS()181 const MediaData::Ptr BufferReceiver::GetSPS()
182 {
183 MEDIA_LOGD("trace.");
184 if (bufferReader_) {
185 return bufferReader_->GetSPS();
186 }
187
188 return nullptr;
189 }
190
GetPPS()191 const MediaData::Ptr BufferReceiver::GetPPS()
192 {
193 MEDIA_LOGD("trace.");
194 if (bufferReader_) {
195 return bufferReader_->GetPPS();
196 }
197
198 return nullptr;
199 }
200
// Returns accelerationDone_, which EnableKeyMode() raises when key-only mode is switched off and
// DisableAcceleration() clears.
bool BufferReceiver::NeedAcceleration()
{
    MEDIA_LOGD("trace.");
    return accelerationDone_;
}
206
// Clears accelerationDone_ so that NeedAcceleration() reports false again.
void BufferReceiver::DisableAcceleration()
{
    SHARING_LOGD("trace.");
    accelerationDone_ = false;
}
212
SendAccelerationDone()213 void BufferReceiver::SendAccelerationDone()
214 {
215 SHARING_LOGD("trace.");
216 auto listener = listener_.lock();
217 if (listener) {
218 listener->OnAccelerationDoneNotify();
219 }
220 }
221
EnableKeyRedirect(bool enable)222 void BufferReceiver::EnableKeyRedirect(bool enable)
223 {
224 SHARING_LOGD("trace.");
225 if (bufferReader_ && enable) {
226 bufferReader_->EnableKeyRedirect(enable);
227 }
228 keyRedirect_ = enable;
229 }
230
// Registers the listener notified by EnableKeyMode() and SendAccelerationDone().
// Only a weak reference is kept, so the listener's lifetime is not extended.
void BufferReceiver::SetBufferReceiverListener(std::weak_ptr<IBufferReceiverListener> listener)
{
    SHARING_LOGD("trace.");
    listener_ = listener;
}
236
// Shorthand so the nested BufferDispatcher::DataNotifier members below can be defined as DataNotifier::...
using DataNotifier = BufferDispatcher::DataNotifier;
238
NotifyDataReceiver(MediaType type)239 void DataNotifier::NotifyDataReceiver(MediaType type)
240 {
241 MEDIA_LOGD("trace.");
242 if (receiver_.lock() == nullptr) {
243 SHARING_LOGE("target receiver NOT exist.");
244 return;
245 }
246
247 if (block_) {
248 return;
249 }
250
251 MEDIA_LOGD("notify target type %{public}d.", type);
252 switch (type) {
253 case MEDIA_TYPE_AUDIO:
254 GetBufferReceiver()->OnAudioDataNotify();
255 break;
256 case MEDIA_TYPE_VIDEO:
257 GetBufferReceiver()->OnVideoDataNotify();
258 break;
259 case MEDIA_TYPE_AV:
260 GetBufferReceiver()->OnMediaDataNotify();
261 break;
262 default:
263 SHARING_LOGI("none process case.");
264 break;
265 }
266 }
267
// Returns a strong reference to the bound receiver, or an empty pointer if it has already expired.
BufferReceiver::Ptr DataNotifier::GetBufferReceiver()
{
    MEDIA_LOGD("trace.");
    return receiver_.lock();
}
273
GetReceiverId()274 uint32_t DataNotifier::GetReceiverId()
275 {
276 MEDIA_LOGD("trace.");
277 auto receiver = receiver_.lock();
278 if (receiver == nullptr) {
279 SHARING_LOGE("target receiver NOT exist.");
280 return INVALID_INDEX;
281 }
282
283 return receiver->GetReceiverId();
284 }
285
// Binds the dispatcher this notifier queries in DataAvailable().
// dispatcher_ is read via lock() elsewhere, i.e. only a weak reference is retained.
void DataNotifier::SetListenDispatcher(IBufferReader::Ptr dispatcher)
{
    SHARING_LOGD("trace.");
    dispatcher_ = dispatcher;
}
291
// Binds the receiver that NotifyDataReceiver() wakes up.
// receiver_ is read via lock() elsewhere, i.e. only a weak reference is retained.
void DataNotifier::SetNotifyReceiver(BufferReceiver::Ptr receiver)
{
    SHARING_LOGD("trace.");
    receiver_ = receiver;
}
297
// Blocks this notifier: after this call NotifyDataReceiver() returns without notifying.
// No code in this file clears the flag again.
void DataNotifier::SetBlock()
{
    SHARING_LOGD("trace.");
    block_ = true;
}
303
SetNeedUpdate(bool enable,MediaType type)304 void DataNotifier::SetNeedUpdate(bool enable, MediaType type)
305 {
306 MEDIA_LOGD("trace.");
307 if (type == MEDIA_TYPE_AUDIO) {
308 needUpdateAIndex = enable;
309 } else {
310 needUpdateVIndex = enable;
311 }
312 }
313
DataAvailable(MediaType type)314 bool DataNotifier::DataAvailable(MediaType type)
315 {
316 MEDIA_LOGD("trace.");
317 auto dispatcher = dispatcher_.lock();
318 if (dispatcher == nullptr) {
319 SHARING_LOGE("target dispatcher NOT exist.");
320 return false;
321 }
322
323 if (type == MEDIA_TYPE_AUDIO) {
324 return audioIndex != INVALID_INDEX &&
325 (audioIndex < dispatcher->GetLatestAudioIndex() || !dispatcher->IsRead(GetReceiverId(), audioIndex + 1));
326 } else if (type == MEDIA_TYPE_VIDEO) {
327 return videoIndex != INVALID_INDEX &&
328 (videoIndex < dispatcher->GetLatestVideoIndex() || !dispatcher->IsRead(GetReceiverId(), videoIndex + 1));
329 } else {
330 return videoIndex != INVALID_INDEX &&
331 (videoIndex < dispatcher->GetBufferSize() - 1 || !dispatcher->IsRead(GetReceiverId(), videoIndex + 1));
332 }
333
334 return false;
335 }
336
IsMixedReceiver()337 bool DataNotifier::IsMixedReceiver()
338 {
339 MEDIA_LOGD("trace.");
340 auto receiver = receiver_.lock();
341 if (receiver == nullptr) {
342 SHARING_LOGE("target receiver NOT exist.");
343 return false;
344 }
345
346 return receiver->IsMixedReceiver();
347 }
348
GetReceiverReadIndex(MediaType type)349 uint32_t DataNotifier::GetReceiverReadIndex(MediaType type)
350 {
351 MEDIA_LOGD("trace.");
352 switch (type) {
353 case MEDIA_TYPE_VIDEO:
354 MEDIA_LOGD("Video Recvid:%{public}d index: %{public}d.", GetReceiverId(), videoIndex);
355 return videoIndex;
356 break;
357 case MEDIA_TYPE_AUDIO:
358 MEDIA_LOGD("Audio Recvid:%{public}d index: %{public}d.", GetReceiverId(), audioIndex);
359 return audioIndex;
360 break;
361 case MEDIA_TYPE_AV:
362 MEDIA_LOGD("Mixed Recvid:%{public}d vindex: %{public}d aindex: %{public}d.", GetReceiverId(), videoIndex,
363 audioIndex);
364 if (audioIndex != INVALID_INDEX && videoIndex != INVALID_INDEX) {
365 return audioIndex <= videoIndex ? audioIndex : videoIndex;
366 } else if (audioIndex == INVALID_INDEX && videoIndex == INVALID_INDEX) {
367 return INVALID_INDEX;
368 } else {
369 return audioIndex == INVALID_INDEX ? videoIndex : audioIndex;
370 }
371 break;
372 default:
373 return INVALID_INDEX;
374 break;
375 }
376 }
377
IsKeyModeReceiver()378 bool DataNotifier::IsKeyModeReceiver()
379 {
380 MEDIA_LOGD("trace.");
381 auto receiver = receiver_.lock();
382 if (receiver) {
383 return receiver->IsKeyMode();
384 }
385
386 return false;
387 }
388
IsKeyRedirectReceiver()389 bool DataNotifier::IsKeyRedirectReceiver()
390 {
391 SHARING_LOGD("trace.");
392 auto receiver = receiver_.lock();
393 if (receiver) {
394 return receiver->IsKeyRedirect();
395 }
396
397 return false;
398 }
399
NeedAcceleration()400 bool DataNotifier::NeedAcceleration()
401 {
402 MEDIA_LOGD("trace.");
403 auto receiver = receiver_.lock();
404 if (receiver == nullptr) {
405 SHARING_LOGE("target receiver NOT exist.");
406 return false;
407 }
408
409 return receiver->NeedAcceleration();
410 }
411
SendAccelerationDone()412 void DataNotifier::SendAccelerationDone()
413 {
414 SHARING_LOGD("trace.");
415 auto receiver = receiver_.lock();
416 if (receiver == nullptr) {
417 SHARING_LOGE("target receiver NOT exist.");
418 return;
419 }
420
421 receiver->SendAccelerationDone();
422 receiver->DisableAcceleration();
423 }
424
BufferDispatcher(uint32_t maxCapacity,uint32_t capacityIncrement)425 BufferDispatcher::BufferDispatcher(uint32_t maxCapacity, uint32_t capacityIncrement)
426 {
427 SHARING_LOGD("BufferDispatcher ctor, set capacity: %{public}u.", maxCapacity);
428 maxBufferCapacity_ = maxCapacity;
429 bufferCapacityIncrement_ = capacityIncrement;
430 {
431 std::lock_guard<std::mutex> lock(idleMutex_);
432 idleAudioBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
433 idleVideoBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
434 for (size_t i = 0; i < INITIAL_BUFFER_CAPACITY; i++) {
435 MediaData::Ptr adata = std::make_shared<MediaData>();
436 MediaData::Ptr vdata = std::make_shared<MediaData>();
437 adata->buff = std::make_shared<DataBuffer>();
438 vdata->buff = std::make_shared<DataBuffer>();
439 idleAudioBuffer_.push_back(adata);
440 idleVideoBuffer_.push_back(vdata);
441 }
442 }
443
444 writingTimer_ = std::make_unique<TimeoutTimer>("dispatcher-writing-timer");
445
446 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
447 circularBuffer_.set_capacity(INITIAL_BUFFER_CAPACITY);
448 StartDispatch();
449 }
450
// Tears the dispatcher down: stops and joins the notify thread first, then flushes the circular
// buffer, releases the idle pools and finally detaches all receivers.
BufferDispatcher::~BufferDispatcher()
{
    SHARING_LOGI("BufferDispatcher dtor.");
    running_ = false;
    StopDispatch();
    FlushBuffer();
    ReleaseIdleBuffer();
    ReleaseAllReceiver();
}
460
StartDispatch()461 void BufferDispatcher::StartDispatch()
462 {
463 SHARING_LOGD("trace.");
464 running_ = true;
465 notifyThread_ = std::thread(&BufferDispatcher::NotifyThreadWorker, this);
466 std::string name = "notifyThread";
467 pthread_setname_np(notifyThread_.native_handle(), name.c_str());
468 }
469
StopDispatch()470 void BufferDispatcher::StopDispatch()
471 {
472 SHARING_LOGD("trace.");
473 running_ = false;
474 continueNotify_ = true;
475
476 if (writingTimer_) {
477 writingTimer_.reset();
478 }
479
480 dataCV_.notify_all();
481 if (notifyThread_.joinable()) {
482 notifyThread_.join();
483 }
484 }
485
// Changes the capacity of the circular buffer while holding bufferMutex_ exclusively.
// Callers must therefore not already hold bufferMutex_.
// @param capacity  New capacity passed to circularBuffer_.set_capacity().
void BufferDispatcher::SetBufferCapacity(size_t capacity)
{
    SHARING_LOGD("trace.");
    std::unique_lock<std::shared_mutex> locker(bufferMutex_);
    circularBuffer_.set_capacity(capacity);
}
492
// Stores the dispatch mode. In this file MEDIA_AUDIO_ONLY makes InputData() bypass
// PreProcessDataSpec() and changes how AttachReceiver() initialises a receiver's indices.
void BufferDispatcher::SetDataMode(MediaDispacherMode dataMode)
{
    SHARING_LOGD("trace.");
    dataMode_ = dataMode;
}
498
ReleaseIdleBuffer()499 void BufferDispatcher::ReleaseIdleBuffer()
500 {
501 SHARING_LOGD("BufferDispatcher idle Release Start.");
502 std::unique_lock<std::mutex> locker(idleMutex_);
503 for (auto &data : idleAudioBuffer_) {
504 if (data != nullptr || data->buff != nullptr) {
505 data->buff.reset();
506 }
507 }
508
509 idleAudioBuffer_.clear();
510 for (auto &data : idleVideoBuffer_) {
511 if (data != nullptr || data->buff != nullptr) {
512 data->buff.reset();
513 }
514 }
515
516 idleVideoBuffer_.clear();
517 SHARING_LOGD("BufferDispatcher idle Release End.");
518 }
519
FlushBuffer()520 void BufferDispatcher::FlushBuffer()
521 {
522 SHARING_LOGD("BufferDispatcher Start flushing, dispatcherId: %{public}u.", GetDispatcherId());
523 {
524 std::lock_guard<std::mutex> lock(idleMutex_);
525 idleAudioBuffer_.clear();
526 idleVideoBuffer_.clear();
527 for (size_t i = 0; i < INITIAL_BUFFER_CAPACITY; i++) {
528 MediaData::Ptr adata = std::make_shared<MediaData>();
529 MediaData::Ptr vdata = std::make_shared<MediaData>();
530 adata->buff = std::make_shared<DataBuffer>();
531 vdata->buff = std::make_shared<DataBuffer>();
532 idleAudioBuffer_.push_back(adata);
533 idleVideoBuffer_.push_back(vdata);
534 }
535 }
536
537 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
538 for (auto &data : circularBuffer_) {
539 if (data->mediaData != nullptr && data->mediaData->buff != nullptr) {
540 data->mediaData->buff.reset();
541 }
542 }
543
544 circularBuffer_.clear();
545 waitingKey_ = true;
546 gop_ = 0;
547 audioFrameCnt_ = 0;
548 videoFrameCnt_ = 0;
549 ResetAllIndex();
550 SHARING_LOGD("BufferDispatcher Dispatcher flushing end, dispatcherId: %{public}u.", GetDispatcherId());
551 }
552
RequestDataBuffer(MediaType type,uint32_t size)553 MediaData::Ptr BufferDispatcher::RequestDataBuffer(MediaType type, uint32_t size)
554 {
555 SHARING_LOGD("trace.");
556 std::lock_guard<std::mutex> lock(idleMutex_);
557 if (size <= 0) {
558 SHARING_LOGE("Size invalid.");
559 return nullptr;
560 }
561
562 MediaData::Ptr retData;
563 if (type == MEDIA_TYPE_VIDEO) {
564 if (!idleVideoBuffer_.empty()) {
565 SHARING_LOGD("video From idle.");
566 retData = idleVideoBuffer_.front();
567 idleVideoBuffer_.pop_front();
568 if (retData == nullptr || retData->buff == nullptr) {
569 MEDIA_LOGW("video From alloc when idle nullptr.");
570 retData = std::make_shared<MediaData>();
571 retData->buff = std::make_shared<DataBuffer>();
572 retData->buff->Resize(size);
573
574 return retData;
575 }
576 if (retData->buff->Size() < static_cast<int>(size)) {
577 retData->buff->Resize(size);
578 }
579 return retData;
580 }
581 } else {
582 if (!idleAudioBuffer_.empty()) {
583 SHARING_LOGD("Audio From idle.");
584 retData = idleAudioBuffer_.front();
585 idleAudioBuffer_.pop_front();
586 if (retData == nullptr || retData->buff == nullptr) {
587 MEDIA_LOGW("Audio From alloc when idle nullptr.");
588 retData = std::make_shared<MediaData>();
589 retData->buff = std::make_shared<DataBuffer>();
590 retData->buff->Resize(size);
591
592 return retData;
593 }
594 if (retData->buff->Size() < size) {
595 retData->buff->Resize(size);
596 }
597 return retData;
598 }
599 }
600
601 SHARING_LOGD("Audio From alloc.");
602 retData = std::make_shared<MediaData>();
603 retData->buff = std::make_shared<DataBuffer>();
604 retData->buff->Resize(size);
605 return retData;
606 }
607
ReturnIdleBuffer(DataSpec::Ptr & data)608 void BufferDispatcher::ReturnIdleBuffer(DataSpec::Ptr &data)
609 {
610 MEDIA_LOGD("trace.");
611 std::lock_guard<std::mutex> lock(idleMutex_);
612 if (data == nullptr || data->mediaData == nullptr) {
613 return;
614 }
615 if (data->mediaData->mediaType == MEDIA_TYPE_VIDEO) {
616 if (idleVideoBuffer_.size() < INITIAL_BUFFER_CAPACITY) {
617 idleVideoBuffer_.push_back(data->mediaData);
618 MEDIA_LOGD("data: push_back in idleVideoBuffer_, size: %{public}zu.", idleVideoBuffer_.size());
619 }
620 } else {
621 if (idleAudioBuffer_.size() < INITIAL_BUFFER_CAPACITY) {
622 idleAudioBuffer_.push_back(data->mediaData);
623 MEDIA_LOGD("data: push_back in idleAudioBuffer_, size: %{public}zu.", idleAudioBuffer_.size());
624 }
625 }
626
627 data.reset();
628 }
629
// Returns the number of entries currently in the circular buffer.
// No lock is taken here; NOTE(review): callers presumably hold bufferMutex_ where needed — confirm.
size_t BufferDispatcher::GetBufferSize()
{
    SHARING_LOGD("trace.");
    return circularBuffer_.size();
}
635
FindReceiverIndex(uint32_t receiverId)636 uint32_t BufferDispatcher::FindReceiverIndex(uint32_t receiverId)
637 {
638 MEDIA_LOGD("trace.");
639 if (notifiers_.find(receiverId) != notifiers_.end()) {
640 return notifiers_[receiverId]->GetReadIndex();
641 }
642
643 return INVALID_INDEX;
644 }
645
IsRecevierExist(uint32_t receiverId)646 bool BufferDispatcher::IsRecevierExist(uint32_t receiverId)
647 {
648 SHARING_LOGD("trace.");
649 auto notifier = GetNotifierByReceiverId(receiverId);
650 if (notifier == nullptr) {
651 return false;
652 }
653
654 return true;
655 }
656
// Sets the dispatcher-wide key-only flag. In this file it makes ReadBufferData() reject non-key
// video frames and makes WriteDataIntoBuffer() skip EraseOldGopDatas() on key frames.
void BufferDispatcher::EnableKeyMode(bool enable)
{
    SHARING_LOGD("trace.");
    keyOnly_ = enable;
}
662
// Registers a receiver with this dispatcher: creates its DataNotifier, reserves a free bit in
// readRefFlag_ for it, and positions its audio/video indices according to the current buffer state.
//
// @param receiver  Receiver to attach.
// @return -1 for a null receiver, when all 16 low bits of readRefFlag_ are taken, or when the
//         computed bit is not a single bit; 0 on success and also when the receiver is already attached.
int32_t BufferDispatcher::AttachReceiver(BufferReceiver::Ptr receiver)
{
    SHARING_LOGD("trace.");
    if (receiver == nullptr) {
        return -1;
    }

    if (IsRecevierExist(receiver->GetReceiverId())) {
        SHARING_LOGE("Exist.");
        return 0;
    }

    // Re-arm the receiver's first-read flags before it is registered.
    receiver->NotifyReadStart();
    std::lock_guard<std::mutex> locker(notifyMutex_);
    // 0xFFFF: every one of the 16 low reference bits is already in use.
    if (readRefFlag_ == 0xFFFF) {
        SHARING_LOGE("readRefFlag limited.");
        return -1;
    }

    DataNotifier::Ptr notifier = std::make_shared<DataNotifier>();
    notifier->SetListenDispatcher(shared_from_this());
    notifier->SetNotifyReceiver(receiver);

    // x & -x isolates the lowest set bit; applied to ~readRefFlag_ it yields the lowest free bit.
    auto usableRef = ~readRefFlag_ & (-(~readRefFlag_));

    // Sanity check: the result must have at most one bit set.
    if ((usableRef & (usableRef - 1)) != 0) {
        SHARING_LOGE("usableRef: %{public}d invalid.", usableRef);
        return -1;
    }

    readRefFlag_ |= usableRef;
    // log2 of a single-bit value is that bit's position, which becomes the notifier's read index.
    notifier->SetReadIndex(static_cast<uint32_t>(log2(usableRef)));
    SHARING_LOGI("receiverId: %{public}d, readIndex: %{public}d, usableRef: %{public}d, readRefFlag_: %{public}d.",
                 receiver->GetReceiverId(), notifier->GetReadIndex(), usableRef, readRefFlag_);
    receiver->SetSource(shared_from_this());
    notifiers_.emplace(receiver->GetReceiverId(), notifier);

    // Nothing buffered yet: both indices start invalid and both streams are flagged for activation.
    if (circularBuffer_.empty()) {
        notifier->audioIndex = INVALID_INDEX;
        notifier->videoIndex = INVALID_INDEX;
        SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, false);
        SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
        SHARING_LOGD("BufferDispatcher Attach when buffer empty RecvId: %{public}d.", receiver->GetReceiverId());
        videoNeedActivate_ = true;
        audioNeedActivate_ = true;
        return 0;
    }

    if (dataMode_ == MEDIA_AUDIO_ONLY) {
        // Audio-only mode: start at the newest buffered entry, no video index.
        notifier->audioIndex = circularBuffer_.size() - 1;
        SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, true);
        notifier->videoIndex = INVALID_INDEX;
        SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
        SHARING_LOGD("BufferDispatcher Attach when Keyindex List empty RecvId: %{public}d.",
                     receiver->GetReceiverId());
    } else {
        if (!keyIndexList_.empty()) {
            // A key frame is buffered: video starts at the latest key index, audio at the next
            // audio entry found from there (invalid if FindNextIndex returns the key index itself).
            SHARING_LOGD("BufferDispatcher Attach with Keyindex RecvId: %{public}d KeyIndex:%{public}d.",
                         receiver->GetReceiverId(), keyIndexList_.back());
            uint32_t tempIndex = FindNextIndex(keyIndexList_.back(), MEDIA_TYPE_AUDIO);
            notifier->audioIndex = tempIndex == keyIndexList_.back() ? INVALID_INDEX : tempIndex;
            notifier->videoIndex = keyIndexList_.back();
            // NOTE(review): readiness is derived from tempIndex, not from the audioIndex just stored.
            bool isAudioReady = tempIndex != INVALID_INDEX ? true : false;
            SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, isAudioReady);
            SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, true);
            if (lastAudioIndex_ == INVALID_INDEX) {
                audioNeedActivate_ = true;
            }
        } else {
            // No key frame buffered: only audio is positioned, at the last audio entry.
            SHARING_LOGD("BufferDispatcher Attach with Non Keyindex Exist RecvId: %{public}d.",
                         receiver->GetReceiverId());
            uint32_t tempIndex = FindLastIndex(MEDIA_TYPE_AUDIO);
            notifier->audioIndex = tempIndex;
            notifier->videoIndex = INVALID_INDEX;
            SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, true);
            SetReceiverDataRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
            if (lastAudioIndex_ == INVALID_INDEX) {
                audioNeedActivate_ = true;
            }
        }
    }

    return 0;
}
747
DetachReceiver(BufferReceiver::Ptr receiver)748 int32_t BufferDispatcher::DetachReceiver(BufferReceiver::Ptr receiver)
749 {
750 SHARING_LOGI("buffer dispatcher: Detach receiver in.");
751 if (receiver == nullptr) {
752 SHARING_LOGE("buffer dispatcher: Detach receiver failed - null receiver.");
753 return -1;
754 }
755
756 if (!IsRecevierExist(receiver->GetReceiverId())) {
757 SHARING_LOGE("BufferDispatcher AttachReceiver No Vaild Recevier Exist.");
758 return 0;
759 }
760
761 auto notifier = GetNotifierByReceiverPtr(receiver);
762 if (notifier == nullptr) {
763 SHARING_LOGE("buffer dispatcher: Detach receiver failed - no find receiver in notifiers.");
764 return -1;
765 }
766
767 std::lock_guard<std::mutex> locker(notifyMutex_);
768 notifier->SetBlock();
769 SetReceiverReadRef(receiver->GetReceiverId(), MEDIA_TYPE_AUDIO, false);
770 SetReceiverReadRef(receiver->GetReceiverId(), MEDIA_TYPE_VIDEO, false);
771
772 readRefFlag_ &= ~(RECV_FLAG_BASE << notifier->GetReadIndex());
773 notifiers_.erase(receiver->GetReceiverId());
774 SHARING_LOGI("now refFlag: %{public}d.", readRefFlag_);
775 return 0;
776 }
777
ReleaseAllReceiver()778 void BufferDispatcher::ReleaseAllReceiver()
779 {
780 SHARING_LOGD("trace.");
781 std::lock_guard<std::mutex> locker(notifyMutex_);
782 for (auto &[recvId, notifier] : notifiers_) {
783 if (notifier != nullptr && notifier->GetBufferReceiver() != nullptr) {
784 DetachReceiver(notifier->GetBufferReceiver());
785 }
786 }
787
788 notifiers_.clear();
789 SHARING_LOGD("release all receiver out.");
790 }
791
SetBufferDispatcherListener(BufferDispatcherListener::Ptr listener)792 void BufferDispatcher::SetBufferDispatcherListener(BufferDispatcherListener::Ptr listener)
793 {
794 SHARING_LOGD("trace.");
795 listener_ = listener;
796 writingTimer_->StartTimer(WRITING_TIMTOUT, "waiting for continuous data inputs", [this]() {
797 if (!writing_) {
798 SHARING_LOGI("writing timeout");
799 auto listener = listener_.lock();
800 if (listener) {
801 listener->OnWriteTimeout();
802 }
803 } else {
804 SHARING_LOGI("restart timer");
805 writing_ = false;
806 }
807 }, true);
808 }
809
GetNotifierByReceiverPtr(BufferReceiver::Ptr receiver)810 DataNotifier::Ptr BufferDispatcher::GetNotifierByReceiverPtr(BufferReceiver::Ptr receiver)
811 {
812 SHARING_LOGD("trace.");
813 return GetNotifierByReceiverId(receiver->GetReceiverId());
814 }
815
GetNotifierByReceiverId(uint32_t receiverId)816 DataNotifier::Ptr BufferDispatcher::GetNotifierByReceiverId(uint32_t receiverId)
817 {
818 MEDIA_LOGD("trace.");
819 std::lock_guard<std::mutex> locker(notifyMutex_);
820 if (notifiers_.find(receiverId) != notifiers_.end()) {
821 return notifiers_[receiverId];
822 }
823
824 return nullptr;
825 }
826
// Reads the entry at the receiver's current index for `type`, marks it as read for that receiver,
// passes its payload to cb and updates the receiver's read index.
// A shared lock on bufferMutex_ is held from the index lookup until return (including the cb call).
//
// @param receiverId  Id of the reading receiver.
// @param type        Media type used to pick the read index.
// @param cb          Invoked with the entry's MediaData when non-null.
// @return 0 after a successful read. -1 when the notifier is missing, the index is outside the
//         buffer, a non-key video frame is hit in key-only mode, the entry was already read by
//         this receiver, or the entry is null. The read index is still updated in the
//         "non-key" and "already read" cases.
int32_t BufferDispatcher::ReadBufferData(uint32_t receiverId, MediaType type,
                                         std::function<void(const MediaData::Ptr &data)> cb)
{
    MEDIA_LOGD("in, receiverId: %{public}u.", receiverId);
    auto notifier = GetNotifierByReceiverId(receiverId);
    if (notifier == nullptr) {
        SHARING_LOGE("notifier is nullptr.");
        return -1;
    }

    std::shared_lock<std::shared_mutex> locker(bufferMutex_);
    uint32_t readIndex = notifier->GetReceiverReadIndex(type);
    if (readIndex >= circularBuffer_.size()) {
        SHARING_LOGE("Read wrong index exceed size.");
        return -1;
    }

    // Key-only mode (dispatcher-wide or per receiver): skip video frames that are not key frames.
    if ((keyOnly_ || notifier->IsKeyModeReceiver()) && type == MEDIA_TYPE_VIDEO &&
        !IsKeyVideoFrame(circularBuffer_.at(readIndex))) {
        UpdateReceiverReadIndex(receiverId, readIndex, type);
        SHARING_LOGE("Read Non Key Video in KeyOnly Mode index: %{public}u.", readIndex);
        return -1;
    }

    // Entry already consumed by this receiver: just move the index on.
    if (IsDataReaded(receiverId, circularBuffer_.at(readIndex))) {
        UpdateReceiverReadIndex(receiverId, readIndex, type);
        return -1;
    }

    // Re-read and re-validate the index before the actual access.
    readIndex = notifier->GetReceiverReadIndex(type);
    if (readIndex >= circularBuffer_.size()) {
        return -1;
    }

    auto data = circularBuffer_.at(readIndex);
    if (data == nullptr) {
        SHARING_LOGE("BufferDispatcher Read data nullptr.");
        return -1;
    }

    // Statistics only: count the video frames buffered behind a key frame for the debug log.
    if (IsKeyVideoFrame(data)) {
        int32_t bufferVideoCacheCnt = 0;
        for (size_t i = readIndex + 1; i < circularBuffer_.size(); i++) {
            if (circularBuffer_[i]->mediaData->mediaType == MEDIA_TYPE_VIDEO)
                bufferVideoCacheCnt++;
        }
        MEDIA_LOGD("TEST STATISTIC:interval: buffer cache %{public}d frames.", bufferVideoCacheCnt);
    }

    SetReceiverReadFlag(receiverId, data);
    if (cb != nullptr) {
        cb(data->mediaData);
    }

    MEDIA_LOGD(
        "Current data readed, Recvid:%{public}d, remain %{public}zu data, readIndex: %{public}u, "
        "readtype: %{public}d, diff: %{public}zu.",
        receiverId, circularBuffer_.size(), readIndex, int32_t(type), circularBuffer_.size() - readIndex);
    UpdateReceiverReadIndex(receiverId, readIndex, type);
    return 0;
}
888
InputData(const MediaData::Ptr & data)889 int32_t BufferDispatcher::InputData(const MediaData::Ptr &data)
890 {
891 MEDIA_LOGD("inputmediatype: %{public}d, keyFrame: %{public}d, pts: %{public}" PRIu64 ".", data->mediaType,
892 data->keyFrame, data->pts);
893 if (data == nullptr || data->buff == nullptr) {
894 SHARING_LOGE("data nullptr.");
895 return -1;
896 }
897
898 if (!writing_) {
899 writing_ = true;
900 }
901
902 DataSpec::Ptr dataSpec = std::make_shared<DataSpec>();
903 dataSpec->mediaData = data;
904 if (dataMode_ == MEDIA_AUDIO_ONLY) {
905 WriteDataIntoBuffer(dataSpec);
906 } else {
907 PreProcessDataSpec(dataSpec);
908 }
909
910 if (circularBuffer_.size() > 0) {
911 MEDIA_LOGD("inputmediatype: %{public}d, keyFrame: %{public}d, pts: %{public}" PRIu64 ".",
912 circularBuffer_[circularBuffer_.size() - 1]->mediaData->mediaType,
913 circularBuffer_[circularBuffer_.size() - 1]->mediaData->keyFrame,
914 circularBuffer_[circularBuffer_.size() - 1]->mediaData->pts);
915 }
916
917 MEDIA_LOGD(
918 "dispatcherId: %{public}u, after InputData, current circularBuffer_ size: %{public}zu, "
919 "idleVideoBuffer_ size: %{public}zu, idle_audioBuffer_ size: %{public}zu, "
920 "keyFrame: %{public}s, data size: %{public}d, adataCount:%{public}d.",
921 GetDispatcherId(), circularBuffer_.size(), idleVideoBuffer_.size(), idleAudioBuffer_.size(),
922 data->keyFrame ? "true" : "false", data->buff->Size(), audioFrameCnt_);
923 return 0;
924 }
925
PreProcessDataSpec(const DataSpec::Ptr & dataSpec)926 void BufferDispatcher::PreProcessDataSpec(const DataSpec::Ptr &dataSpec)
927 {
928 MEDIA_LOGD("trace.");
929 if (waitingKey_) {
930 if (IsAudioData(dataSpec)) {
931 } else if (!IsKeyVideoFrame(dataSpec)) {
932 SHARING_LOGD("BufferDispatcher Waiting First Key Video Frame.");
933 return;
934 } else {
935 SHARING_LOGD("BufferDispatcher received first key video frame and restore from uncontinuous...Flushing.");
936 FlushBuffer();
937 baseCounter_++;
938 capacityEvaluating_ = true;
939 waitingKey_ = false;
940 }
941 } else {
942 if (capacityEvaluating_) {
943 ReCalculateCapacity(IsKeyVideoFrame(dataSpec));
944 }
945 }
946
947 WriteDataIntoBuffer(dataSpec);
948 }
949
WriteDataIntoBuffer(const DataSpec::Ptr & data)950 int32_t BufferDispatcher::WriteDataIntoBuffer(const DataSpec::Ptr &data)
951 {
952 MEDIA_LOGD("trace.");
953 if (data->mediaData == nullptr || data->mediaData->buff == nullptr) {
954 SHARING_LOGE("null data.");
955 return -1;
956 }
957
958 if (NeedExtendToDBCapacity()) {
959 SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d CRTL_SIZE.", doubleBufferCapacity_);
960 SetBufferCapacity(doubleBufferCapacity_);
961 }
962
963 if (NeedRestoreToNormalCapacity()) {
964 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
965 int32_t popSize = circularBuffer_.size() - INITIAL_BUFFER_CAPACITY;
966 for (int32_t i = 0; i < popSize; i++) {
967 if (HeadFrameNeedReserve()) {
968 MEDIA_LOGW(
969 "dispatcherId: %{public}u, need reserve but pop, mediaType: "
970 "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64".",
971 GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
972 circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
973 circularBuffer_.front()->mediaData->pts);
974 }
975
976 MEDIA_LOGW(
977 "dispatcherId: %{public}u, delete data, mediaType: %{public}d, keyFrame: "
978 "%{public}s, pts: %{public}" PRIu64", reserveFlag: %{public}x.",
979 GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
980 circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
981 circularBuffer_.front()->mediaData->pts, circularBuffer_.front()->reserveFlag.load());
982 circularBuffer_.pop_front();
983 audioFrameCnt_--;
984 UpdateIndex();
985 }
986
987 baseBufferCapacity_ = INITIAL_BUFFER_CAPACITY;
988 doubleBufferCapacity_ = INITIAL_BUFFER_CAPACITY * 2; //2 : increasement
989 SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d NORMALSIZE.", baseBufferCapacity_);
990 circularBuffer_.set_capacity(baseBufferCapacity_);
991 }
992
993 if (IsKeyVideoFrame(data) && !keyOnly_) {
994 EraseOldGopDatas();
995 }
996
997 bool updateIndexFlag = false;
998 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
999 if (circularBuffer_.size() >= circularBuffer_.capacity()) {
1000 updateIndexFlag = true;
1001 }
1002
1003 if (updateIndexFlag) {
1004 uint32_t nextDeleteIndex = 1;
1005 if (IsVideoData(data)) {
1006 nextDeleteIndex = FindNextDeleteVideoIndex();
1007 }
1008
1009 for (size_t i = 0; i <= nextDeleteIndex; i++) {
1010 MediaType headType = circularBuffer_.front()->mediaData->mediaType;
1011 DataSpec::Ptr retBuff = circularBuffer_.front();
1012 if (HeadFrameNeedReserve()) {
1013 MEDIA_LOGW(
1014 "dispatcherId: %{public}u, need reserve but pop, mediaType: "
1015 "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64".",
1016 GetDispatcherId(), int32_t(retBuff->mediaData->mediaType),
1017 retBuff->mediaData->keyFrame ? "true" : "false", retBuff->mediaData->pts);
1018 }
1019
1020 MEDIA_LOGW(
1021 "dispatcherId: %{public}u, delete data, mediaType: %{public}d, "
1022 "keyFrame: %{public}s, pts: %{public}" PRIu64", reserveFlag: %{public}x.",
1023 GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1024 circularBuffer_.front()->mediaData->keyFrame ? "true" : "false",
1025 circularBuffer_.front()->mediaData->pts, circularBuffer_.front()->reserveFlag.load());
1026
1027 circularBuffer_.pop_front();
1028 ReturnIdleBuffer(retBuff);
1029 headType == MEDIA_TYPE_AUDIO ? audioFrameCnt_-- : videoFrameCnt_--;
1030 UpdateIndex();
1031 }
1032 }
1033
1034 data->reserveFlag = 0;
1035 MEDIA_LOGD("WriteDataIntoBuffer data type: %{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64
1036 ", cur_size: %{public}zu, capacity: %{public}zu dispatcher[%{public}u].",
1037 int32_t(data->mediaData->mediaType), data->mediaData->keyFrame ? "true" : "false",
1038 data->mediaData->pts, circularBuffer_.size(), circularBuffer_.capacity(), GetDispatcherId());
1039 circularBuffer_.push_back(data);
1040 if (IsAudioData(data)) {
1041 lastAudioIndex_ = circularBuffer_.size() - 1;
1042 ActiveDataRef(MEDIA_TYPE_AUDIO, false);
1043 audioFrameCnt_++;
1044 } else {
1045 lastVideoIndex_ = circularBuffer_.size() - 1;
1046 if (!keyOnly_ || (keyOnly_ && IsKeyVideoFrame(data))) {
1047 ActiveDataRef(MEDIA_TYPE_VIDEO, IsKeyVideoFrame(data));
1048 }
1049 videoFrameCnt_++;
1050 }
1051
1052 if (audioNeedActivate_ && IsAudioData(data)) {
1053 MEDIA_LOGD("BufferDispatcher ActivateReceiverIndex By AudioData.");
1054 ActivateReceiverIndex(circularBuffer_.size() - 1, MEDIA_TYPE_AUDIO);
1055 }
1056
1057 if (IsKeyVideoFrame(data)) {
1058 uint32_t keyIndex = circularBuffer_.size() - 1;
1059 {
1060 std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1061 keyIndexList_.push_back(keyIndex);
1062 }
1063 if (videoNeedActivate_) {
1064 MEDIA_LOGD("BufferDispatcher ActivateReceiverIndex By KeyVideo Frame index: %{public}d.", keyIndex);
1065 ActivateReceiverIndex(keyIndex, MEDIA_TYPE_VIDEO);
1066 }
1067 if (keyRedirect_) {
1068 OnKeyRedirect();
1069 EnableKeyRedirect(false);
1070 }
1071 }
1072
1073 continueNotify_ = true;
1074 dataCV_.notify_one();
1075 return 0;
1076 }
1077
EraseOldGopDatas()1078 void BufferDispatcher::EraseOldGopDatas()
1079 {
1080 MEDIA_LOGD("BufferDispatcher Delete old datas In.");
1081 if (dataMode_ == MEDIA_AUDIO_ONLY) {
1082 FlushBuffer();
1083 return;
1084 }
1085
1086 std::unique_lock<std::shared_mutex> locker(bufferMutex_);
1087 uint32_t nextKey = 0;
1088 {
1089 std::lock_guard<std::mutex> lock(notifyMutex_);
1090 if (!keyIndexList_.empty() && keyIndexList_.back() > 0) {
1091 MEDIA_LOGD("find next key listsize %{public}zu, back:%{public}d.", keyIndexList_.size(),
1092 keyIndexList_.back());
1093 nextKey = keyIndexList_.back();
1094 keyIndexList_.clear();
1095 keyIndexList_.push_back(nextKey);
1096 }
1097 }
1098
1099 MEDIA_LOGD("erase between 0 to next Video Frame %{public}d.", nextKey);
1100 DeleteHeadDatas(nextKey, false);
1101 nextKey = FindNextDeleteVideoIndex();
1102 DeleteHeadDatas(nextKey, true);
1103 std::string indexs;
1104
1105 MEDIA_LOGD("circularBuffer_ size: %{public}zu.", circularBuffer_.size());
1106 for (auto &keyIndex : keyIndexList_) {
1107 indexs += std::to_string(keyIndex) + ", ";
1108 MEDIA_LOGD("keyIndex update to %{public}d.", keyIndex);
1109 }
1110
1111 MEDIA_LOGD("current keyIndex: %{public}s.", indexs.c_str());
1112 }
1113
DeleteHeadDatas(uint32_t size,bool forceDelete)1114 void BufferDispatcher::DeleteHeadDatas(uint32_t size, bool forceDelete)
1115 {
1116 MEDIA_LOGD("trace.");
1117 if (size <= 0) {
1118 MEDIA_LOGW("invalid Size, dispatcherId: %{public}u!", GetDispatcherId());
1119 return;
1120 }
1121
1122 for (size_t i = 0; i < size; i++) {
1123 if (HeadFrameNeedReserve() && !forceDelete) {
1124 MEDIA_LOGD("index %{public}zu need reserve.", i);
1125 break;
1126 }
1127 if (circularBuffer_.empty()) {
1128 return;
1129 }
1130 DataSpec::Ptr retBuff = circularBuffer_.front();
1131 MEDIA_LOGD("BufferDispatcher pop out headtype %{public}d.", retBuff->mediaData->mediaType);
1132 retBuff->mediaData->mediaType == MEDIA_TYPE_AUDIO ? audioFrameCnt_-- : videoFrameCnt_--;
1133 if (HeadFrameNeedReserve()) {
1134 MEDIA_LOGW("dispatcherId: %{public}u, need reserve but pop, mediaType: "
1135 "%{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64 ".",
1136 GetDispatcherId(), int32_t(retBuff->mediaData->mediaType),
1137 retBuff->mediaData->keyFrame ? "true" : "false",
1138 retBuff->mediaData->pts);
1139 }
1140
1141 MEDIA_LOGW(
1142 "dispatcherId: %{public}u, delete data, mediaType: %{public}d, keyFrame: %{public}s, pts: %{public}" PRIu64
1143 ", reserveFlag: %{public}x.",
1144 GetDispatcherId(), int32_t(circularBuffer_.front()->mediaData->mediaType),
1145 circularBuffer_.front()->mediaData->keyFrame ? "true" : "false", circularBuffer_.front()->mediaData->pts,
1146 circularBuffer_.front()->reserveFlag.load());
1147 circularBuffer_.pop_front();
1148 ReturnIdleBuffer(retBuff);
1149 UpdateIndex();
1150 }
1151
1152 if (circularBuffer_.size() < baseBufferCapacity_ && circularBuffer_.capacity() > baseBufferCapacity_) {
1153 MEDIA_LOGD("capacity return to base %{public}d.", baseBufferCapacity_);
1154 circularBuffer_.set_capacity(baseBufferCapacity_);
1155 }
1156 }
1157
UpdateIndex()1158 void BufferDispatcher::UpdateIndex()
1159 {
1160 MEDIA_LOGD("trace.");
1161 std::lock_guard<std::mutex> locker(notifyMutex_);
1162 if (!keyIndexList_.empty() && keyIndexList_.front() == 0) {
1163 keyIndexList_.pop_front();
1164 MEDIA_LOGD("BufferDispatcher pop out first 0 keyIndex after listsize %{public}zu.", keyIndexList_.size());
1165 }
1166
1167 for (auto &keyIndex : keyIndexList_) {
1168 if (keyIndex > 0) {
1169 keyIndex--;
1170 }
1171 }
1172
1173 for (auto &[recvId, notifier] : notifiers_) {
1174 if (notifier->videoIndex > 0 && notifier->videoIndex != INVALID_INDEX) {
1175 notifier->videoIndex--;
1176 }
1177 if (notifier->audioIndex > 0 && notifier->audioIndex != INVALID_INDEX) {
1178 notifier->audioIndex--;
1179 }
1180 }
1181
1182 if (lastVideoIndex_ > 0 && lastVideoIndex_ != INVALID_INDEX) {
1183 lastVideoIndex_--;
1184 }
1185
1186 if (lastAudioIndex_ > 0 && lastAudioIndex_ != INVALID_INDEX) {
1187 lastAudioIndex_--;
1188 }
1189 }
1190
FindNextDeleteVideoIndex()1191 uint32_t BufferDispatcher::FindNextDeleteVideoIndex()
1192 {
1193 MEDIA_LOGD("trace.");
1194 for (size_t i = 0; i < circularBuffer_.size(); i++) {
1195 if (circularBuffer_[i]->mediaData->mediaType == MEDIA_TYPE_VIDEO) {
1196 return i;
1197 }
1198 }
1199
1200 return 0;
1201 }
1202
FindLastIndex(MediaType type)1203 uint32_t BufferDispatcher::FindLastIndex(MediaType type)
1204 {
1205 SHARING_LOGD("trace.");
1206 if (circularBuffer_.empty()) {
1207 return INVALID_INDEX;
1208 }
1209
1210 return type == MEDIA_TYPE_AUDIO ? lastAudioIndex_ : lastVideoIndex_;
1211 }
1212
UpdateReceiverReadIndex(uint32_t receiverId,const uint32_t readIndex,MediaType type)1213 void BufferDispatcher::UpdateReceiverReadIndex(uint32_t receiverId, const uint32_t readIndex, MediaType type)
1214 {
1215 MEDIA_LOGD("trace.");
1216 uint32_t nextIndex = FindNextIndex(readIndex, type, receiverId);
1217 bool noAvaliableData = false;
1218 if (nextIndex == readIndex) {
1219 noAvaliableData = true;
1220 }
1221
1222 auto notifier = GetNotifierByReceiverId(receiverId);
1223 if (notifier == nullptr) {
1224 SHARING_LOGE("notifier is nullptr.");
1225 return;
1226 }
1227
1228 bool readOver = circularBuffer_.size() - readIndex < 3;
1229 if (readOver && notifier->NeedAcceleration() && type == MEDIA_TYPE_VIDEO) {
1230 SHARING_LOGD("BufferDispatcher SendAccelerationDone.");
1231 notifier->SendAccelerationDone();
1232 }
1233
1234 notifier->SetNeedUpdate(noAvaliableData, type);
1235
1236 if (type == MEDIA_TYPE_VIDEO) {
1237 notifier->videoIndex = nextIndex;
1238 } else if (type == MEDIA_TYPE_AUDIO) {
1239 notifier->audioIndex = nextIndex;
1240 } else {
1241 notifier->videoIndex = nextIndex;
1242 notifier->audioIndex = nextIndex;
1243 }
1244
1245 MEDIA_LOGD("After UpdateReceiverReadIndex type %{public}d, aindex %{public}d, vindex %{public}d.", type,
1246 notifier->audioIndex, notifier->videoIndex);
1247 }
1248
FindNextIndex(uint32_t index,MediaType type)1249 uint32_t BufferDispatcher::FindNextIndex(uint32_t index, MediaType type)
1250 {
1251 MEDIA_LOGD("trace.");
1252 if (index + 1 >= circularBuffer_.size() || index == INVALID_INDEX) {
1253 return index;
1254 }
1255
1256 if (type == MEDIA_TYPE_AV) {
1257 return index + 1;
1258 }
1259
1260 for (size_t i = index + 1; i < circularBuffer_.size(); i++) {
1261 if (circularBuffer_[i] && circularBuffer_[i]->mediaData && circularBuffer_[i]->mediaData->mediaType == type) {
1262 if (keyOnly_ && type == MEDIA_TYPE_VIDEO && !IsKeyVideoFrame(circularBuffer_[i])) {
1263 continue;
1264 } else {
1265 return i;
1266 }
1267 }
1268 }
1269
1270 return index;
1271 }
1272
FindNextIndex(uint32_t index,MediaType type,uint32_t receiverId)1273 uint32_t BufferDispatcher::FindNextIndex(uint32_t index, MediaType type, uint32_t receiverId)
1274 {
1275 MEDIA_LOGD("trace.");
1276 if (index + 1 >= circularBuffer_.size() || index == INVALID_INDEX) {
1277 return index;
1278 }
1279
1280 if (type == MEDIA_TYPE_AV) {
1281 return index + 1;
1282 }
1283
1284 auto notifier = GetNotifierByReceiverId(receiverId);
1285 if (notifier == nullptr) {
1286 SHARING_LOGE("FindNextIndex GetNotifier nullptr.");
1287 return INVALID_INDEX;
1288 }
1289
1290 bool keyModeReceiver = notifier->IsKeyModeReceiver();
1291 for (size_t i = index + 1; i < circularBuffer_.size(); i++) {
1292 if (circularBuffer_[i] && circularBuffer_[i]->mediaData && circularBuffer_[i]->mediaData->mediaType == type) {
1293 if ((keyOnly_ || keyModeReceiver) && type == MEDIA_TYPE_VIDEO) {
1294 if (!IsKeyVideoFrame(circularBuffer_[i])) {
1295 continue;
1296 } else {
1297 for (size_t bIndex = index + 1; bIndex < i; bIndex++) {
1298 SetReceiverReadFlag(receiverId, circularBuffer_[bIndex]);
1299 }
1300 return i;
1301 }
1302 } else {
1303 return i;
1304 }
1305 }
1306 }
1307
1308 return index;
1309 }
1310
ResetAllIndex()1311 void BufferDispatcher::ResetAllIndex()
1312 {
1313 SHARING_LOGD("trace.");
1314 std::lock_guard<std::mutex> locker(notifyMutex_);
1315 keyIndexList_.clear();
1316 for (auto &[recvId, notifier] : notifiers_) {
1317 notifier->videoIndex = INVALID_INDEX;
1318 notifier->audioIndex = INVALID_INDEX;
1319 }
1320
1321 videoNeedActivate_ = true;
1322 audioNeedActivate_ = true;
1323 lastAudioIndex_ = INVALID_INDEX;
1324 lastVideoIndex_ = INVALID_INDEX;
1325 }
1326
IsDataReaded(uint32_t receiverId,DataSpec::Ptr & dataSpec)1327 bool BufferDispatcher::IsDataReaded(uint32_t receiverId, DataSpec::Ptr &dataSpec)
1328 {
1329 MEDIA_LOGD("trace.");
1330 uint32_t index = FindReceiverIndex(receiverId);
1331 if (index == INVALID_INDEX) {
1332 return false;
1333 }
1334
1335 return dataSpec->reserveFlag & RECV_FLAG_BASE << index;
1336 }
1337
IsVideoData(const DataSpec::Ptr & dataSpec)1338 bool BufferDispatcher::IsVideoData(const DataSpec::Ptr &dataSpec)
1339 {
1340 MEDIA_LOGD("trace.");
1341 if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1342 SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1343 return false;
1344 }
1345
1346 return dataSpec->mediaData->mediaType == MEDIA_TYPE_VIDEO;
1347 }
1348
IsAudioData(const DataSpec::Ptr & dataSpec)1349 bool BufferDispatcher::IsAudioData(const DataSpec::Ptr &dataSpec)
1350 {
1351 MEDIA_LOGD("trace.");
1352 if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1353 SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1354 return false;
1355 }
1356
1357 return dataSpec->mediaData->mediaType == MEDIA_TYPE_AUDIO;
1358 }
1359
IsKeyVideoFrame(const DataSpec::Ptr & dataSpec)1360 bool BufferDispatcher::IsKeyVideoFrame(const DataSpec::Ptr &dataSpec)
1361 {
1362 MEDIA_LOGD("trace.");
1363 if (dataSpec == nullptr || dataSpec->mediaData == nullptr) {
1364 SHARING_LOGE("BufferDispatcher EMPTY dataSpec OR mediadata.");
1365 return false;
1366 }
1367
1368 return IsVideoData(dataSpec) && dataSpec->mediaData->keyFrame;
1369 }
1370
HeadFrameNeedReserve()1371 bool BufferDispatcher::HeadFrameNeedReserve()
1372 {
1373 MEDIA_LOGD("trace.");
1374 if (!circularBuffer_.empty()) {
1375 uint8_t temp = readRefFlag_;
1376 MEDIA_LOGD("IsHeadFrameNeedReserve Head reserveFlag %{public}d readRefFlag_ %{public}d.",
1377 circularBuffer_.front()->reserveFlag.load(), readRefFlag_);
1378 return temp ^ circularBuffer_.front()->reserveFlag;
1379 }
1380
1381 return false;
1382 }
1383
NeedExtendToDBCapacity()1384 bool BufferDispatcher::NeedExtendToDBCapacity()
1385 {
1386 MEDIA_LOGD("trace.");
1387 std::shared_lock<std::shared_mutex> locker(bufferMutex_);
1388 return (circularBuffer_.size() >= circularBuffer_.capacity() &&
1389 circularBuffer_.capacity() < doubleBufferCapacity_ && HeadFrameNeedReserve());
1390 }
1391
NeedRestoreToNormalCapacity()1392 bool BufferDispatcher::NeedRestoreToNormalCapacity()
1393 {
1394 MEDIA_LOGD("trace.");
1395 std::shared_lock<std::shared_mutex> locker(bufferMutex_);
1396 return audioFrameCnt_ >= circularBuffer_.capacity() && circularBuffer_.capacity() != INITIAL_BUFFER_CAPACITY;
1397 }
1398
ReCalculateCapacity(bool keyFrame)1399 void BufferDispatcher::ReCalculateCapacity(bool keyFrame)
1400 {
1401 MEDIA_LOGD("trace.");
1402 baseCounter_++;
1403 if (baseCounter_ >= maxBufferCapacity_) {
1404 SHARING_LOGE("BufferDispatcher too many Audiodata need Set Capacity to default.");
1405 }
1406
1407 if (baseCounter_ >= circularBuffer_.capacity() && !keyFrame) {
1408 uint32_t tmpSize = circularBuffer_.capacity() + bufferCapacityIncrement_ < maxBufferCapacity_
1409 ? circularBuffer_.capacity() + bufferCapacityIncrement_
1410 : maxBufferCapacity_;
1411 doubleBufferCapacity_ = tmpSize * 2 < maxBufferCapacity_ ? tmpSize * 2 : maxBufferCapacity_; // 2: increasement
1412 SHARING_LOGD("BufferDispatcher buffer Extended to %{public}d in adaptive capacity calculating.", tmpSize);
1413 SetBufferCapacity(tmpSize);
1414 return;
1415 }
1416
1417 if (keyFrame) {
1418 baseBufferCapacity_ = baseCounter_ + bufferCapacityIncrement_ < maxBufferCapacity_
1419 ? baseCounter_ + bufferCapacityIncrement_
1420 : maxBufferCapacity_;
1421 if (baseBufferCapacity_ < INITIAL_BUFFER_CAPACITY) {
1422 baseBufferCapacity_ = INITIAL_BUFFER_CAPACITY;
1423 }
1424 doubleBufferCapacity_ = baseBufferCapacity_ * 2 < maxBufferCapacity_ // 2: increasement
1425 ? baseBufferCapacity_ * 2 // 2: increasement
1426 : maxBufferCapacity_;
1427 gop_ = baseCounter_;
1428 capacityEvaluating_ = gop_ > 0 ? false : true;
1429 SetBufferCapacity(baseBufferCapacity_);
1430 baseCounter_ = 0;
1431 SHARING_LOGD(
1432 "The gop is %{public}d and BufferDispatcher buffer Extended to %{public}d on base capacity confirmed.",
1433 GetCurrentGop(), baseBufferCapacity_);
1434 }
1435 }
1436
NotifyThreadWorker(void * userParam)1437 int32_t BufferDispatcher::NotifyThreadWorker(void *userParam)
1438 {
1439 SHARING_LOGI("BufferDispatcher DataNotifier thread in.");
1440 BufferDispatcher *dispatcher = (BufferDispatcher *)userParam;
1441 while (dispatcher->running_) {
1442 std::unique_lock<std::mutex> locker(dispatcher->notifyMutex_);
1443 uint32_t notifyRef = dispatcher->dataBitRef_ & dispatcher->recvBitRef_;
1444 MEDIA_LOGD("DataBitRef %{public}u recvBitRef_ %{public}d notifyRef_ %{public}d.",
1445 dispatcher->dataBitRef_.load(), dispatcher->recvBitRef_.load(), notifyRef);
1446
1447 for (auto &[recvId, notifier] : dispatcher->notifiers_) {
1448 auto index = notifier->GetReadIndex();
1449 if (((RECV_FLAG_BASE << (index * 2)) & notifyRef) || // 2: fix offset, get data type
1450 ((RECV_FLAG_BASE << (index * 2 + 1)) & notifyRef)) { // 2: fix offset, get data type
1451 MediaType notifyType;
1452 if (notifier->IsMixedReceiver()) {
1453 notifyType = MEDIA_TYPE_AV;
1454 } else {
1455 notifyType = (((RECV_FLAG_BASE << (index * 2)) & notifyRef) ? // 2: fix offset, get data type
1456 MEDIA_TYPE_AUDIO : MEDIA_TYPE_VIDEO); // 2: fix offset, get data type
1457 }
1458 MEDIA_LOGD("notify the receiveId: %{public}d, notifyType: %{public}d, notifyRef: %{public}x.", recvId,
1459 notifyType, notifyRef);
1460 notifier->NotifyDataReceiver(notifyType);
1461 }
1462 }
1463
1464 dispatcher->dataCV_.wait(locker, [&dispatcher]() { return dispatcher->continueNotify_.load(); });
1465 dispatcher->continueNotify_ = false;
1466 }
1467
1468 return 0;
1469 }
1470
NotifyReadReady(uint32_t receiverId,MediaType type)1471 void BufferDispatcher::NotifyReadReady(uint32_t receiverId, MediaType type)
1472 {
1473 MEDIA_LOGD("trace.");
1474 auto notifier = GetNotifierByReceiverId(receiverId);
1475 if (notifier == nullptr) {
1476 SHARING_LOGE("notifier is nullptr.");
1477 return;
1478 }
1479
1480 std::shared_lock<std::shared_mutex> lock(bufferMutex_);
1481 std::unique_lock<std::mutex> locker(notifyMutex_);
1482 if (type == MEDIA_TYPE_AV) {
1483 SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, true);
1484 SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, true);
1485 } else {
1486 SetReceiverReadRef(receiverId, type, true);
1487 }
1488
1489 bool dataAvaliable = notifier->DataAvailable(type);
1490 MEDIA_LOGW("receiverId %{public}d MediaType %{public}d dataAvaliable %{public}d.", receiverId, type,
1491 dataAvaliable);
1492 if (type == MEDIA_TYPE_AV) {
1493 SetReceiverDataRef(receiverId, MEDIA_TYPE_VIDEO, dataAvaliable);
1494 SetReceiverDataRef(receiverId, MEDIA_TYPE_AUDIO, dataAvaliable);
1495 } else {
1496 SetReceiverDataRef(receiverId, type, dataAvaliable);
1497 }
1498
1499 if (!dataAvaliable) {
1500 return;
1501 }
1502
1503 continueNotify_ = true;
1504 dataCV_.notify_one();
1505 }
1506
SetDataRef(uint32_t bitref)1507 void BufferDispatcher::SetDataRef(uint32_t bitref)
1508 {
1509 SHARING_LOGD("trace.");
1510 dataBitRef_ &= bitref;
1511 }
1512
GetDataRef()1513 uint32_t BufferDispatcher::GetDataRef()
1514 {
1515 SHARING_LOGD("trace.");
1516 return dataBitRef_;
1517 }
1518
SetReadRef(uint32_t bitref)1519 void BufferDispatcher::SetReadRef(uint32_t bitref)
1520 {
1521 SHARING_LOGD("trace.");
1522 recvBitRef_ &= bitref;
1523 }
1524
GetReadRef()1525 uint32_t BufferDispatcher::GetReadRef()
1526 {
1527 SHARING_LOGD("trace.");
1528 return recvBitRef_;
1529 }
1530
ActiveDataRef(MediaType type,bool keyFrame)1531 void BufferDispatcher::ActiveDataRef(MediaType type, bool keyFrame)
1532 {
1533 MEDIA_LOGD("trace.");
1534 std::unique_lock<std::mutex> locker(notifyMutex_);
1535 uint32_t bitRef = 0x0000;
1536 for (auto &[recvId, notifier] : notifiers_) {
1537 auto index = notifier->GetReadIndex();
1538 if (type == MEDIA_TYPE_AUDIO) {
1539 bitRef |= RECV_FLAG_BASE << (index * 2); // 2: fix offset, get audio notifyer id
1540 continue;
1541 }
1542 bool keyModeReceiver = false;
1543 keyModeReceiver = notifier->IsKeyModeReceiver();
1544 if (keyFrame && keyModeReceiver && keyIndexList_.empty()) {
1545 notifier->videoIndex = circularBuffer_.size() - 1;
1546 }
1547 if ((!keyModeReceiver || (keyModeReceiver && keyFrame))) {
1548 if (index != INVALID_INDEX) {
1549 bitRef |= RECV_FLAG_BASE << (index * 2 + 1); // 2: fix offset, get video notifyer id
1550 }
1551 }
1552 }
1553
1554 dataBitRef_ |= bitRef;
1555 }
1556
SetReceiverDataRef(uint32_t receiverId,MediaType type,bool ready)1557 void BufferDispatcher::SetReceiverDataRef(uint32_t receiverId, MediaType type, bool ready)
1558 {
1559 MEDIA_LOGD("trace.");
1560 uint32_t index = FindReceiverIndex(receiverId);
1561 if (index == INVALID_INDEX) {
1562 return;
1563 }
1564
1565 if (type == MEDIA_TYPE_AUDIO) {
1566 uint32_t audioBit = index * 2;
1567 uint32_t bitRef = RECV_FLAG_BASE << audioBit;
1568 MEDIA_LOGD("Audio recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1569 if (!ready) {
1570 bitRef = ~bitRef;
1571 dataBitRef_ &= bitRef;
1572 } else {
1573 dataBitRef_ |= bitRef;
1574 }
1575 } else if (type == MEDIA_TYPE_VIDEO) {
1576 uint32_t videoBit = index * 2 + 1;
1577 uint32_t bitRef = RECV_FLAG_BASE << videoBit;
1578 MEDIA_LOGD("Video recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1579 if (!ready) {
1580 bitRef = ~bitRef;
1581 dataBitRef_ &= bitRef;
1582 } else {
1583 dataBitRef_ |= bitRef;
1584 }
1585 }
1586 }
1587
SetReceiverReadRef(uint32_t receiverId,MediaType type,bool ready)1588 void BufferDispatcher::SetReceiverReadRef(uint32_t receiverId, MediaType type, bool ready)
1589 {
1590 MEDIA_LOGD("trace.");
1591 uint32_t index = FindReceiverIndex(receiverId);
1592 if (index == INVALID_INDEX) {
1593 return;
1594 }
1595
1596 if (type == MEDIA_TYPE_AUDIO) {
1597 uint32_t audioBit = index * 2;
1598 uint32_t bitRef = RECV_FLAG_BASE << audioBit;
1599 MEDIA_LOGD("Audio recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1600
1601 if (!ready) {
1602 bitRef = ~bitRef;
1603 recvBitRef_ &= bitRef;
1604 } else {
1605 recvBitRef_ |= bitRef;
1606 }
1607 } else if (type == MEDIA_TYPE_VIDEO) {
1608 uint32_t videoBit = index * 2 + 1;
1609 uint32_t bitRef = RECV_FLAG_BASE << videoBit;
1610 MEDIA_LOGD("Video recvid %{public}d ,bitRef %{public}d ready %{public}d.", receiverId, bitRef, ready);
1611
1612 if (!ready) {
1613 bitRef = ~bitRef;
1614 recvBitRef_ &= bitRef;
1615 } else {
1616 recvBitRef_ |= bitRef;
1617 }
1618 }
1619 }
1620
/**
 * Placeholder: currently ignores `receiverId` and always returns 0.
 *
 * NOTE(review): the sibling GetReceiverReadRef() masks recvBitRef_ with the receiver's bits;
 * this function does no equivalent for dataBitRef_ - confirm whether that is intentional.
 *
 * @param receiverId unused.
 * @return always 0.
 */
uint32_t BufferDispatcher::GetReceiverDataRef(uint32_t receiverId)
{
    SHARING_LOGD("trace.");
    return 0;
}
1626
GetReceiverReadRef(uint32_t receiverId)1627 uint32_t BufferDispatcher::GetReceiverReadRef(uint32_t receiverId)
1628 {
1629 SHARING_LOGD("trace.");
1630 uint32_t retBitRef = GetReceiverIndexRef(receiverId);
1631 retBitRef &= recvBitRef_;
1632 return retBitRef;
1633 }
1634
GetReceiverIndexRef(uint32_t receiverId)1635 uint32_t BufferDispatcher::GetReceiverIndexRef(uint32_t receiverId)
1636 {
1637 SHARING_LOGD("trace.");
1638 uint32_t index = FindReceiverIndex(receiverId);
1639 uint32_t audioBit = index * 2;
1640 uint32_t videoBit = index * 2 + 1;
1641 uint32_t retBitRef = 0x0000;
1642 retBitRef |= RECV_FLAG_BASE << audioBit;
1643 retBitRef |= RECV_FLAG_BASE << videoBit;
1644 return retBitRef;
1645 }
1646
ClearReadBit(uint32_t receiverId,MediaType type)1647 void BufferDispatcher::ClearReadBit(uint32_t receiverId, MediaType type)
1648 {
1649 MEDIA_LOGD("trace.");
1650 std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1651 if (type != MEDIA_TYPE_AV) {
1652 SetReceiverReadRef(receiverId, type, false);
1653 } else {
1654 SetReceiverReadRef(receiverId, MEDIA_TYPE_VIDEO, false);
1655 SetReceiverReadRef(receiverId, MEDIA_TYPE_AUDIO, false);
1656 }
1657 }
1658
ClearDataBit(uint32_t receiverId,MediaType type)1659 void BufferDispatcher::ClearDataBit(uint32_t receiverId, MediaType type)
1660 {
1661 MEDIA_LOGD("trace.");
1662 std::lock_guard<std::mutex> indexLocker(notifyMutex_);
1663 if (type != MEDIA_TYPE_AV) {
1664 SetReceiverDataRef(receiverId, type, false);
1665 } else {
1666 SetReceiverDataRef(receiverId, MEDIA_TYPE_VIDEO, false);
1667 SetReceiverDataRef(receiverId, MEDIA_TYPE_AUDIO, false);
1668 }
1669 }
1670
IsRead(uint32_t receiverId,uint32_t index)1671 bool BufferDispatcher::IsRead(uint32_t receiverId, uint32_t index)
1672 {
1673 MEDIA_LOGD("trace.");
1674 if (index >= circularBuffer_.size()) {
1675 return true;
1676 } else {
1677 return IsDataReaded(receiverId, circularBuffer_.at(index));
1678 }
1679 }
1680
ActivateReceiverIndex(uint32_t index,MediaType type)1681 void BufferDispatcher::ActivateReceiverIndex(uint32_t index, MediaType type)
1682 {
1683 MEDIA_LOGD("trace.");
1684 std::unique_lock<std::mutex> lock(notifyMutex_);
1685 for (auto &[recvId, notifier] : notifiers_) {
1686 if (type == MEDIA_TYPE_VIDEO) {
1687 if (notifier->videoIndex == INVALID_INDEX) {
1688 notifier->videoIndex = index;
1689 SHARING_LOGD("RecvId %{public}d Activate %{public}d.", notifier->GetReceiverId(), notifier->videoIndex);
1690 videoNeedActivate_ = false;
1691 }
1692 } else {
1693 if (notifier->audioIndex == INVALID_INDEX) {
1694 notifier->audioIndex = index;
1695 SHARING_LOGD("RecvId %{public}d Activate %{public}d.", notifier->GetReceiverId(), notifier->audioIndex);
1696 audioNeedActivate_ = false;
1697 }
1698 }
1699 }
1700 }
/**
 * Placeholder: only emits a trace log; `type` is unused.
 */
void BufferDispatcher::UnlockWaitingReceiverIndex(MediaType type)
{
    SHARING_LOGD("trace.");
}
1705
SetReceiverReadFlag(uint32_t receiverId,DataSpec::Ptr & dataSpec)1706 void BufferDispatcher::SetReceiverReadFlag(uint32_t receiverId, DataSpec::Ptr &dataSpec)
1707 {
1708 MEDIA_LOGD("trace.");
1709 uint32_t index = FindReceiverIndex(receiverId);
1710 if (index != INVALID_INDEX) {
1711 dataSpec->reserveFlag |= RECV_FLAG_BASE << index;
1712 MEDIA_LOGW("mediaType: %{public}d, pts: %{public}" PRIu64
1713 ", reserveFlag: %{public}x, receiverId: %{public}d, index: %{public}d.",
1714 dataSpec->mediaData->mediaType, dataSpec->mediaData->pts, dataSpec->reserveFlag.load(), receiverId,
1715 index);
1716 }
1717 }
1718
CancelReserve()1719 void BufferDispatcher::CancelReserve()
1720 {
1721 SHARING_LOGD("trace.");
1722 for (auto &data : circularBuffer_) {
1723 data->reserveFlag = 0xff;
1724 }
1725 }
1726
/**
 * Stores the given buffer in spsBuf_, replacing any previous value.
 *
 * @param spsbuf buffer to keep; returned later by GetSPS().
 */
void BufferDispatcher::SetSpsNalu(MediaData::Ptr spsbuf)
{
    SHARING_LOGD("trace.");
    spsBuf_ = spsbuf;
}
1732
/**
 * @return the buffer last stored in spsBuf_ (see SetSpsNalu()).
 */
const MediaData::Ptr BufferDispatcher::GetSPS()
{
    MEDIA_LOGD("trace.");
    return spsBuf_;
}
1738
/**
 * Stores the given buffer in ppsBuf_, replacing any previous value.
 *
 * @param ppsbuf buffer to keep; returned later by GetPPS().
 */
void BufferDispatcher::SetPpsNalu(MediaData::Ptr ppsbuf)
{
    SHARING_LOGD("trace.");
    ppsBuf_ = ppsbuf;
}
1744
/**
 * @return the buffer last stored in ppsBuf_ (see SetPpsNalu()).
 */
const MediaData::Ptr BufferDispatcher::GetPPS()
{
    MEDIA_LOGD("trace.");
    return ppsBuf_;
}
1750
/**
 * @return gop_, the frame count that ReCalculateCapacity() recorded at the last key frame.
 */
uint32_t BufferDispatcher::GetCurrentGop()
{
    SHARING_LOGD("trace.");
    return gop_;
}
1756
OnKeyRedirect()1757 void BufferDispatcher::OnKeyRedirect()
1758 {
1759 SHARING_LOGD("trace.");
1760 std::unique_lock<std::mutex> lock(notifyMutex_);
1761 if (keyIndexList_.empty()) {
1762 return;
1763 }
1764
1765 auto nextIndex = keyIndexList_.back();
1766
1767 for (auto &[recvId, notifier] : notifiers_) {
1768 if (notifier->IsKeyRedirectReceiver()) {
1769 SHARING_LOGD("receiverId: %{public}u, videoIndex: %{public}d, nextIndex: %{public}d.",
1770 notifier->GetReceiverId(), notifier->videoIndex, nextIndex);
1771 auto curIndex = notifier->videoIndex;
1772 notifier->videoIndex = nextIndex;
1773 for (auto i = curIndex; i < nextIndex; i++) {
1774 SetReceiverReadFlag(notifier->GetReceiverId(), circularBuffer_[i]);
1775 }
1776 if (!rapidMode_) {
1777 auto receiver = notifier->GetBufferReceiver();
1778 if (receiver != nullptr) {
1779 receiver->EnableKeyRedirect(false);
1780 }
1781 }
1782 }
1783 }
1784 }
1785
/**
 * Sets rapidMode_. While it is set, OnKeyRedirect() leaves key redirect enabled on the
 * receivers it repositions.
 *
 * @param enable new value of rapidMode_.
 */
void BufferDispatcher::EnableRapidMode(bool enable)
{
    SHARING_LOGD("trace.");
    rapidMode_ = enable;
}
1791
1792 } // namespace Sharing
1793 } // namespace OHOS
1794