1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "storage_proxy.h"
17
18 #include "cloud/cloud_storage_utils.h"
19 #include "cloud/schema_mgr.h"
20 #include "db_common.h"
21 #include "store_types.h"
22
23 namespace DistributedDB {
StorageProxy(ICloudSyncStorageInterface * iCloud)24 StorageProxy::StorageProxy(ICloudSyncStorageInterface *iCloud)
25 :store_(iCloud),
26 transactionExeFlag_(false),
27 isWrite_(false)
28 {
29 }
30
GetCloudDb(ICloudSyncStorageInterface * iCloud)31 std::shared_ptr<StorageProxy> StorageProxy::GetCloudDb(ICloudSyncStorageInterface *iCloud)
32 {
33 std::shared_ptr<StorageProxy> proxy = std::make_shared<StorageProxy>(iCloud);
34 proxy->Init();
35 return proxy;
36 }
37
Init()38 void StorageProxy::Init()
39 {
40 cloudMetaData_ = std::make_shared<CloudMetaData>(store_);
41 }
42
Close()43 int StorageProxy::Close()
44 {
45 std::unique_lock<std::shared_mutex> writeLock(storeMutex_);
46 if (transactionExeFlag_.load()) {
47 LOGE("the transaction has been started, storage proxy can not closed");
48 return -E_BUSY;
49 }
50 store_ = nullptr;
51 cloudMetaData_ = nullptr;
52 return E_OK;
53 }
54
GetLocalWaterMark(const std::string & tableName,Timestamp & localMark)55 int StorageProxy::GetLocalWaterMark(const std::string &tableName, Timestamp &localMark)
56 {
57 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
58 if (cloudMetaData_ == nullptr) {
59 return -E_INVALID_DB;
60 }
61 if (transactionExeFlag_.load() && isWrite_.load()) {
62 LOGE("the write transaction has been started, can not get meta");
63 return -E_BUSY;
64 }
65 return cloudMetaData_->GetLocalWaterMark(AppendWithUserIfNeed(tableName), localMark);
66 }
67
GetLocalWaterMarkByMode(const std::string & tableName,CloudWaterType mode,Timestamp & localMark)68 int StorageProxy::GetLocalWaterMarkByMode(const std::string &tableName, CloudWaterType mode, Timestamp &localMark)
69 {
70 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
71 if (cloudMetaData_ == nullptr) {
72 return -E_INVALID_DB;
73 }
74 if (transactionExeFlag_.load() && isWrite_.load()) {
75 LOGE("the write transaction has been started, can not get meta");
76 return -E_BUSY;
77 }
78 return cloudMetaData_->GetLocalWaterMarkByType(AppendWithUserIfNeed(tableName), mode, localMark);
79 }
80
PutLocalWaterMark(const std::string & tableName,Timestamp & localMark)81 int StorageProxy::PutLocalWaterMark(const std::string &tableName, Timestamp &localMark)
82 {
83 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
84 if (cloudMetaData_ == nullptr) {
85 return -E_INVALID_DB;
86 }
87 if (transactionExeFlag_.load() && isWrite_.load()) {
88 LOGE("the write transaction has been started, can not put meta");
89 return -E_BUSY;
90 }
91 return cloudMetaData_->SetLocalWaterMark(AppendWithUserIfNeed(tableName), localMark);
92 }
93
PutWaterMarkByMode(const std::string & tableName,CloudWaterType mode,Timestamp & localMark)94 int StorageProxy::PutWaterMarkByMode(const std::string &tableName, CloudWaterType mode, Timestamp &localMark)
95 {
96 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
97 if (cloudMetaData_ == nullptr) {
98 return -E_INVALID_DB;
99 }
100 if (transactionExeFlag_.load() && isWrite_.load()) {
101 LOGE("the write transaction has been started, can not put meta");
102 return -E_BUSY;
103 }
104 return cloudMetaData_->SetLocalWaterMarkByType(AppendWithUserIfNeed(tableName), mode, localMark);
105 }
106
GetCloudWaterMark(const std::string & tableName,std::string & cloudMark)107 int StorageProxy::GetCloudWaterMark(const std::string &tableName, std::string &cloudMark)
108 {
109 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
110 if (cloudMetaData_ == nullptr) {
111 return -E_INVALID_DB;
112 }
113 return cloudMetaData_->GetCloudWaterMark(AppendWithUserIfNeed(tableName), cloudMark);
114 }
115
SetCloudWaterMark(const std::string & tableName,std::string & cloudMark)116 int StorageProxy::SetCloudWaterMark(const std::string &tableName, std::string &cloudMark)
117 {
118 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
119 if (cloudMetaData_ == nullptr) {
120 return -E_INVALID_DB;
121 }
122 return cloudMetaData_->SetCloudWaterMark(AppendWithUserIfNeed(tableName), cloudMark);
123 }
124
StartTransaction(TransactType type)125 int StorageProxy::StartTransaction(TransactType type)
126 {
127 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
128 if (store_ == nullptr) {
129 return -E_INVALID_DB;
130 }
131 int errCode = store_->StartTransaction(type);
132 if (errCode == E_OK) {
133 transactionExeFlag_.store(true);
134 isWrite_.store(type == TransactType::IMMEDIATE);
135 }
136 return errCode;
137 }
138
Commit()139 int StorageProxy::Commit()
140 {
141 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
142 if (store_ == nullptr) {
143 return -E_INVALID_DB;
144 }
145 int errCode = store_->Commit();
146 if (errCode == E_OK) {
147 transactionExeFlag_.store(false);
148 }
149 return errCode;
150 }
151
Rollback()152 int StorageProxy::Rollback()
153 {
154 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
155 if (store_ == nullptr) {
156 return -E_INVALID_DB;
157 }
158 int errCode = store_->Rollback();
159 if (errCode == E_OK) {
160 transactionExeFlag_.store(false);
161 }
162 return errCode;
163 }
164
GetUploadCount(const QuerySyncObject & query,const bool isCloudForcePush,bool isCompensatedTask,bool isUseWaterMark,int64_t & count)165 int StorageProxy::GetUploadCount(const QuerySyncObject &query, const bool isCloudForcePush,
166 bool isCompensatedTask, bool isUseWaterMark, int64_t &count)
167 {
168 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
169 if (store_ == nullptr) {
170 return -E_INVALID_DB;
171 }
172 if (!transactionExeFlag_.load()) {
173 LOGE("the transaction has not been started");
174 return -E_TRANSACT_STATE;
175 }
176 std::vector<Timestamp> timeStampVec;
177 std::vector<CloudWaterType> waterTypeVec = DBCommon::GetWaterTypeVec();
178 for (size_t i = 0; i < waterTypeVec.size(); i++) {
179 Timestamp tmpMark = 0u;
180 if (isUseWaterMark) {
181 int errCode = cloudMetaData_->GetLocalWaterMarkByType(AppendWithUserIfNeed(query.GetTableName()),
182 waterTypeVec[i], tmpMark);
183 if (errCode != E_OK) {
184 return errCode;
185 }
186 }
187 timeStampVec.push_back(tmpMark);
188 }
189 return store_->GetAllUploadCount(query, timeStampVec, isCloudForcePush, isCompensatedTask, count);
190 }
191
GetUploadCount(const std::string & tableName,const Timestamp & localMark,const bool isCloudForcePush,int64_t & count)192 int StorageProxy::GetUploadCount(const std::string &tableName, const Timestamp &localMark,
193 const bool isCloudForcePush, int64_t &count)
194 {
195 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
196 if (store_ == nullptr) {
197 return -E_INVALID_DB;
198 }
199 if (!transactionExeFlag_.load()) {
200 LOGE("the transaction has not been started");
201 return -E_TRANSACT_STATE;
202 }
203 QuerySyncObject query;
204 query.SetTableName(tableName);
205 return store_->GetUploadCount(query, localMark, isCloudForcePush, false, count);
206 }
207
GetUploadCount(const QuerySyncObject & query,const Timestamp & localMark,bool isCloudForcePush,bool isCompensatedTask,int64_t & count)208 int StorageProxy::GetUploadCount(const QuerySyncObject &query, const Timestamp &localMark,
209 bool isCloudForcePush, bool isCompensatedTask, int64_t &count)
210 {
211 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
212 if (store_ == nullptr) {
213 return -E_INVALID_DB;
214 }
215 if (!transactionExeFlag_.load()) {
216 LOGE("the transaction has not been started");
217 return -E_TRANSACT_STATE;
218 }
219 return store_->GetUploadCount(query, localMark, isCloudForcePush, isCompensatedTask, count);
220 }
221
GetCloudData(const std::string & tableName,const Timestamp & timeRange,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)222 int StorageProxy::GetCloudData(const std::string &tableName, const Timestamp &timeRange,
223 ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
224 {
225 QuerySyncObject querySyncObject;
226 querySyncObject.SetTableName(tableName);
227 return GetCloudData(querySyncObject, timeRange, continueStmtToken, cloudDataResult);
228 }
229
GetCloudData(const QuerySyncObject & querySyncObject,const Timestamp & timeRange,ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult)230 int StorageProxy::GetCloudData(const QuerySyncObject &querySyncObject, const Timestamp &timeRange,
231 ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult)
232 {
233 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
234 if (store_ == nullptr) {
235 return -E_INVALID_DB;
236 }
237 if (!transactionExeFlag_.load()) {
238 LOGE("the transaction has not been started");
239 return -E_TRANSACT_STATE;
240 }
241 TableSchema tableSchema;
242 int errCode = store_->GetCloudTableSchema(querySyncObject.GetRelationTableName(), tableSchema);
243 if (errCode != E_OK) {
244 return errCode;
245 }
246 return store_->GetCloudData(tableSchema, querySyncObject, timeRange, continueStmtToken, cloudDataResult);
247 }
248
GetCloudDataNext(ContinueToken & continueStmtToken,CloudSyncData & cloudDataResult) const249 int StorageProxy::GetCloudDataNext(ContinueToken &continueStmtToken, CloudSyncData &cloudDataResult) const
250 {
251 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
252 if (store_ == nullptr) {
253 return -E_INVALID_DB;
254 }
255 if (!transactionExeFlag_.load()) {
256 LOGE("the transaction has not been started");
257 return -E_TRANSACT_STATE;
258 }
259 return store_->GetCloudDataNext(continueStmtToken, cloudDataResult);
260 }
261
GetCloudGid(const QuerySyncObject & querySyncObject,bool isCloudForcePush,bool isCompensatedTask,std::vector<std::string> & cloudGid)262 int StorageProxy::GetCloudGid(const QuerySyncObject &querySyncObject, bool isCloudForcePush,
263 bool isCompensatedTask, std::vector<std::string> &cloudGid)
264 {
265 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
266 if (store_ == nullptr) {
267 return -E_INVALID_DB;
268 }
269 TableSchema tableSchema;
270 int errCode = store_->GetCloudTableSchema(querySyncObject.GetRelationTableName(), tableSchema);
271 if (errCode != E_OK) {
272 return errCode;
273 }
274 return store_->GetCloudGid(tableSchema, querySyncObject, isCloudForcePush, isCompensatedTask, cloudGid);
275 }
276
GetInfoByPrimaryKeyOrGid(const std::string & tableName,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)277 int StorageProxy::GetInfoByPrimaryKeyOrGid(const std::string &tableName, const VBucket &vBucket,
278 DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
279 {
280 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
281 if (store_ == nullptr) {
282 return -E_INVALID_DB;
283 }
284 if (!transactionExeFlag_.load()) {
285 LOGE("the transaction has not been started");
286 return -E_TRANSACT_STATE;
287 }
288
289 int errCode = store_->GetInfoByPrimaryKeyOrGid(tableName, vBucket, dataInfoWithLog, assetInfo);
290 if (errCode == E_OK) {
291 dataInfoWithLog.logInfo.timestamp = EraseNanoTime(dataInfoWithLog.logInfo.timestamp);
292 dataInfoWithLog.logInfo.wTimestamp = EraseNanoTime(dataInfoWithLog.logInfo.wTimestamp);
293 }
294 if ((dataInfoWithLog.logInfo.flag & static_cast<uint64_t>(LogInfoFlag::FLAG_LOGIC_DELETE)) != 0) {
295 assetInfo.clear();
296 }
297 return errCode;
298 }
299
PutCloudSyncData(const std::string & tableName,DownloadData & downloadData)300 int StorageProxy::PutCloudSyncData(const std::string &tableName, DownloadData &downloadData)
301 {
302 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
303 if (store_ == nullptr) {
304 return -E_INVALID_DB;
305 }
306 if (!transactionExeFlag_.load()) {
307 LOGE("the transaction has not been started");
308 return -E_TRANSACT_STATE;
309 }
310 downloadData.user = user_;
311 return store_->PutCloudSyncData(tableName, downloadData);
312 }
313
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)314 int StorageProxy::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
315 const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
316 {
317 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
318 if (store_ == nullptr) {
319 return -E_INVALID_DB;
320 }
321 if (!transactionExeFlag_.load()) {
322 LOGE("the transaction has not been started");
323 return -E_TRANSACT_STATE;
324 }
325 return store_->CleanCloudData(mode, tableNameList, localSchema, assets);
326 }
327
ReleaseContinueToken(ContinueToken & continueStmtToken)328 int StorageProxy::ReleaseContinueToken(ContinueToken &continueStmtToken)
329 {
330 return store_->ReleaseCloudDataToken(continueStmtToken);
331 }
332
CheckSchema(const TableName & tableName) const333 int StorageProxy::CheckSchema(const TableName &tableName) const
334 {
335 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
336 if (store_ == nullptr) {
337 return -E_INVALID_DB;
338 }
339 return store_->ChkSchema(tableName);
340 }
341
CheckSchema(std::vector<std::string> & tables)342 int StorageProxy::CheckSchema(std::vector<std::string> &tables)
343 {
344 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
345 if (store_ == nullptr) {
346 return -E_INVALID_DB;
347 }
348 if (tables.empty()) {
349 return -E_INVALID_ARGS;
350 }
351 for (const auto &table : tables) {
352 int ret = store_->ChkSchema(table);
353 if (ret != E_OK) {
354 return ret;
355 }
356 }
357 return E_OK;
358 }
359
GetPrimaryColNamesWithAssetsFields(const TableName & tableName,std::vector<std::string> & colNames,std::vector<Field> & assetFields)360 int StorageProxy::GetPrimaryColNamesWithAssetsFields(const TableName &tableName, std::vector<std::string> &colNames,
361 std::vector<Field> &assetFields)
362 {
363 if (!colNames.empty()) {
364 // output parameter should be empty
365 return -E_INVALID_ARGS;
366 }
367
368 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
369 if (store_ == nullptr) {
370 return -E_INVALID_DB;
371 }
372 // GetTableInfo
373 TableSchema tableSchema;
374 int ret = store_->GetCloudTableSchema(tableName, tableSchema);
375 if (ret != E_OK) {
376 LOGE("Cannot get cloud table schema: %d", ret);
377 return ret;
378 }
379 for (const auto &field : tableSchema.fields) {
380 if (field.primary) {
381 colNames.push_back(field.colName);
382 }
383 if (field.type == TYPE_INDEX<Asset> || field.type == TYPE_INDEX<Assets>) {
384 assetFields.push_back(field);
385 }
386 }
387 if (colNames.empty() || colNames.size() > 1) {
388 (void)colNames.insert(colNames.begin(), CloudDbConstant::ROW_ID_FIELD_NAME);
389 }
390 return E_OK;
391 }
392
NotifyChangedData(const std::string & deviceName,ChangedData && changedData)393 int StorageProxy::NotifyChangedData(const std::string &deviceName, ChangedData &&changedData)
394 {
395 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
396 if (store_ == nullptr) {
397 return -E_INVALID_DB;
398 }
399 ChangeProperties changeProperties;
400 store_->GetAndResetServerObserverData(changedData.tableName, changeProperties);
401 changedData.properties = changeProperties;
402 store_->TriggerObserverAction(deviceName, std::move(changedData), true);
403 return E_OK;
404 }
405
FillCloudAssetForDownload(const std::string & tableName,VBucket & asset,bool isDownloadSuccess)406 int StorageProxy::FillCloudAssetForDownload(const std::string &tableName, VBucket &asset, bool isDownloadSuccess)
407 {
408 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
409 if (store_ == nullptr) {
410 return -E_INVALID_DB;
411 }
412 if (!transactionExeFlag_.load() || !isWrite_.load()) {
413 LOGE("the write transaction has not started before fill download assets");
414 return -E_TRANSACT_STATE;
415 }
416 return store_->FillCloudAssetForDownload(tableName, asset, isDownloadSuccess);
417 }
418
SetLogTriggerStatus(bool status)419 int StorageProxy::SetLogTriggerStatus(bool status)
420 {
421 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
422 if (store_ == nullptr) {
423 return -E_INVALID_DB;
424 }
425 return store_->SetLogTriggerStatus(status);
426 }
427
FillCloudLogAndAsset(OpType opType,const CloudSyncData & data)428 int StorageProxy::FillCloudLogAndAsset(OpType opType, const CloudSyncData &data)
429 {
430 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
431 if (store_ == nullptr) {
432 return -E_INVALID_DB;
433 }
434 if (!transactionExeFlag_.load()) {
435 LOGE("the transaction has not been started");
436 return -E_TRANSACT_STATE;
437 }
438 return store_->FillCloudLogAndAsset(opType, data, true, false);
439 }
440
GetIdentify() const441 std::string StorageProxy::GetIdentify() const
442 {
443 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
444 if (store_ == nullptr) {
445 LOGW("[StorageProxy] store is nullptr return default");
446 return "";
447 }
448 return store_->GetIdentify();
449 }
450
EraseNanoTime(DistributedDB::Timestamp localTime)451 Timestamp StorageProxy::EraseNanoTime(DistributedDB::Timestamp localTime)
452 {
453 return localTime / CloudDbConstant::TEN_THOUSAND * CloudDbConstant::TEN_THOUSAND;
454 }
455
CleanWaterMark(const DistributedDB::TableName & tableName)456 int StorageProxy::CleanWaterMark(const DistributedDB::TableName &tableName)
457 {
458 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
459 if (cloudMetaData_ == nullptr) {
460 LOGW("[StorageProxy] meta is nullptr return default");
461 return -E_INVALID_DB;
462 }
463 return cloudMetaData_->CleanWaterMark(AppendWithUserIfNeed(tableName));
464 }
465
CleanWaterMarkInMemory(const DistributedDB::TableName & tableName)466 int StorageProxy::CleanWaterMarkInMemory(const DistributedDB::TableName &tableName)
467 {
468 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
469 if (cloudMetaData_ == nullptr) {
470 LOGW("[StorageProxy] CleanWaterMarkInMemory is nullptr return default");
471 return -E_INVALID_DB;
472 }
473 cloudMetaData_->CleanWaterMarkInMemory(AppendWithUserIfNeed(tableName));
474 return E_OK;
475 }
476
SetUser(const std::string & user)477 void StorageProxy::SetUser(const std::string &user)
478 {
479 user_ = user;
480 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
481 if (store_ != nullptr) {
482 store_->SetUser(user);
483 }
484 }
485
CreateTempSyncTrigger(const std::string & tableName)486 int StorageProxy::CreateTempSyncTrigger(const std::string &tableName)
487 {
488 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
489 if (store_ == nullptr) {
490 return -E_INVALID_DB;
491 }
492 return store_->CreateTempSyncTrigger(tableName);
493 }
494
ClearAllTempSyncTrigger()495 int StorageProxy::ClearAllTempSyncTrigger()
496 {
497 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
498 if (store_ == nullptr) {
499 return -E_INVALID_DB;
500 }
501 // Clean up all temporary triggers
502 return store_->ClearAllTempSyncTrigger();
503 }
504
IsSharedTable(const std::string & tableName,bool & IsSharedTable)505 int StorageProxy::IsSharedTable(const std::string &tableName, bool &IsSharedTable)
506 {
507 std::unique_lock<std::shared_mutex> writeLock(storeMutex_);
508 if (store_ == nullptr) {
509 return -E_INVALID_DB;
510 }
511 IsSharedTable = store_->IsSharedTable(tableName);
512 return E_OK;
513 }
514
FillCloudGidIfSuccess(const OpType opType,const CloudSyncData & data)515 void StorageProxy::FillCloudGidIfSuccess(const OpType opType, const CloudSyncData &data)
516 {
517 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
518 if (store_ == nullptr) {
519 LOGW("[StorageProxy] fill gid failed with store invalid");
520 return;
521 }
522 int errCode = store_->FillCloudLogAndAsset(opType, data, true, true);
523 if (errCode != E_OK) {
524 LOGW("[StorageProxy] fill gid failed %d", errCode);
525 }
526 }
527
SetCloudTaskConfig(const CloudTaskConfig & config)528 void StorageProxy::SetCloudTaskConfig(const CloudTaskConfig &config)
529 {
530 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
531 if (store_ == nullptr) {
532 LOGW("[StorageProxy] fill gid failed with store invalid");
533 return;
534 }
535 store_->SetCloudTaskConfig(config);
536 }
537
GetAssetsByGidOrHashKey(const std::string & tableName,const std::string & gid,const Bytes & hashKey,VBucket & assets)538 std::pair<int, uint32_t> StorageProxy::GetAssetsByGidOrHashKey(const std::string &tableName, const std::string &gid,
539 const Bytes &hashKey, VBucket &assets)
540 {
541 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
542 if (store_ == nullptr) {
543 return { -E_INVALID_DB, static_cast<uint32_t>(LockStatus::UNLOCK) };
544 }
545 TableSchema tableSchema;
546 int errCode = store_->GetCloudTableSchema(tableName, tableSchema);
547 if (errCode != E_OK) {
548 LOGE("get cloud table schema failed: %d", errCode);
549 return { errCode, static_cast<uint32_t>(LockStatus::UNLOCK) };
550 }
551 return store_->GetAssetsByGidOrHashKey(tableSchema, gid, hashKey, assets);
552 }
553
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)554 int StorageProxy::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
555 {
556 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
557 if (store_ == nullptr) {
558 return -E_INVALID_DB;
559 }
560 return store_->SetIAssetLoader(loader);
561 }
562
UpdateRecordFlag(const std::string & tableName,bool recordConflict,const LogInfo & logInfo)563 int StorageProxy::UpdateRecordFlag(const std::string &tableName, bool recordConflict, const LogInfo &logInfo)
564 {
565 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
566 if (store_ == nullptr) {
567 return -E_INVALID_DB;
568 }
569 return store_->UpdateRecordFlag(tableName, recordConflict, logInfo);
570 }
571
GetCompensatedSyncQuery(std::vector<QuerySyncObject> & syncQuery)572 int StorageProxy::GetCompensatedSyncQuery(std::vector<QuerySyncObject> &syncQuery)
573 {
574 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
575 if (store_ == nullptr) {
576 return -E_INVALID_DB;
577 }
578 return store_->GetCompensatedSyncQuery(syncQuery);
579 }
580
MarkFlagAsConsistent(const std::string & tableName,const DownloadData & downloadData,const std::set<std::string> & gidFilters)581 int StorageProxy::MarkFlagAsConsistent(const std::string &tableName, const DownloadData &downloadData,
582 const std::set<std::string> &gidFilters)
583 {
584 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
585 if (store_ == nullptr) {
586 return -E_INVALID_DB;
587 }
588 return store_->MarkFlagAsConsistent(tableName, downloadData, gidFilters);
589 }
590
OnSyncFinish()591 void StorageProxy::OnSyncFinish()
592 {
593 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
594 if (store_ == nullptr) {
595 return;
596 }
597 store_->SyncFinishHook();
598 }
599
OnUploadStart()600 void StorageProxy::OnUploadStart()
601 {
602 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
603 if (store_ == nullptr) {
604 return;
605 }
606 store_->DoUploadHook();
607 }
608
CleanAllWaterMark()609 void StorageProxy::CleanAllWaterMark()
610 {
611 cloudMetaData_->CleanAllWaterMark();
612 }
613
AppendWithUserIfNeed(const std::string & source) const614 std::string StorageProxy::AppendWithUserIfNeed(const std::string &source) const
615 {
616 if (user_.empty()) {
617 return source;
618 }
619 return source + "_" + user_;
620 }
621
GetCloudDbSchema(std::shared_ptr<DataBaseSchema> & cloudSchema)622 int StorageProxy::GetCloudDbSchema(std::shared_ptr<DataBaseSchema> &cloudSchema)
623 {
624 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
625 if (store_ == nullptr) {
626 return -E_INVALID_DB;
627 }
628 return store_->GetCloudDbSchema(cloudSchema);
629 }
630
GetLocalCloudVersion()631 std::pair<int, CloudSyncData> StorageProxy::GetLocalCloudVersion()
632 {
633 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
634 if (store_ == nullptr) {
635 return {-E_INTERNAL_ERROR, {}};
636 }
637 return store_->GetLocalCloudVersion();
638 }
639
GetCloudSyncConfig() const640 CloudSyncConfig StorageProxy::GetCloudSyncConfig() const
641 {
642 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
643 if (store_ == nullptr) {
644 return {};
645 }
646 return store_->GetCloudSyncConfig();
647 }
648
IsTableExistReference(const std::string & table)649 bool StorageProxy::IsTableExistReference(const std::string &table)
650 {
651 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
652 if (store_ == nullptr) {
653 return false;
654 }
655 return store_->IsTableExistReference(table);
656 }
657
IsTableExistReferenceOrReferenceBy(const std::string & table)658 bool StorageProxy::IsTableExistReferenceOrReferenceBy(const std::string &table)
659 {
660 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
661 if (store_ == nullptr) {
662 return false;
663 }
664 return store_->IsTableExistReferenceOrReferenceBy(table);
665 }
666
ReleaseUploadRecord(const std::string & table,const CloudWaterType & type,Timestamp localWaterMark)667 void StorageProxy::ReleaseUploadRecord(const std::string &table, const CloudWaterType &type, Timestamp localWaterMark)
668 {
669 std::shared_lock<std::shared_mutex> readLock(storeMutex_);
670 if (store_ == nullptr) {
671 return;
672 }
673 store_->ReleaseUploadRecord(table, type, localWaterMark);
674 }
675
IsTagCloudUpdateLocal(const LogInfo & localInfo,const LogInfo & cloudInfo,SingleVerConflictResolvePolicy policy)676 bool StorageProxy::IsTagCloudUpdateLocal(const LogInfo &localInfo, const LogInfo &cloudInfo,
677 SingleVerConflictResolvePolicy policy)
678 {
679 if (store_ == nullptr) {
680 return false;
681 }
682 return store_->IsTagCloudUpdateLocal(localInfo, cloudInfo, policy);
683 }
684 }
685