1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "RdbStoreImpl"
16 #include "rdb_store_impl.h"
17
18 #include <unistd.h>
19
20 #include <algorithm>
21 #include <chrono>
22 #include <cinttypes>
23 #include <cstdint>
24 #include <fstream>
25 #include <memory>
26 #include <mutex>
27 #include <sstream>
28 #include <sys/stat.h>
29 #include <string>
30
31 #include "cache_result_set.h"
32 #include "connection_pool.h"
33 #include "delay_notify.h"
34 #include "directory_ex.h"
35 #include "logger.h"
36 #include "rdb_common.h"
37 #include "rdb_errno.h"
38 #include "rdb_fault_hiview_reporter.h"
39 #include "rdb_local_db_observer.h"
40 #include "rdb_radar_reporter.h"
41 #include "rdb_stat_reporter.h"
42 #include "rdb_security_manager.h"
43 #include "rdb_sql_statistic.h"
44 #include "rdb_store.h"
45 #include "rdb_trace.h"
46 #include "rdb_types.h"
47 #include "relational_store_client.h"
48 #include "sqlite_global_config.h"
49 #include "sqlite_sql_builder.h"
50 #include "sqlite_statement.h"
51 #include "sqlite_utils.h"
52 #include "step_result_set.h"
53 #include "task_executor.h"
54 #include "traits.h"
55 #include "transaction.h"
56 #include "values_buckets.h"
57 #if !defined(CROSS_PLATFORM)
58 #include "raw_data_parser.h"
59 #include "rdb_device_manager_adapter.h"
60 #include "rdb_manager_impl.h"
61 #include "relational_store_manager.h"
62 #include "runtime_config.h"
63 #include "security_policy.h"
64 #include "sqlite_shared_result_set.h"
65 #endif
66
67 #ifdef WINDOWS_PLATFORM
68 #define ISFILE(filePath) ((filePath.find("\\") == std::string::npos))
69 #else
70 #define ISFILE(filePath) ((filePath.find("/") == std::string::npos))
71 #endif
72 #include "rdb_time_utils.h"
73
74 namespace OHOS::NativeRdb {
75 using namespace OHOS::Rdb;
76 using namespace std::chrono;
77 using SqlStatistic = DistributedRdb::SqlStatistic;
78 using RdbNotifyConfig = DistributedRdb::RdbNotifyConfig;
79 using Reportor = RdbFaultHiViewReporter;
80 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
81 using RdbMgr = DistributedRdb::RdbManagerImpl;
82 #endif
83
84 static constexpr const char *BEGIN_TRANSACTION_SQL = "begin;";
85 static constexpr const char *COMMIT_TRANSACTION_SQL = "commit;";
86 static constexpr const char *ROLLBACK_TRANSACTION_SQL = "rollback;";
87 static constexpr const char *BACKUP_RESTORE = "backup.restore";
88 constexpr int64_t TIME_OUT = 1500;
89
InitSyncerParam(const RdbStoreConfig & config,bool created)90 void RdbStoreImpl::InitSyncerParam(const RdbStoreConfig &config, bool created)
91 {
92 syncerParam_.bundleName_ = config.GetBundleName();
93 syncerParam_.hapName_ = config.GetModuleName();
94 syncerParam_.storeName_ = config.GetName();
95 syncerParam_.customDir_ = config.GetCustomDir();
96 syncerParam_.area_ = config.GetArea();
97 syncerParam_.level_ = static_cast<int32_t>(config.GetSecurityLevel());
98 syncerParam_.type_ = config.GetDistributedType();
99 syncerParam_.isEncrypt_ = config.IsEncrypt();
100 syncerParam_.isAutoClean_ = config.GetAutoClean();
101 syncerParam_.isSearchable_ = config.IsSearchable();
102 syncerParam_.password_ = config.GetEncryptKey();
103 syncerParam_.haMode_ = config.GetHaMode();
104 syncerParam_.roleType_ = config.GetRoleType();
105 syncerParam_.tokenIds_ = config.GetPromiseInfo().tokenIds_;
106 syncerParam_.uids_ = config.GetPromiseInfo().uids_;
107 syncerParam_.user_ = config.GetPromiseInfo().user_;
108 syncerParam_.permissionNames_ = config.GetPromiseInfo().permissionNames_;
109 syncerParam_.subUser_ = config.GetSubUser();
110 syncerParam_.dfxInfo_.lastOpenTime_ = RdbTimeUtils::GetCurSysTimeWithMs();
111 if (created) {
112 syncerParam_.infos_ = Connection::Collect(config);
113 }
114 }
115
InnerOpen()116 int RdbStoreImpl::InnerOpen()
117 {
118 isOpen_ = true;
119 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
120 if (isReadOnly_ || isMemoryRdb_) {
121 return E_OK;
122 }
123
124 AfterOpen(syncerParam_);
125 if (config_.GetDBType() == DB_VECTOR || !config_.IsSearchable()) {
126 return E_OK;
127 }
128 int errCode = RegisterDataChangeCallback();
129 if (errCode != E_OK) {
130 LOG_ERROR("RegisterCallBackObserver is failed, err is %{public}d.", errCode);
131 }
132 #endif
133 return E_OK;
134 }
135
InitReportFunc(const RdbParam & param)136 void RdbStoreImpl::InitReportFunc(const RdbParam ¶m)
137 {
138 #if !defined(CROSS_PLATFORM)
139 reportFunc_ = std::make_shared<ReportFunc>([reportParam = param](const DistributedRdb::RdbStatEvent &event) {
140 auto [err, service] = RdbMgr::GetInstance().GetRdbService(reportParam);
141 if (err != E_OK || service == nullptr) {
142 LOG_ERROR("GetRdbService failed, err: %{public}d, storeName: %{public}s.", err,
143 SqliteUtils::Anonymous(reportParam.storeName_).c_str());
144 return;
145 }
146 err = service->ReportStatistic(reportParam, event);
147 if (err != E_OK) {
148 LOG_ERROR("ReportStatistic failed, err: %{public}d, storeName: %{public}s.", err,
149 SqliteUtils::Anonymous(reportParam.storeName_).c_str());
150 }
151 return;
152 });
153 #endif
154 }
155
Close()156 void RdbStoreImpl::Close()
157 {
158 {
159 std::unique_lock<decltype(poolMutex_)> lock(poolMutex_);
160 if (connectionPool_) {
161 connectionPool_->CloseAllConnections();
162 connectionPool_.reset();
163 }
164 }
165 {
166 std::lock_guard<decltype(mutex_)> guard(mutex_);
167 for (auto &it : transactions_) {
168 auto trans = it.lock();
169 if (trans != nullptr) {
170 trans->Close();
171 }
172 }
173 transactions_ = {};
174 }
175 }
176
GetPool() const177 std::shared_ptr<ConnectionPool> RdbStoreImpl::GetPool() const
178 {
179 std::shared_lock<decltype(poolMutex_)> lock(poolMutex_);
180 return connectionPool_;
181 }
182
GetConn(bool isRead)183 std::pair<int32_t, std::shared_ptr<Connection>> RdbStoreImpl::GetConn(bool isRead)
184 {
185 auto pool = GetPool();
186 if (pool == nullptr) {
187 return { E_ALREADY_CLOSED, nullptr };
188 }
189
190 auto connection = pool->AcquireConnection(isRead);
191 if (connection == nullptr) {
192 return { E_DATABASE_BUSY, nullptr };
193 }
194 return { E_OK, connection };
195 }
196
197 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
AfterOpen(const RdbParam & param,int32_t retry)198 void RdbStoreImpl::AfterOpen(const RdbParam ¶m, int32_t retry)
199 {
200 auto [err, service] = RdbMgr::GetInstance().GetRdbService(param);
201 if (err == E_NOT_SUPPORT) {
202 return;
203 }
204 if (err != E_OK || service == nullptr) {
205 LOG_ERROR("GetRdbService failed, err: %{public}d, storeName: %{public}s.", err,
206 SqliteUtils::Anonymous(param.storeName_).c_str());
207 auto pool = TaskExecutor::GetInstance().GetExecutor();
208 if (err == E_SERVICE_NOT_FOUND && pool != nullptr && retry++ < MAX_RETRY_TIMES) {
209 pool->Schedule(std::chrono::seconds(RETRY_INTERVAL), [param, retry]() {
210 AfterOpen(param, retry);
211 });
212 }
213 return;
214 }
215 err = service->AfterOpen(param);
216 if (err != E_OK) {
217 LOG_ERROR("AfterOpen failed, err: %{public}d, storeName: %{public}s.", err,
218 SqliteUtils::Anonymous(param.storeName_).c_str());
219 }
220 }
221
GetModifyTime(const std::string & table,const std::string & columnName,std::vector<PRIKey> & keys)222 RdbStore::ModifyTime RdbStoreImpl::GetModifyTime(
223 const std::string &table, const std::string &columnName, std::vector<PRIKey> &keys)
224 {
225 if (table.empty() || columnName.empty() || keys.empty()) {
226 LOG_ERROR("Invalid para.");
227 return {};
228 }
229
230 auto logTable = DistributedDB::RelationalStoreManager::GetDistributedLogTableName(table);
231 if (SqliteUtils::StrToUpper(columnName) == ROW_ID) {
232 return GetModifyTimeByRowId(logTable, keys);
233 }
234 std::vector<ValueObject> hashKeys;
235 hashKeys.reserve(keys.size());
236 std::map<std::vector<uint8_t>, PRIKey> keyMap;
237 std::map<std::string, DistributedDB::Type> tmp;
238 for (const auto &key : keys) {
239 DistributedDB::Type value;
240 RawDataParser::Convert(key, value);
241 tmp[columnName] = value;
242 auto hashKey = DistributedDB::RelationalStoreManager::CalcPrimaryKeyHash(tmp);
243 if (hashKey.empty()) {
244 LOG_DEBUG("Hash key fail.");
245 continue;
246 }
247 hashKeys.emplace_back(ValueObject(hashKey));
248 keyMap[hashKey] = key;
249 }
250
251 std::string sql;
252 sql.append("select hash_key as key, timestamp/10000 as modify_time from ");
253 sql.append(logTable);
254 sql.append(" where hash_key in (");
255 sql.append(SqliteSqlBuilder::GetSqlArgs(hashKeys.size()));
256 sql.append(")");
257 auto resultSet = QueryByStep(sql, hashKeys, true);
258 int count = 0;
259 if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
260 LOG_ERROR("Get resultSet err.");
261 return {};
262 }
263 return { resultSet, keyMap, false };
264 }
265
GetModifyTimeByRowId(const std::string & logTable,std::vector<PRIKey> & keys)266 RdbStore::ModifyTime RdbStoreImpl::GetModifyTimeByRowId(const std::string &logTable, std::vector<PRIKey> &keys)
267 {
268 std::string sql;
269 sql.append("select data_key as key, timestamp/10000 as modify_time from ");
270 sql.append(logTable);
271 sql.append(" where data_key in (");
272 sql.append(SqliteSqlBuilder::GetSqlArgs(keys.size()));
273 sql.append(")");
274 std::vector<ValueObject> args;
275 args.reserve(keys.size());
276 for (auto &key : keys) {
277 ValueObject::Type value;
278 RawDataParser::Convert(key, value);
279 args.emplace_back(ValueObject(value));
280 }
281 auto resultSet = QueryByStep(sql, args, true);
282 int count = 0;
283 if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
284 LOG_ERROR("Get resultSet err.");
285 return {};
286 }
287 return ModifyTime(resultSet, {}, true);
288 }
289
CleanDirtyData(const std::string & table,uint64_t cursor)290 int RdbStoreImpl::CleanDirtyData(const std::string &table, uint64_t cursor)
291 {
292 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || isMemoryRdb_) {
293 LOG_ERROR("Not support. table:%{public}s, isRead:%{public}d, dbType:%{public}d, isMemoryRdb:%{public}d.",
294 SqliteUtils::Anonymous(table).c_str(), isReadOnly_, config_.GetDBType(), isMemoryRdb_);
295 return E_NOT_SUPPORT;
296 }
297 auto [errCode, conn] = GetConn(false);
298 if (errCode != E_OK) {
299 LOG_ERROR("The database is busy or closed.");
300 return errCode;
301 }
302 errCode = conn->CleanDirtyData(table, cursor);
303 return errCode;
304 }
305
GetLogTableName(const std::string & tableName)306 std::string RdbStoreImpl::GetLogTableName(const std::string &tableName)
307 {
308 return DistributedDB::RelationalStoreManager::GetDistributedLogTableName(tableName);
309 }
310
QuerySharingResource(const AbsRdbPredicates & predicates,const Fields & columns)311 std::pair<int32_t, std::shared_ptr<ResultSet>> RdbStoreImpl::QuerySharingResource(
312 const AbsRdbPredicates &predicates, const Fields &columns)
313 {
314 if (config_.GetDBType() == DB_VECTOR) {
315 return { E_NOT_SUPPORT, nullptr };
316 }
317 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
318 if (errCode != E_OK) {
319 return { errCode, nullptr };
320 }
321 auto [status, resultSet] =
322 service->QuerySharingResource(syncerParam_, predicates.GetDistributedPredicates(), columns);
323 if (status != E_OK) {
324 return { status, nullptr };
325 }
326 return { status, resultSet };
327 }
328
RemoteQuery(const std::string & device,const AbsRdbPredicates & predicates,const Fields & columns,int & errCode)329 std::shared_ptr<ResultSet> RdbStoreImpl::RemoteQuery(
330 const std::string &device, const AbsRdbPredicates &predicates, const Fields &columns, int &errCode)
331 {
332 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
333 if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
334 return nullptr;
335 }
336 std::vector<std::string> selectionArgs = predicates.GetWhereArgs();
337 std::string sql = SqliteSqlBuilder::BuildQueryString(predicates, columns);
338 auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
339 if (err == E_NOT_SUPPORT) {
340 errCode = err;
341 return nullptr;
342 }
343 if (err != E_OK) {
344 LOG_ERROR("RdbStoreImpl::RemoteQuery get service failed.");
345 errCode = err;
346 return nullptr;
347 }
348 auto [status, resultSet] = service->RemoteQuery(syncerParam_, device, sql, selectionArgs);
349 errCode = status;
350 return resultSet;
351 }
352
NotifyDataChange()353 void RdbStoreImpl::NotifyDataChange()
354 {
355 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || !config_.GetRegisterInfo(RegisterType::CLIENT_OBSERVER)) {
356 return;
357 }
358 config_.SetRegisterInfo(RegisterType::CLIENT_OBSERVER, false);
359 int errCode = RegisterDataChangeCallback();
360 if (errCode != E_OK) {
361 LOG_ERROR("RegisterDataChangeCallback is failed, err is %{public}d.", errCode);
362 }
363 DistributedRdb::RdbChangedData rdbChangedData;
364 if (delayNotifier_ != nullptr) {
365 delayNotifier_->UpdateNotify(rdbChangedData, true);
366 }
367 }
368
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const DistributedRdb::DistributedConfig & distributedConfig)369 int RdbStoreImpl::SetDistributedTables(
370 const std::vector<std::string> &tables, int32_t type, const DistributedRdb::DistributedConfig &distributedConfig)
371 {
372 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
373 if (config_.GetDBType() == DB_VECTOR || isReadOnly_ || isMemoryRdb_) {
374 return E_NOT_SUPPORT;
375 }
376 if (tables.empty()) {
377 LOG_WARN("The distributed tables to be set is empty.");
378 return E_OK;
379 }
380 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
381 if (errCode != E_OK) {
382 return errCode;
383 }
384 syncerParam_.asyncDownloadAsset_ = distributedConfig.asyncDownloadAsset;
385 syncerParam_.enableCloud_ = distributedConfig.enableCloud;
386 int32_t errorCode = service->SetDistributedTables(
387 syncerParam_, tables, distributedConfig.references, distributedConfig.isRebuild, type);
388 if (errorCode != E_OK) {
389 LOG_ERROR("Fail to set distributed tables, error=%{public}d.", errorCode);
390 return errorCode;
391 }
392 if (type == DistributedRdb::DISTRIBUTED_DEVICE && !config_.GetRegisterInfo(RegisterType::CLIENT_OBSERVER)) {
393 RegisterDataChangeCallback();
394 }
395 if (type != DistributedRdb::DISTRIBUTED_CLOUD) {
396 return E_OK;
397 }
398
399 return HandleCloudSyncAfterSetDistributedTables(tables, distributedConfig);
400 }
401
HandleCloudSyncAfterSetDistributedTables(const std::vector<std::string> & tables,const DistributedRdb::DistributedConfig & distributedConfig)402 int RdbStoreImpl::HandleCloudSyncAfterSetDistributedTables(
403 const std::vector<std::string> &tables, const DistributedRdb::DistributedConfig &distributedConfig)
404 {
405 auto pool = GetPool();
406 if (pool == nullptr) {
407 return E_ALREADY_CLOSED;
408 }
409 auto conn = pool->AcquireConnection(false);
410 if (conn != nullptr) {
411 auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
412 if (strategy == ExchangeStrategy::BACKUP) {
413 (void)conn->Backup({}, {}, false, slaveStatus_);
414 }
415 }
416 {
417 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
418 if (distributedConfig.enableCloud && distributedConfig.autoSync) {
419 cloudInfo_->AddTables(tables);
420 } else {
421 cloudInfo_->RmvTables(tables);
422 return E_OK;
423 }
424 }
425 auto isRebuilt = RebuiltType::NONE;
426 GetRebuilt(isRebuilt);
427 if (isRebuilt == RebuiltType::REBUILT) {
428 DoCloudSync("");
429 }
430 return E_OK;
431 }
432
ObtainDistributedTableName(const std::string & device,const std::string & table,int & errCode)433 std::string RdbStoreImpl::ObtainDistributedTableName(const std::string &device, const std::string &table, int &errCode)
434 {
435 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
436 if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
437 return "";
438 }
439 std::string uuid;
440 DeviceManagerAdaptor::RdbDeviceManagerAdaptor &deviceManager =
441 DeviceManagerAdaptor::RdbDeviceManagerAdaptor::GetInstance(syncerParam_.bundleName_);
442 errCode = deviceManager.GetEncryptedUuidByNetworkId(device, uuid);
443 if (errCode != E_OK) {
444 LOG_ERROR("GetUuid is failed.");
445 return "";
446 }
447
448 auto translateCall = [uuid](const std::string &oriDevId, const DistributedDB::StoreInfo &info) {
449 return uuid;
450 };
451 DistributedDB::RuntimeConfig::SetTranslateToDeviceIdCallback(translateCall);
452
453 return DistributedDB::RelationalStoreManager::GetDistributedTableName(uuid, table);
454 }
455
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncBrief & callback)456 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncBrief &callback)
457 {
458 if (config_.GetDBType() == DB_VECTOR) {
459 return E_NOT_SUPPORT;
460 }
461 return Sync(option, predicate, [callback](Details &&details) {
462 Briefs briefs;
463 for (auto &[key, value] : details) {
464 briefs.insert_or_assign(key, value.code);
465 }
466 if (callback != nullptr) {
467 callback(briefs);
468 }
469 });
470 }
471
Sync(const SyncOption & option,const std::vector<std::string> & tables,const AsyncDetail & async)472 int RdbStoreImpl::Sync(const SyncOption &option, const std::vector<std::string> &tables, const AsyncDetail &async)
473 {
474 return Sync(option, AbsRdbPredicates(tables), async);
475 }
476
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncDetail & async)477 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncDetail &async)
478 {
479 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
480 if (isMemoryRdb_) {
481 return E_NOT_SUPPORT;
482 }
483 DistributedRdb::RdbService::Option rdbOption;
484 rdbOption.mode = option.mode;
485 rdbOption.isAsync = !option.isBlock;
486 RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
487 ret = InnerSync(syncerParam_, rdbOption, predicate.GetDistributedPredicates(), async);
488 return ret;
489 }
490
InnerSync(const RdbParam & param,const Options & option,const Memo & predicates,const AsyncDetail & async)491 int RdbStoreImpl::InnerSync(
492 const RdbParam ¶m, const Options &option, const Memo &predicates, const AsyncDetail &async)
493 {
494 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
495 if (errCode == E_NOT_SUPPORT) {
496 return errCode;
497 }
498 if (errCode != E_OK) {
499 LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
500 param.bundleName_.c_str());
501 return errCode;
502 }
503 errCode = service->Sync(param, option, predicates, async);
504 if (errCode != E_OK) {
505 LOG_ERROR("Sync is failed, err is %{public}d.", errCode);
506 return errCode;
507 }
508 return E_OK;
509 }
510
GetUri(const std::string & event)511 Uri RdbStoreImpl::GetUri(const std::string &event)
512 {
513 std::string rdbUri;
514 if (config_.GetDataGroupId().empty()) {
515 rdbUri = SCHEME_RDB + config_.GetBundleName() + "/" + path_ + "/" + event;
516 } else {
517 rdbUri = SCHEME_RDB + config_.GetDataGroupId() + "/" + path_ + "/" + event;
518 }
519 return Uri(rdbUri);
520 }
521
SubscribeLocal(const SubscribeOption & option,RdbStoreObserver * observer)522 int RdbStoreImpl::SubscribeLocal(const SubscribeOption &option, RdbStoreObserver *observer)
523 {
524 std::lock_guard<std::mutex> lock(mutex_);
525 localObservers_.try_emplace(option.event);
526 auto &list = localObservers_.find(option.event)->second;
527 for (auto it = list.begin(); it != list.end(); it++) {
528 if ((*it)->getObserver() == observer) {
529 LOG_ERROR("Duplicate subscribe.");
530 return E_OK;
531 }
532 }
533
534 localObservers_[option.event].push_back(std::make_shared<RdbStoreLocalObserver>(observer));
535 return E_OK;
536 }
537
SubscribeLocalShared(const SubscribeOption & option,RdbStoreObserver * observer)538 int RdbStoreImpl::SubscribeLocalShared(const SubscribeOption &option, RdbStoreObserver *observer)
539 {
540 std::lock_guard<std::mutex> lock(mutex_);
541 localSharedObservers_.try_emplace(option.event);
542 auto &list = localSharedObservers_.find(option.event)->second;
543 for (auto it = list.begin(); it != list.end(); it++) {
544 if ((*it)->getObserver() == observer) {
545 LOG_ERROR("Duplicate subscribe.");
546 return E_OK;
547 }
548 }
549
550 auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
551 if (client == nullptr) {
552 LOG_ERROR("Failed to get DataObsMgrClient.");
553 return E_GET_DATAOBSMGRCLIENT_FAIL;
554 }
555 sptr<RdbStoreLocalSharedObserver> localSharedObserver(new (std::nothrow) RdbStoreLocalSharedObserver(observer));
556 int32_t err = client->RegisterObserver(GetUri(option.event), localSharedObserver);
557 if (err != 0) {
558 LOG_ERROR("Subscribe failed.");
559 return err;
560 }
561 localSharedObservers_[option.event].push_back(std::move(localSharedObserver));
562 return E_OK;
563 }
564
SubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)565 int32_t RdbStoreImpl::SubscribeLocalDetail(
566 const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
567 {
568 if (observer == nullptr) {
569 return E_OK;
570 }
571 std::lock_guard<std::mutex> lock(mutex_);
572 for (auto it = localDetailObservers_.begin(); it != localDetailObservers_.end(); it++) {
573 if ((*it)->GetObserver() == observer) {
574 LOG_WARN("duplicate subscribe.");
575 return E_OK;
576 }
577 }
578 auto localStoreObserver = std::make_shared<RdbStoreLocalDbObserver>(observer);
579 auto [errCode, conn] = GetConn(false);
580 if (conn == nullptr) {
581 return errCode;
582 }
583 errCode = conn->Subscribe(localStoreObserver);
584 if (errCode != E_OK) {
585 LOG_ERROR("Subscribe local detail observer failed. db name:%{public}s errCode:%{public}." PRId32,
586 SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
587 return errCode;
588 }
589 config_.SetRegisterInfo(RegisterType::STORE_OBSERVER, true);
590 localDetailObservers_.emplace_back(localStoreObserver);
591 return E_OK;
592 }
593
SubscribeRemote(const SubscribeOption & option,RdbStoreObserver * observer)594 int RdbStoreImpl::SubscribeRemote(const SubscribeOption &option, RdbStoreObserver *observer)
595 {
596 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
597 if (errCode != E_OK) {
598 return errCode;
599 }
600 return service->Subscribe(syncerParam_, option, observer);
601 }
602
Subscribe(const SubscribeOption & option,RdbStoreObserver * observer)603 int RdbStoreImpl::Subscribe(const SubscribeOption &option, RdbStoreObserver *observer)
604 {
605 if (config_.GetDBType() == DB_VECTOR) {
606 return E_NOT_SUPPORT;
607 }
608 if (option.mode == SubscribeMode::LOCAL) {
609 return SubscribeLocal(option, observer);
610 }
611 if (isMemoryRdb_) {
612 return E_NOT_SUPPORT;
613 }
614 if (option.mode == SubscribeMode::LOCAL_SHARED) {
615 return SubscribeLocalShared(option, observer);
616 }
617 return SubscribeRemote(option, observer);
618 }
619
UnSubscribeLocal(const SubscribeOption & option,RdbStoreObserver * observer)620 int RdbStoreImpl::UnSubscribeLocal(const SubscribeOption &option, RdbStoreObserver *observer)
621 {
622 std::lock_guard<std::mutex> lock(mutex_);
623 auto obs = localObservers_.find(option.event);
624 if (obs == localObservers_.end()) {
625 return E_OK;
626 }
627
628 auto &list = obs->second;
629 for (auto it = list.begin(); it != list.end();) {
630 if (observer == nullptr || (*it)->getObserver() == observer) {
631 it = list.erase(it);
632 if (observer != nullptr) {
633 break;
634 }
635 } else {
636 it++;
637 }
638 }
639
640 if (list.empty()) {
641 localObservers_.erase(option.event);
642 }
643 return E_OK;
644 }
645
UnSubscribeLocalShared(const SubscribeOption & option,RdbStoreObserver * observer)646 int RdbStoreImpl::UnSubscribeLocalShared(const SubscribeOption &option, RdbStoreObserver *observer)
647 {
648 std::lock_guard<std::mutex> lock(mutex_);
649 auto obs = localSharedObservers_.find(option.event);
650 if (obs == localSharedObservers_.end()) {
651 return E_OK;
652 }
653
654 auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
655 if (client == nullptr) {
656 LOG_ERROR("Failed to get DataObsMgrClient.");
657 return E_GET_DATAOBSMGRCLIENT_FAIL;
658 }
659
660 auto &list = obs->second;
661 for (auto it = list.begin(); it != list.end();) {
662 if (observer == nullptr || (*it)->getObserver() == observer) {
663 int32_t err = client->UnregisterObserver(GetUri(option.event), *it);
664 if (err != 0) {
665 LOG_ERROR("UnSubscribeLocalShared failed.");
666 return err;
667 }
668 it = list.erase(it);
669 if (observer != nullptr) {
670 break;
671 }
672 } else {
673 it++;
674 }
675 }
676 if (list.empty()) {
677 localSharedObservers_.erase(option.event);
678 }
679 return E_OK;
680 }
681
UnsubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)682 int32_t RdbStoreImpl::UnsubscribeLocalDetail(
683 const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
684 {
685 auto [errCode, conn] = GetConn(false);
686 if (conn == nullptr) {
687 return errCode;
688 }
689 std::lock_guard<std::mutex> lock(mutex_);
690 for (auto it = localDetailObservers_.begin(); it != localDetailObservers_.end();) {
691 if (observer == nullptr || (*it)->GetObserver() == observer) {
692 int32_t err = conn->Unsubscribe(*it);
693 if (err != 0) {
694 LOG_ERROR("Unsubscribe local detail observer failed. db name:%{public}s errCode:%{public}." PRId32,
695 SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
696 return err;
697 }
698 it = localDetailObservers_.erase(it);
699 if (observer != nullptr) {
700 break;
701 }
702 } else {
703 it++;
704 }
705 }
706 return E_OK;
707 }
708
UnSubscribeRemote(const SubscribeOption & option,RdbStoreObserver * observer)709 int RdbStoreImpl::UnSubscribeRemote(const SubscribeOption &option, RdbStoreObserver *observer)
710 {
711 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
712 if (errCode != E_OK) {
713 return errCode;
714 }
715 return service->UnSubscribe(syncerParam_, option, observer);
716 }
717
UnSubscribe(const SubscribeOption & option,RdbStoreObserver * observer)718 int RdbStoreImpl::UnSubscribe(const SubscribeOption &option, RdbStoreObserver *observer)
719 {
720 if (config_.GetDBType() == DB_VECTOR) {
721 return E_NOT_SUPPORT;
722 }
723 if (option.mode == SubscribeMode::LOCAL) {
724 return UnSubscribeLocal(option, observer);
725 }
726 if (isMemoryRdb_) {
727 return E_NOT_SUPPORT;
728 }
729 if (option.mode == SubscribeMode::LOCAL_SHARED) {
730 return UnSubscribeLocalShared(option, observer);
731 }
732 return UnSubscribeRemote(option, observer);
733 }
734
SubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)735 int RdbStoreImpl::SubscribeObserver(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
736 {
737 if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
738 return E_NOT_SUPPORT;
739 }
740 return SubscribeLocalDetail(option, observer);
741 }
742
UnsubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)743 int RdbStoreImpl::UnsubscribeObserver(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
744 {
745 if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
746 return E_NOT_SUPPORT;
747 }
748 return UnsubscribeLocalDetail(option, observer);
749 }
750
Notify(const std::string & event)751 int RdbStoreImpl::Notify(const std::string &event)
752 {
753 if (config_.GetDBType() == DB_VECTOR) {
754 return E_NOT_SUPPORT;
755 }
756
757 {
758 std::lock_guard<std::mutex> lock(mutex_);
759 auto obs = localObservers_.find(event);
760 if (obs != localObservers_.end()) {
761 auto &list = obs->second;
762 for (auto &it : list) {
763 it->OnChange();
764 }
765 }
766 }
767 if (isMemoryRdb_) {
768 return E_OK;
769 }
770 auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
771 if (client == nullptr) {
772 LOG_ERROR("Failed to get DataObsMgrClient.");
773 return E_GET_DATAOBSMGRCLIENT_FAIL;
774 }
775 int32_t err = client->NotifyChange(GetUri(event));
776 if (err != 0) {
777 LOG_ERROR("Notify failed.");
778 }
779 return E_OK;
780 }
781
SetSearchable(bool isSearchable)782 int RdbStoreImpl::SetSearchable(bool isSearchable)
783 {
784 if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
785 return E_NOT_SUPPORT;
786 }
787 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
788 if (errCode != E_OK || service == nullptr) {
789 LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
790 return errCode;
791 }
792 return service->SetSearchable(syncerParam_, isSearchable);
793 }
794
RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)795 int RdbStoreImpl::RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
796 {
797 if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
798 return E_NOT_SUPPORT;
799 }
800 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
801 if (errCode != E_OK) {
802 return errCode;
803 }
804 return service->RegisterAutoSyncCallback(syncerParam_, observer);
805 }
806
UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)807 int RdbStoreImpl::UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
808 {
809 if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
810 return E_NOT_SUPPORT;
811 }
812 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
813 if (errCode != E_OK) {
814 return errCode;
815 }
816 return service->UnregisterAutoSyncCallback(syncerParam_, observer);
817 }
818
InitDelayNotifier()819 void RdbStoreImpl::InitDelayNotifier()
820 {
821 if (delayNotifier_ != nullptr) {
822 return;
823 }
824 delayNotifier_ = std::make_shared<DelayNotify>();
825 if (delayNotifier_ == nullptr) {
826 LOG_ERROR("Init delay notifier failed.");
827 return;
828 }
829 delayNotifier_->SetExecutorPool(TaskExecutor::GetInstance().GetExecutor());
830 delayNotifier_->SetTask([param = syncerParam_](const DistributedRdb::RdbChangedData &rdbChangedData,
831 const RdbNotifyConfig &rdbNotifyConfig) -> int {
832 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
833 if (errCode == E_NOT_SUPPORT) {
834 return errCode;
835 }
836 if (errCode != E_OK || service == nullptr) {
837 LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
838 return errCode;
839 }
840 return service->NotifyDataChange(param, rdbChangedData, rdbNotifyConfig);
841 });
842 }
843
RegisterDataChangeCallback()844 int RdbStoreImpl::RegisterDataChangeCallback()
845 {
846 InitDelayNotifier();
847 auto connPool = GetPool();
848 if (connPool == nullptr) {
849 return E_ALREADY_CLOSED;
850 }
851
852 RegisterDataChangeCallback(delayNotifier_, connPool, 0);
853 config_.SetRegisterInfo(RegisterType::CLIENT_OBSERVER, true);
854 return E_OK;
855 }
856
RegisterDataChangeCallback(std::shared_ptr<DelayNotify> delayNotifier,std::weak_ptr<ConnectionPool> connPool,int retry)857 void RdbStoreImpl::RegisterDataChangeCallback(
858 std::shared_ptr<DelayNotify> delayNotifier, std::weak_ptr<ConnectionPool> connPool, int retry)
859 {
860 auto relConnPool = connPool.lock();
861 if (relConnPool == nullptr) {
862 return;
863 }
864 auto conn = relConnPool->AcquireConnection(false);
865 if (conn == nullptr) {
866 relConnPool->Dump(true, "DATACHANGE");
867 auto pool = TaskExecutor::GetInstance().GetExecutor();
868 if (pool != nullptr && retry++ < MAX_RETRY_TIMES) {
869 pool->Schedule(std::chrono::seconds(1),
870 [delayNotifier, connPool, retry]() { RegisterDataChangeCallback(delayNotifier, connPool, retry); });
871 }
872 return;
873 }
874 auto callBack = [delayNotifier](const DistributedRdb::RdbChangedData &rdbChangedData) {
875 if (delayNotifier != nullptr) {
876 delayNotifier->UpdateNotify(rdbChangedData);
877 }
878 };
879 auto errCode = conn->SubscribeTableChanges(callBack);
880 if (errCode != E_OK) {
881 return;
882 }
883 }
884
GetHashKeyForLockRow(const AbsRdbPredicates & predicates,std::vector<std::vector<uint8_t>> & hashKeys)885 int RdbStoreImpl::GetHashKeyForLockRow(const AbsRdbPredicates &predicates, std::vector<std::vector<uint8_t>> &hashKeys)
886 {
887 std::string table = predicates.GetTableName();
888 if (table.empty()) {
889 return E_EMPTY_TABLE_NAME;
890 }
891 auto logTable = GetLogTableName(table);
892 std::string sql;
893 sql.append("SELECT ").append(logTable).append(".hash_key ").append("FROM ").append(logTable);
894 sql.append(" INNER JOIN ").append(table).append(" ON ");
895 sql.append(table).append(".ROWID = ").append(logTable).append(".data_key");
896 auto whereClause = predicates.GetWhereClause();
897 if (!whereClause.empty()) {
898 SqliteUtils::Replace(whereClause, SqliteUtils::REP, logTable + ".");
899 sql.append(" WHERE ").append(whereClause);
900 }
901
902 auto result = QuerySql(sql, predicates.GetBindArgs());
903 if (result == nullptr) {
904 return E_ALREADY_CLOSED ;
905 }
906 int count = 0;
907 if (result->GetRowCount(count) != E_OK) {
908 return E_NO_ROW_IN_QUERY;
909 }
910
911 if (count <= 0) {
912 return E_NO_ROW_IN_QUERY;
913 }
914 while (result->GoToNextRow() == E_OK) {
915 std::vector<uint8_t> hashKey;
916 if (result->GetBlob(0, hashKey) != E_OK) {
917 return E_ERROR;
918 }
919 hashKeys.push_back(std::move(hashKey));
920 }
921 return E_OK;
922 }
923
ModifyLockStatus(const AbsRdbPredicates & predicates,bool isLock)924 int RdbStoreImpl::ModifyLockStatus(const AbsRdbPredicates &predicates, bool isLock)
925 {
926 if (config_.IsVector() || isMemoryRdb_ || isReadOnly_) {
927 return E_NOT_SUPPORT;
928 }
929 std::vector<std::vector<uint8_t>> hashKeys;
930 int ret = GetHashKeyForLockRow(predicates, hashKeys);
931 if (ret != E_OK) {
932 LOG_ERROR("GetHashKeyForLockRow failed, err is %{public}d.", ret);
933 return ret;
934 }
935 auto [err, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION);
936 if (statement == nullptr || err != E_OK) {
937 return err;
938 }
939 int errCode = statement->ModifyLockStatus(predicates.GetTableName(), hashKeys, isLock);
940 if (errCode == E_WAIT_COMPENSATED_SYNC) {
941 LOG_DEBUG("Start compensation sync.");
942 DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true, true };
943 auto memo = AbsRdbPredicates(predicates.GetTableName()).GetDistributedPredicates();
944 InnerSync(syncerParam_, option, memo, nullptr);
945 return E_OK;
946 }
947 if (errCode != E_OK) {
948 LOG_ERROR("ModifyLockStatus failed, err is %{public}d.", errCode);
949 }
950 return errCode;
951 }
952
LockCloudContainer()953 std::pair<int32_t, uint32_t> RdbStoreImpl::LockCloudContainer()
954 {
955 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
956 if (config_.IsVector() || isMemoryRdb_ || isReadOnly_) {
957 return { E_NOT_SUPPORT, 0 };
958 }
959 RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
960 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
961 if (errCode == E_NOT_SUPPORT) {
962 LOG_ERROR("not support");
963 return { errCode, 0 };
964 }
965 if (errCode != E_OK) {
966 LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
967 syncerParam_.bundleName_.c_str());
968 return { errCode, 0 };
969 }
970 auto result = service->LockCloudContainer(syncerParam_);
971 if (result.first != E_OK) {
972 LOG_ERROR("LockCloudContainer failed, err is %{public}d.", result.first);
973 }
974 return result;
975 }
976
UnlockCloudContainer()977 int32_t RdbStoreImpl::UnlockCloudContainer()
978 {
979 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
980 if (config_.IsVector() || isMemoryRdb_ || isReadOnly_) {
981 return E_NOT_SUPPORT;
982 }
983 RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
984 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
985 if (errCode == E_NOT_SUPPORT) {
986 LOG_ERROR("not support");
987 return errCode;
988 }
989 if (errCode != E_OK) {
990 LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
991 syncerParam_.bundleName_.c_str());
992 return errCode;
993 }
994 errCode = service->UnlockCloudContainer(syncerParam_);
995 if (errCode != E_OK) {
996 LOG_ERROR("UnlockCloudContainer failed, err is %{public}d.", errCode);
997 }
998 return errCode;
999 }
1000 #endif
1001
RdbStoreImpl(const RdbStoreConfig & config)1002 RdbStoreImpl::RdbStoreImpl(const RdbStoreConfig &config)
1003 : isMemoryRdb_(config.IsMemoryRdb()), config_(config), name_(config.GetName()),
1004 fileType_(config.GetDatabaseFileType())
1005 {
1006 SqliteGlobalConfig::GetDbPath(config_, path_);
1007 isReadOnly_ = config.IsReadOnly() || config.GetRoleType() == VISITOR;
1008 }
1009
RdbStoreImpl(const RdbStoreConfig & config,int & errCode)1010 RdbStoreImpl::RdbStoreImpl(const RdbStoreConfig &config, int &errCode)
1011 : isMemoryRdb_(config.IsMemoryRdb()), config_(config), name_(config.GetName()),
1012 fileType_(config.GetDatabaseFileType())
1013 {
1014 isReadOnly_ = config.IsReadOnly() || config.GetRoleType() == VISITOR;
1015 SqliteGlobalConfig::GetDbPath(config_, path_);
1016 bool created = access(path_.c_str(), F_OK) != 0;
1017 connectionPool_ = ConnectionPool::Create(config_, errCode);
1018 if (connectionPool_ == nullptr && (errCode == E_SQLITE_CORRUPT || errCode == E_INVALID_SECRET_KEY) &&
1019 !isReadOnly_) {
1020 LOG_ERROR("database corrupt, errCode:0x%{public}x, %{public}s, %{public}s", errCode,
1021 SqliteUtils::Anonymous(name_).c_str(),
1022 SqliteUtils::FormatDebugInfoBrief(Connection::Collect(config_), "master").c_str());
1023 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1024 RdbParam param;
1025 param.bundleName_ = config_.GetBundleName();
1026 param.storeName_ = config_.GetName();
1027 param.subUser_ = config_.GetSubUser();
1028 auto [err, service] = RdbMgr::GetInstance().GetRdbService(param);
1029 if (service != nullptr) {
1030 service->Disable(param);
1031 }
1032 #endif
1033 config_.SetIter(0);
1034 if (config_.IsEncrypt() && config_.GetAllowRebuild()) {
1035 auto key = config_.GetEncryptKey();
1036 RdbSecurityManager::GetInstance().RestoreKeyFile(path_, key);
1037 key.assign(key.size(), 0);
1038 }
1039 std::tie(rebuild_, connectionPool_) = ConnectionPool::HandleDataCorruption(config_, errCode);
1040 created = true;
1041 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1042 if (service != nullptr) {
1043 service->Enable(param);
1044 }
1045 #endif
1046 }
1047 if (connectionPool_ == nullptr || errCode != E_OK) {
1048 connectionPool_ = nullptr;
1049 LOG_ERROR("Create connPool failed, err is %{public}d, path:%{public}s", errCode,
1050 SqliteUtils::Anonymous(path_).c_str());
1051 return;
1052 }
1053 InitSyncerParam(config_, created);
1054 InitReportFunc(syncerParam_);
1055 InnerOpen();
1056 }
1057
~RdbStoreImpl()1058 RdbStoreImpl::~RdbStoreImpl()
1059 {
1060 connectionPool_ = nullptr;
1061 trxConnMap_ = {};
1062 for (auto &trans : transactions_) {
1063 auto realTrans = trans.lock();
1064 if (realTrans) {
1065 (void)realTrans->Close();
1066 }
1067 }
1068 transactions_ = {};
1069 }
1070
GetConfig()1071 const RdbStoreConfig &RdbStoreImpl::GetConfig()
1072 {
1073 return config_;
1074 }
1075
Insert(const std::string & table,const Row & row,Resolution resolution)1076 std::pair<int, int64_t> RdbStoreImpl::Insert(const std::string &table, const Row &row, Resolution resolution)
1077 {
1078 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1079 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1080 return { E_NOT_SUPPORT, -1 };
1081 }
1082 if (table.empty()) {
1083 return { E_EMPTY_TABLE_NAME, -1 };
1084 }
1085
1086 if (row.IsEmpty()) {
1087 return { E_EMPTY_VALUES_BUCKET, -1 };
1088 }
1089
1090 auto conflictClause = SqliteUtils::GetConflictClause(static_cast<int>(resolution));
1091 if (conflictClause == nullptr) {
1092 return { E_INVALID_CONFLICT_FLAG, -1 };
1093 }
1094 RdbStatReporter reportStat(RDB_PERF, INSERT, config_, reportFunc_);
1095 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1096 std::string sql;
1097 sql.append("INSERT").append(conflictClause).append(" INTO ").append(table).append("(");
1098 size_t bindArgsSize = row.values_.size();
1099 std::vector<ValueObject> bindArgs;
1100 bindArgs.reserve(bindArgsSize);
1101 const char *split = "";
1102 for (const auto &[key, val] : row.values_) {
1103 sql.append(split).append(key);
1104 if (val.GetType() == ValueObject::TYPE_ASSETS && resolution == ConflictResolution::ON_CONFLICT_REPLACE) {
1105 return { E_INVALID_ARGS, -1 };
1106 }
1107 SqliteSqlBuilder::UpdateAssetStatus(val, AssetValue::STATUS_INSERT);
1108 bindArgs.push_back(val); // columnValue
1109 split = ",";
1110 }
1111
1112 sql.append(") VALUES (");
1113 if (bindArgsSize > 0) {
1114 sql.append(SqliteSqlBuilder::GetSqlArgs(bindArgsSize));
1115 }
1116
1117 sql.append(")");
1118 int64_t rowid = -1;
1119 auto errCode = ExecuteForLastInsertedRowId(rowid, sql, bindArgs);
1120 if (errCode == E_OK) {
1121 DoCloudSync(table);
1122 }
1123
1124 return { errCode, rowid };
1125 }
1126
BatchInsert(const std::string & table,const ValuesBuckets & rows)1127 std::pair<int, int64_t> RdbStoreImpl::BatchInsert(const std::string &table, const ValuesBuckets &rows)
1128 {
1129 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1130 return { E_NOT_SUPPORT, -1 };
1131 }
1132
1133 if (rows.RowSize() == 0) {
1134 return { E_OK, 0 };
1135 }
1136
1137 RdbStatReporter reportStat(RDB_PERF, BATCHINSERT, config_, reportFunc_);
1138 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1139 auto pool = GetPool();
1140 if (pool == nullptr) {
1141 return { E_ALREADY_CLOSED, -1 };
1142 }
1143 auto conn = pool->AcquireConnection(false);
1144 if (conn == nullptr) {
1145 return { E_DATABASE_BUSY, -1 };
1146 }
1147
1148 auto executeSqlArgs = SqliteSqlBuilder::GenerateSqls(table, rows, conn->GetMaxVariable());
1149 BatchInsertArgsDfx(static_cast<int>(executeSqlArgs.size()));
1150 if (executeSqlArgs.empty()) {
1151 LOG_ERROR("empty, table=%{public}s, values:%{public}zu, max number:%{public}d.", table.c_str(), rows.RowSize(),
1152 conn->GetMaxVariable());
1153 return { E_INVALID_ARGS, -1 };
1154 }
1155 PauseDelayNotify pauseDelayNotify(delayNotifier_);
1156 for (const auto &[sql, bindArgs] : executeSqlArgs) {
1157 auto [errCode, statement] = GetStatement(sql, conn);
1158 if (statement == nullptr) {
1159 LOG_ERROR("statement is nullptr, errCode:0x%{public}x, args:%{public}zu, table:%{public}s, "
1160 "app self can check the SQL", errCode, bindArgs.size(), table.c_str());
1161 return { E_OK, -1 };
1162 }
1163 for (const auto &args : bindArgs) {
1164 auto errCode = statement->Execute(args);
1165 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1166 pool->Dump(true, "BATCH");
1167 return { errCode, -1 };
1168 }
1169 if (errCode != E_OK) {
1170 LOG_ERROR("failed, errCode:%{public}d,args:%{public}zu,table:%{public}s,app self can check the SQL",
1171 errCode, bindArgs.size(), table.c_str());
1172 return { E_OK, -1 };
1173 }
1174 }
1175 }
1176 conn = nullptr;
1177 DoCloudSync(table);
1178 return { E_OK, int64_t(rows.RowSize()) };
1179 }
1180
BatchInsertArgsDfx(int argsSize)1181 void RdbStoreImpl::BatchInsertArgsDfx(int argsSize)
1182 {
1183 if (argsSize > 1) {
1184 Reportor::ReportFault(RdbFaultEvent(FT_CURD, E_DFX_BATCH_INSERT_ARGS_SIZE, config_.GetBundleName(),
1185 "BatchInsert executeSqlArgs size[ " + std::to_string(argsSize) + "]"));
1186 }
1187 }
1188
BatchInsertWithConflictResolution(const std::string & table,const ValuesBuckets & rows,Resolution resolution)1189 std::pair<int, int64_t> RdbStoreImpl::BatchInsertWithConflictResolution(
1190 const std::string &table, const ValuesBuckets &rows, Resolution resolution)
1191 {
1192 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1193 return { E_NOT_SUPPORT, -1 };
1194 }
1195
1196 if (rows.RowSize() == 0) {
1197 return { E_OK, 0 };
1198 }
1199
1200 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1201 auto pool = GetPool();
1202 if (pool == nullptr) {
1203 return { E_ALREADY_CLOSED, -1 };
1204 }
1205 auto conn = pool->AcquireConnection(false);
1206 if (conn == nullptr) {
1207 return { E_DATABASE_BUSY, -1 };
1208 }
1209
1210 auto sqlArgs = SqliteSqlBuilder::GenerateSqls(table, rows, conn->GetMaxVariable(), resolution);
1211 // To ensure atomicity, execute SQL only once
1212 if (sqlArgs.size() != 1 || sqlArgs.front().second.size() != 1) {
1213 auto [fields, values] = rows.GetFieldsAndValues();
1214 LOG_ERROR("invalid args, table=%{public}s, rows:%{public}zu, fields:%{public}zu, max:%{public}d.",
1215 table.c_str(), rows.RowSize(), fields != nullptr ? fields->size() : 0, conn->GetMaxVariable());
1216 return { E_INVALID_ARGS, -1 };
1217 }
1218 auto &[sql, bindArgs] = sqlArgs.front();
1219 auto [errCode, statement] = GetStatement(sql, conn);
1220 if (statement == nullptr) {
1221 LOG_ERROR("statement is nullptr, errCode:0x%{public}x, args:%{public}zu, table:%{public}s, "
1222 "app self can check the SQL", errCode, bindArgs.size(), table.c_str());
1223 return { errCode, -1 };
1224 }
1225 PauseDelayNotify pauseDelayNotify(delayNotifier_);
1226 auto args = std::ref(bindArgs.front());
1227 errCode = statement->Execute(args);
1228 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1229 pool->Dump(true, "BATCH");
1230 return { errCode, -1 };
1231 }
1232 if (errCode != E_OK) {
1233 LOG_ERROR("failed,errCode:%{public}d,table:%{public}s,args:%{public}zu,resolution:%{public}d.", errCode,
1234 table.c_str(), args.get().size(), static_cast<int32_t>(resolution));
1235 return { errCode, errCode == E_SQLITE_CONSTRAINT ? int64_t(statement->Changes()) : -1 };
1236 }
1237 conn = nullptr;
1238 DoCloudSync(table);
1239 return { E_OK, int64_t(statement->Changes()) };
1240 }
1241
Update(const std::string & table,const Row & row,const std::string & where,const Values & args,Resolution resolution)1242 std::pair<int, int> RdbStoreImpl::Update(
1243 const std::string &table, const Row &row, const std::string &where, const Values &args, Resolution resolution)
1244 {
1245 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1246 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1247 return { E_NOT_SUPPORT, -1 };
1248 }
1249 if (table.empty()) {
1250 return { E_EMPTY_TABLE_NAME, -1 };
1251 }
1252
1253 if (row.IsEmpty()) {
1254 return { E_EMPTY_VALUES_BUCKET, -1 };
1255 }
1256
1257 auto clause = SqliteUtils::GetConflictClause(static_cast<int>(resolution));
1258 if (clause == nullptr) {
1259 return { E_INVALID_CONFLICT_FLAG, -1 };
1260 }
1261 RdbStatReporter reportStat(RDB_PERF, UPDATE, config_, reportFunc_);
1262 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1263 std::string sql;
1264 sql.append("UPDATE").append(clause).append(" ").append(table).append(" SET ");
1265 std::vector<ValueObject> tmpBindArgs;
1266 size_t tmpBindSize = row.values_.size() + args.size();
1267 tmpBindArgs.reserve(tmpBindSize);
1268 const char *split = "";
1269 for (auto &[key, val] : row.values_) {
1270 sql.append(split);
1271 if (val.GetType() == ValueObject::TYPE_ASSETS) {
1272 sql.append(key).append("=merge_assets(").append(key).append(", ?)"); // columnName
1273 } else if (val.GetType() == ValueObject::TYPE_ASSET) {
1274 sql.append(key).append("=merge_asset(").append(key).append(", ?)"); // columnName
1275 } else {
1276 sql.append(key).append("=?"); // columnName
1277 }
1278 tmpBindArgs.push_back(val); // columnValue
1279 split = ",";
1280 }
1281
1282 if (!where.empty()) {
1283 sql.append(" WHERE ").append(where);
1284 }
1285
1286 tmpBindArgs.insert(tmpBindArgs.end(), args.begin(), args.end());
1287
1288 int64_t changes = 0;
1289 auto errCode = ExecuteForChangedRowCount(changes, sql, tmpBindArgs);
1290 if (errCode == E_OK) {
1291 DoCloudSync(table);
1292 }
1293 return { errCode, int32_t(changes) };
1294 }
1295
Delete(int & deletedRows,const std::string & table,const std::string & whereClause,const Values & args)1296 int RdbStoreImpl::Delete(int &deletedRows, const std::string &table, const std::string &whereClause, const Values &args)
1297 {
1298 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1299 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1300 return E_NOT_SUPPORT;
1301 }
1302 if (table.empty()) {
1303 return E_EMPTY_TABLE_NAME;
1304 }
1305
1306 RdbStatReporter reportStat(RDB_PERF, DELETE, config_, reportFunc_);
1307 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1308 std::string sql;
1309 sql.append("DELETE FROM ").append(table);
1310 if (!whereClause.empty()) {
1311 sql.append(" WHERE ").append(whereClause);
1312 }
1313 int64_t changes = 0;
1314 auto errCode = ExecuteForChangedRowCount(changes, sql, args);
1315 if (errCode != E_OK) {
1316 return errCode;
1317 }
1318 deletedRows = changes;
1319 DoCloudSync(table);
1320 return E_OK;
1321 }
1322
QuerySql(const std::string & sql,const Values & bindArgs)1323 std::shared_ptr<AbsSharedResultSet> RdbStoreImpl::QuerySql(const std::string &sql, const Values &bindArgs)
1324 {
1325 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1326 if (config_.GetDBType() == DB_VECTOR) {
1327 return nullptr;
1328 }
1329 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1330 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1331 auto start = std::chrono::steady_clock::now();
1332 auto pool = GetPool();
1333 if (pool == nullptr) {
1334 LOG_ERROR("Database already closed.");
1335 return nullptr;
1336 }
1337 return std::make_shared<SqliteSharedResultSet>(start, pool->AcquireRef(true), sql, bindArgs, path_);
1338 #else
1339 (void)sql;
1340 (void)bindArgs;
1341 return nullptr;
1342 #endif
1343 }
1344
QueryByStep(const std::string & sql,const Values & args,bool preCount)1345 std::shared_ptr<ResultSet> RdbStoreImpl::QueryByStep(const std::string &sql, const Values &args, bool preCount)
1346 {
1347 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1348 auto start = std::chrono::steady_clock::now();
1349 auto pool = GetPool();
1350 if (pool == nullptr) {
1351 LOG_ERROR("Database already closed.");
1352 return nullptr;
1353 }
1354 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1355 return std::make_shared<StepResultSet>(start, connectionPool_->AcquireRef(true), sql, args, preCount);
1356 #else
1357 return std::make_shared<StepResultSet>(start, connectionPool_->AcquireRef(true), sql, args, false);
1358 #endif
1359 }
1360
Count(int64_t & outValue,const AbsRdbPredicates & predicates)1361 int RdbStoreImpl::Count(int64_t &outValue, const AbsRdbPredicates &predicates)
1362 {
1363 if (config_.GetDBType() == DB_VECTOR) {
1364 return E_NOT_SUPPORT;
1365 }
1366 std::string sql = SqliteSqlBuilder::BuildCountString(predicates);
1367 return ExecuteAndGetLong(outValue, sql, predicates.GetBindArgs());
1368 }
1369
WriteToCompareFile(const std::string & dbPath,const std::string & bundleName,const std::string & sql)1370 void WriteToCompareFile(const std::string &dbPath, const std::string &bundleName, const std::string &sql)
1371 {
1372 auto poolTask = TaskExecutor::GetInstance().GetExecutor();
1373 if (poolTask != nullptr) {
1374 poolTask->Execute([dbPath, bundleName, sql]() {
1375 std::string comparePath = dbPath + "-compare";
1376 if (SqliteUtils::CleanFileContent(comparePath)) {
1377 Reportor::ReportFault(
1378 RdbFaultEvent(FT_CURD, E_DFX_IS_NOT_EXIST, bundleName, "compare file is deleted"));
1379 }
1380 SqliteUtils::WriteSqlToFile(comparePath, sql);
1381 });
1382 }
1383 }
1384
HandleSchemaDDL(std::shared_ptr<Statement> statement,std::shared_ptr<ConnectionPool> pool,const std::string & sql,int32_t & errCode)1385 void RdbStoreImpl::HandleSchemaDDL(std::shared_ptr<Statement> statement,
1386 std::shared_ptr<ConnectionPool> pool, const std::string &sql, int32_t &errCode)
1387 {
1388 statement->Reset();
1389 statement->Prepare("PRAGMA schema_version");
1390 auto [err, version] = statement->ExecuteForValue();
1391 statement = nullptr;
1392 if (vSchema_ < static_cast<int64_t>(version)) {
1393 LOG_INFO("db:%{public}s exe DDL schema<%{public}" PRIi64 "->%{public}" PRIi64
1394 "> app self can check the SQL.",
1395 SqliteUtils::Anonymous(name_).c_str(), vSchema_, static_cast<int64_t>(version));
1396 vSchema_ = version;
1397 errCode = pool->RestartConns();
1398 if (!isMemoryRdb_) {
1399 std::string dbPath = config_.GetPath();
1400 std::string bundleName = config_.GetBundleName();
1401 WriteToCompareFile(dbPath, bundleName, sql);
1402 }
1403 }
1404 }
1405
ExecuteSql(const std::string & sql,const Values & args)1406 int RdbStoreImpl::ExecuteSql(const std::string &sql, const Values &args)
1407 {
1408 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1409 if (config_.GetDBType() == DB_VECTOR || isReadOnly_) {
1410 return E_NOT_SUPPORT;
1411 }
1412 int ret = CheckAttach(sql);
1413 if (ret != E_OK) {
1414 return ret;
1415 }
1416 RdbStatReporter reportStat(RDB_PERF, EXECUTESQL, config_, reportFunc_);
1417 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1418 auto [errCode, statement] = BeginExecuteSql(sql);
1419 if (statement == nullptr) {
1420 return errCode;
1421 }
1422 auto pool = GetPool();
1423 if (pool == nullptr) {
1424 return E_ALREADY_CLOSED;
1425 }
1426 errCode = statement->Execute(args);
1427 if (errCode != E_OK) {
1428 LOG_ERROR("failed,error:0x%{public}x app self can check the SQL.", errCode);
1429 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1430 pool->Dump(true, "EXECUTE");
1431 }
1432 return errCode;
1433 }
1434 int sqlType = SqliteUtils::GetSqlStatementType(sql);
1435 if (sqlType == SqliteUtils::STATEMENT_DDL) {
1436 HandleSchemaDDL(statement, pool, sql, errCode);
1437 }
1438 statement = nullptr;
1439 if (errCode == E_OK && (sqlType == SqliteUtils::STATEMENT_UPDATE || sqlType == SqliteUtils::STATEMENT_INSERT)) {
1440 DoCloudSync("");
1441 }
1442 return errCode;
1443 }
1444
Execute(const std::string & sql,const Values & args,int64_t trxId)1445 std::pair<int32_t, ValueObject> RdbStoreImpl::Execute(const std::string &sql, const Values &args, int64_t trxId)
1446 {
1447 ValueObject object;
1448 if (isReadOnly_) {
1449 return { E_NOT_SUPPORT, object };
1450 }
1451
1452 RdbStatReporter reportStat(RDB_PERF, EXECUTE, config_, reportFunc_);
1453 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1454 int sqlType = SqliteUtils::GetSqlStatementType(sql);
1455 if (!SqliteUtils::IsSupportSqlForExecute(sqlType)) {
1456 LOG_ERROR("Not support the sqlType: %{public}d, app self can check the SQL", sqlType);
1457 return { E_NOT_SUPPORT_THE_SQL, object };
1458 }
1459
1460 if (config_.IsVector() && trxId > 0) {
1461 return { ExecuteByTrxId(sql, trxId, false, args), ValueObject() };
1462 }
1463
1464 auto pool = GetPool();
1465 if (pool == nullptr) {
1466 return { E_ALREADY_CLOSED, object };
1467 }
1468 auto conn = pool->AcquireConnection(false);
1469 if (pool == nullptr) {
1470 return { E_DATABASE_BUSY, object };
1471 }
1472
1473 auto [errCode, statement] = GetStatement(sql, conn);
1474 if (errCode != E_OK) {
1475 return { errCode, object };
1476 }
1477
1478 errCode = statement->Execute(args);
1479 if (errCode != E_OK) {
1480 LOG_ERROR("failed,error:0x%{public}x app self can check the SQL.", errCode);
1481 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1482 pool->Dump(true, "EXECUTE");
1483 }
1484 return { errCode, object };
1485 }
1486
1487 if (config_.IsVector()) {
1488 return { errCode, object };
1489 }
1490
1491 return HandleDifferentSqlTypes(statement, sql, object, sqlType);
1492 }
1493
HandleDifferentSqlTypes(std::shared_ptr<Statement> statement,const std::string & sql,const ValueObject & object,int sqlType)1494 std::pair<int32_t, ValueObject> RdbStoreImpl::HandleDifferentSqlTypes(
1495 std::shared_ptr<Statement> statement, const std::string &sql, const ValueObject &object, int sqlType)
1496 {
1497 int32_t errCode = E_OK;
1498 if (sqlType == SqliteUtils::STATEMENT_INSERT) {
1499 int64_t outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1500 return { errCode, ValueObject(outValue) };
1501 }
1502
1503 if (sqlType == SqliteUtils::STATEMENT_UPDATE) {
1504 int outValue = statement->Changes();
1505 return { errCode, ValueObject(outValue) };
1506 }
1507
1508 if (sqlType == SqliteUtils::STATEMENT_PRAGMA) {
1509 if (statement->GetColumnCount() == 1) {
1510 return statement->GetColumn(0);
1511 }
1512
1513 if (statement->GetColumnCount() > 1) {
1514 LOG_ERROR("Not support the sql:app self can check the SQL, column count more than 1");
1515 return { E_NOT_SUPPORT_THE_SQL, object };
1516 }
1517 }
1518
1519 auto pool = GetPool();
1520 if (pool == nullptr) {
1521 return { E_ALREADY_CLOSED, object };
1522 }
1523
1524 if (sqlType == SqliteUtils::STATEMENT_DDL) {
1525 HandleSchemaDDL(statement, pool, sql, errCode);
1526 }
1527 return { errCode, object };
1528 }
1529
ExecuteAndGetLong(int64_t & outValue,const std::string & sql,const Values & args)1530 int RdbStoreImpl::ExecuteAndGetLong(int64_t &outValue, const std::string &sql, const Values &args)
1531 {
1532 if (config_.GetDBType() == DB_VECTOR) {
1533 return E_NOT_SUPPORT;
1534 }
1535 auto [errCode, statement] = BeginExecuteSql(sql);
1536 if (statement == nullptr) {
1537 return errCode;
1538 }
1539 auto [err, object] = statement->ExecuteForValue(args);
1540 if (err != E_OK) {
1541 LOG_ERROR("failed, app self can check the SQL, ERROR is %{public}d.", err);
1542 }
1543 outValue = object;
1544 return err;
1545 }
1546
ExecuteAndGetString(std::string & outValue,const std::string & sql,const Values & args)1547 int RdbStoreImpl::ExecuteAndGetString(std::string &outValue, const std::string &sql, const Values &args)
1548 {
1549 if (config_.GetDBType() == DB_VECTOR) {
1550 return E_NOT_SUPPORT;
1551 }
1552 auto [errCode, statement] = BeginExecuteSql(sql);
1553 if (statement == nullptr) {
1554 return errCode;
1555 }
1556 ValueObject object;
1557 std::tie(errCode, object) = statement->ExecuteForValue(args);
1558 if (errCode != E_OK) {
1559 LOG_ERROR("failed, app self can check the SQL, ERROR is %{public}d.", errCode);
1560 }
1561 outValue = static_cast<std::string>(object);
1562 return errCode;
1563 }
1564
ExecuteForLastInsertedRowId(int64_t & outValue,const std::string & sql,const Values & args)1565 int RdbStoreImpl::ExecuteForLastInsertedRowId(int64_t &outValue, const std::string &sql, const Values &args)
1566 {
1567 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1568 return E_NOT_SUPPORT;
1569 }
1570 auto begin = std::chrono::steady_clock::now();
1571 auto [errCode, statement] = GetStatement(sql, false);
1572 if (statement == nullptr) {
1573 return errCode;
1574 }
1575 auto beginExec = std::chrono::steady_clock::now();
1576 errCode = statement->Execute(args);
1577 if (errCode != E_OK) {
1578 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1579 auto pool = GetPool();
1580 if (pool != nullptr) {
1581 pool->Dump(true, "INSERT");
1582 }
1583 }
1584 return errCode;
1585 }
1586 auto beginResult = std::chrono::steady_clock::now();
1587 outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1588 auto allEnd = std::chrono::steady_clock::now();
1589 int64_t totalCostTime = std::chrono::duration_cast<std::chrono::milliseconds>(allEnd - begin).count();
1590 if (totalCostTime >= TIME_OUT) {
1591 int64_t prepareCost = std::chrono::duration_cast<std::chrono::milliseconds>(beginExec - begin).count();
1592 int64_t execCost = std::chrono::duration_cast<std::chrono::milliseconds>(beginResult - beginExec).count();
1593 int64_t resultCost = std::chrono::duration_cast<std::chrono::milliseconds>(allEnd - beginResult).count();
1594 LOG_WARN("total[%{public}" PRId64 "] stmt[%{public}" PRId64 "] exec[%{public}" PRId64
1595 "] result[%{public}" PRId64 "] "
1596 "sql[%{public}s]",
1597 totalCostTime, prepareCost, execCost, resultCost, SqliteUtils::Anonymous(sql).c_str());
1598 }
1599 return E_OK;
1600 }
1601
ExecuteForChangedRowCount(int64_t & outValue,const std::string & sql,const Values & args)1602 int RdbStoreImpl::ExecuteForChangedRowCount(int64_t &outValue, const std::string &sql, const Values &args)
1603 {
1604 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1605 return E_NOT_SUPPORT;
1606 }
1607 auto [errCode, statement] = GetStatement(sql, false);
1608 if (statement == nullptr) {
1609 return errCode;
1610 }
1611 errCode = statement->Execute(args);
1612 if (errCode == E_OK) {
1613 outValue = statement->Changes();
1614 return E_OK;
1615 }
1616 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1617 auto pool = GetPool();
1618 if (pool != nullptr) {
1619 pool->Dump(true, "UPG DEL");
1620 }
1621 }
1622 return errCode;
1623 }
1624
GetDataBasePath(const std::string & databasePath,std::string & backupFilePath)1625 int RdbStoreImpl::GetDataBasePath(const std::string &databasePath, std::string &backupFilePath)
1626 {
1627 if (databasePath.empty()) {
1628 return E_INVALID_FILE_PATH;
1629 }
1630
1631 if (ISFILE(databasePath)) {
1632 backupFilePath = ExtractFilePath(path_) + databasePath;
1633 } else {
1634 // 2 represents two characters starting from the len - 2 position
1635 if (!PathToRealPath(ExtractFilePath(databasePath), backupFilePath) || databasePath.back() == '/' ||
1636 databasePath.substr(databasePath.length() - 2, 2) == "\\") {
1637 LOG_ERROR("Invalid databasePath.");
1638 return E_INVALID_FILE_PATH;
1639 }
1640 backupFilePath = databasePath;
1641 }
1642
1643 if (backupFilePath == path_) {
1644 LOG_ERROR("The backupPath and path should not be same.");
1645 return E_INVALID_FILE_PATH;
1646 }
1647
1648 LOG_INFO("databasePath is %{public}s.", SqliteUtils::Anonymous(backupFilePath).c_str());
1649 return E_OK;
1650 }
1651
GetSlaveName(const std::string & path,std::string & backupFilePath)1652 int RdbStoreImpl::GetSlaveName(const std::string &path, std::string &backupFilePath)
1653 {
1654 std::string suffix(".db");
1655 std::string slaveSuffix("_slave.db");
1656 auto pos = path.find(suffix);
1657 if (pos == std::string::npos) {
1658 backupFilePath = path + slaveSuffix;
1659 } else {
1660 backupFilePath = std::string(path, 0, pos) + slaveSuffix;
1661 }
1662 return E_OK;
1663 }
1664
1665 /**
1666 * Backup a database from a specified encrypted or unencrypted database file.
1667 */
Backup(const std::string & databasePath,const std::vector<uint8_t> & encryptKey)1668 int RdbStoreImpl::Backup(const std::string &databasePath, const std::vector<uint8_t> &encryptKey)
1669 {
1670 LOG_INFO("Backup db: %{public}s.", SqliteUtils::Anonymous(config_.GetName()).c_str());
1671 if (isReadOnly_ || isMemoryRdb_) {
1672 return E_NOT_SUPPORT;
1673 }
1674 std::string backupFilePath;
1675 if (TryGetMasterSlaveBackupPath(databasePath, backupFilePath)) {
1676 return InnerBackup(backupFilePath, encryptKey);
1677 }
1678
1679 int ret = GetDataBasePath(databasePath, backupFilePath);
1680 if (ret != E_OK) {
1681 return ret;
1682 }
1683
1684 RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
1685 keyFiles.Lock();
1686
1687 auto walFile = backupFilePath + "-wal";
1688 if (access(walFile.c_str(), F_OK) == E_OK) {
1689 if (!SqliteUtils::DeleteDirtyFiles(backupFilePath)) {
1690 keyFiles.Unlock();
1691 return E_ERROR;
1692 }
1693 }
1694 std::string tempPath = backupFilePath + ".tmp";
1695 if (access(tempPath.c_str(), F_OK) == E_OK) {
1696 SqliteUtils::DeleteFile(backupFilePath);
1697 } else {
1698 if (access(backupFilePath.c_str(), F_OK) == E_OK && !SqliteUtils::RenameFile(backupFilePath, tempPath)) {
1699 LOG_ERROR("rename backup file failed, path:%{public}s, errno:%{public}d",
1700 SqliteUtils::Anonymous(backupFilePath).c_str(), errno);
1701 keyFiles.Unlock();
1702 return E_ERROR;
1703 }
1704 }
1705 ret = InnerBackup(backupFilePath, encryptKey);
1706 if (ret != E_OK || access(walFile.c_str(), F_OK) == E_OK) {
1707 if (ret == E_DB_NOT_EXIST) {
1708 Reportor::ReportCorrupted(Reportor::Create(config_, ret, "ErrorType: BackupFailed"));
1709 }
1710 if (SqliteUtils::DeleteDirtyFiles(backupFilePath)) {
1711 SqliteUtils::RenameFile(tempPath, backupFilePath);
1712 }
1713 } else {
1714 SqliteUtils::DeleteFile(tempPath);
1715 }
1716 keyFiles.Unlock();
1717 return ret;
1718 }
1719
CreateBackupBindArgs(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey)1720 std::vector<ValueObject> RdbStoreImpl::CreateBackupBindArgs(
1721 const std::string &databasePath, const std::vector<uint8_t> &destEncryptKey)
1722 {
1723 std::vector<ValueObject> bindArgs;
1724 bindArgs.emplace_back(databasePath);
1725 if (!destEncryptKey.empty() && !config_.IsEncrypt()) {
1726 bindArgs.emplace_back(destEncryptKey);
1727 } else if (config_.IsEncrypt()) {
1728 std::vector<uint8_t> key = config_.GetEncryptKey();
1729 bindArgs.emplace_back(key);
1730 key.assign(key.size(), 0);
1731 } else {
1732 bindArgs.emplace_back("");
1733 }
1734 return bindArgs;
1735 }
1736
1737 /**
1738 * Backup a database from a specified encrypted or unencrypted database file.
1739 */
InnerBackup(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey)1740 int RdbStoreImpl::InnerBackup(const std::string &databasePath, const std::vector<uint8_t> &destEncryptKey)
1741 {
1742 if (isReadOnly_) {
1743 return E_NOT_SUPPORT;
1744 }
1745
1746 if (config_.GetDBType() == DB_VECTOR) {
1747 auto [errCode, conn] = GetConn(false);
1748 return errCode != E_OK ? errCode : conn->Backup(databasePath, destEncryptKey, false, slaveStatus_);
1749 }
1750
1751 if (config_.GetHaMode() != HAMode::SINGLE && SqliteUtils::IsSlaveDbName(databasePath)) {
1752 auto [errCode, conn] = GetConn(false);
1753 return errCode != E_OK ? errCode : conn->Backup(databasePath, {}, false, slaveStatus_);
1754 }
1755
1756 auto [result, conn] = CreateWritableConn();
1757 if (result != E_OK) {
1758 return result;
1759 }
1760
1761 if (config_.IsEncrypt()) {
1762 result = SetDefaultEncryptAlgo(conn, config_);
1763 if (result != E_OK) {
1764 return result;
1765 }
1766 }
1767
1768 std::vector<ValueObject> bindArgs = CreateBackupBindArgs(databasePath, destEncryptKey);
1769 auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_BACKUP_SQL, conn);
1770 errCode = statement->Execute(bindArgs);
1771 if (errCode != E_OK) {
1772 return errCode;
1773 }
1774 errCode = statement->Prepare(GlobalExpr::PRAGMA_BACKUP_JOUR_MODE_WAL);
1775 errCode = statement->Execute();
1776 if (errCode != E_OK) {
1777 return errCode;
1778 }
1779 errCode = statement->Prepare(GlobalExpr::EXPORT_SQL);
1780 int ret = statement->Execute();
1781 errCode = statement->Prepare(GlobalExpr::DETACH_BACKUP_SQL);
1782 int res = statement->Execute();
1783 return (res == E_OK) ? ret : res;
1784 }
1785
BeginExecuteSql(const std::string & sql)1786 std::pair<int32_t, RdbStoreImpl::Stmt> RdbStoreImpl::BeginExecuteSql(const std::string &sql)
1787 {
1788 int type = SqliteUtils::GetSqlStatementType(sql);
1789 if (SqliteUtils::IsSpecial(type)) {
1790 return { E_NOT_SUPPORT, nullptr };
1791 }
1792
1793 bool assumeReadOnly = SqliteUtils::IsSqlReadOnly(type);
1794 auto pool = GetPool();
1795 if (pool == nullptr) {
1796 return { E_ALREADY_CLOSED, nullptr };
1797 }
1798 auto conn = pool->AcquireConnection(assumeReadOnly);
1799 if (conn == nullptr) {
1800 return { E_DATABASE_BUSY, nullptr };
1801 }
1802
1803 auto [errCode, statement] = conn->CreateStatement(sql, conn);
1804 if (statement == nullptr) {
1805 return { errCode, nullptr };
1806 }
1807
1808 if (statement->ReadOnly() && conn->IsWriter()) {
1809 statement = nullptr;
1810 conn = nullptr;
1811 return GetStatement(sql, true);
1812 }
1813
1814 return { errCode, statement };
1815 }
1816
IsHoldingConnection()1817 bool RdbStoreImpl::IsHoldingConnection()
1818 {
1819 return GetPool() != nullptr;
1820 }
1821
SetDefaultEncryptSql(const std::shared_ptr<Statement> & statement,std::string sql,const RdbStoreConfig & config)1822 int RdbStoreImpl::SetDefaultEncryptSql(
1823 const std::shared_ptr<Statement> &statement, std::string sql, const RdbStoreConfig &config)
1824 {
1825 auto errCode = statement->Prepare(sql);
1826 if (errCode != E_OK) {
1827 LOG_ERROR("Prepare failed: %{public}s, %{public}d, %{public}d, %{public}d, %{public}d, %{public}u",
1828 SqliteUtils::Anonymous(config.GetName()).c_str(), config.GetCryptoParam().iterNum,
1829 config.GetCryptoParam().encryptAlgo, config.GetCryptoParam().hmacAlgo, config.GetCryptoParam().kdfAlgo,
1830 config.GetCryptoParam().cryptoPageSize);
1831 return errCode;
1832 }
1833 errCode = statement->Execute();
1834 if (errCode != E_OK) {
1835 LOG_ERROR("Execute failed: %{public}s, %{public}d, %{public}d, %{public}d, %{public}d, %{public}u",
1836 SqliteUtils::Anonymous(config.GetName()).c_str(), config.GetCryptoParam().iterNum,
1837 config.GetCryptoParam().encryptAlgo, config.GetCryptoParam().hmacAlgo, config.GetCryptoParam().kdfAlgo,
1838 config.GetCryptoParam().cryptoPageSize);
1839 return errCode;
1840 }
1841 return E_OK;
1842 }
1843
SetDefaultEncryptAlgo(const ConnectionPool::SharedConn & conn,const RdbStoreConfig & config)1844 int RdbStoreImpl::SetDefaultEncryptAlgo(const ConnectionPool::SharedConn &conn, const RdbStoreConfig &config)
1845 {
1846 if (conn == nullptr) {
1847 return E_DATABASE_BUSY;
1848 }
1849
1850 if (!config.GetCryptoParam().IsValid()) {
1851 LOG_ERROR("Invalid crypto param, name:%{public}s", SqliteUtils::Anonymous(config.GetName()).c_str());
1852 return E_INVALID_ARGS;
1853 }
1854
1855 std::string sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_CIPHER_PREFIX) +
1856 SqliteUtils::EncryptAlgoDescription(config.GetEncryptAlgo()) +
1857 std::string(GlobalExpr::ALGO_SUFFIX);
1858 auto [errCode, statement] = conn->CreateStatement(sql, conn);
1859 errCode = SetDefaultEncryptSql(statement, sql, config);
1860 if (errCode != E_OK) {
1861 return errCode;
1862 }
1863
1864 if (config.GetIter() > 0) {
1865 sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_KDF_ITER_PREFIX) + std::to_string(config.GetIter());
1866 errCode = SetDefaultEncryptSql(statement, sql, config);
1867 if (errCode != E_OK) {
1868 return errCode;
1869 }
1870 }
1871
1872 sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_HMAC_ALGO_PREFIX) +
1873 SqliteUtils::HmacAlgoDescription(config.GetCryptoParam().hmacAlgo) + std::string(GlobalExpr::ALGO_SUFFIX);
1874 errCode = SetDefaultEncryptSql(statement, sql, config);
1875 if (errCode != E_OK) {
1876 return errCode;
1877 }
1878
1879 sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_KDF_ALGO_PREFIX) +
1880 SqliteUtils::KdfAlgoDescription(config.GetCryptoParam().kdfAlgo) +
1881 std::string(GlobalExpr::ALGO_SUFFIX);
1882 errCode = SetDefaultEncryptSql(statement, sql, config);
1883 if (errCode != E_OK) {
1884 return errCode;
1885 }
1886
1887 sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_PAGE_SIZE_PREFIX) +
1888 std::to_string(config.GetCryptoParam().cryptoPageSize);
1889 return SetDefaultEncryptSql(statement, sql, config);
1890 }
1891
AttachInner(const RdbStoreConfig & config,const std::string & attachName,const std::string & dbPath,const std::vector<uint8_t> & key,int32_t waitTime)1892 int RdbStoreImpl::AttachInner(const RdbStoreConfig &config, const std::string &attachName, const std::string &dbPath,
1893 const std::vector<uint8_t> &key, int32_t waitTime)
1894 {
1895 auto pool = GetPool();
1896 if (pool == nullptr) {
1897 return E_ALREADY_CLOSED;
1898 }
1899 auto [conn, readers] = pool->AcquireAll(waitTime);
1900 if (conn == nullptr) {
1901 return E_DATABASE_BUSY;
1902 }
1903
1904 if (!isMemoryRdb_ && conn->GetJournalMode() == static_cast<int32_t>(JournalMode::MODE_WAL)) {
1905 // close first to prevent the connection from being put back.
1906 pool->CloseAllConnections();
1907 conn = nullptr;
1908 readers.clear();
1909 auto [err, newConn] = pool->DisableWal();
1910 if (err != E_OK) {
1911 return err;
1912 }
1913 conn = newConn;
1914 }
1915 std::vector<ValueObject> bindArgs;
1916 bindArgs.emplace_back(ValueObject(dbPath));
1917 bindArgs.emplace_back(ValueObject(attachName));
1918 if (!key.empty()) {
1919 auto ret = SetDefaultEncryptAlgo(conn, config);
1920 if (ret != E_OK) {
1921 return ret;
1922 }
1923 bindArgs.emplace_back(ValueObject(key));
1924 auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_WITH_KEY_SQL, conn);
1925 if (statement == nullptr || errCode != E_OK) {
1926 LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
1927 return E_ERROR;
1928 }
1929 return statement->Execute(bindArgs);
1930 }
1931
1932 auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_SQL, conn);
1933 if (statement == nullptr || errCode != E_OK) {
1934 LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
1935 return errCode;
1936 }
1937 return statement->Execute(bindArgs);
1938 }
1939
1940 /**
1941 * Attaches a database.
1942 */
Attach(const RdbStoreConfig & config,const std::string & attachName,int32_t waitTime)1943 std::pair<int32_t, int32_t> RdbStoreImpl::Attach(
1944 const RdbStoreConfig &config, const std::string &attachName, int32_t waitTime)
1945 {
1946 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || config_.GetHaMode() != HAMode::SINGLE ||
1947 (config.IsMemoryRdb() && config.IsEncrypt())) {
1948 return { E_NOT_SUPPORT, 0 };
1949 }
1950 std::string dbPath;
1951 int err = SqliteGlobalConfig::GetDbPath(config, dbPath);
1952 if (err != E_OK || (access(dbPath.c_str(), F_OK) != E_OK && !config.IsMemoryRdb())) {
1953 return { E_INVALID_FILE_PATH, 0 };
1954 }
1955
1956 // encrypted databases are not supported to attach a non encrypted database.
1957 if (!config.IsEncrypt() && config_.IsEncrypt()) {
1958 return { E_NOT_SUPPORT, 0 };
1959 }
1960
1961 if (attachedInfo_.Contains(attachName)) {
1962 return { E_ATTACHED_DATABASE_EXIST, 0 };
1963 }
1964
1965 std::vector<uint8_t> key;
1966 config.Initialize();
1967 if (config.IsEncrypt()) {
1968 key = config.GetEncryptKey();
1969 }
1970 err = AttachInner(config, attachName, dbPath, key, waitTime);
1971 key.assign(key.size(), 0);
1972 if (err == E_SQLITE_ERROR) {
1973 // only when attachName is already in use, SQLITE-ERROR will be reported here.
1974 return { E_ATTACHED_DATABASE_EXIST, 0 };
1975 } else if (err != E_OK) {
1976 LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach fileName"
1977 "[%{public}s]",
1978 err, SqliteUtils::Anonymous(config_.GetName()).c_str(), attachName.c_str(),
1979 SqliteUtils::Anonymous(config.GetName()).c_str());
1980 return { err, 0 };
1981 }
1982 if (!attachedInfo_.Insert(attachName, dbPath)) {
1983 return { E_ATTACHED_DATABASE_EXIST, 0 };
1984 }
1985 return { E_OK, attachedInfo_.Size() };
1986 }
1987
Detach(const std::string & attachName,int32_t waitTime)1988 std::pair<int32_t, int32_t> RdbStoreImpl::Detach(const std::string &attachName, int32_t waitTime)
1989 {
1990 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1991 return { E_NOT_SUPPORT, 0 };
1992 }
1993 if (!attachedInfo_.Contains(attachName)) {
1994 return { E_OK, attachedInfo_.Size() };
1995 }
1996
1997 auto pool = GetPool();
1998 if (pool == nullptr) {
1999 return { E_ALREADY_CLOSED, 0 };
2000 }
2001 auto [connection, readers] = pool->AcquireAll(waitTime);
2002 if (connection == nullptr) {
2003 return { E_DATABASE_BUSY, 0 };
2004 }
2005 std::vector<ValueObject> bindArgs;
2006 bindArgs.push_back(ValueObject(attachName));
2007
2008 auto [errCode, statement] = connection->CreateStatement(GlobalExpr::DETACH_SQL, connection);
2009 if (statement == nullptr || errCode != E_OK) {
2010 LOG_ERROR("Detach get statement failed, errCode %{public}d", errCode);
2011 return { errCode, 0 };
2012 }
2013 errCode = statement->Execute(bindArgs);
2014 if (errCode != E_OK) {
2015 LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach", errCode,
2016 SqliteUtils::Anonymous(config_.GetName()).c_str(), attachName.c_str());
2017 return { errCode, 0 };
2018 }
2019
2020 attachedInfo_.Erase(attachName);
2021 if (!attachedInfo_.Empty()) {
2022 return { E_OK, attachedInfo_.Size() };
2023 }
2024 statement = nullptr;
2025 if (!isMemoryRdb_ && connection->GetJournalMode() == static_cast<int32_t>(JournalMode::MODE_WAL)) {
2026 // close first to prevent the connection from being put back.
2027 pool->CloseAllConnections();
2028 connection = nullptr;
2029 readers.clear();
2030 errCode = pool->EnableWal();
2031 }
2032 return { errCode, 0 };
2033 }
2034
2035 /**
2036 * Obtains the database version.
2037 */
GetVersion(int & version)2038 int RdbStoreImpl::GetVersion(int &version)
2039 {
2040 auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION, isReadOnly_);
2041 if (statement == nullptr) {
2042 return errCode;
2043 }
2044 ValueObject value;
2045 std::tie(errCode, value) = statement->ExecuteForValue();
2046 auto val = std::get_if<int64_t>(&value.value);
2047 if (val != nullptr) {
2048 version = static_cast<int>(*val);
2049 }
2050 return errCode;
2051 }
2052
2053 /**
2054 * Sets the version of a new database.
2055 */
SetVersion(int version)2056 int RdbStoreImpl::SetVersion(int version)
2057 {
2058 if (isReadOnly_) {
2059 return E_NOT_SUPPORT;
2060 }
2061 std::string sql = std::string(GlobalExpr::PRAGMA_VERSION) + " = " + std::to_string(version);
2062 auto [errCode, statement] = GetStatement(sql);
2063 if (statement == nullptr) {
2064 return errCode;
2065 }
2066 return statement->Execute();
2067 }
2068 /**
2069 * Begins a transaction in EXCLUSIVE mode.
2070 */
BeginTransaction()2071 int RdbStoreImpl::BeginTransaction()
2072 {
2073 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2074 auto pool = GetPool();
2075 if (pool == nullptr) {
2076 return E_ALREADY_CLOSED;
2077 }
2078 std::lock_guard<std::mutex> lockGuard(pool->GetTransactionStackMutex());
2079 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2080 return E_NOT_SUPPORT;
2081 }
2082 // size + 1 means the number of transactions in process
2083 RdbStatReporter reportStat(RDB_PERF, BEGINTRANSACTION, config_, reportFunc_);
2084 size_t transactionId = pool->GetTransactionStack().size() + 1;
2085 BaseTransaction transaction(pool->GetTransactionStack().size());
2086 auto [errCode, statement] = GetStatement(transaction.GetTransactionStr());
2087 if (statement == nullptr) {
2088 return errCode;
2089 }
2090 errCode = statement->Execute();
2091 if (errCode != E_OK) {
2092 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
2093 pool->Dump(true, "BEGIN");
2094 }
2095 LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2096 SqliteUtils::Anonymous(name_).c_str(), errCode);
2097 return errCode;
2098 }
2099 pool->SetInTransaction(true);
2100 pool->GetTransactionStack().push(transaction);
2101 // 1 means the number of transactions in process
2102 if (transactionId > 1) {
2103 LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2104 SqliteUtils::Anonymous(name_).c_str(), errCode);
2105 }
2106
2107 return E_OK;
2108 }
2109
BeginTrans()2110 std::pair<int, int64_t> RdbStoreImpl::BeginTrans()
2111 {
2112 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2113 if (!config_.IsVector() || isReadOnly_) {
2114 return { E_NOT_SUPPORT, 0 };
2115 }
2116
2117 int64_t tmpTrxId = 0;
2118 auto pool = GetPool();
2119 if (pool == nullptr) {
2120 return { E_ALREADY_CLOSED, 0 };
2121 }
2122 auto [errCode, connection] = pool->CreateTransConn(false);
2123 if (connection == nullptr) {
2124 LOG_ERROR("Get null connection, storeName: %{public}s errCode:0x%{public}x.",
2125 SqliteUtils::Anonymous(name_).c_str(), errCode);
2126 return { errCode, 0 };
2127 }
2128
2129 tmpTrxId = newTrxId_.fetch_add(1);
2130 trxConnMap_.Insert(tmpTrxId, connection);
2131 errCode = ExecuteByTrxId(BEGIN_TRANSACTION_SQL, tmpTrxId);
2132 if (errCode != E_OK) {
2133 trxConnMap_.Erase(tmpTrxId);
2134 }
2135 return { errCode, tmpTrxId };
2136 }
2137
2138 /**
2139 * Begins a transaction in EXCLUSIVE mode.
2140 */
RollBack()2141 int RdbStoreImpl::RollBack()
2142 {
2143 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2144 auto pool = GetPool();
2145 if (pool == nullptr) {
2146 return E_ALREADY_CLOSED;
2147 }
2148 std::lock_guard<std::mutex> lockGuard(pool->GetTransactionStackMutex());
2149 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2150 return E_NOT_SUPPORT;
2151 }
2152 RdbStatReporter reportStat(RDB_PERF, ROLLBACK, config_, reportFunc_);
2153 size_t transactionId = pool->GetTransactionStack().size();
2154
2155 if (pool->GetTransactionStack().empty()) {
2156 LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId,
2157 SqliteUtils::Anonymous(name_).c_str());
2158 return E_NO_TRANSACTION_IN_SESSION;
2159 }
2160 BaseTransaction transaction = pool->GetTransactionStack().top();
2161 pool->GetTransactionStack().pop();
2162 if (transaction.GetType() != TransType::ROLLBACK_SELF && !pool->GetTransactionStack().empty()) {
2163 pool->GetTransactionStack().top().SetChildFailure(true);
2164 }
2165 auto [errCode, statement] = GetStatement(transaction.GetRollbackStr());
2166 if (statement == nullptr) {
2167 if (errCode == E_DATABASE_BUSY) {
2168 Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
2169 }
2170 // size + 1 means the number of transactions in process
2171 LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId + 1,
2172 SqliteUtils::Anonymous(name_).c_str());
2173 return E_DATABASE_BUSY;
2174 }
2175 errCode = statement->Execute();
2176 if (errCode != E_OK) {
2177 if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
2178 Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
2179 }
2180 LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2181 SqliteUtils::Anonymous(name_).c_str(), errCode);
2182 return errCode;
2183 }
2184 if (pool->GetTransactionStack().empty()) {
2185 pool->SetInTransaction(false);
2186 }
2187 // 1 means the number of transactions in process
2188 if (transactionId > 1) {
2189 LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2190 SqliteUtils::Anonymous(name_).c_str(), errCode);
2191 }
2192 return E_OK;
2193 }
2194
ExecuteByTrxId(const std::string & sql,int64_t trxId,bool closeConnAfterExecute,const std::vector<ValueObject> & bindArgs)2195 int RdbStoreImpl::ExecuteByTrxId(
2196 const std::string &sql, int64_t trxId, bool closeConnAfterExecute, const std::vector<ValueObject> &bindArgs)
2197 {
2198 if ((!config_.IsVector()) || isReadOnly_) {
2199 return E_NOT_SUPPORT;
2200 }
2201 if (trxId == 0) {
2202 return E_INVALID_ARGS;
2203 }
2204
2205 if (!trxConnMap_.Contains(trxId)) {
2206 LOG_ERROR("trxId hasn't appeared before %{public}" PRIu64, trxId);
2207 return E_INVALID_ARGS;
2208 }
2209 auto time = static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
2210 auto result = trxConnMap_.Find(trxId);
2211 auto connection = result.second;
2212 if (connection == nullptr) {
2213 LOG_ERROR("Get null connection, storeName: %{public}s time:%{public}" PRIu64 ".",
2214 SqliteUtils::Anonymous(name_).c_str(), time);
2215 return E_ERROR;
2216 }
2217 auto [ret, statement] = GetStatement(sql, connection);
2218 if (ret != E_OK) {
2219 return ret;
2220 }
2221 ret = statement->Execute(bindArgs);
2222 if (ret != E_OK) {
2223 LOG_ERROR("transaction id: %{public}" PRIu64 ", storeName: %{public}s, errCode: %{public}d" PRIu64, trxId,
2224 SqliteUtils::Anonymous(name_).c_str(), ret);
2225 trxConnMap_.Erase(trxId);
2226 return ret;
2227 }
2228 if (closeConnAfterExecute) {
2229 trxConnMap_.Erase(trxId);
2230 }
2231 return E_OK;
2232 }
2233
RollBack(int64_t trxId)2234 int RdbStoreImpl::RollBack(int64_t trxId)
2235 {
2236 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2237 return ExecuteByTrxId(ROLLBACK_TRANSACTION_SQL, trxId, true);
2238 }
2239
2240 /**
2241 * Begins a transaction in EXCLUSIVE mode.
2242 */
Commit()2243 int RdbStoreImpl::Commit()
2244 {
2245 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2246 auto pool = GetPool();
2247 if (pool == nullptr) {
2248 return E_ALREADY_CLOSED;
2249 }
2250 std::lock_guard<std::mutex> lockGuard(pool->GetTransactionStackMutex());
2251 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2252 return E_NOT_SUPPORT;
2253 }
2254 RdbStatReporter reportStat(RDB_PERF, COMMIT, config_, reportFunc_);
2255 size_t transactionId = pool->GetTransactionStack().size();
2256
2257 if (pool->GetTransactionStack().empty()) {
2258 return E_OK;
2259 }
2260 BaseTransaction transaction = pool->GetTransactionStack().top();
2261 std::string sqlStr = transaction.GetCommitStr();
2262 if (sqlStr.size() <= 1) {
2263 LOG_WARN("id: %{public}zu, storeName: %{public}s, sql: %{public}s", transactionId,
2264 SqliteUtils::Anonymous(name_).c_str(), sqlStr.c_str());
2265 pool->GetTransactionStack().pop();
2266 return E_OK;
2267 }
2268 auto [errCode, statement] = GetStatement(sqlStr);
2269 if (statement == nullptr) {
2270 if (errCode == E_DATABASE_BUSY) {
2271 Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: CommitBusy"));
2272 }
2273 LOG_ERROR("id: %{public}zu, storeName: %{public}s, statement error", transactionId,
2274 SqliteUtils::Anonymous(name_).c_str());
2275 return E_DATABASE_BUSY;
2276 }
2277 errCode = statement->Execute();
2278 if (errCode != E_OK) {
2279 if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
2280 Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: CommitBusy"));
2281 }
2282 LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2283 SqliteUtils::Anonymous(name_).c_str(), errCode);
2284 return errCode;
2285 }
2286 pool->SetInTransaction(false);
2287 // 1 means the number of transactions in process
2288 if (transactionId > 1) {
2289 LOG_WARN("id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2290 SqliteUtils::Anonymous(name_).c_str(), errCode);
2291 }
2292 pool->GetTransactionStack().pop();
2293 return E_OK;
2294 }
2295
Commit(int64_t trxId)2296 int RdbStoreImpl::Commit(int64_t trxId)
2297 {
2298 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2299 return ExecuteByTrxId(COMMIT_TRANSACTION_SQL, trxId, true);
2300 }
2301
IsInTransaction()2302 bool RdbStoreImpl::IsInTransaction()
2303 {
2304 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2305 return false;
2306 }
2307 auto pool = GetPool();
2308 if (pool == nullptr) {
2309 return false;
2310 }
2311 return pool->IsInTransaction();
2312 }
2313
CheckAttach(const std::string & sql)2314 int RdbStoreImpl::CheckAttach(const std::string &sql)
2315 {
2316 size_t index = sql.find_first_not_of(' ');
2317 if (index == std::string::npos) {
2318 return E_OK;
2319 }
2320
2321 /* The first 3 characters can determine the type */
2322 std::string sqlType = sql.substr(index, 3);
2323 sqlType = SqliteUtils::StrToUpper(sqlType);
2324 if (sqlType != "ATT") {
2325 return E_OK;
2326 }
2327
2328 auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_JOUR_MODE_EXP);
2329 if (statement == nullptr) {
2330 return errCode;
2331 }
2332
2333 errCode = statement->Execute();
2334 if (errCode != E_OK) {
2335 LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errCode);
2336 return errCode;
2337 }
2338 auto [errorCode, valueObject] = statement->GetColumn(0);
2339 if (errorCode != E_OK) {
2340 LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errorCode);
2341 return errorCode;
2342 }
2343 auto journal = std::get_if<std::string>(&valueObject.value);
2344 auto journalMode = SqliteUtils::StrToUpper((journal == nullptr) ? "" : *journal);
2345 if (journalMode == RdbStoreConfig::DB_DEFAULT_JOURNAL_MODE) {
2346 LOG_ERROR("RdbStoreImpl attach is not supported in WAL mode");
2347 return E_NOT_SUPPORTED_ATTACH_IN_WAL_MODE;
2348 }
2349
2350 return E_OK;
2351 }
2352
IsOpen() const2353 bool RdbStoreImpl::IsOpen() const
2354 {
2355 return isOpen_;
2356 }
2357
GetPath()2358 std::string RdbStoreImpl::GetPath()
2359 {
2360 return path_;
2361 }
2362
IsReadOnly() const2363 bool RdbStoreImpl::IsReadOnly() const
2364 {
2365 return isReadOnly_;
2366 }
2367
IsMemoryRdb() const2368 bool RdbStoreImpl::IsMemoryRdb() const
2369 {
2370 return isMemoryRdb_;
2371 }
2372
GetName()2373 std::string RdbStoreImpl::GetName()
2374 {
2375 return name_;
2376 }
2377
DoCloudSync(const std::string & table)2378 void RdbStoreImpl::DoCloudSync(const std::string &table)
2379 {
2380 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2381 auto needSync = cloudInfo_->Change(table);
2382 if (!needSync) {
2383 return;
2384 }
2385 auto pool = TaskExecutor::GetInstance().GetExecutor();
2386 if (pool == nullptr) {
2387 return;
2388 }
2389 auto interval =
2390 std::chrono::duration_cast<std::chrono::steady_clock::duration>(std::chrono::milliseconds(INTERVAL));
2391 pool->Schedule(interval, [cloudInfo = std::weak_ptr<CloudTables>(cloudInfo_), param = syncerParam_]() {
2392 auto changeInfo = cloudInfo.lock();
2393 if (changeInfo == nullptr) {
2394 return;
2395 }
2396 auto tables = changeInfo->Steal();
2397 if (tables.empty()) {
2398 return;
2399 }
2400 DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true };
2401 auto memo = AbsRdbPredicates(std::vector<std::string>(tables.begin(), tables.end())).GetDistributedPredicates();
2402 InnerSync(param, option, memo, nullptr);
2403 });
2404 #endif
2405 }
GetFileType()2406 std::string RdbStoreImpl::GetFileType()
2407 {
2408 return fileType_;
2409 }
2410
2411 /**
2412 * Sets the database locale.
2413 */
ConfigLocale(const std::string & localeStr)2414 int RdbStoreImpl::ConfigLocale(const std::string &localeStr)
2415 {
2416 if (!isOpen_) {
2417 LOG_ERROR("The connection pool has been closed.");
2418 return E_ALREADY_CLOSED;
2419 }
2420
2421 auto pool = GetPool();
2422 if (pool == nullptr) {
2423 return E_ALREADY_CLOSED;
2424 }
2425 return pool->ConfigLocale(localeStr);
2426 }
2427
GetDestPath(const std::string & backupPath,std::string & destPath)2428 int RdbStoreImpl::GetDestPath(const std::string &backupPath, std::string &destPath)
2429 {
2430 int ret = GetDataBasePath(backupPath, destPath);
2431 if (ret != E_OK) {
2432 return ret;
2433 }
2434 std::string tempPath = destPath + ".tmp";
2435 if (access(tempPath.c_str(), F_OK) == E_OK) {
2436 destPath = tempPath;
2437 }
2438
2439 if (access(destPath.c_str(), F_OK) != E_OK) {
2440 LOG_ERROR("The backupFilePath does not exists.");
2441 return E_INVALID_FILE_PATH;
2442 }
2443 return E_OK;
2444 }
2445
Restore(const std::string & backupPath,const std::vector<uint8_t> & newKey)2446 int RdbStoreImpl::Restore(const std::string &backupPath, const std::vector<uint8_t> &newKey)
2447 {
2448 LOG_INFO("Restore db: %{public}s.", SqliteUtils::Anonymous(config_.GetName()).c_str());
2449 if (isReadOnly_ || isMemoryRdb_) {
2450 return E_NOT_SUPPORT;
2451 }
2452
2453 auto pool = GetPool();
2454 if (pool == nullptr || !isOpen_) {
2455 LOG_ERROR("The pool is: %{public}d, pool is null: %{public}d", isOpen_, pool == nullptr);
2456 return E_ALREADY_CLOSED;
2457 }
2458
2459 RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
2460 keyFiles.Lock();
2461 std::string destPath;
2462 bool isOK = TryGetMasterSlaveBackupPath(backupPath, destPath, true);
2463 if (!isOK) {
2464 int ret = GetDestPath(backupPath, destPath);
2465 if (ret != E_OK) {
2466 keyFiles.Unlock();
2467 return ret;
2468 }
2469 }
2470 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2471 auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
2472 if (service != nullptr) {
2473 service->Disable(syncerParam_);
2474 }
2475 #endif
2476 bool corrupt = Reportor::IsReportCorruptedFault(path_);
2477 int errCode = pool->ChangeDbFileForRestore(path_, destPath, newKey, slaveStatus_);
2478 keyFiles.Unlock();
2479 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2480 SecurityPolicy::SetSecurityLabel(config_);
2481 if (service != nullptr) {
2482 service->Enable(syncerParam_);
2483 if (errCode == E_OK) {
2484 auto syncerParam = syncerParam_;
2485 syncerParam.infos_ = Connection::Collect(config_);
2486 service->AfterOpen(syncerParam);
2487 NotifyDataChange();
2488 }
2489 }
2490 #endif
2491 if (errCode == E_OK) {
2492 ExchangeSlaverToMaster();
2493 Reportor::ReportRestore(Reportor::Create(config_, E_OK, "ErrorType::RdbStoreImpl::Restore", false), corrupt);
2494 rebuild_ = RebuiltType::NONE;
2495 }
2496 DoCloudSync("");
2497 return errCode;
2498 }
2499
CreateWritableConn()2500 std::pair<int32_t, std::shared_ptr<Connection>> RdbStoreImpl::CreateWritableConn()
2501 {
2502 auto config = config_;
2503 config.SetHaMode(HAMode::SINGLE);
2504 config.SetCreateNecessary(false);
2505 auto [result, conn] = Connection::Create(config, true);
2506 if (result != E_OK || conn == nullptr) {
2507 LOG_ERROR("create connection failed, err:%{public}d", result);
2508 return { result, nullptr };
2509 }
2510 return { E_OK, conn };
2511 }
2512
GetStatement(const std::string & sql,std::shared_ptr<Connection> conn) const2513 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(
2514 const std::string &sql, std::shared_ptr<Connection> conn) const
2515 {
2516 if (conn == nullptr) {
2517 return { E_DATABASE_BUSY, nullptr };
2518 }
2519 return conn->CreateStatement(sql, conn);
2520 }
2521
GetStatement(const std::string & sql,bool read) const2522 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(const std::string &sql, bool read) const
2523 {
2524 auto pool = GetPool();
2525 if (pool == nullptr) {
2526 return { E_ALREADY_CLOSED, nullptr };
2527 }
2528 auto conn = pool->AcquireConnection(read);
2529 if (conn == nullptr) {
2530 return { E_DATABASE_BUSY, nullptr };
2531 }
2532 return conn->CreateStatement(sql, conn);
2533 }
2534
GetRebuilt(RebuiltType & rebuilt)2535 int RdbStoreImpl::GetRebuilt(RebuiltType &rebuilt)
2536 {
2537 rebuilt = static_cast<RebuiltType>(rebuild_);
2538 return E_OK;
2539 }
2540
InterruptBackup()2541 int RdbStoreImpl::InterruptBackup()
2542 {
2543 if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER) {
2544 return E_NOT_SUPPORT;
2545 }
2546 if (slaveStatus_ == SlaveStatus::BACKING_UP) {
2547 slaveStatus_ = SlaveStatus::BACKUP_INTERRUPT;
2548 return E_OK;
2549 }
2550 return E_CANCEL;
2551 }
2552
GetBackupStatus() const2553 int32_t RdbStoreImpl::GetBackupStatus() const
2554 {
2555 if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER && config_.GetHaMode() != HAMode::MAIN_REPLICA) {
2556 return SlaveStatus::UNDEFINED;
2557 }
2558 return slaveStatus_;
2559 }
2560
TryGetMasterSlaveBackupPath(const std::string & srcPath,std::string & destPath,bool isRestore)2561 bool RdbStoreImpl::TryGetMasterSlaveBackupPath(const std::string &srcPath, std::string &destPath, bool isRestore)
2562 {
2563 if (!srcPath.empty() || config_.GetHaMode() == HAMode::SINGLE || config_.GetDBType() != DB_SQLITE) {
2564 return false;
2565 }
2566 int ret = GetSlaveName(config_.GetPath(), destPath);
2567 if (ret != E_OK) {
2568 destPath = {};
2569 return false;
2570 }
2571 if (isRestore && access(destPath.c_str(), F_OK) != 0) {
2572 LOG_WARN("The backup path can not access: %{public}s", SqliteUtils::Anonymous(destPath).c_str());
2573 return false;
2574 }
2575 return true;
2576 }
2577
IsSlaveDiffFromMaster() const2578 bool RdbStoreImpl::IsSlaveDiffFromMaster() const
2579 {
2580 std::string slaveDbPath = SqliteUtils::GetSlavePath(config_.GetPath());
2581 return SqliteUtils::IsSlaveInvalid(config_.GetPath()) || (access(slaveDbPath.c_str(), F_OK) != 0);
2582 }
2583
ExchangeSlaverToMaster()2584 int32_t RdbStoreImpl::ExchangeSlaverToMaster()
2585 {
2586 if (isReadOnly_ || isMemoryRdb_ || rebuild_ != RebuiltType::NONE) {
2587 return E_OK;
2588 }
2589 auto [errCode, conn] = GetConn(false);
2590 if (errCode != E_OK) {
2591 return errCode;
2592 }
2593 auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
2594 if (strategy != ExchangeStrategy::NOT_HANDLE) {
2595 LOG_WARN("exchange st:%{public}d, %{public}s,", strategy, SqliteUtils::Anonymous(config_.GetName()).c_str());
2596 }
2597 int ret = E_OK;
2598 if (strategy == ExchangeStrategy::RESTORE) {
2599 conn = nullptr;
2600 // disable is required before restore
2601 ret = Restore({}, {});
2602 } else if (strategy == ExchangeStrategy::BACKUP) {
2603 // async backup
2604 ret = conn->Backup({}, {}, true, slaveStatus_);
2605 }
2606 return ret;
2607 }
2608
GetDbType() const2609 int32_t RdbStoreImpl::GetDbType() const
2610 {
2611 return config_.GetDBType();
2612 }
2613
CreateTransaction(int32_t type)2614 std::pair<int32_t, std::shared_ptr<Transaction>> RdbStoreImpl::CreateTransaction(int32_t type)
2615 {
2616 if (isReadOnly_) {
2617 return { E_NOT_SUPPORT, nullptr };
2618 }
2619
2620 auto pool = GetPool();
2621 if (pool == nullptr) {
2622 return { E_ALREADY_CLOSED, nullptr };
2623 }
2624 auto [errCode, conn] = pool->CreateTransConn();
2625 if (conn == nullptr) {
2626 return { errCode, nullptr };
2627 }
2628 std::shared_ptr<Transaction> trans;
2629 std::tie(errCode, trans) = Transaction::Create(type, conn, config_.GetName());
2630 if (trans == nullptr) {
2631 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
2632 pool->Dump(true, "TRANS");
2633 }
2634 return { errCode, nullptr };
2635 }
2636
2637 std::lock_guard<decltype(mutex_)> guard(mutex_);
2638 for (auto it = transactions_.begin(); it != transactions_.end();) {
2639 if (it->expired()) {
2640 it = transactions_.erase(it);
2641 } else {
2642 it++;
2643 }
2644 }
2645 transactions_.push_back(trans);
2646 return { errCode, trans };
2647 }
2648
AddTables(const std::vector<std::string> & tables)2649 int32_t RdbStoreImpl::CloudTables::AddTables(const std::vector<std::string> &tables)
2650 {
2651 std::lock_guard<std::mutex> lock(mutex_);
2652 for (auto &table : tables) {
2653 tables_.insert(table);
2654 }
2655 return E_OK;
2656 }
2657
RmvTables(const std::vector<std::string> & tables)2658 int32_t RdbStoreImpl::CloudTables::RmvTables(const std::vector<std::string> &tables)
2659 {
2660 std::lock_guard<std::mutex> lock(mutex_);
2661 for (auto &table : tables) {
2662 tables_.erase(table);
2663 }
2664 return E_OK;
2665 }
2666
Change(const std::string & table)2667 bool RdbStoreImpl::CloudTables::Change(const std::string &table)
2668 {
2669 bool needSync = false;
2670 {
2671 std::lock_guard<std::mutex> lock(mutex_);
2672 if (tables_.empty() || (!table.empty() && tables_.find(table) == tables_.end())) {
2673 return needSync;
2674 }
2675 // from empty, then need schedule the cloud sync, others only wait the schedule execute
2676 needSync = changes_.empty();
2677 if (!table.empty()) {
2678 changes_.insert(table);
2679 } else {
2680 changes_.insert(tables_.begin(), tables_.end());
2681 }
2682 }
2683 return needSync;
2684 }
2685
Steal()2686 std::set<std::string> RdbStoreImpl::CloudTables::Steal()
2687 {
2688 std::set<std::string> result;
2689 {
2690 std::lock_guard<std::mutex> lock(mutex_);
2691 result = std::move(changes_);
2692 }
2693 return result;
2694 }
2695 } // namespace OHOS::NativeRdb
2696