• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "ringbuffer.h"
17 
18 #include <memory>
19 #include <sys/uio.h>
20 #include <securec.h>
21 #include <cstring>
22 #include <iostream>
RingBuffer(const std::size_t bufSize,const enum MemAlignShift shift)23 RingBuffer::RingBuffer(const std::size_t bufSize, const enum MemAlignShift shift)
24     : bufSize_ {bufSize},
25       alignShift_ {shift}
26 {
27     if (bufSize_ <= DEFAULT_SIZE) {
28         bufSize_ = DEFAULT_SIZE;
29     }
30     switch (shift) {
31         case B_ALIGN_SHIFT: {
32             bufSize_ = (bufSize_ >> B_ALIGN_SHIFT);
33             buffer_ = new(std::nothrow) char[bufSize_];
34             bufSize_ = (bufSize_ << B_ALIGN_SHIFT);
35             break;
36         }
37         case H_ALIGN_SHIFT: {
38             bufSize_ = (bufSize_ >> H_ALIGN_SHIFT);
39             uint16_t *temp = new(std::nothrow) uint16_t[bufSize_];
40             buffer_ = reinterpret_cast<char *>(temp);
41             bufSize_ = (bufSize_ << H_ALIGN_SHIFT);
42             break;
43         }
44         case W_ALIGN_SHIFT: {
45             bufSize_ = (bufSize_ >> W_ALIGN_SHIFT);
46             uint32_t *temp = new(std::nothrow) uint32_t[bufSize_];
47             buffer_ = reinterpret_cast<char *>(temp);
48             bufSize_ = (bufSize_ << W_ALIGN_SHIFT);
49             break;
50         }
51         case D_ALIGN_SHIFT: {
52             bufSize_ = (bufSize_ >> D_ALIGN_SHIFT);
53             uint64_t *temp = new(std::nothrow) uint64_t[bufSize_];
54             buffer_ = reinterpret_cast<char *>(temp);
55             bufSize_ = (bufSize_ << D_ALIGN_SHIFT);
56             break;
57         }
58     }
59 }
60 
Read(const int fd,const std::size_t len)61 ssize_t RingBuffer::Read(const int fd, const std::size_t len)
62 {
63     if (fd < 0) {
64         return -1;
65     }
66     if (len == 0) {
67         return 0;
68     }
69     constexpr std::size_t numDests {2};
70     struct iovec destBufs[numDests];
71     // resize if free space is not big enough
72     std::lock_guard<std::mutex> lk {mtx_};
73     while (len >= FreeSize()) {
74         // the equal sign makes sure the buffer will not be fully filled
75         if (Resize() != 0) {
76             return -1;
77         }
78     }
79     // now we have enough free space to read in from fd
80     destBufs[0].iov_base = buffer_ + tail_;
81     if (tail_ + len < bufSize_) {
82         // continuous free space
83         destBufs[0].iov_len = len;
84         destBufs[1].iov_base = buffer_ + tail_ + len;
85         destBufs[1].iov_len = 0;
86     } else {
87         // free space splitted
88         destBufs[0].iov_len = bufSize_ - tail_;
89         destBufs[1].iov_base = buffer_;
90         destBufs[1].iov_len = len + tail_ - bufSize_;
91     }
92     ssize_t ret = readv(fd, destBufs, numDests);
93     if (ret != -1) {
94         // update buffer status
95         tail_ += static_cast<std::size_t>(ret);
96         while (tail_ >= bufSize_) {
97             tail_ -= bufSize_;
98         }
99     }
100     return ret;
101 }
102 
Write(const int fd,std::size_t len)103 ssize_t RingBuffer::Write(const int fd, std::size_t len)
104 {
105     if (fd < 0) {
106         return -1;
107     }
108     constexpr std::size_t numSrcs {2};
109     struct iovec srcBufs[numSrcs];
110     std::lock_guard<std::mutex> lk {mtx_};
111     std::size_t dataSize = DataSize();
112     if (dataSize < len) {
113         len = dataSize;
114     }
115     if (len == 0) {
116         return 0;
117     }
118     // now we are sure there is at least 'len' bytes data in the buffer
119     srcBufs[0].iov_base = buffer_ + head_;
120     if (head_ + len > bufSize_) {
121         // data splitted
122         srcBufs[0].iov_len = bufSize_ - head_;
123         srcBufs[1].iov_base = buffer_;
124         srcBufs[1].iov_len = len + head_- bufSize_;
125     } else {
126         // continuous data
127         srcBufs[0].iov_len = len;
128         srcBufs[1].iov_base = buffer_ + head_ + len;
129         srcBufs[1].iov_len = 0;
130     }
131     ssize_t ret = writev(fd, srcBufs, numSrcs);
132     if (ret != -1) {
133         // update buffer status
134         head_ += static_cast<std::size_t>(ret);
135         while (head_ >= bufSize_) {
136             head_ -= bufSize_;
137         }
138     }
139     return ret;
140 }
141 
Get(char * dest,const std::size_t len)142 std::size_t RingBuffer::Get(char* dest, const std::size_t len)
143 {
144     if (dest == nullptr) {
145         return 0;
146     }
147     if (len == 0) {
148         return 0;
149     }
150     std::lock_guard<std::mutex> lk {mtx_};
151     auto dataSize = DataSize();
152     if (len > dataSize) {
153         return 0;
154     }
155     if (head_ + len > bufSize_) {
156         // data splitted
157         if (memcpy_s(dest, len, buffer_ + head_, bufSize_ - head_) != EOK) {
158             return 0;
159         }
160         if (memcpy_s(dest + bufSize_ - head_, len + head_ - bufSize_, buffer_, len + head_ - bufSize_) != EOK) {
161             return 0;
162         }
163     } else {
164         if (memcpy_s(dest, len, buffer_ + head_, len) != EOK) {
165             return 0;
166         }
167     }
168     // update buffer status
169     head_ += len;
170     while (head_ >= bufSize_) {
171         head_ -= bufSize_;
172     }
173     return len;
174 }
175 
Put(const char * str,const std::size_t len)176 int RingBuffer::Put(const char* str, const std::size_t len)
177 {
178     if (str == nullptr) {
179         return -1;
180     }
181     if (len == 0) {
182         return 0;
183     }
184     // resize if free space is not big enough
185     std::lock_guard<std::mutex> lk {mtx_};
186     while (len >= FreeSize()) {
187         // the equal sign makes sure the buffer will not be fully filled
188         if (Resize() != 0) {
189             return -1;
190         }
191     }
192     if (tail_ + len < bufSize_) {
193         // continuous free space
194         if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str, len) != EOK) {
195             return -1;
196         }
197     } else {
198         // splitted free space
199         if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str, bufSize_ - tail_) != EOK) {
200             return -1;
201         }
202         if (memcpy_s(buffer_, bufSize_, str + bufSize_ - tail_, len + tail_ - bufSize_) != EOK) {
203             return -1;
204         }
205     }
206     // update buffer status
207     tail_ += len;
208     while (tail_ >= bufSize_) {
209         tail_ -= bufSize_;
210     }
211     return len;
212 }
213 
Put(const std::string & str)214 int RingBuffer::Put(const std::string& str)
215 {
216     if (str.empty()) {
217         return -1;
218     }
219     std::size_t len = str.length();
220     if (len == 0) {
221         return 0;
222     }
223     // resize if free space is not big enough
224     std::lock_guard<std::mutex> lk {mtx_};
225     while (len >= FreeSize()) {
226         // the equal sign makes sure the buffer will not be fully filled
227         if (Resize() != 0) {
228             return -1;
229         }
230     }
231     if (tail_ + len < bufSize_) {
232         // continuous free space
233         if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str.c_str(), len) != EOK) {
234             return -1;
235         }
236     } else {
237         // splitted free space
238         if (memcpy_s(buffer_ + tail_, bufSize_ - tail_, str.c_str(), bufSize_ - tail_) != EOK) {
239             return -1;
240         }
241         if (memcpy_s(buffer_, bufSize_, str.c_str() + bufSize_ - tail_, len + tail_ - bufSize_) != EOK) {
242             return -1;
243         }
244     }
245     // update buffer status
246     tail_ += len;
247     while (tail_ >= bufSize_) {
248         tail_ -= bufSize_;
249     }
250     return len;
251 }
252 
Allocate(std::size_t bufSize)253 char* RingBuffer::Allocate(std::size_t bufSize)
254 {
255     char *newBuffer {nullptr};
256     switch (alignShift_) {
257         case B_ALIGN_SHIFT: {
258             bufSize = (bufSize >> B_ALIGN_SHIFT);
259             newBuffer = new(std::nothrow) char[bufSize];
260             break;
261         }
262         case H_ALIGN_SHIFT: {
263             bufSize = (bufSize >> H_ALIGN_SHIFT);
264             uint16_t *temp = new(std::nothrow) uint16_t[bufSize];
265             newBuffer = reinterpret_cast<char *>(temp);
266             break;
267         }
268         case W_ALIGN_SHIFT: {
269             bufSize = (bufSize >> W_ALIGN_SHIFT);
270             uint32_t *temp = new(std::nothrow) uint32_t[bufSize];
271             newBuffer = reinterpret_cast<char *>(temp);
272             break;
273         }
274         case D_ALIGN_SHIFT: {
275             bufSize = (bufSize >> D_ALIGN_SHIFT);
276             uint64_t *temp = new(std::nothrow) uint64_t[bufSize];
277             newBuffer = reinterpret_cast<char *>(temp);
278             break;
279         }
280     }
281     return newBuffer;
282 }
283 
Resize()284 int RingBuffer::Resize()
285 {
286     std::size_t expandedSize {bufSize_ << 1};
287     char* newBuf = Allocate(expandedSize);
288     if (newBuf == nullptr) {
289         return -1;
290     }
291     // copy data to the new buffer
292     auto dataSize = DataSize();
293     if (head_ + dataSize > bufSize_) {
294         // data splitted
295         if (memcpy_s(newBuf, expandedSize, buffer_ + head_, bufSize_ - head_) != EOK) {
296             delete[] newBuf;
297             return -1;
298         }
299         if (memcpy_s(newBuf + bufSize_ - head_,
300                      expandedSize - (bufSize_ - head_),
301                      buffer_,
302                      dataSize - (bufSize_ - head_)) != EOK) {
303             delete[] newBuf;
304             return -1;
305         }
306     } else {
307         // continuous data
308         if (memcpy_s(newBuf, expandedSize, buffer_ + head_, dataSize) != EOK) {
309             delete[] newBuf;
310             return -1;
311         }
312     }
313     // update buffer status
314     delete[] buffer_;
315     buffer_ = newBuf;
316     bufSize_ = expandedSize;
317     head_ = 0;
318     tail_ = dataSize;
319 
320     return 0;
321 }