• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "file_descriptor.h"
16 
17 namespace Hdc {
// Timeout, in seconds, handed to select() by the reader loop in FileIOOnThread.
static constexpr int SECONDS_TIMEOUT = 5;
19 
HdcFileDescriptor(uv_loop_t * loopIn,int fdToRead,void * callerContextIn,CallBackWhenRead callbackReadIn,CmdResultCallback callbackFinishIn)20 HdcFileDescriptor::HdcFileDescriptor(uv_loop_t *loopIn, int fdToRead, void *callerContextIn,
21                                      CallBackWhenRead callbackReadIn, CmdResultCallback callbackFinishIn)
22 {
23     loop = loopIn;
24     workContinue = true;
25     callbackFinish = callbackFinishIn;
26     callbackRead = callbackReadIn;
27     fdIO = fdToRead;
28     refIO = 0;
29     callerContext = callerContextIn;
30     ioWriteThread = std::thread(IOWriteThread, this);
31 }
32 
~HdcFileDescriptor()33 HdcFileDescriptor::~HdcFileDescriptor()
34 {
35     workContinue = false;
36     NotifyWrite();
37     ioWriteThread.join();
38     ioReadThread.join();
39     WRITE_LOG(LOG_FATAL, "~HdcFileDescriptor refIO:%d", refIO);
40 }
41 
ReadyForRelease()42 bool HdcFileDescriptor::ReadyForRelease()
43 {
44     return refIO == 0;
45 }
46 
// closeFdCallback is only invoked when tryCloseFdIo is true and refIO > 0
StopWorkOnThread(bool tryCloseFdIo,std::function<void ()> closeFdCallback)48 void HdcFileDescriptor::StopWorkOnThread(bool tryCloseFdIo, std::function<void()> closeFdCallback)
49 {
50     workContinue = false;
51     callbackCloseFd = closeFdCallback;
52     if (tryCloseFdIo && refIO > 0) {
53         if (callbackCloseFd != nullptr) {
54             callbackCloseFd();
55         }
56     }
57 }
58 
FileIOOnThread(CtxFileIO * ctxIO,int bufSize,bool isWrite)59 void HdcFileDescriptor::FileIOOnThread(CtxFileIO *ctxIO, int bufSize, bool isWrite)
60 {
61     HdcFileDescriptor *thisClass = ctxIO->thisClass;
62     uint8_t *buf = ctxIO->bufIO;
63     bool bFinish = false;
64     bool fetalFinish = false;
65     ssize_t nBytes;
66     fd_set rset;
67     struct timeval timeout;
68     timeout.tv_sec = SECONDS_TIMEOUT;
69     timeout.tv_usec = 0;
70 
71     while (true) {
72         if (thisClass->workContinue == false) {
73             WRITE_LOG(LOG_INFO, "FileIOOnThread fdIO:%d workContinue false", thisClass->fdIO);
74             bFinish = true;
75             break;
76         }
77 
78         if (isWrite) {
79             nBytes = write(thisClass->fdIO, buf, bufSize);
80             if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
81                 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d write interrupt", thisClass->fdIO);
82                 continue;
83             }
84             bufSize -= nBytes;
85         } else {
86             if (memset_s(buf, bufSize, 0, bufSize) != EOK) {
87                 WRITE_LOG(LOG_DEBUG, "FileIOOnThread buf memset_s fail.");
88                 break;
89             }
90             FD_ZERO(&rset);
91             FD_SET(thisClass->fdIO, &rset);
92             int rc = select(thisClass->fdIO + 1, &rset, nullptr, nullptr, &timeout);
93             if (rc < 0) {
94                 WRITE_LOG(LOG_FATAL, "FileIOOnThread select fdIO:%d error:%d", thisClass->fdIO, errno);
95                 break;
96             } else if (rc == 0) {
97                 continue;
98             }
99             nBytes = read(thisClass->fdIO, buf, bufSize);
100             if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
101                 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO);
102                 continue;
103             }
104         }
105         if (nBytes > 0) {
106             if (isWrite && bufSize == 0) {
107                 break;
108             } else if (!isWrite && !thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
109                 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
110                 bFinish = true;
111                 break;
112             }
113             continue;
114         } else {
115             if (nBytes != 0) {
116                 char buffer[BUF_SIZE_DEFAULT] = { 0 };
117 #ifdef HOST_MINGW
118                 strerror_s(buffer, BUF_SIZE_DEFAULT, errno);
119 #else
120                 strerror_r(errno, buffer, BUF_SIZE_DEFAULT);
121 #endif
122                 WRITE_LOG(LOG_DEBUG, "FileIOOnThread fd:%d failed:%s", thisClass->fdIO, buffer);
123             }
124             WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%u", thisClass->fdIO, nBytes);
125             bFinish = true;
126             fetalFinish = true;
127             break;
128         }
129     }
130     if (buf != nullptr) {
131         delete[] buf;
132         buf = nullptr;
133     }
134     delete ctxIO;
135 
136     --thisClass->refIO;
137     if (bFinish) {
138         thisClass->workContinue = false;
139         thisClass->callbackFinish(thisClass->callerContext, fetalFinish, STRING_EMPTY);
140     }
141 }
142 
LoopReadOnThread()143 int HdcFileDescriptor::LoopReadOnThread()
144 {
145     int readMax = Base::GetMaxBufSize() * 1.2;
146     auto contextIO = new(std::nothrow) CtxFileIO();
147     auto buf = new(std::nothrow) uint8_t[readMax]();
148     if (!contextIO || !buf) {
149         if (contextIO) {
150             delete contextIO;
151         }
152         if (buf) {
153             delete[] buf;
154         }
155         WRITE_LOG(LOG_FATAL, "Memory alloc failed");
156         callbackFinish(callerContext, true, "Memory alloc failed");
157         return -1;
158     }
159     contextIO->bufIO = buf;
160     contextIO->thisClass = this;
161     ++refIO;
162     ioReadThread = std::thread(FileIOOnThread, contextIO, readMax, false);
163     return 0;
164 }
165 
StartWorkOnThread()166 bool HdcFileDescriptor::StartWorkOnThread()
167 {
168     if (LoopReadOnThread() < 0) {
169         return false;
170     }
171     return true;
172 }
173 
Write(uint8_t * data,int size)174 int HdcFileDescriptor::Write(uint8_t *data, int size)
175 {
176     if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) {
177         size = static_cast<int>(HDC_BUF_MAX_BYTES - 1);
178     }
179     if (size <= 0) {
180         WRITE_LOG(LOG_WARN, "Write failed, size:%d", size);
181         return -1;
182     }
183     auto buf = new(std::nothrow) uint8_t[size];
184     if (!buf) {
185         return -1;
186     }
187     (void)memcpy_s(buf, size, data, size);
188     return WriteWithMem(buf, size);
189 }
190 
// data must be allocated with new[]; this function takes ownership and the buffer is released with delete[]
WriteWithMem(uint8_t * data,int size)192 int HdcFileDescriptor::WriteWithMem(uint8_t *data, int size)
193 {
194     auto contextIO = new(std::nothrow) CtxFileIO();
195     if (!contextIO) {
196         delete[] data;
197         WRITE_LOG(LOG_FATAL, "Memory alloc failed");
198         callbackFinish(callerContext, true, "Memory alloc failed");
199         return -1;
200     }
201     contextIO->bufIO = data;
202     contextIO->size = static_cast<size_t>(size);
203     contextIO->thisClass = this;
204     PushWrite(contextIO);
205     NotifyWrite();
206     return size;
207 }
208 
IOWriteThread(void * object)209 void HdcFileDescriptor::IOWriteThread(void *object)
210 {
211     HdcFileDescriptor *hfd = reinterpret_cast<HdcFileDescriptor *>(object);
212     while (hfd->workContinue) {
213         hfd->HandleWrite();
214         hfd->WaitWrite();
215     }
216 }
217 
PushWrite(CtxFileIO * cfio)218 void HdcFileDescriptor::PushWrite(CtxFileIO *cfio)
219 {
220     std::unique_lock<std::mutex> lock(writeMutex);
221     writeQueue.push(cfio);
222 }
223 
PopWrite()224 CtxFileIO *HdcFileDescriptor::PopWrite()
225 {
226     std::unique_lock<std::mutex> lock(writeMutex);
227     CtxFileIO *cfio = nullptr;
228     if (!writeQueue.empty()) {
229         cfio = writeQueue.front();
230         writeQueue.pop();
231     }
232     return cfio;
233 }
234 
NotifyWrite()235 void HdcFileDescriptor::NotifyWrite()
236 {
237     std::unique_lock<std::mutex> lock(writeMutex);
238     writeCond.notify_one();
239 }
240 
WaitWrite()241 void HdcFileDescriptor::WaitWrite()
242 {
243     std::unique_lock<std::mutex> lock(writeMutex);
244     writeCond.wait(lock, [&]() { return !writeQueue.empty() || !workContinue; });
245 }
246 
HandleWrite()247 void HdcFileDescriptor::HandleWrite()
248 {
249     CtxFileIO *cfio = nullptr;
250     while ((cfio = PopWrite()) != nullptr) {
251         CtxFileIOWrite(cfio);
252         delete cfio;
253     }
254 }
255 
CtxFileIOWrite(CtxFileIO * cfio)256 void HdcFileDescriptor::CtxFileIOWrite(CtxFileIO *cfio)
257 {
258     std::unique_lock<std::mutex> lock(writeMutex);
259     uint8_t *buf = cfio->bufIO;
260     uint8_t *data = buf;
261     size_t cnt = cfio->size;
262     while (cnt > 0) {
263         ssize_t rc = write(fdIO, data, cnt);
264         if (rc < 0 ) {
265             if (errno == EINTR || errno == EAGAIN) {
266                 WRITE_LOG(LOG_WARN, "CtxFileIOWrite fdIO:%d interrupt or again", fdIO);
267                 continue;
268             } else {
269                 WRITE_LOG(LOG_FATAL, "CtxFileIOWrite fdIO:%d rc:%d error:%d", fdIO, rc, errno);
270                 break;
271             }
272         }
273         data += rc;
274         cnt -= static_cast<size_t>(rc);
275     }
276     delete[] buf;
277 }
278 }  // namespace Hdc
279