• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1  /*
2   * Copyright (c) 2022 Huawei Device Co., Ltd.
3   * Licensed under the Apache License, Version 2.0 (the "License");
4   * you may not use this file except in compliance with the License.
5   * You may obtain a copy of the License at
6   *
7   *     http://www.apache.org/licenses/LICENSE-2.0
8   *
9   * Unless required by applicable law or agreed to in writing, software
10   * distributed under the License is distributed on an "AS IS" BASIS,
11   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   * See the License for the specific language governing permissions and
13   * limitations under the License.
14   */
15  
16  #include "ringbuffer.h"
17  
18  #include <memory>
19  #include <sys/uio.h>
20  #include <securec.h>
21  #include <cstring>
22  #include <iostream>
RingBuffer(const std::size_t bufSize,const enum MemAlignShift shift)23  RingBuffer::RingBuffer(const std::size_t bufSize, const enum MemAlignShift shift)
24      : bufSize_ {bufSize},
25        alignShift_ {shift}
26  {
27      if (bufSize_ <= DEFAULT_SIZE) {
28          bufSize_ = DEFAULT_SIZE;
29      }
30      switch (shift) {
31          case B_ALIGN_SHIFT: {
32              bufSize_ = (bufSize_ >> B_ALIGN_SHIFT);
33              buffer_ = new(std::nothrow) char[bufSize_];
34              bufSize_ = (bufSize_ << B_ALIGN_SHIFT);
35              break;
36          }
37          case H_ALIGN_SHIFT: {
38              bufSize_ = (bufSize_ >> H_ALIGN_SHIFT);
39              uint16_t *temp = new(std::nothrow) uint16_t[bufSize_];
40              buffer_ = reinterpret_cast<char *>(temp);
41              bufSize_ = (bufSize_ << H_ALIGN_SHIFT);
42              break;
43          }
44          case W_ALIGN_SHIFT: {
45              bufSize_ = (bufSize_ >> W_ALIGN_SHIFT);
46              uint32_t *temp = new(std::nothrow) uint32_t[bufSize_];
47              buffer_ = reinterpret_cast<char *>(temp);
48              bufSize_ = (bufSize_ << W_ALIGN_SHIFT);
49              break;
50          }
51          case D_ALIGN_SHIFT: {
52              bufSize_ = (bufSize_ >> D_ALIGN_SHIFT);
53              uint64_t *temp = new(std::nothrow) uint64_t[bufSize_];
54              buffer_ = reinterpret_cast<char *>(temp);
55              bufSize_ = (bufSize_ << D_ALIGN_SHIFT);
56              break;
57          }
58      }
59  }
60  
Read(const int fd,const std::size_t len)61  ssize_t RingBuffer::Read(const int fd, const std::size_t len)
62  {
63      if (fd < 0) {
64          return -1;
65      }
66      if (len == 0) {
67          return 0;
68      }
69      constexpr std::size_t numDests {2};
70      struct iovec destBufs[numDests];
71      // resize if free space is not big enough
72      std::lock_guard<std::mutex> lk {mtx_};
73      while (len >= FreeSize()) {
74          // the equal sign makes sure the buffer will not be fully filled
75          if (Resize() != 0) {
76              return -1;
77          }
78      }
79      // now we have enough free space to read in from fd
80      destBufs[0].iov_base = buffer_ + tail_;
81      if (tail_ + len < bufSize_) {
82          // continuous free space
83          destBufs[0].iov_len = len;
84          destBufs[1].iov_base = buffer_ + tail_ + len;
85          destBufs[1].iov_len = 0;
86      } else {
87          // free space splitted
88          destBufs[0].iov_len = bufSize_ - tail_;
89          destBufs[1].iov_base = buffer_;
90          destBufs[1].iov_len = len + tail_ - bufSize_;
91      }
92      ssize_t ret = readv(fd, destBufs, numDests);
93      if (ret != -1) {
94          // update buffer status
95          tail_ += static_cast<std::size_t>(ret);
96          while (tail_ >= bufSize_) {
97              tail_ -= bufSize_;
98          }
99      }
100      return ret;
101  }
102  
Write(const int fd,std::size_t len)103  ssize_t RingBuffer::Write(const int fd, std::size_t len)
104  {
105      if (fd < 0) {
106          return -1;
107      }
108      constexpr std::size_t numSrcs {2};
109      struct iovec srcBufs[numSrcs];
110      std::lock_guard<std::mutex> lk {mtx_};
111      std::size_t dataSize = DataSize();
112      if (dataSize < len) {
113          len = dataSize;
114      }
115      if (len == 0) {
116          return 0;
117      }
118      // now we are sure there is at least 'len' bytes data in the buffer
119      srcBufs[0].iov_base = buffer_ + head_;
120      if (head_ + len > bufSize_) {
121          // data splitted
122          srcBufs[0].iov_len = bufSize_ - head_;
123          srcBufs[1].iov_base = buffer_;
124          srcBufs[1].iov_len = len + head_- bufSize_;
125      } else {
126          // continuous data
127          srcBufs[0].iov_len = len;
128          srcBufs[1].iov_base = buffer_ + head_ + len;
129          srcBufs[1].iov_len = 0;
130      }
131      ssize_t ret = writev(fd, srcBufs, numSrcs);
132      if (ret != -1) {
133          // update buffer status
134          head_ += static_cast<std::size_t>(ret);
135          while (head_ >= bufSize_) {
136              head_ -= bufSize_;
137          }
138      }
139      return ret;
140  }
141  
Get(char * dest,const std::size_t len)142  std::size_t RingBuffer::Get(char* dest, const std::size_t len)
143  {
144      if (dest == nullptr) {
145          return 0;
146      }
147      if (len == 0) {
148          return 0;
149      }
150      std::lock_guard<std::mutex> lk {mtx_};
151      auto dataSize = DataSize();
152      if (len > dataSize) {
153          return 0;
154      }
155      if (head_ + len > bufSize_) {
156          // data splitted
157          if (memcpy_s(dest, len, buffer_ + head_, bufSize_ - head_) != EOK) {
158              return 0;
159          }
160          if (memcpy_s(dest + bufSize_ - head_, len + head_ - bufSize_, buffer_, len + head_ - bufSize_) != EOK) {
161              return 0;
162          }
163      } else {
164          if (memcpy_s(dest, len, buffer_ + head_, len) != EOK) {
165              return 0;
166          }
167      }
168      // update buffer status
169      head_ += len;
170      while (head_ >= bufSize_) {
171          head_ -= bufSize_;
172      }
173      return len;
174  }
175  
Put(const char * str,const std::size_t len)176  int RingBuffer::Put(const char* str, const std::size_t len)
177  {
178      if (str == nullptr) {
179          return -1;
180      }
181      if (len == 0) {
182          return 0;
183      }
184      // resize if free space is not big enough
185      std::lock_guard<std::mutex> lk {mtx_};
186      while (len >= FreeSize()) {
187          // the equal sign makes sure the buffer will not be fully filled
188          if (Resize() != 0) {
189              return -1;
190          }
191      }
192      if (tail_ + len < bufSize_) {
193          // continuous free space
194          if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str, len) != EOK) {
195              return -1;
196          }
197      } else {
198          // splitted free space
199          if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str, bufSize_ - tail_) != EOK) {
200              return -1;
201          }
202          if (memcpy_s(buffer_, bufSize_, str + bufSize_ - tail_, len + tail_ - bufSize_) != EOK) {
203              return -1;
204          }
205      }
206      // update buffer status
207      tail_ += len;
208      while (tail_ >= bufSize_) {
209          tail_ -= bufSize_;
210      }
211      return len;
212  }
213  
Put(const std::string & str)214  int RingBuffer::Put(const std::string& str)
215  {
216      if (str.empty()) {
217          return -1;
218      }
219      std::size_t len = str.length();
220      if (len == 0) {
221          return 0;
222      }
223      // resize if free space is not big enough
224      std::lock_guard<std::mutex> lk {mtx_};
225      while (len >= FreeSize()) {
226          // the equal sign makes sure the buffer will not be fully filled
227          if (Resize() != 0) {
228              return -1;
229          }
230      }
231      if (tail_ + len < bufSize_) {
232          // continuous free space
233          if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str.c_str(), len) != EOK) {
234              return -1;
235          }
236      } else {
237          // splitted free space
238          if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str.c_str(), bufSize_ - tail_) != EOK) {
239              return -1;
240          }
241          if (memcpy_s(buffer_, bufSize_, str.c_str() + bufSize_ - tail_, len + tail_ - bufSize_) != EOK) {
242              return -1;
243          }
244      }
245      // update buffer status
246      tail_ += len;
247      while (tail_ >= bufSize_) {
248          tail_ -= bufSize_;
249      }
250      return len;
251  }
252  
Allocate(std::size_t bufSize)253  char* RingBuffer::Allocate(std::size_t bufSize)
254  {
255      char *newBuffer {nullptr};
256      switch (alignShift_) {
257          case B_ALIGN_SHIFT: {
258              bufSize = (bufSize >> B_ALIGN_SHIFT);
259              newBuffer = new(std::nothrow) char[bufSize];
260              break;
261          }
262          case H_ALIGN_SHIFT: {
263              bufSize = (bufSize >> H_ALIGN_SHIFT);
264              uint16_t *temp = new(std::nothrow) uint16_t[bufSize];
265              newBuffer = reinterpret_cast<char *>(temp);
266              break;
267          }
268          case W_ALIGN_SHIFT: {
269              bufSize = (bufSize >> W_ALIGN_SHIFT);
270              uint32_t *temp = new(std::nothrow) uint32_t[bufSize];
271              newBuffer = reinterpret_cast<char *>(temp);
272              break;
273          }
274          case D_ALIGN_SHIFT: {
275              bufSize = (bufSize >> D_ALIGN_SHIFT);
276              uint64_t *temp = new(std::nothrow) uint64_t[bufSize];
277              newBuffer = reinterpret_cast<char *>(temp);
278              break;
279          }
280      }
281      return newBuffer;
282  }
283  
Resize()284  int RingBuffer::Resize()
285  {
286      std::size_t expandedSize {bufSize_ << 1};
287      char* newBuf = Allocate(expandedSize);
288      if (newBuf == nullptr) {
289          return -1;
290      }
291      // copy data to the new buffer
292      auto dataSize = DataSize();
293      if (head_ + dataSize > bufSize_) {
294          // data splitted
295          if (memcpy_s(newBuf, expandedSize, buffer_ + head_, bufSize_ - head_) != EOK) {
296              delete[] newBuf;
297              return -1;
298          }
299          if (memcpy_s(newBuf + bufSize_ - head_,
300                       expandedSize - (bufSize_ - head_),
301                       buffer_,
302                       dataSize - (bufSize_ - head_)) != EOK) {
303              delete[] newBuf;
304              return -1;
305          }
306      } else {
307          // continuous data
308          if (memcpy_s(newBuf, expandedSize, buffer_ + head_, dataSize) != EOK) {
309              delete[] newBuf;
310              return -1;
311          }
312      }
313      // update buffer status
314      delete[] buffer_;
315      buffer_ = newBuf;
316      bufSize_ = expandedSize;
317      head_ = 0;
318      tail_ = dataSize;
319  
320      return 0;
321  }