• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "storage_proxy.h"
17 
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud/schema_mgr.h"
20 #include "db_common.h"
21 #include "store_types.h"
22 
23 namespace DistributedDB {
StorageProxy(ICloudSyncStorageInterface * iCloud)24 StorageProxy::StorageProxy(ICloudSyncStorageInterface *iCloud)
25     :store_(iCloud),
26     transactionExeFlag_(false),
27     isWrite_(false)
28 {
29 }
30 
GetCloudDb(ICloudSyncStorageInterface * iCloud)31 std::shared_ptr<StorageProxy> StorageProxy::GetCloudDb(ICloudSyncStorageInterface *iCloud)
32 {
33     std::shared_ptr<StorageProxy> proxy = std::make_shared<StorageProxy>(iCloud);
34     proxy->Init();
35     return proxy;
36 }
37 
Init()38 void StorageProxy::Init()
39 {
40     cloudMetaData_ = std::make_shared<CloudMetaData>(store_);
41 }
42 
Close()43 int StorageProxy::Close()
44 {
45     std::unique_lock<std::shared_mutex> writeLock(storeMutex_);
46     if (transactionExeFlag_.load()) {
47         LOGE("the transaction has been started, storage proxy can not closed");
48         return -E_BUSY;
49     }
50     store_ = nullptr;
51     cloudMetaData_ = nullptr;
52     return E_OK;
53 }
54 
GetLocalWaterMark(const std::string & tableName,Timestamp & localMark)55 int StorageProxy::GetLocalWaterMark(const std::string &tableName, Timestamp &localMark)
56 {
57     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
58     if (cloudMetaData_ == nullptr) {
59         return -E_INVALID_DB;
60     }
61     if (transactionExeFlag_.load() && isWrite_.load()) {
62         LOGE("the write transaction has been started, can not get meta");
63         return -E_BUSY;
64     }
65     return cloudMetaData_->GetLocalWaterMark(AppendWithUserIfNeed(tableName), localMark);
66 }
67 
GetLocalWaterMarkByMode(const std::string & tableName,CloudWaterType mode,Timestamp & localMark)68 int StorageProxy::GetLocalWaterMarkByMode(const std::string &tableName, CloudWaterType mode, Timestamp &localMark)
69 {
70     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
71     if (cloudMetaData_ == nullptr) {
72         return -E_INVALID_DB;
73     }
74     if (transactionExeFlag_.load() && isWrite_.load()) {
75         LOGE("the write transaction has been started, can not get meta");
76         return -E_BUSY;
77     }
78     return cloudMetaData_->GetLocalWaterMarkByType(AppendWithUserIfNeed(tableName), mode, localMark);
79 }
80 
PutLocalWaterMark(const std::string & tableName,Timestamp & localMark)81 int StorageProxy::PutLocalWaterMark(const std::string &tableName, Timestamp &localMark)
82 {
83     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
84     if (cloudMetaData_ == nullptr) {
85         return -E_INVALID_DB;
86     }
87     if (transactionExeFlag_.load() && isWrite_.load()) {
88         LOGE("the write transaction has been started, can not put meta");
89         return -E_BUSY;
90     }
91     return cloudMetaData_->SetLocalWaterMark(AppendWithUserIfNeed(tableName), localMark);
92 }
93 
PutWaterMarkByMode(const std::string & tableName,CloudWaterType mode,Timestamp & localMark)94 int StorageProxy::PutWaterMarkByMode(const std::string &tableName, CloudWaterType mode, Timestamp &localMark)
95 {
96     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
97     if (cloudMetaData_ == nullptr) {
98         return -E_INVALID_DB;
99     }
100     if (transactionExeFlag_.load() && isWrite_.load()) {
101         LOGE("the write transaction has been started, can not put meta");
102         return -E_BUSY;
103     }
104     return cloudMetaData_->SetLocalWaterMarkByType(AppendWithUserIfNeed(tableName), mode, localMark);
105 }
106 
GetCloudWaterMark(const std::string & tableName,std::string & cloudMark)107 int StorageProxy::GetCloudWaterMark(const std::string &tableName, std::string &cloudMark)
108 {
109     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
110     if (cloudMetaData_ == nullptr) {
111         return -E_INVALID_DB;
112     }
113     return cloudMetaData_->GetCloudWaterMark(AppendWithUserIfNeed(tableName), cloudMark);
114 }
115 
SetCloudWaterMark(const std::string & tableName,std::string & cloudMark)116 int StorageProxy::SetCloudWaterMark(const std::string &tableName, std::string &cloudMark)
117 {
118     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
119     if (cloudMetaData_ == nullptr) {
120         return -E_INVALID_DB;
121     }
122     return cloudMetaData_->SetCloudWaterMark(AppendWithUserIfNeed(tableName), cloudMark);
123 }
124 
StartTransaction(TransactType type)125 int StorageProxy::StartTransaction(TransactType type)
126 {
127     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
128     if (store_ == nullptr) {
129         return -E_INVALID_DB;
130     }
131     int errCode = store_->StartTransaction(type);
132     if (errCode == E_OK) {
133         transactionExeFlag_.store(true);
134         isWrite_.store(type == TransactType::IMMEDIATE);
135     }
136     return errCode;
137 }
138 
Commit()139 int StorageProxy::Commit()
140 {
141     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
142     if (store_ == nullptr) {
143         return -E_INVALID_DB;
144     }
145     int errCode = store_->Commit();
146     if (errCode == E_OK) {
147         transactionExeFlag_.store(false);
148     }
149     return errCode;
150 }
151 
Rollback()152 int StorageProxy::Rollback()
153 {
154     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
155     if (store_ == nullptr) {
156         return -E_INVALID_DB;
157     }
158     int errCode = store_->Rollback();
159     if (errCode == E_OK) {
160         transactionExeFlag_.store(false);
161     }
162     return errCode;
163 }
164 
GetUploadCount(const QuerySyncObject & query,const bool isCloudForcePush,bool isCompensatedTask,bool isUseWaterMark,int64_t & count)165 int StorageProxy::GetUploadCount(const QuerySyncObject &query, const bool isCloudForcePush,
166     bool isCompensatedTask, bool isUseWaterMark, int64_t &count)
167 {
168     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
169     if (store_ == nullptr) {
170         return -E_INVALID_DB;
171     }
172     if (!transactionExeFlag_.load()) {
173         LOGE("the transaction has not been started");
174         return -E_TRANSACT_STATE;
175     }
176     std::vector<Timestamp> timeStampVec;
177     std::vector<CloudWaterType> waterTypeVec = DBCommon::GetWaterTypeVec();
178     for (size_t i = 0; i < waterTypeVec.size(); i++) {
179         Timestamp tmpMark = 0u;
180         if (isUseWaterMark) {
181             int errCode = cloudMetaData_->GetLocalWaterMarkByType(AppendWithUserIfNeed(query.GetTableName()),
182                 waterTypeVec[i], tmpMark);
183             if (errCode != E_OK) {
184                 return errCode;
185             }
186         }
187         timeStampVec.push_back(tmpMark);
188     }
189     return store_->GetAllUploadCount(query, timeStampVec, isCloudForcePush, isCompensatedTask, count);
190 }
191 
GetUploadCount(const std::string & tableName,const Timestamp & localMark,const bool isCloudForcePush,int64_t & count)192 int StorageProxy::GetUploadCount(const std::string &tableName, const Timestamp &localMark,
193     const bool isCloudForcePush, int64_t &count)
194 {
195     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
196     if (store_ == nullptr) {
197         return -E_INVALID_DB;
198     }
199     if (!transactionExeFlag_.load()) {
200         LOGE("the transaction has not been started");
201         return -E_TRANSACT_STATE;
202     }
203     QuerySyncObject query;
204     query.SetTableName(tableName);
205     return store_->GetUploadCount(query, localMark, isCloudForcePush, false, count);
206 }
207 
GetUploadCount(const QuerySyncObject & query,const Timestamp & localMark,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)208 int StorageProxy::GetUploadCount(const QuerySyncObject &query, const Timestamp &localMark,
209     bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
210 {
211     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
212     if (store_ == nullptr) {
213         return -E_INVALID_DB;
214     }
215     if (!transactionExeFlag_.load()) {
216         LOGE("the transaction has not been started");
217         return -E_TRANSACT_STATE;
218     }
219     return store_->GetUploadCount(query, localMark, isCloudForcePush, isCompensatedTask, count);
220 }
221 
GetCloudData(const std::string & tableName,const Timestamp & timeRange,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)222 int StorageProxy::GetCloudData(const std::string &tableName, const Timestamp &timeRange,
223     ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
224 {
225     QuerySyncObject querySyncObject;
226     querySyncObject.SetTableName(tableName);
227     return GetCloudData(querySyncObject, timeRange, continueStmtToken, cloudDataResult);
228 }
229 
GetCloudData(const QuerySyncObject & querySyncObject,const Timestamp & timeRange,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)230 int StorageProxy::GetCloudData(const QuerySyncObject &querySyncObject, const Timestamp &timeRange,
231     ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
232 {
233     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
234     if (store_ == nullptr) {
235         return -E_INVALID_DB;
236     }
237     if (!transactionExeFlag_.load()) {
238         LOGE("the transaction has not been started");
239         return -E_TRANSACT_STATE;
240     }
241     TableSchema tableSchema;
242     int errCode = store_->GetCloudTableSchema(querySyncObject.GetRelationTableName(), tableSchema);
243     if (errCode != E_OK) {
244         return errCode;
245     }
246     return store_->GetCloudData(tableSchema, querySyncObject, timeRange, continueStmtToken, cloudDataResult);
247 }
248 
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult) const249 int StorageProxy::GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) const
250 {
251     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
252     if (store_ == nullptr) {
253         return -E_INVALID_DB;
254     }
255     if (!transactionExeFlag_.load()) {
256         LOGE("the transaction has not been started");
257         return -E_TRANSACT_STATE;
258     }
259     return store_->GetCloudDataNext(continueStmtToken, cloudDataResult);
260 }
261 
GetCloudGid(const QuerySyncObject & querySyncObject,bool isCloudForcePush,bool isCompensatedTask,std::vector<std::string> & cloudGid)262 int StorageProxy::GetCloudGid(const QuerySyncObject &querySyncObject, bool isCloudForcePush,
263     bool isCompensatedTask, std::vector<std::string> &cloudGid)
264 {
265     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
266     if (store_ == nullptr) {
267         return -E_INVALID_DB;
268     }
269     TableSchema tableSchema;
270     int errCode = store_->GetCloudTableSchema(querySyncObject.GetRelationTableName(), tableSchema);
271     if (errCode != E_OK) {
272         return errCode;
273     }
274     return store_->GetCloudGid(tableSchema, querySyncObject, isCloudForcePush, isCompensatedTask, cloudGid);
275 }
276 
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)277 int StorageProxy::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
278     DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
279 {
280     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
281     if (store_ == nullptr) {
282         return -E_INVALID_DB;
283     }
284     if (!transactionExeFlag_.load()) {
285         LOGE("the transaction has not been started");
286         return -E_TRANSACT_STATE;
287     }
288 
289     int errCode = store_->GetInfoByPrimaryKeyOrGid(tableName, vBucket, dataInfoWithLog, assetInfo);
290     if (errCode == E_OK) {
291         dataInfoWithLog.logInfo.timestamp = EraseNanoTime(dataInfoWithLog.logInfo.timestamp);
292         dataInfoWithLog.logInfo.wTimestamp = EraseNanoTime(dataInfoWithLog.logInfo.wTimestamp);
293     }
294     if ((dataInfoWithLog.logInfo.flag & static_cast<uint64_t>(LogInfoFlag::FLAG_LOGIC_DELETE)) != 0) {
295         assetInfo.clear();
296     }
297     return errCode;
298 }
299 
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)300 int StorageProxy::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
301 {
302     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
303     if (store_ == nullptr) {
304         return -E_INVALID_DB;
305     }
306     if (!transactionExeFlag_.load()) {
307         LOGE("the transaction has not been started");
308         return -E_TRANSACT_STATE;
309     }
310     downloadData.user = user_;
311     return store_->PutCloudSyncData(tableName, downloadData);
312 }
313 
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)314 int StorageProxy::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
315     const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
316 {
317     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
318     if (store_ == nullptr) {
319         return -E_INVALID_DB;
320     }
321     if (!transactionExeFlag_.load()) {
322         LOGE("the transaction has not been started");
323         return -E_TRANSACT_STATE;
324     }
325     return store_->CleanCloudData(mode, tableNameList, localSchema, assets);
326 }
327 
ReleaseContinueToken(ContinueToken & continueStmtToken)328 int StorageProxy::ReleaseContinueToken(ContinueToken &continueStmtToken)
329 {
330     return store_->ReleaseCloudDataToken(continueStmtToken);
331 }
332 
CheckSchema(const TableName & tableName) const333 int StorageProxy::CheckSchema(const TableName &tableName) const
334 {
335     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
336     if (store_ == nullptr) {
337         return -E_INVALID_DB;
338     }
339     return store_->ChkSchema(tableName);
340 }
341 
CheckSchema(std::vector<std::string> & tables)342 int StorageProxy::CheckSchema(std::vector<std::string> &tables)
343 {
344     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
345     if (store_ == nullptr) {
346         return -E_INVALID_DB;
347     }
348     if (tables.empty()) {
349         return -E_INVALID_ARGS;
350     }
351     for (const auto &table : tables) {
352         int ret = store_->ChkSchema(table);
353         if (ret != E_OK) {
354             return ret;
355         }
356     }
357     return E_OK;
358 }
359 
GetPrimaryColNamesWithAssetsFields(const TableName & tableName,std::vector<std::string> & colNames,std::vector<Field> & assetFields)360 int StorageProxy::GetPrimaryColNamesWithAssetsFields(const TableName &tableName, std::vector<std::string> &colNames,
361     std::vector<Field> &assetFields)
362 {
363     if (!colNames.empty()) {
364         // output parameter should be empty
365         return -E_INVALID_ARGS;
366     }
367 
368     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
369     if (store_ == nullptr) {
370         return -E_INVALID_DB;
371     }
372     // GetTableInfo
373     TableSchema tableSchema;
374     int ret = store_->GetCloudTableSchema(tableName, tableSchema);
375     if (ret != E_OK) {
376         LOGE("Cannot get cloud table schema: %d", ret);
377         return ret;
378     }
379     for (const auto &field : tableSchema.fields) {
380         if (field.primary) {
381             colNames.push_back(field.colName);
382         }
383         if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
384             assetFields.push_back(field);
385         }
386     }
387     if (colNames.empty() || colNames.size() > 1) {
388         (void)colNames.insert(colNames.begin(), CloudDbConstant::ROW_ID_FIELD_NAME);
389     }
390     return E_OK;
391 }
392 
NotifyChangedData(const std::string & deviceName,ChangedData && changedData)393 int StorageProxy::NotifyChangedData(const std::string &deviceName, ChangedData &&changedData)
394 {
395     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
396     if (store_ == nullptr) {
397         return -E_INVALID_DB;
398     }
399     ChangeProperties changeProperties;
400     store_->GetAndResetServerObserverData(changedData.tableName, changeProperties);
401     changedData.properties = changeProperties;
402     store_->TriggerObserverAction(deviceName, std::move(changedData), true);
403     return E_OK;
404 }
405 
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)406 int StorageProxy::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess)
407 {
408     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
409     if (store_ == nullptr) {
410         return -E_INVALID_DB;
411     }
412     if (!transactionExeFlag_.load() || !isWrite_.load()) {
413         LOGE("the write transaction has not started before fill download assets");
414         return -E_TRANSACT_STATE;
415     }
416     return store_->FillCloudAssetForDownload(tableName, asset, isDownloadSuccess);
417 }
418 
SetLogTriggerStatus(bool status)419 int StorageProxy::SetLogTriggerStatus(bool status)
420 {
421     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
422     if (store_ == nullptr) {
423         return -E_INVALID_DB;
424     }
425     return store_->SetLogTriggerStatus(status);
426 }
427 
FillCloudLogAndAsset(OpType opType,const CloudSyncData & data)428 int StorageProxy::FillCloudLogAndAsset(OpType opType, const CloudSyncData &data)
429 {
430     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
431     if (store_ == nullptr) {
432         return -E_INVALID_DB;
433     }
434     if (!transactionExeFlag_.load()) {
435         LOGE("the transaction has not been started");
436         return -E_TRANSACT_STATE;
437     }
438     return store_->FillCloudLogAndAsset(opType, data, true, false);
439 }
440 
GetIdentify() const441 std::string StorageProxy::GetIdentify() const
442 {
443     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
444     if (store_ == nullptr) {
445         LOGW("[StorageProxy] store is nullptr return default");
446         return "";
447     }
448     return store_->GetIdentify();
449 }
450 
EraseNanoTime(DistributedDB::Timestamp localTime)451 Timestamp StorageProxy::EraseNanoTime(DistributedDB::Timestamp localTime)
452 {
453     return localTime / CloudDbConstant::TEN_THOUSAND * CloudDbConstant::TEN_THOUSAND;
454 }
455 
CleanWaterMark(const DistributedDB::TableName & tableName)456 int StorageProxy::CleanWaterMark(const DistributedDB::TableName &tableName)
457 {
458     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
459     if (cloudMetaData_ == nullptr) {
460         LOGW("[StorageProxy] meta is nullptr return default");
461         return -E_INVALID_DB;
462     }
463     return cloudMetaData_->CleanWaterMark(AppendWithUserIfNeed(tableName));
464 }
465 
CleanWaterMarkInMemory(const DistributedDB::TableName & tableName)466 int StorageProxy::CleanWaterMarkInMemory(const DistributedDB::TableName &tableName)
467 {
468     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
469     if (cloudMetaData_ == nullptr) {
470         LOGW("[StorageProxy] CleanWaterMarkInMemory is nullptr return default");
471         return -E_INVALID_DB;
472     }
473     cloudMetaData_->CleanWaterMarkInMemory(AppendWithUserIfNeed(tableName));
474     return E_OK;
475 }
476 
SetUser(const std::string & user)477 void StorageProxy::SetUser(const std::string &user)
478 {
479     user_ = user;
480     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
481     if (store_ != nullptr) {
482         store_->SetUser(user);
483     }
484 }
485 
CreateTempSyncTrigger(const std::string & tableName)486 int StorageProxy::CreateTempSyncTrigger(const std::string &tableName)
487 {
488     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
489     if (store_ == nullptr) {
490         return -E_INVALID_DB;
491     }
492     return store_->CreateTempSyncTrigger(tableName);
493 }
494 
ClearAllTempSyncTrigger()495 int StorageProxy::ClearAllTempSyncTrigger()
496 {
497     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
498     if (store_ == nullptr) {
499         return -E_INVALID_DB;
500     }
501     // Clean up all temporary triggers
502     return store_->ClearAllTempSyncTrigger();
503 }
504 
IsSharedTable(const std::string & tableName,bool & IsSharedTable)505 int StorageProxy::IsSharedTable(const std::string &tableName, bool &IsSharedTable)
506 {
507     std::unique_lock<std::shared_mutex> writeLock(storeMutex_);
508     if (store_ == nullptr) {
509         return -E_INVALID_DB;
510     }
511     IsSharedTable = store_->IsSharedTable(tableName);
512     return E_OK;
513 }
514 
FillCloudGidIfSuccess(const OpType opType,const CloudSyncData & data)515 void StorageProxy::FillCloudGidIfSuccess(const OpType opType, const CloudSyncData &data)
516 {
517     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
518     if (store_ == nullptr) {
519         LOGW("[StorageProxy] fill gid failed with store invalid");
520         return;
521     }
522     int errCode = store_->FillCloudLogAndAsset(opType, data, true, true);
523     if (errCode != E_OK) {
524         LOGW("[StorageProxy] fill gid failed %d", errCode);
525     }
526 }
527 
SetCloudTaskConfig(const CloudTaskConfig & config)528 void StorageProxy::SetCloudTaskConfig(const CloudTaskConfig &config)
529 {
530     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
531     if (store_ == nullptr) {
532         LOGW("[StorageProxy] fill gid failed with store invalid");
533         return;
534     }
535     store_->SetCloudTaskConfig(config);
536 }
537 
GetAssetsByGidOrHashKey(const std::string & tableName,const std::string & gid,const Bytes & hashKey,VBucket & assets)538 std::pair<int, uint32_t> StorageProxy::GetAssetsByGidOrHashKey(const std::string &tableName, const std::string &gid,
539     const Bytes &hashKey, VBucket &assets)
540 {
541     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
542     if (store_ == nullptr) {
543         return { -E_INVALID_DB, static_cast<uint32_t>(LockStatus::UNLOCK) };
544     }
545     TableSchema tableSchema;
546     int errCode = store_->GetCloudTableSchema(tableName, tableSchema);
547     if (errCode != E_OK) {
548         LOGE("get cloud table schema failed: %d", errCode);
549         return { errCode, static_cast<uint32_t>(LockStatus::UNLOCK) };
550     }
551     return store_->GetAssetsByGidOrHashKey(tableSchema, gid, hashKey, assets);
552 }
553 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)554 int StorageProxy::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
555 {
556     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
557     if (store_ == nullptr) {
558         return -E_INVALID_DB;
559     }
560     return store_->SetIAssetLoader(loader);
561 }
562 
UpdateRecordFlag(const std::string & tableName,bool recordConflict,const LogInfo & logInfo)563 int StorageProxy::UpdateRecordFlag(const std::string &tableName, bool recordConflict, const LogInfo &logInfo)
564 {
565     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
566     if (store_ == nullptr) {
567         return -E_INVALID_DB;
568     }
569     return store_->UpdateRecordFlag(tableName, recordConflict, logInfo);
570 }
571 
GetCompensatedSyncQuery(std::vector<QuerySyncObject> & syncQuery)572 int StorageProxy::GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery)
573 {
574     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
575     if (store_ == nullptr) {
576         return -E_INVALID_DB;
577     }
578     return store_->GetCompensatedSyncQuery(syncQuery);
579 }
580 
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)581 int StorageProxy::MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
582     const std::set<std::string> &gidFilters)
583 {
584     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
585     if (store_ == nullptr) {
586         return -E_INVALID_DB;
587     }
588     return store_->MarkFlagAsConsistent(tableName, downloadData, gidFilters);
589 }
590 
OnSyncFinish()591 void StorageProxy::OnSyncFinish()
592 {
593     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
594     if (store_ == nullptr) {
595         return;
596     }
597     store_->SyncFinishHook();
598 }
599 
OnUploadStart()600 void StorageProxy::OnUploadStart()
601 {
602     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
603     if (store_ == nullptr) {
604         return;
605     }
606     store_->DoUploadHook();
607 }
608 
CleanAllWaterMark()609 void StorageProxy::CleanAllWaterMark()
610 {
611     cloudMetaData_->CleanAllWaterMark();
612 }
613 
AppendWithUserIfNeed(const std::string & source) const614 std::string StorageProxy::AppendWithUserIfNeed(const std::string &source) const
615 {
616     if (user_.empty()) {
617         return source;
618     }
619     return source + "_" + user_;
620 }
621 
GetCloudDbSchema(std::shared_ptr<DataBaseSchema> & cloudSchema)622 int StorageProxy::GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema)
623 {
624     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
625     if (store_ == nullptr) {
626         return -E_INVALID_DB;
627     }
628     return store_->GetCloudDbSchema(cloudSchema);
629 }
630 
GetLocalCloudVersion()631 std::pair<int, CloudSyncData> StorageProxy::GetLocalCloudVersion()
632 {
633     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
634     if (store_ == nullptr) {
635         return {-E_INTERNAL_ERROR, {}};
636     }
637     return store_->GetLocalCloudVersion();
638 }
639 
GetCloudSyncConfig() const640 CloudSyncConfig StorageProxy::GetCloudSyncConfig() const
641 {
642     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
643     if (store_ == nullptr) {
644         return {};
645     }
646     return store_->GetCloudSyncConfig();
647 }
648 
IsTableExistReference(const std::string & table)649 bool StorageProxy::IsTableExistReference(const std::string &table)
650 {
651     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
652     if (store_ == nullptr) {
653         return false;
654     }
655     return store_->IsTableExistReference(table);
656 }
657 
IsTableExistReferenceOrReferenceBy(const std::string & table)658 bool StorageProxy::IsTableExistReferenceOrReferenceBy(const std::string &table)
659 {
660     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
661     if (store_ == nullptr) {
662         return false;
663     }
664     return store_->IsTableExistReferenceOrReferenceBy(table);
665 }
666 
ReleaseUploadRecord(const std::string & table,const CloudWaterType & type,Timestamp localWaterMark)667 void StorageProxy::ReleaseUploadRecord(const std::string &table, const CloudWaterType &type, Timestamp localWaterMark)
668 {
669     std::shared_lock<std::shared_mutex> readLock(storeMutex_);
670     if (store_ == nullptr) {
671         return;
672     }
673     store_->ReleaseUploadRecord(table, type, localWaterMark);
674 }
675 
IsTagCloudUpdateLocal(const LogInfo & localInfo,const LogInfo & cloudInfo,SingleVerConflictResolvePolicy policy)676 bool StorageProxy::IsTagCloudUpdateLocal(const LogInfo &localInfo, const LogInfo &cloudInfo,
677     SingleVerConflictResolvePolicy policy)
678 {
679     if (store_ == nullptr) {
680         return false;
681     }
682     return store_->IsTagCloudUpdateLocal(localInfo, cloudInfo, policy);
683 }
684 }
685