• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "ObjectAssetLoader"
16 
17 #include "object_asset_loader.h"
18 #include "block_data.h"
19 #include "cloud_sync_asset_manager.h"
20 #include "log_print.h"
21 #include "object_common.h"
22 #include "utils/anonymous.h"
23 #include "object_radar_reporter.h"
24 #include "distributed_file_daemon_manager.h"
25 namespace OHOS::DistributedObject {
26 using namespace OHOS::FileManagement::CloudSync;
27 using namespace OHOS::DistributedData;
GetInstance()28 ObjectAssetLoader &ObjectAssetLoader::GetInstance()
29 {
30     static ObjectAssetLoader loader;
31     return loader;
32 }
33 
SetThreadPool(std::shared_ptr<ExecutorPool> executors)34 void ObjectAssetLoader::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
35 {
36     executors_ = executors;
37 }
38 
Transfer(const int32_t userId,const std::string & bundleName,const std::string & deviceId,const DistributedData::Asset & asset)39 bool ObjectAssetLoader::Transfer(const int32_t userId, const std::string& bundleName, const std::string& deviceId,
40     const DistributedData::Asset& asset)
41 {
42     AssetInfo assetInfo;
43     assetInfo.uri = asset.uri;
44     assetInfo.assetName = asset.name;
45     ZLOGI("Start transfer, bundleName: %{public}s, deviceId: %{public}s, assetName: %{public}s", bundleName.c_str(),
46         Anonymous::Change(deviceId).c_str(), Anonymous::Change(assetInfo.assetName).c_str());
47     auto block = std::make_shared<BlockData<std::tuple<bool, int32_t>>>(WAIT_TIME, std::tuple{ true, OBJECT_SUCCESS });
48     auto res = CloudSyncAssetManager::GetInstance().DownloadFile(userId, bundleName, deviceId, assetInfo,
49         [block](const std::string& uri, int32_t status) {
50             block->SetValue({ false, status });
51         });
52     if (res != OBJECT_SUCCESS) {
53         ZLOGE("fail, res: %{public}d, name: %{public}s, deviceId: %{public}s, bundleName: %{public}s", res,
54             Anonymous::Change(asset.name).c_str(), Anonymous::Change(deviceId).c_str(), bundleName.c_str());
55         return false;
56     }
57     auto [timeout, status] = block->GetValue();
58     if (timeout || status != OBJECT_SUCCESS) {
59         ZLOGE("fail, timeout: %{public}d, status: %{public}d, name: %{public}s, deviceId: %{public}s ", timeout,
60             status, Anonymous::Change(asset.name).c_str(), Anonymous::Change(deviceId).c_str());
61         return false;
62     }
63     ZLOGD("Transfer end, bundleName: %{public}s, deviceId: %{public}s, assetName: %{public}s", bundleName.c_str(),
64         Anonymous::Change(deviceId).c_str(), Anonymous::Change(assetInfo.assetName).c_str());
65     return true;
66 }
67 
TransferAssetsAsync(const int32_t userId,const std::string & bundleName,const std::string & deviceId,const std::vector<DistributedData::Asset> & assets,const TransferFunc & callback)68 void ObjectAssetLoader::TransferAssetsAsync(const int32_t userId, const std::string& bundleName,
69     const std::string& deviceId, const std::vector<DistributedData::Asset>& assets, const TransferFunc& callback)
70 {
71     if (executors_ == nullptr) {
72         ZLOGE("executors is null, bundleName: %{public}s, deviceId: %{public}s, userId: %{public}d",
73             bundleName.c_str(), Anonymous::Change(deviceId).c_str(), userId);
74         callback(false);
75         return;
76     }
77     TransferTask task = { .callback = callback };
78     DistributedData::Assets downloadAssets;
79     for (auto& asset : assets) {
80         if (IsDownloaded(asset)) {
81             continue;
82         }
83         task.downloadAssets.insert(asset.uri);
84         downloadAssets.emplace_back(asset);
85     }
86     if (task.downloadAssets.empty()) {
87         callback(true);
88     }
89     tasks_.ComputeIfAbsent(++taskSeq_, [task](const uint32_t key) {
90         return task;
91     });
92     executors_->Execute([this, userId, bundleName, deviceId, downloadAssets]() {
93         bool result = true;
94         for (const auto& asset : downloadAssets) {
95             if (IsDownloaded(asset)) {
96                 FinishTask(asset.uri, result);
97                 continue;
98             }
99             if (IsDownloading(asset)) {
100                 continue;
101             }
102             downloading_.ComputeIfAbsent(asset.uri, [asset](const std::string& key) {
103                 return asset.hash;
104             });
105             auto success = Transfer(userId, bundleName, deviceId, asset);
106             if (success) {
107                 std::lock_guard<std::mutex> lock(mutex_);
108                 downloading_.Erase(asset.uri);
109                 UpdateDownloaded(asset);
110             } else {
111                 downloading_.Erase(asset.uri);
112             }
113             result &= success;
114             FinishTask(asset.uri, result);
115         }
116     });
117 }
118 
FinishTask(const std::string & uri,bool result)119 void ObjectAssetLoader::FinishTask(const std::string& uri, bool result)
120 {
121     std::vector<uint32_t> finishedTasks;
122     tasks_.ForEach([&uri, &finishedTasks, result](auto& seq, auto& task) {
123         task.downloadAssets.erase(uri);
124         if (task.downloadAssets.size() == 0 && task.callback != nullptr) {
125             task.callback(result);
126             finishedTasks.emplace_back(seq);
127         }
128         return false;
129     });
130     for (auto taskId : finishedTasks) {
131         tasks_.Erase(taskId);
132     }
133 }
134 
UpdateDownloaded(const DistributedData::Asset & asset)135 void ObjectAssetLoader::UpdateDownloaded(const DistributedData::Asset& asset)
136 {
137     downloaded_.ComputeIfAbsent(asset.uri, [asset](const std::string& key) {
138         return asset.hash;
139     });
140     assetQueue_.push(asset.uri);
141     if (assetQueue_.size() > LAST_DOWNLOAD_ASSET_SIZE) {
142         auto oldAsset = assetQueue_.front();
143         assetQueue_.pop();
144         downloaded_.Erase(oldAsset);
145     }
146 }
147 
IsDownloading(const DistributedData::Asset & asset)148 bool ObjectAssetLoader::IsDownloading(const DistributedData::Asset& asset)
149 {
150     auto [success, hash] = downloading_.Find(asset.uri);
151     if (success && hash == asset.hash) {
152         ZLOGD("asset is downloading. assetName:%{public}s", Anonymous::Change(asset.name).c_str());
153         return true;
154     }
155     return false;
156 }
157 
IsDownloaded(const DistributedData::Asset & asset)158 bool ObjectAssetLoader::IsDownloaded(const DistributedData::Asset& asset)
159 {
160     auto [success, hash] = downloaded_.Find(asset.uri);
161     if (success && hash == asset.hash) {
162         ZLOGD("asset is downloaded. assetName:%{public}s", Anonymous::Change(asset.name).c_str());
163         return true;
164     }
165     return false;
166 }
167 
PushAsset(int32_t userId,const sptr<AssetObj> & assetObj,const sptr<ObjectAssetsSendListener> & sendCallback)168 int32_t ObjectAssetLoader::PushAsset(int32_t userId, const sptr<AssetObj> &assetObj,
169     const sptr<ObjectAssetsSendListener> &sendCallback)
170 {
171     if (assetObj == nullptr) {
172         ZLOGE("PushAsset err, assetObj is null");
173         return OBJECT_INNER_ERROR;
174     }
175     ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::SAVE,
176         ObjectStore::PUSH_ASSETS, ObjectStore::IDLE);
177     ZLOGI("PushAsset start, userId:%{public}d, asset size:%{public}zu, bundleName:%{public}s, sessionId:%{public}s",
178         userId, assetObj->uris_.size(), assetObj->dstBundleName_.c_str(),
179         Anonymous::Change(assetObj->sessionId_).c_str());
180     auto status = Storage::DistributedFile::DistributedFileDaemonManager::GetInstance().PushAsset(userId, assetObj,
181         sendCallback);
182     if (status != OBJECT_SUCCESS) {
183         ZLOGE("PushAsset err status: %{public}d, asset size:%{public}zu, bundleName:%{public}s, sessionId:%{public}s",
184             status, assetObj->uris_.size(), assetObj->dstBundleName_.c_str(),
185             Anonymous::Change(assetObj->sessionId_).c_str());
186         ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
187             ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_FAILED, status, ObjectStore::FINISHED);
188     }
189     return status;
190 }
191 
OnSendResult(const sptr<AssetObj> & assetObj,int32_t result)192 int32_t ObjectAssetsSendListener::OnSendResult(const sptr<AssetObj> &assetObj, int32_t result)
193 {
194     if (assetObj == nullptr) {
195         ZLOGE("OnSendResult error! status:%{public}d", result);
196         ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
197             ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
198         return result;
199     }
200     ZLOGI("OnSendResult, status:%{public}d, asset size:%{public}zu", result, assetObj->uris_.size());
201     if (result == OBJECT_SUCCESS) {
202         ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::SAVE,
203             ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_SUCCESS);
204     } else {
205         ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
206             ObjectStore::PUSH_ASSETS, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
207     }
208     return result;
209 }
210 } // namespace OHOS::DistributedObject