• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "GdbConnPool"
16 #include "connection_pool.h"
17 
18 #include <unistd.h>
19 #include <utility>
20 
21 #include "gdb_errors.h"
22 #include "gdb_utils.h"
23 #include "logger.h"
24 #include "rdb_store_config.h"
25 
26 namespace OHOS::DistributedDataAip {
27 using namespace std::chrono;
28 
Create(const StoreConfig & config,int & errCode)29 std::shared_ptr<ConnectionPool> ConnectionPool::Create(const StoreConfig &config, int &errCode)
30 {
31     std::shared_ptr<ConnectionPool> pool = std::make_shared<ConnectionPool>(config);
32     if (pool == nullptr) {
33         LOG_ERROR("ConnectionPool::Create new failed, pool is nullptr.");
34         errCode = E_INIT_CONN_POOL_FAILED;
35         return nullptr;
36     }
37     std::shared_ptr<Connection> conn;
38     for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
39         std::tie(errCode, conn) = pool->Init();
40         if (errCode != E_GRD_DATA_CORRUPTED) {
41             break;
42         }
43         config.SetIter(ITER_V1);
44     }
45     return errCode == E_OK ? pool : nullptr;
46 }
47 
ConnectionPool(const StoreConfig & storeConfig)48 ConnectionPool::ConnectionPool(const StoreConfig &storeConfig) : config_(storeConfig), writers_(), readers_()
49 {
50 }
51 
Init(bool isAttach,bool needWriter)52 std::pair<int32_t, std::shared_ptr<Connection>> ConnectionPool::Init(bool isAttach, bool needWriter)
53 {
54     std::pair<int32_t, std::shared_ptr<Connection>> result;
55     auto &[errCode, conn] = result;
56     config_.GenerateEncryptedKey();
57     // write connect count is 1
58     std::shared_ptr<ConnectionPool::ConnNode> node;
59     std::tie(errCode, node) = writers_.Initialize(
60         [this, isAttach]() { return Connection::Create(config_, true); }, 1, config_.GetWriteTime(), true, needWriter);
61     conn = Convert2AutoConn(node);
62     if (errCode != E_OK) {
63         return result;
64     }
65 
66     maxReader_ = GetMaxReaders(config_);
67     LOG_DEBUG("ConnectionPool::Init maxReader=%{public}d", maxReader_);
68     // max read connect count is 64
69     if (maxReader_ > 64) {
70         LOG_ERROR("maxReader is too big. maxReader=%{public}d", maxReader_);
71         return { E_ARGS_READ_CON_OVERLOAD, nullptr };
72     }
73     auto [ret, nodeRead] = readers_.Initialize([this, isAttach]() { return Connection::Create(config_, false); },
74         maxReader_, config_.GetReadTime(), maxReader_ == 0);
75     errCode = ret;
76     return result;
77 }
78 
~ConnectionPool()79 ConnectionPool::~ConnectionPool()
80 {
81     LOG_DEBUG("enter");
82     CloseAllConnections();
83 }
84 
GetMaxReaders(const StoreConfig & config)85 int32_t ConnectionPool::GetMaxReaders(const StoreConfig &config)
86 {
87     return config.GetReadConSize();
88 }
89 
Convert2AutoConn(std::shared_ptr<ConnNode> node,bool isTrans)90 std::shared_ptr<Connection> ConnectionPool::Convert2AutoConn(std::shared_ptr<ConnNode> node, bool isTrans)
91 {
92     if (node == nullptr) {
93         return nullptr;
94     }
95 
96     auto conn = node->GetConnect();
97     if (conn == nullptr) {
98         return nullptr;
99     }
100     if (isTrans) {
101         transCount_++;
102     }
103 
104     return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), node, isTrans](auto *) mutable {
105         auto realPool = pool.lock();
106         if (realPool == nullptr) {
107             return;
108         }
109         realPool->ReleaseNode(node, !isTrans);
110         if (isTrans) {
111             realPool->transCount_--;
112         }
113         node = nullptr;
114     });
115 }
116 
CloseAllConnections()117 void ConnectionPool::CloseAllConnections()
118 {
119     writers_.Clear();
120     readers_.Clear();
121 }
122 
CreateTransConn()123 std::pair<int32_t, std::shared_ptr<Connection>> ConnectionPool::CreateTransConn()
124 {
125     if (transCount_ >= MAX_TRANS) {
126         writers_.Dump("NO TRANS", transCount_ + isInTransaction_);
127         return { E_DATABASE_BUSY, nullptr };
128     }
129     auto [errCode, node] = writers_.Create();
130     return { errCode, Convert2AutoConn(node, true) };
131 }
132 
Acquire(bool isReadOnly,std::chrono::milliseconds ms)133 std::shared_ptr<Connection> ConnectionPool::Acquire(bool isReadOnly, std::chrono::milliseconds ms)
134 {
135     Container *container = (isReadOnly && maxReader_ != 0) ? &readers_ : &writers_;
136     auto node = container->Acquire(ms);
137     if (node == nullptr) {
138         const char *header = (isReadOnly && maxReader_ != 0) ? "readers_" : "writers_";
139         container->Dump(header, transCount_ + isInTransaction_);
140         return nullptr;
141     }
142     return Convert2AutoConn(node);
143 }
144 
AcquireRef(bool isReadOnly,std::chrono::milliseconds ms)145 std::shared_ptr<Connection> ConnectionPool::AcquireRef(bool isReadOnly, std::chrono::milliseconds ms)
146 {
147     if (maxReader_ != 0) {
148         return Acquire(isReadOnly, ms);
149     }
150     auto node = writers_.Acquire(ms);
151     if (node == nullptr) {
152         writers_.Dump("writers_", transCount_ + isInTransaction_);
153         return nullptr;
154     }
155     auto conn = node->connect_;
156     writers_.Release(node);
157     return {conn.get(), [pool = weak_from_this(), conn](Connection *) {
158         auto realPool = pool.lock();
159         if (realPool == nullptr) {
160             return;
161         }
162         realPool->writers_.cond_.notify_all();
163     }};
164 }
165 
ReleaseNode(std::shared_ptr<ConnNode> node,bool reuse)166 void ConnectionPool::ReleaseNode(std::shared_ptr<ConnNode> node, bool reuse)
167 {
168     if (node == nullptr) {
169         return;
170     }
171     auto now = steady_clock::now();
172     auto timeout = now > (failedTime_.load() + minutes(CHECK_POINT_INTERVAL)) || now < failedTime_.load() ||
173                    failedTime_.load() == steady_clock::time_point();
174     auto transCount = transCount_ + isInTransaction_;
175     auto remainCount = reuse ? transCount : transCount - 1;
176     auto errCode = node->Unused(remainCount, timeout);
177     if (errCode == E_DATABASE_BUSY) {
178         writers_.Dump("WAL writers_", transCount);
179         readers_.Dump("WAL readers_", transCount);
180     }
181     LOG_DEBUG(
182         "ConnectionPool::ReleaseNode reuse=%{public}d,timeout=%{public}d,remainCount=%{public}d,isWriter=%{public}d",
183         reuse, timeout, remainCount, node->IsWriter());
184 
185     if (node->IsWriter() && errCode != E_NOT_SUPPORT) {
186         failedTime_ = errCode != E_OK ? now : steady_clock::time_point();
187     }
188 
189     auto &container = node->IsWriter() ? writers_ : readers_;
190     if (reuse) {
191         container.Release(node);
192     } else {
193         container.Drop(node);
194     }
195 }
196 
RestartReaders()197 int ConnectionPool::RestartReaders()
198 {
199     readers_.Clear();
200     auto [errCode, node] = readers_.Initialize(
201         [this]() { return Connection::Create(config_, false); }, maxReader_, config_.GetReadTime(), maxReader_ == 0);
202     return errCode;
203 }
204 
Dump(bool isWriter,const char * header)205 int32_t ConnectionPool::Dump(bool isWriter, const char *header)
206 {
207     Container *container = (isWriter || maxReader_ == 0) ? &writers_ : &readers_;
208     container->Dump(header, transCount_ + isInTransaction_);
209     return E_OK;
210 }
211 
ConnNode(std::shared_ptr<Connection> conn)212 ConnectionPool::ConnNode::ConnNode(std::shared_ptr<Connection> conn) : connect_(std::move(conn))
213 {
214 }
215 
GetConnect()216 std::shared_ptr<Connection> ConnectionPool::ConnNode::GetConnect()
217 {
218     tid_ = gettid();
219     time_ = steady_clock::now();
220     return connect_;
221 }
222 
GetUsingTime() const223 int64_t ConnectionPool::ConnNode::GetUsingTime() const
224 {
225     auto time = steady_clock::now() - time_;
226     return duration_cast<milliseconds>(time).count();
227 }
228 
Unused(int32_t count,bool timeout)229 int32_t ConnectionPool::ConnNode::Unused(int32_t count, bool timeout)
230 {
231     time_ = steady_clock::now();
232     if (connect_ == nullptr) {
233         return E_OK;
234     }
235     time_ = steady_clock::now();
236     if (!connect_->IsWriter()) {
237         tid_ = 0;
238     }
239     return E_OK;
240 }
241 
IsWriter() const242 bool ConnectionPool::ConnNode::IsWriter() const
243 {
244     if (connect_ != nullptr) {
245         return connect_->IsWriter();
246     }
247     return false;
248 }
249 
Initialize(Creator creator,int32_t max,int32_t timeout,bool disable,bool acquire)250 std::pair<int32_t, std::shared_ptr<ConnectionPool::ConnNode>> ConnectionPool::Container::Initialize(
251     Creator creator, int32_t max, int32_t timeout, bool disable, bool acquire)
252 {
253     std::shared_ptr<ConnNode> connNode = nullptr;
254     {
255         std::unique_lock<decltype(mutex_)> lock(mutex_);
256         disable_ = disable;
257         max_ = max;
258         creator_ = std::move(creator);
259         timeout_ = std::chrono::seconds(timeout);
260         for (int i = 0; i < max; ++i) {
261             auto errCode = ExtendNode();
262             if (errCode != E_OK) {
263                 nodes_.clear();
264                 details_.clear();
265                 return { errCode, nullptr };
266             }
267         }
268 
269         if (acquire && count_ > 0) {
270             connNode = nodes_.back();
271             nodes_.pop_back();
272             count_--;
273         }
274     }
275     cond_.notify_all();
276     return { E_OK, connNode };
277 }
278 
Acquire(std::chrono::milliseconds milliS)279 std::shared_ptr<ConnectionPool::ConnNode> ConnectionPool::Container::Acquire(std::chrono::milliseconds milliS)
280 {
281     auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
282     std::unique_lock<decltype(mutex_)> lock(mutex_);
283     LOG_DEBUG("count %{public}d max %{public}d total %{public}d left %{public}d right%{public}d", count_, max_, total_,
284         left_, right_);
285     if (max_ == 0) {
286         return nullptr;
287     }
288     auto waiter = [this]() -> bool {
289         if (count_ > 0) {
290             return true;
291         }
292 
293         if (disable_) {
294             return false;
295         }
296         return ExtendNode() == E_OK;
297     };
298     if (cond_.wait_for(lock, interval, waiter)) {
299         if (nodes_.empty()) {
300             LOG_ERROR("nodes is empty.count %{public}d max %{public}d total %{public}d left %{public}d right%{public}d",
301                 count_, max_, total_, left_, right_);
302             count_ = 0;
303             return nullptr;
304         }
305         auto node = nodes_.back();
306         nodes_.pop_back();
307         count_--;
308         return node;
309     }
310     return nullptr;
311 }
312 
Create()313 std::pair<int32_t, std::shared_ptr<ConnectionPool::ConnNode>> ConnectionPool::Container::Create()
314 {
315     std::unique_lock<decltype(mutex_)> lock(mutex_);
316     if (creator_ == nullptr) {
317         return { E_NOT_SUPPORT, nullptr };
318     }
319 
320     auto [errCode, conn] = creator_();
321     if (conn == nullptr) {
322         return { errCode, nullptr };
323     }
324 
325     auto node = std::make_shared<ConnNode>(conn);
326     if (node == nullptr) {
327         return { E_ERROR, nullptr };
328     }
329     node->id_ = MIN_TRANS_ID + trans_;
330     conn->SetId(node->id_);
331     details_.push_back(node);
332     trans_++;
333     return { E_OK, node };
334 }
335 
ExtendNode()336 int32_t ConnectionPool::Container::ExtendNode()
337 {
338     if (creator_ == nullptr) {
339         return E_ERROR;
340     }
341     auto [errCode, conn] = creator_();
342     if (conn == nullptr) {
343         return errCode;
344     }
345     auto node = std::make_shared<ConnNode>(conn);
346     node->id_ = right_++;
347     conn->SetId(node->id_);
348     nodes_.push_back(node);
349     details_.push_back(node);
350     count_++;
351     total_++;
352     return E_OK;
353 }
354 
Disable()355 void ConnectionPool::Container::Disable()
356 {
357     disable_ = true;
358     cond_.notify_one();
359 }
360 
Enable()361 void ConnectionPool::Container::Enable()
362 {
363     disable_ = false;
364     cond_.notify_one();
365 }
366 
Release(std::shared_ptr<ConnNode> node)367 int32_t ConnectionPool::Container::Release(std::shared_ptr<ConnNode> node)
368 {
369     {
370         std::unique_lock<decltype(mutex_)> lock(mutex_);
371         if (node->id_ < left_ || node->id_ >= right_) {
372             return E_OK;
373         }
374         if (count_ == max_) {
375             total_ = total_ > count_ ? total_ - 1 : count_;
376             RelDetails(node);
377         } else {
378             nodes_.push_front(node);
379             count_++;
380         }
381     }
382     cond_.notify_one();
383     return E_OK;
384 }
385 
Drop(std::shared_ptr<ConnNode> node)386 int32_t ConnectionPool::Container::Drop(std::shared_ptr<ConnNode> node)
387 {
388     {
389         std::unique_lock<decltype(mutex_)> lock(mutex_);
390         RelDetails(node);
391     }
392     cond_.notify_one();
393     return E_OK;
394 }
395 
RelDetails(std::shared_ptr<ConnNode> node)396 int32_t ConnectionPool::Container::RelDetails(std::shared_ptr<ConnNode> node)
397 {
398     for (auto it = details_.begin(); it != details_.end();) {
399         auto detailNode = it->lock();
400         if (detailNode == nullptr || detailNode->id_ == node->id_) {
401             it = details_.erase(it);
402         } else {
403             it++;
404         }
405     }
406     return E_OK;
407 }
408 
Clear()409 int32_t ConnectionPool::Container::Clear()
410 {
411     std::list<std::shared_ptr<ConnNode>> nodes;
412     std::list<std::weak_ptr<ConnNode>> details;
413     {
414         std::unique_lock<decltype(mutex_)> lock(mutex_);
415         nodes = std::move(nodes_);
416         details = std::move(details_);
417         disable_ = true;
418         total_ = 0;
419         count_ = 0;
420         if (right_ > MAX_RIGHT) {
421             right_ = 0;
422         }
423         left_ = right_;
424         creator_ = nullptr;
425     }
426     nodes.clear();
427     details.clear();
428     LOG_DEBUG(
429         "Container::Clear success count=%{public}d, max=%{public}d, total=%{public}d, left=%{public}d, "
430         "right=%{public}d", count_, max_, total_, left_, right_);
431     return 0;
432 }
433 
IsFull()434 bool ConnectionPool::Container::IsFull()
435 {
436     std::unique_lock<decltype(mutex_)> lock(mutex_);
437     return total_ == count_;
438 }
439 
Dump(const char * header,int32_t count)440 int32_t ConnectionPool::Container::Dump(const char *header, int32_t count)
441 {
442     std::string info;
443     std::vector<std::shared_ptr<ConnNode>> details;
444     std::string title = "B_M_T_C[" + std::to_string(count) + "," + std::to_string(max_) + "," +
445                         std::to_string(total_) + "," + std::to_string(count_) + "]";
446     {
447         std::unique_lock<decltype(mutex_)> lock(mutex_);
448         details.reserve(details_.size());
449         for (auto &detail : details_) {
450             auto node = detail.lock();
451             if (node == nullptr) {
452                 continue;
453             }
454             details.push_back(node);
455         }
456     }
457 
458     for (auto &node : details) {
459         info.append("<")
460             .append(std::to_string(node->id_))
461             .append(",")
462             .append(std::to_string(node->tid_))
463             .append(",")
464             .append(std::to_string(node->GetUsingTime()))
465             .append(">");
466         // 256 represent that limit to info length
467         if (info.size() > 256) {
468             LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
469             info.clear();
470         }
471     }
472     LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
473     return 0;
474 }
475 } // namespace OHOS::DistributedDataAip