• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define LOG_TAG "ConnectionPool"
17 #include "connection_pool.h"
18 
19 #include <base_transaction.h>
20 
21 #include <condition_variable>
22 #include <iterator>
23 #include <mutex>
24 #include <sstream>
25 #include <vector>
26 
27 #include "connection.h"
28 #include "logger.h"
29 #include "rdb_common.h"
30 #include "rdb_errno.h"
31 #include "rdb_fault_hiview_reporter.h"
32 #include "rdb_sql_statistic.h"
33 #include "rdb_perfStat.h"
34 #include "sqlite_global_config.h"
35 #include "sqlite_utils.h"
36 #include "task_executor.h"
37 
38 namespace OHOS {
39 namespace NativeRdb {
40 using namespace OHOS::Rdb;
41 using namespace std::chrono;
42 using Conn = Connection;
43 using ConnPool = ConnectionPool;
44 using SharedConn = std::shared_ptr<Connection>;
45 using SharedConns = std::vector<std::shared_ptr<Connection>>;
46 using SqlStatistic = DistributedRdb::SqlStatistic;
47 using PerfStat = DistributedRdb::PerfStat;
48 using Reportor = RdbFaultHiViewReporter;
49 constexpr int32_t TRANSACTION_TIMEOUT(2);
50 constexpr int64_t WAIT_TIME = 2;
51 
Create(const RdbStoreConfig & config,int & errCode)52 std::shared_ptr<ConnPool> ConnPool::Create(const RdbStoreConfig &config, int &errCode)
53 {
54     std::shared_ptr<ConnPool> pool(new (std::nothrow) ConnPool(config));
55     if (pool == nullptr) {
56         LOG_ERROR("ConnPool::Create new failed, pool is nullptr.");
57         errCode = E_ERROR;
58         return nullptr;
59     }
60     std::shared_ptr<Connection> conn;
61     for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
62         std::tie(errCode, conn) = pool->Init();
63         if (errCode != E_SQLITE_CORRUPT) {
64             break;
65         }
66         config.SetIter(ITER_V1);
67     }
68     std::string dbPath;
69     (void)SqliteGlobalConfig::GetDbPath(config, dbPath);
70     LOG_INFO("code:%{public}d app[%{public}s:%{public}s] area[%{public}s] "
71              "cfg[%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d]"
72              "%{public}s",
73         errCode, config.GetBundleName().c_str(), config.GetModuleName().c_str(),
74         SqliteUtils::GetArea(dbPath).c_str(), config.GetDBType(), config.GetHaMode(), config.IsEncrypt(),
75         config.GetArea(), config.GetSecurityLevel(), config.GetRoleType(), config.IsReadOnly(),
76         SqliteUtils::FormatDebugInfoBrief(Connection::Collect(config),
77         SqliteUtils::Anonymous(config.GetName())).c_str());
78     return errCode == E_OK ? pool : nullptr;
79 }
80 
HandleDataCorruption(const RdbStoreConfig & storeConfig,int & errCode)81 std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> ConnPool::HandleDataCorruption(
82     const RdbStoreConfig &storeConfig, int &errCode)
83 {
84     std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> result;
85     auto &[rebuiltType, pool] = result;
86 
87     int repairErrCode = Connection::Repair(storeConfig);
88     if (repairErrCode == E_OK) {
89         rebuiltType = RebuiltType::REPAIRED;
90     } else if (storeConfig.GetAllowRebuild()) {
91         Connection::Delete(storeConfig);
92         rebuiltType = RebuiltType::REBUILT;
93     } else if (storeConfig.IsEncrypt() && errCode == E_INVALID_SECRET_KEY) {
94         return result;
95     } else {
96         errCode = E_SQLITE_CORRUPT;
97         return result;
98     }
99     pool = Create(storeConfig, errCode);
100     if (errCode != E_OK) {
101         LOG_WARN("failed, type %{public}d db %{public}s encrypt %{public}d error %{public}d errno %{public}d",
102             static_cast<uint32_t>(rebuiltType), SqliteUtils::Anonymous(storeConfig.GetName()).c_str(),
103             storeConfig.IsEncrypt(), errCode, errno);
104     } else {
105         Reportor::ReportRestore(Reportor::Create(storeConfig, E_OK, "RestoreType:Rebuild", false), false);
106     }
107 
108     return result;
109 }
110 
ConnectionPool(const RdbStoreConfig & storeConfig)111 ConnPool::ConnectionPool(const RdbStoreConfig &storeConfig)
112     : config_(storeConfig), attachConfig_(storeConfig), writers_(), readers_(), transactionStack_(),
113       transactionUsed_(false)
114 {
115     attachConfig_.SetJournalMode(JournalMode::MODE_TRUNCATE);
116     trans_.right_ = Container::MIN_TRANS_ID;
117     trans_.left_ = trans_.right_;
118     clearActuator_ = std::make_shared<DelayActuator<std::vector<std::weak_ptr<ConnPool::ConnNode>>,
119         std::function<void(
120             std::vector<std::weak_ptr<ConnPool::ConnNode>> & out, std::shared_ptr<ConnPool::ConnNode> && input)>>>(
121         [](auto &out, std::shared_ptr<ConnNode> &&input) {
122             for (auto &node : out) {
123                 if (node.lock() == input) {
124                     return;
125                 }
126             }
127             out.push_back(std::move(input));
128         },
129         FIRST_DELAY_INTERVAL,
130         MIN_EXECUTE_INTERVAL,
131         MAX_EXECUTE_INTERVAL);
132     clearActuator_->SetExecutorPool(TaskExecutor::GetInstance().GetExecutor());
133     clearActuator_->SetTask([](std::vector<std::weak_ptr<ConnPool::ConnNode>> &&nodes) {
134         for (auto &node : nodes) {
135             auto realNode = node.lock();
136             if (realNode == nullptr || realNode->connect_ == nullptr) {
137                 continue;
138             }
139             realNode->connect_->ClearCache(true);
140         }
141         return E_OK;
142     });
143 }
144 
Init(bool isAttach,bool needWriter)145 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::Init(bool isAttach, bool needWriter)
146 {
147     const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
148     std::pair<int32_t, std::shared_ptr<Connection>> result;
149     auto &[errCode, conn] = result;
150     errCode = config.Initialize();
151     if (errCode != E_OK) {
152         return result;
153     }
154 
155     if ((config.GetRoleType() == OWNER || config.GetRoleType() == VISITOR_WRITE) && !config.IsReadOnly()) {
156         // write connect count is 1
157         std::shared_ptr<ConnPool::ConnNode> node;
158         auto create = [this, isAttach]() {
159             const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
160             return Connection::Create(config, true);
161         };
162         std::tie(errCode, node) = writers_.Initialize(create, 1, config.GetWriteTime(), true, needWriter);
163         conn = Convert2AutoConn(node);
164         if (errCode != E_OK) {
165             return result;
166         }
167         trans_.InitMembers(create, MAX_TRANS, 0, false);
168     }
169     isAttach_ = isAttach;
170     maxReader_ = GetMaxReaders(config);
171     // max read connect count is 64
172     if (maxReader_ > 64) {
173         return { E_ARGS_READ_CON_OVERLOAD, nullptr };
174     }
175     auto [ret, node] = readers_.Initialize(
176         [this, isAttach]() {
177             const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
178             return Connection::Create(config, false);
179         },
180         maxReader_, config.GetReadTime(), maxReader_ == 0);
181     errCode = ret;
182     return result;
183 }
184 
~ConnectionPool()185 ConnPool::~ConnectionPool()
186 {
187     clearActuator_ = nullptr;
188     CloseAllConnections();
189 }
190 
GetMaxReaders(const RdbStoreConfig & config)191 int32_t ConnPool::GetMaxReaders(const RdbStoreConfig &config)
192 {
193     if (config.GetStorageMode() != StorageMode::MODE_MEMORY &&
194         config.GetJournalMode() == RdbStoreConfig::GetJournalModeValue(JournalMode::MODE_WAL)) {
195         return config.GetReadConSize();
196     } else {
197         return 0;
198     }
199 }
200 
Convert2AutoConn(std::shared_ptr<ConnNode> node,bool isTrans)201 std::shared_ptr<Connection> ConnPool::Convert2AutoConn(std::shared_ptr<ConnNode> node, bool isTrans)
202 {
203     if (node == nullptr) {
204         return nullptr;
205     }
206 
207     auto conn = node->GetConnect();
208     if (conn == nullptr) {
209         return nullptr;
210     }
211     conn->VerifyAndRegisterHook(config_);
212     if (isTrans) {
213         transCount_++;
214     }
215     return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), node, isTrans](auto *) mutable {
216         auto realPool = pool.lock();
217         if (realPool == nullptr) {
218             return;
219         }
220         realPool->ReleaseNode(node, isTrans);
221         if (isTrans) {
222             realPool->transCount_--;
223         }
224         node = nullptr;
225     });
226 }
227 
DelayClearTrans()228 void ConnPool::DelayClearTrans()
229 {
230     auto pool = TaskExecutor::GetInstance().GetExecutor();
231     if (pool == nullptr) {
232         LOG_ERROR("pool is nullptr.");
233         return;
234     }
235     pool->Schedule(TRANS_CLEAR_INTERVAL, [pool = weak_from_this()]() {
236         auto realPool = pool.lock();
237         if (realPool == nullptr) {
238             return;
239         }
240         realPool->trans_.ClearUnusedTrans(realPool);
241     });
242 }
243 
CloseAllConnections()244 void ConnPool::CloseAllConnections()
245 {
246     writers_.Clear();
247     readers_.Clear();
248     trans_.Clear();
249 }
250 
IsInTransaction()251 bool ConnPool::IsInTransaction()
252 {
253     return isInTransaction_.load();
254 }
255 
SetInTransaction(bool isInTransaction)256 void ConnPool::SetInTransaction(bool isInTransaction)
257 {
258     isInTransaction_.store(isInTransaction);
259 }
260 
CreateTransConn(bool limited)261 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::CreateTransConn(bool limited)
262 {
263     PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_WAIT);
264     if (transCount_.load() >= MAX_TRANS && limited) {
265         trans_.Dump("NO TRANS", transCount_ + isInTransaction_);
266         writers_.Dump("NO TRANS WRITE", transCount_ + isInTransaction_);
267         return { E_DATABASE_BUSY, nullptr };
268     }
269     if (trans_.Empty()) {
270         DelayClearTrans();
271     }
272     auto [errCode, node] = trans_.Acquire(INVALID_TIME);
273     return { errCode, Convert2AutoConn(node, true) };
274 }
275 
AcquireConnection(bool isReadOnly)276 std::shared_ptr<Conn> ConnPool::AcquireConnection(bool isReadOnly)
277 {
278     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
279     PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_WAIT);
280     return Acquire(isReadOnly);
281 }
282 
AcquireAll(int32_t time)283 std::pair<SharedConn, SharedConns> ConnPool::AcquireAll(int32_t time)
284 {
285     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
286     PerfStat perfStat(config_.GetPath(), "", PerfStat::Step::STEP_WAIT);
287     using namespace std::chrono;
288     std::pair<SharedConn, SharedConns> result;
289     auto &[writer, readers] = result;
290     auto interval = duration_cast<milliseconds>(seconds(time));
291     auto start = steady_clock::now();
292     auto [res, writerNodes] = writers_.AcquireAll(interval);
293     if (!res || writerNodes.empty()) {
294         return {};
295     }
296     writer = Convert2AutoConn(writerNodes.front());
297 
298     auto usedTime = duration_cast<milliseconds>(steady_clock::now() - start);
299     if (writer == nullptr || usedTime >= interval) {
300         return {};
301     }
302 
303     if (maxReader_ == 0) {
304         return result;
305     }
306 
307     readers_.Disable();
308     std::list<std::shared_ptr<ConnPool::ConnNode>> nodes;
309     std::tie(res, nodes) = readers_.AcquireAll(interval - usedTime);
310     if (!res) {
311         readers_.Enable();
312         return {};
313     }
314 
315     for (auto node : nodes) {
316         auto conn = Convert2AutoConn(node);
317         if (conn == nullptr) {
318             continue;
319         }
320         readers.push_back(conn);
321     }
322     usedTime = duration_cast<milliseconds>(steady_clock::now() - start);
323     if (usedTime >= interval) {
324         return {};
325     }
326     trans_.Disable();
327     std::list<std::shared_ptr<ConnPool::ConnNode>> trans;
328     std::tie(res, trans) = trans_.AcquireAll(interval - usedTime);
329     if (!res) {
330         trans_.Enable();
331         return {};
332     }
333     return result;
334 }
335 
Acquire(bool isReadOnly,std::chrono::milliseconds ms)336 std::shared_ptr<Conn> ConnPool::Acquire(bool isReadOnly, std::chrono::milliseconds ms)
337 {
338     Container *container = (isReadOnly && maxReader_ != 0) ? &readers_ : &writers_;
339     auto [errCode, node] = container->Acquire(ms);
340     if (errCode != E_OK || node == nullptr) {
341         const char *header = (isReadOnly && maxReader_ != 0) ? "readers_" : "writers_";
342         container->Dump(header, transCount_ + isInTransaction_);
343         return nullptr;
344     }
345     return Convert2AutoConn(node);
346 }
347 
AcquireRef(bool isReadOnly,std::chrono::milliseconds ms)348 SharedConn ConnPool::AcquireRef(bool isReadOnly, std::chrono::milliseconds ms)
349 {
350     SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
351     PerfStat perfStat(config_.GetPath(), "", SqlStatistic::Step::STEP_WAIT);
352     if (maxReader_ != 0) {
353         return Acquire(isReadOnly, ms);
354     }
355     auto [errCode, node] = writers_.Acquire(ms);
356     if (errCode != E_OK || node == nullptr) {
357         writers_.Dump("writers_", transCount_ + isInTransaction_);
358         return nullptr;
359     }
360     auto conn = node->connect_;
361     writers_.Release(node);
362     return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), conn](Connection *) {
363         auto realPool = pool.lock();
364         if (realPool == nullptr) {
365             return;
366         }
367         realPool->writers_.cond_.notify_all();
368     });
369 }
370 
ReleaseNode(std::shared_ptr<ConnNode> node,bool isTrans)371 void ConnPool::ReleaseNode(std::shared_ptr<ConnNode> node, bool isTrans)
372 {
373     if (node == nullptr) {
374         return;
375     }
376 
377     auto now = steady_clock::now();
378     auto timeout = now > (failedTime_.load() + minutes(CHECK_POINT_INTERVAL)) || now < failedTime_.load() ||
379                    failedTime_.load() == steady_clock::time_point();
380     auto transCount = transCount_ + isInTransaction_;
381     auto remainCount = isTrans ? transCount - 1 : transCount;
382     auto errCode = node->Unused(remainCount, timeout);
383     if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
384         writers_.Dump("WAL writers_", transCount);
385         trans_.Dump("WAL trans_", transCount);
386         readers_.Dump("WAL readers_", transCount);
387     }
388     if (node->IsWriter() && (errCode != E_INNER_WARNING && errCode != E_NOT_SUPPORT)) {
389         failedTime_ = errCode != E_OK ? now : steady_clock::time_point();
390     }
391 
392     if (isTrans) {
393         trans_.ReleaseTrans(node);
394     } else {
395         auto &container = node->IsWriter() ? writers_ : readers_;
396         container.Release(node);
397     }
398     if (clearActuator_ != nullptr) {
399         clearActuator_->Execute<std::shared_ptr<ConnNode>>(std::move(node));
400     }
401 }
402 
AcquireTransaction()403 int ConnPool::AcquireTransaction()
404 {
405     std::unique_lock<std::mutex> lock(transMutex_);
406     if (transCondition_.wait_for(
407         lock, std::chrono::seconds(TRANSACTION_TIMEOUT),
408         [this] { return !transactionUsed_; })) {
409         transactionUsed_ = true;
410         return E_OK;
411     }
412     LOG_WARN("transactionUsed_ is %{public}d", transactionUsed_);
413     return E_DATABASE_BUSY;
414 }
415 
ReleaseTransaction()416 void ConnPool::ReleaseTransaction()
417 {
418     {
419         std::unique_lock<std::mutex> lock(transMutex_);
420         transactionUsed_ = false;
421     }
422     transCondition_.notify_one();
423 }
424 
RestartConns()425 int ConnPool::RestartConns()
426 {
427     const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
428     readers_.Clear();
429     auto [errCode, node] = readers_.Initialize(
430         [this]() {
431             const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
432             return Connection::Create(config, false);
433         },
434         maxReader_, config.GetReadTime(), maxReader_ == 0);
435     trans_.Clear();
436     trans_.InitMembers(
437         [this]() {
438             const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
439             return Connection::Create(config, true);
440         },
441         MAX_TRANS, 0, false);
442     return errCode;
443 }
444 
ReopenConns()445 int ConnPool::ReopenConns()
446 {
447     CloseAllConnections();
448     auto initRes = Init();
449     if (initRes.first != E_OK) {
450         LOG_ERROR("Init fail, errCode:%{public}d", initRes.first);
451         return initRes.first;
452     }
453     return E_OK;
454 }
455 
456 /**
457  * The database locale.
458  */
ConfigLocale(const std::string & localeStr)459 int ConnPool::ConfigLocale(const std::string &localeStr)
460 {
461     auto errCode = readers_.ConfigLocale(localeStr);
462     if (errCode != E_OK) {
463         return errCode;
464     }
465     errCode = writers_.ConfigLocale(localeStr);
466     if (errCode != E_OK) {
467         return errCode;
468     }
469     return trans_.ConfigLocale(localeStr);
470 }
471 
472 /**
473  * Rename the backed up database.
474  */
ChangeDbFileForRestore(const std::string & newPath,const std::string & backupPath,const std::vector<uint8_t> & newKey,std::shared_ptr<SlaveStatus> slaveStatus)475 int ConnPool::ChangeDbFileForRestore(const std::string &newPath, const std::string &backupPath,
476     const std::vector<uint8_t> &newKey, std::shared_ptr<SlaveStatus> slaveStatus)
477 {
478     if (!writers_.IsFull() || config_.GetPath() == backupPath || newPath == backupPath) {
479         LOG_ERROR("Connection pool is busy now!");
480         return E_ERROR;
481     }
482     if (config_.GetDBType() == DB_VECTOR) {
483         CloseAllConnections();
484         auto [retVal, conn] = Connection::Create(config_, false);
485         if (retVal != E_OK) {
486             LOG_ERROR("Create connection fail, errCode:%{public}d", retVal);
487             return retVal;
488         }
489 
490         retVal = conn->Restore(backupPath, newKey, slaveStatus);
491         if (retVal != E_OK) {
492             LOG_ERROR("Restore failed, errCode:0x%{public}x", retVal);
493             return retVal;
494         }
495 
496         conn = nullptr;
497         auto initRes = Init();
498         if (initRes.first != E_OK) {
499             LOG_ERROR("Init fail, errCode:%{public}d", initRes.first);
500             return initRes.first;
501         }
502         return retVal;
503     }
504     return RestoreByDbSqliteType(newPath, backupPath, slaveStatus);
505 }
506 
RestoreByDbSqliteType(const std::string & newPath,const std::string & backupPath,std::shared_ptr<SlaveStatus> slaveStatus)507 int ConnPool::RestoreByDbSqliteType(const std::string &newPath, const std::string &backupPath,
508     std::shared_ptr<SlaveStatus> slaveStatus)
509 {
510     if (SqliteUtils::IsSlaveDbName(backupPath) && config_.GetHaMode() != HAMode::SINGLE) {
511         auto connection = AcquireConnection(false);
512         if (connection == nullptr) {
513             return E_DATABASE_BUSY;
514         }
515         return connection->Restore(backupPath, {}, slaveStatus);
516     }
517 
518     return RestoreMasterDb(newPath, backupPath);
519 }
520 
RestoreMasterDb(const std::string & newPath,const std::string & backupPath)521 int ConnPool::RestoreMasterDb(const std::string &newPath, const std::string &backupPath)
522 {
523     if (!CheckIntegrity(backupPath)) {
524         LOG_ERROR("backup file is corrupted, %{public}s", SqliteUtils::Anonymous(backupPath).c_str());
525         return E_SQLITE_CORRUPT;
526     }
527     auto walFile = backupPath + "-wal";
528     if (SqliteUtils::GetFileSize(walFile) != 0) {
529         LOG_ERROR("Wal file exist.");
530         return E_SQLITE_CORRUPT;
531     }
532 
533     SqliteUtils::DeleteFile(backupPath + "-shm");
534     SqliteUtils::DeleteFile(backupPath + "-wal");
535 
536     CloseAllConnections();
537     Connection::Delete(config_);
538 
539     if (config_.GetPath() != newPath) {
540         RdbStoreConfig config(newPath);
541         config.SetPath(newPath);
542         Connection::Delete(config);
543     }
544 
545     bool copyRet = SqliteUtils::CopyFile(backupPath, newPath);
546     int32_t errCode = E_OK;
547     std::shared_ptr<Connection> pool;
548     for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
549         std::tie(errCode, pool) = Init();
550         if (errCode == E_OK) {
551             break;
552         }
553         if (errCode != E_SQLITE_CORRUPT || !config_.IsEncrypt()) {
554             break;
555         }
556         config_.SetIter(ITER_V1);
557     }
558     if (errCode != E_OK) {
559         CloseAllConnections();
560         Connection::Delete(config_);
561         std::tie(errCode, pool) = Init();
562         LOG_WARN("Restore failed! rebuild res:%{public}d, path:%{public}s.", errCode,
563             SqliteUtils::Anonymous(backupPath).c_str());
564     }
565     return copyRet ? errCode : E_ERROR;
566 }
567 
Rekey(const RdbStoreConfig::CryptoParam & cryptoParam)568 int ConnPool::Rekey(const RdbStoreConfig::CryptoParam &cryptoParam)
569 {
570     int errCode = E_OK;
571     auto [connection, readers] = AcquireAll(WAIT_TIME);
572     if (connection == nullptr) {
573         return E_DATABASE_BUSY;
574     }
575     CloseAllConnections();
576     auto key = config_.GetEncryptKey();
577     readers.clear();
578 
579     errCode = connection->Rekey(cryptoParam);
580     if (errCode != E_OK) {
581         LOG_ERROR("ReKey failed, err = %{public}d", errCode);
582         key.assign(key.size(), 0);
583         return errCode;
584     }
585     config_.ResetEncryptKey(cryptoParam.encryptKey_);
586     connection = nullptr;
587     auto initRes = Init();
588     if (initRes.first != E_OK) {
589         LOG_ERROR("Init fail, errCode:%{public}d", initRes.first);
590         config_.ResetEncryptKey(key);
591         key.assign(key.size(), 0);
592         initRes = Init();
593         return initRes.first;
594     }
595     return E_OK;
596 }
597 
GetTransactionStack()598 std::stack<BaseTransaction> &ConnPool::GetTransactionStack()
599 {
600     return transactionStack_;
601 }
602 
GetTransactionStackMutex()603 std::mutex &ConnPool::GetTransactionStackMutex()
604 {
605     return transactionStackMutex_;
606 }
607 
DisableWal()608 std::pair<int32_t, std::shared_ptr<Conn>> ConnPool::DisableWal()
609 {
610     return Init(true, true);
611 }
612 
EnableWal()613 int ConnPool::EnableWal()
614 {
615     auto [errCode, node] = Init();
616     return errCode;
617 }
618 
Dump(bool isWriter,const char * header)619 int32_t ConnectionPool::Dump(bool isWriter, const char *header)
620 {
621     Container *container = (isWriter || maxReader_ == 0) ? &writers_ : &readers_;
622     container->Dump(header, transCount_ + isInTransaction_);
623     return E_OK;
624 }
625 
ConnNode(std::shared_ptr<Conn> conn)626 ConnPool::ConnNode::ConnNode(std::shared_ptr<Conn> conn) : connect_(std::move(conn))
627 {
628 }
629 
GetConnect()630 std::shared_ptr<Conn> ConnPool::ConnNode::GetConnect()
631 {
632     tid_ = gettid();
633     time_ = steady_clock::now();
634     return connect_;
635 }
636 
GetUsingTime() const637 int64_t ConnPool::ConnNode::GetUsingTime() const
638 {
639     auto time = steady_clock::now() - time_;
640     return duration_cast<milliseconds>(time).count();
641 }
642 
Unused(int32_t count,bool timeout)643 int32_t ConnPool::ConnNode::Unused(int32_t count, bool timeout)
644 {
645     if (connect_ == nullptr) {
646         return E_OK;
647     }
648 
649     connect_->ClearCache();
650     int32_t errCode = E_INNER_WARNING;
651     if (count <= 0) {
652         errCode = connect_->TryCheckPoint(timeout);
653     }
654 
655     time_ = steady_clock::now();
656     if (!connect_->IsWriter()) {
657         tid_ = 0;
658     }
659     return errCode;
660 }
661 
IsWriter() const662 bool ConnPool::ConnNode::IsWriter() const
663 {
664     if (connect_ != nullptr) {
665         return connect_->IsWriter();
666     }
667     return false;
668 }
669 
InitMembers(Creator creator,int32_t max,int32_t timeout,bool disable)670 void ConnPool::Container::InitMembers(Creator creator, int32_t max, int32_t timeout, bool disable)
671 {
672     std::unique_lock<decltype(mutex_)> lock(mutex_);
673     disable_ = disable;
674     max_ = max;
675     creator_ = creator;
676     timeout_ = std::chrono::seconds(timeout);
677 }
678 
Initialize(Creator creator,int32_t max,int32_t timeout,bool disable,bool acquire)679 std::pair<int32_t, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Initialize(
680     Creator creator, int32_t max, int32_t timeout, bool disable, bool acquire)
681 {
682     std::shared_ptr<ConnNode> connNode = nullptr;
683     {
684         std::unique_lock<decltype(mutex_)> lock(mutex_);
685         disable_ = disable;
686         max_ = max;
687         creator_ = creator;
688         timeout_ = std::chrono::seconds(timeout);
689         for (int i = 0; i < max_; ++i) {
690             auto errCode = ExtendNode();
691             if (errCode != E_OK) {
692                 nodes_.clear();
693                 details_.clear();
694                 return { errCode, nullptr };
695             }
696         }
697 
698         if (acquire && count_ > 0) {
699             connNode = nodes_.back();
700             nodes_.pop_back();
701             count_--;
702         }
703     }
704     cond_.notify_all();
705     return { E_OK, connNode };
706 }
707 
ConfigLocale(const std::string & locale)708 int32_t ConnPool::Container::ConfigLocale(const std::string &locale)
709 {
710     std::unique_lock<decltype(mutex_)> lock(mutex_);
711     for (auto it = details_.begin(); it != details_.end();) {
712         auto conn = it->lock();
713         if (conn == nullptr || conn->connect_ == nullptr) {
714             it = details_.erase(it);
715             continue;
716         }
717         int32_t errCode = conn->connect_->ConfigLocale(locale);
718         if (errCode != E_OK) {
719             LOG_ERROR("ConfigLocale failed %{public}d", errCode);
720             return errCode;
721         }
722         ++it;
723     }
724     return E_OK;
725 }
726 
Acquire(std::chrono::milliseconds milliS)727 std::pair<int, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Acquire(std::chrono::milliseconds milliS)
728 {
729     std::unique_lock<decltype(mutex_)> lock(mutex_);
730     auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
731     if (max_ == 0) {
732         return {E_ERROR, nullptr};
733     }
734     int errCode = E_OK;
735     auto waiter = [this, &errCode]() -> bool {
736         if (count_ > 0) {
737             return true;
738         }
739 
740         if (disable_) {
741             return false;
742         }
743         errCode = ExtendNode();
744         return errCode == E_OK;
745     };
746     if (cond_.wait_for(lock, interval, waiter)) {
747         if (nodes_.empty()) {
748             LOG_ERROR("Nodes is empty.count %{public}d max %{public}d total %{public}d left %{public}d right%{public}d",
749                 count_, max_, total_, left_, right_);
750             count_ = 0;
751             return {E_ERROR, nullptr};
752         }
753         auto node = nodes_.back();
754         nodes_.pop_back();
755         count_--;
756         return {E_OK, node};
757     }
758     return {errCode, nullptr};
759 }
760 
ExtendNode()761 int32_t ConnPool::Container::ExtendNode()
762 {
763     if (creator_ == nullptr) {
764         return E_ERROR;
765     }
766     auto [errCode, conn] = creator_();
767     if (conn == nullptr) {
768         return errCode;
769     }
770     auto node = std::make_shared<ConnNode>(conn);
771     node->id_ = right_++;
772     conn->SetId(node->id_);
773     nodes_.push_back(node);
774     details_.push_back(node);
775     count_++;
776     total_++;
777     return E_OK;
778 }
779 
AcquireAll(std::chrono::milliseconds milliS)780 std::pair<bool, std::list<std::shared_ptr<ConnPool::ConnNode>>> ConnPool::Container::AcquireAll(
781     std::chrono::milliseconds milliS)
782 {
783     std::list<std::shared_ptr<ConnNode>> nodes;
784     int32_t count = 0;
785     auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
786     auto time = std::chrono::steady_clock::now() + interval;
787     std::unique_lock<decltype(mutex_)> lock(mutex_);
788     while (count < total_ && cond_.wait_until(lock, time, [this]() { return count_ > 0; })) {
789         nodes.merge(std::move(nodes_));
790         nodes_.clear();
791         count += count_;
792         count_ = 0;
793     }
794 
795     if (count != total_) {
796         count_ = count;
797         nodes_ = std::move(nodes);
798         nodes.clear();
799         return {false, nodes};
800     }
801     auto func = [](const std::list<std::shared_ptr<ConnNode>> &nodes) -> bool {
802         for (auto &node : nodes) {
803             if (node->connect_ == nullptr) {
804                 continue;
805             }
806             if (node->connect_.use_count() != 1) {
807                 return false;
808             }
809         }
810         return true;
811     };
812     bool failed = false;
813     while (failed = !func(nodes), failed && cond_.wait_until(lock, time) != std::cv_status::timeout) {
814     }
815     if (failed) {
816         count_ = count;
817         nodes_ = std::move(nodes);
818         nodes.clear();
819     }
820     return {!failed, nodes};
821 }
822 
Disable()823 void ConnPool::Container::Disable()
824 {
825     disable_ = true;
826     cond_.notify_one();
827 }
828 
Enable()829 void ConnPool::Container::Enable()
830 {
831     disable_ = false;
832     cond_.notify_one();
833 }
834 
Release(std::shared_ptr<ConnNode> node)835 int32_t ConnPool::Container::Release(std::shared_ptr<ConnNode> node)
836 {
837     {
838         std::unique_lock<decltype(mutex_)> lock(mutex_);
839         if (node->id_ < left_ || node->id_ >= right_) {
840             return E_OK;
841         }
842         if (count_ == max_) {
843             total_ = total_ > count_ ? total_ - 1 : count_;
844             RelDetails(node);
845         } else {
846             nodes_.push_front(node);
847             count_++;
848         }
849     }
850     cond_.notify_one();
851     return E_OK;
852 }
853 
ReleaseTrans(std::shared_ptr<ConnNode> node)854 int32_t ConnectionPool::Container::ReleaseTrans(std::shared_ptr<ConnNode> node)
855 {
856     {
857         std::unique_lock<decltype(mutex_)> lock(mutex_);
858         if (node->id_ < left_ || node->id_ >= right_) {
859             return E_OK;
860         }
861         if (node->IsRecyclable()) {
862             nodes_.push_back(node);
863             count_++;
864         } else {
865             total_--;
866             RelDetails(node);
867         }
868     }
869     cond_.notify_one();
870     return E_OK;
871 }
872 
RelDetails(std::shared_ptr<ConnNode> node)873 int32_t ConnectionPool::Container::RelDetails(std::shared_ptr<ConnNode> node)
874 {
875     for (auto it = details_.begin(); it != details_.end();) {
876         auto detailNode = it->lock();
877         if (detailNode == nullptr || detailNode->id_ == node->id_) {
878             it = details_.erase(it);
879         } else {
880             it++;
881         }
882     }
883     return E_OK;
884 }
885 
CheckIntegrity(const std::string & dbPath)886 bool ConnectionPool::CheckIntegrity(const std::string &dbPath)
887 {
888     RdbStoreConfig config(config_);
889     config.SetPath(dbPath);
890     config.SetIntegrityCheck(IntegrityCheck::FULL);
891     config.SetHaMode(HAMode::SINGLE);
892     for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
893         auto [ret, connection] = Connection::Create(config, true);
894         if (ret == E_OK) {
895             return true;
896         }
897         if (ret != E_SQLITE_CORRUPT || !config.IsEncrypt()) {
898             break;
899         }
900         config.SetIter(ITER_V1);
901     }
902     return false;
903 }
904 
Clear()905 int32_t ConnPool::Container::Clear()
906 {
907     std::list<std::shared_ptr<ConnNode>> nodes;
908     std::list<std::weak_ptr<ConnNode>> details;
909     {
910         std::unique_lock<decltype(mutex_)> lock(mutex_);
911         nodes = std::move(nodes_);
912         details = std::move(details_);
913         disable_ = true;
914         total_ = 0;
915         count_ = 0;
916         if (right_ > MAX_RIGHT) {
917             right_ = 0;
918         }
919         left_ = right_;
920         creator_ = nullptr;
921     }
922     nodes.clear();
923     details.clear();
924     return 0;
925 }
926 
IsFull()927 bool ConnPool::Container::IsFull()
928 {
929     std::unique_lock<decltype(mutex_)> lock(mutex_);
930     return total_ == count_;
931 }
932 
Empty()933 bool ConnPool::Container::Empty()
934 {
935     std::unique_lock<decltype(mutex_)> lock(mutex_);
936     return total_ == 0;
937 }
938 
Dump(const char * header,int32_t count)939 int32_t ConnPool::Container::Dump(const char *header, int32_t count)
940 {
941     std::string info;
942     std::string allInfo;
943     std::vector<std::shared_ptr<ConnNode>> details;
944     std::string title = "B_M_T_C[" + std::to_string(count) + "," + std::to_string(max_) + "," +
945                         std::to_string(total_) + "," + std::to_string(count_) + "]";
946     {
947         std::unique_lock<decltype(mutex_)> lock(mutex_);
948         details.reserve(details_.size());
949         for (auto &detail : details_) {
950             auto node = detail.lock();
951             if (node == nullptr) {
952                 continue;
953             }
954             details.push_back(node);
955         }
956     }
957 
958     for (auto &node : details) {
959         info.append("<")
960             .append(std::to_string(node->id_))
961             .append(",")
962             .append(std::to_string(node->tid_))
963             .append(",")
964             .append(std::to_string(node->GetUsingTime()))
965             .append(">");
966         // 256 represent that limit to info length
967         if (info.size() > 256) {
968             LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
969             allInfo.append(header).append(": ").append(title).append(std::move(info)).append("\n");
970             info.clear();
971         }
972     }
973     LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
974     allInfo.append(header).append(": ").append(title).append(std::move(info));
975     Reportor::ReportFault(RdbFaultEvent(FT_CURD, E_DFX_DUMP_INFO, BUNDLE_NAME_COMMON, allInfo));
976     return 0;
977 }
978 
ClearUnusedTrans(std::shared_ptr<ConnectionPool> pool)979 int32_t ConnectionPool::Container::ClearUnusedTrans(std::shared_ptr<ConnectionPool> pool)
980 {
981     std::unique_lock<decltype(mutex_)> lock(mutex_);
982     int transCount = total_;
983     for (auto it = nodes_.begin(); it != nodes_.end();) {
984         auto unusedDuration = std::chrono::steady_clock::now() - (*it)->time_;
985         if (unusedDuration < TRANS_CLEAR_INTERVAL) {
986             it++;
987             continue;
988         }
989         RelDetails(*it);
990         it = nodes_.erase(it);
991         total_--;
992         count_--;
993     }
994     if (total_ != 0) {
995         pool->DelayClearTrans();
996     }
997     LOG_INFO("%{public}d have been cleaned up, and there are %{public}d remaining to be cleaned up",
998         transCount - total_, total_);
999     return E_OK;
1000 }
1001 
IsRecyclable()1002 bool ConnPool::ConnNode::IsRecyclable()
1003 {
1004     return connect_->IsRecyclable();
1005 }
1006 } // namespace NativeRdb
1007 } // namespace OHOS
1008