1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "ObjectAssetLoader"
16
17 #include "object_asset_loader.h"
18 #include "block_data.h"
19 #include "cloud_sync_asset_manager.h"
20 #include "log_print.h"
21 #include "object_common.h"
22 #include "utils/anonymous.h"
23 #include "object_radar_reporter.h"
24 #include "distributed_file_daemon_manager.h"
25 namespace OHOS::DistributedObject {
26 using namespace OHOS::FileManagement::CloudSync;
27 using namespace OHOS::DistributedData;
GetInstance()28 ObjectAssetLoader &ObjectAssetLoader::GetInstance()
29 {
30 static ObjectAssetLoader loader;
31 return loader;
32 }
33
SetThreadPool(std::shared_ptr<ExecutorPool> executors)34 void ObjectAssetLoader::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
35 {
36 executors_ = executors;
37 }
38
Transfer(const int32_t userId,const std::string & bundleName,const std::string & deviceId,const DistributedData::Asset & asset)39 bool ObjectAssetLoader::Transfer(const int32_t userId, const std::string& bundleName, const std::string& deviceId,
40 const DistributedData::Asset& asset)
41 {
42 AssetInfo assetInfo;
43 assetInfo.uri = asset.uri;
44 assetInfo.assetName = asset.name;
45 ZLOGI("Start transfer, bundleName: %{public}s, deviceId: %{public}s, assetName: %{public}s", bundleName.c_str(),
46 Anonymous::Change(deviceId).c_str(), Anonymous::Change(assetInfo.assetName).c_str());
47 auto block = std::make_shared<BlockData<std::tuple<bool, int32_t>>>(WAIT_TIME, std::tuple{ true, OBJECT_SUCCESS });
48 auto res = CloudSyncAssetManager::GetInstance().DownloadFile(userId, bundleName, deviceId, assetInfo,
49 [block](const std::string& uri, int32_t status) {
50 block->SetValue({ false, status });
51 });
52 if (res != OBJECT_SUCCESS) {
53 ZLOGE("fail, res: %{public}d, name: %{public}s, deviceId: %{public}s, bundleName: %{public}s", res,
54 Anonymous::Change(asset.name).c_str(), Anonymous::Change(deviceId).c_str(), bundleName.c_str());
55 return false;
56 }
57 auto [timeout, status] = block->GetValue();
58 if (timeout || status != OBJECT_SUCCESS) {
59 ZLOGE("fail, timeout: %{public}d, status: %{public}d, name: %{public}s, deviceId: %{public}s ", timeout,
60 status, Anonymous::Change(asset.name).c_str(), Anonymous::Change(deviceId).c_str());
61 return false;
62 }
63 ZLOGD("Transfer end, bundleName: %{public}s, deviceId: %{public}s, assetName: %{public}s", bundleName.c_str(),
64 Anonymous::Change(deviceId).c_str(), Anonymous::Change(assetInfo.assetName).c_str());
65 return true;
66 }
67
TransferAssetsAsync(const int32_t userId,const std::string & bundleName,const std::string & deviceId,const std::vector<DistributedData::Asset> & assets,const TransferFunc & callback)68 void ObjectAssetLoader::TransferAssetsAsync(const int32_t userId, const std::string& bundleName,
69 const std::string& deviceId, const std::vector<DistributedData::Asset>& assets, const TransferFunc& callback)
70 {
71 if (executors_ == nullptr) {
72 ZLOGE("executors is null, bundleName: %{public}s, deviceId: %{public}s, userId: %{public}d",
73 bundleName.c_str(), Anonymous::Change(deviceId).c_str(), userId);
74 callback(false);
75 return;
76 }
77 TransferTask task = { .callback = callback };
78 DistributedData::Assets downloadAssets;
79 for (auto& asset : assets) {
80 if (IsDownloaded(asset)) {
81 continue;
82 }
83 task.downloadAssets.insert(asset.uri);
84 downloadAssets.emplace_back(asset);
85 }
86 if (task.downloadAssets.empty()) {
87 callback(true);
88 }
89 tasks_.ComputeIfAbsent(++taskSeq_, [task](const uint32_t key) {
90 return task;
91 });
92 executors_->Execute([this, userId, bundleName, deviceId, downloadAssets]() {
93 bool result = true;
94 for (const auto& asset : downloadAssets) {
95 if (IsDownloaded(asset)) {
96 FinishTask(asset.uri, result);
97 continue;
98 }
99 if (IsDownloading(asset)) {
100 continue;
101 }
102 downloading_.ComputeIfAbsent(asset.uri, [asset](const std::string& key) {
103 return asset.hash;
104 });
105 auto success = Transfer(userId, bundleName, deviceId, asset);
106 if (success) {
107 std::lock_guard<std::mutex> lock(mutex_);
108 downloading_.Erase(asset.uri);
109 UpdateDownloaded(asset);
110 } else {
111 downloading_.Erase(asset.uri);
112 }
113 result &= success;
114 FinishTask(asset.uri, result);
115 }
116 });
117 }
118
FinishTask(const std::string & uri,bool result)119 void ObjectAssetLoader::FinishTask(const std::string& uri, bool result)
120 {
121 std::vector<uint32_t> finishedTasks;
122 tasks_.ForEach([&uri, &finishedTasks, result](auto& seq, auto& task) {
123 task.downloadAssets.erase(uri);
124 if (task.downloadAssets.size() == 0 && task.callback != nullptr) {
125 task.callback(result);
126 finishedTasks.emplace_back(seq);
127 }
128 return false;
129 });
130 for (auto taskId : finishedTasks) {
131 tasks_.Erase(taskId);
132 }
133 }
134
UpdateDownloaded(const DistributedData::Asset & asset)135 void ObjectAssetLoader::UpdateDownloaded(const DistributedData::Asset& asset)
136 {
137 downloaded_.ComputeIfAbsent(asset.uri, [asset](const std::string& key) {
138 return asset.hash;
139 });
140 assetQueue_.push(asset.uri);
141 if (assetQueue_.size() > LAST_DOWNLOAD_ASSET_SIZE) {
142 auto oldAsset = assetQueue_.front();
143 assetQueue_.pop();
144 downloaded_.Erase(oldAsset);
145 }
146 }
147
IsDownloading(const DistributedData::Asset & asset)148 bool ObjectAssetLoader::IsDownloading(const DistributedData::Asset& asset)
149 {
150 auto [success, hash] = downloading_.Find(asset.uri);
151 if (success && hash == asset.hash) {
152 ZLOGD("asset is downloading. assetName:%{public}s", Anonymous::Change(asset.name).c_str());
153 return true;
154 }
155 return false;
156 }
157
IsDownloaded(const DistributedData::Asset & asset)158 bool ObjectAssetLoader::IsDownloaded(const DistributedData::Asset& asset)
159 {
160 auto [success, hash] = downloaded_.Find(asset.uri);
161 if (success && hash == asset.hash) {
162 ZLOGD("asset is downloaded. assetName:%{public}s", Anonymous::Change(asset.name).c_str());
163 return true;
164 }
165 return false;
166 }
167
PushAsset(int32_t userId,const sptr<AssetObj> & assetObj,const sptr<ObjectAssetsSendListener> & sendCallback)168 int32_t ObjectAssetLoader::PushAsset(int32_t userId, const sptr<AssetObj> &assetObj,
169 const sptr<ObjectAssetsSendListener> &sendCallback)
170 {
171 if (assetObj == nullptr) {
172 ZLOGE("PushAsset err, assetObj is null");
173 return OBJECT_INNER_ERROR;
174 }
175 ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::SAVE,
176 ObjectStore::PUSH_ASSETS, ObjectStore::IDLE);
177 ZLOGI("PushAsset start, userId:%{public}d, asset size:%{public}zu, bundleName:%{public}s, sessionId:%{public}s",
178 userId, assetObj->uris_.size(), assetObj->dstBundleName_.c_str(),
179 Anonymous::Change(assetObj->sessionId_).c_str());
180 auto status = Storage::DistributedFile::DistributedFileDaemonManager::GetInstance().PushAsset(userId, assetObj,
181 sendCallback);
182 if (status != OBJECT_SUCCESS) {
183 ZLOGE("PushAsset err status: %{public}d, asset size:%{public}zu, bundleName:%{public}s, sessionId:%{public}s",
184 status, assetObj->uris_.size(), assetObj->dstBundleName_.c_str(),
185 Anonymous::Change(assetObj->sessionId_).c_str());
186 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
187 ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_FAILED, status, ObjectStore::FINISHED);
188 }
189 return status;
190 }
191
OnSendResult(const sptr<AssetObj> & assetObj,int32_t result)192 int32_t ObjectAssetsSendListener::OnSendResult(const sptr<AssetObj> &assetObj, int32_t result)
193 {
194 if (assetObj == nullptr) {
195 ZLOGE("OnSendResult error! status:%{public}d", result);
196 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
197 ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
198 return result;
199 }
200 ZLOGI("OnSendResult, status:%{public}d, asset size:%{public}zu", result, assetObj->uris_.size());
201 if (result == OBJECT_SUCCESS) {
202 ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::SAVE,
203 ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_SUCCESS);
204 } else {
205 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
206 ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
207 }
208 return result;
209 }
210 } // namespace OHOS::DistributedObject