• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "file_descriptor.h"
16 #ifndef HDC_HOST
17 #include <sys/epoll.h>
18 #endif
19 
20 namespace Hdc {
// Longest time, in seconds, that the reader thread blocks in one select()/epoll_wait() call
// before it loops around and re-checks its stop flag.
static constexpr int SECONDS_TIMEOUT = 5;
22 
HdcFileDescriptor(uv_loop_t * loopIn,int fdToRead,void * callerContextIn,CallBackWhenRead callbackReadIn,CmdResultCallback callbackFinishIn)23 HdcFileDescriptor::HdcFileDescriptor(uv_loop_t *loopIn, int fdToRead, void *callerContextIn,
24                                      CallBackWhenRead callbackReadIn, CmdResultCallback callbackFinishIn)
25 {
26     loop = loopIn;
27     workContinue = true;
28     callbackFinish = callbackFinishIn;
29     callbackRead = callbackReadIn;
30     fdIO = fdToRead;
31     refIO = 0;
32     callerContext = callerContextIn;
33     ioWriteThread = std::thread(IOWriteThread, this);
34 }
35 
~HdcFileDescriptor()36 HdcFileDescriptor::~HdcFileDescriptor()
37 {
38     workContinue = false;
39     NotifyWrite();
40     ioWriteThread.join();
41     if (ioReadThread.joinable()) {
42         ioReadThread.join();
43     }
44 }
45 
ReadyForRelease()46 bool HdcFileDescriptor::ReadyForRelease()
47 {
48     return refIO == 0;
49 }
50 
51 // just tryCloseFdIo = true, callback will be effect
StopWorkOnThread(bool tryCloseFdIo,std::function<void ()> closeFdCallback)52 void HdcFileDescriptor::StopWorkOnThread(bool tryCloseFdIo, std::function<void()> closeFdCallback)
53 {
54     workContinue = false;
55     NotifyWrite();
56     callbackCloseFd = closeFdCallback;
57     if (tryCloseFdIo && refIO > 0) {
58         if (callbackCloseFd != nullptr) {
59             callbackCloseFd();
60         }
61     }
62 }
63 
FileIOOnThread(CtxFileIO * ctxIO,int bufSize)64 void HdcFileDescriptor::FileIOOnThread(CtxFileIO *ctxIO, int bufSize)
65 {
66 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
67     mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
68     mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
69 #endif
70     HdcFileDescriptor *thisClass = ctxIO->thisClass;
71     uint8_t *buf = ctxIO->bufIO;
72     bool bFinish = false;
73     bool fetalFinish = false;
74     ssize_t nBytes;
75 #ifndef HDC_HOST
76     constexpr int epollSize = 1;
77     int epfd = epoll_create(epollSize);
78     struct epoll_event ev;
79     struct epoll_event events[epollSize];
80     ev.data.fd = thisClass->fdIO;
81     ev.events = EPOLLIN | EPOLLET;
82     epoll_ctl(epfd, EPOLL_CTL_ADD, thisClass->fdIO, &ev);
83 #endif
84     while (true) {
85         if (thisClass->workContinue == false) {
86             WRITE_LOG(LOG_INFO, "FileIOOnThread fdIO:%d workContinue false", thisClass->fdIO);
87             bFinish = true;
88             break;
89         }
90 
91         if (memset_s(buf, bufSize, 0, bufSize) != EOK) {
92             WRITE_LOG(LOG_DEBUG, "FileIOOnThread buf memset_s fail.");
93             break;
94         }
95 #ifndef HDC_HOST
96         int rc = epoll_wait(epfd, events, epollSize, SECONDS_TIMEOUT * TIME_BASE);
97 #else
98         struct timeval timeout;
99         timeout.tv_sec = SECONDS_TIMEOUT;
100         timeout.tv_usec = 0;
101         fd_set rset;
102         FD_ZERO(&rset);
103         FD_SET(thisClass->fdIO, &rset);
104         int rc = select(thisClass->fdIO + 1, &rset, nullptr, nullptr, &timeout);
105 #endif
106         if (rc < 0) {
107             WRITE_LOG(LOG_FATAL, "FileIOOnThread select or epoll_wait fdIO:%d error:%d",
108                 thisClass->fdIO, errno);
109             break;
110         } else if (rc == 0) {
111             continue;
112         }
113 #ifndef HDC_HOST
114         int fd = events[0].data.fd;
115         uint32_t event = events[0].events;
116         nBytes = 0;
117         if (event & EPOLLIN) {
118             nBytes = read(fd, buf, bufSize);
119         }
120         if ((event & EPOLLERR) || (event & EPOLLHUP) || (event & EPOLLRDHUP)) {
121             bFinish = true;
122             fetalFinish = true;
123             if ((nBytes > 0) && !thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
124                 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
125             }
126             break;
127         }
128         if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
129             WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO);
130             continue;
131         }
132         if (nBytes > 0) {
133             if (!thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
134                 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
135                 bFinish = true;
136                 break;
137             }
138             continue;
139         } else {
140             WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%d errno:%d",
141                 thisClass->fdIO, nBytes, errno);
142             bFinish = true;
143             fetalFinish = true;
144             break;
145         }
146 #else
147         nBytes = read(thisClass->fdIO, buf, bufSize);
148         if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
149             WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO);
150             continue;
151         }
152         if (nBytes > 0) {
153             if (!thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
154                 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
155                 bFinish = true;
156                 break;
157             }
158             continue;
159         } else {
160             WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%d errno:%d",
161                 thisClass->fdIO, nBytes, errno);
162             bFinish = true;
163             fetalFinish = true;
164             break;
165         }
166 #endif
167     }
168 #ifndef HDC_HOST
169     if (epoll_ctl(epfd, EPOLL_CTL_DEL, thisClass->fdIO, nullptr) == -1) {
170         WRITE_LOG(LOG_INFO, "EPOLL_CTL_DEL fail fd:%d epfd:%d errno:%d",
171             thisClass->fdIO, epfd, errno);
172     }
173     close(epfd);
174 #endif
175     if (buf != nullptr) {
176         delete[] buf;
177         buf = nullptr;
178     }
179     delete ctxIO;
180 
181     --thisClass->refIO;
182     if (bFinish) {
183         thisClass->workContinue = false;
184         thisClass->callbackFinish(thisClass->callerContext, fetalFinish, STRING_EMPTY);
185     }
186 }
187 
LoopReadOnThread()188 int HdcFileDescriptor::LoopReadOnThread()
189 {
190 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
191     mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
192     mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
193 #endif
194     int readMax = Base::GetMaxBufSize() * 1.2;
195     auto contextIO = new(std::nothrow) CtxFileIO();
196     auto buf = new(std::nothrow) uint8_t[readMax]();
197     if (!contextIO || !buf) {
198         if (contextIO) {
199             delete contextIO;
200         }
201         if (buf) {
202             delete[] buf;
203         }
204         WRITE_LOG(LOG_FATAL, "Memory alloc failed");
205         callbackFinish(callerContext, true, "Memory alloc failed");
206         return -1;
207     }
208     contextIO->bufIO = buf;
209     contextIO->thisClass = this;
210     ++refIO;
211     ioReadThread = std::thread(FileIOOnThread, contextIO, readMax);
212     return 0;
213 }
214 
StartWorkOnThread()215 bool HdcFileDescriptor::StartWorkOnThread()
216 {
217     if (LoopReadOnThread() < 0) {
218         return false;
219     }
220     return true;
221 }
222 
Write(uint8_t * data,int size)223 int HdcFileDescriptor::Write(uint8_t *data, int size)
224 {
225     if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) {
226         size = static_cast<int>(HDC_BUF_MAX_BYTES - 1);
227     }
228     if (size <= 0) {
229         WRITE_LOG(LOG_WARN, "Write failed, size:%d", size);
230         return -1;
231     }
232     auto buf = new(std::nothrow) uint8_t[size];
233     if (!buf) {
234         return -1;
235     }
236     if (memcpy_s(buf, size, data, size) != EOK) {
237         delete[] buf;
238         return -1;
239     }
240     return WriteWithMem(buf, size);
241 }
242 
243 // Data's memory must be Malloc, and the callback FREE after this function is completed
WriteWithMem(uint8_t * data,int size)244 int HdcFileDescriptor::WriteWithMem(uint8_t *data, int size)
245 {
246 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
247     mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
248     mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
249 #endif
250     auto contextIO = new(std::nothrow) CtxFileIO();
251     if (!contextIO) {
252         delete[] data;
253         WRITE_LOG(LOG_FATAL, "Memory alloc failed");
254         callbackFinish(callerContext, true, "Memory alloc failed");
255         return -1;
256     }
257     contextIO->bufIO = data;
258     contextIO->size = static_cast<size_t>(size);
259     contextIO->thisClass = this;
260     PushWrite(contextIO);
261     NotifyWrite();
262     return size;
263 }
264 
IOWriteThread(void * object)265 void HdcFileDescriptor::IOWriteThread(void *object)
266 {
267     HdcFileDescriptor *hfd = reinterpret_cast<HdcFileDescriptor *>(object);
268     while (hfd->workContinue) {
269         hfd->HandleWrite();
270         hfd->WaitWrite();
271     }
272 }
273 
PushWrite(CtxFileIO * cfio)274 void HdcFileDescriptor::PushWrite(CtxFileIO *cfio)
275 {
276     std::unique_lock<std::mutex> lock(writeMutex);
277     writeQueue.push(cfio);
278 }
279 
PopWrite()280 CtxFileIO *HdcFileDescriptor::PopWrite()
281 {
282     std::unique_lock<std::mutex> lock(writeMutex);
283     CtxFileIO *cfio = nullptr;
284     if (!writeQueue.empty()) {
285         cfio = writeQueue.front();
286         writeQueue.pop();
287     }
288     return cfio;
289 }
290 
NotifyWrite()291 void HdcFileDescriptor::NotifyWrite()
292 {
293     std::unique_lock<std::mutex> lock(writeMutex);
294     writeCond.notify_one();
295 }
296 
WaitWrite()297 void HdcFileDescriptor::WaitWrite()
298 {
299     std::unique_lock<std::mutex> lock(writeMutex);
300     writeCond.wait(lock, [&]() { return !writeQueue.empty() || !workContinue; });
301 }
302 
HandleWrite()303 void HdcFileDescriptor::HandleWrite()
304 {
305     CtxFileIO *cfio = nullptr;
306     while ((cfio = PopWrite()) != nullptr) {
307         CtxFileIOWrite(cfio);
308         delete cfio;
309     }
310 }
311 
CtxFileIOWrite(CtxFileIO * cfio)312 void HdcFileDescriptor::CtxFileIOWrite(CtxFileIO *cfio)
313 {
314     std::unique_lock<std::mutex> lock(writeMutex);
315     uint8_t *buf = cfio->bufIO;
316     uint8_t *data = buf;
317     size_t cnt = cfio->size;
318     constexpr int intrmax = 1000;
319     int intrcnt = 0;
320     while (cnt > 0) {
321         ssize_t rc = write(fdIO, data, cnt);
322         if (rc < 0) {
323             if (errno == EINTR || errno == EAGAIN) {
324                 if (++intrcnt > intrmax) {
325                     WRITE_LOG(LOG_WARN, "CtxFileIOWrite fdIO:%d interrupt errno:%d", fdIO, errno);
326                     intrcnt = 0;
327                 }
328                 continue;
329             } else {
330                 WRITE_LOG(LOG_FATAL, "CtxFileIOWrite fdIO:%d rc:%d error:%d", fdIO, rc, errno);
331                 break;
332             }
333         }
334         data += rc;
335         cnt -= static_cast<size_t>(rc);
336     }
337     delete[] buf;
338 }
339 }  // namespace Hdc
340