• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "copy/file_copy_listener.h"
17 
18 #include <cinttypes>
19 #include <unistd.h>
20 
21 #include "copy/file_size_utils.h"
22 #include "dfs_error.h"
23 #include "utils_log.h"
24 
25 #undef LOG_DOMAIN
26 #undef LOG_TAG
27 #define LOG_DOMAIN 0xD004315
28 #define LOG_TAG "distributedfile_daemon"
29 
30 namespace OHOS {
31 namespace Storage {
32 namespace DistributedFile {
33 using namespace FileManagement;
// Byte size of the stack buffer that ReadNotifyEvent() fills from the inotify descriptor.
static constexpr int BUF_SIZE{1024};
// Minimum spacing between two progress callbacks issued from ReadNotifyEvent().
static constexpr std::chrono::milliseconds NOTIFY_PROGRESS_DELAY{100};
// Upper bound, in microseconds, of the condition-variable wait between poll rounds in GetNotifyEvent().
static constexpr int SLEEP_TIME_US{100000};
37 
FileCopyLocalListener(const std::string & srcPath,bool isFile,const ProcessCallback & processCallback)38 FileCopyLocalListener::FileCopyLocalListener(const std::string &srcPath,
39     bool isFile, const ProcessCallback &processCallback) : isFile_(isFile), processCallback_(processCallback)
40 {
41     if (processCallback_ == nullptr) {
42         LOGI("processCallback is nullptr");
43         return;
44     }
45 
46     notifyFd_ = inotify_init();
47     if (notifyFd_ < 0) {
48         LOGE("Failed to init inotify, errno:%{public}d", errno);
49         return;
50     }
51     eventFd_ = eventfd(0, EFD_CLOEXEC);
52     if (eventFd_ < 0) {
53         LOGE("Failed to init eventFd, errno:%{public}d", errno);
54         return;
55     }
56 }
57 
~FileCopyLocalListener()58 FileCopyLocalListener::~FileCopyLocalListener()
59 {
60     CloseNotifyFdLocked();
61     if (notifyHandler_.joinable()) {
62         notifyHandler_.join();
63     }
64 }
65 
GetLocalListener(const std::string & srcPath,bool isFile,const ProcessCallback & processCallback)66 std::shared_ptr<FileCopyLocalListener> FileCopyLocalListener::GetLocalListener(const std::string &srcPath,
67     bool isFile, const ProcessCallback &processCallback)
68 {
69     auto listener = std::make_shared<FileCopyLocalListener>(srcPath, isFile, processCallback);
70     return listener;
71 }
72 
StartListener()73 void FileCopyLocalListener::StartListener()
74 {
75     std::lock_guard<std::mutex> lock(processMutex_);
76     if (processCallback_ == nullptr || totalSize_ == 0) {
77         LOGI("processCallback is nullptr or totalSize is zero, totalSize = %{public}" PRId64 "B", totalSize_);
78         return;
79     }
80     if (notifyHandler_.joinable()) {
81         LOGE("notifyHandler has join");
82         return;
83     }
84     notifyHandler_ = std::thread([this] {
85         LOGI("StartListener.");
86         GetNotifyEvent();
87     });
88 }
89 
StopListener()90 void FileCopyLocalListener::StopListener()
91 {
92     std::lock_guard<std::mutex> lock(processMutex_);
93     LOGI("StopListener start.");
94     if (processCallback_ == nullptr) {
95         LOGI("processCallback is nullptr");
96         return;
97     }
98     processCallback_(progressSize_, totalSize_);
99     CloseNotifyFdLocked();
100     notifyRun_.store(false);
101     {
102         std::unique_lock<std::mutex> cvLock(cvLock_);
103         notifyCv_.notify_one();
104     }
105     if (notifyHandler_.joinable()) {
106         notifyHandler_.join();
107     }
108 }
109 
AddFile(const std::string & fileName)110 void FileCopyLocalListener::AddFile(const std::string &fileName)
111 {
112     std::lock_guard<std::mutex> lock(filePathsMutex_);
113     filePaths_.insert(fileName);
114 }
115 
AddListenerFile(const std::string & srcPath,const std::string & destPath,uint32_t mode)116 int32_t FileCopyLocalListener::AddListenerFile(const std::string &srcPath, const std::string &destPath, uint32_t mode)
117 {
118     LOGI("AddListenerFile start");
119     if (processCallback_ == nullptr) {
120         LOGI("processCallback is nullptr");
121         return E_OK;
122     }
123     std::lock_guard<std::mutex> lock(wdsMutex_);
124     int newWd = inotify_add_watch(notifyFd_, destPath.c_str(), mode);
125     if (newWd < 0) {
126         LOGE("inotify_add_watch, newWd is unvaild, newWd = %{public}d, errno = %{public}d", newWd, errno);
127         return errno;
128     }
129     std::shared_ptr<ReceiveInfo> receiveInfo = std::make_shared<ReceiveInfo>();
130     if (receiveInfo == nullptr) {
131         LOGE("Failed to request heap memory.");
132         return ENOMEM;
133     }
134     receiveInfo->path = destPath;
135     wds_.insert(std::make_pair(newWd, receiveInfo));
136     uint64_t fileSize = 0;
137     int32_t err;
138     if (!isFile_) {
139         err = FileSizeUtils::GetDirSize(srcPath, fileSize);
140     } else {
141         err = FileSizeUtils::GetFileSize(srcPath, fileSize);
142     }
143     if (err != E_OK) {
144         LOGE("Failed to get src size, isFile=%{public}d, err=%{public}d", isFile_, err);
145         return err;
146     }
147     totalSize_ = fileSize;
148     notifyTime_ = std::chrono::steady_clock::now() + NOTIFY_PROGRESS_DELAY;
149     LOGI("AddListenerFile end");
150     return E_OK;
151 }
152 
GetNotifyEvent()153 void FileCopyLocalListener::GetNotifyEvent()
154 {
155     prctl(PR_SET_NAME, "NotifyThread");
156     nfds_t nfds = 2;
157     struct pollfd fds[2];
158     fds[0].events = 0;
159     fds[1].events = POLLIN;
160     fds[0].fd = eventFd_;
161     fds[1].fd = notifyFd_;
162     while (notifyRun_.load() && errorCode_== E_OK && eventFd_ != -1 && notifyFd_ != -1) {
163         auto ret = poll(fds, nfds, -1);
164         if (ret > 0) {
165             if (static_cast<unsigned short>(fds[0].revents) & POLLNVAL) {
166                 notifyRun_.store(false);
167                 return;
168             }
169             if (static_cast<unsigned short>(fds[1].revents) & POLLIN) {
170                 ReadNotifyEventLocked();
171             }
172         } else if (ret < 0 && errno == EINTR) {
173             continue;
174         } else {
175             LOGE("poll failed error : %{public}d", errno);
176             errorCode_ = errno;
177             return;
178         }
179         {
180             std::unique_lock<std::mutex> cvLock(cvLock_);
181             notifyCv_.wait_for(cvLock, std::chrono::microseconds(SLEEP_TIME_US), [this]() -> bool {
182                 return notifyFd_ == -1 || !notifyRun_.load();
183             });
184         }
185     }
186 }
187 
ReadNotifyEventLocked()188 void FileCopyLocalListener::ReadNotifyEventLocked()
189 {
190     {
191         std::lock_guard<std::mutex> lock(readMutex_);
192         if (readClosed_) {
193             LOGE("read after close");
194             return;
195         }
196         readFlag_ = true;
197     }
198     ReadNotifyEvent();
199     {
200         std::lock_guard<std::mutex> lock(readMutex_);
201         readFlag_ = false;
202         if (readClosed_) {
203             LOGE("close after read");
204             CloseNotifyFd();
205             return;
206         }
207     }
208 }
209 
ReadNotifyEvent()210 void FileCopyLocalListener::ReadNotifyEvent()
211 {
212     char buf[BUF_SIZE] = { 0x00 };
213     struct inotify_event *event = nullptr;
214     int len = 0;
215     int64_t index = 0;
216     while (((len = read(notifyFd_, &buf, sizeof(buf))) < 0) && (errno == EINTR)) {}
217     while (notifyRun_.load() && index < len) {
218         event = reinterpret_cast<inotify_event *>(buf + index);
219         auto [needContinue, errCode, needSend] = HandleProgress(event);
220         if (!needContinue) {
221             errorCode_ = errCode;
222             return;
223         }
224         if (needContinue && !needSend) {
225             index += static_cast<int64_t>(sizeof(struct inotify_event) + event->len);
226             continue;
227         }
228         if (progressSize_ == totalSize_) {
229             notifyRun_.store(false);
230             return;
231         }
232         auto currentTime = std::chrono::steady_clock::now();
233         if (currentTime >= notifyTime_ && processCallback_ != nullptr) {
234             processCallback_(progressSize_, totalSize_);
235             notifyTime_ = currentTime + NOTIFY_PROGRESS_DELAY;
236         }
237         index += static_cast<int64_t>(sizeof(struct inotify_event) + event->len);
238     }
239 }
240 
HandleProgress(inotify_event * event)241 std::tuple<bool, int, bool> FileCopyLocalListener::HandleProgress(inotify_event *event)
242 {
243     std::shared_ptr<ReceiveInfo> receivedInfo;
244     {
245         std::lock_guard<std::mutex> lock(wdsMutex_);
246         auto iter = wds_.find(event->wd);
247         if (iter == wds_.end()) {
248             return { true, -1, false };
249         }
250         receivedInfo = iter->second;
251     }
252     std::string fileName = receivedInfo->path;
253     if (!isFile_) { // files under subdir
254         fileName += "/" + std::string(event->name);
255         if (!CheckFileValid(fileName)) {
256             return { true, -1, false };
257         }
258         auto err = UpdateProgressSize(fileName, receivedInfo);
259         if (err != E_OK) {
260             return { false, err, false };
261         }
262     } else { // file
263         uint64_t fileSize = 0;
264         auto err = FileSizeUtils::GetFileSize(fileName, fileSize);
265         if (err != E_OK) {
266             return { false, err, false };
267         }
268         progressSize_ = fileSize;
269     }
270     return { true, E_OK, true };
271 }
272 
CheckFileValid(const std::string & filePath)273 bool FileCopyLocalListener::CheckFileValid(const std::string &filePath)
274 {
275     std::lock_guard<std::mutex> lock(filePathsMutex_);
276     return filePaths_.count(filePath) != 0;
277 }
278 
CloseNotifyFdLocked()279 void FileCopyLocalListener::CloseNotifyFdLocked()
280 {
281     std::lock_guard<std::mutex> lock(readMutex_);
282     readClosed_ = true;
283     if (readFlag_) {
284         LOGE("close while reading");
285         return;
286     }
287     CloseNotifyFd();
288 }
289 
CloseNotifyFd()290 void FileCopyLocalListener::CloseNotifyFd()
291 {
292     readClosed_ = false;
293     std::unique_lock<std::mutex> cvLock(cvLock_);
294     if (eventFd_ != -1) {
295         close(eventFd_);
296         eventFd_ = -1;
297     }
298     if (notifyFd_ == -1) {
299         return;
300     }
301 
302     std::lock_guard<std::mutex> wdsLock(wdsMutex_);
303     for (auto item : wds_) {
304         inotify_rm_watch(notifyFd_, item.first);
305     }
306     close(notifyFd_);
307     notifyFd_ = -1;
308     notifyCv_.notify_one();
309 }
310 
UpdateProgressSize(const std::string & filePath,std::shared_ptr<ReceiveInfo> receivedInfo)311 int FileCopyLocalListener::UpdateProgressSize(const std::string &filePath, std::shared_ptr<ReceiveInfo> receivedInfo)
312 {
313     uint64_t fileSize = 0;
314     auto err = FileSizeUtils::GetFileSize(filePath, fileSize);
315     if (err != E_OK) {
316         LOGE("GetFileSize failed, err: %{public}d.", err);
317         return err;
318     }
319     auto size = fileSize;
320     auto iter = receivedInfo->fileList.find(filePath);
321     if (iter == receivedInfo->fileList.end()) { // new file
322         receivedInfo->fileList.insert({ filePath, size });
323         progressSize_ += size;
324     } else { // old file
325         if (size > iter->second) {
326             progressSize_ += (size - iter->second);
327             iter->second = size;
328         }
329     }
330     return E_OK;
331 }
} // namespace DistributedFile
} // namespace Storage
334 } // namespace OHOS
335