1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "storage_proxy.h"
17
18 #include "cloud/schema_mgr.h"
19 #include "store_types.h"
20
21 namespace DistributedDB {
StorageProxy(ICloudSyncStorageInterface * iCloud)22 StorageProxy::StorageProxy(ICloudSyncStorageInterface *iCloud)
23 :store_(iCloud),
24 transactionExeFlag_(false),
25 isWrite_(false)
26 {
27 }
28
GetCloudDb(ICloudSyncStorageInterface * iCloud)29 std::shared_ptr<StorageProxy> StorageProxy::GetCloudDb(ICloudSyncStorageInterface *iCloud)
30 {
31 std::shared_ptr<StorageProxy> proxy = std::make_shared<StorageProxy>(iCloud);
32 proxy->Init();
33 return proxy;
34 }
35
Init()36 void StorageProxy::Init()
37 {
38 cloudMetaData_ = std::make_shared<CloudMetaData>(store_);
39 }
40
Close()41 int StorageProxy::Close()
42 {
43 std::unique_lock<std::shared_mutex> writeLock(storeMutex_);
44 if (transactionExeFlag_.load()) {
45 LOGE("the transaction has been started, storage proxy can not closed");
46 return -E_BUSY;
47 }
48 store_ = nullptr;
49 cloudMetaData_ = nullptr;
50 return E_OK;
51 }
52
GetLocalWaterMark(const std::string & tableName,Timestamp & localMark)53 int StorageProxy::GetLocalWaterMark(const std::string &tableName, Timestamp &localMark)
54 {
55 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
56 if (cloudMetaData_ == nullptr) {
57 return -E_INVALID_DB;
58 }
59 if (transactionExeFlag_.load() && isWrite_.load()) {
60 LOGE("the write transaction has been started, can not get meta");
61 return -E_BUSY;
62 }
63 return cloudMetaData_->GetLocalWaterMark(tableName, localMark);
64 }
65
PutLocalWaterMark(const std::string & tableName,Timestamp & localMark)66 int StorageProxy::PutLocalWaterMark(const std::string &tableName, Timestamp &localMark)
67 {
68 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
69 if (cloudMetaData_ == nullptr) {
70 return -E_INVALID_DB;
71 }
72 if (transactionExeFlag_.load() && isWrite_.load()) {
73 LOGE("the write transaction has been started, can not put meta");
74 return -E_BUSY;
75 }
76 return cloudMetaData_->SetLocalWaterMark(tableName, localMark);
77 }
78
GetCloudWaterMark(const std::string & tableName,std::string & cloudMark)79 int StorageProxy::GetCloudWaterMark(const std::string &tableName, std::string &cloudMark)
80 {
81 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
82 if (cloudMetaData_ == nullptr) {
83 return -E_INVALID_DB;
84 }
85 return cloudMetaData_->GetCloudWaterMark(tableName, cloudMark);
86 }
87
SetCloudWaterMark(const std::string & tableName,std::string & cloudMark)88 int StorageProxy::SetCloudWaterMark(const std::string &tableName, std::string &cloudMark)
89 {
90 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
91 if (cloudMetaData_ == nullptr) {
92 return -E_INVALID_DB;
93 }
94 return cloudMetaData_->SetCloudWaterMark(tableName, cloudMark);
95 }
96
StartTransaction(TransactType type)97 int StorageProxy::StartTransaction(TransactType type)
98 {
99 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
100 if (store_ == nullptr) {
101 return -E_INVALID_DB;
102 }
103 int errCode = store_->StartTransaction(type);
104 if (errCode == E_OK) {
105 transactionExeFlag_.store(true);
106 isWrite_.store(type == TransactType::IMMEDIATE);
107 }
108 return errCode;
109 }
110
Commit()111 int StorageProxy::Commit()
112 {
113 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
114 if (store_ == nullptr) {
115 return -E_INVALID_DB;
116 }
117 int errCode = store_->Commit();
118 if (errCode == E_OK) {
119 transactionExeFlag_.store(false);
120 }
121 return errCode;
122 }
123
Rollback()124 int StorageProxy::Rollback()
125 {
126 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
127 if (store_ == nullptr) {
128 return -E_INVALID_DB;
129 }
130 int errCode = store_->Rollback();
131 if (errCode == E_OK) {
132 transactionExeFlag_.store(false);
133 }
134 return errCode;
135 }
136
GetUploadCount(const std::string & tableName,const Timestamp & localMark,const bool isCloudForcePush,int64_t & count)137 int StorageProxy::GetUploadCount(const std::string &tableName, const Timestamp &localMark,
138 const bool isCloudForcePush, int64_t &count)
139 {
140 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
141 if (store_ == nullptr) {
142 return -E_INVALID_DB;
143 }
144 if (!transactionExeFlag_.load()) {
145 LOGE("the transaction has not been started");
146 return -E_TRANSACT_STATE;
147 }
148 return store_->GetUploadCount(tableName, localMark, isCloudForcePush, count);
149 }
150
FillCloudGid(const CloudSyncData & data)151 int StorageProxy::FillCloudGid(const CloudSyncData &data)
152 {
153 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
154 if (store_ == nullptr) {
155 return -E_INVALID_DB;
156 }
157 if (!transactionExeFlag_.load()) {
158 LOGE("the transaction has not been started");
159 return -E_TRANSACT_STATE;
160 }
161 return store_->FillCloudGid(data);
162 }
163
GetCloudData(const std::string & tableName,const Timestamp & timeRange,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)164 int StorageProxy::GetCloudData(const std::string &tableName, const Timestamp &timeRange,
165 ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
166 {
167 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
168 if (store_ == nullptr) {
169 return -E_INVALID_DB;
170 }
171 if (!transactionExeFlag_.load()) {
172 LOGE("the transaction has not been started");
173 return -E_TRANSACT_STATE;
174 }
175 TableSchema tableSchema;
176 int errCode = store_->GetCloudTableSchema(tableName, tableSchema);
177 if (errCode != E_OK) {
178 return errCode;
179 }
180 return store_->GetCloudData(tableSchema, timeRange, continueStmtToken, cloudDataResult);
181 }
182
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult) const183 int StorageProxy::GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) const
184 {
185 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
186 if (store_ == nullptr) {
187 return -E_INVALID_DB;
188 }
189 if (!transactionExeFlag_.load()) {
190 LOGE("the transaction has not been started");
191 return -E_TRANSACT_STATE;
192 }
193 return store_->GetCloudDataNext(continueStmtToken, cloudDataResult);
194 }
195
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)196 int StorageProxy::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
197 DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
198 {
199 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
200 if (store_ == nullptr) {
201 return -E_INVALID_DB;
202 }
203 if (!transactionExeFlag_.load()) {
204 LOGE("the transaction has not been started");
205 return -E_TRANSACT_STATE;
206 }
207
208 int errCode = store_->GetInfoByPrimaryKeyOrGid(tableName, vBucket, dataInfoWithLog, assetInfo);
209 if (errCode == E_OK) {
210 dataInfoWithLog.logInfo.timestamp = EraseNanoTime(dataInfoWithLog.logInfo.timestamp);
211 dataInfoWithLog.logInfo.wTimestamp = EraseNanoTime(dataInfoWithLog.logInfo.wTimestamp);
212 }
213 return errCode;
214 }
215
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)216 int StorageProxy::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
217 {
218 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
219 if (store_ == nullptr) {
220 return -E_INVALID_DB;
221 }
222 if (!transactionExeFlag_.load()) {
223 LOGE("the transaction has not been started");
224 return -E_TRANSACT_STATE;
225 }
226
227 return store_->PutCloudSyncData(tableName, downloadData);
228 }
229
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)230 int StorageProxy::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
231 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
232 {
233 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
234 if (store_ == nullptr) {
235 return -E_INVALID_DB;
236 }
237 if (!transactionExeFlag_.load()) {
238 LOGE("the transaction has not been started");
239 return -E_TRANSACT_STATE;
240 }
241 return store_->CleanCloudData(mode, tableNameList, localSchema, assets);
242 }
243
ReleaseContinueToken(ContinueToken & continueStmtToken)244 int StorageProxy::ReleaseContinueToken(ContinueToken &continueStmtToken)
245 {
246 return store_->ReleaseCloudDataToken(continueStmtToken);
247 }
248
CheckSchema(const TableName & tableName) const249 int StorageProxy::CheckSchema(const TableName &tableName) const
250 {
251 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
252 if (store_ == nullptr) {
253 return -E_INVALID_DB;
254 }
255 return store_->ChkSchema(tableName);
256 }
257
CheckSchema(std::vector<std::string> & tables)258 int StorageProxy::CheckSchema(std::vector<std::string> &tables)
259 {
260 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
261 if (store_ == nullptr) {
262 return -E_INVALID_DB;
263 }
264 if (tables.empty()) {
265 return -E_INVALID_ARGS;
266 }
267 for (const auto &table : tables) {
268 int ret = store_->ChkSchema(table);
269 if (ret != E_OK) {
270 return ret;
271 }
272 }
273 return E_OK;
274 }
275
GetPrimaryColNamesWithAssetsFields(const TableName & tableName,std::vector<std::string> & colNames,std::vector<Field> & assetFields)276 int StorageProxy::GetPrimaryColNamesWithAssetsFields(const TableName &tableName, std::vector<std::string> &colNames,
277 std::vector<Field> &assetFields)
278 {
279 if (!colNames.empty()) {
280 // output parameter should be empty
281 return -E_INVALID_ARGS;
282 }
283
284 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
285 if (store_ == nullptr) {
286 return -E_INVALID_DB;
287 }
288 // GetTableInfo
289 TableSchema tableSchema;
290 int ret = store_->GetCloudTableSchema(tableName, tableSchema);
291 if (ret != E_OK) {
292 LOGE("Cannot get cloud table schema: %d", ret);
293 return ret;
294 }
295 for (const auto &field : tableSchema.fields) {
296 if (field.primary) {
297 colNames.push_back(field.colName);
298 }
299 if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
300 assetFields.push_back(field);
301 }
302 }
303 if (colNames.empty() || colNames.size() > 1) {
304 (void)colNames.insert(colNames.begin(), CloudDbConstant::ROW_ID_FIELD_NAME);
305 }
306 return E_OK;
307 }
308
NotifyChangedData(const std::string deviceName,ChangedData && changedData)309 int StorageProxy::NotifyChangedData(const std::string deviceName, ChangedData &&changedData)
310 {
311 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
312 if (store_ == nullptr) {
313 return -E_INVALID_DB;
314 }
315 store_->TriggerObserverAction(deviceName, std::move(changedData), true);
316 return E_OK;
317 }
318
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)319 int StorageProxy::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess)
320 {
321 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
322 if (store_ == nullptr) {
323 return -E_INVALID_DB;
324 }
325 if (!transactionExeFlag_.load() || !isWrite_.load()) {
326 LOGE("the write transaction has not started before fill download assets");
327 return -E_TRANSACT_STATE;
328 }
329 return store_->FillCloudAssetForDownload(tableName, asset, isDownloadSuccess);
330 }
331
SetLogTriggerStatus(bool status)332 int StorageProxy::SetLogTriggerStatus(bool status)
333 {
334 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
335 if (store_ == nullptr) {
336 return -E_INVALID_DB;
337 }
338 return store_->SetLogTriggerStatus(status);
339 }
340
FillCloudGidAndAsset(OpType opType,const CloudSyncData & data)341 int StorageProxy::FillCloudGidAndAsset(OpType opType, const CloudSyncData &data)
342 {
343 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
344 if (store_ == nullptr) {
345 return -E_INVALID_DB;
346 }
347 if (!transactionExeFlag_.load()) {
348 LOGE("the transaction has not been started");
349 return -E_TRANSACT_STATE;
350 }
351 return store_->FillCloudGidAndAsset(opType, data);
352 }
353
GetIdentify() const354 std::string StorageProxy::GetIdentify() const
355 {
356 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
357 if (store_ == nullptr) {
358 LOGW("[StorageProxy] store is nullptr return default");
359 return "";
360 }
361 return store_->GetIdentify();
362 }
363
EraseNanoTime(DistributedDB::Timestamp localTime)364 Timestamp StorageProxy::EraseNanoTime(DistributedDB::Timestamp localTime)
365 {
366 return localTime / CloudDbConstant::TEN_THOUSAND * CloudDbConstant::TEN_THOUSAND;
367 }
368
CleanWaterMark(const DistributedDB::TableName & tableName)369 int StorageProxy::CleanWaterMark(const DistributedDB::TableName &tableName)
370 {
371 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
372 if (cloudMetaData_ == nullptr) {
373 LOGW("[StorageProxy] meta is nullptr return default");
374 return -E_INVALID_DB;
375 }
376 return cloudMetaData_->CleanWaterMark(tableName);
377 }
378 }
379