1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define LOG_TAG "ConnectionPool"
17 #include "connection_pool.h"
18
19 #include <base_transaction.h>
20
21 #include <condition_variable>
22 #include <iterator>
23 #include <mutex>
24 #include <sstream>
25 #include <vector>
26
27 #include "logger.h"
28 #include "connection.h"
29 #include "rdb_common.h"
30 #include "rdb_errno.h"
31 #include "rdb_fault_hiview_reporter.h"
32 #include "rdb_sql_statistic.h"
33 #include "sqlite_global_config.h"
34 #include "sqlite_utils.h"
35 #include "task_executor.h"
36
37 namespace OHOS {
38 namespace NativeRdb {
39 using namespace OHOS::Rdb;
40 using namespace std::chrono;
41 using Conn = Connection;
42 using ConnPool = ConnectionPool;
43 using SharedConn = std::shared_ptr<Connection>;
44 using SharedConns = std::vector<std::shared_ptr<Connection>>;
45 using SqlStatistic = DistributedRdb::SqlStatistic;
46 using Reportor = RdbFaultHiViewReporter;
47 constexpr int32_t TRANSACTION_TIMEOUT(2);
48
Create(const RdbStoreConfig & config,int & errCode)49 std::shared_ptr<ConnPool> ConnPool::Create(const RdbStoreConfig &config, int &errCode)
50 {
51 std::shared_ptr<ConnPool> pool(new (std::nothrow) ConnPool(config));
52 if (pool == nullptr) {
53 LOG_ERROR("ConnPool::Create new failed, pool is nullptr.");
54 errCode = E_ERROR;
55 return nullptr;
56 }
57 std::shared_ptr<Connection> conn;
58 for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
59 std::tie(errCode, conn) = pool->Init();
60 if (errCode != E_SQLITE_CORRUPT) {
61 break;
62 }
63 config.SetIter(ITER_V1);
64 }
65 std::string dbPath;
66 (void)SqliteGlobalConfig::GetDbPath(config, dbPath);
67 LOG_INFO("code:%{public}d app:%{public}s path:[%{public}s] "
68 "cfg:[%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d,%{public}d]"
69 "%{public}s",
70 errCode, config.GetBundleName().c_str(), SqliteUtils::Anonymous(dbPath).c_str(), config.GetDBType(),
71 config.GetHaMode(), config.IsEncrypt(), config.GetArea(), config.GetSecurityLevel(), config.GetRoleType(),
72 config.IsReadOnly(),
73 Reportor::FormatBrief(Connection::Collect(config), SqliteUtils::Anonymous(config.GetName())).c_str());
74 return errCode == E_OK ? pool : nullptr;
75 }
76
HandleDataCorruption(const RdbStoreConfig & storeConfig,int & errCode)77 std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> ConnPool::HandleDataCorruption
78 (const RdbStoreConfig &storeConfig, int &errCode)
79 {
80 std::pair<RebuiltType, std::shared_ptr<ConnectionPool>> result;
81 auto &[rebuiltType, pool] = result;
82
83 int repairErrCode = Connection::Repair(storeConfig);
84 if (repairErrCode == E_OK) {
85 rebuiltType = RebuiltType::REPAIRED;
86 } else if (storeConfig.GetAllowRebuild()) {
87 Connection::Delete(storeConfig);
88 rebuiltType = RebuiltType::REBUILT;
89 } else if (storeConfig.IsEncrypt() && errCode == E_INVALID_SECRET_KEY) {
90 return result;
91 } else {
92 errCode = E_SQLITE_CORRUPT;
93 return result;
94 }
95 pool = Create(storeConfig, errCode);
96 if (errCode != E_OK) {
97 LOG_WARN("failed, type %{public}d db %{public}s encrypt %{public}d error %{public}d, errno",
98 static_cast<uint32_t>(rebuiltType), SqliteUtils::Anonymous(storeConfig.GetName()).c_str(),
99 storeConfig.IsEncrypt(), errCode, errno);
100 } else {
101 Reportor::ReportRestore(Reportor::Create(storeConfig, E_OK, "RestoreType:Rebuild"), false);
102 }
103
104 return result;
105 }
106
ConnectionPool(const RdbStoreConfig & storeConfig)107 ConnPool::ConnectionPool(const RdbStoreConfig &storeConfig)
108 : config_(storeConfig), attachConfig_(storeConfig), writers_(), readers_(), transactionStack_(),
109 transactionUsed_(false)
110 {
111 attachConfig_.SetJournalMode(JournalMode::MODE_TRUNCATE);
112 trans_.right_ = Container::MIN_TRANS_ID;
113 trans_.left_ = trans_.right_;
114 }
115
Init(bool isAttach,bool needWriter)116 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::Init(bool isAttach, bool needWriter)
117 {
118 const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
119 std::pair<int32_t, std::shared_ptr<Connection>> result;
120 auto &[errCode, conn] = result;
121 errCode = config.Initialize();
122 if (errCode != E_OK) {
123 return result;
124 }
125
126 if (config.GetRoleType() == OWNER && !config.IsReadOnly()) {
127 // write connect count is 1
128 std::shared_ptr<ConnPool::ConnNode> node;
129 auto create = [this, isAttach]() {
130 const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
131 return Connection::Create(config, true);
132 };
133 std::tie(errCode, node) = writers_.Initialize(create, 1, config.GetWriteTime(), true, needWriter);
134 conn = Convert2AutoConn(node);
135 if (errCode != E_OK) {
136 return result;
137 }
138 trans_.InitMembers(create, MAX_TRANS, 0, false);
139 }
140
141 isAttach_ = isAttach;
142 maxReader_ = GetMaxReaders(config);
143 // max read connect count is 64
144 if (maxReader_ > 64) {
145 return { E_ARGS_READ_CON_OVERLOAD, nullptr };
146 }
147 auto [ret, node] = readers_.Initialize(
148 [this, isAttach]() {
149 const RdbStoreConfig &config = isAttach ? attachConfig_ : config_;
150 return Connection::Create(config, false);
151 },
152 maxReader_, config.GetReadTime(), maxReader_ == 0);
153 errCode = ret;
154 return result;
155 }
156
~ConnectionPool()157 ConnPool::~ConnectionPool()
158 {
159 CloseAllConnections();
160 }
161
GetMaxReaders(const RdbStoreConfig & config)162 int32_t ConnPool::GetMaxReaders(const RdbStoreConfig &config)
163 {
164 if (config.GetStorageMode() != StorageMode::MODE_MEMORY &&
165 config.GetJournalMode() == RdbStoreConfig::GetJournalModeValue(JournalMode::MODE_WAL)) {
166 return config.GetReadConSize();
167 } else {
168 return 0;
169 }
170 }
171
Convert2AutoConn(std::shared_ptr<ConnNode> node,bool isTrans)172 std::shared_ptr<Connection> ConnPool::Convert2AutoConn(std::shared_ptr<ConnNode> node, bool isTrans)
173 {
174 if (node == nullptr) {
175 return nullptr;
176 }
177
178 auto conn = node->GetConnect();
179 if (conn == nullptr) {
180 return nullptr;
181 }
182 if (isTrans) {
183 transCount_++;
184 }
185
186 return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), node, isTrans](auto *) mutable {
187 auto realPool = pool.lock();
188 if (realPool == nullptr) {
189 return;
190 }
191 realPool->ReleaseNode(node, isTrans);
192 if (isTrans) {
193 realPool->transCount_--;
194 }
195 node = nullptr;
196 });
197 }
198
DelayClearTrans()199 void ConnPool::DelayClearTrans()
200 {
201 auto pool = TaskExecutor::GetInstance().GetExecutor();
202 if (pool == nullptr) {
203 LOG_ERROR("pool is nullptr.");
204 return;
205 }
206 pool->Schedule(TRANS_CLEAR_INTERVAL, [pool = weak_from_this()]() {
207 auto realPool = pool.lock();
208 if (realPool == nullptr) {
209 return;
210 }
211 realPool->trans_.ClearUnusedTrans(realPool);
212 });
213 }
214
CloseAllConnections()215 void ConnPool::CloseAllConnections()
216 {
217 writers_.Clear();
218 readers_.Clear();
219 trans_.Clear();
220 }
221
IsInTransaction()222 bool ConnPool::IsInTransaction()
223 {
224 return isInTransaction_.load();
225 }
226
SetInTransaction(bool isInTransaction)227 void ConnPool::SetInTransaction(bool isInTransaction)
228 {
229 isInTransaction_.store(isInTransaction);
230 }
231
CreateTransConn(bool limited)232 std::pair<int32_t, std::shared_ptr<Connection>> ConnPool::CreateTransConn(bool limited)
233 {
234 if (transCount_.load() >= MAX_TRANS && limited) {
235 trans_.Dump("NO TRANS", transCount_ + isInTransaction_);
236 writers_.Dump("NO TRANS WRITE", transCount_ + isInTransaction_);
237 return { E_DATABASE_BUSY, nullptr };
238 }
239 if (trans_.Empty()) {
240 DelayClearTrans();
241 }
242 auto [errCode, node] = trans_.Acquire(INVALID_TIME);
243 return { errCode, Convert2AutoConn(node, true) };
244 }
245
AcquireConnection(bool isReadOnly)246 std::shared_ptr<Conn> ConnPool::AcquireConnection(bool isReadOnly)
247 {
248 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
249 return Acquire(isReadOnly);
250 }
251
AcquireAll(int32_t time)252 std::pair<SharedConn, SharedConns> ConnPool::AcquireAll(int32_t time)
253 {
254 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
255 using namespace std::chrono;
256 std::pair<SharedConn, SharedConns> result;
257 auto &[writer, readers] = result;
258 auto interval = duration_cast<milliseconds>(seconds(time));
259 auto start = steady_clock::now();
260 auto [res, writerNodes] = writers_.AcquireAll(interval);
261 if (!res) {
262 return {};
263 }
264 writer = Convert2AutoConn(writerNodes.front());
265
266 auto usedTime = duration_cast<milliseconds>(steady_clock::now() - start);
267 if (writer == nullptr || usedTime >= interval) {
268 return {};
269 }
270
271 if (maxReader_ == 0) {
272 return result;
273 }
274
275 readers_.Disable();
276 std::list<std::shared_ptr<ConnPool::ConnNode>> nodes;
277 std::tie(res, nodes) = readers_.AcquireAll(interval - usedTime);
278 if (!res) {
279 readers_.Enable();
280 return {};
281 }
282
283 for (auto node : nodes) {
284 auto conn = Convert2AutoConn(node);
285 if (conn == nullptr) {
286 continue;
287 }
288 readers.push_back(conn);
289 }
290 usedTime = duration_cast<milliseconds>(steady_clock::now() - start);
291 if (usedTime >= interval) {
292 return {};
293 }
294 trans_.Disable();
295 std::list<std::shared_ptr<ConnPool::ConnNode>> trans;
296 std::tie(res, trans) = trans_.AcquireAll(interval - usedTime);
297 if (!res) {
298 trans_.Enable();
299 return {};
300 }
301 return result;
302 }
303
Acquire(bool isReadOnly,std::chrono::milliseconds ms)304 std::shared_ptr<Conn> ConnPool::Acquire(bool isReadOnly, std::chrono::milliseconds ms)
305 {
306 Container *container = (isReadOnly && maxReader_ != 0) ? &readers_ : &writers_;
307 auto [errCode, node] = container->Acquire(ms);
308 if (errCode != E_OK || node == nullptr) {
309 const char *header = (isReadOnly && maxReader_ != 0) ? "readers_" : "writers_";
310 container->Dump(header, transCount_ + isInTransaction_);
311 return nullptr;
312 }
313 return Convert2AutoConn(node);
314 }
315
AcquireRef(bool isReadOnly,std::chrono::milliseconds ms)316 SharedConn ConnPool::AcquireRef(bool isReadOnly, std::chrono::milliseconds ms)
317 {
318 SqlStatistic sqlStatistic("", SqlStatistic::Step::STEP_WAIT);
319 if (maxReader_ != 0) {
320 return Acquire(isReadOnly, ms);
321 }
322 auto [errCode, node] = writers_.Acquire(ms);
323 if (errCode != E_OK || node == nullptr) {
324 writers_.Dump("writers_", transCount_ + isInTransaction_);
325 return nullptr;
326 }
327 auto conn = node->connect_;
328 writers_.Release(node);
329 return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), conn](Connection *) {
330 auto realPool = pool.lock();
331 if (realPool == nullptr) {
332 return;
333 }
334 realPool->writers_.cond_.notify_all();
335 });
336 }
337
ReleaseNode(std::shared_ptr<ConnNode> node,bool isTrans)338 void ConnPool::ReleaseNode(std::shared_ptr<ConnNode> node, bool isTrans)
339 {
340 if (node == nullptr) {
341 return;
342 }
343 auto now = steady_clock::now();
344 auto timeout = now > (failedTime_.load() + minutes(CHECK_POINT_INTERVAL)) || now < failedTime_.load() ||
345 failedTime_.load() == steady_clock::time_point();
346 auto transCount = transCount_ + isInTransaction_;
347 auto remainCount = isTrans ? transCount - 1 : transCount;
348 auto errCode = node->Unused(remainCount, timeout);
349 if (errCode == E_SQLITE_LOCKED || errCode == E_SQLITE_BUSY) {
350 writers_.Dump("WAL writers_", transCount);
351 trans_.Dump("WAL trans_", transCount);
352 readers_.Dump("WAL readers_", transCount);
353 }
354
355 if (node->IsWriter() && (errCode != E_INNER_WARNING && errCode != E_NOT_SUPPORT)) {
356 failedTime_ = errCode != E_OK ? now : steady_clock::time_point();
357 }
358
359 if (isTrans) {
360 trans_.ReleaseTrans(node);
361 } else {
362 auto &container = node->IsWriter() ? writers_ : readers_;
363 container.Release(node);
364 }
365 }
366
AcquireTransaction()367 int ConnPool::AcquireTransaction()
368 {
369 std::unique_lock<std::mutex> lock(transMutex_);
370 if (transCondition_.wait_for(lock, std::chrono::seconds(TRANSACTION_TIMEOUT), [this] {
371 return !transactionUsed_;
372 })) {
373 transactionUsed_ = true;
374 return E_OK;
375 }
376 LOG_WARN("transactionUsed_ is %{public}d", transactionUsed_);
377 return E_DATABASE_BUSY;
378 }
379
ReleaseTransaction()380 void ConnPool::ReleaseTransaction()
381 {
382 {
383 std::unique_lock<std::mutex> lock(transMutex_);
384 transactionUsed_ = false;
385 }
386 transCondition_.notify_one();
387 }
388
RestartConns()389 int ConnPool::RestartConns()
390 {
391 const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
392 readers_.Clear();
393 auto [errCode, node] = readers_.Initialize(
394 [this]() {
395 const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
396 return Connection::Create(config, false);
397 },
398 maxReader_, config.GetReadTime(), maxReader_ == 0);
399 trans_.Clear();
400 trans_.InitMembers(
401 [this]() {
402 const RdbStoreConfig &config = isAttach_ ? attachConfig_ : config_;
403 return Connection::Create(config, true);
404 },
405 MAX_TRANS, 0, false);
406 return errCode;
407 }
408
409 /**
410 * The database locale.
411 */
ConfigLocale(const std::string & localeStr)412 int ConnPool::ConfigLocale(const std::string &localeStr)
413 {
414 auto errCode = readers_.ConfigLocale(localeStr);
415 if (errCode != E_OK) {
416 return errCode;
417 }
418 return writers_.ConfigLocale(localeStr);
419 }
420
421 /**
422 * Rename the backed up database.
423 */
ChangeDbFileForRestore(const std::string & newPath,const std::string & backupPath,const std::vector<uint8_t> & newKey,SlaveStatus & slaveStatus)424 int ConnPool::ChangeDbFileForRestore(const std::string &newPath, const std::string &backupPath,
425 const std::vector<uint8_t> &newKey, SlaveStatus &slaveStatus)
426 {
427 if (!writers_.IsFull() || config_.GetPath() == backupPath || newPath == backupPath) {
428 LOG_ERROR("Connection pool is busy now!");
429 return E_ERROR;
430 }
431 if (config_.GetDBType() == DB_VECTOR) {
432 CloseAllConnections();
433 auto [retVal, connection] = CreateTransConn();
434
435 if (connection == nullptr) {
436 LOG_ERROR("Get null connection.");
437 return retVal;
438 }
439 retVal = connection->Restore(backupPath, {}, slaveStatus);
440 if (retVal != E_OK) {
441 LOG_ERROR("RdDbRestore error.");
442 return retVal;
443 }
444 CloseAllConnections();
445 auto [errCode, node] = Init();
446 return errCode;
447 }
448 return RestoreByDbSqliteType(newPath, backupPath, slaveStatus);
449 }
450
RestoreByDbSqliteType(const std::string & newPath,const std::string & backupPath,SlaveStatus & slaveStatus)451 int ConnPool::RestoreByDbSqliteType(const std::string &newPath, const std::string &backupPath, SlaveStatus &slaveStatus)
452 {
453 if (SqliteUtils::IsSlaveDbName(backupPath) && config_.GetHaMode() != HAMode::SINGLE) {
454 auto connection = AcquireConnection(false);
455 if (connection == nullptr) {
456 return E_DATABASE_BUSY;
457 }
458 return connection->Restore(backupPath, {}, slaveStatus);
459 }
460
461 return RestoreMasterDb(newPath, backupPath);
462 }
463
RestoreMasterDb(const std::string & newPath,const std::string & backupPath)464 int ConnPool::RestoreMasterDb(const std::string &newPath, const std::string &backupPath)
465 {
466 if (!CheckIntegrity(backupPath)) {
467 LOG_ERROR("backup file is corrupted, %{public}s", SqliteUtils::Anonymous(backupPath).c_str());
468 return E_SQLITE_CORRUPT;
469 }
470 SqliteUtils::DeleteFile(backupPath + "-shm");
471 SqliteUtils::DeleteFile(backupPath + "-wal");
472
473 CloseAllConnections();
474 Connection::Delete(config_);
475
476 if (config_.GetPath() != newPath) {
477 RdbStoreConfig config(newPath);
478 config.SetPath(newPath);
479 Connection::Delete(config);
480 }
481
482 bool copyRet = SqliteUtils::CopyFile(backupPath, newPath);
483 int32_t errCode = E_OK;
484 std::shared_ptr<Connection> pool;
485 for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
486 std::tie(errCode, pool) = Init();
487 if (errCode == E_OK) {
488 break;
489 }
490 if (errCode != E_SQLITE_CORRUPT || !config_.IsEncrypt()) {
491 break;
492 }
493 config_.SetIter(ITER_V1);
494 }
495 if (errCode != E_OK) {
496 CloseAllConnections();
497 Connection::Delete(config_);
498 std::tie(errCode, pool) = Init();
499 LOG_WARN("restore failed! rebuild res:%{public}d, path:%{public}s.", errCode,
500 SqliteUtils::Anonymous(backupPath).c_str());
501 }
502 return copyRet ? errCode : E_ERROR;
503 }
504
GetTransactionStack()505 std::stack<BaseTransaction> &ConnPool::GetTransactionStack()
506 {
507 return transactionStack_;
508 }
509
GetTransactionStackMutex()510 std::mutex &ConnPool::GetTransactionStackMutex()
511 {
512 return transactionStackMutex_;
513 }
514
DisableWal()515 std::pair<int, std::shared_ptr<Conn>> ConnPool::DisableWal()
516 {
517 return Init(true, true);
518 }
519
EnableWal()520 int ConnPool::EnableWal()
521 {
522 auto [errCode, node] = Init();
523 return errCode;
524 }
525
Dump(bool isWriter,const char * header)526 int32_t ConnectionPool::Dump(bool isWriter, const char *header)
527 {
528 Container *container = (isWriter || maxReader_ == 0) ? &writers_ : &readers_;
529 container->Dump(header, transCount_ + isInTransaction_);
530 return E_OK;
531 }
532
ConnNode(std::shared_ptr<Conn> conn)533 ConnPool::ConnNode::ConnNode(std::shared_ptr<Conn> conn) : connect_(std::move(conn))
534 {
535 }
536
GetConnect()537 std::shared_ptr<Conn> ConnPool::ConnNode::GetConnect()
538 {
539 tid_ = gettid();
540 time_ = steady_clock::now();
541 return connect_;
542 }
543
GetUsingTime() const544 int64_t ConnPool::ConnNode::GetUsingTime() const
545 {
546 auto time = steady_clock::now() - time_;
547 return duration_cast<milliseconds>(time).count();
548 }
549
Unused(int32_t count,bool timeout)550 int32_t ConnPool::ConnNode::Unused(int32_t count, bool timeout)
551 {
552 time_ = steady_clock::now();
553 if (connect_ == nullptr) {
554 return E_OK;
555 }
556
557 connect_->ClearCache();
558 int32_t errCode = E_INNER_WARNING;
559 if (count <= 0) {
560 errCode = connect_->TryCheckPoint(timeout);
561 }
562
563 time_ = steady_clock::now();
564 if (!connect_->IsWriter()) {
565 tid_ = 0;
566 }
567 return errCode;
568 }
569
IsWriter() const570 bool ConnPool::ConnNode::IsWriter() const
571 {
572 if (connect_ != nullptr) {
573 return connect_->IsWriter();
574 }
575 return false;
576 }
577
InitMembers(Creator creator,int32_t max,int32_t timeout,bool disable)578 void ConnPool::Container::InitMembers(Creator creator, int32_t max, int32_t timeout, bool disable)
579 {
580 std::unique_lock<decltype(mutex_)> lock(mutex_);
581 disable_ = disable;
582 max_ = max;
583 creator_ = creator;
584 timeout_ = std::chrono::seconds(timeout);
585 }
586
Initialize(Creator creator,int32_t max,int32_t timeout,bool disable,bool acquire)587 std::pair<int32_t, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Initialize(Creator creator, int32_t max,
588 int32_t timeout, bool disable, bool acquire)
589 {
590 InitMembers(creator, max, timeout, disable);
591 std::shared_ptr<ConnNode> connNode = nullptr;
592 {
593 std::unique_lock<decltype(mutex_)> lock(mutex_);
594 disable_ = disable;
595 max_ = max;
596 creator_ = creator;
597 timeout_ = std::chrono::seconds(timeout);
598 for (int i = 0; i < max_; ++i) {
599 auto errCode = ExtendNode();
600 if (errCode != E_OK) {
601 nodes_.clear();
602 details_.clear();
603 return { errCode, nullptr };
604 }
605 }
606
607 if (acquire && count_ > 0) {
608 connNode = nodes_.back();
609 nodes_.pop_back();
610 count_--;
611 }
612 }
613 cond_.notify_all();
614 return { E_OK, connNode };
615 }
616
ConfigLocale(const std::string & locale)617 int32_t ConnPool::Container::ConfigLocale(const std::string &locale)
618 {
619 std::unique_lock<decltype(mutex_)> lock(mutex_);
620 if (total_ != count_) {
621 return E_DATABASE_BUSY;
622 }
623 for (auto it = details_.begin(); it != details_.end();) {
624 auto conn = it->lock();
625 if (conn == nullptr || conn->connect_ == nullptr) {
626 it = details_.erase(it);
627 continue;
628 }
629 conn->connect_->ConfigLocale(locale);
630 }
631 return E_OK;
632 }
633
Acquire(std::chrono::milliseconds milliS)634 std::pair<int, std::shared_ptr<ConnPool::ConnNode>> ConnPool::Container::Acquire(std::chrono::milliseconds milliS)
635 {
636 auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
637 std::unique_lock<decltype(mutex_)> lock(mutex_);
638 if (max_ == 0) {
639 return {E_ERROR, nullptr};
640 }
641 int errCode = E_OK;
642 auto waiter = [this, &errCode]() -> bool {
643 if (count_ > 0) {
644 return true;
645 }
646
647 if (disable_) {
648 return false;
649 }
650 errCode = ExtendNode();
651 return errCode == E_OK;
652 };
653 if (cond_.wait_for(lock, interval, waiter)) {
654 if (nodes_.empty()) {
655 LOG_ERROR(
656 "nodes is empty.count %{public}d max %{public}d total %{public}d left %{public}d right%{public}d",
657 count_, max_, total_, left_, right_);
658 count_ = 0;
659 return {E_ERROR, nullptr};
660 }
661 auto node = nodes_.back();
662 nodes_.pop_back();
663 count_--;
664 return {E_OK, node};
665 }
666 return {errCode, nullptr};
667 }
668
ExtendNode()669 int32_t ConnPool::Container::ExtendNode()
670 {
671 if (creator_ == nullptr) {
672 return E_ERROR;
673 }
674 auto [errCode, conn] = creator_();
675 if (conn == nullptr) {
676 return errCode;
677 }
678 auto node = std::make_shared<ConnNode>(conn);
679 node->id_ = right_++;
680 conn->SetId(node->id_);
681 nodes_.push_back(node);
682 details_.push_back(node);
683 count_++;
684 total_++;
685 return E_OK;
686 }
687
AcquireAll(std::chrono::milliseconds milliS)688 std::pair<bool, std::list<std::shared_ptr<ConnPool::ConnNode>>> ConnPool::Container::AcquireAll(
689 std::chrono::milliseconds milliS)
690 {
691 std::list<std::shared_ptr<ConnNode>> nodes;
692 int32_t count = 0;
693 auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
694 auto time = std::chrono::steady_clock::now() + interval;
695 std::unique_lock<decltype(mutex_)> lock(mutex_);
696 while (count < total_ && cond_.wait_until(lock, time, [this]() {
697 return count_ > 0;
698 })) {
699 nodes.merge(std::move(nodes_));
700 nodes_.clear();
701 count += count_;
702 count_ = 0;
703 }
704
705 if (count != total_) {
706 count_ = count;
707 nodes_ = std::move(nodes);
708 nodes.clear();
709 return {false, nodes};
710 }
711 auto func = [](const std::list<std::shared_ptr<ConnNode>> &nodes) -> bool {
712 for (auto &node : nodes) {
713 if (node->connect_ == nullptr) {
714 continue;
715 }
716 if (node->connect_.use_count() != 1) {
717 return false;
718 }
719 }
720 return true;
721 };
722 bool failed = false;
723 while (failed = !func(nodes), failed && cond_.wait_until(lock, time) != std::cv_status::timeout) {
724 }
725 if (failed) {
726 count_ = count;
727 nodes_ = std::move(nodes);
728 nodes.clear();
729 }
730 return {!failed, nodes};
731 }
732
Disable()733 void ConnPool::Container::Disable()
734 {
735 disable_ = true;
736 cond_.notify_one();
737 }
738
Enable()739 void ConnPool::Container::Enable()
740 {
741 disable_ = false;
742 cond_.notify_one();
743 }
744
Release(std::shared_ptr<ConnNode> node)745 int32_t ConnPool::Container::Release(std::shared_ptr<ConnNode> node)
746 {
747 {
748 std::unique_lock<decltype(mutex_)> lock(mutex_);
749 if (node->id_ < left_ || node->id_ >= right_) {
750 return E_OK;
751 }
752 if (count_ == max_) {
753 total_ = total_ > count_ ? total_ - 1 : count_;
754 RelDetails(node);
755 } else {
756 nodes_.push_front(node);
757 count_++;
758 }
759 }
760 cond_.notify_one();
761 return E_OK;
762 }
763
ReleaseTrans(std::shared_ptr<ConnNode> node)764 int32_t ConnectionPool::Container::ReleaseTrans(std::shared_ptr<ConnNode> node)
765 {
766 {
767 std::unique_lock<decltype(mutex_)> lock(mutex_);
768 if (node->id_ < left_ || node->id_ >= right_) {
769 return E_OK;
770 }
771 if (node->IsRecyclable()) {
772 nodes_.push_back(node);
773 count_++;
774 } else {
775 total_--;
776 RelDetails(node);
777 }
778 }
779 cond_.notify_one();
780 return E_OK;
781 }
782
RelDetails(std::shared_ptr<ConnNode> node)783 int32_t ConnectionPool::Container::RelDetails(std::shared_ptr<ConnNode> node)
784 {
785 for (auto it = details_.begin(); it != details_.end();) {
786 auto detailNode = it->lock();
787 if (detailNode == nullptr || detailNode->id_ == node->id_) {
788 it = details_.erase(it);
789 } else {
790 it++;
791 }
792 }
793 return E_OK;
794 }
795
CheckIntegrity(const std::string & dbPath)796 bool ConnectionPool::CheckIntegrity(const std::string &dbPath)
797 {
798 RdbStoreConfig config(config_);
799 config.SetPath(dbPath);
800 config.SetIntegrityCheck(IntegrityCheck::FULL);
801 config.SetHaMode(HAMode::SINGLE);
802 for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
803 auto [ret, connection] = Connection::Create(config, true);
804 if (ret == E_OK) {
805 return true;
806 }
807 if (ret != E_SQLITE_CORRUPT || !config.IsEncrypt()) {
808 break;
809 }
810 config.SetIter(ITER_V1);
811 }
812 return false;
813 }
814
Clear()815 int32_t ConnPool::Container::Clear()
816 {
817 std::list<std::shared_ptr<ConnNode>> nodes;
818 std::list<std::weak_ptr<ConnNode>> details;
819 {
820 std::unique_lock<decltype(mutex_)> lock(mutex_);
821 nodes = std::move(nodes_);
822 details = std::move(details_);
823 disable_ = true;
824 total_ = 0;
825 count_ = 0;
826 if (right_ > MAX_RIGHT) {
827 right_ = 0;
828 }
829 left_ = right_;
830 creator_ = nullptr;
831 }
832 nodes.clear();
833 details.clear();
834 return 0;
835 }
836
IsFull()837 bool ConnPool::Container::IsFull()
838 {
839 std::unique_lock<decltype(mutex_)> lock(mutex_);
840 return total_ == count_;
841 }
842
Empty()843 bool ConnPool::Container::Empty()
844 {
845 std::unique_lock<decltype(mutex_)> lock(mutex_);
846 return total_ == 0;
847 }
848
Dump(const char * header,int32_t count)849 int32_t ConnPool::Container::Dump(const char *header, int32_t count)
850 {
851 std::string info;
852 std::vector<std::shared_ptr<ConnNode>> details;
853 std::string title = "B_M_T_C[" + std::to_string(count) + "," + std::to_string(max_) + "," +
854 std::to_string(total_) + "," + std::to_string(count_) + "]";
855 {
856 std::unique_lock<decltype(mutex_)> lock(mutex_);
857 details.reserve(details_.size());
858 for (auto &detail : details_) {
859 auto node = detail.lock();
860 if (node == nullptr) {
861 continue;
862 }
863 details.push_back(node);
864 }
865 }
866
867 for (auto &node : details) {
868 info.append("<")
869 .append(std::to_string(node->id_))
870 .append(",")
871 .append(std::to_string(node->tid_))
872 .append(",")
873 .append(std::to_string(node->GetUsingTime()))
874 .append(">");
875 // 256 represent that limit to info length
876 if (info.size() > 256) {
877 LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
878 info.clear();
879 }
880 }
881 LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
882 return 0;
883 }
884
ClearUnusedTrans(std::shared_ptr<ConnectionPool> pool)885 int32_t ConnectionPool::Container::ClearUnusedTrans(std::shared_ptr<ConnectionPool> pool)
886 {
887 std::unique_lock<decltype(mutex_)> lock(mutex_);
888 int transCount = total_;
889 for (auto it = nodes_.begin(); it != nodes_.end();) {
890 auto unusedDuration = std::chrono::steady_clock::now() - (*it)->time_;
891 if (unusedDuration < TRANS_CLEAR_INTERVAL) {
892 it++;
893 continue;
894 }
895 RelDetails(*it);
896 it = nodes_.erase(it);
897 total_--;
898 count_--;
899 }
900 if (total_ != 0) {
901 pool->DelayClearTrans();
902 }
903 LOG_INFO("%{public}d have been cleaned up, and there are %{public}d remaining to be cleaned up",
904 transCount - total_, total_);
905 return E_OK;
906 }
907
IsRecyclable()908 bool ConnPool::ConnNode::IsRecyclable()
909 {
910 return connect_->IsRecyclable();
911 }
912 } // namespace NativeRdb
913 } // namespace OHOS
914