1 /*
2 * Copyright (c) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "copy/file_copy_listener.h"
17
18 #include <cinttypes>
19 #include <unistd.h>
20
21 #include "copy/file_size_utils.h"
22 #include "dfs_error.h"
23 #include "utils_log.h"
24
25 #undef LOG_DOMAIN
26 #undef LOG_TAG
27 #define LOG_DOMAIN 0xD004315
28 #define LOG_TAG "distributedfile_daemon"
29
30 namespace OHOS {
31 namespace Storage {
32 namespace DistributedFile {
33 using namespace FileManagement;
// Size in bytes of the stack buffer that ReadNotifyEvent() fills from the inotify descriptor.
static constexpr int BUF_SIZE = 1024;
// Minimum spacing between two progress callbacks issued while inotify events are processed.
static constexpr std::chrono::milliseconds NOTIFY_PROGRESS_DELAY{ 100 };
// Upper bound, in microseconds, of the condition-variable wait between two poll rounds.
static constexpr int SLEEP_TIME_US = 100 * 1000;
37
FileCopyLocalListener(const std::string & srcPath,bool isFile,const ProcessCallback & processCallback)38 FileCopyLocalListener::FileCopyLocalListener(const std::string &srcPath,
39 bool isFile, const ProcessCallback &processCallback) : isFile_(isFile), processCallback_(processCallback)
40 {
41 if (processCallback_ == nullptr) {
42 LOGI("processCallback is nullptr");
43 return;
44 }
45
46 notifyFd_ = inotify_init();
47 if (notifyFd_ < 0) {
48 LOGE("Failed to init inotify, errno:%{public}d", errno);
49 return;
50 }
51 eventFd_ = eventfd(0, EFD_CLOEXEC);
52 if (eventFd_ < 0) {
53 LOGE("Failed to init eventFd, errno:%{public}d", errno);
54 return;
55 }
56 }
57
~FileCopyLocalListener()58 FileCopyLocalListener::~FileCopyLocalListener()
59 {
60 CloseNotifyFdLocked();
61 if (notifyHandler_.joinable()) {
62 notifyHandler_.join();
63 }
64 }
65
GetLocalListener(const std::string & srcPath,bool isFile,const ProcessCallback & processCallback)66 std::shared_ptr<FileCopyLocalListener> FileCopyLocalListener::GetLocalListener(const std::string &srcPath,
67 bool isFile, const ProcessCallback &processCallback)
68 {
69 auto listener = std::make_shared<FileCopyLocalListener>(srcPath, isFile, processCallback);
70 return listener;
71 }
72
StartListener()73 void FileCopyLocalListener::StartListener()
74 {
75 std::lock_guard<std::mutex> lock(processMutex_);
76 if (processCallback_ == nullptr || totalSize_ == 0) {
77 LOGI("processCallback is nullptr or totalSize is zero, totalSize = %{public}" PRId64 "B", totalSize_);
78 return;
79 }
80 if (notifyHandler_.joinable()) {
81 LOGE("notifyHandler has join");
82 return;
83 }
84 notifyHandler_ = std::thread([this] {
85 LOGI("StartListener.");
86 GetNotifyEvent();
87 });
88 }
89
StopListener()90 void FileCopyLocalListener::StopListener()
91 {
92 std::lock_guard<std::mutex> lock(processMutex_);
93 LOGI("StopListener start.");
94 if (processCallback_ == nullptr) {
95 LOGI("processCallback is nullptr");
96 return;
97 }
98 processCallback_(progressSize_, totalSize_);
99 CloseNotifyFdLocked();
100 notifyRun_.store(false);
101 {
102 std::unique_lock<std::mutex> cvLock(cvLock_);
103 notifyCv_.notify_one();
104 }
105 if (notifyHandler_.joinable()) {
106 notifyHandler_.join();
107 }
108 }
109
AddFile(const std::string & fileName)110 void FileCopyLocalListener::AddFile(const std::string &fileName)
111 {
112 std::lock_guard<std::mutex> lock(filePathsMutex_);
113 filePaths_.insert(fileName);
114 }
115
AddListenerFile(const std::string & srcPath,const std::string & destPath,uint32_t mode)116 int32_t FileCopyLocalListener::AddListenerFile(const std::string &srcPath, const std::string &destPath, uint32_t mode)
117 {
118 LOGI("AddListenerFile start");
119 if (processCallback_ == nullptr) {
120 LOGI("processCallback is nullptr");
121 return E_OK;
122 }
123 std::lock_guard<std::mutex> lock(wdsMutex_);
124 int newWd = inotify_add_watch(notifyFd_, destPath.c_str(), mode);
125 if (newWd < 0) {
126 LOGE("inotify_add_watch, newWd is unvaild, newWd = %{public}d, errno = %{public}d", newWd, errno);
127 return errno;
128 }
129 std::shared_ptr<ReceiveInfo> receiveInfo = std::make_shared<ReceiveInfo>();
130 if (receiveInfo == nullptr) {
131 LOGE("Failed to request heap memory.");
132 return ENOMEM;
133 }
134 receiveInfo->path = destPath;
135 wds_.insert(std::make_pair(newWd, receiveInfo));
136 uint64_t fileSize = 0;
137 int32_t err;
138 if (!isFile_) {
139 err = FileSizeUtils::GetDirSize(srcPath, fileSize);
140 } else {
141 err = FileSizeUtils::GetFileSize(srcPath, fileSize);
142 }
143 if (err != E_OK) {
144 LOGE("Failed to get src size, isFile=%{public}d, err=%{public}d", isFile_, err);
145 return err;
146 }
147 totalSize_ = fileSize;
148 notifyTime_ = std::chrono::steady_clock::now() + NOTIFY_PROGRESS_DELAY;
149 LOGI("AddListenerFile end");
150 return E_OK;
151 }
152
GetNotifyEvent()153 void FileCopyLocalListener::GetNotifyEvent()
154 {
155 prctl(PR_SET_NAME, "NotifyThread");
156 nfds_t nfds = 2;
157 struct pollfd fds[2];
158 fds[0].events = 0;
159 fds[1].events = POLLIN;
160 fds[0].fd = eventFd_;
161 fds[1].fd = notifyFd_;
162 while (notifyRun_.load() && errorCode_== E_OK && eventFd_ != -1 && notifyFd_ != -1) {
163 auto ret = poll(fds, nfds, -1);
164 if (ret > 0) {
165 if (static_cast<unsigned short>(fds[0].revents) & POLLNVAL) {
166 notifyRun_.store(false);
167 return;
168 }
169 if (static_cast<unsigned short>(fds[1].revents) & POLLIN) {
170 ReadNotifyEventLocked();
171 }
172 } else if (ret < 0 && errno == EINTR) {
173 continue;
174 } else {
175 LOGE("poll failed error : %{public}d", errno);
176 errorCode_ = errno;
177 return;
178 }
179 {
180 std::unique_lock<std::mutex> cvLock(cvLock_);
181 notifyCv_.wait_for(cvLock, std::chrono::microseconds(SLEEP_TIME_US), [this]() -> bool {
182 return notifyFd_ == -1 || !notifyRun_.load();
183 });
184 }
185 }
186 }
187
ReadNotifyEventLocked()188 void FileCopyLocalListener::ReadNotifyEventLocked()
189 {
190 {
191 std::lock_guard<std::mutex> lock(readMutex_);
192 if (readClosed_) {
193 LOGE("read after close");
194 return;
195 }
196 readFlag_ = true;
197 }
198 ReadNotifyEvent();
199 {
200 std::lock_guard<std::mutex> lock(readMutex_);
201 readFlag_ = false;
202 if (readClosed_) {
203 LOGE("close after read");
204 CloseNotifyFd();
205 return;
206 }
207 }
208 }
209
ReadNotifyEvent()210 void FileCopyLocalListener::ReadNotifyEvent()
211 {
212 char buf[BUF_SIZE] = { 0x00 };
213 struct inotify_event *event = nullptr;
214 int len = 0;
215 int64_t index = 0;
216 while (((len = read(notifyFd_, &buf, sizeof(buf))) < 0) && (errno == EINTR)) {}
217 while (notifyRun_.load() && index < len) {
218 event = reinterpret_cast<inotify_event *>(buf + index);
219 auto [needContinue, errCode, needSend] = HandleProgress(event);
220 if (!needContinue) {
221 errorCode_ = errCode;
222 return;
223 }
224 if (needContinue && !needSend) {
225 index += static_cast<int64_t>(sizeof(struct inotify_event) + event->len);
226 continue;
227 }
228 if (progressSize_ == totalSize_) {
229 notifyRun_.store(false);
230 return;
231 }
232 auto currentTime = std::chrono::steady_clock::now();
233 if (currentTime >= notifyTime_ && processCallback_ != nullptr) {
234 processCallback_(progressSize_, totalSize_);
235 notifyTime_ = currentTime + NOTIFY_PROGRESS_DELAY;
236 }
237 index += static_cast<int64_t>(sizeof(struct inotify_event) + event->len);
238 }
239 }
240
HandleProgress(inotify_event * event)241 std::tuple<bool, int, bool> FileCopyLocalListener::HandleProgress(inotify_event *event)
242 {
243 std::shared_ptr<ReceiveInfo> receivedInfo;
244 {
245 std::lock_guard<std::mutex> lock(wdsMutex_);
246 auto iter = wds_.find(event->wd);
247 if (iter == wds_.end()) {
248 return { true, -1, false };
249 }
250 receivedInfo = iter->second;
251 }
252 std::string fileName = receivedInfo->path;
253 if (!isFile_) { // files under subdir
254 fileName += "/" + std::string(event->name);
255 if (!CheckFileValid(fileName)) {
256 return { true, -1, false };
257 }
258 auto err = UpdateProgressSize(fileName, receivedInfo);
259 if (err != E_OK) {
260 return { false, err, false };
261 }
262 } else { // file
263 uint64_t fileSize = 0;
264 auto err = FileSizeUtils::GetFileSize(fileName, fileSize);
265 if (err != E_OK) {
266 return { false, err, false };
267 }
268 progressSize_ = fileSize;
269 }
270 return { true, E_OK, true };
271 }
272
CheckFileValid(const std::string & filePath)273 bool FileCopyLocalListener::CheckFileValid(const std::string &filePath)
274 {
275 std::lock_guard<std::mutex> lock(filePathsMutex_);
276 return filePaths_.count(filePath) != 0;
277 }
278
CloseNotifyFdLocked()279 void FileCopyLocalListener::CloseNotifyFdLocked()
280 {
281 std::lock_guard<std::mutex> lock(readMutex_);
282 readClosed_ = true;
283 if (readFlag_) {
284 LOGE("close while reading");
285 return;
286 }
287 CloseNotifyFd();
288 }
289
CloseNotifyFd()290 void FileCopyLocalListener::CloseNotifyFd()
291 {
292 readClosed_ = false;
293 std::unique_lock<std::mutex> cvLock(cvLock_);
294 if (eventFd_ != -1) {
295 close(eventFd_);
296 eventFd_ = -1;
297 }
298 if (notifyFd_ == -1) {
299 return;
300 }
301
302 std::lock_guard<std::mutex> wdsLock(wdsMutex_);
303 for (auto item : wds_) {
304 inotify_rm_watch(notifyFd_, item.first);
305 }
306 close(notifyFd_);
307 notifyFd_ = -1;
308 notifyCv_.notify_one();
309 }
310
UpdateProgressSize(const std::string & filePath,std::shared_ptr<ReceiveInfo> receivedInfo)311 int FileCopyLocalListener::UpdateProgressSize(const std::string &filePath, std::shared_ptr<ReceiveInfo> receivedInfo)
312 {
313 uint64_t fileSize = 0;
314 auto err = FileSizeUtils::GetFileSize(filePath, fileSize);
315 if (err != E_OK) {
316 LOGE("GetFileSize failed, err: %{public}d.", err);
317 return err;
318 }
319 auto size = fileSize;
320 auto iter = receivedInfo->fileList.find(filePath);
321 if (iter == receivedInfo->fileList.end()) { // new file
322 receivedInfo->fileList.insert({ filePath, size });
323 progressSize_ += size;
324 } else { // old file
325 if (size > iter->second) {
326 progressSize_ += (size - iter->second);
327 iter->second = size;
328 }
329 }
330 return E_OK;
331 }
} // namespace DistributedFile
} // namespace Storage
334 } // namespace OHOS
335