1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "ringbuffer.h"
17
18 #include <memory>
19 #include <sys/uio.h>
20 #include <securec.h>
21 #include <cstring>
22 #include <iostream>
RingBuffer(const std::size_t bufSize,const enum MemAlignShift shift)23 RingBuffer::RingBuffer(const std::size_t bufSize, const enum MemAlignShift shift)
24 : bufSize_ {bufSize},
25 alignShift_ {shift}
26 {
27 if (bufSize_ <= DEFAULT_SIZE) {
28 bufSize_ = DEFAULT_SIZE;
29 }
30 switch (shift) {
31 case B_ALIGN_SHIFT: {
32 bufSize_ = (bufSize_ >> B_ALIGN_SHIFT);
33 buffer_ = new(std::nothrow) char[bufSize_];
34 bufSize_ = (bufSize_ << B_ALIGN_SHIFT);
35 break;
36 }
37 case H_ALIGN_SHIFT: {
38 bufSize_ = (bufSize_ >> H_ALIGN_SHIFT);
39 uint16_t *temp = new(std::nothrow) uint16_t[bufSize_];
40 buffer_ = reinterpret_cast<char *>(temp);
41 bufSize_ = (bufSize_ << H_ALIGN_SHIFT);
42 break;
43 }
44 case W_ALIGN_SHIFT: {
45 bufSize_ = (bufSize_ >> W_ALIGN_SHIFT);
46 uint32_t *temp = new(std::nothrow) uint32_t[bufSize_];
47 buffer_ = reinterpret_cast<char *>(temp);
48 bufSize_ = (bufSize_ << W_ALIGN_SHIFT);
49 break;
50 }
51 case D_ALIGN_SHIFT: {
52 bufSize_ = (bufSize_ >> D_ALIGN_SHIFT);
53 uint64_t *temp = new(std::nothrow) uint64_t[bufSize_];
54 buffer_ = reinterpret_cast<char *>(temp);
55 bufSize_ = (bufSize_ << D_ALIGN_SHIFT);
56 break;
57 }
58 }
59 }
60
Read(const int fd,const std::size_t len)61 ssize_t RingBuffer::Read(const int fd, const std::size_t len)
62 {
63 if (fd < 0) {
64 return -1;
65 }
66 if (len == 0) {
67 return 0;
68 }
69 constexpr std::size_t numDests {2};
70 struct iovec destBufs[numDests];
71 // resize if free space is not big enough
72 std::lock_guard<std::mutex> lk {mtx_};
73 while (len >= FreeSize()) {
74 // the equal sign makes sure the buffer will not be fully filled
75 if (Resize() != 0) {
76 return -1;
77 }
78 }
79 // now we have enough free space to read in from fd
80 destBufs[0].iov_base = buffer_ + tail_;
81 if (tail_ + len < bufSize_) {
82 // continuous free space
83 destBufs[0].iov_len = len;
84 destBufs[1].iov_base = buffer_ + tail_ + len;
85 destBufs[1].iov_len = 0;
86 } else {
87 // free space splitted
88 destBufs[0].iov_len = bufSize_ - tail_;
89 destBufs[1].iov_base = buffer_;
90 destBufs[1].iov_len = len + tail_ - bufSize_;
91 }
92 ssize_t ret = readv(fd, destBufs, numDests);
93 if (ret != -1) {
94 // update buffer status
95 tail_ += static_cast<std::size_t>(ret);
96 while (tail_ >= bufSize_) {
97 tail_ -= bufSize_;
98 }
99 }
100 return ret;
101 }
102
Write(const int fd,std::size_t len)103 ssize_t RingBuffer::Write(const int fd, std::size_t len)
104 {
105 if (fd < 0) {
106 return -1;
107 }
108 constexpr std::size_t numSrcs {2};
109 struct iovec srcBufs[numSrcs];
110 std::lock_guard<std::mutex> lk {mtx_};
111 std::size_t dataSize = DataSize();
112 if (dataSize < len) {
113 len = dataSize;
114 }
115 if (len == 0) {
116 return 0;
117 }
118 // now we are sure there is at least 'len' bytes data in the buffer
119 srcBufs[0].iov_base = buffer_ + head_;
120 if (head_ + len > bufSize_) {
121 // data splitted
122 srcBufs[0].iov_len = bufSize_ - head_;
123 srcBufs[1].iov_base = buffer_;
124 srcBufs[1].iov_len = len + head_- bufSize_;
125 } else {
126 // continuous data
127 srcBufs[0].iov_len = len;
128 srcBufs[1].iov_base = buffer_ + head_ + len;
129 srcBufs[1].iov_len = 0;
130 }
131 ssize_t ret = writev(fd, srcBufs, numSrcs);
132 if (ret != -1) {
133 // update buffer status
134 head_ += static_cast<std::size_t>(ret);
135 while (head_ >= bufSize_) {
136 head_ -= bufSize_;
137 }
138 }
139 return ret;
140 }
141
Get(char * dest,const std::size_t len)142 std::size_t RingBuffer::Get(char* dest, const std::size_t len)
143 {
144 if (dest == nullptr) {
145 return 0;
146 }
147 if (len == 0) {
148 return 0;
149 }
150 std::lock_guard<std::mutex> lk {mtx_};
151 auto dataSize = DataSize();
152 if (len > dataSize) {
153 return 0;
154 }
155 if (head_ + len > bufSize_) {
156 // data splitted
157 if (memcpy_s(dest, len, buffer_ + head_, bufSize_ - head_) != EOK) {
158 return 0;
159 }
160 if (memcpy_s(dest + bufSize_ - head_, len + head_ - bufSize_, buffer_, len + head_ - bufSize_) != EOK) {
161 return 0;
162 }
163 } else {
164 if (memcpy_s(dest, len, buffer_ + head_, len) != EOK) {
165 return 0;
166 }
167 }
168 // update buffer status
169 head_ += len;
170 while (head_ >= bufSize_) {
171 head_ -= bufSize_;
172 }
173 return len;
174 }
175
Put(const char * str,const std::size_t len)176 int RingBuffer::Put(const char* str, const std::size_t len)
177 {
178 if (str == nullptr) {
179 return -1;
180 }
181 if (len == 0) {
182 return 0;
183 }
184 // resize if free space is not big enough
185 std::lock_guard<std::mutex> lk {mtx_};
186 while (len >= FreeSize()) {
187 // the equal sign makes sure the buffer will not be fully filled
188 if (Resize() != 0) {
189 return -1;
190 }
191 }
192 if (tail_ + len < bufSize_) {
193 // continuous free space
194 if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str, len) != EOK) {
195 return -1;
196 }
197 } else {
198 // splitted free space
199 if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str, bufSize_ - tail_) != EOK) {
200 return -1;
201 }
202 if (memcpy_s(buffer_, bufSize_, str + bufSize_ - tail_, len + tail_ - bufSize_) != EOK) {
203 return -1;
204 }
205 }
206 // update buffer status
207 tail_ += len;
208 while (tail_ >= bufSize_) {
209 tail_ -= bufSize_;
210 }
211 return len;
212 }
213
Put(const std::string & str)214 int RingBuffer::Put(const std::string& str)
215 {
216 if (str.empty()) {
217 return -1;
218 }
219 std::size_t len = str.length();
220 if (len == 0) {
221 return 0;
222 }
223 // resize if free space is not big enough
224 std::lock_guard<std::mutex> lk {mtx_};
225 while (len >= FreeSize()) {
226 // the equal sign makes sure the buffer will not be fully filled
227 if (Resize() != 0) {
228 return -1;
229 }
230 }
231 if (tail_ + len < bufSize_) {
232 // continuous free space
233 if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str.c_str(), len) != EOK) {
234 return -1;
235 }
236 } else {
237 // splitted free space
238 if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str.c_str(), bufSize_ - tail_) != EOK) {
239 return -1;
240 }
241 if (memcpy_s(buffer_, bufSize_, str.c_str() + bufSize_ - tail_, len + tail_ - bufSize_) != EOK) {
242 return -1;
243 }
244 }
245 // update buffer status
246 tail_ += len;
247 while (tail_ >= bufSize_) {
248 tail_ -= bufSize_;
249 }
250 return len;
251 }
252
Allocate(std::size_t bufSize)253 char* RingBuffer::Allocate(std::size_t bufSize)
254 {
255 char *newBuffer {nullptr};
256 switch (alignShift_) {
257 case B_ALIGN_SHIFT: {
258 bufSize = (bufSize >> B_ALIGN_SHIFT);
259 newBuffer = new(std::nothrow) char[bufSize];
260 break;
261 }
262 case H_ALIGN_SHIFT: {
263 bufSize = (bufSize >> H_ALIGN_SHIFT);
264 uint16_t *temp = new(std::nothrow) uint16_t[bufSize];
265 newBuffer = reinterpret_cast<char *>(temp);
266 break;
267 }
268 case W_ALIGN_SHIFT: {
269 bufSize = (bufSize >> W_ALIGN_SHIFT);
270 uint32_t *temp = new(std::nothrow) uint32_t[bufSize];
271 newBuffer = reinterpret_cast<char *>(temp);
272 break;
273 }
274 case D_ALIGN_SHIFT: {
275 bufSize = (bufSize >> D_ALIGN_SHIFT);
276 uint64_t *temp = new(std::nothrow) uint64_t[bufSize];
277 newBuffer = reinterpret_cast<char *>(temp);
278 break;
279 }
280 }
281 return newBuffer;
282 }
283
Resize()284 int RingBuffer::Resize()
285 {
286 std::size_t expandedSize {bufSize_ << 1};
287 char* newBuf = Allocate(expandedSize);
288 if (newBuf == nullptr) {
289 return -1;
290 }
291 // copy data to the new buffer
292 auto dataSize = DataSize();
293 if (head_ + dataSize > bufSize_) {
294 // data splitted
295 if (memcpy_s(newBuf, expandedSize, buffer_ + head_, bufSize_ - head_) != EOK) {
296 delete[] newBuf;
297 return -1;
298 }
299 if (memcpy_s(newBuf + bufSize_ - head_,
300 expandedSize - (bufSize_ - head_),
301 buffer_,
302 dataSize - (bufSize_ - head_)) != EOK) {
303 delete[] newBuf;
304 return -1;
305 }
306 } else {
307 // continuous data
308 if (memcpy_s(newBuf, expandedSize, buffer_ + head_, dataSize) != EOK) {
309 delete[] newBuf;
310 return -1;
311 }
312 }
313 // update buffer status
314 delete[] buffer_;
315 buffer_ = newBuf;
316 bufSize_ = expandedSize;
317 head_ = 0;
318 tail_ = dataSize;
319
320 return 0;
321 }