1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef CLOUD_SYNC_UTILS_H 16 #define CLOUD_SYNC_UTILS_H 17 18 #include <cstdint> 19 #include <string> 20 #include "cloud/cloud_store_types.h" 21 #include "cloud_syncer.h" 22 #include "icloud_sync_storage_interface.h" 23 24 namespace DistributedDB { 25 class CloudSyncUtils { 26 public: 27 static constexpr const int GID_INDEX = 0; 28 static constexpr const int PREFIX_INDEX = 1; 29 static constexpr const int STRATEGY_INDEX = 2; 30 static constexpr const int ASSETS_INDEX = 3; 31 static constexpr const int HASH_KEY_INDEX = 4; 32 static constexpr const int PRIMARY_KEY_INDEX = 5; 33 static constexpr const int TIMESTAMP_INDEX = 6; 34 35 static int GetCloudPkVals(const VBucket &datum, const std::vector<std::string> &pkColNames, int64_t dataKey, 36 std::vector<Type> &cloudPkVals); 37 38 static ChangeType OpTypeToChangeType(OpType strategy); 39 40 static bool IsSinglePrimaryKey(const std::vector<std::string> &pKColNames); 41 42 static void RemoveDataExceptExtendInfo(VBucket &datum, const std::vector<std::string> &pkColNames); 43 44 static AssetOpType StatusToFlag(AssetStatus status); 45 46 static void StatusToFlagForAsset(Asset &asset); 47 48 static void StatusToFlagForAssets(Assets &assets); 49 50 static void StatusToFlagForAssetsInRecord(const std::vector<Field> &fields, VBucket &record); 51 52 static bool IsChangeDataEmpty(const ChangedData &changedData); 53 54 static bool EqualInMsLevel(const Timestamp cmp, const Timestamp beCmp); 55 56 static bool NeedSaveData(const LogInfo &localLogInfo, const LogInfo &cloudLogInfo); 57 58 static int CheckParamValid(const std::vector<DeviceID> &devices, SyncMode mode); 59 60 static LogInfo GetCloudLogInfo(VBucket &datum); 61 62 static int SaveChangedDataByType(const VBucket &datum, ChangedData &changedData, const DataInfoWithLog &localInfo, 63 ChangeType type); 64 65 static int CheckCloudSyncDataValid(const CloudSyncData &uploadData, const std::string &tableName, int64_t count); 66 67 static void ClearCloudSyncData(CloudSyncData &uploadData); 68 69 static int GetWaterMarkAndUpdateTime(std::vector<VBucket>& extend, Timestamp &waterMark); 70 71 static bool CheckCloudSyncDataEmpty(const CloudSyncData &uploadData); 72 73 static void ModifyCloudDataTime(VBucket &data); 74 75 static int UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count, uint64_t taskId, 76 Timestamp &waterMark); 77 78 static void UpdateLocalCache(OpType opType, const LogInfo &cloudInfo, 79 const LogInfo &localInfo, std::map<std::string, LogInfo> &localLogInfoCache); 80 81 static int SaveChangedData(ICloudSyncer::SyncParam ¶m, size_t dataIndex, const ICloudSyncer::DataInfo &dataInfo, 82 std::vector<std::pair<Key, size_t>> &deletedList); 83 84 static void ClearWithoutData(ICloudSyncer::SyncParam ¶m); 85 86 static bool IsSkipAssetsMissingRecord(const std::vector<VBucket> &extend); 87 88 static int FillAssetIdToAssets(CloudSyncBatch &data, int errorCode, const CloudWaterType &type); 89 90 static int FillAssetIdToAssetData(const Type &extend, Type &assetData); 91 92 static void FillAssetIdToAssetsData(const Assets &extend, Assets &assets); 93 94 static bool CheckIfContainsInsertAssets(const Type &assetData); 95 96 static void UpdateAssetsFlag(CloudSyncData &uploadData); 97 98 static OpType CalOpType(ICloudSyncer::SyncParam ¶m, size_t dataIndex); 99 100 static CloudSyncer::CloudTaskInfo InitCompensatedSyncTaskInfo(); 101 102 static CloudSyncer::CloudTaskInfo InitCompensatedSyncTaskInfo(const CloudSyncOption &option, 103 const SyncProcessCallback &onProcess); 104 105 static CloudSyncer::CloudTaskInfo InitCompensatedSyncTaskInfo(const CloudSyncer::CloudTaskInfo &oriTaskInfo); 106 107 static void CheckQueryCloudData(std::string &traceId, DownloadData &downloadData, 108 std::vector<std::string> &pkColNames); 109 110 static bool IsNeedUpdateAsset(const VBucket &data); 111 112 static std::tuple<int, DownloadList, ChangedData> GetDownloadListByGid(const std::shared_ptr<StorageProxy> &proxy, 113 const std::vector<std::string> &data, const std::string &table); 114 115 static void UpdateMaxTimeWithDownloadList(const DownloadList &downloadList, const std::string &table, 116 std::map<std::string, int64_t> &downloadBeginTime); 117 118 static bool IsContainDownloading(const DownloadAssetUnit &downloadAssetUnit); 119 120 static int GetDownloadAssetsOnlyMapFromDownLoadData( 121 size_t idx, ICloudSyncer::SyncParam ¶m, std::map<std::string, Assets> &downloadAssetsMap); 122 123 static int NotifyChangeData(const std::string &dev, const std::shared_ptr<StorageProxy> &proxy, 124 ChangedData &&changedData); 125 126 static int GetQueryAndUsersForCompensatedSync(bool isQueryDownloadRecords, 127 std::shared_ptr<StorageProxy> &storageProxy, std::vector<std::string> &users, 128 std::vector<QuerySyncObject> &syncQuery); 129 130 static void GetUserListForCompensatedSync( 131 CloudDBProxy &cloudDB, const std::vector<std::string> &users, std::vector<std::string> &userList); 132 133 static bool SetAssetsMapByCloudGid(std::vector<std::string> &cloudGid, const AssetsMap &groupAssetsMap, 134 std::map<std::string, AssetsMap> &gidAssetsMap); 135 136 static bool CheckAssetsOnlyIsEmptyInGroup( 137 const std::map<std::string, AssetsMap> &gidAssetsMap, const AssetsMap &assetsMap); 138 139 static bool IsAssetOnlyData(VBucket &queryData, AssetsMap &assetsMap, bool isDownloading); 140 141 static int ClearCloudWatermark(const std::vector<std::string> &tableNameList, 142 std::shared_ptr<StorageProxy> &storageProxy); 143 144 static bool HaveReferenceOrReferenceByTable( 145 const CloudSyncer::CloudTaskInfo &taskInfo, std::shared_ptr<StorageProxy> &storageProxy); 146 147 static int StartTransactionIfNeed( 148 const CloudSyncer::CloudTaskInfo &taskInfo, std::shared_ptr<StorageProxy> &storageProxy); 149 150 static void EndTransactionIfNeed( 151 const int &errCode, const CloudSyncer::CloudTaskInfo &taskInfo, std::shared_ptr<StorageProxy> &storageProxy); 152 153 static bool CanStartAsyncDownload(int scheduleCount); 154 private: 155 static void InsertOrReplaceChangedDataByType(ChangeType type, std::vector<Type> &pkVal, 156 ChangedData &changedData); 157 }; 158 } 159 #endif // CLOUD_SYNC_UTILS_H