1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "db_errno.h"
16 #include "log_print.h"
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_sync_utils.h"
19
20 namespace DistributedDB {
21
GetCloudPkVals(const VBucket & datum,const std::vector<std::string> & pkColNames,int64_t dataKey,std::vector<Type> & cloudPkVals)22 int GetCloudPkVals(const VBucket &datum, const std::vector<std::string> &pkColNames, int64_t dataKey,
23 std::vector<Type> &cloudPkVals)
24 {
25 if (!cloudPkVals.empty()) {
26 LOGE("[CloudSyncer] Output paramater should be empty");
27 return -E_INVALID_ARGS;
28 }
29 for (const auto &pkColName : pkColNames) {
30 // if data is primary key or is a composite primary key, then use rowID as value
31 // The single primary key table, does not contain rowid.
32 if (pkColName == CloudDbConstant::ROW_ID_FIELD_NAME) {
33 cloudPkVals.push_back(dataKey);
34 continue;
35 }
36 auto iter = datum.find(pkColName);
37 if (iter == datum.end()) {
38 LOGE("[CloudSyncer] Cloud data do not contain expected primary field value");
39 return -E_CLOUD_ERROR;
40 }
41 cloudPkVals.push_back(iter->second);
42 }
43 return E_OK;
44 }
45
OpTypeToChangeType(OpType strategy)46 ChangeType OpTypeToChangeType(OpType strategy)
47 {
48 switch (strategy) {
49 case OpType::INSERT:
50 return OP_INSERT;
51 case OpType::DELETE:
52 return OP_DELETE;
53 case OpType::UPDATE:
54 return OP_UPDATE;
55 default:
56 return OP_BUTT;
57 }
58 }
59
IsSinglePrimaryKey(const std::vector<std::string> & pkColNames)60 bool IsSinglePrimaryKey(const std::vector<std::string> &pkColNames)
61 {
62 return pkColNames.size() == 1 && pkColNames[0] != CloudDbConstant::ROW_ID_FIELD_NAME;
63 }
64
RemoveDataExceptExtendInfo(VBucket & datum,const std::vector<std::string> & pkColNames)65 void RemoveDataExceptExtendInfo(VBucket &datum, const std::vector<std::string> &pkColNames)
66 {
67 for (auto item = datum.begin(); item != datum.end();) {
68 const auto &key = item->first;
69 if (key != CloudDbConstant::GID_FIELD &&
70 key != CloudDbConstant::CREATE_FIELD &&
71 key != CloudDbConstant::MODIFY_FIELD &&
72 key != CloudDbConstant::DELETE_FIELD &&
73 key != CloudDbConstant::CURSOR_FIELD &&
74 (std::find(pkColNames.begin(), pkColNames.end(), key) == pkColNames.end())) {
75 item = datum.erase(item);
76 } else {
77 item++;
78 }
79 }
80 }
81
StatusToFlag(AssetStatus status)82 AssetOpType StatusToFlag(AssetStatus status)
83 {
84 switch (status) {
85 case AssetStatus::INSERT:
86 return AssetOpType::INSERT;
87 case AssetStatus::DELETE:
88 return AssetOpType::DELETE;
89 case AssetStatus::UPDATE:
90 return AssetOpType::UPDATE;
91 case AssetStatus::NORMAL:
92 return AssetOpType::NO_CHANGE;
93 default:
94 LOGW("[CloudSyncer] Unexpected Situation and won't be handled"
95 ", Caller should ensure that current situation won't occur");
96 return AssetOpType::NO_CHANGE;
97 }
98 }
99
StatusToFlagForAsset(Asset & asset)100 void StatusToFlagForAsset(Asset &asset)
101 {
102 asset.flag = static_cast<uint32_t>(StatusToFlag(static_cast<AssetStatus>(asset.status)));
103 asset.status = static_cast<uint32_t>(AssetStatus::NORMAL);
104 }
105
StatusToFlagForAssets(Assets & assets)106 void StatusToFlagForAssets(Assets &assets)
107 {
108 for (Asset &asset : assets) {
109 StatusToFlagForAsset(asset);
110 }
111 }
112
StatusToFlagForAssetsInRecord(const std::vector<Field> & fields,VBucket & record)113 void StatusToFlagForAssetsInRecord(const std::vector<Field> &fields, VBucket &record)
114 {
115 for (const Field &field : fields) {
116 if (field.type == TYPE_INDEX<Assets> && record[field.colName].index() == TYPE_INDEX<Assets>) {
117 StatusToFlagForAssets(std::get<Assets>(record[field.colName]));
118 } else if (field.type == TYPE_INDEX<Asset> && record[field.colName].index() == TYPE_INDEX<Asset>) {
119 StatusToFlagForAsset(std::get<Asset>(record[field.colName]));
120 }
121 }
122 }
123
IsChngDataEmpty(const ChangedData & changedData)124 bool IsChngDataEmpty(const ChangedData &changedData)
125 {
126 return changedData.primaryData[ChangeType::OP_INSERT].empty() ||
127 changedData.primaryData[ChangeType::OP_UPDATE].empty() ||
128 changedData.primaryData[ChangeType::OP_DELETE].empty();
129 }
130
EqualInMsLevel(const Timestamp cmp,const Timestamp beCmp)131 bool EqualInMsLevel(const Timestamp cmp, const Timestamp beCmp)
132 {
133 return cmp / CloudDbConstant::TEN_THOUSAND == beCmp / CloudDbConstant::TEN_THOUSAND;
134 }
135
NeedSaveData(const LogInfo & localLogInfo,const LogInfo & cloudLogInfo)136 bool NeedSaveData(const LogInfo &localLogInfo, const LogInfo &cloudLogInfo)
137 {
138 // if timeStamp, write timestamp, cloudGid are all the same,
139 // we thought that the datum is mostly be the same between cloud and local
140 // However, there are still slightly possibility that it may be created from different device,
141 // So, during the strategy policy [i.e. TagSyncDataStatus], the datum was tagged as UPDATE
142 // But we won't notify the datum
143 bool isSame = localLogInfo.timestamp == cloudLogInfo.timestamp &&
144 EqualInMsLevel(localLogInfo.wTimestamp, cloudLogInfo.wTimestamp) &&
145 localLogInfo.cloudGid == cloudLogInfo.cloudGid;
146 return !isSame;
147 }
148 }