• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "copy/file_copy_listener.h"
17 
18 #include <unistd.h>
19 
20 #include "copy/file_size_utils.h"
21 #include "dfs_error.h"
22 #include "utils_log.h"
23 
24 #undef LOG_DOMAIN
25 #undef LOG_TAG
26 #define LOG_DOMAIN 0xD001600
27 #define LOG_TAG "distributedfile_daemon"
28 
29 namespace OHOS {
30 namespace Storage {
31 namespace DistributedFile {
32 using namespace FileManagement;
// Size in bytes of the stack buffer that ReadNotifyEvent() fills from the inotify descriptor with one read().
33 static constexpr int BUF_SIZE = 1024;
// Interval added to the current time to compute the next moment a progress callback may be issued
// (see the constructor and ReadNotifyEvent()).
34 static constexpr std::chrono::milliseconds NOTIFY_PROGRESS_DELAY(100);
35 
FileCopyLocalListener(const std::string & srcPath,bool isFile,const ProcessCallback & processCallback)36 FileCopyLocalListener::FileCopyLocalListener(const std::string &srcPath,
37     bool isFile, const ProcessCallback &processCallback) : isFile_(isFile), processCallback_(processCallback)
38 {
39     if (processCallback_ == nullptr) {
40         return;
41     }
42 
43     notifyFd_ = inotify_init();
44     if (notifyFd_ < 0) {
45         LOGE("Failed to init inotify, errno:%{public}d", errno);
46         return;
47     }
48     eventFd_ = eventfd(0, EFD_CLOEXEC);
49     if (eventFd_ < 0) {
50         LOGE("Failed to init eventFd, errno:%{public}d", errno);
51         return;
52     }
53 
54     uint64_t fileSize = 0;
55     int32_t err;
56     if (!isFile_) {
57         err = FileSizeUtils::GetDirSize(srcPath, fileSize);
58     } else {
59         err = FileSizeUtils::GetFileSize(srcPath, fileSize);
60     }
61     if (err == E_OK) {
62         totalSize_ = fileSize;
63     }
64     notifyTime_ = std::chrono::steady_clock::now() + NOTIFY_PROGRESS_DELAY;
65 }
66 
~FileCopyLocalListener()67 FileCopyLocalListener::~FileCopyLocalListener()
68 {
69     CloseNotifyFd();
70 }
71 
GetLocalListener(const std::string & srcPath,bool isFile,const ProcessCallback & processCallback)72 std::shared_ptr<FileCopyLocalListener> FileCopyLocalListener::GetLocalListener(const std::string &srcPath,
73     bool isFile, const ProcessCallback &processCallback)
74 {
75     auto listener = std::make_shared<FileCopyLocalListener>(srcPath, isFile, processCallback);
76     return listener;
77 }
78 
StartListener()79 void FileCopyLocalListener::StartListener()
80 {
81     if (processCallback_ == nullptr || totalSize_ == 0) {
82         return;
83     }
84     notifyHandler_ = std::thread([this] {
85         GetNotifyEvent();
86     });
87 }
88 
StopListener()89 void FileCopyLocalListener::StopListener()
90 {
91     if (processCallback_ != nullptr) {
92         processCallback_(progressSize_, totalSize_);
93     }
94     CloseNotifyFd();
95     if (notifyHandler_.joinable()) {
96         notifyHandler_.join();
97     }
98 }
99 
AddFile(const std::string & fileName)100 void FileCopyLocalListener::AddFile(const std::string &fileName)
101 {
102     std::lock_guard<std::mutex> lock(filePathsMutex_);
103     filePaths_.insert(fileName);
104 }
105 
AddListenerFile(const std::string & destPath,uint32_t mode)106 void FileCopyLocalListener::AddListenerFile(const std::string &destPath, uint32_t mode)
107 {
108     if (processCallback_ == nullptr || totalSize_ == 0) {
109         return;
110     }
111 
112     std::lock_guard<std::mutex> lock(wdsMutex_);
113     int newWd = inotify_add_watch(notifyFd_, destPath.c_str(), mode);
114     if (newWd < 0) {
115         LOGE("inotify_add_watch, newWd is unvaild, newWd = %{public}d", newWd);
116         return;
117     }
118     std::shared_ptr<ReceiveInfo> receiveInfo = std::make_shared<ReceiveInfo>();
119     if (receiveInfo == nullptr) {
120         LOGE("Failed to request heap memory.");
121         return;
122     }
123     receiveInfo->path = destPath;
124     wds_.insert(std::make_pair(newWd, receiveInfo));
125 }
126 
GetNotifyEvent()127 void FileCopyLocalListener::GetNotifyEvent()
128 {
129     prctl(PR_SET_NAME, "NotifyThread");
130     nfds_t nfds = 2;
131     struct pollfd fds[2];
132     fds[0].events = 0;
133     fds[1].events = POLLIN;
134     fds[0].fd = eventFd_;
135     fds[1].fd = notifyFd_;
136     while (errorCode_== E_OK && eventFd_ != -1 && notifyFd_ != -1) {
137         auto ret = poll(fds, nfds, -1);
138         if (ret > 0) {
139             if (static_cast<unsigned short>(fds[0].revents) & POLLNVAL) {
140                 return;
141             }
142             if (static_cast<unsigned short>(fds[1].revents) & POLLIN) {
143                 ReadNotifyEvent();
144             }
145         } else if (ret < 0 && errno == EINTR) {
146             continue;
147         } else {
148             LOGE("poll failed error : %{public}d", errno);
149             errorCode_ = errno;
150             return;
151         }
152     }
153 }
154 
ReadNotifyEvent()155 void FileCopyLocalListener::ReadNotifyEvent()
156 {
157     char buf[BUF_SIZE] = { 0x00 };
158     struct inotify_event *event = nullptr;
159     int len = 0;
160     int64_t index = 0;
161     while (((len = read(notifyFd_, &buf, sizeof(buf))) < 0) && (errno == EINTR)) {}
162     while (index < len) {
163         event = reinterpret_cast<inotify_event *>(buf + index);
164         auto [needContinue, errCode, needSend] = HandleProgress(event);
165         if (!needContinue) {
166             errorCode_ = errCode;
167             return;
168         }
169         if (needContinue && !needSend) {
170             index += static_cast<int64_t>(sizeof(struct inotify_event) + event->len);
171             continue;
172         }
173         if (progressSize_ == totalSize_) {
174             return;
175         }
176         auto currentTime = std::chrono::steady_clock::now();
177         if (currentTime >= notifyTime_ && processCallback_ != nullptr) {
178             processCallback_(progressSize_, totalSize_);
179             notifyTime_ = currentTime + NOTIFY_PROGRESS_DELAY;
180         }
181         index += static_cast<int64_t>(sizeof(struct inotify_event) + event->len);
182     }
183 }
184 
HandleProgress(inotify_event * event)185 std::tuple<bool, int, bool> FileCopyLocalListener::HandleProgress(inotify_event *event)
186 {
187     std::shared_ptr<ReceiveInfo> receivedInfo;
188     {
189         std::lock_guard<std::mutex> lock(wdsMutex_);
190         auto iter = wds_.find(event->wd);
191         if (iter == wds_.end()) {
192             return { true, -1, false };
193         }
194         receivedInfo = iter->second;
195     }
196     std::string fileName = receivedInfo->path;
197     if (!isFile_) { // files under subdir
198         fileName += "/" + std::string(event->name);
199         if (!CheckFileValid(fileName)) {
200             return { true, -1, false };
201         }
202         auto err = UpdateProgressSize(fileName, receivedInfo);
203         if (err != E_OK) {
204             return { false, err, false };
205         }
206     } else { // file
207         uint64_t fileSize = 0;
208         auto err = FileSizeUtils::GetFileSize(fileName, fileSize);
209         if (err != E_OK) {
210             return { false, err, false };
211         }
212         progressSize_ = fileSize;
213     }
214     return { true, E_OK, true };
215 }
216 
CheckFileValid(const std::string & filePath)217 bool FileCopyLocalListener::CheckFileValid(const std::string &filePath)
218 {
219     std::lock_guard<std::mutex> lock(filePathsMutex_);
220     return filePaths_.count(filePath) != 0;
221 }
222 
CloseNotifyFd()223 void FileCopyLocalListener::CloseNotifyFd()
224 {
225     if (eventFd_ != -1) {
226         close(eventFd_);
227         eventFd_ = -1;
228     }
229     if (notifyFd_ == -1) {
230         return;
231     }
232 
233     std::lock_guard<std::mutex> lock(wdsMutex_);
234     for (auto item : wds_) {
235         inotify_rm_watch(notifyFd_, item.first);
236     }
237     close(notifyFd_);
238     notifyFd_ = -1;
239 }
240 
UpdateProgressSize(const std::string & filePath,std::shared_ptr<ReceiveInfo> receivedInfo)241 int FileCopyLocalListener::UpdateProgressSize(const std::string &filePath, std::shared_ptr<ReceiveInfo> receivedInfo)
242 {
243     uint64_t fileSize = 0;
244     auto err = FileSizeUtils::GetFileSize(filePath, fileSize);
245     if (err != E_OK) {
246         LOGE("GetFileSize failed, err: %{public}d.", err);
247         return err;
248     }
249     auto size = fileSize;
250     auto iter = receivedInfo->fileList.find(filePath);
251     if (iter == receivedInfo->fileList.end()) { // new file
252         receivedInfo->fileList.insert({ filePath, size });
253         progressSize_ += size;
254     } else { // old file
255         if (size > iter->second) {
256             progressSize_ += (size - iter->second);
257             iter->second = size;
258         }
259     }
260     return E_OK;
261 }
262 } // namespace DistributedFile
263 } // namespace Storage
264 } // namespace OHOS
265