1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define LOG_TAG "ConnectionPool"
17 #include "connection_pool.h"
18
19 #include <base_transaction.h>
20
21 #include <condition_variable>
22 #include <iterator>
23 #include <mutex>
24 #include <sstream>
25 #include <vector>
26
27 #include "connection.h"
28 #include "logger.h"
29 #include "rdb_common.h"
30 #include "rdb_errno.h"
31 #include "rdb_fault_hiview_reporter.h"
32 #include "rdb_sql_statistic.h"
33 #include "sqlite_global_config.h"
34 #include "sqlite_utils.h"
35 #include "task_executor.h"
36
37 namespace OHOS {
38 namespace NativeRdb {
39 using namespace OHOS::Rdb;
40 using namespace std::chrono;
41 using Conn = Connection;
42 using ConnPool = ConnectionPool;
43 using SharedConn = std::shared_ptr<Connection>;
44 using SharedConns = std::vector<std::shared_ptr<Connection>>;
45 using SqlStatistic = DistributedRdb::SqlStatistic;
46 using Reportor = RdbFaultHiViewReporter;
47 constexpr int32_t TRANSACTION_TIMEOUT(2);
48
Create(const RdbStoreConfig & config,int & errCode)49 std::shared_ptr<ConnPool> ConnPool::Create(const RdbStoreConfig &config, int &errCode)
50 {
51 std::shared_ptr<ConnPool> pool(new (std::nothrow) ConnPool(config));
52 if (pool == nullptr) {
53 LOG_ERROR("ConnPool::Create new failed, pool is nullptr.");
54 errCode = E_ERROR;
55 return nullptr;
56 }
57 std::shared_ptr<Connection> conn;
58 for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
59 std::tie(errCode, conn) = pool->Init();
60 if (errCode != E_SQLITE_CORRUPT) {
61 break;
62 }
63 config.SetIter(ITER_V1);
64 }
65 std::string dbPath;
66 (void)SqliteGlobalConfig::GetDbPath(config, dbPath);
67 LOG_INFO("code:%{public}d app[%{public}s:%{public}s] path[%{public}s] "
68 "cfg[%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d]"
69 "%{public}s",
70 errCode, config.GetBundleName().c_str(), config.GetModuleName().c_str(),
71 SqliteUtils::Anonymous(dbPath).c_str(), config.GetDBType(), config.GetHaMode(), config.IsEncrypt(),
72 config.GetArea(), config.GetSecurityLevel(), config.GetRoleType(), config.IsReadOnly(),
73 SqliteUtils::FormatDebugInfoBrief(Connection::Collect(config),
74 SqliteUtils::Anonymous(config.GetName())).c_str());
75 return errCode == E_OK ? pool : nullptr;
76 }
77
HandleDataCorruption(const RdbStoreConfig & storeConfig,int & errCode)78 std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> ConnPool::HandleDataCorruption(
79 const RdbStoreConfig &storeConfig, int &errCode)
80 {
81 std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> result;
82 auto &[rebuiltType, pool] = result;
83
84 int repairErrCode = Connection::Repair(storeConfig);
85 if (repairErrCode == E_OK) {
86 rebuiltType = RebuiltType::REPAIRED;
87 } else if (storeConfig.GetAllowRebuild()) {
88 Connection::Delete(storeConfig);
89 rebuiltType = RebuiltType::REBUILT;
90 } else if (storeConfig.IsEncrypt() && errCode == E_INVALID_SECRET_KEY) {
91 return result;
92 } else {
93 errCode = E_SQLITE_CORRUPT;
94 return result;
95 }
96 pool = Create(storeConfig, errCode);
97 if (errCode != E_OK) {
98 LOG_WARN("failed, type %{public}d db %{public}s encrypt %{public}d error %{public}d errno %{public}d",
99 static_cast<uint32_t>(rebuiltType), SqliteUtils::Anonymous(storeConfig.GetName()).c_str(),
100 storeConfig.IsEncrypt(), errCode, errno);
101 } else {
102 Reportor::ReportRestore(Reportor::Create(storeConfig, E_OK, "RestoreType:Rebuild", false), false);
103 }
104
105 return result;
106 }
107
ConnectionPool(const RdbStoreConfig & storeConfig)108 ConnPool::ConnectionPool(const RdbStoreConfig &storeConfig)
109 : config_(storeConfig), attachConfig_(storeConfig), writers_(), readers_(), transactionStack_(),
110 transactionUsed_(false)
111 {
112 attachConfig_.SetJournalMode(JournalMode::MODE_TRUNCATE);
113 trans_.right_ = Container::MIN_TRANS_ID;
114 trans_.left_ = trans_.right_;
115 }
116
Init(bool isAttach,bool needWriter)117 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::Init(bool isAttach, bool needWriter)
118 {
119 const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
120 std::pair<int32_t, std::shared_ptr<Connection>> result;
121 auto &[errCode, conn] = result;
122 errCode = config.Initialize();
123 if (errCode != E_OK) {
124 return result;
125 }
126
127 if ((config.GetRoleType() == OWNER || config.GetRoleType() == VISITOR_WRITE) && !config.IsReadOnly()) {
128 // write connect count is 1
129 std::shared_ptr<ConnPool::ConnNode> node;
130 auto create = [this, isAttach]() {
131 const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
132 return Connection::Create(config, true);
133 };
134 std::tie(errCode, node) = writers_.Initialize(create, 1, config.GetWriteTime(), true, needWriter);
135 conn = Convert2AutoConn(node);
136 if (errCode != E_OK) {
137 return result;
138 }
139 trans_.InitMembers(create, MAX_TRANS, 0, false);
140 }
141 isAttach_ = isAttach;
142 maxReader_ = GetMaxReaders(config);
143 // max read connect count is 64
144 if (maxReader_ > 64) {
145 return { E_ARGS_READ_CON_OVERLOAD, nullptr };
146 }
147 auto [ret, node] = readers_.Initialize(
148 [this, isAttach]() {
149 const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
150 return Connection::Create(config, false);
151 },
152 maxReader_, config.GetReadTime(), maxReader_ == 0);
153 errCode = ret;
154 return result;
155 }
156
~ConnectionPool()157 ConnPool::~ConnectionPool()
158 {
159 CloseAllConnections();
160 }
161
GetMaxReaders(const RdbStoreConfig & config)162 int32_t ConnPool::GetMaxReaders(const RdbStoreConfig &config)
163 {
164 if (config.GetStorageMode() != StorageMode::MODE_MEMORY &&
165 config.GetJournalMode() == RdbStoreConfig::GetJournalModeValue(JournalMode::MODE_WAL)) {
166 return config.GetReadConSize();
167 } else {
168 return 0;
169 }
170 }
171
Convert2AutoConn(std::shared_ptr<ConnNode> node,bool isTrans)172 std::shared_ptr<Connection> ConnPool::Convert2AutoConn(std::shared_ptr<ConnNode> node, bool isTrans)
173 {
174 if (node == nullptr) {
175 return nullptr;
176 }
177
178 auto conn = node->GetConnect();
179 if (conn == nullptr) {
180 return nullptr;
181 }
182 conn->VerifyAndRegisterHook(config_);
183 if (isTrans) {
184 transCount_++;
185 }
186 return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), node, isTrans](auto *) mutable {
187 auto realPool = pool.lock();
188 if (realPool == nullptr) {
189 return;
190 }
191 realPool->ReleaseNode(node, isTrans);
192 if (isTrans) {
193 realPool->transCount_--;
194 }
195 node = nullptr;
196 });
197 }
198
DelayClearTrans()199 void ConnPool::DelayClearTrans()
200 {
201 auto pool = TaskExecutor::GetInstance().GetExecutor();
202 if (pool == nullptr) {
203 LOG_ERROR("pool is nullptr.");
204 return;
205 }
206 pool->Schedule(TRANS_CLEAR_INTERVAL, [pool = weak_from_this()]() {
207 auto realPool = pool.lock();
208 if (realPool == nullptr) {
209 return;
210 }
211 realPool->trans_.ClearUnusedTrans(realPool);
212 });
213 }
214
CloseAllConnections()215 void ConnPool::CloseAllConnections()
216 {
217 writers_.Clear();
218 readers_.Clear();
219 trans_.Clear();
220 }
221
IsInTransaction()222 bool ConnPool::IsInTransaction()
223 {
224 return isInTransaction_.load();
225 }
226
SetInTransaction(bool isInTransaction)227 void ConnPool::SetInTransaction(bool isInTransaction)
228 {
229 isInTransaction_.store(isInTransaction);
230 }
231
CreateTransConn(bool limited)232 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::CreateTransConn(bool limited)
233 {
234 if (transCount_.load() >= MAX_TRANS && limited) {
235 trans_.Dump("NO TRANS", transCount_ + isInTransaction_);
236 writers_.Dump("NO TRANS WRITE", transCount_ + isInTransaction_);
237 return { E_DATABASE_BUSY, nullptr };
238 }
239 if (trans_.Empty()) {
240 DelayClearTrans();
241 }
242 auto [errCode, node] = trans_.Acquire(INVALID_TIME);
243 return { errCode, Convert2AutoConn(node, true) };
244 }
245
AcquireConnection(bool isReadOnly)246 std::shared_ptr<Conn> ConnPool::AcquireConnection(bool isReadOnly)
247 {
248 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
249 return Acquire(isReadOnly);
250 }
251
AcquireAll(int32_t time)252 std::pair<SharedConn, SharedConns> ConnPool::AcquireAll(int32_t time)
253 {
254 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
255 using namespace std::chrono;
256 std::pair<SharedConn, SharedConns> result;
257 auto &[writer, readers] = result;
258 auto interval = duration_cast<milliseconds>(seconds(time));
259 auto start = steady_clock::now();
260 auto [res, writerNodes] = writers_.AcquireAll(interval);
261 if (!res) {
262 return {};
263 }
264 writer = Convert2AutoConn(writerNodes.front());
265
266 auto usedTime = duration_cast<milliseconds>(steady_clock::now() - start);
267 if (writer == nullptr || usedTime >= interval) {
268 return {};
269 }
270
271 if (maxReader_ == 0) {
272 return result;
273 }
274
275 readers_.Disable();
276 std::list<std::shared_ptr<ConnPool::ConnNode>> nodes;
277 std::tie(res, nodes) = readers_.AcquireAll(interval - usedTime);
278 if (!res) {
279 readers_.Enable();
280 return {};
281 }
282
283 for (auto node : nodes) {
284 auto conn = Convert2AutoConn(node);
285 if (conn == nullptr) {
286 continue;
287 }
288 readers.push_back(conn);
289 }
290 usedTime = duration_cast<milliseconds>(steady_clock::now() - start);
291 if (usedTime >= interval) {
292 return {};
293 }
294 trans_.Disable();
295 std::list<std::shared_ptr<ConnPool::ConnNode>> trans;
296 std::tie(res, trans) = trans_.AcquireAll(interval - usedTime);
297 if (!res) {
298 trans_.Enable();
299 return {};
300 }
301 return result;
302 }
303
Acquire(bool isReadOnly,std::chrono::milliseconds ms)304 std::shared_ptr<Conn> ConnPool::Acquire(bool isReadOnly, std::chrono::milliseconds ms)
305 {
306 Container *container = (isReadOnly && maxReader_ != 0) ? &readers_ : &writers_;
307 auto [errCode, node] = container->Acquire(ms);
308 if (errCode != E_OK || node == nullptr) {
309 const char *header = (isReadOnly && maxReader_ != 0) ? "readers_" : "writers_";
310 container->Dump(header, transCount_ + isInTransaction_);
311 return nullptr;
312 }
313 return Convert2AutoConn(node);
314 }
315
AcquireRef(bool isReadOnly,std::chrono::milliseconds ms)316 SharedConn ConnPool::AcquireRef(bool isReadOnly, std::chrono::milliseconds ms)
317 {
318 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
319 if (maxReader_ != 0) {
320 return Acquire(isReadOnly, ms);
321 }
322 auto [errCode, node] = writers_.Acquire(ms);
323 if (errCode != E_OK || node == nullptr) {
324 writers_.Dump("writers_", transCount_ + isInTransaction_);
325 return nullptr;
326 }
327 auto conn = node->connect_;
328 writers_.Release(node);
329 return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), conn](Connection *) {
330 auto realPool = pool.lock();
331 if (realPool == nullptr) {
332 return;
333 }
334 realPool->writers_.cond_.notify_all();
335 });
336 }
337
ReleaseNode(std::shared_ptr<ConnNode> node,bool isTrans)338 void ConnPool::ReleaseNode(std::shared_ptr<ConnNode> node, bool isTrans)
339 {
340 if (node == nullptr) {
341 return;
342 }
343
344 auto now = steady_clock::now();
345 auto timeout = now > (failedTime_.load() + minutes(CHECK_POINT_INTERVAL)) || now < failedTime_.load() ||
346 failedTime_.load() == steady_clock::time_point();
347 auto transCount = transCount_ + isInTransaction_;
348 auto remainCount = isTrans ? transCount - 1 : transCount;
349 auto errCode = node->Unused(remainCount, timeout);
350 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
351 writers_.Dump("WAL writers_", transCount);
352 trans_.Dump("WAL trans_", transCount);
353 readers_.Dump("WAL readers_", transCount);
354 }
355
356 if (node->IsWriter() && (errCode != E_INNER_WARNING && errCode != E_NOT_SUPPORT)) {
357 failedTime_ = errCode != E_OK ? now : steady_clock::time_point();
358 }
359
360 if (isTrans) {
361 trans_.ReleaseTrans(node);
362 } else {
363 auto &container = node->IsWriter() ? writers_ : readers_;
364 container.Release(node);
365 }
366 }
367
AcquireTransaction()368 int ConnPool::AcquireTransaction()
369 {
370 std::unique_lock<std::mutex> lock(transMutex_);
371 if (transCondition_.wait_for(
372 lock, std::chrono::seconds(TRANSACTION_TIMEOUT),
373 [this] { return !transactionUsed_; })) {
374 transactionUsed_ = true;
375 return E_OK;
376 }
377 LOG_WARN("transactionUsed_ is %{public}d", transactionUsed_);
378 return E_DATABASE_BUSY;
379 }
380
ReleaseTransaction()381 void ConnPool::ReleaseTransaction()
382 {
383 {
384 std::unique_lock<std::mutex> lock(transMutex_);
385 transactionUsed_ = false;
386 }
387 transCondition_.notify_one();
388 }
389
RestartConns()390 int ConnPool::RestartConns()
391 {
392 const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
393 readers_.Clear();
394 auto [errCode, node] = readers_.Initialize(
395 [this]() {
396 const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
397 return Connection::Create(config, false);
398 },
399 maxReader_, config.GetReadTime(), maxReader_ == 0);
400 trans_.Clear();
401 trans_.InitMembers(
402 [this]() {
403 const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
404 return Connection::Create(config, true);
405 },
406 MAX_TRANS, 0, false);
407 return errCode;
408 }
409
410 /**
411 * The database locale.
412 */
ConfigLocale(const std::string & localeStr)413 int ConnPool::ConfigLocale(const std::string &localeStr)
414 {
415 auto errCode = readers_.ConfigLocale(localeStr);
416 if (errCode != E_OK) {
417 return errCode;
418 }
419 return writers_.ConfigLocale(localeStr);
420 }
421
422 /**
423 * Rename the backed up database.
424 */
ChangeDbFileForRestore(const std::string & newPath,const std::string & backupPath,const std::vector<uint8_t> & newKey,SlaveStatus & slaveStatus)425 int ConnPool::ChangeDbFileForRestore(const std::string &newPath, const std::string &backupPath,
426 const std::vector<uint8_t> &newKey, SlaveStatus &slaveStatus)
427 {
428 if (!writers_.IsFull() || config_.GetPath() == backupPath || newPath == backupPath) {
429 LOG_ERROR("Connection pool is busy now!");
430 return E_ERROR;
431 }
432 if (config_.GetDBType() == DB_VECTOR) {
433 CloseAllConnections();
434 auto [retVal, conn] = Connection::Create(config_, false);
435 if (retVal != E_OK) {
436 LOG_ERROR("Create connection fail, errCode:%{public}d", retVal);
437 return retVal;
438 }
439
440 retVal = conn->Restore(backupPath, newKey, slaveStatus);
441 if (retVal != E_OK) {
442 LOG_ERROR("Restore failed, errCode:0x%{public}x", retVal);
443 return retVal;
444 }
445
446 conn = nullptr;
447 auto initRes = Init();
448 if (initRes.first != E_OK) {
449 LOG_ERROR("Init fail, errCode:%{public}d", initRes.first);
450 return initRes.first;
451 }
452 return retVal;
453 }
454 return RestoreMasterDb(newPath, backupPath, slaveStatus);
455 }
456
RestoreMasterDb(const std::string & newPath,const std::string & backupPath,SlaveStatus & slaveStatus)457 int ConnPool::RestoreMasterDb(const std::string &newPath, const std::string &backupPath, SlaveStatus &slaveStatus)
458 {
459 if (SqliteUtils::IsSlaveDbName(backupPath) && config_.GetHaMode() == HAMode::MAIN_REPLICA) {
460 auto connection = AcquireConnection(false);
461 if (connection == nullptr) {
462 return E_DATABASE_BUSY;
463 }
464 return connection->Restore(backupPath, {}, slaveStatus);
465 }
466
467 CloseAllConnections();
468 int ret = Connection::Restore(config_, backupPath, newPath);
469 int32_t errCode = E_OK;
470 std::shared_ptr<Connection> pool;
471 for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
472 std::tie(errCode, pool) = Init();
473 if (errCode == E_OK) {
474 break;
475 }
476 if (errCode != E_SQLITE_CORRUPT || !config_.IsEncrypt()) {
477 break;
478 }
479 config_.SetIter(ITER_V1);
480 }
481 if (errCode != E_OK) {
482 CloseAllConnections();
483 Connection::Delete(config_);
484 std::tie(errCode, pool) = Init();
485 LOG_WARN("Restore failed! rebuild res:%{public}d, path:%{public}s.", errCode,
486 SqliteUtils::Anonymous(backupPath).c_str());
487 }
488 return ret == E_OK ? errCode : ret;
489 }
490
GetTransactionStack()491 std::stack<BaseTransaction> &ConnPool::GetTransactionStack()
492 {
493 return transactionStack_;
494 }
495
GetTransactionStackMutex()496 std::mutex &ConnPool::GetTransactionStackMutex()
497 {
498 return transactionStackMutex_;
499 }
500
DisableWal()501 std::pair<int32_t, std::shared_ptr<Conn>> ConnPool::DisableWal()
502 {
503 return Init(true, true);
504 }
505
EnableWal()506 int ConnPool::EnableWal()
507 {
508 auto [errCode, node] = Init();
509 return errCode;
510 }
511
Dump(bool isWriter,const char * header)512 int32_t ConnectionPool::Dump(bool isWriter, const char *header)
513 {
514 Container *container = (isWriter || maxReader_ == 0) ? &writers_ : &readers_;
515 container->Dump(header, transCount_ + isInTransaction_);
516 return E_OK;
517 }
518
ConnNode(std::shared_ptr<Conn> conn)519 ConnPool::ConnNode::ConnNode(std::shared_ptr<Conn> conn) : connect_(std::move(conn))
520 {
521 }
522
GetConnect()523 std::shared_ptr<Conn> ConnPool::ConnNode::GetConnect()
524 {
525 tid_ = gettid();
526 time_ = steady_clock::now();
527 return connect_;
528 }
529
GetUsingTime() const530 int64_t ConnPool::ConnNode::GetUsingTime() const
531 {
532 auto time = steady_clock::now() - time_;
533 return duration_cast<milliseconds>(time).count();
534 }
535
Unused(int32_t count,bool timeout)536 int32_t ConnPool::ConnNode::Unused(int32_t count, bool timeout)
537 {
538 if (connect_ == nullptr) {
539 return E_OK;
540 }
541
542 connect_->ClearCache();
543 int32_t errCode = E_INNER_WARNING;
544 if (count <= 0) {
545 errCode = connect_->TryCheckPoint(timeout);
546 }
547
548 time_ = steady_clock::now();
549 if (!connect_->IsWriter()) {
550 tid_ = 0;
551 }
552 return errCode;
553 }
554
IsWriter() const555 bool ConnPool::ConnNode::IsWriter() const
556 {
557 if (connect_ != nullptr) {
558 return connect_->IsWriter();
559 }
560 return false;
561 }
562
InitMembers(Creator creator,int32_t max,int32_t timeout,bool disable)563 void ConnPool::Container::InitMembers(Creator creator, int32_t max, int32_t timeout, bool disable)
564 {
565 std::unique_lock<decltype(mutex_)> lock(mutex_);
566 disable_ = disable;
567 max_ = max;
568 creator_ = creator;
569 timeout_ = std::chrono::seconds(timeout);
570 }
571
Initialize(Creator creator,int32_t max,int32_t timeout,bool disable,bool acquire)572 std::pair<int32_t, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Initialize(
573 Creator creator, int32_t max, int32_t timeout, bool disable, bool acquire)
574 {
575 InitMembers(creator, max, timeout, disable);
576 std::shared_ptr<ConnNode> connNode = nullptr;
577 {
578 std::unique_lock<decltype(mutex_)> lock(mutex_);
579 for (int i = 0; i < max_; ++i) {
580 auto errCode = ExtendNode();
581 if (errCode != E_OK) {
582 nodes_.clear();
583 details_.clear();
584 return { errCode, nullptr };
585 }
586 }
587
588 if (acquire && count_ > 0) {
589 connNode = nodes_.back();
590 nodes_.pop_back();
591 count_--;
592 }
593 }
594 cond_.notify_all();
595 return { E_OK, connNode };
596 }
597
ConfigLocale(const std::string & locale)598 int32_t ConnPool::Container::ConfigLocale(const std::string &locale)
599 {
600 std::unique_lock<decltype(mutex_)> lock(mutex_);
601 if (total_ != count_) {
602 return E_DATABASE_BUSY;
603 }
604 for (auto it = details_.begin(); it != details_.end();) {
605 auto conn = it->lock();
606 if (conn == nullptr || conn->connect_ == nullptr) {
607 it = details_.erase(it);
608 continue;
609 }
610 conn->connect_->ConfigLocale(locale);
611 }
612 return E_OK;
613 }
614
Acquire(std::chrono::milliseconds milliS)615 std::pair<int, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Acquire(std::chrono::milliseconds milliS)
616 {
617 std::unique_lock<decltype(mutex_)> lock(mutex_);
618 auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
619 if (max_ == 0) {
620 return {E_ERROR, nullptr};
621 }
622 int errCode = E_OK;
623 auto waiter = [this, &errCode]() -> bool {
624 if (count_ > 0) {
625 return true;
626 }
627
628 if (disable_) {
629 return false;
630 }
631 errCode = ExtendNode();
632 return errCode == E_OK;
633 };
634 if (cond_.wait_for(lock, interval, waiter)) {
635 if (nodes_.empty()) {
636 LOG_ERROR("Nodes is empty.count %{public}d max %{public}d total %{public}d left %{public}d right%{public}d",
637 count_, max_, total_, left_, right_);
638 count_ = 0;
639 return {E_ERROR, nullptr};
640 }
641 auto node = nodes_.back();
642 nodes_.pop_back();
643 count_--;
644 return {E_OK, node};
645 }
646 return {errCode, nullptr};
647 }
648
ExtendNode()649 int32_t ConnPool::Container::ExtendNode()
650 {
651 if (creator_ == nullptr) {
652 return E_ERROR;
653 }
654 auto [errCode, conn] = creator_();
655 if (conn == nullptr) {
656 return errCode;
657 }
658 auto node = std::make_shared<ConnNode>(conn);
659 node->id_ = right_++;
660 conn->SetId(node->id_);
661 nodes_.push_back(node);
662 details_.push_back(node);
663 count_++;
664 total_++;
665 return E_OK;
666 }
667
AcquireAll(std::chrono::milliseconds milliS)668 std::pair<bool, std::list<std::shared_ptr<ConnPool::ConnNode>>> ConnPool::Container::AcquireAll(
669 std::chrono::milliseconds milliS)
670 {
671 std::list<std::shared_ptr<ConnNode>> nodes;
672 int32_t count = 0;
673 auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
674 auto time = std::chrono::steady_clock::now() + interval;
675 std::unique_lock<decltype(mutex_)> lock(mutex_);
676 while (count < total_ && cond_.wait_until(lock, time, [this]() { return count_ > 0; })) {
677 nodes.merge(std::move(nodes_));
678 nodes_.clear();
679 count += count_;
680 count_ = 0;
681 }
682
683 if (count != total_) {
684 count_ = count;
685 nodes_ = std::move(nodes);
686 nodes.clear();
687 return {false, nodes};
688 }
689 auto func = [](const std::list<std::shared_ptr<ConnNode>> &nodes) -> bool {
690 for (auto &node : nodes) {
691 if (node->connect_ == nullptr) {
692 continue;
693 }
694 if (node->connect_.use_count() != 1) {
695 return false;
696 }
697 }
698 return true;
699 };
700 bool failed = false;
701 while (failed = !func(nodes), failed && cond_.wait_until(lock, time) != std::cv_status::timeout) {
702 }
703 if (failed) {
704 count_ = count;
705 nodes_ = std::move(nodes);
706 nodes.clear();
707 }
708 return {!failed, nodes};
709 }
710
Disable()711 void ConnPool::Container::Disable()
712 {
713 disable_ = true;
714 cond_.notify_one();
715 }
716
Enable()717 void ConnPool::Container::Enable()
718 {
719 disable_ = false;
720 cond_.notify_one();
721 }
722
Release(std::shared_ptr<ConnNode> node)723 int32_t ConnPool::Container::Release(std::shared_ptr<ConnNode> node)
724 {
725 {
726 std::unique_lock<decltype(mutex_)> lock(mutex_);
727 if (node->id_ < left_ || node->id_ >= right_) {
728 return E_OK;
729 }
730 if (count_ == max_) {
731 total_ = total_ > count_ ? total_ - 1 : count_;
732 RelDetails(node);
733 } else {
734 nodes_.push_front(node);
735 count_++;
736 }
737 }
738 cond_.notify_one();
739 return E_OK;
740 }
741
ReleaseTrans(std::shared_ptr<ConnNode> node)742 int32_t ConnectionPool::Container::ReleaseTrans(std::shared_ptr<ConnNode> node)
743 {
744 {
745 std::unique_lock<decltype(mutex_)> lock(mutex_);
746 if (node->id_ < left_ || node->id_ >= right_) {
747 return E_OK;
748 }
749 if (node->IsRecyclable()) {
750 nodes_.push_back(node);
751 count_++;
752 } else {
753 total_--;
754 RelDetails(node);
755 }
756 }
757 cond_.notify_one();
758 return E_OK;
759 }
760
RelDetails(std::shared_ptr<ConnNode> node)761 int32_t ConnectionPool::Container::RelDetails(std::shared_ptr<ConnNode> node)
762 {
763 for (auto it = details_.begin(); it != details_.end();) {
764 auto detailNode = it->lock();
765 if (detailNode == nullptr || detailNode->id_ == node->id_) {
766 it = details_.erase(it);
767 } else {
768 it++;
769 }
770 }
771 return E_OK;
772 }
773
CheckIntegrity(const std::string & dbPath)774 bool ConnectionPool::CheckIntegrity(const std::string &dbPath)
775 {
776 RdbStoreConfig config(config_);
777 config.SetPath(dbPath);
778 config.SetIntegrityCheck(IntegrityCheck::FULL);
779 config.SetHaMode(HAMode::SINGLE);
780 for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
781 auto [ret, connection] = Connection::Create(config, true);
782 if (ret == E_OK) {
783 return true;
784 }
785 if (ret != E_SQLITE_CORRUPT || !config.IsEncrypt()) {
786 break;
787 }
788 config.SetIter(ITER_V1);
789 }
790 return false;
791 }
792
Clear()793 int32_t ConnPool::Container::Clear()
794 {
795 std::list<std::shared_ptr<ConnNode>> nodes;
796 std::list<std::weak_ptr<ConnNode>> details;
797 {
798 std::unique_lock<decltype(mutex_)> lock(mutex_);
799 nodes = std::move(nodes_);
800 details = std::move(details_);
801 disable_ = true;
802 total_ = 0;
803 count_ = 0;
804 if (right_ > MAX_RIGHT) {
805 right_ = 0;
806 }
807 left_ = right_;
808 creator_ = nullptr;
809 }
810 nodes.clear();
811 details.clear();
812 return 0;
813 }
814
IsFull()815 bool ConnPool::Container::IsFull()
816 {
817 std::unique_lock<decltype(mutex_)> lock(mutex_);
818 return total_ == count_;
819 }
820
Empty()821 bool ConnPool::Container::Empty()
822 {
823 std::unique_lock<decltype(mutex_)> lock(mutex_);
824 return total_ == 0;
825 }
826
Dump(const char * header,int32_t count)827 int32_t ConnPool::Container::Dump(const char *header, int32_t count)
828 {
829 std::string info;
830 std::string allInfo;
831 std::vector<std::shared_ptr<ConnNode>> details;
832 std::string title = "B_M_T_C[" + std::to_string(count) + "," + std::to_string(max_) + "," +
833 std::to_string(total_) + "," + std::to_string(count_) + "]";
834 {
835 std::unique_lock<decltype(mutex_)> lock(mutex_);
836 details.reserve(details_.size());
837 for (auto &detail : details_) {
838 auto node = detail.lock();
839 if (node == nullptr) {
840 continue;
841 }
842 details.push_back(node);
843 }
844 }
845
846 for (auto &node : details) {
847 info.append("<")
848 .append(std::to_string(node->id_))
849 .append(",")
850 .append(std::to_string(node->tid_))
851 .append(",")
852 .append(std::to_string(node->GetUsingTime()))
853 .append(">");
854 // 256 represent that limit to info length
855 if (info.size() > 256) {
856 LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
857 allInfo.append(header).append(": ").append(title).append(std::move(info)).append("\n");
858 info.clear();
859 }
860 }
861 LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
862 allInfo.append(header).append(": ").append(title).append(std::move(info));
863 Reportor::ReportFault(RdbFaultEvent(FT_CURD, E_DFX_DUMP_INFO, BUNDLE_NAME_COMMON, allInfo));
864 return 0;
865 }
866
ClearUnusedTrans(std::shared_ptr<ConnectionPool> pool)867 int32_t ConnectionPool::Container::ClearUnusedTrans(std::shared_ptr<ConnectionPool> pool)
868 {
869 std::unique_lock<decltype(mutex_)> lock(mutex_);
870 int transCount = total_;
871 for (auto it = nodes_.begin(); it != nodes_.end();) {
872 auto unusedDuration = std::chrono::steady_clock::now() - (*it)->time_;
873 if (unusedDuration < TRANS_CLEAR_INTERVAL) {
874 it++;
875 continue;
876 }
877 RelDetails(*it);
878 it = nodes_.erase(it);
879 total_--;
880 count_--;
881 }
882 if (total_ != 0) {
883 pool->DelayClearTrans();
884 }
885 LOG_INFO("%{public}d have been cleaned up, and there are %{public}d remaining to be cleaned up",
886 transCount - total_, total_);
887 return E_OK;
888 }
889
IsRecyclable()890 bool ConnPool::ConnNode::IsRecyclable()
891 {
892 return connect_->IsRecyclable();
893 }
894 } // namespace NativeRdb
895 } // namespace OHOS
896