1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define LOG_TAG "SqliteConnection"
17 #include "sqlite_connection.h"
18
19 #include <dlfcn.h>
20 #include <sqlite3sym.h>
21 #include <sys/stat.h>
22 #include <unistd.h>
23
24 #include <cerrno>
25 #include <memory>
26 #include <sstream>
27 #include <string>
28
29 #include "global_resource.h"
30 #include "logger.h"
31 #include "raw_data_parser.h"
32 #include "rdb_errno.h"
33 #include "rdb_fault_hiview_reporter.h"
34 #include "rdb_icu_manager.h"
35 #include "rdb_local_db_observer.h"
36 #include "rdb_security_manager.h"
37 #include "rdb_sql_log.h"
38 #include "rdb_sql_statistic.h"
39 #include "rdb_store_config.h"
40 #include "relational_store_client.h"
41 #include "sqlite3.h"
42 #include "sqlite_default_function.h"
43 #include "sqlite_errno.h"
44 #include "sqlite_global_config.h"
45 #include "sqlite_utils.h"
46 #include "suspender.h"
47 #include "value_object.h"
48 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
49 #include "rdb_manager_impl.h"
50 #include "relational/relational_store_sqlite_ext.h"
51 #endif
52 #include "task_executor.h"
53
54 namespace OHOS {
55 namespace NativeRdb {
56 using namespace OHOS::Rdb;
57 using namespace std::chrono;
58 using RdbKeyFile = RdbSecurityManager::KeyFileType;
59 using Reportor = RdbFaultHiViewReporter;
60 constexpr const char *INTEGRITIES[] = { nullptr, "PRAGMA quick_check", "PRAGMA integrity_check" };
61 constexpr SqliteConnection::Suffix SqliteConnection::FILE_SUFFIXES[];
62 constexpr int SqliteConnection::DEFAULT_BUSY_TIMEOUT_MS;
63 constexpr int SqliteConnection::BACKUP_PAGES_PRE_STEP; // 1024 * 4 * 12800 == 50m
64 constexpr int SqliteConnection::BACKUP_PRE_WAIT_TIME;
65 constexpr int SqliteConnection::RESTORE_PRE_WAIT_TIME;
66 constexpr ssize_t SqliteConnection::SLAVE_WAL_SIZE_LIMIT;
67 constexpr ssize_t SqliteConnection::SLAVE_INTEGRITY_CHECK_LIMIT;
68 constexpr uint32_t SqliteConnection::NO_ITER;
69 constexpr unsigned short SqliteConnection::BINLOG_FILE_NUMS_LIMIT;
70 constexpr uint32_t SqliteConnection::BINLOG_FILE_SIZE_LIMIT;
71 constexpr uint32_t SqliteConnection::DB_INDEX;
72 constexpr uint32_t SqliteConnection::WAL_INDEX;
73 ConcurrentMap<std::string, std::weak_ptr<SqliteConnection>> SqliteConnection::reusableReplicas_ = {};
74 __attribute__((used))
75 const int32_t SqliteConnection::regCreator_ = Connection::RegisterCreator(DB_SQLITE, SqliteConnection::Create);
76 __attribute__((used))
77 const int32_t SqliteConnection::regRepairer_ = Connection::RegisterRepairer(DB_SQLITE, SqliteConnection::Repair);
78 __attribute__((used))
79 const int32_t SqliteConnection::regDeleter_ = Connection::RegisterDeleter(DB_SQLITE, SqliteConnection::Delete);
80 __attribute__((used))
81 const int32_t SqliteConnection::regCollector_ = Connection::RegisterCollector(DB_SQLITE, SqliteConnection::Collect);
82 __attribute__((used)) const int32_t SqliteConnection::regReplicaChecker_ =
83 Connection::RegisterReplicaChecker(DB_SQLITE, SqliteConnection::CheckReplicaIntegrity);
84 __attribute__((used)) const int32_t SqliteConnection::regDbClientCleaner_ =
85 GlobalResource::RegisterClean(GlobalResource::DB_CLIENT, SqliteConnection::ClientCleanUp);
86 __attribute__((used)) const int32_t SqliteConnection::regOpenSSLCleaner_ =
87 GlobalResource::RegisterClean(GlobalResource::OPEN_SSL, SqliteConnection::OpenSSLCleanUp);
88
Create(const RdbStoreConfig & config,bool isWrite)89 std::pair<int32_t, std::shared_ptr<Connection>> SqliteConnection::Create(const RdbStoreConfig &config, bool isWrite)
90 {
91 std::pair<int32_t, std::shared_ptr<Connection>> result = { E_ERROR, nullptr };
92 auto &[errCode, conn] = result;
93 std::tie(errCode, conn) = InnerCreate(config, isWrite, true);
94 return result;
95 }
96
Delete(const RdbStoreConfig & config)97 int32_t SqliteConnection::Delete(const RdbStoreConfig &config)
98 {
99 auto path = config.GetPath();
100 auto binlogFolder = GetBinlogFolderPath(path);
101 size_t num = SqliteUtils::DeleteFolder(binlogFolder);
102 if (num > 0 && IsSupportBinlog(config)) {
103 LOG_INFO("removed %{public}zu binlog related items", num);
104 }
105 auto slavePath = SqliteUtils::GetSlavePath(path);
106 Delete(slavePath);
107 Delete(path);
108 return E_OK;
109 }
110
Delete(const std::string & path)111 int32_t SqliteConnection::Delete(const std::string &path)
112 {
113 for (const auto &suffix : FILE_SUFFIXES) {
114 SqliteUtils::DeleteFile(path + suffix.suffix_);
115 }
116 return E_OK;
117 }
118
Collect(const RdbStoreConfig & config)119 std::map<std::string, Connection::Info> SqliteConnection::Collect(const RdbStoreConfig &config)
120 {
121 std::map<std::string, Connection::Info> collection;
122 if (config.IsMemoryRdb()) {
123 return collection;
124 }
125 std::string path;
126 SqliteGlobalConfig::GetDbPath(config, path);
127 for (auto &suffix : FILE_SUFFIXES) {
128 if (suffix.debug_ == nullptr) {
129 continue;
130 }
131 auto file = path + suffix.suffix_;
132 std::pair<int32_t, RdbDebugInfo> fileInfo = SqliteUtils::Stat(file);
133 if (fileInfo.first == E_OK) {
134 collection.insert(std::pair{ suffix.debug_, fileInfo.second });
135 }
136 }
137 RdbSecurityManager::KeyFiles keyFiles(path);
138 std::string keyPath = keyFiles.GetKeyFile(RdbSecurityManager::PUB_KEY_FILE);
139 std::pair<int32_t, RdbDebugInfo> fileInfo = SqliteUtils::Stat(keyPath);
140 if (fileInfo.first == E_OK) {
141 collection.insert(std::pair{ "key", fileInfo.second });
142 }
143 std::string newKeyPath = keyFiles.GetKeyFile(RdbSecurityManager::PUB_KEY_FILE_NEW_KEY);
144 fileInfo = SqliteUtils::Stat(newKeyPath);
145 if (fileInfo.first == E_OK) {
146 collection.insert(std::pair{ "newKey", fileInfo.second });
147 }
148 return collection;
149 }
150
SqliteConnection(const RdbStoreConfig & config,bool isWriteConnection,bool isSlave)151 SqliteConnection::SqliteConnection(const RdbStoreConfig &config, bool isWriteConnection, bool isSlave)
152 : dbHandle_(nullptr), isWriter_(isWriteConnection), isReadOnly_(false), isSlave_(isSlave), maxVariableNumber_(0),
153 config_(config)
154 {
155 backupId_ = TaskExecutor::INVALID_TASK_ID;
156 }
157
CreateSlaveConnection(const RdbStoreConfig & config,SlaveOpenPolicy slaveOpenPolicy)158 std::pair<int32_t, std::shared_ptr<SqliteConnection>> SqliteConnection::CreateSlaveConnection(
159 const RdbStoreConfig &config, SlaveOpenPolicy slaveOpenPolicy)
160 {
161 std::pair<int32_t, std::shared_ptr<SqliteConnection>> result = { E_ERROR, nullptr };
162 auto &[errCode, conn] = result;
163 std::map<std::string, DebugInfo> bugInfo = Connection::Collect(config);
164 bool isSlaveExist = access(config.GetPath().c_str(), F_OK) == 0;
165 bool isSlaveLockExist = SqliteUtils::IsSlaveInterrupted(config_.GetPath());
166 bool hasFailure = SqliteUtils::IsSlaveInvalid(config_.GetPath());
167 bool walOverLimit = bugInfo.find(FILE_SUFFIXES[WAL_INDEX].debug_) != bugInfo.end() &&
168 bugInfo[FILE_SUFFIXES[WAL_INDEX].debug_].size_ > SLAVE_WAL_SIZE_LIMIT;
169 LOG_INFO("slave cfg:[%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d]%{public}s "
170 "%{public}s,[%{public}d,%{public}d,%{public}d,%{public}d,%{public}d]",
171 config.GetDBType(), config.GetHaMode(), config.IsEncrypt(), config.GetArea(), config.GetSecurityLevel(),
172 config.GetRoleType(), config.IsReadOnly(),
173 SqliteUtils::FormatDebugInfoBrief(bugInfo, SqliteUtils::Anonymous(config.GetName())).c_str(),
174 SqliteUtils::FormatDebugInfoBrief(Connection::Collect(config_), "master").c_str(), isSlaveExist,
175 isSlaveLockExist, hasFailure, walOverLimit, IsSupportBinlog(config_));
176 if (config.GetHaMode() == HAMode::MANUAL_TRIGGER && (slaveOpenPolicy == SlaveOpenPolicy::OPEN_IF_DB_VALID &&
177 (!isSlaveExist || isSlaveLockExist || hasFailure || walOverLimit))) {
178 if (walOverLimit) {
179 SqliteUtils::SetSlaveInvalid(config_.GetPath());
180 Reportor::ReportCorrupted(Reportor::Create(config, E_SQLITE_ERROR, "ErrorType: slaveWalOverLimit"));
181 }
182 return result;
183 }
184
185 std::shared_ptr<SqliteConnection> connection = std::make_shared<SqliteConnection>(config, true, true);
186 connection->SetIsSupportBinlog(IsSupportBinlog(config_));
187 errCode = connection->InnerOpen(config);
188 if (errCode != E_OK) {
189 SqliteUtils::SetSlaveInvalid(config_.GetPath());
190 if (errCode == E_SQLITE_CORRUPT) {
191 LOG_WARN("slave corrupt, rebuild:%{public}s", SqliteUtils::Anonymous(config.GetPath()).c_str());
192 (void)Delete(config.GetPath());
193 // trigger mode does not require rebuild the slave
194 if (config.GetHaMode() == HAMode::MANUAL_TRIGGER) {
195 return result;
196 }
197 errCode = connection->InnerOpen(config);
198 if (errCode != E_OK) {
199 LOG_ERROR("reopen slave failed:%{public}d", errCode);
200 return result;
201 }
202 } else {
203 LOG_WARN(
204 "open slave failed:%{public}d, %{public}s", errCode, SqliteUtils::Anonymous(config.GetPath()).c_str());
205 return result;
206 }
207 }
208 conn = connection;
209 return result;
210 }
211
// Derives the configuration of the slave database from the master configuration `rdbConfig`.
// Path and name are replaced by their slave counterparts (SqliteUtils::GetSlavePath); the
// settings listed below are copied from the master one by one.
RdbStoreConfig SqliteConnection::GetSlaveRdbStoreConfig(const RdbStoreConfig &rdbConfig)
{
    RdbStoreConfig slaveConfig(SqliteUtils::GetSlavePath(rdbConfig.GetPath()));
    slaveConfig.SetEncryptStatus(rdbConfig.IsEncrypt());
    slaveConfig.SetSearchable(rdbConfig.IsSearchable());
    slaveConfig.SetIsVector(rdbConfig.IsVector());
    slaveConfig.SetAutoClean(rdbConfig.GetAutoClean());
    slaveConfig.SetSecurityLevel(rdbConfig.GetSecurityLevel());
    slaveConfig.SetDataGroupId(rdbConfig.GetDataGroupId());
    slaveConfig.SetName(SqliteUtils::GetSlavePath(rdbConfig.GetName()));
    slaveConfig.SetCustomDir(rdbConfig.GetCustomDir());
    slaveConfig.SetAllowRebuild(rdbConfig.GetAllowRebuild());
    slaveConfig.SetReadOnly(rdbConfig.IsReadOnly());
    slaveConfig.SetAutoCheck(rdbConfig.IsAutoCheck());
    slaveConfig.SetCreateNecessary(rdbConfig.IsCreateNecessary());
    slaveConfig.SetJournalSize(rdbConfig.GetJournalSize());
    slaveConfig.SetPageSize(rdbConfig.GetPageSize());
    slaveConfig.SetReadConSize(rdbConfig.GetReadConSize());
    slaveConfig.SetReadTime(rdbConfig.GetReadTime());
    slaveConfig.SetDBType(rdbConfig.GetDBType());
    slaveConfig.SetVisitorDir(rdbConfig.GetVisitorDir());
    slaveConfig.SetScalarFunctions(rdbConfig.GetScalarFunctions());
    slaveConfig.SetJournalMode(rdbConfig.GetJournalMode());

    slaveConfig.SetModuleName(rdbConfig.GetModuleName());
    slaveConfig.SetPluginLibs(rdbConfig.GetPluginLibs());
    slaveConfig.SetHaMode(rdbConfig.GetHaMode());

    slaveConfig.SetCryptoParam(rdbConfig.GetCryptoParam());
    return slaveConfig;
}
243
InnerOpen(const RdbStoreConfig & config)244 int SqliteConnection::InnerOpen(const RdbStoreConfig &config)
245 {
246 std::string dbPath;
247 auto errCode = SqliteGlobalConfig::GetDbPath(config, dbPath);
248 if (errCode != E_OK) {
249 return errCode;
250 }
251 SetTokenizer(config);
252
253 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
254 bool isDbFileExist = access(dbPath.c_str(), F_OK) == 0;
255 if (!isDbFileExist && (!config.IsCreateNecessary())) {
256 Reportor::ReportFault(RdbFaultDbFileEvent(FT_EX_FILE, E_DB_NOT_EXIST, config, "db not exist"));
257 LOG_ERROR("db not exist errno is %{public}d", errno);
258 return E_DB_NOT_EXIST;
259 }
260 #endif
261 isReadOnly_ = !isWriter_ || config.IsReadOnly();
262 uint32_t openFileFlags = config.IsReadOnly() ? (SQLITE_OPEN_READONLY | SQLITE_OPEN_FULLMUTEX)
263 : (SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX);
264 if (config.IsMemoryRdb()) {
265 openFileFlags |= SQLITE_OPEN_URI;
266 }
267 errCode = OpenDatabase(dbPath, static_cast<int>(openFileFlags));
268 if (errCode != E_OK) {
269 Reportor::ReportFault(RdbFaultDbFileEvent(FT_OPEN, errCode, config, "", true));
270 return errCode;
271 }
272
273 maxVariableNumber_ = sqlite3_limit(dbHandle_, SQLITE_LIMIT_VARIABLE_NUMBER, -1);
274
275 errCode = Configure(config, dbPath);
276 isConfigured_ = true;
277 if (errCode != E_OK) {
278 return errCode;
279 }
280
281 if (isWriter_) {
282 ValueObject checkResult{"ok"};
283 auto index = static_cast<uint32_t>(config.GetIntegrityCheck());
284 if (index < static_cast<uint32_t>(sizeof(INTEGRITIES) / sizeof(INTEGRITIES[0]))) {
285 auto sql = INTEGRITIES[index];
286 if (sql != nullptr) {
287 LOG_INFO("%{public}s : %{public}s, ", sql, SqliteUtils::Anonymous(config.GetName()).c_str());
288 std::tie(errCode, checkResult) = ExecuteForValue(sql);
289 }
290 if (errCode == E_OK && static_cast<std::string>(checkResult) != "ok") {
291 LOG_ERROR("%{public}s integrity check result is %{public}s, sql:%{public}s",
292 SqliteUtils::Anonymous(config.GetName()).c_str(), static_cast<std::string>(checkResult).c_str(),
293 SqliteUtils::SqlAnonymous(sql).c_str());
294 Reportor::ReportCorruptedOnce(Reportor::Create(config, errCode, static_cast<std::string>(checkResult)));
295 }
296 }
297 }
298 return E_OK;
299 }
300
OpenDatabase(const std::string & dbPath,int openFileFlags)301 int32_t SqliteConnection::OpenDatabase(const std::string &dbPath, int openFileFlags)
302 {
303 const char *option = isSlave_ && isSupportBinlog_ ? "compressvfs" : nullptr;
304 int errCode = sqlite3_open_v2(dbPath.c_str(), &dbHandle_, openFileFlags, option);
305 if (errCode != SQLITE_OK) {
306 LOG_ERROR("fail to open database errCode=%{public}d, dbPath=%{public}s, flags=%{public}d, errno=%{public}d",
307 errCode, SqliteUtils::Anonymous(dbPath).c_str(), openFileFlags, errno);
308 if (isSlave_ && errCode == SQLITE_WARNING &&
309 sqlite3_extended_errcode(dbHandle_) == SQLITE_WARNING_NOTCOMPRESSDB) {
310 LOG_WARN("slave db is not using compress");
311 return E_SQLITE_CORRUPT;
312 }
313 if (errCode == SQLITE_CANTOPEN) {
314 std::pair<int32_t, RdbDebugInfo> fileInfo = SqliteUtils::Stat(dbPath);
315 if (fileInfo.first != E_OK) {
316 LOG_ERROR("The stat error, errno=%{public}d, parent dir modes: %{public}s", errno,
317 SqliteUtils::GetParentModes(dbPath).c_str());
318 }
319 Reportor::ReportFault(RdbFaultDbFileEvent(FT_OPEN, E_SQLITE_CANTOPEN, config_,
320 "failed to openDB errno[ " + std::to_string(errno) + "]," +
321 SqliteUtils::GetFileStatInfo(fileInfo.second) +
322 "parent dir modes:" + SqliteUtils::GetParentModes(dbPath),
323 true));
324 }
325 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
326 auto const pos = dbPath.find_last_of("\\/");
327 if (pos != std::string::npos) {
328 std::string filepath = dbPath.substr(0, pos);
329 if (access(filepath.c_str(), F_OK | W_OK) != 0) {
330 LOG_ERROR("The path to the database file to be created is not valid, err = %{public}d", errno);
331 return E_INVALID_FILE_PATH;
332 }
333 }
334 #endif
335 if (errCode == SQLITE_NOTADB) {
336 Reportor::ReportFault(RdbFaultDbFileEvent(FT_OPEN, E_SQLITE_NOT_DB, config_, "", true));
337 }
338 return SQLiteError::ErrNo(errCode);
339 }
340 return E_OK;
341 }
342
SetCustomFunctions(const RdbStoreConfig & config)343 int SqliteConnection::SetCustomFunctions(const RdbStoreConfig &config)
344 {
345 customScalarFunctions_ = config.GetScalarFunctions();
346 for (auto &it : customScalarFunctions_) {
347 int errCode = SetCustomScalarFunction(it.first, it.second.argc_, &it.second.function_);
348 if (errCode != E_OK) {
349 return errCode;
350 }
351 }
352 return E_OK;
353 }
354
CustomScalarFunctionCallback(sqlite3_context * ctx,int argc,sqlite3_value ** argv)355 static void CustomScalarFunctionCallback(sqlite3_context *ctx, int argc, sqlite3_value **argv)
356 {
357 if (ctx == nullptr || argv == nullptr) {
358 LOG_ERROR("ctx or argv is nullptr.");
359 return;
360 }
361 auto function = static_cast<ScalarFunction *>(sqlite3_user_data(ctx));
362 if (function == nullptr) {
363 LOG_ERROR("function is nullptr.");
364 return;
365 }
366
367 std::vector<std::string> argsVector;
368 for (int i = 0; i < argc; ++i) {
369 auto arg = reinterpret_cast<const char *>(sqlite3_value_text(argv[i]));
370 if (arg == nullptr) {
371 LOG_ERROR("arg is nullptr, index is %{public}d, errno is %{public}d", i, errno);
372 sqlite3_result_null(ctx);
373 return;
374 }
375 argsVector.emplace_back(std::string(arg));
376 }
377
378 std::string result = (*function)(argsVector);
379 if (result.empty()) {
380 sqlite3_result_null(ctx);
381 return;
382 }
383 sqlite3_result_text(ctx, result.c_str(), -1, SQLITE_TRANSIENT);
384 }
385
SetCustomScalarFunction(const std::string & functionName,int argc,ScalarFunction * function)386 int SqliteConnection::SetCustomScalarFunction(const std::string &functionName, int argc, ScalarFunction *function)
387 {
388 int err = sqlite3_create_function_v2(dbHandle_, functionName.c_str(), argc, SQLITE_UTF8, function,
389 &CustomScalarFunctionCallback, nullptr, nullptr, nullptr);
390 if (err != SQLITE_OK) {
391 LOG_ERROR("SetCustomScalarFunction errCode is %{public}d, errno is %{public}d.", err, errno);
392 }
393 return err;
394 }
395
Configure(const RdbStoreConfig & config,std::string & dbPath)396 int SqliteConnection::Configure(const RdbStoreConfig &config, std::string &dbPath)
397 {
398 Suspender suspender(Suspender::SQL_STATISTIC);
399 // there is a read-only dependency
400 if (!config.GetCollatorLocales().empty()) {
401 ConfigLocale(config.GetCollatorLocales());
402 }
403
404 if (config.GetRoleType() == VISITOR) {
405 return E_OK;
406 }
407
408 auto errCode = RegDefaultFunctions(dbHandle_);
409 if (errCode != E_OK) {
410 return errCode;
411 }
412
413 SetBusyTimeout(DEFAULT_BUSY_TIMEOUT_MS);
414
415 LimitPermission(config, dbPath);
416
417 SetDwrEnable(config);
418 errCode = SetPersistWal(config);
419 if (errCode != E_OK) {
420 return errCode;
421 }
422
423 errCode = SetPageSize(config);
424 if (errCode != E_OK) {
425 return errCode;
426 }
427
428 errCode = SetEncrypt(config);
429 if (errCode != E_OK) {
430 return errCode;
431 }
432
433 errCode = SetJournalMode(config);
434 if (errCode != E_OK) {
435 return errCode;
436 }
437
438 // set the user version to the wal file;
439 SetWalFile(config);
440
441 errCode = SetAutoCheckpoint(config);
442 if (errCode != E_OK) {
443 return errCode;
444 }
445
446 errCode = SetCustomFunctions(config);
447 if (errCode != E_OK) {
448 return errCode;
449 }
450 RegisterHookIfNecessary();
451 return LoadExtension(config, dbHandle_);
452 }
453
~SqliteConnection()454 SqliteConnection::~SqliteConnection()
455 {
456 if (backupId_ != TaskExecutor::INVALID_TASK_ID) {
457 auto pool = TaskExecutor::GetInstance().GetExecutor();
458 if (pool != nullptr) {
459 pool->Remove(backupId_, true);
460 }
461 }
462 if (dbHandle_ != nullptr) {
463 int errCode = sqlite3_close_v2(dbHandle_);
464 if (errCode != SQLITE_OK) {
465 LOG_ERROR("could not close database err = %{public}d, errno = %{public}d", errCode, errno);
466 }
467 }
468 }
469
VerifyAndRegisterHook(const RdbStoreConfig & config)470 int32_t SqliteConnection::VerifyAndRegisterHook(const RdbStoreConfig &config)
471 {
472 if (!isWriter_ || config_.IsEqualRegisterInfo(config)) {
473 return E_OK;
474 }
475 for (auto &eventInfo : onEventHandlers_) {
476 if (config.GetRegisterInfo(eventInfo.Type) && !config_.GetRegisterInfo(eventInfo.Type)) {
477 config_.SetRegisterInfo(eventInfo.Type, true);
478 (this->*(eventInfo.handle))();
479 }
480 }
481 return E_OK;
482 }
483
RegisterHookIfNecessary()484 int32_t SqliteConnection::RegisterHookIfNecessary()
485 {
486 if (!isWriter_) {
487 return E_OK;
488 }
489 for (auto &eventInfo : onEventHandlers_) {
490 if (config_.GetRegisterInfo(eventInfo.Type)) {
491 (this->*(eventInfo.handle))();
492 }
493 }
494 return E_OK;
495 }
496
RegisterStoreObs()497 int SqliteConnection::RegisterStoreObs()
498 {
499 RegisterDbHook(dbHandle_);
500 auto status = CreateDataChangeTempTrigger(dbHandle_);
501 if (status != E_OK) {
502 LOG_ERROR("CreateDataChangeTempTrigger failed. status %{public}d", status);
503 return status;
504 }
505 return E_OK;
506 }
507
RegisterClientObs()508 int SqliteConnection::RegisterClientObs()
509 {
510 RegisterDbHook(dbHandle_);
511 return E_OK;
512 }
513
CheckReplicaIntegrity(const RdbStoreConfig & config)514 int32_t SqliteConnection::CheckReplicaIntegrity(const RdbStoreConfig &config)
515 {
516 std::shared_ptr<SqliteConnection> connection = std::make_shared<SqliteConnection>(config, true);
517 if (connection == nullptr) {
518 return E_ERROR;
519 }
520 RdbStoreConfig rdbSlaveStoreConfig = connection->GetSlaveRdbStoreConfig(config);
521 if (access(rdbSlaveStoreConfig.GetPath().c_str(), F_OK) != 0) {
522 return E_NOT_SUPPORT;
523 }
524 auto [ret, conn] = connection->CreateSlaveConnection(rdbSlaveStoreConfig, SlaveOpenPolicy::FORCE_OPEN);
525 if (ret != E_OK) {
526 return ret;
527 }
528 connection->slaveConnection_ = conn;
529 if (IsSupportBinlog(config)) {
530 SqliteConnection::ReplayBinlog(config.GetPath(), conn, false);
531 }
532 return connection->VerifySlaveIntegrity();
533 }
534
CheckReplicaForRestore()535 int SqliteConnection::CheckReplicaForRestore()
536 {
537 return ExchangeVerify(true);
538 }
539
CreateStatement(const std::string & sql,std::shared_ptr<Connection> conn)540 std::pair<int, std::shared_ptr<Statement>> SqliteConnection::CreateStatement(const std::string &sql,
541 std::shared_ptr<Connection> conn)
542 {
543 return CreateStatementInner(sql, conn, dbHandle_, false);
544 }
545
CreateReplicaStatement(const std::string & sql,std::shared_ptr<Connection> conn)546 std::pair<int, std::shared_ptr<Statement>> SqliteConnection::CreateReplicaStatement(const std::string &sql,
547 std::shared_ptr<Connection> conn)
548 {
549 sqlite3 *db = dbHandle_;
550 RdbStoreConfig rdbSlaveStoreConfig = GetSlaveRdbStoreConfig(config_);
551 if (slaveConnection_ == nullptr && access(rdbSlaveStoreConfig.GetPath().c_str(), F_OK) == 0) {
552 auto [errCode, slaveConn] = CreateSlaveConnection(rdbSlaveStoreConfig, SlaveOpenPolicy::FORCE_OPEN);
553 if (errCode == E_OK) {
554 slaveConnection_ = slaveConn;
555 db = slaveConnection_->dbHandle_;
556 }
557 LOG_INFO("create slave conn ret=%{public}d, %{public}d", errCode, IsWriter());
558 }
559 return CreateStatementInner(sql, conn, db, true);
560 }
561
CreateStatementInner(const std::string & sql,std::shared_ptr<Connection> conn,sqlite3 * db,bool isFromReplica)562 std::pair<int, std::shared_ptr<Statement>> SqliteConnection::CreateStatementInner(const std::string &sql,
563 std::shared_ptr<Connection> conn, sqlite3 *db, bool isFromReplica)
564 {
565 std::shared_ptr<SqliteStatement> statement = std::make_shared<SqliteStatement>(&config_);
566 // When memory is not cleared, quick_check reads memory pages and detects damage but does not report it
567 if (sql == INTEGRITIES[1] && db != nullptr && mode_ == JournalMode::MODE_WAL) {
568 sqlite3_db_release_memory(db);
569 }
570 int errCode = statement->Prepare(db, sql);
571 if (errCode != E_OK) {
572 return { errCode, nullptr };
573 }
574 statement->conn_ = conn;
575 if (!isFromReplica && slaveConnection_ && IsWriter() && !IsSupportBinlog(config_) &&
576 !SqliteUtils::IsSlaveRestoring(config_.GetPath())) {
577 auto slaveStmt = std::make_shared<SqliteStatement>();
578 if (sql == INTEGRITIES[1] && dbHandle_ != nullptr && mode_ == JournalMode::MODE_WAL) {
579 sqlite3_db_release_memory(dbHandle_);
580 }
581 slaveStmt->config_ = &slaveConnection_->config_;
582 slaveStmt->conn_ = slaveConnection_;
583 errCode = slaveStmt->Prepare(slaveConnection_->dbHandle_, sql);
584 if (errCode != E_OK) {
585 LOG_WARN("prepare slave stmt failed:%{public}d, app self can check the SQL", errCode);
586 SqliteUtils::SetSlaveInvalid(config_.GetPath());
587 return { E_OK, statement };
588 }
589 statement->slave_ = slaveStmt;
590 }
591 return { E_OK, statement };
592 }
593
IsWriter() const594 bool SqliteConnection::IsWriter() const
595 {
596 return isWriter_;
597 }
598
SubscribeTableChanges(const Connection::Notifier & notifier)599 int SqliteConnection::SubscribeTableChanges(const Connection::Notifier ¬ifier)
600 {
601 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
602 if (!isWriter_ || notifier == nullptr) {
603 return E_OK;
604 }
605
606 int32_t status = RegisterClientObserver(dbHandle_, [notifier](const ClientChangedData &clientData) {
607 DistributedRdb::RdbChangedData rdbChangedData;
608 for (auto &[key, val] : clientData.tableData) {
609 if (val.isTrackedDataChange || val.isP2pSyncDataChange || val.isKnowledgeDataChange) {
610 rdbChangedData.tableData[key].isTrackedDataChange = val.isTrackedDataChange;
611 rdbChangedData.tableData[key].isP2pSyncDataChange = val.isP2pSyncDataChange;
612 rdbChangedData.tableData[key].isKnowledgeDataChange = val.isKnowledgeDataChange;
613 }
614 }
615 notifier(rdbChangedData);
616 });
617 if (status != E_OK) {
618 LOG_ERROR("RegisterClientObserver error, status:%{public}d", status);
619 }
620 RegisterDbHook(dbHandle_);
621 config_.SetRegisterInfo(RegisterType::CLIENT_OBSERVER, true);
622 return status;
623 #endif
624 return E_OK;
625 }
626
GetMaxVariable() const627 int SqliteConnection::GetMaxVariable() const
628 {
629 return maxVariableNumber_;
630 }
631
GetJournalMode()632 int32_t SqliteConnection::GetJournalMode()
633 {
634 return (int32_t)mode_;
635 }
636
GetDBType() const637 int32_t SqliteConnection::GetDBType() const
638 {
639 return DB_SQLITE;
640 }
641
SetPageSize(const RdbStoreConfig & config)642 int SqliteConnection::SetPageSize(const RdbStoreConfig &config)
643 {
644 if (isReadOnly_ || config.GetPageSize() == GlobalExpr::DB_PAGE_SIZE) {
645 return E_OK;
646 }
647
648 int targetValue = config.GetPageSize();
649 Suspender suspender(Suspender::SQL_LOG);
650 auto [errCode, object] = ExecuteForValue("PRAGMA page_size");
651 if (errCode != E_OK) {
652 LOG_ERROR("SetPageSize fail to get page size : %{public}d", errCode);
653 return errCode;
654 }
655
656 if (static_cast<int64_t>(object) == targetValue) {
657 return E_OK;
658 }
659
660 errCode = ExecuteSql("PRAGMA page_size=" + std::to_string(targetValue));
661 if (errCode != E_OK) {
662 LOG_ERROR("SetPageSize fail to set page size : %{public}d", errCode);
663 }
664 return errCode;
665 }
666
SetEncryptAgo(const RdbStoreConfig & config)667 int SqliteConnection::SetEncryptAgo(const RdbStoreConfig &config)
668 {
669 return SetEncryptAgo(config.GetCryptoParam());
670 }
671
SetEncryptAgo(const RdbStoreConfig::CryptoParam & cryptoParam)672 int SqliteConnection::SetEncryptAgo(const RdbStoreConfig::CryptoParam &cryptoParam)
673 {
674 Suspender suspender(Suspender::SQL_LOG);
675 if (!cryptoParam.IsValid()) {
676 LOG_ERROR("Invalid crypto param: %{public}d, %{public}d, %{public}d, %{public}d, %{public}u",
677 cryptoParam.iterNum, cryptoParam.encryptAlgo, cryptoParam.hmacAlgo, cryptoParam.kdfAlgo,
678 cryptoParam.cryptoPageSize);
679 return E_INVALID_ARGS;
680 }
681
682 if (cryptoParam.iterNum != NO_ITER) {
683 auto errCode =
684 ExecuteSql(std::string(GlobalExpr::CIPHER_ALGO_PREFIX) +
685 SqliteUtils::EncryptAlgoDescription(static_cast<EncryptAlgo>(cryptoParam.encryptAlgo)) +
686 std::string(GlobalExpr::ALGO_SUFFIX));
687 if (errCode != E_OK) {
688 LOG_ERROR("set cipher algo failed, err = %{public}d", errCode);
689 return errCode;
690 }
691
692 errCode = ExecuteSql(std::string(GlobalExpr::CIPHER_KDF_ITER) + std::to_string(cryptoParam.iterNum));
693 if (errCode != E_OK) {
694 LOG_ERROR("set kdf iter number V1 failed, err = %{public}d", errCode);
695 return errCode;
696 }
697 }
698
699 auto errCode = ExecuteSql(std::string(GlobalExpr::CODEC_HMAC_ALGO_PREFIX) +
700 SqliteUtils::HmacAlgoDescription(cryptoParam.hmacAlgo) +
701 std::string(GlobalExpr::ALGO_SUFFIX));
702 if (errCode != E_OK) {
703 LOG_ERROR("set codec hmac algo failed, err = %{public}d", errCode);
704 return errCode;
705 }
706
707 errCode = ExecuteSql(std::string(GlobalExpr::CODEC_KDF_ALGO_PREFIX) +
708 SqliteUtils::KdfAlgoDescription(cryptoParam.kdfAlgo) +
709 std::string(GlobalExpr::ALGO_SUFFIX));
710 if (errCode != E_OK) {
711 LOG_ERROR("set codec kdf algo failed, err = %{public}d", errCode);
712 return errCode;
713 }
714
715 errCode = ExecuteSql(
716 std::string(GlobalExpr::CODEC_PAGE_SIZE_PREFIX) + std::to_string(cryptoParam.cryptoPageSize));
717 if (errCode != E_OK) {
718 LOG_ERROR("set codec page size failed, err = %{public}d", errCode);
719 return errCode;
720 }
721
722 errCode = ExecuteSql(GlobalExpr::CODEC_REKEY_HMAC_ALGO);
723 if (errCode != E_OK) {
724 LOG_ERROR("set rekey sha algo failed, err = %{public}d", errCode);
725 return errCode;
726 }
727 return E_OK;
728 }
729
ResetKey(const RdbStoreConfig & config)730 int SqliteConnection::ResetKey(const RdbStoreConfig &config)
731 {
732 if (!IsWriter()) {
733 return E_OK;
734 }
735 LOG_INFO(
736 "name = %{public}s, iter = %{public}d", SqliteUtils::Anonymous(config.GetName()).c_str(), config.GetIter());
737 std::vector<uint8_t> newKey = config.GetNewEncryptKey();
738 int errCode = sqlite3_rekey(dbHandle_, static_cast<const void *>(newKey.data()), static_cast<int>(newKey.size()));
739 newKey.assign(newKey.size(), 0);
740 if (errCode != SQLITE_OK) {
741 LOG_ERROR("ReKey failed, err = %{public}d, errno = %{public}d", errCode, errno);
742 RdbSecurityManager::GetInstance().DelKeyFile(config.GetPath(), RdbKeyFile::PUB_KEY_FILE_NEW_KEY);
743 return E_OK;
744 }
745 config.ChangeEncryptKey();
746 return E_OK;
747 }
748
Rekey(const RdbStoreConfig::CryptoParam & cryptoParam)749 int SqliteConnection::Rekey(const RdbStoreConfig::CryptoParam &cryptoParam)
750 {
751 std::vector<uint8_t> key;
752 RdbPassword rdbPwd;
753 int errCode = E_OK;
754 if (cryptoParam.encryptKey_.empty()) {
755 rdbPwd = RdbSecurityManager::GetInstance().GetRdbPassword(
756 config_.GetPath(), RdbSecurityManager::PUB_KEY_FILE_NEW_KEY);
757 key = std::vector<uint8_t>(rdbPwd.GetData(), rdbPwd.GetData() + rdbPwd.GetSize());
758 } else {
759 key = cryptoParam.encryptKey_;
760 }
761 if (key.empty()) {
762 LOG_ERROR("key is empty");
763 return E_ERROR;
764 }
765 errCode = sqlite3_rekey(dbHandle_, static_cast<const void *>(key.data()), static_cast<int>(key.size()));
766 if (errCode != SQLITE_OK) {
767 key.assign(key.size(), 0);
768 LOG_ERROR("ReKey failed, err = %{public}d, name = %{public}s", errCode,
769 SqliteUtils::Anonymous(config_.GetName()).c_str());
770 return SQLiteError::ErrNo(errCode);
771 }
772 errCode = SetEncryptAgo(cryptoParam);
773 if (errCode != E_OK) {
774 key.assign(key.size(), 0);
775 LOG_ERROR("ReKey failed, err = %{public}d, name = %{public}s", errCode,
776 SqliteUtils::Anonymous(config_.GetName()).c_str());
777 return errCode;
778 }
779 if (cryptoParam.encryptKey_.empty()) {
780 RdbSecurityManager::GetInstance().ChangeKeyFile(config_.GetPath());
781 }
782 key.assign(key.size(), 0);
783 return E_OK;
784 }
785
SetDwrEnable(const RdbStoreConfig & config)786 void SqliteConnection::SetDwrEnable(const RdbStoreConfig &config)
787 {
788 if (config.IsEncrypt() || config.IsMemoryRdb()) {
789 return;
790 }
791 auto errCode = ExecuteSql(GlobalExpr::PRAGMA_META_DOUBLE_WRITE);
792 if (errCode == E_SQLITE_META_RECOVERED) {
793 Reportor::ReportFault(RdbFaultDbFileEvent(FT_OPEN, errCode, config, "", true));
794 } else if (errCode != E_OK) {
795 LOG_ERROR("meta double failed %{public}d", errCode);
796 }
797 }
798
SetEncrypt(const RdbStoreConfig & config)799 int SqliteConnection::SetEncrypt(const RdbStoreConfig &config)
800 {
801 if (!config.IsEncrypt()) {
802 return E_OK;
803 }
804 if (config.IsMemoryRdb()) {
805 return E_NOT_SUPPORT;
806 }
807
808 std::vector<uint8_t> key = config.GetEncryptKey();
809 std::vector<uint8_t> newKey = config.GetNewEncryptKey();
810 auto errCode = SetEncryptKey(key, config);
811 key.assign(key.size(), 0);
812 if (errCode != E_OK) {
813 Reportor::ReportFault(RdbFaultDbFileEvent(FT_OPEN, E_SET_ENCRYPT_FAIL, config, "LOG:SetEncryptKey errcode=" +
814 std::to_string(errCode) + ",iter=" + std::to_string(config.GetIter()), true));
815 if (!newKey.empty()) {
816 LOG_INFO("use new key, iter=%{public}d err=%{public}d errno=%{public}d name=%{public}s", config.GetIter(),
817 errCode, errno, SqliteUtils::Anonymous(config.GetName()).c_str());
818 errCode = SetEncryptKey(newKey, config);
819 if (errCode != E_OK) {
820 Reportor::ReportFault(RdbFaultDbFileEvent(FT_OPEN, E_SET_NEW_ENCRYPT_FAIL, config,
821 "LOG:new key SetEncryptKey errcode= "+ std::to_string(errCode) +
822 ",iter=" + std::to_string(config.GetIter()), true));
823 }
824 }
825 newKey.assign(newKey.size(), 0);
826 if (errCode != E_OK) {
827 errCode = SetServiceKey(config, errCode);
828 LOG_ERROR("fail, iter=%{public}d err=%{public}d errno=%{public}d name=%{public}s", config.GetIter(),
829 errCode, errno, SqliteUtils::Anonymous(config.GetName()).c_str());
830 if (errCode != E_OK) {
831 bool sameKey = (key == config.GetEncryptKey()) || (newKey == config.GetEncryptKey());
832 Reportor::ReportFault(RdbFaultDbFileEvent(FT_OPEN, E_SET_SERVICE_ENCRYPT_FAIL, config,
833 "LOG:service key SetEncryptKey errcode=" + std::to_string(errCode) +
834 ",iter=" + std::to_string(config.GetIter()) + ",samekey=" + std::to_string(sameKey), true));
835 }
836 return errCode;
837 }
838 config.ChangeEncryptKey();
839 newKey = {};
840 }
841
842 if (!newKey.empty()) {
843 ResetKey(config);
844 }
845 newKey.assign(newKey.size(), 0);
846 return E_OK;
847 }
848
SetEncryptKey(const std::vector<uint8_t> & key,const RdbStoreConfig & config)849 int SqliteConnection::SetEncryptKey(const std::vector<uint8_t> &key, const RdbStoreConfig &config)
850 {
851 if (key.empty()) {
852 return E_INVALID_SECRET_KEY;
853 }
854
855 auto errCode = sqlite3_key(dbHandle_, static_cast<const void *>(key.data()), static_cast<int>(key.size()));
856 if (errCode != SQLITE_OK) {
857 return SQLiteError::ErrNo(errCode);
858 }
859 Suspender suspender(Suspender::SQL_LOG);
860 errCode = SetEncryptAgo(config);
861 if (errCode != E_OK) {
862 return errCode;
863 }
864
865 if (IsWriter() || config.IsReadOnly()) {
866 ValueObject version;
867 std::tie(errCode, version) = ExecuteForValue(GlobalExpr::PRAGMA_VERSION);
868 if (errCode != E_OK || version.GetType() == ValueObject::TYPE_NULL) {
869 return errCode;
870 }
871 return E_OK;
872 }
873 return errCode;
874 }
875
SetPersistWal(const RdbStoreConfig & config)876 int SqliteConnection::SetPersistWal(const RdbStoreConfig &config)
877 {
878 if (config.IsMemoryRdb()) {
879 return E_OK;
880 }
881 int opcode = 1;
882 int errCode = sqlite3_file_control(dbHandle_, "main", SQLITE_FCNTL_PERSIST_WAL, &opcode);
883 if (errCode != SQLITE_OK) {
884 LOG_ERROR("failed.");
885 return E_SET_PERSIST_WAL;
886 }
887 return E_OK;
888 }
889
SetBusyTimeout(int timeout)890 int SqliteConnection::SetBusyTimeout(int timeout)
891 {
892 auto errCode = sqlite3_busy_timeout(dbHandle_, timeout);
893 if (errCode != SQLITE_OK) {
894 LOG_ERROR("set buys timeout failed, errCode=%{public}d, errno=%{public}d", errCode, errno);
895 return errCode;
896 }
897 return E_OK;
898 }
899
RegDefaultFunctions(sqlite3 * dbHandle)900 int SqliteConnection::RegDefaultFunctions(sqlite3 *dbHandle)
901 {
902 if (dbHandle == nullptr) {
903 return SQLITE_OK;
904 }
905
906 auto [funcs, funcCount] = SqliteFunctionRegistry::GetFunctions();
907
908 for (size_t i = 0; i < funcCount; i++) {
909 const SqliteFunction& func = funcs[i];
910 int errCode = sqlite3_create_function_v2(dbHandle, func.name, func.numArgs,
911 SQLITE_UTF8 | SQLITE_DETERMINISTIC, nullptr, func.function, nullptr, nullptr, nullptr);
912 if (errCode != SQLITE_OK) {
913 LOG_ERROR("register function %{public}s failed, errCode=0x%{public}x, errno=%{public}d", func.name,
914 errCode, errno);
915 return SQLiteError::ErrNo(errCode);
916 }
917 }
918 return E_OK;
919 }
920
SetJournalMode(const RdbStoreConfig & config)921 int SqliteConnection::SetJournalMode(const RdbStoreConfig &config)
922 {
923 if (isReadOnly_ || config.IsMemoryRdb()) {
924 return E_OK;
925 }
926
927 auto [errCode, object] = ExecuteForValue("PRAGMA journal_mode");
928 if (errCode != E_OK) {
929 LOG_ERROR("SetJournalMode fail to get journal mode : %{public}d, errno %{public}d", errCode, errno);
930 Reportor::ReportFault(RdbFaultEvent(FT_OPEN, E_DFX_GET_JOURNAL_FAIL, config_.GetBundleName(),
931 "PRAGMA journal_mode get fail: " + std::to_string(errCode) + "," + std::to_string(errno)));
932 // errno: 28 No space left on device
933 return (errCode == E_SQLITE_IOERR && sqlite3_system_errno(dbHandle_) == 28) ? E_SQLITE_IOERR_FULL : errCode;
934 }
935
936 if (config.GetJournalMode().compare(static_cast<std::string>(object)) == 0) {
937 return E_OK;
938 }
939
940 std::string currentMode = SqliteUtils::StrToUpper(static_cast<std::string>(object));
941 if (currentMode != config.GetJournalMode()) {
942 auto [errorCode, journalMode] = ExecuteForValue("PRAGMA journal_mode=" + config.GetJournalMode());
943 if (errorCode != E_OK) {
944 LOG_ERROR("SqliteConnection SetJournalMode: fail to set journal mode err=%{public}d", errorCode);
945 Reportor::ReportFault(RdbFaultEvent(FT_OPEN, E_DFX_SET_JOURNAL_FAIL, config_.GetBundleName(),
946 "PRAGMA journal_mode set fail: " + std::to_string(errCode) + "," + std::to_string(errno) + "," +
947 config.GetJournalMode()));
948 return errorCode;
949 }
950
951 if (SqliteUtils::StrToUpper(static_cast<std::string>(journalMode)) != config.GetJournalMode()) {
952 LOG_ERROR("SqliteConnection SetJournalMode: result incorrect.");
953 return E_EXECUTE_RESULT_INCORRECT;
954 }
955 }
956
957 if (config.GetJournalMode() == "WAL") {
958 errCode = SetWalSyncMode(config.GetSyncMode());
959 }
960 if (config.GetJournalMode() == "TRUNCATE") {
961 mode_ = JournalMode::MODE_TRUNCATE;
962 }
963 return errCode;
964 }
965
SetAutoCheckpoint(const RdbStoreConfig & config)966 int SqliteConnection::SetAutoCheckpoint(const RdbStoreConfig &config)
967 {
968 if (isReadOnly_ || config.IsMemoryRdb()) {
969 return E_OK;
970 }
971
972 int targetValue = SqliteGlobalConfig::GetWalAutoCheckpoint();
973 auto [errCode, value] = ExecuteForValue("PRAGMA wal_autocheckpoint");
974 if (errCode != E_OK) {
975 LOG_ERROR("SqliteConnection SetAutoCheckpoint fail to get wal_autocheckpoint : %{public}d", errCode);
976 return errCode;
977 }
978
979 if (static_cast<int64_t>(value) == targetValue) {
980 return E_OK;
981 }
982
983 std::tie(errCode, value) = ExecuteForValue("PRAGMA wal_autocheckpoint=" + std::to_string(targetValue));
984 if (errCode != E_OK) {
985 LOG_ERROR("SqliteConnection SetAutoCheckpoint fail to set wal_autocheckpoint : %{public}d", errCode);
986 }
987 return errCode;
988 }
989
SetTokenizer(const RdbStoreConfig & config)990 int SqliteConnection::SetTokenizer(const RdbStoreConfig &config)
991 {
992 auto tokenizer = config.GetTokenizer();
993 if (tokenizer == NONE_TOKENIZER || tokenizer == CUSTOM_TOKENIZER) {
994 return E_OK;
995 }
996 if (tokenizer == ICU_TOKENIZER) {
997 sqlite3_config(SQLITE_CONFIG_ENABLE_ICU, 1);
998 return E_OK;
999 }
1000 LOG_ERROR("fail to set Tokenizer: %{public}d", tokenizer);
1001 return E_INVALID_ARGS;
1002 }
1003
SetWalFile(const RdbStoreConfig & config)1004 int SqliteConnection::SetWalFile(const RdbStoreConfig &config)
1005 {
1006 if (!IsWriter()) {
1007 return E_OK;
1008 }
1009 Suspender suspender(Suspender::SQL_LOG);
1010 auto [errCode, version] = ExecuteForValue(GlobalExpr::PRAGMA_VERSION);
1011 if (errCode != E_OK) {
1012 return errCode;
1013 }
1014 return ExecuteSql(std::string(GlobalExpr::PRAGMA_VERSION) + "=?", { std::move(version) });
1015 }
1016
SetWalSyncMode(const std::string & syncMode)1017 int SqliteConnection::SetWalSyncMode(const std::string &syncMode)
1018 {
1019 std::string targetValue = SqliteGlobalConfig::GetSyncMode();
1020 if (syncMode.length() != 0) {
1021 targetValue = syncMode;
1022 }
1023 Suspender suspender(Suspender::SQL_LOG);
1024 auto [errCode, object] = ExecuteForValue("PRAGMA synchronous");
1025 if (errCode != E_OK) {
1026 LOG_ERROR("get wal sync mode fail, errCode:%{public}d", errCode);
1027 return errCode;
1028 }
1029
1030 std::string walSyncMode = SqliteUtils::StrToUpper(static_cast<std::string>(object));
1031 if (walSyncMode == targetValue) {
1032 return E_OK;
1033 }
1034
1035 errCode = ExecuteSql("PRAGMA synchronous=" + targetValue);
1036 if (errCode != E_OK) {
1037 LOG_ERROR("set wal sync mode fail, errCode:%{public}d", errCode);
1038 }
1039 return errCode;
1040 }
1041
ExecuteSql(const std::string & sql,const std::vector<ValueObject> & bindArgs)1042 int SqliteConnection::ExecuteSql(const std::string &sql, const std::vector<ValueObject> &bindArgs)
1043 {
1044 auto [errCode, statement] = CreateStatement(sql, nullptr);
1045 if (statement == nullptr || errCode != E_OK) {
1046 return errCode;
1047 }
1048 return statement->Execute(bindArgs);
1049 }
1050
ExecuteForValue(const std::string & sql,const std::vector<ValueObject> & bindArgs)1051 std::pair<int32_t, ValueObject> SqliteConnection::ExecuteForValue(
1052 const std::string &sql, const std::vector<ValueObject> &bindArgs)
1053 {
1054 auto [errCode, statement] = CreateStatement(sql, nullptr);
1055 if (statement == nullptr || errCode != E_OK) {
1056 return { static_cast<int32_t>(errCode), ValueObject() };
1057 }
1058
1059 ValueObject object;
1060 std::tie(errCode, object) = statement->ExecuteForValue(bindArgs);
1061 if (errCode != E_OK) {
1062 LOG_ERROR("execute sql failed, errCode:%{public}d, app self can check the SQL, args size:%{public}zu",
1063 SQLiteError::ErrNo(errCode), bindArgs.size());
1064 }
1065 return { errCode, object };
1066 }
1067
ClearCache(bool isForceClear)1068 int SqliteConnection::ClearCache(bool isForceClear)
1069 {
1070 if (dbHandle_ != nullptr && mode_ == JournalMode::MODE_WAL) {
1071 auto getUsedBytes = [dbHandle = dbHandle_]() -> int {
1072 int usedBytes = 0;
1073 int nEntry = 0;
1074 sqlite3_db_status(dbHandle, SQLITE_DBSTATUS_CACHE_USED, &usedBytes, &nEntry, 0);
1075 return usedBytes;
1076 };
1077 if (isForceClear || getUsedBytes() > config_.GetClearMemorySize()) {
1078 sqlite3_db_release_memory(dbHandle_);
1079 }
1080 }
1081 if (slaveConnection_) {
1082 int errCode = slaveConnection_->ClearCache(isForceClear);
1083 if (errCode != E_OK) {
1084 LOG_ERROR("slaveConnection clearCache failed:%{public}d", errCode);
1085 }
1086 }
1087 return E_OK;
1088 }
1089
LimitPermission(const RdbStoreConfig & config,const std::string & dbPath) const1090 void SqliteConnection::LimitPermission(const RdbStoreConfig &config, const std::string &dbPath) const
1091 {
1092 if (config.IsMemoryRdb()) {
1093 return;
1094 }
1095 struct stat st = { 0 };
1096 if (stat(dbPath.c_str(), &st) == 0) {
1097 if ((st.st_mode & (S_IXUSR | S_IXGRP | S_IRWXO)) != 0) {
1098 int ret = chmod(dbPath.c_str(), st.st_mode & (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP));
1099 if (ret != 0) {
1100 LOG_DEBUG("SqliteConnection LimitPermission chmod fail, err = %{public}d", errno);
1101 }
1102 }
1103 } else {
1104 LOG_ERROR("SqliteConnection LimitPermission stat fail, err = %{public}d", errno);
1105 }
1106 }
1107
ConfigLocale(const std::string & localeStr)1108 int SqliteConnection::ConfigLocale(const std::string &localeStr)
1109 {
1110 return RdbICUManager::GetInstance().ConfigLocale(dbHandle_, localeStr);
1111 }
1112
CleanDirtyData(const std::string & table,uint64_t cursor)1113 int SqliteConnection::CleanDirtyData(const std::string &table, uint64_t cursor)
1114 {
1115 if (table.empty()) {
1116 LOG_ERROR("table is empty");
1117 return E_INVALID_ARGS;
1118 }
1119 uint64_t tmpCursor = cursor == UINT64_MAX ? 0 : cursor;
1120 auto status = DropLogicDeletedData(dbHandle_, table, tmpCursor);
1121 LOG_INFO("status:%{public}d, table:%{public}s, cursor:%{public}" PRIu64 "", status,
1122 SqliteUtils::Anonymous(table).c_str(), cursor);
1123 return status == DistributedDB::DBStatus::OK ? E_OK : E_ERROR;
1124 }
1125
TryCheckPoint(bool timeout)1126 int SqliteConnection::TryCheckPoint(bool timeout)
1127 {
1128 if (!isWriter_ || config_.IsMemoryRdb()) {
1129 return E_NOT_SUPPORT;
1130 }
1131
1132 std::shared_ptr<Connection> autoCheck(slaveConnection_.get(), [this, timeout](Connection *conn) {
1133 if (conn != nullptr && backupId_ == TaskExecutor::INVALID_TASK_ID) {
1134 conn->TryCheckPoint(timeout);
1135 }
1136 });
1137 std::string walName = sqlite3_filename_wal(sqlite3_db_filename(dbHandle_, "main"));
1138 ssize_t size = SqliteUtils::GetFileSize(walName);
1139 if (size < 0) {
1140 LOG_ERROR("Invalid size for WAL:%{public}s size:%{public}zd", SqliteUtils::Anonymous(walName).c_str(), size);
1141 return E_ERROR;
1142 }
1143
1144 if (size <= config_.GetStartCheckpointSize()) {
1145 return E_OK;
1146 }
1147
1148 if (!timeout && size < config_.GetCheckpointSize()) {
1149 return E_INNER_WARNING;
1150 }
1151
1152 (void)sqlite3_busy_timeout(dbHandle_, CHECKPOINT_TIME);
1153 int errCode = sqlite3_wal_checkpoint_v2(dbHandle_, nullptr, SQLITE_CHECKPOINT_TRUNCATE, nullptr, nullptr);
1154 (void)sqlite3_busy_timeout(dbHandle_, DEFAULT_BUSY_TIMEOUT_MS);
1155 if (errCode != SQLITE_OK) {
1156 Reportor::ReportFault(RdbFaultDbFileEvent(FT_CP, E_CHECK_POINT_FAIL, config_,
1157 "LOG:cp fail, errcode=" + std::to_string(errCode), true));
1158 LOG_WARN("sqlite3_wal_checkpoint_v2 failed err:%{public}d,size:%{public}zd,wal:%{public}s.", errCode, size,
1159 SqliteUtils::Anonymous(walName).c_str());
1160 return SQLiteError::ErrNo(errCode);
1161 }
1162 return E_OK;
1163 }
1164
LimitWalSize()1165 int SqliteConnection::LimitWalSize()
1166 {
1167 if (!isConfigured_ || !isWriter_ || config_.IsMemoryRdb()) {
1168 return E_OK;
1169 }
1170
1171 std::string walName = sqlite3_filename_wal(sqlite3_db_filename(dbHandle_, "main"));
1172 ssize_t fileSize = SqliteUtils::GetFileSize(walName);
1173 if (fileSize < 0 || fileSize > config_.GetWalLimitSize()) {
1174 std::stringstream ss;
1175 ss << "The WAL file size exceeds the limit,name=" << SqliteUtils::Anonymous(walName).c_str()
1176 << ",file size=" << fileSize
1177 << ",limit size=" << config_.GetWalLimitSize();
1178 LOG_ERROR("%{public}s", ss.str().c_str());
1179 Reportor::ReportFault(RdbFaultDbFileEvent(FT_OPEN, E_WAL_SIZE_OVER_LIMIT, config_, ss.str()));
1180 return E_WAL_SIZE_OVER_LIMIT;
1181 }
1182 return E_OK;
1183 }
1184
Subscribe(const std::shared_ptr<DistributedDB::StoreObserver> & observer)1185 int32_t SqliteConnection::Subscribe(const std::shared_ptr<DistributedDB::StoreObserver> &observer)
1186 {
1187 if (!isWriter_ || observer == nullptr) {
1188 return E_OK;
1189 }
1190 int32_t errCode = RegisterStoreObserver(dbHandle_, observer);
1191 if (errCode != E_OK) {
1192 return errCode;
1193 }
1194 RegisterDbHook(dbHandle_);
1195 config_.SetRegisterInfo(RegisterType::STORE_OBSERVER, true);
1196 return E_OK;
1197 }
1198
Unsubscribe(const std::shared_ptr<DistributedDB::StoreObserver> & observer)1199 int32_t SqliteConnection::Unsubscribe(const std::shared_ptr<DistributedDB::StoreObserver> &observer)
1200 {
1201 if (!isWriter_ || observer == nullptr) {
1202 return E_OK;
1203 }
1204 int32_t errCode = UnregisterStoreObserver(dbHandle_, observer);
1205 if (errCode != 0) {
1206 return errCode;
1207 }
1208 return E_OK;
1209 }
1210
Backup(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey,bool isAsync,std::shared_ptr<SlaveStatus> slaveStatus,bool verifyDb)1211 int32_t SqliteConnection::Backup(const std::string &databasePath, const std::vector<uint8_t> &destEncryptKey,
1212 bool isAsync, std::shared_ptr<SlaveStatus> slaveStatus, bool verifyDb)
1213 {
1214 if (*slaveStatus == SlaveStatus::BACKING_UP) {
1215 LOG_INFO("backing up, return:%{public}s", SqliteUtils::Anonymous(config_.GetName()).c_str());
1216 return E_OK;
1217 }
1218 LOG_INFO(
1219 "begin backup to slave:%{public}s, isAsync:%{public}d", SqliteUtils::Anonymous(databasePath).c_str(), isAsync);
1220 if (!isAsync) {
1221 if (slaveConnection_ == nullptr) {
1222 RdbStoreConfig rdbSlaveStoreConfig = GetSlaveRdbStoreConfig(config_);
1223 auto [errCode, conn] = CreateSlaveConnection(rdbSlaveStoreConfig, SlaveOpenPolicy::FORCE_OPEN);
1224 if (errCode != E_OK) {
1225 return errCode;
1226 }
1227 slaveConnection_ = conn;
1228 reusableReplicas_.InsertOrAssign(rdbSlaveStoreConfig.GetPath(), conn);
1229 }
1230 return ExchangeSlaverToMaster(false, verifyDb, slaveStatus);
1231 }
1232
1233 if (backupId_ == TaskExecutor::INVALID_TASK_ID) {
1234 auto pool = TaskExecutor::GetInstance().GetExecutor();
1235 if (pool == nullptr) {
1236 LOG_WARN("task pool err when restore");
1237 return E_OK;
1238 }
1239 backupId_ = pool->Execute([this, slaveStatus]() {
1240 auto [err, conn] = InnerCreate(config_, true);
1241 if (err != E_OK) {
1242 return;
1243 }
1244 err = conn->ExchangeSlaverToMaster(false, true, slaveStatus);
1245 if (err != E_OK) {
1246 LOG_WARN("master backup to slave failed:%{public}d", err);
1247 }
1248 backupId_ = TaskExecutor::INVALID_TASK_ID;
1249 });
1250 }
1251 return E_OK;
1252 }
1253
Restore(const std::string & databasePath,const std::vector<uint8_t> & destEncryptKey,std::shared_ptr<SlaveStatus> slaveStatus)1254 int32_t SqliteConnection::Restore(
1255 const std::string &databasePath, const std::vector<uint8_t> &destEncryptKey,
1256 std::shared_ptr<SlaveStatus> slaveStatus)
1257 {
1258 return ExchangeSlaverToMaster(true, true, slaveStatus);
1259 };
1260
LoadExtension(const RdbStoreConfig & config,sqlite3 * dbHandle)1261 int SqliteConnection::LoadExtension(const RdbStoreConfig &config, sqlite3 *dbHandle)
1262 {
1263 auto pluginLibs = config.GetPluginLibs();
1264 if (config.GetTokenizer() == CUSTOM_TOKENIZER) {
1265 pluginLibs.push_back("libcustomtokenizer.z.so");
1266 }
1267 if (pluginLibs.empty() || dbHandle == nullptr) {
1268 return E_OK;
1269 }
1270 if (pluginLibs.size() >
1271 SqliteUtils::MAX_LOAD_EXTENSION_COUNT + (config.GetTokenizer() == CUSTOM_TOKENIZER ? 1 : 0)) {
1272 LOG_ERROR("failed, size %{public}zu is too large", pluginLibs.size());
1273 return E_INVALID_ARGS;
1274 }
1275 int err = sqlite3_db_config(
1276 dbHandle, SQLITE_DBCONFIG_ENABLE_LOAD_EXTENSION, SqliteUtils::ENABLE_LOAD_EXTENSION, nullptr);
1277 if (err != SQLITE_OK) {
1278 LOG_ERROR("enable failed, err=%{public}d, errno=%{public}d", err, errno);
1279 return SQLiteError::ErrNo(err);
1280 }
1281 for (auto &path : pluginLibs) {
1282 if (path.empty()) {
1283 continue;
1284 }
1285 err = sqlite3_load_extension(dbHandle, path.c_str(), nullptr, nullptr);
1286 if (err != SQLITE_OK) {
1287 LOG_ERROR("load error. err=%{public}d, errno=%{public}d, errmsg:%{public}s, lib=%{public}s", err, errno,
1288 sqlite3_errmsg(dbHandle), SqliteUtils::Anonymous(path).c_str());
1289 if (access(path.c_str(), F_OK) != 0) {
1290 return E_INVALID_FILE_PATH;
1291 }
1292 break;
1293 }
1294 }
1295 int ret = sqlite3_db_config(
1296 dbHandle, SQLITE_DBCONFIG_ENABLE_LOAD_EXTENSION, SqliteUtils::DISABLE_LOAD_EXTENSION, nullptr);
1297 if (ret != SQLITE_OK) {
1298 LOG_ERROR("disable failed, err=%{public}d, errno=%{public}d", err, errno);
1299 }
1300 return SQLiteError::ErrNo(err == SQLITE_OK ? ret : err);
1301 }
1302
SetServiceKey(const RdbStoreConfig & config,int32_t errCode)1303 int SqliteConnection::SetServiceKey(const RdbStoreConfig &config, int32_t errCode)
1304 {
1305 DistributedRdb::RdbSyncerParam param;
1306 param.bundleName_ = config.GetBundleName();
1307 param.hapName_ = config.GetModuleName();
1308 param.storeName_ = config.GetName();
1309 param.customDir_ = config.GetCustomDir();
1310 param.area_ = config.GetArea();
1311 param.level_ = static_cast<int32_t>(config.GetSecurityLevel());
1312 param.type_ = config.GetDistributedType();
1313 param.isEncrypt_ = config.IsEncrypt();
1314 param.isAutoClean_ = config.GetAutoClean();
1315 param.isSearchable_ = config.IsSearchable();
1316 param.haMode_ = config.GetHaMode();
1317 param.password_ = {};
1318 param.subUser_ = config.GetSubUser();
1319 std::vector<std::vector<uint8_t>> keys;
1320 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM) && !defined(ANDROID_PLATFORM) && !defined(IOS_PLATFORM)
1321 auto [svcErr, service] = DistributedRdb::RdbManagerImpl::GetInstance().GetRdbService(param);
1322 if (svcErr != E_OK) {
1323 return errCode;
1324 }
1325 svcErr = service->GetPassword(param, keys);
1326 if (svcErr != RDB_OK) {
1327 return errCode;
1328 }
1329 #endif
1330
1331 for (const auto &key : keys) {
1332 errCode = SetEncryptKey(key, config);
1333 if (errCode == E_OK) {
1334 config.RestoreEncryptKey(key);
1335 break;
1336 }
1337 }
1338 for (auto &key : keys) {
1339 key.assign(key.size(), 0);
1340 }
1341 return errCode;
1342 }
1343
ExchangeSlaverToMaster(bool isRestore,bool verifyDb,std::shared_ptr<SlaveStatus> curStatus)1344 int SqliteConnection::ExchangeSlaverToMaster(bool isRestore, bool verifyDb, std::shared_ptr<SlaveStatus> curStatus)
1345 {
1346 *curStatus = SlaveStatus::BACKING_UP;
1347 int err = verifyDb ? ExchangeVerify(isRestore) : E_OK;
1348 if (err != E_OK) {
1349 *curStatus = SlaveStatus::UNDEFINED;
1350 return err;
1351 }
1352
1353 err = SqliteNativeBackup(isRestore, curStatus);
1354 if (err != E_OK) {
1355 return err;
1356 }
1357 if (!isRestore && IsSupportBinlog(config_) && config_.GetHaMode() != HAMode::SINGLE) {
1358 LOG_INFO("reset binlog start");
1359 sqlite3_db_config(dbHandle_, SQLITE_DBCONFIG_ENABLE_BINLOG, nullptr);
1360 SetBinlog();
1361 err = sqlite3_clean_binlog(dbHandle_, BinlogFileCleanModeE::BINLOG_FILE_CLEAN_ALL_MODE);
1362 if (err != SQLITE_OK) {
1363 sqlite3_db_config(dbHandle_, SQLITE_DBCONFIG_ENABLE_BINLOG, nullptr);
1364 SqliteUtils::SetSlaveInvalid(config_.GetPath());
1365 }
1366 LOG_INFO("reset binlog finished, %{public}d", err);
1367 }
1368 return E_OK;
1369 }
1370
1371
SqliteBackupStep(bool isRestore,sqlite3_backup * pBackup,std::shared_ptr<SlaveStatus> curStatus)1372 int SqliteConnection::SqliteBackupStep(bool isRestore, sqlite3_backup *pBackup, std::shared_ptr<SlaveStatus> curStatus)
1373 {
1374 int sleepTime = BACKUP_PRE_WAIT_TIME;
1375 if (isRestore) {
1376 sleepTime = SqliteUtils::IsSlaveRestoring(config_.GetPath()) ? RESTORE_PRE_WAIT_TIME : 0;
1377 }
1378 int rc = SQLITE_OK;
1379 do {
1380 if (!isRestore && (*curStatus == SlaveStatus::BACKUP_INTERRUPT || *curStatus == SlaveStatus::DB_CLOSING)) {
1381 rc = E_CANCEL;
1382 break;
1383 }
1384 rc = sqlite3_backup_step(pBackup, BACKUP_PAGES_PRE_STEP);
1385 LOG_INFO("backup slave process cur/total:%{public}d/%{public}d, rs:%{public}d,isRestore:%{public}d,%{public}d",
1386 sqlite3_backup_pagecount(pBackup) - sqlite3_backup_remaining(pBackup), sqlite3_backup_pagecount(pBackup),
1387 rc, isRestore, sleepTime);
1388 if (sleepTime > 0) {
1389 sqlite3_sleep(sleepTime);
1390 }
1391 } while (sqlite3_backup_pagecount(pBackup) != 0 && (rc == SQLITE_OK || rc == SQLITE_BUSY || rc == SQLITE_LOCKED));
1392 (void)sqlite3_backup_finish(pBackup);
1393 return rc;
1394 }
1395
SqliteNativeBackup(bool isRestore,std::shared_ptr<SlaveStatus> curStatus)1396 int SqliteConnection::SqliteNativeBackup(bool isRestore, std::shared_ptr<SlaveStatus> curStatus)
1397 {
1398 sqlite3 *dbFrom = isRestore ? dbHandle_ : slaveConnection_->dbHandle_;
1399 sqlite3 *dbTo = isRestore ? slaveConnection_->dbHandle_ : dbHandle_;
1400 sqlite3_backup *pBackup = sqlite3_backup_init(dbFrom, "main", dbTo, "main");
1401 if (pBackup == nullptr) {
1402 LOG_WARN("slave backup init failed");
1403 *curStatus = SlaveStatus::UNDEFINED;
1404 return E_OK;
1405 }
1406 int rc = SqliteBackupStep(isRestore, pBackup, curStatus);
1407 if (rc != SQLITE_DONE) {
1408 LOG_ERROR("backup slave err:%{public}d, isRestore:%{public}d", rc, isRestore);
1409 if (!isRestore) {
1410 RdbStoreConfig slaveConfig(slaveConnection_->config_.GetPath());
1411 if (rc != SQLITE_BUSY && rc != SQLITE_LOCKED) {
1412 slaveConnection_ = nullptr;
1413 (void)SqliteConnection::Delete(slaveConfig.GetPath());
1414 }
1415 *curStatus = SlaveStatus::BACKUP_INTERRUPT;
1416 Reportor::ReportCorrupted(Reportor::Create(slaveConfig, SQLiteError::ErrNo(rc), "ErrorType: slaveBackup"));
1417 }
1418 return rc == E_CANCEL ? E_CANCEL : SQLiteError::ErrNo(rc);
1419 }
1420 rc = isRestore ? TryCheckPoint(true) : slaveConnection_->TryCheckPoint(true);
1421 if (rc != E_OK && config_.GetHaMode() == HAMode::MANUAL_TRIGGER) {
1422 if (!isRestore) {
1423 *curStatus = SlaveStatus::BACKUP_INTERRUPT;
1424 }
1425 LOG_WARN("CheckPoint failed err:%{public}d, isRestore:%{public}d", rc, isRestore);
1426 return E_OK;
1427 }
1428 *curStatus = SlaveStatus::BACKUP_FINISHED;
1429 SqliteUtils::SetSlaveValid(config_.GetPath());
1430 LOG_INFO("backup slave success, isRestore:%{public}d", isRestore);
1431 return E_OK;
1432 }
1433
GenerateExchangeStrategy(std::shared_ptr<SlaveStatus> status,bool isRelpay)1434 ExchangeStrategy SqliteConnection::GenerateExchangeStrategy(std::shared_ptr<SlaveStatus> status, bool isRelpay)
1435 {
1436 if (dbHandle_ == nullptr || slaveConnection_ == nullptr || slaveConnection_->dbHandle_ == nullptr ||
1437 config_.GetHaMode() == HAMode::SINGLE || *status == SlaveStatus::BACKING_UP) {
1438 return ExchangeStrategy::NOT_HANDLE;
1439 }
1440 const std::string querySql = "SELECT COUNT(*) FROM sqlite_master WHERE type='table';";
1441 const std::string qIndexSql = "SELECT COUNT(*) FROM sqlite_master WHERE type='index';";
1442 auto [mRet, mObj] = ExecuteForValue(querySql);
1443 auto [mIdxRet, mIdxObj] = ExecuteForValue(qIndexSql);
1444 if (mRet == E_SQLITE_CORRUPT || mIdxRet == E_SQLITE_CORRUPT) {
1445 LOG_WARN("main abnormal, err:%{public}d", mRet);
1446 return ExchangeStrategy::RESTORE;
1447 }
1448 int64_t mCount = static_cast<int64_t>(mObj);
1449 int64_t mIdxCount = static_cast<int64_t>(mIdxObj);
1450 // trigger mode only does restore, not backup
1451 if (config_.GetHaMode() == HAMode::MANUAL_TRIGGER) {
1452 return mCount == 0 ? ExchangeStrategy::RESTORE : ExchangeStrategy::NOT_HANDLE;
1453 }
1454 if (*status == SlaveStatus::DB_CLOSING) {
1455 return ExchangeStrategy::NOT_HANDLE;
1456 }
1457 if (*status == SlaveStatus::BACKUP_INTERRUPT) {
1458 return ExchangeStrategy::BACKUP;
1459 }
1460 if (IsSupportBinlog(config_)) {
1461 if (isRelpay) {
1462 SqliteConnection::ReplayBinlog(config_.GetPath(), slaveConnection_, false);
1463 } else if (mCount == 0) {
1464 LOG_INFO("main empty");
1465 return ExchangeStrategy::RESTORE;
1466 } else {
1467 return ExchangeStrategy::PENDING_BACKUP;
1468 }
1469 }
1470 return CompareWithSlave(mCount, mIdxCount);
1471 }
1472
SetKnowledgeSchema(const DistributedRdb::RdbKnowledgeSchema & schema)1473 int SqliteConnection::SetKnowledgeSchema(const DistributedRdb::RdbKnowledgeSchema &schema)
1474 {
1475 DistributedDB::DBStatus status = DistributedDB::DBStatus::OK;
1476 for (const auto &table : schema.tables) {
1477 DistributedDB::KnowledgeSourceSchema sourceSchema;
1478 sourceSchema.tableName = table.tableName;
1479 for (const auto &item : table.knowledgeFields) {
1480 sourceSchema.knowledgeColNames.insert(item.columnName);
1481 }
1482 sourceSchema.extendColNames = std::set<std::string>(table.referenceFields.begin(),
1483 table.referenceFields.end());
1484 status = SetKnowledgeSourceSchema(dbHandle_, sourceSchema);
1485 if (status != DistributedDB::DBStatus::OK) {
1486 return E_ERROR;
1487 }
1488 }
1489 return E_OK;
1490 }
1491
CleanDirtyLog(const std::string & table,uint64_t cursor)1492 int SqliteConnection::CleanDirtyLog(const std::string &table, uint64_t cursor)
1493 {
1494 if (table.empty()) {
1495 LOG_ERROR("table is empty");
1496 return E_INVALID_ARGS;
1497 }
1498 auto status = CleanDeletedData(dbHandle_, table, cursor);
1499 return status == DistributedDB::DBStatus::OK ? E_OK : E_ERROR;
1500 }
1501
Repair(const RdbStoreConfig & config)1502 int32_t SqliteConnection::Repair(const RdbStoreConfig &config)
1503 {
1504 std::shared_ptr<SqliteConnection> connection = std::make_shared<SqliteConnection>(config, true);
1505 if (connection == nullptr) {
1506 return E_ERROR;
1507 }
1508 RdbStoreConfig rdbSlaveStoreConfig = connection->GetSlaveRdbStoreConfig(config);
1509 if (access(rdbSlaveStoreConfig.GetPath().c_str(), F_OK) != 0) {
1510 return E_NOT_SUPPORT;
1511 }
1512 auto [ret, conn] = connection->CreateSlaveConnection(rdbSlaveStoreConfig, SlaveOpenPolicy::FORCE_OPEN);
1513 if (ret != E_OK) {
1514 return ret;
1515 }
1516 connection->slaveConnection_ = conn;
1517 if (IsSupportBinlog(config)) {
1518 SqliteConnection::ReplayBinlog(config.GetPath(), conn, false);
1519 }
1520 ret = connection->VerifySlaveIntegrity();
1521 if (ret != E_OK) {
1522 return ret;
1523 }
1524 (void)SqliteConnection::Delete(config.GetPath());
1525 ret = connection->InnerOpen(config);
1526 if (ret != E_OK) {
1527 LOG_ERROR("reopen db failed, err:%{public}d", ret);
1528 return ret;
1529 }
1530 connection->TryCheckPoint(true);
1531 std::shared_ptr<SlaveStatus> curStatus = std::make_shared<SlaveStatus>(SlaveStatus::UNDEFINED);
1532 ret = connection->ExchangeSlaverToMaster(true, false, curStatus);
1533 if (ret != E_OK) {
1534 auto slavePath = SqliteUtils::GetSlavePath(config.GetPath());
1535 LOG_ERROR("repair failed, [%{public}s]->[%{public}s], err:%{public}d",
1536 SqliteUtils::Anonymous(slavePath).c_str(), SqliteUtils::Anonymous(config.GetName()).c_str(), ret);
1537 } else {
1538 LOG_INFO("repair main success:%{public}s", SqliteUtils::Anonymous(config.GetPath()).c_str());
1539 }
1540 return ret;
1541 }
1542
ExchangeVerify(bool isRestore)1543 int SqliteConnection::ExchangeVerify(bool isRestore)
1544 {
1545 if (isRestore) {
1546 SqliteConnection::ReplayBinlog(config_);
1547 int err = VerifySlaveIntegrity();
1548 if (err != E_OK) {
1549 return err;
1550 }
1551 if (IsDbVersionBelowSlave()) {
1552 return E_OK;
1553 }
1554 if (SqliteUtils::IsSlaveInvalid(config_.GetPath())) {
1555 LOG_ERROR("incomplete slave, %{public}s", SqliteUtils::Anonymous(config_.GetName()).c_str());
1556 return E_SQLITE_CORRUPT;
1557 }
1558 return E_OK;
1559 }
1560 if (slaveConnection_ == nullptr) {
1561 return E_ALREADY_CLOSED;
1562 }
1563 if (access(config_.GetPath().c_str(), F_OK) != 0) {
1564 LOG_WARN("main no exist, isR:%{public}d, %{public}s", isRestore,
1565 SqliteUtils::Anonymous(config_.GetName()).c_str());
1566 return E_DB_NOT_EXIST;
1567 }
1568 auto [cRet, cObj] = ExecuteForValue(INTEGRITIES[1]); // 1 is quick_check
1569 if (cRet == E_OK && (static_cast<std::string>(cObj) != "ok")) {
1570 LOG_ERROR("main corrupt, cancel, %{public}s, ret:%{public}s, qRet:%{public}d",
1571 SqliteUtils::Anonymous(config_.GetName()).c_str(), static_cast<std::string>(cObj).c_str(), cRet);
1572 return E_SQLITE_CORRUPT;
1573 }
1574 SqliteUtils::SetSlaveInterrupted(config_.GetPath());
1575 return E_OK;
1576 }
1577
InnerCreate(const RdbStoreConfig & config,bool isWrite,bool isReusableReplica)1578 std::pair<int32_t, std::shared_ptr<SqliteConnection>> SqliteConnection::InnerCreate(
1579 const RdbStoreConfig &config, bool isWrite, bool isReusableReplica)
1580 {
1581 std::pair<int32_t, std::shared_ptr<SqliteConnection>> result = { E_ERROR, nullptr };
1582 auto &[errCode, conn] = result;
1583 std::shared_ptr<SqliteConnection> connection = std::make_shared<SqliteConnection>(config, isWrite);
1584 if (connection == nullptr) {
1585 LOG_ERROR("connection is nullptr.");
1586 return result;
1587 }
1588
1589 errCode = connection->InnerOpen(config);
1590 if (errCode != E_OK) {
1591 return result;
1592 }
1593 conn = connection;
1594 if (isWrite && config.GetHaMode() != HAMode::SINGLE) {
1595 RdbStoreConfig slaveCfg = connection->GetSlaveRdbStoreConfig(config);
1596 auto [err, slaveConn] = connection->CreateSlaveConnection(slaveCfg, SlaveOpenPolicy::OPEN_IF_DB_VALID);
1597 if (err != E_OK) {
1598 return result;
1599 }
1600 conn->slaveConnection_ = slaveConn;
1601 conn->SetBinlog();
1602 if (isReusableReplica) {
1603 reusableReplicas_.InsertOrAssign(slaveCfg.GetPath(), slaveConn);
1604 }
1605 }
1606 return result;
1607 }
1608
VerifySlaveIntegrity()1609 int SqliteConnection::VerifySlaveIntegrity()
1610 {
1611 if (slaveConnection_ == nullptr) {
1612 return E_ALREADY_CLOSED;
1613 }
1614
1615 RdbStoreConfig slaveCfg = GetSlaveRdbStoreConfig(config_);
1616 std::map<std::string, DebugInfo> bugInfo = Connection::Collect(slaveCfg);
1617 LOG_INFO("%{public}s", SqliteUtils::FormatDebugInfoBrief(bugInfo,
1618 SqliteUtils::Anonymous(slaveCfg.GetName())).c_str());
1619
1620 if (SqliteUtils::IsSlaveInterrupted(config_.GetPath())) {
1621 return E_SQLITE_CORRUPT;
1622 }
1623 Suspender suspender(Suspender::SQL_LOG);
1624 std::string sql = "SELECT COUNT(*) FROM sqlite_master WHERE type='table';";
1625 auto [err, obj] = slaveConnection_->ExecuteForValue(sql);
1626 auto val = std::get_if<int64_t>(&obj.value);
1627 if (err == E_SQLITE_CORRUPT || (val != nullptr && static_cast<int64_t>(*val) == 0L)) {
1628 LOG_ERROR("slave %{public}d", err);
1629 return E_SQLITE_CORRUPT;
1630 }
1631
1632 int64_t mCount = 0L;
1633 if (dbHandle_ != nullptr) {
1634 std::tie(err, obj) = ExecuteForValue(sql);
1635 val = std::get_if<int64_t>(&obj.value);
1636 if (val != nullptr) {
1637 mCount = static_cast<int64_t>(*val);
1638 }
1639 }
1640 ssize_t slaveSize = 0;
1641 if (IsSupportBinlog(config_)) {
1642 slaveSize = SqliteUtils::GetDecompressedSize(slaveCfg.GetPath());
1643 }
1644 if (slaveSize == 0 && bugInfo.find(FILE_SUFFIXES[DB_INDEX].debug_) != bugInfo.end()) {
1645 slaveSize = bugInfo[FILE_SUFFIXES[DB_INDEX].debug_].size_;
1646 }
1647 if (slaveSize > SLAVE_INTEGRITY_CHECK_LIMIT && mCount == 0L) {
1648 return SqliteUtils::IsSlaveInvalid(config_.GetPath()) ? E_SQLITE_CORRUPT : E_OK;
1649 }
1650
1651 std::tie(err, obj) = slaveConnection_->ExecuteForValue(INTEGRITIES[2]); // 2 is integrity_check
1652 if (err == E_OK && (static_cast<std::string>(obj) != "ok")) {
1653 LOG_ERROR("slave corrupt, ret:%{public}s, cRet:%{public}d, %{public}d", static_cast<std::string>(obj).c_str(),
1654 err, errno);
1655 SqliteUtils::SetSlaveInvalid(config_.GetPath());
1656 return E_SQLITE_CORRUPT;
1657 }
1658 return E_OK;
1659 }
1660
IsDbVersionBelowSlave()1661 bool SqliteConnection::IsDbVersionBelowSlave()
1662 {
1663 if (slaveConnection_ == nullptr) {
1664 return false;
1665 }
1666 Suspender suspender(Suspender::SQL_LOG);
1667 auto [cRet, cObj] = ExecuteForValue("SELECT COUNT(*) FROM sqlite_master WHERE type='table';");
1668 auto cVal = std::get_if<int64_t>(&cObj.value);
1669 if (cRet == E_SQLITE_CORRUPT || (cVal != nullptr && (static_cast<int64_t>(*cVal) == 0L))) {
1670 LOG_INFO("main empty, %{public}d, %{public}s", cRet, SqliteUtils::Anonymous(config_.GetName()).c_str());
1671 return true;
1672 }
1673
1674 std::tie(cRet, cObj) = ExecuteForValue(GlobalExpr::PRAGMA_VERSION);
1675 if (cVal == nullptr || (cVal != nullptr && static_cast<int64_t>(*cVal) == 0L)) {
1676 std::tie(cRet, cObj) = slaveConnection_->ExecuteForValue(GlobalExpr::PRAGMA_VERSION);
1677 cVal = std::get_if<int64_t>(&cObj.value);
1678 if (cVal != nullptr && static_cast<int64_t>(*cVal) > 0L) {
1679 LOG_INFO("version, %{public}" PRId64, static_cast<int64_t>(*cVal));
1680 return true;
1681 }
1682 }
1683 return false;
1684 }
1685
BinlogOnErrFunc(void * pCtx,int errNo,char * errMsg,const char * dbPath)1686 void SqliteConnection::BinlogOnErrFunc(void *pCtx, int errNo, char *errMsg, const char *dbPath)
1687 {
1688 if (dbPath == nullptr) {
1689 LOG_WARN("path is null");
1690 return;
1691 }
1692 std::string dbPathStr(dbPath);
1693 LOG_WARN("binlog failed, mark invalid %{public}s", SqliteUtils::Anonymous(dbPathStr).c_str());
1694 SqliteUtils::SetSlaveInvalid(dbPathStr);
1695 }
1696
BinlogOpenHandle(const std::string & dbPath,sqlite3 * & dbHandle,bool isMemoryRdb)1697 int SqliteConnection::BinlogOpenHandle(const std::string &dbPath, sqlite3 *&dbHandle, bool isMemoryRdb)
1698 {
1699 uint32_t openFileFlags = (SQLITE_OPEN_READWRITE | SQLITE_OPEN_FULLMUTEX);
1700 if (isMemoryRdb) {
1701 openFileFlags |= SQLITE_OPEN_URI;
1702 }
1703 sqlite3 *db = nullptr;
1704 int err = sqlite3_open_v2(dbPath.c_str(), &db, static_cast<int>(openFileFlags), nullptr);
1705 if (err != SQLITE_OK) {
1706 LOG_ERROR("open binlog handle error. rc=%{public}d, errno=%{public}d, p=%{public}s",
1707 err, errno, SqliteUtils::Anonymous(dbPath).c_str());
1708 sqlite3_close_v2(db);
1709 return E_INVALID_FILE_PATH;
1710 }
1711 dbHandle = db;
1712 return E_OK;
1713 }
1714
BinlogCloseHandle(sqlite3 * dbHandle)1715 void SqliteConnection::BinlogCloseHandle(sqlite3 *dbHandle)
1716 {
1717 if (dbHandle != nullptr) {
1718 int errCode = sqlite3_close_v2(dbHandle);
1719 if (errCode != SQLITE_OK) {
1720 LOG_ERROR("could not close binlog handle err = %{public}d, errno = %{public}d", errCode, errno);
1721 }
1722 }
1723 }
1724
CheckPathExist(const std::string & dbPath)1725 int SqliteConnection::CheckPathExist(const std::string &dbPath)
1726 {
1727 bool isDbFileExist = access(dbPath.c_str(), F_OK) == 0;
1728 if (!isDbFileExist) {
1729 LOG_ERROR("db %{public}s not exist errno is %{public}d",
1730 SqliteUtils::Anonymous(dbPath).c_str(), errno);
1731 return E_DB_NOT_EXIST;
1732 }
1733 return E_OK;
1734 }
1735
BinlogSetConfig(sqlite3 * dbHandle)1736 void SqliteConnection::BinlogSetConfig(sqlite3 *dbHandle)
1737 {
1738 Sqlite3BinlogConfig binLogConfig = {
1739 .mode = Sqlite3BinlogMode::ROW,
1740 .fullCallbackThreshold = BINLOG_FILE_NUMS_LIMIT,
1741 .maxFileSize = BINLOG_FILE_SIZE_LIMIT,
1742 .xErrorCallback = &BinlogOnErrFunc,
1743 .xLogFullCallback = nullptr,
1744 .callbackCtx = nullptr,
1745 };
1746
1747 int err = sqlite3_db_config(dbHandle, SQLITE_DBCONFIG_ENABLE_BINLOG, &binLogConfig);
1748 if (err != SQLITE_OK) {
1749 LOG_ERROR("set binlog config error. err=%{public}d, errno=%{public}d", err, errno);
1750 }
1751 }
1752
BinlogOnFullFunc(void * pCtx,unsigned short currentCount,const char * dbPath)1753 void SqliteConnection::BinlogOnFullFunc(void *pCtx, unsigned short currentCount, const char *dbPath)
1754 {
1755 if (dbPath == nullptr) {
1756 LOG_WARN("path is null");
1757 return;
1758 }
1759 auto pool = TaskExecutor::GetInstance().GetExecutor();
1760 if (pool == nullptr) {
1761 LOG_WARN("get pool failed");
1762 return;
1763 }
1764 std::string dbPathStr(dbPath);
1765 auto [found, weakPtr] = reusableReplicas_.Find(SqliteUtils::GetSlavePath(dbPathStr));
1766 if (!found) {
1767 LOG_WARN("no replica connection for %{public}s", SqliteUtils::Anonymous(dbPath).c_str());
1768 return;
1769 }
1770 auto slaveConn = weakPtr.lock();
1771 if (slaveConn == nullptr) {
1772 LOG_WARN("replica connection expired for %{public}s", SqliteUtils::Anonymous(dbPath).c_str());
1773 return;
1774 }
1775 pool->Execute([dbPathStr, slaveConn] {
1776 SqliteConnection::ReplayBinlog(dbPathStr, slaveConn, true);
1777 });
1778 }
1779
SetBinlog()1780 int SqliteConnection::SetBinlog()
1781 {
1782 if (!IsSupportBinlog(config_)) {
1783 return E_OK;
1784 }
1785 Sqlite3BinlogConfig binLogConfig = {
1786 .mode = Sqlite3BinlogMode::ROW,
1787 .fullCallbackThreshold = BINLOG_FILE_NUMS_LIMIT,
1788 .maxFileSize = BINLOG_FILE_SIZE_LIMIT,
1789 .xErrorCallback = &BinlogOnErrFunc,
1790 .xLogFullCallback = &BinlogOnFullFunc,
1791 .callbackCtx = nullptr,
1792 };
1793
1794 LOG_INFO("binlog: open %{public}s", SqliteUtils::Anonymous(config_.GetPath()).c_str());
1795 int err = sqlite3_db_config(dbHandle_, SQLITE_DBCONFIG_ENABLE_BINLOG, &binLogConfig);
1796 if (err != SQLITE_OK) {
1797 LOG_ERROR("set binlog error. err=%{public}d, errno=%{public}d", err, errno);
1798 return err;
1799 }
1800 return E_OK;
1801 }
1802
ReplayBinlog(const std::string & dbPath,std::shared_ptr<SqliteConnection> slaveConn,bool isNeedClean)1803 void SqliteConnection::ReplayBinlog(const std::string &dbPath,
1804 std::shared_ptr<SqliteConnection> slaveConn, bool isNeedClean)
1805 {
1806 auto errCode = SqliteConnection::CheckPathExist(dbPath);
1807 if (errCode != E_OK) {
1808 LOG_WARN("main db does not exist, %{public}d", errCode);
1809 return;
1810 }
1811 if (slaveConn == nullptr || slaveConn->dbHandle_ == nullptr) {
1812 LOG_WARN("backup db does not exist, %{public}d", slaveConn == nullptr);
1813 return;
1814 }
1815 sqlite3 *dbFrom = nullptr;
1816 errCode = SqliteConnection::BinlogOpenHandle(dbPath, dbFrom, false);
1817 if (errCode != E_OK) {
1818 return;
1819 }
1820 SqliteConnection::BinlogSetConfig(dbFrom);
1821 errCode = SQLiteError::ErrNo(sqlite3_replay_binlog(dbFrom, slaveConn->dbHandle_));
1822 if (errCode != E_OK) {
1823 LOG_WARN("async replay err:%{public}d", errCode);
1824 } else if (isNeedClean) {
1825 errCode = SQLiteError::ErrNo(sqlite3_clean_binlog(dbFrom, BinlogFileCleanModeE::BINLOG_FILE_CLEAN_READ_MODE));
1826 LOG_INFO("clean finished, %{public}d, %{public}s", errCode, SqliteUtils::Anonymous(dbPath).c_str());
1827 }
1828 SqliteConnection::BinlogCloseHandle(dbFrom);
1829 return;
1830 }
1831
ReplayBinlog(const RdbStoreConfig & config)1832 void SqliteConnection::ReplayBinlog(const RdbStoreConfig &config)
1833 {
1834 if (!IsSupportBinlog(config)) {
1835 return;
1836 }
1837 if (slaveConnection_ == nullptr) {
1838 LOG_WARN("back up does not exist");
1839 return;
1840 }
1841 if (SqliteConnection::CheckPathExist(config.GetPath()) != E_OK) {
1842 LOG_WARN("main db does not exist");
1843 return;
1844 }
1845 int err = SQLiteError::ErrNo(sqlite3_replay_binlog(dbHandle_, slaveConnection_->dbHandle_));
1846 if (err != E_OK) {
1847 LOG_WARN("replay err:%{public}d", err);
1848 }
1849 return;
1850 }
1851
SetIsSupportBinlog(bool isSupport)1852 void SqliteConnection::SetIsSupportBinlog(bool isSupport)
1853 {
1854 isSupportBinlog_ = isSupport;
1855 }
1856
IsSupportBinlog(const RdbStoreConfig & config)1857 bool SqliteConnection::IsSupportBinlog(const RdbStoreConfig &config)
1858 {
1859 #if !defined(CROSS_PLATFORM)
1860 if (sqlite3_is_support_binlog == nullptr) {
1861 return false;
1862 }
1863 if (sqlite3_is_support_binlog(config.GetName().c_str()) != SQLITE_OK) {
1864 return false;
1865 }
1866 return !config.IsEncrypt() && !config.IsMemoryRdb();
1867 #else
1868 return false;
1869 #endif
1870 }
1871
GetBinlogFolderPath(const std::string & dbPath)1872 std::string SqliteConnection::GetBinlogFolderPath(const std::string &dbPath)
1873 {
1874 std::string suffix(BINLOG_FOLDER_SUFFIX);
1875 return dbPath + suffix;
1876 }
1877
CompareWithSlave(int64_t mCount,int64_t mIdxCount)1878 ExchangeStrategy SqliteConnection::CompareWithSlave(int64_t mCount, int64_t mIdxCount)
1879 {
1880 const std::string querySql = "SELECT COUNT(*) FROM sqlite_master WHERE type='table';";
1881 const std::string qIndexSql = "SELECT COUNT(*) FROM sqlite_master WHERE type='index';";
1882 auto [sRet, sObj] = slaveConnection_->ExecuteForValue(querySql);
1883 auto [sInxRet, sInxObj] = slaveConnection_->ExecuteForValue(qIndexSql);
1884 if (sRet == E_SQLITE_CORRUPT || sInxRet == E_SQLITE_CORRUPT) {
1885 LOG_WARN("slave db abnormal, need backup, err:%{public}d", sRet);
1886 return ExchangeStrategy::BACKUP;
1887 }
1888 int64_t sCount = static_cast<int64_t>(sObj);
1889 int64_t sIdxCount = static_cast<int64_t>(sInxObj);
1890 if ((mCount == sCount && mIdxCount == sIdxCount) && !SqliteUtils::IsSlaveInvalid(config_.GetPath())) {
1891 LOG_INFO("equal, main:%{public}" PRId64 ",slave:%{public}" PRId64, mCount, sCount);
1892 return ExchangeStrategy::NOT_HANDLE;
1893 }
1894 if (mCount == 0) {
1895 LOG_INFO("main empty, main:%{public}" PRId64 ",slave:%{public}" PRId64, mCount, sCount);
1896 return ExchangeStrategy::RESTORE;
1897 }
1898 LOG_INFO("backup, main:[%{public}" PRId64 ",%{public}" PRId64 "], slave:[%{public}" PRId64 ",%{public}" PRId64 "]",
1899 mCount, mIdxCount, sCount, sIdxCount);
1900 return ExchangeStrategy::BACKUP;
1901 }
1902
RegisterAlgo(const std::string & clstAlgoName,ClusterAlgoFunc func)1903 int SqliteConnection::RegisterAlgo(const std::string &clstAlgoName, ClusterAlgoFunc func)
1904 {
1905 return E_NOT_SUPPORT;
1906 }
1907
ClientCleanUp()1908 int32_t SqliteConnection::ClientCleanUp()
1909 {
1910 Clean(false);
1911 return E_OK;
1912 }
1913
OpenSSLCleanUp()1914 int32_t SqliteConnection::OpenSSLCleanUp()
1915 {
1916 Clean(true);
1917 return E_OK;
1918 }
1919 } // namespace NativeRdb
1920 } // namespace OHOS