1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "file_descriptor.h"
16 #ifndef HDC_HOST
17 #include <sys/epoll.h>
18 #endif
19
20 namespace Hdc {
// Timeout, in seconds, of a single epoll_wait()/select() call made by the reader thread.
static constexpr int SECONDS_TIMEOUT = 5;
22
HdcFileDescriptor(uv_loop_t * loopIn,int fdToRead,void * callerContextIn,CallBackWhenRead callbackReadIn,CmdResultCallback callbackFinishIn)23 HdcFileDescriptor::HdcFileDescriptor(uv_loop_t *loopIn, int fdToRead, void *callerContextIn,
24 CallBackWhenRead callbackReadIn, CmdResultCallback callbackFinishIn)
25 {
26 loop = loopIn;
27 workContinue = true;
28 callbackFinish = callbackFinishIn;
29 callbackRead = callbackReadIn;
30 fdIO = fdToRead;
31 refIO = 0;
32 callerContext = callerContextIn;
33 ioWriteThread = std::thread(IOWriteThread, this);
34 }
35
~HdcFileDescriptor()36 HdcFileDescriptor::~HdcFileDescriptor()
37 {
38 workContinue = false;
39 NotifyWrite();
40 ioWriteThread.join();
41 if (ioReadThread.joinable()) {
42 ioReadThread.join();
43 }
44 }
45
ReadyForRelease()46 bool HdcFileDescriptor::ReadyForRelease()
47 {
48 return refIO == 0;
49 }
50
51 // just tryCloseFdIo = true, callback will be effect
StopWorkOnThread(bool tryCloseFdIo,std::function<void ()> closeFdCallback)52 void HdcFileDescriptor::StopWorkOnThread(bool tryCloseFdIo, std::function<void()> closeFdCallback)
53 {
54 workContinue = false;
55 NotifyWrite();
56 callbackCloseFd = closeFdCallback;
57 if (tryCloseFdIo && refIO > 0) {
58 if (callbackCloseFd != nullptr) {
59 callbackCloseFd();
60 }
61 }
62 }
63
FileIOOnThread(CtxFileIO * ctxIO,int bufSize)64 void HdcFileDescriptor::FileIOOnThread(CtxFileIO *ctxIO, int bufSize)
65 {
66 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
67 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
68 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
69 #endif
70 HdcFileDescriptor *thisClass = ctxIO->thisClass;
71 uint8_t *buf = ctxIO->bufIO;
72 bool bFinish = false;
73 bool fetalFinish = false;
74 ssize_t nBytes;
75 #ifndef HDC_HOST
76 constexpr int epollSize = 1;
77 int epfd = epoll_create(epollSize);
78 struct epoll_event ev;
79 struct epoll_event events[epollSize];
80 ev.data.fd = thisClass->fdIO;
81 ev.events = EPOLLIN | EPOLLET;
82 epoll_ctl(epfd, EPOLL_CTL_ADD, thisClass->fdIO, &ev);
83 #endif
84 while (true) {
85 if (thisClass->workContinue == false) {
86 WRITE_LOG(LOG_INFO, "FileIOOnThread fdIO:%d workContinue false", thisClass->fdIO);
87 bFinish = true;
88 break;
89 }
90
91 if (memset_s(buf, bufSize, 0, bufSize) != EOK) {
92 WRITE_LOG(LOG_DEBUG, "FileIOOnThread buf memset_s fail.");
93 break;
94 }
95 #ifndef HDC_HOST
96 int rc = epoll_wait(epfd, events, epollSize, SECONDS_TIMEOUT * TIME_BASE);
97 #else
98 struct timeval timeout;
99 timeout.tv_sec = SECONDS_TIMEOUT;
100 timeout.tv_usec = 0;
101 fd_set rset;
102 FD_ZERO(&rset);
103 FD_SET(thisClass->fdIO, &rset);
104 int rc = select(thisClass->fdIO + 1, &rset, nullptr, nullptr, &timeout);
105 #endif
106 if (rc < 0) {
107 WRITE_LOG(LOG_FATAL, "FileIOOnThread select or epoll_wait fdIO:%d error:%d",
108 thisClass->fdIO, errno);
109 break;
110 } else if (rc == 0) {
111 continue;
112 }
113 #ifndef HDC_HOST
114 int fd = events[0].data.fd;
115 uint32_t event = events[0].events;
116 nBytes = 0;
117 if (event & EPOLLIN) {
118 nBytes = read(fd, buf, bufSize);
119 }
120 if ((event & EPOLLERR) || (event & EPOLLHUP) || (event & EPOLLRDHUP)) {
121 bFinish = true;
122 fetalFinish = true;
123 if ((nBytes > 0) && !thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
124 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
125 }
126 break;
127 }
128 if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
129 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO);
130 continue;
131 }
132 if (nBytes > 0) {
133 if (!thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
134 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
135 bFinish = true;
136 break;
137 }
138 continue;
139 } else {
140 WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%d errno:%d",
141 thisClass->fdIO, nBytes, errno);
142 bFinish = true;
143 fetalFinish = true;
144 break;
145 }
146 #else
147 nBytes = read(thisClass->fdIO, buf, bufSize);
148 if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
149 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO);
150 continue;
151 }
152 if (nBytes > 0) {
153 if (!thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
154 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
155 bFinish = true;
156 break;
157 }
158 continue;
159 } else {
160 WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%d errno:%d",
161 thisClass->fdIO, nBytes, errno);
162 bFinish = true;
163 fetalFinish = true;
164 break;
165 }
166 #endif
167 }
168 #ifndef HDC_HOST
169 if (epoll_ctl(epfd, EPOLL_CTL_DEL, thisClass->fdIO, nullptr) == -1) {
170 WRITE_LOG(LOG_INFO, "EPOLL_CTL_DEL fail fd:%d epfd:%d errno:%d",
171 thisClass->fdIO, epfd, errno);
172 }
173 close(epfd);
174 #endif
175 if (buf != nullptr) {
176 delete[] buf;
177 buf = nullptr;
178 }
179 delete ctxIO;
180
181 --thisClass->refIO;
182 if (bFinish) {
183 thisClass->workContinue = false;
184 thisClass->callbackFinish(thisClass->callerContext, fetalFinish, STRING_EMPTY);
185 }
186 }
187
LoopReadOnThread()188 int HdcFileDescriptor::LoopReadOnThread()
189 {
190 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
191 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
192 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
193 #endif
194 int readMax = Base::GetMaxBufSize() * 1.2;
195 auto contextIO = new(std::nothrow) CtxFileIO();
196 auto buf = new(std::nothrow) uint8_t[readMax]();
197 if (!contextIO || !buf) {
198 if (contextIO) {
199 delete contextIO;
200 }
201 if (buf) {
202 delete[] buf;
203 }
204 WRITE_LOG(LOG_FATAL, "Memory alloc failed");
205 callbackFinish(callerContext, true, "Memory alloc failed");
206 return -1;
207 }
208 contextIO->bufIO = buf;
209 contextIO->thisClass = this;
210 ++refIO;
211 ioReadThread = std::thread(FileIOOnThread, contextIO, readMax);
212 return 0;
213 }
214
StartWorkOnThread()215 bool HdcFileDescriptor::StartWorkOnThread()
216 {
217 if (LoopReadOnThread() < 0) {
218 return false;
219 }
220 return true;
221 }
222
Write(uint8_t * data,int size)223 int HdcFileDescriptor::Write(uint8_t *data, int size)
224 {
225 if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) {
226 size = static_cast<int>(HDC_BUF_MAX_BYTES - 1);
227 }
228 if (size <= 0) {
229 WRITE_LOG(LOG_WARN, "Write failed, size:%d", size);
230 return -1;
231 }
232 auto buf = new(std::nothrow) uint8_t[size];
233 if (!buf) {
234 return -1;
235 }
236 if (memcpy_s(buf, size, data, size) != EOK) {
237 delete[] buf;
238 return -1;
239 }
240 return WriteWithMem(buf, size);
241 }
242
243 // Data's memory must be Malloc, and the callback FREE after this function is completed
WriteWithMem(uint8_t * data,int size)244 int HdcFileDescriptor::WriteWithMem(uint8_t *data, int size)
245 {
246 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
247 mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
248 mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
249 #endif
250 auto contextIO = new(std::nothrow) CtxFileIO();
251 if (!contextIO) {
252 delete[] data;
253 WRITE_LOG(LOG_FATAL, "Memory alloc failed");
254 callbackFinish(callerContext, true, "Memory alloc failed");
255 return -1;
256 }
257 contextIO->bufIO = data;
258 contextIO->size = static_cast<size_t>(size);
259 contextIO->thisClass = this;
260 PushWrite(contextIO);
261 NotifyWrite();
262 return size;
263 }
264
IOWriteThread(void * object)265 void HdcFileDescriptor::IOWriteThread(void *object)
266 {
267 HdcFileDescriptor *hfd = reinterpret_cast<HdcFileDescriptor *>(object);
268 while (hfd->workContinue) {
269 hfd->HandleWrite();
270 hfd->WaitWrite();
271 }
272 }
273
PushWrite(CtxFileIO * cfio)274 void HdcFileDescriptor::PushWrite(CtxFileIO *cfio)
275 {
276 std::unique_lock<std::mutex> lock(writeMutex);
277 writeQueue.push(cfio);
278 }
279
PopWrite()280 CtxFileIO *HdcFileDescriptor::PopWrite()
281 {
282 std::unique_lock<std::mutex> lock(writeMutex);
283 CtxFileIO *cfio = nullptr;
284 if (!writeQueue.empty()) {
285 cfio = writeQueue.front();
286 writeQueue.pop();
287 }
288 return cfio;
289 }
290
NotifyWrite()291 void HdcFileDescriptor::NotifyWrite()
292 {
293 std::unique_lock<std::mutex> lock(writeMutex);
294 writeCond.notify_one();
295 }
296
WaitWrite()297 void HdcFileDescriptor::WaitWrite()
298 {
299 std::unique_lock<std::mutex> lock(writeMutex);
300 writeCond.wait(lock, [&]() { return !writeQueue.empty() || !workContinue; });
301 }
302
HandleWrite()303 void HdcFileDescriptor::HandleWrite()
304 {
305 CtxFileIO *cfio = nullptr;
306 while ((cfio = PopWrite()) != nullptr) {
307 CtxFileIOWrite(cfio);
308 delete cfio;
309 }
310 }
311
CtxFileIOWrite(CtxFileIO * cfio)312 void HdcFileDescriptor::CtxFileIOWrite(CtxFileIO *cfio)
313 {
314 std::unique_lock<std::mutex> lock(writeMutex);
315 uint8_t *buf = cfio->bufIO;
316 uint8_t *data = buf;
317 size_t cnt = cfio->size;
318 constexpr int intrmax = 1000;
319 int intrcnt = 0;
320 while (cnt > 0) {
321 ssize_t rc = write(fdIO, data, cnt);
322 if (rc < 0) {
323 if (errno == EINTR || errno == EAGAIN) {
324 if (++intrcnt > intrmax) {
325 WRITE_LOG(LOG_WARN, "CtxFileIOWrite fdIO:%d interrupt errno:%d", fdIO, errno);
326 intrcnt = 0;
327 }
328 continue;
329 } else {
330 WRITE_LOG(LOG_FATAL, "CtxFileIOWrite fdIO:%d rc:%d error:%d", fdIO, rc, errno);
331 break;
332 }
333 }
334 data += rc;
335 cnt -= static_cast<size_t>(rc);
336 }
337 delete[] buf;
338 }
339 } // namespace Hdc
340