1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "RdbStoreImpl"
16 #include "rdb_store_impl.h"
17
18 #include <algorithm>
19 #include <chrono>
20 #include <cinttypes>
21 #include <cstdint>
22 #include <memory>
23 #include <mutex>
24 #include <sstream>
25 #include <string>
26 #include <unistd.h>
27
28 #include "cache_result_set.h"
29 #include "directory_ex.h"
30 #include "logger.h"
31 #include "rdb_common.h"
32 #include "rdb_errno.h"
33 #include "rdb_fault_hiview_reporter.h"
34 #include "rdb_radar_reporter.h"
35 #include "rdb_security_manager.h"
36 #include "rdb_sql_statistic.h"
37 #include "rdb_store.h"
38 #include "rdb_trace.h"
39 #include "rdb_types.h"
40 #include "relational_store_client.h"
41 #include "sqlite_global_config.h"
42 #include "sqlite_sql_builder.h"
43 #include "sqlite_statement.h"
44 #include "sqlite_utils.h"
45 #include "step_result_set.h"
46 #include "values_buckets.h"
47 #include "task_executor.h"
48 #include "traits.h"
49 #include "transaction.h"
50 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
51 #include "delay_notify.h"
52 #include "raw_data_parser.h"
53 #include "rdb_device_manager_adapter.h"
54 #include "rdb_manager_impl.h"
55 #include "relational_store_manager.h"
56 #include "runtime_config.h"
57 #include "security_policy.h"
58 #include "sqlite_shared_result_set.h"
59 #endif
60
// ISFILE(filePath) evaluates to true when filePath contains no directory separator
// (a backslash on Windows builds, a forward slash elsewhere).
// The argument is parenthesized so that compound expressions such as `dir + name`
// expand to a call on the whole expression rather than on its last operand.
#ifdef WINDOWS_PLATFORM
#define ISFILE(filePath) (((filePath).find("\\") == std::string::npos))
#else
#define ISFILE(filePath) (((filePath).find("/") == std::string::npos))
#endif
66
67 namespace OHOS::NativeRdb {
68 using namespace OHOS::Rdb;
69 using namespace std::chrono;
70 using SqlStatistic = DistributedRdb::SqlStatistic;
71 using RdbNotifyConfig = DistributedRdb::RdbNotifyConfig;
72 using Reportor = RdbFaultHiViewReporter;
73 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
74 using RdbMgr = DistributedRdb::RdbManagerImpl;
75 #endif
76
// SQL text for the three explicit transaction control statements.
static constexpr const char *BEGIN_TRANSACTION_SQL = "begin;";
static constexpr const char *COMMIT_TRANSACTION_SQL = "commit;";
static constexpr const char *ROLLBACK_TRANSACTION_SQL = "rollback;";
// NOTE(review): the use site is outside this excerpt; presumably a backup/restore marker name - confirm.
static constexpr const char *BACKUP_RESTORE = "backup.restore";
// NOTE(review): the unit is not visible in this excerpt (likely milliseconds) - confirm at the use site.
static constexpr int64_t TIME_OUT = 1500;
82
InitSyncerParam(const RdbStoreConfig & config,bool created)83 void RdbStoreImpl::InitSyncerParam(const RdbStoreConfig &config, bool created)
84 {
85 syncerParam_.bundleName_ = config.GetBundleName();
86 syncerParam_.hapName_ = config.GetModuleName();
87 syncerParam_.storeName_ = config.GetName();
88 syncerParam_.customDir_ = config.GetCustomDir();
89 syncerParam_.area_ = config.GetArea();
90 syncerParam_.level_ = static_cast<int32_t>(config.GetSecurityLevel());
91 syncerParam_.type_ = config.GetDistributedType();
92 syncerParam_.isEncrypt_ = config.IsEncrypt();
93 syncerParam_.isAutoClean_ = config.GetAutoClean();
94 syncerParam_.isSearchable_ = config.IsSearchable();
95 syncerParam_.password_ = config.GetEncryptKey();
96 syncerParam_.haMode_ = config.GetHaMode();
97 syncerParam_.roleType_ = config.GetRoleType();
98 if (created) {
99 syncerParam_.infos_ = Connection::Collect(config);
100 }
101 }
102
InnerOpen()103 int RdbStoreImpl::InnerOpen()
104 {
105 isOpen_ = true;
106 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
107 if (isReadOnly_) {
108 return E_OK;
109 }
110
111 AfterOpen(syncerParam_);
112 int errCode = RegisterDataChangeCallback();
113 if (errCode != E_OK) {
114 LOG_ERROR("RegisterCallBackObserver is failed, err is %{public}d.", errCode);
115 }
116 #endif
117 return E_OK;
118 }
119
120 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
AfterOpen(const RdbParam & param,int32_t retry)121 void RdbStoreImpl::AfterOpen(const RdbParam ¶m, int32_t retry)
122 {
123 auto [err, service] = RdbMgr::GetInstance().GetRdbService(param);
124 if (err == E_NOT_SUPPORT) {
125 return;
126 }
127 if (err != E_OK || service == nullptr) {
128 LOG_ERROR("GetRdbService failed, err: %{public}d, storeName: %{public}s.", err,
129 SqliteUtils::Anonymous(param.storeName_).c_str());
130 auto pool = TaskExecutor::GetInstance().GetExecutor();
131 if (err == E_SERVICE_NOT_FOUND && pool != nullptr && retry++ < MAX_RETRY_TIMES) {
132 pool->Schedule(std::chrono::seconds(RETRY_INTERVAL), [param, retry]() {
133 AfterOpen(param, retry);
134 });
135 }
136 return;
137 }
138 err = service->AfterOpen(param);
139 if (err != E_OK) {
140 LOG_ERROR("AfterOpen failed, err: %{public}d, storeName: %{public}s.", err,
141 SqliteUtils::Anonymous(param.storeName_).c_str());
142 }
143 }
144
GetModifyTime(const std::string & table,const std::string & columnName,std::vector<PRIKey> & keys)145 RdbStore::ModifyTime RdbStoreImpl::GetModifyTime(const std::string &table, const std::string &columnName,
146 std::vector<PRIKey> &keys)
147 {
148 if (table.empty() || columnName.empty() || keys.empty()) {
149 LOG_ERROR("invalid para.");
150 return {};
151 }
152
153 auto logTable = DistributedDB::RelationalStoreManager::GetDistributedLogTableName(table);
154 if (SqliteUtils::StrToUpper(columnName) == ROW_ID) {
155 return GetModifyTimeByRowId(logTable, keys);
156 }
157 std::vector<ValueObject> hashKeys;
158 hashKeys.reserve(keys.size());
159 std::map<std::vector<uint8_t>, PRIKey> keyMap;
160 std::map<std::string, DistributedDB::Type> tmp;
161 for (const auto &key : keys) {
162 DistributedDB::Type value;
163 RawDataParser::Convert(key, value);
164 tmp[columnName] = value;
165 auto hashKey = DistributedDB::RelationalStoreManager::CalcPrimaryKeyHash(tmp);
166 if (hashKey.empty()) {
167 LOG_DEBUG("hash key fail");
168 continue;
169 }
170 hashKeys.emplace_back(ValueObject(hashKey));
171 keyMap[hashKey] = key;
172 }
173
174 std::string sql;
175 sql.append("select hash_key as key, timestamp/10000 as modify_time from ");
176 sql.append(logTable);
177 sql.append(" where hash_key in (");
178 sql.append(SqliteSqlBuilder::GetSqlArgs(hashKeys.size()));
179 sql.append(")");
180 auto resultSet = QueryByStep(sql, hashKeys);
181 int count = 0;
182 if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
183 LOG_ERROR("get resultSet err.");
184 return {};
185 }
186 return { resultSet, keyMap, false };
187 }
188
GetModifyTimeByRowId(const std::string & logTable,std::vector<PRIKey> & keys)189 RdbStore::ModifyTime RdbStoreImpl::GetModifyTimeByRowId(const std::string &logTable, std::vector<PRIKey> &keys)
190 {
191 std::string sql;
192 sql.append("select data_key as key, timestamp/10000 as modify_time from ");
193 sql.append(logTable);
194 sql.append(" where data_key in (");
195 sql.append(SqliteSqlBuilder::GetSqlArgs(keys.size()));
196 sql.append(")");
197 std::vector<ValueObject> args;
198 args.reserve(keys.size());
199 for (auto &key : keys) {
200 ValueObject::Type value;
201 RawDataParser::Convert(key, value);
202 args.emplace_back(ValueObject(value));
203 }
204 auto resultSet = QueryByStep(sql, args);
205 int count = 0;
206 if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
207 LOG_ERROR("get resultSet err.");
208 return {};
209 }
210 return ModifyTime(resultSet, {}, true);
211 }
212
CleanDirtyData(const std::string & table,uint64_t cursor)213 int RdbStoreImpl::CleanDirtyData(const std::string &table, uint64_t cursor)
214 {
215 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
216 LOG_ERROR("not support. table:%{public}s, isRead:%{public}d, dbType:%{public}d",
217 SqliteUtils::Anonymous(table).c_str(), isReadOnly_, config_.GetDBType());
218 return E_NOT_SUPPORT;
219 }
220 auto connection = connectionPool_->AcquireConnection(false);
221 if (connection == nullptr) {
222 LOG_ERROR("db is busy. table:%{public}s", SqliteUtils::Anonymous(table).c_str());
223 return E_DATABASE_BUSY;
224 }
225 int errCode = connection->CleanDirtyData(table, cursor);
226 return errCode;
227 }
228
GetLogTableName(const std::string & tableName)229 std::string RdbStoreImpl::GetLogTableName(const std::string &tableName)
230 {
231 return DistributedDB::RelationalStoreManager::GetDistributedLogTableName(tableName);
232 }
233
QuerySharingResource(const AbsRdbPredicates & predicates,const Fields & columns)234 std::pair<int32_t, std::shared_ptr<ResultSet>> RdbStoreImpl::QuerySharingResource(const AbsRdbPredicates &predicates,
235 const Fields &columns)
236 {
237 if (config_.GetDBType() == DB_VECTOR) {
238 return { E_NOT_SUPPORT, nullptr };
239 }
240 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
241 if (errCode != E_OK) {
242 return { errCode, nullptr };
243 }
244 auto [status, resultSet] =
245 service->QuerySharingResource(syncerParam_, predicates.GetDistributedPredicates(), columns);
246 if (status != E_OK) {
247 return { status, nullptr };
248 }
249 return { status, resultSet };
250 }
251
RemoteQuery(const std::string & device,const AbsRdbPredicates & predicates,const Fields & columns,int & errCode)252 std::shared_ptr<ResultSet> RdbStoreImpl::RemoteQuery(const std::string &device, const AbsRdbPredicates &predicates,
253 const Fields &columns, int &errCode)
254 {
255 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
256 if (config_.GetDBType() == DB_VECTOR) {
257 return nullptr;
258 }
259 std::vector<std::string> selectionArgs = predicates.GetWhereArgs();
260 std::string sql = SqliteSqlBuilder::BuildQueryString(predicates, columns);
261 auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
262 if (err == E_NOT_SUPPORT) {
263 errCode = err;
264 return nullptr;
265 }
266 if (err != E_OK) {
267 LOG_ERROR("RdbStoreImpl::RemoteQuery get service failed");
268 errCode = err;
269 return nullptr;
270 }
271 auto [status, resultSet] = service->RemoteQuery(syncerParam_, device, sql, selectionArgs);
272 errCode = status;
273 return resultSet;
274 }
275
NotifyDataChange()276 void RdbStoreImpl::NotifyDataChange()
277 {
278 int errCode = RegisterDataChangeCallback();
279 if (errCode != E_OK) {
280 LOG_ERROR("RegisterDataChangeCallback is failed, err is %{public}d.", errCode);
281 }
282 DistributedRdb::RdbChangedData rdbChangedData;
283 if (delayNotifier_ != nullptr) {
284 delayNotifier_->UpdateNotify(rdbChangedData, true);
285 }
286 }
287
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const DistributedRdb::DistributedConfig & distributedConfig)288 int RdbStoreImpl::SetDistributedTables(const std::vector<std::string> &tables, int32_t type,
289 const DistributedRdb::DistributedConfig &distributedConfig)
290 {
291 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
292 if (config_.GetDBType() == DB_VECTOR || isReadOnly_) {
293 return E_NOT_SUPPORT;
294 }
295 if (tables.empty()) {
296 LOG_WARN("The distributed tables to be set is empty.");
297 return E_OK;
298 }
299 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
300 if (errCode != E_OK) {
301 return errCode;
302 }
303 int32_t errorCode = service->SetDistributedTables(syncerParam_, tables, distributedConfig.references,
304 distributedConfig.isRebuild, type);
305 if (errorCode != E_OK) {
306 LOG_ERROR("Fail to set distributed tables, error=%{public}d", errorCode);
307 return errorCode;
308 }
309 if (type != DistributedRdb::DISTRIBUTED_CLOUD) {
310 return E_OK;
311 }
312 auto conn = connectionPool_->AcquireConnection(false);
313 if (conn != nullptr) {
314 auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
315 if (strategy == ExchangeStrategy::BACKUP) {
316 (void)conn->Backup({}, {}, false, slaveStatus_);
317 }
318 }
319 {
320 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
321 if (distributedConfig.autoSync) {
322 cloudInfo_->AddTables(tables);
323 } else {
324 cloudInfo_->RmvTables(tables);
325 return E_OK;
326 }
327 }
328 auto isRebuilt = RebuiltType::NONE;
329 GetRebuilt(isRebuilt);
330 if (isRebuilt == RebuiltType::REBUILT) {
331 DoCloudSync("");
332 }
333 return E_OK;
334 }
335
ObtainDistributedTableName(const std::string & device,const std::string & table,int & errCode)336 std::string RdbStoreImpl::ObtainDistributedTableName(const std::string &device, const std::string &table, int &errCode)
337 {
338 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
339 if (config_.GetDBType() == DB_VECTOR) {
340 return "";
341 }
342 std::string uuid;
343 DeviceManagerAdaptor::RdbDeviceManagerAdaptor &deviceManager =
344 DeviceManagerAdaptor::RdbDeviceManagerAdaptor::GetInstance(syncerParam_.bundleName_);
345 errCode = deviceManager.GetEncryptedUuidByNetworkId(device, uuid);
346 if (errCode != E_OK) {
347 LOG_ERROR("GetUuid is failed.");
348 return "";
349 }
350
351 auto translateCall = [uuid](const std::string &oriDevId, const DistributedDB::StoreInfo &info) {
352 return uuid;
353 };
354 DistributedDB::RuntimeConfig::SetTranslateToDeviceIdCallback(translateCall);
355
356 return DistributedDB::RelationalStoreManager::GetDistributedTableName(uuid, table);
357 }
358
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncBrief & callback)359 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncBrief &callback)
360 {
361 if (config_.GetDBType() == DB_VECTOR) {
362 return E_NOT_SUPPORT;
363 }
364 return Sync(option, predicate, [callback](Details &&details) {
365 Briefs briefs;
366 for (auto &[key, value] : details) {
367 briefs.insert_or_assign(key, value.code);
368 }
369 if (callback != nullptr) {
370 callback(briefs);
371 }
372 });
373 }
374
Sync(const SyncOption & option,const std::vector<std::string> & tables,const AsyncDetail & async)375 int RdbStoreImpl::Sync(const SyncOption &option, const std::vector<std::string> &tables, const AsyncDetail &async)
376 {
377 return Sync(option, AbsRdbPredicates(tables), async);
378 }
379
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncDetail & async)380 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncDetail &async)
381 {
382 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
383 DistributedRdb::RdbService::Option rdbOption;
384 rdbOption.mode = option.mode;
385 rdbOption.isAsync = !option.isBlock;
386 RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
387 ret = InnerSync(syncerParam_, rdbOption, predicate.GetDistributedPredicates(), async);
388 return ret;
389 }
390
InnerSync(const RdbParam & param,const Options & option,const Memo & predicates,const AsyncDetail & async)391 int RdbStoreImpl::InnerSync(const RdbParam ¶m, const Options &option, const Memo &predicates,
392 const AsyncDetail &async)
393 {
394 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
395 if (errCode == E_NOT_SUPPORT) {
396 return errCode;
397 }
398 if (errCode != E_OK) {
399 LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
400 param.bundleName_.c_str());
401 return errCode;
402 }
403 errCode = service->Sync(param, option, predicates, async);
404 if (errCode != E_OK) {
405 LOG_ERROR("Sync is failed, err is %{public}d.", errCode);
406 return errCode;
407 }
408 return E_OK;
409 }
410
GetUri(const std::string & event)411 Uri RdbStoreImpl::GetUri(const std::string &event)
412 {
413 std::string rdbUri;
414 if (config_.GetDataGroupId().empty()) {
415 rdbUri = SCHEME_RDB + config_.GetBundleName() + "/" + path_ + "/" + event;
416 } else {
417 rdbUri = SCHEME_RDB + config_.GetDataGroupId() + "/" + path_ + "/" + event;
418 }
419 return Uri(rdbUri);
420 }
421
SubscribeLocal(const SubscribeOption & option,RdbStoreObserver * observer)422 int RdbStoreImpl::SubscribeLocal(const SubscribeOption& option, RdbStoreObserver *observer)
423 {
424 std::lock_guard<std::mutex> lock(mutex_);
425 localObservers_.try_emplace(option.event);
426 auto &list = localObservers_.find(option.event)->second;
427 for (auto it = list.begin(); it != list.end(); it++) {
428 if ((*it)->getObserver() == observer) {
429 LOG_ERROR("duplicate subscribe.");
430 return E_OK;
431 }
432 }
433
434 localObservers_[option.event].push_back(std::make_shared<RdbStoreLocalObserver>(observer));
435 return E_OK;
436 }
437
SubscribeLocalShared(const SubscribeOption & option,RdbStoreObserver * observer)438 int RdbStoreImpl::SubscribeLocalShared(const SubscribeOption& option, RdbStoreObserver *observer)
439 {
440 std::lock_guard<std::mutex> lock(mutex_);
441 localSharedObservers_.try_emplace(option.event);
442 auto &list = localSharedObservers_.find(option.event)->second;
443 for (auto it = list.begin(); it != list.end(); it++) {
444 if ((*it)->getObserver() == observer) {
445 LOG_ERROR("duplicate subscribe.");
446 return E_OK;
447 }
448 }
449
450 auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
451 if (client == nullptr) {
452 LOG_ERROR("Failed to get DataObsMgrClient.");
453 return E_GET_DATAOBSMGRCLIENT_FAIL;
454 }
455 sptr<RdbStoreLocalSharedObserver> localSharedObserver(new (std::nothrow) RdbStoreLocalSharedObserver(observer));
456 int32_t err = client->RegisterObserver(GetUri(option.event), localSharedObserver);
457 if (err != 0) {
458 LOG_ERROR("Subscribe failed.");
459 return err;
460 }
461 localSharedObservers_[option.event].push_back(std::move(localSharedObserver));
462 return E_OK;
463 }
464
SubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)465 int32_t RdbStoreImpl::SubscribeLocalDetail(const SubscribeOption &option,
466 const std::shared_ptr<RdbStoreObserver> &observer)
467 {
468 auto connection = connectionPool_->AcquireConnection(false);
469 if (connection == nullptr) {
470 return E_DATABASE_BUSY;
471 }
472 int32_t errCode = connection->Subscribe(option.event, observer);
473 if (errCode != E_OK) {
474 LOG_ERROR("subscribe local detail observer failed. db name:%{public}s errCode:%{public}" PRId32,
475 SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
476 }
477 return errCode;
478 }
479
SubscribeRemote(const SubscribeOption & option,RdbStoreObserver * observer)480 int RdbStoreImpl::SubscribeRemote(const SubscribeOption& option, RdbStoreObserver *observer)
481 {
482 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
483 if (errCode != E_OK) {
484 return errCode;
485 }
486 return service->Subscribe(syncerParam_, option, observer);
487 }
488
Subscribe(const SubscribeOption & option,RdbStoreObserver * observer)489 int RdbStoreImpl::Subscribe(const SubscribeOption &option, RdbStoreObserver *observer)
490 {
491 if (config_.GetDBType() == DB_VECTOR) {
492 return E_NOT_SUPPORT;
493 }
494 if (option.mode == SubscribeMode::LOCAL) {
495 return SubscribeLocal(option, observer);
496 }
497 if (option.mode == SubscribeMode::LOCAL_SHARED) {
498 return SubscribeLocalShared(option, observer);
499 }
500 return SubscribeRemote(option, observer);
501 }
502
UnSubscribeLocal(const SubscribeOption & option,RdbStoreObserver * observer)503 int RdbStoreImpl::UnSubscribeLocal(const SubscribeOption& option, RdbStoreObserver *observer)
504 {
505 std::lock_guard<std::mutex> lock(mutex_);
506 auto obs = localObservers_.find(option.event);
507 if (obs == localObservers_.end()) {
508 return E_OK;
509 }
510
511 auto &list = obs->second;
512 for (auto it = list.begin(); it != list.end(); it++) {
513 if ((*it)->getObserver() == observer) {
514 it = list.erase(it);
515 break;
516 }
517 }
518
519 if (list.empty()) {
520 localObservers_.erase(option.event);
521 }
522 return E_OK;
523 }
524
UnSubscribeLocalAll(const SubscribeOption & option)525 int RdbStoreImpl::UnSubscribeLocalAll(const SubscribeOption& option)
526 {
527 std::lock_guard<std::mutex> lock(mutex_);
528 auto obs = localObservers_.find(option.event);
529 if (obs == localObservers_.end()) {
530 return E_OK;
531 }
532
533 localObservers_.erase(option.event);
534 return E_OK;
535 }
536
UnSubscribeLocalShared(const SubscribeOption & option,RdbStoreObserver * observer)537 int RdbStoreImpl::UnSubscribeLocalShared(const SubscribeOption& option, RdbStoreObserver *observer)
538 {
539 std::lock_guard<std::mutex> lock(mutex_);
540 auto obs = localSharedObservers_.find(option.event);
541 if (obs == localSharedObservers_.end()) {
542 return E_OK;
543 }
544
545 auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
546 if (client == nullptr) {
547 LOG_ERROR("Failed to get DataObsMgrClient.");
548 return E_GET_DATAOBSMGRCLIENT_FAIL;
549 }
550
551 auto &list = obs->second;
552 for (auto it = list.begin(); it != list.end(); it++) {
553 if ((*it)->getObserver() == observer) {
554 int32_t err = client->UnregisterObserver(GetUri(option.event), *it);
555 if (err != 0) {
556 LOG_ERROR("UnSubscribeLocalShared failed.");
557 return err;
558 }
559 list.erase(it);
560 break;
561 }
562 }
563 if (list.empty()) {
564 localSharedObservers_.erase(option.event);
565 }
566 return E_OK;
567 }
568
UnSubscribeLocalSharedAll(const SubscribeOption & option)569 int RdbStoreImpl::UnSubscribeLocalSharedAll(const SubscribeOption& option)
570 {
571 std::lock_guard<std::mutex> lock(mutex_);
572 auto obs = localSharedObservers_.find(option.event);
573 if (obs == localSharedObservers_.end()) {
574 return E_OK;
575 }
576
577 auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
578 if (client == nullptr) {
579 LOG_ERROR("Failed to get DataObsMgrClient.");
580 return E_GET_DATAOBSMGRCLIENT_FAIL;
581 }
582
583 auto &list = obs->second;
584 auto it = list.begin();
585 while (it != list.end()) {
586 int32_t err = client->UnregisterObserver(GetUri(option.event), *it);
587 if (err != 0) {
588 LOG_ERROR("UnSubscribe failed.");
589 return err;
590 }
591 it = list.erase(it);
592 }
593
594 localSharedObservers_.erase(option.event);
595 return E_OK;
596 }
597
UnsubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)598 int32_t RdbStoreImpl::UnsubscribeLocalDetail(const SubscribeOption& option,
599 const std::shared_ptr<RdbStoreObserver> &observer)
600 {
601 auto connection = connectionPool_->AcquireConnection(false);
602 if (connection == nullptr) {
603 return E_DATABASE_BUSY;
604 }
605 int32_t errCode = connection->Unsubscribe(option.event, observer);
606 if (errCode != E_OK) {
607 LOG_ERROR("unsubscribe local detail observer failed. db name:%{public}s errCode:%{public}" PRId32,
608 SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
609 }
610 return errCode;
611 }
612
UnSubscribeRemote(const SubscribeOption & option,RdbStoreObserver * observer)613 int RdbStoreImpl::UnSubscribeRemote(const SubscribeOption& option, RdbStoreObserver *observer)
614 {
615 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
616 if (errCode != E_OK) {
617 return errCode;
618 }
619 return service->UnSubscribe(syncerParam_, option, observer);
620 }
621
UnSubscribe(const SubscribeOption & option,RdbStoreObserver * observer)622 int RdbStoreImpl::UnSubscribe(const SubscribeOption &option, RdbStoreObserver *observer)
623 {
624 if (config_.GetDBType() == DB_VECTOR) {
625 return E_NOT_SUPPORT;
626 }
627 if (option.mode == SubscribeMode::LOCAL && observer) {
628 return UnSubscribeLocal(option, observer);
629 } else if (option.mode == SubscribeMode::LOCAL && !observer) {
630 return UnSubscribeLocalAll(option);
631 } else if (option.mode == SubscribeMode::LOCAL_SHARED && observer) {
632 return UnSubscribeLocalShared(option, observer);
633 } else if (option.mode == SubscribeMode::LOCAL_SHARED && !observer) {
634 return UnSubscribeLocalSharedAll(option);
635 }
636 return UnSubscribeRemote(option, observer);
637 }
638
SubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)639 int RdbStoreImpl::SubscribeObserver(const SubscribeOption& option, const std::shared_ptr<RdbStoreObserver> &observer)
640 {
641 if (config_.GetDBType() == DB_VECTOR) {
642 return E_NOT_SUPPORT;
643 }
644 return SubscribeLocalDetail(option, observer);
645 }
646
UnsubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)647 int RdbStoreImpl::UnsubscribeObserver(const SubscribeOption& option, const std::shared_ptr<RdbStoreObserver> &observer)
648 {
649 if (config_.GetDBType() == DB_VECTOR) {
650 return E_NOT_SUPPORT;
651 }
652 return UnsubscribeLocalDetail(option, observer);
653 }
654
Notify(const std::string & event)655 int RdbStoreImpl::Notify(const std::string &event)
656 {
657 if (config_.GetDBType() == DB_VECTOR) {
658 return E_NOT_SUPPORT;
659 }
660 auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
661 if (client == nullptr) {
662 LOG_ERROR("Failed to get DataObsMgrClient.");
663 return E_GET_DATAOBSMGRCLIENT_FAIL;
664 }
665 int32_t err = client->NotifyChange(GetUri(event));
666 if (err != 0) {
667 LOG_ERROR("Notify failed.");
668 }
669
670 std::lock_guard<std::mutex> lock(mutex_);
671 auto obs = localObservers_.find(event);
672 if (obs != localObservers_.end()) {
673 auto &list = obs->second;
674 for (auto &it : list) {
675 it->OnChange();
676 }
677 }
678 return E_OK;
679 }
680
SetSearchable(bool isSearchable)681 int RdbStoreImpl::SetSearchable(bool isSearchable)
682 {
683 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
684 if (errCode != E_OK || service == nullptr) {
685 LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
686 return errCode;
687 }
688 return service->SetSearchable(syncerParam_, isSearchable);
689 }
690
RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)691 int RdbStoreImpl::RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
692 {
693 if (config_.GetDBType() == DB_VECTOR) {
694 return E_NOT_SUPPORT;
695 }
696 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
697 if (errCode != E_OK) {
698 return errCode;
699 }
700 return service->RegisterAutoSyncCallback(syncerParam_, observer);
701 }
702
UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)703 int RdbStoreImpl::UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
704 {
705 if (config_.GetDBType() == DB_VECTOR) {
706 return E_NOT_SUPPORT;
707 }
708 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
709 if (errCode != E_OK) {
710 return errCode;
711 }
712 return service->UnregisterAutoSyncCallback(syncerParam_, observer);
713 }
714
InitDelayNotifier()715 void RdbStoreImpl::InitDelayNotifier()
716 {
717 if (delayNotifier_ != nullptr) {
718 return;
719 }
720 delayNotifier_ = std::make_shared<DelayNotify>();
721 if (delayNotifier_ == nullptr) {
722 LOG_ERROR("Init delay notifier failed.");
723 return;
724 }
725 delayNotifier_->SetExecutorPool(TaskExecutor::GetInstance().GetExecutor());
726 delayNotifier_->SetTask([param = syncerParam_]
727 (const DistributedRdb::RdbChangedData& rdbChangedData, const RdbNotifyConfig& rdbNotifyConfig) -> int {
728 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
729 if (errCode == E_NOT_SUPPORT) {
730 return errCode;
731 }
732 if (errCode != E_OK || service == nullptr) {
733 LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
734 return errCode;
735 }
736 return service->NotifyDataChange(param, rdbChangedData, rdbNotifyConfig);
737 });
738 }
739
RegisterDataChangeCallback()740 int RdbStoreImpl::RegisterDataChangeCallback()
741 {
742 if (!config_.IsSearchable()) {
743 return E_OK;
744 }
745
746 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
747 return E_NOT_SUPPORT;
748 }
749 InitDelayNotifier();
750 auto callBack = [delayNotifier = delayNotifier_](const std::set<std::string> &tables) {
751 DistributedRdb::RdbChangedData rdbChangedData;
752 for (const auto& table : tables) {
753 rdbChangedData.tableData[table].isTrackedDataChange = true;
754 }
755 if (delayNotifier != nullptr) {
756 delayNotifier->UpdateNotify(rdbChangedData);
757 }
758 };
759 auto connection = connectionPool_->AcquireConnection(false);
760 if (connection == nullptr) {
761 return E_DATABASE_BUSY;
762 }
763 return connection->SubscribeTableChanges(callBack);
764 }
765
GetHashKeyForLockRow(const AbsRdbPredicates & predicates,std::vector<std::vector<uint8_t>> & hashKeys)766 int RdbStoreImpl::GetHashKeyForLockRow(const AbsRdbPredicates &predicates, std::vector<std::vector<uint8_t>> &hashKeys)
767 {
768 std::string table = predicates.GetTableName();
769 if (table.empty()) {
770 return E_EMPTY_TABLE_NAME;
771 }
772 auto logTable = GetLogTableName(table);
773 std::string sql;
774 sql.append("SELECT ").append(logTable).append(".hash_key ").append("FROM ").append(logTable);
775 sql.append(" INNER JOIN ").append(table).append(" ON ");
776 sql.append(table).append(".ROWID = ").append(logTable).append(".data_key");
777 auto whereClause = predicates.GetWhereClause();
778 if (!whereClause.empty()) {
779 SqliteUtils::Replace(whereClause, SqliteUtils::REP, logTable + ".");
780 sql.append(" WHERE ").append(whereClause);
781 }
782
783 auto result = QuerySql(sql, predicates.GetBindArgs());
784 if (result == nullptr) {
785 return E_ERROR;
786 }
787 int count = 0;
788 if (result->GetRowCount(count) != E_OK) {
789 return E_NO_ROW_IN_QUERY;
790 }
791
792 if (count <= 0) {
793 return E_NO_ROW_IN_QUERY;
794 }
795 while (result->GoToNextRow() == E_OK) {
796 std::vector<uint8_t> hashKey;
797 if (result->GetBlob(0, hashKey) != E_OK) {
798 return E_ERROR;
799 }
800 hashKeys.push_back(std::move(hashKey));
801 }
802 return E_OK;
803 }
804
ModifyLockStatus(const AbsRdbPredicates & predicates,bool isLock)805 int RdbStoreImpl::ModifyLockStatus(const AbsRdbPredicates &predicates, bool isLock)
806 {
807 std::vector<std::vector<uint8_t>> hashKeys;
808 int ret = GetHashKeyForLockRow(predicates, hashKeys);
809 if (ret != E_OK) {
810 LOG_ERROR("GetHashKeyForLockRow failed, err is %{public}d.", ret);
811 return ret;
812 }
813 auto [err, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION);
814 if (statement == nullptr || err != E_OK) {
815 return err;
816 }
817 int errCode = statement->ModifyLockStatus(predicates.GetTableName(), hashKeys, isLock);
818 if (errCode == E_WAIT_COMPENSATED_SYNC) {
819 LOG_DEBUG("Start compensation sync.");
820 DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true, true };
821 auto memo = AbsRdbPredicates(predicates.GetTableName()).GetDistributedPredicates();
822 InnerSync(syncerParam_, option, memo, nullptr);
823 return E_OK;
824 }
825 if (errCode != E_OK) {
826 LOG_ERROR("ModifyLockStatus failed, err is %{public}d.", errCode);
827 }
828 return errCode;
829 }
830
LockCloudContainer()831 std::pair<int32_t, uint32_t> RdbStoreImpl::LockCloudContainer()
832 {
833 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
834 RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
835 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
836 if (errCode == E_NOT_SUPPORT) {
837 LOG_ERROR("not support");
838 return { errCode, 0 };
839 }
840 if (errCode != E_OK) {
841 LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
842 syncerParam_.bundleName_.c_str());
843 return { errCode, 0 };
844 }
845 auto result = service->LockCloudContainer(syncerParam_);
846 if (result.first != E_OK) {
847 LOG_ERROR("LockCloudContainer failed, err is %{public}d.", result.first);
848 }
849 return result;
850 }
851
UnlockCloudContainer()852 int32_t RdbStoreImpl::UnlockCloudContainer()
853 {
854 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
855 RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
856 auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
857 if (errCode == E_NOT_SUPPORT) {
858 LOG_ERROR("not support");
859 return errCode;
860 }
861 if (errCode != E_OK) {
862 LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
863 syncerParam_.bundleName_.c_str());
864 return errCode;
865 }
866 errCode = service->UnlockCloudContainer(syncerParam_);
867 if (errCode != E_OK) {
868 LOG_ERROR("UnlockCloudContainer failed, err is %{public}d.", errCode);
869 }
870 return errCode;
871 }
872 #endif
873
RdbStoreImpl(const RdbStoreConfig & config)874 RdbStoreImpl::RdbStoreImpl(const RdbStoreConfig &config)
875 : isMemoryRdb_(config.IsMemoryRdb()), config_(config), name_(config.GetName()),
876 fileType_(config.GetDatabaseFileType())
877 {
878 path_ = (config.GetRoleType() == VISITOR) ? config.GetVisitorDir() : config.GetPath();
879 isReadOnly_ = config.IsReadOnly() || config.GetRoleType() == VISITOR;
880 }
881
/**
 * Creates a store object and opens its connection pool.
 *
 * If the pool cannot be created because the database is corrupt, and the configuration allows a
 * rebuild on a writable store, the data-corruption handler is run to obtain a pool instead.
 *
 * @param config  Store configuration; a copy is kept in config_.
 * @param errCode Output error code written by ConnectionPool::Create / HandleDataCorruption.
 *                Whenever it is not E_OK after pool creation, connectionPool_ is left as nullptr
 *                and the store is not opened.
 */
RdbStoreImpl::RdbStoreImpl(const RdbStoreConfig &config, int &errCode)
    : isMemoryRdb_(config.IsMemoryRdb()), config_(config), name_(config.GetName()),
      fileType_(config.GetDatabaseFileType())
{
    isReadOnly_ = config.IsReadOnly() || config.GetRoleType() == VISITOR;
    path_ = (config.GetRoleType() == VISITOR) ? config.GetVisitorDir() : config.GetPath();
    // Remember whether path_ was inaccessible before the pool (possibly) creates it.
    bool created = access(path_.c_str(), F_OK) != 0;
    connectionPool_ = ConnectionPool::Create(config_, errCode);
    if (connectionPool_ == nullptr && errCode == E_SQLITE_CORRUPT && config.GetAllowRebuild() && !isReadOnly_) {
        LOG_ERROR("database corrupt, rebuild database %{public}s", SqliteUtils::Anonymous(name_).c_str());
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
        // Disable the service for this store while the corruption handler runs; re-enabled below.
        RdbParam param;
        param.bundleName_ = config_.GetBundleName();
        param.storeName_ = config_.GetName();
        auto [err, service] = RdbMgr::GetInstance().GetRdbService(param);
        if (service != nullptr) {
            service->Disable(param);
        }
#endif
        config_.SetIter(0);
        std::tie(rebuild_, connectionPool_) = ConnectionPool::HandleDataCorruption(config_, errCode);
        // After the corruption handling the store is initialised as a newly created one.
        created = true;
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
        if (service != nullptr) {
            service->Enable(param);
        }
#endif
    }
    if (connectionPool_ == nullptr || errCode != E_OK) {
        // Never keep a pool around together with a failure code.
        connectionPool_ = nullptr;
        LOG_ERROR("Create connPool failed, err is %{public}d, path:%{public}s", errCode,
            SqliteUtils::Anonymous(path_).c_str());
        return;
    }
    InitSyncerParam(config_, created);
    InnerOpen();
}
919
~RdbStoreImpl()920 RdbStoreImpl::~RdbStoreImpl()
921 {
922 connectionPool_ = nullptr;
923 trxConnMap_ = {};
924 for (auto &trans : transactions_) {
925 auto realTrans = trans.lock();
926 if (realTrans) {
927 (void)realTrans->Close();
928 }
929 }
930 transactions_ = {};
931 }
932
/**
 * Returns a reference to the configuration copy held by this store (config_).
 */
const RdbStoreConfig &RdbStoreImpl::GetConfig()
{
    return config_;
}
937
Insert(const std::string & table,const Row & row,Resolution resolution)938 std::pair<int, int64_t> RdbStoreImpl::Insert(const std::string &table, const Row &row, Resolution resolution)
939 {
940 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
941 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
942 return { E_NOT_SUPPORT, -1 };
943 }
944 if (table.empty()) {
945 return { E_EMPTY_TABLE_NAME, -1 };
946 }
947
948 if (row.IsEmpty()) {
949 return { E_EMPTY_VALUES_BUCKET, -1 };
950 }
951
952 auto conflictClause = SqliteUtils::GetConflictClause(static_cast<int>(resolution));
953 if (conflictClause == nullptr) {
954 return { E_INVALID_CONFLICT_FLAG, -1 };
955 }
956 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
957 std::string sql;
958 sql.append("INSERT").append(conflictClause).append(" INTO ").append(table).append("(");
959 size_t bindArgsSize = row.values_.size();
960 std::vector<ValueObject> bindArgs;
961 bindArgs.reserve(bindArgsSize);
962 const char *split = "";
963 for (const auto &[key, val] : row.values_) {
964 sql.append(split).append(key);
965 if (val.GetType() == ValueObject::TYPE_ASSETS && resolution == ConflictResolution::ON_CONFLICT_REPLACE) {
966 return { E_INVALID_ARGS, -1 };
967 }
968 SqliteSqlBuilder::UpdateAssetStatus(val, AssetValue::STATUS_INSERT);
969 bindArgs.push_back(val); // columnValue
970 split = ",";
971 }
972
973 sql.append(") VALUES (");
974 if (bindArgsSize > 0) {
975 sql.append(SqliteSqlBuilder::GetSqlArgs(bindArgsSize));
976 }
977
978 sql.append(")");
979 int64_t rowid = -1;
980 auto errCode = ExecuteForLastInsertedRowId(rowid, sql, bindArgs);
981 if (errCode == E_OK) {
982 DoCloudSync(table);
983 }
984
985 return { errCode, rowid };
986 }
987
BatchInsert(const std::string & table,const ValuesBuckets & rows)988 std::pair<int, int64_t> RdbStoreImpl::BatchInsert(const std::string &table, const ValuesBuckets &rows)
989 {
990 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
991 return { E_NOT_SUPPORT, -1 };
992 }
993
994 if (rows.RowSize() == 0) {
995 return { E_OK, 0 };
996 }
997
998 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
999 auto connection = connectionPool_->AcquireConnection(false);
1000 if (connection == nullptr) {
1001 return { E_DATABASE_BUSY, -1 };
1002 }
1003
1004 auto executeSqlArgs = SqliteSqlBuilder::GenerateSqls(table, rows, connection->GetMaxVariable());
1005 if (executeSqlArgs.empty()) {
1006 LOG_ERROR("empty, table=%{public}s, values:%{public}zu, max number:%{public}d.", table.c_str(), rows.RowSize(),
1007 connection->GetMaxVariable());
1008 return { E_INVALID_ARGS, -1 };
1009 }
1010 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1011 PauseDelayNotify pauseDelayNotify(delayNotifier_);
1012 #endif
1013 for (const auto &[sql, bindArgs] : executeSqlArgs) {
1014 auto [errCode, statement] = GetStatement(sql, connection);
1015 if (statement == nullptr) {
1016 LOG_ERROR("statement is nullptr, errCode:0x%{public}x, args:%{public}zu, table:%{public}s, sql:%{public}s",
1017 errCode, bindArgs.size(), table.c_str(), sql.c_str());
1018 return { E_OK, -1 };
1019 }
1020 for (const auto &args : bindArgs) {
1021 auto errCode = statement->Execute(args);
1022 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1023 connectionPool_->Dump(true, "BATCH");
1024 return { errCode, -1 };
1025 }
1026 if (errCode != E_OK) {
1027 LOG_ERROR("failed, errCode:%{public}d,args:%{public}zu,table:%{public}s,sql:%{public}s", errCode,
1028 bindArgs.size(), table.c_str(), sql.c_str());
1029 return { E_OK, -1 };
1030 }
1031 }
1032 }
1033 connection = nullptr;
1034 DoCloudSync(table);
1035 return { E_OK, int64_t(rows.RowSize()) };
1036 }
1037
Update(const std::string & table,const Row & row,const std::string & where,const Values & args,Resolution resolution)1038 std::pair<int, int> RdbStoreImpl::Update(const std::string &table, const Row &row, const std::string &where,
1039 const Values &args, Resolution resolution)
1040 {
1041 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1042 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1043 return { E_NOT_SUPPORT, -1 };
1044 }
1045 if (table.empty()) {
1046 return { E_EMPTY_TABLE_NAME, -1 };
1047 }
1048
1049 if (row.IsEmpty()) {
1050 return { E_EMPTY_VALUES_BUCKET, -1 };
1051 }
1052
1053 auto clause = SqliteUtils::GetConflictClause(static_cast<int>(resolution));
1054 if (clause == nullptr) {
1055 return { E_INVALID_CONFLICT_FLAG, -1 };
1056 }
1057 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1058 std::string sql;
1059 sql.append("UPDATE").append(clause).append(" ").append(table).append(" SET ");
1060 std::vector<ValueObject> tmpBindArgs;
1061 size_t tmpBindSize = row.values_.size() + args.size();
1062 tmpBindArgs.reserve(tmpBindSize);
1063 const char *split = "";
1064 for (auto &[key, val] : row.values_) {
1065 sql.append(split);
1066 if (val.GetType() == ValueObject::TYPE_ASSETS) {
1067 sql.append(key).append("=merge_assets(").append(key).append(", ?)"); // columnName
1068 } else if (val.GetType() == ValueObject::TYPE_ASSET) {
1069 sql.append(key).append("=merge_asset(").append(key).append(", ?)"); // columnName
1070 } else {
1071 sql.append(key).append("=?"); // columnName
1072 }
1073 tmpBindArgs.push_back(val); // columnValue
1074 split = ",";
1075 }
1076
1077 if (!where.empty()) {
1078 sql.append(" WHERE ").append(where);
1079 }
1080
1081 tmpBindArgs.insert(tmpBindArgs.end(), args.begin(), args.end());
1082
1083 int64_t changes = 0;
1084 auto errCode = ExecuteForChangedRowCount(changes, sql, tmpBindArgs);
1085 if (errCode == E_OK) {
1086 DoCloudSync(table);
1087 }
1088 return { errCode, int32_t(changes) };
1089 }
1090
Delete(int & deletedRows,const std::string & table,const std::string & whereClause,const Values & args)1091 int RdbStoreImpl::Delete(int &deletedRows, const std::string &table, const std::string &whereClause, const Values &args)
1092 {
1093 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1094 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1095 return E_NOT_SUPPORT;
1096 }
1097 if (table.empty()) {
1098 return E_EMPTY_TABLE_NAME;
1099 }
1100
1101 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1102 std::string sql;
1103 sql.append("DELETE FROM ").append(table);
1104 if (!whereClause.empty()) {
1105 sql.append(" WHERE ").append(whereClause);
1106 }
1107 int64_t changes = 0;
1108 auto errCode = ExecuteForChangedRowCount(changes, sql, args);
1109 if (errCode != E_OK) {
1110 return errCode;
1111 }
1112 deletedRows = changes;
1113 DoCloudSync(table);
1114 return E_OK;
1115 }
1116
/**
 * Runs a query and exposes its rows through a shared result set.
 *
 * @param sql      Query text, forwarded unchanged to the result set.
 * @param bindArgs Bind values forwarded together with sql.
 * @return A SqliteSharedResultSet on platforms where it is compiled in; nullptr for vector
 *         databases and on Windows/Mac/Android/iOS builds.
 */
std::shared_ptr<AbsSharedResultSet> RdbStoreImpl::QuerySql(const std::string &sql, const Values &bindArgs)
{
    DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
    if (config_.GetDBType() == DB_VECTOR) {
        return nullptr;
    }
    SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
#if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
    // The start time is handed to the result set together with a reference to the pool.
    auto start = std::chrono::steady_clock::now();
    return std::make_shared<SqliteSharedResultSet>(start, connectionPool_->AcquireRef(true), sql, bindArgs, path_);
#else
    // Shared result sets are not built on these platforms.
    (void)sql;
    (void)bindArgs;
    return nullptr;
#endif
}
1133
/**
 * Runs a query and exposes its rows through a StepResultSet.
 *
 * @param sql  Query text, forwarded unchanged to the result set.
 * @param args Bind values forwarded together with sql.
 * @return The newly created StepResultSet.
 */
std::shared_ptr<ResultSet> RdbStoreImpl::QueryByStep(const std::string &sql, const Values &args)
{
    SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
    // The start time is handed to the result set together with a reference to the pool.
    auto start = std::chrono::steady_clock::now();
    return std::make_shared<StepResultSet>(start, connectionPool_->AcquireRef(true), sql, args);
}
1140
Count(int64_t & outValue,const AbsRdbPredicates & predicates)1141 int RdbStoreImpl::Count(int64_t &outValue, const AbsRdbPredicates &predicates)
1142 {
1143 if (config_.GetDBType() == DB_VECTOR) {
1144 return E_NOT_SUPPORT;
1145 }
1146 std::string sql = SqliteSqlBuilder::BuildCountString(predicates);
1147 return ExecuteAndGetLong(outValue, sql, predicates.GetBindArgs());
1148 }
1149
/**
 * Executes a single SQL statement that produces no result for the caller.
 *
 * After a DDL statement the schema version is re-read; if it increased, vSchema_ is updated and
 * the pool's readers are restarted. After a successful INSERT or UPDATE statement a cloud sync is
 * requested.
 *
 * @param sql  Statement text.
 * @param args Bind values for the statement.
 * @return E_NOT_SUPPORT for vector databases and read-only stores; the CheckAttach,
 *         statement-creation or execution error on failure; after a DDL statement, the result of
 *         RestartReaders when readers were restarted; otherwise E_OK.
 */
int RdbStoreImpl::ExecuteSql(const std::string &sql, const Values &args)
{
    DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
    if (config_.GetDBType() == DB_VECTOR || isReadOnly_) {
        return E_NOT_SUPPORT;
    }
    int ret = CheckAttach(sql);
    if (ret != E_OK) {
        return ret;
    }
    SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
    auto [errCode, statement] = BeginExecuteSql(sql);
    if (statement == nullptr) {
        return errCode;
    }
    errCode = statement->Execute(args);
    if (errCode != E_OK) {
        LOG_ERROR("failed,error:0x%{public}x sql:%{public}s.", errCode, sql.c_str());
        if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
            // Dump the pool state to help diagnose who holds the database.
            connectionPool_->Dump(true, "EXECUTE");
        }
        return errCode;
    }
    int sqlType = SqliteUtils::GetSqlStatementType(sql);
    if (sqlType == SqliteUtils::STATEMENT_DDL) {
        // Reuse the same statement object to read the schema version after the DDL.
        statement->Reset();
        statement->Prepare("PRAGMA schema_version");
        auto [err, version] = statement->ExecuteForValue();
        // Drop the statement before the readers are restarted.
        statement = nullptr;
        if (vSchema_ < static_cast<int64_t>(version)) {
            LOG_INFO("db:%{public}s exe DDL schema<%{public}" PRIi64 "->%{public}" PRIi64 "> sql:%{public}s.",
                SqliteUtils::Anonymous(name_).c_str(), vSchema_, static_cast<int64_t>(version), sql.c_str());
            vSchema_ = version;
            errCode = connectionPool_->RestartReaders();
        }
    }
    // Release the statement before requesting the cloud sync.
    statement = nullptr;
    if (errCode == E_OK && (sqlType == SqliteUtils::STATEMENT_UPDATE || sqlType == SqliteUtils::STATEMENT_INSERT)) {
        DoCloudSync("");
    }
    return errCode;
}
1192
Execute(const std::string & sql,const Values & args,int64_t trxId)1193 std::pair<int32_t, ValueObject> RdbStoreImpl::Execute(const std::string &sql, const Values &args, int64_t trxId)
1194 {
1195 ValueObject object;
1196 if (isReadOnly_) {
1197 return { E_NOT_SUPPORT, object };
1198 }
1199
1200 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1201 int sqlType = SqliteUtils::GetSqlStatementType(sql);
1202 if (!SqliteUtils::IsSupportSqlForExecute(sqlType)) {
1203 LOG_ERROR("Not support the sqlType: %{public}d, sql: %{public}s", sqlType, sql.c_str());
1204 return { E_NOT_SUPPORT_THE_SQL, object };
1205 }
1206
1207 if (config_.IsVector() && trxId > 0) {
1208 return { ExecuteByTrxId(sql, trxId, false, args), ValueObject() };
1209 }
1210
1211 auto connect = connectionPool_->AcquireConnection(false);
1212 if (connect == nullptr) {
1213 return { E_DATABASE_BUSY, object };
1214 }
1215
1216 auto [errCode, statement] = GetStatement(sql, connect);
1217 if (errCode != E_OK) {
1218 return { errCode, object };
1219 }
1220
1221 errCode = statement->Execute(args);
1222 if (errCode != E_OK) {
1223 LOG_ERROR("failed,error:0x%{public}x sql:%{public}s.", errCode, sql.c_str());
1224 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1225 connectionPool_->Dump(true, "EXECUTE");
1226 }
1227 return { errCode, object };
1228 }
1229
1230 if (config_.IsVector()) {
1231 return { errCode, object };
1232 }
1233
1234 return HandleDifferentSqlTypes(statement, sql, object, sqlType);
1235 }
1236
HandleDifferentSqlTypes(std::shared_ptr<Statement> statement,const std::string & sql,const ValueObject & object,int sqlType)1237 std::pair<int32_t, ValueObject> RdbStoreImpl::HandleDifferentSqlTypes(std::shared_ptr<Statement> statement,
1238 const std::string &sql, const ValueObject &object, int sqlType)
1239 {
1240 int32_t errCode = E_OK;
1241 if (sqlType == SqliteUtils::STATEMENT_INSERT) {
1242 int64_t outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1243 return { errCode, ValueObject(outValue) };
1244 }
1245
1246 if (sqlType == SqliteUtils::STATEMENT_UPDATE) {
1247 int outValue = statement->Changes();
1248 return { errCode, ValueObject(outValue) };
1249 }
1250
1251 if (sqlType == SqliteUtils::STATEMENT_PRAGMA) {
1252 if (statement->GetColumnCount() == 1) {
1253 return statement->GetColumn(0);
1254 }
1255
1256 if (statement->GetColumnCount() > 1) {
1257 LOG_ERROR("Not support the sql:%{public}s, column count more than 1", sql.c_str());
1258 return { E_NOT_SUPPORT_THE_SQL, object };
1259 }
1260 }
1261
1262 if (sqlType == SqliteUtils::STATEMENT_DDL) {
1263 statement->Reset();
1264 statement->Prepare("PRAGMA schema_version");
1265 auto [err, version] = statement->ExecuteForValue();
1266 if (vSchema_ < static_cast<int64_t>(version)) {
1267 LOG_INFO("db:%{public}s exe DDL schema<%{public}" PRIi64 "->%{public}" PRIi64 "> sql:%{public}s.",
1268 SqliteUtils::Anonymous(name_).c_str(), vSchema_, static_cast<int64_t>(version), sql.c_str());
1269 vSchema_ = version;
1270 errCode = connectionPool_->RestartReaders();
1271 }
1272 }
1273 return { errCode, object };
1274 }
1275
ExecuteAndGetLong(int64_t & outValue,const std::string & sql,const Values & args)1276 int RdbStoreImpl::ExecuteAndGetLong(int64_t &outValue, const std::string &sql, const Values &args)
1277 {
1278 if (config_.GetDBType() == DB_VECTOR) {
1279 return E_NOT_SUPPORT;
1280 }
1281 auto [errCode, statement] = BeginExecuteSql(sql);
1282 if (statement == nullptr) {
1283 return errCode;
1284 }
1285 auto [err, object] = statement->ExecuteForValue(args);
1286 if (err != E_OK) {
1287 LOG_ERROR("failed, sql %{public}s, ERROR is %{public}d.", sql.c_str(), err);
1288 }
1289 outValue = object;
1290 return err;
1291 }
1292
ExecuteAndGetString(std::string & outValue,const std::string & sql,const Values & args)1293 int RdbStoreImpl::ExecuteAndGetString(std::string &outValue, const std::string &sql, const Values &args)
1294 {
1295 if (config_.GetDBType() == DB_VECTOR) {
1296 return E_NOT_SUPPORT;
1297 }
1298 auto [errCode, statement] = BeginExecuteSql(sql);
1299 if (statement == nullptr) {
1300 return errCode;
1301 }
1302 ValueObject object;
1303 std::tie(errCode, object) = statement->ExecuteForValue(args);
1304 if (errCode != E_OK) {
1305 LOG_ERROR("failed, sql %{public}s, ERROR is %{public}d.", sql.c_str(), errCode);
1306 }
1307 outValue = static_cast<std::string>(object);
1308 return errCode;
1309 }
1310
ExecuteForLastInsertedRowId(int64_t & outValue,const std::string & sql,const Values & args)1311 int RdbStoreImpl::ExecuteForLastInsertedRowId(int64_t &outValue, const std::string &sql, const Values &args)
1312 {
1313 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1314 return E_NOT_SUPPORT;
1315 }
1316 auto begin = std::chrono::steady_clock::now();
1317 auto [errCode, statement] = GetStatement(sql, false);
1318 if (statement == nullptr) {
1319 return errCode;
1320 }
1321 auto beginExec = std::chrono::steady_clock::now();
1322 errCode = statement->Execute(args);
1323 if (errCode != E_OK) {
1324 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1325 connectionPool_->Dump(true, "INSERT");
1326 }
1327 return errCode;
1328 }
1329 auto beginResult = std::chrono::steady_clock::now();
1330 outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1331 auto allEnd = std::chrono::steady_clock::now();
1332 int64_t totalCostTime = std::chrono::duration_cast<std::chrono::milliseconds>(begin - allEnd).count();
1333 if (totalCostTime >= TIME_OUT) {
1334 int64_t prepareCost =
1335 std::chrono::duration_cast<std::chrono::milliseconds>(beginExec - begin).count();
1336 int64_t execCost =
1337 std::chrono::duration_cast<std::chrono::milliseconds>(beginExec - beginResult).count();
1338 int64_t resultCost = std::chrono::duration_cast<std::chrono::milliseconds>(allEnd - beginResult).count();
1339 LOG_WARN("total[%{public}" PRId64 "] stmt[%{public}" PRId64 "] exec[%{public}" PRId64
1340 "] result[%{public}" PRId64 "] "
1341 "sql[%{public}s]",
1342 totalCostTime, prepareCost, execCost, resultCost, SqliteUtils::Anonymous(sql).c_str());
1343 }
1344 return E_OK;
1345 }
1346
ExecuteForChangedRowCount(int64_t & outValue,const std::string & sql,const Values & args)1347 int RdbStoreImpl::ExecuteForChangedRowCount(int64_t &outValue, const std::string &sql, const Values &args)
1348 {
1349 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1350 return E_NOT_SUPPORT;
1351 }
1352 auto [errCode, statement] = GetStatement(sql, false);
1353 if (statement == nullptr) {
1354 return errCode;
1355 }
1356 errCode = statement->Execute(args);
1357 if (errCode != E_OK) {
1358 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1359 connectionPool_->Dump(true, "UPG DEL");
1360 }
1361 return errCode;
1362 }
1363 outValue = statement->Changes();
1364 return E_OK;
1365 }
1366
GetDataBasePath(const std::string & databasePath,std::string & backupFilePath)1367 int RdbStoreImpl::GetDataBasePath(const std::string &databasePath, std::string &backupFilePath)
1368 {
1369 if (databasePath.empty()) {
1370 return E_INVALID_FILE_PATH;
1371 }
1372
1373 if (ISFILE(databasePath)) {
1374 backupFilePath = ExtractFilePath(path_) + databasePath;
1375 } else {
1376 // 2 represents two characters starting from the len - 2 position
1377 if (!PathToRealPath(ExtractFilePath(databasePath), backupFilePath) || databasePath.back() == '/' ||
1378 databasePath.substr(databasePath.length() - 2, 2) == "\\") {
1379 LOG_ERROR("Invalid databasePath.");
1380 return E_INVALID_FILE_PATH;
1381 }
1382 backupFilePath = databasePath;
1383 }
1384
1385 if (backupFilePath == path_) {
1386 LOG_ERROR("The backupPath and path should not be same.");
1387 return E_INVALID_FILE_PATH;
1388 }
1389
1390 LOG_INFO("databasePath is %{public}s.", SqliteUtils::Anonymous(backupFilePath).c_str());
1391 return E_OK;
1392 }
1393
GetSlaveName(const std::string & path,std::string & backupFilePath)1394 int RdbStoreImpl::GetSlaveName(const std::string &path, std::string &backupFilePath)
1395 {
1396 std::string suffix(".db");
1397 std::string slaveSuffix("_slave.db");
1398 auto pos = path.find(suffix);
1399 if (pos == std::string::npos) {
1400 backupFilePath = path + slaveSuffix;
1401 } else {
1402 backupFilePath = std::string(path, 0, pos) + slaveSuffix;
1403 }
1404 return E_OK;
1405 }
1406
/**
 * Backs up this database to the given path.
 *
 * When TryGetMasterSlaveBackupPath accepts the path, the backup is delegated directly to
 * InnerBackup. Otherwise the target path is resolved with GetDataBasePath and the backup runs
 * between keyFiles.Lock() and keyFiles.Unlock(): an already existing backup file is moved to
 * "<backup>.tmp" first, put back if the new backup fails, and removed once the new backup succeeds.
 *
 * @param databasePath Backup name or path supplied by the caller.
 * @param encryptKey   Key forwarded to InnerBackup.
 * @return E_NOT_SUPPORT for read-only stores, E_ERROR when stale files cannot be cleaned up or
 *         the existing backup cannot be renamed, the path-resolution error, or the result of
 *         InnerBackup.
 */
int RdbStoreImpl::Backup(const std::string &databasePath, const std::vector<uint8_t> &encryptKey)
{
    LOG_INFO("Backup db: %{public}s.", SqliteUtils::Anonymous(config_.GetName()).c_str());
    if (isReadOnly_) {
        return E_NOT_SUPPORT;
    }
    std::string backupFilePath;
    if (TryGetMasterSlaveBackupPath(databasePath, backupFilePath)) {
        return InnerBackup(backupFilePath, encryptKey);
    }

    int ret = GetDataBasePath(databasePath, backupFilePath);
    if (ret != E_OK) {
        return ret;
    }

    // Held until every exit path below; each early return unlocks explicitly.
    RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
    keyFiles.Lock();

    // Removes the backup file and its -shm/-wal companions; true only if all deletions succeed.
    auto deleteDirtyFiles = [&backupFilePath] {
        auto res = SqliteUtils::DeleteFile(backupFilePath);
        res = SqliteUtils::DeleteFile(backupFilePath + "-shm") && res;
        res = SqliteUtils::DeleteFile(backupFilePath + "-wal") && res;
        return res;
    };

    // A "-wal" file next to the backup is treated as dirty: remove the backup and its companions.
    // NOTE(review): access() results are compared with E_OK; this assumes E_OK is 0 - confirm.
    auto walFile = backupFilePath + "-wal";
    if (access(walFile.c_str(), F_OK) == E_OK) {
        if (!deleteDirtyFiles()) {
            keyFiles.Unlock();
            return E_ERROR;
        }
    }
    std::string tempPath = backupFilePath + ".tmp";
    if (access(tempPath.c_str(), F_OK) == E_OK) {
        // A .tmp file already exists and is kept; only the current backup file is deleted.
        SqliteUtils::DeleteFile(backupFilePath);
    } else {
        // Keep the current backup as .tmp so it can be restored if the new backup fails.
        if (access(backupFilePath.c_str(), F_OK) == E_OK && !SqliteUtils::RenameFile(backupFilePath, tempPath)) {
            LOG_ERROR("rename backup file failed, path:%{public}s, errno:%{public}d",
                SqliteUtils::Anonymous(backupFilePath).c_str(), errno);
            keyFiles.Unlock();
            return E_ERROR;
        }
    }
    ret = InnerBackup(backupFilePath, encryptKey);
    if (ret != E_OK || access(walFile.c_str(), F_OK) == E_OK) {
        // Failed, or a -wal file is still present: drop the new files and restore the .tmp copy.
        if (deleteDirtyFiles()) {
            SqliteUtils::RenameFile(tempPath, backupFilePath);
        }
    } else {
        // New backup is complete; the .tmp copy is no longer needed.
        SqliteUtils::DeleteFile(tempPath);
    }
    keyFiles.Unlock();
    return ret;
}
1465
CreateBackupBindArgs(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey)1466 std::vector<ValueObject> RdbStoreImpl::CreateBackupBindArgs(const std::string &databasePath,
1467 const std::vector<uint8_t> &destEncryptKey)
1468 {
1469 std::vector<ValueObject> bindArgs;
1470 bindArgs.emplace_back(databasePath);
1471 if (!destEncryptKey.empty() && !config_.IsEncrypt()) {
1472 bindArgs.emplace_back(destEncryptKey);
1473 } else if (config_.IsEncrypt()) {
1474 std::vector<uint8_t> key = config_.GetEncryptKey();
1475 bindArgs.emplace_back(key);
1476 key.assign(key.size(), 0);
1477 } else {
1478 bindArgs.emplace_back("");
1479 }
1480 return bindArgs;
1481 }
1482
1483 /**
1484 * Backup a database from a specified encrypted or unencrypted database file.
1485 */
InnerBackup(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey)1486 int RdbStoreImpl::InnerBackup(const std::string &databasePath, const std::vector<uint8_t> &destEncryptKey)
1487 {
1488 if (isReadOnly_) {
1489 return E_NOT_SUPPORT;
1490 }
1491
1492 if (config_.GetDBType() == DB_VECTOR) {
1493 if (config_.IsEncrypt()) {
1494 return E_NOT_SUPPORT;
1495 }
1496
1497 auto conn = connectionPool_->AcquireConnection(false);
1498 if (conn == nullptr) {
1499 return E_BASE;
1500 }
1501
1502 return conn->Backup(databasePath, {}, false, slaveStatus_);
1503 }
1504
1505 if (config_.GetHaMode() != HAMode::SINGLE && SqliteUtils::IsSlaveDbName(databasePath)) {
1506 auto conn = connectionPool_->AcquireConnection(false);
1507 return conn == nullptr ? E_BASE : conn->Backup(databasePath, {}, false, slaveStatus_);
1508 }
1509
1510 auto [result, conn] = CreateWritableConn();
1511 if (result != E_OK) {
1512 return result;
1513 }
1514
1515 if (config_.IsEncrypt()) {
1516 result = SetDefaultEncryptAlgo(conn, config_);
1517 if (result != E_OK) {
1518 return result;
1519 }
1520 }
1521
1522 std::vector<ValueObject> bindArgs = CreateBackupBindArgs(databasePath, destEncryptKey);
1523 auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_BACKUP_SQL, conn);
1524 errCode = statement->Execute(bindArgs);
1525 if (errCode != E_OK) {
1526 return errCode;
1527 }
1528 errCode = statement->Prepare(GlobalExpr::PRAGMA_BACKUP_JOUR_MODE_WAL);
1529 errCode = statement->Execute();
1530 if (errCode != E_OK) {
1531 return errCode;
1532 }
1533 errCode = statement->Prepare(GlobalExpr::EXPORT_SQL);
1534 int ret = statement->Execute();
1535 errCode = statement->Prepare(GlobalExpr::DETACH_BACKUP_SQL);
1536 int res = statement->Execute();
1537 return (res == E_OK) ? ret : res;
1538 }
1539
BeginExecuteSql(const std::string & sql)1540 std::pair<int32_t, RdbStoreImpl::Stmt> RdbStoreImpl::BeginExecuteSql(const std::string& sql)
1541 {
1542 int type = SqliteUtils::GetSqlStatementType(sql);
1543 if (SqliteUtils::IsSpecial(type)) {
1544 return { E_NOT_SUPPORT, nullptr };
1545 }
1546
1547 bool assumeReadOnly = SqliteUtils::IsSqlReadOnly(type);
1548 auto conn = connectionPool_->AcquireConnection(assumeReadOnly);
1549 if (conn == nullptr) {
1550 return { E_DATABASE_BUSY, nullptr };
1551 }
1552
1553 auto [errCode, statement] = conn->CreateStatement(sql, conn);
1554 if (statement == nullptr) {
1555 return { errCode, nullptr };
1556 }
1557
1558 if (statement->ReadOnly() && conn->IsWriter()) {
1559 statement = nullptr;
1560 conn = nullptr;
1561 return GetStatement(sql, true);
1562 }
1563
1564 return { errCode, statement };
1565 }
1566
/**
 * Reports whether this store currently holds a connection pool (connectionPool_ is not nullptr).
 */
bool RdbStoreImpl::IsHoldingConnection()
{
    return connectionPool_ != nullptr;
}
1571
SetDefaultEncryptAlgo(const ConnectionPool::SharedConn & conn,const RdbStoreConfig & config)1572 int RdbStoreImpl::SetDefaultEncryptAlgo(const ConnectionPool::SharedConn &conn, const RdbStoreConfig &config)
1573 {
1574 if (conn == nullptr) {
1575 return E_DATABASE_BUSY;
1576 }
1577
1578 auto [errCode, statement] = conn->CreateStatement(GlobalExpr::CIPHER_DEFAULT_ATTACH_HMAC_ALGO, conn);
1579 if (errCode != E_OK || statement == nullptr) {
1580 return errCode;
1581 }
1582
1583 errCode = statement->Execute();
1584 if (errCode != E_OK) {
1585 LOG_ERROR("Execute failed: %{public}s, %{public}d",
1586 SqliteUtils::Anonymous(config.GetName()).c_str(), config.GetIter());
1587 }
1588 return errCode;
1589 }
1590
/**
 * Attaches another database file to this store's connection.
 *
 * All pooled connections are acquired first. For a non-memory store whose connection is in WAL
 * journal mode, every connection is closed and WAL is disabled before attaching; the
 * connection returned by DisableWal is then used for the ATTACH statement.
 *
 * @param config     Configuration of the database being attached (used for the cipher setup).
 * @param attachName Alias under which the database is attached.
 * @param dbPath     File path of the database being attached.
 * @param key        Encryption key of the attached database; empty for an unencrypted one.
 * @param waitTime   Forwarded to ConnectionPool::AcquireAll.
 * @return E_DATABASE_BUSY when the connections cannot be acquired, the DisableWal or cipher
 *         setup error, E_ERROR when the keyed ATTACH statement cannot be created, otherwise the
 *         statement-creation or execution result.
 */
int RdbStoreImpl::AttachInner(const RdbStoreConfig &config, const std::string &attachName, const std::string &dbPath,
    const std::vector<uint8_t> &key, int32_t waitTime)
{
    auto [conn, readers] = connectionPool_->AcquireAll(waitTime);
    if (conn == nullptr) {
        return E_DATABASE_BUSY;
    }

    if (config_.GetStorageMode() != StorageMode::MODE_MEMORY &&
        conn->GetJournalMode() == static_cast<int32_t>(JournalMode::MODE_WAL)) {
        // close first to prevent the connection from being put back.
        connectionPool_->CloseAllConnections();
        conn = nullptr;
        readers.clear();
        auto [err, newConn] = connectionPool_->DisableWal();
        if (err != E_OK) {
            return err;
        }
        conn = newConn;
    }
    // Bind order: path, alias, and (for the keyed statement) the key.
    std::vector<ValueObject> bindArgs;
    bindArgs.emplace_back(ValueObject(dbPath));
    bindArgs.emplace_back(ValueObject(attachName));
    if (!key.empty()) {
        auto ret = SetDefaultEncryptAlgo(conn, config);
        if (ret != E_OK) {
            return ret;
        }
        bindArgs.emplace_back(ValueObject(key));
        auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_WITH_KEY_SQL, conn);
        if (statement == nullptr || errCode != E_OK) {
            LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
            // NOTE(review): this branch reports E_ERROR while the keyless branch below returns
            // errCode - confirm whether the difference is intended.
            return E_ERROR;
        }
        return statement->Execute(bindArgs);
    }

    auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_SQL, conn);
    if (statement == nullptr || errCode != E_OK) {
        LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
        return errCode;
    }
    return statement->Execute(bindArgs);
}
1635
1636 /**
1637 * Attaches a database.
1638 */
Attach(const RdbStoreConfig & config,const std::string & attachName,int32_t waitTime)1639 std::pair<int32_t, int32_t> RdbStoreImpl::Attach(
1640 const RdbStoreConfig &config, const std::string &attachName, int32_t waitTime)
1641 {
1642 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || config_.GetHaMode() != HAMode::SINGLE) {
1643 return { E_NOT_SUPPORT, 0 };
1644 }
1645 std::string dbPath;
1646 int err = SqliteGlobalConfig::GetDbPath(config, dbPath);
1647 if (err != E_OK || access(dbPath.c_str(), F_OK) != E_OK) {
1648 return { E_INVALID_FILE_PATH, 0 };
1649 }
1650
1651 // encrypted databases are not supported to attach a non encrypted database.
1652 if (!config.IsEncrypt() && config_.IsEncrypt()) {
1653 return { E_NOT_SUPPORT, 0 };
1654 }
1655
1656 if (attachedInfo_.Contains(attachName)) {
1657 return { E_ATTACHED_DATABASE_EXIST, 0 };
1658 }
1659
1660 std::vector<uint8_t> key;
1661 config.Initialize();
1662 if (config.IsEncrypt()) {
1663 key = config.GetEncryptKey();
1664 }
1665 err = AttachInner(config, attachName, dbPath, key, waitTime);
1666 key.assign(key.size(), 0);
1667 if (err == E_SQLITE_ERROR) {
1668 // only when attachName is already in use, SQLITE-ERROR will be reported here.
1669 return { E_ATTACHED_DATABASE_EXIST, 0 };
1670 } else if (err != E_OK) {
1671 LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach fileName"
1672 "[%{public}s]",
1673 err, SqliteUtils::Anonymous(config_.GetName()).c_str(), attachName.c_str(),
1674 SqliteUtils::Anonymous(config.GetName()).c_str());
1675 return { err, 0 };
1676 }
1677 if (!attachedInfo_.Insert(attachName, dbPath)) {
1678 return { E_ATTACHED_DATABASE_EXIST, 0 };
1679 }
1680 return { E_OK, attachedInfo_.Size() };
1681 }
1682
/**
 * Detaches a previously attached database.
 *
 * When the last attached database has been removed, all connections are closed and WAL is
 * enabled again on the pool.
 *
 * @param attachName Alias of the database to detach; an unknown alias is not an error.
 * @param waitTime   Forwarded to ConnectionPool::AcquireAll.
 * @return {error code, number of databases still attached}. The count is 0 on failure and after
 *         the last database was detached; in the latter case the code is the EnableWal result.
 */
std::pair<int32_t, int32_t> RdbStoreImpl::Detach(const std::string &attachName, int32_t waitTime)
{
    if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
        return { E_NOT_SUPPORT, 0 };
    }
    // Nothing to do for an alias that was never attached.
    if (!attachedInfo_.Contains(attachName)) {
        return { E_OK, attachedInfo_.Size() };
    }

    auto [connection, readers] = connectionPool_->AcquireAll(waitTime);
    if (connection == nullptr) {
        return { E_DATABASE_BUSY, 0 };
    }
    std::vector<ValueObject> bindArgs;
    bindArgs.push_back(ValueObject(attachName));

    auto [errCode, statement] = connection->CreateStatement(GlobalExpr::DETACH_SQL, connection);
    if (statement == nullptr || errCode != E_OK) {
        LOG_ERROR("Detach get statement failed, errCode %{public}d", errCode);
        return { errCode, 0 };
    }
    errCode = statement->Execute(bindArgs);
    if (errCode != E_OK) {
        LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach", errCode,
            SqliteUtils::Anonymous(config_.GetName()).c_str(), attachName.c_str());
        return { errCode, 0 };
    }

    attachedInfo_.Erase(attachName);
    // Other databases are still attached: keep the current connections.
    if (!attachedInfo_.Empty()) {
        return { E_OK, attachedInfo_.Size() };
    }
    // Last database detached: release the statement before the connections are closed.
    statement = nullptr;
    // close first to prevent the connection from being put back.
    connectionPool_->CloseAllConnections();
    connection = nullptr;
    readers.clear();
    errCode = connectionPool_->EnableWal();
    return { errCode, 0 };
}
1723
1724 /**
1725 * Obtains the database version.
1726 */
GetVersion(int & version)1727 int RdbStoreImpl::GetVersion(int &version)
1728 {
1729 auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION, isReadOnly_);
1730 if (statement == nullptr) {
1731 return errCode;
1732 }
1733 ValueObject value;
1734 std::tie(errCode, value) = statement->ExecuteForValue();
1735 auto val = std::get_if<int64_t>(&value.value);
1736 if (val != nullptr) {
1737 version = static_cast<int>(*val);
1738 }
1739 return errCode;
1740 }
1741
1742 /**
1743 * Sets the version of a new database.
1744 */
SetVersion(int version)1745 int RdbStoreImpl::SetVersion(int version)
1746 {
1747 if (isReadOnly_) {
1748 return E_NOT_SUPPORT;
1749 }
1750 std::string sql = std::string(GlobalExpr::PRAGMA_VERSION) + " = " + std::to_string(version);
1751 auto [errCode, statement] = GetStatement(sql);
1752 if (statement == nullptr) {
1753 return errCode;
1754 }
1755 return statement->Execute();
1756 }
1757 /**
1758 * Begins a transaction in EXCLUSIVE mode.
1759 */
BeginTransaction()1760 int RdbStoreImpl::BeginTransaction()
1761 {
1762 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1763 std::lock_guard<std::mutex> lockGuard(connectionPool_->GetTransactionStackMutex());
1764 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1765 return E_NOT_SUPPORT;
1766 }
1767 // size + 1 means the number of transactions in process
1768 size_t transactionId = connectionPool_->GetTransactionStack().size() + 1;
1769 BaseTransaction transaction(connectionPool_->GetTransactionStack().size());
1770 auto [errCode, statement] = GetStatement(transaction.GetTransactionStr());
1771 if (statement == nullptr) {
1772 return errCode;
1773 }
1774 errCode = statement->Execute();
1775 if (errCode != E_OK) {
1776 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1777 connectionPool_->Dump(true, "BEGIN");
1778 }
1779 LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1780 transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1781 return errCode;
1782 }
1783 connectionPool_->SetInTransaction(true);
1784 connectionPool_->GetTransactionStack().push(transaction);
1785 // 1 means the number of transactions in process
1786 if (transactionId > 1) {
1787 LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1788 transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1789 }
1790
1791 return E_OK;
1792 }
1793
BeginTrans()1794 std::pair<int, int64_t> RdbStoreImpl::BeginTrans()
1795 {
1796 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1797 if (!config_.IsVector() || isReadOnly_) {
1798 return {E_NOT_SUPPORT, 0};
1799 }
1800
1801 int64_t tmpTrxId = 0;
1802 auto [errCode, connection] = connectionPool_->CreateTransConn(false);
1803 if (connection == nullptr) {
1804 LOG_ERROR("Get null connection, storeName: %{public}s errCode:0x%{public}x.",
1805 SqliteUtils::Anonymous(name_).c_str(), errCode);
1806 return {errCode, 0};
1807 }
1808 tmpTrxId = newTrxId_.fetch_add(1);
1809 trxConnMap_.Insert(tmpTrxId, connection);
1810 errCode = ExecuteByTrxId(BEGIN_TRANSACTION_SQL, tmpTrxId);
1811 if (errCode != E_OK) {
1812 trxConnMap_.Erase(tmpTrxId);
1813 }
1814 return {errCode, tmpTrxId};
1815 }
1816
1817 /**
1818 * Begins a transaction in EXCLUSIVE mode.
1819 */
RollBack()1820 int RdbStoreImpl::RollBack()
1821 {
1822 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1823 std::lock_guard<std::mutex> lockGuard(connectionPool_->GetTransactionStackMutex());
1824 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1825 return E_NOT_SUPPORT;
1826 }
1827 size_t transactionId = connectionPool_->GetTransactionStack().size();
1828
1829 if (connectionPool_->GetTransactionStack().empty()) {
1830 LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId,
1831 SqliteUtils::Anonymous(name_).c_str());
1832 return E_NO_TRANSACTION_IN_SESSION;
1833 }
1834 BaseTransaction transaction = connectionPool_->GetTransactionStack().top();
1835 connectionPool_->GetTransactionStack().pop();
1836 if (transaction.GetType() != TransType::ROLLBACK_SELF && !connectionPool_->GetTransactionStack().empty()) {
1837 connectionPool_->GetTransactionStack().top().SetChildFailure(true);
1838 }
1839 auto [errCode, statement] = GetStatement(transaction.GetRollbackStr());
1840 if (statement == nullptr) {
1841 if (errCode == E_DATABASE_BUSY) {
1842 Reportor::Report(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
1843 }
1844 // size + 1 means the number of transactions in process
1845 LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId + 1,
1846 SqliteUtils::Anonymous(name_).c_str());
1847 return E_DATABASE_BUSY;
1848 }
1849 errCode = statement->Execute();
1850 if (errCode != E_OK) {
1851 if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
1852 Reportor::Report(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
1853 }
1854 LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1855 transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1856 return errCode;
1857 }
1858 if (connectionPool_->GetTransactionStack().empty()) {
1859 connectionPool_->SetInTransaction(false);
1860 }
1861 // 1 means the number of transactions in process
1862 if (transactionId > 1) {
1863 LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1864 transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1865 }
1866 return E_OK;
1867 }
1868
ExecuteByTrxId(const std::string & sql,int64_t trxId,bool closeConnAfterExecute,const std::vector<ValueObject> & bindArgs)1869 int RdbStoreImpl::ExecuteByTrxId(const std::string &sql, int64_t trxId, bool closeConnAfterExecute,
1870 const std::vector<ValueObject> &bindArgs)
1871 {
1872 if ((!config_.IsVector()) || isReadOnly_) {
1873 return E_NOT_SUPPORT;
1874 }
1875 if (trxId == 0) {
1876 return E_INVALID_ARGS;
1877 }
1878
1879 if (!trxConnMap_.Contains(trxId)) {
1880 LOG_ERROR("trxId hasn't appeared before %{public}" PRIu64, trxId);
1881 return E_INVALID_ARGS;
1882 }
1883 auto time = static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
1884 auto result = trxConnMap_.Find(trxId);
1885 auto connection = result.second;
1886 if (connection == nullptr) {
1887 LOG_ERROR("Get null connection, storeName: %{public}s time:%{public}" PRIu64 ".",
1888 SqliteUtils::Anonymous(name_).c_str(), time);
1889 return E_ERROR;
1890 }
1891 auto [ret, statement] = GetStatement(sql, connection);
1892 if (ret != E_OK) {
1893 return ret;
1894 }
1895 ret = statement->Execute(bindArgs);
1896 if (ret != E_OK) {
1897 LOG_ERROR("transaction id: %{public}" PRIu64 ", storeName: %{public}s, errCode: %{public}d" PRIu64, trxId,
1898 SqliteUtils::Anonymous(name_).c_str(), ret);
1899 trxConnMap_.Erase(trxId);
1900 return ret;
1901 }
1902 if (closeConnAfterExecute) {
1903 trxConnMap_.Erase(trxId);
1904 }
1905 return E_OK;
1906 }
1907
RollBack(int64_t trxId)1908 int RdbStoreImpl::RollBack(int64_t trxId)
1909 {
1910 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1911 return ExecuteByTrxId(ROLLBACK_TRANSACTION_SQL, trxId, true);
1912 }
1913
1914 /**
1915 * Begins a transaction in EXCLUSIVE mode.
1916 */
Commit()1917 int RdbStoreImpl::Commit()
1918 {
1919 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1920 std::lock_guard<std::mutex> lockGuard(connectionPool_->GetTransactionStackMutex());
1921 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1922 return E_NOT_SUPPORT;
1923 }
1924 size_t transactionId = connectionPool_->GetTransactionStack().size();
1925
1926 if (connectionPool_->GetTransactionStack().empty()) {
1927 return E_OK;
1928 }
1929 BaseTransaction transaction = connectionPool_->GetTransactionStack().top();
1930 std::string sqlStr = transaction.GetCommitStr();
1931 if (sqlStr.size() <= 1) {
1932 LOG_WARN("id: %{public}zu, storeName: %{public}s, sql: %{public}s",
1933 transactionId, SqliteUtils::Anonymous(name_).c_str(), sqlStr.c_str());
1934 connectionPool_->GetTransactionStack().pop();
1935 return E_OK;
1936 }
1937 auto [errCode, statement] = GetStatement(sqlStr);
1938 if (statement == nullptr) {
1939 if (errCode == E_DATABASE_BUSY || errCode == E_SQLITE_BUSY || E_SQLITE_LOCKED) {
1940 Reportor::Report(Reportor::Create(config_, E_DATABASE_BUSY, "ErrorType: Busy"));
1941 }
1942 LOG_ERROR("id: %{public}zu, storeName: %{public}s, statement error", transactionId,
1943 SqliteUtils::Anonymous(name_).c_str());
1944 return E_DATABASE_BUSY;
1945 }
1946 errCode = statement->Execute();
1947 if (errCode != E_OK) {
1948 if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
1949 Reportor::Report(Reportor::Create(config_, errCode, "ErrorType: CommitBusy"));
1950 }
1951 LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1952 transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1953 return errCode;
1954 }
1955 connectionPool_->SetInTransaction(false);
1956 // 1 means the number of transactions in process
1957 if (transactionId > 1) {
1958 LOG_WARN("id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
1959 SqliteUtils::Anonymous(name_).c_str(), errCode);
1960 }
1961 connectionPool_->GetTransactionStack().pop();
1962 return E_OK;
1963 }
1964
Commit(int64_t trxId)1965 int RdbStoreImpl::Commit(int64_t trxId)
1966 {
1967 DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1968 return ExecuteByTrxId(COMMIT_TRANSACTION_SQL, trxId, true);
1969 }
1970
IsInTransaction()1971 bool RdbStoreImpl::IsInTransaction()
1972 {
1973 if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1974 return false;
1975 }
1976 return connectionPool_->IsInTransaction();
1977 }
1978
CheckAttach(const std::string & sql)1979 int RdbStoreImpl::CheckAttach(const std::string &sql)
1980 {
1981 size_t index = sql.find_first_not_of(' ');
1982 if (index == std::string::npos) {
1983 return E_OK;
1984 }
1985
1986 /* The first 3 characters can determine the type */
1987 std::string sqlType = sql.substr(index, 3);
1988 sqlType = SqliteUtils::StrToUpper(sqlType);
1989 if (sqlType != "ATT") {
1990 return E_OK;
1991 }
1992
1993 auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_JOUR_MODE_EXP);
1994 if (statement == nullptr) {
1995 return errCode;
1996 }
1997
1998 errCode = statement->Execute();
1999 if (errCode != E_OK) {
2000 LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errCode);
2001 return errCode;
2002 }
2003 auto [errorCode, valueObject] = statement->GetColumn(0);
2004 if (errorCode != E_OK) {
2005 LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errorCode);
2006 return errorCode;
2007 }
2008 auto journal = std::get_if<std::string>(&valueObject.value);
2009 auto journalMode = SqliteUtils::StrToUpper((journal == nullptr) ? "" : *journal);
2010 if (journalMode == RdbStoreConfig::DB_DEFAULT_JOURNAL_MODE) {
2011 LOG_ERROR("RdbStoreImpl attach is not supported in WAL mode");
2012 return E_NOT_SUPPORTED_ATTACH_IN_WAL_MODE;
2013 }
2014
2015 return E_OK;
2016 }
2017
IsOpen() const2018 bool RdbStoreImpl::IsOpen() const
2019 {
2020 return isOpen_;
2021 }
2022
GetPath()2023 std::string RdbStoreImpl::GetPath()
2024 {
2025 return path_;
2026 }
2027
IsReadOnly() const2028 bool RdbStoreImpl::IsReadOnly() const
2029 {
2030 return isReadOnly_;
2031 }
2032
IsMemoryRdb() const2033 bool RdbStoreImpl::IsMemoryRdb() const
2034 {
2035 return isMemoryRdb_;
2036 }
2037
GetName()2038 std::string RdbStoreImpl::GetName()
2039 {
2040 return name_;
2041 }
2042
DoCloudSync(const std::string & table)2043 void RdbStoreImpl::DoCloudSync(const std::string &table)
2044 {
2045 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2046 auto needSync = cloudInfo_->Change(table);
2047 if (!needSync) {
2048 return;
2049 }
2050 auto pool = TaskExecutor::GetInstance().GetExecutor();
2051 if (pool == nullptr) {
2052 return;
2053 }
2054 auto interval =
2055 std::chrono::duration_cast<std::chrono::steady_clock::duration>(std::chrono::milliseconds(INTERVAL));
2056 pool->Schedule(interval, [cloudInfo = std::weak_ptr<CloudTables>(cloudInfo_), param = syncerParam_]() {
2057 auto changeInfo = cloudInfo.lock();
2058 if (changeInfo == nullptr) {
2059 return ;
2060 }
2061 auto tables = changeInfo->Steal();
2062 if (tables.empty()) {
2063 return;
2064 }
2065 DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true };
2066 auto memo = AbsRdbPredicates(std::vector<std::string>(tables.begin(), tables.end())).GetDistributedPredicates();
2067 InnerSync(param, option, memo, nullptr);
2068 });
2069 #endif
2070 }
GetFileType()2071 std::string RdbStoreImpl::GetFileType()
2072 {
2073 return fileType_;
2074 }
2075
2076 /**
2077 * Sets the database locale.
2078 */
ConfigLocale(const std::string & localeStr)2079 int RdbStoreImpl::ConfigLocale(const std::string &localeStr)
2080 {
2081 if (!isOpen_) {
2082 LOG_ERROR("The connection pool has been closed.");
2083 return E_ERROR;
2084 }
2085
2086 if (connectionPool_ == nullptr) {
2087 LOG_ERROR("connectionPool_ is null.");
2088 return E_ERROR;
2089 }
2090 return connectionPool_->ConfigLocale(localeStr);
2091 }
2092
GetDestPath(const std::string & backupPath,std::string & destPath)2093 int RdbStoreImpl::GetDestPath(const std::string &backupPath, std::string &destPath)
2094 {
2095 int ret = GetDataBasePath(backupPath, destPath);
2096 if (ret != E_OK) {
2097 return ret;
2098 }
2099 std::string tempPath = destPath + ".tmp";
2100 if (access(tempPath.c_str(), F_OK) == E_OK) {
2101 destPath = tempPath;
2102 } else {
2103 auto walFile = destPath + "-wal";
2104 if (access(walFile.c_str(), F_OK) == E_OK) {
2105 return E_ERROR;
2106 }
2107 }
2108
2109 if (access(destPath.c_str(), F_OK) != E_OK) {
2110 LOG_ERROR("The backupFilePath does not exists.");
2111 return E_INVALID_FILE_PATH;
2112 }
2113 return E_OK;
2114 }
2115
Restore(const std::string & backupPath,const std::vector<uint8_t> & newKey)2116 int RdbStoreImpl::Restore(const std::string &backupPath, const std::vector<uint8_t> &newKey)
2117 {
2118 LOG_INFO("Restore db: %{public}s.", SqliteUtils::Anonymous(config_.GetName()).c_str());
2119 if (isReadOnly_) {
2120 return E_NOT_SUPPORT;
2121 }
2122
2123 if (!isOpen_ || connectionPool_ == nullptr) {
2124 LOG_ERROR("The pool is: %{public}d, pool is null: %{public}d", isOpen_, connectionPool_ == nullptr);
2125 return E_ERROR;
2126 }
2127
2128 RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
2129 keyFiles.Lock();
2130 std::string destPath;
2131 bool isOK = TryGetMasterSlaveBackupPath(backupPath, destPath, true);
2132 if (!isOK) {
2133 int ret = GetDestPath(backupPath, destPath);
2134 if (ret != E_OK) {
2135 keyFiles.Unlock();
2136 return ret;
2137 }
2138 }
2139 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2140 auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
2141 if (service != nullptr) {
2142 service->Disable(syncerParam_);
2143 }
2144 #endif
2145 bool corrupt = Reportor::IsReportCorruptedFault(path_);
2146 int errCode = connectionPool_->ChangeDbFileForRestore(path_, destPath, newKey, slaveStatus_);
2147 keyFiles.Unlock();
2148 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2149 SecurityPolicy::SetSecurityLabel(config_);
2150 if (service != nullptr) {
2151 service->Enable(syncerParam_);
2152 if (errCode == E_OK) {
2153 auto syncerParam = syncerParam_;
2154 syncerParam.infos_ = Connection::Collect(config_);
2155 service->AfterOpen(syncerParam);
2156 NotifyDataChange();
2157 }
2158 }
2159 #endif
2160 if (errCode == E_OK) {
2161 Reportor::ReportRestore(Reportor::Create(config_, E_OK), corrupt);
2162 rebuild_ = RebuiltType::NONE;
2163 }
2164 DoCloudSync("");
2165 return errCode;
2166 }
2167
CreateWritableConn()2168 std::pair<int32_t, std::shared_ptr<Connection>> RdbStoreImpl::CreateWritableConn()
2169 {
2170 auto config = config_;
2171 config.SetHaMode(HAMode::SINGLE);
2172 auto [result, conn] = Connection::Create(config, true);
2173 if (result != E_OK || conn == nullptr) {
2174 LOG_ERROR("create connection failed, err:%{public}d", result);
2175 return { result, nullptr };
2176 }
2177 return { E_OK, conn };
2178 }
2179
GetStatement(const std::string & sql,std::shared_ptr<Connection> conn) const2180 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(
2181 const std::string &sql, std::shared_ptr<Connection> conn) const
2182 {
2183 if (conn == nullptr) {
2184 return { E_DATABASE_BUSY, nullptr };
2185 }
2186 return conn->CreateStatement(sql, conn);
2187 }
2188
GetStatement(const std::string & sql,bool read) const2189 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(const std::string& sql, bool read) const
2190 {
2191 auto conn = connectionPool_->AcquireConnection(read);
2192 if (conn == nullptr) {
2193 return { E_DATABASE_BUSY, nullptr };
2194 }
2195 return conn->CreateStatement(sql, conn);
2196 }
2197
GetRebuilt(RebuiltType & rebuilt)2198 int RdbStoreImpl::GetRebuilt(RebuiltType &rebuilt)
2199 {
2200 rebuilt = static_cast<RebuiltType>(rebuild_);
2201 return E_OK;
2202 }
2203
InterruptBackup()2204 int RdbStoreImpl::InterruptBackup()
2205 {
2206 if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER) {
2207 return E_NOT_SUPPORT;
2208 }
2209 if (slaveStatus_ == SlaveStatus::BACKING_UP) {
2210 slaveStatus_ = SlaveStatus::BACKUP_INTERRUPT;
2211 return E_OK;
2212 }
2213 return E_INVALID_INTERRUPT;
2214 }
2215
GetBackupStatus() const2216 int32_t RdbStoreImpl::GetBackupStatus() const
2217 {
2218 if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER && config_.GetHaMode() != HAMode::MAIN_REPLICA) {
2219 return SlaveStatus::UNDEFINED;
2220 }
2221 return slaveStatus_;
2222 }
2223
TryGetMasterSlaveBackupPath(const std::string & srcPath,std::string & destPath,bool isRestore)2224 bool RdbStoreImpl::TryGetMasterSlaveBackupPath(const std::string &srcPath, std::string &destPath, bool isRestore)
2225 {
2226 if (!srcPath.empty() || config_.GetHaMode() == HAMode::SINGLE || config_.GetDBType() != DB_SQLITE) {
2227 return false;
2228 }
2229 int ret = GetSlaveName(config_.GetPath(), destPath);
2230 if (ret != E_OK) {
2231 destPath = {};
2232 return false;
2233 }
2234 if (isRestore && access(destPath.c_str(), F_OK) != 0) {
2235 LOG_WARN("The backup path can not access: %{public}s", SqliteUtils::Anonymous(destPath).c_str());
2236 return false;
2237 }
2238 return true;
2239 }
2240
IsSlaveDiffFromMaster() const2241 bool RdbStoreImpl::IsSlaveDiffFromMaster() const
2242 {
2243 std::string failureFlagFile = config_.GetPath() + "-slaveFailure";
2244 std::string slaveDbPath = SqliteUtils::GetSlavePath(config_.GetPath());
2245 return access(failureFlagFile.c_str(), F_OK) == 0 || access(slaveDbPath.c_str(), F_OK) != 0;
2246 }
2247
ExchangeSlaverToMaster()2248 int32_t RdbStoreImpl::ExchangeSlaverToMaster()
2249 {
2250 if (isReadOnly_) {
2251 return E_OK;
2252 }
2253 auto conn = connectionPool_->AcquireConnection(false);
2254 if (conn == nullptr) {
2255 return E_DATABASE_BUSY;
2256 }
2257 auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
2258 if (strategy != ExchangeStrategy::NOT_HANDLE) {
2259 LOG_WARN("exchange st:%{public}d, %{public}s,", strategy, SqliteUtils::Anonymous(config_.GetName()).c_str());
2260 }
2261 int ret = E_OK;
2262 if (strategy == ExchangeStrategy::RESTORE) {
2263 conn = nullptr;
2264 // disable is required before restore
2265 ret = Restore({}, {});
2266 } else if (strategy == ExchangeStrategy::BACKUP) {
2267 // async backup
2268 ret = conn->Backup({}, {}, true, slaveStatus_);
2269 }
2270 return ret;
2271 }
2272
GetDbType() const2273 int32_t RdbStoreImpl::GetDbType() const
2274 {
2275 return config_.GetDBType();
2276 }
2277
CreateTransaction(int32_t type)2278 std::pair<int32_t, std::shared_ptr<Transaction>> RdbStoreImpl::CreateTransaction(int32_t type)
2279 {
2280 if (isReadOnly_) {
2281 return { E_NOT_SUPPORT, nullptr};
2282 }
2283
2284 auto [errCode, conn] = connectionPool_->CreateTransConn();
2285 if (conn == nullptr) {
2286 return { errCode, nullptr };
2287 }
2288 std::shared_ptr<Transaction> trans;
2289 std::tie(errCode, trans) = Transaction::Create(type, conn, config_.GetName());
2290 if (trans == nullptr) {
2291 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
2292 connectionPool_->Dump(true, "TRANS");
2293 }
2294 return { errCode, nullptr };
2295 }
2296
2297 std::lock_guard<decltype(mutex_)> guard(mutex_);
2298 for (auto it = transactions_.begin(); it != transactions_.end();) {
2299 if (it->expired()) {
2300 it = transactions_.erase(it);
2301 } else {
2302 it++;
2303 }
2304 }
2305 transactions_.push_back(trans);
2306 return { errCode, trans };
2307 }
2308
AddTables(const std::vector<std::string> & tables)2309 int32_t RdbStoreImpl::CloudTables::AddTables(const std::vector<std::string> &tables)
2310 {
2311 std::lock_guard<std::mutex> lock(mutex_);
2312 for (auto &table : tables) {
2313 tables_.insert(table);
2314 }
2315 return E_OK;
2316 }
2317
RmvTables(const std::vector<std::string> & tables)2318 int32_t RdbStoreImpl::CloudTables::RmvTables(const std::vector<std::string> &tables)
2319 {
2320 std::lock_guard<std::mutex> lock(mutex_);
2321 for (auto &table : tables) {
2322 tables_.erase(table);
2323 }
2324 return E_OK;
2325 }
2326
Change(const std::string & table)2327 bool RdbStoreImpl::CloudTables::Change(const std::string &table)
2328 {
2329 bool needSync = false;
2330 {
2331 std::lock_guard<std::mutex> lock(mutex_);
2332 if (tables_.empty() || (!table.empty() && tables_.find(table) == tables_.end())) {
2333 return needSync;
2334 }
2335 // from empty, then need schedule the cloud sync, others only wait the schedule execute.
2336 needSync = changes_.empty();
2337 if (!table.empty()) {
2338 changes_.insert(table);
2339 } else {
2340 changes_.insert(tables_.begin(), tables_.end());
2341 }
2342 }
2343 return needSync;
2344 }
2345
Steal()2346 std::set<std::string> RdbStoreImpl::CloudTables::Steal()
2347 {
2348 std::set<std::string> result;
2349 {
2350 std::lock_guard<std::mutex> lock(mutex_);
2351 result = std::move(changes_);
2352 }
2353 return result;
2354 }
2355 } // namespace OHOS::NativeRdb