1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "ringbuffer.h"
17
18 #include <memory>
19 #include <sys/uio.h>
20 #include <securec.h>
21 #include <cstring>
22 #include <iostream>
RingBuffer(const std::size_t bufSize,const enum MemAlignShift shift)23 RingBuffer::RingBuffer(const std::size_t bufSize, const enum MemAlignShift shift)
24 : bufSize_ {bufSize},
25 alignShift_ {shift}
26 {
27 if (bufSize_ <= DEFAULT_SIZE) {
28 bufSize_ = DEFAULT_SIZE;
29 }
30 switch (shift) {
31 case B_ALIGN_SHIFT: {
32 bufSize_ = (bufSize_ >> B_ALIGN_SHIFT);
33 uint16_t *temp = new(std::nothrow) uint16_t[bufSize_];
34 if (temp != nullptr) {
35 buffer_ = reinterpret_cast<char *>(temp);
36 }
37 bufSize_ = (bufSize_ << B_ALIGN_SHIFT);
38 break;
39 }
40 case H_ALIGN_SHIFT: {
41 bufSize_ = (bufSize_ >> H_ALIGN_SHIFT);
42 uint16_t *temp = new(std::nothrow) uint16_t[bufSize_];
43 if (temp != nullptr) {
44 buffer_ = reinterpret_cast<char *>(temp);
45 }
46 bufSize_ = (bufSize_ << H_ALIGN_SHIFT);
47 break;
48 }
49 case W_ALIGN_SHIFT: {
50 bufSize_ = (bufSize_ >> W_ALIGN_SHIFT);
51 uint32_t *temp = new(std::nothrow) uint32_t[bufSize_];
52 if (temp != nullptr) {
53 buffer_ = reinterpret_cast<char *>(temp);
54 }
55 bufSize_ = (bufSize_ << W_ALIGN_SHIFT);
56 break;
57 }
58 case D_ALIGN_SHIFT: {
59 bufSize_ = (bufSize_ >> D_ALIGN_SHIFT);
60 uint64_t *temp = new(std::nothrow) uint64_t[bufSize_];
61 if (temp != nullptr) {
62 buffer_ = reinterpret_cast<char *>(temp);
63 }
64 bufSize_ = (bufSize_ << D_ALIGN_SHIFT);
65 break;
66 }
67 }
68 }
69
Read(const int fd,const std::size_t len)70 ssize_t RingBuffer::Read(const int fd, const std::size_t len)
71 {
72 if (fd < 0) {
73 return -1;
74 }
75 if (len == 0) {
76 return 0;
77 }
78 constexpr std::size_t numDests {2};
79 struct iovec destBufs[numDests];
80 // resize if free space is not big enough
81 std::lock_guard<std::mutex> lk {mtx_};
82 while (len >= FreeSize()) {
83 // the equal sign makes sure the buffer will not be fully filled
84 if (Resize() != 0) {
85 return -1;
86 }
87 }
88 if (buffer_ == nullptr) {
89 return -1;
90 }
91 // now we have enough free space to read in from fd
92 destBufs[0].iov_base = buffer_ + tail_;
93 if (tail_ + len < bufSize_) {
94 // continuous free space
95 destBufs[0].iov_len = len;
96 destBufs[1].iov_base = buffer_ + tail_ + len;
97 destBufs[1].iov_len = 0;
98 } else {
99 // free space splitted
100 destBufs[0].iov_len = bufSize_ - tail_;
101 destBufs[1].iov_base = buffer_;
102 destBufs[1].iov_len = len + tail_ - bufSize_;
103 }
104 ssize_t ret = readv(fd, destBufs, numDests);
105 if (ret != -1) {
106 // update buffer status
107 tail_ += static_cast<std::size_t>(ret);
108 while (tail_ >= bufSize_) {
109 tail_ -= bufSize_;
110 }
111 }
112 return ret;
113 }
114
Write(const int fd,std::size_t len)115 ssize_t RingBuffer::Write(const int fd, std::size_t len)
116 {
117 if (fd < 0) {
118 return -1;
119 }
120 constexpr std::size_t numSrcs {2};
121 struct iovec srcBufs[numSrcs];
122 std::lock_guard<std::mutex> lk {mtx_};
123 std::size_t dataSize = DataSize();
124 if (dataSize < len) {
125 len = dataSize;
126 }
127 if (len == 0) {
128 return 0;
129 }
130 if (buffer_ == nullptr) {
131 return -1;
132 }
133 // now we are sure there is at least 'len' bytes data in the buffer
134 srcBufs[0].iov_base = buffer_ + head_;
135 if (head_ + len > bufSize_) {
136 // data splitted
137 srcBufs[0].iov_len = bufSize_ - head_;
138 srcBufs[1].iov_base = buffer_;
139 srcBufs[1].iov_len = len + head_- bufSize_;
140 } else {
141 // continuous data
142 srcBufs[0].iov_len = len;
143 srcBufs[1].iov_base = buffer_ + head_ + len;
144 srcBufs[1].iov_len = 0;
145 }
146 ssize_t ret = writev(fd, srcBufs, numSrcs);
147 if (ret != -1) {
148 // update buffer status
149 head_ += static_cast<std::size_t>(ret);
150 while (head_ >= bufSize_) {
151 head_ -= bufSize_;
152 }
153 }
154 return ret;
155 }
156
Get(char * dest,const std::size_t len)157 std::size_t RingBuffer::Get(char* dest, const std::size_t len)
158 {
159 if (dest == nullptr) {
160 return 0;
161 }
162 if (len == 0) {
163 return 0;
164 }
165 std::lock_guard<std::mutex> lk {mtx_};
166 auto dataSize = DataSize();
167 if (len > dataSize) {
168 return 0;
169 }
170 if (buffer_ == nullptr) {
171 return -1;
172 }
173 if (head_ + len > bufSize_) {
174 // data splitted
175 if (memcpy_s(dest, len, buffer_ + head_, bufSize_ - head_) != EOK) {
176 return 0;
177 }
178 if (memcpy_s(dest + bufSize_ - head_, len + head_ - bufSize_, buffer_, len + head_ - bufSize_) != EOK) {
179 return 0;
180 }
181 } else {
182 if (memcpy_s(dest, len, buffer_ + head_, len) != EOK) {
183 return 0;
184 }
185 }
186 // update buffer status
187 head_ += len;
188 while (head_ >= bufSize_) {
189 head_ -= bufSize_;
190 }
191 return len;
192 }
193
Put(const char * str,const std::size_t len)194 int RingBuffer::Put(const char* str, const std::size_t len)
195 {
196 if (str == nullptr) {
197 return -1;
198 }
199 if (len == 0) {
200 return 0;
201 }
202 // resize if free space is not big enough
203 std::lock_guard<std::mutex> lk {mtx_};
204 while (len >= FreeSize()) {
205 // the equal sign makes sure the buffer will not be fully filled
206 if (Resize() != 0) {
207 return -1;
208 }
209 }
210 if (buffer_ == nullptr) {
211 return -1;
212 }
213 if (tail_ + len < bufSize_) {
214 // continuous free space
215 if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str, len) != EOK) {
216 return -1;
217 }
218 } else {
219 // splitted free space
220 if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str, bufSize_ - tail_) != EOK) {
221 return -1;
222 }
223 if (memcpy_s(buffer_, bufSize_, str + bufSize_ - tail_, len + tail_ - bufSize_) != EOK) {
224 return -1;
225 }
226 }
227 // update buffer status
228 tail_ += len;
229 while (tail_ >= bufSize_) {
230 tail_ -= bufSize_;
231 }
232 return len;
233 }
234
Put(const std::string & str)235 int RingBuffer::Put(const std::string& str)
236 {
237 if (str.empty()) {
238 return -1;
239 }
240 std::size_t len = str.length();
241 if (len == 0) {
242 return 0;
243 }
244 // resize if free space is not big enough
245 std::lock_guard<std::mutex> lk {mtx_};
246 while (len >= FreeSize()) {
247 // the equal sign makes sure the buffer will not be fully filled
248 if (Resize() != 0) {
249 return -1;
250 }
251 }
252 if (buffer_ == nullptr) {
253 return -1;
254 }
255 if (tail_ + len < bufSize_) {
256 // continuous free space
257 if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str.c_str(), len) != EOK) {
258 return -1;
259 }
260 } else {
261 // splitted free space
262 if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str.c_str(), bufSize_ - tail_) != EOK) {
263 return -1;
264 }
265 if (memcpy_s(buffer_, bufSize_, str.c_str() + bufSize_ - tail_, len + tail_ - bufSize_) != EOK) {
266 return -1;
267 }
268 }
269 // update buffer status
270 tail_ += len;
271 while (tail_ >= bufSize_) {
272 tail_ -= bufSize_;
273 }
274 return len;
275 }
276
Allocate(std::size_t bufSize)277 char* RingBuffer::Allocate(std::size_t bufSize)
278 {
279 char *newBuffer {nullptr};
280 switch (alignShift_) {
281 case B_ALIGN_SHIFT: {
282 bufSize = (bufSize >> B_ALIGN_SHIFT);
283 newBuffer = new(std::nothrow) char[bufSize];
284 break;
285 }
286 case H_ALIGN_SHIFT: {
287 bufSize = (bufSize >> H_ALIGN_SHIFT);
288 uint16_t *temp = new(std::nothrow) uint16_t[bufSize];
289 if (temp != nullptr) {
290 newBuffer = reinterpret_cast<char *>(temp);
291 }
292 break;
293 }
294 case W_ALIGN_SHIFT: {
295 bufSize = (bufSize >> W_ALIGN_SHIFT);
296 uint32_t *temp = new(std::nothrow) uint32_t[bufSize];
297 if (temp != nullptr) {
298 newBuffer = reinterpret_cast<char *>(temp);
299 }
300 break;
301 }
302 case D_ALIGN_SHIFT: {
303 bufSize = (bufSize >> D_ALIGN_SHIFT);
304 uint64_t *temp = new(std::nothrow) uint64_t[bufSize];
305 if (temp != nullptr) {
306 newBuffer = reinterpret_cast<char *>(temp);
307 }
308 break;
309 }
310 }
311 return newBuffer;
312 }
313
Resize()314 int RingBuffer::Resize()
315 {
316 std::size_t expandedSize {bufSize_ << 1};
317 char* newBuf = Allocate(expandedSize);
318 if (newBuf == nullptr) {
319 return -1;
320 }
321 if (buffer_ == nullptr) {
322 return -1;
323 }
324 // copy data to the new buffer
325 auto dataSize = DataSize();
326 if (head_ + dataSize > bufSize_) {
327 // data splitted
328 if (memcpy_s(newBuf, expandedSize, buffer_ + head_, bufSize_ - head_) != EOK) {
329 delete[] newBuf;
330 return -1;
331 }
332 if (memcpy_s(newBuf + bufSize_ - head_,
333 expandedSize - (bufSize_ - head_),
334 buffer_,
335 dataSize - (bufSize_ - head_)) != EOK) {
336 delete[] newBuf;
337 return -1;
338 }
339 } else {
340 // continuous data
341 if (memcpy_s(newBuf, expandedSize, buffer_ + head_, dataSize) != EOK) {
342 delete[] newBuf;
343 return -1;
344 }
345 }
346 // update buffer status
347 delete[] buffer_;
348 buffer_ = newBuf;
349 bufSize_ = expandedSize;
350 head_ = 0;
351 tail_ = dataSize;
352
353 return 0;
354 }