1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define LOG_TAG "ConnectionPool"
17 #include "connection_pool.h"
18
19 #include <base_transaction.h>
20
21 #include <condition_variable>
22 #include <iterator>
23 #include <mutex>
24 #include <sstream>
25 #include <vector>
26
27 #include "connection.h"
28 #include "logger.h"
29 #include "rdb_common.h"
30 #include "rdb_errno.h"
31 #include "rdb_fault_hiview_reporter.h"
32 #include "rdb_sql_statistic.h"
33 #include "rdb_perfStat.h"
34 #include "sqlite_global_config.h"
35 #include "sqlite_utils.h"
36 #include "task_executor.h"
37
38 namespace OHOS {
39 namespace NativeRdb {
40 using namespace OHOS::Rdb;
41 using namespace std::chrono;
42 using Conn = Connection;
43 using ConnPool = ConnectionPool;
44 using SharedConn = std::shared_ptr<Connection>;
45 using SharedConns = std::vector<std::shared_ptr<Connection>>;
46 using SqlStatistic = DistributedRdb::SqlStatistic;
47 using PerfStat = DistributedRdb::PerfStat;
48 using Reportor = RdbFaultHiViewReporter;
49 constexpr int32_t TRANSACTION_TIMEOUT(2);
50 constexpr int64_t WAIT_TIME = 2;
51
Create(const RdbStoreConfig & config,int & errCode)52 std::shared_ptr<ConnPool> ConnPool::Create(const RdbStoreConfig &config, int &errCode)
53 {
54 std::shared_ptr<ConnPool> pool(new (std::nothrow) ConnPool(config));
55 if (pool == nullptr) {
56 LOG_ERROR("ConnPool::Create new failed, pool is nullptr.");
57 errCode = E_ERROR;
58 return nullptr;
59 }
60 std::shared_ptr<Connection> conn;
61 for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
62 std::tie(errCode, conn) = pool->Init();
63 if (errCode != E_SQLITE_CORRUPT) {
64 break;
65 }
66 config.SetIter(ITER_V1);
67 }
68 std::string dbPath;
69 (void)SqliteGlobalConfig::GetDbPath(config, dbPath);
70 LOG_INFO("code:%{public}d app[%{public}s:%{public}s] area[%{public}s] "
71 "cfg[%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d]"
72 "%{public}s",
73 errCode, config.GetBundleName().c_str(), config.GetModuleName().c_str(),
74 SqliteUtils::GetArea(dbPath).c_str(), config.GetDBType(), config.GetHaMode(), config.IsEncrypt(),
75 config.GetArea(), config.GetSecurityLevel(), config.GetRoleType(), config.IsReadOnly(),
76 SqliteUtils::FormatDebugInfoBrief(Connection::Collect(config),
77 SqliteUtils::Anonymous(config.GetName())).c_str());
78 return errCode == E_OK ? pool : nullptr;
79 }
80
HandleDataCorruption(const RdbStoreConfig & storeConfig,int & errCode)81 std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> ConnPool::HandleDataCorruption(
82 const RdbStoreConfig &storeConfig, int &errCode)
83 {
84 std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> result;
85 auto &[rebuiltType, pool] = result;
86
87 int repairErrCode = Connection::Repair(storeConfig);
88 if (repairErrCode == E_OK) {
89 rebuiltType = RebuiltType::REPAIRED;
90 } else if (storeConfig.GetAllowRebuild()) {
91 Connection::Delete(storeConfig);
92 rebuiltType = RebuiltType::REBUILT;
93 } else if (storeConfig.IsEncrypt() && errCode == E_INVALID_SECRET_KEY) {
94 return result;
95 } else {
96 errCode = E_SQLITE_CORRUPT;
97 return result;
98 }
99 pool = Create(storeConfig, errCode);
100 if (errCode != E_OK) {
101 LOG_WARN("failed, type %{public}d db %{public}s encrypt %{public}d error %{public}d errno %{public}d",
102 static_cast<uint32_t>(rebuiltType), SqliteUtils::Anonymous(storeConfig.GetName()).c_str(),
103 storeConfig.IsEncrypt(), errCode, errno);
104 } else {
105 Reportor::ReportRestore(Reportor::Create(storeConfig, E_OK, "RestoreType:Rebuild", false), false);
106 }
107
108 return result;
109 }
110
ConnectionPool(const RdbStoreConfig & storeConfig)111 ConnPool::ConnectionPool(const RdbStoreConfig &storeConfig)
112 : config_(storeConfig), attachConfig_(storeConfig), writers_(), readers_(), transactionStack_(),
113 transactionUsed_(false)
114 {
115 attachConfig_.SetJournalMode(JournalMode::MODE_TRUNCATE);
116 trans_.right_ = Container::MIN_TRANS_ID;
117 trans_.left_ = trans_.right_;
118 clearActuator_ = std::make_shared<DelayActuator<std::vector<std::weak_ptr<ConnPool::ConnNode>>,
119 std::function<void(
120 std::vector<std::weak_ptr<ConnPool::ConnNode>> & out, std::shared_ptr<ConnPool::ConnNode> && input)>>>(
121 [](auto &out, std::shared_ptr<ConnNode> &&input) {
122 for (auto &node : out) {
123 if (node.lock() == input) {
124 return;
125 }
126 }
127 out.push_back(std::move(input));
128 },
129 FIRST_DELAY_INTERVAL,
130 MIN_EXECUTE_INTERVAL,
131 MAX_EXECUTE_INTERVAL);
132 clearActuator_->SetExecutorPool(TaskExecutor::GetInstance().GetExecutor());
133 clearActuator_->SetTask([](std::vector<std::weak_ptr<ConnPool::ConnNode>> &&nodes) {
134 for (auto &node : nodes) {
135 auto realNode = node.lock();
136 if (realNode == nullptr || realNode->connect_ == nullptr) {
137 continue;
138 }
139 realNode->connect_->ClearCache(true);
140 }
141 return E_OK;
142 });
143 }
144
Init(bool isAttach,bool needWriter)145 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::Init(bool isAttach, bool needWriter)
146 {
147 const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
148 std::pair<int32_t, std::shared_ptr<Connection>> result;
149 auto &[errCode, conn] = result;
150 errCode = config.Initialize();
151 if (errCode != E_OK) {
152 return result;
153 }
154
155 if ((config.GetRoleType() == OWNER || config.GetRoleType() == VISITOR_WRITE) && !config.IsReadOnly()) {
156 // write connect count is 1
157 std::shared_ptr<ConnPool::ConnNode> node;
158 auto create = [this, isAttach]() {
159 const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
160 return Connection::Create(config, true);
161 };
162 std::tie(errCode, node) = writers_.Initialize(create, 1, config.GetWriteTime(), true, needWriter);
163 conn = Convert2AutoConn(node);
164 if (errCode != E_OK) {
165 return result;
166 }
167 trans_.InitMembers(create, MAX_TRANS, 0, false);
168 }
169 isAttach_ = isAttach;
170 maxReader_ = GetMaxReaders(config);
171 // max read connect count is 64
172 if (maxReader_ > 64) {
173 return { E_ARGS_READ_CON_OVERLOAD, nullptr };
174 }
175 auto [ret, node] = readers_.Initialize(
176 [this, isAttach]() {
177 const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
178 return Connection::Create(config, false);
179 },
180 maxReader_, config.GetReadTime(), maxReader_ == 0);
181 errCode = ret;
182 return result;
183 }
184
~ConnectionPool()185 ConnPool::~ConnectionPool()
186 {
187 clearActuator_ = nullptr;
188 CloseAllConnections();
189 }
190
GetMaxReaders(const RdbStoreConfig & config)191 int32_t ConnPool::GetMaxReaders(const RdbStoreConfig &config)
192 {
193 if (config.GetStorageMode() != StorageMode::MODE_MEMORY &&
194 config.GetJournalMode() == RdbStoreConfig::GetJournalModeValue(JournalMode::MODE_WAL)) {
195 return config.GetReadConSize();
196 } else {
197 return 0;
198 }
199 }
200
Convert2AutoConn(std::shared_ptr<ConnNode> node,bool isTrans)201 std::shared_ptr<Connection> ConnPool::Convert2AutoConn(std::shared_ptr<ConnNode> node, bool isTrans)
202 {
203 if (node == nullptr) {
204 return nullptr;
205 }
206
207 auto conn = node->GetConnect();
208 if (conn == nullptr) {
209 return nullptr;
210 }
211 conn->VerifyAndRegisterHook(config_);
212 if (isTrans) {
213 transCount_++;
214 }
215 return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), node, isTrans](auto *) mutable {
216 auto realPool = pool.lock();
217 if (realPool == nullptr) {
218 return;
219 }
220 realPool->ReleaseNode(node, isTrans);
221 if (isTrans) {
222 realPool->transCount_--;
223 }
224 node = nullptr;
225 });
226 }
227
DelayClearTrans()228 void ConnPool::DelayClearTrans()
229 {
230 auto pool = TaskExecutor::GetInstance().GetExecutor();
231 if (pool == nullptr) {
232 LOG_ERROR("pool is nullptr.");
233 return;
234 }
235 pool->Schedule(TRANS_CLEAR_INTERVAL, [pool = weak_from_this()]() {
236 auto realPool = pool.lock();
237 if (realPool == nullptr) {
238 return;
239 }
240 realPool->trans_.ClearUnusedTrans(realPool);
241 });
242 }
243
CloseAllConnections()244 void ConnPool::CloseAllConnections()
245 {
246 writers_.Clear();
247 readers_.Clear();
248 trans_.Clear();
249 }
250
IsInTransaction()251 bool ConnPool::IsInTransaction()
252 {
253 return isInTransaction_.load();
254 }
255
SetInTransaction(bool isInTransaction)256 void ConnPool::SetInTransaction(bool isInTransaction)
257 {
258 isInTransaction_.store(isInTransaction);
259 }
260
CreateTransConn(bool limited)261 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::CreateTransConn(bool limited)
262 {
263 PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_WAIT);
264 if (transCount_.load() >= MAX_TRANS && limited) {
265 trans_.Dump("NO TRANS", transCount_ + isInTransaction_);
266 writers_.Dump("NO TRANS WRITE", transCount_ + isInTransaction_);
267 return { E_DATABASE_BUSY, nullptr };
268 }
269 if (trans_.Empty()) {
270 DelayClearTrans();
271 }
272 auto [errCode, node] = trans_.Acquire(INVALID_TIME);
273 return { errCode, Convert2AutoConn(node, true) };
274 }
275
AcquireConnection(bool isReadOnly)276 std::shared_ptr<Conn> ConnPool::AcquireConnection(bool isReadOnly)
277 {
278 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
279 PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_WAIT);
280 return Acquire(isReadOnly);
281 }
282
AcquireAll(int32_t time)283 std::pair<SharedConn, SharedConns> ConnPool::AcquireAll(int32_t time)
284 {
285 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
286 PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_WAIT);
287 using namespace std::chrono;
288 std::pair<SharedConn, SharedConns> result;
289 auto &[writer, readers] = result;
290 auto interval = duration_cast<milliseconds>(seconds(time));
291 auto start = steady_clock::now();
292 auto [res, writerNodes] = writers_.AcquireAll(interval);
293 if (!res || writerNodes.empty()) {
294 return {};
295 }
296 writer = Convert2AutoConn(writerNodes.front());
297
298 auto usedTime = duration_cast<milliseconds>(steady_clock::now() - start);
299 if (writer == nullptr || usedTime >= interval) {
300 return {};
301 }
302
303 if (maxReader_ == 0) {
304 return result;
305 }
306
307 readers_.Disable();
308 std::list<std::shared_ptr<ConnPool::ConnNode>> nodes;
309 std::tie(res, nodes) = readers_.AcquireAll(interval - usedTime);
310 if (!res) {
311 readers_.Enable();
312 return {};
313 }
314
315 for (auto node : nodes) {
316 auto conn = Convert2AutoConn(node);
317 if (conn == nullptr) {
318 continue;
319 }
320 readers.push_back(conn);
321 }
322 usedTime = duration_cast<milliseconds>(steady_clock::now() - start);
323 if (usedTime >= interval) {
324 return {};
325 }
326 trans_.Disable();
327 std::list<std::shared_ptr<ConnPool::ConnNode>> trans;
328 std::tie(res, trans) = trans_.AcquireAll(interval - usedTime);
329 if (!res) {
330 trans_.Enable();
331 return {};
332 }
333 return result;
334 }
335
Acquire(bool isReadOnly,std::chrono::milliseconds ms)336 std::shared_ptr<Conn> ConnPool::Acquire(bool isReadOnly, std::chrono::milliseconds ms)
337 {
338 Container *container = (isReadOnly && maxReader_ != 0) ? &readers_ : &writers_;
339 auto [errCode, node] = container->Acquire(ms);
340 if (errCode != E_OK || node == nullptr) {
341 const char *header = (isReadOnly && maxReader_ != 0) ? "readers_" : "writers_";
342 container->Dump(header, transCount_ + isInTransaction_);
343 return nullptr;
344 }
345 return Convert2AutoConn(node);
346 }
347
AcquireRef(bool isReadOnly,std::chrono::milliseconds ms)348 SharedConn ConnPool::AcquireRef(bool isReadOnly, std::chrono::milliseconds ms)
349 {
350 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
351 PerfStat perfStat(config_.GetPath(), "", SqlStatistic::Step::STEP_WAIT);
352 if (maxReader_ != 0) {
353 return Acquire(isReadOnly, ms);
354 }
355 auto [errCode, node] = writers_.Acquire(ms);
356 if (errCode != E_OK || node == nullptr) {
357 writers_.Dump("writers_", transCount_ + isInTransaction_);
358 return nullptr;
359 }
360 auto conn = node->connect_;
361 writers_.Release(node);
362 return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), conn](Connection *) {
363 auto realPool = pool.lock();
364 if (realPool == nullptr) {
365 return;
366 }
367 realPool->writers_.cond_.notify_all();
368 });
369 }
370
ReleaseNode(std::shared_ptr<ConnNode> node,bool isTrans)371 void ConnPool::ReleaseNode(std::shared_ptr<ConnNode> node, bool isTrans)
372 {
373 if (node == nullptr) {
374 return;
375 }
376
377 auto now = steady_clock::now();
378 auto timeout = now > (failedTime_.load() + minutes(CHECK_POINT_INTERVAL)) || now < failedTime_.load() ||
379 failedTime_.load() == steady_clock::time_point();
380 auto transCount = transCount_ + isInTransaction_;
381 auto remainCount = isTrans ? transCount - 1 : transCount;
382 auto errCode = node->Unused(remainCount, timeout);
383 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
384 writers_.Dump("WAL writers_", transCount);
385 trans_.Dump("WAL trans_", transCount);
386 readers_.Dump("WAL readers_", transCount);
387 }
388 if (node->IsWriter() && (errCode != E_INNER_WARNING && errCode != E_NOT_SUPPORT)) {
389 failedTime_ = errCode != E_OK ? now : steady_clock::time_point();
390 }
391
392 if (isTrans) {
393 trans_.ReleaseTrans(node);
394 } else {
395 auto &container = node->IsWriter() ? writers_ : readers_;
396 container.Release(node);
397 }
398 if (clearActuator_ != nullptr) {
399 clearActuator_->Execute<std::shared_ptr<ConnNode>>(std::move(node));
400 }
401 }
402
AcquireTransaction()403 int ConnPool::AcquireTransaction()
404 {
405 std::unique_lock<std::mutex> lock(transMutex_);
406 if (transCondition_.wait_for(
407 lock, std::chrono::seconds(TRANSACTION_TIMEOUT),
408 [this] { return !transactionUsed_; })) {
409 transactionUsed_ = true;
410 return E_OK;
411 }
412 LOG_WARN("transactionUsed_ is %{public}d", transactionUsed_);
413 return E_DATABASE_BUSY;
414 }
415
ReleaseTransaction()416 void ConnPool::ReleaseTransaction()
417 {
418 {
419 std::unique_lock<std::mutex> lock(transMutex_);
420 transactionUsed_ = false;
421 }
422 transCondition_.notify_one();
423 }
424
RestartConns()425 int ConnPool::RestartConns()
426 {
427 const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
428 readers_.Clear();
429 auto [errCode, node] = readers_.Initialize(
430 [this]() {
431 const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
432 return Connection::Create(config, false);
433 },
434 maxReader_, config.GetReadTime(), maxReader_ == 0);
435 trans_.Clear();
436 trans_.InitMembers(
437 [this]() {
438 const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
439 return Connection::Create(config, true);
440 },
441 MAX_TRANS, 0, false);
442 return errCode;
443 }
444
ReopenConns()445 int ConnPool::ReopenConns()
446 {
447 CloseAllConnections();
448 auto initRes = Init();
449 if (initRes.first != E_OK) {
450 LOG_ERROR("Init fail, errCode:%{public}d", initRes.first);
451 return initRes.first;
452 }
453 return E_OK;
454 }
455
456 /**
457 * The database locale.
458 */
ConfigLocale(const std::string & localeStr)459 int ConnPool::ConfigLocale(const std::string &localeStr)
460 {
461 auto errCode = readers_.ConfigLocale(localeStr);
462 if (errCode != E_OK) {
463 return errCode;
464 }
465 errCode = writers_.ConfigLocale(localeStr);
466 if (errCode != E_OK) {
467 return errCode;
468 }
469 return trans_.ConfigLocale(localeStr);
470 }
471
472 /**
473 * Rename the backed up database.
474 */
ChangeDbFileForRestore(const std::string & newPath,const std::string & backupPath,const std::vector<uint8_t> & newKey,std::shared_ptr<SlaveStatus> slaveStatus)475 int ConnPool::ChangeDbFileForRestore(const std::string &newPath, const std::string &backupPath,
476 const std::vector<uint8_t> &newKey, std::shared_ptr<SlaveStatus> slaveStatus)
477 {
478 if (!writers_.IsFull() || config_.GetPath() == backupPath || newPath == backupPath) {
479 LOG_ERROR("Connection pool is busy now!");
480 return E_ERROR;
481 }
482 if (config_.GetDBType() == DB_VECTOR) {
483 CloseAllConnections();
484 auto [retVal, conn] = Connection::Create(config_, false);
485 if (retVal != E_OK) {
486 LOG_ERROR("Create connection fail, errCode:%{public}d", retVal);
487 return retVal;
488 }
489
490 retVal = conn->Restore(backupPath, newKey, slaveStatus);
491 if (retVal != E_OK) {
492 LOG_ERROR("Restore failed, errCode:0x%{public}x", retVal);
493 return retVal;
494 }
495
496 conn = nullptr;
497 auto initRes = Init();
498 if (initRes.first != E_OK) {
499 LOG_ERROR("Init fail, errCode:%{public}d", initRes.first);
500 return initRes.first;
501 }
502 return retVal;
503 }
504 return RestoreByDbSqliteType(newPath, backupPath, slaveStatus);
505 }
506
RestoreByDbSqliteType(const std::string & newPath,const std::string & backupPath,std::shared_ptr<SlaveStatus> slaveStatus)507 int ConnPool::RestoreByDbSqliteType(const std::string &newPath, const std::string &backupPath,
508 std::shared_ptr<SlaveStatus> slaveStatus)
509 {
510 if (SqliteUtils::IsSlaveDbName(backupPath) && config_.GetHaMode() != HAMode::SINGLE) {
511 auto connection = AcquireConnection(false);
512 if (connection == nullptr) {
513 return E_DATABASE_BUSY;
514 }
515 return connection->Restore(backupPath, {}, slaveStatus);
516 }
517
518 return RestoreMasterDb(newPath, backupPath);
519 }
520
RestoreMasterDb(const std::string & newPath,const std::string & backupPath)521 int ConnPool::RestoreMasterDb(const std::string &newPath, const std::string &backupPath)
522 {
523 if (!CheckIntegrity(backupPath)) {
524 LOG_ERROR("backup file is corrupted, %{public}s", SqliteUtils::Anonymous(backupPath).c_str());
525 return E_SQLITE_CORRUPT;
526 }
527 auto walFile = backupPath + "-wal";
528 if (SqliteUtils::GetFileSize(walFile) != 0) {
529 LOG_ERROR("Wal file exist.");
530 return E_SQLITE_CORRUPT;
531 }
532
533 SqliteUtils::DeleteFile(backupPath + "-shm");
534 SqliteUtils::DeleteFile(backupPath + "-wal");
535
536 CloseAllConnections();
537 Connection::Delete(config_);
538
539 if (config_.GetPath() != newPath) {
540 RdbStoreConfig config(newPath);
541 config.SetPath(newPath);
542 Connection::Delete(config);
543 }
544
545 bool copyRet = SqliteUtils::CopyFile(backupPath, newPath);
546 int32_t errCode = E_OK;
547 std::shared_ptr<Connection> pool;
548 for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
549 std::tie(errCode, pool) = Init();
550 if (errCode == E_OK) {
551 break;
552 }
553 if (errCode != E_SQLITE_CORRUPT || !config_.IsEncrypt()) {
554 break;
555 }
556 config_.SetIter(ITER_V1);
557 }
558 if (errCode != E_OK) {
559 CloseAllConnections();
560 Connection::Delete(config_);
561 std::tie(errCode, pool) = Init();
562 LOG_WARN("Restore failed! rebuild res:%{public}d, path:%{public}s.", errCode,
563 SqliteUtils::Anonymous(backupPath).c_str());
564 }
565 return copyRet ? errCode : E_ERROR;
566 }
567
Rekey(const RdbStoreConfig::CryptoParam & cryptoParam)568 int ConnPool::Rekey(const RdbStoreConfig::CryptoParam &cryptoParam)
569 {
570 int errCode = E_OK;
571 auto [connection, readers] = AcquireAll(WAIT_TIME);
572 if (connection == nullptr) {
573 return E_DATABASE_BUSY;
574 }
575 CloseAllConnections();
576 auto key = config_.GetEncryptKey();
577 readers.clear();
578
579 errCode = connection->Rekey(cryptoParam);
580 if (errCode != E_OK) {
581 LOG_ERROR("ReKey failed, err = %{public}d", errCode);
582 key.assign(key.size(), 0);
583 return errCode;
584 }
585 config_.ResetEncryptKey(cryptoParam.encryptKey_);
586 connection = nullptr;
587 auto initRes = Init();
588 if (initRes.first != E_OK) {
589 LOG_ERROR("Init fail, errCode:%{public}d", initRes.first);
590 config_.ResetEncryptKey(key);
591 key.assign(key.size(), 0);
592 initRes = Init();
593 return initRes.first;
594 }
595 return E_OK;
596 }
597
GetTransactionStack()598 std::stack<BaseTransaction> &ConnPool::GetTransactionStack()
599 {
600 return transactionStack_;
601 }
602
GetTransactionStackMutex()603 std::mutex &ConnPool::GetTransactionStackMutex()
604 {
605 return transactionStackMutex_;
606 }
607
DisableWal()608 std::pair<int32_t, std::shared_ptr<Conn>> ConnPool::DisableWal()
609 {
610 return Init(true, true);
611 }
612
EnableWal()613 int ConnPool::EnableWal()
614 {
615 auto [errCode, node] = Init();
616 return errCode;
617 }
618
Dump(bool isWriter,const char * header)619 int32_t ConnectionPool::Dump(bool isWriter, const char *header)
620 {
621 Container *container = (isWriter || maxReader_ == 0) ? &writers_ : &readers_;
622 container->Dump(header, transCount_ + isInTransaction_);
623 return E_OK;
624 }
625
ConnNode(std::shared_ptr<Conn> conn)626 ConnPool::ConnNode::ConnNode(std::shared_ptr<Conn> conn) : connect_(std::move(conn))
627 {
628 }
629
GetConnect()630 std::shared_ptr<Conn> ConnPool::ConnNode::GetConnect()
631 {
632 tid_ = gettid();
633 time_ = steady_clock::now();
634 return connect_;
635 }
636
GetUsingTime() const637 int64_t ConnPool::ConnNode::GetUsingTime() const
638 {
639 auto time = steady_clock::now() - time_;
640 return duration_cast<milliseconds>(time).count();
641 }
642
Unused(int32_t count,bool timeout)643 int32_t ConnPool::ConnNode::Unused(int32_t count, bool timeout)
644 {
645 if (connect_ == nullptr) {
646 return E_OK;
647 }
648
649 connect_->ClearCache();
650 int32_t errCode = E_INNER_WARNING;
651 if (count <= 0) {
652 errCode = connect_->TryCheckPoint(timeout);
653 }
654
655 time_ = steady_clock::now();
656 if (!connect_->IsWriter()) {
657 tid_ = 0;
658 }
659 return errCode;
660 }
661
IsWriter() const662 bool ConnPool::ConnNode::IsWriter() const
663 {
664 if (connect_ != nullptr) {
665 return connect_->IsWriter();
666 }
667 return false;
668 }
669
InitMembers(Creator creator,int32_t max,int32_t timeout,bool disable)670 void ConnPool::Container::InitMembers(Creator creator, int32_t max, int32_t timeout, bool disable)
671 {
672 std::unique_lock<decltype(mutex_)> lock(mutex_);
673 disable_ = disable;
674 max_ = max;
675 creator_ = creator;
676 timeout_ = std::chrono::seconds(timeout);
677 }
678
Initialize(Creator creator,int32_t max,int32_t timeout,bool disable,bool acquire)679 std::pair<int32_t, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Initialize(
680 Creator creator, int32_t max, int32_t timeout, bool disable, bool acquire)
681 {
682 std::shared_ptr<ConnNode> connNode = nullptr;
683 {
684 std::unique_lock<decltype(mutex_)> lock(mutex_);
685 disable_ = disable;
686 max_ = max;
687 creator_ = creator;
688 timeout_ = std::chrono::seconds(timeout);
689 for (int i = 0; i < max_; ++i) {
690 auto errCode = ExtendNode();
691 if (errCode != E_OK) {
692 nodes_.clear();
693 details_.clear();
694 return { errCode, nullptr };
695 }
696 }
697
698 if (acquire && count_ > 0) {
699 connNode = nodes_.back();
700 nodes_.pop_back();
701 count_--;
702 }
703 }
704 cond_.notify_all();
705 return { E_OK, connNode };
706 }
707
ConfigLocale(const std::string & locale)708 int32_t ConnPool::Container::ConfigLocale(const std::string &locale)
709 {
710 std::unique_lock<decltype(mutex_)> lock(mutex_);
711 for (auto it = details_.begin(); it != details_.end();) {
712 auto conn = it->lock();
713 if (conn == nullptr || conn->connect_ == nullptr) {
714 it = details_.erase(it);
715 continue;
716 }
717 int32_t errCode = conn->connect_->ConfigLocale(locale);
718 if (errCode != E_OK) {
719 LOG_ERROR("ConfigLocale failed %{public}d", errCode);
720 return errCode;
721 }
722 ++it;
723 }
724 return E_OK;
725 }
726
Acquire(std::chrono::milliseconds milliS)727 std::pair<int, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Acquire(std::chrono::milliseconds milliS)
728 {
729 std::unique_lock<decltype(mutex_)> lock(mutex_);
730 auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
731 if (max_ == 0) {
732 return {E_ERROR, nullptr};
733 }
734 int errCode = E_OK;
735 auto waiter = [this, &errCode]() -> bool {
736 if (count_ > 0) {
737 return true;
738 }
739
740 if (disable_) {
741 return false;
742 }
743 errCode = ExtendNode();
744 return errCode == E_OK;
745 };
746 if (cond_.wait_for(lock, interval, waiter)) {
747 if (nodes_.empty()) {
748 LOG_ERROR("Nodes is empty.count %{public}d max %{public}d total %{public}d left %{public}d right%{public}d",
749 count_, max_, total_, left_, right_);
750 count_ = 0;
751 return {E_ERROR, nullptr};
752 }
753 auto node = nodes_.back();
754 nodes_.pop_back();
755 count_--;
756 return {E_OK, node};
757 }
758 return {errCode, nullptr};
759 }
760
ExtendNode()761 int32_t ConnPool::Container::ExtendNode()
762 {
763 if (creator_ == nullptr) {
764 return E_ERROR;
765 }
766 auto [errCode, conn] = creator_();
767 if (conn == nullptr) {
768 return errCode;
769 }
770 auto node = std::make_shared<ConnNode>(conn);
771 node->id_ = right_++;
772 conn->SetId(node->id_);
773 nodes_.push_back(node);
774 details_.push_back(node);
775 count_++;
776 total_++;
777 return E_OK;
778 }
779
AcquireAll(std::chrono::milliseconds milliS)780 std::pair<bool, std::list<std::shared_ptr<ConnPool::ConnNode>>> ConnPool::Container::AcquireAll(
781 std::chrono::milliseconds milliS)
782 {
783 std::list<std::shared_ptr<ConnNode>> nodes;
784 int32_t count = 0;
785 auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
786 auto time = std::chrono::steady_clock::now() + interval;
787 std::unique_lock<decltype(mutex_)> lock(mutex_);
788 while (count < total_ && cond_.wait_until(lock, time, [this]() { return count_ > 0; })) {
789 nodes.merge(std::move(nodes_));
790 nodes_.clear();
791 count += count_;
792 count_ = 0;
793 }
794
795 if (count != total_) {
796 count_ = count;
797 nodes_ = std::move(nodes);
798 nodes.clear();
799 return {false, nodes};
800 }
801 auto func = [](const std::list<std::shared_ptr<ConnNode>> &nodes) -> bool {
802 for (auto &node : nodes) {
803 if (node->connect_ == nullptr) {
804 continue;
805 }
806 if (node->connect_.use_count() != 1) {
807 return false;
808 }
809 }
810 return true;
811 };
812 bool failed = false;
813 while (failed = !func(nodes), failed && cond_.wait_until(lock, time) != std::cv_status::timeout) {
814 }
815 if (failed) {
816 count_ = count;
817 nodes_ = std::move(nodes);
818 nodes.clear();
819 }
820 return {!failed, nodes};
821 }
822
Disable()823 void ConnPool::Container::Disable()
824 {
825 disable_ = true;
826 cond_.notify_one();
827 }
828
Enable()829 void ConnPool::Container::Enable()
830 {
831 disable_ = false;
832 cond_.notify_one();
833 }
834
Release(std::shared_ptr<ConnNode> node)835 int32_t ConnPool::Container::Release(std::shared_ptr<ConnNode> node)
836 {
837 {
838 std::unique_lock<decltype(mutex_)> lock(mutex_);
839 if (node->id_ < left_ || node->id_ >= right_) {
840 return E_OK;
841 }
842 if (count_ == max_) {
843 total_ = total_ > count_ ? total_ - 1 : count_;
844 RelDetails(node);
845 } else {
846 nodes_.push_front(node);
847 count_++;
848 }
849 }
850 cond_.notify_one();
851 return E_OK;
852 }
853
ReleaseTrans(std::shared_ptr<ConnNode> node)854 int32_t ConnectionPool::Container::ReleaseTrans(std::shared_ptr<ConnNode> node)
855 {
856 {
857 std::unique_lock<decltype(mutex_)> lock(mutex_);
858 if (node->id_ < left_ || node->id_ >= right_) {
859 return E_OK;
860 }
861 if (node->IsRecyclable()) {
862 nodes_.push_back(node);
863 count_++;
864 } else {
865 total_--;
866 RelDetails(node);
867 }
868 }
869 cond_.notify_one();
870 return E_OK;
871 }
872
RelDetails(std::shared_ptr<ConnNode> node)873 int32_t ConnectionPool::Container::RelDetails(std::shared_ptr<ConnNode> node)
874 {
875 for (auto it = details_.begin(); it != details_.end();) {
876 auto detailNode = it->lock();
877 if (detailNode == nullptr || detailNode->id_ == node->id_) {
878 it = details_.erase(it);
879 } else {
880 it++;
881 }
882 }
883 return E_OK;
884 }
885
CheckIntegrity(const std::string & dbPath)886 bool ConnectionPool::CheckIntegrity(const std::string &dbPath)
887 {
888 RdbStoreConfig config(config_);
889 config.SetPath(dbPath);
890 config.SetIntegrityCheck(IntegrityCheck::FULL);
891 config.SetHaMode(HAMode::SINGLE);
892 for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
893 auto [ret, connection] = Connection::Create(config, true);
894 if (ret == E_OK) {
895 return true;
896 }
897 if (ret != E_SQLITE_CORRUPT || !config.IsEncrypt()) {
898 break;
899 }
900 config.SetIter(ITER_V1);
901 }
902 return false;
903 }
904
Clear()905 int32_t ConnPool::Container::Clear()
906 {
907 std::list<std::shared_ptr<ConnNode>> nodes;
908 std::list<std::weak_ptr<ConnNode>> details;
909 {
910 std::unique_lock<decltype(mutex_)> lock(mutex_);
911 nodes = std::move(nodes_);
912 details = std::move(details_);
913 disable_ = true;
914 total_ = 0;
915 count_ = 0;
916 if (right_ > MAX_RIGHT) {
917 right_ = 0;
918 }
919 left_ = right_;
920 creator_ = nullptr;
921 }
922 nodes.clear();
923 details.clear();
924 return 0;
925 }
926
IsFull()927 bool ConnPool::Container::IsFull()
928 {
929 std::unique_lock<decltype(mutex_)> lock(mutex_);
930 return total_ == count_;
931 }
932
Empty()933 bool ConnPool::Container::Empty()
934 {
935 std::unique_lock<decltype(mutex_)> lock(mutex_);
936 return total_ == 0;
937 }
938
Dump(const char * header,int32_t count)939 int32_t ConnPool::Container::Dump(const char *header, int32_t count)
940 {
941 std::string info;
942 std::string allInfo;
943 std::vector<std::shared_ptr<ConnNode>> details;
944 std::string title = "B_M_T_C[" + std::to_string(count) + "," + std::to_string(max_) + "," +
945 std::to_string(total_) + "," + std::to_string(count_) + "]";
946 {
947 std::unique_lock<decltype(mutex_)> lock(mutex_);
948 details.reserve(details_.size());
949 for (auto &detail : details_) {
950 auto node = detail.lock();
951 if (node == nullptr) {
952 continue;
953 }
954 details.push_back(node);
955 }
956 }
957
958 for (auto &node : details) {
959 info.append("<")
960 .append(std::to_string(node->id_))
961 .append(",")
962 .append(std::to_string(node->tid_))
963 .append(",")
964 .append(std::to_string(node->GetUsingTime()))
965 .append(">");
966 // 256 represent that limit to info length
967 if (info.size() > 256) {
968 LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
969 allInfo.append(header).append(": ").append(title).append(std::move(info)).append("\n");
970 info.clear();
971 }
972 }
973 LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
974 allInfo.append(header).append(": ").append(title).append(std::move(info));
975 Reportor::ReportFault(RdbFaultEvent(FT_CURD, E_DFX_DUMP_INFO, BUNDLE_NAME_COMMON, allInfo));
976 return 0;
977 }
978
ClearUnusedTrans(std::shared_ptr<ConnectionPool> pool)979 int32_t ConnectionPool::Container::ClearUnusedTrans(std::shared_ptr<ConnectionPool> pool)
980 {
981 std::unique_lock<decltype(mutex_)> lock(mutex_);
982 int transCount = total_;
983 for (auto it = nodes_.begin(); it != nodes_.end();) {
984 auto unusedDuration = std::chrono::steady_clock::now() - (*it)->time_;
985 if (unusedDuration < TRANS_CLEAR_INTERVAL) {
986 it++;
987 continue;
988 }
989 RelDetails(*it);
990 it = nodes_.erase(it);
991 total_--;
992 count_--;
993 }
994 if (total_ != 0) {
995 pool->DelayClearTrans();
996 }
997 LOG_INFO("%{public}d have been cleaned up, and there are %{public}d remaining to be cleaned up",
998 transCount - total_, total_);
999 return E_OK;
1000 }
1001
IsRecyclable()1002 bool ConnPool::ConnNode::IsRecyclable()
1003 {
1004 return connect_->IsRecyclable();
1005 }
1006 } // namespace NativeRdb
1007 } // namespace OHOS
1008