• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define LOG_TAG "ConnectionPool"
17 #include "connection_pool.h"
18 
19 #include <base_transaction.h>
20 
21 #include <condition_variable>
22 #include <iterator>
23 #include <mutex>
24 #include <sstream>
25 #include <vector>
26 
27 #include "logger.h"
28 #include "connection.h"
29 #include "rdb_common.h"
30 #include "rdb_errno.h"
31 #include "rdb_fault_hiview_reporter.h"
32 #include "rdb_sql_statistic.h"
33 #include "sqlite_global_config.h"
34 #include "sqlite_utils.h"
35 #include "task_executor.h"
36 
37 namespace OHOS {
38 namespace NativeRdb {
39 using namespace OHOS::Rdb;
40 using namespace std::chrono;
41 using Conn = Connection;
42 using ConnPool = ConnectionPool;
43 using SharedConn = std::shared_ptr<Connection>;
44 using SharedConns = std::vector<std::shared_ptr<Connection>>;
45 using SqlStatistic = DistributedRdb::SqlStatistic;
46 using Reportor = RdbFaultHiViewReporter;
47 constexpr int32_t TRANSACTION_TIMEOUT(2);
48 
Create(const RdbStoreConfig & config,int & errCode)49 std::shared_ptr<ConnPool> ConnPool::Create(const RdbStoreConfig &config, int &errCode)
50 {
51     std::shared_ptr<ConnPool> pool(new (std::nothrow) ConnPool(config));
52     if (pool == nullptr) {
53         LOG_ERROR("ConnPool::Create new failed, pool is nullptr.");
54         errCode = E_ERROR;
55         return nullptr;
56     }
57     std::shared_ptr<Connection> conn;
58     for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
59         std::tie(errCode, conn) = pool->Init();
60         if (errCode != E_SQLITE_CORRUPT) {
61             break;
62         }
63         config.SetIter(ITER_V1);
64     }
65     std::string dbPath;
66     (void)SqliteGlobalConfig::GetDbPath(config, dbPath);
67     LOG_INFO("code:%{public}d app:%{public}s path:[%{public}s] "
68              "cfg:[%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d]"
69              "%{public}s",
70         errCode, config.GetBundleName().c_str(), SqliteUtils::Anonymous(dbPath).c_str(), config.GetDBType(),
71         config.GetHaMode(), config.IsEncrypt(), config.GetArea(), config.GetSecurityLevel(), config.GetRoleType(),
72         config.IsReadOnly(),
73         Reportor::FormatBrief(Connection::Collect(config), SqliteUtils::Anonymous(config.GetName())).c_str());
74     return errCode == E_OK ? pool : nullptr;
75 }
76 
HandleDataCorruption(const RdbStoreConfig & storeConfig,int & errCode)77 std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> ConnPool::HandleDataCorruption
78     (const RdbStoreConfig &storeConfig, int &errCode)
79 {
80     std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> result;
81     auto &[rebuiltType, pool] = result;
82 
83     int repairErrCode = Connection::Repair(storeConfig);
84     if (repairErrCode == E_OK) {
85         rebuiltType = RebuiltType::REPAIRED;
86     } else if (storeConfig.GetAllowRebuild()) {
87         Connection::Delete(storeConfig);
88         rebuiltType = RebuiltType::REBUILT;
89     } else if (storeConfig.IsEncrypt() && errCode == E_INVALID_SECRET_KEY) {
90         return result;
91     } else {
92         errCode = E_SQLITE_CORRUPT;
93         return result;
94     }
95     pool = Create(storeConfig, errCode);
96     if (errCode != E_OK) {
97         LOG_WARN("failed, type %{public}d db %{public}s encrypt %{public}d error %{public}d, errno",
98             static_cast<uint32_t>(rebuiltType), SqliteUtils::Anonymous(storeConfig.GetName()).c_str(),
99             storeConfig.IsEncrypt(), errCode, errno);
100     } else {
101         Reportor::ReportRestore(Reportor::Create(storeConfig, E_OK, "RestoreType:Rebuild"), false);
102     }
103 
104     return result;
105 }
106 
ConnectionPool(const RdbStoreConfig & storeConfig)107 ConnPool::ConnectionPool(const RdbStoreConfig &storeConfig)
108     : config_(storeConfig), attachConfig_(storeConfig), writers_(), readers_(), transactionStack_(),
109       transactionUsed_(false)
110 {
111     attachConfig_.SetJournalMode(JournalMode::MODE_TRUNCATE);
112     trans_.right_ = Container::MIN_TRANS_ID;
113     trans_.left_ = trans_.right_;
114 }
115 
Init(bool isAttach,bool needWriter)116 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::Init(bool isAttach, bool needWriter)
117 {
118     const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
119     std::pair<int32_t, std::shared_ptr<Connection>> result;
120     auto &[errCode, conn] = result;
121     errCode = config.Initialize();
122     if (errCode != E_OK) {
123         return result;
124     }
125 
126     if (config.GetRoleType() == OWNER && !config.IsReadOnly()) {
127         // write connect count is 1
128         std::shared_ptr<ConnPool::ConnNode> node;
129         auto create = [this, isAttach]() {
130             const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
131             return Connection::Create(config, true);
132         };
133         std::tie(errCode, node) = writers_.Initialize(create, 1, config.GetWriteTime(), true, needWriter);
134         conn = Convert2AutoConn(node);
135         if (errCode != E_OK) {
136             return result;
137         }
138         trans_.InitMembers(create, MAX_TRANS, 0, false);
139     }
140 
141     isAttach_ = isAttach;
142     maxReader_ = GetMaxReaders(config);
143     // max read connect count is 64
144     if (maxReader_ > 64) {
145         return { E_ARGS_READ_CON_OVERLOAD, nullptr };
146     }
147     auto [ret, node] = readers_.Initialize(
148         [this, isAttach]() {
149             const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
150             return Connection::Create(config, false);
151         },
152         maxReader_, config.GetReadTime(), maxReader_ == 0);
153     errCode = ret;
154     return result;
155 }
156 
~ConnectionPool()157 ConnPool::~ConnectionPool()
158 {
159     CloseAllConnections();
160 }
161 
GetMaxReaders(const RdbStoreConfig & config)162 int32_t ConnPool::GetMaxReaders(const RdbStoreConfig &config)
163 {
164     if (config.GetStorageMode() != StorageMode::MODE_MEMORY &&
165         config.GetJournalMode() == RdbStoreConfig::GetJournalModeValue(JournalMode::MODE_WAL)) {
166         return config.GetReadConSize();
167     } else {
168         return 0;
169     }
170 }
171 
Convert2AutoConn(std::shared_ptr<ConnNode> node,bool isTrans)172 std::shared_ptr<Connection> ConnPool::Convert2AutoConn(std::shared_ptr<ConnNode> node, bool isTrans)
173 {
174     if (node == nullptr) {
175         return nullptr;
176     }
177 
178     auto conn = node->GetConnect();
179     if (conn == nullptr) {
180         return nullptr;
181     }
182     if (isTrans) {
183         transCount_++;
184     }
185 
186     return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), node, isTrans](auto *) mutable {
187         auto realPool = pool.lock();
188         if (realPool == nullptr) {
189             return;
190         }
191         realPool->ReleaseNode(node, isTrans);
192         if (isTrans) {
193             realPool->transCount_--;
194         }
195         node = nullptr;
196     });
197 }
198 
DelayClearTrans()199 void ConnPool::DelayClearTrans()
200 {
201     auto pool = TaskExecutor::GetInstance().GetExecutor();
202     if (pool == nullptr) {
203         LOG_ERROR("pool is nullptr.");
204         return;
205     }
206     pool->Schedule(TRANS_CLEAR_INTERVAL, [pool = weak_from_this()]() {
207         auto realPool = pool.lock();
208         if (realPool == nullptr) {
209             return;
210         }
211         realPool->trans_.ClearUnusedTrans(realPool);
212     });
213 }
214 
CloseAllConnections()215 void ConnPool::CloseAllConnections()
216 {
217     writers_.Clear();
218     readers_.Clear();
219     trans_.Clear();
220 }
221 
IsInTransaction()222 bool ConnPool::IsInTransaction()
223 {
224     return isInTransaction_.load();
225 }
226 
SetInTransaction(bool isInTransaction)227 void ConnPool::SetInTransaction(bool isInTransaction)
228 {
229     isInTransaction_.store(isInTransaction);
230 }
231 
CreateTransConn(bool limited)232 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::CreateTransConn(bool limited)
233 {
234     if (transCount_.load() >= MAX_TRANS && limited) {
235         trans_.Dump("NO TRANS", transCount_ + isInTransaction_);
236         writers_.Dump("NO TRANS WRITE", transCount_ + isInTransaction_);
237         return { E_DATABASE_BUSY, nullptr };
238     }
239     if (trans_.Empty()) {
240         DelayClearTrans();
241     }
242     auto [errCode, node] = trans_.Acquire(INVALID_TIME);
243     return { errCode, Convert2AutoConn(node, true) };
244 }
245 
AcquireConnection(bool isReadOnly)246 std::shared_ptr<Conn> ConnPool::AcquireConnection(bool isReadOnly)
247 {
248     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
249     return Acquire(isReadOnly);
250 }
251 
AcquireAll(int32_t time)252 std::pair<SharedConn, SharedConns> ConnPool::AcquireAll(int32_t time)
253 {
254     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
255     using namespace std::chrono;
256     std::pair<SharedConn, SharedConns> result;
257     auto &[writer, readers] = result;
258     auto interval = duration_cast<milliseconds>(seconds(time));
259     auto start = steady_clock::now();
260     auto [res, writerNodes] = writers_.AcquireAll(interval);
261     if (!res) {
262         return {};
263     }
264     writer = Convert2AutoConn(writerNodes.front());
265 
266     auto usedTime = duration_cast<milliseconds>(steady_clock::now() - start);
267     if (writer == nullptr || usedTime >= interval) {
268         return {};
269     }
270 
271     if (maxReader_ == 0) {
272         return result;
273     }
274 
275     readers_.Disable();
276     std::list<std::shared_ptr<ConnPool::ConnNode>> nodes;
277     std::tie(res, nodes) = readers_.AcquireAll(interval - usedTime);
278     if (!res) {
279         readers_.Enable();
280         return {};
281     }
282 
283     for (auto node : nodes) {
284         auto conn = Convert2AutoConn(node);
285         if (conn == nullptr) {
286             continue;
287         }
288         readers.push_back(conn);
289     }
290     usedTime = duration_cast<milliseconds>(steady_clock::now() - start);
291     if (usedTime >= interval) {
292         return {};
293     }
294     trans_.Disable();
295     std::list<std::shared_ptr<ConnPool::ConnNode>> trans;
296     std::tie(res, trans) = trans_.AcquireAll(interval - usedTime);
297     if (!res) {
298         trans_.Enable();
299         return {};
300     }
301     return result;
302 }
303 
Acquire(bool isReadOnly,std::chrono::milliseconds ms)304 std::shared_ptr<Conn> ConnPool::Acquire(bool isReadOnly, std::chrono::milliseconds ms)
305 {
306     Container *container = (isReadOnly && maxReader_ != 0) ? &readers_ : &writers_;
307     auto [errCode, node] = container->Acquire(ms);
308     if (errCode != E_OK || node == nullptr) {
309         const char *header = (isReadOnly && maxReader_ != 0) ? "readers_" : "writers_";
310         container->Dump(header, transCount_ + isInTransaction_);
311         return nullptr;
312     }
313     return Convert2AutoConn(node);
314 }
315 
AcquireRef(bool isReadOnly,std::chrono::milliseconds ms)316 SharedConn ConnPool::AcquireRef(bool isReadOnly, std::chrono::milliseconds ms)
317 {
318     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
319     if (maxReader_ != 0) {
320         return Acquire(isReadOnly, ms);
321     }
322     auto [errCode, node] = writers_.Acquire(ms);
323     if (errCode != E_OK || node == nullptr) {
324         writers_.Dump("writers_", transCount_ + isInTransaction_);
325         return nullptr;
326     }
327     auto conn = node->connect_;
328     writers_.Release(node);
329     return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), conn](Connection *) {
330         auto realPool = pool.lock();
331         if (realPool == nullptr) {
332             return;
333         }
334         realPool->writers_.cond_.notify_all();
335     });
336 }
337 
ReleaseNode(std::shared_ptr<ConnNode> node,bool isTrans)338 void ConnPool::ReleaseNode(std::shared_ptr<ConnNode> node,  bool isTrans)
339 {
340     if (node == nullptr) {
341         return;
342     }
343     auto now = steady_clock::now();
344     auto timeout = now > (failedTime_.load() + minutes(CHECK_POINT_INTERVAL)) || now < failedTime_.load() ||
345                    failedTime_.load() == steady_clock::time_point();
346     auto transCount = transCount_ + isInTransaction_;
347     auto remainCount = isTrans ? transCount - 1 : transCount;
348     auto errCode = node->Unused(remainCount, timeout);
349     if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
350         writers_.Dump("WAL writers_", transCount);
351         trans_.Dump("WAL trans_", transCount);
352         readers_.Dump("WAL readers_", transCount);
353     }
354 
355     if (node->IsWriter() && (errCode != E_INNER_WARNING && errCode != E_NOT_SUPPORT)) {
356         failedTime_ = errCode != E_OK ? now : steady_clock::time_point();
357     }
358 
359     if (isTrans) {
360         trans_.ReleaseTrans(node);
361     } else {
362         auto &container = node->IsWriter() ? writers_ : readers_;
363         container.Release(node);
364     }
365 }
366 
AcquireTransaction()367 int ConnPool::AcquireTransaction()
368 {
369     std::unique_lock<std::mutex> lock(transMutex_);
370     if (transCondition_.wait_for(lock, std::chrono::seconds(TRANSACTION_TIMEOUT), [this] {
371             return !transactionUsed_;
372         })) {
373         transactionUsed_ = true;
374         return E_OK;
375     }
376     LOG_WARN("transactionUsed_ is %{public}d", transactionUsed_);
377     return E_DATABASE_BUSY;
378 }
379 
ReleaseTransaction()380 void ConnPool::ReleaseTransaction()
381 {
382     {
383         std::unique_lock<std::mutex> lock(transMutex_);
384         transactionUsed_ = false;
385     }
386     transCondition_.notify_one();
387 }
388 
RestartConns()389 int ConnPool::RestartConns()
390 {
391     const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
392     readers_.Clear();
393     auto [errCode, node] = readers_.Initialize(
394         [this]() {
395             const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
396             return Connection::Create(config, false);
397         },
398         maxReader_, config.GetReadTime(), maxReader_ == 0);
399     trans_.Clear();
400     trans_.InitMembers(
401         [this]() {
402             const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
403             return Connection::Create(config, true);
404         },
405         MAX_TRANS, 0, false);
406     return errCode;
407 }
408 
409 /**
410  * The database locale.
411  */
ConfigLocale(const std::string & localeStr)412 int ConnPool::ConfigLocale(const std::string &localeStr)
413 {
414     auto errCode = readers_.ConfigLocale(localeStr);
415     if (errCode != E_OK) {
416         return errCode;
417     }
418     return writers_.ConfigLocale(localeStr);
419 }
420 
421 /**
422  * Rename the backed up database.
423  */
ChangeDbFileForRestore(const std::string & newPath,const std::string & backupPath,const std::vector<uint8_t> & newKey,SlaveStatus & slaveStatus)424 int ConnPool::ChangeDbFileForRestore(const std::string &newPath, const std::string &backupPath,
425     const std::vector<uint8_t> &newKey, SlaveStatus &slaveStatus)
426 {
427     if (!writers_.IsFull() || config_.GetPath() == backupPath || newPath == backupPath) {
428         LOG_ERROR("Connection pool is busy now!");
429         return E_ERROR;
430     }
431     if (config_.GetDBType() == DB_VECTOR) {
432         CloseAllConnections();
433         auto [retVal, connection] = CreateTransConn();
434 
435         if (connection == nullptr) {
436             LOG_ERROR("Get null connection.");
437             return retVal;
438         }
439         retVal = connection->Restore(backupPath, {}, slaveStatus);
440         if (retVal != E_OK) {
441             LOG_ERROR("RdDbRestore error.");
442             return retVal;
443         }
444         CloseAllConnections();
445         auto [errCode, node] = Init();
446         return errCode;
447     }
448     return RestoreByDbSqliteType(newPath, backupPath, slaveStatus);
449 }
450 
RestoreByDbSqliteType(const std::string & newPath,const std::string & backupPath,SlaveStatus & slaveStatus)451 int ConnPool::RestoreByDbSqliteType(const std::string &newPath, const std::string &backupPath, SlaveStatus &slaveStatus)
452 {
453     if (SqliteUtils::IsSlaveDbName(backupPath) && config_.GetHaMode() != HAMode::SINGLE) {
454         auto connection = AcquireConnection(false);
455         if (connection == nullptr) {
456             return E_DATABASE_BUSY;
457         }
458         return connection->Restore(backupPath, {}, slaveStatus);
459     }
460 
461     return RestoreMasterDb(newPath, backupPath);
462 }
463 
RestoreMasterDb(const std::string & newPath,const std::string & backupPath)464 int ConnPool::RestoreMasterDb(const std::string &newPath, const std::string &backupPath)
465 {
466     if (!CheckIntegrity(backupPath)) {
467         LOG_ERROR("backup file is corrupted, %{public}s", SqliteUtils::Anonymous(backupPath).c_str());
468         return E_SQLITE_CORRUPT;
469     }
470     SqliteUtils::DeleteFile(backupPath + "-shm");
471     SqliteUtils::DeleteFile(backupPath + "-wal");
472 
473     CloseAllConnections();
474     Connection::Delete(config_);
475 
476     if (config_.GetPath() != newPath) {
477         RdbStoreConfig config(newPath);
478         config.SetPath(newPath);
479         Connection::Delete(config);
480     }
481 
482     bool copyRet = SqliteUtils::CopyFile(backupPath, newPath);
483     int32_t errCode = E_OK;
484     std::shared_ptr<Connection> pool;
485     for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
486         std::tie(errCode, pool) = Init();
487         if (errCode == E_OK) {
488             break;
489         }
490         if (errCode != E_SQLITE_CORRUPT || !config_.IsEncrypt()) {
491             break;
492         }
493         config_.SetIter(ITER_V1);
494     }
495     if (errCode != E_OK) {
496         CloseAllConnections();
497         Connection::Delete(config_);
498         std::tie(errCode, pool) = Init();
499         LOG_WARN("restore failed! rebuild res:%{public}d, path:%{public}s.", errCode,
500             SqliteUtils::Anonymous(backupPath).c_str());
501     }
502     return copyRet ? errCode : E_ERROR;
503 }
504 
GetTransactionStack()505 std::stack<BaseTransaction> &ConnPool::GetTransactionStack()
506 {
507     return transactionStack_;
508 }
509 
GetTransactionStackMutex()510 std::mutex &ConnPool::GetTransactionStackMutex()
511 {
512     return transactionStackMutex_;
513 }
514 
DisableWal()515 std::pair<int, std::shared_ptr<Conn>> ConnPool::DisableWal()
516 {
517     return Init(true, true);
518 }
519 
EnableWal()520 int ConnPool::EnableWal()
521 {
522     auto [errCode, node] = Init();
523     return errCode;
524 }
525 
Dump(bool isWriter,const char * header)526 int32_t ConnectionPool::Dump(bool isWriter, const char *header)
527 {
528     Container *container = (isWriter || maxReader_ == 0) ? &writers_ : &readers_;
529     container->Dump(header, transCount_ + isInTransaction_);
530     return E_OK;
531 }
532 
ConnNode(std::shared_ptr<Conn> conn)533 ConnPool::ConnNode::ConnNode(std::shared_ptr<Conn> conn) : connect_(std::move(conn))
534 {
535 }
536 
GetConnect()537 std::shared_ptr<Conn> ConnPool::ConnNode::GetConnect()
538 {
539     tid_ = gettid();
540     time_ = steady_clock::now();
541     return connect_;
542 }
543 
GetUsingTime() const544 int64_t ConnPool::ConnNode::GetUsingTime() const
545 {
546     auto time = steady_clock::now() - time_;
547     return duration_cast<milliseconds>(time).count();
548 }
549 
Unused(int32_t count,bool timeout)550 int32_t ConnPool::ConnNode::Unused(int32_t count, bool timeout)
551 {
552     time_ = steady_clock::now();
553     if (connect_ == nullptr) {
554         return E_OK;
555     }
556 
557     connect_->ClearCache();
558     int32_t errCode = E_INNER_WARNING;
559     if (count <= 0) {
560         errCode = connect_->TryCheckPoint(timeout);
561     }
562 
563     time_ = steady_clock::now();
564     if (!connect_->IsWriter()) {
565         tid_ = 0;
566     }
567     return errCode;
568 }
569 
IsWriter() const570 bool ConnPool::ConnNode::IsWriter() const
571 {
572     if (connect_ != nullptr) {
573         return connect_->IsWriter();
574     }
575     return false;
576 }
577 
InitMembers(Creator creator,int32_t max,int32_t timeout,bool disable)578 void ConnPool::Container::InitMembers(Creator creator, int32_t max, int32_t timeout, bool disable)
579 {
580     std::unique_lock<decltype(mutex_)> lock(mutex_);
581     disable_ = disable;
582     max_ = max;
583     creator_ = creator;
584     timeout_ = std::chrono::seconds(timeout);
585 }
586 
Initialize(Creator creator,int32_t max,int32_t timeout,bool disable,bool acquire)587 std::pair<int32_t, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Initialize(Creator creator, int32_t max,
588     int32_t timeout, bool disable, bool acquire)
589 {
590     InitMembers(creator, max, timeout, disable);
591     std::shared_ptr<ConnNode> connNode = nullptr;
592     {
593         std::unique_lock<decltype(mutex_)> lock(mutex_);
594         disable_ = disable;
595         max_ = max;
596         creator_ = creator;
597         timeout_ = std::chrono::seconds(timeout);
598         for (int i = 0; i < max_; ++i) {
599             auto errCode = ExtendNode();
600             if (errCode != E_OK) {
601                 nodes_.clear();
602                 details_.clear();
603                 return { errCode, nullptr };
604             }
605         }
606 
607         if (acquire && count_ > 0) {
608             connNode = nodes_.back();
609             nodes_.pop_back();
610             count_--;
611         }
612     }
613     cond_.notify_all();
614     return { E_OK, connNode };
615 }
616 
ConfigLocale(const std::string & locale)617 int32_t ConnPool::Container::ConfigLocale(const std::string &locale)
618 {
619     std::unique_lock<decltype(mutex_)> lock(mutex_);
620     if (total_ != count_) {
621         return E_DATABASE_BUSY;
622     }
623     for (auto it = details_.begin(); it != details_.end();) {
624         auto conn = it->lock();
625         if (conn == nullptr || conn->connect_ == nullptr) {
626             it = details_.erase(it);
627             continue;
628         }
629         conn->connect_->ConfigLocale(locale);
630     }
631     return E_OK;
632 }
633 
Acquire(std::chrono::milliseconds milliS)634 std::pair<int, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Acquire(std::chrono::milliseconds milliS)
635 {
636     auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
637     std::unique_lock<decltype(mutex_)> lock(mutex_);
638     if (max_ == 0) {
639         return {E_ERROR, nullptr};
640     }
641     int errCode = E_OK;
642     auto waiter = [this, &errCode]() -> bool {
643         if (count_ > 0) {
644             return true;
645         }
646 
647         if (disable_) {
648             return false;
649         }
650         errCode = ExtendNode();
651         return errCode == E_OK;
652     };
653     if (cond_.wait_for(lock, interval, waiter)) {
654         if (nodes_.empty()) {
655             LOG_ERROR(
656                 "nodes is empty.count %{public}d max %{public}d total %{public}d left %{public}d right%{public}d",
657                 count_, max_, total_, left_, right_);
658             count_ = 0;
659             return {E_ERROR, nullptr};
660         }
661         auto node = nodes_.back();
662         nodes_.pop_back();
663         count_--;
664         return {E_OK, node};
665     }
666     return {errCode, nullptr};
667 }
668 
ExtendNode()669 int32_t ConnPool::Container::ExtendNode()
670 {
671     if (creator_ == nullptr) {
672         return E_ERROR;
673     }
674     auto [errCode, conn] = creator_();
675     if (conn == nullptr) {
676         return errCode;
677     }
678     auto node = std::make_shared<ConnNode>(conn);
679     node->id_ = right_++;
680     conn->SetId(node->id_);
681     nodes_.push_back(node);
682     details_.push_back(node);
683     count_++;
684     total_++;
685     return E_OK;
686 }
687 
AcquireAll(std::chrono::milliseconds milliS)688 std::pair<bool, std::list<std::shared_ptr<ConnPool::ConnNode>>> ConnPool::Container::AcquireAll(
689     std::chrono::milliseconds milliS)
690 {
691     std::list<std::shared_ptr<ConnNode>> nodes;
692     int32_t count = 0;
693     auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
694     auto time = std::chrono::steady_clock::now() + interval;
695     std::unique_lock<decltype(mutex_)> lock(mutex_);
696     while (count < total_ && cond_.wait_until(lock, time, [this]() {
697         return count_ > 0;
698     })) {
699         nodes.merge(std::move(nodes_));
700         nodes_.clear();
701         count += count_;
702         count_ = 0;
703     }
704 
705     if (count != total_) {
706         count_ = count;
707         nodes_ = std::move(nodes);
708         nodes.clear();
709         return {false, nodes};
710     }
711     auto func = [](const std::list<std::shared_ptr<ConnNode>> &nodes) -> bool {
712         for (auto &node : nodes) {
713             if (node->connect_ == nullptr) {
714                 continue;
715             }
716             if (node->connect_.use_count() != 1) {
717                 return false;
718             }
719         }
720         return true;
721     };
722     bool failed = false;
723     while (failed = !func(nodes), failed && cond_.wait_until(lock, time) != std::cv_status::timeout) {
724     }
725     if (failed) {
726         count_ = count;
727         nodes_ = std::move(nodes);
728         nodes.clear();
729     }
730     return {!failed, nodes};
731 }
732 
Disable()733 void ConnPool::Container::Disable()
734 {
735     disable_ = true;
736     cond_.notify_one();
737 }
738 
Enable()739 void ConnPool::Container::Enable()
740 {
741     disable_ = false;
742     cond_.notify_one();
743 }
744 
Release(std::shared_ptr<ConnNode> node)745 int32_t ConnPool::Container::Release(std::shared_ptr<ConnNode> node)
746 {
747     {
748         std::unique_lock<decltype(mutex_)> lock(mutex_);
749         if (node->id_ < left_ || node->id_ >= right_) {
750             return E_OK;
751         }
752         if (count_ == max_) {
753             total_ = total_ > count_ ? total_ - 1 : count_;
754             RelDetails(node);
755         } else {
756             nodes_.push_front(node);
757             count_++;
758         }
759     }
760     cond_.notify_one();
761     return E_OK;
762 }
763 
ReleaseTrans(std::shared_ptr<ConnNode> node)764 int32_t ConnectionPool::Container::ReleaseTrans(std::shared_ptr<ConnNode> node)
765 {
766     {
767         std::unique_lock<decltype(mutex_)> lock(mutex_);
768         if (node->id_ < left_ || node->id_ >= right_) {
769             return E_OK;
770         }
771         if (node->IsRecyclable()) {
772             nodes_.push_back(node);
773             count_++;
774         } else {
775             total_--;
776             RelDetails(node);
777         }
778     }
779     cond_.notify_one();
780     return E_OK;
781 }
782 
RelDetails(std::shared_ptr<ConnNode> node)783 int32_t ConnectionPool::Container::RelDetails(std::shared_ptr<ConnNode> node)
784 {
785     for (auto it = details_.begin(); it != details_.end();) {
786         auto detailNode = it->lock();
787         if (detailNode == nullptr || detailNode->id_ == node->id_) {
788             it = details_.erase(it);
789         } else {
790             it++;
791         }
792     }
793     return E_OK;
794 }
795 
CheckIntegrity(const std::string & dbPath)796 bool ConnectionPool::CheckIntegrity(const std::string &dbPath)
797 {
798     RdbStoreConfig config(config_);
799     config.SetPath(dbPath);
800     config.SetIntegrityCheck(IntegrityCheck::FULL);
801     config.SetHaMode(HAMode::SINGLE);
802     for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
803         auto [ret, connection] = Connection::Create(config, true);
804         if (ret == E_OK) {
805             return true;
806         }
807         if (ret != E_SQLITE_CORRUPT || !config.IsEncrypt()) {
808             break;
809         }
810         config.SetIter(ITER_V1);
811     }
812     return false;
813 }
814 
Clear()815 int32_t ConnPool::Container::Clear()
816 {
817     std::list<std::shared_ptr<ConnNode>> nodes;
818     std::list<std::weak_ptr<ConnNode>> details;
819     {
820         std::unique_lock<decltype(mutex_)> lock(mutex_);
821         nodes = std::move(nodes_);
822         details = std::move(details_);
823         disable_ = true;
824         total_ = 0;
825         count_ = 0;
826         if (right_ > MAX_RIGHT) {
827             right_ = 0;
828         }
829         left_ = right_;
830         creator_ = nullptr;
831     }
832     nodes.clear();
833     details.clear();
834     return 0;
835 }
836 
IsFull()837 bool ConnPool::Container::IsFull()
838 {
839     std::unique_lock<decltype(mutex_)> lock(mutex_);
840     return total_ == count_;
841 }
842 
Empty()843 bool ConnPool::Container::Empty()
844 {
845     std::unique_lock<decltype(mutex_)> lock(mutex_);
846     return total_ == 0;
847 }
848 
Dump(const char * header,int32_t count)849 int32_t ConnPool::Container::Dump(const char *header, int32_t count)
850 {
851     std::string info;
852     std::vector<std::shared_ptr<ConnNode>> details;
853     std::string title = "B_M_T_C[" + std::to_string(count) + "," + std::to_string(max_) + "," +
854                         std::to_string(total_) + "," + std::to_string(count_) + "]";
855     {
856         std::unique_lock<decltype(mutex_)> lock(mutex_);
857         details.reserve(details_.size());
858         for (auto &detail : details_) {
859             auto node = detail.lock();
860             if (node == nullptr) {
861                 continue;
862             }
863             details.push_back(node);
864         }
865     }
866 
867     for (auto &node : details) {
868         info.append("<")
869             .append(std::to_string(node->id_))
870             .append(",")
871             .append(std::to_string(node->tid_))
872             .append(",")
873             .append(std::to_string(node->GetUsingTime()))
874             .append(">");
875         // 256 represent that limit to info length
876         if (info.size() > 256) {
877             LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
878             info.clear();
879         }
880     }
881     LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
882     return 0;
883 }
884 
ClearUnusedTrans(std::shared_ptr<ConnectionPool> pool)885 int32_t ConnectionPool::Container::ClearUnusedTrans(std::shared_ptr<ConnectionPool> pool)
886 {
887     std::unique_lock<decltype(mutex_)> lock(mutex_);
888     int transCount = total_;
889     for (auto it = nodes_.begin(); it != nodes_.end();) {
890         auto unusedDuration = std::chrono::steady_clock::now() - (*it)->time_;
891         if (unusedDuration < TRANS_CLEAR_INTERVAL) {
892             it++;
893             continue;
894         }
895         RelDetails(*it);
896         it = nodes_.erase(it);
897         total_--;
898         count_--;
899     }
900     if (total_ != 0) {
901         pool->DelayClearTrans();
902     }
903     LOG_INFO("%{public}d have been cleaned up, and there are %{public}d remaining to be cleaned up",
904         transCount - total_, total_);
905     return E_OK;
906 }
907 
IsRecyclable()908 bool ConnPool::ConnNode::IsRecyclable()
909 {
910     return connect_->IsRecyclable();
911 }
912 } // namespace NativeRdb
913 } // namespace OHOS
914