1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud/asset_operation_utils.h"
16
17 #include <mutex>
18 #include "cloud/cloud_db_types.h"
19 #include "runtime_context.h"
20 namespace DistributedDB {
21 using RecordAssetOpType = AssetOperationUtils::RecordAssetOpType;
22 using CloudSyncAction = AssetOperationUtils::CloudSyncAction;
23 namespace {
24 std::once_flag g_init;
25 using Reaction = std::function<AssetOperationUtils::AssetOpType (const Asset &, const Assets &)>;
26 std::map<CloudSyncAction, Reaction> reactions;
GetReaction(const CloudSyncAction & action)27 Reaction GetReaction(const CloudSyncAction &action)
28 {
29 if (reactions.find(action) != reactions.end()) {
30 return reactions[action];
31 } else {
32 return reactions[CloudSyncAction::DEFAULT_ACTION];
33 }
34 }
35 }
36
CalAssetOperation(const VBucket & cacheAssets,const VBucket & dbAssets,const CloudSyncAction & action)37 RecordAssetOpType AssetOperationUtils::CalAssetOperation(const VBucket &cacheAssets,
38 const VBucket &dbAssets, const CloudSyncAction &action)
39 {
40 std::call_once(g_init, Init);
41 // switch produce function by action
42 Reaction reaction = GetReaction(action);
43 RecordAssetOpType res;
44 // check each cache asset with db asset by same col name and asset name
45 for (const auto &[colName, colData] : cacheAssets) {
46 auto checkAssets = GetAssets(colName, dbAssets);
47 if (TYPE_INDEX<Asset> == colData.index()) {
48 auto asset = std::get<Asset>(colData);
49 res[colName][asset.name] = reaction(asset, checkAssets);
50 } else if (TYPE_INDEX<Assets> == colData.index()) {
51 auto assets = std::get<Assets>(colData);
52 for (const auto &asset : assets) {
53 res[colName][asset.name] = reaction(asset, checkAssets);
54 }
55 }
56 }
57 return res;
58 }
59
// Single-asset variant: evaluates cacheAsset against the db assets stored under colName, using the reaction
// selected by action, and returns the resulting operation type.
AssetOperationUtils::AssetOpType AssetOperationUtils::CalAssetOperation(const std::string &colName,
    const Asset &cacheAsset, const VBucket &dbAssets, const AssetOperationUtils::CloudSyncAction &action)
{
    // Make sure the reaction table has been populated before it is queried.
    std::call_once(g_init, Init);
    const Reaction handler = GetReaction(action);
    const Assets storedAssets = GetAssets(colName, dbAssets);
    return handler(cacheAsset, storedAssets);
}
68
EraseBitMask(uint32_t status)69 uint32_t AssetOperationUtils::EraseBitMask(uint32_t status)
70 {
71 return ((status << BIT_MASK_COUNT) >> BIT_MASK_COUNT);
72 }
73
UpdateAssetsFlag(std::vector<VBucket> & from,std::vector<VBucket> & target)74 void AssetOperationUtils::UpdateAssetsFlag(std::vector<VBucket> &from, std::vector<VBucket> &target)
75 {
76 if (from.size() != target.size()) {
77 LOGW("the num of VBucket are not equal when update assets flag.");
78 return;
79 }
80 for (size_t i = 0; i < from.size(); ++i) {
81 VBucket &fromRecord = from[i];
82 VBucket &targetRecord = target[i];
83 if (targetRecord.empty()) {
84 continue;
85 }
86 for (auto &[colName, colData] : targetRecord) {
87 auto fromAssets = GetAssets(colName, fromRecord);
88 MergeAssetsFlag(fromAssets, colData);
89 }
90 }
91 }
92
FilterDeleteAsset(VBucket & record)93 void AssetOperationUtils::FilterDeleteAsset(VBucket &record)
94 {
95 int filterCount = 0;
96 for (auto &item : record) {
97 if (item.second.index() == TYPE_INDEX<Asset>) {
98 auto &asset = std::get<Asset>(item.second);
99 if (EraseBitMask(asset.status) == static_cast<uint32_t>(AssetStatus::DELETE)) {
100 item.second = Nil();
101 filterCount++;
102 }
103 continue;
104 }
105 if (item.second.index() != TYPE_INDEX<Assets>) {
106 continue;
107 }
108 auto &assets = std::get<Assets>(item.second);
109 auto it = assets.begin();
110 while (it != assets.end()) {
111 if (EraseBitMask(it->status) == static_cast<uint32_t>(AssetStatus::DELETE)) {
112 it = assets.erase(it);
113 filterCount++;
114 }
115 it++;
116 }
117 }
118 if (filterCount > 0) {
119 LOGW("[AssetOperationUtils] Filter %d asset", filterCount);
120 }
121 }
122
Init()123 void AssetOperationUtils::Init()
124 {
125 reactions[CloudSyncAction::DEFAULT_ACTION] = DefaultOperation;
126 reactions[CloudSyncAction::START_DOWNLOAD] = CheckBeforeDownload;
127 reactions[CloudSyncAction::START_UPLOAD] = HandleIfExistAndSameStatus;
128 reactions[CloudSyncAction::END_DOWNLOAD] = CheckAfterDownload;
129 reactions[CloudSyncAction::END_UPLOAD] = CheckAfterUpload;
130 }
131
DefaultOperation(const Asset &,const Assets &)132 AssetOperationUtils::AssetOpType AssetOperationUtils::DefaultOperation(const Asset &, const Assets &)
133 {
134 return AssetOpType::HANDLE;
135 }
136
CheckBeforeDownload(const Asset & cacheAsset,const Assets & dbAssets)137 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckBeforeDownload(const Asset &cacheAsset,
138 const Assets &dbAssets)
139 {
140 return CheckWithDownload(true, cacheAsset, dbAssets);
141 }
142
CheckAfterDownload(const Asset & cacheAsset,const Assets & dbAssets)143 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckAfterDownload(const Asset &cacheAsset,
144 const Assets &dbAssets)
145 {
146 return CheckWithDownload(false, cacheAsset, dbAssets);
147 }
148
CheckWithDownload(bool before,const Asset & cacheAsset,const Assets & dbAssets)149 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckWithDownload(bool before, const Asset &cacheAsset,
150 const Assets &dbAssets)
151 {
152 for (const auto &dbAsset : dbAssets) {
153 if (dbAsset.name != cacheAsset.name) {
154 continue;
155 }
156 if (EraseBitMask(dbAsset.status) == AssetStatus::DOWNLOADING) {
157 return AssetOpType::HANDLE;
158 }
159 return AssetOpType::NOT_HANDLE;
160 }
161 if (before) {
162 if (cacheAsset.status == (AssetStatus::DOWNLOADING | AssetStatus::DOWNLOAD_WITH_NULL) ||
163 EraseBitMask(cacheAsset.status) == AssetStatus::ABNORMAL) {
164 return AssetOpType::NOT_HANDLE;
165 }
166 return (cacheAsset.flag == static_cast<uint32_t>(DistributedDB::AssetOpType::DELETE) &&
167 EraseBitMask(cacheAsset.status) != AssetStatus::DELETE) ?
168 AssetOpType::HANDLE : AssetOpType::NOT_HANDLE;
169 }
170 return AssetOpType::NOT_HANDLE;
171 }
172
CheckAfterUpload(const Asset & cacheAsset,const Assets & dbAssets)173 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckAfterUpload(const Asset &cacheAsset, const Assets &dbAssets)
174 {
175 for (const auto &dbAsset : dbAssets) {
176 if (dbAsset.name != cacheAsset.name) {
177 continue;
178 }
179 if ((dbAsset.status & static_cast<uint32_t>(AssetStatus::UPLOADING)) ==
180 static_cast<uint32_t>(AssetStatus::UPLOADING)) {
181 return AssetOpType::HANDLE;
182 }
183 return AssetOpType::NOT_HANDLE;
184 }
185 return AssetOpType::NOT_HANDLE;
186 }
187
GetAssets(const std::string & colName,const VBucket & rowData)188 Assets AssetOperationUtils::GetAssets(const std::string &colName, const VBucket &rowData)
189 {
190 if (rowData.find(colName) == rowData.end()) {
191 return {};
192 }
193 Assets res;
194 auto value = rowData.at(colName);
195 if (TYPE_INDEX<Asset> == value.index()) {
196 res.push_back(std::get<Asset>(value));
197 } else if (TYPE_INDEX<Assets> == value.index()) {
198 for (const auto &asset : std::get<Assets>(value)) {
199 res.push_back(asset);
200 }
201 }
202 return res;
203 }
204
HandleIfExistAndSameStatus(const Asset & cacheAsset,const Assets & dbAssets)205 AssetOperationUtils::AssetOpType AssetOperationUtils::HandleIfExistAndSameStatus(const Asset &cacheAsset,
206 const Assets &dbAssets)
207 {
208 for (const auto &dbAsset : dbAssets) {
209 if (dbAsset.name != cacheAsset.name) {
210 continue;
211 }
212 if (dbAsset.status == cacheAsset.status) {
213 return AssetOpType::HANDLE;
214 }
215 return AssetOpType::NOT_HANDLE;
216 }
217 return AssetOpType::NOT_HANDLE;
218 }
219
MergeAssetFlag(const Assets & from,Asset & target)220 void AssetOperationUtils::MergeAssetFlag(const Assets &from, Asset &target)
221 {
222 for (const auto &fromAsset : from) {
223 if (fromAsset.name == target.name) {
224 target.flag = fromAsset.flag;
225 }
226 }
227 }
228
MergeAssetsFlag(const Assets & from,Type & target)229 void AssetOperationUtils::MergeAssetsFlag(const Assets &from, Type &target)
230 {
231 if (TYPE_INDEX<Asset> == target.index()) {
232 MergeAssetFlag(from, std::get<Asset>(target));
233 } else if (TYPE_INDEX<Assets> == target.index()) {
234 for (auto &targetAsset : std::get<Assets>(target)) {
235 MergeAssetFlag(from, targetAsset);
236 }
237 }
238 }
239 }