• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef CLOUD_DB_PROXY_H
17 #define CLOUD_DB_PROXY_H
18 #include <atomic>
19 #include <condition_variable>
20 #include <mutex>
21 #include <shared_mutex>
22 #include "cloud/cloud_db_types.h"
23 #include "cloud/icloud_db.h"
24 #include "cloud/iAssetLoader.h"
25 
26 namespace DistributedDB {
// CloudDBProxy keeps an ICloudDb handle, a string-keyed map of ICloudDb handles and an IAssetLoader
// (see the data members at the end of the class) and declares cloud batch, query, lock and asset
// operations on top of them.
// Only declarations are visible in this header: every remark tagged NOTE(review) is inferred from
// names and signatures and must be confirmed against the implementation file.
27 class CloudDBProxy {
28 public:
29     CloudDBProxy();
30     ~CloudDBProxy() = default;
31 
    // Sets a single cloud database handle.
    // NOTE(review): presumably stored in iCloudDb_ - confirm.
32     void SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB);
33 
    // Sets a group of cloud database handles keyed by string and returns an int status.
    // NOTE(review): the keys look like user identifiers (see SwitchCloudDB) - confirm.
34     int SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs);
35 
    // Returns a map of string-keyed cloud database handles by value, i.e. the caller receives a copy.
    // NOTE(review): presumably a copy of cloudDbs_ - confirm.
36     const std::map<std::string, std::shared_ptr<ICloudDb>> GetCloudDB() const;
37 
    // NOTE(review): presumably makes the handle registered for `user` the active one - confirm.
38     void SwitchCloudDB(const std::string &user);
39 
    // Sets the asset loader.
    // NOTE(review): presumably stored in iAssetLoader_ - confirm.
40     void SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader);
41 
    // BatchInsert / BatchUpdate / BatchDelete share one signature: a table name, the records and their
    // extend data (both non-const references, so the call is allowed to modify them), plus uploadInfo
    // and retryCount passed by non-const reference. Each returns an int status.
    // NOTE(review): uploadInfo and retryCount look like outputs filled from the action context
    // (see CloudActionContext::GetInfo / GetRetryCount) - confirm.
42     int BatchInsert(const std::string &tableName, std::vector<VBucket> &record,
43         std::vector<VBucket> &extend, Info &uploadInfo, uint32_t &retryCount);
44 
45     int BatchUpdate(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend,
46         Info &uploadInfo, uint32_t &retryCount);
47 
48     int BatchDelete(const std::string &tableName, std::vector<VBucket> &record, std::vector<VBucket> &extend,
49         Info &uploadInfo, uint32_t &retryCount);
50 
    // Queries `tableName`; extend and data are non-const references and the return value is an int status.
    // NOTE(review): presumably extend carries the query condition/cursor and data receives the rows - confirm.
51     int Query(const std::string &tableName, VBucket &extend, std::vector<VBucket> &data);
52 
    // Returns an (int, std::string) pair for `tableName`.
    // NOTE(review): presumably {status, empty cursor} - confirm.
53     std::pair<int, std::string> GetEmptyCursor(const std::string &tableName);
54 
    // Returns an (int, uint64_t) pair.
    // NOTE(review): presumably {status, lock-related value}; the meaning of the uint64_t is not visible here.
55     std::pair<int, uint64_t> Lock();
56 
    // UnLock, Close and HeartBeat take no arguments and return an int status.
57     int UnLock();
58 
59     int Close();
60 
61     int HeartBeat();
62 
    // NOTE(review): presumably true when no cloud database handle has been set - confirm.
63     bool IsNotExistCloudDB() const;
64 
    // Asset download for one record identified by table name, gid and prefix; `assets` is a non-const
    // reference keyed by string. Returns an int status.
    // NOTE(review): presumably forwarded to the IAssetLoader and `assets` is updated in place - confirm.
65     int Download(const std::string &tableName, const std::string &gid, const Type &prefix,
66         std::map<std::string, Assets> &assets);
67 
    // Two overloads for removing local assets: one takes a plain list of assets, the other identifies a
    // record by table name, gid and prefix together with its string-keyed assets. Both return an int status.
68     int RemoveLocalAssets(const std::vector<Asset> &assets);
69 
70     int RemoveLocalAssets(const std::string &tableName, const std::string &gid, const Type &prefix,
71         std::map<std::string, Assets> &assets);
72 
    // Sets the callback used to generate a cloud version.
    // NOTE(review): presumably stored in genVersionCallback_ - confirm.
73     void SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback);
74 
    // NOTE(review): presumably true when a generate-cloud-version callback has been set - confirm.
75     bool IsExistCloudVersionCallback() const;
76 
    // Returns an (int, std::string) pair for `originVersion`.
    // NOTE(review): presumably {status, generated cloud version} - confirm.
77     std::pair<int, std::string> GetCloudVersion(const std::string &originVersion) const;
78 
    // Declared const although it is a setter, so it cannot assign to a non-mutable member of this object.
    // NOTE(review): presumably forwards traceId to the cloud database handle - confirm.
79     void SetPrepareTraceId(const std::string &traceId) const;
80 
    // Batch variants of the asset operations, working on a vector of IAssetLoader::AssetRecord that is
    // passed by non-const reference. Both return an int status.
81     int BatchDownload(const std::string &tableName, std::vector<IAssetLoader::AssetRecord> &downloadAssets);
82 
83     int BatchRemoveLocalAssets(const std::string &tableName, std::vector<IAssetLoader::AssetRecord> &removeAssets);
84 
    // NOTE(review): presumably cancels an in-progress download (see isDownloading_) - confirm.
85     void CancelDownload();
86 
    // Maps a DBStatus to an int.
    // NOTE(review): presumably the matching internal error code - confirm.
87     static int GetInnerErrorCode(DBStatus status);
88 protected:
    // State for one cloud action: the table name, the record/extend/query/lock/cursor payloads, the
    // result code and counters, plus a mutex, a condition variable and a finished flag.
    // NOTE(review): the MoveIn*/MoveOut* names suggest the payloads are moved rather than copied - confirm.
89     class CloudActionContext {
90     public:
91         CloudActionContext();
92         ~CloudActionContext() = default;
93 
94         void MoveInRecordAndExtend(std::vector<VBucket> &record, std::vector<VBucket> &extend);
95 
96         void MoveInExtend(std::vector<VBucket> &extend);
97 
98         void MoveOutRecordAndExtend(std::vector<VBucket> &record, std::vector<VBucket> &extend);
99 
100         void MoveInQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data);
101 
102         void MoveOutQueryExtendAndData(VBucket &extend, std::vector<VBucket> &data);
103 
104         void MoveInLockStatus(std::pair<int, uint64_t> &lockStatus);
105 
106         void MoveOutLockStatus(std::pair<int, uint64_t> &lockStatus);
107 
108         void MoveInCursorStatus(std::pair<int, std::string> &cursorStatus);
109 
110         void MoveOutCursorStatus(std::pair<int, std::string> &cursorStatus);
111 
        // Setter/getter for the int result of the action.
112         void SetActionRes(int res);
113 
114         int GetActionRes();
115 
        // NOTE(review): presumably sets actionFinished_ and notifies actionCv_ - confirm.
116         void FinishAndNotify();
117 
        // Returns an Info value.
        // NOTE(review): presumably built from totalCount_ / successCount_ / failedCount_ - confirm.
118         Info GetInfo();
119 
        // NOTE(review): presumably updates the counters from the action type, status and record count - confirm.
120         void SetInfo(const CloudWaterType &type, DBStatus status, uint32_t size);
121 
122         void SetTableName(const std::string &tableName);
123 
124         std::string GetTableName();
125 
126         uint32_t GetRetryCount();
127     private:
128         static bool IsEmptyAssetId(const Assets &assets);
129 
130         static bool IsRecordActionFail(const VBucket &extend, const CloudWaterType &type, DBStatus status);
131 
        // NOTE(review): actionMutex_ presumably guards the members below and pairs with actionCv_ - confirm.
132         std::mutex actionMutex_;
133         std::condition_variable actionCv_;
134         bool actionFinished_;
135         int actionRes_;
136         uint32_t totalCount_;
137         uint32_t successCount_;
138         uint32_t failedCount_;
139         uint32_t retryCount_;
140 
        // Payloads exchanged through the MoveIn*/MoveOut* methods above.
141         std::string tableName_;
142         std::vector<VBucket> record_;
143         std::vector<VBucket> extend_;
144         VBucket queryExtend_;
145         std::vector<VBucket> data_;
146         std::pair<int, uint64_t> lockStatus_;
147         std::pair<int, std::string> cursorStatus_;
148     };
    // Action selector passed to InnerAction / DMLActionTask / InnerActionTask.
149     enum class InnerActionCode : uint8_t {
150         INSERT = 0,
151         UPDATE,
152         DELETE,
153         QUERY,
154         GET_EMPTY_CURSOR,
155         LOCK,
156         UNLOCK,
157         HEARTBEAT,
158         // add action code before INVALID_ACTION
159         INVALID_ACTION
160     };
161 
    // Operation selector passed to BatchOperateAssetsWithAllRecords / BatchOperateAssetsInner.
    // Unscoped enum: the enumerators are visible directly in the scope of CloudDBProxy.
162     enum InnerBatchOpType : uint8_t {
163         BATCH_DOWNLOAD = 0,
164         BATCH_REMOVE_LOCAL
165     };
166 
    // Static helpers that take the action context, the cloud database handle and (for the first three)
    // the action code.
    // NOTE(review): InnerAction presumably schedules InnerActionTask and waits on the context until
    // FinishAndNotify is called - confirm.
167     static int InnerAction(const std::shared_ptr<CloudActionContext> &context,
168         const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action);
169 
170     static DBStatus DMLActionTask(const std::shared_ptr<CloudActionContext> &context,
171         const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action);
172 
173     static void InnerActionTask(const std::shared_ptr<CloudActionContext> &context,
174         const std::shared_ptr<ICloudDb> &cloudDb, InnerActionCode action);
175 
176     static DBStatus InnerActionLock(const std::shared_ptr<CloudActionContext> &context,
177         const std::shared_ptr<ICloudDb> &cloudDb);
178 
179     static DBStatus InnerActionGetEmptyCursor(const std::shared_ptr<CloudActionContext> &context,
180         const std::shared_ptr<ICloudDb> &cloudDb);
181 
182     static DBStatus QueryAction(const std::shared_ptr<CloudActionContext> &context,
183         const std::shared_ptr<ICloudDb> &cloudDb);
184 
    // Shared entry points for BatchDownload / BatchRemoveLocalAssets, selected by operationType.
    // NOTE(review): the parameter names suggest the first filters allRecords down to the necessary
    // ones and then calls the second - confirm.
185     int BatchOperateAssetsWithAllRecords(const std::string &tableName,
186         std::vector<IAssetLoader::AssetRecord> &allRecords, const InnerBatchOpType operationType);
187 
188     int BatchOperateAssetsInner(const std::string &tableName,
189         std::vector<IAssetLoader::AssetRecord> &necessaryRecords, const InnerBatchOpType operationType);
190 
191     // Saves the records that have assets into nonEmptyRecords; returns the indexes of these records in the original vector
192     static std::vector<int> GetNotEmptyAssetRecords(std::vector<IAssetLoader::AssetRecord> &originalRecords,
193         std::vector<IAssetLoader::AssetRecord> &nonEmptyRecords);
194 
195     // Copies the assets and status of newRecords back into originalRecords, using indexes to locate the targets
196     static void CopyAssetsBack(std::vector<IAssetLoader::AssetRecord> &originalRecords, const std::vector<int> &indexes,
197         std::vector<IAssetLoader::AssetRecord> &newRecords);
198 
    // NOTE(review): presumably logs timestamp information of `data` for the given action - confirm.
199     static void RecordSyncDataTimeStampLog(std::vector<VBucket> &data, InnerActionCode action);
200 
    // NOTE(review): presumably writes `error` into every entry of `extend` - confirm.
201     void FillErrorToExtend(int error, std::vector<VBucket> &extend);
202 
    // Both mutexes are mutable so that const member functions can lock them.
    // NOTE(review): cloudMutex_ presumably guards iCloudDb_ / cloudDbs_ and assetLoaderMutex_ guards
    // iAssetLoader_ - confirm.
203     mutable std::shared_mutex cloudMutex_;
204     mutable std::shared_mutex assetLoaderMutex_;
205     std::shared_ptr<ICloudDb> iCloudDb_;
206     std::map<std::string, std::shared_ptr<ICloudDb>> cloudDbs_;
207     std::shared_ptr<IAssetLoader> iAssetLoader_;
208     std::atomic<bool> isDownloading_;
209 
    // NOTE(review): genVersionMutex_ presumably guards genVersionCallback_ - confirm.
210     mutable std::mutex genVersionMutex_;
211     GenerateCloudVersionCallback genVersionCallback_;
212 };
213 }
214 #endif // CLOUD_DB_PROXY_H
215