1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef CLOUD_DB_PROXY_H 17 #define CLOUD_DB_PROXY_H 18 #include <atomic> 19 #include <condition_variable> 20 #include <mutex> 21 #include <shared_mutex> 22 #include "cloud/cloud_db_types.h" 23 #include "cloud/icloud_db.h" 24 #include "cloud/iAssetLoader.h" 25 26 namespace DistributedDB { 27 class CloudDBProxy { 28 public: 29 CloudDBProxy(); 30 ~CloudDBProxy() = default; 31 32 void SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB); 33 34 int SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs); 35 36 const std::map<std::string, std::shared_ptr<ICloudDb>> GetCloudDB() const; 37 38 void SwitchCloudDB(const std::string &user); 39 40 void SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader); 41 42 int BatchInsert(const std::string &tableName, std::vector<VBucket> &record, 43 std::vector<VBucket> &extend, Info &uploadInfo, uint32_t &retryCount); 44 45 int BatchUpdate(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend, 46 Info &uploadInfo, uint32_t &retryCount); 47 48 int BatchDelete(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend, 49 Info &uploadInfo, uint32_t &retryCount); 50 51 int Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data); 52 53 std::pair<int, std::string> GetEmptyCursor(const std::string &tableName); 54 55 std::pair<int, uint64_t> Lock(); 56 57 int UnLock(); 58 59 int Close(); 60 61 int HeartBeat(); 62 63 bool IsNotExistCloudDB() const; 64 65 int Download(const std::string &tableName, const std::string &gid, const Type &prefix, 66 std::map<std::string, Assets> &assets); 67 68 int RemoveLocalAssets(const std::vector<Asset> &assets); 69 70 int RemoveLocalAssets(const std::string &tableName, const std::string &gid, const Type &prefix, 71 std::map<std::string, Assets> &assets); 72 73 void SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback); 74 75 bool IsExistCloudVersionCallback() const; 76 77 std::pair<int, std::string> GetCloudVersion(const std::string &originVersion) const; 78 79 void SetPrepareTraceId(const std::string &traceId) const; 80 81 int BatchDownload(const std::string &tableName, std::vector<IAssetLoader::AssetRecord> &downloadAssets); 82 83 int BatchRemoveLocalAssets(const std::string &tableName, std::vector<IAssetLoader::AssetRecord> &removeAssets); 84 85 void CancelDownload(); 86 87 static int GetInnerErrorCode(DBStatus status); 88 protected: 89 class CloudActionContext { 90 public: 91 CloudActionContext(); 92 ~CloudActionContext() = default; 93 94 void MoveInRecordAndExtend(std::vector<VBucket> &record, std::vector<VBucket> &extend); 95 96 void MoveInExtend(std::vector<VBucket> &extend); 97 98 void MoveOutRecordAndExtend(std::vector<VBucket> &record, std::vector<VBucket> &extend); 99 100 void MoveInQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data); 101 102 void MoveOutQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data); 103 104 void MoveInLockStatus(std::pair<int, uint64_t> &lockStatus); 105 106 void MoveOutLockStatus(std::pair<int, uint64_t> &lockStatus); 107 108 void MoveInCursorStatus(std::pair<int, std::string> &cursorStatus); 109 110 void MoveOutCursorStatus(std::pair<int, std::string> &cursorStatus); 111 112 void SetActionRes(int res); 113 114 int GetActionRes(); 115 116 void FinishAndNotify(); 117 118 Info GetInfo(); 119 120 void SetInfo(const CloudWaterType &type, DBStatus status, uint32_t size); 121 122 void SetTableName(const std::string &tableName); 123 124 std::string GetTableName(); 125 126 uint32_t GetRetryCount(); 127 private: 128 static bool IsEmptyAssetId(const Assets &assets); 129 130 static bool IsRecordActionFail(const VBucket &extend, const CloudWaterType &type, DBStatus status); 131 132 std::mutex actionMutex_; 133 std::condition_variable actionCv_; 134 bool actionFinished_; 135 int actionRes_; 136 uint32_t totalCount_; 137 uint32_t successCount_; 138 uint32_t failedCount_; 139 uint32_t retryCount_; 140 141 std::string tableName_; 142 std::vector<VBucket> record_; 143 std::vector<VBucket> extend_; 144 VBucket queryExtend_; 145 std::vector<VBucket> data_; 146 std::pair<int, uint64_t> lockStatus_; 147 std::pair<int, std::string> cursorStatus_; 148 }; 149 enum class InnerActionCode : uint8_t { 150 INSERT = 0, 151 UPDATE, 152 DELETE, 153 QUERY, 154 GET_EMPTY_CURSOR, 155 LOCK, 156 UNLOCK, 157 HEARTBEAT, 158 // add action code before INVALID_ACTION 159 INVALID_ACTION 160 }; 161 162 enum InnerBatchOpType : uint8_t { 163 BATCH_DOWNLOAD = 0, 164 BATCH_REMOVE_LOCAL 165 }; 166 167 static int InnerAction(const std::shared_ptr<CloudActionContext> &context, 168 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action); 169 170 static DBStatus DMLActionTask(const std::shared_ptr<CloudActionContext> &context, 171 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action); 172 173 static void InnerActionTask(const std::shared_ptr<CloudActionContext> &context, 174 const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action); 175 176 static DBStatus InnerActionLock(const std::shared_ptr<CloudActionContext> &context, 177 const std::shared_ptr<ICloudDb> &cloudDb); 178 179 static DBStatus InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> &context, 180 const std::shared_ptr<ICloudDb> &cloudDb); 181 182 static DBStatus QueryAction(const std::shared_ptr<CloudActionContext> &context, 183 const std::shared_ptr<ICloudDb> &cloudDb); 184 185 int BatchOperateAssetsWithAllRecords(const std::string &tableName, 186 std::vector<IAssetLoader::AssetRecord> &allRecords, const InnerBatchOpType operationType); 187 188 int BatchOperateAssetsInner(const std::string &tableName, 189 std::vector<IAssetLoader::AssetRecord> &necessaryRecords, const InnerBatchOpType operationType); 190 191 // save record with assets in nonEmptyRecords, return the indexes of these records in the original vector 192 static std::vector<int> GetNotEmptyAssetRecords(std::vector<IAssetLoader::AssetRecord> &originalRecords, 193 std::vector<IAssetLoader::AssetRecord> &nonEmptyRecords); 194 195 // copy newRecords's assets and status back to originalRecords, based on indexes 196 static void CopyAssetsBack(std::vector<IAssetLoader::AssetRecord> &originalRecords, const std::vector<int> &indexes, 197 std::vector<IAssetLoader::AssetRecord> &newRecords); 198 199 static void RecordSyncDataTimeStampLog(std::vector<VBucket> &data, InnerActionCode action); 200 201 void FillErrorToExtend(int error, std::vector<VBucket> &extend); 202 203 mutable std::shared_mutex cloudMutex_; 204 mutable std::shared_mutex assetLoaderMutex_; 205 std::shared_ptr<ICloudDb> iCloudDb_; 206 std::map<std::string, std::shared_ptr<ICloudDb>> cloudDbs_; 207 std::shared_ptr<IAssetLoader> iAssetLoader_; 208 std::atomic<bool> isDownloading_; 209 210 mutable std::mutex genVersionMutex_; 211 GenerateCloudVersionCallback genVersionCallback_; 212 }; 213 } 214 #endif // CLOUD_DB_PROXY_H 215