• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud/asset_operation_utils.h"
16 
17 #include <mutex>
18 #include "cloud/cloud_db_types.h"
19 #include "runtime_context.h"
20 namespace DistributedDB {
21 using RecordAssetOpType = AssetOperationUtils::RecordAssetOpType;
22 using CloudSyncAction = AssetOperationUtils::CloudSyncAction;
23 namespace {
24 std::once_flag g_init;
25 using Reaction = std::function<AssetOperationUtils::AssetOpType (const Asset &, const Assets &)>;
26 std::map<CloudSyncAction, Reaction> reactions;
GetReaction(const CloudSyncAction & action)27 Reaction GetReaction(const CloudSyncAction &action)
28 {
29     if (reactions.find(action) != reactions.end()) {
30         return reactions[action];
31     } else {
32         return reactions[CloudSyncAction::DEFAULT_ACTION];
33     }
34 }
35 }
36 
CalAssetOperation(const VBucket & cacheAssets,const VBucket & dbAssets,const CloudSyncAction & action)37 RecordAssetOpType AssetOperationUtils::CalAssetOperation(const VBucket &cacheAssets,
38     const VBucket &dbAssets, const CloudSyncAction &action)
39 {
40     std::call_once(g_init, Init);
41     // switch produce function by action
42     Reaction reaction = GetReaction(action);
43     RecordAssetOpType res;
44     // check each cache asset with db asset by same col name and asset name
45     for (const auto &[colName, colData] : cacheAssets) {
46         auto checkAssets = GetAssets(colName, dbAssets);
47         if (TYPE_INDEX<Asset> == colData.index()) {
48             auto asset = std::get<Asset>(colData);
49             res[colName][asset.name] = reaction(asset, checkAssets);
50         } else if (TYPE_INDEX<Assets> == colData.index()) {
51             auto assets = std::get<Assets>(colData);
52             for (const auto &asset : assets) {
53                 res[colName][asset.name] = reaction(asset, checkAssets);
54             }
55         }
56     }
57     return res;
58 }
59 
// Single-asset variant: runs the reaction selected by `action` on cacheAsset against the
// assets stored under colName in dbAssets and returns the reaction's result.
AssetOperationUtils::AssetOpType AssetOperationUtils::CalAssetOperation(const std::string &colName,
    const Asset &cacheAsset, const VBucket &dbAssets, const AssetOperationUtils::CloudSyncAction &action)
{
    // make sure the reaction table has been filled
    std::call_once(g_init, Init);
    const Reaction handler = GetReaction(action);
    const Assets sameColumnAssets = GetAssets(colName, dbAssets);
    return handler(cacheAsset, sameColumnAssets);
}
68 
EraseBitMask(uint32_t status)69 uint32_t AssetOperationUtils::EraseBitMask(uint32_t status)
70 {
71     return ((status << BIT_MASK_COUNT) >> BIT_MASK_COUNT);
72 }
73 
UpdateAssetsFlag(std::vector<VBucket> & from,std::vector<VBucket> & target)74 void AssetOperationUtils::UpdateAssetsFlag(std::vector<VBucket> &from, std::vector<VBucket> &target)
75 {
76     if (from.size() != target.size()) {
77         LOGW("the num of VBucket are not equal when update assets flag.");
78         return;
79     }
80     for (size_t i = 0; i < from.size(); ++i) {
81         VBucket &fromRecord = from[i];
82         VBucket &targetRecord = target[i];
83         if (targetRecord.empty()) {
84             continue;
85         }
86         for (auto &[colName, colData] : targetRecord) {
87             auto fromAssets = GetAssets(colName, fromRecord);
88             MergeAssetsFlag(fromAssets, colData);
89         }
90     }
91 }
92 
FilterDeleteAsset(VBucket & record)93 void AssetOperationUtils::FilterDeleteAsset(VBucket &record)
94 {
95     int filterCount = 0;
96     for (auto &item : record) {
97         if (item.second.index() == TYPE_INDEX<Asset>) {
98             auto &asset = std::get<Asset>(item.second);
99             if (EraseBitMask(asset.status) == static_cast<uint32_t>(AssetStatus::DELETE)) {
100                 item.second = Nil();
101                 filterCount++;
102             }
103             continue;
104         }
105         if (item.second.index() != TYPE_INDEX<Assets>) {
106             continue;
107         }
108         auto &assets = std::get<Assets>(item.second);
109         auto it = assets.begin();
110         while (it != assets.end()) {
111             if (EraseBitMask(it->status) == static_cast<uint32_t>(AssetStatus::DELETE)) {
112                 it = assets.erase(it);
113                 filterCount++;
114             }
115             it++;
116         }
117     }
118     if (filterCount > 0) {
119         LOGW("[AssetOperationUtils] Filter %d asset", filterCount);
120     }
121 }
122 
Init()123 void AssetOperationUtils::Init()
124 {
125     reactions[CloudSyncAction::DEFAULT_ACTION] = DefaultOperation;
126     reactions[CloudSyncAction::START_DOWNLOAD] = CheckBeforeDownload;
127     reactions[CloudSyncAction::START_UPLOAD] = HandleIfExistAndSameStatus;
128     reactions[CloudSyncAction::END_DOWNLOAD] = CheckAfterDownload;
129     reactions[CloudSyncAction::END_UPLOAD] = CheckAfterUpload;
130 }
131 
DefaultOperation(const Asset &,const Assets &)132 AssetOperationUtils::AssetOpType AssetOperationUtils::DefaultOperation(const Asset &, const Assets &)
133 {
134     return AssetOpType::HANDLE;
135 }
136 
CheckBeforeDownload(const Asset & cacheAsset,const Assets & dbAssets)137 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckBeforeDownload(const Asset &cacheAsset,
138     const Assets &dbAssets)
139 {
140     return CheckWithDownload(true, cacheAsset, dbAssets);
141 }
142 
CheckAfterDownload(const Asset & cacheAsset,const Assets & dbAssets)143 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckAfterDownload(const Asset &cacheAsset,
144     const Assets &dbAssets)
145 {
146     return CheckWithDownload(false, cacheAsset, dbAssets);
147 }
148 
CheckWithDownload(bool before,const Asset & cacheAsset,const Assets & dbAssets)149 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckWithDownload(bool before, const Asset &cacheAsset,
150     const Assets &dbAssets)
151 {
152     for (const auto &dbAsset : dbAssets) {
153         if (dbAsset.name != cacheAsset.name) {
154             continue;
155         }
156         if (EraseBitMask(dbAsset.status) == AssetStatus::DOWNLOADING) {
157             return AssetOpType::HANDLE;
158         }
159         return AssetOpType::NOT_HANDLE;
160     }
161     if (before) {
162         if (cacheAsset.status == (AssetStatus::DOWNLOADING | AssetStatus::DOWNLOAD_WITH_NULL) ||
163             EraseBitMask(cacheAsset.status) == AssetStatus::ABNORMAL) {
164             return AssetOpType::NOT_HANDLE;
165         }
166         return (cacheAsset.flag == static_cast<uint32_t>(DistributedDB::AssetOpType::DELETE) &&
167             EraseBitMask(cacheAsset.status) != AssetStatus::DELETE) ?
168             AssetOpType::HANDLE : AssetOpType::NOT_HANDLE;
169     }
170     return AssetOpType::NOT_HANDLE;
171 }
172 
CheckAfterUpload(const Asset & cacheAsset,const Assets & dbAssets)173 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckAfterUpload(const Asset &cacheAsset, const Assets &dbAssets)
174 {
175     for (const auto &dbAsset : dbAssets) {
176         if (dbAsset.name != cacheAsset.name) {
177             continue;
178         }
179         if ((dbAsset.status & static_cast<uint32_t>(AssetStatus::UPLOADING)) ==
180             static_cast<uint32_t>(AssetStatus::UPLOADING)) {
181             return AssetOpType::HANDLE;
182         }
183         return AssetOpType::NOT_HANDLE;
184     }
185     return AssetOpType::NOT_HANDLE;
186 }
187 
GetAssets(const std::string & colName,const VBucket & rowData)188 Assets AssetOperationUtils::GetAssets(const std::string &colName, const VBucket &rowData)
189 {
190     if (rowData.find(colName) == rowData.end()) {
191         return {};
192     }
193     Assets res;
194     auto value = rowData.at(colName);
195     if (TYPE_INDEX<Asset> == value.index()) {
196         res.push_back(std::get<Asset>(value));
197     } else if (TYPE_INDEX<Assets> == value.index()) {
198         for (const auto &asset : std::get<Assets>(value)) {
199             res.push_back(asset);
200         }
201     }
202     return res;
203 }
204 
HandleIfExistAndSameStatus(const Asset & cacheAsset,const Assets & dbAssets)205 AssetOperationUtils::AssetOpType AssetOperationUtils::HandleIfExistAndSameStatus(const Asset &cacheAsset,
206     const Assets &dbAssets)
207 {
208     for (const auto &dbAsset : dbAssets) {
209         if (dbAsset.name != cacheAsset.name) {
210             continue;
211         }
212         if (dbAsset.status == cacheAsset.status) {
213             return AssetOpType::HANDLE;
214         }
215         return AssetOpType::NOT_HANDLE;
216     }
217     return AssetOpType::NOT_HANDLE;
218 }
219 
MergeAssetFlag(const Assets & from,Asset & target)220 void AssetOperationUtils::MergeAssetFlag(const Assets &from, Asset &target)
221 {
222     for (const auto &fromAsset : from) {
223         if (fromAsset.name == target.name) {
224             target.flag = fromAsset.flag;
225         }
226     }
227 }
228 
MergeAssetsFlag(const Assets & from,Type & target)229 void AssetOperationUtils::MergeAssetsFlag(const Assets &from, Type &target)
230 {
231     if (TYPE_INDEX<Asset> == target.index()) {
232         MergeAssetFlag(from, std::get<Asset>(target));
233     } else if (TYPE_INDEX<Assets> == target.index()) {
234         for (auto &targetAsset : std::get<Assets>(target)) {
235             MergeAssetFlag(from, targetAsset);
236         }
237     }
238 }
239 }