• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include "ringbuffer.h"
17 
18 #include <memory>
19 #include <sys/uio.h>
20 #include <securec.h>
21 #include <cstring>
22 #include <iostream>
RingBuffer(const std::size_t bufSize,const enum MemAlignShift shift)23 RingBuffer::RingBuffer(const std::size_t bufSize, const enum MemAlignShift shift)
24     : bufSize_ {bufSize},
25       alignShift_ {shift}
26 {
27     if (bufSize_ <= DEFAULT_SIZE) {
28         bufSize_ = DEFAULT_SIZE;
29     }
30     switch (shift) {
31         case B_ALIGN_SHIFT: {
32             bufSize_ = (bufSize_ >> B_ALIGN_SHIFT);
33             uint16_t *temp = new(std::nothrow) uint16_t[bufSize_];
34             if (temp != nullptr) {
35                 buffer_ = reinterpret_cast<char *>(temp);
36             }
37             bufSize_ = (bufSize_ << B_ALIGN_SHIFT);
38             break;
39         }
40         case H_ALIGN_SHIFT: {
41             bufSize_ = (bufSize_ >> H_ALIGN_SHIFT);
42             uint16_t *temp = new(std::nothrow) uint16_t[bufSize_];
43             if (temp != nullptr) {
44                 buffer_ = reinterpret_cast<char *>(temp);
45             }
46             bufSize_ = (bufSize_ << H_ALIGN_SHIFT);
47             break;
48         }
49         case W_ALIGN_SHIFT: {
50             bufSize_ = (bufSize_ >> W_ALIGN_SHIFT);
51             uint32_t *temp = new(std::nothrow) uint32_t[bufSize_];
52             if (temp != nullptr) {
53                 buffer_ = reinterpret_cast<char *>(temp);
54             }
55             bufSize_ = (bufSize_ << W_ALIGN_SHIFT);
56             break;
57         }
58         case D_ALIGN_SHIFT: {
59             bufSize_ = (bufSize_ >> D_ALIGN_SHIFT);
60             uint64_t *temp = new(std::nothrow) uint64_t[bufSize_];
61             if (temp != nullptr) {
62                 buffer_ = reinterpret_cast<char *>(temp);
63             }
64             bufSize_ = (bufSize_ << D_ALIGN_SHIFT);
65             break;
66         }
67     }
68 }
69 
Read(const int fd,const std::size_t len)70 ssize_t RingBuffer::Read(const int fd, const std::size_t len)
71 {
72     if (fd < 0) {
73         return -1;
74     }
75     if (len == 0) {
76         return 0;
77     }
78     constexpr std::size_t numDests {2};
79     struct iovec destBufs[numDests];
80     // resize if free space is not big enough
81     std::lock_guard<std::mutex> lk {mtx_};
82     while (len >= FreeSize()) {
83         // the equal sign makes sure the buffer will not be fully filled
84         if (Resize() != 0) {
85             return -1;
86         }
87     }
88     if (buffer_ == nullptr) {
89         return -1;
90     }
91     // now we have enough free space to read in from fd
92     destBufs[0].iov_base = buffer_ + tail_;
93     if (tail_ + len < bufSize_) {
94         // continuous free space
95         destBufs[0].iov_len = len;
96         destBufs[1].iov_base = buffer_ + tail_ + len;
97         destBufs[1].iov_len = 0;
98     } else {
99         // free space splitted
100         destBufs[0].iov_len = bufSize_ - tail_;
101         destBufs[1].iov_base = buffer_;
102         destBufs[1].iov_len = len + tail_ - bufSize_;
103     }
104     ssize_t ret = readv(fd, destBufs, numDests);
105     if (ret != -1) {
106         // update buffer status
107         tail_ += static_cast<std::size_t>(ret);
108         while (tail_ >= bufSize_) {
109             tail_ -= bufSize_;
110         }
111     }
112     return ret;
113 }
114 
Write(const int fd,std::size_t len)115 ssize_t RingBuffer::Write(const int fd, std::size_t len)
116 {
117     if (fd < 0) {
118         return -1;
119     }
120     constexpr std::size_t numSrcs {2};
121     struct iovec srcBufs[numSrcs];
122     std::lock_guard<std::mutex> lk {mtx_};
123     std::size_t dataSize = DataSize();
124     if (dataSize < len) {
125         len = dataSize;
126     }
127     if (len == 0) {
128         return 0;
129     }
130     if (buffer_ == nullptr) {
131         return -1;
132     }
133     // now we are sure there is at least 'len' bytes data in the buffer
134     srcBufs[0].iov_base = buffer_ + head_;
135     if (head_ + len > bufSize_) {
136         // data splitted
137         srcBufs[0].iov_len = bufSize_ - head_;
138         srcBufs[1].iov_base = buffer_;
139         srcBufs[1].iov_len = len + head_- bufSize_;
140     } else {
141         // continuous data
142         srcBufs[0].iov_len = len;
143         srcBufs[1].iov_base = buffer_ + head_ + len;
144         srcBufs[1].iov_len = 0;
145     }
146     ssize_t ret = writev(fd, srcBufs, numSrcs);
147     if (ret != -1) {
148         // update buffer status
149         head_ += static_cast<std::size_t>(ret);
150         while (head_ >= bufSize_) {
151             head_ -= bufSize_;
152         }
153     }
154     return ret;
155 }
156 
Get(char * dest,const std::size_t len)157 std::size_t RingBuffer::Get(char* dest, const std::size_t len)
158 {
159     if (dest == nullptr) {
160         return 0;
161     }
162     if (len == 0) {
163         return 0;
164     }
165     std::lock_guard<std::mutex> lk {mtx_};
166     auto dataSize = DataSize();
167     if (len > dataSize) {
168         return 0;
169     }
170     if (buffer_ == nullptr) {
171         return -1;
172     }
173     if (head_ + len > bufSize_) {
174         // data splitted
175         if (memcpy_s(dest, len, buffer_ + head_, bufSize_ - head_) != EOK) {
176             return 0;
177         }
178         if (memcpy_s(dest + bufSize_ - head_, len + head_ - bufSize_, buffer_, len + head_ - bufSize_) != EOK) {
179             return 0;
180         }
181     } else {
182         if (memcpy_s(dest, len, buffer_ + head_, len) != EOK) {
183             return 0;
184         }
185     }
186     // update buffer status
187     head_ += len;
188     while (head_ >= bufSize_) {
189         head_ -= bufSize_;
190     }
191     return len;
192 }
193 
Put(const char * str,const std::size_t len)194 int RingBuffer::Put(const char* str, const std::size_t len)
195 {
196     if (str == nullptr) {
197         return -1;
198     }
199     if (len == 0) {
200         return 0;
201     }
202     // resize if free space is not big enough
203     std::lock_guard<std::mutex> lk {mtx_};
204     while (len >= FreeSize()) {
205         // the equal sign makes sure the buffer will not be fully filled
206         if (Resize() != 0) {
207             return -1;
208         }
209     }
210     if (buffer_ == nullptr) {
211         return -1;
212     }
213     if (tail_ + len < bufSize_) {
214         // continuous free space
215         if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str, len) != EOK) {
216             return -1;
217         }
218     } else {
219         // splitted free space
220         if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str, bufSize_ - tail_) != EOK) {
221             return -1;
222         }
223         if (memcpy_s(buffer_, bufSize_, str + bufSize_ - tail_, len + tail_ - bufSize_) != EOK) {
224             return -1;
225         }
226     }
227     // update buffer status
228     tail_ += len;
229     while (tail_ >= bufSize_) {
230         tail_ -= bufSize_;
231     }
232     return len;
233 }
234 
Put(const std::string & str)235 int RingBuffer::Put(const std::string& str)
236 {
237     if (str.empty()) {
238         return -1;
239     }
240     std::size_t len = str.length();
241     if (len == 0) {
242         return 0;
243     }
244     // resize if free space is not big enough
245     std::lock_guard<std::mutex> lk {mtx_};
246     while (len >= FreeSize()) {
247         // the equal sign makes sure the buffer will not be fully filled
248         if (Resize() != 0) {
249             return -1;
250         }
251     }
252     if (buffer_ == nullptr) {
253         return -1;
254     }
255     if (tail_ + len < bufSize_) {
256         // continuous free space
257         if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str.c_str(), len) != EOK) {
258             return -1;
259         }
260     } else {
261         // splitted free space
262         if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str.c_str(), bufSize_ - tail_) != EOK) {
263             return -1;
264         }
265         if (memcpy_s(buffer_, bufSize_, str.c_str() + bufSize_ - tail_, len + tail_ - bufSize_) != EOK) {
266             return -1;
267         }
268     }
269     // update buffer status
270     tail_ += len;
271     while (tail_ >= bufSize_) {
272         tail_ -= bufSize_;
273     }
274     return len;
275 }
276 
Allocate(std::size_t bufSize)277 char* RingBuffer::Allocate(std::size_t bufSize)
278 {
279     char *newBuffer {nullptr};
280     switch (alignShift_) {
281         case B_ALIGN_SHIFT: {
282             bufSize = (bufSize >> B_ALIGN_SHIFT);
283             newBuffer = new(std::nothrow) char[bufSize];
284             break;
285         }
286         case H_ALIGN_SHIFT: {
287             bufSize = (bufSize >> H_ALIGN_SHIFT);
288             uint16_t *temp = new(std::nothrow) uint16_t[bufSize];
289             if (temp != nullptr) {
290                 newBuffer = reinterpret_cast<char *>(temp);
291             }
292             break;
293         }
294         case W_ALIGN_SHIFT: {
295             bufSize = (bufSize >> W_ALIGN_SHIFT);
296             uint32_t *temp = new(std::nothrow) uint32_t[bufSize];
297             if (temp != nullptr) {
298                 newBuffer = reinterpret_cast<char *>(temp);
299             }
300             break;
301         }
302         case D_ALIGN_SHIFT: {
303             bufSize = (bufSize >> D_ALIGN_SHIFT);
304             uint64_t *temp = new(std::nothrow) uint64_t[bufSize];
305             if (temp != nullptr) {
306                 newBuffer = reinterpret_cast<char *>(temp);
307             }
308             break;
309         }
310     }
311     return newBuffer;
312 }
313 
Resize()314 int RingBuffer::Resize()
315 {
316     std::size_t expandedSize {bufSize_ << 1};
317     char* newBuf = Allocate(expandedSize);
318     if (newBuf == nullptr) {
319         return -1;
320     }
321     if (buffer_ == nullptr) {
322         return -1;
323     }
324     // copy data to the new buffer
325     auto dataSize = DataSize();
326     if (head_ + dataSize > bufSize_) {
327         // data splitted
328         if (memcpy_s(newBuf, expandedSize, buffer_ + head_, bufSize_ - head_) != EOK) {
329             delete[] newBuf;
330             return -1;
331         }
332         if (memcpy_s(newBuf + bufSize_ - head_,
333                      expandedSize - (bufSize_ - head_),
334                      buffer_,
335                      dataSize - (bufSize_ - head_)) != EOK) {
336             delete[] newBuf;
337             return -1;
338         }
339     } else {
340         // continuous data
341         if (memcpy_s(newBuf, expandedSize, buffer_ + head_, dataSize) != EOK) {
342             delete[] newBuf;
343             return -1;
344         }
345     }
346     // update buffer status
347     delete[] buffer_;
348     buffer_ = newBuf;
349     bufSize_ = expandedSize;
350     head_ = 0;
351     tail_ = dataSize;
352 
353     return 0;
354 }