• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <iostream>
16 #include <cstring>
17 #include <string>
18 #include <thread>
19 
20 #include "log.h"
21 #include "securec.h"
22 #include "ring_buffer.h"
23 
24 namespace Updater {
~RingBuffer()25 RingBuffer::~RingBuffer()
26 {
27     Release();
28 }
29 
Init(uint32_t singleSize,uint32_t num)30 bool RingBuffer::Init(uint32_t singleSize, uint32_t num)
31 {
32     if (singleSize == 0 || num == 0 || (num & (num - 1)) != 0) { // power of 2
33         LOG(ERROR) << "singleSize:" <<  singleSize << " num:" << num << " error";
34         return false;
35     }
36     bufArray_ = new (std::nothrow) uint8_t* [num] {};
37     lenArray_ = new (std::nothrow) uint32_t [num] {};
38     if (bufArray_ == nullptr || lenArray_ == nullptr) {
39         LOG(ERROR) << "new buf or len " << num << " error";
40         return false;
41     }
42     for (uint32_t i = 0; i < num; i++) {
43         bufArray_[i] = new (std::nothrow) uint8_t [singleSize] {};
44         if (bufArray_[i] == nullptr) {
45             LOG(ERROR) << "new buf " << i << " size " << singleSize << " error";
46             return false;
47         }
48     }
49 
50     writeIndex_ = 0;
51     readIndex_ = 0;
52     num_ = num;
53     singleSize_ = singleSize;
54     return true;
55 }
56 
Reset()57 void RingBuffer::Reset()
58 {
59     isStop_ = false;
60     writeIndex_ = 0;
61     readIndex_ = 0;
62     for (uint32_t i = 0; i < num_; ++i) {
63         lenArray_[i] = 0;
64     }
65 }
66 
IsFull()67 bool RingBuffer::IsFull()
68 {
69     // writeIndex readIndex real size: 0 ~ num_ -1, logic size: 0 ~ 2num_ - 1
70     // when writeIndex_ - readIndex_ == n means full
71     return writeIndex_ == (readIndex_ ^ num_);
72 }
73 
IsEmpty()74 bool RingBuffer::IsEmpty()
75 {
76     // writeIndex readIndex real size: 0 ~ num_ -1, logic size: 0 ~ 2num_ - 1
77     // when same means empty
78     return writeIndex_ == readIndex_;
79 }
80 
Push(uint8_t * buf,uint32_t len)81 bool RingBuffer::Push(uint8_t *buf, uint32_t len)
82 {
83     if (buf == nullptr || len == 0 || len > singleSize_) {
84         LOG(ERROR) << "RingBuffer push error, len:" << len << " singleSize:" << singleSize_;
85         return false;
86     }
87     if (IsFull()) {
88         std::unique_lock<std::mutex> pushLock(notifyMtx_);
89         while (IsFull()) {
90             if (isStop_) {
91                 LOG(WARNING) << "RingBuffer push stopped";
92                 return false;
93             }
94             LOG(DEBUG) << "RingBuffer full, wait !!!";
95             notFull_.wait(pushLock);
96         }
97     }
98 
99     {
100         std::unique_lock<std::mutex> arrayLock(arrayMtx_);
101         uint32_t index = writeIndex_ & (num_ - 1);
102         if (memcpy_s(bufArray_[index], singleSize_, buf, len) != EOK) {
103             LOG(ERROR) << "memcpy error, len:" << len;
104             return false;
105         }
106         lenArray_[index] = len;
107         writeIndex_ = (writeIndex_ + 1) & (2 * num_ - 1); // 2: logic buffer size
108     }
109 
110     std::unique_lock<std::mutex> popLock(notifyMtx_);
111     notEmpty_.notify_all();
112     return true;
113 }
114 
Pop(uint8_t * buf,uint32_t maxLen,uint32_t & len)115 bool RingBuffer::Pop(uint8_t *buf, uint32_t maxLen, uint32_t &len)
116 {
117     if (buf == nullptr) {
118         LOG(ERROR) << "RingBuffer pop para error";
119         return false;
120     }
121     if (IsEmpty()) {
122         std::unique_lock<std::mutex> popLock(notifyMtx_);
123         while (IsEmpty()) {
124             if (isStop_) {
125                 LOG(WARNING) << "RingBuffer pop stopped";
126                 return false;
127             }
128             LOG(DEBUG) << "RingBuffer empty, wait !!!";
129             notEmpty_.wait(popLock);
130         }
131     }
132 
133     {
134         std::unique_lock<std::mutex> arrayLock(arrayMtx_);
135         uint32_t index = readIndex_ & (num_ - 1);
136         if (memcpy_s(buf, maxLen, bufArray_[index], lenArray_[index]) != EOK) {
137             LOG(ERROR) << "memcpy error, len:" << lenArray_[index];
138             return false;
139         }
140         len = lenArray_[index];
141         readIndex_ = (readIndex_ + 1) & (2 * num_ - 1); // 2: logic buffer size
142     }
143 
144     std::unique_lock<std::mutex> popLock(notifyMtx_);
145     notFull_.notify_all();
146     return true;
147 }
148 
Stop()149 void RingBuffer::Stop()
150 {
151     isStop_ = true;
152     notFull_.notify_all();
153     notEmpty_.notify_all();
154 }
155 
StopPush()156 void RingBuffer::StopPush()
157 {
158     {
159         std::unique_lock<std::mutex> pushLock(notifyMtx_);
160         isStop_ = true;
161     }
162     notFull_.notify_all();
163 }
164 
StopPop()165 void RingBuffer::StopPop()
166 {
167     {
168         std::unique_lock<std::mutex> popLock(notifyMtx_);
169         isStop_ = true;
170     }
171     notEmpty_.notify_all();
172 }
173 
Release()174 void RingBuffer::Release()
175 {
176     if (lenArray_ != nullptr) {
177         delete[] lenArray_;
178         lenArray_ = nullptr;
179     }
180 
181     if (bufArray_ != nullptr) {
182         for (uint32_t i = 0; i < num_ && bufArray_[i] != nullptr; i++) {
183             delete[] bufArray_[i];
184             bufArray_[i] = nullptr;
185         }
186         delete[] bufArray_;
187         bufArray_ = nullptr;
188     }
189 }
190 } // namespace Updater
191