1 /*
2 * Copyright (c) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "copy/file_copy_listener.h"
17
18 #include <unistd.h>
19
20 #include "copy/file_size_utils.h"
21 #include "dfs_error.h"
22 #include "utils_log.h"
23
24 #undef LOG_DOMAIN
25 #undef LOG_TAG
26 #define LOG_DOMAIN 0xD001600
27 #define LOG_TAG "distributedfile_daemon"
28
29 namespace OHOS {
30 namespace Storage {
31 namespace DistributedFile {
32 using namespace FileManagement;
// Size in bytes of the stack buffer used for a single read() from the inotify descriptor.
static constexpr int BUF_SIZE{ 1024 };
// Minimum interval between two intermediate progress callbacks.
static constexpr std::chrono::milliseconds NOTIFY_PROGRESS_DELAY{ 100 };
35
FileCopyLocalListener(const std::string & srcPath,bool isFile,const ProcessCallback & processCallback)36 FileCopyLocalListener::FileCopyLocalListener(const std::string &srcPath,
37 bool isFile, const ProcessCallback &processCallback) : isFile_(isFile), processCallback_(processCallback)
38 {
39 if (processCallback_ == nullptr) {
40 return;
41 }
42
43 notifyFd_ = inotify_init();
44 if (notifyFd_ < 0) {
45 LOGE("Failed to init inotify, errno:%{public}d", errno);
46 return;
47 }
48 eventFd_ = eventfd(0, EFD_CLOEXEC);
49 if (eventFd_ < 0) {
50 LOGE("Failed to init eventFd, errno:%{public}d", errno);
51 return;
52 }
53
54 uint64_t fileSize = 0;
55 int32_t err;
56 if (!isFile_) {
57 err = FileSizeUtils::GetDirSize(srcPath, fileSize);
58 } else {
59 err = FileSizeUtils::GetFileSize(srcPath, fileSize);
60 }
61 if (err == E_OK) {
62 totalSize_ = fileSize;
63 }
64 notifyTime_ = std::chrono::steady_clock::now() + NOTIFY_PROGRESS_DELAY;
65 }
66
~FileCopyLocalListener()67 FileCopyLocalListener::~FileCopyLocalListener()
68 {
69 CloseNotifyFd();
70 }
71
GetLocalListener(const std::string & srcPath,bool isFile,const ProcessCallback & processCallback)72 std::shared_ptr<FileCopyLocalListener> FileCopyLocalListener::GetLocalListener(const std::string &srcPath,
73 bool isFile, const ProcessCallback &processCallback)
74 {
75 auto listener = std::make_shared<FileCopyLocalListener>(srcPath, isFile, processCallback);
76 return listener;
77 }
78
StartListener()79 void FileCopyLocalListener::StartListener()
80 {
81 if (processCallback_ == nullptr || totalSize_ == 0) {
82 return;
83 }
84 notifyHandler_ = std::thread([this] {
85 GetNotifyEvent();
86 });
87 }
88
StopListener()89 void FileCopyLocalListener::StopListener()
90 {
91 if (processCallback_ != nullptr) {
92 processCallback_(progressSize_, totalSize_);
93 }
94 CloseNotifyFd();
95 if (notifyHandler_.joinable()) {
96 notifyHandler_.join();
97 }
98 }
99
AddFile(const std::string & fileName)100 void FileCopyLocalListener::AddFile(const std::string &fileName)
101 {
102 std::lock_guard<std::mutex> lock(filePathsMutex_);
103 filePaths_.insert(fileName);
104 }
105
AddListenerFile(const std::string & destPath,uint32_t mode)106 void FileCopyLocalListener::AddListenerFile(const std::string &destPath, uint32_t mode)
107 {
108 if (processCallback_ == nullptr || totalSize_ == 0) {
109 return;
110 }
111
112 std::lock_guard<std::mutex> lock(wdsMutex_);
113 int newWd = inotify_add_watch(notifyFd_, destPath.c_str(), mode);
114 if (newWd < 0) {
115 LOGE("inotify_add_watch, newWd is unvaild, newWd = %{public}d", newWd);
116 return;
117 }
118 std::shared_ptr<ReceiveInfo> receiveInfo = std::make_shared<ReceiveInfo>();
119 if (receiveInfo == nullptr) {
120 LOGE("Failed to request heap memory.");
121 return;
122 }
123 receiveInfo->path = destPath;
124 wds_.insert(std::make_pair(newWd, receiveInfo));
125 }
126
GetNotifyEvent()127 void FileCopyLocalListener::GetNotifyEvent()
128 {
129 prctl(PR_SET_NAME, "NotifyThread");
130 nfds_t nfds = 2;
131 struct pollfd fds[2];
132 fds[0].events = 0;
133 fds[1].events = POLLIN;
134 fds[0].fd = eventFd_;
135 fds[1].fd = notifyFd_;
136 while (errorCode_== E_OK && eventFd_ != -1 && notifyFd_ != -1) {
137 auto ret = poll(fds, nfds, -1);
138 if (ret > 0) {
139 if (static_cast<unsigned short>(fds[0].revents) & POLLNVAL) {
140 return;
141 }
142 if (static_cast<unsigned short>(fds[1].revents) & POLLIN) {
143 ReadNotifyEvent();
144 }
145 } else if (ret < 0 && errno == EINTR) {
146 continue;
147 } else {
148 LOGE("poll failed error : %{public}d", errno);
149 errorCode_ = errno;
150 return;
151 }
152 }
153 }
154
ReadNotifyEvent()155 void FileCopyLocalListener::ReadNotifyEvent()
156 {
157 char buf[BUF_SIZE] = { 0x00 };
158 struct inotify_event *event = nullptr;
159 int len = 0;
160 int64_t index = 0;
161 while (((len = read(notifyFd_, &buf, sizeof(buf))) < 0) && (errno == EINTR)) {}
162 while (index < len) {
163 event = reinterpret_cast<inotify_event *>(buf + index);
164 auto [needContinue, errCode, needSend] = HandleProgress(event);
165 if (!needContinue) {
166 errorCode_ = errCode;
167 return;
168 }
169 if (needContinue && !needSend) {
170 index += static_cast<int64_t>(sizeof(struct inotify_event) + event->len);
171 continue;
172 }
173 if (progressSize_ == totalSize_) {
174 return;
175 }
176 auto currentTime = std::chrono::steady_clock::now();
177 if (currentTime >= notifyTime_ && processCallback_ != nullptr) {
178 processCallback_(progressSize_, totalSize_);
179 notifyTime_ = currentTime + NOTIFY_PROGRESS_DELAY;
180 }
181 index += static_cast<int64_t>(sizeof(struct inotify_event) + event->len);
182 }
183 }
184
HandleProgress(inotify_event * event)185 std::tuple<bool, int, bool> FileCopyLocalListener::HandleProgress(inotify_event *event)
186 {
187 std::shared_ptr<ReceiveInfo> receivedInfo;
188 {
189 std::lock_guard<std::mutex> lock(wdsMutex_);
190 auto iter = wds_.find(event->wd);
191 if (iter == wds_.end()) {
192 return { true, -1, false };
193 }
194 receivedInfo = iter->second;
195 }
196 std::string fileName = receivedInfo->path;
197 if (!isFile_) { // files under subdir
198 fileName += "/" + std::string(event->name);
199 if (!CheckFileValid(fileName)) {
200 return { true, -1, false };
201 }
202 auto err = UpdateProgressSize(fileName, receivedInfo);
203 if (err != E_OK) {
204 return { false, err, false };
205 }
206 } else { // file
207 uint64_t fileSize = 0;
208 auto err = FileSizeUtils::GetFileSize(fileName, fileSize);
209 if (err != E_OK) {
210 return { false, err, false };
211 }
212 progressSize_ = fileSize;
213 }
214 return { true, E_OK, true };
215 }
216
CheckFileValid(const std::string & filePath)217 bool FileCopyLocalListener::CheckFileValid(const std::string &filePath)
218 {
219 std::lock_guard<std::mutex> lock(filePathsMutex_);
220 return filePaths_.count(filePath) != 0;
221 }
222
CloseNotifyFd()223 void FileCopyLocalListener::CloseNotifyFd()
224 {
225 if (eventFd_ != -1) {
226 close(eventFd_);
227 eventFd_ = -1;
228 }
229 if (notifyFd_ == -1) {
230 return;
231 }
232
233 std::lock_guard<std::mutex> lock(wdsMutex_);
234 for (auto item : wds_) {
235 inotify_rm_watch(notifyFd_, item.first);
236 }
237 close(notifyFd_);
238 notifyFd_ = -1;
239 }
240
UpdateProgressSize(const std::string & filePath,std::shared_ptr<ReceiveInfo> receivedInfo)241 int FileCopyLocalListener::UpdateProgressSize(const std::string &filePath, std::shared_ptr<ReceiveInfo> receivedInfo)
242 {
243 uint64_t fileSize = 0;
244 auto err = FileSizeUtils::GetFileSize(filePath, fileSize);
245 if (err != E_OK) {
246 LOGE("GetFileSize failed, err: %{public}d.", err);
247 return err;
248 }
249 auto size = fileSize;
250 auto iter = receivedInfo->fileList.find(filePath);
251 if (iter == receivedInfo->fileList.end()) { // new file
252 receivedInfo->fileList.insert({ filePath, size });
253 progressSize_ += size;
254 } else { // old file
255 if (size > iter->second) {
256 progressSize_ += (size - iter->second);
257 iter->second = size;
258 }
259 }
260 return E_OK;
261 }
} // namespace DistributedFile
} // namespace Storage
264 } // namespace OHOS
265