1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "transfer.h"
16 #include "serial_struct.h"
17 #include <sys/stat.h>
18 #ifdef HARMONY_PROJECT
19 #include <lz4.h>
20 #endif
21 #if (!(defined(HOST_MINGW)||defined(HOST_MAC))) && defined(SURPPORT_SELINUX)
22 #include <selinux/selinux.h>
23 #endif
24 namespace Hdc {
25 constexpr uint64_t HDC_TIME_CONVERT_BASE = 1000000000;
26
27
HdcTransferBase(HTaskInfo hTaskInfo)28 HdcTransferBase::HdcTransferBase(HTaskInfo hTaskInfo)
29 : HdcTaskBase(hTaskInfo)
30 {
31 ResetCtx(&ctxNow, true);
32 commandBegin = 0;
33 commandData = 0;
34 isStableBuf = false;
35 }
36
~HdcTransferBase()37 HdcTransferBase::~HdcTransferBase()
38 {
39 if (ctxNow.isFdOpen) {
40 WRITE_LOG(LOG_DEBUG, "~HdcTransferBase channelId:%u lastErrno:%u result:%d ioFinish:%d",
41 taskInfo->channelId, ctxNow.lastErrno, ctxNow.fsOpenReq.result, ctxNow.ioFinish);
42
43 if (ctxNow.lastErrno != 0 || (ctxNow.fsOpenReq.result > 0 && !ctxNow.ioFinish)) {
44 uv_fs_close(nullptr, &ctxNow.fsCloseReq, ctxNow.fsOpenReq.result, nullptr);
45 ctxNow.isFdOpen = false;
46 }
47 } else {
48 WRITE_LOG(LOG_DEBUG, "~HdcTransferBase channelId:%u lastErrno:%u ioFinish:%d",
49 taskInfo->channelId, ctxNow.lastErrno, ctxNow.ioFinish);
50 }
51 };
52
ResetCtx(CtxFile * context,bool full)53 bool HdcTransferBase::ResetCtx(CtxFile *context, bool full)
54 {
55 if (full) {
56 *context = {};
57 context->fsOpenReq.data = context;
58 context->fsCloseReq.data = context;
59 context->thisClass = this;
60 context->loop = loopTask;
61 context->cb = OnFileIO;
62 }
63 context->closeNotify = false;
64 context->indexIO = 0;
65 context->lastErrno = 0;
66 context->ioFinish = false;
67 context->closeReqSubmitted = false;
68 return true;
69 }
70
SimpleFileIO(CtxFile * context,uint64_t index,uint8_t * sendBuf,int bytes)71 int HdcTransferBase::SimpleFileIO(CtxFile *context, uint64_t index, uint8_t *sendBuf, int bytes)
72 {
73 StartTraceScope("HdcTransferBase::SimpleFileIO");
74 // The first 8 bytes file offset
75 #ifndef CONFIG_USE_JEMALLOC_DFX_INIF
76 uint8_t *buf = cirbuf.Malloc();
77 #else
78 uint8_t *buf = new uint8_t[bytes + payloadPrefixReserve]();
79 #endif
80 if (buf == nullptr) {
81 WRITE_LOG(LOG_FATAL, "SimpleFileIO buf nullptr");
82 return -1;
83 }
84 CtxFileIO *ioContext = new(std::nothrow) CtxFileIO();
85 if (ioContext == nullptr) {
86 #ifndef CONFIG_USE_JEMALLOC_DFX_INIF
87 cirbuf.Free(buf);
88 #else
89 delete[] buf;
90 #endif
91 WRITE_LOG(LOG_FATAL, "SimpleFileIO ioContext nullptr");
92 return -1;
93 }
94 bool ret = false;
95 while (true) {
96 size_t bufMaxSize = context->isStableBufSize ?
97 static_cast<size_t>(Base::GetUsbffsBulkSizeStable() - payloadPrefixReserve) :
98 static_cast<size_t>(Base::GetUsbffsBulkSize() - payloadPrefixReserve);
99 if (bytes < 0 || static_cast<size_t>(bytes) > bufMaxSize) {
100 WRITE_LOG(LOG_DEBUG, "SimpleFileIO param check failed");
101 break;
102 }
103 if (context->ioFinish) {
104 WRITE_LOG(LOG_DEBUG, "SimpleFileIO to closed IOStream");
105 break;
106 }
107 uv_fs_t *req = &ioContext->fs;
108 ioContext->bufIO = buf + payloadPrefixReserve;
109 ioContext->context = context;
110 req->data = ioContext;
111 ++refCount;
112 if (context->master) { // master just read, and slave just write.when master/read, sendBuf can be nullptr
113 uv_buf_t iov = uv_buf_init(reinterpret_cast<char *>(ioContext->bufIO), bytes);
114 uv_fs_read(context->loop, req, context->fsOpenReq.result, &iov, 1, index, context->cb);
115 } else {
116 // The US_FS_WRITE here must be brought into the actual file offset, which cannot be incorporated with local
117 // accumulated index because UV_FS_WRITE will be executed multiple times and then trigger a callback.
118 if (bytes > 0 && memcpy_s(ioContext->bufIO, bufMaxSize, sendBuf, bytes) != EOK) {
119 WRITE_LOG(LOG_WARN, "SimpleFileIO memcpy error");
120 break;
121 }
122 uv_buf_t iov = uv_buf_init(reinterpret_cast<char *>(ioContext->bufIO), bytes);
123 uv_fs_write(context->loop, req, context->fsOpenReq.result, &iov, 1, index, context->cb);
124 }
125 ret = true;
126 break;
127 }
128 if (!ret) {
129 if (ioContext != nullptr) {
130 delete ioContext;
131 ioContext = nullptr;
132 }
133 #ifndef CONFIG_USE_JEMALLOC_DFX_INIF
134 cirbuf.Free(buf);
135 #else
136 delete[] buf;
137 #endif
138 return -1;
139 }
140 return bytes;
141 }
142
OnFileClose(uv_fs_t * req)143 void HdcTransferBase::OnFileClose(uv_fs_t *req)
144 {
145 StartTraceScope("HdcTransferBase::OnFileClose");
146 uv_fs_req_cleanup(req);
147 CtxFile *context = (CtxFile *)req->data;
148 context->closeReqSubmitted = false;
149 HdcTransferBase *thisClass = (HdcTransferBase *)context->thisClass;
150 if (context->closeNotify) {
151 // close-step2
152 // maybe successful finish or failed finish
153 thisClass->WhenTransferFinish(context);
154 }
155 --thisClass->refCount;
156 return;
157 }
158
SetFileTime(CtxFile * context)159 void HdcTransferBase::SetFileTime(CtxFile *context)
160 {
161 if (!context->transferConfig.holdTimestamp) {
162 return;
163 }
164 if (!context->transferConfig.mtime) {
165 return;
166 }
167 uv_fs_t fs;
168 double aTimeSec = static_cast<long double>(context->transferConfig.atime) / HDC_TIME_CONVERT_BASE;
169 double mTimeSec = static_cast<long double>(context->transferConfig.mtime) / HDC_TIME_CONVERT_BASE;
170 uv_fs_futime(nullptr, &fs, context->fsOpenReq.result, aTimeSec, mTimeSec, nullptr);
171 uv_fs_req_cleanup(&fs);
172 }
173
SendIOPayload(CtxFile * context,uint64_t index,uint8_t * data,int dataSize)174 bool HdcTransferBase::SendIOPayload(CtxFile *context, uint64_t index, uint8_t *data, int dataSize)
175 {
176 TransferPayload payloadHead;
177 string head;
178 int compressSize = 0;
179 int sendBufSize = payloadPrefixReserve + dataSize;
180 uint8_t *sendBuf = data - payloadPrefixReserve;
181 bool ret = false;
182
183 StartTraceScope("HdcTransferBase::SendIOPayload");
184 payloadHead.compressType = context->transferConfig.compressType;
185 payloadHead.uncompressSize = dataSize;
186 payloadHead.index = index;
187 if (dataSize > 0) {
188 switch (payloadHead.compressType) {
189 #ifdef HARMONY_PROJECT
190 case COMPRESS_LZ4: {
191 sendBuf = new uint8_t[sendBufSize]();
192 if (!sendBuf) {
193 WRITE_LOG(LOG_FATAL, "alloc LZ4 buffer failed");
194 return false;
195 }
196 compressSize = LZ4_compress_default((const char *)data, (char *)sendBuf + payloadPrefixReserve,
197 dataSize, dataSize);
198 break;
199 }
200 #endif
201 default: { // COMPRESS_NONE
202 compressSize = dataSize;
203 break;
204 }
205 }
206 }
207 payloadHead.compressSize = compressSize;
208 head = SerialStruct::SerializeToString(payloadHead);
209 if (head.size() + 1 > payloadPrefixReserve) {
210 goto out;
211 }
212 if (EOK != memcpy_s(sendBuf, sendBufSize, head.c_str(), head.size() + 1)) {
213 goto out;
214 }
215 ret = SendToAnother(commandData, sendBuf, payloadPrefixReserve + compressSize) > 0;
216
217 out:
218 if (dataSize > 0 && payloadHead.compressType == COMPRESS_LZ4) {
219 delete[] sendBuf;
220 }
221 return ret;
222 }
223
OnFileIO(uv_fs_t * req)224 void HdcTransferBase::OnFileIO(uv_fs_t *req)
225 {
226 CtxFileIO *contextIO = reinterpret_cast<CtxFileIO *>(req->data);
227 CtxFile *context = reinterpret_cast<CtxFile *>(contextIO->context);
228 HdcTransferBase *thisClass = (HdcTransferBase *)context->thisClass;
229 uint8_t *bufIO = contextIO->bufIO;
230 StartTraceScope("HdcTransferBase::OnFileIO");
231 uv_fs_req_cleanup(req);
232 while (true) {
233 if (context->ioFinish) {
234 break;
235 }
236 if (req->result < 0) {
237 constexpr int bufSize = 1024;
238 char buf[bufSize] = { 0 };
239 uv_strerror_r((int)req->result, buf, bufSize);
240 WRITE_LOG(LOG_DEBUG, "OnFileIO error: %s", buf);
241 context->closeNotify = true;
242 context->lastErrno = abs(req->result);
243 context->ioFinish = true;
244 break;
245 }
246 context->indexIO += req->result;
247 if (req->fs_type == UV_FS_READ) {
248 #ifdef HDC_DEBUG
249 WRITE_LOG(LOG_DEBUG, "read file data %" PRIu64 "/%" PRIu64 "", context->indexIO,
250 context->fileSize);
251 #endif // HDC_DEBUG
252 if (!thisClass->SendIOPayload(context, context->indexIO - req->result, bufIO, req->result)) {
253 context->ioFinish = true;
254 break;
255 }
256 if (req->result == 0) {
257 context->ioFinish = true;
258 WRITE_LOG(LOG_DEBUG, "path:%s fd:%d eof",
259 context->localPath.c_str(), context->fsOpenReq.result);
260 break;
261 }
262 if (context->indexIO < context->fileSize) {
263 thisClass->SimpleFileIO(context, context->indexIO, nullptr, context->isStableBufSize ?
264 (Base::GetMaxBufSizeStable() * thisClass->maxTransferBufFactor) :
265 (Base::GetMaxBufSize() * thisClass->maxTransferBufFactor));
266 } else {
267 context->ioFinish = true;
268 }
269 } else if (req->fs_type == UV_FS_WRITE) { // write
270 #ifdef HDC_DEBUG
271 WRITE_LOG(LOG_DEBUG, "write file data %" PRIu64 "/%" PRIu64 "", context->indexIO,
272 context->fileSize);
273 #endif // HDC_DEBUG
274 if (context->indexIO >= context->fileSize || req->result == 0) {
275 // The active end must first read it first, but you can't make Finish first, because Slave may not
276 // end.Only slave receives complete talents Finish
277 context->closeNotify = true;
278 context->ioFinish = true;
279 thisClass->SetFileTime(context);
280 }
281 } else {
282 context->ioFinish = true;
283 }
284 break;
285 }
286 if (context->ioFinish) {
287 // close-step1
288 ++thisClass->refCount;
289 if (req->fs_type == UV_FS_WRITE) {
290 uv_fs_fsync(thisClass->loopTask, &context->fsCloseReq, context->fsOpenReq.result, nullptr);
291 }
292 WRITE_LOG(LOG_DEBUG, "channelId:%u result:%d, closeReqSubmitted:%d",
293 thisClass->taskInfo->channelId, context->fsOpenReq.result, context->closeReqSubmitted);
294 if (context->lastErrno == 0 && !context->closeReqSubmitted) {
295 context->closeReqSubmitted = true;
296 WRITE_LOG(LOG_DEBUG, "OnFileIO fs_close, channelId:%u", thisClass->taskInfo->channelId);
297 uv_fs_close(thisClass->loopTask, &context->fsCloseReq, context->fsOpenReq.result, OnFileClose);
298 context->isFdOpen = false;
299 } else {
300 thisClass->WhenTransferFinish(context);
301 --thisClass->refCount;
302 }
303 }
304 #ifndef CONFIG_USE_JEMALLOC_DFX_INIF
305 thisClass->cirbuf.Free(bufIO - payloadPrefixReserve);
306 #else
307 delete [] (bufIO - payloadPrefixReserve);
308 #endif
309 --thisClass->refCount;
310 delete contextIO; // Req is part of the Contextio structure, no free release
311 }
312
OnFileOpen(uv_fs_t * req)313 void HdcTransferBase::OnFileOpen(uv_fs_t *req)
314 {
315 CtxFile *context = (CtxFile *)req->data;
316 HdcTransferBase *thisClass = (HdcTransferBase *)context->thisClass;
317 StartTraceScope("HdcTransferBase::OnFileOpen");
318 uv_fs_req_cleanup(req);
319 WRITE_LOG(LOG_DEBUG, "Filemod openfile:%s channelId:%u result:%d",
320 context->localPath.c_str(), thisClass->taskInfo->channelId, context->fsOpenReq.result);
321 --thisClass->refCount;
322 if (req->result <= 0) {
323 constexpr int bufSize = 1024;
324 char buf[bufSize] = { 0 };
325 uv_strerror_r((int)req->result, buf, bufSize);
326 thisClass->LogMsg(MSG_FAIL, "Error opening file: %s, path:%s", buf,
327 context->localPath.c_str());
328 WRITE_LOG(LOG_FATAL, "open path:%s error:%s", context->localPath.c_str(), buf);
329 if (context->isDir && context->master) {
330 uint8_t payload = 1;
331 thisClass->CommandDispatch(CMD_FILE_FINISH, &payload, 1);
332 } else if (context->isDir && !context->master) {
333 uint8_t payload = 1;
334 thisClass->SendToAnother(CMD_FILE_FINISH, &payload, 1);
335 } else {
336 thisClass->TaskFinish();
337 }
338 return;
339 }
340 thisClass->ResetCtx(context);
341 context->isFdOpen = true;
342 if (context->master) { // master just read, and slave just write.
343 // init master
344 uv_fs_t fs = {};
345 uv_fs_fstat(nullptr, &fs, context->fsOpenReq.result, nullptr);
346 WRITE_LOG(LOG_DEBUG, "uv_fs_fstat result:%d fileSize:%llu",
347 context->fsOpenReq.result, fs.statbuf.st_size);
348 TransferConfig &st = context->transferConfig;
349 st.fileSize = fs.statbuf.st_size;
350 st.optionalName = context->localName;
351 if (st.holdTimestamp) {
352 st.atime = fs.statbuf.st_atim.tv_sec * HDC_TIME_CONVERT_BASE + fs.statbuf.st_atim.tv_nsec;
353 st.mtime = fs.statbuf.st_mtim.tv_sec * HDC_TIME_CONVERT_BASE + fs.statbuf.st_mtim.tv_nsec;
354 }
355 st.path = context->remotePath;
356 // update ctxNow=context child value
357 context->fileSize = st.fileSize;
358 context->fileMode.perm = fs.statbuf.st_mode;
359 context->fileMode.uId = fs.statbuf.st_uid;
360 context->fileMode.gId = fs.statbuf.st_gid;
361 #if (!(defined(HOST_MINGW)||defined(HOST_MAC))) && defined(SURPPORT_SELINUX)
362 char *con = nullptr;
363 getfilecon(context->localPath.c_str(), &con);
364 if (con != nullptr) {
365 context->fileMode.context = con;
366 freecon(con);
367 }
368 #endif
369 uv_fs_req_cleanup(&fs);
370 thisClass->CheckMaster(context);
371 } else { // write
372 if (context->fileModeSync) {
373 FileMode &mode = context->fileMode;
374 uv_fs_t fs = {};
375 uv_fs_chmod(nullptr, &fs, context->localPath.c_str(), mode.perm, nullptr);
376 uv_fs_chown(nullptr, &fs, context->localPath.c_str(), mode.uId, mode.gId, nullptr);
377 uv_fs_req_cleanup(&fs);
378
379 #if (!(defined(HOST_MINGW)||defined(HOST_MAC))) && defined(SURPPORT_SELINUX)
380 if (!mode.context.empty()) {
381 WRITE_LOG(LOG_DEBUG, "setfilecon from master = %s", mode.context.c_str());
382 setfilecon(context->localPath.c_str(), mode.context.c_str());
383 }
384 #endif
385 }
386 union FeatureFlagsUnion f{};
387 if (!thisClass->AddFeatures(f)) {
388 WRITE_LOG(LOG_FATAL, "AddFeatureFlag failed");
389 thisClass->SendToAnother(thisClass->commandBegin, nullptr, 0);
390 } else {
391 thisClass->SendToAnother(thisClass->commandBegin, f.raw, sizeof(f));
392 }
393 }
394 }
395
MatchPackageExtendName(string fileName,string extName)396 bool HdcTransferBase::MatchPackageExtendName(string fileName, string extName)
397 {
398 bool match = false;
399 int subfixIndex = fileName.rfind(extName);
400 if ((fileName.size() - subfixIndex) != extName.size()) {
401 return false;
402 }
403 match = true;
404 return match;
405 }
406
407 // filter can be empty
GetSubFiles(const char * path,string filter,vector<string> * out)408 int HdcTransferBase::GetSubFiles(const char *path, string filter, vector<string> *out)
409 {
410 int retNum = 0;
411 uv_fs_t req = {};
412 uv_dirent_t dent;
413 vector<string> filterStrings;
414 if (!strlen(path)) {
415 return retNum;
416 }
417 if (filter.size()) {
418 Base::SplitString(filter, ";", filterStrings);
419 }
420
421 if (uv_fs_scandir(nullptr, &req, path, 0, nullptr) < 0) {
422 uv_fs_req_cleanup(&req);
423 return retNum;
424 }
425 while (uv_fs_scandir_next(&req, &dent) != UV_EOF) {
426 // Skip. File
427 if (strcmp(dent.name, ".") == 0 || strcmp(dent.name, "..") == 0) {
428 continue;
429 }
430 if (!(static_cast<uint32_t>(dent.type) & UV_DIRENT_FILE)) {
431 continue;
432 }
433 string fileName = dent.name;
434 for (auto &&s : filterStrings) {
435 int subfixIndex = fileName.rfind(s);
436 if ((fileName.size() - subfixIndex) != s.size())
437 continue;
438 string fullPath = string(path) + Base::GetPathSep();
439 fullPath += fileName;
440 out->push_back(fullPath);
441 ++retNum;
442 }
443 }
444 uv_fs_req_cleanup(&req);
445 return retNum;
446 }
447
448
GetSubFilesRecursively(string path,string currentDirname,vector<string> * out)449 int HdcTransferBase::GetSubFilesRecursively(string path, string currentDirname, vector<string> *out)
450 {
451 int retNum = 0;
452 uv_fs_t req = {};
453 uv_dirent_t dent;
454
455 WRITE_LOG(LOG_DEBUG, "GetSubFiles path = %s currentDirname = %s", path.c_str(), currentDirname.c_str());
456
457 if (!path.size()) {
458 return retNum;
459 }
460
461 if (uv_fs_scandir(nullptr, &req, path.c_str(), 0, nullptr) < 0) {
462 uv_fs_req_cleanup(&req);
463 return retNum;
464 }
465
466 uv_fs_t fs = {};
467 int ret = uv_fs_stat(nullptr, &fs, path.c_str(), nullptr);
468 if (ret == 0) {
469 FileMode mode;
470 mode.fullName = currentDirname;
471 mode.perm = fs.statbuf.st_mode;
472 mode.uId = fs.statbuf.st_uid;
473 mode.gId = fs.statbuf.st_gid;
474
475 #if (!(defined(HOST_MINGW)||defined(HOST_MAC))) && defined(SURPPORT_SELINUX)
476 char *con = nullptr;
477 getfilecon(path.c_str(), &con);
478 if (con != nullptr) {
479 mode.context = con;
480 freecon(con);
481 }
482 #endif
483 ctxNow.dirMode.push_back(mode);
484 }
485 while (uv_fs_scandir_next(&req, &dent) != UV_EOF) {
486 // Skip. File
487 if (strcmp(dent.name, ".") == 0 || strcmp(dent.name, "..") == 0) {
488 continue;
489 }
490 if (!(static_cast<uint32_t>(dent.type) & UV_DIRENT_FILE)) {
491 WRITE_LOG(LOG_DEBUG, "subdir dent.name fileName = %s", dent.name);
492 GetSubFilesRecursively(path + Base::GetPathSep() + dent.name,
493 currentDirname + Base::GetPathSep() + dent.name, out);
494 continue;
495 }
496 string fileName = dent.name;
497 WRITE_LOG(LOG_DEBUG, "GetSubFiles fileName = %s", fileName.c_str());
498
499 out->push_back(currentDirname + Base::GetPathSep() + fileName);
500 }
501 uv_fs_req_cleanup(&req);
502 return retNum;
503 }
504
505
CheckLocalPath(string & localPath,string & optName,string & errStr)506 bool HdcTransferBase::CheckLocalPath(string &localPath, string &optName, string &errStr)
507 {
508 // If optName show this is directory mode, check localPath and try create each layer
509 WRITE_LOG(LOG_DEBUG, "CheckDirectory localPath = %s optName = %s", localPath.c_str(), optName.c_str());
510 if ((optName.find('/') == string::npos) && (optName.find('\\') == string::npos)) {
511 WRITE_LOG(LOG_DEBUG, "Not directory mode optName = %s, return", optName.c_str());
512 return true;
513 }
514 ctxNow.isDir = true;
515 uv_fs_t req;
516 int r = uv_fs_lstat(nullptr, &req, localPath.c_str(), nullptr);
517 mode_t mode = req.statbuf.st_mode;
518 uv_fs_req_cleanup(&req);
519
520 if (r) {
521 vector<string> dirsOflocalPath;
522 string split(1, Base::GetPathSep());
523 Base::SplitString(localPath, split, dirsOflocalPath);
524
525 WRITE_LOG(LOG_DEBUG, "localPath = %s dir layers = %zu", localPath.c_str(), dirsOflocalPath.size());
526 string makedirPath;
527
528 if (!Base::IsAbsolutePath(localPath)) {
529 makedirPath = ".";
530 }
531
532 for (auto dir : dirsOflocalPath) {
533 WRITE_LOG(LOG_DEBUG, "CheckLocalPath create dir = %s", dir.c_str());
534
535 if (dir == ".") {
536 continue;
537 } else {
538 #ifdef _WIN32
539 if (dir.find(":") == 1) {
540 makedirPath = dir;
541 continue;
542 }
543 #endif
544 makedirPath = makedirPath + Base::GetPathSep() + dir;
545 if (!Base::TryCreateDirectory(makedirPath, errStr)) {
546 return false;
547 }
548 }
549 }
550 // set flag to remove first layer directory of filename from master
551 ctxNow.targetDirNotExist = true;
552 } else if (!(mode & S_IFDIR)) {
553 WRITE_LOG(LOG_WARN, "Not a directory, path:%s", localPath.c_str());
554 errStr = "Not a directory, path:";
555 errStr += localPath.c_str();
556 return false;
557 }
558 return true;
559 }
560
CheckFilename(string & localPath,string & optName,string & errStr)561 bool HdcTransferBase::CheckFilename(string &localPath, string &optName, string &errStr)
562 {
563 string localPathBackup = localPath;
564 if (ctxNow.targetDirNotExist) {
565 // If target directory not exist, the first layer directory from master should remove
566 if (optName.find('/') != string::npos) {
567 optName = optName.substr(optName.find('/') + 1);
568 } else if (optName.find('\\') != string::npos) {
569 optName = optName.substr(optName.find('\\') + 1);
570 }
571 }
572 vector<string> dirsOfOptName;
573
574 if (optName.find('/') != string::npos) {
575 Base::SplitString(optName, "/", dirsOfOptName);
576 } else if (optName.find('\\') != string::npos) {
577 Base::SplitString(optName, "\\", dirsOfOptName);
578 } else {
579 WRITE_LOG(LOG_DEBUG, "No need create dir for file = %s", optName.c_str());
580 return true;
581 }
582
583 // If filename still include dir, try create each layer
584 optName = dirsOfOptName.back();
585 dirsOfOptName.pop_back();
586
587 for (auto s : dirsOfOptName) {
588 // Add each layer directory to localPath
589 localPath = localPath + Base::GetPathSep() + s;
590 if (!Base::TryCreateDirectory(localPath, errStr)) {
591 return false;
592 }
593 if (ctxNow.fileModeSync) {
594 string resolvedPath = Base::CanonicalizeSpecPath(localPath);
595 auto pos = resolvedPath.find(localPathBackup);
596 if (pos == 0) {
597 string shortPath = resolvedPath.substr(localPathBackup.size());
598 if (shortPath.at(0) == Base::GetPathSep()) {
599 shortPath = shortPath.substr(1);
600 }
601 WRITE_LOG(LOG_DEBUG, "pos = %zu, shortPath = %s", pos, shortPath.c_str());
602
603 // set mode
604 auto it = ctxNow.dirModeMap.find(shortPath);
605 if (it != ctxNow.dirModeMap.end()) {
606 auto mode = it->second;
607 uv_fs_t fs = {};
608 uv_fs_chmod(nullptr, &fs, localPath.c_str(), mode.perm, nullptr);
609 uv_fs_chown(nullptr, &fs, localPath.c_str(), mode.uId, mode.gId, nullptr);
610 uv_fs_req_cleanup(&fs);
611 #if (!(defined(HOST_MINGW) || defined(HOST_MAC))) && defined(SURPPORT_SELINUX)
612 if (!mode.context.empty()) {
613 WRITE_LOG(LOG_DEBUG, "setfilecon from master = %s", mode.context.c_str());
614 setfilecon(localPath.c_str(), mode.context.c_str());
615 }
616 #endif
617 }
618 }
619 }
620 }
621
622 WRITE_LOG(LOG_DEBUG, "CheckFilename finish localPath:%s optName:%s", localPath.c_str(), optName.c_str());
623 return true;
624 }
625
626 // https://en.cppreference.com/w/cpp/filesystem/is_directory
627 // return true if file exist, false if file not exist
SmartSlavePath(string & cwd,string & localPath,const char * optName)628 bool HdcTransferBase::SmartSlavePath(string &cwd, string &localPath, const char *optName)
629 {
630 string errStr;
631 if (taskInfo->serverOrDaemon) {
632 // slave and server
633 ExtractRelativePath(cwd, localPath);
634 }
635 mode_t mode = mode_t(~S_IFMT);
636 if (Base::CheckDirectoryOrPath(localPath.c_str(), true, false, errStr, mode)) {
637 WRITE_LOG(LOG_DEBUG, "%s", errStr.c_str());
638 return true;
639 }
640
641 uv_fs_t req;
642 int r = uv_fs_lstat(nullptr, &req, localPath.c_str(), nullptr);
643 uv_fs_req_cleanup(&req);
644 if (r == 0 && (req.statbuf.st_mode & S_IFDIR)) { // is dir
645 localPath = localPath + Base::GetPathSep() + optName;
646 }
647 if (r != 0 && (localPath.back() == Base::GetPathSep())) { // not exist and is dir
648 localPath = localPath + optName;
649 }
650 return false;
651 }
652
RecvIOPayload(CtxFile * context,uint8_t * data,int dataSize)653 bool HdcTransferBase::RecvIOPayload(CtxFile *context, uint8_t *data, int dataSize)
654 {
655 if (dataSize < static_cast<int>(payloadPrefixReserve)) {
656 WRITE_LOG(LOG_WARN, "unable to parse TransferPayload: invalid dataSize %d", dataSize);
657 return false;
658 }
659 uint8_t *clearBuf = nullptr;
660 string serialString(reinterpret_cast<char *>(data), payloadPrefixReserve);
661 TransferPayload pld;
662 Base::ZeroStruct(pld);
663 bool ret = false;
664 SerialStruct::ParseFromString(pld, serialString);
665 int clearSize = 0;
666 StartTraceScope("HdcTransferBase::RecvIOPayload");
667 if (pld.compressSize > static_cast<uint32_t>(dataSize) || pld.uncompressSize > MAX_SIZE_IOBUF) {
668 WRITE_LOG(LOG_FATAL, "compress size is greater than the dataSize. pld.compressSize = %d", pld.compressSize);
669 return false;
670 }
671 if (pld.compressSize > 0) {
672 switch (pld.compressType) {
673 #ifdef HARMONY_PROJECT
674 case COMPRESS_LZ4: {
675 clearBuf = new uint8_t[pld.uncompressSize]();
676 if (!clearBuf) {
677 WRITE_LOG(LOG_FATAL, "alloc LZ4 buffer failed");
678 return false;
679 }
680 clearSize = LZ4_decompress_safe((const char *)data + payloadPrefixReserve, (char *)clearBuf,
681 pld.compressSize, pld.uncompressSize);
682 break;
683 }
684 #endif
685 default: { // COMPRESS_NONE
686 clearBuf = data + payloadPrefixReserve;
687 clearSize = pld.compressSize;
688 break;
689 }
690 }
691 }
692 while (true) {
693 if (static_cast<uint32_t>(clearSize) != pld.uncompressSize || dataSize - payloadPrefixReserve < clearSize) {
694 WRITE_LOG(LOG_WARN, "invalid data size for fileIO: %d", clearSize);
695 break;
696 }
697 if (SimpleFileIO(context, pld.index, clearBuf, clearSize) < 0) {
698 break;
699 }
700 ret = true;
701 break;
702 }
703 if (pld.compressSize > 0 && pld.compressType != COMPRESS_NONE) {
704 delete[] clearBuf;
705 }
706 return ret;
707 }
708
CommandDispatch(const uint16_t command,uint8_t * payload,const int payloadSize)709 bool HdcTransferBase::CommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize)
710 {
711 StartTraceScope("HdcTransferBase::CommandDispatch");
712 bool ret = true;
713 while (true) {
714 if (command == commandBegin) {
715 CtxFile *context = &ctxNow;
716 if (!CheckFeatures(context, payload, payloadSize)) {
717 WRITE_LOG(LOG_FATAL, "CommandDispatch CheckFeatures command:%u", command);
718 ret = false;
719 break;
720 }
721 int ioRet = SimpleFileIO(context, context->indexIO, nullptr, (context->isStableBufSize) ?
722 Base::GetMaxBufSizeStable() * maxTransferBufFactor :
723 Base::GetMaxBufSize() * maxTransferBufFactor);
724 if (ioRet < 0) {
725 WRITE_LOG(LOG_FATAL, "CommandDispatch SimpleFileIO ioRet:%d", ioRet);
726 ret = false;
727 break;
728 }
729 context->transferBegin = Base::GetRuntimeMSec();
730 } else if (command == commandData) {
731 if (static_cast<uint32_t>(payloadSize) > HDC_BUF_MAX_BYTES || payloadSize < 0) {
732 WRITE_LOG(LOG_FATAL, "CommandDispatch payloadSize:%d", payloadSize);
733 ret = false;
734 break;
735 }
736 // Note, I will trigger FileIO after multiple times.
737 CtxFile *context = &ctxNow;
738 if (!RecvIOPayload(context, payload, payloadSize)) {
739 WRITE_LOG(LOG_DEBUG, "RecvIOPayload return false. channelId:%u lastErrno:%u result:%d",
740 taskInfo->channelId, ctxNow.lastErrno, ctxNow.fsOpenReq.result);
741 uv_fs_close(nullptr, &ctxNow.fsCloseReq, ctxNow.fsOpenReq.result, nullptr);
742 ctxNow.isFdOpen = false;
743 HdcTransferBase *thisClass = (HdcTransferBase *)context->thisClass;
744 thisClass->CommandDispatch(CMD_FILE_FINISH, payload, 1);
745 ret = false;
746 break;
747 }
748 } else {
749 // Other subclass commands
750 }
751 break;
752 }
753 return ret;
754 }
755
ExtractRelativePath(string & cwd,string & path)756 void HdcTransferBase::ExtractRelativePath(string &cwd, string &path)
757 {
758 bool absPath = Base::IsAbsolutePath(path);
759 if (!absPath) {
760 path = cwd + path;
761 }
762 }
763
AddFeatures(FeatureFlagsUnion & feature)764 bool HdcTransferBase::AddFeatures(FeatureFlagsUnion &feature)
765 {
766 feature.bits.hugeBuf = !isStableBuf;
767 return true;
768 }
769
CheckFeatures(CtxFile * context,uint8_t * payload,const int payloadSize)770 bool HdcTransferBase::CheckFeatures(CtxFile *context, uint8_t *payload, const int payloadSize)
771 {
772 if (payloadSize == FEATURE_FLAG_MAX_SIZE) {
773 union FeatureFlagsUnion feature{};
774 if (memcpy_s(&feature, sizeof(feature), payload, payloadSize) != EOK) {
775 WRITE_LOG(LOG_FATAL, "CheckFeatures memcpy_s failed");
776 return false;
777 }
778 WRITE_LOG(LOG_DEBUG, "isStableBuf:%d, hugeBuf:%d", isStableBuf, feature.bits.hugeBuf);
779 context->isStableBufSize = isStableBuf ? true : (!feature.bits.hugeBuf);
780 return true;
781 } else if (payloadSize == 0) {
782 WRITE_LOG(LOG_DEBUG, "FileBegin CheckFeatures payloadSize:%d, use default feature.", payloadSize);
783 context->isStableBufSize = true;
784 return true;
785 } else {
786 WRITE_LOG(LOG_FATAL, "CheckFeatures payloadSize:%d", payloadSize);
787 return false;
788 }
789 }
790 } // namespace Hdc
791