1 /* 2 * Copyright (c) 2023-2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef HISTREAMER_RING_BUFFER_H 17 #define HISTREAMER_RING_BUFFER_H 18 19 #include <atomic> 20 #include <memory> 21 #include "cpp_ext/memory_ext.h" 22 #include "common/log.h" 23 #include "osal/task/condition_variable.h" 24 #include "osal/task/mutex.h" 25 #include "osal/task/autolock.h" 26 #include "securec.h" 27 28 namespace OHOS { 29 namespace Media { 30 class RingBuffer { 31 public: RingBuffer(size_t bufferSize)32 explicit RingBuffer(size_t bufferSize) : bufferSize_(bufferSize) 33 { 34 } 35 36 ~RingBuffer() = default; 37 Init()38 bool Init() 39 { 40 buffer_ = CppExt::make_unique<uint8_t[]>(bufferSize_); 41 return buffer_ != nullptr; 42 } 43 44 size_t ReadBuffer(void* ptr, size_t readSize, int waitTimes = 0) 45 { 46 AutoLock lck(writeMutex_); 47 if (!isActive_ || !isReadBlockingAllowed_) { 48 return 0; 49 } 50 MEDIA_LOG_D("ReadBuffer in current tail " PUBLIC_LOG_ZU ", head_ " PUBLIC_LOG_ZU, tail_, head_); 51 auto available = tail_ - head_; 52 while (waitTimes > 0 && available == 0) { 53 MEDIA_LOG_DD("ReadBuffer wait , waitTimes is " PUBLIC_LOG_U64, waitTimes); 54 writeCondition_.Wait(lck); 55 if (!isActive_ || !isReadBlockingAllowed_) { 56 return 0; 57 } 58 available = tail_ - head_; 59 waitTimes--; 60 } 61 available = (available > readSize) ? readSize : available; 62 size_t index = head_ % bufferSize_; 63 if (index + available < bufferSize_) { 64 (void)memcpy_s(ptr, available, buffer_.get() + index, available); 65 } else { 66 (void)memcpy_s(ptr, bufferSize_ - index, buffer_.get() + index, bufferSize_ - index); 67 (void)memcpy_s(((uint8_t*)ptr) + (bufferSize_ - index), available - (bufferSize_ - index), buffer_.get(), 68 available - (bufferSize_ - index)); 69 } 70 head_ += available; 71 mediaOffset_ += available; 72 MEDIA_LOG_DD("ReadBuffer finish available is " PUBLIC_LOG_ZU ", mediaOffset_ " PUBLIC_LOG_U64, available, 73 mediaOffset_); 74 writeCondition_.NotifyAll(); 75 MEDIA_LOG_D("ReadBuffer end current tail " PUBLIC_LOG_ZU ", head_ " PUBLIC_LOG_ZU, tail_, head_); 76 return available; 77 } 78 WriteBuffer(void * ptr,size_t writeSize)79 bool WriteBuffer(void* ptr, size_t writeSize) 80 { 81 AutoLock lck(writeMutex_); 82 if (!isActive_) { 83 return false; 84 } 85 MEDIA_LOG_D("WriteBuffer in current tail " PUBLIC_LOG_ZU ", head_ " PUBLIC_LOG_ZU, tail_, head_); 86 while (writeSize + tail_ > head_ + bufferSize_) { 87 MEDIA_LOG_DD("WriteBuffer wait writeSize is " PUBLIC_LOG_U64, writeSize); 88 writeCondition_.Wait(lck); 89 if (!isActive_) { 90 return false; 91 } 92 } 93 size_t index = tail_ % bufferSize_; 94 if (index + writeSize < bufferSize_) { 95 (void)memcpy_s(buffer_.get() + index, writeSize, ptr, writeSize); 96 } else { 97 (void)memcpy_s(buffer_.get() + index, bufferSize_ - index, ptr, bufferSize_ - index); 98 (void)memcpy_s(buffer_.get(), writeSize - (bufferSize_ - index), ((uint8_t*)ptr) + bufferSize_ - index, 99 writeSize - (bufferSize_ - index)); 100 } 101 tail_ += writeSize; 102 writeCondition_.NotifyAll(); 103 MEDIA_LOG_D("WriteBuffer out current tail " PUBLIC_LOG_ZU ", head_ " PUBLIC_LOG_ZU, tail_, head_); 104 return true; 105 } 106 107 void SetActive(bool active, bool cleanData = true) 108 { 109 AutoLock lck(writeMutex_); 110 isActive_ = active; 111 if (!active) { 112 if (cleanData) { 113 head_ = 0; 114 tail_ = 0; 115 } 116 writeCondition_.NotifyAll(); 117 } 118 } 119 SetReadBlocking(bool isReadBlockingAllowed)120 void SetReadBlocking(bool isReadBlockingAllowed) 121 { 122 { 123 AutoLock lck(writeMutex_); 124 isReadBlockingAllowed_ = isReadBlockingAllowed; 125 } 126 writeCondition_.NotifyAll(); 127 } 128 GetSize()129 size_t GetSize() 130 { 131 return (tail_ - head_); 132 } 133 GetFreeSize()134 size_t GetFreeSize() 135 { 136 return bufferSize_ - GetSize(); 137 } 138 GetHead()139 inline size_t GetHead() 140 { 141 return head_; 142 } 143 GetTail()144 inline size_t GetTail() 145 { 146 return tail_; 147 } 148 GetMediaOffset()149 uint64_t GetMediaOffset() 150 { 151 return mediaOffset_; 152 } 153 SetMediaOffset(uint64_t offset)154 void SetMediaOffset(uint64_t offset) 155 { 156 mediaOffset_ = offset; 157 } 158 Clear()159 void Clear() 160 { 161 AutoLock lck(writeMutex_); 162 head_ = 0; 163 tail_ = 0; 164 writeCondition_.NotifyAll(); 165 } 166 SetTail(size_t newTail)167 void SetTail(size_t newTail) 168 { 169 { 170 AutoLock lck(writeMutex_); 171 MEDIA_LOG_I("SetTail: current tail " PUBLIC_LOG_ZU ", to tail " PUBLIC_LOG_ZU, tail_, newTail); 172 if (newTail >= 0 && newTail >= head_) { 173 tail_ = newTail; 174 } 175 } 176 MEDIA_LOG_I("SetTail in current tail " PUBLIC_LOG_ZU ", head_ " PUBLIC_LOG_ZU, tail_, head_); 177 writeCondition_.NotifyAll(); 178 } 179 Seek(uint64_t offset)180 bool Seek(uint64_t offset) 181 { 182 AutoLock lck(writeMutex_); 183 MEDIA_LOG_I("Seek: buffer size " PUBLIC_LOG_ZU ", offset " PUBLIC_LOG_U64 184 ", mediaOffset_ " PUBLIC_LOG_U64, GetSize(), offset, mediaOffset_); 185 bool result = false; 186 // case1: seek forward success without dropping data already downloaded 187 if (offset >= mediaOffset_ && (offset - mediaOffset_ < GetSize())) { 188 head_ += offset - mediaOffset_; 189 mediaOffset_ = offset; 190 result = true; 191 } else if (offset < mediaOffset_ && 192 (mediaOffset_ - offset <= bufferSize_ - GetSize())) { // case2: seek backward 193 size_t minPosition = tail_ > bufferSize_ ? tail_ - bufferSize_ : 0; 194 size_t maxInterval = head_ - minPosition; 195 size_t interval = static_cast<size_t>(mediaOffset_ - offset); 196 // Seek backward success without dropping data already downloaded 197 if (interval <= maxInterval) { 198 MEDIA_LOG_I("Seek backward success, size:" PUBLIC_LOG_ZU ", head:" PUBLIC_LOG_ZU ", tail:" PUBLIC_LOG_ZU 199 ", minPosition:" PUBLIC_LOG_ZU ", maxInterval:" PUBLIC_LOG_ZU ", interval:" PUBLIC_LOG_ZU 200 ", target offset:" PUBLIC_LOG_U64 ", current offset:" PUBLIC_LOG_U64, 201 GetSize(), head_, tail_, minPosition, maxInterval, interval, offset, mediaOffset_); 202 head_ -= interval; 203 mediaOffset_ = offset; 204 result = true; 205 } 206 } 207 writeCondition_.NotifyAll(); 208 return result; 209 } 210 SetHead(size_t newHead)211 bool SetHead(size_t newHead) 212 { 213 bool result = false; 214 { 215 AutoLock lck(writeMutex_); 216 MEDIA_LOG_I("SetHead: current head " PUBLIC_LOG_ZU ", to head " PUBLIC_LOG_ZU, head_, newHead); 217 if (newHead >= head_ && newHead <= tail_) { 218 mediaOffset_ += (newHead - head_); 219 head_ = newHead; 220 result = true; 221 } 222 } 223 MEDIA_LOG_I("SetHead in current tail " PUBLIC_LOG_ZU ", head_ " PUBLIC_LOG_ZU, tail_, head_); 224 writeCondition_.NotifyAll(); 225 return result; 226 } 227 private: 228 static constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "RingBuffer" }; 229 const size_t bufferSize_; 230 std::unique_ptr<uint8_t[]> buffer_; 231 size_t head_ {0}; // head 232 size_t tail_ {0}; // tail 233 Mutex writeMutex_ {}; 234 ConditionVariable writeCondition_ {}; 235 bool isActive_ {true}; 236 uint64_t mediaOffset_ {0}; 237 bool isReadBlockingAllowed_ {true}; 238 }; 239 } // namespace Media 240 } // namespace OHOS 241 242 #endif // HISTREAMER_RING_BUFFER_H