1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "file_descriptor.h"
16
17 namespace Hdc {
// Number of seconds a single select() wait in FileIOOnThread may last.
static constexpr int SECONDS_TIMEOUT = 5;
19
HdcFileDescriptor(uv_loop_t * loopIn,int fdToRead,void * callerContextIn,CallBackWhenRead callbackReadIn,CmdResultCallback callbackFinishIn)20 HdcFileDescriptor::HdcFileDescriptor(uv_loop_t *loopIn, int fdToRead, void *callerContextIn,
21 CallBackWhenRead callbackReadIn, CmdResultCallback callbackFinishIn)
22 {
23 loop = loopIn;
24 workContinue = true;
25 callbackFinish = callbackFinishIn;
26 callbackRead = callbackReadIn;
27 fdIO = fdToRead;
28 refIO = 0;
29 callerContext = callerContextIn;
30 ioWriteThread = std::thread(IOWriteThread, this);
31 }
32
~HdcFileDescriptor()33 HdcFileDescriptor::~HdcFileDescriptor()
34 {
35 workContinue = false;
36 NotifyWrite();
37 ioWriteThread.join();
38 ioReadThread.join();
39 WRITE_LOG(LOG_FATAL, "~HdcFileDescriptor refIO:%d", refIO);
40 }
41
ReadyForRelease()42 bool HdcFileDescriptor::ReadyForRelease()
43 {
44 return refIO == 0;
45 }
46
47 // just tryCloseFdIo = true, callback will be effect
StopWorkOnThread(bool tryCloseFdIo,std::function<void ()> closeFdCallback)48 void HdcFileDescriptor::StopWorkOnThread(bool tryCloseFdIo, std::function<void()> closeFdCallback)
49 {
50 workContinue = false;
51 callbackCloseFd = closeFdCallback;
52 if (tryCloseFdIo && refIO > 0) {
53 if (callbackCloseFd != nullptr) {
54 callbackCloseFd();
55 }
56 }
57 }
58
FileIOOnThread(CtxFileIO * ctxIO,int bufSize,bool isWrite)59 void HdcFileDescriptor::FileIOOnThread(CtxFileIO *ctxIO, int bufSize, bool isWrite)
60 {
61 HdcFileDescriptor *thisClass = ctxIO->thisClass;
62 uint8_t *buf = ctxIO->bufIO;
63 bool bFinish = false;
64 bool fetalFinish = false;
65 ssize_t nBytes;
66 fd_set rset;
67 struct timeval timeout;
68 timeout.tv_sec = SECONDS_TIMEOUT;
69 timeout.tv_usec = 0;
70
71 while (true) {
72 if (thisClass->workContinue == false) {
73 WRITE_LOG(LOG_INFO, "FileIOOnThread fdIO:%d workContinue false", thisClass->fdIO);
74 bFinish = true;
75 break;
76 }
77
78 if (isWrite) {
79 nBytes = write(thisClass->fdIO, buf, bufSize);
80 if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
81 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d write interrupt", thisClass->fdIO);
82 continue;
83 }
84 bufSize -= nBytes;
85 } else {
86 if (memset_s(buf, bufSize, 0, bufSize) != EOK) {
87 WRITE_LOG(LOG_DEBUG, "FileIOOnThread buf memset_s fail.");
88 break;
89 }
90 FD_ZERO(&rset);
91 FD_SET(thisClass->fdIO, &rset);
92 int rc = select(thisClass->fdIO + 1, &rset, nullptr, nullptr, &timeout);
93 if (rc < 0) {
94 WRITE_LOG(LOG_FATAL, "FileIOOnThread select fdIO:%d error:%d", thisClass->fdIO, errno);
95 break;
96 } else if (rc == 0) {
97 continue;
98 }
99 nBytes = read(thisClass->fdIO, buf, bufSize);
100 if (nBytes < 0 && (errno == EINTR || errno == EAGAIN)) {
101 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d read interrupt", thisClass->fdIO);
102 continue;
103 }
104 }
105 if (nBytes > 0) {
106 if (isWrite && bufSize == 0) {
107 break;
108 } else if (!isWrite && !thisClass->callbackRead(thisClass->callerContext, buf, nBytes)) {
109 WRITE_LOG(LOG_WARN, "FileIOOnThread fdIO:%d callbackRead false", thisClass->fdIO);
110 bFinish = true;
111 break;
112 }
113 continue;
114 } else {
115 if (nBytes != 0) {
116 char buffer[BUF_SIZE_DEFAULT] = { 0 };
117 #ifdef HOST_MINGW
118 strerror_s(buffer, BUF_SIZE_DEFAULT, errno);
119 #else
120 strerror_r(errno, buffer, BUF_SIZE_DEFAULT);
121 #endif
122 WRITE_LOG(LOG_DEBUG, "FileIOOnThread fd:%d failed:%s", thisClass->fdIO, buffer);
123 }
124 WRITE_LOG(LOG_INFO, "FileIOOnThread fd:%d nBytes:%u", thisClass->fdIO, nBytes);
125 bFinish = true;
126 fetalFinish = true;
127 break;
128 }
129 }
130 if (buf != nullptr) {
131 delete[] buf;
132 buf = nullptr;
133 }
134 delete ctxIO;
135
136 --thisClass->refIO;
137 if (bFinish) {
138 thisClass->workContinue = false;
139 thisClass->callbackFinish(thisClass->callerContext, fetalFinish, STRING_EMPTY);
140 }
141 }
142
LoopReadOnThread()143 int HdcFileDescriptor::LoopReadOnThread()
144 {
145 int readMax = Base::GetMaxBufSize() * 1.2;
146 auto contextIO = new(std::nothrow) CtxFileIO();
147 auto buf = new(std::nothrow) uint8_t[readMax]();
148 if (!contextIO || !buf) {
149 if (contextIO) {
150 delete contextIO;
151 }
152 if (buf) {
153 delete[] buf;
154 }
155 WRITE_LOG(LOG_FATAL, "Memory alloc failed");
156 callbackFinish(callerContext, true, "Memory alloc failed");
157 return -1;
158 }
159 contextIO->bufIO = buf;
160 contextIO->thisClass = this;
161 ++refIO;
162 ioReadThread = std::thread(FileIOOnThread, contextIO, readMax, false);
163 return 0;
164 }
165
StartWorkOnThread()166 bool HdcFileDescriptor::StartWorkOnThread()
167 {
168 if (LoopReadOnThread() < 0) {
169 return false;
170 }
171 return true;
172 }
173
Write(uint8_t * data,int size)174 int HdcFileDescriptor::Write(uint8_t *data, int size)
175 {
176 if (size > static_cast<int>(HDC_BUF_MAX_BYTES - 1)) {
177 size = static_cast<int>(HDC_BUF_MAX_BYTES - 1);
178 }
179 if (size <= 0) {
180 WRITE_LOG(LOG_WARN, "Write failed, size:%d", size);
181 return -1;
182 }
183 auto buf = new(std::nothrow) uint8_t[size];
184 if (!buf) {
185 return -1;
186 }
187 (void)memcpy_s(buf, size, data, size);
188 return WriteWithMem(buf, size);
189 }
190
191 // Data's memory must be Malloc, and the callback FREE after this function is completed
WriteWithMem(uint8_t * data,int size)192 int HdcFileDescriptor::WriteWithMem(uint8_t *data, int size)
193 {
194 auto contextIO = new(std::nothrow) CtxFileIO();
195 if (!contextIO) {
196 delete[] data;
197 WRITE_LOG(LOG_FATAL, "Memory alloc failed");
198 callbackFinish(callerContext, true, "Memory alloc failed");
199 return -1;
200 }
201 contextIO->bufIO = data;
202 contextIO->size = static_cast<size_t>(size);
203 contextIO->thisClass = this;
204 PushWrite(contextIO);
205 NotifyWrite();
206 return size;
207 }
208
IOWriteThread(void * object)209 void HdcFileDescriptor::IOWriteThread(void *object)
210 {
211 HdcFileDescriptor *hfd = reinterpret_cast<HdcFileDescriptor *>(object);
212 while (hfd->workContinue) {
213 hfd->HandleWrite();
214 hfd->WaitWrite();
215 }
216 }
217
PushWrite(CtxFileIO * cfio)218 void HdcFileDescriptor::PushWrite(CtxFileIO *cfio)
219 {
220 std::unique_lock<std::mutex> lock(writeMutex);
221 writeQueue.push(cfio);
222 }
223
PopWrite()224 CtxFileIO *HdcFileDescriptor::PopWrite()
225 {
226 std::unique_lock<std::mutex> lock(writeMutex);
227 CtxFileIO *cfio = nullptr;
228 if (!writeQueue.empty()) {
229 cfio = writeQueue.front();
230 writeQueue.pop();
231 }
232 return cfio;
233 }
234
NotifyWrite()235 void HdcFileDescriptor::NotifyWrite()
236 {
237 std::unique_lock<std::mutex> lock(writeMutex);
238 writeCond.notify_one();
239 }
240
WaitWrite()241 void HdcFileDescriptor::WaitWrite()
242 {
243 std::unique_lock<std::mutex> lock(writeMutex);
244 writeCond.wait(lock, [&]() { return !writeQueue.empty() || !workContinue; });
245 }
246
HandleWrite()247 void HdcFileDescriptor::HandleWrite()
248 {
249 CtxFileIO *cfio = nullptr;
250 while ((cfio = PopWrite()) != nullptr) {
251 CtxFileIOWrite(cfio);
252 delete cfio;
253 }
254 }
255
CtxFileIOWrite(CtxFileIO * cfio)256 void HdcFileDescriptor::CtxFileIOWrite(CtxFileIO *cfio)
257 {
258 std::unique_lock<std::mutex> lock(writeMutex);
259 uint8_t *buf = cfio->bufIO;
260 uint8_t *data = buf;
261 size_t cnt = cfio->size;
262 while (cnt > 0) {
263 ssize_t rc = write(fdIO, data, cnt);
264 if (rc < 0 ) {
265 if (errno == EINTR || errno == EAGAIN) {
266 WRITE_LOG(LOG_WARN, "CtxFileIOWrite fdIO:%d interrupt or again", fdIO);
267 continue;
268 } else {
269 WRITE_LOG(LOG_FATAL, "CtxFileIOWrite fdIO:%d rc:%d error:%d", fdIO, rc, errno);
270 break;
271 }
272 }
273 data += rc;
274 cnt -= static_cast<size_t>(rc);
275 }
276 delete[] buf;
277 }
278 } // namespace Hdc
279