1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #include "ringbuffer.h" 17 18 #include <memory> 19 #include <sys/uio.h> 20 #include <securec.h> 21 #include <cstring> 22 #include <iostream> RingBuffer(const std::size_t bufSize,const enum MemAlignShift shift)23 RingBuffer::RingBuffer(const std::size_t bufSize, const enum MemAlignShift shift) 24 : bufSize_ {bufSize}, 25 alignShift_ {shift} 26 { 27 if (bufSize_ <= DEFAULT_SIZE) { 28 bufSize_ = DEFAULT_SIZE; 29 } 30 switch (shift) { 31 case B_ALIGN_SHIFT: { 32 bufSize_ = (bufSize_ >> B_ALIGN_SHIFT); 33 buffer_ = new(std::nothrow) char[bufSize_]; 34 bufSize_ = (bufSize_ << B_ALIGN_SHIFT); 35 break; 36 } 37 case H_ALIGN_SHIFT: { 38 bufSize_ = (bufSize_ >> H_ALIGN_SHIFT); 39 uint16_t *temp = new(std::nothrow) uint16_t[bufSize_]; 40 buffer_ = reinterpret_cast<char *>(temp); 41 bufSize_ = (bufSize_ << H_ALIGN_SHIFT); 42 break; 43 } 44 case W_ALIGN_SHIFT: { 45 bufSize_ = (bufSize_ >> W_ALIGN_SHIFT); 46 uint32_t *temp = new(std::nothrow) uint32_t[bufSize_]; 47 buffer_ = reinterpret_cast<char *>(temp); 48 bufSize_ = (bufSize_ << W_ALIGN_SHIFT); 49 break; 50 } 51 case D_ALIGN_SHIFT: { 52 bufSize_ = (bufSize_ >> D_ALIGN_SHIFT); 53 uint64_t *temp = new(std::nothrow) uint64_t[bufSize_]; 54 buffer_ = reinterpret_cast<char *>(temp); 55 bufSize_ = (bufSize_ << D_ALIGN_SHIFT); 56 break; 57 } 58 } 59 } 60 Read(const int fd,const std::size_t len)61 ssize_t RingBuffer::Read(const int fd, const std::size_t len) 62 { 63 if (fd < 0) { 64 return -1; 65 } 66 if (len == 0) { 67 return 0; 68 } 69 constexpr std::size_t numDests {2}; 70 struct iovec destBufs[numDests]; 71 // resize if free space is not big enough 72 std::lock_guard<std::mutex> lk {mtx_}; 73 while (len >= FreeSize()) { 74 // the equal sign makes sure the buffer will not be fully filled 75 if (Resize() != 0) { 76 return -1; 77 } 78 } 79 // now we have enough free space to read in from fd 80 destBufs[0].iov_base = buffer_ + tail_; 81 if (tail_ + len < bufSize_) { 82 // continuous free space 83 destBufs[0].iov_len = len; 84 destBufs[1].iov_base = buffer_ + tail_ + len; 85 destBufs[1].iov_len = 0; 86 } else { 87 // free space splitted 88 destBufs[0].iov_len = bufSize_ - tail_; 89 destBufs[1].iov_base = buffer_; 90 destBufs[1].iov_len = len + tail_ - bufSize_; 91 } 92 ssize_t ret = readv(fd, destBufs, numDests); 93 if (ret != -1) { 94 // update buffer status 95 tail_ += static_cast<std::size_t>(ret); 96 while (tail_ >= bufSize_) { 97 tail_ -= bufSize_; 98 } 99 } 100 return ret; 101 } 102 Write(const int fd,std::size_t len)103 ssize_t RingBuffer::Write(const int fd, std::size_t len) 104 { 105 if (fd < 0) { 106 return -1; 107 } 108 constexpr std::size_t numSrcs {2}; 109 struct iovec srcBufs[numSrcs]; 110 std::lock_guard<std::mutex> lk {mtx_}; 111 std::size_t dataSize = DataSize(); 112 if (dataSize < len) { 113 len = dataSize; 114 } 115 if (len == 0) { 116 return 0; 117 } 118 // now we are sure there is at least 'len' bytes data in the buffer 119 srcBufs[0].iov_base = buffer_ + head_; 120 if (head_ + len > bufSize_) { 121 // data splitted 122 srcBufs[0].iov_len = bufSize_ - head_; 123 srcBufs[1].iov_base = buffer_; 124 srcBufs[1].iov_len = len + head_- bufSize_; 125 } else { 126 // continuous data 127 srcBufs[0].iov_len = len; 128 srcBufs[1].iov_base = buffer_ + head_ + len; 129 srcBufs[1].iov_len = 0; 130 } 131 ssize_t ret = writev(fd, srcBufs, numSrcs); 132 if (ret != -1) { 133 // update buffer status 134 head_ += static_cast<std::size_t>(ret); 135 while (head_ >= bufSize_) { 136 head_ -= bufSize_; 137 } 138 } 139 return ret; 140 } 141 Get(char * dest,const std::size_t len)142 std::size_t RingBuffer::Get(char* dest, const std::size_t len) 143 { 144 if (dest == nullptr) { 145 return 0; 146 } 147 if (len == 0) { 148 return 0; 149 } 150 std::lock_guard<std::mutex> lk {mtx_}; 151 auto dataSize = DataSize(); 152 if (len > dataSize) { 153 return 0; 154 } 155 if (head_ + len > bufSize_) { 156 // data splitted 157 if (memcpy_s(dest, len, buffer_ + head_, bufSize_ - head_) != EOK) { 158 return 0; 159 } 160 if (memcpy_s(dest + bufSize_ - head_, len + head_ - bufSize_, buffer_, len + head_ - bufSize_) != EOK) { 161 return 0; 162 } 163 } else { 164 if (memcpy_s(dest, len, buffer_ + head_, len) != EOK) { 165 return 0; 166 } 167 } 168 // update buffer status 169 head_ += len; 170 while (head_ >= bufSize_) { 171 head_ -= bufSize_; 172 } 173 return len; 174 } 175 Put(const char * str,const std::size_t len)176 int RingBuffer::Put(const char* str, const std::size_t len) 177 { 178 if (str == nullptr) { 179 return -1; 180 } 181 if (len == 0) { 182 return 0; 183 } 184 // resize if free space is not big enough 185 std::lock_guard<std::mutex> lk {mtx_}; 186 while (len >= FreeSize()) { 187 // the equal sign makes sure the buffer will not be fully filled 188 if (Resize() != 0) { 189 return -1; 190 } 191 } 192 if (tail_ + len < bufSize_) { 193 // continuous free space 194 if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str, len) != EOK) { 195 return -1; 196 } 197 } else { 198 // splitted free space 199 if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str, bufSize_ - tail_) != EOK) { 200 return -1; 201 } 202 if (memcpy_s(buffer_, bufSize_, str + bufSize_ - tail_, len + tail_ - bufSize_) != EOK) { 203 return -1; 204 } 205 } 206 // update buffer status 207 tail_ += len; 208 while (tail_ >= bufSize_) { 209 tail_ -= bufSize_; 210 } 211 return len; 212 } 213 Put(const std::string & str)214 int RingBuffer::Put(const std::string& str) 215 { 216 if (str.empty()) { 217 return -1; 218 } 219 std::size_t len = str.length(); 220 if (len == 0) { 221 return 0; 222 } 223 // resize if free space is not big enough 224 std::lock_guard<std::mutex> lk {mtx_}; 225 while (len >= FreeSize()) { 226 // the equal sign makes sure the buffer will not be fully filled 227 if (Resize() != 0) { 228 return -1; 229 } 230 } 231 if (tail_ + len < bufSize_) { 232 // continuous free space 233 if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str.c_str(), len) != EOK) { 234 return -1; 235 } 236 } else { 237 // splitted free space 238 if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str.c_str(), bufSize_ - tail_) != EOK) { 239 return -1; 240 } 241 if (memcpy_s(buffer_, bufSize_, str.c_str() + bufSize_ - tail_, len + tail_ - bufSize_) != EOK) { 242 return -1; 243 } 244 } 245 // update buffer status 246 tail_ += len; 247 while (tail_ >= bufSize_) { 248 tail_ -= bufSize_; 249 } 250 return len; 251 } 252 Allocate(std::size_t bufSize)253 char* RingBuffer::Allocate(std::size_t bufSize) 254 { 255 char *newBuffer {nullptr}; 256 switch (alignShift_) { 257 case B_ALIGN_SHIFT: { 258 bufSize = (bufSize >> B_ALIGN_SHIFT); 259 newBuffer = new(std::nothrow) char[bufSize]; 260 break; 261 } 262 case H_ALIGN_SHIFT: { 263 bufSize = (bufSize >> H_ALIGN_SHIFT); 264 uint16_t *temp = new(std::nothrow) uint16_t[bufSize]; 265 newBuffer = reinterpret_cast<char *>(temp); 266 break; 267 } 268 case W_ALIGN_SHIFT: { 269 bufSize = (bufSize >> W_ALIGN_SHIFT); 270 uint32_t *temp = new(std::nothrow) uint32_t[bufSize]; 271 newBuffer = reinterpret_cast<char *>(temp); 272 break; 273 } 274 case D_ALIGN_SHIFT: { 275 bufSize = (bufSize >> D_ALIGN_SHIFT); 276 uint64_t *temp = new(std::nothrow) uint64_t[bufSize]; 277 newBuffer = reinterpret_cast<char *>(temp); 278 break; 279 } 280 } 281 return newBuffer; 282 } 283 Resize()284 int RingBuffer::Resize() 285 { 286 std::size_t expandedSize {bufSize_ << 1}; 287 char* newBuf = Allocate(expandedSize); 288 if (newBuf == nullptr) { 289 return -1; 290 } 291 // copy data to the new buffer 292 auto dataSize = DataSize(); 293 if (head_ + dataSize > bufSize_) { 294 // data splitted 295 if (memcpy_s(newBuf, expandedSize, buffer_ + head_, bufSize_ - head_) != EOK) { 296 delete[] newBuf; 297 return -1; 298 } 299 if (memcpy_s(newBuf + bufSize_ - head_, 300 expandedSize - (bufSize_ - head_), 301 buffer_, 302 dataSize - (bufSize_ - head_)) != EOK) { 303 delete[] newBuf; 304 return -1; 305 } 306 } else { 307 // continuous data 308 if (memcpy_s(newBuf, expandedSize, buffer_ + head_, dataSize) != EOK) { 309 delete[] newBuf; 310 return -1; 311 } 312 } 313 // update buffer status 314 delete[] buffer_; 315 buffer_ = newBuf; 316 bufSize_ = expandedSize; 317 head_ = 0; 318 tail_ = dataSize; 319 320 return 0; 321 }