• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RdbStoreImpl"
16 #include "rdb_store_impl.h"
17 
18 #include <algorithm>
19 #include <chrono>
20 #include <cinttypes>
21 #include <cstdint>
22 #include <memory>
23 #include <mutex>
24 #include <sstream>
25 #include <string>
26 #include <unistd.h>
27 
28 #include "cache_result_set.h"
29 #include "directory_ex.h"
30 #include "logger.h"
31 #include "rdb_common.h"
32 #include "rdb_errno.h"
33 #include "rdb_fault_hiview_reporter.h"
34 #include "rdb_radar_reporter.h"
35 #include "rdb_security_manager.h"
36 #include "rdb_sql_statistic.h"
37 #include "rdb_store.h"
38 #include "rdb_trace.h"
39 #include "rdb_types.h"
40 #include "relational_store_client.h"
41 #include "sqlite_global_config.h"
42 #include "sqlite_sql_builder.h"
43 #include "sqlite_statement.h"
44 #include "sqlite_utils.h"
45 #include "step_result_set.h"
46 #include "values_buckets.h"
47 #include "task_executor.h"
48 #include "traits.h"
49 #include "transaction.h"
50 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
51 #include "delay_notify.h"
52 #include "raw_data_parser.h"
53 #include "rdb_device_manager_adapter.h"
54 #include "rdb_manager_impl.h"
55 #include "relational_store_manager.h"
56 #include "runtime_config.h"
57 #include "security_policy.h"
58 #include "sqlite_shared_result_set.h"
59 #endif
60 
61 #ifdef WINDOWS_PLATFORM
62 #define ISFILE(filePath) ((filePath.find("\\") == std::string::npos))
63 #else
64 #define ISFILE(filePath) ((filePath.find("/") == std::string::npos))
65 #endif
66 
67 namespace OHOS::NativeRdb {
68 using namespace OHOS::Rdb;
69 using namespace std::chrono;
70 using SqlStatistic = DistributedRdb::SqlStatistic;
71 using RdbNotifyConfig = DistributedRdb::RdbNotifyConfig;
72 using Reportor = RdbFaultHiViewReporter;
73 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
74 using RdbMgr = DistributedRdb::RdbManagerImpl;
75 #endif
76 
77 static constexpr const char *BEGIN_TRANSACTION_SQL = "begin;";
78 static constexpr const char *COMMIT_TRANSACTION_SQL = "commit;";
79 static constexpr const char *ROLLBACK_TRANSACTION_SQL = "rollback;";
80 static constexpr const char *BACKUP_RESTORE = "backup.restore";
81 constexpr int64_t TIME_OUT = 1500;
82 
InitSyncerParam(const RdbStoreConfig & config,bool created)83 void RdbStoreImpl::InitSyncerParam(const RdbStoreConfig &config, bool created)
84 {
85     syncerParam_.bundleName_ = config.GetBundleName();
86     syncerParam_.hapName_ = config.GetModuleName();
87     syncerParam_.storeName_ = config.GetName();
88     syncerParam_.customDir_ = config.GetCustomDir();
89     syncerParam_.area_ = config.GetArea();
90     syncerParam_.level_ = static_cast<int32_t>(config.GetSecurityLevel());
91     syncerParam_.type_ = config.GetDistributedType();
92     syncerParam_.isEncrypt_ = config.IsEncrypt();
93     syncerParam_.isAutoClean_ = config.GetAutoClean();
94     syncerParam_.isSearchable_ = config.IsSearchable();
95     syncerParam_.password_ = config.GetEncryptKey();
96     syncerParam_.haMode_ = config.GetHaMode();
97     syncerParam_.roleType_ = config.GetRoleType();
98     if (created) {
99         syncerParam_.infos_ = Connection::Collect(config);
100     }
101 }
102 
InnerOpen()103 int RdbStoreImpl::InnerOpen()
104 {
105     isOpen_ = true;
106 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
107     if (isReadOnly_) {
108         return E_OK;
109     }
110 
111     AfterOpen(syncerParam_);
112     int errCode = RegisterDataChangeCallback();
113     if (errCode != E_OK) {
114         LOG_ERROR("RegisterCallBackObserver is failed, err is %{public}d.", errCode);
115     }
116 #endif
117     return E_OK;
118 }
119 
120 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
AfterOpen(const RdbParam & param,int32_t retry)121 void RdbStoreImpl::AfterOpen(const RdbParam &param, int32_t retry)
122 {
123     auto [err, service] = RdbMgr::GetInstance().GetRdbService(param);
124     if (err == E_NOT_SUPPORT) {
125         return;
126     }
127     if (err != E_OK || service == nullptr) {
128         LOG_ERROR("GetRdbService failed, err: %{public}d, storeName: %{public}s.", err,
129             SqliteUtils::Anonymous(param.storeName_).c_str());
130         auto pool = TaskExecutor::GetInstance().GetExecutor();
131         if (err == E_SERVICE_NOT_FOUND && pool != nullptr && retry++ < MAX_RETRY_TIMES) {
132             pool->Schedule(std::chrono::seconds(RETRY_INTERVAL), [param, retry]() {
133                 AfterOpen(param, retry);
134             });
135         }
136         return;
137     }
138     err = service->AfterOpen(param);
139     if (err != E_OK) {
140         LOG_ERROR("AfterOpen failed, err: %{public}d, storeName: %{public}s.", err,
141             SqliteUtils::Anonymous(param.storeName_).c_str());
142     }
143 }
144 
GetModifyTime(const std::string & table,const std::string & columnName,std::vector<PRIKey> & keys)145 RdbStore::ModifyTime RdbStoreImpl::GetModifyTime(const std::string &table, const std::string &columnName,
146     std::vector<PRIKey> &keys)
147 {
148     if (table.empty() || columnName.empty() || keys.empty()) {
149         LOG_ERROR("invalid para.");
150         return {};
151     }
152 
153     auto logTable = DistributedDB::RelationalStoreManager::GetDistributedLogTableName(table);
154     if (SqliteUtils::StrToUpper(columnName) == ROW_ID) {
155         return GetModifyTimeByRowId(logTable, keys);
156     }
157     std::vector<ValueObject> hashKeys;
158     hashKeys.reserve(keys.size());
159     std::map<std::vector<uint8_t>, PRIKey> keyMap;
160     std::map<std::string, DistributedDB::Type> tmp;
161     for (const auto &key : keys) {
162         DistributedDB::Type value;
163         RawDataParser::Convert(key, value);
164         tmp[columnName] = value;
165         auto hashKey = DistributedDB::RelationalStoreManager::CalcPrimaryKeyHash(tmp);
166         if (hashKey.empty()) {
167             LOG_DEBUG("hash key fail");
168             continue;
169         }
170         hashKeys.emplace_back(ValueObject(hashKey));
171         keyMap[hashKey] = key;
172     }
173 
174     std::string sql;
175     sql.append("select hash_key as key, timestamp/10000 as modify_time from ");
176     sql.append(logTable);
177     sql.append(" where hash_key in (");
178     sql.append(SqliteSqlBuilder::GetSqlArgs(hashKeys.size()));
179     sql.append(")");
180     auto resultSet = QueryByStep(sql, hashKeys);
181     int count = 0;
182     if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
183         LOG_ERROR("get resultSet err.");
184         return {};
185     }
186     return { resultSet, keyMap, false };
187 }
188 
GetModifyTimeByRowId(const std::string & logTable,std::vector<PRIKey> & keys)189 RdbStore::ModifyTime RdbStoreImpl::GetModifyTimeByRowId(const std::string &logTable, std::vector<PRIKey> &keys)
190 {
191     std::string sql;
192     sql.append("select data_key as key, timestamp/10000 as modify_time from ");
193     sql.append(logTable);
194     sql.append(" where data_key in (");
195     sql.append(SqliteSqlBuilder::GetSqlArgs(keys.size()));
196     sql.append(")");
197     std::vector<ValueObject> args;
198     args.reserve(keys.size());
199     for (auto &key : keys) {
200         ValueObject::Type value;
201         RawDataParser::Convert(key, value);
202         args.emplace_back(ValueObject(value));
203     }
204     auto resultSet = QueryByStep(sql, args);
205     int count = 0;
206     if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
207         LOG_ERROR("get resultSet err.");
208         return {};
209     }
210     return ModifyTime(resultSet, {}, true);
211 }
212 
CleanDirtyData(const std::string & table,uint64_t cursor)213 int RdbStoreImpl::CleanDirtyData(const std::string &table, uint64_t cursor)
214 {
215     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
216         LOG_ERROR("not support. table:%{public}s, isRead:%{public}d, dbType:%{public}d",
217             SqliteUtils::Anonymous(table).c_str(), isReadOnly_, config_.GetDBType());
218         return E_NOT_SUPPORT;
219     }
220     auto connection = connectionPool_->AcquireConnection(false);
221     if (connection == nullptr) {
222         LOG_ERROR("db is busy. table:%{public}s", SqliteUtils::Anonymous(table).c_str());
223         return E_DATABASE_BUSY;
224     }
225     int errCode = connection->CleanDirtyData(table, cursor);
226     return errCode;
227 }
228 
GetLogTableName(const std::string & tableName)229 std::string RdbStoreImpl::GetLogTableName(const std::string &tableName)
230 {
231     return DistributedDB::RelationalStoreManager::GetDistributedLogTableName(tableName);
232 }
233 
QuerySharingResource(const AbsRdbPredicates & predicates,const Fields & columns)234 std::pair<int32_t, std::shared_ptr<ResultSet>> RdbStoreImpl::QuerySharingResource(const AbsRdbPredicates &predicates,
235     const Fields &columns)
236 {
237     if (config_.GetDBType() == DB_VECTOR) {
238         return { E_NOT_SUPPORT, nullptr };
239     }
240     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
241     if (errCode != E_OK) {
242         return { errCode, nullptr };
243     }
244     auto [status, resultSet] =
245         service->QuerySharingResource(syncerParam_, predicates.GetDistributedPredicates(), columns);
246     if (status != E_OK) {
247         return { status, nullptr };
248     }
249     return { status, resultSet };
250 }
251 
RemoteQuery(const std::string & device,const AbsRdbPredicates & predicates,const Fields & columns,int & errCode)252 std::shared_ptr<ResultSet> RdbStoreImpl::RemoteQuery(const std::string &device, const AbsRdbPredicates &predicates,
253     const Fields &columns, int &errCode)
254 {
255     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
256     if (config_.GetDBType() == DB_VECTOR) {
257         return nullptr;
258     }
259     std::vector<std::string> selectionArgs = predicates.GetWhereArgs();
260     std::string sql = SqliteSqlBuilder::BuildQueryString(predicates, columns);
261     auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
262     if (err == E_NOT_SUPPORT) {
263         errCode = err;
264         return nullptr;
265     }
266     if (err != E_OK) {
267         LOG_ERROR("RdbStoreImpl::RemoteQuery get service failed");
268         errCode = err;
269         return nullptr;
270     }
271     auto [status, resultSet] = service->RemoteQuery(syncerParam_, device, sql, selectionArgs);
272     errCode = status;
273     return resultSet;
274 }
275 
NotifyDataChange()276 void RdbStoreImpl::NotifyDataChange()
277 {
278     int errCode = RegisterDataChangeCallback();
279     if (errCode != E_OK) {
280         LOG_ERROR("RegisterDataChangeCallback is failed, err is %{public}d.", errCode);
281     }
282     DistributedRdb::RdbChangedData rdbChangedData;
283     if (delayNotifier_ != nullptr) {
284         delayNotifier_->UpdateNotify(rdbChangedData, true);
285     }
286 }
287 
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const DistributedRdb::DistributedConfig & distributedConfig)288 int RdbStoreImpl::SetDistributedTables(const std::vector<std::string> &tables, int32_t type,
289                                        const DistributedRdb::DistributedConfig &distributedConfig)
290 {
291     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
292     if (config_.GetDBType() == DB_VECTOR || isReadOnly_) {
293         return E_NOT_SUPPORT;
294     }
295     if (tables.empty()) {
296         LOG_WARN("The distributed tables to be set is empty.");
297         return E_OK;
298     }
299     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
300     if (errCode != E_OK) {
301         return errCode;
302     }
303     int32_t errorCode = service->SetDistributedTables(syncerParam_, tables, distributedConfig.references,
304         distributedConfig.isRebuild, type);
305     if (errorCode != E_OK) {
306         LOG_ERROR("Fail to set distributed tables, error=%{public}d", errorCode);
307         return errorCode;
308     }
309     if (type != DistributedRdb::DISTRIBUTED_CLOUD) {
310         return E_OK;
311     }
312     auto conn = connectionPool_->AcquireConnection(false);
313     if (conn != nullptr) {
314         auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
315         if (strategy == ExchangeStrategy::BACKUP) {
316             (void)conn->Backup({}, {}, false, slaveStatus_);
317         }
318     }
319     {
320         std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
321         if (distributedConfig.autoSync) {
322             cloudInfo_->AddTables(tables);
323         } else {
324             cloudInfo_->RmvTables(tables);
325             return E_OK;
326         }
327     }
328     auto isRebuilt = RebuiltType::NONE;
329     GetRebuilt(isRebuilt);
330     if (isRebuilt == RebuiltType::REBUILT) {
331         DoCloudSync("");
332     }
333     return E_OK;
334 }
335 
ObtainDistributedTableName(const std::string & device,const std::string & table,int & errCode)336 std::string RdbStoreImpl::ObtainDistributedTableName(const std::string &device, const std::string &table, int &errCode)
337 {
338     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
339     if (config_.GetDBType() == DB_VECTOR) {
340         return "";
341     }
342     std::string uuid;
343     DeviceManagerAdaptor::RdbDeviceManagerAdaptor &deviceManager =
344         DeviceManagerAdaptor::RdbDeviceManagerAdaptor::GetInstance(syncerParam_.bundleName_);
345     errCode = deviceManager.GetEncryptedUuidByNetworkId(device, uuid);
346     if (errCode != E_OK) {
347         LOG_ERROR("GetUuid is failed.");
348         return "";
349     }
350 
351     auto translateCall = [uuid](const std::string &oriDevId, const DistributedDB::StoreInfo &info) {
352         return uuid;
353     };
354     DistributedDB::RuntimeConfig::SetTranslateToDeviceIdCallback(translateCall);
355 
356     return DistributedDB::RelationalStoreManager::GetDistributedTableName(uuid, table);
357 }
358 
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncBrief & callback)359 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncBrief &callback)
360 {
361     if (config_.GetDBType() == DB_VECTOR) {
362         return E_NOT_SUPPORT;
363     }
364     return Sync(option, predicate, [callback](Details &&details) {
365         Briefs briefs;
366         for (auto &[key, value] : details) {
367             briefs.insert_or_assign(key, value.code);
368         }
369         if (callback != nullptr) {
370             callback(briefs);
371         }
372     });
373 }
374 
Sync(const SyncOption & option,const std::vector<std::string> & tables,const AsyncDetail & async)375 int RdbStoreImpl::Sync(const SyncOption &option, const std::vector<std::string> &tables, const AsyncDetail &async)
376 {
377     return Sync(option, AbsRdbPredicates(tables), async);
378 }
379 
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncDetail & async)380 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncDetail &async)
381 {
382     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
383     DistributedRdb::RdbService::Option rdbOption;
384     rdbOption.mode = option.mode;
385     rdbOption.isAsync = !option.isBlock;
386     RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
387     ret = InnerSync(syncerParam_, rdbOption, predicate.GetDistributedPredicates(), async);
388     return ret;
389 }
390 
InnerSync(const RdbParam & param,const Options & option,const Memo & predicates,const AsyncDetail & async)391 int RdbStoreImpl::InnerSync(const RdbParam &param, const Options &option, const Memo &predicates,
392     const AsyncDetail &async)
393 {
394     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
395     if (errCode == E_NOT_SUPPORT) {
396         return errCode;
397     }
398     if (errCode != E_OK) {
399         LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
400             param.bundleName_.c_str());
401         return errCode;
402     }
403     errCode = service->Sync(param, option, predicates, async);
404     if (errCode != E_OK) {
405         LOG_ERROR("Sync is failed, err is %{public}d.", errCode);
406         return errCode;
407     }
408     return E_OK;
409 }
410 
GetUri(const std::string & event)411 Uri RdbStoreImpl::GetUri(const std::string &event)
412 {
413     std::string rdbUri;
414     if (config_.GetDataGroupId().empty()) {
415         rdbUri = SCHEME_RDB + config_.GetBundleName() + "/" + path_ + "/" + event;
416     } else {
417         rdbUri = SCHEME_RDB + config_.GetDataGroupId() + "/" + path_ + "/" + event;
418     }
419     return Uri(rdbUri);
420 }
421 
SubscribeLocal(const SubscribeOption & option,RdbStoreObserver * observer)422 int RdbStoreImpl::SubscribeLocal(const SubscribeOption& option, RdbStoreObserver *observer)
423 {
424     std::lock_guard<std::mutex> lock(mutex_);
425     localObservers_.try_emplace(option.event);
426     auto &list = localObservers_.find(option.event)->second;
427     for (auto it = list.begin(); it != list.end(); it++) {
428         if ((*it)->getObserver() == observer) {
429             LOG_ERROR("duplicate subscribe.");
430             return E_OK;
431         }
432     }
433 
434     localObservers_[option.event].push_back(std::make_shared<RdbStoreLocalObserver>(observer));
435     return E_OK;
436 }
437 
SubscribeLocalShared(const SubscribeOption & option,RdbStoreObserver * observer)438 int RdbStoreImpl::SubscribeLocalShared(const SubscribeOption& option, RdbStoreObserver *observer)
439 {
440     std::lock_guard<std::mutex> lock(mutex_);
441     localSharedObservers_.try_emplace(option.event);
442     auto &list = localSharedObservers_.find(option.event)->second;
443     for (auto it = list.begin(); it != list.end(); it++) {
444         if ((*it)->getObserver() == observer) {
445             LOG_ERROR("duplicate subscribe.");
446             return E_OK;
447         }
448     }
449 
450     auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
451     if (client == nullptr) {
452         LOG_ERROR("Failed to get DataObsMgrClient.");
453         return E_GET_DATAOBSMGRCLIENT_FAIL;
454     }
455     sptr<RdbStoreLocalSharedObserver> localSharedObserver(new (std::nothrow) RdbStoreLocalSharedObserver(observer));
456     int32_t err = client->RegisterObserver(GetUri(option.event), localSharedObserver);
457     if (err != 0) {
458         LOG_ERROR("Subscribe failed.");
459         return err;
460     }
461     localSharedObservers_[option.event].push_back(std::move(localSharedObserver));
462     return E_OK;
463 }
464 
SubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)465 int32_t RdbStoreImpl::SubscribeLocalDetail(const SubscribeOption &option,
466                                            const std::shared_ptr<RdbStoreObserver> &observer)
467 {
468     auto connection = connectionPool_->AcquireConnection(false);
469     if (connection == nullptr) {
470         return E_DATABASE_BUSY;
471     }
472     int32_t errCode = connection->Subscribe(option.event, observer);
473     if (errCode != E_OK) {
474         LOG_ERROR("subscribe local detail observer failed. db name:%{public}s errCode:%{public}" PRId32,
475             SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
476     }
477     return errCode;
478 }
479 
SubscribeRemote(const SubscribeOption & option,RdbStoreObserver * observer)480 int RdbStoreImpl::SubscribeRemote(const SubscribeOption& option, RdbStoreObserver *observer)
481 {
482     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
483     if (errCode != E_OK) {
484         return errCode;
485     }
486     return service->Subscribe(syncerParam_, option, observer);
487 }
488 
Subscribe(const SubscribeOption & option,RdbStoreObserver * observer)489 int RdbStoreImpl::Subscribe(const SubscribeOption &option, RdbStoreObserver *observer)
490 {
491     if (config_.GetDBType() == DB_VECTOR) {
492         return E_NOT_SUPPORT;
493     }
494     if (option.mode == SubscribeMode::LOCAL) {
495         return SubscribeLocal(option, observer);
496     }
497     if (option.mode == SubscribeMode::LOCAL_SHARED) {
498         return SubscribeLocalShared(option, observer);
499     }
500     return SubscribeRemote(option, observer);
501 }
502 
UnSubscribeLocal(const SubscribeOption & option,RdbStoreObserver * observer)503 int RdbStoreImpl::UnSubscribeLocal(const SubscribeOption& option, RdbStoreObserver *observer)
504 {
505     std::lock_guard<std::mutex> lock(mutex_);
506     auto obs = localObservers_.find(option.event);
507     if (obs == localObservers_.end()) {
508         return E_OK;
509     }
510 
511     auto &list = obs->second;
512     for (auto it = list.begin(); it != list.end(); it++) {
513         if ((*it)->getObserver() == observer) {
514             it = list.erase(it);
515             break;
516         }
517     }
518 
519     if (list.empty()) {
520         localObservers_.erase(option.event);
521     }
522     return E_OK;
523 }
524 
UnSubscribeLocalAll(const SubscribeOption & option)525 int RdbStoreImpl::UnSubscribeLocalAll(const SubscribeOption& option)
526 {
527     std::lock_guard<std::mutex> lock(mutex_);
528     auto obs = localObservers_.find(option.event);
529     if (obs == localObservers_.end()) {
530         return E_OK;
531     }
532 
533     localObservers_.erase(option.event);
534     return E_OK;
535 }
536 
UnSubscribeLocalShared(const SubscribeOption & option,RdbStoreObserver * observer)537 int RdbStoreImpl::UnSubscribeLocalShared(const SubscribeOption& option, RdbStoreObserver *observer)
538 {
539     std::lock_guard<std::mutex> lock(mutex_);
540     auto obs = localSharedObservers_.find(option.event);
541     if (obs == localSharedObservers_.end()) {
542         return E_OK;
543     }
544 
545     auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
546     if (client == nullptr) {
547         LOG_ERROR("Failed to get DataObsMgrClient.");
548         return E_GET_DATAOBSMGRCLIENT_FAIL;
549     }
550 
551     auto &list = obs->second;
552     for (auto it = list.begin(); it != list.end(); it++) {
553         if ((*it)->getObserver() == observer) {
554             int32_t err = client->UnregisterObserver(GetUri(option.event), *it);
555             if (err != 0) {
556                 LOG_ERROR("UnSubscribeLocalShared failed.");
557                 return err;
558             }
559             list.erase(it);
560             break;
561         }
562     }
563     if (list.empty()) {
564         localSharedObservers_.erase(option.event);
565     }
566     return E_OK;
567 }
568 
UnSubscribeLocalSharedAll(const SubscribeOption & option)569 int RdbStoreImpl::UnSubscribeLocalSharedAll(const SubscribeOption& option)
570 {
571     std::lock_guard<std::mutex> lock(mutex_);
572     auto obs = localSharedObservers_.find(option.event);
573     if (obs == localSharedObservers_.end()) {
574         return E_OK;
575     }
576 
577     auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
578     if (client == nullptr) {
579         LOG_ERROR("Failed to get DataObsMgrClient.");
580         return E_GET_DATAOBSMGRCLIENT_FAIL;
581     }
582 
583     auto &list = obs->second;
584     auto it = list.begin();
585     while (it != list.end()) {
586         int32_t err = client->UnregisterObserver(GetUri(option.event), *it);
587         if (err != 0) {
588             LOG_ERROR("UnSubscribe failed.");
589             return err;
590         }
591         it = list.erase(it);
592     }
593 
594     localSharedObservers_.erase(option.event);
595     return E_OK;
596 }
597 
UnsubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)598 int32_t RdbStoreImpl::UnsubscribeLocalDetail(const SubscribeOption& option,
599                                              const std::shared_ptr<RdbStoreObserver> &observer)
600 {
601     auto connection = connectionPool_->AcquireConnection(false);
602     if (connection == nullptr) {
603         return E_DATABASE_BUSY;
604     }
605     int32_t errCode = connection->Unsubscribe(option.event, observer);
606     if (errCode != E_OK) {
607         LOG_ERROR("unsubscribe local detail observer failed. db name:%{public}s errCode:%{public}" PRId32,
608             SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
609     }
610     return errCode;
611 }
612 
UnSubscribeRemote(const SubscribeOption & option,RdbStoreObserver * observer)613 int RdbStoreImpl::UnSubscribeRemote(const SubscribeOption& option, RdbStoreObserver *observer)
614 {
615     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
616     if (errCode != E_OK) {
617         return errCode;
618     }
619     return service->UnSubscribe(syncerParam_, option, observer);
620 }
621 
UnSubscribe(const SubscribeOption & option,RdbStoreObserver * observer)622 int RdbStoreImpl::UnSubscribe(const SubscribeOption &option, RdbStoreObserver *observer)
623 {
624     if (config_.GetDBType() == DB_VECTOR) {
625         return E_NOT_SUPPORT;
626     }
627     if (option.mode == SubscribeMode::LOCAL && observer) {
628         return UnSubscribeLocal(option, observer);
629     } else if (option.mode == SubscribeMode::LOCAL && !observer) {
630         return UnSubscribeLocalAll(option);
631     } else if (option.mode == SubscribeMode::LOCAL_SHARED && observer) {
632         return UnSubscribeLocalShared(option, observer);
633     } else if (option.mode == SubscribeMode::LOCAL_SHARED && !observer) {
634         return UnSubscribeLocalSharedAll(option);
635     }
636     return UnSubscribeRemote(option, observer);
637 }
638 
SubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)639 int RdbStoreImpl::SubscribeObserver(const SubscribeOption& option, const std::shared_ptr<RdbStoreObserver> &observer)
640 {
641     if (config_.GetDBType() == DB_VECTOR) {
642         return E_NOT_SUPPORT;
643     }
644     return SubscribeLocalDetail(option, observer);
645 }
646 
UnsubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)647 int RdbStoreImpl::UnsubscribeObserver(const SubscribeOption& option, const std::shared_ptr<RdbStoreObserver> &observer)
648 {
649     if (config_.GetDBType() == DB_VECTOR) {
650         return E_NOT_SUPPORT;
651     }
652     return UnsubscribeLocalDetail(option, observer);
653 }
654 
Notify(const std::string & event)655 int RdbStoreImpl::Notify(const std::string &event)
656 {
657     if (config_.GetDBType() == DB_VECTOR) {
658         return E_NOT_SUPPORT;
659     }
660     auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
661     if (client == nullptr) {
662         LOG_ERROR("Failed to get DataObsMgrClient.");
663         return E_GET_DATAOBSMGRCLIENT_FAIL;
664     }
665     int32_t err = client->NotifyChange(GetUri(event));
666     if (err != 0) {
667         LOG_ERROR("Notify failed.");
668     }
669 
670     std::lock_guard<std::mutex> lock(mutex_);
671     auto obs = localObservers_.find(event);
672     if (obs != localObservers_.end()) {
673         auto &list = obs->second;
674         for (auto &it : list) {
675             it->OnChange();
676         }
677     }
678     return E_OK;
679 }
680 
SetSearchable(bool isSearchable)681 int RdbStoreImpl::SetSearchable(bool isSearchable)
682 {
683     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
684     if (errCode != E_OK || service == nullptr) {
685         LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
686         return errCode;
687     }
688     return service->SetSearchable(syncerParam_, isSearchable);
689 }
690 
RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)691 int RdbStoreImpl::RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
692 {
693     if (config_.GetDBType() == DB_VECTOR) {
694         return E_NOT_SUPPORT;
695     }
696     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
697     if (errCode != E_OK) {
698         return errCode;
699     }
700     return service->RegisterAutoSyncCallback(syncerParam_, observer);
701 }
702 
UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)703 int RdbStoreImpl::UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
704 {
705     if (config_.GetDBType() == DB_VECTOR) {
706         return E_NOT_SUPPORT;
707     }
708     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
709     if (errCode != E_OK) {
710         return errCode;
711     }
712     return service->UnregisterAutoSyncCallback(syncerParam_, observer);
713 }
714 
InitDelayNotifier()715 void RdbStoreImpl::InitDelayNotifier()
716 {
717     if (delayNotifier_ != nullptr) {
718         return;
719     }
720     delayNotifier_ = std::make_shared<DelayNotify>();
721     if (delayNotifier_ == nullptr) {
722         LOG_ERROR("Init delay notifier failed.");
723         return;
724     }
725     delayNotifier_->SetExecutorPool(TaskExecutor::GetInstance().GetExecutor());
726     delayNotifier_->SetTask([param = syncerParam_]
727         (const DistributedRdb::RdbChangedData& rdbChangedData, const RdbNotifyConfig& rdbNotifyConfig) -> int {
728         auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
729         if (errCode == E_NOT_SUPPORT) {
730             return errCode;
731         }
732         if (errCode != E_OK || service == nullptr) {
733             LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
734             return errCode;
735         }
736         return service->NotifyDataChange(param, rdbChangedData, rdbNotifyConfig);
737     });
738 }
739 
RegisterDataChangeCallback()740 int RdbStoreImpl::RegisterDataChangeCallback()
741 {
742     if (!config_.IsSearchable()) {
743         return E_OK;
744     }
745 
746     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
747         return E_NOT_SUPPORT;
748     }
749     InitDelayNotifier();
750     auto callBack = [delayNotifier = delayNotifier_](const std::set<std::string> &tables) {
751         DistributedRdb::RdbChangedData rdbChangedData;
752         for (const auto& table : tables) {
753             rdbChangedData.tableData[table].isTrackedDataChange = true;
754         }
755         if (delayNotifier != nullptr) {
756             delayNotifier->UpdateNotify(rdbChangedData);
757         }
758     };
759     auto connection = connectionPool_->AcquireConnection(false);
760     if (connection == nullptr) {
761         return E_DATABASE_BUSY;
762     }
763     return connection->SubscribeTableChanges(callBack);
764 }
765 
GetHashKeyForLockRow(const AbsRdbPredicates & predicates,std::vector<std::vector<uint8_t>> & hashKeys)766 int RdbStoreImpl::GetHashKeyForLockRow(const AbsRdbPredicates &predicates, std::vector<std::vector<uint8_t>> &hashKeys)
767 {
768     std::string table = predicates.GetTableName();
769     if (table.empty()) {
770         return E_EMPTY_TABLE_NAME;
771     }
772     auto logTable = GetLogTableName(table);
773     std::string sql;
774     sql.append("SELECT ").append(logTable).append(".hash_key ").append("FROM ").append(logTable);
775     sql.append(" INNER JOIN ").append(table).append(" ON ");
776     sql.append(table).append(".ROWID = ").append(logTable).append(".data_key");
777     auto whereClause = predicates.GetWhereClause();
778     if (!whereClause.empty()) {
779         SqliteUtils::Replace(whereClause, SqliteUtils::REP, logTable + ".");
780         sql.append(" WHERE ").append(whereClause);
781     }
782 
783     auto result = QuerySql(sql, predicates.GetBindArgs());
784     if (result == nullptr) {
785         return E_ERROR;
786     }
787     int count = 0;
788     if (result->GetRowCount(count) != E_OK) {
789         return E_NO_ROW_IN_QUERY;
790     }
791 
792     if (count <= 0) {
793         return E_NO_ROW_IN_QUERY;
794     }
795     while (result->GoToNextRow() == E_OK) {
796         std::vector<uint8_t> hashKey;
797         if (result->GetBlob(0, hashKey) != E_OK) {
798             return E_ERROR;
799         }
800         hashKeys.push_back(std::move(hashKey));
801     }
802     return E_OK;
803 }
804 
ModifyLockStatus(const AbsRdbPredicates & predicates,bool isLock)805 int RdbStoreImpl::ModifyLockStatus(const AbsRdbPredicates &predicates, bool isLock)
806 {
807     std::vector<std::vector<uint8_t>> hashKeys;
808     int ret = GetHashKeyForLockRow(predicates, hashKeys);
809     if (ret != E_OK) {
810         LOG_ERROR("GetHashKeyForLockRow failed, err is %{public}d.", ret);
811         return ret;
812     }
813     auto [err, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION);
814     if (statement == nullptr || err != E_OK) {
815         return err;
816     }
817     int errCode = statement->ModifyLockStatus(predicates.GetTableName(), hashKeys, isLock);
818     if (errCode == E_WAIT_COMPENSATED_SYNC) {
819         LOG_DEBUG("Start compensation sync.");
820         DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true, true };
821         auto memo = AbsRdbPredicates(predicates.GetTableName()).GetDistributedPredicates();
822         InnerSync(syncerParam_, option, memo, nullptr);
823         return E_OK;
824     }
825     if (errCode != E_OK) {
826         LOG_ERROR("ModifyLockStatus failed, err is %{public}d.", errCode);
827     }
828     return errCode;
829 }
830 
LockCloudContainer()831 std::pair<int32_t, uint32_t> RdbStoreImpl::LockCloudContainer()
832 {
833     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
834     RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
835     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
836     if (errCode == E_NOT_SUPPORT) {
837         LOG_ERROR("not support");
838         return { errCode, 0 };
839     }
840     if (errCode != E_OK) {
841         LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
842             syncerParam_.bundleName_.c_str());
843         return { errCode, 0 };
844     }
845     auto result = service->LockCloudContainer(syncerParam_);
846     if (result.first != E_OK) {
847         LOG_ERROR("LockCloudContainer failed, err is %{public}d.", result.first);
848     }
849     return result;
850 }
851 
UnlockCloudContainer()852 int32_t RdbStoreImpl::UnlockCloudContainer()
853 {
854     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
855     RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
856     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
857     if (errCode == E_NOT_SUPPORT) {
858         LOG_ERROR("not support");
859         return errCode;
860     }
861     if (errCode != E_OK) {
862         LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
863             syncerParam_.bundleName_.c_str());
864         return errCode;
865     }
866     errCode = service->UnlockCloudContainer(syncerParam_);
867     if (errCode != E_OK) {
868         LOG_ERROR("UnlockCloudContainer failed, err is %{public}d.", errCode);
869     }
870     return errCode;
871 }
872 #endif
873 
RdbStoreImpl(const RdbStoreConfig & config)874 RdbStoreImpl::RdbStoreImpl(const RdbStoreConfig &config)
875     : isMemoryRdb_(config.IsMemoryRdb()), config_(config), name_(config.GetName()),
876       fileType_(config.GetDatabaseFileType())
877 {
878     path_ = (config.GetRoleType() == VISITOR) ? config.GetVisitorDir() : config.GetPath();
879     isReadOnly_ = config.IsReadOnly() || config.GetRoleType() == VISITOR;
880 }
881 
RdbStoreImpl(const RdbStoreConfig & config,int & errCode)882 RdbStoreImpl::RdbStoreImpl(const RdbStoreConfig &config, int &errCode)
883     : isMemoryRdb_(config.IsMemoryRdb()), config_(config), name_(config.GetName()),
884       fileType_(config.GetDatabaseFileType())
885 {
886     isReadOnly_ = config.IsReadOnly() || config.GetRoleType() == VISITOR;
887     path_ = (config.GetRoleType() == VISITOR) ? config.GetVisitorDir() : config.GetPath();
888     bool created = access(path_.c_str(), F_OK) != 0;
889     connectionPool_ = ConnectionPool::Create(config_, errCode);
890     if (connectionPool_ == nullptr && errCode == E_SQLITE_CORRUPT && config.GetAllowRebuild() && !isReadOnly_) {
891         LOG_ERROR("database corrupt, rebuild database %{public}s", SqliteUtils::Anonymous(name_).c_str());
892 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
893         RdbParam param;
894         param.bundleName_ = config_.GetBundleName();
895         param.storeName_ = config_.GetName();
896         auto [err, service] = RdbMgr::GetInstance().GetRdbService(param);
897         if (service != nullptr) {
898             service->Disable(param);
899         }
900 #endif
901         config_.SetIter(0);
902         std::tie(rebuild_, connectionPool_) = ConnectionPool::HandleDataCorruption(config_, errCode);
903         created = true;
904 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
905         if (service != nullptr) {
906             service->Enable(param);
907         }
908 #endif
909     }
910     if (connectionPool_ == nullptr || errCode != E_OK) {
911         connectionPool_ = nullptr;
912         LOG_ERROR("Create connPool failed, err is %{public}d, path:%{public}s", errCode,
913             SqliteUtils::Anonymous(path_).c_str());
914         return;
915     }
916     InitSyncerParam(config_, created);
917     InnerOpen();
918 }
919 
~RdbStoreImpl()920 RdbStoreImpl::~RdbStoreImpl()
921 {
922     connectionPool_ = nullptr;
923     trxConnMap_ = {};
924     for (auto &trans : transactions_) {
925         auto realTrans = trans.lock();
926         if (realTrans) {
927             (void)realTrans->Close();
928         }
929     }
930     transactions_ = {};
931 }
932 
GetConfig()933 const RdbStoreConfig &RdbStoreImpl::GetConfig()
934 {
935     return config_;
936 }
937 
Insert(const std::string & table,const Row & row,Resolution resolution)938 std::pair<int, int64_t> RdbStoreImpl::Insert(const std::string &table, const Row &row, Resolution resolution)
939 {
940     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
941     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
942         return { E_NOT_SUPPORT, -1 };
943     }
944     if (table.empty()) {
945         return { E_EMPTY_TABLE_NAME, -1 };
946     }
947 
948     if (row.IsEmpty()) {
949         return { E_EMPTY_VALUES_BUCKET, -1 };
950     }
951 
952     auto conflictClause = SqliteUtils::GetConflictClause(static_cast<int>(resolution));
953     if (conflictClause == nullptr) {
954         return { E_INVALID_CONFLICT_FLAG, -1 };
955     }
956     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
957     std::string sql;
958     sql.append("INSERT").append(conflictClause).append(" INTO ").append(table).append("(");
959     size_t bindArgsSize = row.values_.size();
960     std::vector<ValueObject> bindArgs;
961     bindArgs.reserve(bindArgsSize);
962     const char *split = "";
963     for (const auto &[key, val] : row.values_) {
964         sql.append(split).append(key);
965         if (val.GetType() == ValueObject::TYPE_ASSETS && resolution == ConflictResolution::ON_CONFLICT_REPLACE) {
966             return { E_INVALID_ARGS, -1 };
967         }
968         SqliteSqlBuilder::UpdateAssetStatus(val, AssetValue::STATUS_INSERT);
969         bindArgs.push_back(val); // columnValue
970         split = ",";
971     }
972 
973     sql.append(") VALUES (");
974     if (bindArgsSize > 0) {
975         sql.append(SqliteSqlBuilder::GetSqlArgs(bindArgsSize));
976     }
977 
978     sql.append(")");
979     int64_t rowid = -1;
980     auto errCode = ExecuteForLastInsertedRowId(rowid, sql, bindArgs);
981     if (errCode == E_OK) {
982         DoCloudSync(table);
983     }
984 
985     return { errCode, rowid };
986 }
987 
BatchInsert(const std::string & table,const ValuesBuckets & rows)988 std::pair<int, int64_t> RdbStoreImpl::BatchInsert(const std::string &table, const ValuesBuckets &rows)
989 {
990     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
991         return { E_NOT_SUPPORT, -1 };
992     }
993 
994     if (rows.RowSize() == 0) {
995         return { E_OK, 0 };
996     }
997 
998     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
999     auto connection = connectionPool_->AcquireConnection(false);
1000     if (connection == nullptr) {
1001         return { E_DATABASE_BUSY, -1 };
1002     }
1003 
1004     auto executeSqlArgs = SqliteSqlBuilder::GenerateSqls(table, rows, connection->GetMaxVariable());
1005     if (executeSqlArgs.empty()) {
1006         LOG_ERROR("empty, table=%{public}s, values:%{public}zu, max number:%{public}d.", table.c_str(), rows.RowSize(),
1007             connection->GetMaxVariable());
1008         return { E_INVALID_ARGS, -1 };
1009     }
1010 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1011     PauseDelayNotify pauseDelayNotify(delayNotifier_);
1012 #endif
1013     for (const auto &[sql, bindArgs] : executeSqlArgs) {
1014         auto [errCode, statement] = GetStatement(sql, connection);
1015         if (statement == nullptr) {
1016             LOG_ERROR("statement is nullptr, errCode:0x%{public}x, args:%{public}zu, table:%{public}s, sql:%{public}s",
1017                 errCode, bindArgs.size(), table.c_str(), sql.c_str());
1018             return { E_OK, -1 };
1019         }
1020         for (const auto &args : bindArgs) {
1021             auto errCode = statement->Execute(args);
1022             if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1023                 connectionPool_->Dump(true, "BATCH");
1024                 return { errCode, -1 };
1025             }
1026             if (errCode != E_OK) {
1027                 LOG_ERROR("failed, errCode:%{public}d,args:%{public}zu,table:%{public}s,sql:%{public}s", errCode,
1028                     bindArgs.size(), table.c_str(), sql.c_str());
1029                 return { E_OK, -1 };
1030             }
1031         }
1032     }
1033     connection = nullptr;
1034     DoCloudSync(table);
1035     return { E_OK, int64_t(rows.RowSize()) };
1036 }
1037 
Update(const std::string & table,const Row & row,const std::string & where,const Values & args,Resolution resolution)1038 std::pair<int, int> RdbStoreImpl::Update(const std::string &table, const Row &row, const std::string &where,
1039     const Values &args, Resolution resolution)
1040 {
1041     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1042     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1043         return { E_NOT_SUPPORT, -1 };
1044     }
1045     if (table.empty()) {
1046         return { E_EMPTY_TABLE_NAME, -1 };
1047     }
1048 
1049     if (row.IsEmpty()) {
1050         return { E_EMPTY_VALUES_BUCKET, -1 };
1051     }
1052 
1053     auto clause = SqliteUtils::GetConflictClause(static_cast<int>(resolution));
1054     if (clause == nullptr) {
1055         return { E_INVALID_CONFLICT_FLAG, -1 };
1056     }
1057     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1058     std::string sql;
1059     sql.append("UPDATE").append(clause).append(" ").append(table).append(" SET ");
1060     std::vector<ValueObject> tmpBindArgs;
1061     size_t tmpBindSize = row.values_.size() + args.size();
1062     tmpBindArgs.reserve(tmpBindSize);
1063     const char *split = "";
1064     for (auto &[key, val] : row.values_) {
1065         sql.append(split);
1066         if (val.GetType() == ValueObject::TYPE_ASSETS) {
1067             sql.append(key).append("=merge_assets(").append(key).append(", ?)"); // columnName
1068         } else if (val.GetType() == ValueObject::TYPE_ASSET) {
1069             sql.append(key).append("=merge_asset(").append(key).append(", ?)"); // columnName
1070         } else {
1071             sql.append(key).append("=?"); // columnName
1072         }
1073         tmpBindArgs.push_back(val); // columnValue
1074         split = ",";
1075     }
1076 
1077     if (!where.empty()) {
1078         sql.append(" WHERE ").append(where);
1079     }
1080 
1081     tmpBindArgs.insert(tmpBindArgs.end(), args.begin(), args.end());
1082 
1083     int64_t changes = 0;
1084     auto errCode = ExecuteForChangedRowCount(changes, sql, tmpBindArgs);
1085     if (errCode == E_OK) {
1086         DoCloudSync(table);
1087     }
1088     return { errCode, int32_t(changes) };
1089 }
1090 
Delete(int & deletedRows,const std::string & table,const std::string & whereClause,const Values & args)1091 int RdbStoreImpl::Delete(int &deletedRows, const std::string &table, const std::string &whereClause, const Values &args)
1092 {
1093     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1094     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1095         return E_NOT_SUPPORT;
1096     }
1097     if (table.empty()) {
1098         return E_EMPTY_TABLE_NAME;
1099     }
1100 
1101     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1102     std::string sql;
1103     sql.append("DELETE FROM ").append(table);
1104     if (!whereClause.empty()) {
1105         sql.append(" WHERE ").append(whereClause);
1106     }
1107     int64_t changes = 0;
1108     auto errCode = ExecuteForChangedRowCount(changes, sql, args);
1109     if (errCode != E_OK) {
1110         return errCode;
1111     }
1112     deletedRows = changes;
1113     DoCloudSync(table);
1114     return E_OK;
1115 }
1116 
QuerySql(const std::string & sql,const Values & bindArgs)1117 std::shared_ptr<AbsSharedResultSet> RdbStoreImpl::QuerySql(const std::string &sql, const Values &bindArgs)
1118 {
1119     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1120     if (config_.GetDBType() == DB_VECTOR) {
1121         return nullptr;
1122     }
1123     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1124 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1125     auto start = std::chrono::steady_clock::now();
1126     return std::make_shared<SqliteSharedResultSet>(start, connectionPool_->AcquireRef(true), sql, bindArgs, path_);
1127 #else
1128     (void)sql;
1129     (void)bindArgs;
1130     return nullptr;
1131 #endif
1132 }
1133 
QueryByStep(const std::string & sql,const Values & args)1134 std::shared_ptr<ResultSet> RdbStoreImpl::QueryByStep(const std::string &sql, const Values &args)
1135 {
1136     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1137     auto start = std::chrono::steady_clock::now();
1138     return std::make_shared<StepResultSet>(start, connectionPool_->AcquireRef(true), sql, args);
1139 }
1140 
Count(int64_t & outValue,const AbsRdbPredicates & predicates)1141 int RdbStoreImpl::Count(int64_t &outValue, const AbsRdbPredicates &predicates)
1142 {
1143     if (config_.GetDBType() == DB_VECTOR) {
1144         return E_NOT_SUPPORT;
1145     }
1146     std::string sql = SqliteSqlBuilder::BuildCountString(predicates);
1147     return ExecuteAndGetLong(outValue, sql, predicates.GetBindArgs());
1148 }
1149 
ExecuteSql(const std::string & sql,const Values & args)1150 int RdbStoreImpl::ExecuteSql(const std::string &sql, const Values &args)
1151 {
1152     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1153     if (config_.GetDBType() == DB_VECTOR || isReadOnly_) {
1154         return E_NOT_SUPPORT;
1155     }
1156     int ret = CheckAttach(sql);
1157     if (ret != E_OK) {
1158         return ret;
1159     }
1160     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1161     auto [errCode, statement] = BeginExecuteSql(sql);
1162     if (statement == nullptr) {
1163         return errCode;
1164     }
1165     errCode = statement->Execute(args);
1166     if (errCode != E_OK) {
1167         LOG_ERROR("failed,error:0x%{public}x sql:%{public}s.", errCode, sql.c_str());
1168         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1169             connectionPool_->Dump(true, "EXECUTE");
1170         }
1171         return errCode;
1172     }
1173     int sqlType = SqliteUtils::GetSqlStatementType(sql);
1174     if (sqlType == SqliteUtils::STATEMENT_DDL) {
1175         statement->Reset();
1176         statement->Prepare("PRAGMA schema_version");
1177         auto [err, version] = statement->ExecuteForValue();
1178         statement = nullptr;
1179         if (vSchema_ < static_cast<int64_t>(version)) {
1180             LOG_INFO("db:%{public}s exe DDL schema<%{public}" PRIi64 "->%{public}" PRIi64 "> sql:%{public}s.",
1181                 SqliteUtils::Anonymous(name_).c_str(), vSchema_, static_cast<int64_t>(version), sql.c_str());
1182             vSchema_ = version;
1183             errCode = connectionPool_->RestartReaders();
1184         }
1185     }
1186     statement = nullptr;
1187     if (errCode == E_OK && (sqlType == SqliteUtils::STATEMENT_UPDATE || sqlType == SqliteUtils::STATEMENT_INSERT)) {
1188         DoCloudSync("");
1189     }
1190     return errCode;
1191 }
1192 
Execute(const std::string & sql,const Values & args,int64_t trxId)1193 std::pair<int32_t, ValueObject> RdbStoreImpl::Execute(const std::string &sql, const Values &args, int64_t trxId)
1194 {
1195     ValueObject object;
1196     if (isReadOnly_) {
1197         return { E_NOT_SUPPORT, object };
1198     }
1199 
1200     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1201     int sqlType = SqliteUtils::GetSqlStatementType(sql);
1202     if (!SqliteUtils::IsSupportSqlForExecute(sqlType)) {
1203         LOG_ERROR("Not support the sqlType: %{public}d, sql: %{public}s", sqlType, sql.c_str());
1204         return { E_NOT_SUPPORT_THE_SQL, object };
1205     }
1206 
1207     if (config_.IsVector() && trxId > 0) {
1208         return { ExecuteByTrxId(sql, trxId, false, args), ValueObject() };
1209     }
1210 
1211     auto connect = connectionPool_->AcquireConnection(false);
1212     if (connect == nullptr) {
1213         return { E_DATABASE_BUSY, object };
1214     }
1215 
1216     auto [errCode, statement] = GetStatement(sql, connect);
1217     if (errCode != E_OK) {
1218         return { errCode, object };
1219     }
1220 
1221     errCode = statement->Execute(args);
1222     if (errCode != E_OK) {
1223         LOG_ERROR("failed,error:0x%{public}x sql:%{public}s.", errCode, sql.c_str());
1224         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1225             connectionPool_->Dump(true, "EXECUTE");
1226         }
1227         return { errCode, object };
1228     }
1229 
1230     if (config_.IsVector()) {
1231         return { errCode, object };
1232     }
1233 
1234     return HandleDifferentSqlTypes(statement, sql, object, sqlType);
1235 }
1236 
HandleDifferentSqlTypes(std::shared_ptr<Statement> statement,const std::string & sql,const ValueObject & object,int sqlType)1237 std::pair<int32_t, ValueObject> RdbStoreImpl::HandleDifferentSqlTypes(std::shared_ptr<Statement> statement,
1238     const std::string &sql, const ValueObject &object, int sqlType)
1239 {
1240     int32_t errCode = E_OK;
1241     if (sqlType == SqliteUtils::STATEMENT_INSERT) {
1242         int64_t outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1243         return { errCode, ValueObject(outValue) };
1244     }
1245 
1246     if (sqlType == SqliteUtils::STATEMENT_UPDATE) {
1247         int outValue = statement->Changes();
1248         return { errCode, ValueObject(outValue) };
1249     }
1250 
1251     if (sqlType == SqliteUtils::STATEMENT_PRAGMA) {
1252         if (statement->GetColumnCount() == 1) {
1253             return statement->GetColumn(0);
1254         }
1255 
1256         if (statement->GetColumnCount() > 1) {
1257             LOG_ERROR("Not support the sql:%{public}s, column count more than 1", sql.c_str());
1258             return { E_NOT_SUPPORT_THE_SQL, object };
1259         }
1260     }
1261 
1262     if (sqlType == SqliteUtils::STATEMENT_DDL) {
1263         statement->Reset();
1264         statement->Prepare("PRAGMA schema_version");
1265         auto [err, version] = statement->ExecuteForValue();
1266         if (vSchema_ < static_cast<int64_t>(version)) {
1267             LOG_INFO("db:%{public}s exe DDL schema<%{public}" PRIi64 "->%{public}" PRIi64 "> sql:%{public}s.",
1268                      SqliteUtils::Anonymous(name_).c_str(), vSchema_, static_cast<int64_t>(version), sql.c_str());
1269             vSchema_ = version;
1270             errCode = connectionPool_->RestartReaders();
1271         }
1272     }
1273     return { errCode, object };
1274 }
1275 
ExecuteAndGetLong(int64_t & outValue,const std::string & sql,const Values & args)1276 int RdbStoreImpl::ExecuteAndGetLong(int64_t &outValue, const std::string &sql, const Values &args)
1277 {
1278     if (config_.GetDBType() == DB_VECTOR) {
1279         return E_NOT_SUPPORT;
1280     }
1281     auto [errCode, statement] = BeginExecuteSql(sql);
1282     if (statement == nullptr) {
1283         return errCode;
1284     }
1285     auto [err, object] = statement->ExecuteForValue(args);
1286     if (err != E_OK) {
1287         LOG_ERROR("failed, sql %{public}s,  ERROR is %{public}d.", sql.c_str(), err);
1288     }
1289     outValue = object;
1290     return err;
1291 }
1292 
ExecuteAndGetString(std::string & outValue,const std::string & sql,const Values & args)1293 int RdbStoreImpl::ExecuteAndGetString(std::string &outValue, const std::string &sql, const Values &args)
1294 {
1295     if (config_.GetDBType() == DB_VECTOR) {
1296         return E_NOT_SUPPORT;
1297     }
1298     auto [errCode, statement] = BeginExecuteSql(sql);
1299     if (statement == nullptr) {
1300         return errCode;
1301     }
1302     ValueObject object;
1303     std::tie(errCode, object) = statement->ExecuteForValue(args);
1304     if (errCode != E_OK) {
1305         LOG_ERROR("failed, sql %{public}s,  ERROR is %{public}d.", sql.c_str(), errCode);
1306     }
1307     outValue = static_cast<std::string>(object);
1308     return errCode;
1309 }
1310 
ExecuteForLastInsertedRowId(int64_t & outValue,const std::string & sql,const Values & args)1311 int RdbStoreImpl::ExecuteForLastInsertedRowId(int64_t &outValue, const std::string &sql, const Values &args)
1312 {
1313     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1314         return E_NOT_SUPPORT;
1315     }
1316     auto begin = std::chrono::steady_clock::now();
1317     auto [errCode, statement] = GetStatement(sql, false);
1318     if (statement == nullptr) {
1319         return errCode;
1320     }
1321     auto beginExec = std::chrono::steady_clock::now();
1322     errCode = statement->Execute(args);
1323     if (errCode != E_OK) {
1324         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1325             connectionPool_->Dump(true, "INSERT");
1326         }
1327         return errCode;
1328     }
1329     auto beginResult = std::chrono::steady_clock::now();
1330     outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1331     auto allEnd = std::chrono::steady_clock::now();
1332     int64_t totalCostTime = std::chrono::duration_cast<std::chrono::milliseconds>(begin - allEnd).count();
1333     if (totalCostTime >= TIME_OUT) {
1334         int64_t prepareCost =
1335             std::chrono::duration_cast<std::chrono::milliseconds>(beginExec - begin).count();
1336         int64_t execCost =
1337             std::chrono::duration_cast<std::chrono::milliseconds>(beginExec - beginResult).count();
1338         int64_t resultCost = std::chrono::duration_cast<std::chrono::milliseconds>(allEnd - beginResult).count();
1339         LOG_WARN("total[%{public}" PRId64 "] stmt[%{public}" PRId64 "] exec[%{public}" PRId64
1340                  "] result[%{public}" PRId64 "] "
1341                  "sql[%{public}s]",
1342             totalCostTime, prepareCost, execCost, resultCost, SqliteUtils::Anonymous(sql).c_str());
1343     }
1344     return E_OK;
1345 }
1346 
ExecuteForChangedRowCount(int64_t & outValue,const std::string & sql,const Values & args)1347 int RdbStoreImpl::ExecuteForChangedRowCount(int64_t &outValue, const std::string &sql, const Values &args)
1348 {
1349     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1350         return E_NOT_SUPPORT;
1351     }
1352     auto [errCode, statement] = GetStatement(sql, false);
1353     if (statement == nullptr) {
1354         return errCode;
1355     }
1356     errCode = statement->Execute(args);
1357     if (errCode != E_OK) {
1358         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1359             connectionPool_->Dump(true, "UPG DEL");
1360         }
1361         return errCode;
1362     }
1363     outValue = statement->Changes();
1364     return E_OK;
1365 }
1366 
GetDataBasePath(const std::string & databasePath,std::string & backupFilePath)1367 int RdbStoreImpl::GetDataBasePath(const std::string &databasePath, std::string &backupFilePath)
1368 {
1369     if (databasePath.empty()) {
1370         return E_INVALID_FILE_PATH;
1371     }
1372 
1373     if (ISFILE(databasePath)) {
1374         backupFilePath = ExtractFilePath(path_) + databasePath;
1375     } else {
1376         // 2 represents two characters starting from the len - 2 position
1377         if (!PathToRealPath(ExtractFilePath(databasePath), backupFilePath) || databasePath.back() == '/' ||
1378             databasePath.substr(databasePath.length() - 2, 2) == "\\") {
1379             LOG_ERROR("Invalid databasePath.");
1380             return E_INVALID_FILE_PATH;
1381         }
1382         backupFilePath = databasePath;
1383     }
1384 
1385     if (backupFilePath == path_) {
1386         LOG_ERROR("The backupPath and path should not be same.");
1387         return E_INVALID_FILE_PATH;
1388     }
1389 
1390     LOG_INFO("databasePath is %{public}s.", SqliteUtils::Anonymous(backupFilePath).c_str());
1391     return E_OK;
1392 }
1393 
GetSlaveName(const std::string & path,std::string & backupFilePath)1394 int RdbStoreImpl::GetSlaveName(const std::string &path, std::string &backupFilePath)
1395 {
1396     std::string suffix(".db");
1397     std::string slaveSuffix("_slave.db");
1398     auto pos = path.find(suffix);
1399     if (pos == std::string::npos) {
1400         backupFilePath = path + slaveSuffix;
1401     } else {
1402         backupFilePath = std::string(path, 0, pos) + slaveSuffix;
1403     }
1404     return E_OK;
1405 }
1406 
1407 /**
1408  * Backup a database from a specified encrypted or unencrypted database file.
1409  */
Backup(const std::string & databasePath,const std::vector<uint8_t> & encryptKey)1410 int RdbStoreImpl::Backup(const std::string &databasePath, const std::vector<uint8_t> &encryptKey)
1411 {
1412     LOG_INFO("Backup db: %{public}s.", SqliteUtils::Anonymous(config_.GetName()).c_str());
1413     if (isReadOnly_) {
1414         return E_NOT_SUPPORT;
1415     }
1416     std::string backupFilePath;
1417     if (TryGetMasterSlaveBackupPath(databasePath, backupFilePath)) {
1418         return InnerBackup(backupFilePath, encryptKey);
1419     }
1420 
1421     int ret = GetDataBasePath(databasePath, backupFilePath);
1422     if (ret != E_OK) {
1423         return ret;
1424     }
1425 
1426     RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
1427     keyFiles.Lock();
1428 
1429     auto deleteDirtyFiles = [&backupFilePath] {
1430         auto res = SqliteUtils::DeleteFile(backupFilePath);
1431         res = SqliteUtils::DeleteFile(backupFilePath + "-shm") && res;
1432         res = SqliteUtils::DeleteFile(backupFilePath + "-wal") && res;
1433         return res;
1434     };
1435 
1436     auto walFile = backupFilePath + "-wal";
1437     if (access(walFile.c_str(), F_OK) == E_OK) {
1438         if (!deleteDirtyFiles()) {
1439             keyFiles.Unlock();
1440             return E_ERROR;
1441         }
1442     }
1443     std::string tempPath = backupFilePath + ".tmp";
1444     if (access(tempPath.c_str(), F_OK) == E_OK) {
1445         SqliteUtils::DeleteFile(backupFilePath);
1446     } else {
1447         if (access(backupFilePath.c_str(), F_OK) == E_OK && !SqliteUtils::RenameFile(backupFilePath, tempPath)) {
1448             LOG_ERROR("rename backup file failed, path:%{public}s, errno:%{public}d",
1449                 SqliteUtils::Anonymous(backupFilePath).c_str(), errno);
1450             keyFiles.Unlock();
1451             return E_ERROR;
1452         }
1453     }
1454     ret = InnerBackup(backupFilePath, encryptKey);
1455     if (ret != E_OK || access(walFile.c_str(), F_OK) == E_OK) {
1456         if (deleteDirtyFiles()) {
1457             SqliteUtils::RenameFile(tempPath, backupFilePath);
1458         }
1459     } else {
1460         SqliteUtils::DeleteFile(tempPath);
1461     }
1462     keyFiles.Unlock();
1463     return ret;
1464 }
1465 
CreateBackupBindArgs(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey)1466 std::vector<ValueObject> RdbStoreImpl::CreateBackupBindArgs(const std::string &databasePath,
1467     const std::vector<uint8_t> &destEncryptKey)
1468 {
1469     std::vector<ValueObject> bindArgs;
1470     bindArgs.emplace_back(databasePath);
1471     if (!destEncryptKey.empty() && !config_.IsEncrypt()) {
1472         bindArgs.emplace_back(destEncryptKey);
1473     } else if (config_.IsEncrypt()) {
1474         std::vector<uint8_t> key = config_.GetEncryptKey();
1475         bindArgs.emplace_back(key);
1476         key.assign(key.size(), 0);
1477     } else {
1478         bindArgs.emplace_back("");
1479     }
1480     return bindArgs;
1481 }
1482 
1483 /**
1484  * Backup a database from a specified encrypted or unencrypted database file.
1485  */
InnerBackup(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey)1486 int RdbStoreImpl::InnerBackup(const std::string &databasePath, const std::vector<uint8_t> &destEncryptKey)
1487 {
1488     if (isReadOnly_) {
1489         return E_NOT_SUPPORT;
1490     }
1491 
1492     if (config_.GetDBType() == DB_VECTOR) {
1493         if (config_.IsEncrypt()) {
1494             return E_NOT_SUPPORT;
1495         }
1496 
1497         auto conn = connectionPool_->AcquireConnection(false);
1498         if (conn == nullptr) {
1499             return E_BASE;
1500         }
1501 
1502         return conn->Backup(databasePath, {}, false, slaveStatus_);
1503     }
1504 
1505     if (config_.GetHaMode() != HAMode::SINGLE && SqliteUtils::IsSlaveDbName(databasePath)) {
1506         auto conn = connectionPool_->AcquireConnection(false);
1507         return conn == nullptr ? E_BASE : conn->Backup(databasePath, {}, false, slaveStatus_);
1508     }
1509 
1510     auto [result, conn] = CreateWritableConn();
1511     if (result != E_OK) {
1512         return result;
1513     }
1514 
1515     if (config_.IsEncrypt()) {
1516         result = SetDefaultEncryptAlgo(conn, config_);
1517         if (result != E_OK) {
1518             return result;
1519         }
1520     }
1521 
1522     std::vector<ValueObject> bindArgs = CreateBackupBindArgs(databasePath, destEncryptKey);
1523     auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_BACKUP_SQL, conn);
1524     errCode = statement->Execute(bindArgs);
1525     if (errCode != E_OK) {
1526         return errCode;
1527     }
1528     errCode = statement->Prepare(GlobalExpr::PRAGMA_BACKUP_JOUR_MODE_WAL);
1529     errCode = statement->Execute();
1530     if (errCode != E_OK) {
1531         return errCode;
1532     }
1533     errCode = statement->Prepare(GlobalExpr::EXPORT_SQL);
1534     int ret = statement->Execute();
1535     errCode = statement->Prepare(GlobalExpr::DETACH_BACKUP_SQL);
1536     int res = statement->Execute();
1537     return (res == E_OK) ? ret : res;
1538 }
1539 
BeginExecuteSql(const std::string & sql)1540 std::pair<int32_t, RdbStoreImpl::Stmt> RdbStoreImpl::BeginExecuteSql(const std::string& sql)
1541 {
1542     int type = SqliteUtils::GetSqlStatementType(sql);
1543     if (SqliteUtils::IsSpecial(type)) {
1544         return { E_NOT_SUPPORT, nullptr };
1545     }
1546 
1547     bool assumeReadOnly = SqliteUtils::IsSqlReadOnly(type);
1548     auto conn = connectionPool_->AcquireConnection(assumeReadOnly);
1549     if (conn == nullptr) {
1550         return { E_DATABASE_BUSY, nullptr };
1551     }
1552 
1553     auto [errCode, statement] = conn->CreateStatement(sql, conn);
1554     if (statement == nullptr) {
1555         return { errCode, nullptr };
1556     }
1557 
1558     if (statement->ReadOnly() && conn->IsWriter()) {
1559         statement = nullptr;
1560         conn = nullptr;
1561         return GetStatement(sql, true);
1562     }
1563 
1564     return { errCode, statement };
1565 }
1566 
IsHoldingConnection()1567 bool RdbStoreImpl::IsHoldingConnection()
1568 {
1569     return connectionPool_ != nullptr;
1570 }
1571 
SetDefaultEncryptAlgo(const ConnectionPool::SharedConn & conn,const RdbStoreConfig & config)1572 int RdbStoreImpl::SetDefaultEncryptAlgo(const ConnectionPool::SharedConn &conn, const RdbStoreConfig &config)
1573 {
1574     if (conn == nullptr) {
1575         return E_DATABASE_BUSY;
1576     }
1577 
1578     auto [errCode, statement] = conn->CreateStatement(GlobalExpr::CIPHER_DEFAULT_ATTACH_HMAC_ALGO, conn);
1579     if (errCode != E_OK || statement == nullptr) {
1580         return errCode;
1581     }
1582 
1583     errCode = statement->Execute();
1584     if (errCode != E_OK) {
1585         LOG_ERROR("Execute failed: %{public}s, %{public}d",
1586             SqliteUtils::Anonymous(config.GetName()).c_str(), config.GetIter());
1587     }
1588     return errCode;
1589 }
1590 
AttachInner(const RdbStoreConfig & config,const std::string & attachName,const std::string & dbPath,const std::vector<uint8_t> & key,int32_t waitTime)1591 int RdbStoreImpl::AttachInner(const RdbStoreConfig &config, const std::string &attachName, const std::string &dbPath,
1592     const std::vector<uint8_t> &key, int32_t waitTime)
1593 {
1594     auto [conn, readers] = connectionPool_->AcquireAll(waitTime);
1595     if (conn == nullptr) {
1596         return E_DATABASE_BUSY;
1597     }
1598 
1599     if (config_.GetStorageMode() != StorageMode::MODE_MEMORY &&
1600         conn->GetJournalMode() == static_cast<int32_t>(JournalMode::MODE_WAL)) {
1601         // close first to prevent the connection from being put back.
1602         connectionPool_->CloseAllConnections();
1603         conn = nullptr;
1604         readers.clear();
1605         auto [err, newConn] = connectionPool_->DisableWal();
1606         if (err != E_OK) {
1607             return err;
1608         }
1609         conn = newConn;
1610     }
1611     std::vector<ValueObject> bindArgs;
1612     bindArgs.emplace_back(ValueObject(dbPath));
1613     bindArgs.emplace_back(ValueObject(attachName));
1614     if (!key.empty()) {
1615         auto ret = SetDefaultEncryptAlgo(conn, config);
1616         if (ret != E_OK) {
1617             return ret;
1618         }
1619         bindArgs.emplace_back(ValueObject(key));
1620         auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_WITH_KEY_SQL, conn);
1621         if (statement == nullptr || errCode != E_OK) {
1622             LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
1623             return E_ERROR;
1624         }
1625         return statement->Execute(bindArgs);
1626     }
1627 
1628     auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_SQL, conn);
1629     if (statement == nullptr || errCode != E_OK) {
1630         LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
1631         return errCode;
1632     }
1633     return statement->Execute(bindArgs);
1634 }
1635 
1636 /**
1637  * Attaches a database.
1638  */
Attach(const RdbStoreConfig & config,const std::string & attachName,int32_t waitTime)1639 std::pair<int32_t, int32_t> RdbStoreImpl::Attach(
1640     const RdbStoreConfig &config, const std::string &attachName, int32_t waitTime)
1641 {
1642     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || config_.GetHaMode() != HAMode::SINGLE) {
1643         return { E_NOT_SUPPORT, 0 };
1644     }
1645     std::string dbPath;
1646     int err = SqliteGlobalConfig::GetDbPath(config, dbPath);
1647     if (err != E_OK || access(dbPath.c_str(), F_OK) != E_OK) {
1648         return { E_INVALID_FILE_PATH, 0 };
1649     }
1650 
1651     // encrypted databases are not supported to attach a non encrypted database.
1652     if (!config.IsEncrypt() && config_.IsEncrypt()) {
1653         return { E_NOT_SUPPORT, 0 };
1654     }
1655 
1656     if (attachedInfo_.Contains(attachName)) {
1657         return { E_ATTACHED_DATABASE_EXIST, 0 };
1658     }
1659 
1660     std::vector<uint8_t> key;
1661     config.Initialize();
1662     if (config.IsEncrypt()) {
1663         key = config.GetEncryptKey();
1664     }
1665     err = AttachInner(config, attachName, dbPath, key, waitTime);
1666     key.assign(key.size(), 0);
1667     if (err == E_SQLITE_ERROR) {
1668         // only when attachName is already in use, SQLITE-ERROR will be reported here.
1669         return { E_ATTACHED_DATABASE_EXIST, 0 };
1670     } else if (err != E_OK) {
1671         LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach fileName"
1672                   "[%{public}s]",
1673             err, SqliteUtils::Anonymous(config_.GetName()).c_str(), attachName.c_str(),
1674             SqliteUtils::Anonymous(config.GetName()).c_str());
1675         return { err, 0 };
1676     }
1677     if (!attachedInfo_.Insert(attachName, dbPath)) {
1678         return { E_ATTACHED_DATABASE_EXIST, 0 };
1679     }
1680     return { E_OK, attachedInfo_.Size() };
1681 }
1682 
Detach(const std::string & attachName,int32_t waitTime)1683 std::pair<int32_t, int32_t> RdbStoreImpl::Detach(const std::string &attachName, int32_t waitTime)
1684 {
1685     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1686         return { E_NOT_SUPPORT, 0 };
1687     }
1688     if (!attachedInfo_.Contains(attachName)) {
1689         return { E_OK, attachedInfo_.Size() };
1690     }
1691 
1692     auto [connection, readers] = connectionPool_->AcquireAll(waitTime);
1693     if (connection == nullptr) {
1694         return { E_DATABASE_BUSY, 0 };
1695     }
1696     std::vector<ValueObject> bindArgs;
1697     bindArgs.push_back(ValueObject(attachName));
1698 
1699     auto [errCode, statement] = connection->CreateStatement(GlobalExpr::DETACH_SQL, connection);
1700     if (statement == nullptr || errCode != E_OK) {
1701         LOG_ERROR("Detach get statement failed, errCode %{public}d", errCode);
1702         return { errCode, 0 };
1703     }
1704     errCode = statement->Execute(bindArgs);
1705     if (errCode != E_OK) {
1706         LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach", errCode,
1707             SqliteUtils::Anonymous(config_.GetName()).c_str(), attachName.c_str());
1708         return { errCode, 0 };
1709     }
1710 
1711     attachedInfo_.Erase(attachName);
1712     if (!attachedInfo_.Empty()) {
1713         return { E_OK, attachedInfo_.Size() };
1714     }
1715     statement = nullptr;
1716     // close first to prevent the connection from being put back.
1717     connectionPool_->CloseAllConnections();
1718     connection = nullptr;
1719     readers.clear();
1720     errCode = connectionPool_->EnableWal();
1721     return { errCode, 0 };
1722 }
1723 
1724 /**
1725  * Obtains the database version.
1726  */
GetVersion(int & version)1727 int RdbStoreImpl::GetVersion(int &version)
1728 {
1729     auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION, isReadOnly_);
1730     if (statement == nullptr) {
1731         return errCode;
1732     }
1733     ValueObject value;
1734     std::tie(errCode, value) = statement->ExecuteForValue();
1735     auto val = std::get_if<int64_t>(&value.value);
1736     if (val != nullptr) {
1737         version = static_cast<int>(*val);
1738     }
1739     return errCode;
1740 }
1741 
1742 /**
1743  * Sets the version of a new database.
1744  */
SetVersion(int version)1745 int RdbStoreImpl::SetVersion(int version)
1746 {
1747     if (isReadOnly_) {
1748         return E_NOT_SUPPORT;
1749     }
1750     std::string sql = std::string(GlobalExpr::PRAGMA_VERSION) + " = " + std::to_string(version);
1751     auto [errCode, statement] = GetStatement(sql);
1752     if (statement == nullptr) {
1753         return errCode;
1754     }
1755     return statement->Execute();
1756 }
1757 /**
1758  * Begins a transaction in EXCLUSIVE mode.
1759  */
BeginTransaction()1760 int RdbStoreImpl::BeginTransaction()
1761 {
1762     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1763     std::lock_guard<std::mutex> lockGuard(connectionPool_->GetTransactionStackMutex());
1764     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1765         return E_NOT_SUPPORT;
1766     }
1767     // size + 1 means the number of transactions in process
1768     size_t transactionId = connectionPool_->GetTransactionStack().size() + 1;
1769     BaseTransaction transaction(connectionPool_->GetTransactionStack().size());
1770     auto [errCode, statement] = GetStatement(transaction.GetTransactionStr());
1771     if (statement == nullptr) {
1772         return errCode;
1773     }
1774     errCode = statement->Execute();
1775     if (errCode != E_OK) {
1776         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1777             connectionPool_->Dump(true, "BEGIN");
1778         }
1779         LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1780             transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1781         return errCode;
1782     }
1783     connectionPool_->SetInTransaction(true);
1784     connectionPool_->GetTransactionStack().push(transaction);
1785     // 1 means the number of transactions in process
1786     if (transactionId > 1) {
1787         LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1788             transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1789     }
1790 
1791     return E_OK;
1792 }
1793 
BeginTrans()1794 std::pair<int, int64_t> RdbStoreImpl::BeginTrans()
1795 {
1796     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1797     if (!config_.IsVector() || isReadOnly_) {
1798         return {E_NOT_SUPPORT, 0};
1799     }
1800 
1801     int64_t tmpTrxId = 0;
1802     auto [errCode, connection] = connectionPool_->CreateTransConn(false);
1803     if (connection == nullptr) {
1804         LOG_ERROR("Get null connection, storeName: %{public}s errCode:0x%{public}x.",
1805             SqliteUtils::Anonymous(name_).c_str(), errCode);
1806         return {errCode, 0};
1807     }
1808     tmpTrxId = newTrxId_.fetch_add(1);
1809     trxConnMap_.Insert(tmpTrxId, connection);
1810     errCode = ExecuteByTrxId(BEGIN_TRANSACTION_SQL, tmpTrxId);
1811     if (errCode != E_OK) {
1812         trxConnMap_.Erase(tmpTrxId);
1813     }
1814     return {errCode, tmpTrxId};
1815 }
1816 
1817 /**
1818 * Begins a transaction in EXCLUSIVE mode.
1819 */
RollBack()1820 int RdbStoreImpl::RollBack()
1821 {
1822     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1823     std::lock_guard<std::mutex> lockGuard(connectionPool_->GetTransactionStackMutex());
1824     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1825         return E_NOT_SUPPORT;
1826     }
1827     size_t transactionId = connectionPool_->GetTransactionStack().size();
1828 
1829     if (connectionPool_->GetTransactionStack().empty()) {
1830         LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId,
1831             SqliteUtils::Anonymous(name_).c_str());
1832         return E_NO_TRANSACTION_IN_SESSION;
1833     }
1834     BaseTransaction transaction = connectionPool_->GetTransactionStack().top();
1835     connectionPool_->GetTransactionStack().pop();
1836     if (transaction.GetType() != TransType::ROLLBACK_SELF && !connectionPool_->GetTransactionStack().empty()) {
1837         connectionPool_->GetTransactionStack().top().SetChildFailure(true);
1838     }
1839     auto [errCode, statement] = GetStatement(transaction.GetRollbackStr());
1840     if (statement == nullptr) {
1841         if (errCode == E_DATABASE_BUSY) {
1842             Reportor::Report(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
1843         }
1844         // size + 1 means the number of transactions in process
1845         LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId + 1,
1846             SqliteUtils::Anonymous(name_).c_str());
1847         return E_DATABASE_BUSY;
1848     }
1849     errCode = statement->Execute();
1850     if (errCode != E_OK) {
1851         if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
1852             Reportor::Report(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
1853         }
1854         LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1855             transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1856         return errCode;
1857     }
1858     if (connectionPool_->GetTransactionStack().empty()) {
1859         connectionPool_->SetInTransaction(false);
1860     }
1861     // 1 means the number of transactions in process
1862     if (transactionId > 1) {
1863         LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1864             transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1865     }
1866     return E_OK;
1867 }
1868 
ExecuteByTrxId(const std::string & sql,int64_t trxId,bool closeConnAfterExecute,const std::vector<ValueObject> & bindArgs)1869 int RdbStoreImpl::ExecuteByTrxId(const std::string &sql, int64_t trxId, bool closeConnAfterExecute,
1870     const std::vector<ValueObject> &bindArgs)
1871 {
1872     if ((!config_.IsVector()) || isReadOnly_) {
1873         return E_NOT_SUPPORT;
1874     }
1875     if (trxId == 0) {
1876         return E_INVALID_ARGS;
1877     }
1878 
1879     if (!trxConnMap_.Contains(trxId)) {
1880         LOG_ERROR("trxId hasn't appeared before %{public}" PRIu64, trxId);
1881         return E_INVALID_ARGS;
1882     }
1883     auto time = static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
1884     auto result = trxConnMap_.Find(trxId);
1885     auto connection = result.second;
1886     if (connection == nullptr) {
1887         LOG_ERROR("Get null connection, storeName: %{public}s time:%{public}" PRIu64 ".",
1888             SqliteUtils::Anonymous(name_).c_str(), time);
1889         return E_ERROR;
1890     }
1891     auto [ret, statement] = GetStatement(sql, connection);
1892     if (ret != E_OK) {
1893         return ret;
1894     }
1895     ret = statement->Execute(bindArgs);
1896     if (ret != E_OK) {
1897         LOG_ERROR("transaction id: %{public}" PRIu64 ", storeName: %{public}s, errCode: %{public}d" PRIu64, trxId,
1898             SqliteUtils::Anonymous(name_).c_str(), ret);
1899         trxConnMap_.Erase(trxId);
1900         return ret;
1901     }
1902     if (closeConnAfterExecute) {
1903         trxConnMap_.Erase(trxId);
1904     }
1905     return E_OK;
1906 }
1907 
RollBack(int64_t trxId)1908 int RdbStoreImpl::RollBack(int64_t trxId)
1909 {
1910     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1911     return ExecuteByTrxId(ROLLBACK_TRANSACTION_SQL, trxId, true);
1912 }
1913 
1914 /**
1915 * Begins a transaction in EXCLUSIVE mode.
1916 */
Commit()1917 int RdbStoreImpl::Commit()
1918 {
1919     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1920     std::lock_guard<std::mutex> lockGuard(connectionPool_->GetTransactionStackMutex());
1921     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1922         return E_NOT_SUPPORT;
1923     }
1924     size_t transactionId = connectionPool_->GetTransactionStack().size();
1925 
1926     if (connectionPool_->GetTransactionStack().empty()) {
1927         return E_OK;
1928     }
1929     BaseTransaction transaction = connectionPool_->GetTransactionStack().top();
1930     std::string sqlStr = transaction.GetCommitStr();
1931     if (sqlStr.size() <= 1) {
1932         LOG_WARN("id: %{public}zu, storeName: %{public}s, sql: %{public}s",
1933             transactionId, SqliteUtils::Anonymous(name_).c_str(), sqlStr.c_str());
1934         connectionPool_->GetTransactionStack().pop();
1935         return E_OK;
1936     }
1937     auto [errCode, statement] = GetStatement(sqlStr);
1938     if (statement == nullptr) {
1939         if (errCode == E_DATABASE_BUSY || errCode == E_SQLITE_BUSY || E_SQLITE_LOCKED) {
1940             Reportor::Report(Reportor::Create(config_, E_DATABASE_BUSY, "ErrorType: Busy"));
1941         }
1942         LOG_ERROR("id: %{public}zu, storeName: %{public}s, statement error", transactionId,
1943             SqliteUtils::Anonymous(name_).c_str());
1944         return E_DATABASE_BUSY;
1945     }
1946     errCode = statement->Execute();
1947     if (errCode != E_OK) {
1948         if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
1949             Reportor::Report(Reportor::Create(config_, errCode, "ErrorType: CommitBusy"));
1950         }
1951         LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d",
1952             transactionId, SqliteUtils::Anonymous(name_).c_str(), errCode);
1953         return errCode;
1954     }
1955     connectionPool_->SetInTransaction(false);
1956     // 1 means the number of transactions in process
1957     if (transactionId > 1) {
1958         LOG_WARN("id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
1959             SqliteUtils::Anonymous(name_).c_str(), errCode);
1960     }
1961     connectionPool_->GetTransactionStack().pop();
1962     return E_OK;
1963 }
1964 
Commit(int64_t trxId)1965 int RdbStoreImpl::Commit(int64_t trxId)
1966 {
1967     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1968     return ExecuteByTrxId(COMMIT_TRANSACTION_SQL, trxId, true);
1969 }
1970 
IsInTransaction()1971 bool RdbStoreImpl::IsInTransaction()
1972 {
1973     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1974         return false;
1975     }
1976     return connectionPool_->IsInTransaction();
1977 }
1978 
CheckAttach(const std::string & sql)1979 int RdbStoreImpl::CheckAttach(const std::string &sql)
1980 {
1981     size_t index = sql.find_first_not_of(' ');
1982     if (index == std::string::npos) {
1983         return E_OK;
1984     }
1985 
1986     /* The first 3 characters can determine the type */
1987     std::string sqlType = sql.substr(index, 3);
1988     sqlType = SqliteUtils::StrToUpper(sqlType);
1989     if (sqlType != "ATT") {
1990         return E_OK;
1991     }
1992 
1993     auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_JOUR_MODE_EXP);
1994     if (statement == nullptr) {
1995         return errCode;
1996     }
1997 
1998     errCode = statement->Execute();
1999     if (errCode != E_OK) {
2000         LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errCode);
2001         return errCode;
2002     }
2003     auto [errorCode, valueObject] = statement->GetColumn(0);
2004     if (errorCode != E_OK) {
2005         LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errorCode);
2006         return errorCode;
2007     }
2008     auto journal = std::get_if<std::string>(&valueObject.value);
2009     auto journalMode = SqliteUtils::StrToUpper((journal == nullptr) ? "" : *journal);
2010     if (journalMode == RdbStoreConfig::DB_DEFAULT_JOURNAL_MODE) {
2011         LOG_ERROR("RdbStoreImpl attach is not supported in WAL mode");
2012         return E_NOT_SUPPORTED_ATTACH_IN_WAL_MODE;
2013     }
2014 
2015     return E_OK;
2016 }
2017 
IsOpen() const2018 bool RdbStoreImpl::IsOpen() const
2019 {
2020     return isOpen_;
2021 }
2022 
GetPath()2023 std::string RdbStoreImpl::GetPath()
2024 {
2025     return path_;
2026 }
2027 
IsReadOnly() const2028 bool RdbStoreImpl::IsReadOnly() const
2029 {
2030     return isReadOnly_;
2031 }
2032 
IsMemoryRdb() const2033 bool RdbStoreImpl::IsMemoryRdb() const
2034 {
2035     return isMemoryRdb_;
2036 }
2037 
GetName()2038 std::string RdbStoreImpl::GetName()
2039 {
2040     return name_;
2041 }
2042 
DoCloudSync(const std::string & table)2043 void RdbStoreImpl::DoCloudSync(const std::string &table)
2044 {
2045 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2046     auto needSync = cloudInfo_->Change(table);
2047     if (!needSync) {
2048         return;
2049     }
2050     auto pool = TaskExecutor::GetInstance().GetExecutor();
2051     if (pool == nullptr) {
2052         return;
2053     }
2054     auto interval =
2055         std::chrono::duration_cast<std::chrono::steady_clock::duration>(std::chrono::milliseconds(INTERVAL));
2056     pool->Schedule(interval, [cloudInfo = std::weak_ptr<CloudTables>(cloudInfo_), param = syncerParam_]() {
2057         auto changeInfo = cloudInfo.lock();
2058         if (changeInfo == nullptr) {
2059             return ;
2060         }
2061         auto tables = changeInfo->Steal();
2062         if (tables.empty()) {
2063             return;
2064         }
2065         DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true };
2066         auto memo = AbsRdbPredicates(std::vector<std::string>(tables.begin(), tables.end())).GetDistributedPredicates();
2067         InnerSync(param, option, memo, nullptr);
2068     });
2069 #endif
2070 }
GetFileType()2071 std::string RdbStoreImpl::GetFileType()
2072 {
2073     return fileType_;
2074 }
2075 
2076 /**
2077  * Sets the database locale.
2078  */
ConfigLocale(const std::string & localeStr)2079 int RdbStoreImpl::ConfigLocale(const std::string &localeStr)
2080 {
2081     if (!isOpen_) {
2082         LOG_ERROR("The connection pool has been closed.");
2083         return E_ERROR;
2084     }
2085 
2086     if (connectionPool_ == nullptr) {
2087         LOG_ERROR("connectionPool_ is null.");
2088         return E_ERROR;
2089     }
2090     return connectionPool_->ConfigLocale(localeStr);
2091 }
2092 
GetDestPath(const std::string & backupPath,std::string & destPath)2093 int RdbStoreImpl::GetDestPath(const std::string &backupPath, std::string &destPath)
2094 {
2095     int ret = GetDataBasePath(backupPath, destPath);
2096     if (ret != E_OK) {
2097         return ret;
2098     }
2099     std::string tempPath = destPath + ".tmp";
2100     if (access(tempPath.c_str(), F_OK) == E_OK) {
2101         destPath = tempPath;
2102     } else {
2103         auto walFile = destPath + "-wal";
2104         if (access(walFile.c_str(), F_OK) == E_OK) {
2105             return E_ERROR;
2106         }
2107     }
2108 
2109     if (access(destPath.c_str(), F_OK) != E_OK) {
2110         LOG_ERROR("The backupFilePath does not exists.");
2111         return E_INVALID_FILE_PATH;
2112     }
2113     return E_OK;
2114 }
2115 
Restore(const std::string & backupPath,const std::vector<uint8_t> & newKey)2116 int RdbStoreImpl::Restore(const std::string &backupPath, const std::vector<uint8_t> &newKey)
2117 {
2118     LOG_INFO("Restore db: %{public}s.", SqliteUtils::Anonymous(config_.GetName()).c_str());
2119     if (isReadOnly_) {
2120         return E_NOT_SUPPORT;
2121     }
2122 
2123     if (!isOpen_ || connectionPool_ == nullptr) {
2124         LOG_ERROR("The pool is: %{public}d, pool is null: %{public}d", isOpen_, connectionPool_ == nullptr);
2125         return E_ERROR;
2126     }
2127 
2128     RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
2129     keyFiles.Lock();
2130     std::string destPath;
2131     bool isOK = TryGetMasterSlaveBackupPath(backupPath, destPath, true);
2132     if (!isOK) {
2133         int ret = GetDestPath(backupPath, destPath);
2134         if (ret != E_OK) {
2135             keyFiles.Unlock();
2136             return ret;
2137         }
2138     }
2139 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2140     auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
2141     if (service != nullptr) {
2142         service->Disable(syncerParam_);
2143     }
2144 #endif
2145     bool corrupt = Reportor::IsReportCorruptedFault(path_);
2146     int errCode = connectionPool_->ChangeDbFileForRestore(path_, destPath, newKey, slaveStatus_);
2147     keyFiles.Unlock();
2148 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2149     SecurityPolicy::SetSecurityLabel(config_);
2150     if (service != nullptr) {
2151         service->Enable(syncerParam_);
2152         if (errCode == E_OK) {
2153             auto syncerParam = syncerParam_;
2154             syncerParam.infos_ = Connection::Collect(config_);
2155             service->AfterOpen(syncerParam);
2156             NotifyDataChange();
2157         }
2158     }
2159 #endif
2160     if (errCode == E_OK) {
2161         Reportor::ReportRestore(Reportor::Create(config_, E_OK), corrupt);
2162         rebuild_ = RebuiltType::NONE;
2163     }
2164     DoCloudSync("");
2165     return errCode;
2166 }
2167 
CreateWritableConn()2168 std::pair<int32_t, std::shared_ptr<Connection>> RdbStoreImpl::CreateWritableConn()
2169 {
2170     auto config  = config_;
2171     config.SetHaMode(HAMode::SINGLE);
2172     auto [result, conn] = Connection::Create(config, true);
2173     if (result != E_OK || conn == nullptr) {
2174         LOG_ERROR("create connection failed, err:%{public}d", result);
2175         return { result, nullptr };
2176     }
2177     return { E_OK, conn };
2178 }
2179 
GetStatement(const std::string & sql,std::shared_ptr<Connection> conn) const2180 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(
2181     const std::string &sql, std::shared_ptr<Connection> conn) const
2182 {
2183     if (conn == nullptr) {
2184         return { E_DATABASE_BUSY, nullptr };
2185     }
2186     return conn->CreateStatement(sql, conn);
2187 }
2188 
GetStatement(const std::string & sql,bool read) const2189 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(const std::string& sql, bool read) const
2190 {
2191     auto conn = connectionPool_->AcquireConnection(read);
2192     if (conn == nullptr) {
2193         return { E_DATABASE_BUSY, nullptr };
2194     }
2195     return conn->CreateStatement(sql, conn);
2196 }
2197 
GetRebuilt(RebuiltType & rebuilt)2198 int RdbStoreImpl::GetRebuilt(RebuiltType &rebuilt)
2199 {
2200     rebuilt = static_cast<RebuiltType>(rebuild_);
2201     return E_OK;
2202 }
2203 
InterruptBackup()2204 int RdbStoreImpl::InterruptBackup()
2205 {
2206     if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER) {
2207         return E_NOT_SUPPORT;
2208     }
2209     if (slaveStatus_ == SlaveStatus::BACKING_UP) {
2210         slaveStatus_ = SlaveStatus::BACKUP_INTERRUPT;
2211         return E_OK;
2212     }
2213     return E_INVALID_INTERRUPT;
2214 }
2215 
GetBackupStatus() const2216 int32_t RdbStoreImpl::GetBackupStatus() const
2217 {
2218     if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER && config_.GetHaMode() != HAMode::MAIN_REPLICA) {
2219         return SlaveStatus::UNDEFINED;
2220     }
2221     return slaveStatus_;
2222 }
2223 
TryGetMasterSlaveBackupPath(const std::string & srcPath,std::string & destPath,bool isRestore)2224 bool RdbStoreImpl::TryGetMasterSlaveBackupPath(const std::string &srcPath, std::string &destPath, bool isRestore)
2225 {
2226     if (!srcPath.empty() || config_.GetHaMode() == HAMode::SINGLE || config_.GetDBType() != DB_SQLITE) {
2227         return false;
2228     }
2229     int ret = GetSlaveName(config_.GetPath(), destPath);
2230     if (ret != E_OK) {
2231         destPath = {};
2232         return false;
2233     }
2234     if (isRestore && access(destPath.c_str(), F_OK) != 0) {
2235         LOG_WARN("The backup path can not access: %{public}s", SqliteUtils::Anonymous(destPath).c_str());
2236         return false;
2237     }
2238     return true;
2239 }
2240 
IsSlaveDiffFromMaster() const2241 bool RdbStoreImpl::IsSlaveDiffFromMaster() const
2242 {
2243     std::string failureFlagFile = config_.GetPath() + "-slaveFailure";
2244     std::string slaveDbPath = SqliteUtils::GetSlavePath(config_.GetPath());
2245     return access(failureFlagFile.c_str(), F_OK) == 0 || access(slaveDbPath.c_str(), F_OK) != 0;
2246 }
2247 
ExchangeSlaverToMaster()2248 int32_t RdbStoreImpl::ExchangeSlaverToMaster()
2249 {
2250     if (isReadOnly_) {
2251         return E_OK;
2252     }
2253     auto conn = connectionPool_->AcquireConnection(false);
2254     if (conn == nullptr) {
2255         return E_DATABASE_BUSY;
2256     }
2257     auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
2258     if (strategy != ExchangeStrategy::NOT_HANDLE) {
2259         LOG_WARN("exchange st:%{public}d, %{public}s,", strategy, SqliteUtils::Anonymous(config_.GetName()).c_str());
2260     }
2261     int ret = E_OK;
2262     if (strategy == ExchangeStrategy::RESTORE) {
2263         conn = nullptr;
2264         // disable is required before restore
2265         ret = Restore({}, {});
2266     } else if (strategy == ExchangeStrategy::BACKUP) {
2267         // async backup
2268         ret = conn->Backup({}, {}, true, slaveStatus_);
2269     }
2270     return ret;
2271 }
2272 
GetDbType() const2273 int32_t RdbStoreImpl::GetDbType() const
2274 {
2275     return config_.GetDBType();
2276 }
2277 
CreateTransaction(int32_t type)2278 std::pair<int32_t, std::shared_ptr<Transaction>> RdbStoreImpl::CreateTransaction(int32_t type)
2279 {
2280     if (isReadOnly_) {
2281         return { E_NOT_SUPPORT, nullptr};
2282     }
2283 
2284     auto [errCode, conn] = connectionPool_->CreateTransConn();
2285     if (conn == nullptr) {
2286         return { errCode, nullptr };
2287     }
2288     std::shared_ptr<Transaction> trans;
2289     std::tie(errCode, trans) = Transaction::Create(type, conn, config_.GetName());
2290     if (trans == nullptr) {
2291         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
2292             connectionPool_->Dump(true, "TRANS");
2293         }
2294         return { errCode, nullptr };
2295     }
2296 
2297     std::lock_guard<decltype(mutex_)> guard(mutex_);
2298     for (auto it = transactions_.begin(); it != transactions_.end();) {
2299         if (it->expired()) {
2300             it = transactions_.erase(it);
2301         } else {
2302             it++;
2303         }
2304     }
2305     transactions_.push_back(trans);
2306     return { errCode, trans };
2307 }
2308 
AddTables(const std::vector<std::string> & tables)2309 int32_t RdbStoreImpl::CloudTables::AddTables(const std::vector<std::string> &tables)
2310 {
2311     std::lock_guard<std::mutex> lock(mutex_);
2312     for (auto &table : tables) {
2313         tables_.insert(table);
2314     }
2315     return E_OK;
2316 }
2317 
RmvTables(const std::vector<std::string> & tables)2318 int32_t RdbStoreImpl::CloudTables::RmvTables(const std::vector<std::string> &tables)
2319 {
2320     std::lock_guard<std::mutex> lock(mutex_);
2321     for (auto &table : tables) {
2322         tables_.erase(table);
2323     }
2324     return E_OK;
2325 }
2326 
Change(const std::string & table)2327 bool RdbStoreImpl::CloudTables::Change(const std::string &table)
2328 {
2329     bool needSync = false;
2330     {
2331         std::lock_guard<std::mutex> lock(mutex_);
2332         if (tables_.empty() || (!table.empty() && tables_.find(table) == tables_.end())) {
2333             return needSync;
2334         }
2335         // from empty, then need schedule the cloud sync, others only wait the schedule execute.
2336         needSync = changes_.empty();
2337         if (!table.empty()) {
2338             changes_.insert(table);
2339         } else {
2340             changes_.insert(tables_.begin(), tables_.end());
2341         }
2342     }
2343     return needSync;
2344 }
2345 
Steal()2346 std::set<std::string> RdbStoreImpl::CloudTables::Steal()
2347 {
2348     std::set<std::string> result;
2349     {
2350         std::lock_guard<std::mutex> lock(mutex_);
2351         result = std::move(changes_);
2352     }
2353     return result;
2354 }
2355 } // namespace OHOS::NativeRdb