• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "file_descriptor.h"
16 #ifndef HDC_HOST
17 #include <sys/epoll.h>
18 #endif
19 
20 namespace Hdc {
// Upper bound, in seconds, that the reader thread waits in select()/epoll_wait()
// before it re-checks whether it has been asked to stop.
static constexpr int SECONDS_TIMEOUT = 5;
22 
HdcFileDescriptor(uv_loop_t * loopIn,int fdToRead,void * callerContextIn,CallBackWhenRead callbackReadIn,CmdResultCallback callbackFinishIn,bool interactiveShell)23 HdcFileDescriptor::HdcFileDescriptor(uv_loop_t *loopIn, int fdToRead, void *callerContextIn,
24                                      CallBackWhenRead callbackReadIn, CmdResultCallback callbackFinishIn,
25                                      bool interactiveShell)
26 {
27     loop = loopIn;
28     workContinue = true;
29     callbackFinish = callbackFinishIn;
30     callbackRead = callbackReadIn;
31     fdIO = fdToRead;
32     refIO = 0;
33     isInteractive = interactiveShell;
34     callerContext = callerContextIn;
35     if (isInteractive) {
36         std::thread(IOWriteThread, this).detach();
37     }
38 }
39 
~HdcFileDescriptor()40 HdcFileDescriptor::~HdcFileDescriptor()
41 {
42     workContinue = false;
43     if (isInteractive) {
44         NotifyWrite();
45         uv_sleep(MILL_SECONDS);
46     }
47 }
48 
ReadyForRelease()49 bool HdcFileDescriptor::ReadyForRelease()
50 {
51     return refIO == 0;
52 }
53 
54 // just tryCloseFdIo = true, callback will be effect
StopWorkOnThread(bool tryCloseFdIo,std::function<void ()> closeFdCallback)55 void HdcFileDescriptor::StopWorkOnThread(bool tryCloseFdIo, std::function<void()> closeFdCallback)
56 {
57     workContinue = false;
58     if (isInteractive) {
59         NotifyWrite();
60     }
61     callbackCloseFd = closeFdCallback;
62     if (tryCloseFdIo && refIO > 0) {
63         if (callbackCloseFd != nullptr) {
64             callbackCloseFd();
65         }
66     }
67 }
68 
FileIOOnThread(CtxFileIO * ctxIO,int bufSize)69 void HdcFileDescriptor::FileIOOnThread(CtxFileIO *ctxIO, int bufSize)
70 {
71 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
72     mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
73     mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
74 #endif
75     HdcFileDescriptor *thisClass = ctxIO->thisClass;
76     uint8_t *buf = ctxIO->bufIO;
77     bool bFinish = false;
78     bool fetalFinish = false;
79     ssize_t nBytes;
80 #ifndef HDC_HOST
81     constexpr int epollSize = 1;
82     int epfd = epoll_create(epollSize);
83     struct epoll_event ev;
84     struct epoll_event events[epollSize];
85     ev.data.fd = thisClass->fdIO;
86     ev.events = EPOLLIN | EPOLLET;
87     epoll_ctl(epfd, EPOLL_CTL_ADD, thisClass->fdIO, &ev);
88 #endif
89     while (true) {
90         if (thisClass->workContinue == false) {
91             WRITE_LOG(LOG_INFO, "FileIOOnThread fdIO:%d workContinue false", thisClass->fdIO);
92             bFinish = true;
93             break;
94         }
95 
96         if (memset_s(buf, bufSize, 0, bufSize) != EOK) {
97             WRITE_LOG(LOG_DEBUG, "FileIOOnThread buf memset_s fail.");
98             bFinish = true;
99             break;
100         }
101 #ifndef HDC_HOST
102         int rc = epoll_wait(epfd, events, epollSize, SECONDS_TIMEOUT * TIME_BASE);
103 #else
104         struct timeval timeout;
105         timeout.tv_sec = SECONDS_TIMEOUT;
106         timeout.tv_usec = 0;
107         fd_set rset;
108         FD_ZERO(&rset);
109         FD_SET(thisClass->fdIO, &rset);
110         int rc = select(thisClass->fdIO + 1, &rset, nullptr, nullptr, &timeout);
111 #endif
112         if (rc < 0) {
113             WRITE_LOG(LOG_FATAL, "FileIOOnThread select or epoll_wait fdIO:%d error:%d",
114                 thisClass->fdIO, errno);
115             if (errno == EINTR || errno == EAGAIN) {
116                 continue;
117             }
118             bFinish = true;
119             break;
120         } else if (rc == 0) {
121             WRITE_LOG(LOG_WARN, "FileIOOnThread select rc = 0, timeout.");
122             continue;
123         }
124 #ifndef HDC_HOST
125         int fd = events[0].data.fd;
126         uint32_t event = events[0].events;
127         nBytes = 0;
128         if (event & EPOLLIN) {
129             nBytes = read(fd, buf, bufSize);
130         }
131         if ((event & EPOLLERR) || (event & EPOLLHUP) || (event & EPOLLRDHUP)) {
132             bFinish = true;
133             fetalFinish = true;
134             if ((nBytes > 0) && !thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
135                 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
136             }
137             break;
138         }
139         if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
140             WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO);
141             continue;
142         }
143         if (nBytes > 0) {
144             if (!thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
145                 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
146                 bFinish = true;
147                 break;
148             }
149             continue;
150         } else {
151             WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%d errno:%d",
152                 thisClass->fdIO, nBytes, errno);
153             bFinish = true;
154             fetalFinish = true;
155             break;
156         }
157 #else
158         nBytes = read(thisClass->fdIO, buf, bufSize);
159         if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
160             WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO);
161             continue;
162         }
163         if (nBytes > 0) {
164             if (!thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
165                 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
166                 bFinish = true;
167                 break;
168             }
169             continue;
170         } else {
171             WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%d errno:%d",
172                 thisClass->fdIO, nBytes, errno);
173             bFinish = true;
174             fetalFinish = true;
175             break;
176         }
177 #endif
178     }
179 #ifndef HDC_HOST
180     if (epoll_ctl(epfd, EPOLL_CTL_DEL, thisClass->fdIO, nullptr) == -1) {
181         WRITE_LOG(LOG_INFO, "EPOLL_CTL_DEL fail fd:%d epfd:%d errno:%d",
182             thisClass->fdIO, epfd, errno);
183     }
184     close(epfd);
185 #endif
186     if (buf != nullptr) {
187         delete[] buf;
188         buf = nullptr;
189     }
190     delete ctxIO;
191 
192     --thisClass->refIO;
193     if (bFinish) {
194         thisClass->workContinue = false;
195         thisClass->callbackFinish(thisClass->callerContext, fetalFinish, STRING_EMPTY);
196     }
197 }
198 
LoopReadOnThread()199 int HdcFileDescriptor::LoopReadOnThread()
200 {
201 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
202     mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
203     mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
204 #endif
205     int readMax = Base::GetMaxBufSize() * 1.2;
206     auto contextIO = new(std::nothrow) CtxFileIO();
207     auto buf = new(std::nothrow) uint8_t[readMax]();
208     if (!contextIO || !buf) {
209         if (contextIO) {
210             delete contextIO;
211         }
212         if (buf) {
213             delete[] buf;
214         }
215         WRITE_LOG(LOG_FATAL, "Memory alloc failed");
216         callbackFinish(callerContext, true, "Memory alloc failed");
217         return -1;
218     }
219     contextIO->bufIO = buf;
220     contextIO->thisClass = this;
221     ++refIO;
222     std::thread(FileIOOnThread, contextIO, readMax).detach();
223     return 0;
224 }
225 
StartWorkOnThread()226 bool HdcFileDescriptor::StartWorkOnThread()
227 {
228     if (LoopReadOnThread() < 0) {
229         return false;
230     }
231     return true;
232 }
233 
Write(uint8_t * data,int size)234 int HdcFileDescriptor::Write(uint8_t *data, int size)
235 {
236     if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) {
237         size = static_cast<int>(HDC_BUF_MAX_BYTES - 1);
238     }
239     if (size <= 0) {
240         WRITE_LOG(LOG_WARN, "Write failed, size:%d", size);
241         return -1;
242     }
243     auto buf = new(std::nothrow) uint8_t[size];
244     if (!buf) {
245         return -1;
246     }
247     if (memcpy_s(buf, size, data, size) != EOK) {
248         delete[] buf;
249         return -1;
250     }
251     return WriteWithMem(buf, size);
252 }
253 
254 // Data's memory must be Malloc, and the callback FREE after this function is completed
WriteWithMem(uint8_t * data,int size)255 int HdcFileDescriptor::WriteWithMem(uint8_t *data, int size)
256 {
257 #ifdef CONFIG_USE_JEMALLOC_DFX_INIF
258     mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
259     mallopt(M_SET_THREAD_CACHE, M_THREAD_CACHE_DISABLE);
260 #endif
261     auto contextIO = new(std::nothrow) CtxFileIO();
262     if (!contextIO) {
263         delete[] data;
264         WRITE_LOG(LOG_FATAL, "Memory alloc failed");
265         callbackFinish(callerContext, true, "Memory alloc failed");
266         return -1;
267     }
268     contextIO->bufIO = data;
269     contextIO->size = static_cast<size_t>(size);
270     contextIO->thisClass = this;
271     PushWrite(contextIO);
272     NotifyWrite();
273     return size;
274 }
275 
IOWriteThread(void * object)276 void HdcFileDescriptor::IOWriteThread(void *object)
277 {
278     HdcFileDescriptor *hfd = reinterpret_cast<HdcFileDescriptor *>(object);
279     while (hfd->workContinue) {
280         hfd->HandleWrite();
281         hfd->WaitWrite();
282     }
283 }
284 
PushWrite(CtxFileIO * cfio)285 void HdcFileDescriptor::PushWrite(CtxFileIO *cfio)
286 {
287     std::unique_lock<std::mutex> lock(writeMutex);
288     writeQueue.push(cfio);
289 }
290 
PopWrite()291 CtxFileIO *HdcFileDescriptor::PopWrite()
292 {
293     std::unique_lock<std::mutex> lock(writeMutex);
294     CtxFileIO *cfio = nullptr;
295     if (!writeQueue.empty()) {
296         cfio = writeQueue.front();
297         writeQueue.pop();
298     }
299     return cfio;
300 }
301 
NotifyWrite()302 void HdcFileDescriptor::NotifyWrite()
303 {
304     writeCond.notify_one();
305 }
306 
WaitWrite()307 void HdcFileDescriptor::WaitWrite()
308 {
309     std::unique_lock<std::mutex> lock(writeMutex);
310     writeCond.wait_for(lock, std::chrono::seconds(WAIT_SECONDS), [&]() {
311         return !writeQueue.empty() || !workContinue;
312     });
313 }
314 
HandleWrite()315 void HdcFileDescriptor::HandleWrite()
316 {
317     CtxFileIO *cfio = nullptr;
318     while ((cfio = PopWrite()) != nullptr) {
319         CtxFileIOWrite(cfio);
320         delete cfio;
321     }
322 }
323 
CtxFileIOWrite(CtxFileIO * cfio)324 void HdcFileDescriptor::CtxFileIOWrite(CtxFileIO *cfio)
325 {
326     std::unique_lock<std::mutex> lock(writeMutex);
327     uint8_t *buf = cfio->bufIO;
328     uint8_t *data = buf;
329     size_t cnt = cfio->size;
330     constexpr int intrmax = 1000;
331     int intrcnt = 0;
332     while (cnt > 0) {
333         ssize_t rc = write(fdIO, data, cnt);
334         if (rc < 0) {
335             if (errno == EINTR || errno == EAGAIN) {
336                 if (++intrcnt > intrmax) {
337                     WRITE_LOG(LOG_WARN, "CtxFileIOWrite fdIO:%d interrupt errno:%d", fdIO, errno);
338                     intrcnt = 0;
339                 }
340                 continue;
341             } else {
342                 WRITE_LOG(LOG_FATAL, "CtxFileIOWrite fdIO:%d rc:%d error:%d", fdIO, rc, errno);
343                 break;
344             }
345         }
346         data += rc;
347         cnt -= static_cast<size_t>(rc);
348     }
349     delete[] buf;
350 }
351 }  // namespace Hdc
352