• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define LOG_TAG "ConnectionPool"
17 #include "connection_pool.h"
18 
19 #include <base_transaction.h>
20 
21 #include <condition_variable>
22 #include <iterator>
23 #include <mutex>
24 #include <sstream>
25 #include <vector>
26 
27 #include "connection.h"
28 #include "logger.h"
29 #include "rdb_common.h"
30 #include "rdb_errno.h"
31 #include "rdb_fault_hiview_reporter.h"
32 #include "rdb_sql_statistic.h"
33 #include "sqlite_global_config.h"
34 #include "sqlite_utils.h"
35 #include "task_executor.h"
36 
37 namespace OHOS {
38 namespace NativeRdb {
39 using namespace OHOS::Rdb;
40 using namespace std::chrono;
41 using Conn = Connection;
42 using ConnPool = ConnectionPool;
43 using SharedConn = std::shared_ptr<Connection>;
44 using SharedConns = std::vector<std::shared_ptr<Connection>>;
45 using SqlStatistic = DistributedRdb::SqlStatistic;
46 using Reportor = RdbFaultHiViewReporter;
47 constexpr int32_t TRANSACTION_TIMEOUT(2);
48 
Create(const RdbStoreConfig & config,int & errCode)49 std::shared_ptr<ConnPool> ConnPool::Create(const RdbStoreConfig &config, int &errCode)
50 {
51     std::shared_ptr<ConnPool> pool(new (std::nothrow) ConnPool(config));
52     if (pool == nullptr) {
53         LOG_ERROR("ConnPool::Create new failed, pool is nullptr.");
54         errCode = E_ERROR;
55         return nullptr;
56     }
57     std::shared_ptr<Connection> conn;
58     for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
59         std::tie(errCode, conn) = pool->Init();
60         if (errCode != E_SQLITE_CORRUPT) {
61             break;
62         }
63         config.SetIter(ITER_V1);
64     }
65     std::string dbPath;
66     (void)SqliteGlobalConfig::GetDbPath(config, dbPath);
67     LOG_INFO("code:%{public}d app[%{public}s:%{public}s] path[%{public}s] "
68              "cfg[%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d]"
69              "%{public}s",
70         errCode, config.GetBundleName().c_str(), config.GetModuleName().c_str(),
71         SqliteUtils::Anonymous(dbPath).c_str(), config.GetDBType(), config.GetHaMode(), config.IsEncrypt(),
72         config.GetArea(), config.GetSecurityLevel(), config.GetRoleType(), config.IsReadOnly(),
73         SqliteUtils::FormatDebugInfoBrief(Connection::Collect(config),
74         SqliteUtils::Anonymous(config.GetName())).c_str());
75     return errCode == E_OK ? pool : nullptr;
76 }
77 
HandleDataCorruption(const RdbStoreConfig & storeConfig,int & errCode)78 std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> ConnPool::HandleDataCorruption(
79     const RdbStoreConfig &storeConfig, int &errCode)
80 {
81     std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> result;
82     auto &[rebuiltType, pool] = result;
83 
84     int repairErrCode = Connection::Repair(storeConfig);
85     if (repairErrCode == E_OK) {
86         rebuiltType = RebuiltType::REPAIRED;
87     } else if (storeConfig.GetAllowRebuild()) {
88         Connection::Delete(storeConfig);
89         rebuiltType = RebuiltType::REBUILT;
90     } else if (storeConfig.IsEncrypt() && errCode == E_INVALID_SECRET_KEY) {
91         return result;
92     } else {
93         errCode = E_SQLITE_CORRUPT;
94         return result;
95     }
96     pool = Create(storeConfig, errCode);
97     if (errCode != E_OK) {
98         LOG_WARN("failed, type %{public}d db %{public}s encrypt %{public}d error %{public}d errno %{public}d",
99             static_cast<uint32_t>(rebuiltType), SqliteUtils::Anonymous(storeConfig.GetName()).c_str(),
100             storeConfig.IsEncrypt(), errCode, errno);
101     } else {
102         Reportor::ReportRestore(Reportor::Create(storeConfig, E_OK, "RestoreType:Rebuild", false), false);
103     }
104 
105     return result;
106 }
107 
ConnectionPool(const RdbStoreConfig & storeConfig)108 ConnPool::ConnectionPool(const RdbStoreConfig &storeConfig)
109     : config_(storeConfig), attachConfig_(storeConfig), writers_(), readers_(), transactionStack_(),
110       transactionUsed_(false)
111 {
112     attachConfig_.SetJournalMode(JournalMode::MODE_TRUNCATE);
113     trans_.right_ = Container::MIN_TRANS_ID;
114     trans_.left_ = trans_.right_;
115 }
116 
Init(bool isAttach,bool needWriter)117 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::Init(bool isAttach, bool needWriter)
118 {
119     const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
120     std::pair<int32_t, std::shared_ptr<Connection>> result;
121     auto &[errCode, conn] = result;
122     errCode = config.Initialize();
123     if (errCode != E_OK) {
124         return result;
125     }
126 
127     if ((config.GetRoleType() == OWNER || config.GetRoleType() == VISITOR_WRITE) && !config.IsReadOnly()) {
128         // write connect count is 1
129         std::shared_ptr<ConnPool::ConnNode> node;
130         auto create = [this, isAttach]() {
131             const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
132             return Connection::Create(config, true);
133         };
134         std::tie(errCode, node) = writers_.Initialize(create, 1, config.GetWriteTime(), true, needWriter);
135         conn = Convert2AutoConn(node);
136         if (errCode != E_OK) {
137             return result;
138         }
139         trans_.InitMembers(create, MAX_TRANS, 0, false);
140     }
141     isAttach_ = isAttach;
142     maxReader_ = GetMaxReaders(config);
143     // max read connect count is 64
144     if (maxReader_ > 64) {
145         return { E_ARGS_READ_CON_OVERLOAD, nullptr };
146     }
147     auto [ret, node] = readers_.Initialize(
148         [this, isAttach]() {
149             const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
150             return Connection::Create(config, false);
151         },
152         maxReader_, config.GetReadTime(), maxReader_ == 0);
153     errCode = ret;
154     return result;
155 }
156 
~ConnectionPool()157 ConnPool::~ConnectionPool()
158 {
159     CloseAllConnections();
160 }
161 
GetMaxReaders(const RdbStoreConfig & config)162 int32_t ConnPool::GetMaxReaders(const RdbStoreConfig &config)
163 {
164     if (config.GetStorageMode() != StorageMode::MODE_MEMORY &&
165         config.GetJournalMode() == RdbStoreConfig::GetJournalModeValue(JournalMode::MODE_WAL)) {
166         return config.GetReadConSize();
167     } else {
168         return 0;
169     }
170 }
171 
Convert2AutoConn(std::shared_ptr<ConnNode> node,bool isTrans)172 std::shared_ptr<Connection> ConnPool::Convert2AutoConn(std::shared_ptr<ConnNode> node, bool isTrans)
173 {
174     if (node == nullptr) {
175         return nullptr;
176     }
177 
178     auto conn = node->GetConnect();
179     if (conn == nullptr) {
180         return nullptr;
181     }
182     conn->VerifyAndRegisterHook(config_);
183     if (isTrans) {
184         transCount_++;
185     }
186     return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), node, isTrans](auto *) mutable {
187         auto realPool = pool.lock();
188         if (realPool == nullptr) {
189             return;
190         }
191         realPool->ReleaseNode(node, isTrans);
192         if (isTrans) {
193             realPool->transCount_--;
194         }
195         node = nullptr;
196     });
197 }
198 
DelayClearTrans()199 void ConnPool::DelayClearTrans()
200 {
201     auto pool = TaskExecutor::GetInstance().GetExecutor();
202     if (pool == nullptr) {
203         LOG_ERROR("pool is nullptr.");
204         return;
205     }
206     pool->Schedule(TRANS_CLEAR_INTERVAL, [pool = weak_from_this()]() {
207         auto realPool = pool.lock();
208         if (realPool == nullptr) {
209             return;
210         }
211         realPool->trans_.ClearUnusedTrans(realPool);
212     });
213 }
214 
CloseAllConnections()215 void ConnPool::CloseAllConnections()
216 {
217     writers_.Clear();
218     readers_.Clear();
219     trans_.Clear();
220 }
221 
IsInTransaction()222 bool ConnPool::IsInTransaction()
223 {
224     return isInTransaction_.load();
225 }
226 
SetInTransaction(bool isInTransaction)227 void ConnPool::SetInTransaction(bool isInTransaction)
228 {
229     isInTransaction_.store(isInTransaction);
230 }
231 
CreateTransConn(bool limited)232 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::CreateTransConn(bool limited)
233 {
234     if (transCount_.load() >= MAX_TRANS && limited) {
235         trans_.Dump("NO TRANS", transCount_ + isInTransaction_);
236         writers_.Dump("NO TRANS WRITE", transCount_ + isInTransaction_);
237         return { E_DATABASE_BUSY, nullptr };
238     }
239     if (trans_.Empty()) {
240         DelayClearTrans();
241     }
242     auto [errCode, node] = trans_.Acquire(INVALID_TIME);
243     return { errCode, Convert2AutoConn(node, true) };
244 }
245 
AcquireConnection(bool isReadOnly)246 std::shared_ptr<Conn> ConnPool::AcquireConnection(bool isReadOnly)
247 {
248     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
249     return Acquire(isReadOnly);
250 }
251 
AcquireAll(int32_t time)252 std::pair<SharedConn, SharedConns> ConnPool::AcquireAll(int32_t time)
253 {
254     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
255     using namespace std::chrono;
256     std::pair<SharedConn, SharedConns> result;
257     auto &[writer, readers] = result;
258     auto interval = duration_cast<milliseconds>(seconds(time));
259     auto start = steady_clock::now();
260     auto [res, writerNodes] = writers_.AcquireAll(interval);
261     if (!res) {
262         return {};
263     }
264     writer = Convert2AutoConn(writerNodes.front());
265 
266     auto usedTime = duration_cast<milliseconds>(steady_clock::now() - start);
267     if (writer == nullptr || usedTime >= interval) {
268         return {};
269     }
270 
271     if (maxReader_ == 0) {
272         return result;
273     }
274 
275     readers_.Disable();
276     std::list<std::shared_ptr<ConnPool::ConnNode>> nodes;
277     std::tie(res, nodes) = readers_.AcquireAll(interval - usedTime);
278     if (!res) {
279         readers_.Enable();
280         return {};
281     }
282 
283     for (auto node : nodes) {
284         auto conn = Convert2AutoConn(node);
285         if (conn == nullptr) {
286             continue;
287         }
288         readers.push_back(conn);
289     }
290     usedTime = duration_cast<milliseconds>(steady_clock::now() - start);
291     if (usedTime >= interval) {
292         return {};
293     }
294     trans_.Disable();
295     std::list<std::shared_ptr<ConnPool::ConnNode>> trans;
296     std::tie(res, trans) = trans_.AcquireAll(interval - usedTime);
297     if (!res) {
298         trans_.Enable();
299         return {};
300     }
301     return result;
302 }
303 
Acquire(bool isReadOnly,std::chrono::milliseconds ms)304 std::shared_ptr<Conn> ConnPool::Acquire(bool isReadOnly, std::chrono::milliseconds ms)
305 {
306     Container *container = (isReadOnly && maxReader_ != 0) ? &readers_ : &writers_;
307     auto [errCode, node] = container->Acquire(ms);
308     if (errCode != E_OK || node == nullptr) {
309         const char *header = (isReadOnly && maxReader_ != 0) ? "readers_" : "writers_";
310         container->Dump(header, transCount_ + isInTransaction_);
311         return nullptr;
312     }
313     return Convert2AutoConn(node);
314 }
315 
AcquireRef(bool isReadOnly,std::chrono::milliseconds ms)316 SharedConn ConnPool::AcquireRef(bool isReadOnly, std::chrono::milliseconds ms)
317 {
318     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
319     if (maxReader_ != 0) {
320         return Acquire(isReadOnly, ms);
321     }
322     auto [errCode, node] = writers_.Acquire(ms);
323     if (errCode != E_OK || node == nullptr) {
324         writers_.Dump("writers_", transCount_ + isInTransaction_);
325         return nullptr;
326     }
327     auto conn = node->connect_;
328     writers_.Release(node);
329     return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), conn](Connection *) {
330         auto realPool = pool.lock();
331         if (realPool == nullptr) {
332             return;
333         }
334         realPool->writers_.cond_.notify_all();
335     });
336 }
337 
ReleaseNode(std::shared_ptr<ConnNode> node,bool isTrans)338 void ConnPool::ReleaseNode(std::shared_ptr<ConnNode> node, bool isTrans)
339 {
340     if (node == nullptr) {
341         return;
342     }
343 
344     auto now = steady_clock::now();
345     auto timeout = now > (failedTime_.load() + minutes(CHECK_POINT_INTERVAL)) || now < failedTime_.load() ||
346                    failedTime_.load() == steady_clock::time_point();
347     auto transCount = transCount_ + isInTransaction_;
348     auto remainCount = isTrans ? transCount - 1 : transCount;
349     auto errCode = node->Unused(remainCount, timeout);
350     if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
351         writers_.Dump("WAL writers_", transCount);
352         trans_.Dump("WAL trans_", transCount);
353         readers_.Dump("WAL readers_", transCount);
354     }
355 
356     if (node->IsWriter() && (errCode != E_INNER_WARNING && errCode != E_NOT_SUPPORT)) {
357         failedTime_ = errCode != E_OK ? now : steady_clock::time_point();
358     }
359 
360     if (isTrans) {
361         trans_.ReleaseTrans(node);
362     } else {
363         auto &container = node->IsWriter() ? writers_ : readers_;
364         container.Release(node);
365     }
366 }
367 
AcquireTransaction()368 int ConnPool::AcquireTransaction()
369 {
370     std::unique_lock<std::mutex> lock(transMutex_);
371     if (transCondition_.wait_for(
372         lock, std::chrono::seconds(TRANSACTION_TIMEOUT),
373         [this] { return !transactionUsed_; })) {
374         transactionUsed_ = true;
375         return E_OK;
376     }
377     LOG_WARN("transactionUsed_ is %{public}d", transactionUsed_);
378     return E_DATABASE_BUSY;
379 }
380 
ReleaseTransaction()381 void ConnPool::ReleaseTransaction()
382 {
383     {
384         std::unique_lock<std::mutex> lock(transMutex_);
385         transactionUsed_ = false;
386     }
387     transCondition_.notify_one();
388 }
389 
RestartConns()390 int ConnPool::RestartConns()
391 {
392     const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
393     readers_.Clear();
394     auto [errCode, node] = readers_.Initialize(
395         [this]() {
396             const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
397             return Connection::Create(config, false);
398         },
399         maxReader_, config.GetReadTime(), maxReader_ == 0);
400     trans_.Clear();
401     trans_.InitMembers(
402         [this]() {
403             const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
404             return Connection::Create(config, true);
405         },
406         MAX_TRANS, 0, false);
407     return errCode;
408 }
409 
410 /**
411  * The database locale.
412  */
ConfigLocale(const std::string & localeStr)413 int ConnPool::ConfigLocale(const std::string &localeStr)
414 {
415     auto errCode = readers_.ConfigLocale(localeStr);
416     if (errCode != E_OK) {
417         return errCode;
418     }
419     return writers_.ConfigLocale(localeStr);
420 }
421 
422 /**
423  * Rename the backed up database.
424  */
ChangeDbFileForRestore(const std::string & newPath,const std::string & backupPath,const std::vector<uint8_t> & newKey,SlaveStatus & slaveStatus)425 int ConnPool::ChangeDbFileForRestore(const std::string &newPath, const std::string &backupPath,
426     const std::vector<uint8_t> &newKey, SlaveStatus &slaveStatus)
427 {
428     if (!writers_.IsFull() || config_.GetPath() == backupPath || newPath == backupPath) {
429         LOG_ERROR("Connection pool is busy now!");
430         return E_ERROR;
431     }
432     if (config_.GetDBType() == DB_VECTOR) {
433         CloseAllConnections();
434         auto [retVal, conn] = Connection::Create(config_, false);
435         if (retVal != E_OK) {
436             LOG_ERROR("Create connection fail, errCode:%{public}d", retVal);
437             return retVal;
438         }
439 
440         retVal = conn->Restore(backupPath, newKey, slaveStatus);
441         if (retVal != E_OK) {
442             LOG_ERROR("Restore failed, errCode:0x%{public}x", retVal);
443             return retVal;
444         }
445 
446         conn = nullptr;
447         auto initRes = Init();
448         if (initRes.first != E_OK) {
449             LOG_ERROR("Init fail, errCode:%{public}d", initRes.first);
450             return initRes.first;
451         }
452         return retVal;
453     }
454     return RestoreMasterDb(newPath, backupPath, slaveStatus);
455 }
456 
RestoreMasterDb(const std::string & newPath,const std::string & backupPath,SlaveStatus & slaveStatus)457 int ConnPool::RestoreMasterDb(const std::string &newPath, const std::string &backupPath, SlaveStatus &slaveStatus)
458 {
459     if (SqliteUtils::IsSlaveDbName(backupPath) && config_.GetHaMode() == HAMode::MAIN_REPLICA) {
460         auto connection = AcquireConnection(false);
461         if (connection == nullptr) {
462             return E_DATABASE_BUSY;
463         }
464         return connection->Restore(backupPath, {}, slaveStatus);
465     }
466 
467     CloseAllConnections();
468     int ret = Connection::Restore(config_, backupPath, newPath);
469     int32_t errCode = E_OK;
470     std::shared_ptr<Connection> pool;
471     for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
472         std::tie(errCode, pool) = Init();
473         if (errCode == E_OK) {
474             break;
475         }
476         if (errCode != E_SQLITE_CORRUPT || !config_.IsEncrypt()) {
477             break;
478         }
479         config_.SetIter(ITER_V1);
480     }
481     if (errCode != E_OK) {
482         CloseAllConnections();
483         Connection::Delete(config_);
484         std::tie(errCode, pool) = Init();
485         LOG_WARN("Restore failed! rebuild res:%{public}d, path:%{public}s.", errCode,
486             SqliteUtils::Anonymous(backupPath).c_str());
487     }
488     return ret == E_OK ? errCode : ret;
489 }
490 
GetTransactionStack()491 std::stack<BaseTransaction> &ConnPool::GetTransactionStack()
492 {
493     return transactionStack_;
494 }
495 
GetTransactionStackMutex()496 std::mutex &ConnPool::GetTransactionStackMutex()
497 {
498     return transactionStackMutex_;
499 }
500 
DisableWal()501 std::pair<int32_t, std::shared_ptr<Conn>> ConnPool::DisableWal()
502 {
503     return Init(true, true);
504 }
505 
EnableWal()506 int ConnPool::EnableWal()
507 {
508     auto [errCode, node] = Init();
509     return errCode;
510 }
511 
Dump(bool isWriter,const char * header)512 int32_t ConnectionPool::Dump(bool isWriter, const char *header)
513 {
514     Container *container = (isWriter || maxReader_ == 0) ? &writers_ : &readers_;
515     container->Dump(header, transCount_ + isInTransaction_);
516     return E_OK;
517 }
518 
ConnNode(std::shared_ptr<Conn> conn)519 ConnPool::ConnNode::ConnNode(std::shared_ptr<Conn> conn) : connect_(std::move(conn))
520 {
521 }
522 
GetConnect()523 std::shared_ptr<Conn> ConnPool::ConnNode::GetConnect()
524 {
525     tid_ = gettid();
526     time_ = steady_clock::now();
527     return connect_;
528 }
529 
GetUsingTime() const530 int64_t ConnPool::ConnNode::GetUsingTime() const
531 {
532     auto time = steady_clock::now() - time_;
533     return duration_cast<milliseconds>(time).count();
534 }
535 
Unused(int32_t count,bool timeout)536 int32_t ConnPool::ConnNode::Unused(int32_t count, bool timeout)
537 {
538     if (connect_ == nullptr) {
539         return E_OK;
540     }
541 
542     connect_->ClearCache();
543     int32_t errCode = E_INNER_WARNING;
544     if (count <= 0) {
545         errCode = connect_->TryCheckPoint(timeout);
546     }
547 
548     time_ = steady_clock::now();
549     if (!connect_->IsWriter()) {
550         tid_ = 0;
551     }
552     return errCode;
553 }
554 
IsWriter() const555 bool ConnPool::ConnNode::IsWriter() const
556 {
557     if (connect_ != nullptr) {
558         return connect_->IsWriter();
559     }
560     return false;
561 }
562 
InitMembers(Creator creator,int32_t max,int32_t timeout,bool disable)563 void ConnPool::Container::InitMembers(Creator creator, int32_t max, int32_t timeout, bool disable)
564 {
565     std::unique_lock<decltype(mutex_)> lock(mutex_);
566     disable_ = disable;
567     max_ = max;
568     creator_ = creator;
569     timeout_ = std::chrono::seconds(timeout);
570 }
571 
Initialize(Creator creator,int32_t max,int32_t timeout,bool disable,bool acquire)572 std::pair<int32_t, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Initialize(
573     Creator creator, int32_t max, int32_t timeout, bool disable, bool acquire)
574 {
575     InitMembers(creator, max, timeout, disable);
576     std::shared_ptr<ConnNode> connNode = nullptr;
577     {
578         std::unique_lock<decltype(mutex_)> lock(mutex_);
579         for (int i = 0; i < max_; ++i) {
580             auto errCode = ExtendNode();
581             if (errCode != E_OK) {
582                 nodes_.clear();
583                 details_.clear();
584                 return { errCode, nullptr };
585             }
586         }
587 
588         if (acquire && count_ > 0) {
589             connNode = nodes_.back();
590             nodes_.pop_back();
591             count_--;
592         }
593     }
594     cond_.notify_all();
595     return { E_OK, connNode };
596 }
597 
ConfigLocale(const std::string & locale)598 int32_t ConnPool::Container::ConfigLocale(const std::string &locale)
599 {
600     std::unique_lock<decltype(mutex_)> lock(mutex_);
601     if (total_ != count_) {
602         return E_DATABASE_BUSY;
603     }
604     for (auto it = details_.begin(); it != details_.end();) {
605         auto conn = it->lock();
606         if (conn == nullptr || conn->connect_ == nullptr) {
607             it = details_.erase(it);
608             continue;
609         }
610         conn->connect_->ConfigLocale(locale);
611     }
612     return E_OK;
613 }
614 
Acquire(std::chrono::milliseconds milliS)615 std::pair<int, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Acquire(std::chrono::milliseconds milliS)
616 {
617     std::unique_lock<decltype(mutex_)> lock(mutex_);
618     auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
619     if (max_ == 0) {
620         return {E_ERROR, nullptr};
621     }
622     int errCode = E_OK;
623     auto waiter = [this, &errCode]() -> bool {
624         if (count_ > 0) {
625             return true;
626         }
627 
628         if (disable_) {
629             return false;
630         }
631         errCode = ExtendNode();
632         return errCode == E_OK;
633     };
634     if (cond_.wait_for(lock, interval, waiter)) {
635         if (nodes_.empty()) {
636             LOG_ERROR("Nodes is empty.count %{public}d max %{public}d total %{public}d left %{public}d right%{public}d",
637                 count_, max_, total_, left_, right_);
638             count_ = 0;
639             return {E_ERROR, nullptr};
640         }
641         auto node = nodes_.back();
642         nodes_.pop_back();
643         count_--;
644         return {E_OK, node};
645     }
646     return {errCode, nullptr};
647 }
648 
ExtendNode()649 int32_t ConnPool::Container::ExtendNode()
650 {
651     if (creator_ == nullptr) {
652         return E_ERROR;
653     }
654     auto [errCode, conn] = creator_();
655     if (conn == nullptr) {
656         return errCode;
657     }
658     auto node = std::make_shared<ConnNode>(conn);
659     node->id_ = right_++;
660     conn->SetId(node->id_);
661     nodes_.push_back(node);
662     details_.push_back(node);
663     count_++;
664     total_++;
665     return E_OK;
666 }
667 
AcquireAll(std::chrono::milliseconds milliS)668 std::pair<bool, std::list<std::shared_ptr<ConnPool::ConnNode>>> ConnPool::Container::AcquireAll(
669     std::chrono::milliseconds milliS)
670 {
671     std::list<std::shared_ptr<ConnNode>> nodes;
672     int32_t count = 0;
673     auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
674     auto time = std::chrono::steady_clock::now() + interval;
675     std::unique_lock<decltype(mutex_)> lock(mutex_);
676     while (count < total_ && cond_.wait_until(lock, time, [this]() { return count_ > 0; })) {
677         nodes.merge(std::move(nodes_));
678         nodes_.clear();
679         count += count_;
680         count_ = 0;
681     }
682 
683     if (count != total_) {
684         count_ = count;
685         nodes_ = std::move(nodes);
686         nodes.clear();
687         return {false, nodes};
688     }
689     auto func = [](const std::list<std::shared_ptr<ConnNode>> &nodes) -> bool {
690         for (auto &node : nodes) {
691             if (node->connect_ == nullptr) {
692                 continue;
693             }
694             if (node->connect_.use_count() != 1) {
695                 return false;
696             }
697         }
698         return true;
699     };
700     bool failed = false;
701     while (failed = !func(nodes), failed && cond_.wait_until(lock, time) != std::cv_status::timeout) {
702     }
703     if (failed) {
704         count_ = count;
705         nodes_ = std::move(nodes);
706         nodes.clear();
707     }
708     return {!failed, nodes};
709 }
710 
Disable()711 void ConnPool::Container::Disable()
712 {
713     disable_ = true;
714     cond_.notify_one();
715 }
716 
Enable()717 void ConnPool::Container::Enable()
718 {
719     disable_ = false;
720     cond_.notify_one();
721 }
722 
Release(std::shared_ptr<ConnNode> node)723 int32_t ConnPool::Container::Release(std::shared_ptr<ConnNode> node)
724 {
725     {
726         std::unique_lock<decltype(mutex_)> lock(mutex_);
727         if (node->id_ < left_ || node->id_ >= right_) {
728             return E_OK;
729         }
730         if (count_ == max_) {
731             total_ = total_ > count_ ? total_ - 1 : count_;
732             RelDetails(node);
733         } else {
734             nodes_.push_front(node);
735             count_++;
736         }
737     }
738     cond_.notify_one();
739     return E_OK;
740 }
741 
ReleaseTrans(std::shared_ptr<ConnNode> node)742 int32_t ConnectionPool::Container::ReleaseTrans(std::shared_ptr<ConnNode> node)
743 {
744     {
745         std::unique_lock<decltype(mutex_)> lock(mutex_);
746         if (node->id_ < left_ || node->id_ >= right_) {
747             return E_OK;
748         }
749         if (node->IsRecyclable()) {
750             nodes_.push_back(node);
751             count_++;
752         } else {
753             total_--;
754             RelDetails(node);
755         }
756     }
757     cond_.notify_one();
758     return E_OK;
759 }
760 
RelDetails(std::shared_ptr<ConnNode> node)761 int32_t ConnectionPool::Container::RelDetails(std::shared_ptr<ConnNode> node)
762 {
763     for (auto it = details_.begin(); it != details_.end();) {
764         auto detailNode = it->lock();
765         if (detailNode == nullptr || detailNode->id_ == node->id_) {
766             it = details_.erase(it);
767         } else {
768             it++;
769         }
770     }
771     return E_OK;
772 }
773 
CheckIntegrity(const std::string & dbPath)774 bool ConnectionPool::CheckIntegrity(const std::string &dbPath)
775 {
776     RdbStoreConfig config(config_);
777     config.SetPath(dbPath);
778     config.SetIntegrityCheck(IntegrityCheck::FULL);
779     config.SetHaMode(HAMode::SINGLE);
780     for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
781         auto [ret, connection] = Connection::Create(config, true);
782         if (ret == E_OK) {
783             return true;
784         }
785         if (ret != E_SQLITE_CORRUPT || !config.IsEncrypt()) {
786             break;
787         }
788         config.SetIter(ITER_V1);
789     }
790     return false;
791 }
792 
Clear()793 int32_t ConnPool::Container::Clear()
794 {
795     std::list<std::shared_ptr<ConnNode>> nodes;
796     std::list<std::weak_ptr<ConnNode>> details;
797     {
798         std::unique_lock<decltype(mutex_)> lock(mutex_);
799         nodes = std::move(nodes_);
800         details = std::move(details_);
801         disable_ = true;
802         total_ = 0;
803         count_ = 0;
804         if (right_ > MAX_RIGHT) {
805             right_ = 0;
806         }
807         left_ = right_;
808         creator_ = nullptr;
809     }
810     nodes.clear();
811     details.clear();
812     return 0;
813 }
814 
IsFull()815 bool ConnPool::Container::IsFull()
816 {
817     std::unique_lock<decltype(mutex_)> lock(mutex_);
818     return total_ == count_;
819 }
820 
Empty()821 bool ConnPool::Container::Empty()
822 {
823     std::unique_lock<decltype(mutex_)> lock(mutex_);
824     return total_ == 0;
825 }
826 
Dump(const char * header,int32_t count)827 int32_t ConnPool::Container::Dump(const char *header, int32_t count)
828 {
829     std::string info;
830     std::string allInfo;
831     std::vector<std::shared_ptr<ConnNode>> details;
832     std::string title = "B_M_T_C[" + std::to_string(count) + "," + std::to_string(max_) + "," +
833                         std::to_string(total_) + "," + std::to_string(count_) + "]";
834     {
835         std::unique_lock<decltype(mutex_)> lock(mutex_);
836         details.reserve(details_.size());
837         for (auto &detail : details_) {
838             auto node = detail.lock();
839             if (node == nullptr) {
840                 continue;
841             }
842             details.push_back(node);
843         }
844     }
845 
846     for (auto &node : details) {
847         info.append("<")
848             .append(std::to_string(node->id_))
849             .append(",")
850             .append(std::to_string(node->tid_))
851             .append(",")
852             .append(std::to_string(node->GetUsingTime()))
853             .append(">");
854         // 256 represent that limit to info length
855         if (info.size() > 256) {
856             LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
857             allInfo.append(header).append(": ").append(title).append(std::move(info)).append("\n");
858             info.clear();
859         }
860     }
861     LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
862     allInfo.append(header).append(": ").append(title).append(std::move(info));
863     Reportor::ReportFault(RdbFaultEvent(FT_CURD, E_DFX_DUMP_INFO, BUNDLE_NAME_COMMON, allInfo));
864     return 0;
865 }
866 
ClearUnusedTrans(std::shared_ptr<ConnectionPool> pool)867 int32_t ConnectionPool::Container::ClearUnusedTrans(std::shared_ptr<ConnectionPool> pool)
868 {
869     std::unique_lock<decltype(mutex_)> lock(mutex_);
870     int transCount = total_;
871     for (auto it = nodes_.begin(); it != nodes_.end();) {
872         auto unusedDuration = std::chrono::steady_clock::now() - (*it)->time_;
873         if (unusedDuration < TRANS_CLEAR_INTERVAL) {
874             it++;
875             continue;
876         }
877         RelDetails(*it);
878         it = nodes_.erase(it);
879         total_--;
880         count_--;
881     }
882     if (total_ != 0) {
883         pool->DelayClearTrans();
884     }
885     LOG_INFO("%{public}d have been cleaned up, and there are %{public}d remaining to be cleaned up",
886         transCount - total_, total_);
887     return E_OK;
888 }
889 
IsRecyclable()890 bool ConnPool::ConnNode::IsRecyclable()
891 {
892     return connect_->IsRecyclable();
893 }
894 } // namespace NativeRdb
895 } // namespace OHOS
896