• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RdbGeneralStore"
16 
17 #include "rdb_general_store.h"
18 
19 #include <chrono>
20 #include <cinttypes>
21 
22 #include "cache_cursor.h"
23 #include "changeevent/remote_change_event.h"
24 #include "cloud/asset_loader.h"
25 #include "cloud/cloud_db.h"
26 #include "cloud/cloud_lock_event.h"
27 #include "cloud/cloud_store_types.h"
28 #include "cloud/schema_meta.h"
29 #include "cloud_service.h"
30 #include "commonevent/data_sync_event.h"
31 #include "communicator/device_manager_adapter.h"
32 #include "crypto_manager.h"
33 #include "dfx_types.h"
34 #include "device_manager_adapter.h"
35 #include "eventcenter/event_center.h"
36 #include "log_print.h"
37 #include "metadata/meta_data_manager.h"
38 #include "metadata/secret_key_meta_data.h"
39 #include "rdb_cursor.h"
40 #include "rdb_helper.h"
41 #include "rdb_query.h"
42 #include "rdb_result_set_impl.h"
43 #include "relational_store_manager.h"
44 #include "reporter.h"
45 #include "snapshot/bind_event.h"
46 #include "utils/anonymous.h"
47 #include "value_proxy.h"
48 namespace OHOS::DistributedRdb {
49 using namespace DistributedData;
50 using namespace DistributedDB;
51 using namespace NativeRdb;
52 using namespace std::chrono;
53 using namespace DistributedDataDfx;
54 using DBField = DistributedDB::Field;
55 using DBTable = DistributedDB::TableSchema;
56 using DBSchema = DistributedDB::DataBaseSchema;
57 using ClearMode = DistributedDB::ClearMode;
58 using DBStatus = DistributedDB::DBStatus;
59 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
60 
61 constexpr const char *INSERT = "INSERT INTO ";
62 constexpr const char *REPLACE = "REPLACE INTO ";
63 constexpr const char *VALUES = " VALUES ";
64 constexpr const char *LOGOUT_DELETE_FLAG = "DELETE#ALL_CLOUDDATA";
65 constexpr const LockAction LOCK_ACTION =
66     static_cast<LockAction>(static_cast<uint32_t>(LockAction::INSERT) | static_cast<uint32_t>(LockAction::UPDATE) |
67                             static_cast<uint32_t>(LockAction::DELETE) | static_cast<uint32_t>(LockAction::DOWNLOAD));
68 constexpr uint32_t CLOUD_SYNC_FLAG = 1;
69 constexpr uint32_t SEARCHABLE_FLAG = 2;
70 constexpr uint32_t LOCK_TIMEOUT = 3600; // second
71 static constexpr const char *FT_OPEN_STORE = "OPEN_STORE";
72 static constexpr const char *FT_CLOUD_SYNC = "CLOUD_SYNC";
73 
GetDBSchema(const Database & database)74 static DBSchema GetDBSchema(const Database &database)
75 {
76     DBSchema schema;
77     schema.tables.resize(database.tables.size());
78     for (size_t i = 0; i < database.tables.size(); i++) {
79         const Table &table = database.tables[i];
80         DBTable &dbTable = schema.tables[i];
81         dbTable.name = table.name;
82         dbTable.sharedTableName = table.sharedTableName;
83         for (auto &field : table.fields) {
84             DBField dbField;
85             dbField.colName = field.colName;
86             dbField.type = field.type;
87             dbField.primary = field.primary;
88             dbField.nullable = field.nullable;
89             dbTable.fields.push_back(std::move(dbField));
90         }
91     }
92     return schema;
93 }
94 
IsExistence(const std::string & col,const std::vector<std::string> & cols)95 static bool IsExistence(const std::string &col, const std::vector<std::string> &cols)
96 {
97     for (auto &column : cols) {
98         if (col == column) {
99             return true;
100         }
101     }
102     return false;
103 }
104 
GetGaussDistributedSchema(const Database & database)105 static DistributedSchema GetGaussDistributedSchema(const Database &database)
106 {
107     DistributedSchema distributedSchema;
108     distributedSchema.version = database.version;
109     distributedSchema.tables.resize(database.tables.size());
110     for (size_t i = 0; i < database.tables.size(); i++) {
111         const Table &table = database.tables[i];
112         DistributedTable &dbTable = distributedSchema.tables[i];
113         dbTable.tableName = table.name;
114         for (auto &field : table.fields) {
115             DistributedField dbField;
116             dbField.colName = field.colName;
117             dbField.isP2pSync = IsExistence(field.colName, table.deviceSyncFields);
118             dbField.isSpecified = field.primary;
119             dbTable.fields.push_back(std::move(dbField));
120         }
121     }
122     return distributedSchema;
123 }
124 
GetDistributedSchema(const StoreMetaData & meta)125 static std::pair<bool, Database> GetDistributedSchema(const StoreMetaData &meta)
126 {
127     std::pair<bool, Database> tableData;
128     Database database;
129     database.bundleName = meta.bundleName;
130     database.name = meta.storeId;
131     database.user = meta.user;
132     database.deviceId = meta.deviceId;
133     tableData.first = MetaDataManager::GetInstance().LoadMeta(database.GetKey(), database, true);
134     tableData.second = database;
135     return tableData;
136 }
137 
InitStoreInfo(const StoreMetaData & meta)138 void RdbGeneralStore::InitStoreInfo(const StoreMetaData &meta)
139 {
140     storeInfo_.tokenId = meta.tokenId;
141     storeInfo_.bundleName = meta.bundleName;
142     storeInfo_.storeName = meta.storeId;
143     storeInfo_.instanceId = meta.instanceId;
144     storeInfo_.user = std::atoi(meta.user.c_str());
145     storeInfo_.deviceId = DeviceManagerAdapter::GetInstance().GetLocalDevice().uuid;
146 }
147 
GetOption(const StoreMetaData & meta)148 RelationalStoreDelegate::Option GetOption(const StoreMetaData &meta)
149 {
150     RelationalStoreDelegate::Option option;
151     option.syncDualTupleMode = true;
152     if (GetDistributedSchema(meta).first) {
153         option.tableMode = DistributedTableMode::COLLABORATION;
154     }
155     return option;
156 }
157 
RdbGeneralStore(const StoreMetaData & meta)158 RdbGeneralStore::RdbGeneralStore(const StoreMetaData &meta)
159     : manager_(meta.appId, meta.user, meta.instanceId), tasks_(std::make_shared<ConcurrentMap<SyncId, FinishTask>>())
160 {
161     observer_.storeId_ = meta.storeId;
162     observer_.meta_ = meta;
163     RelationalStoreDelegate::Option option = GetOption(meta);
164     option.observer = &observer_;
165     if (meta.isEncrypt) {
166         std::string key = meta.GetSecretKey();
167         SecretKeyMetaData secretKeyMeta;
168         MetaDataManager::GetInstance().LoadMeta(key, secretKeyMeta, true);
169         std::vector<uint8_t> decryptKey;
170         CryptoManager::GetInstance().Decrypt(meta, secretKeyMeta, decryptKey);
171         option.passwd.SetValue(decryptKey.data(), decryptKey.size());
172         std::fill(decryptKey.begin(), decryptKey.end(), 0);
173         option.isEncryptedDb = meta.isEncrypt;
174         option.cipher = CipherType::AES_256_GCM;
175         for (uint32_t i = 0; i < ITERS_COUNT; ++i) {
176             option.iterateTimes = ITERS[i];
177             auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
178             if (ret == DBStatus::OK && delegate_ != nullptr) {
179                 break;
180             }
181             manager_.CloseStore(delegate_);
182             delegate_ = nullptr;
183         }
184     } else {
185         auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_);
186         if (ret != DBStatus::OK || delegate_ == nullptr) {
187             manager_.CloseStore(delegate_);
188             delegate_ = nullptr;
189         }
190     }
191     InitStoreInfo(meta);
192     if (meta.isSearchable) {
193         syncNotifyFlag_ |= SEARCHABLE_FLAG;
194     }
195     if (delegate_ != nullptr && meta.isManualClean) {
196         PragmaData data = static_cast<PragmaData>(const_cast<void *>(static_cast<const void *>(&meta.isManualClean)));
197         delegate_->Pragma(PragmaCmd::LOGIC_DELETE_SYNC_DATA, data);
198     }
199     ZLOGI("Get rdb store, tokenId:%{public}u, bundleName:%{public}s, storeName:%{public}s, user:%{public}s,"
200           "isEncrypt:%{public}d, isManualClean:%{public}d, isSearchable:%{public}d",
201           meta.tokenId, meta.bundleName.c_str(), Anonymous::Change(meta.storeId).c_str(), meta.user.c_str(),
202           meta.isEncrypt, meta.isManualClean, meta.isSearchable);
203 }
204 
~RdbGeneralStore()205 RdbGeneralStore::~RdbGeneralStore()
206 {
207     ZLOGI("Destruct. BundleName: %{public}s. StoreName: %{public}s. user: %{public}d",
208         storeInfo_.bundleName.c_str(), Anonymous::Change(storeInfo_.storeName).c_str(), storeInfo_.user);
209     manager_.CloseStore(delegate_);
210     delegate_ = nullptr;
211     bindInfo_.loader_ = nullptr;
212     if (bindInfo_.db_ != nullptr) {
213         bindInfo_.db_->Close();
214     }
215     bindInfo_.db_ = nullptr;
216     rdbCloud_ = nullptr;
217     rdbLoader_ = nullptr;
218     RemoveTasks();
219     tasks_ = nullptr;
220     executor_ = nullptr;
221 }
222 
BindSnapshots(std::shared_ptr<std::map<std::string,std::shared_ptr<Snapshot>>> bindAssets)223 int32_t RdbGeneralStore::BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets)
224 {
225     if (snapshots_.bindAssets == nullptr) {
226         snapshots_.bindAssets = bindAssets;
227     }
228     return GenErr::E_OK;
229 }
230 
Bind(const Database & database,const std::map<uint32_t,BindInfo> & bindInfos,const CloudConfig & config)231 int32_t RdbGeneralStore::Bind(const Database &database, const std::map<uint32_t, BindInfo> &bindInfos,
232     const CloudConfig &config)
233 {
234     if (bindInfos.empty()) {
235         return GeneralError::E_OK;
236     }
237     auto bindInfo = bindInfos.begin()->second;
238     if (bindInfo.db_ == nullptr || bindInfo.loader_ == nullptr) {
239         return GeneralError::E_INVALID_ARGS;
240     }
241 
242     if (isBound_.exchange(true)) {
243         return GeneralError::E_OK;
244     }
245 
246     BindEvent::BindEventInfo eventInfo;
247     eventInfo.tokenId = storeInfo_.tokenId;
248     eventInfo.bundleName = storeInfo_.bundleName;
249     eventInfo.storeName = storeInfo_.storeName;
250     eventInfo.user = storeInfo_.user;
251     eventInfo.instanceId = storeInfo_.instanceId;
252 
253     auto evt = std::make_unique<BindEvent>(BindEvent::BIND_SNAPSHOT, std::move(eventInfo));
254     EventCenter::GetInstance().PostEvent(std::move(evt));
255     bindInfo_ = std::move(bindInfo);
256     {
257         std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
258         rdbCloud_ = std::make_shared<RdbCloud>(bindInfo_.db_, &snapshots_);
259         rdbLoader_ = std::make_shared<RdbAssetLoader>(bindInfo_.loader_, &snapshots_);
260     }
261 
262     DistributedDB::CloudSyncConfig dbConfig;
263     dbConfig.maxUploadCount = config.maxNumber;
264     dbConfig.maxUploadSize = config.maxSize;
265     dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes;
266     DBSchema schema = GetDBSchema(database);
267     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
268     if (delegate_ == nullptr) {
269         ZLOGE("database:%{public}s already closed!", Anonymous::Change(database.name).c_str());
270         return GeneralError::E_ALREADY_CLOSED;
271     }
272     delegate_->SetCloudDB(rdbCloud_);
273     delegate_->SetIAssetLoader(rdbLoader_);
274     delegate_->SetCloudDbSchema(std::move(schema));
275     delegate_->SetCloudSyncConfig(dbConfig);
276 
277     syncNotifyFlag_ |= CLOUD_SYNC_FLAG;
278     return GeneralError::E_OK;
279 }
280 
IsBound(uint32_t user)281 bool RdbGeneralStore::IsBound(uint32_t user)
282 {
283     return isBound_;
284 }
285 
Close(bool isForce)286 int32_t RdbGeneralStore::Close(bool isForce)
287 {
288     {
289         std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0));
290         if (!lock) {
291             return GeneralError::E_BUSY;
292         }
293 
294         if (delegate_ == nullptr) {
295             return GeneralError::E_OK;
296         }
297         auto [dbStatus, downloadCount] = delegate_->GetDownloadingAssetsCount();
298         if (!isForce && (delegate_->GetCloudSyncTaskCount() > 0 || downloadCount > 0)) {
299             return GeneralError::E_BUSY;
300         }
301         auto status = manager_.CloseStore(delegate_);
302         if (status != DBStatus::OK) {
303             return status;
304         }
305         delegate_ = nullptr;
306     }
307     RemoveTasks();
308     bindInfo_.loader_ = nullptr;
309     if (bindInfo_.db_ != nullptr) {
310         bindInfo_.db_->Close();
311     }
312     bindInfo_.db_ = nullptr;
313     {
314         std::unique_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
315         rdbCloud_ = nullptr;
316         rdbLoader_ = nullptr;
317     }
318     return GeneralError::E_OK;
319 }
320 
Execute(const std::string & table,const std::string & sql)321 int32_t RdbGeneralStore::Execute(const std::string &table, const std::string &sql)
322 {
323     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
324     if (delegate_ == nullptr) {
325         ZLOGE("Database already closed! database:%{public}s, table:%{public}s, sql:%{public}s",
326             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str(),
327             Anonymous::Change(sql).c_str());
328         return GeneralError::E_ERROR;
329     }
330     std::vector<DistributedDB::VBucket> changedData;
331     auto status = delegate_->ExecuteSql({ sql, {}, false }, changedData);
332     if (status != DBStatus::OK) {
333         ZLOGE("Execute failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status,
334               Anonymous::Change(sql).c_str(), changedData.size());
335         if (status == DBStatus::BUSY) {
336             return GeneralError::E_BUSY;
337         }
338         return GeneralError::E_ERROR;
339     }
340     return GeneralError::E_OK;
341 }
342 
SqlConcatenate(VBucket & value,std::string & strColumnSql,std::string & strRowValueSql)343 size_t RdbGeneralStore::SqlConcatenate(VBucket &value, std::string &strColumnSql, std::string &strRowValueSql)
344 {
345     strColumnSql += " (";
346     strRowValueSql += " (";
347     auto columnSize = value.size();
348     for (auto column = value.begin(); column != value.end(); ++column) {
349         strRowValueSql += " ?,";
350         strColumnSql += " " + column->first + ",";
351     }
352     if (columnSize != 0) {
353         strColumnSql.pop_back();
354         strColumnSql += ")";
355         strRowValueSql.pop_back();
356         strRowValueSql += ")";
357     }
358     return columnSize;
359 }
360 
Insert(const std::string & table,VBuckets && values)361 int32_t RdbGeneralStore::Insert(const std::string &table, VBuckets &&values)
362 {
363     if (table.empty() || values.size() == 0) {
364         ZLOGE("Insert:table maybe empty:%{public}d,value size is:%{public}zu", table.empty(), values.size());
365         return GeneralError::E_INVALID_ARGS;
366     }
367 
368     std::string strColumnSql;
369     std::string strRowValueSql;
370     auto value = values.front();
371     size_t columnSize = SqlConcatenate(value, strColumnSql, strRowValueSql);
372     if (columnSize == 0) {
373         ZLOGE("Insert: columnSize error, can't be 0!");
374         return GeneralError::E_INVALID_ARGS;
375     }
376 
377     Values args;
378     std::string strValueSql;
379     for (auto &rowData : values) {
380         if (rowData.size() != columnSize) {
381             ZLOGE("Insert: VBucket size error, Each VBucket in values must be of the same length!");
382             return GeneralError::E_INVALID_ARGS;
383         }
384         for (auto column = rowData.begin(); column != rowData.end(); ++column) {
385             args.push_back(std::move(column->second));
386         }
387         strValueSql += strRowValueSql + ",";
388     }
389     strValueSql.pop_back();
390     std::string sql = INSERT + table + strColumnSql + VALUES + strValueSql;
391 
392     std::vector<DistributedDB::VBucket> changedData;
393     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
394     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
395     if (delegate_ == nullptr) {
396         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
397             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
398         return GeneralError::E_ERROR;
399     }
400     auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), false }, changedData);
401     if (status != DBStatus::OK) {
402         if (IsPrintLog(status)) {
403             auto time =
404                 static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
405                 ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu times %{public}" PRIu64 ".",
406                 status, Anonymous::Change(sql).c_str(), changedData.size(), time);
407         }
408         return GeneralError::E_ERROR;
409     }
410     return GeneralError::E_OK;
411 }
412 
IsPrintLog(DBStatus status)413 bool RdbGeneralStore::IsPrintLog(DBStatus status)
414 {
415     bool isPrint = false;
416     if (status == lastError_) {
417         if (++lastErrCnt_ % PRINT_ERROR_CNT == 0) {
418             isPrint = true;
419         }
420     } else {
421         isPrint = true;
422         lastErrCnt_ = 0;
423         lastError_ = status;
424     }
425     return isPrint;
426 }
427 
428 /**
429  * This function does not support batch updates in rdb, it only supports single data updates.
430  */
Update(const std::string & table,const std::string & setSql,Values && values,const std::string & whereSql,Values && conditions)431 int32_t RdbGeneralStore::Update(const std::string &table, const std::string &setSql, Values &&values,
432     const std::string &whereSql, Values &&conditions)
433 {
434     if (table.empty()) {
435         ZLOGE("Update: table can't be empty!");
436         return GeneralError::E_INVALID_ARGS;
437     }
438     if (setSql.empty() || values.size() == 0) {
439         ZLOGE("Update: setSql and values can't be empty!");
440         return GeneralError::E_INVALID_ARGS;
441     }
442     if (whereSql.empty() || conditions.size() == 0) {
443         ZLOGE("Update: whereSql and conditions can't be empty!");
444         return GeneralError::E_INVALID_ARGS;
445     }
446 
447     std::string sqlIn = " UPDATE " + table + " SET " + setSql + " WHERE " + whereSql;
448     Values args;
449     for (auto &value : values) {
450         args.push_back(std::move(value));
451     }
452     for (auto &condition : conditions) {
453         args.push_back(std::move(condition));
454     }
455 
456     std::vector<DistributedDB::VBucket> changedData;
457     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
458     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
459     if (delegate_ == nullptr) {
460         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
461             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
462         return GeneralError::E_ERROR;
463     }
464     auto status = delegate_->ExecuteSql({ sqlIn, std::move(bindArgs), false }, changedData);
465     if (status != DBStatus::OK) {
466         ZLOGE("Failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status, Anonymous::Change(sqlIn).c_str(),
467               changedData.size());
468         return GeneralError::E_ERROR;
469     }
470     return GeneralError::E_OK;
471 }
472 
Replace(const std::string & table,VBucket && value)473 int32_t RdbGeneralStore::Replace(const std::string &table, VBucket &&value)
474 {
475     if (table.empty() || value.size() == 0) {
476         return GeneralError::E_INVALID_ARGS;
477     }
478     std::string columnSql;
479     std::string valueSql;
480     SqlConcatenate(value, columnSql, valueSql);
481     std::string sql = REPLACE + table + columnSql + VALUES + valueSql;
482 
483     Values args;
484     for (auto &[k, v] : value) {
485         args.emplace_back(std::move(v));
486     }
487     std::vector<DistributedDB::VBucket> changedData;
488     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
489     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
490     if (delegate_ == nullptr) {
491         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
492             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
493         return GeneralError::E_ERROR;
494     }
495     auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs) }, changedData);
496     if (status != DBStatus::OK) {
497         ZLOGE("Replace failed! ret:%{public}d, table:%{public}s, sql:%{public}s, fields:%{public}s",
498             status, Anonymous::Change(table).c_str(), Anonymous::Change(sql).c_str(), columnSql.c_str());
499         if (status == DBStatus::BUSY) {
500             return GeneralError::E_BUSY;
501         }
502         return GeneralError::E_ERROR;
503     }
504     return GeneralError::E_OK;
505 }
506 
Delete(const std::string & table,const std::string & sql,Values && args)507 int32_t RdbGeneralStore::Delete(const std::string &table, const std::string &sql, Values &&args)
508 {
509     return 0;
510 }
511 
Query(const std::string & table,const std::string & sql,Values && args)512 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(__attribute__((unused))const std::string &table,
513     const std::string &sql, Values &&args)
514 {
515     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
516     if (delegate_ == nullptr) {
517         ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
518         return { GeneralError::E_ALREADY_CLOSED, nullptr };
519     }
520     auto [errCode, records] = QuerySql(sql, std::move(args));
521     return { errCode, std::make_shared<CacheCursor>(std::move(records)) };
522 }
523 
Query(const std::string & table,GenQuery & query)524 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::Query(const std::string &table, GenQuery &query)
525 {
526     RdbQuery *rdbQuery = nullptr;
527     auto ret = query.QueryInterface(rdbQuery);
528     if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
529         ZLOGE("not RdbQuery!");
530         return { GeneralError::E_INVALID_ARGS, nullptr };
531     }
532     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
533     if (delegate_ == nullptr) {
534         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
535             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(table).c_str());
536         return { GeneralError::E_ALREADY_CLOSED, nullptr };
537     }
538     if (rdbQuery->IsRemoteQuery()) {
539         if (rdbQuery->GetDevices().size() != 1) {
540             ZLOGE("RemoteQuery: devices size error! size:%{public}zu", rdbQuery->GetDevices().size());
541             return { GeneralError::E_ERROR, nullptr };
542         }
543         auto cursor = RemoteQuery(*rdbQuery->GetDevices().begin(), rdbQuery->GetRemoteCondition());
544         return { cursor != nullptr ? GeneralError::E_OK : GeneralError::E_ERROR, cursor};
545     }
546     return { GeneralError::E_ERROR, nullptr };
547 }
548 
MergeMigratedData(const std::string & tableName,VBuckets && values)549 int32_t RdbGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values)
550 {
551     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
552     if (delegate_ == nullptr) {
553         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
554             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
555         return GeneralError::E_ERROR;
556     }
557 
558     auto status = delegate_->UpsertData(tableName, ValueProxy::Convert(std::move(values)));
559     return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
560 }
561 
CleanTrackerData(const std::string & tableName,int64_t cursor)562 int32_t RdbGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor)
563 {
564     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
565     if (delegate_ == nullptr) {
566         ZLOGE("Database already closed! database:%{public}s, table:%{public}s",
567               Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
568         return GeneralError::E_ERROR;
569     }
570 
571     auto status = delegate_->CleanTrackerData(tableName, cursor);
572     return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
573 }
574 
DoCloudSync(const Devices & devices,const DistributedDB::Query & dbQuery,const SyncParam & syncParam,bool isPriority,DetailAsync async)575 std::pair<int32_t, int32_t> RdbGeneralStore::DoCloudSync(const Devices &devices, const DistributedDB::Query &dbQuery,
576     const SyncParam &syncParam, bool isPriority, DetailAsync async)
577 {
578     auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
579     auto highMode = GetHighMode(static_cast<uint32_t>(syncParam.mode));
580     SyncId syncId = ++syncTaskId_;
581     auto callback = GetDBProcessCB(std::move(async), syncMode, syncId, highMode);
582     if (executor_ != nullptr && tasks_ != nullptr) {
583         auto id = executor_->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
584         tasks_->Insert(syncId, { id, callback });
585     }
586     CloudSyncOption option;
587     option.devices = devices;
588     option.mode = DistributedDB::SyncMode(syncMode);
589     option.query = dbQuery;
590     option.waitTime = syncParam.wait;
591     option.priorityTask = isPriority || highMode == MANUAL_SYNC_MODE;
592     option.priorityLevel = GetPriorityLevel(highMode);
593     option.compensatedSyncOnly = syncParam.isCompensation;
594     option.merge = highMode == AUTO_SYNC_MODE;
595     option.lockAction = LOCK_ACTION;
596     option.prepareTraceId = syncParam.prepareTraceId;
597     option.asyncDownloadAssets = syncParam.asyncDownloadAsset;
598     auto dbStatus = DistributedDB::INVALID_ARGS;
599     dbStatus = delegate_->Sync(option, tasks_ != nullptr ? GetCB(syncId) : callback, syncId);
600     if (dbStatus == DBStatus::OK || tasks_ == nullptr) {
601         return { ConvertStatus(dbStatus), dbStatus };
602     }
603     Report(FT_CLOUD_SYNC, static_cast<int32_t>(Fault::CSF_GS_CLOUD_SYNC),
604         "Cloud sync ret=" + std::to_string(static_cast<int32_t>(dbStatus)));
605     tasks_->ComputeIfPresent(syncId, [executor = executor_](SyncId syncId, const FinishTask &task) {
606         if (executor != nullptr) {
607             executor->Remove(task.taskId);
608         }
609         return false;
610     });
611     return { ConvertStatus(dbStatus), dbStatus };
612 }
613 
Sync(const Devices & devices,GenQuery & query,DetailAsync async,const SyncParam & syncParam)614 std::pair<int32_t, int32_t> RdbGeneralStore::Sync(const Devices &devices, GenQuery &query, DetailAsync async,
615     const SyncParam &syncParam)
616 {
617     DistributedDB::Query dbQuery;
618     RdbQuery *rdbQuery = nullptr;
619     bool isPriority = false;
620     auto ret = query.QueryInterface(rdbQuery);
621     if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
622         dbQuery.FromTable(GetIntersection(query.GetTables(), GetTables()));
623     } else {
624         dbQuery = rdbQuery->GetQuery();
625         isPriority = rdbQuery->IsPriority();
626     }
627     auto syncMode = GeneralStore::GetSyncMode(syncParam.mode);
628     auto dbMode = DistributedDB::SyncMode(syncMode);
629     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
630     if (delegate_ == nullptr) {
631         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
632               "wait:%{public}d", devices.size(),
633               devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), syncParam.mode, syncParam.wait);
634         return { GeneralError::E_ALREADY_CLOSED, DBStatus::OK };
635     }
636     auto dbStatus = DistributedDB::INVALID_ARGS;
637     if (syncMode < NEARBY_END) {
638         dbStatus = delegate_->Sync(devices, dbMode, dbQuery, GetDBBriefCB(std::move(async)), syncParam.wait != 0);
639     } else if (syncMode > NEARBY_END && syncMode < CLOUD_END) {
640         return DoCloudSync(devices, dbQuery, syncParam, isPriority, async);
641     }
642     return { ConvertStatus(dbStatus), dbStatus };
643 }
644 
PreSharing(GenQuery & query)645 std::pair<int32_t, std::shared_ptr<Cursor>> RdbGeneralStore::PreSharing(GenQuery &query)
646 {
647     RdbQuery *rdbQuery = nullptr;
648     auto ret = query.QueryInterface(rdbQuery);
649     if (ret != GeneralError::E_OK || rdbQuery == nullptr) {
650         ZLOGE("not RdbQuery!");
651         return { GeneralError::E_INVALID_ARGS, nullptr };
652     }
653     auto tables = rdbQuery->GetTables();
654     auto statement = rdbQuery->GetStatement();
655     if (statement.empty() || tables.empty()) {
656         ZLOGE("statement size:%{public}zu, tables size:%{public}zu", statement.size(), tables.size());
657         return { GeneralError::E_INVALID_ARGS, nullptr };
658     }
659     std::string sql = BuildSql(*tables.begin(), statement, rdbQuery->GetColumns());
660     VBuckets values;
661     {
662         std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
663         if (delegate_ == nullptr) {
664             ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
665             return { GeneralError::E_ALREADY_CLOSED, nullptr };
666         }
667         auto [errCode, ret] = QuerySql(sql, rdbQuery->GetBindArgs());
668         values = std::move(ret);
669     }
670     auto rdbCloud = GetRdbCloud();
671     if (rdbCloud == nullptr || values.empty()) {
672         ZLOGW("rdbCloud is %{public}s, values size:%{public}zu", rdbCloud == nullptr ? "nullptr" : "not nullptr",
673             values.size());
674         return { GeneralError::E_CLOUD_DISABLED, nullptr };
675     }
676     VBuckets extends = ExtractExtend(values);
677     rdbCloud->PreSharing(*tables.begin(), extends);
678     for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
679          ++value, ++extend) {
680         value->insert_or_assign(DistributedRdb::Field::SHARING_RESOURCE_FIELD, (*extend)[SchemaMeta::SHARING_RESOURCE]);
681         value->erase(CLOUD_GID);
682     }
683     return { GeneralError::E_OK, std::make_shared<CacheCursor>(std::move(values)) };
684 }
685 
ExtractExtend(VBuckets & values) const686 VBuckets RdbGeneralStore::ExtractExtend(VBuckets &values) const
687 {
688     VBuckets extends(values.size());
689     for (auto value = values.begin(), extend = extends.begin(); value != values.end() && extend != extends.end();
690          ++value, ++extend) {
691         auto it = value->find(CLOUD_GID);
692         if (it == value->end()) {
693             continue;
694         }
695         auto gid = std::get_if<std::string>(&(it->second));
696         if (gid == nullptr || gid->empty()) {
697             continue;
698         }
699         extend->insert_or_assign(SchemaMeta::GID_FIELD, std::move(*gid));
700     }
701     return extends;
702 }
703 
BuildSql(const std::string & table,const std::string & statement,const std::vector<std::string> & columns) const704 std::string RdbGeneralStore::BuildSql(
705     const std::string &table, const std::string &statement, const std::vector<std::string> &columns) const
706 {
707     std::string sql = "select ";
708     sql.append(CLOUD_GID);
709     std::string sqlNode = "select rowid";
710     for (auto &column : columns) {
711         sql.append(", ").append(column);
712         sqlNode.append(", ").append(column);
713     }
714     sqlNode.append(" from ").append(table).append(statement);
715     auto logTable = RelationalStoreManager::GetDistributedLogTableName(table);
716     sql.append(" from ").append(logTable).append(", (").append(sqlNode);
717     sql.append(") where ").append(DATE_KEY).append(" = rowid");
718     return sql;
719 }
720 
Clean(const std::vector<std::string> & devices,int32_t mode,const std::string & tableName)721 int32_t RdbGeneralStore::Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName)
722 {
723     if (mode < 0 || mode > CLEAN_MODE_BUTT) {
724         return GeneralError::E_INVALID_ARGS;
725     }
726     DBStatus status = DistributedDB::DB_ERROR;
727     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
728     if (delegate_ == nullptr) {
729         ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, "
730               "tableName:%{public}s",
731             devices.size(), devices.empty() ? "null" : Anonymous::Change(*devices.begin()).c_str(), mode,
732             Anonymous::Change(tableName).c_str());
733         return GeneralError::E_ALREADY_CLOSED;
734     }
735     switch (mode) {
736         case CLOUD_INFO:
737             status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_INFO));
738             if (status == DistributedDB::OK) {
739                 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
740                 break;
741             }
742             (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
743             break;
744         case CLOUD_DATA:
745             status = delegate_->RemoveDeviceData("", static_cast<ClearMode>(CLOUD_DATA));
746             if (status == DistributedDB::OK) {
747                 status = delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
748                 break;
749             }
750             (void)delegate_->RemoveDeviceData("", ClearMode::CLEAR_SHARED_TABLE);
751             break;
752         case NEARBY_DATA:
753             if (devices.empty()) {
754                 status = delegate_->RemoveDeviceData();
755                 break;
756             }
757             for (auto device : devices) {
758                 status = delegate_->RemoveDeviceData(device, tableName);
759             }
760             break;
761         default:
762             return GeneralError::E_ERROR;
763     }
764     return status == DistributedDB::OK ? GeneralError::E_OK : GeneralError::E_ERROR;
765 }
766 
Watch(int32_t origin,Watcher & watcher)767 int32_t RdbGeneralStore::Watch(int32_t origin, Watcher &watcher)
768 {
769     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != nullptr) {
770         return GeneralError::E_INVALID_ARGS;
771     }
772 
773     observer_.watcher_ = &watcher;
774     return GeneralError::E_OK;
775 }
776 
Unwatch(int32_t origin,Watcher & watcher)777 int32_t RdbGeneralStore::Unwatch(int32_t origin, Watcher &watcher)
778 {
779     if (origin != Watcher::Origin::ORIGIN_ALL || observer_.watcher_ != &watcher) {
780         return GeneralError::E_INVALID_ARGS;
781     }
782 
783     observer_.watcher_ = nullptr;
784     return GeneralError::E_OK;
785 }
786 
GetDBBriefCB(DetailAsync async)787 RdbGeneralStore::DBBriefCB RdbGeneralStore::GetDBBriefCB(DetailAsync async)
788 {
789     if (!async) {
790         return [](auto &) {};
791     }
792     return [async = std::move(async)](
793         const std::map<std::string, std::vector<TableStatus>> &result) {
794         DistributedData::GenDetails details;
795         for (auto &[key, tables] : result) {
796             auto &value = details[key];
797             value.progress = FINISHED;
798             value.code = GeneralError::E_OK;
799             for (auto &table : tables) {
800                 if (table.status != DBStatus::OK) {
801                     value.code = GeneralError::E_ERROR;
802                 }
803             }
804         }
805         async(details);
806     };
807 }
808 
GetDBProcessCB(DetailAsync async,uint32_t syncMode,SyncId syncId,uint32_t highMode)809 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetDBProcessCB(DetailAsync async, uint32_t syncMode, SyncId syncId,
810     uint32_t highMode)
811 {
812     std::shared_lock<std::shared_mutex> lock(asyncMutex_);
813     return [async, autoAsync = async_, highMode, storeInfo = storeInfo_, flag = syncNotifyFlag_, syncMode, syncId,
814         rdbCloud = GetRdbCloud()](const std::map<std::string, SyncProcess> &processes) {
815         DistributedData::GenDetails details;
816         for (auto &[id, process] : processes) {
817             bool isDownload = false;
818             auto &detail = details[id];
819             detail.progress = process.process;
820             detail.code = ConvertStatus(process.errCode);
821             detail.dbCode = process.errCode;
822             uint32_t totalCount = 0;
823             for (auto [key, value] : process.tableProcess) {
824                 auto &table = detail.details[key];
825                 table.upload.total = value.upLoadInfo.total;
826                 table.upload.success = value.upLoadInfo.successCount;
827                 table.upload.failed = value.upLoadInfo.failCount;
828                 table.upload.untreated = table.upload.total - table.upload.success - table.upload.failed;
829                 totalCount += table.upload.total;
830                 isDownload = table.download.total > 0;
831                 table.download.total = value.downLoadInfo.total;
832                 table.download.success = value.downLoadInfo.successCount;
833                 table.download.failed = value.downLoadInfo.failCount;
834                 table.download.untreated = table.download.total - table.download.success - table.download.failed;
835                 detail.changeCount = (process.process == FINISHED)
836                                         ? value.downLoadInfo.insertCount + value.downLoadInfo.updateCount +
837                                               value.downLoadInfo.deleteCount
838                                         : 0;
839                 totalCount += table.download.total;
840             }
841             if (process.process == FINISHED) {
842                 RdbGeneralStore::OnSyncFinish(storeInfo, flag, syncMode, syncId);
843             } else {
844                 RdbGeneralStore::OnSyncStart(storeInfo, flag, syncMode, syncId, totalCount);
845             }
846 
847             if (isDownload && (process.process == FINISHED || process.process == PROCESSING) && rdbCloud != nullptr &&
848                 (rdbCloud->GetLockFlag() & RdbCloud::FLAG::APPLICATION)) {
849                 rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
850             }
851         }
852         if (async) {
853             async(details);
854         }
855 
856         if (highMode == AUTO_SYNC_MODE && autoAsync
857             && (details.empty() || details.begin()->second.code != E_SYNC_TASK_MERGED)) {
858             autoAsync(details);
859         }
860     };
861 }
862 
Release()863 int32_t RdbGeneralStore::Release()
864 {
865     auto ref = 1;
866     {
867         std::lock_guard<decltype(mutex_)> lock(mutex_);
868         if (ref_ == 0) {
869             return 0;
870         }
871         ref = --ref_;
872     }
873     ZLOGD("ref:%{public}d", ref);
874     if (ref == 0) {
875         delete this;
876     }
877     return ref;
878 }
879 
AddRef()880 int32_t RdbGeneralStore::AddRef()
881 {
882     std::lock_guard<decltype(mutex_)> lock(mutex_);
883     if (ref_ == 0) {
884         return 0;
885     }
886     return ++ref_;
887 }
888 
Report(const std::string & faultType,int32_t errCode,const std::string & appendix)889 void RdbGeneralStore::Report(const std::string &faultType, int32_t errCode, const std::string &appendix)
890 {
891     ArkDataFaultMsg msg = { .faultType = faultType,
892         .bundleName = storeInfo_.bundleName,
893         .moduleName = ModuleName::RDB_STORE,
894         .storeName = storeInfo_.storeName,
895         .errorType = errCode + GeneralStore::CLOUD_ERR_OFFSET,
896         .appendixMsg = appendix };
897     Reporter::GetInstance()->CloudSyncFault()->Report(msg);
898 }
899 
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const std::vector<Reference> & references)900 int32_t RdbGeneralStore::SetDistributedTables(const std::vector<std::string> &tables, int32_t type,
901     const std::vector<Reference> &references)
902 {
903     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
904     if (delegate_ == nullptr) {
905         ZLOGE("Database already closed! database:%{public}s, tables size:%{public}zu, type:%{public}d",
906             Anonymous::Change(storeInfo_.storeName).c_str(), tables.size(), type);
907         return GeneralError::E_ALREADY_CLOSED;
908     }
909     for (const auto &table : tables) {
910         ZLOGD("tableName:%{public}s, type:%{public}d", Anonymous::Change(table).c_str(), type);
911         auto dBStatus = delegate_->CreateDistributedTable(table, static_cast<DistributedDB::TableSyncType>(type));
912         if (dBStatus != DistributedDB::DBStatus::OK) {
913             ZLOGE("create distributed table failed, table:%{public}s, err:%{public}d",
914                 Anonymous::Change(table).c_str(), dBStatus);
915             Report(FT_OPEN_STORE, static_cast<int32_t>(Fault::CSF_GS_CREATE_DISTRIBUTED_TABLE),
916                 "SetDistributedTables ret=" + std::to_string(static_cast<int32_t>(dBStatus)));
917             return GeneralError::E_ERROR;
918         }
919     }
920     std::vector<DistributedDB::TableReferenceProperty> properties;
921     for (const auto &reference : references) {
922         properties.push_back({ reference.sourceTable, reference.targetTable, reference.refFields });
923     }
924     auto status = delegate_->SetReference(properties);
925     if (status != DistributedDB::DBStatus::OK && status != DistributedDB::DBStatus::PROPERTY_CHANGED) {
926         ZLOGE("distributed table set reference failed, err:%{public}d", status);
927         return GeneralError::E_ERROR;
928     }
929     auto [exist, database] = GetDistributedSchema(observer_.meta_);
930     if (exist) {
931         delegate_->SetDistributedSchema(GetGaussDistributedSchema(database));
932     }
933     return GeneralError::E_OK;
934 }
935 
SetConfig(const StoreConfig & storeConfig)936 void RdbGeneralStore::SetConfig(const StoreConfig &storeConfig)
937 {
938     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
939      if (delegate_ == nullptr) {
940         ZLOGE("database already closed!, tableMode is :%{public}d",
941               storeConfig.tableMode.has_value() ? static_cast<int32_t>(storeConfig.tableMode.value()) : -1);
942     }
943     if (storeConfig.tableMode.has_value()) {
944         RelationalStoreDelegate::StoreConfig config;
945         if (storeConfig.tableMode == DistributedTableMode::COLLABORATION) {
946             config.tableMode = DistributedDB::DistributedTableMode::COLLABORATION;
947         } else if (storeConfig.tableMode == DistributedTableMode::SPLIT_BY_DEVICE) {
948             config.tableMode = DistributedDB::DistributedTableMode::SPLIT_BY_DEVICE;
949         }
950         delegate_->SetStoreConfig(config);
951     }
952 }
953 
SetTrackerTable(const std::string & tableName,const std::set<std::string> & trackerColNames,const std::set<std::string> & extendColNames,bool isForceUpgrade)954 int32_t RdbGeneralStore::SetTrackerTable(const std::string &tableName, const std::set<std::string> &trackerColNames,
955     const std::set<std::string> &extendColNames, bool isForceUpgrade)
956 {
957     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
958     if (delegate_ == nullptr) {
959         ZLOGE("database already closed! database:%{public}s, tables name:%{public}s",
960             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
961         return GeneralError::E_ALREADY_CLOSED;
962     }
963     auto status = delegate_->SetTrackerTable({ tableName, extendColNames, trackerColNames, isForceUpgrade });
964     if (status == DBStatus::WITH_INVENTORY_DATA) {
965         ZLOGI("Set tracker table with inventory data, database:%{public}s, tables name:%{public}s",
966             Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
967         return GeneralError::E_WITH_INVENTORY_DATA;
968     }
969     if (status != DBStatus::OK) {
970         ZLOGE("Set tracker table failed! ret:%{public}d, database:%{public}s, tables name:%{public}s",
971             status, Anonymous::Change(storeInfo_.storeName).c_str(), Anonymous::Change(tableName).c_str());
972         return GeneralError::E_ERROR;
973     }
974     return GeneralError::E_OK;
975 }
976 
RemoteQuery(const std::string & device,const DistributedDB::RemoteCondition & remoteCondition)977 std::shared_ptr<Cursor> RdbGeneralStore::RemoteQuery(const std::string &device,
978     const DistributedDB::RemoteCondition &remoteCondition)
979 {
980     std::shared_ptr<DistributedDB::ResultSet> dbResultSet;
981     DistributedDB::DBStatus status =
982         delegate_->RemoteQuery(device, remoteCondition, REMOTE_QUERY_TIME_OUT, dbResultSet);
983     if (status != DistributedDB::DBStatus::OK) {
984         ZLOGE("DistributedDB remote query failed, device:%{public}s, status is  %{public}d.",
985             Anonymous::Change(device).c_str(), status);
986         return nullptr;
987     }
988     return std::make_shared<RdbCursor>(dbResultSet);
989 }
990 
ConvertStatus(DistributedDB::DBStatus status)991 RdbGeneralStore::GenErr RdbGeneralStore::ConvertStatus(DistributedDB::DBStatus status)
992 {
993     switch (status) {
994         case DBStatus::OK:
995             return GenErr::E_OK;
996         case DBStatus::CLOUD_NETWORK_ERROR:
997             return GenErr::E_NETWORK_ERROR;
998         case DBStatus::CLOUD_LOCK_ERROR:
999             return GenErr::E_LOCKED_BY_OTHERS;
1000         case DBStatus::CLOUD_FULL_RECORDS:
1001             return GenErr::E_RECODE_LIMIT_EXCEEDED;
1002         case DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT:
1003             return GenErr::E_NO_SPACE_FOR_ASSET;
1004         case DBStatus::BUSY:
1005             return GenErr::E_BUSY;
1006         case DBStatus::CLOUD_SYNC_TASK_MERGED:
1007             return GenErr::E_SYNC_TASK_MERGED;
1008         case DBStatus::CLOUD_DISABLED:
1009             return GeneralError::E_CLOUD_DISABLED;
1010         default:
1011             ZLOGI("status:0x%{public}x", status);
1012             break;
1013     }
1014     return GenErr::E_ERROR;
1015 }
1016 
IsValid()1017 bool RdbGeneralStore::IsValid()
1018 {
1019     return delegate_ != nullptr;
1020 }
1021 
RegisterDetailProgressObserver(GeneralStore::DetailAsync async)1022 int32_t RdbGeneralStore::RegisterDetailProgressObserver(GeneralStore::DetailAsync async)
1023 {
1024     std::unique_lock<std::shared_mutex> lock(asyncMutex_);
1025     async_ = std::move(async);
1026     return GenErr::E_OK;
1027 }
1028 
UnregisterDetailProgressObserver()1029 int32_t RdbGeneralStore::UnregisterDetailProgressObserver()
1030 {
1031     std::unique_lock<std::shared_mutex> lock(asyncMutex_);
1032     async_ = nullptr;
1033     return GenErr::E_OK;
1034 }
1035 
QuerySql(const std::string & sql,Values && args)1036 std::pair<int32_t, VBuckets> RdbGeneralStore::QuerySql(const std::string &sql, Values &&args)
1037 {
1038     std::vector<DistributedDB::VBucket> changedData;
1039     std::vector<DistributedDB::Type> bindArgs = ValueProxy::Convert(std::move(args));
1040     auto status = delegate_->ExecuteSql({ sql, std::move(bindArgs), true }, changedData);
1041     if (status != DBStatus::OK) {
1042         ZLOGE("Query failed! ret:%{public}d, sql:%{public}s, data size:%{public}zu", status,
1043             Anonymous::Change(sql).c_str(), changedData.size());
1044         if (status == DBStatus::BUSY) {
1045             return { GenErr::E_BUSY, {} };
1046         }
1047         return { GenErr::E_ERROR, {} };
1048     }
1049     return { GenErr::E_OK, ValueProxy::Convert(std::move(changedData)) };
1050 }
1051 
OnSyncStart(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId,uint32_t syncCount)1052 void RdbGeneralStore::OnSyncStart(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId,
1053     uint32_t syncCount)
1054 {
1055     uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
1056     if (requiredFlag != (requiredFlag & flag)) {
1057         return;
1058     }
1059     StoreInfo info = storeInfo;
1060     auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::START,
1061         traceId, syncCount);
1062     EventCenter::GetInstance().PostEvent(std::move(evt));
1063 }
1064 
OnSyncFinish(const StoreInfo & storeInfo,uint32_t flag,uint32_t syncMode,uint32_t traceId)1065 void RdbGeneralStore::OnSyncFinish(const StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, uint32_t traceId)
1066 {
1067     uint32_t requiredFlag = (CLOUD_SYNC_FLAG | SEARCHABLE_FLAG);
1068     if (requiredFlag != (requiredFlag & flag)) {
1069         return;
1070     }
1071     StoreInfo info = storeInfo;
1072     auto evt = std::make_unique<DataSyncEvent>(std::move(info), syncMode, DataSyncEvent::DataSyncStatus::FINISH,
1073         traceId);
1074     EventCenter::GetInstance().PostEvent(std::move(evt));
1075 }
1076 
GetTables()1077 std::set<std::string> RdbGeneralStore::GetTables()
1078 {
1079     std::set<std::string> tables;
1080     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1081     if (delegate_ == nullptr) {
1082         ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1083         return tables;
1084     }
1085     auto [errCode, res] = QuerySql(QUERY_TABLES_SQL, {});
1086     if (errCode != GenErr::E_OK) {
1087         return tables;
1088     }
1089     for (auto &table : res) {
1090         auto it = table.find("name");
1091         if (it == table.end() || TYPE_INDEX<std::string> != it->second.index()) {
1092             ZLOGW("error res! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1093             continue;
1094         }
1095         tables.emplace(std::move(*std::get_if<std::string>(&(it->second))));
1096     }
1097     return tables;
1098 }
1099 
GetIntersection(std::vector<std::string> && syncTables,const std::set<std::string> & localTables)1100 std::vector<std::string> RdbGeneralStore::GetIntersection(std::vector<std::string> &&syncTables,
1101     const std::set<std::string> &localTables)
1102 {
1103     std::vector<std::string> res;
1104     for (auto &it : syncTables) {
1105         if (localTables.count(it) &&
1106             localTables.count(RelationalStoreManager::GetDistributedLogTableName(it))) {
1107             res.push_back(std::move(it));
1108         }
1109     }
1110     return res;
1111 }
1112 
PostDataChange(const StoreMetaData & meta,const std::vector<std::string> & tables,ChangeType type)1113 void RdbGeneralStore::ObserverProxy::PostDataChange(const StoreMetaData &meta,
1114     const std::vector<std::string> &tables, ChangeType type)
1115 {
1116     RemoteChangeEvent::DataInfo info;
1117     info.userId = meta.user;
1118     info.storeId = meta.storeId;
1119     info.deviceId = meta.deviceId;
1120     info.bundleName = meta.bundleName;
1121     info.tables = tables;
1122     info.changeType = type;
1123     auto evt = std::make_unique<RemoteChangeEvent>(RemoteChangeEvent::DATA_CHANGE, std::move(info));
1124     EventCenter::GetInstance().PostEvent(std::move(evt));
1125 }
1126 
OnChange(const DBChangedIF & data)1127 void RdbGeneralStore::ObserverProxy::OnChange(const DBChangedIF &data)
1128 {
1129     if (!HasWatcher()) {
1130         return;
1131     }
1132     std::string device = data.GetDataChangeDevice();
1133     auto networkId = DmAdapter::GetInstance().ToNetworkID(device);
1134     ZLOGD("store:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1135         Anonymous::Change(device).c_str());
1136     GenOrigin genOrigin;
1137     genOrigin.origin = GenOrigin::ORIGIN_NEARBY;
1138     genOrigin.dataType = GenOrigin::BASIC_DATA;
1139     DistributedDB::StoreProperty property;
1140     data.GetStoreProperty(property);
1141     genOrigin.id.push_back(networkId);
1142     genOrigin.store = storeId_;
1143     GeneralWatcher::ChangeInfo changeInfo{};
1144     watcher_->OnChange(genOrigin, {}, std::move(changeInfo));
1145     return;
1146 }
1147 
OnChange(DBOrigin origin,const std::string & originalId,DBChangedData && data)1148 void RdbGeneralStore::ObserverProxy::OnChange(DBOrigin origin, const std::string &originalId, DBChangedData &&data)
1149 {
1150     if (!HasWatcher()) {
1151         return;
1152     }
1153     ZLOGD("store:%{public}s table:%{public}s data change from :%{public}s", Anonymous::Change(storeId_).c_str(),
1154         Anonymous::Change(data.tableName).c_str(), Anonymous::Change(originalId).c_str());
1155     GenOrigin genOrigin;
1156     genOrigin.origin = (origin == DBOrigin::ORIGIN_LOCAL)
1157                            ? GenOrigin::ORIGIN_LOCAL
1158                            : (origin == DBOrigin::ORIGIN_CLOUD) ? GenOrigin::ORIGIN_CLOUD : GenOrigin::ORIGIN_NEARBY;
1159     genOrigin.dataType = data.type == DistributedDB::ASSET ? GenOrigin::ASSET_DATA : GenOrigin::BASIC_DATA;
1160     genOrigin.id.push_back(originalId);
1161     genOrigin.store = storeId_;
1162     Watcher::PRIFields fields;
1163     Watcher::ChangeInfo changeInfo;
1164     bool notifyFlag = false;
1165     for (uint32_t i = 0; i < DistributedDB::OP_BUTT; ++i) {
1166         auto &info = changeInfo[data.tableName][i];
1167         for (auto &priData : data.primaryData[i]) {
1168             Watcher::PRIValue value;
1169             if (priData.empty()) {
1170                 ZLOGW("priData is empty, store:%{public}s table:%{public}s data change from :%{public}s, i=%{public}d",
1171                     Anonymous::Change(storeId_).c_str(), Anonymous::Change(data.tableName).c_str(),
1172                     Anonymous::Change(originalId).c_str(), i);
1173                 continue;
1174             }
1175             Convert(std::move(*(priData.begin())), value);
1176             if (notifyFlag || origin != DBOrigin::ORIGIN_CLOUD || i != DistributedDB::OP_DELETE) {
1177                 info.push_back(std::move(value));
1178                 continue;
1179             }
1180             auto deleteKey = std::get_if<std::string>(&value);
1181             if (deleteKey != nullptr && (*deleteKey == LOGOUT_DELETE_FLAG)) {
1182                 // notify to start app
1183                 notifyFlag = true;
1184             }
1185             info.push_back(std::move(value));
1186         }
1187     }
1188     if (notifyFlag) {
1189         ZLOGI("post data change for cleaning cloud data");
1190         PostDataChange(meta_, {}, CLOUD_DATA_CLEAN);
1191     }
1192     if (!data.field.empty()) {
1193         fields[std::move(data.tableName)] = std::move(*(data.field.begin()));
1194     }
1195     watcher_->OnChange(genOrigin, fields, std::move(changeInfo));
1196 }
1197 
LockCloudDB()1198 std::pair<int32_t, uint32_t> RdbGeneralStore::LockCloudDB()
1199 {
1200     auto rdbCloud = GetRdbCloud();
1201     if (rdbCloud == nullptr) {
1202         return { GeneralError::E_ERROR, 0 };
1203     }
1204     return rdbCloud->LockCloudDB(RdbCloud::FLAG::APPLICATION);
1205 }
1206 
UnLockCloudDB()1207 int32_t RdbGeneralStore::UnLockCloudDB()
1208 {
1209     auto rdbCloud = GetRdbCloud();
1210     if (rdbCloud == nullptr) {
1211         return GeneralError::E_ERROR;
1212     }
1213     return rdbCloud->UnLockCloudDB(RdbCloud::FLAG::APPLICATION);
1214 }
1215 
GetRdbCloud() const1216 std::shared_ptr<RdbCloud> RdbGeneralStore::GetRdbCloud() const
1217 {
1218     std::shared_lock<decltype(rdbCloudMutex_)> lock(rdbCloudMutex_);
1219     return rdbCloud_;
1220 }
1221 
IsFinished(SyncId syncId) const1222 bool RdbGeneralStore::IsFinished(SyncId syncId) const
1223 {
1224     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1225     if (delegate_ == nullptr) {
1226         ZLOGE("database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str());
1227         return true;
1228     }
1229     return delegate_->GetCloudTaskStatus(syncId).process == DistributedDB::FINISHED;
1230 }
1231 
GetFinishTask(SyncId syncId)1232 Executor::Task RdbGeneralStore::GetFinishTask(SyncId syncId)
1233 {
1234     return [this, executor = executor_, task = tasks_, syncId]() {
1235         auto [exist, finishTask] = task->Find(syncId);
1236         if (!exist || finishTask.cb == nullptr) {
1237             task->Erase(syncId);
1238             return;
1239         }
1240         if (!IsFinished(syncId)) {
1241             task->ComputeIfPresent(syncId, [executor = executor_, this](SyncId syncId, FinishTask &task) {
1242                 task.taskId = executor->Schedule(std::chrono::minutes(INTERVAL), GetFinishTask(syncId));
1243                 return true;
1244             });
1245             return;
1246         }
1247         DBProcessCB cb;
1248         task->ComputeIfPresent(syncId, [&cb, executor = executor_](SyncId syncId, const FinishTask &task) {
1249             cb = task.cb;
1250             return false;
1251         });
1252         if (cb != nullptr) {
1253             ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ",
1254                   Anonymous::Change(storeInfo_.storeName).c_str(), syncId);
1255             std::map<std::string, SyncProcess> result;
1256             result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1257             cb(result);
1258         }
1259     };
1260 }
1261 
SetExecutor(std::shared_ptr<Executor> executor)1262 void RdbGeneralStore::SetExecutor(std::shared_ptr<Executor> executor)
1263 {
1264     if (executor_ == nullptr) {
1265         executor_ = executor;
1266     }
1267 }
1268 
RemoveTasks()1269 void RdbGeneralStore::RemoveTasks()
1270 {
1271     if (tasks_ == nullptr) {
1272         return;
1273     }
1274     std::list<DBProcessCB> cbs;
1275     std::list<TaskId> taskIds;
1276     tasks_->EraseIf([&cbs, &taskIds, store = storeInfo_.storeName](SyncId syncId, const FinishTask &task) {
1277         if (task.cb != nullptr) {
1278             ZLOGW("database:%{public}s syncId:%{public}" PRIu64 " miss finished. ", Anonymous::Change(store).c_str(),
1279                   syncId);
1280         }
1281         cbs.push_back(std::move(task.cb));
1282         taskIds.push_back(task.taskId);
1283         return true;
1284     });
1285     auto func = [](const std::list<DBProcessCB> &cbs) {
1286         std::map<std::string, SyncProcess> result;
1287         result.insert({ "", { DistributedDB::FINISHED, DBStatus::DB_ERROR } });
1288         for (auto &cb : cbs) {
1289             if (cb != nullptr) {
1290                 cb(result);
1291             }
1292         }
1293     };
1294     if (executor_ != nullptr) {
1295         for (auto taskId: taskIds) {
1296             executor_->Remove(taskId, true);
1297         }
1298         executor_->Execute([cbs, func]() {
1299             func(cbs);
1300         });
1301     } else {
1302         func(cbs);
1303     }
1304 }
1305 
GetCB(SyncId syncId)1306 RdbGeneralStore::DBProcessCB RdbGeneralStore::GetCB(SyncId syncId)
1307 {
1308     return [task = tasks_, executor = executor_, syncId](const std::map<std::string, SyncProcess> &progress) {
1309         if (task == nullptr) {
1310             return;
1311         }
1312         DBProcessCB cb;
1313         task->ComputeIfPresent(syncId, [&cb, &progress, executor](SyncId syncId, FinishTask &finishTask) {
1314             cb = finishTask.cb;
1315             bool isFinished = !progress.empty() && progress.begin()->second.process == DistributedDB::FINISHED;
1316             if (isFinished) {
1317                 finishTask.cb = nullptr;
1318             }
1319             return true;
1320         });
1321         if (cb != nullptr) {
1322             cb(progress);
1323         }
1324         return;
1325     };
1326 }
1327 } // namespace OHOS::DistributedRdb