• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "transfer.h"
16 #include "serial_struct.h"
17 #include <sys/stat.h>
18 #ifdef HARMONY_PROJECT
19 #include <lz4.h>
20 #endif
21 
22 namespace Hdc {
23 constexpr uint64_t HDC_TIME_CONVERT_BASE = 1000000000;
24 
HdcTransferBase(HTaskInfo hTaskInfo)25 HdcTransferBase::HdcTransferBase(HTaskInfo hTaskInfo)
26     : HdcTaskBase(hTaskInfo)
27 {
28     ResetCtx(&ctxNow, true);
29     commandBegin = 0;
30     commandData = 0;
31 }
32 
~HdcTransferBase()33 HdcTransferBase::~HdcTransferBase()
34 {
35     WRITE_LOG(LOG_DEBUG, "~HdcTransferBase");
36 };
37 
ResetCtx(CtxFile * context,bool full)38 bool HdcTransferBase::ResetCtx(CtxFile *context, bool full)
39 {
40     if (full) {
41         *context = {};
42         context->fsOpenReq.data = context;
43         context->fsCloseReq.data = context;
44         context->thisClass = this;
45         context->loop = loopTask;
46         context->cb = OnFileIO;
47     }
48     context->closeNotify = false;
49     context->indexIO = 0;
50     context->lastErrno = 0;
51     context->ioFinish = false;
52     return true;
53 }
54 
SimpleFileIO(CtxFile * context,uint64_t index,uint8_t * sendBuf,int bytes)55 int HdcTransferBase::SimpleFileIO(CtxFile *context, uint64_t index, uint8_t *sendBuf, int bytes)
56 {
57     // The first 8 bytes file offset
58     uint8_t *buf = new uint8_t[bytes]();
59     CtxFileIO *ioContext = new CtxFileIO();
60     bool ret = false;
61     while (true) {
62         if (!buf || !ioContext || bytes < 0) {
63             WRITE_LOG(LOG_DEBUG, "SimpleFileIO param check failed");
64             break;
65         }
66         if (context->ioFinish) {
67             WRITE_LOG(LOG_DEBUG, "SimpleFileIO to closed IOStream");
68             break;
69         }
70         uv_fs_t *req = &ioContext->fs;
71         ioContext->bufIO = buf;
72         ioContext->context = context;
73         req->data = ioContext;
74         ++refCount;
75         if (context->master) {  // master just read, and slave just write.when master/read, sendBuf can be nullptr
76             uv_buf_t iov = uv_buf_init(reinterpret_cast<char *>(buf), bytes);
77             uv_fs_read(context->loop, req, context->fsOpenReq.result, &iov, 1, index, context->cb);
78         } else {
79             // The US_FS_WRITE here must be brought into the actual file offset, which cannot be incorporated with local
80             // accumulated index because UV_FS_WRITE will be executed multiple times and then trigger a callback.
81             if (bytes > 0 && memcpy_s(ioContext->bufIO, bytes, sendBuf, bytes) != EOK) {
82                 WRITE_LOG(LOG_WARN, "SimpleFileIO memcpy error");
83                 break;
84             }
85             uv_buf_t iov = uv_buf_init(reinterpret_cast<char *>(ioContext->bufIO), bytes);
86             uv_fs_write(context->loop, req, context->fsOpenReq.result, &iov, 1, index, context->cb);
87         }
88         ret = true;
89         break;
90     }
91     if (!ret) {
92         if (buf != nullptr) {
93             delete[] buf;
94             buf = nullptr;
95         }
96         if (ioContext != nullptr) {
97             delete ioContext;
98             ioContext = nullptr;
99         }
100         return -1;
101     }
102     return bytes;
103 }
104 
OnFileClose(uv_fs_t * req)105 void HdcTransferBase::OnFileClose(uv_fs_t *req)
106 {
107     uv_fs_req_cleanup(req);
108     CtxFile *context = (CtxFile *)req->data;
109     HdcTransferBase *thisClass = (HdcTransferBase *)context->thisClass;
110     if (context->closeNotify) {
111         // close-step2
112         // maybe successful finish or failed finish
113         thisClass->WhenTransferFinish(context);
114     }
115     --thisClass->refCount;
116     return;
117 }
118 
SetFileTime(CtxFile * context)119 void HdcTransferBase::SetFileTime(CtxFile *context)
120 {
121     if (!context->transferConfig.holdTimestamp) {
122         return;
123     }
124     if (!context->transferConfig.mtime) {
125         return;
126     }
127     uv_fs_t fs;
128     double aTimeSec = static_cast<long double>(context->transferConfig.atime) / HDC_TIME_CONVERT_BASE;
129     double mTimeSec = static_cast<long double>(context->transferConfig.mtime) / HDC_TIME_CONVERT_BASE;
130     uv_fs_futime(nullptr, &fs, context->fsOpenReq.result, aTimeSec, mTimeSec, nullptr);
131     uv_fs_req_cleanup(&fs);
132 }
133 
SendIOPayload(CtxFile * context,int index,uint8_t * data,int dataSize)134 bool HdcTransferBase::SendIOPayload(CtxFile *context, int index, uint8_t *data, int dataSize)
135 {
136     TransferPayload payloadHead;
137     string head;
138     int compressSize = 0;
139     int sendBufSize = payloadPrefixReserve + dataSize;
140     uint8_t *sendBuf = new uint8_t[sendBufSize]();
141     if (!sendBuf) {
142         return false;
143     }
144     payloadHead.compressType = context->transferConfig.compressType;
145     payloadHead.uncompressSize = dataSize;
146     payloadHead.index = index;
147     if (dataSize > 0) {
148         switch (payloadHead.compressType) {
149 #ifdef HARMONY_PROJECT
150             case COMPRESS_LZ4: {
151                 compressSize = LZ4_compress_default((const char *)data, (char *)sendBuf + payloadPrefixReserve,
152                                                     dataSize, dataSize);
153                 break;
154             }
155 #endif
156             default: {  // COMPRESS_NONE
157                 if (memcpy_s(sendBuf + payloadPrefixReserve, sendBufSize - payloadPrefixReserve, data, dataSize)
158                     != EOK) {
159                     delete[] sendBuf;
160                     return false;
161                 }
162                 compressSize = dataSize;
163                 break;
164             }
165         }
166     }
167     payloadHead.compressSize = compressSize;
168     head = SerialStruct::SerializeToString(payloadHead);
169     if (head.size() + 1 > payloadPrefixReserve) {
170         delete[] sendBuf;
171         return false;
172     }
173     int errCode = memcpy_s(sendBuf, sendBufSize, head.c_str(), head.size() + 1);
174     if (errCode != EOK) {
175         delete[] sendBuf;
176         return false;
177     }
178     bool ret = SendToAnother(commandData, sendBuf, payloadPrefixReserve + compressSize) > 0;
179     delete[] sendBuf;
180     return ret;
181 }
182 
OnFileIO(uv_fs_t * req)183 void HdcTransferBase::OnFileIO(uv_fs_t *req)
184 {
185     CtxFileIO *contextIO = (CtxFileIO *)req->data;
186     CtxFile *context = (CtxFile *)contextIO->context;
187     HdcTransferBase *thisClass = (HdcTransferBase *)context->thisClass;
188     uint8_t *bufIO = contextIO->bufIO;
189     uv_fs_req_cleanup(req);
190     while (true) {
191         if (context->ioFinish) {
192             break;
193         }
194         if (req->result < 0) {
195             constexpr int bufSize = 1024;
196             char buf[bufSize] = { 0 };
197             uv_strerror_r((int)req->result, buf, bufSize);
198             WRITE_LOG(LOG_DEBUG, "OnFileIO error: %s", buf);
199             context->closeNotify = true;
200             context->lastErrno = abs(req->result);
201             context->ioFinish = true;
202             break;
203         }
204         context->indexIO += req->result;
205         if (req->fs_type == UV_FS_READ) {
206 #ifdef HDC_DEBUG
207             WRITE_LOG(LOG_DEBUG, "read file data %" PRIu64 "/%" PRIu64 "", context->indexIO,
208                       context->fileSize);
209 #endif // HDC_DEBUG
210             if (!thisClass->SendIOPayload(context, context->indexIO - req->result, bufIO, req->result)) {
211                 context->ioFinish = true;
212                 break;
213             }
214             if (context->indexIO < context->fileSize) {
215                 thisClass->SimpleFileIO(context, context->indexIO, nullptr,
216                                         Base::GetMaxBufSize() * thisClass->maxTransferBufFactor);
217             } else {
218                 context->ioFinish = true;
219             }
220         } else if (req->fs_type == UV_FS_WRITE) {  // write
221 #ifdef HDC_DEBUG
222             WRITE_LOG(LOG_DEBUG, "write file data %" PRIu64 "/%" PRIu64 "", context->indexIO,
223                       context->fileSize);
224 #endif // HDC_DEBUG
225             if (context->indexIO >= context->fileSize) {
226                 // The active end must first read it first, but you can't make Finish first, because Slave may not
227                 // end.Only slave receives complete talents Finish
228                 context->closeNotify = true;
229                 context->ioFinish = true;
230                 thisClass->SetFileTime(context);
231             }
232         } else {
233             context->ioFinish = true;
234         }
235         break;
236     }
237     if (context->ioFinish) {
238         // close-step1
239         ++thisClass->refCount;
240         if (req->fs_type == UV_FS_WRITE) {
241             uv_fs_fsync(thisClass->loopTask, &context->fsCloseReq, context->fsOpenReq.result, nullptr);
242         }
243         uv_fs_close(thisClass->loopTask, &context->fsCloseReq, context->fsOpenReq.result, OnFileClose);
244     }
245     --thisClass->refCount;
246     delete[] bufIO;
247     delete contextIO;  // Req is part of the Contextio structure, no free release
248 }
249 
OnFileOpen(uv_fs_t * req)250 void HdcTransferBase::OnFileOpen(uv_fs_t *req)
251 {
252     CtxFile *context = (CtxFile *)req->data;
253     HdcTransferBase *thisClass = (HdcTransferBase *)context->thisClass;
254     uv_fs_req_cleanup(req);
255     WRITE_LOG(LOG_DEBUG, "Filemod openfile:%s", context->localPath.c_str());
256     --thisClass->refCount;
257     if (req->result < 0) {
258         constexpr int bufSize = 1024;
259         char buf[bufSize] = { 0 };
260         uv_strerror_r((int)req->result, buf, bufSize);
261         thisClass->LogMsg(MSG_FAIL, "Error opening file: %s, path:%s", buf,
262                           context->localPath.c_str());
263         thisClass->TaskFinish();
264         return;
265     }
266     thisClass->ResetCtx(context);
267     if (context->master) {
268         // init master
269         uv_fs_t fs = {};
270         uv_fs_fstat(nullptr, &fs, context->fsOpenReq.result, nullptr);
271         TransferConfig &st = context->transferConfig;
272         st.fileSize = fs.statbuf.st_size;
273         st.optionalName = context->localName;
274         if (st.holdTimestamp) {
275             st.atime = fs.statbuf.st_atim.tv_sec * HDC_TIME_CONVERT_BASE + fs.statbuf.st_atim.tv_nsec;
276             st.mtime = fs.statbuf.st_mtim.tv_sec * HDC_TIME_CONVERT_BASE + fs.statbuf.st_mtim.tv_nsec;
277         }
278         st.path = context->remotePath;
279         // update ctxNow=context child value
280         context->fileSize = st.fileSize;
281 
282         uv_fs_req_cleanup(&fs);
283         thisClass->CheckMaster(context);
284     } else {  // write
285         thisClass->SendToAnother(thisClass->commandBegin, nullptr, 0);
286     }
287 }
288 
MatchPackageExtendName(string fileName,string extName)289 bool HdcTransferBase::MatchPackageExtendName(string fileName, string extName)
290 {
291     bool match = false;
292     int subfixIndex = fileName.rfind(extName);
293     if ((fileName.size() - subfixIndex) != extName.size()) {
294         return false;
295     }
296     match = true;
297     return match;
298 }
299 
300 // filter can be empty
GetSubFiles(const char * path,string filter,vector<string> * out)301 int HdcTransferBase::GetSubFiles(const char *path, string filter, vector<string> *out)
302 {
303     int retNum = 0;
304     uv_fs_t req = {};
305     uv_dirent_t dent;
306     vector<string> filterStrings;
307     if (!strlen(path)) {
308         return retNum;
309     }
310     if (filter.size()) {
311         Base::SplitString(filter, ";", filterStrings);
312     }
313 
314     if (uv_fs_scandir(nullptr, &req, path, 0, nullptr) < 0) {
315         uv_fs_req_cleanup(&req);
316         return retNum;
317     }
318     while (uv_fs_scandir_next(&req, &dent) != UV_EOF) {
319         // Skip. File
320         if (strcmp(dent.name, ".") == 0 || strcmp(dent.name, "..") == 0)
321             continue;
322         if (!(static_cast<uint32_t>(dent.type) & UV_DIRENT_FILE))
323             continue;
324         string fileName = dent.name;
325         for (auto &&s : filterStrings) {
326             int subfixIndex = fileName.rfind(s);
327             if ((fileName.size() - subfixIndex) != s.size())
328                 continue;
329             string fullPath = string(path) + "/";
330             fullPath += fileName;
331             out->push_back(fullPath);
332             ++retNum;
333         }
334     }
335     uv_fs_req_cleanup(&req);
336     return retNum;
337 }
338 
339 // https://en.cppreference.com/w/cpp/filesystem/is_directory
340 // return true if file exist, false if file not exist
SmartSlavePath(string & cwd,string & localPath,const char * optName)341 bool HdcTransferBase::SmartSlavePath(string &cwd, string &localPath, const char *optName)
342 {
343     string errStr;
344     if (taskInfo->serverOrDaemon) {
345         // slave and server
346         ExtractRelativePath(cwd, localPath);
347     }
348     if (Base::CheckDirectoryOrPath(localPath.c_str(), true, false, errStr)) {
349         WRITE_LOG(LOG_INFO, "%s", errStr.c_str());
350         return true;
351     }
352     uv_fs_t req;
353     int r = uv_fs_lstat(nullptr, &req, localPath.c_str(), nullptr);
354     uv_fs_req_cleanup(&req);
355     if (r == 0 && req.statbuf.st_mode & S_IFDIR) {  // is dir
356         localPath = Base::StringFormat("%s%c%s", localPath.c_str(), Base::GetPathSep(), optName);
357     }
358     return false;
359 }
360 
RecvIOPayload(CtxFile * context,uint8_t * data,int dataSize)361 bool HdcTransferBase::RecvIOPayload(CtxFile *context, uint8_t *data, int dataSize)
362 {
363     uint8_t *clearBuf = nullptr;
364     string serialStrring((char *)data, payloadPrefixReserve);
365     TransferPayload pld;
366     bool ret = false;
367     SerialStruct::ParseFromString(pld, serialStrring);
368     clearBuf = new uint8_t[pld.uncompressSize]();
369     if (!clearBuf) {
370         return false;
371     }
372     int clearSize = 0;
373     if (pld.compressSize > 0) {
374         switch (pld.compressType) {
375 #ifdef HARMONY_PROJECT
376             case COMPRESS_LZ4: {
377                 clearSize = LZ4_decompress_safe((const char *)data + payloadPrefixReserve, (char *)clearBuf,
378                                                 pld.compressSize, pld.uncompressSize);
379                 break;
380             }
381 #endif
382             default: {  // COMPRESS_NONE
383                 if (memcpy_s(clearBuf, pld.uncompressSize, data + payloadPrefixReserve, pld.compressSize) != EOK) {
384                     delete[] clearBuf;
385                     return false;
386                 }
387                 clearSize = pld.compressSize;
388                 break;
389             }
390         }
391     }
392     while (true) {
393         if ((uint32_t)clearSize != pld.uncompressSize) {
394             break;
395         }
396         if (SimpleFileIO(context, pld.index, clearBuf, clearSize) < 0) {
397             break;
398         }
399         ret = true;
400         break;
401     }
402     delete[] clearBuf;
403     return ret;
404 }
405 
CommandDispatch(const uint16_t command,uint8_t * payload,const int payloadSize)406 bool HdcTransferBase::CommandDispatch(const uint16_t command, uint8_t *payload, const int payloadSize)
407 {
408     bool ret = true;
409     while (true) {
410         if (command == commandBegin) {
411             CtxFile *context = &ctxNow;
412             int ioRet = SimpleFileIO(context, context->indexIO, nullptr, Base::GetMaxBufSize() * maxTransferBufFactor);
413             if (ioRet < 0) {
414                 ret = false;
415                 break;
416             }
417             context->transferBegin = Base::GetRuntimeMSec();
418         } else if (command == commandData) {
419             if ((uint32_t)payloadSize > HDC_BUF_MAX_BYTES || payloadSize < 0) {
420                 ret = false;
421                 break;
422             }
423             // Note, I will trigger FileIO after multiple times.
424             CtxFile *context = &ctxNow;
425             if (!RecvIOPayload(context, payload, payloadSize)) {
426                 ret = false;
427                 break;
428             }
429         } else {
430             // Other subclass commands
431         }
432         break;
433     }
434     return ret;
435 }
436 
ExtractRelativePath(string & cwd,string & path)437 void HdcTransferBase::ExtractRelativePath(string &cwd, string &path)
438 {
439     bool absPath = Base::IsAbsolutePath(path);
440     if (!absPath) {
441         path = cwd + path;
442     }
443 }
444 }  // namespace Hdc
445