• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "RdbStoreImpl"
16 #include "rdb_store_impl.h"
17 
18 #include <unistd.h>
19 
20 #include <algorithm>
21 #include <chrono>
22 #include <cinttypes>
23 #include <cstdint>
24 #include <fstream>
25 #include <memory>
26 #include <mutex>
27 #include <sstream>
28 #include <sys/stat.h>
29 #include <string>
30 
31 #include "cache_result_set.h"
32 #include "connection_pool.h"
33 #include "delay_notify.h"
34 #include "directory_ex.h"
35 #include "logger.h"
36 #include "rdb_common.h"
37 #include "rdb_errno.h"
38 #include "rdb_fault_hiview_reporter.h"
39 #include "rdb_local_db_observer.h"
40 #include "rdb_radar_reporter.h"
41 #include "rdb_stat_reporter.h"
42 #include "rdb_security_manager.h"
43 #include "rdb_sql_statistic.h"
44 #include "rdb_store.h"
45 #include "rdb_trace.h"
46 #include "rdb_types.h"
47 #include "relational_store_client.h"
48 #include "sqlite_global_config.h"
49 #include "sqlite_sql_builder.h"
50 #include "sqlite_statement.h"
51 #include "sqlite_utils.h"
52 #include "step_result_set.h"
53 #include "task_executor.h"
54 #include "traits.h"
55 #include "transaction.h"
56 #include "values_buckets.h"
57 #if !defined(CROSS_PLATFORM)
58 #include "raw_data_parser.h"
59 #include "rdb_device_manager_adapter.h"
60 #include "rdb_manager_impl.h"
61 #include "relational_store_manager.h"
62 #include "runtime_config.h"
63 #include "security_policy.h"
64 #include "sqlite_shared_result_set.h"
65 #endif
66 
67 #ifdef WINDOWS_PLATFORM
68 #define ISFILE(filePath) ((filePath.find("\\") == std::string::npos))
69 #else
70 #define ISFILE(filePath) ((filePath.find("/") == std::string::npos))
71 #endif
72 #include "rdb_time_utils.h"
73 
74 namespace OHOS::NativeRdb {
75 using namespace OHOS::Rdb;
76 using namespace std::chrono;
77 using SqlStatistic = DistributedRdb::SqlStatistic;
78 using RdbNotifyConfig = DistributedRdb::RdbNotifyConfig;
79 using Reportor = RdbFaultHiViewReporter;
80 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
81 using RdbMgr = DistributedRdb::RdbManagerImpl;
82 #endif
83 
84 static constexpr const char *BEGIN_TRANSACTION_SQL = "begin;";
85 static constexpr const char *COMMIT_TRANSACTION_SQL = "commit;";
86 static constexpr const char *ROLLBACK_TRANSACTION_SQL = "rollback;";
87 static constexpr const char *BACKUP_RESTORE = "backup.restore";
88 constexpr int64_t TIME_OUT = 1500;
89 
InitSyncerParam(const RdbStoreConfig & config,bool created)90 void RdbStoreImpl::InitSyncerParam(const RdbStoreConfig &config, bool created)
91 {
92     syncerParam_.bundleName_ = config.GetBundleName();
93     syncerParam_.hapName_ = config.GetModuleName();
94     syncerParam_.storeName_ = config.GetName();
95     syncerParam_.customDir_ = config.GetCustomDir();
96     syncerParam_.area_ = config.GetArea();
97     syncerParam_.level_ = static_cast<int32_t>(config.GetSecurityLevel());
98     syncerParam_.type_ = config.GetDistributedType();
99     syncerParam_.isEncrypt_ = config.IsEncrypt();
100     syncerParam_.isAutoClean_ = config.GetAutoClean();
101     syncerParam_.isSearchable_ = config.IsSearchable();
102     syncerParam_.password_ = config.GetEncryptKey();
103     syncerParam_.haMode_ = config.GetHaMode();
104     syncerParam_.roleType_ = config.GetRoleType();
105     syncerParam_.tokenIds_ = config.GetPromiseInfo().tokenIds_;
106     syncerParam_.uids_ = config.GetPromiseInfo().uids_;
107     syncerParam_.user_ = config.GetPromiseInfo().user_;
108     syncerParam_.permissionNames_ = config.GetPromiseInfo().permissionNames_;
109     syncerParam_.subUser_ = config.GetSubUser();
110     syncerParam_.dfxInfo_.lastOpenTime_ = RdbTimeUtils::GetCurSysTimeWithMs();
111     if (created) {
112         syncerParam_.infos_ = Connection::Collect(config);
113     }
114 }
115 
InnerOpen()116 int RdbStoreImpl::InnerOpen()
117 {
118     isOpen_ = true;
119 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
120     if (isReadOnly_ || isMemoryRdb_) {
121         return E_OK;
122     }
123 
124     AfterOpen(syncerParam_);
125     if (config_.GetDBType() == DB_VECTOR || !config_.IsSearchable()) {
126         return E_OK;
127     }
128     int errCode = RegisterDataChangeCallback();
129     if (errCode != E_OK) {
130         LOG_ERROR("RegisterCallBackObserver is failed, err is %{public}d.", errCode);
131     }
132 #endif
133     return E_OK;
134 }
135 
InitReportFunc(const RdbParam & param)136 void RdbStoreImpl::InitReportFunc(const RdbParam &param)
137 {
138 #if !defined(CROSS_PLATFORM)
139     reportFunc_ = std::make_shared<ReportFunc>([reportParam = param](const DistributedRdb::RdbStatEvent &event) {
140         auto [err, service] = RdbMgr::GetInstance().GetRdbService(reportParam);
141         if (err != E_OK || service == nullptr) {
142             LOG_ERROR("GetRdbService failed, err: %{public}d, storeName: %{public}s.", err,
143                 SqliteUtils::Anonymous(reportParam.storeName_).c_str());
144             return;
145         }
146         err = service->ReportStatistic(reportParam, event);
147         if (err != E_OK) {
148             LOG_ERROR("ReportStatistic failed, err: %{public}d, storeName: %{public}s.", err,
149                 SqliteUtils::Anonymous(reportParam.storeName_).c_str());
150         }
151         return;
152     });
153 #endif
154 }
155 
Close()156 void RdbStoreImpl::Close()
157 {
158     {
159         std::unique_lock<decltype(poolMutex_)> lock(poolMutex_);
160         if (connectionPool_) {
161             connectionPool_->CloseAllConnections();
162             connectionPool_.reset();
163         }
164     }
165     {
166         std::lock_guard<decltype(mutex_)> guard(mutex_);
167         for (auto &it : transactions_) {
168             auto trans = it.lock();
169             if (trans != nullptr) {
170                 trans->Close();
171             }
172         }
173         transactions_ = {};
174     }
175 }
176 
GetPool() const177 std::shared_ptr<ConnectionPool> RdbStoreImpl::GetPool() const
178 {
179     std::shared_lock<decltype(poolMutex_)> lock(poolMutex_);
180     return connectionPool_;
181 }
182 
GetConn(bool isRead)183 std::pair<int32_t, std::shared_ptr<Connection>> RdbStoreImpl::GetConn(bool isRead)
184 {
185     auto pool = GetPool();
186     if (pool == nullptr) {
187         return { E_ALREADY_CLOSED, nullptr };
188     }
189 
190     auto connection = pool->AcquireConnection(isRead);
191     if (connection == nullptr) {
192         return { E_DATABASE_BUSY, nullptr };
193     }
194     return { E_OK, connection };
195 }
196 
197 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
AfterOpen(const RdbParam & param,int32_t retry)198 void RdbStoreImpl::AfterOpen(const RdbParam &param, int32_t retry)
199 {
200     auto [err, service] = RdbMgr::GetInstance().GetRdbService(param);
201     if (err == E_NOT_SUPPORT) {
202         return;
203     }
204     if (err != E_OK || service == nullptr) {
205         LOG_ERROR("GetRdbService failed, err: %{public}d, storeName: %{public}s.", err,
206             SqliteUtils::Anonymous(param.storeName_).c_str());
207         auto pool = TaskExecutor::GetInstance().GetExecutor();
208         if (err == E_SERVICE_NOT_FOUND && pool != nullptr && retry++ < MAX_RETRY_TIMES) {
209             pool->Schedule(std::chrono::seconds(RETRY_INTERVAL), [param, retry]() {
210                 AfterOpen(param, retry);
211             });
212         }
213         return;
214     }
215     err = service->AfterOpen(param);
216     if (err != E_OK) {
217         LOG_ERROR("AfterOpen failed, err: %{public}d, storeName: %{public}s.", err,
218             SqliteUtils::Anonymous(param.storeName_).c_str());
219     }
220 }
221 
GetModifyTime(const std::string & table,const std::string & columnName,std::vector<PRIKey> & keys)222 RdbStore::ModifyTime RdbStoreImpl::GetModifyTime(
223     const std::string &table, const std::string &columnName, std::vector<PRIKey> &keys)
224 {
225     if (table.empty() || columnName.empty() || keys.empty()) {
226         LOG_ERROR("Invalid para.");
227         return {};
228     }
229 
230     auto logTable = DistributedDB::RelationalStoreManager::GetDistributedLogTableName(table);
231     if (SqliteUtils::StrToUpper(columnName) == ROW_ID) {
232         return GetModifyTimeByRowId(logTable, keys);
233     }
234     std::vector<ValueObject> hashKeys;
235     hashKeys.reserve(keys.size());
236     std::map<std::vector<uint8_t>, PRIKey> keyMap;
237     std::map<std::string, DistributedDB::Type> tmp;
238     for (const auto &key : keys) {
239         DistributedDB::Type value;
240         RawDataParser::Convert(key, value);
241         tmp[columnName] = value;
242         auto hashKey = DistributedDB::RelationalStoreManager::CalcPrimaryKeyHash(tmp);
243         if (hashKey.empty()) {
244             LOG_DEBUG("Hash key fail.");
245             continue;
246         }
247         hashKeys.emplace_back(ValueObject(hashKey));
248         keyMap[hashKey] = key;
249     }
250 
251     std::string sql;
252     sql.append("select hash_key as key, timestamp/10000 as modify_time from ");
253     sql.append(logTable);
254     sql.append(" where hash_key in (");
255     sql.append(SqliteSqlBuilder::GetSqlArgs(hashKeys.size()));
256     sql.append(")");
257     auto resultSet = QueryByStep(sql, hashKeys, true);
258     int count = 0;
259     if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
260         LOG_ERROR("Get resultSet err.");
261         return {};
262     }
263     return { resultSet, keyMap, false };
264 }
265 
GetModifyTimeByRowId(const std::string & logTable,std::vector<PRIKey> & keys)266 RdbStore::ModifyTime RdbStoreImpl::GetModifyTimeByRowId(const std::string &logTable, std::vector<PRIKey> &keys)
267 {
268     std::string sql;
269     sql.append("select data_key as key, timestamp/10000 as modify_time from ");
270     sql.append(logTable);
271     sql.append(" where data_key in (");
272     sql.append(SqliteSqlBuilder::GetSqlArgs(keys.size()));
273     sql.append(")");
274     std::vector<ValueObject> args;
275     args.reserve(keys.size());
276     for (auto &key : keys) {
277         ValueObject::Type value;
278         RawDataParser::Convert(key, value);
279         args.emplace_back(ValueObject(value));
280     }
281     auto resultSet = QueryByStep(sql, args, true);
282     int count = 0;
283     if (resultSet == nullptr || resultSet->GetRowCount(count) != E_OK || count <= 0) {
284         LOG_ERROR("Get resultSet err.");
285         return {};
286     }
287     return ModifyTime(resultSet, {}, true);
288 }
289 
CleanDirtyData(const std::string & table,uint64_t cursor)290 int RdbStoreImpl::CleanDirtyData(const std::string &table, uint64_t cursor)
291 {
292     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || isMemoryRdb_) {
293         LOG_ERROR("Not support. table:%{public}s, isRead:%{public}d, dbType:%{public}d, isMemoryRdb:%{public}d.",
294             SqliteUtils::Anonymous(table).c_str(), isReadOnly_, config_.GetDBType(), isMemoryRdb_);
295         return E_NOT_SUPPORT;
296     }
297     auto [errCode, conn] = GetConn(false);
298     if (errCode != E_OK) {
299         LOG_ERROR("The database is busy or closed.");
300         return errCode;
301     }
302     errCode = conn->CleanDirtyData(table, cursor);
303     return errCode;
304 }
305 
GetLogTableName(const std::string & tableName)306 std::string RdbStoreImpl::GetLogTableName(const std::string &tableName)
307 {
308     return DistributedDB::RelationalStoreManager::GetDistributedLogTableName(tableName);
309 }
310 
QuerySharingResource(const AbsRdbPredicates & predicates,const Fields & columns)311 std::pair<int32_t, std::shared_ptr<ResultSet>> RdbStoreImpl::QuerySharingResource(
312     const AbsRdbPredicates &predicates, const Fields &columns)
313 {
314     if (config_.GetDBType() == DB_VECTOR) {
315         return { E_NOT_SUPPORT, nullptr };
316     }
317     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
318     if (errCode != E_OK) {
319         return { errCode, nullptr };
320     }
321     auto [status, resultSet] =
322         service->QuerySharingResource(syncerParam_, predicates.GetDistributedPredicates(), columns);
323     if (status != E_OK) {
324         return { status, nullptr };
325     }
326     return { status, resultSet };
327 }
328 
RemoteQuery(const std::string & device,const AbsRdbPredicates & predicates,const Fields & columns,int & errCode)329 std::shared_ptr<ResultSet> RdbStoreImpl::RemoteQuery(
330     const std::string &device, const AbsRdbPredicates &predicates, const Fields &columns, int &errCode)
331 {
332     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
333     if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
334         return nullptr;
335     }
336     std::vector<std::string> selectionArgs = predicates.GetWhereArgs();
337     std::string sql = SqliteSqlBuilder::BuildQueryString(predicates, columns);
338     auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
339     if (err == E_NOT_SUPPORT) {
340         errCode = err;
341         return nullptr;
342     }
343     if (err != E_OK) {
344         LOG_ERROR("RdbStoreImpl::RemoteQuery get service failed.");
345         errCode = err;
346         return nullptr;
347     }
348     auto [status, resultSet] = service->RemoteQuery(syncerParam_, device, sql, selectionArgs);
349     errCode = status;
350     return resultSet;
351 }
352 
NotifyDataChange()353 void RdbStoreImpl::NotifyDataChange()
354 {
355     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || !config_.GetRegisterInfo(RegisterType::CLIENT_OBSERVER)) {
356         return;
357     }
358     config_.SetRegisterInfo(RegisterType::CLIENT_OBSERVER, false);
359     int errCode = RegisterDataChangeCallback();
360     if (errCode != E_OK) {
361         LOG_ERROR("RegisterDataChangeCallback is failed, err is %{public}d.", errCode);
362     }
363     DistributedRdb::RdbChangedData rdbChangedData;
364     if (delayNotifier_ != nullptr) {
365         delayNotifier_->UpdateNotify(rdbChangedData, true);
366     }
367 }
368 
SetDistributedTables(const std::vector<std::string> & tables,int32_t type,const DistributedRdb::DistributedConfig & distributedConfig)369 int RdbStoreImpl::SetDistributedTables(
370     const std::vector<std::string> &tables, int32_t type, const DistributedRdb::DistributedConfig &distributedConfig)
371 {
372     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
373     if (config_.GetDBType() == DB_VECTOR || isReadOnly_ || isMemoryRdb_) {
374         return E_NOT_SUPPORT;
375     }
376     if (tables.empty()) {
377         LOG_WARN("The distributed tables to be set is empty.");
378         return E_OK;
379     }
380     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
381     if (errCode != E_OK) {
382         return errCode;
383     }
384     syncerParam_.asyncDownloadAsset_ = distributedConfig.asyncDownloadAsset;
385     syncerParam_.enableCloud_ = distributedConfig.enableCloud;
386     int32_t errorCode = service->SetDistributedTables(
387         syncerParam_, tables, distributedConfig.references, distributedConfig.isRebuild, type);
388     if (errorCode != E_OK) {
389         LOG_ERROR("Fail to set distributed tables, error=%{public}d.", errorCode);
390         return errorCode;
391     }
392     if (type == DistributedRdb::DISTRIBUTED_DEVICE && !config_.GetRegisterInfo(RegisterType::CLIENT_OBSERVER)) {
393         RegisterDataChangeCallback();
394     }
395     if (type != DistributedRdb::DISTRIBUTED_CLOUD) {
396         return E_OK;
397     }
398 
399     return HandleCloudSyncAfterSetDistributedTables(tables, distributedConfig);
400 }
401 
HandleCloudSyncAfterSetDistributedTables(const std::vector<std::string> & tables,const DistributedRdb::DistributedConfig & distributedConfig)402 int RdbStoreImpl::HandleCloudSyncAfterSetDistributedTables(
403     const std::vector<std::string> &tables, const DistributedRdb::DistributedConfig &distributedConfig)
404 {
405     auto pool = GetPool();
406     if (pool == nullptr) {
407         return E_ALREADY_CLOSED;
408     }
409     auto conn = pool->AcquireConnection(false);
410     if (conn != nullptr) {
411         auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
412         if (strategy == ExchangeStrategy::BACKUP) {
413             (void)conn->Backup({}, {}, false, slaveStatus_);
414         }
415     }
416     {
417         std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
418         if (distributedConfig.enableCloud && distributedConfig.autoSync) {
419             cloudInfo_->AddTables(tables);
420         } else {
421             cloudInfo_->RmvTables(tables);
422             return E_OK;
423         }
424     }
425     auto isRebuilt = RebuiltType::NONE;
426     GetRebuilt(isRebuilt);
427     if (isRebuilt == RebuiltType::REBUILT) {
428         DoCloudSync("");
429     }
430     return E_OK;
431 }
432 
ObtainDistributedTableName(const std::string & device,const std::string & table,int & errCode)433 std::string RdbStoreImpl::ObtainDistributedTableName(const std::string &device, const std::string &table, int &errCode)
434 {
435     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
436     if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
437         return "";
438     }
439     std::string uuid;
440     DeviceManagerAdaptor::RdbDeviceManagerAdaptor &deviceManager =
441         DeviceManagerAdaptor::RdbDeviceManagerAdaptor::GetInstance(syncerParam_.bundleName_);
442     errCode = deviceManager.GetEncryptedUuidByNetworkId(device, uuid);
443     if (errCode != E_OK) {
444         LOG_ERROR("GetUuid is failed.");
445         return "";
446     }
447 
448     auto translateCall = [uuid](const std::string &oriDevId, const DistributedDB::StoreInfo &info) {
449         return uuid;
450     };
451     DistributedDB::RuntimeConfig::SetTranslateToDeviceIdCallback(translateCall);
452 
453     return DistributedDB::RelationalStoreManager::GetDistributedTableName(uuid, table);
454 }
455 
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncBrief & callback)456 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncBrief &callback)
457 {
458     if (config_.GetDBType() == DB_VECTOR) {
459         return E_NOT_SUPPORT;
460     }
461     return Sync(option, predicate, [callback](Details &&details) {
462         Briefs briefs;
463         for (auto &[key, value] : details) {
464             briefs.insert_or_assign(key, value.code);
465         }
466         if (callback != nullptr) {
467             callback(briefs);
468         }
469     });
470 }
471 
Sync(const SyncOption & option,const std::vector<std::string> & tables,const AsyncDetail & async)472 int RdbStoreImpl::Sync(const SyncOption &option, const std::vector<std::string> &tables, const AsyncDetail &async)
473 {
474     return Sync(option, AbsRdbPredicates(tables), async);
475 }
476 
Sync(const SyncOption & option,const AbsRdbPredicates & predicate,const AsyncDetail & async)477 int RdbStoreImpl::Sync(const SyncOption &option, const AbsRdbPredicates &predicate, const AsyncDetail &async)
478 {
479     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
480     if (isMemoryRdb_) {
481         return E_NOT_SUPPORT;
482     }
483     DistributedRdb::RdbService::Option rdbOption;
484     rdbOption.mode = option.mode;
485     rdbOption.isAsync = !option.isBlock;
486     RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
487     ret = InnerSync(syncerParam_, rdbOption, predicate.GetDistributedPredicates(), async);
488     return ret;
489 }
490 
InnerSync(const RdbParam & param,const Options & option,const Memo & predicates,const AsyncDetail & async)491 int RdbStoreImpl::InnerSync(
492     const RdbParam &param, const Options &option, const Memo &predicates, const AsyncDetail &async)
493 {
494     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
495     if (errCode == E_NOT_SUPPORT) {
496         return errCode;
497     }
498     if (errCode != E_OK) {
499         LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
500             param.bundleName_.c_str());
501         return errCode;
502     }
503     errCode = service->Sync(param, option, predicates, async);
504     if (errCode != E_OK) {
505         LOG_ERROR("Sync is failed, err is %{public}d.", errCode);
506         return errCode;
507     }
508     return E_OK;
509 }
510 
GetUri(const std::string & event)511 Uri RdbStoreImpl::GetUri(const std::string &event)
512 {
513     std::string rdbUri;
514     if (config_.GetDataGroupId().empty()) {
515         rdbUri = SCHEME_RDB + config_.GetBundleName() + "/" + path_ + "/" + event;
516     } else {
517         rdbUri = SCHEME_RDB + config_.GetDataGroupId() + "/" + path_ + "/" + event;
518     }
519     return Uri(rdbUri);
520 }
521 
SubscribeLocal(const SubscribeOption & option,RdbStoreObserver * observer)522 int RdbStoreImpl::SubscribeLocal(const SubscribeOption &option, RdbStoreObserver *observer)
523 {
524     std::lock_guard<std::mutex> lock(mutex_);
525     localObservers_.try_emplace(option.event);
526     auto &list = localObservers_.find(option.event)->second;
527     for (auto it = list.begin(); it != list.end(); it++) {
528         if ((*it)->getObserver() == observer) {
529             LOG_ERROR("Duplicate subscribe.");
530             return E_OK;
531         }
532     }
533 
534     localObservers_[option.event].push_back(std::make_shared<RdbStoreLocalObserver>(observer));
535     return E_OK;
536 }
537 
SubscribeLocalShared(const SubscribeOption & option,RdbStoreObserver * observer)538 int RdbStoreImpl::SubscribeLocalShared(const SubscribeOption &option, RdbStoreObserver *observer)
539 {
540     std::lock_guard<std::mutex> lock(mutex_);
541     localSharedObservers_.try_emplace(option.event);
542     auto &list = localSharedObservers_.find(option.event)->second;
543     for (auto it = list.begin(); it != list.end(); it++) {
544         if ((*it)->getObserver() == observer) {
545             LOG_ERROR("Duplicate subscribe.");
546             return E_OK;
547         }
548     }
549 
550     auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
551     if (client == nullptr) {
552         LOG_ERROR("Failed to get DataObsMgrClient.");
553         return E_GET_DATAOBSMGRCLIENT_FAIL;
554     }
555     sptr<RdbStoreLocalSharedObserver> localSharedObserver(new (std::nothrow) RdbStoreLocalSharedObserver(observer));
556     int32_t err = client->RegisterObserver(GetUri(option.event), localSharedObserver);
557     if (err != 0) {
558         LOG_ERROR("Subscribe failed.");
559         return err;
560     }
561     localSharedObservers_[option.event].push_back(std::move(localSharedObserver));
562     return E_OK;
563 }
564 
SubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)565 int32_t RdbStoreImpl::SubscribeLocalDetail(
566     const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
567 {
568     if (observer == nullptr) {
569         return E_OK;
570     }
571     std::lock_guard<std::mutex> lock(mutex_);
572     for (auto it = localDetailObservers_.begin(); it != localDetailObservers_.end(); it++) {
573         if ((*it)->GetObserver() == observer) {
574             LOG_WARN("duplicate subscribe.");
575             return E_OK;
576         }
577     }
578     auto localStoreObserver = std::make_shared<RdbStoreLocalDbObserver>(observer);
579     auto [errCode, conn] = GetConn(false);
580     if (conn == nullptr) {
581         return errCode;
582     }
583     errCode = conn->Subscribe(localStoreObserver);
584     if (errCode != E_OK) {
585         LOG_ERROR("Subscribe local detail observer failed. db name:%{public}s errCode:%{public}." PRId32,
586             SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
587         return errCode;
588     }
589     config_.SetRegisterInfo(RegisterType::STORE_OBSERVER, true);
590     localDetailObservers_.emplace_back(localStoreObserver);
591     return E_OK;
592 }
593 
SubscribeRemote(const SubscribeOption & option,RdbStoreObserver * observer)594 int RdbStoreImpl::SubscribeRemote(const SubscribeOption &option, RdbStoreObserver *observer)
595 {
596     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
597     if (errCode != E_OK) {
598         return errCode;
599     }
600     return service->Subscribe(syncerParam_, option, observer);
601 }
602 
Subscribe(const SubscribeOption & option,RdbStoreObserver * observer)603 int RdbStoreImpl::Subscribe(const SubscribeOption &option, RdbStoreObserver *observer)
604 {
605     if (config_.GetDBType() == DB_VECTOR) {
606         return E_NOT_SUPPORT;
607     }
608     if (option.mode == SubscribeMode::LOCAL) {
609         return SubscribeLocal(option, observer);
610     }
611     if (isMemoryRdb_) {
612         return E_NOT_SUPPORT;
613     }
614     if (option.mode == SubscribeMode::LOCAL_SHARED) {
615         return SubscribeLocalShared(option, observer);
616     }
617     return SubscribeRemote(option, observer);
618 }
619 
UnSubscribeLocal(const SubscribeOption & option,RdbStoreObserver * observer)620 int RdbStoreImpl::UnSubscribeLocal(const SubscribeOption &option, RdbStoreObserver *observer)
621 {
622     std::lock_guard<std::mutex> lock(mutex_);
623     auto obs = localObservers_.find(option.event);
624     if (obs == localObservers_.end()) {
625         return E_OK;
626     }
627 
628     auto &list = obs->second;
629     for (auto it = list.begin(); it != list.end();) {
630         if (observer == nullptr || (*it)->getObserver() == observer) {
631             it = list.erase(it);
632             if (observer != nullptr) {
633                 break;
634             }
635         } else {
636             it++;
637         }
638     }
639 
640     if (list.empty()) {
641         localObservers_.erase(option.event);
642     }
643     return E_OK;
644 }
645 
UnSubscribeLocalShared(const SubscribeOption & option,RdbStoreObserver * observer)646 int RdbStoreImpl::UnSubscribeLocalShared(const SubscribeOption &option, RdbStoreObserver *observer)
647 {
648     std::lock_guard<std::mutex> lock(mutex_);
649     auto obs = localSharedObservers_.find(option.event);
650     if (obs == localSharedObservers_.end()) {
651         return E_OK;
652     }
653 
654     auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
655     if (client == nullptr) {
656         LOG_ERROR("Failed to get DataObsMgrClient.");
657         return E_GET_DATAOBSMGRCLIENT_FAIL;
658     }
659 
660     auto &list = obs->second;
661     for (auto it = list.begin(); it != list.end();) {
662         if (observer == nullptr || (*it)->getObserver() == observer) {
663             int32_t err = client->UnregisterObserver(GetUri(option.event), *it);
664             if (err != 0) {
665                 LOG_ERROR("UnSubscribeLocalShared failed.");
666                 return err;
667             }
668             it = list.erase(it);
669             if (observer != nullptr) {
670                 break;
671             }
672         } else {
673             it++;
674         }
675     }
676     if (list.empty()) {
677         localSharedObservers_.erase(option.event);
678     }
679     return E_OK;
680 }
681 
UnsubscribeLocalDetail(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)682 int32_t RdbStoreImpl::UnsubscribeLocalDetail(
683     const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
684 {
685     auto [errCode, conn] = GetConn(false);
686     if (conn == nullptr) {
687         return errCode;
688     }
689     std::lock_guard<std::mutex> lock(mutex_);
690     for (auto it = localDetailObservers_.begin(); it != localDetailObservers_.end();) {
691         if (observer == nullptr || (*it)->GetObserver() == observer) {
692             int32_t err = conn->Unsubscribe(*it);
693             if (err != 0) {
694                 LOG_ERROR("Unsubscribe local detail observer failed. db name:%{public}s errCode:%{public}." PRId32,
695                     SqliteUtils::Anonymous(config_.GetName()).c_str(), errCode);
696                 return err;
697             }
698             it = localDetailObservers_.erase(it);
699             if (observer != nullptr) {
700                 break;
701             }
702         } else {
703             it++;
704         }
705     }
706     return E_OK;
707 }
708 
UnSubscribeRemote(const SubscribeOption & option,RdbStoreObserver * observer)709 int RdbStoreImpl::UnSubscribeRemote(const SubscribeOption &option, RdbStoreObserver *observer)
710 {
711     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
712     if (errCode != E_OK) {
713         return errCode;
714     }
715     return service->UnSubscribe(syncerParam_, option, observer);
716 }
717 
UnSubscribe(const SubscribeOption & option,RdbStoreObserver * observer)718 int RdbStoreImpl::UnSubscribe(const SubscribeOption &option, RdbStoreObserver *observer)
719 {
720     if (config_.GetDBType() == DB_VECTOR) {
721         return E_NOT_SUPPORT;
722     }
723     if (option.mode == SubscribeMode::LOCAL) {
724         return UnSubscribeLocal(option, observer);
725     }
726     if (isMemoryRdb_) {
727         return E_NOT_SUPPORT;
728     }
729     if (option.mode == SubscribeMode::LOCAL_SHARED) {
730         return UnSubscribeLocalShared(option, observer);
731     }
732     return UnSubscribeRemote(option, observer);
733 }
734 
SubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)735 int RdbStoreImpl::SubscribeObserver(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
736 {
737     if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
738         return E_NOT_SUPPORT;
739     }
740     return SubscribeLocalDetail(option, observer);
741 }
742 
UnsubscribeObserver(const SubscribeOption & option,const std::shared_ptr<RdbStoreObserver> & observer)743 int RdbStoreImpl::UnsubscribeObserver(const SubscribeOption &option, const std::shared_ptr<RdbStoreObserver> &observer)
744 {
745     if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
746         return E_NOT_SUPPORT;
747     }
748     return UnsubscribeLocalDetail(option, observer);
749 }
750 
Notify(const std::string & event)751 int RdbStoreImpl::Notify(const std::string &event)
752 {
753     if (config_.GetDBType() == DB_VECTOR) {
754         return E_NOT_SUPPORT;
755     }
756 
757     {
758         std::lock_guard<std::mutex> lock(mutex_);
759         auto obs = localObservers_.find(event);
760         if (obs != localObservers_.end()) {
761             auto &list = obs->second;
762             for (auto &it : list) {
763                 it->OnChange();
764             }
765         }
766     }
767     if (isMemoryRdb_) {
768         return E_OK;
769     }
770     auto client = OHOS::AAFwk::DataObsMgrClient::GetInstance();
771     if (client == nullptr) {
772         LOG_ERROR("Failed to get DataObsMgrClient.");
773         return E_GET_DATAOBSMGRCLIENT_FAIL;
774     }
775     int32_t err = client->NotifyChange(GetUri(event));
776     if (err != 0) {
777         LOG_ERROR("Notify failed.");
778     }
779     return E_OK;
780 }
781 
SetSearchable(bool isSearchable)782 int RdbStoreImpl::SetSearchable(bool isSearchable)
783 {
784     if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
785         return E_NOT_SUPPORT;
786     }
787     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
788     if (errCode != E_OK || service == nullptr) {
789         LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
790         return errCode;
791     }
792     return service->SetSearchable(syncerParam_, isSearchable);
793 }
794 
RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)795 int RdbStoreImpl::RegisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
796 {
797     if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
798         return E_NOT_SUPPORT;
799     }
800     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
801     if (errCode != E_OK) {
802         return errCode;
803     }
804     return service->RegisterAutoSyncCallback(syncerParam_, observer);
805 }
806 
UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)807 int RdbStoreImpl::UnregisterAutoSyncCallback(std::shared_ptr<DetailProgressObserver> observer)
808 {
809     if (config_.GetDBType() == DB_VECTOR || isMemoryRdb_) {
810         return E_NOT_SUPPORT;
811     }
812     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
813     if (errCode != E_OK) {
814         return errCode;
815     }
816     return service->UnregisterAutoSyncCallback(syncerParam_, observer);
817 }
818 
InitDelayNotifier()819 void RdbStoreImpl::InitDelayNotifier()
820 {
821     if (delayNotifier_ != nullptr) {
822         return;
823     }
824     delayNotifier_ = std::make_shared<DelayNotify>();
825     if (delayNotifier_ == nullptr) {
826         LOG_ERROR("Init delay notifier failed.");
827         return;
828     }
829     delayNotifier_->SetExecutorPool(TaskExecutor::GetInstance().GetExecutor());
830     delayNotifier_->SetTask([param = syncerParam_](const DistributedRdb::RdbChangedData &rdbChangedData,
831                                 const RdbNotifyConfig &rdbNotifyConfig) -> int {
832         auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(param);
833         if (errCode == E_NOT_SUPPORT) {
834             return errCode;
835         }
836         if (errCode != E_OK || service == nullptr) {
837             LOG_ERROR("GetRdbService is failed, err is %{public}d.", errCode);
838             return errCode;
839         }
840         return service->NotifyDataChange(param, rdbChangedData, rdbNotifyConfig);
841     });
842 }
843 
RegisterDataChangeCallback()844 int RdbStoreImpl::RegisterDataChangeCallback()
845 {
846     InitDelayNotifier();
847     auto connPool = GetPool();
848     if (connPool == nullptr) {
849         return E_ALREADY_CLOSED;
850     }
851 
852     RegisterDataChangeCallback(delayNotifier_, connPool, 0);
853     config_.SetRegisterInfo(RegisterType::CLIENT_OBSERVER, true);
854     return E_OK;
855 }
856 
RegisterDataChangeCallback(std::shared_ptr<DelayNotify> delayNotifier,std::weak_ptr<ConnectionPool> connPool,int retry)857 void RdbStoreImpl::RegisterDataChangeCallback(
858     std::shared_ptr<DelayNotify> delayNotifier, std::weak_ptr<ConnectionPool> connPool, int retry)
859 {
860     auto relConnPool = connPool.lock();
861     if (relConnPool == nullptr) {
862         return;
863     }
864     auto conn = relConnPool->AcquireConnection(false);
865     if (conn == nullptr) {
866         relConnPool->Dump(true, "DATACHANGE");
867         auto pool = TaskExecutor::GetInstance().GetExecutor();
868         if (pool != nullptr && retry++ < MAX_RETRY_TIMES) {
869             pool->Schedule(std::chrono::seconds(1),
870                 [delayNotifier, connPool, retry]() { RegisterDataChangeCallback(delayNotifier, connPool, retry); });
871         }
872         return;
873     }
874     auto callBack = [delayNotifier](const DistributedRdb::RdbChangedData &rdbChangedData) {
875         if (delayNotifier != nullptr) {
876             delayNotifier->UpdateNotify(rdbChangedData);
877         }
878     };
879     auto errCode = conn->SubscribeTableChanges(callBack);
880     if (errCode != E_OK) {
881         return;
882     }
883 }
884 
GetHashKeyForLockRow(const AbsRdbPredicates & predicates,std::vector<std::vector<uint8_t>> & hashKeys)885 int RdbStoreImpl::GetHashKeyForLockRow(const AbsRdbPredicates &predicates, std::vector<std::vector<uint8_t>> &hashKeys)
886 {
887     std::string table = predicates.GetTableName();
888     if (table.empty()) {
889         return E_EMPTY_TABLE_NAME;
890     }
891     auto logTable = GetLogTableName(table);
892     std::string sql;
893     sql.append("SELECT ").append(logTable).append(".hash_key ").append("FROM ").append(logTable);
894     sql.append(" INNER JOIN ").append(table).append(" ON ");
895     sql.append(table).append(".ROWID = ").append(logTable).append(".data_key");
896     auto whereClause = predicates.GetWhereClause();
897     if (!whereClause.empty()) {
898         SqliteUtils::Replace(whereClause, SqliteUtils::REP, logTable + ".");
899         sql.append(" WHERE ").append(whereClause);
900     }
901 
902     auto result = QuerySql(sql, predicates.GetBindArgs());
903     if (result == nullptr) {
904         return E_ALREADY_CLOSED ;
905     }
906     int count = 0;
907     if (result->GetRowCount(count) != E_OK) {
908         return E_NO_ROW_IN_QUERY;
909     }
910 
911     if (count <= 0) {
912         return E_NO_ROW_IN_QUERY;
913     }
914     while (result->GoToNextRow() == E_OK) {
915         std::vector<uint8_t> hashKey;
916         if (result->GetBlob(0, hashKey) != E_OK) {
917             return E_ERROR;
918         }
919         hashKeys.push_back(std::move(hashKey));
920     }
921     return E_OK;
922 }
923 
ModifyLockStatus(const AbsRdbPredicates & predicates,bool isLock)924 int RdbStoreImpl::ModifyLockStatus(const AbsRdbPredicates &predicates, bool isLock)
925 {
926     if (config_.IsVector() || isMemoryRdb_ || isReadOnly_) {
927         return E_NOT_SUPPORT;
928     }
929     std::vector<std::vector<uint8_t>> hashKeys;
930     int ret = GetHashKeyForLockRow(predicates, hashKeys);
931     if (ret != E_OK) {
932         LOG_ERROR("GetHashKeyForLockRow failed, err is %{public}d.", ret);
933         return ret;
934     }
935     auto [err, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION);
936     if (statement == nullptr || err != E_OK) {
937         return err;
938     }
939     int errCode = statement->ModifyLockStatus(predicates.GetTableName(), hashKeys, isLock);
940     if (errCode == E_WAIT_COMPENSATED_SYNC) {
941         LOG_DEBUG("Start compensation sync.");
942         DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true, true };
943         auto memo = AbsRdbPredicates(predicates.GetTableName()).GetDistributedPredicates();
944         InnerSync(syncerParam_, option, memo, nullptr);
945         return E_OK;
946     }
947     if (errCode != E_OK) {
948         LOG_ERROR("ModifyLockStatus failed, err is %{public}d.", errCode);
949     }
950     return errCode;
951 }
952 
LockCloudContainer()953 std::pair<int32_t, uint32_t> RdbStoreImpl::LockCloudContainer()
954 {
955     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
956     if (config_.IsVector() || isMemoryRdb_ || isReadOnly_) {
957         return { E_NOT_SUPPORT, 0 };
958     }
959     RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
960     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
961     if (errCode == E_NOT_SUPPORT) {
962         LOG_ERROR("not support");
963         return { errCode, 0 };
964     }
965     if (errCode != E_OK) {
966         LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
967             syncerParam_.bundleName_.c_str());
968         return { errCode, 0 };
969     }
970     auto result = service->LockCloudContainer(syncerParam_);
971     if (result.first != E_OK) {
972         LOG_ERROR("LockCloudContainer failed, err is %{public}d.", result.first);
973     }
974     return result;
975 }
976 
UnlockCloudContainer()977 int32_t RdbStoreImpl::UnlockCloudContainer()
978 {
979     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
980     if (config_.IsVector() || isMemoryRdb_ || isReadOnly_) {
981         return E_NOT_SUPPORT;
982     }
983     RdbRadar ret(Scene::SCENE_SYNC, __FUNCTION__, config_.GetBundleName());
984     auto [errCode, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
985     if (errCode == E_NOT_SUPPORT) {
986         LOG_ERROR("not support");
987         return errCode;
988     }
989     if (errCode != E_OK) {
990         LOG_ERROR("GetRdbService is failed, err is %{public}d, bundleName is %{public}s.", errCode,
991             syncerParam_.bundleName_.c_str());
992         return errCode;
993     }
994     errCode = service->UnlockCloudContainer(syncerParam_);
995     if (errCode != E_OK) {
996         LOG_ERROR("UnlockCloudContainer failed, err is %{public}d.", errCode);
997     }
998     return errCode;
999 }
1000 #endif
1001 
RdbStoreImpl(const RdbStoreConfig & config)1002 RdbStoreImpl::RdbStoreImpl(const RdbStoreConfig &config)
1003     : isMemoryRdb_(config.IsMemoryRdb()), config_(config), name_(config.GetName()),
1004       fileType_(config.GetDatabaseFileType())
1005 {
1006     SqliteGlobalConfig::GetDbPath(config_, path_);
1007     isReadOnly_ = config.IsReadOnly() || config.GetRoleType() == VISITOR;
1008 }
1009 
RdbStoreImpl(const RdbStoreConfig & config,int & errCode)1010 RdbStoreImpl::RdbStoreImpl(const RdbStoreConfig &config, int &errCode)
1011     : isMemoryRdb_(config.IsMemoryRdb()), config_(config), name_(config.GetName()),
1012       fileType_(config.GetDatabaseFileType())
1013 {
1014     isReadOnly_ = config.IsReadOnly() || config.GetRoleType() == VISITOR;
1015     SqliteGlobalConfig::GetDbPath(config_, path_);
1016     bool created = access(path_.c_str(), F_OK) != 0;
1017     connectionPool_ = ConnectionPool::Create(config_, errCode);
1018     if (connectionPool_ == nullptr && (errCode == E_SQLITE_CORRUPT || errCode == E_INVALID_SECRET_KEY) &&
1019         !isReadOnly_) {
1020         LOG_ERROR("database corrupt, errCode:0x%{public}x, %{public}s, %{public}s", errCode,
1021             SqliteUtils::Anonymous(name_).c_str(),
1022             SqliteUtils::FormatDebugInfoBrief(Connection::Collect(config_), "master").c_str());
1023 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1024         RdbParam param;
1025         param.bundleName_ = config_.GetBundleName();
1026         param.storeName_ = config_.GetName();
1027         param.subUser_ = config_.GetSubUser();
1028         auto [err, service] = RdbMgr::GetInstance().GetRdbService(param);
1029         if (service != nullptr) {
1030             service->Disable(param);
1031         }
1032 #endif
1033         config_.SetIter(0);
1034         if (config_.IsEncrypt() && config_.GetAllowRebuild()) {
1035             auto key = config_.GetEncryptKey();
1036             RdbSecurityManager::GetInstance().RestoreKeyFile(path_, key);
1037             key.assign(key.size(), 0);
1038         }
1039         std::tie(rebuild_, connectionPool_) = ConnectionPool::HandleDataCorruption(config_, errCode);
1040         created = true;
1041 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1042         if (service != nullptr) {
1043             service->Enable(param);
1044         }
1045 #endif
1046     }
1047     if (connectionPool_ == nullptr || errCode != E_OK) {
1048         connectionPool_ = nullptr;
1049         LOG_ERROR("Create connPool failed, err is %{public}d, path:%{public}s", errCode,
1050             SqliteUtils::Anonymous(path_).c_str());
1051         return;
1052     }
1053     InitSyncerParam(config_, created);
1054     InitReportFunc(syncerParam_);
1055     InnerOpen();
1056 }
1057 
~RdbStoreImpl()1058 RdbStoreImpl::~RdbStoreImpl()
1059 {
1060     connectionPool_ = nullptr;
1061     trxConnMap_ = {};
1062     for (auto &trans : transactions_) {
1063         auto realTrans = trans.lock();
1064         if (realTrans) {
1065             (void)realTrans->Close();
1066         }
1067     }
1068     transactions_ = {};
1069 }
1070 
GetConfig()1071 const RdbStoreConfig &RdbStoreImpl::GetConfig()
1072 {
1073     return config_;
1074 }
1075 
Insert(const std::string & table,const Row & row,Resolution resolution)1076 std::pair<int, int64_t> RdbStoreImpl::Insert(const std::string &table, const Row &row, Resolution resolution)
1077 {
1078     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1079     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1080         return { E_NOT_SUPPORT, -1 };
1081     }
1082     if (table.empty()) {
1083         return { E_EMPTY_TABLE_NAME, -1 };
1084     }
1085 
1086     if (row.IsEmpty()) {
1087         return { E_EMPTY_VALUES_BUCKET, -1 };
1088     }
1089 
1090     auto conflictClause = SqliteUtils::GetConflictClause(static_cast<int>(resolution));
1091     if (conflictClause == nullptr) {
1092         return { E_INVALID_CONFLICT_FLAG, -1 };
1093     }
1094     RdbStatReporter reportStat(RDB_PERF, INSERT, config_, reportFunc_);
1095     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1096     std::string sql;
1097     sql.append("INSERT").append(conflictClause).append(" INTO ").append(table).append("(");
1098     size_t bindArgsSize = row.values_.size();
1099     std::vector<ValueObject> bindArgs;
1100     bindArgs.reserve(bindArgsSize);
1101     const char *split = "";
1102     for (const auto &[key, val] : row.values_) {
1103         sql.append(split).append(key);
1104         if (val.GetType() == ValueObject::TYPE_ASSETS && resolution == ConflictResolution::ON_CONFLICT_REPLACE) {
1105             return { E_INVALID_ARGS, -1 };
1106         }
1107         SqliteSqlBuilder::UpdateAssetStatus(val, AssetValue::STATUS_INSERT);
1108         bindArgs.push_back(val); // columnValue
1109         split = ",";
1110     }
1111 
1112     sql.append(") VALUES (");
1113     if (bindArgsSize > 0) {
1114         sql.append(SqliteSqlBuilder::GetSqlArgs(bindArgsSize));
1115     }
1116 
1117     sql.append(")");
1118     int64_t rowid = -1;
1119     auto errCode = ExecuteForLastInsertedRowId(rowid, sql, bindArgs);
1120     if (errCode == E_OK) {
1121         DoCloudSync(table);
1122     }
1123 
1124     return { errCode, rowid };
1125 }
1126 
BatchInsert(const std::string & table,const ValuesBuckets & rows)1127 std::pair<int, int64_t> RdbStoreImpl::BatchInsert(const std::string &table, const ValuesBuckets &rows)
1128 {
1129     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1130         return { E_NOT_SUPPORT, -1 };
1131     }
1132 
1133     if (rows.RowSize() == 0) {
1134         return { E_OK, 0 };
1135     }
1136 
1137     RdbStatReporter reportStat(RDB_PERF, BATCHINSERT, config_, reportFunc_);
1138     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1139     auto pool = GetPool();
1140     if (pool == nullptr) {
1141         return { E_ALREADY_CLOSED, -1 };
1142     }
1143     auto conn = pool->AcquireConnection(false);
1144     if (conn == nullptr) {
1145         return { E_DATABASE_BUSY, -1 };
1146     }
1147 
1148     auto executeSqlArgs = SqliteSqlBuilder::GenerateSqls(table, rows, conn->GetMaxVariable());
1149     BatchInsertArgsDfx(static_cast<int>(executeSqlArgs.size()));
1150     if (executeSqlArgs.empty()) {
1151         LOG_ERROR("empty, table=%{public}s, values:%{public}zu, max number:%{public}d.", table.c_str(), rows.RowSize(),
1152             conn->GetMaxVariable());
1153         return { E_INVALID_ARGS, -1 };
1154     }
1155     PauseDelayNotify pauseDelayNotify(delayNotifier_);
1156     for (const auto &[sql, bindArgs] : executeSqlArgs) {
1157         auto [errCode, statement] = GetStatement(sql, conn);
1158         if (statement == nullptr) {
1159             LOG_ERROR("statement is nullptr, errCode:0x%{public}x, args:%{public}zu, table:%{public}s, "
1160                 "app self can check the SQL", errCode, bindArgs.size(), table.c_str());
1161             return { E_OK, -1 };
1162         }
1163         for (const auto &args : bindArgs) {
1164             auto errCode = statement->Execute(args);
1165             if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1166                 pool->Dump(true, "BATCH");
1167                 return { errCode, -1 };
1168             }
1169             if (errCode != E_OK) {
1170                 LOG_ERROR("failed, errCode:%{public}d,args:%{public}zu,table:%{public}s,app self can check the SQL",
1171                     errCode, bindArgs.size(), table.c_str());
1172                 return { E_OK, -1 };
1173             }
1174         }
1175     }
1176     conn = nullptr;
1177     DoCloudSync(table);
1178     return { E_OK, int64_t(rows.RowSize()) };
1179 }
1180 
BatchInsertArgsDfx(int argsSize)1181 void RdbStoreImpl::BatchInsertArgsDfx(int argsSize)
1182 {
1183     if (argsSize > 1) {
1184         Reportor::ReportFault(RdbFaultEvent(FT_CURD, E_DFX_BATCH_INSERT_ARGS_SIZE, config_.GetBundleName(),
1185             "BatchInsert executeSqlArgs size[ " + std::to_string(argsSize) + "]"));
1186     }
1187 }
1188 
BatchInsertWithConflictResolution(const std::string & table,const ValuesBuckets & rows,Resolution resolution)1189 std::pair<int, int64_t> RdbStoreImpl::BatchInsertWithConflictResolution(
1190     const std::string &table, const ValuesBuckets &rows, Resolution resolution)
1191 {
1192     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1193         return { E_NOT_SUPPORT, -1 };
1194     }
1195 
1196     if (rows.RowSize() == 0) {
1197         return { E_OK, 0 };
1198     }
1199 
1200     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1201     auto pool = GetPool();
1202     if (pool == nullptr) {
1203         return { E_ALREADY_CLOSED, -1 };
1204     }
1205     auto conn = pool->AcquireConnection(false);
1206     if (conn == nullptr) {
1207         return { E_DATABASE_BUSY, -1 };
1208     }
1209 
1210     auto sqlArgs = SqliteSqlBuilder::GenerateSqls(table, rows, conn->GetMaxVariable(), resolution);
1211     // To ensure atomicity, execute SQL only once
1212     if (sqlArgs.size() != 1 || sqlArgs.front().second.size() != 1) {
1213         auto [fields, values] = rows.GetFieldsAndValues();
1214         LOG_ERROR("invalid args, table=%{public}s, rows:%{public}zu, fields:%{public}zu, max:%{public}d.",
1215             table.c_str(), rows.RowSize(), fields != nullptr ? fields->size() : 0, conn->GetMaxVariable());
1216         return { E_INVALID_ARGS, -1 };
1217     }
1218     auto &[sql, bindArgs] = sqlArgs.front();
1219     auto [errCode, statement] = GetStatement(sql, conn);
1220     if (statement == nullptr) {
1221         LOG_ERROR("statement is nullptr, errCode:0x%{public}x, args:%{public}zu, table:%{public}s, "
1222                   "app self can check the SQL", errCode, bindArgs.size(), table.c_str());
1223         return { errCode, -1 };
1224     }
1225     PauseDelayNotify pauseDelayNotify(delayNotifier_);
1226     auto args = std::ref(bindArgs.front());
1227     errCode = statement->Execute(args);
1228     if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1229         pool->Dump(true, "BATCH");
1230         return { errCode, -1 };
1231     }
1232     if (errCode != E_OK) {
1233         LOG_ERROR("failed,errCode:%{public}d,table:%{public}s,args:%{public}zu,resolution:%{public}d.", errCode,
1234             table.c_str(), args.get().size(), static_cast<int32_t>(resolution));
1235         return { errCode, errCode == E_SQLITE_CONSTRAINT ? int64_t(statement->Changes()) : -1 };
1236     }
1237     conn = nullptr;
1238     DoCloudSync(table);
1239     return { E_OK, int64_t(statement->Changes()) };
1240 }
1241 
Update(const std::string & table,const Row & row,const std::string & where,const Values & args,Resolution resolution)1242 std::pair<int, int> RdbStoreImpl::Update(
1243     const std::string &table, const Row &row, const std::string &where, const Values &args, Resolution resolution)
1244 {
1245     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1246     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1247         return { E_NOT_SUPPORT, -1 };
1248     }
1249     if (table.empty()) {
1250         return { E_EMPTY_TABLE_NAME, -1 };
1251     }
1252 
1253     if (row.IsEmpty()) {
1254         return { E_EMPTY_VALUES_BUCKET, -1 };
1255     }
1256 
1257     auto clause = SqliteUtils::GetConflictClause(static_cast<int>(resolution));
1258     if (clause == nullptr) {
1259         return { E_INVALID_CONFLICT_FLAG, -1 };
1260     }
1261     RdbStatReporter reportStat(RDB_PERF, UPDATE, config_, reportFunc_);
1262     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1263     std::string sql;
1264     sql.append("UPDATE").append(clause).append(" ").append(table).append(" SET ");
1265     std::vector<ValueObject> tmpBindArgs;
1266     size_t tmpBindSize = row.values_.size() + args.size();
1267     tmpBindArgs.reserve(tmpBindSize);
1268     const char *split = "";
1269     for (auto &[key, val] : row.values_) {
1270         sql.append(split);
1271         if (val.GetType() == ValueObject::TYPE_ASSETS) {
1272             sql.append(key).append("=merge_assets(").append(key).append(", ?)"); // columnName
1273         } else if (val.GetType() == ValueObject::TYPE_ASSET) {
1274             sql.append(key).append("=merge_asset(").append(key).append(", ?)"); // columnName
1275         } else {
1276             sql.append(key).append("=?"); // columnName
1277         }
1278         tmpBindArgs.push_back(val); // columnValue
1279         split = ",";
1280     }
1281 
1282     if (!where.empty()) {
1283         sql.append(" WHERE ").append(where);
1284     }
1285 
1286     tmpBindArgs.insert(tmpBindArgs.end(), args.begin(), args.end());
1287 
1288     int64_t changes = 0;
1289     auto errCode = ExecuteForChangedRowCount(changes, sql, tmpBindArgs);
1290     if (errCode == E_OK) {
1291         DoCloudSync(table);
1292     }
1293     return { errCode, int32_t(changes) };
1294 }
1295 
Delete(int & deletedRows,const std::string & table,const std::string & whereClause,const Values & args)1296 int RdbStoreImpl::Delete(int &deletedRows, const std::string &table, const std::string &whereClause, const Values &args)
1297 {
1298     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1299     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1300         return E_NOT_SUPPORT;
1301     }
1302     if (table.empty()) {
1303         return E_EMPTY_TABLE_NAME;
1304     }
1305 
1306     RdbStatReporter reportStat(RDB_PERF, DELETE, config_, reportFunc_);
1307     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1308     std::string sql;
1309     sql.append("DELETE FROM ").append(table);
1310     if (!whereClause.empty()) {
1311         sql.append(" WHERE ").append(whereClause);
1312     }
1313     int64_t changes = 0;
1314     auto errCode = ExecuteForChangedRowCount(changes, sql, args);
1315     if (errCode != E_OK) {
1316         return errCode;
1317     }
1318     deletedRows = changes;
1319     DoCloudSync(table);
1320     return E_OK;
1321 }
1322 
QuerySql(const std::string & sql,const Values & bindArgs)1323 std::shared_ptr<AbsSharedResultSet> RdbStoreImpl::QuerySql(const std::string &sql, const Values &bindArgs)
1324 {
1325     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1326     if (config_.GetDBType() == DB_VECTOR) {
1327         return nullptr;
1328     }
1329     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1330 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1331     auto start = std::chrono::steady_clock::now();
1332     auto pool = GetPool();
1333     if (pool == nullptr) {
1334         LOG_ERROR("Database already closed.");
1335         return nullptr;
1336     }
1337     return std::make_shared<SqliteSharedResultSet>(start, pool->AcquireRef(true), sql, bindArgs, path_);
1338 #else
1339     (void)sql;
1340     (void)bindArgs;
1341     return nullptr;
1342 #endif
1343 }
1344 
QueryByStep(const std::string & sql,const Values & args,bool preCount)1345 std::shared_ptr<ResultSet> RdbStoreImpl::QueryByStep(const std::string &sql, const Values &args, bool preCount)
1346 {
1347     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1348     auto start = std::chrono::steady_clock::now();
1349     auto pool = GetPool();
1350     if (pool == nullptr) {
1351         LOG_ERROR("Database already closed.");
1352         return nullptr;
1353     }
1354 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1355     return std::make_shared<StepResultSet>(start, connectionPool_->AcquireRef(true), sql, args, preCount);
1356 #else
1357     return std::make_shared<StepResultSet>(start, connectionPool_->AcquireRef(true), sql, args, false);
1358 #endif
1359 }
1360 
Count(int64_t & outValue,const AbsRdbPredicates & predicates)1361 int RdbStoreImpl::Count(int64_t &outValue, const AbsRdbPredicates &predicates)
1362 {
1363     if (config_.GetDBType() == DB_VECTOR) {
1364         return E_NOT_SUPPORT;
1365     }
1366     std::string sql = SqliteSqlBuilder::BuildCountString(predicates);
1367     return ExecuteAndGetLong(outValue, sql, predicates.GetBindArgs());
1368 }
1369 
WriteToCompareFile(const std::string & dbPath,const std::string & bundleName,const std::string & sql)1370 void WriteToCompareFile(const std::string &dbPath, const std::string &bundleName, const std::string &sql)
1371 {
1372     auto poolTask = TaskExecutor::GetInstance().GetExecutor();
1373     if (poolTask != nullptr) {
1374         poolTask->Execute([dbPath, bundleName, sql]() {
1375             std::string comparePath = dbPath + "-compare";
1376             if (SqliteUtils::CleanFileContent(comparePath)) {
1377                 Reportor::ReportFault(
1378                     RdbFaultEvent(FT_CURD, E_DFX_IS_NOT_EXIST, bundleName, "compare file is deleted"));
1379             }
1380             SqliteUtils::WriteSqlToFile(comparePath, sql);
1381         });
1382     }
1383 }
1384 
HandleSchemaDDL(std::shared_ptr<Statement> statement,std::shared_ptr<ConnectionPool> pool,const std::string & sql,int32_t & errCode)1385 void RdbStoreImpl::HandleSchemaDDL(std::shared_ptr<Statement> statement,
1386     std::shared_ptr<ConnectionPool> pool, const std::string &sql, int32_t &errCode)
1387 {
1388     statement->Reset();
1389     statement->Prepare("PRAGMA schema_version");
1390     auto [err, version] = statement->ExecuteForValue();
1391     statement = nullptr;
1392     if (vSchema_ < static_cast<int64_t>(version)) {
1393         LOG_INFO("db:%{public}s exe DDL schema<%{public}" PRIi64 "->%{public}" PRIi64
1394                  "> app self can check the SQL.",
1395             SqliteUtils::Anonymous(name_).c_str(), vSchema_, static_cast<int64_t>(version));
1396         vSchema_ = version;
1397         errCode = pool->RestartConns();
1398         if (!isMemoryRdb_) {
1399             std::string dbPath = config_.GetPath();
1400             std::string bundleName = config_.GetBundleName();
1401             WriteToCompareFile(dbPath, bundleName, sql);
1402         }
1403     }
1404 }
1405 
ExecuteSql(const std::string & sql,const Values & args)1406 int RdbStoreImpl::ExecuteSql(const std::string &sql, const Values &args)
1407 {
1408     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
1409     if (config_.GetDBType() == DB_VECTOR || isReadOnly_) {
1410         return E_NOT_SUPPORT;
1411     }
1412     int ret = CheckAttach(sql);
1413     if (ret != E_OK) {
1414         return ret;
1415     }
1416     RdbStatReporter reportStat(RDB_PERF, EXECUTESQL, config_, reportFunc_);
1417     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1418     auto [errCode, statement] = BeginExecuteSql(sql);
1419     if (statement == nullptr) {
1420         return errCode;
1421     }
1422     auto pool = GetPool();
1423     if (pool == nullptr) {
1424         return E_ALREADY_CLOSED;
1425     }
1426     errCode = statement->Execute(args);
1427     if (errCode != E_OK) {
1428         LOG_ERROR("failed,error:0x%{public}x app self can check the SQL.", errCode);
1429         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1430             pool->Dump(true, "EXECUTE");
1431         }
1432         return errCode;
1433     }
1434     int sqlType = SqliteUtils::GetSqlStatementType(sql);
1435     if (sqlType == SqliteUtils::STATEMENT_DDL) {
1436         HandleSchemaDDL(statement, pool, sql, errCode);
1437     }
1438     statement = nullptr;
1439     if (errCode == E_OK && (sqlType == SqliteUtils::STATEMENT_UPDATE || sqlType == SqliteUtils::STATEMENT_INSERT)) {
1440         DoCloudSync("");
1441     }
1442     return errCode;
1443 }
1444 
Execute(const std::string & sql,const Values & args,int64_t trxId)1445 std::pair<int32_t, ValueObject> RdbStoreImpl::Execute(const std::string &sql, const Values &args, int64_t trxId)
1446 {
1447     ValueObject object;
1448     if (isReadOnly_) {
1449         return { E_NOT_SUPPORT, object };
1450     }
1451 
1452     RdbStatReporter reportStat(RDB_PERF, EXECUTE, config_, reportFunc_);
1453     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_TOTAL);
1454     int sqlType = SqliteUtils::GetSqlStatementType(sql);
1455     if (!SqliteUtils::IsSupportSqlForExecute(sqlType)) {
1456         LOG_ERROR("Not support the sqlType: %{public}d, app self can check the SQL", sqlType);
1457         return { E_NOT_SUPPORT_THE_SQL, object };
1458     }
1459 
1460     if (config_.IsVector() && trxId > 0) {
1461         return { ExecuteByTrxId(sql, trxId, false, args), ValueObject() };
1462     }
1463 
1464     auto pool = GetPool();
1465     if (pool == nullptr) {
1466         return { E_ALREADY_CLOSED, object };
1467     }
1468     auto conn = pool->AcquireConnection(false);
1469     if (pool == nullptr) {
1470         return { E_DATABASE_BUSY, object };
1471     }
1472 
1473     auto [errCode, statement] = GetStatement(sql, conn);
1474     if (errCode != E_OK) {
1475         return { errCode, object };
1476     }
1477 
1478     errCode = statement->Execute(args);
1479     if (errCode != E_OK) {
1480         LOG_ERROR("failed,error:0x%{public}x app self can check the SQL.", errCode);
1481         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1482             pool->Dump(true, "EXECUTE");
1483         }
1484         return { errCode, object };
1485     }
1486 
1487     if (config_.IsVector()) {
1488         return { errCode, object };
1489     }
1490 
1491     return HandleDifferentSqlTypes(statement, sql, object, sqlType);
1492 }
1493 
HandleDifferentSqlTypes(std::shared_ptr<Statement> statement,const std::string & sql,const ValueObject & object,int sqlType)1494 std::pair<int32_t, ValueObject> RdbStoreImpl::HandleDifferentSqlTypes(
1495     std::shared_ptr<Statement> statement, const std::string &sql, const ValueObject &object, int sqlType)
1496 {
1497     int32_t errCode = E_OK;
1498     if (sqlType == SqliteUtils::STATEMENT_INSERT) {
1499         int64_t outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1500         return { errCode, ValueObject(outValue) };
1501     }
1502 
1503     if (sqlType == SqliteUtils::STATEMENT_UPDATE) {
1504         int outValue = statement->Changes();
1505         return { errCode, ValueObject(outValue) };
1506     }
1507 
1508     if (sqlType == SqliteUtils::STATEMENT_PRAGMA) {
1509         if (statement->GetColumnCount() == 1) {
1510             return statement->GetColumn(0);
1511         }
1512 
1513         if (statement->GetColumnCount() > 1) {
1514             LOG_ERROR("Not support the sql:app self can check the SQL, column count more than 1");
1515             return { E_NOT_SUPPORT_THE_SQL, object };
1516         }
1517     }
1518 
1519     auto pool = GetPool();
1520     if (pool == nullptr) {
1521         return { E_ALREADY_CLOSED, object };
1522     }
1523 
1524     if (sqlType == SqliteUtils::STATEMENT_DDL) {
1525         HandleSchemaDDL(statement, pool, sql, errCode);
1526     }
1527     return { errCode, object };
1528 }
1529 
ExecuteAndGetLong(int64_t & outValue,const std::string & sql,const Values & args)1530 int RdbStoreImpl::ExecuteAndGetLong(int64_t &outValue, const std::string &sql, const Values &args)
1531 {
1532     if (config_.GetDBType() == DB_VECTOR) {
1533         return E_NOT_SUPPORT;
1534     }
1535     auto [errCode, statement] = BeginExecuteSql(sql);
1536     if (statement == nullptr) {
1537         return errCode;
1538     }
1539     auto [err, object] = statement->ExecuteForValue(args);
1540     if (err != E_OK) {
1541         LOG_ERROR("failed, app self can check the SQL,  ERROR is %{public}d.", err);
1542     }
1543     outValue = object;
1544     return err;
1545 }
1546 
ExecuteAndGetString(std::string & outValue,const std::string & sql,const Values & args)1547 int RdbStoreImpl::ExecuteAndGetString(std::string &outValue, const std::string &sql, const Values &args)
1548 {
1549     if (config_.GetDBType() == DB_VECTOR) {
1550         return E_NOT_SUPPORT;
1551     }
1552     auto [errCode, statement] = BeginExecuteSql(sql);
1553     if (statement == nullptr) {
1554         return errCode;
1555     }
1556     ValueObject object;
1557     std::tie(errCode, object) = statement->ExecuteForValue(args);
1558     if (errCode != E_OK) {
1559         LOG_ERROR("failed, app self can check the SQL,  ERROR is %{public}d.", errCode);
1560     }
1561     outValue = static_cast<std::string>(object);
1562     return errCode;
1563 }
1564 
ExecuteForLastInsertedRowId(int64_t & outValue,const std::string & sql,const Values & args)1565 int RdbStoreImpl::ExecuteForLastInsertedRowId(int64_t &outValue, const std::string &sql, const Values &args)
1566 {
1567     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1568         return E_NOT_SUPPORT;
1569     }
1570     auto begin = std::chrono::steady_clock::now();
1571     auto [errCode, statement] = GetStatement(sql, false);
1572     if (statement == nullptr) {
1573         return errCode;
1574     }
1575     auto beginExec = std::chrono::steady_clock::now();
1576     errCode = statement->Execute(args);
1577     if (errCode != E_OK) {
1578         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1579             auto pool = GetPool();
1580             if (pool != nullptr) {
1581                 pool->Dump(true, "INSERT");
1582             }
1583         }
1584         return errCode;
1585     }
1586     auto beginResult = std::chrono::steady_clock::now();
1587     outValue = statement->Changes() > 0 ? statement->LastInsertRowId() : -1;
1588     auto allEnd = std::chrono::steady_clock::now();
1589     int64_t totalCostTime = std::chrono::duration_cast<std::chrono::milliseconds>(allEnd - begin).count();
1590     if (totalCostTime >= TIME_OUT) {
1591         int64_t prepareCost = std::chrono::duration_cast<std::chrono::milliseconds>(beginExec - begin).count();
1592         int64_t execCost = std::chrono::duration_cast<std::chrono::milliseconds>(beginResult - beginExec).count();
1593         int64_t resultCost = std::chrono::duration_cast<std::chrono::milliseconds>(allEnd - beginResult).count();
1594         LOG_WARN("total[%{public}" PRId64 "] stmt[%{public}" PRId64 "] exec[%{public}" PRId64
1595                  "] result[%{public}" PRId64 "] "
1596                  "sql[%{public}s]",
1597             totalCostTime, prepareCost, execCost, resultCost, SqliteUtils::Anonymous(sql).c_str());
1598     }
1599     return E_OK;
1600 }
1601 
ExecuteForChangedRowCount(int64_t & outValue,const std::string & sql,const Values & args)1602 int RdbStoreImpl::ExecuteForChangedRowCount(int64_t &outValue, const std::string &sql, const Values &args)
1603 {
1604     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1605         return E_NOT_SUPPORT;
1606     }
1607     auto [errCode, statement] = GetStatement(sql, false);
1608     if (statement == nullptr) {
1609         return errCode;
1610     }
1611     errCode = statement->Execute(args);
1612     if (errCode == E_OK) {
1613         outValue = statement->Changes();
1614         return E_OK;
1615     }
1616     if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
1617         auto pool = GetPool();
1618         if (pool != nullptr) {
1619             pool->Dump(true, "UPG DEL");
1620         }
1621     }
1622     return errCode;
1623 }
1624 
GetDataBasePath(const std::string & databasePath,std::string & backupFilePath)1625 int RdbStoreImpl::GetDataBasePath(const std::string &databasePath, std::string &backupFilePath)
1626 {
1627     if (databasePath.empty()) {
1628         return E_INVALID_FILE_PATH;
1629     }
1630 
1631     if (ISFILE(databasePath)) {
1632         backupFilePath = ExtractFilePath(path_) + databasePath;
1633     } else {
1634         // 2 represents two characters starting from the len - 2 position
1635         if (!PathToRealPath(ExtractFilePath(databasePath), backupFilePath) || databasePath.back() == '/' ||
1636             databasePath.substr(databasePath.length() - 2, 2) == "\\") {
1637             LOG_ERROR("Invalid databasePath.");
1638             return E_INVALID_FILE_PATH;
1639         }
1640         backupFilePath = databasePath;
1641     }
1642 
1643     if (backupFilePath == path_) {
1644         LOG_ERROR("The backupPath and path should not be same.");
1645         return E_INVALID_FILE_PATH;
1646     }
1647 
1648     LOG_INFO("databasePath is %{public}s.", SqliteUtils::Anonymous(backupFilePath).c_str());
1649     return E_OK;
1650 }
1651 
GetSlaveName(const std::string & path,std::string & backupFilePath)1652 int RdbStoreImpl::GetSlaveName(const std::string &path, std::string &backupFilePath)
1653 {
1654     std::string suffix(".db");
1655     std::string slaveSuffix("_slave.db");
1656     auto pos = path.find(suffix);
1657     if (pos == std::string::npos) {
1658         backupFilePath = path + slaveSuffix;
1659     } else {
1660         backupFilePath = std::string(path, 0, pos) + slaveSuffix;
1661     }
1662     return E_OK;
1663 }
1664 
1665 /**
1666  * Backup a database from a specified encrypted or unencrypted database file.
1667  */
Backup(const std::string & databasePath,const std::vector<uint8_t> & encryptKey)1668 int RdbStoreImpl::Backup(const std::string &databasePath, const std::vector<uint8_t> &encryptKey)
1669 {
1670     LOG_INFO("Backup db: %{public}s.", SqliteUtils::Anonymous(config_.GetName()).c_str());
1671     if (isReadOnly_ || isMemoryRdb_) {
1672         return E_NOT_SUPPORT;
1673     }
1674     std::string backupFilePath;
1675     if (TryGetMasterSlaveBackupPath(databasePath, backupFilePath)) {
1676         return InnerBackup(backupFilePath, encryptKey);
1677     }
1678 
1679     int ret = GetDataBasePath(databasePath, backupFilePath);
1680     if (ret != E_OK) {
1681         return ret;
1682     }
1683 
1684     RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
1685     keyFiles.Lock();
1686 
1687     auto walFile = backupFilePath + "-wal";
1688     if (access(walFile.c_str(), F_OK) == E_OK) {
1689         if (!SqliteUtils::DeleteDirtyFiles(backupFilePath)) {
1690             keyFiles.Unlock();
1691             return E_ERROR;
1692         }
1693     }
1694     std::string tempPath = backupFilePath + ".tmp";
1695     if (access(tempPath.c_str(), F_OK) == E_OK) {
1696         SqliteUtils::DeleteFile(backupFilePath);
1697     } else {
1698         if (access(backupFilePath.c_str(), F_OK) == E_OK && !SqliteUtils::RenameFile(backupFilePath, tempPath)) {
1699             LOG_ERROR("rename backup file failed, path:%{public}s, errno:%{public}d",
1700                 SqliteUtils::Anonymous(backupFilePath).c_str(), errno);
1701             keyFiles.Unlock();
1702             return E_ERROR;
1703         }
1704     }
1705     ret = InnerBackup(backupFilePath, encryptKey);
1706     if (ret != E_OK || access(walFile.c_str(), F_OK) == E_OK) {
1707         if (ret == E_DB_NOT_EXIST) {
1708             Reportor::ReportCorrupted(Reportor::Create(config_, ret, "ErrorType: BackupFailed"));
1709         }
1710         if (SqliteUtils::DeleteDirtyFiles(backupFilePath)) {
1711             SqliteUtils::RenameFile(tempPath, backupFilePath);
1712         }
1713     } else {
1714         SqliteUtils::DeleteFile(tempPath);
1715     }
1716     keyFiles.Unlock();
1717     return ret;
1718 }
1719 
CreateBackupBindArgs(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey)1720 std::vector<ValueObject> RdbStoreImpl::CreateBackupBindArgs(
1721     const std::string &databasePath, const std::vector<uint8_t> &destEncryptKey)
1722 {
1723     std::vector<ValueObject> bindArgs;
1724     bindArgs.emplace_back(databasePath);
1725     if (!destEncryptKey.empty() && !config_.IsEncrypt()) {
1726         bindArgs.emplace_back(destEncryptKey);
1727     } else if (config_.IsEncrypt()) {
1728         std::vector<uint8_t> key = config_.GetEncryptKey();
1729         bindArgs.emplace_back(key);
1730         key.assign(key.size(), 0);
1731     } else {
1732         bindArgs.emplace_back("");
1733     }
1734     return bindArgs;
1735 }
1736 
1737 /**
1738  * Backup a database from a specified encrypted or unencrypted database file.
1739  */
InnerBackup(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey)1740 int RdbStoreImpl::InnerBackup(const std::string &databasePath, const std::vector<uint8_t> &destEncryptKey)
1741 {
1742     if (isReadOnly_) {
1743         return E_NOT_SUPPORT;
1744     }
1745 
1746     if (config_.GetDBType() == DB_VECTOR) {
1747         auto [errCode, conn] = GetConn(false);
1748         return errCode != E_OK ? errCode : conn->Backup(databasePath, destEncryptKey, false, slaveStatus_);
1749     }
1750 
1751     if (config_.GetHaMode() != HAMode::SINGLE && SqliteUtils::IsSlaveDbName(databasePath)) {
1752         auto [errCode, conn] = GetConn(false);
1753         return errCode != E_OK ? errCode : conn->Backup(databasePath, {}, false, slaveStatus_);
1754     }
1755 
1756     auto [result, conn] = CreateWritableConn();
1757     if (result != E_OK) {
1758         return result;
1759     }
1760 
1761     if (config_.IsEncrypt()) {
1762         result = SetDefaultEncryptAlgo(conn, config_);
1763         if (result != E_OK) {
1764             return result;
1765         }
1766     }
1767 
1768     std::vector<ValueObject> bindArgs = CreateBackupBindArgs(databasePath, destEncryptKey);
1769     auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_BACKUP_SQL, conn);
1770     errCode = statement->Execute(bindArgs);
1771     if (errCode != E_OK) {
1772         return errCode;
1773     }
1774     errCode = statement->Prepare(GlobalExpr::PRAGMA_BACKUP_JOUR_MODE_WAL);
1775     errCode = statement->Execute();
1776     if (errCode != E_OK) {
1777         return errCode;
1778     }
1779     errCode = statement->Prepare(GlobalExpr::EXPORT_SQL);
1780     int ret = statement->Execute();
1781     errCode = statement->Prepare(GlobalExpr::DETACH_BACKUP_SQL);
1782     int res = statement->Execute();
1783     return (res == E_OK) ? ret : res;
1784 }
1785 
BeginExecuteSql(const std::string & sql)1786 std::pair<int32_t, RdbStoreImpl::Stmt> RdbStoreImpl::BeginExecuteSql(const std::string &sql)
1787 {
1788     int type = SqliteUtils::GetSqlStatementType(sql);
1789     if (SqliteUtils::IsSpecial(type)) {
1790         return { E_NOT_SUPPORT, nullptr };
1791     }
1792 
1793     bool assumeReadOnly = SqliteUtils::IsSqlReadOnly(type);
1794     auto pool = GetPool();
1795     if (pool == nullptr) {
1796         return { E_ALREADY_CLOSED, nullptr };
1797     }
1798     auto conn = pool->AcquireConnection(assumeReadOnly);
1799     if (conn == nullptr) {
1800         return { E_DATABASE_BUSY, nullptr };
1801     }
1802 
1803     auto [errCode, statement] = conn->CreateStatement(sql, conn);
1804     if (statement == nullptr) {
1805         return { errCode, nullptr };
1806     }
1807 
1808     if (statement->ReadOnly() && conn->IsWriter()) {
1809         statement = nullptr;
1810         conn = nullptr;
1811         return GetStatement(sql, true);
1812     }
1813 
1814     return { errCode, statement };
1815 }
1816 
IsHoldingConnection()1817 bool RdbStoreImpl::IsHoldingConnection()
1818 {
1819     return GetPool() != nullptr;
1820 }
1821 
SetDefaultEncryptSql(const std::shared_ptr<Statement> & statement,std::string sql,const RdbStoreConfig & config)1822 int RdbStoreImpl::SetDefaultEncryptSql(
1823     const std::shared_ptr<Statement> &statement, std::string sql, const RdbStoreConfig &config)
1824 {
1825     auto errCode = statement->Prepare(sql);
1826     if (errCode != E_OK) {
1827         LOG_ERROR("Prepare failed: %{public}s, %{public}d, %{public}d, %{public}d, %{public}d, %{public}u",
1828             SqliteUtils::Anonymous(config.GetName()).c_str(), config.GetCryptoParam().iterNum,
1829             config.GetCryptoParam().encryptAlgo, config.GetCryptoParam().hmacAlgo, config.GetCryptoParam().kdfAlgo,
1830             config.GetCryptoParam().cryptoPageSize);
1831         return errCode;
1832     }
1833     errCode = statement->Execute();
1834     if (errCode != E_OK) {
1835         LOG_ERROR("Execute failed: %{public}s, %{public}d, %{public}d, %{public}d, %{public}d, %{public}u",
1836             SqliteUtils::Anonymous(config.GetName()).c_str(), config.GetCryptoParam().iterNum,
1837             config.GetCryptoParam().encryptAlgo, config.GetCryptoParam().hmacAlgo, config.GetCryptoParam().kdfAlgo,
1838             config.GetCryptoParam().cryptoPageSize);
1839         return errCode;
1840     }
1841     return E_OK;
1842 }
1843 
SetDefaultEncryptAlgo(const ConnectionPool::SharedConn & conn,const RdbStoreConfig & config)1844 int RdbStoreImpl::SetDefaultEncryptAlgo(const ConnectionPool::SharedConn &conn, const RdbStoreConfig &config)
1845 {
1846     if (conn == nullptr) {
1847         return E_DATABASE_BUSY;
1848     }
1849 
1850     if (!config.GetCryptoParam().IsValid()) {
1851         LOG_ERROR("Invalid crypto param, name:%{public}s", SqliteUtils::Anonymous(config.GetName()).c_str());
1852         return E_INVALID_ARGS;
1853     }
1854 
1855     std::string sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_CIPHER_PREFIX) +
1856                       SqliteUtils::EncryptAlgoDescription(config.GetEncryptAlgo()) +
1857                       std::string(GlobalExpr::ALGO_SUFFIX);
1858     auto [errCode, statement] = conn->CreateStatement(sql, conn);
1859     errCode = SetDefaultEncryptSql(statement, sql, config);
1860     if (errCode != E_OK) {
1861         return errCode;
1862     }
1863 
1864     if (config.GetIter() > 0) {
1865         sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_KDF_ITER_PREFIX) + std::to_string(config.GetIter());
1866         errCode = SetDefaultEncryptSql(statement, sql, config);
1867         if (errCode != E_OK) {
1868             return errCode;
1869         }
1870     }
1871 
1872     sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_HMAC_ALGO_PREFIX) +
1873           SqliteUtils::HmacAlgoDescription(config.GetCryptoParam().hmacAlgo) + std::string(GlobalExpr::ALGO_SUFFIX);
1874     errCode = SetDefaultEncryptSql(statement, sql, config);
1875     if (errCode != E_OK) {
1876         return errCode;
1877     }
1878 
1879     sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_KDF_ALGO_PREFIX) +
1880                       SqliteUtils::KdfAlgoDescription(config.GetCryptoParam().kdfAlgo) +
1881                       std::string(GlobalExpr::ALGO_SUFFIX);
1882     errCode = SetDefaultEncryptSql(statement, sql, config);
1883     if (errCode != E_OK) {
1884         return errCode;
1885     }
1886 
1887     sql = std::string(GlobalExpr::CIPHER_DEFAULT_ATTACH_PAGE_SIZE_PREFIX) +
1888           std::to_string(config.GetCryptoParam().cryptoPageSize);
1889     return SetDefaultEncryptSql(statement, sql, config);
1890 }
1891 
AttachInner(const RdbStoreConfig & config,const std::string & attachName,const std::string & dbPath,const std::vector<uint8_t> & key,int32_t waitTime)1892 int RdbStoreImpl::AttachInner(const RdbStoreConfig &config, const std::string &attachName, const std::string &dbPath,
1893     const std::vector<uint8_t> &key, int32_t waitTime)
1894 {
1895     auto pool = GetPool();
1896     if (pool == nullptr) {
1897         return E_ALREADY_CLOSED;
1898     }
1899     auto [conn, readers] = pool->AcquireAll(waitTime);
1900     if (conn == nullptr) {
1901         return E_DATABASE_BUSY;
1902     }
1903 
1904     if (!isMemoryRdb_ && conn->GetJournalMode() == static_cast<int32_t>(JournalMode::MODE_WAL)) {
1905         // close first to prevent the connection from being put back.
1906         pool->CloseAllConnections();
1907         conn = nullptr;
1908         readers.clear();
1909         auto [err, newConn] = pool->DisableWal();
1910         if (err != E_OK) {
1911             return err;
1912         }
1913         conn = newConn;
1914     }
1915     std::vector<ValueObject> bindArgs;
1916     bindArgs.emplace_back(ValueObject(dbPath));
1917     bindArgs.emplace_back(ValueObject(attachName));
1918     if (!key.empty()) {
1919         auto ret = SetDefaultEncryptAlgo(conn, config);
1920         if (ret != E_OK) {
1921             return ret;
1922         }
1923         bindArgs.emplace_back(ValueObject(key));
1924         auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_WITH_KEY_SQL, conn);
1925         if (statement == nullptr || errCode != E_OK) {
1926             LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
1927             return E_ERROR;
1928         }
1929         return statement->Execute(bindArgs);
1930     }
1931 
1932     auto [errCode, statement] = conn->CreateStatement(GlobalExpr::ATTACH_SQL, conn);
1933     if (statement == nullptr || errCode != E_OK) {
1934         LOG_ERROR("Attach get statement failed, code is %{public}d", errCode);
1935         return errCode;
1936     }
1937     return statement->Execute(bindArgs);
1938 }
1939 
1940 /**
1941  * Attaches a database.
1942  */
Attach(const RdbStoreConfig & config,const std::string & attachName,int32_t waitTime)1943 std::pair<int32_t, int32_t> RdbStoreImpl::Attach(
1944     const RdbStoreConfig &config, const std::string &attachName, int32_t waitTime)
1945 {
1946     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR) || config_.GetHaMode() != HAMode::SINGLE ||
1947         (config.IsMemoryRdb() && config.IsEncrypt())) {
1948         return { E_NOT_SUPPORT, 0 };
1949     }
1950     std::string dbPath;
1951     int err = SqliteGlobalConfig::GetDbPath(config, dbPath);
1952     if (err != E_OK || (access(dbPath.c_str(), F_OK) != E_OK && !config.IsMemoryRdb())) {
1953         return { E_INVALID_FILE_PATH, 0 };
1954     }
1955 
1956     // encrypted databases are not supported to attach a non encrypted database.
1957     if (!config.IsEncrypt() && config_.IsEncrypt()) {
1958         return { E_NOT_SUPPORT, 0 };
1959     }
1960 
1961     if (attachedInfo_.Contains(attachName)) {
1962         return { E_ATTACHED_DATABASE_EXIST, 0 };
1963     }
1964 
1965     std::vector<uint8_t> key;
1966     config.Initialize();
1967     if (config.IsEncrypt()) {
1968         key = config.GetEncryptKey();
1969     }
1970     err = AttachInner(config, attachName, dbPath, key, waitTime);
1971     key.assign(key.size(), 0);
1972     if (err == E_SQLITE_ERROR) {
1973         // only when attachName is already in use, SQLITE-ERROR will be reported here.
1974         return { E_ATTACHED_DATABASE_EXIST, 0 };
1975     } else if (err != E_OK) {
1976         LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach fileName"
1977                   "[%{public}s]",
1978             err, SqliteUtils::Anonymous(config_.GetName()).c_str(), attachName.c_str(),
1979             SqliteUtils::Anonymous(config.GetName()).c_str());
1980         return { err, 0 };
1981     }
1982     if (!attachedInfo_.Insert(attachName, dbPath)) {
1983         return { E_ATTACHED_DATABASE_EXIST, 0 };
1984     }
1985     return { E_OK, attachedInfo_.Size() };
1986 }
1987 
Detach(const std::string & attachName,int32_t waitTime)1988 std::pair<int32_t, int32_t> RdbStoreImpl::Detach(const std::string &attachName, int32_t waitTime)
1989 {
1990     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
1991         return { E_NOT_SUPPORT, 0 };
1992     }
1993     if (!attachedInfo_.Contains(attachName)) {
1994         return { E_OK, attachedInfo_.Size() };
1995     }
1996 
1997     auto pool = GetPool();
1998     if (pool == nullptr) {
1999         return { E_ALREADY_CLOSED, 0 };
2000     }
2001     auto [connection, readers] = pool->AcquireAll(waitTime);
2002     if (connection == nullptr) {
2003         return { E_DATABASE_BUSY, 0 };
2004     }
2005     std::vector<ValueObject> bindArgs;
2006     bindArgs.push_back(ValueObject(attachName));
2007 
2008     auto [errCode, statement] = connection->CreateStatement(GlobalExpr::DETACH_SQL, connection);
2009     if (statement == nullptr || errCode != E_OK) {
2010         LOG_ERROR("Detach get statement failed, errCode %{public}d", errCode);
2011         return { errCode, 0 };
2012     }
2013     errCode = statement->Execute(bindArgs);
2014     if (errCode != E_OK) {
2015         LOG_ERROR("failed, errCode[%{public}d] fileName[%{public}s] attachName[%{public}s] attach", errCode,
2016             SqliteUtils::Anonymous(config_.GetName()).c_str(), attachName.c_str());
2017         return { errCode, 0 };
2018     }
2019 
2020     attachedInfo_.Erase(attachName);
2021     if (!attachedInfo_.Empty()) {
2022         return { E_OK, attachedInfo_.Size() };
2023     }
2024     statement = nullptr;
2025     if (!isMemoryRdb_ && connection->GetJournalMode() == static_cast<int32_t>(JournalMode::MODE_WAL)) {
2026         // close first to prevent the connection from being put back.
2027         pool->CloseAllConnections();
2028         connection = nullptr;
2029         readers.clear();
2030         errCode = pool->EnableWal();
2031     }
2032     return { errCode, 0 };
2033 }
2034 
2035 /**
2036  * Obtains the database version.
2037  */
GetVersion(int & version)2038 int RdbStoreImpl::GetVersion(int &version)
2039 {
2040     auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_VERSION, isReadOnly_);
2041     if (statement == nullptr) {
2042         return errCode;
2043     }
2044     ValueObject value;
2045     std::tie(errCode, value) = statement->ExecuteForValue();
2046     auto val = std::get_if<int64_t>(&value.value);
2047     if (val != nullptr) {
2048         version = static_cast<int>(*val);
2049     }
2050     return errCode;
2051 }
2052 
2053 /**
2054  * Sets the version of a new database.
2055  */
SetVersion(int version)2056 int RdbStoreImpl::SetVersion(int version)
2057 {
2058     if (isReadOnly_) {
2059         return E_NOT_SUPPORT;
2060     }
2061     std::string sql = std::string(GlobalExpr::PRAGMA_VERSION) + " = " + std::to_string(version);
2062     auto [errCode, statement] = GetStatement(sql);
2063     if (statement == nullptr) {
2064         return errCode;
2065     }
2066     return statement->Execute();
2067 }
2068 /**
2069  * Begins a transaction in EXCLUSIVE mode.
2070  */
BeginTransaction()2071 int RdbStoreImpl::BeginTransaction()
2072 {
2073     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2074     auto pool = GetPool();
2075     if (pool == nullptr) {
2076         return E_ALREADY_CLOSED;
2077     }
2078     std::lock_guard<std::mutex> lockGuard(pool->GetTransactionStackMutex());
2079     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2080         return E_NOT_SUPPORT;
2081     }
2082     // size + 1 means the number of transactions in process
2083     RdbStatReporter reportStat(RDB_PERF, BEGINTRANSACTION, config_, reportFunc_);
2084     size_t transactionId = pool->GetTransactionStack().size() + 1;
2085     BaseTransaction transaction(pool->GetTransactionStack().size());
2086     auto [errCode, statement] = GetStatement(transaction.GetTransactionStr());
2087     if (statement == nullptr) {
2088         return errCode;
2089     }
2090     errCode = statement->Execute();
2091     if (errCode != E_OK) {
2092         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
2093             pool->Dump(true, "BEGIN");
2094         }
2095         LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2096             SqliteUtils::Anonymous(name_).c_str(), errCode);
2097         return errCode;
2098     }
2099     pool->SetInTransaction(true);
2100     pool->GetTransactionStack().push(transaction);
2101     // 1 means the number of transactions in process
2102     if (transactionId > 1) {
2103         LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2104             SqliteUtils::Anonymous(name_).c_str(), errCode);
2105     }
2106 
2107     return E_OK;
2108 }
2109 
BeginTrans()2110 std::pair<int, int64_t> RdbStoreImpl::BeginTrans()
2111 {
2112     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2113     if (!config_.IsVector() || isReadOnly_) {
2114         return { E_NOT_SUPPORT, 0 };
2115     }
2116 
2117     int64_t tmpTrxId = 0;
2118     auto pool = GetPool();
2119     if (pool == nullptr) {
2120         return { E_ALREADY_CLOSED, 0 };
2121     }
2122     auto [errCode, connection] = pool->CreateTransConn(false);
2123     if (connection == nullptr) {
2124         LOG_ERROR("Get null connection, storeName: %{public}s errCode:0x%{public}x.",
2125             SqliteUtils::Anonymous(name_).c_str(), errCode);
2126         return { errCode, 0 };
2127     }
2128 
2129     tmpTrxId = newTrxId_.fetch_add(1);
2130     trxConnMap_.Insert(tmpTrxId, connection);
2131     errCode = ExecuteByTrxId(BEGIN_TRANSACTION_SQL, tmpTrxId);
2132     if (errCode != E_OK) {
2133         trxConnMap_.Erase(tmpTrxId);
2134     }
2135     return { errCode, tmpTrxId };
2136 }
2137 
2138 /**
2139 * Begins a transaction in EXCLUSIVE mode.
2140 */
RollBack()2141 int RdbStoreImpl::RollBack()
2142 {
2143     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2144     auto pool = GetPool();
2145     if (pool == nullptr) {
2146         return E_ALREADY_CLOSED;
2147     }
2148     std::lock_guard<std::mutex> lockGuard(pool->GetTransactionStackMutex());
2149     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2150         return E_NOT_SUPPORT;
2151     }
2152     RdbStatReporter reportStat(RDB_PERF, ROLLBACK, config_, reportFunc_);
2153     size_t transactionId = pool->GetTransactionStack().size();
2154 
2155     if (pool->GetTransactionStack().empty()) {
2156         LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId,
2157             SqliteUtils::Anonymous(name_).c_str());
2158         return E_NO_TRANSACTION_IN_SESSION;
2159     }
2160     BaseTransaction transaction = pool->GetTransactionStack().top();
2161     pool->GetTransactionStack().pop();
2162     if (transaction.GetType() != TransType::ROLLBACK_SELF && !pool->GetTransactionStack().empty()) {
2163         pool->GetTransactionStack().top().SetChildFailure(true);
2164     }
2165     auto [errCode, statement] = GetStatement(transaction.GetRollbackStr());
2166     if (statement == nullptr) {
2167         if (errCode == E_DATABASE_BUSY) {
2168             Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
2169         }
2170         // size + 1 means the number of transactions in process
2171         LOG_ERROR("transaction id: %{public}zu, storeName: %{public}s", transactionId + 1,
2172             SqliteUtils::Anonymous(name_).c_str());
2173         return E_DATABASE_BUSY;
2174     }
2175     errCode = statement->Execute();
2176     if (errCode != E_OK) {
2177         if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
2178             Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: RollBusy"));
2179         }
2180         LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2181             SqliteUtils::Anonymous(name_).c_str(), errCode);
2182         return errCode;
2183     }
2184     if (pool->GetTransactionStack().empty()) {
2185         pool->SetInTransaction(false);
2186     }
2187     // 1 means the number of transactions in process
2188     if (transactionId > 1) {
2189         LOG_WARN("transaction id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2190             SqliteUtils::Anonymous(name_).c_str(), errCode);
2191     }
2192     return E_OK;
2193 }
2194 
ExecuteByTrxId(const std::string & sql,int64_t trxId,bool closeConnAfterExecute,const std::vector<ValueObject> & bindArgs)2195 int RdbStoreImpl::ExecuteByTrxId(
2196     const std::string &sql, int64_t trxId, bool closeConnAfterExecute, const std::vector<ValueObject> &bindArgs)
2197 {
2198     if ((!config_.IsVector()) || isReadOnly_) {
2199         return E_NOT_SUPPORT;
2200     }
2201     if (trxId == 0) {
2202         return E_INVALID_ARGS;
2203     }
2204 
2205     if (!trxConnMap_.Contains(trxId)) {
2206         LOG_ERROR("trxId hasn't appeared before %{public}" PRIu64, trxId);
2207         return E_INVALID_ARGS;
2208     }
2209     auto time = static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
2210     auto result = trxConnMap_.Find(trxId);
2211     auto connection = result.second;
2212     if (connection == nullptr) {
2213         LOG_ERROR("Get null connection, storeName: %{public}s time:%{public}" PRIu64 ".",
2214             SqliteUtils::Anonymous(name_).c_str(), time);
2215         return E_ERROR;
2216     }
2217     auto [ret, statement] = GetStatement(sql, connection);
2218     if (ret != E_OK) {
2219         return ret;
2220     }
2221     ret = statement->Execute(bindArgs);
2222     if (ret != E_OK) {
2223         LOG_ERROR("transaction id: %{public}" PRIu64 ", storeName: %{public}s, errCode: %{public}d" PRIu64, trxId,
2224             SqliteUtils::Anonymous(name_).c_str(), ret);
2225         trxConnMap_.Erase(trxId);
2226         return ret;
2227     }
2228     if (closeConnAfterExecute) {
2229         trxConnMap_.Erase(trxId);
2230     }
2231     return E_OK;
2232 }
2233 
RollBack(int64_t trxId)2234 int RdbStoreImpl::RollBack(int64_t trxId)
2235 {
2236     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2237     return ExecuteByTrxId(ROLLBACK_TRANSACTION_SQL, trxId, true);
2238 }
2239 
2240 /**
2241 * Begins a transaction in EXCLUSIVE mode.
2242 */
Commit()2243 int RdbStoreImpl::Commit()
2244 {
2245     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2246     auto pool = GetPool();
2247     if (pool == nullptr) {
2248         return E_ALREADY_CLOSED;
2249     }
2250     std::lock_guard<std::mutex> lockGuard(pool->GetTransactionStackMutex());
2251     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2252         return E_NOT_SUPPORT;
2253     }
2254     RdbStatReporter reportStat(RDB_PERF, COMMIT, config_, reportFunc_);
2255     size_t transactionId = pool->GetTransactionStack().size();
2256 
2257     if (pool->GetTransactionStack().empty()) {
2258         return E_OK;
2259     }
2260     BaseTransaction transaction = pool->GetTransactionStack().top();
2261     std::string sqlStr = transaction.GetCommitStr();
2262     if (sqlStr.size() <= 1) {
2263         LOG_WARN("id: %{public}zu, storeName: %{public}s, sql: %{public}s", transactionId,
2264             SqliteUtils::Anonymous(name_).c_str(), sqlStr.c_str());
2265         pool->GetTransactionStack().pop();
2266         return E_OK;
2267     }
2268     auto [errCode, statement] = GetStatement(sqlStr);
2269     if (statement == nullptr) {
2270         if (errCode == E_DATABASE_BUSY) {
2271             Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: CommitBusy"));
2272         }
2273         LOG_ERROR("id: %{public}zu, storeName: %{public}s, statement error", transactionId,
2274             SqliteUtils::Anonymous(name_).c_str());
2275         return E_DATABASE_BUSY;
2276     }
2277     errCode = statement->Execute();
2278     if (errCode != E_OK) {
2279         if (errCode == E_SQLITE_BUSY || errCode == E_SQLITE_LOCKED) {
2280             Reportor::ReportCorrupted(Reportor::Create(config_, errCode, "ErrorType: CommitBusy"));
2281         }
2282         LOG_ERROR("failed, id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2283             SqliteUtils::Anonymous(name_).c_str(), errCode);
2284         return errCode;
2285     }
2286     pool->SetInTransaction(false);
2287     // 1 means the number of transactions in process
2288     if (transactionId > 1) {
2289         LOG_WARN("id: %{public}zu, storeName: %{public}s, errCode: %{public}d", transactionId,
2290             SqliteUtils::Anonymous(name_).c_str(), errCode);
2291     }
2292     pool->GetTransactionStack().pop();
2293     return E_OK;
2294 }
2295 
Commit(int64_t trxId)2296 int RdbStoreImpl::Commit(int64_t trxId)
2297 {
2298     DISTRIBUTED_DATA_HITRACE(std::string(__FUNCTION__));
2299     return ExecuteByTrxId(COMMIT_TRANSACTION_SQL, trxId, true);
2300 }
2301 
IsInTransaction()2302 bool RdbStoreImpl::IsInTransaction()
2303 {
2304     if (isReadOnly_ || (config_.GetDBType() == DB_VECTOR)) {
2305         return false;
2306     }
2307     auto pool = GetPool();
2308     if (pool == nullptr) {
2309         return false;
2310     }
2311     return pool->IsInTransaction();
2312 }
2313 
CheckAttach(const std::string & sql)2314 int RdbStoreImpl::CheckAttach(const std::string &sql)
2315 {
2316     size_t index = sql.find_first_not_of(' ');
2317     if (index == std::string::npos) {
2318         return E_OK;
2319     }
2320 
2321     /* The first 3 characters can determine the type */
2322     std::string sqlType = sql.substr(index, 3);
2323     sqlType = SqliteUtils::StrToUpper(sqlType);
2324     if (sqlType != "ATT") {
2325         return E_OK;
2326     }
2327 
2328     auto [errCode, statement] = GetStatement(GlobalExpr::PRAGMA_JOUR_MODE_EXP);
2329     if (statement == nullptr) {
2330         return errCode;
2331     }
2332 
2333     errCode = statement->Execute();
2334     if (errCode != E_OK) {
2335         LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errCode);
2336         return errCode;
2337     }
2338     auto [errorCode, valueObject] = statement->GetColumn(0);
2339     if (errorCode != E_OK) {
2340         LOG_ERROR("RdbStoreImpl CheckAttach fail to get journal mode : %{public}d", errorCode);
2341         return errorCode;
2342     }
2343     auto journal = std::get_if<std::string>(&valueObject.value);
2344     auto journalMode = SqliteUtils::StrToUpper((journal == nullptr) ? "" : *journal);
2345     if (journalMode == RdbStoreConfig::DB_DEFAULT_JOURNAL_MODE) {
2346         LOG_ERROR("RdbStoreImpl attach is not supported in WAL mode");
2347         return E_NOT_SUPPORTED_ATTACH_IN_WAL_MODE;
2348     }
2349 
2350     return E_OK;
2351 }
2352 
IsOpen() const2353 bool RdbStoreImpl::IsOpen() const
2354 {
2355     return isOpen_;
2356 }
2357 
GetPath()2358 std::string RdbStoreImpl::GetPath()
2359 {
2360     return path_;
2361 }
2362 
IsReadOnly() const2363 bool RdbStoreImpl::IsReadOnly() const
2364 {
2365     return isReadOnly_;
2366 }
2367 
IsMemoryRdb() const2368 bool RdbStoreImpl::IsMemoryRdb() const
2369 {
2370     return isMemoryRdb_;
2371 }
2372 
GetName()2373 std::string RdbStoreImpl::GetName()
2374 {
2375     return name_;
2376 }
2377 
DoCloudSync(const std::string & table)2378 void RdbStoreImpl::DoCloudSync(const std::string &table)
2379 {
2380 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2381     auto needSync = cloudInfo_->Change(table);
2382     if (!needSync) {
2383         return;
2384     }
2385     auto pool = TaskExecutor::GetInstance().GetExecutor();
2386     if (pool == nullptr) {
2387         return;
2388     }
2389     auto interval =
2390         std::chrono::duration_cast<std::chrono::steady_clock::duration>(std::chrono::milliseconds(INTERVAL));
2391     pool->Schedule(interval, [cloudInfo = std::weak_ptr<CloudTables>(cloudInfo_), param = syncerParam_]() {
2392         auto changeInfo = cloudInfo.lock();
2393         if (changeInfo == nullptr) {
2394             return;
2395         }
2396         auto tables = changeInfo->Steal();
2397         if (tables.empty()) {
2398             return;
2399         }
2400         DistributedRdb::RdbService::Option option = { DistributedRdb::TIME_FIRST, 0, true, true };
2401         auto memo = AbsRdbPredicates(std::vector<std::string>(tables.begin(), tables.end())).GetDistributedPredicates();
2402         InnerSync(param, option, memo, nullptr);
2403     });
2404 #endif
2405 }
GetFileType()2406 std::string RdbStoreImpl::GetFileType()
2407 {
2408     return fileType_;
2409 }
2410 
2411 /**
2412  * Sets the database locale.
2413  */
ConfigLocale(const std::string & localeStr)2414 int RdbStoreImpl::ConfigLocale(const std::string &localeStr)
2415 {
2416     if (!isOpen_) {
2417         LOG_ERROR("The connection pool has been closed.");
2418         return E_ALREADY_CLOSED;
2419     }
2420 
2421     auto pool = GetPool();
2422     if (pool == nullptr) {
2423         return E_ALREADY_CLOSED;
2424     }
2425     return pool->ConfigLocale(localeStr);
2426 }
2427 
GetDestPath(const std::string & backupPath,std::string & destPath)2428 int RdbStoreImpl::GetDestPath(const std::string &backupPath, std::string &destPath)
2429 {
2430     int ret = GetDataBasePath(backupPath, destPath);
2431     if (ret != E_OK) {
2432         return ret;
2433     }
2434     std::string tempPath = destPath + ".tmp";
2435     if (access(tempPath.c_str(), F_OK) == E_OK) {
2436         destPath = tempPath;
2437     }
2438 
2439     if (access(destPath.c_str(), F_OK) != E_OK) {
2440         LOG_ERROR("The backupFilePath does not exists.");
2441         return E_INVALID_FILE_PATH;
2442     }
2443     return E_OK;
2444 }
2445 
Restore(const std::string & backupPath,const std::vector<uint8_t> & newKey)2446 int RdbStoreImpl::Restore(const std::string &backupPath, const std::vector<uint8_t> &newKey)
2447 {
2448     LOG_INFO("Restore db: %{public}s.", SqliteUtils::Anonymous(config_.GetName()).c_str());
2449     if (isReadOnly_ || isMemoryRdb_) {
2450         return E_NOT_SUPPORT;
2451     }
2452 
2453     auto pool = GetPool();
2454     if (pool == nullptr || !isOpen_) {
2455         LOG_ERROR("The pool is: %{public}d, pool is null: %{public}d", isOpen_, pool == nullptr);
2456         return E_ALREADY_CLOSED;
2457     }
2458 
2459     RdbSecurityManager::KeyFiles keyFiles(path_ + BACKUP_RESTORE);
2460     keyFiles.Lock();
2461     std::string destPath;
2462     bool isOK = TryGetMasterSlaveBackupPath(backupPath, destPath, true);
2463     if (!isOK) {
2464         int ret = GetDestPath(backupPath, destPath);
2465         if (ret != E_OK) {
2466             keyFiles.Unlock();
2467             return ret;
2468         }
2469     }
2470 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2471     auto [err, service] = RdbMgr::GetInstance().GetRdbService(syncerParam_);
2472     if (service != nullptr) {
2473         service->Disable(syncerParam_);
2474     }
2475 #endif
2476     bool corrupt = Reportor::IsReportCorruptedFault(path_);
2477     int errCode = pool->ChangeDbFileForRestore(path_, destPath, newKey, slaveStatus_);
2478     keyFiles.Unlock();
2479 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
2480     SecurityPolicy::SetSecurityLabel(config_);
2481     if (service != nullptr) {
2482         service->Enable(syncerParam_);
2483         if (errCode == E_OK) {
2484             auto syncerParam = syncerParam_;
2485             syncerParam.infos_ = Connection::Collect(config_);
2486             service->AfterOpen(syncerParam);
2487             NotifyDataChange();
2488         }
2489     }
2490 #endif
2491     if (errCode == E_OK) {
2492         ExchangeSlaverToMaster();
2493         Reportor::ReportRestore(Reportor::Create(config_, E_OK, "ErrorType::RdbStoreImpl::Restore", false), corrupt);
2494         rebuild_ = RebuiltType::NONE;
2495     }
2496     DoCloudSync("");
2497     return errCode;
2498 }
2499 
CreateWritableConn()2500 std::pair<int32_t, std::shared_ptr<Connection>> RdbStoreImpl::CreateWritableConn()
2501 {
2502     auto config = config_;
2503     config.SetHaMode(HAMode::SINGLE);
2504     config.SetCreateNecessary(false);
2505     auto [result, conn] = Connection::Create(config, true);
2506     if (result != E_OK || conn == nullptr) {
2507         LOG_ERROR("create connection failed, err:%{public}d", result);
2508         return { result, nullptr };
2509     }
2510     return { E_OK, conn };
2511 }
2512 
GetStatement(const std::string & sql,std::shared_ptr<Connection> conn) const2513 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(
2514     const std::string &sql, std::shared_ptr<Connection> conn) const
2515 {
2516     if (conn == nullptr) {
2517         return { E_DATABASE_BUSY, nullptr };
2518     }
2519     return conn->CreateStatement(sql, conn);
2520 }
2521 
GetStatement(const std::string & sql,bool read) const2522 std::pair<int32_t, std::shared_ptr<Statement>> RdbStoreImpl::GetStatement(const std::string &sql, bool read) const
2523 {
2524     auto pool = GetPool();
2525     if (pool == nullptr) {
2526         return { E_ALREADY_CLOSED, nullptr };
2527     }
2528     auto conn = pool->AcquireConnection(read);
2529     if (conn == nullptr) {
2530         return { E_DATABASE_BUSY, nullptr };
2531     }
2532     return conn->CreateStatement(sql, conn);
2533 }
2534 
GetRebuilt(RebuiltType & rebuilt)2535 int RdbStoreImpl::GetRebuilt(RebuiltType &rebuilt)
2536 {
2537     rebuilt = static_cast<RebuiltType>(rebuild_);
2538     return E_OK;
2539 }
2540 
InterruptBackup()2541 int RdbStoreImpl::InterruptBackup()
2542 {
2543     if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER) {
2544         return E_NOT_SUPPORT;
2545     }
2546     if (slaveStatus_ == SlaveStatus::BACKING_UP) {
2547         slaveStatus_ = SlaveStatus::BACKUP_INTERRUPT;
2548         return E_OK;
2549     }
2550     return E_CANCEL;
2551 }
2552 
GetBackupStatus() const2553 int32_t RdbStoreImpl::GetBackupStatus() const
2554 {
2555     if (config_.GetHaMode() != HAMode::MANUAL_TRIGGER && config_.GetHaMode() != HAMode::MAIN_REPLICA) {
2556         return SlaveStatus::UNDEFINED;
2557     }
2558     return slaveStatus_;
2559 }
2560 
TryGetMasterSlaveBackupPath(const std::string & srcPath,std::string & destPath,bool isRestore)2561 bool RdbStoreImpl::TryGetMasterSlaveBackupPath(const std::string &srcPath, std::string &destPath, bool isRestore)
2562 {
2563     if (!srcPath.empty() || config_.GetHaMode() == HAMode::SINGLE || config_.GetDBType() != DB_SQLITE) {
2564         return false;
2565     }
2566     int ret = GetSlaveName(config_.GetPath(), destPath);
2567     if (ret != E_OK) {
2568         destPath = {};
2569         return false;
2570     }
2571     if (isRestore && access(destPath.c_str(), F_OK) != 0) {
2572         LOG_WARN("The backup path can not access: %{public}s", SqliteUtils::Anonymous(destPath).c_str());
2573         return false;
2574     }
2575     return true;
2576 }
2577 
IsSlaveDiffFromMaster() const2578 bool RdbStoreImpl::IsSlaveDiffFromMaster() const
2579 {
2580     std::string slaveDbPath = SqliteUtils::GetSlavePath(config_.GetPath());
2581     return SqliteUtils::IsSlaveInvalid(config_.GetPath()) || (access(slaveDbPath.c_str(), F_OK) != 0);
2582 }
2583 
ExchangeSlaverToMaster()2584 int32_t RdbStoreImpl::ExchangeSlaverToMaster()
2585 {
2586     if (isReadOnly_ || isMemoryRdb_ || rebuild_ != RebuiltType::NONE) {
2587         return E_OK;
2588     }
2589     auto [errCode, conn] = GetConn(false);
2590     if (errCode != E_OK) {
2591         return errCode;
2592     }
2593     auto strategy = conn->GenerateExchangeStrategy(slaveStatus_);
2594     if (strategy != ExchangeStrategy::NOT_HANDLE) {
2595         LOG_WARN("exchange st:%{public}d, %{public}s,", strategy, SqliteUtils::Anonymous(config_.GetName()).c_str());
2596     }
2597     int ret = E_OK;
2598     if (strategy == ExchangeStrategy::RESTORE) {
2599         conn = nullptr;
2600         // disable is required before restore
2601         ret = Restore({}, {});
2602     } else if (strategy == ExchangeStrategy::BACKUP) {
2603         // async backup
2604         ret = conn->Backup({}, {}, true, slaveStatus_);
2605     }
2606     return ret;
2607 }
2608 
GetDbType() const2609 int32_t RdbStoreImpl::GetDbType() const
2610 {
2611     return config_.GetDBType();
2612 }
2613 
CreateTransaction(int32_t type)2614 std::pair<int32_t, std::shared_ptr<Transaction>> RdbStoreImpl::CreateTransaction(int32_t type)
2615 {
2616     if (isReadOnly_) {
2617         return { E_NOT_SUPPORT, nullptr };
2618     }
2619 
2620     auto pool = GetPool();
2621     if (pool == nullptr) {
2622         return { E_ALREADY_CLOSED, nullptr };
2623     }
2624     auto [errCode, conn] = pool->CreateTransConn();
2625     if (conn == nullptr) {
2626         return { errCode, nullptr };
2627     }
2628     std::shared_ptr<Transaction> trans;
2629     std::tie(errCode, trans) = Transaction::Create(type, conn, config_.GetName());
2630     if (trans == nullptr) {
2631         if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
2632             pool->Dump(true, "TRANS");
2633         }
2634         return { errCode, nullptr };
2635     }
2636 
2637     std::lock_guard<decltype(mutex_)> guard(mutex_);
2638     for (auto it = transactions_.begin(); it != transactions_.end();) {
2639         if (it->expired()) {
2640             it = transactions_.erase(it);
2641         } else {
2642             it++;
2643         }
2644     }
2645     transactions_.push_back(trans);
2646     return { errCode, trans };
2647 }
2648 
AddTables(const std::vector<std::string> & tables)2649 int32_t RdbStoreImpl::CloudTables::AddTables(const std::vector<std::string> &tables)
2650 {
2651     std::lock_guard<std::mutex> lock(mutex_);
2652     for (auto &table : tables) {
2653         tables_.insert(table);
2654     }
2655     return E_OK;
2656 }
2657 
RmvTables(const std::vector<std::string> & tables)2658 int32_t RdbStoreImpl::CloudTables::RmvTables(const std::vector<std::string> &tables)
2659 {
2660     std::lock_guard<std::mutex> lock(mutex_);
2661     for (auto &table : tables) {
2662         tables_.erase(table);
2663     }
2664     return E_OK;
2665 }
2666 
Change(const std::string & table)2667 bool RdbStoreImpl::CloudTables::Change(const std::string &table)
2668 {
2669     bool needSync = false;
2670     {
2671         std::lock_guard<std::mutex> lock(mutex_);
2672         if (tables_.empty() || (!table.empty() && tables_.find(table) == tables_.end())) {
2673             return needSync;
2674         }
2675         // from empty, then need schedule the cloud sync, others only wait the schedule execute
2676         needSync = changes_.empty();
2677         if (!table.empty()) {
2678             changes_.insert(table);
2679         } else {
2680             changes_.insert(tables_.begin(), tables_.end());
2681         }
2682     }
2683     return needSync;
2684 }
2685 
Steal()2686 std::set<std::string> RdbStoreImpl::CloudTables::Steal()
2687 {
2688     std::set<std::string> result;
2689     {
2690         std::lock_guard<std::mutex> lock(mutex_);
2691         result = std::move(changes_);
2692     }
2693     return result;
2694 }
2695 } // namespace OHOS::NativeRdb
2696