• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RdbGeneralStore"
16 
17 #include "rdb_general_store.h"
18 
19 #include <chrono>
20 #include <cinttypes>
21 
22 #include "cache_cursor.h"
23 #include "changeevent/remote_change_event.h"
24 #include "cloud/asset_loader.h"
25 #include "cloud/cloud_db.h"
26 #include "cloud/cloud_mark.h"
27 #include "cloud/cloud_store_types.h"
28 #include "cloud/schema_meta.h"
29 #include "device_sync_app/device_sync_app_manager.h"
30 #include "cloud_service.h"
31 #include "commonevent/data_sync_event.h"
32 #include "crypto/crypto_manager.h"
33 #include "device_manager_adapter.h"
34 #include "dfx/dfx_types.h"
35 #include "dfx/reporter.h"
36 #include "eventcenter/event_center.h"
37 #include "log_print.h"
38 #include "metadata/meta_data_manager.h"
39 #include "metadata/secret_key_meta_data.h"
40 #include "rdb_cursor.h"
41 #include "rdb_query.h"
42 #include "relational_store_manager.h"
43 #include "snapshot/bind_event.h"
44 #include "utils/anonymous.h"
45 #include "value_proxy.h"
46 #include "snapshot/snapshot.h"
47 namespace OHOS::DistributedRdb {
48 using namespace DistributedData;
49 using namespace DistributedDB;
50 using namespace NativeRdb;
51 using namespace std::chrono;
52 using namespace DistributedDataDfx;
53 using DBField = DistributedDB::Field;
54 using DBTable = DistributedDB::TableSchema;
55 using DBSchema = DistributedDB::DataBaseSchema;
56 using ClearMode = DistributedDB::ClearMode;
57 using DBStatus = DistributedDB::DBStatus;
58 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
59 
60 constexpr const char *INSERT = "INSERT INTO ";
61 constexpr const char *REPLACE = "REPLACE INTO ";
62 constexpr const char *VALUES = " VALUES ";
63 constexpr const char *LOGOUT_DELETE_FLAG = "DELETE#ALL_CLOUDDATA";
64 constexpr const char *LOGOUT_RESERVE_FLAG = "RESERVE#ALL_CLOUDDATA";
65 constexpr const LockAction LOCK_ACTION =
66     static_cast<LockAction>(static_cast<uint32_t>(LockAction::INSERT) | static_cast<uint32_t>(LockAction::UPDATE) |
67                             static_cast<uint32_t>(LockAction::DELETE) | static_cast<uint32_t>(LockAction::DOWNLOAD));
68 constexpr uint32_t CLOUD_SYNC_FLAG = 1;
69 constexpr uint32_t SEARCHABLE_FLAG = 2;
70 constexpr uint32_t LOCK_TIMEOUT = 3600; // second
71 static constexpr const char *FT_OPEN_STORE = "OPEN_STORE";
72 static constexpr const char *FT_CLOUD_SYNC = "CLOUD_SYNC";
73 
GetDBSchema(const Database & database)74 static DBSchema GetDBSchema(const Database &database)
75 {
76     DBSchema schema;
77     schema.tables.resize(database.tables.size());
78     for (size_t i = 0; i < database.tables.size(); i++) {
79         const Table &table = database.tables[i];
80         DBTable &dbTable = schema.tables[i];
81         dbTable.name = table.name;
82         dbTable.sharedTableName = table.sharedTableName;
83         for (auto &field : table.fields) {
84             DBField dbField;
85             dbField.colName = field.colName;
86             dbField.type = field.type;
87             dbField.primary = field.primary;
88             dbField.nullable = field.nullable;
89             dbTable.fields.push_back(std::move(dbField));
90         }
91     }
92     return schema;
93 }
94 
/**
 * Reports whether col is one of the entries of cols (exact string comparison).
 *
 * @param col  column name to look for.
 * @param cols list of column names to search.
 * @return true when an equal entry exists, false otherwise (including for an empty list).
 */
static bool IsExistence(const std::string &col, const std::vector<std::string> &cols)
{
    auto cursor = cols.begin();
    while (cursor != cols.end() && !(col == *cursor)) {
        ++cursor;
    }
    return cursor != cols.end();
}
104 
GetGaussDistributedSchema(const Database & database)105 static DistributedSchema GetGaussDistributedSchema(const Database &database)
106 {
107     DistributedSchema distributedSchema;
108     distributedSchema.version = database.version;
109     distributedSchema.tables.resize(database.tables.size());
110     for (size_t i = 0; i < database.tables.size(); i++) {
111         const Table &table = database.tables[i];
112         DistributedTable &dbTable = distributedSchema.tables[i];
113         dbTable.tableName = table.name;
114         for (auto &field : table.fields) {
115             DistributedField dbField;
116             dbField.colName = field.colName;
117             dbField.isP2pSync = IsExistence(field.colName, table.deviceSyncFields);
118             dbField.isSpecified = field.primary;
119             dbTable.fields.push_back(std::move(dbField));
120         }
121     }
122     return distributedSchema;
123 }
124 
GetDistributedSchema(const StoreMetaData & meta)125 static std::pair<bool, Database> GetDistributedSchema(const StoreMetaData &meta)
126 {
127     std::pair<bool, Database> tableData;
128     Database database;
129     database.bundleName = meta.bundleName;
130     database.name = meta.storeId;
131     database.user = meta.user;
132     database.deviceId = meta.deviceId;
133     tableData.first = MetaDataManager::GetInstance().LoadMeta(database.GetKey(), database, true);
134     tableData.second = database;
135     return tableData;
136 }
137 
InitStoreInfo(const StoreMetaData & meta)138 void RdbGeneralStore::InitStoreInfo(const StoreMetaData &meta)
139 {
140     storeInfo_.tokenId = meta.tokenId;
141     storeInfo_.bundleName = meta.bundleName;
142     storeInfo_.storeName = meta.storeId;
143     storeInfo_.instanceId = meta.instanceId;
144     storeInfo_.user = std::atoi(meta.user.c_str());
145     storeInfo_.deviceId = DeviceManagerAdapter::GetInstance().GetLocalDevice().uuid;
146     storeInfo_.path = meta.dataDir;
147 }
148 
GetOption(const StoreMetaData & meta)149 RelationalStoreDelegate::Option GetOption(const StoreMetaData &meta)
150 {
151     RelationalStoreDelegate::Option option;
152     option.syncDualTupleMode = true;
153     if (GetDistributedSchema(meta).first) {
154         option.tableMode = DistributedTableMode::COLLABORATION;
155     }
156     return option;
157 }
158 
GetDBPassword(const StoreMetaData & data)159 RdbGeneralStore::DBPassword RdbGeneralStore::GetDBPassword(const StoreMetaData &data)
160 {
161     DBPassword dbPassword;
162     SecretKeyMetaData secretKey;
163     auto metaKey = data.GetSecretKey();
164     if (!MetaDataManager::GetInstance().LoadMeta(metaKey, secretKey, true) || secretKey.sKey.empty()) {
165         return dbPassword;
166     }
167     CryptoManager::CryptoParams decryptParams = { .area = secretKey.area, .userId = data.user,
168         .nonce = secretKey.nonce };
169     auto password = CryptoManager::GetInstance().Decrypt(secretKey.sKey, decryptParams);
170     if (password.empty()) {
171         return dbPassword;
172     }
173     // update secret key of area or nonce
174     CryptoManager::GetInstance().UpdateSecretMeta(password, data, metaKey, secretKey);
175     dbPassword.SetValue(password.data(), password.size());
176     password.assign(password.size(), 0);
177     return dbPassword;
178 }
179 
RdbGeneralStore(const StoreMetaData & meta)180 RdbGeneralStore::RdbGeneralStore(const StoreMetaData &meta)
181     : manager_(meta.appId, meta.user, meta.instanceId), tasks_(std::make_shared<ConcurrentMap<SyncId, FinishTask>>())
182 {
183     observer_.storeId_ = meta.storeId;
184     observer_.meta_ = meta;
185     RelationalStoreDelegate::Option option = GetOption(meta);
186     option.isNeedCompressOnSync = true;
187     option.observer = &observer_;
188     if (meta.isEncrypt) {
189         option.passwd = GetDBPassword(meta);
190         option.isEncryptedDb = meta.isEncrypt;
191         option.cipher = CipherType::AES_256_GCM;
192         for (uint32_t i = 0; i < ITERS_COUNT; ++i) {
193             option.iterateTimes = ITERS[i];
194             auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
195             if (ret == DBStatus::OK && delegate_ != nullptr) {
196                 break;
197             }
198             manager_.CloseStore(delegate_);
199             delegate_ = nullptr;
200         }
201     } else {
202         auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
203         if (ret != DBStatus::OK || delegate_ == nullptr) {
204             manager_.CloseStore(delegate_);
205             delegate_ = nullptr;
206         }
207     }
208     InitStoreInfo(meta);
209     if (meta.isSearchable) {
210         syncNotifyFlag_ |= SEARCHABLE_FLAG;
211     }
212     if (delegate_ != nullptr && meta.isManualClean) {
213         PragmaData data = static_cast<PragmaData>(const_cast<void *>(static_cast<const void *>(&meta.isManualClean)));
214         delegate_->Pragma(PragmaCmd::LOGIC_DELETE_SYNC_DATA, data);
215     }
216     ZLOGI("Get rdb store, tokenId:%{public}u, bundleName:%{public}s, storeName:%{public}s, user:%{public}s,"
217           "isEncrypt:%{public}d, isManualClean:%{public}d, isSearchable:%{public}d",
218           meta.tokenId, meta.bundleName.c_str(), Anonymous::Change(meta.storeId).c_str(), meta.user.c_str(),
219           meta.isEncrypt, meta.isManualClean, meta.isSearchable);
220 }
221 
~RdbGeneralStore()222 RdbGeneralStore::~RdbGeneralStore()
223 {
224     ZLOGI("Destruct. BundleName: %{public}s. StoreName: %{public}s. user: %{public}d",
225         storeInfo_.bundleName.c_str(), Anonymous::Change(storeInfo_.storeName).c_str(), storeInfo_.user);
226     manager_.CloseStore(delegate_);
227     delegate_ = nullptr;
228     bindInfo_.loader_ = nullptr;
229     if (bindInfo_.db_ != nullptr) {
230         bindInfo_.db_->Close();
231     }
232     bindInfo_.db_ = nullptr;
233     rdbCloud_ = nullptr;
234     rdbLoader_ = nullptr;
235     RemoveTasks();
236     tasks_ = nullptr;
237     executor_ = nullptr;
238 }
239 
BindSnapshots(BindAssets bindAssets)240 int32_t RdbGeneralStore::BindSnapshots(BindAssets bindAssets)
241 {
242     if (snapshots_ == nullptr) {
243         snapshots_ = bindAssets;
244     }
245     return GenErr::E_OK;
246 }
247 
Bind(const Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)248 int32_t RdbGeneralStore::Bind(const Database &database, const std::map<uint32_t, BindInfo> &bindInfos,
249     const CloudConfig &config)
250 {
251     if (bindInfos.empty()) {
252         return GeneralError::E_OK;
253     }
254     auto bindInfo = bindInfos.begin()->second;
255     if (bindInfo.db_ == nullptr || bindInfo.loader_ == nullptr) {
256         return GeneralError::E_INVALID_ARGS;
257     }
258 
259     if (isBound_.exchange(true)) {
260         return GeneralError::E_OK;
261     }
262 
263     BindEvent::BindEventInfo eventInfo;
264     eventInfo.tokenId = storeInfo_.tokenId;
265     eventInfo.bundleName = storeInfo_.bundleName;
266     eventInfo.storeName = storeInfo_.storeName;
267     eventInfo.user = storeInfo_.user;
268     eventInfo.instanceId = storeInfo_.instanceId;
269 
270     auto evt = std::make_unique<BindEvent>(BindEvent::BIND_SNAPSHOT, std::move(eventInfo));
271     EventCenter::GetInstance().PostEvent(std::move(evt));
272     bindInfo_ = std::move(bindInfo);
273     {
274         std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
275         rdbCloud_ = std::make_shared<RdbCloud>(bindInfo_.db_, snapshots_);
276         rdbLoader_ = std::make_shared<RdbAssetLoader>(bindInfo_.loader_, snapshots_);
277     }
278 
279     DistributedDB::CloudSyncConfig dbConfig;
280     dbConfig.maxUploadCount = config.maxNumber;
281     dbConfig.maxUploadSize = config.maxSize;
282     dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
283     dbConfig.isSupportEncrypt = config.isSupportEncrypt;
284     DBSchema schema = GetDBSchema(database);
285     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
286     if (delegate_ == nullptr) {
287         ZLOGE("database:%{public}s already closed!", Anonymous::Change(database.name).c_str());
288         return GeneralError::E_ALREADY_CLOSED;
289     }
290     delegate_->SetCloudDB(rdbCloud_);
291     delegate_->SetIAssetLoader(rdbLoader_);
292     delegate_->SetCloudDbSchema(std::move(schema));
293     delegate_->SetCloudSyncConfig(dbConfig);
294 
295     syncNotifyFlag_ |= CLOUD_SYNC_FLAG;
296     return GeneralError::E_OK;
297 }
298 
IsBound(uint32_t user)299 bool RdbGeneralStore::IsBound(uint32_t user)
300 {
301     return isBound_;
302 }
303 
Close(bool isForce)304 int32_t RdbGeneralStore::Close(bool isForce)
305 {
306     {
307         std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
308         if (!lock) {
309             return GeneralError::E_BUSY;
310         }
311 
312         if (delegate_ == nullptr) {
313             return GeneralError::E_OK;
314         }
315         auto [dbStatus, downloadCount] = delegate_->GetDownloadingAssetsCount();
316         if (!isForce &&
317             (delegate_->GetCloudSyncTaskCount() > 0 || downloadCount > 0 || delegate_->GetDeviceSyncTaskCount() > 0)) {
318             return GeneralError::E_BUSY;
319         }
320         auto status = manager_.CloseStore(delegate_);
321         if (status != DBStatus::OK) {
322             return status;
323         }
324         delegate_ = nullptr;
325     }
326     RemoveTasks();
327     bindInfo_.loader_ = nullptr;
328     if (bindInfo_.db_ != nullptr) {
329         bindInfo_.db_->Close();
330     }
331     bindInfo_.db_ = nullptr;
332     {
333         std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
334         rdbCloud_ = nullptr;
335         rdbLoader_ = nullptr;
336     }
337     return GeneralError::E_OK;
338 }
339 
Execute(const std::string & table,const std::string & sql)340 int32_t RdbGeneralStore::Execute(const std::string &table, const std::string &sql)
341 {
342     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
343     if (delegate_ == nullptr) {
344         ZLOGE("Database already closed! database:%{public}s, table:%{public}s, sql:%{public}s",
345             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str(),
346             Anonymous::Change(sql).c_str());
347         return GeneralError::E_ERROR;
348     }
349     std::vector<DistributedDB::VBucket> changedData;
350     auto status = delegate_->ExecuteSql({ sql, {}, false }, changedData);
351     if (status != DBStatus::OK) {
352         ZLOGE("Execute failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status,
353               Anonymous::Change(sql).c_str(), changedData.size());
354         if (status == DBStatus::BUSY) {
355             return GeneralError::E_BUSY;
356         }
357         return GeneralError::E_ERROR;
358     }
359     return GeneralError::E_OK;
360 }
361 
SqlConcatenate(VBucket & value,std::string & strColumnSql,std::string & strRowValueSql)362 size_t RdbGeneralStore::SqlConcatenate(VBucket &value, std::string &strColumnSql, std::string &strRowValueSql)
363 {
364     strColumnSql += " (";
365     strRowValueSql += " (";
366     auto columnSize = value.size();
367     for (auto column = value.begin(); column != value.end(); ++column) {
368         strRowValueSql += " ?,";
369         strColumnSql += " " + column->first + ",";
370     }
371     if (columnSize != 0) {
372         strColumnSql.pop_back();
373         strColumnSql += ")";
374         strRowValueSql.pop_back();
375         strRowValueSql += ")";
376     }
377     return columnSize;
378 }
379 
Insert(const std::string & table,VBuckets && values)380 int32_t RdbGeneralStore::Insert(const std::string &table, VBuckets &&values)
381 {
382     if (table.empty() || values.size() == 0) {
383         ZLOGE("Insert:table maybe empty:%{public}d,value size is:%{public}zu", table.empty(), values.size());
384         return GeneralError::E_INVALID_ARGS;
385     }
386 
387     std::string strColumnSql;
388     std::string strRowValueSql;
389     auto value = values.front();
390     size_t columnSize = SqlConcatenate(value, strColumnSql, strRowValueSql);
391     if (columnSize == 0) {
392         ZLOGE("Insert: columnSize error, can't be 0!");
393         return GeneralError::E_INVALID_ARGS;
394     }
395 
396     Values args;
397     std::string strValueSql;
398     for (auto &rowData : values) {
399         if (rowData.size() != columnSize) {
400             ZLOGE("Insert: VBucket size error, Each VBucket in values must be of the same length!");
401             return GeneralError::E_INVALID_ARGS;
402         }
403         for (auto column = rowData.begin(); column != rowData.end(); ++column) {
404             args.push_back(std::move(column->second));
405         }
406         strValueSql += strRowValueSql + ",";
407     }
408     strValueSql.pop_back();
409     std::string sql = INSERT + table + strColumnSql + VALUES + strValueSql;
410 
411     std::vector<DistributedDB::VBucket> changedData;
412     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
413     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
414     if (delegate_ == nullptr) {
415         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
416             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
417         return GeneralError::E_ERROR;
418     }
419     auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), false }, changedData);
420     if (status != DBStatus::OK) {
421         if (IsPrintLog(status)) {
422             auto time =
423                 static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
424                 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu times %{public}" PRIu64 ".",
425                 status, Anonymous::Change(sql).c_str(), changedData.size(), time);
426         }
427         return GeneralError::E_ERROR;
428     }
429     return GeneralError::E_OK;
430 }
431 
IsPrintLog(DBStatus status)432 bool RdbGeneralStore::IsPrintLog(DBStatus status)
433 {
434     bool isPrint = false;
435     if (status == lastError_) {
436         if (++lastErrCnt_ % PRINT_ERROR_CNT == 0) {
437             isPrint = true;
438         }
439     } else {
440         isPrint = true;
441         lastErrCnt_ = 0;
442         lastError_ = status;
443     }
444     return isPrint;
445 }
446 
447 /**
448  * This function does not support batch updates in rdb, it only supports single data updates.
449  */
Update(const std::string & table,const std::string & setSql,Values && values,const std::string & whereSql,Values && conditions)450 int32_t RdbGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
451     const std::string &whereSql, Values &&conditions)
452 {
453     if (table.empty()) {
454         ZLOGE("Update: table can't be empty!");
455         return GeneralError::E_INVALID_ARGS;
456     }
457     if (setSql.empty() || values.size() == 0) {
458         ZLOGE("Update: setSql and values can't be empty!");
459         return GeneralError::E_INVALID_ARGS;
460     }
461     if (whereSql.empty() || conditions.size() == 0) {
462         ZLOGE("Update: whereSql and conditions can't be empty!");
463         return GeneralError::E_INVALID_ARGS;
464     }
465 
466     std::string sqlIn = " UPDATE " + table + " SET " + setSql + " WHERE " + whereSql;
467     Values args;
468     for (auto &value : values) {
469         args.push_back(std::move(value));
470     }
471     for (auto &condition : conditions) {
472         args.push_back(std::move(condition));
473     }
474 
475     std::vector<DistributedDB::VBucket> changedData;
476     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
477     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
478     if (delegate_ == nullptr) {
479         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
480             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
481         return GeneralError::E_ERROR;
482     }
483     auto status = delegate_->ExecuteSql({ sqlIn, std::move(bindArgs), false }, changedData);
484     if (status != DBStatus::OK) {
485         ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sqlIn).c_str(),
486               changedData.size());
487         return GeneralError::E_ERROR;
488     }
489     return GeneralError::E_OK;
490 }
491 
Replace(const std::string & table,VBucket && value)492 int32_t RdbGeneralStore::Replace(const std::string &table, VBucket &&value)
493 {
494     if (table.empty() || value.size() == 0) {
495         return GeneralError::E_INVALID_ARGS;
496     }
497     std::string columnSql;
498     std::string valueSql;
499     SqlConcatenate(value, columnSql, valueSql);
500     std::string sql = REPLACE + table + columnSql + VALUES + valueSql;
501 
502     Values args;
503     for (auto &[k, v] : value) {
504         args.emplace_back(std::move(v));
505     }
506     std::vector<DistributedDB::VBucket> changedData;
507     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
508     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
509     if (delegate_ == nullptr) {
510         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
511             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
512         return GeneralError::E_ERROR;
513     }
514     auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs) }, changedData);
515     if (status != DBStatus::OK) {
516         ZLOGE("Replace failed! ret:%{public}d, table:%{public}s, sql:%{public}s, fields:%{public}s",
517             status, Anonymous::Change(table).c_str(), Anonymous::Change(sql).c_str(), columnSql.c_str());
518         if (status == DBStatus::BUSY) {
519             return GeneralError::E_BUSY;
520         }
521         return GeneralError::E_ERROR;
522     }
523     return GeneralError::E_OK;
524 }
525 
Delete(const std::string & table,const std::string & sql,Values && args)526 int32_t RdbGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
527 {
528     return 0;
529 }
530 
Query(const std::string & table,const std::string & sql,Values && args)531 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(__attribute__((unused))const std::string &table,
532     const std::string &sql, Values &&args)
533 {
534     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
535     if (delegate_ == nullptr) {
536         ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
537         return { GeneralError::E_ALREADY_CLOSED, nullptr };
538     }
539     auto [errCode, records] = QuerySql(sql, std::move(args));
540     return { errCode, std::make_shared<CacheCursor>(std::move(records)) };
541 }
542 
Query(const std::string & table,GenQuery & query)543 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(const std::string &table, GenQuery &query)
544 {
545     RdbQuery *rdbQuery = nullptr;
546     auto ret = query.QueryInterface(rdbQuery);
547     if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
548         ZLOGE("not RdbQuery!");
549         return { GeneralError::E_INVALID_ARGS, nullptr };
550     }
551     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
552     if (delegate_ == nullptr) {
553         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
554             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
555         return { GeneralError::E_ALREADY_CLOSED, nullptr };
556     }
557     if (rdbQuery->IsRemoteQuery()) {
558         if (rdbQuery->GetDevices().size() != 1) {
559             ZLOGE("RemoteQuery: devices size error! size:%{public}zu", rdbQuery->GetDevices().size());
560             return { GeneralError::E_ERROR, nullptr };
561         }
562         auto cursor = RemoteQuery(*rdbQuery->GetDevices().begin(), rdbQuery->GetRemoteCondition());
563         return { cursor != nullptr ? GeneralError::E_OK : GeneralError::E_ERROR, cursor};
564     }
565     return { GeneralError::E_ERROR, nullptr };
566 }
567 
MergeMigratedData(const std::string & tableName,VBuckets && values)568 int32_t RdbGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
569 {
570     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
571     if (delegate_ == nullptr) {
572         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
573             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
574         return GeneralError::E_ERROR;
575     }
576 
577     auto status = delegate_->UpsertData(tableName, ValueProxy::Convert(std::move(values)));
578     return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
579 }
580 
CleanTrackerData(const std::string & tableName,int64_t cursor)581 int32_t RdbGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
582 {
583     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
584     if (delegate_ == nullptr) {
585         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
586               Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
587         return GeneralError::E_ERROR;
588     }
589 
590     auto status = delegate_->CleanTrackerData(tableName, cursor);
591     return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
592 }
593 
DoCloudSync(const Devices & devices,const DistributedDB::Query & dbQuery,const SyncParam & syncParam,bool isPriority,DetailAsync async)594 std::pair<int32_t, int32_t> RdbGeneralStore::DoCloudSync(const Devices &devices, const DistributedDB::Query &dbQuery,
595     const SyncParam &syncParam, bool isPriority, DetailAsync async)
596 {
597     auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
598     auto highMode = GetHighMode(static_cast<uint32_t>(syncParam.mode));
599     SyncId syncId = ++syncTaskId_;
600     auto callback = GetDBProcessCB(std::move(async), syncMode, syncId, highMode);
601     if (executor_ != nullptr && tasks_ != nullptr) {
602         auto id = executor_->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
603         tasks_->Insert(syncId, { id, callback });
604     }
605     CloudSyncOption option;
606     option.devices = devices;
607     option.mode = DistributedDB::SyncMode(syncMode);
608     option.query = dbQuery;
609     option.waitTime = syncParam.wait;
610     option.priorityTask = isPriority || highMode == MANUAL_SYNC_MODE;
611     option.priorityLevel = GetPriorityLevel(highMode);
612     option.compensatedSyncOnly = syncParam.isCompensation;
613     option.merge = highMode == AUTO_SYNC_MODE;
614     option.lockAction = LOCK_ACTION;
615     option.prepareTraceId = syncParam.prepareTraceId;
616     option.asyncDownloadAssets = syncParam.asyncDownloadAsset;
617     auto dbStatus = DistributedDB::INVALID_ARGS;
618     dbStatus = delegate_->Sync(option, tasks_ != nullptr ? GetCB(syncId) : callback, syncId);
619     if (dbStatus == DBStatus::OK || tasks_ == nullptr) {
620         return { ConvertStatus(dbStatus), dbStatus };
621     }
622     Report(FT_CLOUD_SYNC, static_cast<int32_t>(Fault::CSF_GS_CLOUD_SYNC),
623         "Cloud sync ret=" + std::to_string(static_cast<int32_t>(dbStatus)));
624     tasks_->ComputeIfPresent(syncId, [executor = executor_](SyncId syncId, const FinishTask &task) {
625         if (executor != nullptr) {
626             executor->Remove(task.taskId);
627         }
628         return false;
629     });
630     return { ConvertStatus(dbStatus), dbStatus };
631 }
632 
Sync(const Devices & devices,GenQuery & query,DetailAsync async,const SyncParam & syncParam)633 std::pair<int32_t, int32_t> RdbGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async,
634     const SyncParam &syncParam)
635 {
636     DistributedDB::Query dbQuery;
637     RdbQuery *rdbQuery = nullptr;
638     bool isPriority = false;
639     auto ret = query.QueryInterface(rdbQuery);
640     if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
641         dbQuery.FromTable(GetIntersection(query.GetTables(), GetTables()));
642     } else {
643         dbQuery = rdbQuery->GetQuery();
644         isPriority = rdbQuery->IsPriority();
645     }
646     auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
647     auto dbMode = DistributedDB::SyncMode(syncMode);
648     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
649     if (delegate_ == nullptr) {
650         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
651               "wait:%{public}d", devices.size(),
652               devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode, syncParam.wait);
653         return { GeneralError::E_ALREADY_CLOSED, DBStatus::OK };
654     }
655     auto dbStatus = DistributedDB::INVALID_ARGS;
656     if (syncMode < NEARBY_END) {
657         dbStatus = delegate_->Sync(devices, dbMode, dbQuery, GetDBBriefCB(std::move(async)), syncParam.wait != 0);
658     } else if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
659         return DoCloudSync(devices, dbQuery, syncParam, isPriority, async);
660     }
661     return { ConvertStatus(dbStatus), dbStatus };
662 }
663 
PreSharing(GenQuery & query)664 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::PreSharing(GenQuery &query)
665 {
666     RdbQuery *rdbQuery = nullptr;
667     auto ret = query.QueryInterface(rdbQuery);
668     if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
669         ZLOGE("not RdbQuery!");
670         return { GeneralError::E_INVALID_ARGS, nullptr };
671     }
672     auto tables = rdbQuery->GetTables();
673     auto statement = rdbQuery->GetStatement();
674     if (statement.empty() || tables.empty()) {
675         ZLOGE("statement size:%{public}zu, tables size:%{public}zu", statement.size(), tables.size());
676         return { GeneralError::E_INVALID_ARGS, nullptr };
677     }
678     std::string sql = BuildSql(*tables.begin(), statement, rdbQuery->GetColumns());
679     VBuckets values;
680     {
681         std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
682         if (delegate_ == nullptr) {
683             ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
684             return { GeneralError::E_ALREADY_CLOSED, nullptr };
685         }
686         auto [errCode, ret] = QuerySql(sql, rdbQuery->GetBindArgs());
687         values = std::move(ret);
688     }
689     auto rdbCloud = GetRdbCloud();
690     if (rdbCloud == nullptr || values.empty()) {
691         ZLOGW("rdbCloud is %{public}s, values size:%{public}zu", rdbCloud == nullptr ? "nullptr" : "not nullptr",
692             values.size());
693         return { GeneralError::E_CLOUD_DISABLED, nullptr };
694     }
695     VBuckets extends = ExtractExtend(values);
696     rdbCloud->PreSharing(*tables.begin(), extends);
697     for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
698          ++value, ++extend) {
699         value->insert_or_assign(DistributedRdb::Field::SHARING_RESOURCE_FIELD, (*extend)[SchemaMeta::SHARING_RESOURCE]);
700         value->erase(CLOUD_GID);
701     }
702     return { GeneralError::E_OK, std::make_shared<CacheCursor>(std::move(values)) };
703 }
704 
ExtractExtend(VBuckets & values) const705 VBuckets RdbGeneralStore::ExtractExtend(VBuckets &values) const
706 {
707     VBuckets extends(values.size());
708     for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
709          ++value, ++extend) {
710         auto it = value->find(CLOUD_GID);
711         if (it == value->end()) {
712             continue;
713         }
714         auto gid = std::get_if<std::string>(&(it->second));
715         if (gid == nullptr || gid->empty()) {
716             continue;
717         }
718         extend->insert_or_assign(SchemaMeta::GID_FIELD, std::move(*gid));
719     }
720     return extends;
721 }
722 
BuildSql(const std::string & table,const std::string & statement,const std::vector<std::string> & columns) const723 std::string RdbGeneralStore::BuildSql(
724     const std::string &table, const std::string &statement, const std::vector<std::string> &columns) const
725 {
726     std::string sql = "select ";
727     sql.append(CLOUD_GID);
728     std::string sqlNode = "select rowid";
729     for (auto &column : columns) {
730         sql.append(", ").append(column);
731         sqlNode.append(", ").append(column);
732     }
733     sqlNode.append(" from ").append(table).append(statement);
734     auto logTable = RelationalStoreManager::GetDistributedLogTableName(table);
735     sql.append(" from ").append(logTable).append(", (").append(sqlNode);
736     sql.append(") where ").append(DATE_KEY).append(" = rowid");
737     return sql;
738 }
739 
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)740 int32_t RdbGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
741 {
742     if (mode < 0 || mode > CLEAN_MODE_BUTT) {
743         return GeneralError::E_INVALID_ARGS;
744     }
745     DBStatus status = DistributedDB::DB_ERROR;
746     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
747     if (delegate_ == nullptr) {
748         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
749               "tableName:%{public}s",
750             devices.size(), devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode,
751             Anonymous::Change(tableName).c_str());
752         return GeneralError::E_ALREADY_CLOSED;
753     }
754     switch (mode) {
755         case CLOUD_INFO:
756             status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_INFO));
757             if (status == DistributedDB::OK) {
758                 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
759                 break;
760             }
761             (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
762             break;
763         case CLOUD_DATA:
764             status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
765             if (status == DistributedDB::OK) {
766                 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
767                 break;
768             }
769             (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
770             break;
771         case NEARBY_DATA:
772             if (devices.empty()) {
773                 status = delegate_->RemoveDeviceData();
774                 break;
775             }
776             for (auto device : devices) {
777                 status = delegate_->RemoveDeviceData(device, tableName);
778             }
779             break;
780         default:
781             return GeneralError::E_ERROR;
782     }
783     return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
784 }
785 
Watch(int32_t origin,Watcher & watcher)786 int32_t RdbGeneralStore::Watch(int32_t origin, Watcher &watcher)
787 {
788     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
789         return GeneralError::E_INVALID_ARGS;
790     }
791 
792     observer_.watcher_ = &watcher;
793     return GeneralError::E_OK;
794 }
795 
Unwatch(int32_t origin,Watcher & watcher)796 int32_t RdbGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
797 {
798     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
799         return GeneralError::E_INVALID_ARGS;
800     }
801 
802     observer_.watcher_ = nullptr;
803     return GeneralError::E_OK;
804 }
805 
GetDBBriefCB(DetailAsync async)806 RdbGeneralStore::DBBriefCB RdbGeneralStore::GetDBBriefCB(DetailAsync async)
807 {
808     if (!async) {
809         return [](auto &) {};
810     }
811     return [async = std::move(async)](
812         const std::map<std::string, std::vector<TableStatus>> &result) {
813         DistributedData::GenDetails details;
814         for (auto &[key, tables] : result) {
815             auto &value = details[key];
816             value.progress = FINISHED;
817             value.code = GeneralError::E_OK;
818             for (auto &table : tables) {
819                 if (table.status != DBStatus::OK) {
820                     value.code = GeneralError::E_ERROR;
821                 }
822             }
823         }
824         async(details);
825     };
826 }
827 
GetDBProcessCB(DetailAsync async,uint32_t syncMode,SyncId syncId,uint32_t highMode)828 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetDBProcessCB(DetailAsync async, uint32_t syncMode, SyncId syncId,
829     uint32_t highMode)
830 {
831     std::shared_lock<std::shared_mutex> lock(asyncMutex_);
832     return [async, autoAsync = async_, highMode, storeInfo = storeInfo_, flag = syncNotifyFlag_, syncMode, syncId,
833         rdbCloud = GetRdbCloud()](const std::map<std::string, SyncProcess> &processes) {
834         DistributedData::GenDetails details;
835         for (auto &[id, process] : processes) {
836             bool isDownload = false;
837             auto &detail = details[id];
838             detail.progress = process.process;
839             detail.code = ConvertStatus(process.errCode);
840             detail.dbCode = process.errCode;
841             uint32_t totalCount = 0;
842             for (auto [key, value] : process.tableProcess) {
843                 auto &table = detail.details[key];
844                 table.upload.total = value.upLoadInfo.total;
845                 table.upload.success = value.upLoadInfo.successCount;
846                 table.upload.failed = value.upLoadInfo.failCount;
847                 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
848                 totalCount += table.upload.total;
849                 isDownload = table.download.total > 0;
850                 table.download.total = value.downLoadInfo.total;
851                 table.download.success = value.downLoadInfo.successCount;
852                 table.download.failed = value.downLoadInfo.failCount;
853                 table.download.untreated = table.download.total - table.download.success - table.download.failed;
854                 detail.changeCount = (process.process == FINISHED)
855                                         ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
856                                               value.downLoadInfo.deleteCount
857                                         : 0;
858                 totalCount += table.download.total;
859             }
860             if (process.process == FINISHED) {
861                 RdbGeneralStore::OnSyncFinish(storeInfo, flag, syncMode, syncId);
862             } else {
863                 RdbGeneralStore::OnSyncStart(storeInfo, flag, syncMode, syncId, totalCount);
864             }
865 
866             if (isDownload && (process.process == FINISHED || process.process == PROCESSING) && rdbCloud != nullptr &&
867                 (rdbCloud->GetLockFlag() & RdbCloud::FLAG::APPLICATION)) {
868                 rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
869             }
870         }
871         if (async) {
872             async(details);
873         }
874 
875         if (highMode == AUTO_SYNC_MODE && autoAsync
876             && (details.empty() || details.begin()->second.code != E_SYNC_TASK_MERGED)) {
877             autoAsync(details);
878         }
879     };
880 }
881 
Release()882 int32_t RdbGeneralStore::Release()
883 {
884     auto ref = 1;
885     {
886         std::lock_guard<decltype(mutex_)> lock(mutex_);
887         if (ref_ == 0) {
888             return 0;
889         }
890         ref = --ref_;
891     }
892     ZLOGD("ref:%{public}d", ref);
893     if (ref == 0) {
894         delete this;
895     }
896     return ref;
897 }
898 
AddRef()899 int32_t RdbGeneralStore::AddRef()
900 {
901     std::lock_guard<decltype(mutex_)> lock(mutex_);
902     if (ref_ == 0) {
903         return 0;
904     }
905     return ++ref_;
906 }
907 
Report(const std::string & faultType,int32_t errCode,const std::string & appendix)908 void RdbGeneralStore::Report(const std::string &faultType, int32_t errCode, const std::string &appendix)
909 {
910     ArkDataFaultMsg msg = { .faultType = faultType,
911         .bundleName = storeInfo_.bundleName,
912         .moduleName = ModuleName::RDB_STORE,
913         .storeName = storeInfo_.storeName,
914         .errorType = errCode + GeneralStore::CLOUD_ERR_OFFSET,
915         .appendixMsg = appendix };
916     Reporter::GetInstance()->CloudSyncFault()->Report(msg);
917 }
918 
SetReference(const std::vector<Reference> & references)919 int32_t RdbGeneralStore::SetReference(const std::vector<Reference> &references)
920 {
921     std::vector<DistributedDB::TableReferenceProperty> properties;
922     for (const auto &reference : references) {
923         properties.push_back({reference.sourceTable, reference.targetTable, reference.refFields});
924     }
925     auto status = delegate_->SetReference(properties);
926     if (status != DistributedDB::DBStatus::OK && status != DistributedDB::DBStatus::PROPERTY_CHANGED) {
927         ZLOGE("distributed table set reference failed, err:%{public}d", status);
928         return GeneralError::E_ERROR;
929     }
930     return GeneralError::E_OK;
931 }
932 
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const std::vector<Reference> & references)933 int32_t RdbGeneralStore::SetDistributedTables(const std::vector<std::string> &tables, int32_t type,
934     const std::vector<Reference> &references)
935 {
936     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
937     if (delegate_ == nullptr) {
938         ZLOGE("Database already closed! database:%{public}s, tables size:%{public}zu, type:%{public}d",
939             Anonymous::Change(storeInfo_.storeName).c_str(), tables.size(), type);
940         return GeneralError::E_ALREADY_CLOSED;
941     }
942     for (const auto &table : tables) {
943         ZLOGD("tableName:%{public}s, type:%{public}d", Anonymous::Change(table).c_str(), type);
944         auto dBStatus = delegate_->CreateDistributedTable(table, static_cast<DistributedDB::TableSyncType>(type));
945         if (dBStatus != DistributedDB::DBStatus::OK) {
946             ZLOGE("create distributed table failed, table:%{public}s, err:%{public}d",
947                 Anonymous::Change(table).c_str(), dBStatus);
948             Report(FT_OPEN_STORE, static_cast<int32_t>(Fault::CSF_GS_CREATE_DISTRIBUTED_TABLE),
949                 "SetDistributedTables: set table(" + Anonymous::Change(table) + ") =" +
950                 std::to_string(static_cast<int32_t>(dBStatus)));
951             return GeneralError::E_ERROR;
952         }
953     }
954     if (type == DistributedTableType::DISTRIBUTED_CLOUD) {
955         auto status = SetReference(references);
956         if (status != GeneralError::E_OK) {
957             Report(FT_OPEN_STORE, static_cast<int32_t>(Fault::CSF_GS_CREATE_DISTRIBUTED_TABLE),
958                 "SetDistributedTables: set reference=" + std::to_string(static_cast<int32_t>(status)));
959             return GeneralError::E_ERROR;
960         }
961     }
962     auto [exist, database] = GetDistributedSchema(observer_.meta_);
963     if (exist && type == DistributedTableType::DISTRIBUTED_DEVICE) {
964         auto force = DeviceSyncAppManager::GetInstance().Check(
965             {observer_.meta_.appId, observer_.meta_.bundleName, database.version});
966         delegate_->SetDistributedSchema(GetGaussDistributedSchema(database), force);
967     }
968     CloudMark metaData(storeInfo_);
969     if (MetaDataManager::GetInstance().LoadMeta(metaData.GetKey(), metaData, true) && metaData.isClearWaterMark) {
970         DistributedDB::ClearMetaDataOption option{ .mode = DistributedDB::ClearMetaDataMode::CLOUD_WATERMARK };
971         auto ret = delegate_->ClearMetaData(option);
972         if (ret != DBStatus::OK) {
973             ZLOGE("clear watermark failed, err:%{public}d", ret);
974             return GeneralError::E_ERROR;
975         }
976         MetaDataManager::GetInstance().DelMeta(metaData.GetKey(), true);
977         auto event = std::make_unique<CloudEvent>(CloudEvent::UPGRADE_SCHEMA, storeInfo_);
978         EventCenter::GetInstance().PostEvent(std::move(event));
979         ZLOGI("clear watermark success, bundleName:%{public}s, storeName:%{public}s", storeInfo_.bundleName.c_str(),
980             Anonymous::Change(storeInfo_.storeName).c_str());
981     }
982     return GeneralError::E_OK;
983 }
984 
SetConfig(const StoreConfig & storeConfig)985 void RdbGeneralStore::SetConfig(const StoreConfig &storeConfig)
986 {
987     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
988      if (delegate_ == nullptr) {
989         ZLOGE("database already closed!, tableMode is :%{public}d",
990               storeConfig.tableMode.has_value() ? static_cast<int32_t>(storeConfig.tableMode.value()) : -1);
991         return;
992     }
993     if (storeConfig.tableMode.has_value()) {
994         RelationalStoreDelegate::StoreConfig config;
995         if (storeConfig.tableMode == DistributedTableMode::COLLABORATION) {
996             config.tableMode = DistributedDB::DistributedTableMode::COLLABORATION;
997         } else if (storeConfig.tableMode == DistributedTableMode::SPLIT_BY_DEVICE) {
998             config.tableMode = DistributedDB::DistributedTableMode::SPLIT_BY_DEVICE;
999         }
1000         delegate_->SetStoreConfig(config);
1001     }
1002 }
1003 
SetTrackerTable(const std::string & tableName,const std::set<std::string> & trackerColNames,const std::set<std::string> & extendColNames,bool isForceUpgrade)1004 int32_t RdbGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
1005     const std::set<std::string> &extendColNames, bool isForceUpgrade)
1006 {
1007     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1008     if (delegate_ == nullptr) {
1009         ZLOGE("database already closed! database:%{public}s, tables name:%{public}s",
1010             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
1011         return GeneralError::E_ALREADY_CLOSED;
1012     }
1013     auto status = delegate_->SetTrackerTable({ tableName, extendColNames, trackerColNames, isForceUpgrade });
1014     if (status == DBStatus::WITH_INVENTORY_DATA) {
1015         ZLOGI("Set tracker table with inventory data, database:%{public}s, tables name:%{public}s",
1016             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
1017         return GeneralError::E_WITH_INVENTORY_DATA;
1018     }
1019     if (status != DBStatus::OK) {
1020         ZLOGE("Set tracker table failed! ret:%{public}d, database:%{public}s, tables name:%{public}s",
1021             status, Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
1022         return GeneralError::E_ERROR;
1023     }
1024     return GeneralError::E_OK;
1025 }
1026 
RemoteQuery(const std::string & device,const DistributedDB::RemoteCondition & remoteCondition)1027 std::shared_ptr<Cursor> RdbGeneralStore::RemoteQuery(const std::string &device,
1028     const DistributedDB::RemoteCondition &remoteCondition)
1029 {
1030     std::shared_ptr<DistributedDB::ResultSet> dbResultSet;
1031     DistributedDB::DBStatus status =
1032         delegate_->RemoteQuery(device, remoteCondition, REMOTE_QUERY_TIME_OUT, dbResultSet);
1033     if (status != DistributedDB::DBStatus::OK) {
1034         ZLOGE("DistributedDB remote query failed, device:%{public}s, status is  %{public}d.",
1035             Anonymous::Change(device).c_str(), status);
1036         return nullptr;
1037     }
1038     return std::make_shared<RdbCursor>(dbResultSet);
1039 }
1040 
ConvertStatus(DistributedDB::DBStatus status)1041 RdbGeneralStore::GenErr RdbGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
1042 {
1043     switch (status) {
1044         case DBStatus::OK:
1045             return GenErr::E_OK;
1046         case DBStatus::CLOUD_NETWORK_ERROR:
1047             return GenErr::E_NETWORK_ERROR;
1048         case DBStatus::CLOUD_LOCK_ERROR:
1049             return GenErr::E_LOCKED_BY_OTHERS;
1050         case DBStatus::CLOUD_FULL_RECORDS:
1051             return GenErr::E_RECODE_LIMIT_EXCEEDED;
1052         case DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT:
1053             return GenErr::E_NO_SPACE_FOR_ASSET;
1054         case DBStatus::BUSY:
1055             return GenErr::E_BUSY;
1056         case DBStatus::CLOUD_SYNC_TASK_MERGED:
1057             return GenErr::E_SYNC_TASK_MERGED;
1058         case DBStatus::CLOUD_DISABLED:
1059             return GeneralError::E_CLOUD_DISABLED;
1060         default:
1061             ZLOGI("status:0x%{public}x", status);
1062             break;
1063     }
1064     return GenErr::E_ERROR;
1065 }
1066 
IsValid()1067 bool RdbGeneralStore::IsValid()
1068 {
1069     return delegate_ != nullptr;
1070 }
1071 
RegisterDetailProgressObserver(GeneralStore::DetailAsync async)1072 int32_t RdbGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
1073 {
1074     std::unique_lock<std::shared_mutex> lock(asyncMutex_);
1075     async_ = std::move(async);
1076     return GenErr::E_OK;
1077 }
1078 
UnregisterDetailProgressObserver()1079 int32_t RdbGeneralStore::UnregisterDetailProgressObserver()
1080 {
1081     std::unique_lock<std::shared_mutex> lock(asyncMutex_);
1082     async_ = nullptr;
1083     return GenErr::E_OK;
1084 }
1085 
QuerySql(const std::string & sql,Values && args)1086 std::pair<int32_t, VBuckets> RdbGeneralStore::QuerySql(const std::string &sql, Values &&args)
1087 {
1088     std::vector<DistributedDB::VBucket> changedData;
1089     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
1090     auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), true }, changedData);
1091     if (status != DBStatus::OK) {
1092         ZLOGE("Query failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status,
1093             Anonymous::Change(sql).c_str(), changedData.size());
1094         if (status == DBStatus::BUSY) {
1095             return { GenErr::E_BUSY, {} };
1096         }
1097         return { GenErr::E_ERROR, {} };
1098     }
1099     return { GenErr::E_OK, ValueProxy::Convert(std::move(changedData)) };
1100 }
1101 
OnSyncStart(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId,uint32_t syncCount)1102 void RdbGeneralStore::OnSyncStart(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId,
1103     uint32_t syncCount)
1104 {
1105     uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
1106     if (requiredFlag != (requiredFlag & flag)) {
1107         return;
1108     }
1109     StoreInfo info = storeInfo;
1110     auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::START,
1111         traceId, syncCount);
1112     EventCenter::GetInstance().PostEvent(std::move(evt));
1113 }
1114 
OnSyncFinish(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId)1115 void RdbGeneralStore::OnSyncFinish(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId)
1116 {
1117     uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
1118     if (requiredFlag != (requiredFlag & flag)) {
1119         return;
1120     }
1121     StoreInfo info = storeInfo;
1122     auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::FINISH,
1123         traceId);
1124     EventCenter::GetInstance().PostEvent(std::move(evt));
1125 }
1126 
GetTables()1127 std::set<std::string> RdbGeneralStore::GetTables()
1128 {
1129     std::set<std::string> tables;
1130     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1131     if (delegate_ == nullptr) {
1132         ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1133         return tables;
1134     }
1135     auto [errCode, res] = QuerySql(QUERY_TABLES_SQL, {});
1136     if (errCode != GenErr::E_OK) {
1137         return tables;
1138     }
1139     for (auto &table : res) {
1140         auto it = table.find("name");
1141         if (it == table.end() || TYPE_INDEX<std::string> != it->second.index()) {
1142             ZLOGW("error res! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1143             continue;
1144         }
1145         tables.emplace(std::move(*std::get_if<std::string>(&(it->second))));
1146     }
1147     return tables;
1148 }
1149 
GetIntersection(std::vector<std::string> && syncTables,const std::set<std::string> & localTables)1150 std::vector<std::string> RdbGeneralStore::GetIntersection(std::vector<std::string> &&syncTables,
1151     const std::set<std::string> &localTables)
1152 {
1153     std::vector<std::string> res;
1154     for (auto &it : syncTables) {
1155         if (localTables.count(it) &&
1156             localTables.count(RelationalStoreManager::GetDistributedLogTableName(it))) {
1157             res.push_back(std::move(it));
1158         }
1159     }
1160     return res;
1161 }
1162 
PostDataChange(const StoreMetaData & meta,const std::vector<std::string> & tables,ChangeType type)1163 void RdbGeneralStore::ObserverProxy::PostDataChange(const StoreMetaData &meta,
1164     const std::vector<std::string> &tables, ChangeType type)
1165 {
1166     RemoteChangeEvent::DataInfo info;
1167     info.userId = meta.user;
1168     info.storeId = meta.storeId;
1169     info.deviceId = meta.deviceId;
1170     info.bundleName = meta.bundleName;
1171     info.tables = tables;
1172     info.changeType = type;
1173     auto evt = std::make_unique<RemoteChangeEvent>(RemoteChangeEvent::DATA_CHANGE, std::move(info));
1174     EventCenter::GetInstance().PostEvent(std::move(evt));
1175 }
1176 
OnChange(const DBChangedIF & data)1177 void RdbGeneralStore::ObserverProxy::OnChange(const DBChangedIF &data)
1178 {
1179     if (!HasWatcher()) {
1180         return;
1181     }
1182     std::string device = data.GetDataChangeDevice();
1183     auto networkId = DmAdapter::GetInstance().ToNetworkID(device);
1184     ZLOGD("store:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1185         Anonymous::Change(device).c_str());
1186     GenOrigin genOrigin;
1187     genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
1188     genOrigin.dataType = GenOrigin::BASIC_DATA;
1189     DistributedDB::StoreProperty property;
1190     data.GetStoreProperty(property);
1191     genOrigin.id.push_back(networkId);
1192     genOrigin.store = storeId_;
1193     GeneralWatcher::ChangeInfo changeInfo{};
1194     watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
1195     return;
1196 }
1197 
OnChange(DBOrigin origin,const std::string & originalId,DBChangedData && data)1198 void RdbGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangedData &&data)
1199 {
1200     if (!HasWatcher()) {
1201         return;
1202     }
1203     ZLOGD("store:%{public}s table:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1204         Anonymous::Change(data.tableName).c_str(), Anonymous::Change(originalId).c_str());
1205     GenOrigin genOrigin;
1206     genOrigin.origin = (origin == DBOrigin::ORIGIN_LOCAL)
1207                            ? GenOrigin::ORIGIN_LOCAL
1208                            : (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
1209     genOrigin.dataType = data.type == DistributedDB::ASSET ? GenOrigin::ASSET_DATA : GenOrigin::BASIC_DATA;
1210     genOrigin.id.push_back(originalId);
1211     genOrigin.store = storeId_;
1212     Watcher::PRIFields fields;
1213     Watcher::ChangeInfo changeInfo;
1214     bool notifyFlag = false;
1215     for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
1216         auto &info = changeInfo[data.tableName][i];
1217         for (auto &priData : data.primaryData[i]) {
1218             Watcher::PRIValue value;
1219             if (priData.empty()) {
1220                 ZLOGW("priData is empty, store:%{public}s table:%{public}s data change from :%{public}s, i=%{public}d",
1221                     Anonymous::Change(storeId_).c_str(), Anonymous::Change(data.tableName).c_str(),
1222                     Anonymous::Change(originalId).c_str(), i);
1223                 continue;
1224             }
1225             Convert(std::move(*(priData.begin())), value);
1226             if (notifyFlag || origin != DBOrigin::ORIGIN_CLOUD || i != DistributedDB::OP_DELETE) {
1227                 info.push_back(std::move(value));
1228                 continue;
1229             }
1230             auto key = std::get_if<std::string>(&value);
1231             if (key != nullptr && (*key == LOGOUT_DELETE_FLAG || *key == LOGOUT_RESERVE_FLAG)) {
1232                 // notify to start app
1233                 notifyFlag = true;
1234             }
1235             info.push_back(std::move(value));
1236         }
1237     }
1238     if (notifyFlag) {
1239         ZLOGI("post data change for cleaning cloud data. store:%{public}s table:%{public}s data change from "
1240               ":%{public}s",
1241             Anonymous::Change(storeId_).c_str(), Anonymous::Change(data.tableName).c_str(),
1242             Anonymous::Change(originalId).c_str());
1243         PostDataChange(meta_, {}, CLOUD_LOGOUT);
1244     }
1245     if (!data.field.empty()) {
1246         fields[std::move(data.tableName)] = std::move(*(data.field.begin()));
1247     }
1248     watcher_->OnChange(genOrigin, fields, std::move(changeInfo));
1249 }
1250 
LockCloudDB()1251 std::pair<int32_t, uint32_t> RdbGeneralStore::LockCloudDB()
1252 {
1253     auto rdbCloud = GetRdbCloud();
1254     if (rdbCloud == nullptr) {
1255         return { GeneralError::E_ERROR, 0 };
1256     }
1257     return rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
1258 }
1259 
UnLockCloudDB()1260 int32_t RdbGeneralStore::UnLockCloudDB()
1261 {
1262     auto rdbCloud = GetRdbCloud();
1263     if (rdbCloud == nullptr) {
1264         return GeneralError::E_ERROR;
1265     }
1266     return rdbCloud->UnLockCloudDB(RdbCloud::FLAG::APPLICATION);
1267 }
1268 
GetRdbCloud() const1269 std::shared_ptr<RdbCloud> RdbGeneralStore::GetRdbCloud() const
1270 {
1271     std::shared_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
1272     return rdbCloud_;
1273 }
1274 
IsFinished(SyncId syncId) const1275 bool RdbGeneralStore::IsFinished(SyncId syncId) const
1276 {
1277     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1278     if (delegate_ == nullptr) {
1279         ZLOGE("database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1280         return true;
1281     }
1282     return delegate_->GetCloudTaskStatus(syncId).process == DistributedDB::FINISHED;
1283 }
1284 
GetFinishTask(SyncId syncId)1285 Executor::Task RdbGeneralStore::GetFinishTask(SyncId syncId)
1286 {
1287     return [this, executor = executor_, task = tasks_, syncId]() {
1288         auto [exist, finishTask] = task->Find(syncId);
1289         if (!exist || finishTask.cb == nullptr) {
1290             task->Erase(syncId);
1291             return;
1292         }
1293         if (!IsFinished(syncId)) {
1294             task->ComputeIfPresent(syncId, [executor = executor_, this](SyncId syncId, FinishTask &task) {
1295                 task.taskId = executor->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
1296                 return true;
1297             });
1298             return;
1299         }
1300         DBProcessCB cb;
1301         task->ComputeIfPresent(syncId, [&cb, executor = executor_](SyncId syncId, const FinishTask &task) {
1302             cb = task.cb;
1303             return false;
1304         });
1305         if (cb != nullptr) {
1306             ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ",
1307                   Anonymous::Change(storeInfo_.storeName).c_str(), syncId);
1308             std::map<std::string, SyncProcess> result;
1309             result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1310             cb(result);
1311         }
1312     };
1313 }
1314 
SetExecutor(std::shared_ptr<Executor> executor)1315 void RdbGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
1316 {
1317     if (executor_ == nullptr) {
1318         executor_ = executor;
1319     }
1320 }
1321 
RemoveTasks()1322 void RdbGeneralStore::RemoveTasks()
1323 {
1324     if (tasks_ == nullptr) {
1325         return;
1326     }
1327     std::list<DBProcessCB> cbs;
1328     std::list<TaskId> taskIds;
1329     tasks_->EraseIf([&cbs, &taskIds, store = storeInfo_.storeName](SyncId syncId, const FinishTask &task) {
1330         if (task.cb != nullptr) {
1331             ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ", Anonymous::Change(store).c_str(),
1332                   syncId);
1333         }
1334         cbs.push_back(std::move(task.cb));
1335         taskIds.push_back(task.taskId);
1336         return true;
1337     });
1338     auto func = [](const std::list<DBProcessCB> &cbs) {
1339         std::map<std::string, SyncProcess> result;
1340         result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1341         for (auto &cb : cbs) {
1342             if (cb != nullptr) {
1343                 cb(result);
1344             }
1345         }
1346     };
1347     if (executor_ != nullptr) {
1348         for (auto taskId: taskIds) {
1349             executor_->Remove(taskId, true);
1350         }
1351         executor_->Execute([cbs, func]() {
1352             func(cbs);
1353         });
1354     } else {
1355         func(cbs);
1356     }
1357 }
1358 
GetCB(SyncId syncId)1359 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetCB(SyncId syncId)
1360 {
1361     return [task = tasks_, executor = executor_, syncId](const std::map<std::string, SyncProcess> &progress) {
1362         if (task == nullptr) {
1363             return;
1364         }
1365         DBProcessCB cb;
1366         task->ComputeIfPresent(syncId, [&cb, &progress, executor](SyncId syncId, FinishTask &finishTask) {
1367             cb = finishTask.cb;
1368             bool isFinished = !progress.empty() && progress.begin()->second.process == DistributedDB::FINISHED;
1369             if (isFinished) {
1370                 finishTask.cb = nullptr;
1371             }
1372             return true;
1373         });
1374         if (cb != nullptr) {
1375             cb(progress);
1376         }
1377         return;
1378     };
1379 }
1380 
UpdateDBStatus()1381 int32_t RdbGeneralStore::UpdateDBStatus()
1382 {
1383     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1384     if (delegate_ == nullptr) {
1385         ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1386         return GeneralError::E_ALREADY_CLOSED;
1387     }
1388     return delegate_->OperateDataStatus(static_cast<uint32_t>(DataOperator::UPDATE_TIME));
1389 }
1390 } // namespace OHOS::DistributedRdb