• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RdbStoreImpl"
16 #include "rdb_store_impl.h"
17 
18 #include <dlfcn.h>
19 #include <sys/stat.h>
20 #include <unistd.h>
21 
22 #include <algorithm>
23 #include <chrono>
24 #include <cinttypes>
25 #include <cstdint>
26 #include <fstream>
27 #include <memory>
28 #include <mutex>
29 #include <sstream>
30 #include <sys/stat.h>
31 #include <string>
32 
33 #include "cache_result_set.h"
34 #include "connection_pool.h"
35 #include "delay_notify.h"
36 #include "directory_ex.h"
37 #include "knowledge_schema_helper.h"
38 #include "logger.h"
39 #include "rdb_common.h"
40 #include "rdb_errno.h"
41 #include "rdb_fault_hiview_reporter.h"
42 #include "rdb_local_db_observer.h"
43 #include "rdb_perfStat.h"
44 #include "rdb_radar_reporter.h"
45 #include "rdb_stat_reporter.h"
46 #include "rdb_security_manager.h"
47 #include "rdb_sql_log.h"
48 #include "rdb_sql_utils.h"
49 #include "rdb_sql_statistic.h"
50 #include "rdb_store.h"
51 #include "rdb_trace.h"
52 #include "rdb_types.h"
53 #include "relational_store_client.h"
54 #include "sqlite_global_config.h"
55 #include "sqlite_sql_builder.h"
56 #include "sqlite_utils.h"
57 #include "step_result_set.h"
58 #include "suspender.h"
59 #include "task_executor.h"
60 #include "traits.h"
61 #include "transaction.h"
62 #include "values_buckets.h"
63 #if !defined(CROSS_PLATFORM)
64 #include "raw_data_parser.h"
65 #include "rdb_manager_impl.h"
66 #include "relational_store_manager.h"
67 #include "security_policy.h"
68 #include "sqlite_shared_result_set.h"
69 #endif
70 
71 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
72 #include "security_policy.h"
73 #endif
74 #ifdef WINDOWS_PLATFORM
75 #define ISFILE(filePath) ((filePath.find("\\") == std::string::npos))
76 #else
77 #define ISFILE(filePath) ((filePath.find("/") == std::string::npos))
78 #endif
79 #include "rdb_time_utils.h"
80 
81 namespace OHOS::NativeRdb {
82 using namespace OHOS::Rdb;
83 using namespace std::chrono;
84 using SqlStatistic = DistributedRdb::SqlStatistic;
85 using PerfStat = DistributedRdb::PerfStat;
86 using RdbNotifyConfig = DistributedRdb::RdbNotifyConfig;
87 using Reportor = RdbFaultHiViewReporter;
88 
89 #if !defined(CROSS_PLATFORM)
90 using RdbMgr = DistributedRdb::RdbManagerImpl;
91 #endif
92 
93 static constexpr const char *BEGIN_TRANSACTION_SQL = "begin;";
94 static constexpr const char *COMMIT_TRANSACTION_SQL = "commit;";
95 static constexpr const char *ROLLBACK_TRANSACTION_SQL = "rollback;";
96 static constexpr const char *BACKUP_RESTORE = "backup.restore";
97 static constexpr const char *ASYNC_RESTORE = "-async.restore";
98 constexpr int64_t TIME_OUT = 1500;
99 
InitSyncerParam(const RdbStoreConfig & config,bool created)100 void RdbStoreImpl::InitSyncerParam(const RdbStoreConfig &config, bool created)
101 {
102     syncerParam_.bundleName_ = config.GetBundleName();
103     syncerParam_.hapName_ = config.GetModuleName();
104     syncerParam_.storeName_ = config.GetName();
105     syncerParam_.customDir_ = config.GetCustomDir();
106     syncerParam_.area_ = config.GetArea();
107     syncerParam_.level_ = static_cast<int32_t>(config.GetSecurityLevel());
108     syncerParam_.type_ = config.GetDistributedType();
109     syncerParam_.isEncrypt_ = config.IsEncrypt();
110     syncerParam_.isAutoClean_ = config.GetAutoClean();
111     syncerParam_.isSearchable_ = config.IsSearchable();
112     syncerParam_.password_ = config.GetEncryptKey();
113     syncerParam_.haMode_ = config.GetHaMode();
114     syncerParam_.roleType_ = config.GetRoleType();
115     syncerParam_.tokenIds_ = config.GetPromiseInfo().tokenIds_;
116     syncerParam_.uids_ = config.GetPromiseInfo().uids_;
117     syncerParam_.user_ = config.GetPromiseInfo().user_;
118     syncerParam_.permissionNames_ = config.GetPromiseInfo().permissionNames_;
119     syncerParam_.subUser_ = config.GetSubUser();
120     syncerParam_.dfxInfo_.lastOpenTime_ = RdbTimeUtils::GetCurSysTimeWithMs();
121     if (created) {
122         syncerParam_.infos_ = Connection::Collect(config);
123     }
124 }
125 
InnerOpen()126 int RdbStoreImpl::InnerOpen()
127 {
128     isOpen_ = true;
129 #if !defined(CROSS_PLATFORM)
130     // Only owner mode can store metadata information.
131     if (isReadOnly_ || isMemoryRdb_ || config_.IsCustomEncryptParam() || (config_.GetRoleType() != OWNER)) {
132         return E_OK;
133     }
134     if (config_.GetEnableSemanticIndex()) {
135         SetKnowledgeSchema();
136     }
137     AfterOpen(syncerParam_);
138     if (config_.GetDBType() == DB_VECTOR || (!config_.IsSearchable() && !config_.GetEnableSemanticIndex())) {
139         return E_OK;
140     }
141     int errCode = RegisterDataChangeCallback();
142     if (errCode != E_OK) {
143         LOG_ERROR("RegisterCallBackObserver is failed, err is %{public}d.", errCode);
144     }
145 #endif
146     return E_OK;
147 }
148 
InitReportFunc(const RdbParam & param)149 void RdbStoreImpl::InitReportFunc(const RdbParam &param)
150 {
151 #if !defined(CROSS_PLATFORM)
152     reportFunc_ = std::make_shared<ReportFunc>([reportParam = param](const DistributedRdb::RdbStatEvent &event) {
153         auto [err, service] = RdbMgr::GetInstance().GetRdbService(reportParam);
154         if (err != E_OK || service == nullptr) {
155             LOG_ERROR("GetRdbService failed, err: %{public}d, storeName: %{public}s.", err,
156                 SqliteUtils::Anonymous(reportParam.storeName_).c_str());
157             return;
158         }
159         err = service->ReportStatistic(reportParam, event);
160         if (err != E_OK) {
161             LOG_ERROR("ReportStatistic failed, err: %{public}d, storeName: %{public}s.", err,
162                 SqliteUtils::Anonymous(reportParam.storeName_).c_str());
163         }
164         return;
165     });
166 #endif
167 }
168 
Close()169 void RdbStoreImpl::Close()
170 {
171     {
172         std::unique_lock<decltype(poolMutex_)> lock(poolMutex_);
173         if (connectionPool_) {
174             connectionPool_->CloseAllConnections();
175             connectionPool_.reset();
176         }
177     }
178     {
179         std::lock_guard<decltype(mutex_)> guard(mutex_);
180         for (auto &it : transactions_) {
181             auto trans = it.lock();
182             if (trans != nullptr) {
183                 trans->Close();
184             }
185         }
186         transactions_ = {};
187     }
188     {
189         std::lock_guard<decltype(helperMutex_)> autoLock(helperMutex_);
190         if (knowledgeSchemaHelper_ != nullptr) {
191             knowledgeSchemaHelper_->Close();
192         }
193     }
194 }
195 
GetPool() const196 std::shared_ptr<ConnectionPool> RdbStoreImpl::GetPool() const
197 {
198     std::shared_lock<decltype(poolMutex_)> lock(poolMutex_);
199     return connectionPool_;
200 }
201 
GetConn(bool isRead)202 std::pair<int32_t, std::shared_ptr<Connection>> RdbStoreImpl::GetConn(bool isRead)
203 {
204     auto pool = GetPool();
205     if (pool == nullptr) {
206         return { E_ALREADY_CLOSED, nullptr };
207     }
208 
209     auto connection = pool->AcquireConnection(isRead);
210     if (connection == nullptr) {
211         return { E_DATABASE_BUSY, nullptr };
212     }
213     return { E_OK, connection };
214 }
215 
216 #if !defined(CROSS_PLATFORM)
AfterOpen(const RdbParam & param,int32_t retry)217 void RdbStoreImpl::AfterOpen(const RdbParam &param, int32_t retry)
218 {
219     auto [err, service] = RdbMgr::GetInstance().GetRdbService(param);
220     if (err == E_NOT_SUPPORT) {
221         return;
222     }
223     if (err != E_OK || service == nullptr) {
224         if (err != E_INVALID_ARGS) {
225             LOG_ERROR("GetRdbService failed, err: %{public}d, storeName: %{public}s.", err,
226                 SqliteUtils::Anonymous(param.storeName_).c_str());
227         }
228         auto pool = TaskExecutor::GetInstance().GetExecutor();
229         if (err == E_SERVICE_NOT_FOUND && pool != nullptr && retry < MAX_RETRY_TIMES) {
230             retry++;
231             pool->Schedule(std::chrono::seconds(RETRY_INTERVAL), [param, retry]() {
232                 AfterOpen(param, retry);
233             });
234         }
235         return;
236     }
237     err = service->AfterOpen(param);
238     if (err != E_OK) {
239         LOG_ERROR("AfterOpen failed, err: %{public}d, storeName: %{public}s.", err,
240             SqliteUtils::Anonymous(param.storeName_).c_str());
241     }
242 }
243 
GetModifyTime(const std::string & table,const std::string & columnName,std::vector<PRIKey> & keys)244 RdbStore::ModifyTime RdbStoreImpl::GetModifyTime(
245     const std::string &table, const std::string &columnName, std::vector<PRIKey> &keys)
246 {
247     if (table.empty() || columnName.empty() || keys.empty()) {
248         LOG_ERROR("Invalid para.");
249         return {};
250     }
251 
252     auto logTable = DistributedDB::RelationalStoreManager::GetDistributedLogTableName(table);
253     if (SqliteUtils::StrToUpper(columnName) == ROW_ID) {
254         return GetModifyTimeByRowId(logTable, keys);
255     }
256     std::vector<ValueObject> hashKeys;
257     hashKeys.reserve(keys.size());
258     std::map<std::vector<uint8_t>, PRIKey> keyMap;
259     std::map<std::string, DistributedDB::Type> tmp;
260     for (const auto &key : keys) {
261         DistributedDB::Type value;
262         RawDataParser::Convert(key, value);
263         tmp[columnName] = value;
264         auto hashKey = DistributedDB::RelationalStoreManager::CalcPrimaryKeyHash(tmp);
265         if (hashKey.empty()) {
266             LOG_DEBUG("Hash key fail.");
267             continue;
268         }
269         hashKeys.emplace_back(ValueObject(hashKey));
270         keyMap[hashKey] = key;
271     }
272 
273     std::string sql;
274     sql.append("select hash_key as key, timestamp/10000 as modify_time from ");
275     sql.append(logTable);
276     sql.append(" where hash_key in (");
277     sql.append(SqliteSqlBuilder::GetSqlArgs(hashKeys.size()));
278     sql.append(")");
279     auto resultSet = QueryByStep(sql, hashKeys, true);
280     int count = 0;
281     if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
282         LOG_ERROR("Get resultSet err.");
283         return {};
284     }
285     return { resultSet, keyMap, false };
286 }
287 
GetModifyTimeByRowId(const std::string & logTable,std::vector<PRIKey> & keys)288 RdbStore::ModifyTime RdbStoreImpl::GetModifyTimeByRowId(const std::string &logTable, std::vector<PRIKey> &keys)
289 {
290     std::string sql;
291     sql.append("select data_key as key, timestamp/10000 as modify_time from ");
292     sql.append(logTable);
293     sql.append(" where data_key in (");
294     sql.append(SqliteSqlBuilder::GetSqlArgs(keys.size()));
295     sql.append(")");
296     std::vector<ValueObject> args;
297     args.reserve(keys.size());
298     for (auto &key : keys) {
299         ValueObject::Type value;
300         RawDataParser::Convert(key, value);
301         args.emplace_back(ValueObject(value));
302     }
303     auto resultSet = QueryByStep(sql, args, true);
304     int count = 0;
305     if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
306         LOG_ERROR("Get resultSet err.");
307         return {};
308     }
309     return ModifyTime(resultSet, {}, true);
310 }
311 
CleanDirtyData(const std::string & table,uint64_t cursor)312 int RdbStoreImpl::CleanDirtyData(const std::string &table, uint64_t cursor)
313 {
314     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || isMemoryRdb_) {
315         LOG_ERROR("Not support. table:%{public}s, isRead:%{public}d, dbType:%{public}d, isMemoryRdb:%{public}d.",
316             SqliteUtils::Anonymous(table).c_str(), isReadOnly_, config_.GetDBType(), isMemoryRdb_);
317         return E_NOT_SUPPORT;
318     }
319     auto [errCode, conn] = GetConn(false);
320     if (errCode != E_OK) {
321         LOG_ERROR("The database is busy or closed.");
322         return errCode;
323     }
324     errCode = conn->CleanDirtyData(table, cursor);
325     return errCode;
326 }
327 
GetLogTableName(const std::string & tableName)328 std::string RdbStoreImpl::GetLogTableName(const std::string &tableName)
329 {
330     return DistributedDB::RelationalStoreManager::GetDistributedLogTableName(tableName);
331 }
332 
QuerySharingResource(const AbsRdbPredicates & predicates,const Fields & columns)333 std::pair<int32_t, std::shared_ptr<ResultSet>> RdbStoreImpl::QuerySharingResource(
334     const AbsRdbPredicates &predicates, const Fields &columns)
335 {
336     if (config_.GetDBType() == DB_VECTOR) {
337         return { E_NOT_SUPPORT, nullptr };
338     }
339     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
340     if (errCode != E_OK) {
341         return { errCode, nullptr };
342     }
343     auto [status, resultSet] =
344         service->QuerySharingResource(syncerParam_, predicates.GetDistributedPredicates(), columns);
345     if (status != E_OK) {
346         return { status, nullptr };
347     }
348     return { status, resultSet };
349 }
350 
RemoteQuery(const std::string & device,const AbsRdbPredicates & predicates,const Fields & columns,int & errCode)351 std::shared_ptr<ResultSet> RdbStoreImpl::RemoteQuery(
352     const std::string &device, const AbsRdbPredicates &predicates, const Fields &columns, int &errCode)
353 {
354     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
355     if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
356         return nullptr;
357     }
358     std::vector<std::string> selectionArgs = predicates.GetWhereArgs();
359     std::string sql = SqliteSqlBuilder::BuildQueryString(predicates, columns);
360     auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
361     if (err == E_NOT_SUPPORT) {
362         errCode = err;
363         return nullptr;
364     }
365     if (err != E_OK) {
366         LOG_ERROR("RdbStoreImpl::RemoteQuery get service failed.");
367         errCode = err;
368         return nullptr;
369     }
370     auto [status, resultSet] = service->RemoteQuery(syncerParam_, device, sql, selectionArgs);
371     errCode = status;
372     return resultSet;
373 }
374 
NotifyDataChange()375 void RdbStoreImpl::NotifyDataChange()
376 {
377     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || !config_.GetRegisterInfo(RegisterType::CLIENT_OBSERVER)) {
378         return;
379     }
380     config_.SetRegisterInfo(RegisterType::CLIENT_OBSERVER, false);
381     int errCode = RegisterDataChangeCallback();
382     if (errCode != E_OK) {
383         LOG_ERROR("RegisterDataChangeCallback is failed, err is %{public}d.", errCode);
384     }
385     DistributedRdb::RdbChangedData rdbChangedData;
386     if (delayNotifier_ != nullptr) {
387         delayNotifier_->UpdateNotify(rdbChangedData, true);
388     }
389 }
390 
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const DistributedRdb::DistributedConfig & distributedConfig)391 int RdbStoreImpl::SetDistributedTables(
392     const std::vector<std::string> &tables, int32_t type, const DistributedRdb::DistributedConfig &distributedConfig)
393 {
394     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
395     if (config_.GetDBType() == DB_VECTOR || isReadOnly_ || isMemoryRdb_) {
396         return E_NOT_SUPPORT;
397     }
398     if (tables.empty()) {
399         LOG_WARN("The distributed tables to be set is empty.");
400         return E_OK;
401     }
402     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
403     if (errCode != E_OK) {
404         return errCode;
405     }
406     syncerParam_.asyncDownloadAsset_ = distributedConfig.asyncDownloadAsset;
407     syncerParam_.enableCloud_ = distributedConfig.enableCloud;
408     int32_t errorCode = service->SetDistributedTables(
409         syncerParam_, tables, distributedConfig.references, distributedConfig.isRebuild, type);
410     if (errorCode != E_OK) {
411         LOG_ERROR("Fail to set distributed tables, error=%{public}d.", errorCode);
412         return errorCode;
413     }
414     if (type == DistributedRdb::DISTRIBUTED_DEVICE && !config_.GetRegisterInfo(RegisterType::CLIENT_OBSERVER)) {
415         RegisterDataChangeCallback();
416     }
417     if (type != DistributedRdb::DISTRIBUTED_CLOUD) {
418         return E_OK;
419     }
420 
421     return HandleCloudSyncAfterSetDistributedTables(tables, distributedConfig);
422 }
423 
Rekey(const RdbStoreConfig::CryptoParam & cryptoParam)424 int32_t RdbStoreImpl::Rekey(const RdbStoreConfig::CryptoParam &cryptoParam)
425 {
426     if (config_.GetDBType() == DB_VECTOR || isReadOnly_ || isMemoryRdb_) {
427         return E_NOT_SUPPORT;
428     }
429     if (!cryptoParam.IsValid()) {
430         LOG_ERROR("Invalid crypto param, name:%{public}s", SqliteUtils::Anonymous(config_.GetName()).c_str());
431         return E_INVALID_ARGS_NEW;
432     }
433     if (!config_.IsEncrypt() || !config_.GetCryptoParam().Equal(cryptoParam) ||
434         (config_.IsCustomEncryptParam() == cryptoParam.encryptKey_.empty())) {
435         LOG_ERROR("Not supported! name:%{public}s, [%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}u]"
436             "->[%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}u]",
437             SqliteUtils::Anonymous(config_.GetName()).c_str(), config_.GetCryptoParam().encryptKey_.empty(),
438             config_.GetCryptoParam().iterNum, config_.GetCryptoParam().encryptAlgo, config_.GetCryptoParam().hmacAlgo,
439             config_.GetCryptoParam().kdfAlgo, config_.GetCryptoParam().cryptoPageSize, cryptoParam.encryptKey_.empty(),
440             cryptoParam.iterNum, cryptoParam.encryptAlgo, cryptoParam.hmacAlgo,
441             cryptoParam.kdfAlgo, cryptoParam.cryptoPageSize);
442         return E_NOT_SUPPORT;
443     }
444 
445     auto pool = GetPool();
446     if (pool == nullptr) {
447         LOG_ERROR("Database already closed.");
448         return E_ALREADY_CLOSED;
449     }
450 
451 #if !defined(CROSS_PLATFORM)
452     auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
453     if (service != nullptr) {
454         service->Disable(syncerParam_);
455     }
456 #endif
457     LOG_INFO("Start rekey, name:%{public}s, IsCustomEncrypt:%{public}d. ",
458         SqliteUtils::Anonymous(config_.GetName()).c_str(), config_.IsCustomEncryptParam());
459     auto errCode = pool->Rekey(cryptoParam);
460 #if !defined(CROSS_PLATFORM)
461     if (service != nullptr) {
462         service->Enable(syncerParam_);
463         if (errCode == E_OK && !config_.IsCustomEncryptParam()) {
464             auto syncerParam = syncerParam_;
465             syncerParam.password_ = config_.GetEncryptKey();
466             service->AfterOpen(syncerParam);
467         }
468     }
469 #endif
470     return errCode;
471 }
472 
HandleCloudSyncAfterSetDistributedTables(const std::vector<std::string> & tables,const DistributedRdb::DistributedConfig & distributedConfig)473 int RdbStoreImpl::HandleCloudSyncAfterSetDistributedTables(
474     const std::vector<std::string> &tables, const DistributedRdb::DistributedConfig &distributedConfig)
475 {
476     auto pool = GetPool();
477     if (pool == nullptr) {
478         return E_ALREADY_CLOSED;
479     }
480     auto conn = pool->AcquireConnection(false);
481     if (conn != nullptr) {
482         auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
483         if (strategy == ExchangeStrategy::BACKUP) {
484             (void)conn->Backup({}, {}, false, slaveStatus_);
485         }
486     }
487     {
488         std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
489         if (distributedConfig.enableCloud && distributedConfig.autoSync) {
490             cloudInfo_->AddTables(tables);
491         } else {
492             cloudInfo_->RmvTables(tables);
493             return E_OK;
494         }
495     }
496     auto isRebuilt = RebuiltType::NONE;
497     GetRebuilt(isRebuilt);
498     if (isRebuilt == RebuiltType::REBUILT) {
499         DoCloudSync("");
500     }
501     return E_OK;
502 }
503 
ObtainDistributedTableName(const std::string & device,const std::string & table,int & errCode)504 std::string RdbStoreImpl::ObtainDistributedTableName(const std::string &device, const std::string &table, int &errCode)
505 {
506     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
507     if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
508         return "";
509     }
510     auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
511     if (err != E_OK || service == nullptr) {
512         errCode = err;
513         return "";
514     }
515     auto tableName = service->ObtainDistributedTableName(syncerParam_, device, table);
516     errCode = tableName.empty() ? E_ERROR : E_OK;
517     return tableName;
518 }
519 
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncBrief & callback)520 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncBrief &callback)
521 {
522     if (config_.GetDBType() == DB_VECTOR) {
523         return E_NOT_SUPPORT;
524     }
525     return Sync(option, predicate, [callback](Details &&details) {
526         Briefs briefs;
527         for (auto &[key, value] : details) {
528             briefs.insert_or_assign(key, value.code);
529         }
530         if (callback != nullptr) {
531             callback(briefs);
532         }
533     });
534 }
535 
Sync(const SyncOption & option,const std::vector<std::string> & tables,const AsyncDetail & async)536 int RdbStoreImpl::Sync(const SyncOption &option, const std::vector<std::string> &tables, const AsyncDetail &async)
537 {
538     return Sync(option, AbsRdbPredicates(tables), async);
539 }
540 
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncDetail & async)541 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncDetail &async)
542 {
543     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
544     if (isMemoryRdb_) {
545         return E_NOT_SUPPORT;
546     }
547     DistributedRdb::RdbService::Option rdbOption;
548     rdbOption.mode = option.mode;
549     rdbOption.isAsync = !option.isBlock;
550     RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
551     ret = InnerSync(syncerParam_, rdbOption, predicate.GetDistributedPredicates(), async);
552     return ret;
553 }
554 
InnerSync(const RdbParam & param,const Options & option,const Memo & predicates,const AsyncDetail & async)555 int RdbStoreImpl::InnerSync(
556     const RdbParam &param, const Options &option, const Memo &predicates, const AsyncDetail &async)
557 {
558     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
559     if (errCode == E_NOT_SUPPORT) {
560         return errCode;
561     }
562     if (errCode != E_OK) {
563         LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
564             param.bundleName_.c_str());
565         return errCode;
566     }
567     errCode = service->Sync(param, option, predicates, async);
568     if (errCode != E_OK) {
569         LOG_ERROR("Sync is failed, err is %{public}d.", errCode);
570         return errCode;
571     }
572     return E_OK;
573 }
574 
GetUri(const std::string & event)575 std::string RdbStoreImpl::GetUri(const std::string &event)
576 {
577     std::string rdbUri;
578     if (config_.GetDataGroupId().empty()) {
579         rdbUri = SCHEME_RDB + config_.GetBundleName() + "/" + path_ + "/" + event;
580     } else {
581         rdbUri = SCHEME_RDB + config_.GetDataGroupId() + "/" + path_ + "/" + event;
582         Reportor::ReportFault(RdbFaultEvent(FT_CURD, E_DFX_GROUPID_INFO, config_.GetBundleName(),
583             "GetUri GroupId db:[" + SqliteUtils::Anonymous(name_) + "]"));
584     }
585     return rdbUri;
586 }
587 
SubscribeLocal(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)588 int RdbStoreImpl::SubscribeLocal(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
589 {
590     std::lock_guard<std::mutex> lock(mutex_);
591     localObservers_.try_emplace(option.event);
592     auto &list = localObservers_.find(option.event)->second;
593     for (auto it = list.begin(); it != list.end(); it++) {
594         if ((*it)->getObserver() == observer) {
595             LOG_ERROR("Duplicate subscribe.");
596             return E_OK;
597         }
598     }
599 
600     localObservers_[option.event].push_back(std::make_shared<RdbStoreLocalObserver>(observer));
601     return E_OK;
602 }
603 
SubscribeLocalShared(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)604 int RdbStoreImpl::SubscribeLocalShared(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
605 {
606     return obsManger_.Register(GetUri(option.event), observer);
607 }
608 
SubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)609 int32_t RdbStoreImpl::SubscribeLocalDetail(
610     const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
611 {
612     if (observer == nullptr) {
613         return E_OK;
614     }
615     std::lock_guard<std::mutex> lock(mutex_);
616     for (auto it = localDetailObservers_.begin(); it != localDetailObservers_.end(); it++) {
617         if ((*it)->GetObserver() == observer) {
618             LOG_WARN("duplicate subscribe.");
619             return E_OK;
620         }
621     }
622     auto localStoreObserver = std::make_shared<RdbStoreLocalDbObserver>(observer);
623     auto [errCode, conn] = GetConn(false);
624     if (conn == nullptr) {
625         return errCode;
626     }
627     errCode = conn->Subscribe(localStoreObserver);
628     if (errCode != E_OK) {
629         LOG_ERROR("Subscribe local detail observer failed. db name:%{public}s errCode:%{public}." PRId32,
630             SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
631         return errCode;
632     }
633     config_.SetRegisterInfo(RegisterType::STORE_OBSERVER, true);
634     localDetailObservers_.emplace_back(localStoreObserver);
635     return E_OK;
636 }
637 
SubscribeRemote(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)638 int RdbStoreImpl::SubscribeRemote(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
639 {
640     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
641     if (errCode != E_OK) {
642         return errCode;
643     }
644     return service->Subscribe(syncerParam_, option, observer);
645 }
646 
Subscribe(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)647 int RdbStoreImpl::Subscribe(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
648 {
649     if (config_.GetDBType() == DB_VECTOR) {
650         return E_NOT_SUPPORT;
651     }
652     if (option.mode == SubscribeMode::LOCAL) {
653         return SubscribeLocal(option, observer);
654     }
655     if (isMemoryRdb_) {
656         return E_NOT_SUPPORT;
657     }
658     if (option.mode == SubscribeMode::LOCAL_SHARED) {
659         return SubscribeLocalShared(option, observer);
660     }
661     return SubscribeRemote(option, observer);
662 }
663 
UnSubscribeLocal(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)664 int RdbStoreImpl::UnSubscribeLocal(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
665 {
666     std::lock_guard<std::mutex> lock(mutex_);
667     auto obs = localObservers_.find(option.event);
668     if (obs == localObservers_.end()) {
669         return E_OK;
670     }
671 
672     auto &list = obs->second;
673     for (auto it = list.begin(); it != list.end();) {
674         if (observer == nullptr || (*it)->getObserver() == observer) {
675             it = list.erase(it);
676             if (observer != nullptr) {
677                 break;
678             }
679         } else {
680             it++;
681         }
682     }
683 
684     if (list.empty()) {
685         localObservers_.erase(option.event);
686     }
687     return E_OK;
688 }
689 
UnSubscribeLocalShared(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)690 int RdbStoreImpl::UnSubscribeLocalShared(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
691 {
692     return obsManger_.Unregister(GetUri(option.event), observer);
693 }
694 
UnsubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)695 int32_t RdbStoreImpl::UnsubscribeLocalDetail(
696     const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
697 {
698     auto [errCode, conn] = GetConn(false);
699     if (conn == nullptr) {
700         return errCode;
701     }
702     std::lock_guard<std::mutex> lock(mutex_);
703     for (auto it = localDetailObservers_.begin(); it != localDetailObservers_.end();) {
704         if (observer == nullptr || (*it)->GetObserver() == observer) {
705             int32_t err = conn->Unsubscribe(*it);
706             if (err != 0) {
707                 LOG_ERROR("Unsubscribe local detail observer failed. db name:%{public}s errCode:%{public}." PRId32,
708                     SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
709                 return err;
710             }
711             it = localDetailObservers_.erase(it);
712             if (observer != nullptr) {
713                 break;
714             }
715         } else {
716             it++;
717         }
718     }
719     return E_OK;
720 }
721 
UnSubscribeRemote(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)722 int RdbStoreImpl::UnSubscribeRemote(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
723 {
724     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
725     if (errCode != E_OK) {
726         return errCode;
727     }
728     return service->UnSubscribe(syncerParam_, option, observer);
729 }
730 
UnSubscribe(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)731 int RdbStoreImpl::UnSubscribe(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
732 {
733     if (config_.GetDBType() == DB_VECTOR) {
734         return E_NOT_SUPPORT;
735     }
736     if (option.mode == SubscribeMode::LOCAL) {
737         return UnSubscribeLocal(option, observer);
738     }
739     if (isMemoryRdb_) {
740         return E_NOT_SUPPORT;
741     }
742     if (option.mode == SubscribeMode::LOCAL_SHARED) {
743         return UnSubscribeLocalShared(option, observer);
744     }
745     return UnSubscribeRemote(option, observer);
746 }
747 
SubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)748 int RdbStoreImpl::SubscribeObserver(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
749 {
750     if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
751         return E_NOT_SUPPORT;
752     }
753     return SubscribeLocalDetail(option, observer);
754 }
755 
UnsubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)756 int RdbStoreImpl::UnsubscribeObserver(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
757 {
758     if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
759         return E_NOT_SUPPORT;
760     }
761     return UnsubscribeLocalDetail(option, observer);
762 }
763 
Notify(const std::string & event)764 int RdbStoreImpl::Notify(const std::string &event)
765 {
766     if (config_.GetDBType() == DB_VECTOR) {
767         return E_NOT_SUPPORT;
768     }
769 
770     {
771         std::lock_guard<std::mutex> lock(mutex_);
772         auto obs = localObservers_.find(event);
773         if (obs != localObservers_.end()) {
774             auto &list = obs->second;
775             for (auto &it : list) {
776                 it->OnChange();
777             }
778         }
779     }
780     if (isMemoryRdb_) {
781         return E_OK;
782     }
783     int32_t err = obsManger_.Notify(GetUri(event));
784     if (err != 0) {
785         LOG_ERROR("Notify failed.");
786     }
787     return E_OK;
788 }
789 
SetSearchable(bool isSearchable)790 int RdbStoreImpl::SetSearchable(bool isSearchable)
791 {
792     if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
793         return E_NOT_SUPPORT;
794     }
795     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
796     if (errCode != E_OK || service == nullptr) {
797         LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
798         return errCode;
799     }
800     return service->SetSearchable(syncerParam_, isSearchable);
801 }
802 
RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)803 int RdbStoreImpl::RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
804 {
805     if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
806         return E_NOT_SUPPORT;
807     }
808     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
809     if (errCode != E_OK) {
810         return errCode;
811     }
812     return service->RegisterAutoSyncCallback(syncerParam_, observer);
813 }
814 
UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)815 int RdbStoreImpl::UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
816 {
817     if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
818         return E_NOT_SUPPORT;
819     }
820     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
821     if (errCode != E_OK) {
822         return errCode;
823     }
824     return service->UnregisterAutoSyncCallback(syncerParam_, observer);
825 }
826 
InitDelayNotifier()827 void RdbStoreImpl::InitDelayNotifier()
828 {
829     if (delayNotifier_ != nullptr) {
830         return;
831     }
832     delayNotifier_ = std::make_shared<DelayNotify>();
833     if (delayNotifier_ == nullptr) {
834         LOG_ERROR("Init delay notifier failed.");
835         return;
836     }
837     std::weak_ptr<NativeRdb::KnowledgeSchemaHelper> helper = GetKnowledgeSchemaHelper();
838     delayNotifier_->SetExecutorPool(TaskExecutor::GetInstance().GetExecutor());
839     delayNotifier_->SetTask([param = syncerParam_, helper = helper](
840         const DistributedRdb::RdbChangedData &rdbChangedData, const RdbNotifyConfig &rdbNotifyConfig) -> int {
841         if (IsKnowledgeDataChange(rdbChangedData)) {
842             auto realHelper = helper.lock();
843             if (realHelper == nullptr) {
844                 LOG_WARN("knowledge helper is nullptr");
845             } else {
846                 realHelper->DonateKnowledgeData();
847             }
848         }
849         if (!IsNotifyService(rdbChangedData)) {
850             return E_OK;
851         }
852         auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
853         if (errCode == E_NOT_SUPPORT) {
854             return errCode;
855         }
856         if (errCode != E_OK || service == nullptr) {
857             LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
858             return errCode;
859         }
860         return service->NotifyDataChange(param, rdbChangedData, rdbNotifyConfig);
861     });
862 }
863 
RegisterDataChangeCallback()864 int RdbStoreImpl::RegisterDataChangeCallback()
865 {
866     InitDelayNotifier();
867     auto connPool = GetPool();
868     if (connPool == nullptr) {
869         return E_ALREADY_CLOSED;
870     }
871 
872     RegisterDataChangeCallback(delayNotifier_, connPool, 0);
873     config_.SetRegisterInfo(RegisterType::CLIENT_OBSERVER, true);
874     return E_OK;
875 }
876 
RegisterDataChangeCallback(std::shared_ptr<DelayNotify> delayNotifier,std::weak_ptr<ConnectionPool> connPool,int retry)877 void RdbStoreImpl::RegisterDataChangeCallback(
878     std::shared_ptr<DelayNotify> delayNotifier, std::weak_ptr<ConnectionPool> connPool, int retry)
879 {
880     auto relConnPool = connPool.lock();
881     if (relConnPool == nullptr) {
882         return;
883     }
884     auto conn = relConnPool->AcquireConnection(false);
885     if (conn == nullptr) {
886         relConnPool->Dump(true, "DATACHANGE");
887         auto pool = TaskExecutor::GetInstance().GetExecutor();
888         if (pool != nullptr && retry < MAX_RETRY_TIMES) {
889             retry++;
890             pool->Schedule(std::chrono::seconds(1),
891                 [delayNotifier, connPool, retry]() { RegisterDataChangeCallback(delayNotifier, connPool, retry); });
892         }
893         return;
894     }
895     auto callBack = [delayNotifier](const DistributedRdb::RdbChangedData &rdbChangedData) {
896         if (delayNotifier != nullptr) {
897             delayNotifier->UpdateNotify(rdbChangedData);
898         }
899     };
900     auto errCode = conn->SubscribeTableChanges(callBack);
901     if (errCode != E_OK) {
902         return;
903     }
904 }
905 
GetHashKeyForLockRow(const AbsRdbPredicates & predicates,std::vector<std::vector<uint8_t>> & hashKeys)906 int RdbStoreImpl::GetHashKeyForLockRow(const AbsRdbPredicates &predicates, std::vector<std::vector<uint8_t>> &hashKeys)
907 {
908     std::string table = predicates.GetTableName();
909     if (table.empty()) {
910         return E_EMPTY_TABLE_NAME;
911     }
912     auto logTable = GetLogTableName(table);
913     std::string sql;
914     sql.append("SELECT ").append(logTable).append(".hash_key ").append("FROM ").append(logTable);
915     sql.append(" INNER JOIN ").append(table).append(" ON ");
916     sql.append(table).append(".ROWID = ").append(logTable).append(".data_key");
917     auto whereClause = predicates.GetWhereClause();
918     if (!whereClause.empty()) {
919         sql.append(" WHERE ").append(SqliteUtils::Replace(whereClause, SqliteUtils::REP, logTable + "."));
920     }
921 
922     auto result = QuerySql(sql, predicates.GetBindArgs());
923     if (result == nullptr) {
924         return E_ALREADY_CLOSED;
925     }
926     int count = 0;
927     if (result->GetRowCount(count) != E_OK) {
928         return E_NO_ROW_IN_QUERY;
929     }
930 
931     if (count <= 0) {
932         return E_NO_ROW_IN_QUERY;
933     }
934     while (result->GoToNextRow() == E_OK) {
935         std::vector<uint8_t> hashKey;
936         if (result->GetBlob(0, hashKey) != E_OK) {
937             return E_ERROR;
938         }
939         hashKeys.push_back(std::move(hashKey));
940     }
941     return E_OK;
942 }
943 
ModifyLockStatus(const AbsRdbPredicates & predicates,bool isLock)944 int RdbStoreImpl::ModifyLockStatus(const AbsRdbPredicates &predicates, bool isLock)
945 {
946     if (config_.IsVector() || isMemoryRdb_ || isReadOnly_) {
947         return E_NOT_SUPPORT;
948     }
949     std::vector<std::vector<uint8_t>> hashKeys;
950     int ret = GetHashKeyForLockRow(predicates, hashKeys);
951     if (ret != E_OK) {
952         LOG_ERROR("GetHashKeyForLockRow failed, err is %{public}d.", ret);
953         return ret;
954     }
955     Suspender suspender(Suspender::SQL_LOG);
956     auto [err, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION);
957     if (statement == nullptr || err != E_OK) {
958         return err;
959     }
960     int errCode = statement->ModifyLockStatus(predicates.GetTableName(), hashKeys, isLock);
961     if (errCode == E_WAIT_COMPENSATED_SYNC) {
962         LOG_DEBUG("Start compensation sync.");
963         DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true, true };
964         auto memo = AbsRdbPredicates(predicates.GetTableName()).GetDistributedPredicates();
965         InnerSync(syncerParam_, option, memo, nullptr);
966         return E_OK;
967     }
968     if (errCode != E_OK) {
969         LOG_ERROR("ModifyLockStatus failed, err is %{public}d.", errCode);
970     }
971     return errCode;
972 }
973 
LockCloudContainer()974 std::pair<int32_t, uint32_t> RdbStoreImpl::LockCloudContainer()
975 {
976     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
977     if (config_.IsVector() || isMemoryRdb_ || isReadOnly_) {
978         return { E_NOT_SUPPORT, 0 };
979     }
980     RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
981     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
982     if (errCode == E_NOT_SUPPORT) {
983         LOG_ERROR("not support");
984         return { errCode, 0 };
985     }
986     if (errCode != E_OK) {
987         LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
988             syncerParam_.bundleName_.c_str());
989         return { errCode, 0 };
990     }
991     auto result = service->LockCloudContainer(syncerParam_);
992     if (result.first != E_OK) {
993         LOG_ERROR("LockCloudContainer failed, err is %{public}d.", result.first);
994     }
995     return result;
996 }
997 
UnlockCloudContainer()998 int32_t RdbStoreImpl::UnlockCloudContainer()
999 {
1000     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1001     if (config_.IsVector() || isMemoryRdb_ || isReadOnly_) {
1002         return E_NOT_SUPPORT;
1003     }
1004     RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
1005     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
1006     if (errCode == E_NOT_SUPPORT) {
1007         LOG_ERROR("not support");
1008         return errCode;
1009     }
1010     if (errCode != E_OK) {
1011         LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
1012             syncerParam_.bundleName_.c_str());
1013         return errCode;
1014     }
1015     errCode = service->UnlockCloudContainer(syncerParam_);
1016     if (errCode != E_OK) {
1017         LOG_ERROR("UnlockCloudContainer failed, err is %{public}d.", errCode);
1018     }
1019     return errCode;
1020 }
1021 #endif
1022 
RdbStoreImpl(const RdbStoreConfig & config)1023 RdbStoreImpl::RdbStoreImpl(const RdbStoreConfig &config)
1024     : isMemoryRdb_(config.IsMemoryRdb()), config_(config), name_(config.GetName()),
1025       fileType_(config.GetDatabaseFileType())
1026 {
1027     SqliteGlobalConfig::GetDbPath(config_, path_);
1028     isReadOnly_ = config.IsReadOnly() || config.GetRoleType() == VISITOR;
1029 }
1030 
ProcessOpenCallback(int version,RdbOpenCallback & openCallback)1031 int32_t RdbStoreImpl::ProcessOpenCallback(int version, RdbOpenCallback &openCallback)
1032 {
1033     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1034     int32_t errCode = E_OK;
1035     if (version == -1) {
1036         return errCode;
1037     }
1038 
1039     int32_t currentVersion;
1040     errCode = GetVersion(currentVersion);
1041     if (errCode != E_OK) {
1042         return errCode;
1043     }
1044 
1045     if (version == currentVersion) {
1046         return openCallback.OnOpen(*this);
1047     }
1048 
1049     if (currentVersion == 0) {
1050         errCode = openCallback.OnCreate(*this);
1051     } else if (version > currentVersion) {
1052         errCode = openCallback.OnUpgrade(*this, currentVersion, version);
1053     } else {
1054         errCode = openCallback.OnDowngrade(*this, currentVersion, version);
1055     }
1056 
1057     if (errCode == E_OK) {
1058         errCode = SetVersion(version);
1059     }
1060 
1061     if (errCode != E_OK) {
1062         LOG_ERROR("openCallback failed. version: %{public}d -> %{public}d, errCode:%{public}d",
1063             currentVersion, version, errCode);
1064         return errCode;
1065     }
1066 
1067     return openCallback.OnOpen(*this);
1068 }
1069 
TryAsyncRepair()1070 bool RdbStoreImpl::TryAsyncRepair()
1071 {
1072     std::string slavePath = SqliteUtils::GetSlavePath(path_);
1073     if (!IsUseAsyncRestore(path_, slavePath)) {
1074         return false;
1075     }
1076 
1077     int errCode = Connection::CheckReplicaIntegrity(config_);
1078     if (errCode != E_OK) {
1079         return false;
1080     }
1081 
1082     SqliteUtils::DeleteDirtyFiles(path_);
1083     auto pool = ConnectionPool::Create(config_, errCode);
1084     if (errCode != E_OK) {
1085         LOG_WARN("create new connection failed");
1086         return false;
1087     }
1088     connectionPool_ = pool;
1089     errCode = StartAsyncRestore(pool);
1090     if (errCode != E_OK) {
1091         return false;
1092     }
1093     rebuild_ = RebuiltType::REPAIRED;
1094 
1095     Reportor::ReportRestore(Reportor::Create(config_, E_OK, "RestoreType:Rebuild", false), false);
1096     return true;
1097 }
CreatePool(bool & created)1098 int32_t RdbStoreImpl::CreatePool(bool &created)
1099 {
1100     int32_t errCode = E_OK;
1101     connectionPool_ = ConnectionPool::Create(config_, errCode);
1102     if (connectionPool_ == nullptr && (errCode == E_SQLITE_CORRUPT || errCode == E_INVALID_SECRET_KEY) &&
1103         !isReadOnly_) {
1104         LOG_ERROR("database corrupt, errCode:0x%{public}x, %{public}s, %{public}s", errCode,
1105             SqliteUtils::Anonymous(name_).c_str(),
1106             SqliteUtils::FormatDebugInfoBrief(Connection::Collect(config_), "master").c_str());
1107 #if !defined(CROSS_PLATFORM)
1108         InitSyncerParam(config_, false);
1109         auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
1110         if (service != nullptr) {
1111             service->Disable(syncerParam_);
1112         }
1113 #endif
1114         config_.SetIter(0);
1115         if (config_.IsEncrypt() && config_.GetAllowRebuild()) {
1116             auto key = config_.GetEncryptKey();
1117             RdbSecurityManager::GetInstance().RestoreKeyFile(path_, key);
1118             key.assign(key.size(), 0);
1119         }
1120 
1121         if (TryAsyncRepair()) {
1122             errCode = E_OK;
1123         } else {
1124             std::tie(rebuild_, connectionPool_) = ConnectionPool::HandleDataCorruption(config_, errCode);
1125         }
1126         created = true;
1127 #if !defined(CROSS_PLATFORM)
1128         if (service != nullptr) {
1129             service->Enable(syncerParam_);
1130         }
1131 #endif
1132     }
1133     return errCode;
1134 }
1135 
SetSecurityLabel(const RdbStoreConfig & config)1136 int32_t RdbStoreImpl::SetSecurityLabel(const RdbStoreConfig &config)
1137 {
1138     if (config.IsMemoryRdb()) {
1139         return E_OK;
1140     }
1141 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
1142     return SecurityPolicy::SetSecurityLabel(config);
1143 #endif
1144     return E_OK;
1145 }
1146 
Init(int version,RdbOpenCallback & openCallback)1147 int32_t RdbStoreImpl::Init(int version, RdbOpenCallback &openCallback)
1148 {
1149     if (initStatus_ != -1) {
1150         return initStatus_;
1151     }
1152     std::lock_guard<std::mutex> lock(initMutex_);
1153     if (initStatus_ != -1) {
1154         return initStatus_;
1155     }
1156     int32_t errCode = E_OK;
1157     bool created = access(path_.c_str(), F_OK) != 0;
1158     errCode = CreatePool(created);
1159     if (connectionPool_ == nullptr || errCode != E_OK) {
1160         connectionPool_ = nullptr;
1161         LOG_ERROR("Create connPool failed, err is %{public}d, path:%{public}s", errCode,
1162             SqliteUtils::Anonymous(path_).c_str());
1163         return errCode;
1164     }
1165     InitSyncerParam(config_, created);
1166     InitReportFunc(syncerParam_);
1167     InnerOpen();
1168 
1169     if (config_.GetRoleType() == OWNER && !config_.IsReadOnly()) {
1170         errCode = SetSecurityLabel(config_);
1171         if (errCode != E_OK) {
1172             return errCode;
1173         }
1174         (void) ExchangeSlaverToMaster();
1175         SwitchOver(true);
1176         errCode = ProcessOpenCallback(version, openCallback);
1177         SwitchOver(false);
1178         if (errCode != E_OK) {
1179             LOG_ERROR("Callback fail, path:%{public}s code:%{public}d", SqliteUtils::Anonymous(path_).c_str(), errCode);
1180             return errCode;
1181         }
1182     }
1183     initStatus_ = errCode;
1184     return initStatus_;
1185 }
1186 
~RdbStoreImpl()1187 RdbStoreImpl::~RdbStoreImpl()
1188 {
1189     connectionPool_ = nullptr;
1190     trxConnMap_ = {};
1191     for (auto &trans : transactions_) {
1192         auto realTrans = trans.lock();
1193         if (realTrans) {
1194             (void)realTrans->Close();
1195         }
1196     }
1197     transactions_ = {};
1198     if (knowledgeSchemaHelper_ != nullptr) {
1199         knowledgeSchemaHelper_->Close();
1200     }
1201     *slaveStatus_ = SlaveStatus::DB_CLOSING;
1202 }
1203 
GetConfig()1204 const RdbStoreConfig &RdbStoreImpl::GetConfig()
1205 {
1206     return config_;
1207 }
1208 
Insert(const std::string & table,const Row & row,Resolution resolution)1209 std::pair<int, int64_t> RdbStoreImpl::Insert(const std::string &table, const Row &row, Resolution resolution)
1210 {
1211     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1212     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1213         return { E_NOT_SUPPORT, -1 };
1214     }
1215     RdbStatReporter reportStat(RDB_PERF, INSERT, config_, reportFunc_);
1216     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1217     PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
1218     auto [status, sqlInfo] = RdbSqlUtils::GetInsertSqlInfo(table, row, resolution);
1219     if (status != E_OK) {
1220         return { status, -1 };
1221     }
1222 
1223     int64_t rowid = -1;
1224     auto errCode = ExecuteForLastInsertedRowId(rowid, sqlInfo.sql, sqlInfo.args);
1225     if (errCode == E_OK) {
1226         DoCloudSync(table);
1227     }
1228 
1229     return { errCode, rowid };
1230 }
1231 
BatchInsert(const std::string & table,const ValuesBuckets & rows)1232 std::pair<int, int64_t> RdbStoreImpl::BatchInsert(const std::string &table, const ValuesBuckets &rows)
1233 {
1234     if (isReadOnly_) {
1235         return { E_NOT_SUPPORT, -1 };
1236     }
1237 
1238     if (rows.RowSize() == 0) {
1239         return { E_OK, 0 };
1240     }
1241 
1242     RdbStatReporter reportStat(RDB_PERF, BATCHINSERT, config_, reportFunc_);
1243     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1244     PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL, 0, rows.RowSize());
1245     auto pool = GetPool();
1246     if (pool == nullptr) {
1247         return { E_ALREADY_CLOSED, -1 };
1248     }
1249     auto conn = pool->AcquireConnection(false);
1250     if (conn == nullptr) {
1251         return { E_DATABASE_BUSY, -1 };
1252     }
1253 
1254     auto executeSqlArgs = SqliteSqlBuilder::GenerateSqls(table, rows, conn->GetMaxVariable());
1255     BatchInsertArgsDfx(static_cast<int>(executeSqlArgs.size()));
1256     if (executeSqlArgs.empty()) {
1257         LOG_ERROR("empty, table=%{public}s, values:%{public}zu, max number:%{public}d.",
1258             SqliteUtils::Anonymous(table).c_str(), rows.RowSize(), conn->GetMaxVariable());
1259         return { E_INVALID_ARGS, -1 };
1260     }
1261     PauseDelayNotify pauseDelayNotify(delayNotifier_);
1262     for (const auto &[sql, bindArgs] : executeSqlArgs) {
1263         auto [errCode, statement] = GetStatement(sql, conn);
1264         if (statement == nullptr) {
1265             LOG_ERROR("statement is nullptr, errCode:0x%{public}x, args:%{public}zu, table:%{public}s, "
1266                 "app self can check the SQL", errCode, bindArgs.size(), SqliteUtils::Anonymous(table).c_str());
1267             return { E_OK, -1 };
1268         }
1269         for (const auto &args : bindArgs) {
1270             auto errCode = statement->Execute(args);
1271             if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1272                 pool->Dump(true, "BATCH");
1273                 return { errCode, -1 };
1274             }
1275             if (errCode != E_OK) {
1276                 LOG_ERROR("failed, errCode:%{public}d,args:%{public}zu,table:%{public}s,app self can check the SQL",
1277                     errCode, bindArgs.size(), SqliteUtils::Anonymous(table).c_str());
1278                 return { E_OK, -1 };
1279             }
1280         }
1281     }
1282     conn = nullptr;
1283     DoCloudSync(table);
1284     return { E_OK, int64_t(rows.RowSize()) };
1285 }
1286 
BatchInsertArgsDfx(int argsSize)1287 void RdbStoreImpl::BatchInsertArgsDfx(int argsSize)
1288 {
1289     if (argsSize > 1) {
1290         Reportor::ReportFault(RdbFaultEvent(FT_CURD, E_DFX_BATCH_INSERT_ARGS_SIZE, config_.GetBundleName(),
1291             "BatchInsert executeSqlArgs size[ " + std::to_string(argsSize) + "]"));
1292     }
1293 }
1294 
BatchInsert(const std::string & table,const RefRows & rows,const std::vector<std::string> & returningFields,Resolution resolution)1295 std::pair<int32_t, Results> RdbStoreImpl::BatchInsert(const std::string &table, const RefRows &rows,
1296     const std::vector<std::string> &returningFields, Resolution resolution)
1297 {
1298     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1299         return { E_NOT_SUPPORT, -1 };
1300     }
1301 
1302     if (rows.RowSize() == 0) {
1303         return { E_OK, 0 };
1304     }
1305 
1306     RdbStatReporter reportStat(RDB_PERF, BATCHINSERT, config_, reportFunc_);
1307     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1308     PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL, 0, rows.RowSize());
1309     auto pool = GetPool();
1310     if (pool == nullptr) {
1311         return { E_ALREADY_CLOSED, -1 };
1312     }
1313     auto conn = pool->AcquireConnection(false);
1314     if (conn == nullptr) {
1315         return { E_DATABASE_BUSY, -1 };
1316     }
1317 
1318     auto sqlArgs = SqliteSqlBuilder::GenerateSqls(table, rows, conn->GetMaxVariable(), resolution);
1319     // To ensure atomicity, execute SQL only once
1320     if (sqlArgs.size() != 1 || sqlArgs.front().second.size() != 1) {
1321         auto [fields, values] = rows.GetFieldsAndValues();
1322         LOG_ERROR("invalid! rows:%{public}zu, table:%{public}s, fields:%{public}zu, max:%{public}d.", rows.RowSize(),
1323             SqliteUtils::Anonymous(table).c_str(), fields != nullptr ? fields->size() : 0, conn->GetMaxVariable());
1324         return { E_INVALID_ARGS, -1 };
1325     }
1326     auto &[sql, bindArgs] = sqlArgs.front();
1327     SqliteSqlBuilder::AppendReturning(sql, returningFields);
1328     auto [errCode, statement] = GetStatement(sql, conn);
1329     if (statement == nullptr) {
1330         LOG_ERROR("statement is nullptr, errCode:0x%{public}x, args:%{public}zu, table:%{public}s, "
1331                   "app self can check the SQL", errCode, bindArgs.size(), SqliteUtils::Anonymous(table).c_str());
1332         return { errCode, -1 };
1333     }
1334     PauseDelayNotify pauseDelayNotify(delayNotifier_);
1335     errCode = statement->Execute(std::ref(bindArgs.front()));
1336     if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1337         pool->Dump(true, "BATCH");
1338         return { errCode, -1 };
1339     }
1340     if (errCode != E_OK) {
1341         LOG_ERROR("failed,errCode:%{public}d,table:%{public}s,args:%{public}zu,resolution:%{public}d.", errCode,
1342             SqliteUtils::Anonymous(table).c_str(), bindArgs.front().size(), static_cast<int32_t>(resolution));
1343     }
1344     auto result = GenerateResult(errCode, statement);
1345     if (result.changed > 0) {
1346         DoCloudSync(table);
1347     }
1348     return { errCode, result };
1349 }
1350 
Update(const Row & row,const AbsRdbPredicates & predicates,const std::vector<std::string> & returningFields,Resolution resolution)1351 std::pair<int32_t, Results> RdbStoreImpl::Update(const Row &row, const AbsRdbPredicates &predicates,
1352     const std::vector<std::string> &returningFields, Resolution resolution)
1353 {
1354     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1355     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1356         return { E_NOT_SUPPORT, -1 };
1357     }
1358     RdbStatReporter reportStat(RDB_PERF, UPDATE, config_, reportFunc_);
1359     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1360     PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
1361     auto [status, sqlInfo] = RdbSqlUtils::GetUpdateSqlInfo(predicates, row, resolution, returningFields);
1362     if (status != E_OK) {
1363         return { status, -1 };
1364     }
1365     auto [code, result] = ExecuteForRow(sqlInfo.sql, sqlInfo.args);
1366     if (result.changed > 0) {
1367         DoCloudSync(predicates.GetTableName());
1368     }
1369     return { code, result };
1370 }
1371 
Delete(const AbsRdbPredicates & predicates,const std::vector<std::string> & returningFields)1372 std::pair<int32_t, Results> RdbStoreImpl::Delete(
1373     const AbsRdbPredicates &predicates, const std::vector<std::string> &returningFields)
1374 {
1375     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1376     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1377         return { E_NOT_SUPPORT, -1 };
1378     }
1379     RdbStatReporter reportStat(RDB_PERF, DELETE, config_, reportFunc_);
1380     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1381     PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
1382     auto [status, sqlInfo] = RdbSqlUtils::GetDeleteSqlInfo(predicates, returningFields);
1383     if (status != E_OK) {
1384         return { status, -1 };
1385     }
1386 
1387     auto [code, result] = ExecuteForRow(sqlInfo.sql, predicates.GetBindArgs());
1388     if (result.changed > 0) {
1389         DoCloudSync(predicates.GetTableName());
1390     }
1391     return { code, result };
1392 }
1393 
QuerySql(const std::string & sql,const Values & bindArgs)1394 std::shared_ptr<AbsSharedResultSet> RdbStoreImpl::QuerySql(const std::string &sql, const Values &bindArgs)
1395 {
1396     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1397     if (config_.GetDBType() == DB_VECTOR) {
1398         return nullptr;
1399     }
1400     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1401     PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
1402 #if !defined(CROSS_PLATFORM)
1403     auto start = std::chrono::steady_clock::now();
1404     auto pool = GetPool();
1405     if (pool == nullptr) {
1406         LOG_ERROR("Database already closed.");
1407         return nullptr;
1408     }
1409     return std::make_shared<SqliteSharedResultSet>(start, pool->AcquireRef(true), sql, bindArgs, path_);
1410 #else
1411     (void)sql;
1412     (void)bindArgs;
1413     return nullptr;
1414 #endif
1415 }
1416 
QueryByStep(const std::string & sql,const Values & args,bool preCount)1417 std::shared_ptr<ResultSet> RdbStoreImpl::QueryByStep(const std::string &sql, const Values &args, bool preCount)
1418 {
1419     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1420     PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
1421     auto start = std::chrono::steady_clock::now();
1422     auto pool = GetPool();
1423     if (pool == nullptr) {
1424         LOG_ERROR("Database already closed.");
1425         return nullptr;
1426     }
1427 #if !defined(CROSS_PLATFORM)
1428     return std::make_shared<StepResultSet>(start, connectionPool_->AcquireRef(true), sql, args, preCount);
1429 #else
1430     return std::make_shared<StepResultSet>(start, connectionPool_->AcquireRef(true), sql, args, false);
1431 #endif
1432 }
1433 
Count(int64_t & outValue,const AbsRdbPredicates & predicates)1434 int RdbStoreImpl::Count(int64_t &outValue, const AbsRdbPredicates &predicates)
1435 {
1436     if (config_.GetDBType() == DB_VECTOR) {
1437         return E_NOT_SUPPORT;
1438     }
1439     std::string sql = SqliteSqlBuilder::BuildCountString(predicates);
1440     return ExecuteAndGetLong(outValue, sql, predicates.GetBindArgs());
1441 }
1442 
WriteToCompareFile(const std::string & dbPath,const std::string & bundleName,const std::string & sql)1443 void WriteToCompareFile(const std::string &dbPath, const std::string &bundleName, const std::string &sql)
1444 {
1445     auto poolTask = TaskExecutor::GetInstance().GetExecutor();
1446     if (poolTask != nullptr) {
1447         poolTask->Execute([dbPath, bundleName, sql]() {
1448             std::string comparePath = dbPath + "-compare";
1449             if (SqliteUtils::CleanFileContent(comparePath)) {
1450                 Reportor::ReportFault(
1451                     RdbFaultEvent(FT_CURD, E_DFX_IS_NOT_EXIST, bundleName, "compare file is deleted"));
1452             }
1453             SqliteUtils::WriteSqlToFile(comparePath, sql);
1454         });
1455     }
1456 }
1457 
HandleSchemaDDL(std::shared_ptr<Statement> && statement,const std::string & sql)1458 int32_t RdbStoreImpl::HandleSchemaDDL(std::shared_ptr<Statement> &&statement, const std::string &sql)
1459 {
1460     statement->Reset();
1461     Suspender suspender(Suspender::SQL_STATISTIC);
1462     statement->Prepare("PRAGMA schema_version");
1463     auto [err, version] = statement->ExecuteForValue();
1464     statement = nullptr;
1465     if (vSchema_ < static_cast<int64_t>(version)) {
1466         LOG_INFO("db:%{public}s exe DDL schema<%{public}" PRIi64 "->%{public}" PRIi64 ">",
1467             SqliteUtils::Anonymous(name_).c_str(), vSchema_, static_cast<int64_t>(version));
1468         vSchema_ = version;
1469         if (!isMemoryRdb_) {
1470             std::string dbPath = config_.GetPath();
1471             std::string bundleName = config_.GetBundleName();
1472             WriteToCompareFile(dbPath, bundleName, sql);
1473         }
1474         statement = nullptr;
1475         if (config_.GetEnableSemanticIndex() && !isKnowledgeSchemaReady_) {
1476             SetKnowledgeSchema();
1477         }
1478         auto pool = GetPool();
1479         if (pool == nullptr) {
1480             return E_ALREADY_CLOSED;
1481         }
1482         return pool->RestartConns();
1483     }
1484     return E_OK;
1485 }
1486 
ExecuteSql(const std::string & sql,const Values & args)1487 int RdbStoreImpl::ExecuteSql(const std::string &sql, const Values &args)
1488 {
1489     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1490     if (config_.GetDBType() == DB_VECTOR || isReadOnly_) {
1491         return E_NOT_SUPPORT;
1492     }
1493     int ret = CheckAttach(sql);
1494     if (ret != E_OK) {
1495         return ret;
1496     }
1497     RdbStatReporter reportStat(RDB_PERF, EXECUTESQL, config_, reportFunc_);
1498     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1499     PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
1500     auto [errCode, statement] = BeginExecuteSql(sql);
1501     if (statement == nullptr) {
1502         return errCode;
1503     }
1504     errCode = statement->Execute(args);
1505     if (errCode != E_OK) {
1506         LOG_ERROR("failed,error:0x%{public}x app self can check the SQL.", errCode);
1507         TryDump(errCode, "EXECUTE");
1508         return errCode;
1509     }
1510     int sqlType = SqliteUtils::GetSqlStatementType(sql);
1511     if (sqlType == SqliteUtils::STATEMENT_DDL) {
1512         HandleSchemaDDL(std::move(statement), sql);
1513     }
1514     statement = nullptr;
1515     if (errCode == E_OK && (sqlType == SqliteUtils::STATEMENT_UPDATE || sqlType == SqliteUtils::STATEMENT_INSERT)) {
1516         DoCloudSync("");
1517     }
1518     return errCode;
1519 }
1520 
Execute(const std::string & sql,const Values & args,int64_t trxId)1521 std::pair<int32_t, ValueObject> RdbStoreImpl::Execute(const std::string &sql, const Values &args, int64_t trxId)
1522 {
1523     ValueObject object;
1524     if (isReadOnly_) {
1525         return { E_NOT_SUPPORT, object };
1526     }
1527 
1528     RdbStatReporter reportStat(RDB_PERF, EXECUTE, config_, reportFunc_);
1529     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1530     PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
1531     int sqlType = SqliteUtils::GetSqlStatementType(sql);
1532     if (!SqliteUtils::IsSupportSqlForExecute(sqlType)) {
1533         LOG_ERROR("Not support the sqlType: %{public}d, app self can check the SQL", sqlType);
1534         return { E_NOT_SUPPORT_THE_SQL, object };
1535     }
1536 
1537     if (config_.IsVector() && trxId > 0) {
1538         return { ExecuteByTrxId(sql, trxId, false, args), ValueObject() };
1539     }
1540 
1541     auto [errCode, statement] = GetStatement(sql, false);
1542     if (errCode != E_OK || statement == nullptr) {
1543         return { errCode != E_OK ? errCode : E_ERROR, object };
1544     }
1545 
1546     errCode = statement->Execute(args);
1547     TryDump(errCode, "EXECUTE");
1548     if (config_.IsVector()) {
1549         return { errCode, object };
1550     }
1551 
1552     return HandleDifferentSqlTypes(std::move(statement), sql, errCode, sqlType);
1553 }
1554 
HandleDifferentSqlTypes(std::shared_ptr<Statement> && statement,const std::string & sql,int32_t code,int sqlType)1555 std::pair<int32_t, ValueObject> RdbStoreImpl::HandleDifferentSqlTypes(
1556     std::shared_ptr<Statement> &&statement, const std::string &sql, int32_t code, int sqlType)
1557 {
1558     if (code != E_OK) {
1559         return { code, ValueObject() };
1560     }
1561     if (sqlType == SqliteUtils::STATEMENT_INSERT) {
1562         int64_t outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1563         return { code, ValueObject(outValue) };
1564     }
1565 
1566     if (sqlType == SqliteUtils::STATEMENT_UPDATE) {
1567         int outValue = statement->Changes();
1568         return { code, ValueObject(outValue) };
1569     }
1570 
1571     if (sqlType == SqliteUtils::STATEMENT_PRAGMA) {
1572         if (statement->GetColumnCount() == 1) {
1573             return statement->GetColumn(0);
1574         }
1575 
1576         if (statement->GetColumnCount() > 1) {
1577             LOG_ERROR("Not support the sql:app self can check the SQL, column count more than 1");
1578             return { E_NOT_SUPPORT_THE_SQL, ValueObject() };
1579         }
1580     }
1581 
1582     if (sqlType == SqliteUtils::STATEMENT_DDL) {
1583         HandleSchemaDDL(std::move(statement), sql);
1584     }
1585     return { code, ValueObject() };
1586 }
1587 
ExecuteExt(const std::string & sql,const Values & args)1588 std::pair<int32_t, Results> RdbStoreImpl::ExecuteExt(const std::string &sql, const Values &args)
1589 {
1590     if (isReadOnly_ || config_.IsVector()) {
1591         return { E_NOT_SUPPORT, -1 };
1592     }
1593 
1594     RdbStatReporter reportStat(RDB_PERF, EXECUTE, config_, reportFunc_);
1595     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1596     int sqlType = SqliteUtils::GetSqlStatementType(sql);
1597     if (!SqliteUtils::IsSupportSqlForExecute(sqlType)) {
1598         LOG_ERROR("Not support the sqlType: %{public}d, app self can check the SQL", sqlType);
1599         return { E_NOT_SUPPORT_THE_SQL, -1 };
1600     }
1601     auto [errCode, statement] = GetStatement(sql, false);
1602     if (errCode != E_OK) {
1603         return { errCode, -1 };
1604     }
1605     errCode = statement->Execute(args);
1606     TryDump(errCode, "ExecuteExt");
1607     return HandleResults(std::move(statement), sql, errCode, sqlType);
1608 }
1609 
HandleResults(std::shared_ptr<Statement> && statement,const std::string & sql,int32_t code,int sqlType)1610 std::pair<int32_t, Results> RdbStoreImpl::HandleResults(
1611     std::shared_ptr<Statement> &&statement, const std::string &sql, int32_t code, int sqlType)
1612 {
1613     Results result = GenerateResult(
1614         code, statement, sqlType == SqliteUtils::STATEMENT_INSERT || sqlType == SqliteUtils::STATEMENT_UPDATE);
1615     if (sqlType == SqliteUtils::STATEMENT_DDL) {
1616         HandleSchemaDDL(std::move(statement), sql);
1617     }
1618     return { code, result };
1619 }
1620 
ExecuteAndGetLong(int64_t & outValue,const std::string & sql,const Values & args)1621 int RdbStoreImpl::ExecuteAndGetLong(int64_t &outValue, const std::string &sql, const Values &args)
1622 {
1623     if (config_.GetDBType() == DB_VECTOR) {
1624         return E_NOT_SUPPORT;
1625     }
1626     auto [errCode, statement] = BeginExecuteSql(sql);
1627     if (statement == nullptr) {
1628         return errCode;
1629     }
1630     auto [err, object] = statement->ExecuteForValue(args);
1631     if (err != E_OK) {
1632         LOG_ERROR("failed, app self can check the SQL,  ERROR is %{public}d.", err);
1633     }
1634     outValue = object;
1635     return err;
1636 }
1637 
ExecuteAndGetString(std::string & outValue,const std::string & sql,const Values & args)1638 int RdbStoreImpl::ExecuteAndGetString(std::string &outValue, const std::string &sql, const Values &args)
1639 {
1640     if (config_.GetDBType() == DB_VECTOR) {
1641         return E_NOT_SUPPORT;
1642     }
1643     auto [errCode, statement] = BeginExecuteSql(sql);
1644     if (statement == nullptr) {
1645         return errCode;
1646     }
1647     ValueObject object;
1648     std::tie(errCode, object) = statement->ExecuteForValue(args);
1649     if (errCode != E_OK) {
1650         LOG_ERROR("failed, app self can check the SQL,  ERROR is %{public}d.", errCode);
1651     }
1652     outValue = static_cast<std::string>(object);
1653     return errCode;
1654 }
1655 
ExecuteForLastInsertedRowId(int64_t & outValue,const std::string & sql,const Values & args)1656 int RdbStoreImpl::ExecuteForLastInsertedRowId(int64_t &outValue, const std::string &sql, const Values &args)
1657 {
1658     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1659         return E_NOT_SUPPORT;
1660     }
1661     auto begin = std::chrono::steady_clock::now();
1662     auto [errCode, statement] = GetStatement(sql, false);
1663     if (statement == nullptr) {
1664         return errCode;
1665     }
1666     auto beginExec = std::chrono::steady_clock::now();
1667     errCode = statement->Execute(args);
1668     if (errCode != E_OK) {
1669         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1670             auto pool = GetPool();
1671             if (pool != nullptr) {
1672                 pool->Dump(true, "INSERT");
1673             }
1674         }
1675         return errCode;
1676     }
1677     auto beginResult = std::chrono::steady_clock::now();
1678     outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1679     auto allEnd = std::chrono::steady_clock::now();
1680     int64_t totalCostTime = std::chrono::duration_cast<std::chrono::milliseconds>(allEnd - begin).count();
1681     if (totalCostTime >= TIME_OUT) {
1682         int64_t prepareCost = std::chrono::duration_cast<std::chrono::milliseconds>(beginExec - begin).count();
1683         int64_t execCost = std::chrono::duration_cast<std::chrono::milliseconds>(beginResult - beginExec).count();
1684         int64_t resultCost = std::chrono::duration_cast<std::chrono::milliseconds>(allEnd - beginResult).count();
1685         LOG_WARN("total[%{public}" PRId64 "] stmt[%{public}" PRId64 "] exec[%{public}" PRId64
1686                  "] result[%{public}" PRId64 "] "
1687                  "sql[%{public}s]",
1688             totalCostTime, prepareCost, execCost, resultCost, SqliteUtils::SqlAnonymous(sql).c_str());
1689     }
1690     return E_OK;
1691 }
1692 
ExecuteForRow(const std::string & sql,const Values & args)1693 std::pair<int32_t, Results> RdbStoreImpl::ExecuteForRow(const std::string &sql, const Values &args)
1694 {
1695     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1696         return { E_NOT_SUPPORT, -1 };
1697     }
1698     auto [errCode, statement] = GetStatement(sql, false);
1699     if (statement == nullptr) {
1700         return { errCode, -1 };
1701     }
1702     errCode = statement->Execute(args);
1703     if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1704         auto pool = GetPool();
1705         if (pool != nullptr) {
1706             pool->Dump(true, "UPG DEL");
1707         }
1708     }
1709     return { errCode, GenerateResult(errCode, statement) };
1710 }
1711 
ExecuteForChangedRowCount(int64_t & outValue,const std::string & sql,const Values & args)1712 int RdbStoreImpl::ExecuteForChangedRowCount(int64_t &outValue, const std::string &sql, const Values &args)
1713 {
1714     auto [code, result] = ExecuteForRow(sql, args);
1715     outValue = result.changed;
1716     return code;
1717 }
1718 
GetDataBasePath(const std::string & databasePath,std::string & backupFilePath)1719 int RdbStoreImpl::GetDataBasePath(const std::string &databasePath, std::string &backupFilePath)
1720 {
1721     if (databasePath.empty()) {
1722         return E_INVALID_FILE_PATH;
1723     }
1724 
1725     if (ISFILE(databasePath)) {
1726         backupFilePath = ExtractFilePath(path_) + databasePath;
1727     } else {
1728         // 2 represents two characters starting from the len - 2 position
1729         if (!PathToRealPath(ExtractFilePath(databasePath), backupFilePath) || databasePath.back() == '/' ||
1730             databasePath.substr(databasePath.length() - 2, 2) == "\\") {
1731             LOG_ERROR("Invalid databasePath.");
1732             return E_INVALID_FILE_PATH;
1733         }
1734         backupFilePath = databasePath;
1735     }
1736 
1737     if (backupFilePath == path_) {
1738         LOG_ERROR("The backupPath and path should not be same.");
1739         return E_INVALID_FILE_PATH;
1740     }
1741 
1742     LOG_INFO("databasePath is %{public}s.", SqliteUtils::Anonymous(backupFilePath).c_str());
1743     return E_OK;
1744 }
1745 
GetSlaveName(const std::string & path,std::string & backupFilePath)1746 int RdbStoreImpl::GetSlaveName(const std::string &path, std::string &backupFilePath)
1747 {
1748     std::string suffix(".db");
1749     std::string slaveSuffix("_slave.db");
1750     auto pos = path.find(suffix);
1751     if (pos == std::string::npos) {
1752         backupFilePath = path + slaveSuffix;
1753     } else {
1754         backupFilePath = std::string(path, 0, pos) + slaveSuffix;
1755     }
1756     return E_OK;
1757 }
1758 
1759 /**
1760  * Backup a database from a specified encrypted or unencrypted database file.
1761  * Support skipping verification.
1762  */
Backup(const std::string & databasePath,const std::vector<uint8_t> & encryptKey,bool verifyDb)1763 int RdbStoreImpl::Backup(const std::string &databasePath, const std::vector<uint8_t> &encryptKey, bool verifyDb)
1764 {
1765     LOG_INFO("Backup db: %{public}s, verify: %{public}d.", SqliteUtils::Anonymous(config_.GetName()).c_str(), verifyDb);
1766     if (isReadOnly_ || isMemoryRdb_) {
1767         return E_NOT_SUPPORT;
1768     }
1769     std::string backupFilePath;
1770     if (TryGetMasterSlaveBackupPath(databasePath, backupFilePath)) {
1771         return InnerBackup(backupFilePath, encryptKey, verifyDb);
1772     }
1773 
1774     int ret = GetDataBasePath(databasePath, backupFilePath);
1775     if (ret != E_OK) {
1776         return ret;
1777     }
1778 
1779     RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
1780     keyFiles.Lock();
1781 
1782     auto walFile = backupFilePath + "-wal";
1783     if (access(walFile.c_str(), F_OK) == E_OK) {
1784         if (!SqliteUtils::DeleteDirtyFiles(backupFilePath)) {
1785             keyFiles.Unlock();
1786             return E_ERROR;
1787         }
1788     }
1789     std::string tempPath = backupFilePath + ".tmp";
1790     if (access(tempPath.c_str(), F_OK) == E_OK) {
1791         SqliteUtils::DeleteFile(backupFilePath);
1792     } else {
1793         if (access(backupFilePath.c_str(), F_OK) == E_OK && !SqliteUtils::RenameFile(backupFilePath, tempPath)) {
1794             LOG_ERROR("rename backup file failed, path:%{public}s, errno:%{public}d",
1795                 SqliteUtils::Anonymous(backupFilePath).c_str(), errno);
1796             keyFiles.Unlock();
1797             return E_ERROR;
1798         }
1799     }
1800     ret = InnerBackup(backupFilePath, encryptKey);
1801     if (ret != E_OK || access(walFile.c_str(), F_OK) == E_OK) {
1802         if (ret == E_DB_NOT_EXIST) {
1803             Reportor::ReportCorrupted(Reportor::Create(config_, ret, "ErrorType: BackupFailed"));
1804         }
1805         if (SqliteUtils::DeleteDirtyFiles(backupFilePath)) {
1806             SqliteUtils::RenameFile(tempPath, backupFilePath);
1807         }
1808     } else {
1809         SqliteUtils::DeleteFile(tempPath);
1810     }
1811     keyFiles.Unlock();
1812     return ret;
1813 }
1814 
CreateBackupBindArgs(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey)1815 std::vector<ValueObject> RdbStoreImpl::CreateBackupBindArgs(
1816     const std::string &databasePath, const std::vector<uint8_t> &destEncryptKey)
1817 {
1818     std::vector<ValueObject> bindArgs;
1819     bindArgs.emplace_back(databasePath);
1820     if (!destEncryptKey.empty() && !config_.IsEncrypt()) {
1821         bindArgs.emplace_back(destEncryptKey);
1822     } else if (config_.IsEncrypt()) {
1823         std::vector<uint8_t> key = config_.GetEncryptKey();
1824         bindArgs.emplace_back(key);
1825         key.assign(key.size(), 0);
1826     } else {
1827         bindArgs.emplace_back("");
1828     }
1829     return bindArgs;
1830 }
1831 
1832 /**
1833  * Backup a database from a specified encrypted or unencrypted database file.
1834  */
InnerBackup(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey,bool verifyDb)1835 int RdbStoreImpl::InnerBackup(
1836     const std::string &databasePath, const std::vector<uint8_t> &destEncryptKey, bool verifyDb)
1837 {
1838     if (isReadOnly_) {
1839         return E_NOT_SUPPORT;
1840     }
1841 
1842     if (config_.GetDBType() == DB_VECTOR) {
1843         auto [errCode, conn] = GetConn(false);
1844         return errCode != E_OK ? errCode : conn->Backup(databasePath, destEncryptKey, false, slaveStatus_);
1845     }
1846 
1847     if (config_.GetHaMode() != HAMode::SINGLE && SqliteUtils::IsSlaveDbName(databasePath)) {
1848         auto [errCode, conn] = GetConn(false);
1849         return errCode != E_OK ? errCode : conn->Backup(databasePath, {}, false, slaveStatus_, verifyDb);
1850     }
1851     Suspender suspender(Suspender::SQL_LOG);
1852     auto config = config_;
1853     config.SetHaMode(HAMode::SINGLE);
1854     config.SetCreateNecessary(false);
1855     auto [result, conn] = CreateWritableConn(config);
1856     if (result != E_OK) {
1857         return result;
1858     }
1859 
1860     if (config_.IsEncrypt()) {
1861         result = SetDefaultEncryptAlgo(conn, config_);
1862         if (result != E_OK) {
1863             return result;
1864         }
1865     }
1866 
1867     std::vector<ValueObject> bindArgs = CreateBackupBindArgs(databasePath, destEncryptKey);
1868     auto [errCode, statement] = GetStatement(GlobalExpr::ATTACH_BACKUP_SQL, conn);
1869     errCode = statement->Execute(bindArgs);
1870     if (errCode != E_OK) {
1871         return errCode;
1872     }
1873     errCode = statement->Prepare(GlobalExpr::PRAGMA_BACKUP_JOUR_MODE_WAL);
1874     errCode = statement->Execute();
1875     if (errCode != E_OK) {
1876         return errCode;
1877     }
1878     errCode = statement->Prepare(GlobalExpr::EXPORT_SQL);
1879     int ret = statement->Execute();
1880     errCode = statement->Prepare(GlobalExpr::DETACH_BACKUP_SQL);
1881     int res = statement->Execute();
1882     return (res == E_OK) ? ret : res;
1883 }
1884 
BeginExecuteSql(const std::string & sql)1885 std::pair<int32_t, RdbStoreImpl::Stmt> RdbStoreImpl::BeginExecuteSql(const std::string &sql)
1886 {
1887     int type = SqliteUtils::GetSqlStatementType(sql);
1888     if (SqliteUtils::IsSpecial(type)) {
1889         return { E_NOT_SUPPORT, nullptr };
1890     }
1891 
1892     bool assumeReadOnly = SqliteUtils::IsSqlReadOnly(type);
1893     auto pool = GetPool();
1894     if (pool == nullptr) {
1895         return { E_ALREADY_CLOSED, nullptr };
1896     }
1897     auto conn = pool->AcquireConnection(assumeReadOnly);
1898     if (conn == nullptr) {
1899         return { E_DATABASE_BUSY, nullptr };
1900     }
1901 
1902     auto [errCode, statement] = GetStatement(sql, conn);
1903     if (statement == nullptr) {
1904         return { errCode, nullptr };
1905     }
1906 
1907     if (statement->ReadOnly() && conn->IsWriter()) {
1908         statement = nullptr;
1909         conn = nullptr;
1910         return GetStatement(sql, true);
1911     }
1912 
1913     return { errCode, statement };
1914 }
1915 
IsHoldingConnection()1916 bool RdbStoreImpl::IsHoldingConnection()
1917 {
1918     return GetPool() != nullptr;
1919 }
1920 
SetDefaultEncryptSql(const std::shared_ptr<Statement> & statement,std::string sql,const RdbStoreConfig & config)1921 int RdbStoreImpl::SetDefaultEncryptSql(
1922     const std::shared_ptr<Statement> &statement, std::string sql, const RdbStoreConfig &config)
1923 {
1924     auto errCode = statement->Prepare(sql);
1925     if (errCode != E_OK) {
1926         LOG_ERROR("Prepare failed: %{public}s, %{public}d, %{public}d, %{public}d, %{public}d, %{public}u",
1927             SqliteUtils::Anonymous(config.GetName()).c_str(), config.GetCryptoParam().iterNum,
1928             config.GetCryptoParam().encryptAlgo, config.GetCryptoParam().hmacAlgo, config.GetCryptoParam().kdfAlgo,
1929             config.GetCryptoParam().cryptoPageSize);
1930         return errCode;
1931     }
1932     errCode = statement->Execute();
1933     if (errCode != E_OK) {
1934         LOG_ERROR("Execute failed: %{public}s, %{public}d, %{public}d, %{public}d, %{public}d, %{public}u",
1935             SqliteUtils::Anonymous(config.GetName()).c_str(), config.GetCryptoParam().iterNum,
1936             config.GetCryptoParam().encryptAlgo, config.GetCryptoParam().hmacAlgo, config.GetCryptoParam().kdfAlgo,
1937             config.GetCryptoParam().cryptoPageSize);
1938         return errCode;
1939     }
1940     return E_OK;
1941 }
1942 
SetDefaultEncryptAlgo(const ConnectionPool::SharedConn & conn,const RdbStoreConfig & config)1943 int RdbStoreImpl::SetDefaultEncryptAlgo(const ConnectionPool::SharedConn &conn, const RdbStoreConfig &config)
1944 {
1945     if (conn == nullptr) {
1946         return E_DATABASE_BUSY;
1947     }
1948 
1949     if (!config.GetCryptoParam().IsValid()) {
1950         LOG_ERROR("Invalid crypto param, name:%{public}s", SqliteUtils::Anonymous(config.GetName()).c_str());
1951         return E_INVALID_ARGS;
1952     }
1953 
1954     std::string sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_CIPHER_PREFIX) +
1955                       SqliteUtils::EncryptAlgoDescription(config.GetEncryptAlgo()) +
1956                       std::string(GlobalExpr::ALGO_SUFFIX);
1957     Suspender suspender(Suspender::SQL_LOG);
1958     auto [errCode, statement] = GetStatement(sql, conn);
1959     errCode = SetDefaultEncryptSql(statement, sql, config);
1960     if (errCode != E_OK) {
1961         return errCode;
1962     }
1963 
1964     if (config.GetIter() > 0) {
1965         sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_KDF_ITER_PREFIX) + std::to_string(config.GetIter());
1966         errCode = SetDefaultEncryptSql(statement, sql, config);
1967         if (errCode != E_OK) {
1968             return errCode;
1969         }
1970     }
1971 
1972     sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_HMAC_ALGO_PREFIX) +
1973           SqliteUtils::HmacAlgoDescription(config.GetCryptoParam().hmacAlgo) + std::string(GlobalExpr::ALGO_SUFFIX);
1974     errCode = SetDefaultEncryptSql(statement, sql, config);
1975     if (errCode != E_OK) {
1976         return errCode;
1977     }
1978 
1979     sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_KDF_ALGO_PREFIX) +
1980                       SqliteUtils::KdfAlgoDescription(config.GetCryptoParam().kdfAlgo) +
1981                       std::string(GlobalExpr::ALGO_SUFFIX);
1982     errCode = SetDefaultEncryptSql(statement, sql, config);
1983     if (errCode != E_OK) {
1984         return errCode;
1985     }
1986 
1987     sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_PAGE_SIZE_PREFIX) +
1988           std::to_string(config.GetCryptoParam().cryptoPageSize);
1989     return SetDefaultEncryptSql(statement, sql, config);
1990 }
1991 
AttachInner(const RdbStoreConfig & config,const std::string & attachName,const std::string & dbPath,const std::vector<uint8_t> & key,int32_t waitTime)1992 int RdbStoreImpl::AttachInner(const RdbStoreConfig &config, const std::string &attachName, const std::string &dbPath,
1993     const std::vector<uint8_t> &key, int32_t waitTime)
1994 {
1995     auto pool = GetPool();
1996     if (pool == nullptr) {
1997         return E_ALREADY_CLOSED;
1998     }
1999     Suspender suspender(Suspender::SQL_LOG);
2000     auto [conn, readers] = pool->AcquireAll(waitTime);
2001     if (conn == nullptr) {
2002         return E_DATABASE_BUSY;
2003     }
2004 
2005     if (!isMemoryRdb_ && conn->GetJournalMode() == static_cast<int32_t>(JournalMode::MODE_WAL)) {
2006         // close first to prevent the connection from being put back.
2007         pool->CloseAllConnections();
2008         conn = nullptr;
2009         readers.clear();
2010         auto [err, newConn] = pool->DisableWal();
2011         if (err != E_OK) {
2012             return err;
2013         }
2014         conn = newConn;
2015     }
2016     std::vector<ValueObject> bindArgs;
2017     bindArgs.emplace_back(ValueObject(dbPath));
2018     bindArgs.emplace_back(ValueObject(attachName));
2019     if (!key.empty()) {
2020         auto ret = SetDefaultEncryptAlgo(conn, config);
2021         if (ret != E_OK) {
2022             return ret;
2023         }
2024         bindArgs.emplace_back(ValueObject(key));
2025         auto [errCode, statement] = GetStatement(GlobalExpr::ATTACH_WITH_KEY_SQL, conn);
2026         if (statement == nullptr || errCode != E_OK) {
2027             LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
2028             return E_ERROR;
2029         }
2030         return statement->Execute(bindArgs);
2031     }
2032 
2033     auto [errCode, statement] = GetStatement(GlobalExpr::ATTACH_SQL, conn);
2034     if (statement == nullptr || errCode != E_OK) {
2035         LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
2036         return errCode;
2037     }
2038     return statement->Execute(bindArgs);
2039 }
2040 
2041 /**
2042  * Attaches a database.
2043  */
Attach(const RdbStoreConfig & config,const std::string & attachName,int32_t waitTime)2044 std::pair<int32_t, int32_t> RdbStoreImpl::Attach(
2045     const RdbStoreConfig &config, const std::string &attachName, int32_t waitTime)
2046 {
2047     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || config_.GetHaMode() != HAMode::SINGLE ||
2048         (config.IsMemoryRdb() && config.IsEncrypt())) {
2049         return { E_NOT_SUPPORT, 0 };
2050     }
2051     std::string dbPath;
2052     int err = SqliteGlobalConfig::GetDbPath(config, dbPath);
2053     if (err != E_OK || (access(dbPath.c_str(), F_OK) != E_OK && !config.IsMemoryRdb())) {
2054         return { E_INVALID_FILE_PATH, 0 };
2055     }
2056 
2057     // encrypted databases are not supported to attach a non encrypted database.
2058     if (!config.IsEncrypt() && config_.IsEncrypt()) {
2059         return { E_NOT_SUPPORT, 0 };
2060     }
2061 
2062     if (attachedInfo_.Contains(attachName)) {
2063         return { E_ATTACHED_DATABASE_EXIST, 0 };
2064     }
2065 
2066     std::vector<uint8_t> key;
2067     config.Initialize();
2068     if (config.IsEncrypt()) {
2069         key = config.GetEncryptKey();
2070     }
2071     err = AttachInner(config, attachName, dbPath, key, waitTime);
2072     key.assign(key.size(), 0);
2073     if (err == E_SQLITE_ERROR) {
2074         // only when attachName is already in use, SQLITE-ERROR will be reported here.
2075         return { E_ATTACHED_DATABASE_EXIST, 0 };
2076     } else if (err != E_OK) {
2077         LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach fileName"
2078                   "[%{public}s]",
2079             err, SqliteUtils::Anonymous(config_.GetName()).c_str(), SqliteUtils::Anonymous(attachName).c_str(),
2080             SqliteUtils::Anonymous(config.GetName()).c_str());
2081         return { err, 0 };
2082     }
2083     if (!attachedInfo_.Insert(attachName, dbPath)) {
2084         return { E_ATTACHED_DATABASE_EXIST, 0 };
2085     }
2086     return { E_OK, attachedInfo_.Size() };
2087 }
2088 
Detach(const std::string & attachName,int32_t waitTime)2089 std::pair<int32_t, int32_t> RdbStoreImpl::Detach(const std::string &attachName, int32_t waitTime)
2090 {
2091     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2092         return { E_NOT_SUPPORT, 0 };
2093     }
2094     if (!attachedInfo_.Contains(attachName)) {
2095         return { E_OK, attachedInfo_.Size() };
2096     }
2097     Suspender suspender(Suspender::SQL_LOG);
2098     auto pool = GetPool();
2099     if (pool == nullptr) {
2100         return { E_ALREADY_CLOSED, 0 };
2101     }
2102     auto [connection, readers] = pool->AcquireAll(waitTime);
2103     if (connection == nullptr) {
2104         return { E_DATABASE_BUSY, 0 };
2105     }
2106     std::vector<ValueObject> bindArgs;
2107     bindArgs.push_back(ValueObject(attachName));
2108     auto [errCode, statement] = GetStatement(GlobalExpr::DETACH_SQL, connection);
2109     if (statement == nullptr || errCode != E_OK) {
2110         LOG_ERROR("Detach get statement failed, errCode %{public}d", errCode);
2111         return { errCode, 0 };
2112     }
2113     errCode = statement->Execute(bindArgs);
2114     if (errCode != E_OK) {
2115         LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach", errCode,
2116             SqliteUtils::Anonymous(config_.GetName()).c_str(), SqliteUtils::Anonymous(attachName).c_str());
2117         return { errCode, 0 };
2118     }
2119     attachedInfo_.Erase(attachName);
2120     if (!attachedInfo_.Empty()) {
2121         return { E_OK, attachedInfo_.Size() };
2122     }
2123     statement = nullptr;
2124     if (!isMemoryRdb_ && connection->GetJournalMode() == static_cast<int32_t>(JournalMode::MODE_WAL)) {
2125         // close first to prevent the connection from being put back.
2126         pool->CloseAllConnections();
2127         connection = nullptr;
2128         readers.clear();
2129         errCode = pool->EnableWal();
2130     }
2131     return { errCode, 0 };
2132 }
2133 
2134 /**
2135  * Obtains the database version.
2136  */
GetVersion(int & version)2137 int RdbStoreImpl::GetVersion(int &version)
2138 {
2139     Suspender suspender(Suspender::SQL_LOG);
2140     auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION, isReadOnly_);
2141     if (statement == nullptr) {
2142         return errCode;
2143     }
2144     ValueObject value;
2145     std::tie(errCode, value) = statement->ExecuteForValue();
2146     auto val = std::get_if<int64_t>(&value.value);
2147     if (val != nullptr) {
2148         version = static_cast<int>(*val);
2149     }
2150     return errCode;
2151 }
2152 
2153 /**
2154  * Sets the version of a new database.
2155  */
SetVersion(int version)2156 int RdbStoreImpl::SetVersion(int version)
2157 {
2158     if (isReadOnly_) {
2159         return E_NOT_SUPPORT;
2160     }
2161     Suspender suspender(Suspender::SQL_LOG);
2162     std::string sql = std::string(GlobalExpr::PRAGMA_VERSION) + " = " + std::to_string(version);
2163     auto [errCode, statement] = GetStatement(sql);
2164     if (statement == nullptr) {
2165         return errCode;
2166     }
2167     return statement->Execute();
2168 }
2169 /**
2170  * Begins a transaction in EXCLUSIVE mode.
2171  */
BeginTransaction()2172 int RdbStoreImpl::BeginTransaction()
2173 {
2174     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2175     auto pool = GetPool();
2176     if (pool == nullptr) {
2177         return E_ALREADY_CLOSED;
2178     }
2179     std::lock_guard<std::mutex> lockGuard(pool->GetTransactionStackMutex());
2180     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2181         return E_NOT_SUPPORT;
2182     }
2183     // size + 1 means the number of transactions in process
2184     RdbStatReporter reportStat(RDB_PERF, BEGINTRANSACTION, config_, reportFunc_);
2185     size_t transactionId = pool->GetTransactionStack().size() + 1;
2186     BaseTransaction transaction(pool->GetTransactionStack().size());
2187     auto [errCode, statement] = GetStatement(transaction.GetTransactionStr());
2188     if (statement == nullptr) {
2189         return errCode;
2190     }
2191     errCode = statement->Execute();
2192     if (errCode != E_OK) {
2193         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
2194             pool->Dump(true, "BEGIN");
2195         }
2196         LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2197             SqliteUtils::Anonymous(name_).c_str(), errCode);
2198         return errCode;
2199     }
2200     pool->SetInTransaction(true);
2201     pool->GetTransactionStack().push(transaction);
2202     // 1 means the number of transactions in process
2203     if (transactionId > 1) {
2204         LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2205             SqliteUtils::Anonymous(name_).c_str(), errCode);
2206     }
2207 
2208     return E_OK;
2209 }
2210 
BeginTrans()2211 std::pair<int, int64_t> RdbStoreImpl::BeginTrans()
2212 {
2213     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2214     if (!config_.IsVector() || isReadOnly_) {
2215         return { E_NOT_SUPPORT, 0 };
2216     }
2217     int64_t tmpTrxId = 0;
2218     auto pool = GetPool();
2219     if (pool == nullptr) {
2220         return { E_ALREADY_CLOSED, 0 };
2221     }
2222     auto [errCode, connection] = pool->CreateTransConn(false);
2223     if (connection == nullptr) {
2224         LOG_ERROR("Get null connection, storeName: %{public}s errCode:0x%{public}x.",
2225             SqliteUtils::Anonymous(name_).c_str(), errCode);
2226         return { errCode, 0 };
2227     }
2228 
2229     tmpTrxId = newTrxId_.fetch_add(1);
2230     trxConnMap_.Insert(tmpTrxId, connection);
2231     errCode = ExecuteByTrxId(BEGIN_TRANSACTION_SQL, tmpTrxId);
2232     if (errCode != E_OK) {
2233         trxConnMap_.Erase(tmpTrxId);
2234     }
2235     return { errCode, tmpTrxId };
2236 }
2237 
/**
 * Rolls back the transaction at the top of the transaction stack.
 */
RollBack()2241 int RdbStoreImpl::RollBack()
2242 {
2243     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2244     auto pool = GetPool();
2245     if (pool == nullptr) {
2246         return E_ALREADY_CLOSED;
2247     }
2248     std::lock_guard<std::mutex> lockGuard(pool->GetTransactionStackMutex());
2249     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2250         return E_NOT_SUPPORT;
2251     }
2252     RdbStatReporter reportStat(RDB_PERF, ROLLBACK, config_, reportFunc_);
2253     size_t transactionId = pool->GetTransactionStack().size();
2254 
2255     if (pool->GetTransactionStack().empty()) {
2256         LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId,
2257             SqliteUtils::Anonymous(name_).c_str());
2258         return E_NO_TRANSACTION_IN_SESSION;
2259     }
2260     BaseTransaction transaction = pool->GetTransactionStack().top();
2261     pool->GetTransactionStack().pop();
2262     if (transaction.GetType() != TransType::ROLLBACK_SELF && !pool->GetTransactionStack().empty()) {
2263         pool->GetTransactionStack().top().SetChildFailure(true);
2264     }
2265     auto [errCode, statement] = GetStatement(transaction.GetRollbackStr());
2266     if (statement == nullptr) {
2267         if (errCode == E_DATABASE_BUSY) {
2268             Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
2269         }
2270         // size + 1 means the number of transactions in process
2271         LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId + 1,
2272             SqliteUtils::Anonymous(name_).c_str());
2273         return E_DATABASE_BUSY;
2274     }
2275     errCode = statement->Execute();
2276     if (errCode != E_OK) {
2277         if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
2278             Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
2279         }
2280         LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2281             SqliteUtils::Anonymous(name_).c_str(), errCode);
2282         return errCode;
2283     }
2284     if (pool->GetTransactionStack().empty()) {
2285         pool->SetInTransaction(false);
2286     }
2287     // 1 means the number of transactions in process
2288     if (transactionId > 1) {
2289         LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2290             SqliteUtils::Anonymous(name_).c_str(), errCode);
2291     }
2292     return E_OK;
2293 }
2294 
ExecuteByTrxId(const std::string & sql,int64_t trxId,bool closeConnAfterExecute,const std::vector<ValueObject> & bindArgs)2295 int RdbStoreImpl::ExecuteByTrxId(
2296     const std::string &sql, int64_t trxId, bool closeConnAfterExecute, const std::vector<ValueObject> &bindArgs)
2297 {
2298     if ((!config_.IsVector()) || isReadOnly_) {
2299         return E_NOT_SUPPORT;
2300     }
2301     if (trxId == 0) {
2302         return E_INVALID_ARGS;
2303     }
2304 
2305     if (!trxConnMap_.Contains(trxId)) {
2306         LOG_ERROR("trxId hasn't appeared before %{public}" PRIu64, trxId);
2307         return E_INVALID_ARGS;
2308     }
2309     auto time = static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
2310     auto result = trxConnMap_.Find(trxId);
2311     auto connection = result.second;
2312     if (connection == nullptr) {
2313         LOG_ERROR("Get null connection, storeName: %{public}s time:%{public}" PRIu64 ".",
2314             SqliteUtils::Anonymous(name_).c_str(), time);
2315         return E_ERROR;
2316     }
2317     auto [ret, statement] = GetStatement(sql, connection);
2318     if (ret != E_OK) {
2319         return ret;
2320     }
2321     ret = statement->Execute(bindArgs);
2322     if (ret != E_OK) {
2323         LOG_ERROR("transaction id: %{public}" PRIu64 ", storeName: %{public}s, errCode: %{public}d" PRIu64, trxId,
2324             SqliteUtils::Anonymous(name_).c_str(), ret);
2325         return ret;
2326     }
2327     if (closeConnAfterExecute) {
2328         trxConnMap_.Erase(trxId);
2329     }
2330     return E_OK;
2331 }
2332 
RollBack(int64_t trxId)2333 int RdbStoreImpl::RollBack(int64_t trxId)
2334 {
2335     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2336     return ExecuteByTrxId(ROLLBACK_TRANSACTION_SQL, trxId, true);
2337 }
2338 
2339 /**
2340 * Begins a transaction in EXCLUSIVE mode.
2341 */
Commit()2342 int RdbStoreImpl::Commit()
2343 {
2344     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2345     auto pool = GetPool();
2346     if (pool == nullptr) {
2347         return E_ALREADY_CLOSED;
2348     }
2349     std::lock_guard<std::mutex> lockGuard(pool->GetTransactionStackMutex());
2350     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2351         return E_NOT_SUPPORT;
2352     }
2353     RdbStatReporter reportStat(RDB_PERF, COMMIT, config_, reportFunc_);
2354     size_t transactionId = pool->GetTransactionStack().size();
2355 
2356     if (pool->GetTransactionStack().empty()) {
2357         return E_OK;
2358     }
2359     BaseTransaction transaction = pool->GetTransactionStack().top();
2360     std::string sqlStr = transaction.GetCommitStr();
2361     if (sqlStr.size() <= 1) {
2362         LOG_WARN("id: %{public}zu, storeName: %{public}s, sql: %{public}s", transactionId,
2363             SqliteUtils::Anonymous(name_).c_str(),
2364             SqliteUtils::SqlAnonymous(sqlStr).c_str());
2365         pool->GetTransactionStack().pop();
2366         return E_OK;
2367     }
2368     auto [errCode, statement] = GetStatement(sqlStr);
2369     if (statement == nullptr) {
2370         if (errCode == E_DATABASE_BUSY) {
2371             Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: CommitBusy"));
2372         }
2373         LOG_ERROR("id: %{public}zu, storeName: %{public}s, statement error", transactionId,
2374             SqliteUtils::Anonymous(name_).c_str());
2375         return E_DATABASE_BUSY;
2376     }
2377     errCode = statement->Execute();
2378     if (errCode != E_OK) {
2379         if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
2380             Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: CommitBusy"));
2381         }
2382         LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2383             SqliteUtils::Anonymous(name_).c_str(), errCode);
2384         return errCode;
2385     }
2386     pool->SetInTransaction(false);
2387     // 1 means the number of transactions in process
2388     if (transactionId > 1) {
2389         LOG_WARN("id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2390             SqliteUtils::Anonymous(name_).c_str(), errCode);
2391     }
2392     pool->GetTransactionStack().pop();
2393     return E_OK;
2394 }
2395 
Commit(int64_t trxId)2396 int RdbStoreImpl::Commit(int64_t trxId)
2397 {
2398     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2399     return ExecuteByTrxId(COMMIT_TRANSACTION_SQL, trxId, true);
2400 }
2401 
IsInTransaction()2402 bool RdbStoreImpl::IsInTransaction()
2403 {
2404     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2405         return false;
2406     }
2407     auto pool = GetPool();
2408     if (pool == nullptr) {
2409         return false;
2410     }
2411     return pool->IsInTransaction();
2412 }
2413 
CheckAttach(const std::string & sql)2414 int RdbStoreImpl::CheckAttach(const std::string &sql)
2415 {
2416     size_t index = sql.find_first_not_of(' ');
2417     if (index == std::string::npos) {
2418         return E_OK;
2419     }
2420 
2421     /* The first 3 characters can determine the type */
2422     std::string sqlType = sql.substr(index, 3);
2423     sqlType = SqliteUtils::StrToUpper(sqlType);
2424     if (sqlType != "ATT") {
2425         return E_OK;
2426     }
2427     Suspender suspender(Suspender::SQL_LOG);
2428     auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_JOUR_MODE_EXP);
2429     if (statement == nullptr) {
2430         return errCode;
2431     }
2432 
2433     errCode = statement->Execute();
2434     if (errCode != E_OK) {
2435         LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errCode);
2436         return errCode;
2437     }
2438     auto [errorCode, valueObject] = statement->GetColumn(0);
2439     if (errorCode != E_OK) {
2440         LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errorCode);
2441         return errorCode;
2442     }
2443     auto journal = std::get_if<std::string>(&valueObject.value);
2444     auto journalMode = SqliteUtils::StrToUpper((journal == nullptr) ? "" : *journal);
2445     if (journalMode == RdbStoreConfig::DB_DEFAULT_JOURNAL_MODE) {
2446         LOG_ERROR("RdbStoreImpl attach is not supported in WAL mode");
2447         return E_NOT_SUPPORTED_ATTACH_IN_WAL_MODE;
2448     }
2449     return E_OK;
2450 }
2451 
IsOpen() const2452 bool RdbStoreImpl::IsOpen() const
2453 {
2454     return isOpen_;
2455 }
2456 
GetPath()2457 std::string RdbStoreImpl::GetPath()
2458 {
2459     return path_;
2460 }
2461 
IsReadOnly() const2462 bool RdbStoreImpl::IsReadOnly() const
2463 {
2464     return isReadOnly_;
2465 }
2466 
IsMemoryRdb() const2467 bool RdbStoreImpl::IsMemoryRdb() const
2468 {
2469     return isMemoryRdb_;
2470 }
2471 
GetName()2472 std::string RdbStoreImpl::GetName()
2473 {
2474     return name_;
2475 }
2476 
DoCloudSync(const std::string & table)2477 void RdbStoreImpl::DoCloudSync(const std::string &table)
2478 {
2479 #if !defined(CROSS_PLATFORM)
2480     auto needSync = cloudInfo_->Change(table);
2481     if (!needSync) {
2482         return;
2483     }
2484     auto pool = TaskExecutor::GetInstance().GetExecutor();
2485     if (pool == nullptr) {
2486         return;
2487     }
2488     auto interval =
2489         std::chrono::duration_cast<std::chrono::steady_clock::duration>(std::chrono::milliseconds(INTERVAL));
2490     pool->Schedule(interval, [cloudInfo = std::weak_ptr<CloudTables>(cloudInfo_), param = syncerParam_]() {
2491         auto changeInfo = cloudInfo.lock();
2492         if (changeInfo == nullptr) {
2493             return;
2494         }
2495         auto tables = changeInfo->Steal();
2496         if (tables.empty()) {
2497             return;
2498         }
2499         DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true };
2500         auto memo = AbsRdbPredicates(std::vector<std::string>(tables.begin(), tables.end())).GetDistributedPredicates();
2501         InnerSync(param, option, memo, nullptr);
2502     });
2503 #endif
2504 }
GetFileType()2505 std::string RdbStoreImpl::GetFileType()
2506 {
2507     return fileType_;
2508 }
2509 
2510 /**
2511  * Sets the database locale.
2512  */
ConfigLocale(const std::string & localeStr)2513 int RdbStoreImpl::ConfigLocale(const std::string &localeStr)
2514 {
2515     if (!isOpen_) {
2516         LOG_ERROR("The connection pool has been closed.");
2517         return E_ALREADY_CLOSED;
2518     }
2519 
2520     auto pool = GetPool();
2521     if (pool == nullptr) {
2522         return E_ALREADY_CLOSED;
2523     }
2524     config_.SetCollatorLocales(localeStr);
2525     return pool->ConfigLocale(localeStr);
2526 }
2527 
GetDestPath(const std::string & backupPath,std::string & destPath)2528 int RdbStoreImpl::GetDestPath(const std::string &backupPath, std::string &destPath)
2529 {
2530     int ret = GetDataBasePath(backupPath, destPath);
2531     if (ret != E_OK) {
2532         return ret;
2533     }
2534     std::string tempPath = destPath + ".tmp";
2535     if (access(tempPath.c_str(), F_OK) == E_OK) {
2536         destPath = tempPath;
2537     }
2538 
2539     if (access(destPath.c_str(), F_OK) != E_OK) {
2540         LOG_ERROR("The backupFilePath does not exists.");
2541         return E_INVALID_FILE_PATH;
2542     }
2543     return E_OK;
2544 }
2545 
SwitchOver(bool isUseReplicaDb)2546 void RdbStoreImpl::SwitchOver(bool isUseReplicaDb)
2547 {
2548     isUseReplicaDb_ = isUseReplicaDb;
2549 }
2550 
IsUseAsyncRestore(const std::string & newPath,const std::string & backupPath)2551 bool RdbStoreImpl::IsUseAsyncRestore(const std::string &newPath, const std::string &backupPath)
2552 {
2553     if (config_.GetPath() == backupPath || newPath == backupPath ||
2554         config_.GetDBType() == DB_VECTOR || config_.GetHaMode() == HAMode::SINGLE ||
2555         !SqliteUtils::IsSlaveDbName(backupPath) || !SqliteUtils::IsSlaveLarge(config_.GetPath())) {
2556         return false;
2557     }
2558     return true;
2559 }
2560 
RestoreWithPool(std::shared_ptr<ConnectionPool> pool,const std::string & path)2561 int32_t RdbStoreImpl::RestoreWithPool(std::shared_ptr<ConnectionPool> pool, const std::string &path)
2562 {
2563     if (pool == nullptr) {
2564         return E_OK;
2565     }
2566     auto connection = pool->AcquireConnection(false);
2567     if (connection == nullptr) {
2568         return E_DATABASE_BUSY;
2569     }
2570     pool->ReopenConns();
2571     auto curStatus = std::make_shared<SlaveStatus>(SlaveStatus::UNDEFINED);
2572     return connection->Restore(path, {}, curStatus);
2573 }
2574 
StartAsyncRestore(std::shared_ptr<ConnectionPool> pool) const2575 int RdbStoreImpl::StartAsyncRestore(std::shared_ptr<ConnectionPool> pool) const
2576 {
2577     auto keyFilesPtr = std::make_shared<RdbSecurityManager::KeyFiles>(path_ + ASYNC_RESTORE);
2578     SqliteUtils::SetSlaveRestoring(path_);
2579     auto err = keyFilesPtr->Lock(false);
2580     if (err == E_OK) {
2581         auto taskPool = TaskExecutor::GetInstance().GetExecutor();
2582         if (taskPool == nullptr) {
2583             LOG_ERROR("Get thread pool failed");
2584             keyFilesPtr->Unlock();
2585             return E_ERROR;
2586         }
2587         taskPool->Execute([keyFilesPtr, config = config_, pool] {
2588             auto dbPath = config.GetPath();
2589             LOG_INFO("async restore started for %{public}s", SqliteUtils::Anonymous(dbPath).c_str());
2590             auto result = RdbStoreImpl::RestoreWithPool(pool, dbPath);
2591             if (result != E_OK) {
2592                 LOG_WARN("async restore failed, retry once, %{public}d", result);
2593                 result = RdbStoreImpl::RestoreWithPool(pool, dbPath);
2594             }
2595             if (result != E_OK) {
2596                 LOG_WARN("async restore failed, %{public}d", result);
2597                 SqliteUtils::SetSlaveInvalid(dbPath);
2598             }
2599             SqliteUtils::SetSlaveRestoring(dbPath, false);
2600             if (pool != nullptr) {
2601                 pool->ReopenConns();
2602             }
2603             keyFilesPtr->Unlock();
2604         });
2605 
2606         return E_OK;
2607     }
2608     LOG_WARN("Get process lock failed. Async restore is started in another process, %{public}s",
2609         SqliteUtils::Anonymous(path_).c_str());
2610     return E_OK;
2611 }
2612 
StartAsyncBackupIfNeed(std::shared_ptr<SlaveStatus> slaveStatus)2613 int RdbStoreImpl::StartAsyncBackupIfNeed(std::shared_ptr<SlaveStatus> slaveStatus)
2614 {
2615     auto taskPool = TaskExecutor::GetInstance().GetExecutor();
2616     if (taskPool == nullptr) {
2617         LOG_ERROR("Get thread pool failed");
2618         return E_ERROR;
2619     }
2620     auto config = config_;
2621     config.SetCreateNecessary(false);
2622     taskPool->Execute([config, slaveStatus] {
2623         if (*slaveStatus == SlaveStatus::DB_CLOSING) {
2624             return;
2625         }
2626         auto [result, conn] = CreateWritableConn(config);
2627         if (result != E_OK || conn == nullptr) {
2628             return;
2629         }
2630         auto strategy = conn->GenerateExchangeStrategy(slaveStatus);
2631         if (*slaveStatus == SlaveStatus::DB_CLOSING) {
2632             return;
2633         }
2634         LOG_INFO("async exchange st:%{public}d,", strategy);
2635         if (strategy == ExchangeStrategy::BACKUP) {
2636             (void)conn->Backup({}, {}, false, slaveStatus);
2637         }
2638     });
2639     return E_OK;
2640 }
2641 
RestoreInner(const std::string & destPath,const std::vector<uint8_t> & newKey,std::shared_ptr<ConnectionPool> pool)2642 int RdbStoreImpl::RestoreInner(const std::string &destPath, const std::vector<uint8_t> &newKey,
2643     std::shared_ptr<ConnectionPool> pool)
2644 {
2645     bool isUseAsync = IsUseAsyncRestore(path_, destPath);
2646     LOG_INFO("restore start, using async=%{public}d", isUseAsync);
2647     if (!isUseAsync) {
2648         auto err = pool->ChangeDbFileForRestore(path_, destPath, newKey, slaveStatus_);
2649         LOG_INFO("restore finished, sync mode rc=%{public}d", err);
2650         return err;
2651     }
2652 
2653     auto connection = pool->AcquireConnection(false);
2654     if (connection == nullptr) {
2655         LOG_WARN("Failed to obtain writer for async restore");
2656         return E_DATABASE_BUSY;
2657     }
2658 
2659     int errCode = connection->CheckReplicaForRestore();
2660     if (errCode != E_OK) {
2661         return errCode;
2662     }
2663     errCode = StartAsyncRestore(pool);
2664     LOG_INFO("restore finished, async mode rc=%{public}d", errCode);
2665     return errCode;
2666 }
2667 
Restore(const std::string & backupPath,const std::vector<uint8_t> & newKey)2668 int RdbStoreImpl::Restore(const std::string &backupPath, const std::vector<uint8_t> &newKey)
2669 {
2670     LOG_INFO("Restore db: %{public}s.", SqliteUtils::Anonymous(config_.GetName()).c_str());
2671     if (isReadOnly_ || isMemoryRdb_) {
2672         return E_NOT_SUPPORT;
2673     }
2674 
2675     auto pool = GetPool();
2676     if (pool == nullptr || !isOpen_) {
2677         LOG_ERROR("The pool is: %{public}d, pool is null: %{public}d", isOpen_, pool == nullptr);
2678         return E_ALREADY_CLOSED;
2679     }
2680 
2681     RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
2682     keyFiles.Lock();
2683     std::string destPath;
2684     bool isOK = TryGetMasterSlaveBackupPath(backupPath, destPath, true);
2685     if (!isOK) {
2686         int ret = GetDestPath(backupPath, destPath);
2687         if (ret != E_OK) {
2688             keyFiles.Unlock();
2689             return ret;
2690         }
2691     }
2692 #if !defined(CROSS_PLATFORM)
2693     auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
2694     if (service != nullptr) {
2695         service->Disable(syncerParam_);
2696     }
2697 #endif
2698     bool corrupt = Reportor::IsReportCorruptedFault(path_);
2699     int errCode = RestoreInner(destPath, newKey, pool);
2700     keyFiles.Unlock();
2701 #if !defined(CROSS_PLATFORM)
2702     SecurityPolicy::SetSecurityLabel(config_);
2703     if (service != nullptr) {
2704         service->Enable(syncerParam_);
2705         if (errCode == E_OK) {
2706             auto syncerParam = syncerParam_;
2707             syncerParam.infos_ = Connection::Collect(config_);
2708             service->AfterOpen(syncerParam);
2709             NotifyDataChange();
2710         }
2711     }
2712 #endif
2713     if (errCode == E_OK) {
2714         ExchangeSlaverToMaster();
2715         Reportor::ReportRestore(Reportor::Create(config_, E_OK, "ErrorType::RdbStoreImpl::Restore", false), corrupt);
2716         rebuild_ = RebuiltType::NONE;
2717     }
2718     DoCloudSync("");
2719     return errCode;
2720 }
2721 
CreateWritableConn(const RdbStoreConfig & config)2722 std::pair<int32_t, std::shared_ptr<Connection>> RdbStoreImpl::CreateWritableConn(const RdbStoreConfig &config)
2723 {
2724     auto [result, conn] = Connection::Create(config, true);
2725     if (result != E_OK || conn == nullptr) {
2726         LOG_ERROR("create connection failed, err:%{public}d", result);
2727         return { result, nullptr };
2728     }
2729     return { E_OK, conn };
2730 }
2731 
GetStatement(const std::string & sql,std::shared_ptr<Connection> conn) const2732 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(
2733     const std::string &sql, std::shared_ptr<Connection> conn) const
2734 {
2735     if (conn == nullptr) {
2736         return { E_DATABASE_BUSY, nullptr };
2737     }
2738 
2739     if (config_.GetHaMode() != HAMode::SINGLE && SqliteUtils::IsSlaveRestoring(config_.GetPath())) {
2740         auto keyFiles = RdbSecurityManager::KeyFiles(config_.GetPath() + ASYNC_RESTORE);
2741         int32_t ret = keyFiles.Lock(false);
2742         if (ret != E_OK) {
2743             if (isUseReplicaDb_) {
2744                 LOG_INFO("Use replica statement, %{public}s", SqliteUtils::Anonymous(config_.GetPath()).c_str());
2745                 return conn->CreateReplicaStatement(sql, conn);
2746             }
2747             return { E_DATABASE_BUSY, nullptr };
2748         }
2749         SqliteUtils::SetSlaveRestoring(config_.GetPath(), false);
2750         (void)keyFiles.Unlock();
2751         auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
2752         LOG_WARN("Got lock file but process is not in restore, mark removed, st:%{public}d, %{public}s",
2753             strategy, SqliteUtils::Anonymous(config_.GetPath()).c_str());
2754         if (strategy != ExchangeStrategy::RESTORE) {
2755             return conn->CreateStatement(sql, conn);
2756         }
2757         auto result = conn->CheckReplicaForRestore();
2758         if (result == E_OK) {
2759             result = StartAsyncRestore(GetPool());
2760             return conn->CreateReplicaStatement(sql, conn);
2761         }
2762     }
2763 
2764     return conn->CreateStatement(sql, conn);
2765 }
2766 
GetStatement(const std::string & sql,bool read) const2767 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(const std::string &sql, bool read) const
2768 {
2769     auto pool = GetPool();
2770     if (pool == nullptr) {
2771         return { E_ALREADY_CLOSED, nullptr };
2772     }
2773     auto conn = pool->AcquireConnection(read);
2774     if (conn == nullptr) {
2775         return { E_DATABASE_BUSY, nullptr };
2776     }
2777     return GetStatement(sql, conn);
2778 }
2779 
GetRebuilt(RebuiltType & rebuilt)2780 int RdbStoreImpl::GetRebuilt(RebuiltType &rebuilt)
2781 {
2782     rebuilt = static_cast<RebuiltType>(rebuild_);
2783     return E_OK;
2784 }
2785 
InterruptBackup()2786 int RdbStoreImpl::InterruptBackup()
2787 {
2788     if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER) {
2789         return E_NOT_SUPPORT;
2790     }
2791     if (*slaveStatus_ == SlaveStatus::BACKING_UP) {
2792         *slaveStatus_ = SlaveStatus::BACKUP_INTERRUPT;
2793         return E_OK;
2794     }
2795     return E_CANCEL;
2796 }
2797 
GetBackupStatus() const2798 int32_t RdbStoreImpl::GetBackupStatus() const
2799 {
2800     if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER && config_.GetHaMode() != HAMode::MAIN_REPLICA) {
2801         return SlaveStatus::UNDEFINED;
2802     }
2803     return *slaveStatus_;
2804 }
2805 
TryGetMasterSlaveBackupPath(const std::string & srcPath,std::string & destPath,bool isRestore)2806 bool RdbStoreImpl::TryGetMasterSlaveBackupPath(const std::string &srcPath, std::string &destPath, bool isRestore)
2807 {
2808     if (!srcPath.empty() || config_.GetHaMode() == HAMode::SINGLE || config_.GetDBType() != DB_SQLITE) {
2809         return false;
2810     }
2811     int ret = GetSlaveName(config_.GetPath(), destPath);
2812     if (ret != E_OK) {
2813         destPath = {};
2814         return false;
2815     }
2816     if (isRestore && access(destPath.c_str(), F_OK) != 0) {
2817         LOG_WARN("The backup path can not access: %{public}s", SqliteUtils::Anonymous(destPath).c_str());
2818         return false;
2819     }
2820     return true;
2821 }
2822 
IsSlaveDiffFromMaster() const2823 bool RdbStoreImpl::IsSlaveDiffFromMaster() const
2824 {
2825     std::string slaveDbPath = SqliteUtils::GetSlavePath(config_.GetPath());
2826     return SqliteUtils::IsSlaveInvalid(config_.GetPath()) || (access(slaveDbPath.c_str(), F_OK) != 0);
2827 }
2828 
ExchangeSlaverToMaster()2829 int32_t RdbStoreImpl::ExchangeSlaverToMaster()
2830 {
2831     if (isReadOnly_ || isMemoryRdb_ || rebuild_ != RebuiltType::NONE) {
2832         return E_OK;
2833     }
2834     auto [errCode, conn] = GetConn(false);
2835     if (errCode != E_OK) {
2836         return errCode;
2837     }
2838     auto strategy = conn->GenerateExchangeStrategy(slaveStatus_, false);
2839     if (strategy != ExchangeStrategy::NOT_HANDLE) {
2840         LOG_WARN("exchange st:%{public}d, %{public}s,", strategy, SqliteUtils::Anonymous(config_.GetName()).c_str());
2841     }
2842     int ret = E_OK;
2843     if (strategy == ExchangeStrategy::RESTORE && !SqliteUtils::IsSlaveRestoring(config_.GetPath())) {
2844         conn = nullptr;
2845         // disable is required before restore
2846         ret = Restore({}, {});
2847     } else if (strategy == ExchangeStrategy::BACKUP) {
2848         // async backup
2849         ret = conn->Backup({}, {}, true, slaveStatus_);
2850     } else if (strategy == ExchangeStrategy::PENDING_BACKUP) {
2851         ret = StartAsyncBackupIfNeed(slaveStatus_);
2852     }
2853     return ret;
2854 }
2855 
GetDbType() const2856 int32_t RdbStoreImpl::GetDbType() const
2857 {
2858     return config_.GetDBType();
2859 }
2860 
CreateTransaction(int32_t type)2861 std::pair<int32_t, std::shared_ptr<Transaction>> RdbStoreImpl::CreateTransaction(int32_t type)
2862 {
2863     if (isReadOnly_) {
2864         return { E_NOT_SUPPORT, nullptr };
2865     }
2866     auto pool = GetPool();
2867     if (pool == nullptr) {
2868         return { E_ALREADY_CLOSED, nullptr };
2869     }
2870     PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
2871     auto [errCode, conn] = pool->CreateTransConn();
2872     if (conn == nullptr) {
2873         return { errCode, nullptr };
2874     }
2875     std::shared_ptr<Transaction> trans;
2876     std::tie(errCode, trans) = Transaction::Create(type, conn, config_.GetPath());
2877     if (trans == nullptr) {
2878         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
2879             pool->Dump(true, "TRANS");
2880         }
2881         return { errCode, nullptr };
2882     }
2883 
2884     std::lock_guard<decltype(mutex_)> guard(mutex_);
2885     for (auto it = transactions_.begin(); it != transactions_.end();) {
2886         if (it->expired()) {
2887             it = transactions_.erase(it);
2888         } else {
2889             it++;
2890         }
2891     }
2892     transactions_.push_back(trans);
2893     return { errCode, trans };
2894 }
2895 
CleanDirtyLog(const std::string & table,uint64_t cursor)2896 int RdbStoreImpl::CleanDirtyLog(const std::string &table, uint64_t cursor)
2897 {
2898     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || isMemoryRdb_) {
2899         LOG_ERROR("Not support. table:%{public}s, isRead:%{public}d, dbType:%{public}d, isMemoryRdb:%{public}d.",
2900             SqliteUtils::Anonymous(table).c_str(), isReadOnly_, config_.GetDBType(), isMemoryRdb_);
2901         return E_NOT_SUPPORT;
2902     }
2903     auto [errCode, conn] = GetConn(false);
2904     if (errCode != E_OK || conn == nullptr) {
2905         LOG_ERROR("The database is busy or closed errCode:%{public}d", errCode);
2906         return errCode;
2907     }
2908     return conn->CleanDirtyLog(table, cursor);
2909 }
2910 
GetValues(std::shared_ptr<Statement> statement)2911 std::shared_ptr<ResultSet> RdbStoreImpl::GetValues(std::shared_ptr<Statement> statement)
2912 {
2913     if (statement == nullptr) {
2914         return nullptr;
2915     }
2916     auto [code, rows] = statement->GetRows(MAX_RETURNING_ROWS);
2917     auto size = rows.size();
2918     std::shared_ptr<ResultSet> result = std::make_shared<CacheResultSet>(std::move(rows));
2919     // The correct number of changed rows can only be obtained after completing the step
2920     while (code == E_OK && size == MAX_RETURNING_ROWS) {
2921         std::tie(code, rows) = statement->GetRows(MAX_RETURNING_ROWS);
2922         size = rows.size();
2923     }
2924     return result;
2925 }
2926 
GenerateResult(int32_t code,std::shared_ptr<Statement> statement,bool isDML)2927 Results RdbStoreImpl::GenerateResult(int32_t code, std::shared_ptr<Statement> statement, bool isDML)
2928 {
2929     Results result{ -1 };
2930     if (statement == nullptr) {
2931         return result;
2932     }
2933     // There are no data changes in other scenarios
2934     if (code == E_OK) {
2935         result.results = GetValues(statement);
2936         result.changed = isDML ? statement->Changes() : 0;
2937     }
2938     if (code == E_SQLITE_CONSTRAINT) {
2939         result.changed = statement->Changes();
2940     }
2941     if (isDML && result.changed <= 0) {
2942         result.results = std::make_shared<CacheResultSet>();
2943     }
2944     return result;
2945 }
2946 
TryDump(int32_t code,const char * dumpHeader)2947 void RdbStoreImpl::TryDump(int32_t code, const char *dumpHeader)
2948 {
2949     if (code != E_SQLITE_LOCKED && code != E_SQLITE_BUSY) {
2950         return;
2951     }
2952     auto pool = GetPool();
2953     if (pool != nullptr) {
2954         pool->Dump(true, dumpHeader);
2955     }
2956 }
2957 
AddTables(const std::vector<std::string> & tables)2958 int32_t RdbStoreImpl::CloudTables::AddTables(const std::vector<std::string> &tables)
2959 {
2960     std::lock_guard<std::mutex> lock(mutex_);
2961     for (auto &table : tables) {
2962         tables_.insert(table);
2963     }
2964     return E_OK;
2965 }
2966 
RmvTables(const std::vector<std::string> & tables)2967 int32_t RdbStoreImpl::CloudTables::RmvTables(const std::vector<std::string> &tables)
2968 {
2969     std::lock_guard<std::mutex> lock(mutex_);
2970     for (auto &table : tables) {
2971         tables_.erase(table);
2972     }
2973     return E_OK;
2974 }
2975 
Change(const std::string & table)2976 bool RdbStoreImpl::CloudTables::Change(const std::string &table)
2977 {
2978     bool needSync = false;
2979     {
2980         std::lock_guard<std::mutex> lock(mutex_);
2981         if (tables_.empty() || (!table.empty() && tables_.find(table) == tables_.end())) {
2982             return needSync;
2983         }
2984         // from empty, then need schedule the cloud sync, others only wait the schedule execute.
2985         needSync = changes_.empty();
2986         if (!table.empty()) {
2987             changes_.insert(table);
2988         } else {
2989             changes_.insert(tables_.begin(), tables_.end());
2990         }
2991     }
2992     return needSync;
2993 }
2994 
Steal()2995 std::set<std::string> RdbStoreImpl::CloudTables::Steal()
2996 {
2997     std::set<std::string> result;
2998     {
2999         std::lock_guard<std::mutex> lock(mutex_);
3000         result = std::move(changes_);
3001     }
3002     return result;
3003 }
3004 
SetKnowledgeSchema()3005 void RdbStoreImpl::SetKnowledgeSchema()
3006 {
3007     auto [errCode, schema] = GetKnowledgeSchemaHelper()->GetRdbKnowledgeSchema(config_.GetName());
3008     if (errCode != E_OK) {
3009         return;
3010     }
3011     auto [ret, conn] = GetConn(false);
3012     if (ret != E_OK) {
3013         LOG_ERROR("The database is busy or closed when set knowledge schema ret %{public}d.", ret);
3014         return;
3015     }
3016     if (isKnowledgeSchemaReady_) {
3017         return;
3018     }
3019     ret = conn->SetKnowledgeSchema(schema);
3020     if (ret != E_OK) {
3021         LOG_ERROR("Set knowledge schema failed %{public}d.", ret);
3022         return;
3023     }
3024     isKnowledgeSchemaReady_ = true;
3025     auto helper = GetKnowledgeSchemaHelper();
3026     helper->Init(config_, schema);
3027     helper->DonateKnowledgeData();
3028 }
3029 
InitKnowledgeSchema(const DistributedRdb::RdbKnowledgeSchema & schema)3030 int RdbStoreImpl::InitKnowledgeSchema(const DistributedRdb::RdbKnowledgeSchema &schema)
3031 {
3032     auto [ret, conn] = GetConn(false);
3033     if (ret != E_OK) {
3034         LOG_ERROR("The database is busy or closed when set knowledge schema ret %{public}d.", ret);
3035         return ret;
3036     }
3037     ret = conn->SetKnowledgeSchema(schema);
3038     if (ret != E_OK) {
3039         LOG_ERROR("Set knowledge schema failed %{public}d.", ret);
3040         return ret;
3041     }
3042     return E_OK;
3043 }
3044 
GetKnowledgeSchemaHelper()3045 std::shared_ptr<NativeRdb::KnowledgeSchemaHelper> RdbStoreImpl::GetKnowledgeSchemaHelper()
3046 {
3047     std::lock_guard<std::mutex> autoLock(helperMutex_);
3048     if (knowledgeSchemaHelper_ == nullptr) {
3049         knowledgeSchemaHelper_ = std::make_shared<NativeRdb::KnowledgeSchemaHelper>();
3050     }
3051     return knowledgeSchemaHelper_;
3052 }
3053 
IsKnowledgeDataChange(const DistributedRdb::RdbChangedData & rdbChangedData)3054 bool RdbStoreImpl::IsKnowledgeDataChange(const DistributedRdb::RdbChangedData &rdbChangedData)
3055 {
3056     for (const auto &item : rdbChangedData.tableData) {
3057         if (item.second.isKnowledgeDataChange) {
3058             return true;
3059         }
3060     }
3061     return false;
3062 }
3063 
IsNotifyService(const DistributedRdb::RdbChangedData & rdbChangedData)3064 bool RdbStoreImpl::IsNotifyService(const DistributedRdb::RdbChangedData &rdbChangedData)
3065 {
3066     for (const auto &item : rdbChangedData.tableData) {
3067         if (item.second.isP2pSyncDataChange || item.second.isTrackedDataChange) {
3068             return true;
3069         }
3070     }
3071     return false;
3072 }
3073 
RegisterAlgo(const std::string & clstAlgoName,ClusterAlgoFunc func)3074 int RdbStoreImpl::RegisterAlgo(const std::string &clstAlgoName, ClusterAlgoFunc func)
3075 {
3076     if (!config_.IsVector() || isMemoryRdb_ || isReadOnly_) {
3077         return E_NOT_SUPPORT;
3078     }
3079 
3080     auto [ret, conn] = GetConn(false);
3081     if (ret != E_OK) {
3082         LOG_ERROR("The database is busy or closed when RegisterAlgo ret %{public}d.", ret);
3083         return ret;
3084     }
3085     return conn->RegisterAlgo(clstAlgoName, func);
3086 }
3087 } // namespace OHOS::NativeRdb
3088