1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "transfer.h"
16 #include "serial_struct.h"
17 #include <sys/stat.h>
18 #ifdef HARMONY_PROJECT
19 #include <lz4.h>
20 #endif
21
22 namespace Hdc {
23 constexpr uint64_t HDC_TIME_CONVERT_BASE = 1000000000;
24
HdcTransferBase(HTaskInfo hTaskInfo)25 HdcTransferBase::HdcTransferBase(HTaskInfo hTaskInfo)
26 : HdcTaskBase(hTaskInfo)
27 {
28 ResetCtx(&ctxNow, true);
29 commandBegin = 0;
30 commandData = 0;
31 }
32
~HdcTransferBase()33 HdcTransferBase::~HdcTransferBase()
34 {
35 WRITE_LOG(LOG_DEBUG, "~HdcTransferBase");
36 };
37
ResetCtx(CtxFile * context,bool full)38 bool HdcTransferBase::ResetCtx(CtxFile *context, bool full)
39 {
40 if (full) {
41 *context = {};
42 context->fsOpenReq.data = context;
43 context->fsCloseReq.data = context;
44 context->thisClass = this;
45 context->loop = loopTask;
46 context->cb = OnFileIO;
47 }
48 context->closeNotify = false;
49 context->indexIO = 0;
50 context->lastErrno = 0;
51 context->ioFinish = false;
52 return true;
53 }
54
SimpleFileIO(CtxFile * context,uint64_t index,uint8_t * sendBuf,int bytes)55 int HdcTransferBase::SimpleFileIO(CtxFile *context, uint64_t index, uint8_t *sendBuf, int bytes)
56 {
57 // The first 8 bytes file offset
58 uint8_t *buf = new uint8_t[bytes]();
59 CtxFileIO *ioContext = new CtxFileIO();
60 bool ret = false;
61 while (true) {
62 if (!buf || !ioContext || bytes < 0) {
63 WRITE_LOG(LOG_DEBUG, "SimpleFileIO param check failed");
64 break;
65 }
66 if (context->ioFinish) {
67 WRITE_LOG(LOG_DEBUG, "SimpleFileIO to closed IOStream");
68 break;
69 }
70 uv_fs_t *req = &ioContext->fs;
71 ioContext->bufIO = buf;
72 ioContext->context = context;
73 req->data = ioContext;
74 ++refCount;
75 if (context->master) { // master just read, and slave just write.when master/read, sendBuf can be nullptr
76 uv_buf_t iov = uv_buf_init(reinterpret_cast<char *>(buf), bytes);
77 uv_fs_read(context->loop, req, context->fsOpenReq.result, &iov, 1, index, context->cb);
78 } else {
79 // The US_FS_WRITE here must be brought into the actual file offset, which cannot be incorporated with local
80 // accumulated index because UV_FS_WRITE will be executed multiple times and then trigger a callback.
81 if (bytes > 0 && memcpy_s(ioContext->bufIO, bytes, sendBuf, bytes) != EOK) {
82 WRITE_LOG(LOG_WARN, "SimpleFileIO memcpy error");
83 break;
84 }
85 uv_buf_t iov = uv_buf_init(reinterpret_cast<char *>(ioContext->bufIO), bytes);
86 uv_fs_write(context->loop, req, context->fsOpenReq.result, &iov, 1, index, context->cb);
87 }
88 ret = true;
89 break;
90 }
91 if (!ret) {
92 if (buf != nullptr) {
93 delete[] buf;
94 buf = nullptr;
95 }
96 if (ioContext != nullptr) {
97 delete ioContext;
98 ioContext = nullptr;
99 }
100 return -1;
101 }
102 return bytes;
103 }
104
OnFileClose(uv_fs_t * req)105 void HdcTransferBase::OnFileClose(uv_fs_t *req)
106 {
107 uv_fs_req_cleanup(req);
108 CtxFile *context = (CtxFile *)req->data;
109 HdcTransferBase *thisClass = (HdcTransferBase *)context->thisClass;
110 if (context->closeNotify) {
111 // close-step2
112 // maybe successful finish or failed finish
113 thisClass->WhenTransferFinish(context);
114 }
115 --thisClass->refCount;
116 return;
117 }
118
SetFileTime(CtxFile * context)119 void HdcTransferBase::SetFileTime(CtxFile *context)
120 {
121 if (!context->transferConfig.holdTimestamp) {
122 return;
123 }
124 if (!context->transferConfig.mtime) {
125 return;
126 }
127 uv_fs_t fs;
128 double aTimeSec = static_cast<long double>(context->transferConfig.atime) / HDC_TIME_CONVERT_BASE;
129 double mTimeSec = static_cast<long double>(context->transferConfig.mtime) / HDC_TIME_CONVERT_BASE;
130 uv_fs_futime(nullptr, &fs, context->fsOpenReq.result, aTimeSec, mTimeSec, nullptr);
131 uv_fs_req_cleanup(&fs);
132 }
133
SendIOPayload(CtxFile * context,int index,uint8_t * data,int dataSize)134 bool HdcTransferBase::SendIOPayload(CtxFile *context, int index, uint8_t *data, int dataSize)
135 {
136 TransferPayload payloadHead;
137 string head;
138 int compressSize = 0;
139 int sendBufSize = payloadPrefixReserve + dataSize;
140 uint8_t *sendBuf = new uint8_t[sendBufSize]();
141 if (!sendBuf) {
142 return false;
143 }
144 payloadHead.compressType = context->transferConfig.compressType;
145 payloadHead.uncompressSize = dataSize;
146 payloadHead.index = index;
147 if (dataSize > 0) {
148 switch (payloadHead.compressType) {
149 #ifdef HARMONY_PROJECT
150 case COMPRESS_LZ4: {
151 compressSize = LZ4_compress_default((const char *)data, (char *)sendBuf + payloadPrefixReserve,
152 dataSize, dataSize);
153 break;
154 }
155 #endif
156 default: { // COMPRESS_NONE
157 if (memcpy_s(sendBuf + payloadPrefixReserve, sendBufSize - payloadPrefixReserve, data, dataSize)
158 != EOK) {
159 delete[] sendBuf;
160 return false;
161 }
162 compressSize = dataSize;
163 break;
164 }
165 }
166 }
167 payloadHead.compressSize = compressSize;
168 head = SerialStruct::SerializeToString(payloadHead);
169 if (head.size() + 1 > payloadPrefixReserve) {
170 delete[] sendBuf;
171 return false;
172 }
173 int errCode = memcpy_s(sendBuf, sendBufSize, head.c_str(), head.size() + 1);
174 if (errCode != EOK) {
175 delete[] sendBuf;
176 return false;
177 }
178 bool ret = SendToAnother(commandData, sendBuf, payloadPrefixReserve + compressSize) > 0;
179 delete[] sendBuf;
180 return ret;
181 }
182
OnFileIO(uv_fs_t * req)183 void HdcTransferBase::OnFileIO(uv_fs_t *req)
184 {
185 CtxFileIO *contextIO = (CtxFileIO *)req->data;
186 CtxFile *context = (CtxFile *)contextIO->context;
187 HdcTransferBase *thisClass = (HdcTransferBase *)context->thisClass;
188 uint8_t *bufIO = contextIO->bufIO;
189 uv_fs_req_cleanup(req);
190 while (true) {
191 if (context->ioFinish) {
192 break;
193 }
194 if (req->result < 0) {
195 constexpr int bufSize = 1024;
196 char buf[bufSize] = { 0 };
197 uv_strerror_r((int)req->result, buf, bufSize);
198 WRITE_LOG(LOG_DEBUG, "OnFileIO error: %s", buf);
199 context->closeNotify = true;
200 context->lastErrno = abs(req->result);
201 context->ioFinish = true;
202 break;
203 }
204 context->indexIO += req->result;
205 if (req->fs_type == UV_FS_READ) {
206 #ifdef HDC_DEBUG
207 WRITE_LOG(LOG_DEBUG, "read file data %" PRIu64 "/%" PRIu64 "", context->indexIO,
208 context->fileSize);
209 #endif // HDC_DEBUG
210 if (!thisClass->SendIOPayload(context, context->indexIO - req->result, bufIO, req->result)) {
211 context->ioFinish = true;
212 break;
213 }
214 if (context->indexIO < context->fileSize) {
215 thisClass->SimpleFileIO(context, context->indexIO, nullptr,
216 Base::GetMaxBufSize() * thisClass->maxTransferBufFactor);
217 } else {
218 context->ioFinish = true;
219 }
220 } else if (req->fs_type == UV_FS_WRITE) { // write
221 #ifdef HDC_DEBUG
222 WRITE_LOG(LOG_DEBUG, "write file data %" PRIu64 "/%" PRIu64 "", context->indexIO,
223 context->fileSize);
224 #endif // HDC_DEBUG
225 if (context->indexIO >= context->fileSize) {
226 // The active end must first read it first, but you can't make Finish first, because Slave may not
227 // end.Only slave receives complete talents Finish
228 context->closeNotify = true;
229 context->ioFinish = true;
230 thisClass->SetFileTime(context);
231 }
232 } else {
233 context->ioFinish = true;
234 }
235 break;
236 }
237 if (context->ioFinish) {
238 // close-step1
239 ++thisClass->refCount;
240 if (req->fs_type == UV_FS_WRITE) {
241 uv_fs_fsync(thisClass->loopTask, &context->fsCloseReq, context->fsOpenReq.result, nullptr);
242 }
243 uv_fs_close(thisClass->loopTask, &context->fsCloseReq, context->fsOpenReq.result, OnFileClose);
244 }
245 --thisClass->refCount;
246 delete[] bufIO;
247 delete contextIO; // Req is part of the Contextio structure, no free release
248 }
249
OnFileOpen(uv_fs_t * req)250 void HdcTransferBase::OnFileOpen(uv_fs_t *req)
251 {
252 CtxFile *context = (CtxFile *)req->data;
253 HdcTransferBase *thisClass = (HdcTransferBase *)context->thisClass;
254 uv_fs_req_cleanup(req);
255 WRITE_LOG(LOG_DEBUG, "Filemod openfile:%s", context->localPath.c_str());
256 --thisClass->refCount;
257 if (req->result < 0) {
258 constexpr int bufSize = 1024;
259 char buf[bufSize] = { 0 };
260 uv_strerror_r((int)req->result, buf, bufSize);
261 thisClass->LogMsg(MSG_FAIL, "Error opening file: %s, path:%s", buf,
262 context->localPath.c_str());
263 thisClass->TaskFinish();
264 return;
265 }
266 thisClass->ResetCtx(context);
267 if (context->master) {
268 // init master
269 uv_fs_t fs = {};
270 uv_fs_fstat(nullptr, &fs, context->fsOpenReq.result, nullptr);
271 TransferConfig &st = context->transferConfig;
272 st.fileSize = fs.statbuf.st_size;
273 st.optionalName = context->localName;
274 if (st.holdTimestamp) {
275 st.atime = fs.statbuf.st_atim.tv_sec * HDC_TIME_CONVERT_BASE + fs.statbuf.st_atim.tv_nsec;
276 st.mtime = fs.statbuf.st_mtim.tv_sec * HDC_TIME_CONVERT_BASE + fs.statbuf.st_mtim.tv_nsec;
277 }
278 st.path = context->remotePath;
279 // update ctxNow=context child value
280 context->fileSize = st.fileSize;
281
282 uv_fs_req_cleanup(&fs);
283 thisClass->CheckMaster(context);
284 } else { // write
285 thisClass->SendToAnother(thisClass->commandBegin, nullptr, 0);
286 }
287 }
288
MatchPackageExtendName(string fileName,string extName)289 bool HdcTransferBase::MatchPackageExtendName(string fileName, string extName)
290 {
291 bool match = false;
292 int subfixIndex = fileName.rfind(extName);
293 if ((fileName.size() - subfixIndex) != extName.size()) {
294 return false;
295 }
296 match = true;
297 return match;
298 }
299
300 // filter can be empty
GetSubFiles(const char * path,string filter,vector<string> * out)301 int HdcTransferBase::GetSubFiles(const char *path, string filter, vector<string> *out)
302 {
303 int retNum = 0;
304 uv_fs_t req = {};
305 uv_dirent_t dent;
306 vector<string> filterStrings;
307 if (!strlen(path)) {
308 return retNum;
309 }
310 if (filter.size()) {
311 Base::SplitString(filter, ";", filterStrings);
312 }
313
314 if (uv_fs_scandir(nullptr, &req, path, 0, nullptr) < 0) {
315 uv_fs_req_cleanup(&req);
316 return retNum;
317 }
318 while (uv_fs_scandir_next(&req, &dent) != UV_EOF) {
319 // Skip. File
320 if (strcmp(dent.name, ".") == 0 || strcmp(dent.name, "..") == 0)
321 continue;
322 if (!(static_cast<uint32_t>(dent.type) & UV_DIRENT_FILE))
323 continue;
324 string fileName = dent.name;
325 for (auto &&s : filterStrings) {
326 int subfixIndex = fileName.rfind(s);
327 if ((fileName.size() - subfixIndex) != s.size())
328 continue;
329 string fullPath = string(path) + "/";
330 fullPath += fileName;
331 out->push_back(fullPath);
332 ++retNum;
333 }
334 }
335 uv_fs_req_cleanup(&req);
336 return retNum;
337 }
338
339 // https://en.cppreference.com/w/cpp/filesystem/is_directory
340 // return true if file exist, false if file not exist
SmartSlavePath(string & cwd,string & localPath,const char * optName)341 bool HdcTransferBase::SmartSlavePath(string &cwd, string &localPath, const char *optName)
342 {
343 string errStr;
344 if (taskInfo->serverOrDaemon) {
345 // slave and server
346 ExtractRelativePath(cwd, localPath);
347 }
348 if (Base::CheckDirectoryOrPath(localPath.c_str(), true, false, errStr)) {
349 WRITE_LOG(LOG_INFO, "%s", errStr.c_str());
350 return true;
351 }
352 uv_fs_t req;
353 int r = uv_fs_lstat(nullptr, &req, localPath.c_str(), nullptr);
354 uv_fs_req_cleanup(&req);
355 if (r == 0 && req.statbuf.st_mode & S_IFDIR) { // is dir
356 localPath = Base::StringFormat("%s%c%s", localPath.c_str(), Base::GetPathSep(), optName);
357 }
358 return false;
359 }
360
RecvIOPayload(CtxFile * context,uint8_t * data,int dataSize)361 bool HdcTransferBase::RecvIOPayload(CtxFile *context, uint8_t *data, int dataSize)
362 {
363 uint8_t *clearBuf = nullptr;
364 string serialStrring((char *)data, payloadPrefixReserve);
365 TransferPayload pld;
366 bool ret = false;
367 SerialStruct::ParseFromString(pld, serialStrring);
368 clearBuf = new uint8_t[pld.uncompressSize]();
369 if (!clearBuf) {
370 return false;
371 }
372 int clearSize = 0;
373 if (pld.compressSize > 0) {
374 switch (pld.compressType) {
375 #ifdef HARMONY_PROJECT
376 case COMPRESS_LZ4: {
377 clearSize = LZ4_decompress_safe((const char *)data + payloadPrefixReserve, (char *)clearBuf,
378 pld.compressSize, pld.uncompressSize);
379 break;
380 }
381 #endif
382 default: { // COMPRESS_NONE
383 if (memcpy_s(clearBuf, pld.uncompressSize, data + payloadPrefixReserve, pld.compressSize) != EOK) {
384 delete[] clearBuf;
385 return false;
386 }
387 clearSize = pld.compressSize;
388 break;
389 }
390 }
391 }
392 while (true) {
393 if ((uint32_t)clearSize != pld.uncompressSize) {
394 break;
395 }
396 if (SimpleFileIO(context, pld.index, clearBuf, clearSize) < 0) {
397 break;
398 }
399 ret = true;
400 break;
401 }
402 delete[] clearBuf;
403 return ret;
404 }
405
CommandDispatch(const uint16_t command,uint8_t * payload,const int payloadSize)406 bool HdcTransferBase::CommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize)
407 {
408 bool ret = true;
409 while (true) {
410 if (command == commandBegin) {
411 CtxFile *context = &ctxNow;
412 int ioRet = SimpleFileIO(context, context->indexIO, nullptr, Base::GetMaxBufSize() * maxTransferBufFactor);
413 if (ioRet < 0) {
414 ret = false;
415 break;
416 }
417 context->transferBegin = Base::GetRuntimeMSec();
418 } else if (command == commandData) {
419 if ((uint32_t)payloadSize > HDC_BUF_MAX_BYTES || payloadSize < 0) {
420 ret = false;
421 break;
422 }
423 // Note, I will trigger FileIO after multiple times.
424 CtxFile *context = &ctxNow;
425 if (!RecvIOPayload(context, payload, payloadSize)) {
426 ret = false;
427 break;
428 }
429 } else {
430 // Other subclass commands
431 }
432 break;
433 }
434 return ret;
435 }
436
ExtractRelativePath(string & cwd,string & path)437 void HdcTransferBase::ExtractRelativePath(string &cwd, string &path)
438 {
439 bool absPath = Base::IsAbsolutePath(path);
440 if (!absPath) {
441 path = cwd + path;
442 }
443 }
444 } // namespace Hdc
445