1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "RdbStoreImpl"
16 #include "rdb_store_impl.h"
17
18 #include <dlfcn.h>
19 #include <sys/stat.h>
20 #include <unistd.h>
21
22 #include <algorithm>
23 #include <chrono>
24 #include <cinttypes>
25 #include <cstdint>
26 #include <fstream>
27 #include <memory>
28 #include <mutex>
29 #include <sstream>
30 #include <sys/stat.h>
31 #include <string>
32
33 #include "cache_result_set.h"
34 #include "connection_pool.h"
35 #include "delay_notify.h"
36 #include "directory_ex.h"
37 #include "knowledge_schema_helper.h"
38 #include "logger.h"
39 #include "rdb_common.h"
40 #include "rdb_errno.h"
41 #include "rdb_fault_hiview_reporter.h"
42 #include "rdb_local_db_observer.h"
43 #include "rdb_perfStat.h"
44 #include "rdb_radar_reporter.h"
45 #include "rdb_stat_reporter.h"
46 #include "rdb_security_manager.h"
47 #include "rdb_sql_log.h"
48 #include "rdb_sql_utils.h"
49 #include "rdb_sql_statistic.h"
50 #include "rdb_store.h"
51 #include "rdb_trace.h"
52 #include "rdb_types.h"
53 #include "relational_store_client.h"
54 #include "sqlite_global_config.h"
55 #include "sqlite_sql_builder.h"
56 #include "sqlite_utils.h"
57 #include "step_result_set.h"
58 #include "suspender.h"
59 #include "task_executor.h"
60 #include "traits.h"
61 #include "transaction.h"
62 #include "values_buckets.h"
63 #if !defined(CROSS_PLATFORM)
64 #include "raw_data_parser.h"
65 #include "rdb_manager_impl.h"
66 #include "relational_store_manager.h"
67 #include "security_policy.h"
68 #include "sqlite_shared_result_set.h"
69 #endif
70
71 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
72 #include "security_policy.h"
73 #endif
74 #ifdef WINDOWS_PLATFORM
75 #define ISFILE(filePath) ((filePath.find("\\") == std::string::npos))
76 #else
77 #define ISFILE(filePath) ((filePath.find("/") == std::string::npos))
78 #endif
79 #include "rdb_time_utils.h"
80
81 namespace OHOS::NativeRdb {
82 using namespace OHOS::Rdb;
83 using namespace std::chrono;
84 using SqlStatistic = DistributedRdb::SqlStatistic;
85 using PerfStat = DistributedRdb::PerfStat;
86 using RdbNotifyConfig = DistributedRdb::RdbNotifyConfig;
87 using Reportor = RdbFaultHiViewReporter;
88
89 #if !defined(CROSS_PLATFORM)
90 using RdbMgr = DistributedRdb::RdbManagerImpl;
91 #endif
92
93 static constexpr const char *BEGIN_TRANSACTION_SQL = "begin;";
94 static constexpr const char *COMMIT_TRANSACTION_SQL = "commit;";
95 static constexpr const char *ROLLBACK_TRANSACTION_SQL = "rollback;";
96 static constexpr const char *BACKUP_RESTORE = "backup.restore";
97 static constexpr const char *ASYNC_RESTORE = "-async.restore";
98 constexpr int64_t TIME_OUT = 1500;
99
InitSyncerParam(const RdbStoreConfig & config,bool created)100 void RdbStoreImpl::InitSyncerParam(const RdbStoreConfig &config, bool created)
101 {
102 syncerParam_.bundleName_ = config.GetBundleName();
103 syncerParam_.hapName_ = config.GetModuleName();
104 syncerParam_.storeName_ = config.GetName();
105 syncerParam_.customDir_ = config.GetCustomDir();
106 syncerParam_.area_ = config.GetArea();
107 syncerParam_.level_ = static_cast<int32_t>(config.GetSecurityLevel());
108 syncerParam_.type_ = config.GetDistributedType();
109 syncerParam_.isEncrypt_ = config.IsEncrypt();
110 syncerParam_.isAutoClean_ = config.GetAutoClean();
111 syncerParam_.isSearchable_ = config.IsSearchable();
112 syncerParam_.password_ = config.GetEncryptKey();
113 syncerParam_.haMode_ = config.GetHaMode();
114 syncerParam_.roleType_ = config.GetRoleType();
115 syncerParam_.tokenIds_ = config.GetPromiseInfo().tokenIds_;
116 syncerParam_.uids_ = config.GetPromiseInfo().uids_;
117 syncerParam_.user_ = config.GetPromiseInfo().user_;
118 syncerParam_.permissionNames_ = config.GetPromiseInfo().permissionNames_;
119 syncerParam_.subUser_ = config.GetSubUser();
120 syncerParam_.dfxInfo_.lastOpenTime_ = RdbTimeUtils::GetCurSysTimeWithMs();
121 if (created) {
122 syncerParam_.infos_ = Connection::Collect(config);
123 }
124 }
125
InnerOpen()126 int RdbStoreImpl::InnerOpen()
127 {
128 isOpen_ = true;
129 #if !defined(CROSS_PLATFORM)
130 // Only owner mode can store metadata information.
131 if (isReadOnly_ || isMemoryRdb_ || config_.IsCustomEncryptParam() || (config_.GetRoleType() != OWNER)) {
132 return E_OK;
133 }
134 if (config_.GetEnableSemanticIndex()) {
135 SetKnowledgeSchema();
136 }
137 AfterOpen(syncerParam_);
138 if (config_.GetDBType() == DB_VECTOR || (!config_.IsSearchable() && !config_.GetEnableSemanticIndex())) {
139 return E_OK;
140 }
141 int errCode = RegisterDataChangeCallback();
142 if (errCode != E_OK) {
143 LOG_ERROR("RegisterCallBackObserver is failed, err is %{public}d.", errCode);
144 }
145 #endif
146 return E_OK;
147 }
148
InitReportFunc(const RdbParam & param)149 void RdbStoreImpl::InitReportFunc(const RdbParam ¶m)
150 {
151 #if !defined(CROSS_PLATFORM)
152 reportFunc_ = std::make_shared<ReportFunc>([reportParam = param](const DistributedRdb::RdbStatEvent &event) {
153 auto [err, service] = RdbMgr::GetInstance().GetRdbService(reportParam);
154 if (err != E_OK || service == nullptr) {
155 LOG_ERROR("GetRdbService failed, err: %{public}d, storeName: %{public}s.", err,
156 SqliteUtils::Anonymous(reportParam.storeName_).c_str());
157 return;
158 }
159 err = service->ReportStatistic(reportParam, event);
160 if (err != E_OK) {
161 LOG_ERROR("ReportStatistic failed, err: %{public}d, storeName: %{public}s.", err,
162 SqliteUtils::Anonymous(reportParam.storeName_).c_str());
163 }
164 return;
165 });
166 #endif
167 }
168
Close()169 void RdbStoreImpl::Close()
170 {
171 {
172 std::unique_lock<decltype(poolMutex_)> lock(poolMutex_);
173 if (connectionPool_) {
174 connectionPool_->CloseAllConnections();
175 connectionPool_.reset();
176 }
177 }
178 {
179 std::lock_guard<decltype(mutex_)> guard(mutex_);
180 for (auto &it : transactions_) {
181 auto trans = it.lock();
182 if (trans != nullptr) {
183 trans->Close();
184 }
185 }
186 transactions_ = {};
187 }
188 {
189 std::lock_guard<decltype(helperMutex_)> autoLock(helperMutex_);
190 if (knowledgeSchemaHelper_ != nullptr) {
191 knowledgeSchemaHelper_->Close();
192 }
193 }
194 }
195
GetPool() const196 std::shared_ptr<ConnectionPool> RdbStoreImpl::GetPool() const
197 {
198 std::shared_lock<decltype(poolMutex_)> lock(poolMutex_);
199 return connectionPool_;
200 }
201
GetConn(bool isRead)202 std::pair<int32_t, std::shared_ptr<Connection>> RdbStoreImpl::GetConn(bool isRead)
203 {
204 auto pool = GetPool();
205 if (pool == nullptr) {
206 return { E_ALREADY_CLOSED, nullptr };
207 }
208
209 auto connection = pool->AcquireConnection(isRead);
210 if (connection == nullptr) {
211 return { E_DATABASE_BUSY, nullptr };
212 }
213 return { E_OK, connection };
214 }
215
216 #if !defined(CROSS_PLATFORM)
AfterOpen(const RdbParam & param,int32_t retry)217 void RdbStoreImpl::AfterOpen(const RdbParam ¶m, int32_t retry)
218 {
219 auto [err, service] = RdbMgr::GetInstance().GetRdbService(param);
220 if (err == E_NOT_SUPPORT) {
221 return;
222 }
223 if (err != E_OK || service == nullptr) {
224 if (err != E_INVALID_ARGS) {
225 LOG_ERROR("GetRdbService failed, err: %{public}d, storeName: %{public}s.", err,
226 SqliteUtils::Anonymous(param.storeName_).c_str());
227 }
228 auto pool = TaskExecutor::GetInstance().GetExecutor();
229 if (err == E_SERVICE_NOT_FOUND && pool != nullptr && retry < MAX_RETRY_TIMES) {
230 retry++;
231 pool->Schedule(std::chrono::seconds(RETRY_INTERVAL), [param, retry]() {
232 AfterOpen(param, retry);
233 });
234 }
235 return;
236 }
237 err = service->AfterOpen(param);
238 if (err != E_OK) {
239 LOG_ERROR("AfterOpen failed, err: %{public}d, storeName: %{public}s.", err,
240 SqliteUtils::Anonymous(param.storeName_).c_str());
241 }
242 }
243
GetModifyTime(const std::string & table,const std::string & columnName,std::vector<PRIKey> & keys)244 RdbStore::ModifyTime RdbStoreImpl::GetModifyTime(
245 const std::string &table, const std::string &columnName, std::vector<PRIKey> &keys)
246 {
247 if (table.empty() || columnName.empty() || keys.empty()) {
248 LOG_ERROR("Invalid para.");
249 return {};
250 }
251
252 auto logTable = DistributedDB::RelationalStoreManager::GetDistributedLogTableName(table);
253 if (SqliteUtils::StrToUpper(columnName) == ROW_ID) {
254 return GetModifyTimeByRowId(logTable, keys);
255 }
256 std::vector<ValueObject> hashKeys;
257 hashKeys.reserve(keys.size());
258 std::map<std::vector<uint8_t>, PRIKey> keyMap;
259 std::map<std::string, DistributedDB::Type> tmp;
260 for (const auto &key : keys) {
261 DistributedDB::Type value;
262 RawDataParser::Convert(key, value);
263 tmp[columnName] = value;
264 auto hashKey = DistributedDB::RelationalStoreManager::CalcPrimaryKeyHash(tmp);
265 if (hashKey.empty()) {
266 LOG_DEBUG("Hash key fail.");
267 continue;
268 }
269 hashKeys.emplace_back(ValueObject(hashKey));
270 keyMap[hashKey] = key;
271 }
272
273 std::string sql;
274 sql.append("select hash_key as key, timestamp/10000 as modify_time from ");
275 sql.append(logTable);
276 sql.append(" where hash_key in (");
277 sql.append(SqliteSqlBuilder::GetSqlArgs(hashKeys.size()));
278 sql.append(")");
279 auto resultSet = QueryByStep(sql, hashKeys, true);
280 int count = 0;
281 if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
282 LOG_ERROR("Get resultSet err.");
283 return {};
284 }
285 return { resultSet, keyMap, false };
286 }
287
GetModifyTimeByRowId(const std::string & logTable,std::vector<PRIKey> & keys)288 RdbStore::ModifyTime RdbStoreImpl::GetModifyTimeByRowId(const std::string &logTable, std::vector<PRIKey> &keys)
289 {
290 std::string sql;
291 sql.append("select data_key as key, timestamp/10000 as modify_time from ");
292 sql.append(logTable);
293 sql.append(" where data_key in (");
294 sql.append(SqliteSqlBuilder::GetSqlArgs(keys.size()));
295 sql.append(")");
296 std::vector<ValueObject> args;
297 args.reserve(keys.size());
298 for (auto &key : keys) {
299 ValueObject::Type value;
300 RawDataParser::Convert(key, value);
301 args.emplace_back(ValueObject(value));
302 }
303 auto resultSet = QueryByStep(sql, args, true);
304 int count = 0;
305 if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
306 LOG_ERROR("Get resultSet err.");
307 return {};
308 }
309 return ModifyTime(resultSet, {}, true);
310 }
311
CleanDirtyData(const std::string & table,uint64_t cursor)312 int RdbStoreImpl::CleanDirtyData(const std::string &table, uint64_t cursor)
313 {
314 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || isMemoryRdb_) {
315 LOG_ERROR("Not support. table:%{public}s, isRead:%{public}d, dbType:%{public}d, isMemoryRdb:%{public}d.",
316 SqliteUtils::Anonymous(table).c_str(), isReadOnly_, config_.GetDBType(), isMemoryRdb_);
317 return E_NOT_SUPPORT;
318 }
319 auto [errCode, conn] = GetConn(false);
320 if (errCode != E_OK) {
321 LOG_ERROR("The database is busy or closed.");
322 return errCode;
323 }
324 errCode = conn->CleanDirtyData(table, cursor);
325 return errCode;
326 }
327
GetLogTableName(const std::string & tableName)328 std::string RdbStoreImpl::GetLogTableName(const std::string &tableName)
329 {
330 return DistributedDB::RelationalStoreManager::GetDistributedLogTableName(tableName);
331 }
332
QuerySharingResource(const AbsRdbPredicates & predicates,const Fields & columns)333 std::pair<int32_t, std::shared_ptr<ResultSet>> RdbStoreImpl::QuerySharingResource(
334 const AbsRdbPredicates &predicates, const Fields &columns)
335 {
336 if (config_.GetDBType() == DB_VECTOR) {
337 return { E_NOT_SUPPORT, nullptr };
338 }
339 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
340 if (errCode != E_OK) {
341 return { errCode, nullptr };
342 }
343 auto [status, resultSet] =
344 service->QuerySharingResource(syncerParam_, predicates.GetDistributedPredicates(), columns);
345 if (status != E_OK) {
346 return { status, nullptr };
347 }
348 return { status, resultSet };
349 }
350
RemoteQuery(const std::string & device,const AbsRdbPredicates & predicates,const Fields & columns,int & errCode)351 std::shared_ptr<ResultSet> RdbStoreImpl::RemoteQuery(
352 const std::string &device, const AbsRdbPredicates &predicates, const Fields &columns, int &errCode)
353 {
354 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
355 if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
356 return nullptr;
357 }
358 std::vector<std::string> selectionArgs = predicates.GetWhereArgs();
359 std::string sql = SqliteSqlBuilder::BuildQueryString(predicates, columns);
360 auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
361 if (err == E_NOT_SUPPORT) {
362 errCode = err;
363 return nullptr;
364 }
365 if (err != E_OK) {
366 LOG_ERROR("RdbStoreImpl::RemoteQuery get service failed.");
367 errCode = err;
368 return nullptr;
369 }
370 auto [status, resultSet] = service->RemoteQuery(syncerParam_, device, sql, selectionArgs);
371 errCode = status;
372 return resultSet;
373 }
374
NotifyDataChange()375 void RdbStoreImpl::NotifyDataChange()
376 {
377 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || !config_.GetRegisterInfo(RegisterType::CLIENT_OBSERVER)) {
378 return;
379 }
380 config_.SetRegisterInfo(RegisterType::CLIENT_OBSERVER, false);
381 int errCode = RegisterDataChangeCallback();
382 if (errCode != E_OK) {
383 LOG_ERROR("RegisterDataChangeCallback is failed, err is %{public}d.", errCode);
384 }
385 DistributedRdb::RdbChangedData rdbChangedData;
386 if (delayNotifier_ != nullptr) {
387 delayNotifier_->UpdateNotify(rdbChangedData, true);
388 }
389 }
390
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const DistributedRdb::DistributedConfig & distributedConfig)391 int RdbStoreImpl::SetDistributedTables(
392 const std::vector<std::string> &tables, int32_t type, const DistributedRdb::DistributedConfig &distributedConfig)
393 {
394 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
395 if (config_.GetDBType() == DB_VECTOR || isReadOnly_ || isMemoryRdb_) {
396 return E_NOT_SUPPORT;
397 }
398 if (tables.empty()) {
399 LOG_WARN("The distributed tables to be set is empty.");
400 return E_OK;
401 }
402 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
403 if (errCode != E_OK) {
404 return errCode;
405 }
406 syncerParam_.asyncDownloadAsset_ = distributedConfig.asyncDownloadAsset;
407 syncerParam_.enableCloud_ = distributedConfig.enableCloud;
408 int32_t errorCode = service->SetDistributedTables(
409 syncerParam_, tables, distributedConfig.references, distributedConfig.isRebuild, type);
410 if (errorCode != E_OK) {
411 LOG_ERROR("Fail to set distributed tables, error=%{public}d.", errorCode);
412 return errorCode;
413 }
414 if (type == DistributedRdb::DISTRIBUTED_DEVICE && !config_.GetRegisterInfo(RegisterType::CLIENT_OBSERVER)) {
415 RegisterDataChangeCallback();
416 }
417 if (type != DistributedRdb::DISTRIBUTED_CLOUD) {
418 return E_OK;
419 }
420
421 return HandleCloudSyncAfterSetDistributedTables(tables, distributedConfig);
422 }
423
Rekey(const RdbStoreConfig::CryptoParam & cryptoParam)424 int32_t RdbStoreImpl::Rekey(const RdbStoreConfig::CryptoParam &cryptoParam)
425 {
426 if (config_.GetDBType() == DB_VECTOR || isReadOnly_ || isMemoryRdb_) {
427 return E_NOT_SUPPORT;
428 }
429 if (!cryptoParam.IsValid()) {
430 LOG_ERROR("Invalid crypto param, name:%{public}s", SqliteUtils::Anonymous(config_.GetName()).c_str());
431 return E_INVALID_ARGS_NEW;
432 }
433 if (!config_.IsEncrypt() || !config_.GetCryptoParam().Equal(cryptoParam) ||
434 (config_.IsCustomEncryptParam() == cryptoParam.encryptKey_.empty())) {
435 LOG_ERROR("Not supported! name:%{public}s, [%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}u]"
436 "->[%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}u]",
437 SqliteUtils::Anonymous(config_.GetName()).c_str(), config_.GetCryptoParam().encryptKey_.empty(),
438 config_.GetCryptoParam().iterNum, config_.GetCryptoParam().encryptAlgo, config_.GetCryptoParam().hmacAlgo,
439 config_.GetCryptoParam().kdfAlgo, config_.GetCryptoParam().cryptoPageSize, cryptoParam.encryptKey_.empty(),
440 cryptoParam.iterNum, cryptoParam.encryptAlgo, cryptoParam.hmacAlgo,
441 cryptoParam.kdfAlgo, cryptoParam.cryptoPageSize);
442 return E_NOT_SUPPORT;
443 }
444
445 auto pool = GetPool();
446 if (pool == nullptr) {
447 LOG_ERROR("Database already closed.");
448 return E_ALREADY_CLOSED;
449 }
450
451 #if !defined(CROSS_PLATFORM)
452 auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
453 if (service != nullptr) {
454 service->Disable(syncerParam_);
455 }
456 #endif
457 LOG_INFO("Start rekey, name:%{public}s, IsCustomEncrypt:%{public}d. ",
458 SqliteUtils::Anonymous(config_.GetName()).c_str(), config_.IsCustomEncryptParam());
459 auto errCode = pool->Rekey(cryptoParam);
460 #if !defined(CROSS_PLATFORM)
461 if (service != nullptr) {
462 service->Enable(syncerParam_);
463 if (errCode == E_OK && !config_.IsCustomEncryptParam()) {
464 auto syncerParam = syncerParam_;
465 syncerParam.password_ = config_.GetEncryptKey();
466 service->AfterOpen(syncerParam);
467 }
468 }
469 #endif
470 return errCode;
471 }
472
HandleCloudSyncAfterSetDistributedTables(const std::vector<std::string> & tables,const DistributedRdb::DistributedConfig & distributedConfig)473 int RdbStoreImpl::HandleCloudSyncAfterSetDistributedTables(
474 const std::vector<std::string> &tables, const DistributedRdb::DistributedConfig &distributedConfig)
475 {
476 auto pool = GetPool();
477 if (pool == nullptr) {
478 return E_ALREADY_CLOSED;
479 }
480 auto conn = pool->AcquireConnection(false);
481 if (conn != nullptr) {
482 auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
483 if (strategy == ExchangeStrategy::BACKUP) {
484 (void)conn->Backup({}, {}, false, slaveStatus_);
485 }
486 }
487 {
488 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
489 if (distributedConfig.enableCloud && distributedConfig.autoSync) {
490 cloudInfo_->AddTables(tables);
491 } else {
492 cloudInfo_->RmvTables(tables);
493 return E_OK;
494 }
495 }
496 auto isRebuilt = RebuiltType::NONE;
497 GetRebuilt(isRebuilt);
498 if (isRebuilt == RebuiltType::REBUILT) {
499 DoCloudSync("");
500 }
501 return E_OK;
502 }
503
ObtainDistributedTableName(const std::string & device,const std::string & table,int & errCode)504 std::string RdbStoreImpl::ObtainDistributedTableName(const std::string &device, const std::string &table, int &errCode)
505 {
506 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
507 if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
508 return "";
509 }
510 auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
511 if (err != E_OK || service == nullptr) {
512 errCode = err;
513 return "";
514 }
515 auto tableName = service->ObtainDistributedTableName(syncerParam_, device, table);
516 errCode = tableName.empty() ? E_ERROR : E_OK;
517 return tableName;
518 }
519
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncBrief & callback)520 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncBrief &callback)
521 {
522 if (config_.GetDBType() == DB_VECTOR) {
523 return E_NOT_SUPPORT;
524 }
525 return Sync(option, predicate, [callback](Details &&details) {
526 Briefs briefs;
527 for (auto &[key, value] : details) {
528 briefs.insert_or_assign(key, value.code);
529 }
530 if (callback != nullptr) {
531 callback(briefs);
532 }
533 });
534 }
535
Sync(const SyncOption & option,const std::vector<std::string> & tables,const AsyncDetail & async)536 int RdbStoreImpl::Sync(const SyncOption &option, const std::vector<std::string> &tables, const AsyncDetail &async)
537 {
538 return Sync(option, AbsRdbPredicates(tables), async);
539 }
540
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncDetail & async)541 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncDetail &async)
542 {
543 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
544 if (isMemoryRdb_) {
545 return E_NOT_SUPPORT;
546 }
547 DistributedRdb::RdbService::Option rdbOption;
548 rdbOption.mode = option.mode;
549 rdbOption.isAsync = !option.isBlock;
550 RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
551 ret = InnerSync(syncerParam_, rdbOption, predicate.GetDistributedPredicates(), async);
552 return ret;
553 }
554
InnerSync(const RdbParam & param,const Options & option,const Memo & predicates,const AsyncDetail & async)555 int RdbStoreImpl::InnerSync(
556 const RdbParam ¶m, const Options &option, const Memo &predicates, const AsyncDetail &async)
557 {
558 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
559 if (errCode == E_NOT_SUPPORT) {
560 return errCode;
561 }
562 if (errCode != E_OK) {
563 LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
564 param.bundleName_.c_str());
565 return errCode;
566 }
567 errCode = service->Sync(param, option, predicates, async);
568 if (errCode != E_OK) {
569 LOG_ERROR("Sync is failed, err is %{public}d.", errCode);
570 return errCode;
571 }
572 return E_OK;
573 }
574
GetUri(const std::string & event)575 std::string RdbStoreImpl::GetUri(const std::string &event)
576 {
577 std::string rdbUri;
578 if (config_.GetDataGroupId().empty()) {
579 rdbUri = SCHEME_RDB + config_.GetBundleName() + "/" + path_ + "/" + event;
580 } else {
581 rdbUri = SCHEME_RDB + config_.GetDataGroupId() + "/" + path_ + "/" + event;
582 Reportor::ReportFault(RdbFaultEvent(FT_CURD, E_DFX_GROUPID_INFO, config_.GetBundleName(),
583 "GetUri GroupId db:[" + SqliteUtils::Anonymous(name_) + "]"));
584 }
585 return rdbUri;
586 }
587
SubscribeLocal(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)588 int RdbStoreImpl::SubscribeLocal(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
589 {
590 std::lock_guard<std::mutex> lock(mutex_);
591 localObservers_.try_emplace(option.event);
592 auto &list = localObservers_.find(option.event)->second;
593 for (auto it = list.begin(); it != list.end(); it++) {
594 if ((*it)->getObserver() == observer) {
595 LOG_ERROR("Duplicate subscribe.");
596 return E_OK;
597 }
598 }
599
600 localObservers_[option.event].push_back(std::make_shared<RdbStoreLocalObserver>(observer));
601 return E_OK;
602 }
603
SubscribeLocalShared(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)604 int RdbStoreImpl::SubscribeLocalShared(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
605 {
606 return obsManger_.Register(GetUri(option.event), observer);
607 }
608
SubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)609 int32_t RdbStoreImpl::SubscribeLocalDetail(
610 const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
611 {
612 if (observer == nullptr) {
613 return E_OK;
614 }
615 std::lock_guard<std::mutex> lock(mutex_);
616 for (auto it = localDetailObservers_.begin(); it != localDetailObservers_.end(); it++) {
617 if ((*it)->GetObserver() == observer) {
618 LOG_WARN("duplicate subscribe.");
619 return E_OK;
620 }
621 }
622 auto localStoreObserver = std::make_shared<RdbStoreLocalDbObserver>(observer);
623 auto [errCode, conn] = GetConn(false);
624 if (conn == nullptr) {
625 return errCode;
626 }
627 errCode = conn->Subscribe(localStoreObserver);
628 if (errCode != E_OK) {
629 LOG_ERROR("Subscribe local detail observer failed. db name:%{public}s errCode:%{public}." PRId32,
630 SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
631 return errCode;
632 }
633 config_.SetRegisterInfo(RegisterType::STORE_OBSERVER, true);
634 localDetailObservers_.emplace_back(localStoreObserver);
635 return E_OK;
636 }
637
SubscribeRemote(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)638 int RdbStoreImpl::SubscribeRemote(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
639 {
640 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
641 if (errCode != E_OK) {
642 return errCode;
643 }
644 return service->Subscribe(syncerParam_, option, observer);
645 }
646
Subscribe(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)647 int RdbStoreImpl::Subscribe(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
648 {
649 if (config_.GetDBType() == DB_VECTOR) {
650 return E_NOT_SUPPORT;
651 }
652 if (option.mode == SubscribeMode::LOCAL) {
653 return SubscribeLocal(option, observer);
654 }
655 if (isMemoryRdb_) {
656 return E_NOT_SUPPORT;
657 }
658 if (option.mode == SubscribeMode::LOCAL_SHARED) {
659 return SubscribeLocalShared(option, observer);
660 }
661 return SubscribeRemote(option, observer);
662 }
663
UnSubscribeLocal(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)664 int RdbStoreImpl::UnSubscribeLocal(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
665 {
666 std::lock_guard<std::mutex> lock(mutex_);
667 auto obs = localObservers_.find(option.event);
668 if (obs == localObservers_.end()) {
669 return E_OK;
670 }
671
672 auto &list = obs->second;
673 for (auto it = list.begin(); it != list.end();) {
674 if (observer == nullptr || (*it)->getObserver() == observer) {
675 it = list.erase(it);
676 if (observer != nullptr) {
677 break;
678 }
679 } else {
680 it++;
681 }
682 }
683
684 if (list.empty()) {
685 localObservers_.erase(option.event);
686 }
687 return E_OK;
688 }
689
UnSubscribeLocalShared(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)690 int RdbStoreImpl::UnSubscribeLocalShared(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
691 {
692 return obsManger_.Unregister(GetUri(option.event), observer);
693 }
694
UnsubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)695 int32_t RdbStoreImpl::UnsubscribeLocalDetail(
696 const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
697 {
698 auto [errCode, conn] = GetConn(false);
699 if (conn == nullptr) {
700 return errCode;
701 }
702 std::lock_guard<std::mutex> lock(mutex_);
703 for (auto it = localDetailObservers_.begin(); it != localDetailObservers_.end();) {
704 if (observer == nullptr || (*it)->GetObserver() == observer) {
705 int32_t err = conn->Unsubscribe(*it);
706 if (err != 0) {
707 LOG_ERROR("Unsubscribe local detail observer failed. db name:%{public}s errCode:%{public}." PRId32,
708 SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
709 return err;
710 }
711 it = localDetailObservers_.erase(it);
712 if (observer != nullptr) {
713 break;
714 }
715 } else {
716 it++;
717 }
718 }
719 return E_OK;
720 }
721
UnSubscribeRemote(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)722 int RdbStoreImpl::UnSubscribeRemote(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
723 {
724 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
725 if (errCode != E_OK) {
726 return errCode;
727 }
728 return service->UnSubscribe(syncerParam_, option, observer);
729 }
730
UnSubscribe(const SubscribeOption & option,std::shared_ptr<RdbStoreObserver> observer)731 int RdbStoreImpl::UnSubscribe(const SubscribeOption &option, std::shared_ptr<RdbStoreObserver> observer)
732 {
733 if (config_.GetDBType() == DB_VECTOR) {
734 return E_NOT_SUPPORT;
735 }
736 if (option.mode == SubscribeMode::LOCAL) {
737 return UnSubscribeLocal(option, observer);
738 }
739 if (isMemoryRdb_) {
740 return E_NOT_SUPPORT;
741 }
742 if (option.mode == SubscribeMode::LOCAL_SHARED) {
743 return UnSubscribeLocalShared(option, observer);
744 }
745 return UnSubscribeRemote(option, observer);
746 }
747
SubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)748 int RdbStoreImpl::SubscribeObserver(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
749 {
750 if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
751 return E_NOT_SUPPORT;
752 }
753 return SubscribeLocalDetail(option, observer);
754 }
755
UnsubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)756 int RdbStoreImpl::UnsubscribeObserver(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
757 {
758 if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
759 return E_NOT_SUPPORT;
760 }
761 return UnsubscribeLocalDetail(option, observer);
762 }
763
Notify(const std::string & event)764 int RdbStoreImpl::Notify(const std::string &event)
765 {
766 if (config_.GetDBType() == DB_VECTOR) {
767 return E_NOT_SUPPORT;
768 }
769
770 {
771 std::lock_guard<std::mutex> lock(mutex_);
772 auto obs = localObservers_.find(event);
773 if (obs != localObservers_.end()) {
774 auto &list = obs->second;
775 for (auto &it : list) {
776 it->OnChange();
777 }
778 }
779 }
780 if (isMemoryRdb_) {
781 return E_OK;
782 }
783 int32_t err = obsManger_.Notify(GetUri(event));
784 if (err != 0) {
785 LOG_ERROR("Notify failed.");
786 }
787 return E_OK;
788 }
789
SetSearchable(bool isSearchable)790 int RdbStoreImpl::SetSearchable(bool isSearchable)
791 {
792 if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
793 return E_NOT_SUPPORT;
794 }
795 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
796 if (errCode != E_OK || service == nullptr) {
797 LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
798 return errCode;
799 }
800 return service->SetSearchable(syncerParam_, isSearchable);
801 }
802
RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)803 int RdbStoreImpl::RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
804 {
805 if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
806 return E_NOT_SUPPORT;
807 }
808 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
809 if (errCode != E_OK) {
810 return errCode;
811 }
812 return service->RegisterAutoSyncCallback(syncerParam_, observer);
813 }
814
UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)815 int RdbStoreImpl::UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
816 {
817 if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
818 return E_NOT_SUPPORT;
819 }
820 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
821 if (errCode != E_OK) {
822 return errCode;
823 }
824 return service->UnregisterAutoSyncCallback(syncerParam_, observer);
825 }
826
InitDelayNotifier()827 void RdbStoreImpl::InitDelayNotifier()
828 {
829 if (delayNotifier_ != nullptr) {
830 return;
831 }
832 delayNotifier_ = std::make_shared<DelayNotify>();
833 if (delayNotifier_ == nullptr) {
834 LOG_ERROR("Init delay notifier failed.");
835 return;
836 }
837 std::weak_ptr<NativeRdb::KnowledgeSchemaHelper> helper = GetKnowledgeSchemaHelper();
838 delayNotifier_->SetExecutorPool(TaskExecutor::GetInstance().GetExecutor());
839 delayNotifier_->SetTask([param = syncerParam_, helper = helper](
840 const DistributedRdb::RdbChangedData &rdbChangedData, const RdbNotifyConfig &rdbNotifyConfig) -> int {
841 if (IsKnowledgeDataChange(rdbChangedData)) {
842 auto realHelper = helper.lock();
843 if (realHelper == nullptr) {
844 LOG_WARN("knowledge helper is nullptr");
845 } else {
846 realHelper->DonateKnowledgeData();
847 }
848 }
849 if (!IsNotifyService(rdbChangedData)) {
850 return E_OK;
851 }
852 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
853 if (errCode == E_NOT_SUPPORT) {
854 return errCode;
855 }
856 if (errCode != E_OK || service == nullptr) {
857 LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
858 return errCode;
859 }
860 return service->NotifyDataChange(param, rdbChangedData, rdbNotifyConfig);
861 });
862 }
863
RegisterDataChangeCallback()864 int RdbStoreImpl::RegisterDataChangeCallback()
865 {
866 InitDelayNotifier();
867 auto connPool = GetPool();
868 if (connPool == nullptr) {
869 return E_ALREADY_CLOSED;
870 }
871
872 RegisterDataChangeCallback(delayNotifier_, connPool, 0);
873 config_.SetRegisterInfo(RegisterType::CLIENT_OBSERVER, true);
874 return E_OK;
875 }
876
RegisterDataChangeCallback(std::shared_ptr<DelayNotify> delayNotifier,std::weak_ptr<ConnectionPool> connPool,int retry)877 void RdbStoreImpl::RegisterDataChangeCallback(
878 std::shared_ptr<DelayNotify> delayNotifier, std::weak_ptr<ConnectionPool> connPool, int retry)
879 {
880 auto relConnPool = connPool.lock();
881 if (relConnPool == nullptr) {
882 return;
883 }
884 auto conn = relConnPool->AcquireConnection(false);
885 if (conn == nullptr) {
886 relConnPool->Dump(true, "DATACHANGE");
887 auto pool = TaskExecutor::GetInstance().GetExecutor();
888 if (pool != nullptr && retry < MAX_RETRY_TIMES) {
889 retry++;
890 pool->Schedule(std::chrono::seconds(1),
891 [delayNotifier, connPool, retry]() { RegisterDataChangeCallback(delayNotifier, connPool, retry); });
892 }
893 return;
894 }
895 auto callBack = [delayNotifier](const DistributedRdb::RdbChangedData &rdbChangedData) {
896 if (delayNotifier != nullptr) {
897 delayNotifier->UpdateNotify(rdbChangedData);
898 }
899 };
900 auto errCode = conn->SubscribeTableChanges(callBack);
901 if (errCode != E_OK) {
902 return;
903 }
904 }
905
GetHashKeyForLockRow(const AbsRdbPredicates & predicates,std::vector<std::vector<uint8_t>> & hashKeys)906 int RdbStoreImpl::GetHashKeyForLockRow(const AbsRdbPredicates &predicates, std::vector<std::vector<uint8_t>> &hashKeys)
907 {
908 std::string table = predicates.GetTableName();
909 if (table.empty()) {
910 return E_EMPTY_TABLE_NAME;
911 }
912 auto logTable = GetLogTableName(table);
913 std::string sql;
914 sql.append("SELECT ").append(logTable).append(".hash_key ").append("FROM ").append(logTable);
915 sql.append(" INNER JOIN ").append(table).append(" ON ");
916 sql.append(table).append(".ROWID = ").append(logTable).append(".data_key");
917 auto whereClause = predicates.GetWhereClause();
918 if (!whereClause.empty()) {
919 sql.append(" WHERE ").append(SqliteUtils::Replace(whereClause, SqliteUtils::REP, logTable + "."));
920 }
921
922 auto result = QuerySql(sql, predicates.GetBindArgs());
923 if (result == nullptr) {
924 return E_ALREADY_CLOSED;
925 }
926 int count = 0;
927 if (result->GetRowCount(count) != E_OK) {
928 return E_NO_ROW_IN_QUERY;
929 }
930
931 if (count <= 0) {
932 return E_NO_ROW_IN_QUERY;
933 }
934 while (result->GoToNextRow() == E_OK) {
935 std::vector<uint8_t> hashKey;
936 if (result->GetBlob(0, hashKey) != E_OK) {
937 return E_ERROR;
938 }
939 hashKeys.push_back(std::move(hashKey));
940 }
941 return E_OK;
942 }
943
ModifyLockStatus(const AbsRdbPredicates & predicates,bool isLock)944 int RdbStoreImpl::ModifyLockStatus(const AbsRdbPredicates &predicates, bool isLock)
945 {
946 if (config_.IsVector() || isMemoryRdb_ || isReadOnly_) {
947 return E_NOT_SUPPORT;
948 }
949 std::vector<std::vector<uint8_t>> hashKeys;
950 int ret = GetHashKeyForLockRow(predicates, hashKeys);
951 if (ret != E_OK) {
952 LOG_ERROR("GetHashKeyForLockRow failed, err is %{public}d.", ret);
953 return ret;
954 }
955 Suspender suspender(Suspender::SQL_LOG);
956 auto [err, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION);
957 if (statement == nullptr || err != E_OK) {
958 return err;
959 }
960 int errCode = statement->ModifyLockStatus(predicates.GetTableName(), hashKeys, isLock);
961 if (errCode == E_WAIT_COMPENSATED_SYNC) {
962 LOG_DEBUG("Start compensation sync.");
963 DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true, true };
964 auto memo = AbsRdbPredicates(predicates.GetTableName()).GetDistributedPredicates();
965 InnerSync(syncerParam_, option, memo, nullptr);
966 return E_OK;
967 }
968 if (errCode != E_OK) {
969 LOG_ERROR("ModifyLockStatus failed, err is %{public}d.", errCode);
970 }
971 return errCode;
972 }
973
LockCloudContainer()974 std::pair<int32_t, uint32_t> RdbStoreImpl::LockCloudContainer()
975 {
976 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
977 if (config_.IsVector() || isMemoryRdb_ || isReadOnly_) {
978 return { E_NOT_SUPPORT, 0 };
979 }
980 RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
981 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
982 if (errCode == E_NOT_SUPPORT) {
983 LOG_ERROR("not support");
984 return { errCode, 0 };
985 }
986 if (errCode != E_OK) {
987 LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
988 syncerParam_.bundleName_.c_str());
989 return { errCode, 0 };
990 }
991 auto result = service->LockCloudContainer(syncerParam_);
992 if (result.first != E_OK) {
993 LOG_ERROR("LockCloudContainer failed, err is %{public}d.", result.first);
994 }
995 return result;
996 }
997
UnlockCloudContainer()998 int32_t RdbStoreImpl::UnlockCloudContainer()
999 {
1000 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1001 if (config_.IsVector() || isMemoryRdb_ || isReadOnly_) {
1002 return E_NOT_SUPPORT;
1003 }
1004 RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
1005 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
1006 if (errCode == E_NOT_SUPPORT) {
1007 LOG_ERROR("not support");
1008 return errCode;
1009 }
1010 if (errCode != E_OK) {
1011 LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
1012 syncerParam_.bundleName_.c_str());
1013 return errCode;
1014 }
1015 errCode = service->UnlockCloudContainer(syncerParam_);
1016 if (errCode != E_OK) {
1017 LOG_ERROR("UnlockCloudContainer failed, err is %{public}d.", errCode);
1018 }
1019 return errCode;
1020 }
1021 #endif
1022
RdbStoreImpl(const RdbStoreConfig & config)1023 RdbStoreImpl::RdbStoreImpl(const RdbStoreConfig &config)
1024 : isMemoryRdb_(config.IsMemoryRdb()), config_(config), name_(config.GetName()),
1025 fileType_(config.GetDatabaseFileType())
1026 {
1027 SqliteGlobalConfig::GetDbPath(config_, path_);
1028 isReadOnly_ = config.IsReadOnly() || config.GetRoleType() == VISITOR;
1029 }
1030
ProcessOpenCallback(int version,RdbOpenCallback & openCallback)1031 int32_t RdbStoreImpl::ProcessOpenCallback(int version, RdbOpenCallback &openCallback)
1032 {
1033 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1034 int32_t errCode = E_OK;
1035 if (version == -1) {
1036 return errCode;
1037 }
1038
1039 int32_t currentVersion;
1040 errCode = GetVersion(currentVersion);
1041 if (errCode != E_OK) {
1042 return errCode;
1043 }
1044
1045 if (version == currentVersion) {
1046 return openCallback.OnOpen(*this);
1047 }
1048
1049 if (currentVersion == 0) {
1050 errCode = openCallback.OnCreate(*this);
1051 } else if (version > currentVersion) {
1052 errCode = openCallback.OnUpgrade(*this, currentVersion, version);
1053 } else {
1054 errCode = openCallback.OnDowngrade(*this, currentVersion, version);
1055 }
1056
1057 if (errCode == E_OK) {
1058 errCode = SetVersion(version);
1059 }
1060
1061 if (errCode != E_OK) {
1062 LOG_ERROR("openCallback failed. version: %{public}d -> %{public}d, errCode:%{public}d",
1063 currentVersion, version, errCode);
1064 return errCode;
1065 }
1066
1067 return openCallback.OnOpen(*this);
1068 }
1069
TryAsyncRepair()1070 bool RdbStoreImpl::TryAsyncRepair()
1071 {
1072 std::string slavePath = SqliteUtils::GetSlavePath(path_);
1073 if (!IsUseAsyncRestore(path_, slavePath)) {
1074 return false;
1075 }
1076
1077 int errCode = Connection::CheckReplicaIntegrity(config_);
1078 if (errCode != E_OK) {
1079 return false;
1080 }
1081
1082 SqliteUtils::DeleteDirtyFiles(path_);
1083 auto pool = ConnectionPool::Create(config_, errCode);
1084 if (errCode != E_OK) {
1085 LOG_WARN("create new connection failed");
1086 return false;
1087 }
1088 connectionPool_ = pool;
1089 errCode = StartAsyncRestore(pool);
1090 if (errCode != E_OK) {
1091 return false;
1092 }
1093 rebuild_ = RebuiltType::REPAIRED;
1094
1095 Reportor::ReportRestore(Reportor::Create(config_, E_OK, "RestoreType:Rebuild", false), false);
1096 return true;
1097 }
CreatePool(bool & created)1098 int32_t RdbStoreImpl::CreatePool(bool &created)
1099 {
1100 int32_t errCode = E_OK;
1101 connectionPool_ = ConnectionPool::Create(config_, errCode);
1102 if (connectionPool_ == nullptr && (errCode == E_SQLITE_CORRUPT || errCode == E_INVALID_SECRET_KEY) &&
1103 !isReadOnly_) {
1104 LOG_ERROR("database corrupt, errCode:0x%{public}x, %{public}s, %{public}s", errCode,
1105 SqliteUtils::Anonymous(name_).c_str(),
1106 SqliteUtils::FormatDebugInfoBrief(Connection::Collect(config_), "master").c_str());
1107 #if !defined(CROSS_PLATFORM)
1108 InitSyncerParam(config_, false);
1109 auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
1110 if (service != nullptr) {
1111 service->Disable(syncerParam_);
1112 }
1113 #endif
1114 config_.SetIter(0);
1115 if (config_.IsEncrypt() && config_.GetAllowRebuild()) {
1116 auto key = config_.GetEncryptKey();
1117 RdbSecurityManager::GetInstance().RestoreKeyFile(path_, key);
1118 key.assign(key.size(), 0);
1119 }
1120
1121 if (TryAsyncRepair()) {
1122 errCode = E_OK;
1123 } else {
1124 std::tie(rebuild_, connectionPool_) = ConnectionPool::HandleDataCorruption(config_, errCode);
1125 }
1126 created = true;
1127 #if !defined(CROSS_PLATFORM)
1128 if (service != nullptr) {
1129 service->Enable(syncerParam_);
1130 }
1131 #endif
1132 }
1133 return errCode;
1134 }
1135
SetSecurityLabel(const RdbStoreConfig & config)1136 int32_t RdbStoreImpl::SetSecurityLabel(const RdbStoreConfig &config)
1137 {
1138 if (config.IsMemoryRdb()) {
1139 return E_OK;
1140 }
1141 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
1142 return SecurityPolicy::SetSecurityLabel(config);
1143 #endif
1144 return E_OK;
1145 }
1146
Init(int version,RdbOpenCallback & openCallback)1147 int32_t RdbStoreImpl::Init(int version, RdbOpenCallback &openCallback)
1148 {
1149 if (initStatus_ != -1) {
1150 return initStatus_;
1151 }
1152 std::lock_guard<std::mutex> lock(initMutex_);
1153 if (initStatus_ != -1) {
1154 return initStatus_;
1155 }
1156 int32_t errCode = E_OK;
1157 bool created = access(path_.c_str(), F_OK) != 0;
1158 errCode = CreatePool(created);
1159 if (connectionPool_ == nullptr || errCode != E_OK) {
1160 connectionPool_ = nullptr;
1161 LOG_ERROR("Create connPool failed, err is %{public}d, path:%{public}s", errCode,
1162 SqliteUtils::Anonymous(path_).c_str());
1163 return errCode;
1164 }
1165 InitSyncerParam(config_, created);
1166 InitReportFunc(syncerParam_);
1167 InnerOpen();
1168
1169 if (config_.GetRoleType() == OWNER && !config_.IsReadOnly()) {
1170 errCode = SetSecurityLabel(config_);
1171 if (errCode != E_OK) {
1172 return errCode;
1173 }
1174 (void) ExchangeSlaverToMaster();
1175 SwitchOver(true);
1176 errCode = ProcessOpenCallback(version, openCallback);
1177 SwitchOver(false);
1178 if (errCode != E_OK) {
1179 LOG_ERROR("Callback fail, path:%{public}s code:%{public}d", SqliteUtils::Anonymous(path_).c_str(), errCode);
1180 return errCode;
1181 }
1182 }
1183 initStatus_ = errCode;
1184 return initStatus_;
1185 }
1186
~RdbStoreImpl()1187 RdbStoreImpl::~RdbStoreImpl()
1188 {
1189 connectionPool_ = nullptr;
1190 trxConnMap_ = {};
1191 for (auto &trans : transactions_) {
1192 auto realTrans = trans.lock();
1193 if (realTrans) {
1194 (void)realTrans->Close();
1195 }
1196 }
1197 transactions_ = {};
1198 if (knowledgeSchemaHelper_ != nullptr) {
1199 knowledgeSchemaHelper_->Close();
1200 }
1201 *slaveStatus_ = SlaveStatus::DB_CLOSING;
1202 }
1203
GetConfig()1204 const RdbStoreConfig &RdbStoreImpl::GetConfig()
1205 {
1206 return config_;
1207 }
1208
Insert(const std::string & table,const Row & row,Resolution resolution)1209 std::pair<int, int64_t> RdbStoreImpl::Insert(const std::string &table, const Row &row, Resolution resolution)
1210 {
1211 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1212 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1213 return { E_NOT_SUPPORT, -1 };
1214 }
1215 RdbStatReporter reportStat(RDB_PERF, INSERT, config_, reportFunc_);
1216 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1217 PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
1218 auto [status, sqlInfo] = RdbSqlUtils::GetInsertSqlInfo(table, row, resolution);
1219 if (status != E_OK) {
1220 return { status, -1 };
1221 }
1222
1223 int64_t rowid = -1;
1224 auto errCode = ExecuteForLastInsertedRowId(rowid, sqlInfo.sql, sqlInfo.args);
1225 if (errCode == E_OK) {
1226 DoCloudSync(table);
1227 }
1228
1229 return { errCode, rowid };
1230 }
1231
BatchInsert(const std::string & table,const ValuesBuckets & rows)1232 std::pair<int, int64_t> RdbStoreImpl::BatchInsert(const std::string &table, const ValuesBuckets &rows)
1233 {
1234 if (isReadOnly_) {
1235 return { E_NOT_SUPPORT, -1 };
1236 }
1237
1238 if (rows.RowSize() == 0) {
1239 return { E_OK, 0 };
1240 }
1241
1242 RdbStatReporter reportStat(RDB_PERF, BATCHINSERT, config_, reportFunc_);
1243 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1244 PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL, 0, rows.RowSize());
1245 auto pool = GetPool();
1246 if (pool == nullptr) {
1247 return { E_ALREADY_CLOSED, -1 };
1248 }
1249 auto conn = pool->AcquireConnection(false);
1250 if (conn == nullptr) {
1251 return { E_DATABASE_BUSY, -1 };
1252 }
1253
1254 auto executeSqlArgs = SqliteSqlBuilder::GenerateSqls(table, rows, conn->GetMaxVariable());
1255 BatchInsertArgsDfx(static_cast<int>(executeSqlArgs.size()));
1256 if (executeSqlArgs.empty()) {
1257 LOG_ERROR("empty, table=%{public}s, values:%{public}zu, max number:%{public}d.",
1258 SqliteUtils::Anonymous(table).c_str(), rows.RowSize(), conn->GetMaxVariable());
1259 return { E_INVALID_ARGS, -1 };
1260 }
1261 PauseDelayNotify pauseDelayNotify(delayNotifier_);
1262 for (const auto &[sql, bindArgs] : executeSqlArgs) {
1263 auto [errCode, statement] = GetStatement(sql, conn);
1264 if (statement == nullptr) {
1265 LOG_ERROR("statement is nullptr, errCode:0x%{public}x, args:%{public}zu, table:%{public}s, "
1266 "app self can check the SQL", errCode, bindArgs.size(), SqliteUtils::Anonymous(table).c_str());
1267 return { E_OK, -1 };
1268 }
1269 for (const auto &args : bindArgs) {
1270 auto errCode = statement->Execute(args);
1271 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1272 pool->Dump(true, "BATCH");
1273 return { errCode, -1 };
1274 }
1275 if (errCode != E_OK) {
1276 LOG_ERROR("failed, errCode:%{public}d,args:%{public}zu,table:%{public}s,app self can check the SQL",
1277 errCode, bindArgs.size(), SqliteUtils::Anonymous(table).c_str());
1278 return { E_OK, -1 };
1279 }
1280 }
1281 }
1282 conn = nullptr;
1283 DoCloudSync(table);
1284 return { E_OK, int64_t(rows.RowSize()) };
1285 }
1286
BatchInsertArgsDfx(int argsSize)1287 void RdbStoreImpl::BatchInsertArgsDfx(int argsSize)
1288 {
1289 if (argsSize > 1) {
1290 Reportor::ReportFault(RdbFaultEvent(FT_CURD, E_DFX_BATCH_INSERT_ARGS_SIZE, config_.GetBundleName(),
1291 "BatchInsert executeSqlArgs size[ " + std::to_string(argsSize) + "]"));
1292 }
1293 }
1294
BatchInsert(const std::string & table,const RefRows & rows,const std::vector<std::string> & returningFields,Resolution resolution)1295 std::pair<int32_t, Results> RdbStoreImpl::BatchInsert(const std::string &table, const RefRows &rows,
1296 const std::vector<std::string> &returningFields, Resolution resolution)
1297 {
1298 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1299 return { E_NOT_SUPPORT, -1 };
1300 }
1301
1302 if (rows.RowSize() == 0) {
1303 return { E_OK, 0 };
1304 }
1305
1306 RdbStatReporter reportStat(RDB_PERF, BATCHINSERT, config_, reportFunc_);
1307 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1308 PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL, 0, rows.RowSize());
1309 auto pool = GetPool();
1310 if (pool == nullptr) {
1311 return { E_ALREADY_CLOSED, -1 };
1312 }
1313 auto conn = pool->AcquireConnection(false);
1314 if (conn == nullptr) {
1315 return { E_DATABASE_BUSY, -1 };
1316 }
1317
1318 auto sqlArgs = SqliteSqlBuilder::GenerateSqls(table, rows, conn->GetMaxVariable(), resolution);
1319 // To ensure atomicity, execute SQL only once
1320 if (sqlArgs.size() != 1 || sqlArgs.front().second.size() != 1) {
1321 auto [fields, values] = rows.GetFieldsAndValues();
1322 LOG_ERROR("invalid! rows:%{public}zu, table:%{public}s, fields:%{public}zu, max:%{public}d.", rows.RowSize(),
1323 SqliteUtils::Anonymous(table).c_str(), fields != nullptr ? fields->size() : 0, conn->GetMaxVariable());
1324 return { E_INVALID_ARGS, -1 };
1325 }
1326 auto &[sql, bindArgs] = sqlArgs.front();
1327 SqliteSqlBuilder::AppendReturning(sql, returningFields);
1328 auto [errCode, statement] = GetStatement(sql, conn);
1329 if (statement == nullptr) {
1330 LOG_ERROR("statement is nullptr, errCode:0x%{public}x, args:%{public}zu, table:%{public}s, "
1331 "app self can check the SQL", errCode, bindArgs.size(), SqliteUtils::Anonymous(table).c_str());
1332 return { errCode, -1 };
1333 }
1334 PauseDelayNotify pauseDelayNotify(delayNotifier_);
1335 errCode = statement->Execute(std::ref(bindArgs.front()));
1336 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1337 pool->Dump(true, "BATCH");
1338 return { errCode, -1 };
1339 }
1340 if (errCode != E_OK) {
1341 LOG_ERROR("failed,errCode:%{public}d,table:%{public}s,args:%{public}zu,resolution:%{public}d.", errCode,
1342 SqliteUtils::Anonymous(table).c_str(), bindArgs.front().size(), static_cast<int32_t>(resolution));
1343 }
1344 auto result = GenerateResult(errCode, statement);
1345 if (result.changed > 0) {
1346 DoCloudSync(table);
1347 }
1348 return { errCode, result };
1349 }
1350
Update(const Row & row,const AbsRdbPredicates & predicates,const std::vector<std::string> & returningFields,Resolution resolution)1351 std::pair<int32_t, Results> RdbStoreImpl::Update(const Row &row, const AbsRdbPredicates &predicates,
1352 const std::vector<std::string> &returningFields, Resolution resolution)
1353 {
1354 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1355 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1356 return { E_NOT_SUPPORT, -1 };
1357 }
1358 RdbStatReporter reportStat(RDB_PERF, UPDATE, config_, reportFunc_);
1359 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1360 PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
1361 auto [status, sqlInfo] = RdbSqlUtils::GetUpdateSqlInfo(predicates, row, resolution, returningFields);
1362 if (status != E_OK) {
1363 return { status, -1 };
1364 }
1365 auto [code, result] = ExecuteForRow(sqlInfo.sql, sqlInfo.args);
1366 if (result.changed > 0) {
1367 DoCloudSync(predicates.GetTableName());
1368 }
1369 return { code, result };
1370 }
1371
Delete(const AbsRdbPredicates & predicates,const std::vector<std::string> & returningFields)1372 std::pair<int32_t, Results> RdbStoreImpl::Delete(
1373 const AbsRdbPredicates &predicates, const std::vector<std::string> &returningFields)
1374 {
1375 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1376 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1377 return { E_NOT_SUPPORT, -1 };
1378 }
1379 RdbStatReporter reportStat(RDB_PERF, DELETE, config_, reportFunc_);
1380 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1381 PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
1382 auto [status, sqlInfo] = RdbSqlUtils::GetDeleteSqlInfo(predicates, returningFields);
1383 if (status != E_OK) {
1384 return { status, -1 };
1385 }
1386
1387 auto [code, result] = ExecuteForRow(sqlInfo.sql, predicates.GetBindArgs());
1388 if (result.changed > 0) {
1389 DoCloudSync(predicates.GetTableName());
1390 }
1391 return { code, result };
1392 }
1393
QuerySql(const std::string & sql,const Values & bindArgs)1394 std::shared_ptr<AbsSharedResultSet> RdbStoreImpl::QuerySql(const std::string &sql, const Values &bindArgs)
1395 {
1396 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1397 if (config_.GetDBType() == DB_VECTOR) {
1398 return nullptr;
1399 }
1400 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1401 PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
1402 #if !defined(CROSS_PLATFORM)
1403 auto start = std::chrono::steady_clock::now();
1404 auto pool = GetPool();
1405 if (pool == nullptr) {
1406 LOG_ERROR("Database already closed.");
1407 return nullptr;
1408 }
1409 return std::make_shared<SqliteSharedResultSet>(start, pool->AcquireRef(true), sql, bindArgs, path_);
1410 #else
1411 (void)sql;
1412 (void)bindArgs;
1413 return nullptr;
1414 #endif
1415 }
1416
QueryByStep(const std::string & sql,const Values & args,bool preCount)1417 std::shared_ptr<ResultSet> RdbStoreImpl::QueryByStep(const std::string &sql, const Values &args, bool preCount)
1418 {
1419 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1420 PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
1421 auto start = std::chrono::steady_clock::now();
1422 auto pool = GetPool();
1423 if (pool == nullptr) {
1424 LOG_ERROR("Database already closed.");
1425 return nullptr;
1426 }
1427 #if !defined(CROSS_PLATFORM)
1428 return std::make_shared<StepResultSet>(start, connectionPool_->AcquireRef(true), sql, args, preCount);
1429 #else
1430 return std::make_shared<StepResultSet>(start, connectionPool_->AcquireRef(true), sql, args, false);
1431 #endif
1432 }
1433
Count(int64_t & outValue,const AbsRdbPredicates & predicates)1434 int RdbStoreImpl::Count(int64_t &outValue, const AbsRdbPredicates &predicates)
1435 {
1436 if (config_.GetDBType() == DB_VECTOR) {
1437 return E_NOT_SUPPORT;
1438 }
1439 std::string sql = SqliteSqlBuilder::BuildCountString(predicates);
1440 return ExecuteAndGetLong(outValue, sql, predicates.GetBindArgs());
1441 }
1442
WriteToCompareFile(const std::string & dbPath,const std::string & bundleName,const std::string & sql)1443 void WriteToCompareFile(const std::string &dbPath, const std::string &bundleName, const std::string &sql)
1444 {
1445 auto poolTask = TaskExecutor::GetInstance().GetExecutor();
1446 if (poolTask != nullptr) {
1447 poolTask->Execute([dbPath, bundleName, sql]() {
1448 std::string comparePath = dbPath + "-compare";
1449 if (SqliteUtils::CleanFileContent(comparePath)) {
1450 Reportor::ReportFault(
1451 RdbFaultEvent(FT_CURD, E_DFX_IS_NOT_EXIST, bundleName, "compare file is deleted"));
1452 }
1453 SqliteUtils::WriteSqlToFile(comparePath, sql);
1454 });
1455 }
1456 }
1457
HandleSchemaDDL(std::shared_ptr<Statement> && statement,const std::string & sql)1458 int32_t RdbStoreImpl::HandleSchemaDDL(std::shared_ptr<Statement> &&statement, const std::string &sql)
1459 {
1460 statement->Reset();
1461 Suspender suspender(Suspender::SQL_STATISTIC);
1462 statement->Prepare("PRAGMA schema_version");
1463 auto [err, version] = statement->ExecuteForValue();
1464 statement = nullptr;
1465 if (vSchema_ < static_cast<int64_t>(version)) {
1466 LOG_INFO("db:%{public}s exe DDL schema<%{public}" PRIi64 "->%{public}" PRIi64 ">",
1467 SqliteUtils::Anonymous(name_).c_str(), vSchema_, static_cast<int64_t>(version));
1468 vSchema_ = version;
1469 if (!isMemoryRdb_) {
1470 std::string dbPath = config_.GetPath();
1471 std::string bundleName = config_.GetBundleName();
1472 WriteToCompareFile(dbPath, bundleName, sql);
1473 }
1474 statement = nullptr;
1475 if (config_.GetEnableSemanticIndex() && !isKnowledgeSchemaReady_) {
1476 SetKnowledgeSchema();
1477 }
1478 auto pool = GetPool();
1479 if (pool == nullptr) {
1480 return E_ALREADY_CLOSED;
1481 }
1482 return pool->RestartConns();
1483 }
1484 return E_OK;
1485 }
1486
ExecuteSql(const std::string & sql,const Values & args)1487 int RdbStoreImpl::ExecuteSql(const std::string &sql, const Values &args)
1488 {
1489 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1490 if (config_.GetDBType() == DB_VECTOR || isReadOnly_) {
1491 return E_NOT_SUPPORT;
1492 }
1493 int ret = CheckAttach(sql);
1494 if (ret != E_OK) {
1495 return ret;
1496 }
1497 RdbStatReporter reportStat(RDB_PERF, EXECUTESQL, config_, reportFunc_);
1498 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1499 PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
1500 auto [errCode, statement] = BeginExecuteSql(sql);
1501 if (statement == nullptr) {
1502 return errCode;
1503 }
1504 errCode = statement->Execute(args);
1505 if (errCode != E_OK) {
1506 LOG_ERROR("failed,error:0x%{public}x app self can check the SQL.", errCode);
1507 TryDump(errCode, "EXECUTE");
1508 return errCode;
1509 }
1510 int sqlType = SqliteUtils::GetSqlStatementType(sql);
1511 if (sqlType == SqliteUtils::STATEMENT_DDL) {
1512 HandleSchemaDDL(std::move(statement), sql);
1513 }
1514 statement = nullptr;
1515 if (errCode == E_OK && (sqlType == SqliteUtils::STATEMENT_UPDATE || sqlType == SqliteUtils::STATEMENT_INSERT)) {
1516 DoCloudSync("");
1517 }
1518 return errCode;
1519 }
1520
Execute(const std::string & sql,const Values & args,int64_t trxId)1521 std::pair<int32_t, ValueObject> RdbStoreImpl::Execute(const std::string &sql, const Values &args, int64_t trxId)
1522 {
1523 ValueObject object;
1524 if (isReadOnly_) {
1525 return { E_NOT_SUPPORT, object };
1526 }
1527
1528 RdbStatReporter reportStat(RDB_PERF, EXECUTE, config_, reportFunc_);
1529 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1530 PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
1531 int sqlType = SqliteUtils::GetSqlStatementType(sql);
1532 if (!SqliteUtils::IsSupportSqlForExecute(sqlType)) {
1533 LOG_ERROR("Not support the sqlType: %{public}d, app self can check the SQL", sqlType);
1534 return { E_NOT_SUPPORT_THE_SQL, object };
1535 }
1536
1537 if (config_.IsVector() && trxId > 0) {
1538 return { ExecuteByTrxId(sql, trxId, false, args), ValueObject() };
1539 }
1540
1541 auto [errCode, statement] = GetStatement(sql, false);
1542 if (errCode != E_OK || statement == nullptr) {
1543 return { errCode != E_OK ? errCode : E_ERROR, object };
1544 }
1545
1546 errCode = statement->Execute(args);
1547 TryDump(errCode, "EXECUTE");
1548 if (config_.IsVector()) {
1549 return { errCode, object };
1550 }
1551
1552 return HandleDifferentSqlTypes(std::move(statement), sql, errCode, sqlType);
1553 }
1554
HandleDifferentSqlTypes(std::shared_ptr<Statement> && statement,const std::string & sql,int32_t code,int sqlType)1555 std::pair<int32_t, ValueObject> RdbStoreImpl::HandleDifferentSqlTypes(
1556 std::shared_ptr<Statement> &&statement, const std::string &sql, int32_t code, int sqlType)
1557 {
1558 if (code != E_OK) {
1559 return { code, ValueObject() };
1560 }
1561 if (sqlType == SqliteUtils::STATEMENT_INSERT) {
1562 int64_t outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1563 return { code, ValueObject(outValue) };
1564 }
1565
1566 if (sqlType == SqliteUtils::STATEMENT_UPDATE) {
1567 int outValue = statement->Changes();
1568 return { code, ValueObject(outValue) };
1569 }
1570
1571 if (sqlType == SqliteUtils::STATEMENT_PRAGMA) {
1572 if (statement->GetColumnCount() == 1) {
1573 return statement->GetColumn(0);
1574 }
1575
1576 if (statement->GetColumnCount() > 1) {
1577 LOG_ERROR("Not support the sql:app self can check the SQL, column count more than 1");
1578 return { E_NOT_SUPPORT_THE_SQL, ValueObject() };
1579 }
1580 }
1581
1582 if (sqlType == SqliteUtils::STATEMENT_DDL) {
1583 HandleSchemaDDL(std::move(statement), sql);
1584 }
1585 return { code, ValueObject() };
1586 }
1587
ExecuteExt(const std::string & sql,const Values & args)1588 std::pair<int32_t, Results> RdbStoreImpl::ExecuteExt(const std::string &sql, const Values &args)
1589 {
1590 if (isReadOnly_ || config_.IsVector()) {
1591 return { E_NOT_SUPPORT, -1 };
1592 }
1593
1594 RdbStatReporter reportStat(RDB_PERF, EXECUTE, config_, reportFunc_);
1595 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1596 int sqlType = SqliteUtils::GetSqlStatementType(sql);
1597 if (!SqliteUtils::IsSupportSqlForExecute(sqlType)) {
1598 LOG_ERROR("Not support the sqlType: %{public}d, app self can check the SQL", sqlType);
1599 return { E_NOT_SUPPORT_THE_SQL, -1 };
1600 }
1601 auto [errCode, statement] = GetStatement(sql, false);
1602 if (errCode != E_OK) {
1603 return { errCode, -1 };
1604 }
1605 errCode = statement->Execute(args);
1606 TryDump(errCode, "ExecuteExt");
1607 return HandleResults(std::move(statement), sql, errCode, sqlType);
1608 }
1609
HandleResults(std::shared_ptr<Statement> && statement,const std::string & sql,int32_t code,int sqlType)1610 std::pair<int32_t, Results> RdbStoreImpl::HandleResults(
1611 std::shared_ptr<Statement> &&statement, const std::string &sql, int32_t code, int sqlType)
1612 {
1613 Results result = GenerateResult(
1614 code, statement, sqlType == SqliteUtils::STATEMENT_INSERT || sqlType == SqliteUtils::STATEMENT_UPDATE);
1615 if (sqlType == SqliteUtils::STATEMENT_DDL) {
1616 HandleSchemaDDL(std::move(statement), sql);
1617 }
1618 return { code, result };
1619 }
1620
ExecuteAndGetLong(int64_t & outValue,const std::string & sql,const Values & args)1621 int RdbStoreImpl::ExecuteAndGetLong(int64_t &outValue, const std::string &sql, const Values &args)
1622 {
1623 if (config_.GetDBType() == DB_VECTOR) {
1624 return E_NOT_SUPPORT;
1625 }
1626 auto [errCode, statement] = BeginExecuteSql(sql);
1627 if (statement == nullptr) {
1628 return errCode;
1629 }
1630 auto [err, object] = statement->ExecuteForValue(args);
1631 if (err != E_OK) {
1632 LOG_ERROR("failed, app self can check the SQL, ERROR is %{public}d.", err);
1633 }
1634 outValue = object;
1635 return err;
1636 }
1637
ExecuteAndGetString(std::string & outValue,const std::string & sql,const Values & args)1638 int RdbStoreImpl::ExecuteAndGetString(std::string &outValue, const std::string &sql, const Values &args)
1639 {
1640 if (config_.GetDBType() == DB_VECTOR) {
1641 return E_NOT_SUPPORT;
1642 }
1643 auto [errCode, statement] = BeginExecuteSql(sql);
1644 if (statement == nullptr) {
1645 return errCode;
1646 }
1647 ValueObject object;
1648 std::tie(errCode, object) = statement->ExecuteForValue(args);
1649 if (errCode != E_OK) {
1650 LOG_ERROR("failed, app self can check the SQL, ERROR is %{public}d.", errCode);
1651 }
1652 outValue = static_cast<std::string>(object);
1653 return errCode;
1654 }
1655
ExecuteForLastInsertedRowId(int64_t & outValue,const std::string & sql,const Values & args)1656 int RdbStoreImpl::ExecuteForLastInsertedRowId(int64_t &outValue, const std::string &sql, const Values &args)
1657 {
1658 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1659 return E_NOT_SUPPORT;
1660 }
1661 auto begin = std::chrono::steady_clock::now();
1662 auto [errCode, statement] = GetStatement(sql, false);
1663 if (statement == nullptr) {
1664 return errCode;
1665 }
1666 auto beginExec = std::chrono::steady_clock::now();
1667 errCode = statement->Execute(args);
1668 if (errCode != E_OK) {
1669 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1670 auto pool = GetPool();
1671 if (pool != nullptr) {
1672 pool->Dump(true, "INSERT");
1673 }
1674 }
1675 return errCode;
1676 }
1677 auto beginResult = std::chrono::steady_clock::now();
1678 outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1679 auto allEnd = std::chrono::steady_clock::now();
1680 int64_t totalCostTime = std::chrono::duration_cast<std::chrono::milliseconds>(allEnd - begin).count();
1681 if (totalCostTime >= TIME_OUT) {
1682 int64_t prepareCost = std::chrono::duration_cast<std::chrono::milliseconds>(beginExec - begin).count();
1683 int64_t execCost = std::chrono::duration_cast<std::chrono::milliseconds>(beginResult - beginExec).count();
1684 int64_t resultCost = std::chrono::duration_cast<std::chrono::milliseconds>(allEnd - beginResult).count();
1685 LOG_WARN("total[%{public}" PRId64 "] stmt[%{public}" PRId64 "] exec[%{public}" PRId64
1686 "] result[%{public}" PRId64 "] "
1687 "sql[%{public}s]",
1688 totalCostTime, prepareCost, execCost, resultCost, SqliteUtils::SqlAnonymous(sql).c_str());
1689 }
1690 return E_OK;
1691 }
1692
ExecuteForRow(const std::string & sql,const Values & args)1693 std::pair<int32_t, Results> RdbStoreImpl::ExecuteForRow(const std::string &sql, const Values &args)
1694 {
1695 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1696 return { E_NOT_SUPPORT, -1 };
1697 }
1698 auto [errCode, statement] = GetStatement(sql, false);
1699 if (statement == nullptr) {
1700 return { errCode, -1 };
1701 }
1702 errCode = statement->Execute(args);
1703 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1704 auto pool = GetPool();
1705 if (pool != nullptr) {
1706 pool->Dump(true, "UPG DEL");
1707 }
1708 }
1709 return { errCode, GenerateResult(errCode, statement) };
1710 }
1711
ExecuteForChangedRowCount(int64_t & outValue,const std::string & sql,const Values & args)1712 int RdbStoreImpl::ExecuteForChangedRowCount(int64_t &outValue, const std::string &sql, const Values &args)
1713 {
1714 auto [code, result] = ExecuteForRow(sql, args);
1715 outValue = result.changed;
1716 return code;
1717 }
1718
GetDataBasePath(const std::string & databasePath,std::string & backupFilePath)1719 int RdbStoreImpl::GetDataBasePath(const std::string &databasePath, std::string &backupFilePath)
1720 {
1721 if (databasePath.empty()) {
1722 return E_INVALID_FILE_PATH;
1723 }
1724
1725 if (ISFILE(databasePath)) {
1726 backupFilePath = ExtractFilePath(path_) + databasePath;
1727 } else {
1728 // 2 represents two characters starting from the len - 2 position
1729 if (!PathToRealPath(ExtractFilePath(databasePath), backupFilePath) || databasePath.back() == '/' ||
1730 databasePath.substr(databasePath.length() - 2, 2) == "\\") {
1731 LOG_ERROR("Invalid databasePath.");
1732 return E_INVALID_FILE_PATH;
1733 }
1734 backupFilePath = databasePath;
1735 }
1736
1737 if (backupFilePath == path_) {
1738 LOG_ERROR("The backupPath and path should not be same.");
1739 return E_INVALID_FILE_PATH;
1740 }
1741
1742 LOG_INFO("databasePath is %{public}s.", SqliteUtils::Anonymous(backupFilePath).c_str());
1743 return E_OK;
1744 }
1745
GetSlaveName(const std::string & path,std::string & backupFilePath)1746 int RdbStoreImpl::GetSlaveName(const std::string &path, std::string &backupFilePath)
1747 {
1748 std::string suffix(".db");
1749 std::string slaveSuffix("_slave.db");
1750 auto pos = path.find(suffix);
1751 if (pos == std::string::npos) {
1752 backupFilePath = path + slaveSuffix;
1753 } else {
1754 backupFilePath = std::string(path, 0, pos) + slaveSuffix;
1755 }
1756 return E_OK;
1757 }
1758
1759 /**
1760 * Backup a database from a specified encrypted or unencrypted database file.
1761 * Support skipping verification.
1762 */
Backup(const std::string & databasePath,const std::vector<uint8_t> & encryptKey,bool verifyDb)1763 int RdbStoreImpl::Backup(const std::string &databasePath, const std::vector<uint8_t> &encryptKey, bool verifyDb)
1764 {
1765 LOG_INFO("Backup db: %{public}s, verify: %{public}d.", SqliteUtils::Anonymous(config_.GetName()).c_str(), verifyDb);
1766 if (isReadOnly_ || isMemoryRdb_) {
1767 return E_NOT_SUPPORT;
1768 }
1769 std::string backupFilePath;
1770 if (TryGetMasterSlaveBackupPath(databasePath, backupFilePath)) {
1771 return InnerBackup(backupFilePath, encryptKey, verifyDb);
1772 }
1773
1774 int ret = GetDataBasePath(databasePath, backupFilePath);
1775 if (ret != E_OK) {
1776 return ret;
1777 }
1778
1779 RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
1780 keyFiles.Lock();
1781
1782 auto walFile = backupFilePath + "-wal";
1783 if (access(walFile.c_str(), F_OK) == E_OK) {
1784 if (!SqliteUtils::DeleteDirtyFiles(backupFilePath)) {
1785 keyFiles.Unlock();
1786 return E_ERROR;
1787 }
1788 }
1789 std::string tempPath = backupFilePath + ".tmp";
1790 if (access(tempPath.c_str(), F_OK) == E_OK) {
1791 SqliteUtils::DeleteFile(backupFilePath);
1792 } else {
1793 if (access(backupFilePath.c_str(), F_OK) == E_OK && !SqliteUtils::RenameFile(backupFilePath, tempPath)) {
1794 LOG_ERROR("rename backup file failed, path:%{public}s, errno:%{public}d",
1795 SqliteUtils::Anonymous(backupFilePath).c_str(), errno);
1796 keyFiles.Unlock();
1797 return E_ERROR;
1798 }
1799 }
1800 ret = InnerBackup(backupFilePath, encryptKey);
1801 if (ret != E_OK || access(walFile.c_str(), F_OK) == E_OK) {
1802 if (ret == E_DB_NOT_EXIST) {
1803 Reportor::ReportCorrupted(Reportor::Create(config_, ret, "ErrorType: BackupFailed"));
1804 }
1805 if (SqliteUtils::DeleteDirtyFiles(backupFilePath)) {
1806 SqliteUtils::RenameFile(tempPath, backupFilePath);
1807 }
1808 } else {
1809 SqliteUtils::DeleteFile(tempPath);
1810 }
1811 keyFiles.Unlock();
1812 return ret;
1813 }
1814
CreateBackupBindArgs(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey)1815 std::vector<ValueObject> RdbStoreImpl::CreateBackupBindArgs(
1816 const std::string &databasePath, const std::vector<uint8_t> &destEncryptKey)
1817 {
1818 std::vector<ValueObject> bindArgs;
1819 bindArgs.emplace_back(databasePath);
1820 if (!destEncryptKey.empty() && !config_.IsEncrypt()) {
1821 bindArgs.emplace_back(destEncryptKey);
1822 } else if (config_.IsEncrypt()) {
1823 std::vector<uint8_t> key = config_.GetEncryptKey();
1824 bindArgs.emplace_back(key);
1825 key.assign(key.size(), 0);
1826 } else {
1827 bindArgs.emplace_back("");
1828 }
1829 return bindArgs;
1830 }
1831
1832 /**
1833 * Backup a database from a specified encrypted or unencrypted database file.
1834 */
InnerBackup(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey,bool verifyDb)1835 int RdbStoreImpl::InnerBackup(
1836 const std::string &databasePath, const std::vector<uint8_t> &destEncryptKey, bool verifyDb)
1837 {
1838 if (isReadOnly_) {
1839 return E_NOT_SUPPORT;
1840 }
1841
1842 if (config_.GetDBType() == DB_VECTOR) {
1843 auto [errCode, conn] = GetConn(false);
1844 return errCode != E_OK ? errCode : conn->Backup(databasePath, destEncryptKey, false, slaveStatus_);
1845 }
1846
1847 if (config_.GetHaMode() != HAMode::SINGLE && SqliteUtils::IsSlaveDbName(databasePath)) {
1848 auto [errCode, conn] = GetConn(false);
1849 return errCode != E_OK ? errCode : conn->Backup(databasePath, {}, false, slaveStatus_, verifyDb);
1850 }
1851 Suspender suspender(Suspender::SQL_LOG);
1852 auto config = config_;
1853 config.SetHaMode(HAMode::SINGLE);
1854 config.SetCreateNecessary(false);
1855 auto [result, conn] = CreateWritableConn(config);
1856 if (result != E_OK) {
1857 return result;
1858 }
1859
1860 if (config_.IsEncrypt()) {
1861 result = SetDefaultEncryptAlgo(conn, config_);
1862 if (result != E_OK) {
1863 return result;
1864 }
1865 }
1866
1867 std::vector<ValueObject> bindArgs = CreateBackupBindArgs(databasePath, destEncryptKey);
1868 auto [errCode, statement] = GetStatement(GlobalExpr::ATTACH_BACKUP_SQL, conn);
1869 errCode = statement->Execute(bindArgs);
1870 if (errCode != E_OK) {
1871 return errCode;
1872 }
1873 errCode = statement->Prepare(GlobalExpr::PRAGMA_BACKUP_JOUR_MODE_WAL);
1874 errCode = statement->Execute();
1875 if (errCode != E_OK) {
1876 return errCode;
1877 }
1878 errCode = statement->Prepare(GlobalExpr::EXPORT_SQL);
1879 int ret = statement->Execute();
1880 errCode = statement->Prepare(GlobalExpr::DETACH_BACKUP_SQL);
1881 int res = statement->Execute();
1882 return (res == E_OK) ? ret : res;
1883 }
1884
BeginExecuteSql(const std::string & sql)1885 std::pair<int32_t, RdbStoreImpl::Stmt> RdbStoreImpl::BeginExecuteSql(const std::string &sql)
1886 {
1887 int type = SqliteUtils::GetSqlStatementType(sql);
1888 if (SqliteUtils::IsSpecial(type)) {
1889 return { E_NOT_SUPPORT, nullptr };
1890 }
1891
1892 bool assumeReadOnly = SqliteUtils::IsSqlReadOnly(type);
1893 auto pool = GetPool();
1894 if (pool == nullptr) {
1895 return { E_ALREADY_CLOSED, nullptr };
1896 }
1897 auto conn = pool->AcquireConnection(assumeReadOnly);
1898 if (conn == nullptr) {
1899 return { E_DATABASE_BUSY, nullptr };
1900 }
1901
1902 auto [errCode, statement] = GetStatement(sql, conn);
1903 if (statement == nullptr) {
1904 return { errCode, nullptr };
1905 }
1906
1907 if (statement->ReadOnly() && conn->IsWriter()) {
1908 statement = nullptr;
1909 conn = nullptr;
1910 return GetStatement(sql, true);
1911 }
1912
1913 return { errCode, statement };
1914 }
1915
IsHoldingConnection()1916 bool RdbStoreImpl::IsHoldingConnection()
1917 {
1918 return GetPool() != nullptr;
1919 }
1920
SetDefaultEncryptSql(const std::shared_ptr<Statement> & statement,std::string sql,const RdbStoreConfig & config)1921 int RdbStoreImpl::SetDefaultEncryptSql(
1922 const std::shared_ptr<Statement> &statement, std::string sql, const RdbStoreConfig &config)
1923 {
1924 auto errCode = statement->Prepare(sql);
1925 if (errCode != E_OK) {
1926 LOG_ERROR("Prepare failed: %{public}s, %{public}d, %{public}d, %{public}d, %{public}d, %{public}u",
1927 SqliteUtils::Anonymous(config.GetName()).c_str(), config.GetCryptoParam().iterNum,
1928 config.GetCryptoParam().encryptAlgo, config.GetCryptoParam().hmacAlgo, config.GetCryptoParam().kdfAlgo,
1929 config.GetCryptoParam().cryptoPageSize);
1930 return errCode;
1931 }
1932 errCode = statement->Execute();
1933 if (errCode != E_OK) {
1934 LOG_ERROR("Execute failed: %{public}s, %{public}d, %{public}d, %{public}d, %{public}d, %{public}u",
1935 SqliteUtils::Anonymous(config.GetName()).c_str(), config.GetCryptoParam().iterNum,
1936 config.GetCryptoParam().encryptAlgo, config.GetCryptoParam().hmacAlgo, config.GetCryptoParam().kdfAlgo,
1937 config.GetCryptoParam().cryptoPageSize);
1938 return errCode;
1939 }
1940 return E_OK;
1941 }
1942
SetDefaultEncryptAlgo(const ConnectionPool::SharedConn & conn,const RdbStoreConfig & config)1943 int RdbStoreImpl::SetDefaultEncryptAlgo(const ConnectionPool::SharedConn &conn, const RdbStoreConfig &config)
1944 {
1945 if (conn == nullptr) {
1946 return E_DATABASE_BUSY;
1947 }
1948
1949 if (!config.GetCryptoParam().IsValid()) {
1950 LOG_ERROR("Invalid crypto param, name:%{public}s", SqliteUtils::Anonymous(config.GetName()).c_str());
1951 return E_INVALID_ARGS;
1952 }
1953
1954 std::string sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_CIPHER_PREFIX) +
1955 SqliteUtils::EncryptAlgoDescription(config.GetEncryptAlgo()) +
1956 std::string(GlobalExpr::ALGO_SUFFIX);
1957 Suspender suspender(Suspender::SQL_LOG);
1958 auto [errCode, statement] = GetStatement(sql, conn);
1959 errCode = SetDefaultEncryptSql(statement, sql, config);
1960 if (errCode != E_OK) {
1961 return errCode;
1962 }
1963
1964 if (config.GetIter() > 0) {
1965 sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_KDF_ITER_PREFIX) + std::to_string(config.GetIter());
1966 errCode = SetDefaultEncryptSql(statement, sql, config);
1967 if (errCode != E_OK) {
1968 return errCode;
1969 }
1970 }
1971
1972 sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_HMAC_ALGO_PREFIX) +
1973 SqliteUtils::HmacAlgoDescription(config.GetCryptoParam().hmacAlgo) + std::string(GlobalExpr::ALGO_SUFFIX);
1974 errCode = SetDefaultEncryptSql(statement, sql, config);
1975 if (errCode != E_OK) {
1976 return errCode;
1977 }
1978
1979 sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_KDF_ALGO_PREFIX) +
1980 SqliteUtils::KdfAlgoDescription(config.GetCryptoParam().kdfAlgo) +
1981 std::string(GlobalExpr::ALGO_SUFFIX);
1982 errCode = SetDefaultEncryptSql(statement, sql, config);
1983 if (errCode != E_OK) {
1984 return errCode;
1985 }
1986
1987 sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_PAGE_SIZE_PREFIX) +
1988 std::to_string(config.GetCryptoParam().cryptoPageSize);
1989 return SetDefaultEncryptSql(statement, sql, config);
1990 }
1991
AttachInner(const RdbStoreConfig & config,const std::string & attachName,const std::string & dbPath,const std::vector<uint8_t> & key,int32_t waitTime)1992 int RdbStoreImpl::AttachInner(const RdbStoreConfig &config, const std::string &attachName, const std::string &dbPath,
1993 const std::vector<uint8_t> &key, int32_t waitTime)
1994 {
1995 auto pool = GetPool();
1996 if (pool == nullptr) {
1997 return E_ALREADY_CLOSED;
1998 }
1999 Suspender suspender(Suspender::SQL_LOG);
2000 auto [conn, readers] = pool->AcquireAll(waitTime);
2001 if (conn == nullptr) {
2002 return E_DATABASE_BUSY;
2003 }
2004
2005 if (!isMemoryRdb_ && conn->GetJournalMode() == static_cast<int32_t>(JournalMode::MODE_WAL)) {
2006 // close first to prevent the connection from being put back.
2007 pool->CloseAllConnections();
2008 conn = nullptr;
2009 readers.clear();
2010 auto [err, newConn] = pool->DisableWal();
2011 if (err != E_OK) {
2012 return err;
2013 }
2014 conn = newConn;
2015 }
2016 std::vector<ValueObject> bindArgs;
2017 bindArgs.emplace_back(ValueObject(dbPath));
2018 bindArgs.emplace_back(ValueObject(attachName));
2019 if (!key.empty()) {
2020 auto ret = SetDefaultEncryptAlgo(conn, config);
2021 if (ret != E_OK) {
2022 return ret;
2023 }
2024 bindArgs.emplace_back(ValueObject(key));
2025 auto [errCode, statement] = GetStatement(GlobalExpr::ATTACH_WITH_KEY_SQL, conn);
2026 if (statement == nullptr || errCode != E_OK) {
2027 LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
2028 return E_ERROR;
2029 }
2030 return statement->Execute(bindArgs);
2031 }
2032
2033 auto [errCode, statement] = GetStatement(GlobalExpr::ATTACH_SQL, conn);
2034 if (statement == nullptr || errCode != E_OK) {
2035 LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
2036 return errCode;
2037 }
2038 return statement->Execute(bindArgs);
2039 }
2040
2041 /**
2042 * Attaches a database.
2043 */
Attach(const RdbStoreConfig & config,const std::string & attachName,int32_t waitTime)2044 std::pair<int32_t, int32_t> RdbStoreImpl::Attach(
2045 const RdbStoreConfig &config, const std::string &attachName, int32_t waitTime)
2046 {
2047 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || config_.GetHaMode() != HAMode::SINGLE ||
2048 (config.IsMemoryRdb() && config.IsEncrypt())) {
2049 return { E_NOT_SUPPORT, 0 };
2050 }
2051 std::string dbPath;
2052 int err = SqliteGlobalConfig::GetDbPath(config, dbPath);
2053 if (err != E_OK || (access(dbPath.c_str(), F_OK) != E_OK && !config.IsMemoryRdb())) {
2054 return { E_INVALID_FILE_PATH, 0 };
2055 }
2056
2057 // encrypted databases are not supported to attach a non encrypted database.
2058 if (!config.IsEncrypt() && config_.IsEncrypt()) {
2059 return { E_NOT_SUPPORT, 0 };
2060 }
2061
2062 if (attachedInfo_.Contains(attachName)) {
2063 return { E_ATTACHED_DATABASE_EXIST, 0 };
2064 }
2065
2066 std::vector<uint8_t> key;
2067 config.Initialize();
2068 if (config.IsEncrypt()) {
2069 key = config.GetEncryptKey();
2070 }
2071 err = AttachInner(config, attachName, dbPath, key, waitTime);
2072 key.assign(key.size(), 0);
2073 if (err == E_SQLITE_ERROR) {
2074 // only when attachName is already in use, SQLITE-ERROR will be reported here.
2075 return { E_ATTACHED_DATABASE_EXIST, 0 };
2076 } else if (err != E_OK) {
2077 LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach fileName"
2078 "[%{public}s]",
2079 err, SqliteUtils::Anonymous(config_.GetName()).c_str(), SqliteUtils::Anonymous(attachName).c_str(),
2080 SqliteUtils::Anonymous(config.GetName()).c_str());
2081 return { err, 0 };
2082 }
2083 if (!attachedInfo_.Insert(attachName, dbPath)) {
2084 return { E_ATTACHED_DATABASE_EXIST, 0 };
2085 }
2086 return { E_OK, attachedInfo_.Size() };
2087 }
2088
Detach(const std::string & attachName,int32_t waitTime)2089 std::pair<int32_t, int32_t> RdbStoreImpl::Detach(const std::string &attachName, int32_t waitTime)
2090 {
2091 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2092 return { E_NOT_SUPPORT, 0 };
2093 }
2094 if (!attachedInfo_.Contains(attachName)) {
2095 return { E_OK, attachedInfo_.Size() };
2096 }
2097 Suspender suspender(Suspender::SQL_LOG);
2098 auto pool = GetPool();
2099 if (pool == nullptr) {
2100 return { E_ALREADY_CLOSED, 0 };
2101 }
2102 auto [connection, readers] = pool->AcquireAll(waitTime);
2103 if (connection == nullptr) {
2104 return { E_DATABASE_BUSY, 0 };
2105 }
2106 std::vector<ValueObject> bindArgs;
2107 bindArgs.push_back(ValueObject(attachName));
2108 auto [errCode, statement] = GetStatement(GlobalExpr::DETACH_SQL, connection);
2109 if (statement == nullptr || errCode != E_OK) {
2110 LOG_ERROR("Detach get statement failed, errCode %{public}d", errCode);
2111 return { errCode, 0 };
2112 }
2113 errCode = statement->Execute(bindArgs);
2114 if (errCode != E_OK) {
2115 LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach", errCode,
2116 SqliteUtils::Anonymous(config_.GetName()).c_str(), SqliteUtils::Anonymous(attachName).c_str());
2117 return { errCode, 0 };
2118 }
2119 attachedInfo_.Erase(attachName);
2120 if (!attachedInfo_.Empty()) {
2121 return { E_OK, attachedInfo_.Size() };
2122 }
2123 statement = nullptr;
2124 if (!isMemoryRdb_ && connection->GetJournalMode() == static_cast<int32_t>(JournalMode::MODE_WAL)) {
2125 // close first to prevent the connection from being put back.
2126 pool->CloseAllConnections();
2127 connection = nullptr;
2128 readers.clear();
2129 errCode = pool->EnableWal();
2130 }
2131 return { errCode, 0 };
2132 }
2133
2134 /**
2135 * Obtains the database version.
2136 */
GetVersion(int & version)2137 int RdbStoreImpl::GetVersion(int &version)
2138 {
2139 Suspender suspender(Suspender::SQL_LOG);
2140 auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION, isReadOnly_);
2141 if (statement == nullptr) {
2142 return errCode;
2143 }
2144 ValueObject value;
2145 std::tie(errCode, value) = statement->ExecuteForValue();
2146 auto val = std::get_if<int64_t>(&value.value);
2147 if (val != nullptr) {
2148 version = static_cast<int>(*val);
2149 }
2150 return errCode;
2151 }
2152
2153 /**
2154 * Sets the version of a new database.
2155 */
SetVersion(int version)2156 int RdbStoreImpl::SetVersion(int version)
2157 {
2158 if (isReadOnly_) {
2159 return E_NOT_SUPPORT;
2160 }
2161 Suspender suspender(Suspender::SQL_LOG);
2162 std::string sql = std::string(GlobalExpr::PRAGMA_VERSION) + " = " + std::to_string(version);
2163 auto [errCode, statement] = GetStatement(sql);
2164 if (statement == nullptr) {
2165 return errCode;
2166 }
2167 return statement->Execute();
2168 }
2169 /**
2170 * Begins a transaction in EXCLUSIVE mode.
2171 */
BeginTransaction()2172 int RdbStoreImpl::BeginTransaction()
2173 {
2174 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2175 auto pool = GetPool();
2176 if (pool == nullptr) {
2177 return E_ALREADY_CLOSED;
2178 }
2179 std::lock_guard<std::mutex> lockGuard(pool->GetTransactionStackMutex());
2180 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2181 return E_NOT_SUPPORT;
2182 }
2183 // size + 1 means the number of transactions in process
2184 RdbStatReporter reportStat(RDB_PERF, BEGINTRANSACTION, config_, reportFunc_);
2185 size_t transactionId = pool->GetTransactionStack().size() + 1;
2186 BaseTransaction transaction(pool->GetTransactionStack().size());
2187 auto [errCode, statement] = GetStatement(transaction.GetTransactionStr());
2188 if (statement == nullptr) {
2189 return errCode;
2190 }
2191 errCode = statement->Execute();
2192 if (errCode != E_OK) {
2193 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
2194 pool->Dump(true, "BEGIN");
2195 }
2196 LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2197 SqliteUtils::Anonymous(name_).c_str(), errCode);
2198 return errCode;
2199 }
2200 pool->SetInTransaction(true);
2201 pool->GetTransactionStack().push(transaction);
2202 // 1 means the number of transactions in process
2203 if (transactionId > 1) {
2204 LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2205 SqliteUtils::Anonymous(name_).c_str(), errCode);
2206 }
2207
2208 return E_OK;
2209 }
2210
BeginTrans()2211 std::pair<int, int64_t> RdbStoreImpl::BeginTrans()
2212 {
2213 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2214 if (!config_.IsVector() || isReadOnly_) {
2215 return { E_NOT_SUPPORT, 0 };
2216 }
2217 int64_t tmpTrxId = 0;
2218 auto pool = GetPool();
2219 if (pool == nullptr) {
2220 return { E_ALREADY_CLOSED, 0 };
2221 }
2222 auto [errCode, connection] = pool->CreateTransConn(false);
2223 if (connection == nullptr) {
2224 LOG_ERROR("Get null connection, storeName: %{public}s errCode:0x%{public}x.",
2225 SqliteUtils::Anonymous(name_).c_str(), errCode);
2226 return { errCode, 0 };
2227 }
2228
2229 tmpTrxId = newTrxId_.fetch_add(1);
2230 trxConnMap_.Insert(tmpTrxId, connection);
2231 errCode = ExecuteByTrxId(BEGIN_TRANSACTION_SQL, tmpTrxId);
2232 if (errCode != E_OK) {
2233 trxConnMap_.Erase(tmpTrxId);
2234 }
2235 return { errCode, tmpTrxId };
2236 }
2237
2238 /**
2239 * Begins a transaction in EXCLUSIVE mode.
2240 */
RollBack()2241 int RdbStoreImpl::RollBack()
2242 {
2243 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2244 auto pool = GetPool();
2245 if (pool == nullptr) {
2246 return E_ALREADY_CLOSED;
2247 }
2248 std::lock_guard<std::mutex> lockGuard(pool->GetTransactionStackMutex());
2249 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2250 return E_NOT_SUPPORT;
2251 }
2252 RdbStatReporter reportStat(RDB_PERF, ROLLBACK, config_, reportFunc_);
2253 size_t transactionId = pool->GetTransactionStack().size();
2254
2255 if (pool->GetTransactionStack().empty()) {
2256 LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId,
2257 SqliteUtils::Anonymous(name_).c_str());
2258 return E_NO_TRANSACTION_IN_SESSION;
2259 }
2260 BaseTransaction transaction = pool->GetTransactionStack().top();
2261 pool->GetTransactionStack().pop();
2262 if (transaction.GetType() != TransType::ROLLBACK_SELF && !pool->GetTransactionStack().empty()) {
2263 pool->GetTransactionStack().top().SetChildFailure(true);
2264 }
2265 auto [errCode, statement] = GetStatement(transaction.GetRollbackStr());
2266 if (statement == nullptr) {
2267 if (errCode == E_DATABASE_BUSY) {
2268 Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
2269 }
2270 // size + 1 means the number of transactions in process
2271 LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId + 1,
2272 SqliteUtils::Anonymous(name_).c_str());
2273 return E_DATABASE_BUSY;
2274 }
2275 errCode = statement->Execute();
2276 if (errCode != E_OK) {
2277 if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
2278 Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
2279 }
2280 LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2281 SqliteUtils::Anonymous(name_).c_str(), errCode);
2282 return errCode;
2283 }
2284 if (pool->GetTransactionStack().empty()) {
2285 pool->SetInTransaction(false);
2286 }
2287 // 1 means the number of transactions in process
2288 if (transactionId > 1) {
2289 LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2290 SqliteUtils::Anonymous(name_).c_str(), errCode);
2291 }
2292 return E_OK;
2293 }
2294
ExecuteByTrxId(const std::string & sql,int64_t trxId,bool closeConnAfterExecute,const std::vector<ValueObject> & bindArgs)2295 int RdbStoreImpl::ExecuteByTrxId(
2296 const std::string &sql, int64_t trxId, bool closeConnAfterExecute, const std::vector<ValueObject> &bindArgs)
2297 {
2298 if ((!config_.IsVector()) || isReadOnly_) {
2299 return E_NOT_SUPPORT;
2300 }
2301 if (trxId == 0) {
2302 return E_INVALID_ARGS;
2303 }
2304
2305 if (!trxConnMap_.Contains(trxId)) {
2306 LOG_ERROR("trxId hasn't appeared before %{public}" PRIu64, trxId);
2307 return E_INVALID_ARGS;
2308 }
2309 auto time = static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
2310 auto result = trxConnMap_.Find(trxId);
2311 auto connection = result.second;
2312 if (connection == nullptr) {
2313 LOG_ERROR("Get null connection, storeName: %{public}s time:%{public}" PRIu64 ".",
2314 SqliteUtils::Anonymous(name_).c_str(), time);
2315 return E_ERROR;
2316 }
2317 auto [ret, statement] = GetStatement(sql, connection);
2318 if (ret != E_OK) {
2319 return ret;
2320 }
2321 ret = statement->Execute(bindArgs);
2322 if (ret != E_OK) {
2323 LOG_ERROR("transaction id: %{public}" PRIu64 ", storeName: %{public}s, errCode: %{public}d" PRIu64, trxId,
2324 SqliteUtils::Anonymous(name_).c_str(), ret);
2325 return ret;
2326 }
2327 if (closeConnAfterExecute) {
2328 trxConnMap_.Erase(trxId);
2329 }
2330 return E_OK;
2331 }
2332
RollBack(int64_t trxId)2333 int RdbStoreImpl::RollBack(int64_t trxId)
2334 {
2335 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2336 return ExecuteByTrxId(ROLLBACK_TRANSACTION_SQL, trxId, true);
2337 }
2338
2339 /**
2340 * Begins a transaction in EXCLUSIVE mode.
2341 */
Commit()2342 int RdbStoreImpl::Commit()
2343 {
2344 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2345 auto pool = GetPool();
2346 if (pool == nullptr) {
2347 return E_ALREADY_CLOSED;
2348 }
2349 std::lock_guard<std::mutex> lockGuard(pool->GetTransactionStackMutex());
2350 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2351 return E_NOT_SUPPORT;
2352 }
2353 RdbStatReporter reportStat(RDB_PERF, COMMIT, config_, reportFunc_);
2354 size_t transactionId = pool->GetTransactionStack().size();
2355
2356 if (pool->GetTransactionStack().empty()) {
2357 return E_OK;
2358 }
2359 BaseTransaction transaction = pool->GetTransactionStack().top();
2360 std::string sqlStr = transaction.GetCommitStr();
2361 if (sqlStr.size() <= 1) {
2362 LOG_WARN("id: %{public}zu, storeName: %{public}s, sql: %{public}s", transactionId,
2363 SqliteUtils::Anonymous(name_).c_str(),
2364 SqliteUtils::SqlAnonymous(sqlStr).c_str());
2365 pool->GetTransactionStack().pop();
2366 return E_OK;
2367 }
2368 auto [errCode, statement] = GetStatement(sqlStr);
2369 if (statement == nullptr) {
2370 if (errCode == E_DATABASE_BUSY) {
2371 Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: CommitBusy"));
2372 }
2373 LOG_ERROR("id: %{public}zu, storeName: %{public}s, statement error", transactionId,
2374 SqliteUtils::Anonymous(name_).c_str());
2375 return E_DATABASE_BUSY;
2376 }
2377 errCode = statement->Execute();
2378 if (errCode != E_OK) {
2379 if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
2380 Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: CommitBusy"));
2381 }
2382 LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2383 SqliteUtils::Anonymous(name_).c_str(), errCode);
2384 return errCode;
2385 }
2386 pool->SetInTransaction(false);
2387 // 1 means the number of transactions in process
2388 if (transactionId > 1) {
2389 LOG_WARN("id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2390 SqliteUtils::Anonymous(name_).c_str(), errCode);
2391 }
2392 pool->GetTransactionStack().pop();
2393 return E_OK;
2394 }
2395
Commit(int64_t trxId)2396 int RdbStoreImpl::Commit(int64_t trxId)
2397 {
2398 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2399 return ExecuteByTrxId(COMMIT_TRANSACTION_SQL, trxId, true);
2400 }
2401
IsInTransaction()2402 bool RdbStoreImpl::IsInTransaction()
2403 {
2404 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2405 return false;
2406 }
2407 auto pool = GetPool();
2408 if (pool == nullptr) {
2409 return false;
2410 }
2411 return pool->IsInTransaction();
2412 }
2413
CheckAttach(const std::string & sql)2414 int RdbStoreImpl::CheckAttach(const std::string &sql)
2415 {
2416 size_t index = sql.find_first_not_of(' ');
2417 if (index == std::string::npos) {
2418 return E_OK;
2419 }
2420
2421 /* The first 3 characters can determine the type */
2422 std::string sqlType = sql.substr(index, 3);
2423 sqlType = SqliteUtils::StrToUpper(sqlType);
2424 if (sqlType != "ATT") {
2425 return E_OK;
2426 }
2427 Suspender suspender(Suspender::SQL_LOG);
2428 auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_JOUR_MODE_EXP);
2429 if (statement == nullptr) {
2430 return errCode;
2431 }
2432
2433 errCode = statement->Execute();
2434 if (errCode != E_OK) {
2435 LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errCode);
2436 return errCode;
2437 }
2438 auto [errorCode, valueObject] = statement->GetColumn(0);
2439 if (errorCode != E_OK) {
2440 LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errorCode);
2441 return errorCode;
2442 }
2443 auto journal = std::get_if<std::string>(&valueObject.value);
2444 auto journalMode = SqliteUtils::StrToUpper((journal == nullptr) ? "" : *journal);
2445 if (journalMode == RdbStoreConfig::DB_DEFAULT_JOURNAL_MODE) {
2446 LOG_ERROR("RdbStoreImpl attach is not supported in WAL mode");
2447 return E_NOT_SUPPORTED_ATTACH_IN_WAL_MODE;
2448 }
2449 return E_OK;
2450 }
2451
IsOpen() const2452 bool RdbStoreImpl::IsOpen() const
2453 {
2454 return isOpen_;
2455 }
2456
GetPath()2457 std::string RdbStoreImpl::GetPath()
2458 {
2459 return path_;
2460 }
2461
IsReadOnly() const2462 bool RdbStoreImpl::IsReadOnly() const
2463 {
2464 return isReadOnly_;
2465 }
2466
IsMemoryRdb() const2467 bool RdbStoreImpl::IsMemoryRdb() const
2468 {
2469 return isMemoryRdb_;
2470 }
2471
GetName()2472 std::string RdbStoreImpl::GetName()
2473 {
2474 return name_;
2475 }
2476
DoCloudSync(const std::string & table)2477 void RdbStoreImpl::DoCloudSync(const std::string &table)
2478 {
2479 #if !defined(CROSS_PLATFORM)
2480 auto needSync = cloudInfo_->Change(table);
2481 if (!needSync) {
2482 return;
2483 }
2484 auto pool = TaskExecutor::GetInstance().GetExecutor();
2485 if (pool == nullptr) {
2486 return;
2487 }
2488 auto interval =
2489 std::chrono::duration_cast<std::chrono::steady_clock::duration>(std::chrono::milliseconds(INTERVAL));
2490 pool->Schedule(interval, [cloudInfo = std::weak_ptr<CloudTables>(cloudInfo_), param = syncerParam_]() {
2491 auto changeInfo = cloudInfo.lock();
2492 if (changeInfo == nullptr) {
2493 return;
2494 }
2495 auto tables = changeInfo->Steal();
2496 if (tables.empty()) {
2497 return;
2498 }
2499 DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true };
2500 auto memo = AbsRdbPredicates(std::vector<std::string>(tables.begin(), tables.end())).GetDistributedPredicates();
2501 InnerSync(param, option, memo, nullptr);
2502 });
2503 #endif
2504 }
GetFileType()2505 std::string RdbStoreImpl::GetFileType()
2506 {
2507 return fileType_;
2508 }
2509
2510 /**
2511 * Sets the database locale.
2512 */
ConfigLocale(const std::string & localeStr)2513 int RdbStoreImpl::ConfigLocale(const std::string &localeStr)
2514 {
2515 if (!isOpen_) {
2516 LOG_ERROR("The connection pool has been closed.");
2517 return E_ALREADY_CLOSED;
2518 }
2519
2520 auto pool = GetPool();
2521 if (pool == nullptr) {
2522 return E_ALREADY_CLOSED;
2523 }
2524 config_.SetCollatorLocales(localeStr);
2525 return pool->ConfigLocale(localeStr);
2526 }
2527
GetDestPath(const std::string & backupPath,std::string & destPath)2528 int RdbStoreImpl::GetDestPath(const std::string &backupPath, std::string &destPath)
2529 {
2530 int ret = GetDataBasePath(backupPath, destPath);
2531 if (ret != E_OK) {
2532 return ret;
2533 }
2534 std::string tempPath = destPath + ".tmp";
2535 if (access(tempPath.c_str(), F_OK) == E_OK) {
2536 destPath = tempPath;
2537 }
2538
2539 if (access(destPath.c_str(), F_OK) != E_OK) {
2540 LOG_ERROR("The backupFilePath does not exists.");
2541 return E_INVALID_FILE_PATH;
2542 }
2543 return E_OK;
2544 }
2545
SwitchOver(bool isUseReplicaDb)2546 void RdbStoreImpl::SwitchOver(bool isUseReplicaDb)
2547 {
2548 isUseReplicaDb_ = isUseReplicaDb;
2549 }
2550
IsUseAsyncRestore(const std::string & newPath,const std::string & backupPath)2551 bool RdbStoreImpl::IsUseAsyncRestore(const std::string &newPath, const std::string &backupPath)
2552 {
2553 if (config_.GetPath() == backupPath || newPath == backupPath ||
2554 config_.GetDBType() == DB_VECTOR || config_.GetHaMode() == HAMode::SINGLE ||
2555 !SqliteUtils::IsSlaveDbName(backupPath) || !SqliteUtils::IsSlaveLarge(config_.GetPath())) {
2556 return false;
2557 }
2558 return true;
2559 }
2560
RestoreWithPool(std::shared_ptr<ConnectionPool> pool,const std::string & path)2561 int32_t RdbStoreImpl::RestoreWithPool(std::shared_ptr<ConnectionPool> pool, const std::string &path)
2562 {
2563 if (pool == nullptr) {
2564 return E_OK;
2565 }
2566 auto connection = pool->AcquireConnection(false);
2567 if (connection == nullptr) {
2568 return E_DATABASE_BUSY;
2569 }
2570 pool->ReopenConns();
2571 auto curStatus = std::make_shared<SlaveStatus>(SlaveStatus::UNDEFINED);
2572 return connection->Restore(path, {}, curStatus);
2573 }
2574
StartAsyncRestore(std::shared_ptr<ConnectionPool> pool) const2575 int RdbStoreImpl::StartAsyncRestore(std::shared_ptr<ConnectionPool> pool) const
2576 {
2577 auto keyFilesPtr = std::make_shared<RdbSecurityManager::KeyFiles>(path_ + ASYNC_RESTORE);
2578 SqliteUtils::SetSlaveRestoring(path_);
2579 auto err = keyFilesPtr->Lock(false);
2580 if (err == E_OK) {
2581 auto taskPool = TaskExecutor::GetInstance().GetExecutor();
2582 if (taskPool == nullptr) {
2583 LOG_ERROR("Get thread pool failed");
2584 keyFilesPtr->Unlock();
2585 return E_ERROR;
2586 }
2587 taskPool->Execute([keyFilesPtr, config = config_, pool] {
2588 auto dbPath = config.GetPath();
2589 LOG_INFO("async restore started for %{public}s", SqliteUtils::Anonymous(dbPath).c_str());
2590 auto result = RdbStoreImpl::RestoreWithPool(pool, dbPath);
2591 if (result != E_OK) {
2592 LOG_WARN("async restore failed, retry once, %{public}d", result);
2593 result = RdbStoreImpl::RestoreWithPool(pool, dbPath);
2594 }
2595 if (result != E_OK) {
2596 LOG_WARN("async restore failed, %{public}d", result);
2597 SqliteUtils::SetSlaveInvalid(dbPath);
2598 }
2599 SqliteUtils::SetSlaveRestoring(dbPath, false);
2600 if (pool != nullptr) {
2601 pool->ReopenConns();
2602 }
2603 keyFilesPtr->Unlock();
2604 });
2605
2606 return E_OK;
2607 }
2608 LOG_WARN("Get process lock failed. Async restore is started in another process, %{public}s",
2609 SqliteUtils::Anonymous(path_).c_str());
2610 return E_OK;
2611 }
2612
StartAsyncBackupIfNeed(std::shared_ptr<SlaveStatus> slaveStatus)2613 int RdbStoreImpl::StartAsyncBackupIfNeed(std::shared_ptr<SlaveStatus> slaveStatus)
2614 {
2615 auto taskPool = TaskExecutor::GetInstance().GetExecutor();
2616 if (taskPool == nullptr) {
2617 LOG_ERROR("Get thread pool failed");
2618 return E_ERROR;
2619 }
2620 auto config = config_;
2621 config.SetCreateNecessary(false);
2622 taskPool->Execute([config, slaveStatus] {
2623 if (*slaveStatus == SlaveStatus::DB_CLOSING) {
2624 return;
2625 }
2626 auto [result, conn] = CreateWritableConn(config);
2627 if (result != E_OK || conn == nullptr) {
2628 return;
2629 }
2630 auto strategy = conn->GenerateExchangeStrategy(slaveStatus);
2631 if (*slaveStatus == SlaveStatus::DB_CLOSING) {
2632 return;
2633 }
2634 LOG_INFO("async exchange st:%{public}d,", strategy);
2635 if (strategy == ExchangeStrategy::BACKUP) {
2636 (void)conn->Backup({}, {}, false, slaveStatus);
2637 }
2638 });
2639 return E_OK;
2640 }
2641
RestoreInner(const std::string & destPath,const std::vector<uint8_t> & newKey,std::shared_ptr<ConnectionPool> pool)2642 int RdbStoreImpl::RestoreInner(const std::string &destPath, const std::vector<uint8_t> &newKey,
2643 std::shared_ptr<ConnectionPool> pool)
2644 {
2645 bool isUseAsync = IsUseAsyncRestore(path_, destPath);
2646 LOG_INFO("restore start, using async=%{public}d", isUseAsync);
2647 if (!isUseAsync) {
2648 auto err = pool->ChangeDbFileForRestore(path_, destPath, newKey, slaveStatus_);
2649 LOG_INFO("restore finished, sync mode rc=%{public}d", err);
2650 return err;
2651 }
2652
2653 auto connection = pool->AcquireConnection(false);
2654 if (connection == nullptr) {
2655 LOG_WARN("Failed to obtain writer for async restore");
2656 return E_DATABASE_BUSY;
2657 }
2658
2659 int errCode = connection->CheckReplicaForRestore();
2660 if (errCode != E_OK) {
2661 return errCode;
2662 }
2663 errCode = StartAsyncRestore(pool);
2664 LOG_INFO("restore finished, async mode rc=%{public}d", errCode);
2665 return errCode;
2666 }
2667
Restore(const std::string & backupPath,const std::vector<uint8_t> & newKey)2668 int RdbStoreImpl::Restore(const std::string &backupPath, const std::vector<uint8_t> &newKey)
2669 {
2670 LOG_INFO("Restore db: %{public}s.", SqliteUtils::Anonymous(config_.GetName()).c_str());
2671 if (isReadOnly_ || isMemoryRdb_) {
2672 return E_NOT_SUPPORT;
2673 }
2674
2675 auto pool = GetPool();
2676 if (pool == nullptr || !isOpen_) {
2677 LOG_ERROR("The pool is: %{public}d, pool is null: %{public}d", isOpen_, pool == nullptr);
2678 return E_ALREADY_CLOSED;
2679 }
2680
2681 RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
2682 keyFiles.Lock();
2683 std::string destPath;
2684 bool isOK = TryGetMasterSlaveBackupPath(backupPath, destPath, true);
2685 if (!isOK) {
2686 int ret = GetDestPath(backupPath, destPath);
2687 if (ret != E_OK) {
2688 keyFiles.Unlock();
2689 return ret;
2690 }
2691 }
2692 #if !defined(CROSS_PLATFORM)
2693 auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
2694 if (service != nullptr) {
2695 service->Disable(syncerParam_);
2696 }
2697 #endif
2698 bool corrupt = Reportor::IsReportCorruptedFault(path_);
2699 int errCode = RestoreInner(destPath, newKey, pool);
2700 keyFiles.Unlock();
2701 #if !defined(CROSS_PLATFORM)
2702 SecurityPolicy::SetSecurityLabel(config_);
2703 if (service != nullptr) {
2704 service->Enable(syncerParam_);
2705 if (errCode == E_OK) {
2706 auto syncerParam = syncerParam_;
2707 syncerParam.infos_ = Connection::Collect(config_);
2708 service->AfterOpen(syncerParam);
2709 NotifyDataChange();
2710 }
2711 }
2712 #endif
2713 if (errCode == E_OK) {
2714 ExchangeSlaverToMaster();
2715 Reportor::ReportRestore(Reportor::Create(config_, E_OK, "ErrorType::RdbStoreImpl::Restore", false), corrupt);
2716 rebuild_ = RebuiltType::NONE;
2717 }
2718 DoCloudSync("");
2719 return errCode;
2720 }
2721
CreateWritableConn(const RdbStoreConfig & config)2722 std::pair<int32_t, std::shared_ptr<Connection>> RdbStoreImpl::CreateWritableConn(const RdbStoreConfig &config)
2723 {
2724 auto [result, conn] = Connection::Create(config, true);
2725 if (result != E_OK || conn == nullptr) {
2726 LOG_ERROR("create connection failed, err:%{public}d", result);
2727 return { result, nullptr };
2728 }
2729 return { E_OK, conn };
2730 }
2731
GetStatement(const std::string & sql,std::shared_ptr<Connection> conn) const2732 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(
2733 const std::string &sql, std::shared_ptr<Connection> conn) const
2734 {
2735 if (conn == nullptr) {
2736 return { E_DATABASE_BUSY, nullptr };
2737 }
2738
2739 if (config_.GetHaMode() != HAMode::SINGLE && SqliteUtils::IsSlaveRestoring(config_.GetPath())) {
2740 auto keyFiles = RdbSecurityManager::KeyFiles(config_.GetPath() + ASYNC_RESTORE);
2741 int32_t ret = keyFiles.Lock(false);
2742 if (ret != E_OK) {
2743 if (isUseReplicaDb_) {
2744 LOG_INFO("Use replica statement, %{public}s", SqliteUtils::Anonymous(config_.GetPath()).c_str());
2745 return conn->CreateReplicaStatement(sql, conn);
2746 }
2747 return { E_DATABASE_BUSY, nullptr };
2748 }
2749 SqliteUtils::SetSlaveRestoring(config_.GetPath(), false);
2750 (void)keyFiles.Unlock();
2751 auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
2752 LOG_WARN("Got lock file but process is not in restore, mark removed, st:%{public}d, %{public}s",
2753 strategy, SqliteUtils::Anonymous(config_.GetPath()).c_str());
2754 if (strategy != ExchangeStrategy::RESTORE) {
2755 return conn->CreateStatement(sql, conn);
2756 }
2757 auto result = conn->CheckReplicaForRestore();
2758 if (result == E_OK) {
2759 result = StartAsyncRestore(GetPool());
2760 return conn->CreateReplicaStatement(sql, conn);
2761 }
2762 }
2763
2764 return conn->CreateStatement(sql, conn);
2765 }
2766
GetStatement(const std::string & sql,bool read) const2767 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(const std::string &sql, bool read) const
2768 {
2769 auto pool = GetPool();
2770 if (pool == nullptr) {
2771 return { E_ALREADY_CLOSED, nullptr };
2772 }
2773 auto conn = pool->AcquireConnection(read);
2774 if (conn == nullptr) {
2775 return { E_DATABASE_BUSY, nullptr };
2776 }
2777 return GetStatement(sql, conn);
2778 }
2779
GetRebuilt(RebuiltType & rebuilt)2780 int RdbStoreImpl::GetRebuilt(RebuiltType &rebuilt)
2781 {
2782 rebuilt = static_cast<RebuiltType>(rebuild_);
2783 return E_OK;
2784 }
2785
InterruptBackup()2786 int RdbStoreImpl::InterruptBackup()
2787 {
2788 if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER) {
2789 return E_NOT_SUPPORT;
2790 }
2791 if (*slaveStatus_ == SlaveStatus::BACKING_UP) {
2792 *slaveStatus_ = SlaveStatus::BACKUP_INTERRUPT;
2793 return E_OK;
2794 }
2795 return E_CANCEL;
2796 }
2797
GetBackupStatus() const2798 int32_t RdbStoreImpl::GetBackupStatus() const
2799 {
2800 if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER && config_.GetHaMode() != HAMode::MAIN_REPLICA) {
2801 return SlaveStatus::UNDEFINED;
2802 }
2803 return *slaveStatus_;
2804 }
2805
TryGetMasterSlaveBackupPath(const std::string & srcPath,std::string & destPath,bool isRestore)2806 bool RdbStoreImpl::TryGetMasterSlaveBackupPath(const std::string &srcPath, std::string &destPath, bool isRestore)
2807 {
2808 if (!srcPath.empty() || config_.GetHaMode() == HAMode::SINGLE || config_.GetDBType() != DB_SQLITE) {
2809 return false;
2810 }
2811 int ret = GetSlaveName(config_.GetPath(), destPath);
2812 if (ret != E_OK) {
2813 destPath = {};
2814 return false;
2815 }
2816 if (isRestore && access(destPath.c_str(), F_OK) != 0) {
2817 LOG_WARN("The backup path can not access: %{public}s", SqliteUtils::Anonymous(destPath).c_str());
2818 return false;
2819 }
2820 return true;
2821 }
2822
IsSlaveDiffFromMaster() const2823 bool RdbStoreImpl::IsSlaveDiffFromMaster() const
2824 {
2825 std::string slaveDbPath = SqliteUtils::GetSlavePath(config_.GetPath());
2826 return SqliteUtils::IsSlaveInvalid(config_.GetPath()) || (access(slaveDbPath.c_str(), F_OK) != 0);
2827 }
2828
ExchangeSlaverToMaster()2829 int32_t RdbStoreImpl::ExchangeSlaverToMaster()
2830 {
2831 if (isReadOnly_ || isMemoryRdb_ || rebuild_ != RebuiltType::NONE) {
2832 return E_OK;
2833 }
2834 auto [errCode, conn] = GetConn(false);
2835 if (errCode != E_OK) {
2836 return errCode;
2837 }
2838 auto strategy = conn->GenerateExchangeStrategy(slaveStatus_, false);
2839 if (strategy != ExchangeStrategy::NOT_HANDLE) {
2840 LOG_WARN("exchange st:%{public}d, %{public}s,", strategy, SqliteUtils::Anonymous(config_.GetName()).c_str());
2841 }
2842 int ret = E_OK;
2843 if (strategy == ExchangeStrategy::RESTORE && !SqliteUtils::IsSlaveRestoring(config_.GetPath())) {
2844 conn = nullptr;
2845 // disable is required before restore
2846 ret = Restore({}, {});
2847 } else if (strategy == ExchangeStrategy::BACKUP) {
2848 // async backup
2849 ret = conn->Backup({}, {}, true, slaveStatus_);
2850 } else if (strategy == ExchangeStrategy::PENDING_BACKUP) {
2851 ret = StartAsyncBackupIfNeed(slaveStatus_);
2852 }
2853 return ret;
2854 }
2855
GetDbType() const2856 int32_t RdbStoreImpl::GetDbType() const
2857 {
2858 return config_.GetDBType();
2859 }
2860
CreateTransaction(int32_t type)2861 std::pair<int32_t, std::shared_ptr<Transaction>> RdbStoreImpl::CreateTransaction(int32_t type)
2862 {
2863 if (isReadOnly_) {
2864 return { E_NOT_SUPPORT, nullptr };
2865 }
2866 auto pool = GetPool();
2867 if (pool == nullptr) {
2868 return { E_ALREADY_CLOSED, nullptr };
2869 }
2870 PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_TOTAL);
2871 auto [errCode, conn] = pool->CreateTransConn();
2872 if (conn == nullptr) {
2873 return { errCode, nullptr };
2874 }
2875 std::shared_ptr<Transaction> trans;
2876 std::tie(errCode, trans) = Transaction::Create(type, conn, config_.GetPath());
2877 if (trans == nullptr) {
2878 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
2879 pool->Dump(true, "TRANS");
2880 }
2881 return { errCode, nullptr };
2882 }
2883
2884 std::lock_guard<decltype(mutex_)> guard(mutex_);
2885 for (auto it = transactions_.begin(); it != transactions_.end();) {
2886 if (it->expired()) {
2887 it = transactions_.erase(it);
2888 } else {
2889 it++;
2890 }
2891 }
2892 transactions_.push_back(trans);
2893 return { errCode, trans };
2894 }
2895
CleanDirtyLog(const std::string & table,uint64_t cursor)2896 int RdbStoreImpl::CleanDirtyLog(const std::string &table, uint64_t cursor)
2897 {
2898 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || isMemoryRdb_) {
2899 LOG_ERROR("Not support. table:%{public}s, isRead:%{public}d, dbType:%{public}d, isMemoryRdb:%{public}d.",
2900 SqliteUtils::Anonymous(table).c_str(), isReadOnly_, config_.GetDBType(), isMemoryRdb_);
2901 return E_NOT_SUPPORT;
2902 }
2903 auto [errCode, conn] = GetConn(false);
2904 if (errCode != E_OK || conn == nullptr) {
2905 LOG_ERROR("The database is busy or closed errCode:%{public}d", errCode);
2906 return errCode;
2907 }
2908 return conn->CleanDirtyLog(table, cursor);
2909 }
2910
GetValues(std::shared_ptr<Statement> statement)2911 std::shared_ptr<ResultSet> RdbStoreImpl::GetValues(std::shared_ptr<Statement> statement)
2912 {
2913 if (statement == nullptr) {
2914 return nullptr;
2915 }
2916 auto [code, rows] = statement->GetRows(MAX_RETURNING_ROWS);
2917 auto size = rows.size();
2918 std::shared_ptr<ResultSet> result = std::make_shared<CacheResultSet>(std::move(rows));
2919 // The correct number of changed rows can only be obtained after completing the step
2920 while (code == E_OK && size == MAX_RETURNING_ROWS) {
2921 std::tie(code, rows) = statement->GetRows(MAX_RETURNING_ROWS);
2922 size = rows.size();
2923 }
2924 return result;
2925 }
2926
GenerateResult(int32_t code,std::shared_ptr<Statement> statement,bool isDML)2927 Results RdbStoreImpl::GenerateResult(int32_t code, std::shared_ptr<Statement> statement, bool isDML)
2928 {
2929 Results result{ -1 };
2930 if (statement == nullptr) {
2931 return result;
2932 }
2933 // There are no data changes in other scenarios
2934 if (code == E_OK) {
2935 result.results = GetValues(statement);
2936 result.changed = isDML ? statement->Changes() : 0;
2937 }
2938 if (code == E_SQLITE_CONSTRAINT) {
2939 result.changed = statement->Changes();
2940 }
2941 if (isDML && result.changed <= 0) {
2942 result.results = std::make_shared<CacheResultSet>();
2943 }
2944 return result;
2945 }
2946
TryDump(int32_t code,const char * dumpHeader)2947 void RdbStoreImpl::TryDump(int32_t code, const char *dumpHeader)
2948 {
2949 if (code != E_SQLITE_LOCKED && code != E_SQLITE_BUSY) {
2950 return;
2951 }
2952 auto pool = GetPool();
2953 if (pool != nullptr) {
2954 pool->Dump(true, dumpHeader);
2955 }
2956 }
2957
AddTables(const std::vector<std::string> & tables)2958 int32_t RdbStoreImpl::CloudTables::AddTables(const std::vector<std::string> &tables)
2959 {
2960 std::lock_guard<std::mutex> lock(mutex_);
2961 for (auto &table : tables) {
2962 tables_.insert(table);
2963 }
2964 return E_OK;
2965 }
2966
RmvTables(const std::vector<std::string> & tables)2967 int32_t RdbStoreImpl::CloudTables::RmvTables(const std::vector<std::string> &tables)
2968 {
2969 std::lock_guard<std::mutex> lock(mutex_);
2970 for (auto &table : tables) {
2971 tables_.erase(table);
2972 }
2973 return E_OK;
2974 }
2975
Change(const std::string & table)2976 bool RdbStoreImpl::CloudTables::Change(const std::string &table)
2977 {
2978 bool needSync = false;
2979 {
2980 std::lock_guard<std::mutex> lock(mutex_);
2981 if (tables_.empty() || (!table.empty() && tables_.find(table) == tables_.end())) {
2982 return needSync;
2983 }
2984 // from empty, then need schedule the cloud sync, others only wait the schedule execute.
2985 needSync = changes_.empty();
2986 if (!table.empty()) {
2987 changes_.insert(table);
2988 } else {
2989 changes_.insert(tables_.begin(), tables_.end());
2990 }
2991 }
2992 return needSync;
2993 }
2994
Steal()2995 std::set<std::string> RdbStoreImpl::CloudTables::Steal()
2996 {
2997 std::set<std::string> result;
2998 {
2999 std::lock_guard<std::mutex> lock(mutex_);
3000 result = std::move(changes_);
3001 }
3002 return result;
3003 }
3004
SetKnowledgeSchema()3005 void RdbStoreImpl::SetKnowledgeSchema()
3006 {
3007 auto [errCode, schema] = GetKnowledgeSchemaHelper()->GetRdbKnowledgeSchema(config_.GetName());
3008 if (errCode != E_OK) {
3009 return;
3010 }
3011 auto [ret, conn] = GetConn(false);
3012 if (ret != E_OK) {
3013 LOG_ERROR("The database is busy or closed when set knowledge schema ret %{public}d.", ret);
3014 return;
3015 }
3016 if (isKnowledgeSchemaReady_) {
3017 return;
3018 }
3019 ret = conn->SetKnowledgeSchema(schema);
3020 if (ret != E_OK) {
3021 LOG_ERROR("Set knowledge schema failed %{public}d.", ret);
3022 return;
3023 }
3024 isKnowledgeSchemaReady_ = true;
3025 auto helper = GetKnowledgeSchemaHelper();
3026 helper->Init(config_, schema);
3027 helper->DonateKnowledgeData();
3028 }
3029
InitKnowledgeSchema(const DistributedRdb::RdbKnowledgeSchema & schema)3030 int RdbStoreImpl::InitKnowledgeSchema(const DistributedRdb::RdbKnowledgeSchema &schema)
3031 {
3032 auto [ret, conn] = GetConn(false);
3033 if (ret != E_OK) {
3034 LOG_ERROR("The database is busy or closed when set knowledge schema ret %{public}d.", ret);
3035 return ret;
3036 }
3037 ret = conn->SetKnowledgeSchema(schema);
3038 if (ret != E_OK) {
3039 LOG_ERROR("Set knowledge schema failed %{public}d.", ret);
3040 return ret;
3041 }
3042 return E_OK;
3043 }
3044
GetKnowledgeSchemaHelper()3045 std::shared_ptr<NativeRdb::KnowledgeSchemaHelper> RdbStoreImpl::GetKnowledgeSchemaHelper()
3046 {
3047 std::lock_guard<std::mutex> autoLock(helperMutex_);
3048 if (knowledgeSchemaHelper_ == nullptr) {
3049 knowledgeSchemaHelper_ = std::make_shared<NativeRdb::KnowledgeSchemaHelper>();
3050 }
3051 return knowledgeSchemaHelper_;
3052 }
3053
IsKnowledgeDataChange(const DistributedRdb::RdbChangedData & rdbChangedData)3054 bool RdbStoreImpl::IsKnowledgeDataChange(const DistributedRdb::RdbChangedData &rdbChangedData)
3055 {
3056 for (const auto &item : rdbChangedData.tableData) {
3057 if (item.second.isKnowledgeDataChange) {
3058 return true;
3059 }
3060 }
3061 return false;
3062 }
3063
IsNotifyService(const DistributedRdb::RdbChangedData & rdbChangedData)3064 bool RdbStoreImpl::IsNotifyService(const DistributedRdb::RdbChangedData &rdbChangedData)
3065 {
3066 for (const auto &item : rdbChangedData.tableData) {
3067 if (item.second.isP2pSyncDataChange || item.second.isTrackedDataChange) {
3068 return true;
3069 }
3070 }
3071 return false;
3072 }
3073
RegisterAlgo(const std::string & clstAlgoName,ClusterAlgoFunc func)3074 int RdbStoreImpl::RegisterAlgo(const std::string &clstAlgoName, ClusterAlgoFunc func)
3075 {
3076 if (!config_.IsVector() || isMemoryRdb_ || isReadOnly_) {
3077 return E_NOT_SUPPORT;
3078 }
3079
3080 auto [ret, conn] = GetConn(false);
3081 if (ret != E_OK) {
3082 LOG_ERROR("The database is busy or closed when RegisterAlgo ret %{public}d.", ret);
3083 return ret;
3084 }
3085 return conn->RegisterAlgo(clstAlgoName, func);
3086 }
3087 } // namespace OHOS::NativeRdb
3088