1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "GdbConnPool"
16 #include "connection_pool.h"
17
18 #include <unistd.h>
19 #include <utility>
20
21 #include "gdb_errors.h"
22 #include "gdb_utils.h"
23 #include "logger.h"
24 #include "rdb_store_config.h"
25
26 namespace OHOS::DistributedDataAip {
27 using namespace std::chrono;
28
Create(const StoreConfig & config,int & errCode)29 std::shared_ptr<ConnectionPool> ConnectionPool::Create(const StoreConfig &config, int &errCode)
30 {
31 std::shared_ptr<ConnectionPool> pool = std::make_shared<ConnectionPool>(config);
32 if (pool == nullptr) {
33 LOG_ERROR("ConnectionPool::Create new failed, pool is nullptr.");
34 errCode = E_INIT_CONN_POOL_FAILED;
35 return nullptr;
36 }
37 std::shared_ptr<Connection> conn;
38 for (uint32_t retry = 0; retry < ITERS_COUNT; ++retry) {
39 std::tie(errCode, conn) = pool->Init();
40 if (errCode != E_GRD_DATA_CORRUPTED) {
41 break;
42 }
43 config.SetIter(ITER_V1);
44 }
45 return errCode == E_OK ? pool : nullptr;
46 }
47
ConnectionPool(const StoreConfig & storeConfig)48 ConnectionPool::ConnectionPool(const StoreConfig &storeConfig) : config_(storeConfig), writers_(), readers_()
49 {
50 }
51
Init(bool isAttach,bool needWriter)52 std::pair<int32_t, std::shared_ptr<Connection>> ConnectionPool::Init(bool isAttach, bool needWriter)
53 {
54 std::pair<int32_t, std::shared_ptr<Connection>> result;
55 auto &[errCode, conn] = result;
56 config_.GenerateEncryptedKey();
57 // write connect count is 1
58 std::shared_ptr<ConnectionPool::ConnNode> node;
59 std::tie(errCode, node) = writers_.Initialize(
60 [this, isAttach]() { return Connection::Create(config_, true); }, 1, config_.GetWriteTime(), true, needWriter);
61 conn = Convert2AutoConn(node);
62 if (errCode != E_OK) {
63 return result;
64 }
65
66 maxReader_ = GetMaxReaders(config_);
67 LOG_DEBUG("ConnectionPool::Init maxReader=%{public}d", maxReader_);
68 // max read connect count is 64
69 if (maxReader_ > 64) {
70 LOG_ERROR("maxReader is too big. maxReader=%{public}d", maxReader_);
71 return { E_ARGS_READ_CON_OVERLOAD, nullptr };
72 }
73 auto [ret, nodeRead] = readers_.Initialize([this, isAttach]() { return Connection::Create(config_, false); },
74 maxReader_, config_.GetReadTime(), maxReader_ == 0);
75 errCode = ret;
76 return result;
77 }
78
~ConnectionPool()79 ConnectionPool::~ConnectionPool()
80 {
81 LOG_DEBUG("enter");
82 CloseAllConnections();
83 }
84
GetMaxReaders(const StoreConfig & config)85 int32_t ConnectionPool::GetMaxReaders(const StoreConfig &config)
86 {
87 return config.GetReadConSize();
88 }
89
Convert2AutoConn(std::shared_ptr<ConnNode> node,bool isTrans)90 std::shared_ptr<Connection> ConnectionPool::Convert2AutoConn(std::shared_ptr<ConnNode> node, bool isTrans)
91 {
92 if (node == nullptr) {
93 return nullptr;
94 }
95
96 auto conn = node->GetConnect();
97 if (conn == nullptr) {
98 return nullptr;
99 }
100 if (isTrans) {
101 transCount_++;
102 }
103
104 return std::shared_ptr<Connection>(conn.get(), [pool = weak_from_this(), node, isTrans](auto *) mutable {
105 auto realPool = pool.lock();
106 if (realPool == nullptr) {
107 return;
108 }
109 realPool->ReleaseNode(node, !isTrans);
110 if (isTrans) {
111 realPool->transCount_--;
112 }
113 node = nullptr;
114 });
115 }
116
CloseAllConnections()117 void ConnectionPool::CloseAllConnections()
118 {
119 writers_.Clear();
120 readers_.Clear();
121 }
122
CreateTransConn()123 std::pair<int32_t, std::shared_ptr<Connection>> ConnectionPool::CreateTransConn()
124 {
125 if (transCount_ >= MAX_TRANS) {
126 writers_.Dump("NO TRANS", transCount_ + isInTransaction_);
127 return { E_DATABASE_BUSY, nullptr };
128 }
129 auto [errCode, node] = writers_.Create();
130 return { errCode, Convert2AutoConn(node, true) };
131 }
132
Acquire(bool isReadOnly,std::chrono::milliseconds ms)133 std::shared_ptr<Connection> ConnectionPool::Acquire(bool isReadOnly, std::chrono::milliseconds ms)
134 {
135 Container *container = (isReadOnly && maxReader_ != 0) ? &readers_ : &writers_;
136 auto node = container->Acquire(ms);
137 if (node == nullptr) {
138 const char *header = (isReadOnly && maxReader_ != 0) ? "readers_" : "writers_";
139 container->Dump(header, transCount_ + isInTransaction_);
140 return nullptr;
141 }
142 return Convert2AutoConn(node);
143 }
144
AcquireRef(bool isReadOnly,std::chrono::milliseconds ms)145 std::shared_ptr<Connection> ConnectionPool::AcquireRef(bool isReadOnly, std::chrono::milliseconds ms)
146 {
147 if (maxReader_ != 0) {
148 return Acquire(isReadOnly, ms);
149 }
150 auto node = writers_.Acquire(ms);
151 if (node == nullptr) {
152 writers_.Dump("writers_", transCount_ + isInTransaction_);
153 return nullptr;
154 }
155 auto conn = node->connect_;
156 writers_.Release(node);
157 return {conn.get(), [pool = weak_from_this(), conn](Connection *) {
158 auto realPool = pool.lock();
159 if (realPool == nullptr) {
160 return;
161 }
162 realPool->writers_.cond_.notify_all();
163 }};
164 }
165
ReleaseNode(std::shared_ptr<ConnNode> node,bool reuse)166 void ConnectionPool::ReleaseNode(std::shared_ptr<ConnNode> node, bool reuse)
167 {
168 if (node == nullptr) {
169 return;
170 }
171 auto now = steady_clock::now();
172 auto timeout = now > (failedTime_.load() + minutes(CHECK_POINT_INTERVAL)) || now < failedTime_.load() ||
173 failedTime_.load() == steady_clock::time_point();
174 auto transCount = transCount_ + isInTransaction_;
175 auto remainCount = reuse ? transCount : transCount - 1;
176 auto errCode = node->Unused(remainCount, timeout);
177 if (errCode == E_DATABASE_BUSY) {
178 writers_.Dump("WAL writers_", transCount);
179 readers_.Dump("WAL readers_", transCount);
180 }
181 LOG_DEBUG(
182 "ConnectionPool::ReleaseNode reuse=%{public}d,timeout=%{public}d,remainCount=%{public}d,isWriter=%{public}d",
183 reuse, timeout, remainCount, node->IsWriter());
184
185 if (node->IsWriter() && errCode != E_NOT_SUPPORT) {
186 failedTime_ = errCode != E_OK ? now : steady_clock::time_point();
187 }
188
189 auto &container = node->IsWriter() ? writers_ : readers_;
190 if (reuse) {
191 container.Release(node);
192 } else {
193 container.Drop(node);
194 }
195 }
196
RestartReaders()197 int ConnectionPool::RestartReaders()
198 {
199 readers_.Clear();
200 auto [errCode, node] = readers_.Initialize(
201 [this]() { return Connection::Create(config_, false); }, maxReader_, config_.GetReadTime(), maxReader_ == 0);
202 return errCode;
203 }
204
Dump(bool isWriter,const char * header)205 int32_t ConnectionPool::Dump(bool isWriter, const char *header)
206 {
207 Container *container = (isWriter || maxReader_ == 0) ? &writers_ : &readers_;
208 container->Dump(header, transCount_ + isInTransaction_);
209 return E_OK;
210 }
211
ConnNode(std::shared_ptr<Connection> conn)212 ConnectionPool::ConnNode::ConnNode(std::shared_ptr<Connection> conn) : connect_(std::move(conn))
213 {
214 }
215
GetConnect()216 std::shared_ptr<Connection> ConnectionPool::ConnNode::GetConnect()
217 {
218 tid_ = gettid();
219 time_ = steady_clock::now();
220 return connect_;
221 }
222
GetUsingTime() const223 int64_t ConnectionPool::ConnNode::GetUsingTime() const
224 {
225 auto time = steady_clock::now() - time_;
226 return duration_cast<milliseconds>(time).count();
227 }
228
Unused(int32_t count,bool timeout)229 int32_t ConnectionPool::ConnNode::Unused(int32_t count, bool timeout)
230 {
231 time_ = steady_clock::now();
232 if (connect_ == nullptr) {
233 return E_OK;
234 }
235 time_ = steady_clock::now();
236 if (!connect_->IsWriter()) {
237 tid_ = 0;
238 }
239 return E_OK;
240 }
241
IsWriter() const242 bool ConnectionPool::ConnNode::IsWriter() const
243 {
244 if (connect_ != nullptr) {
245 return connect_->IsWriter();
246 }
247 return false;
248 }
249
Initialize(Creator creator,int32_t max,int32_t timeout,bool disable,bool acquire)250 std::pair<int32_t, std::shared_ptr<ConnectionPool::ConnNode>> ConnectionPool::Container::Initialize(
251 Creator creator, int32_t max, int32_t timeout, bool disable, bool acquire)
252 {
253 std::shared_ptr<ConnNode> connNode = nullptr;
254 {
255 std::unique_lock<decltype(mutex_)> lock(mutex_);
256 disable_ = disable;
257 max_ = max;
258 creator_ = std::move(creator);
259 timeout_ = std::chrono::seconds(timeout);
260 for (int i = 0; i < max; ++i) {
261 auto errCode = ExtendNode();
262 if (errCode != E_OK) {
263 nodes_.clear();
264 details_.clear();
265 return { errCode, nullptr };
266 }
267 }
268
269 if (acquire && count_ > 0) {
270 connNode = nodes_.back();
271 nodes_.pop_back();
272 count_--;
273 }
274 }
275 cond_.notify_all();
276 return { E_OK, connNode };
277 }
278
Acquire(std::chrono::milliseconds milliS)279 std::shared_ptr<ConnectionPool::ConnNode> ConnectionPool::Container::Acquire(std::chrono::milliseconds milliS)
280 {
281 auto interval = (milliS == INVALID_TIME) ? timeout_ : milliS;
282 std::unique_lock<decltype(mutex_)> lock(mutex_);
283 LOG_DEBUG("count %{public}d max %{public}d total %{public}d left %{public}d right%{public}d", count_, max_, total_,
284 left_, right_);
285 if (max_ == 0) {
286 return nullptr;
287 }
288 auto waiter = [this]() -> bool {
289 if (count_ > 0) {
290 return true;
291 }
292
293 if (disable_) {
294 return false;
295 }
296 return ExtendNode() == E_OK;
297 };
298 if (cond_.wait_for(lock, interval, waiter)) {
299 if (nodes_.empty()) {
300 LOG_ERROR("nodes is empty.count %{public}d max %{public}d total %{public}d left %{public}d right%{public}d",
301 count_, max_, total_, left_, right_);
302 count_ = 0;
303 return nullptr;
304 }
305 auto node = nodes_.back();
306 nodes_.pop_back();
307 count_--;
308 return node;
309 }
310 return nullptr;
311 }
312
Create()313 std::pair<int32_t, std::shared_ptr<ConnectionPool::ConnNode>> ConnectionPool::Container::Create()
314 {
315 std::unique_lock<decltype(mutex_)> lock(mutex_);
316 if (creator_ == nullptr) {
317 return { E_NOT_SUPPORT, nullptr };
318 }
319
320 auto [errCode, conn] = creator_();
321 if (conn == nullptr) {
322 return { errCode, nullptr };
323 }
324
325 auto node = std::make_shared<ConnNode>(conn);
326 if (node == nullptr) {
327 return { E_ERROR, nullptr };
328 }
329 node->id_ = MIN_TRANS_ID + trans_;
330 conn->SetId(node->id_);
331 details_.push_back(node);
332 trans_++;
333 return { E_OK, node };
334 }
335
ExtendNode()336 int32_t ConnectionPool::Container::ExtendNode()
337 {
338 if (creator_ == nullptr) {
339 return E_ERROR;
340 }
341 auto [errCode, conn] = creator_();
342 if (conn == nullptr) {
343 return errCode;
344 }
345 auto node = std::make_shared<ConnNode>(conn);
346 node->id_ = right_++;
347 conn->SetId(node->id_);
348 nodes_.push_back(node);
349 details_.push_back(node);
350 count_++;
351 total_++;
352 return E_OK;
353 }
354
Disable()355 void ConnectionPool::Container::Disable()
356 {
357 disable_ = true;
358 cond_.notify_one();
359 }
360
Enable()361 void ConnectionPool::Container::Enable()
362 {
363 disable_ = false;
364 cond_.notify_one();
365 }
366
Release(std::shared_ptr<ConnNode> node)367 int32_t ConnectionPool::Container::Release(std::shared_ptr<ConnNode> node)
368 {
369 {
370 std::unique_lock<decltype(mutex_)> lock(mutex_);
371 if (node->id_ < left_ || node->id_ >= right_) {
372 return E_OK;
373 }
374 if (count_ == max_) {
375 total_ = total_ > count_ ? total_ - 1 : count_;
376 RelDetails(node);
377 } else {
378 nodes_.push_front(node);
379 count_++;
380 }
381 }
382 cond_.notify_one();
383 return E_OK;
384 }
385
Drop(std::shared_ptr<ConnNode> node)386 int32_t ConnectionPool::Container::Drop(std::shared_ptr<ConnNode> node)
387 {
388 {
389 std::unique_lock<decltype(mutex_)> lock(mutex_);
390 RelDetails(node);
391 }
392 cond_.notify_one();
393 return E_OK;
394 }
395
RelDetails(std::shared_ptr<ConnNode> node)396 int32_t ConnectionPool::Container::RelDetails(std::shared_ptr<ConnNode> node)
397 {
398 for (auto it = details_.begin(); it != details_.end();) {
399 auto detailNode = it->lock();
400 if (detailNode == nullptr || detailNode->id_ == node->id_) {
401 it = details_.erase(it);
402 } else {
403 it++;
404 }
405 }
406 return E_OK;
407 }
408
Clear()409 int32_t ConnectionPool::Container::Clear()
410 {
411 std::list<std::shared_ptr<ConnNode>> nodes;
412 std::list<std::weak_ptr<ConnNode>> details;
413 {
414 std::unique_lock<decltype(mutex_)> lock(mutex_);
415 nodes = std::move(nodes_);
416 details = std::move(details_);
417 disable_ = true;
418 total_ = 0;
419 count_ = 0;
420 if (right_ > MAX_RIGHT) {
421 right_ = 0;
422 }
423 left_ = right_;
424 creator_ = nullptr;
425 }
426 nodes.clear();
427 details.clear();
428 LOG_DEBUG(
429 "Container::Clear success count=%{public}d, max=%{public}d, total=%{public}d, left=%{public}d, "
430 "right=%{public}d", count_, max_, total_, left_, right_);
431 return 0;
432 }
433
IsFull()434 bool ConnectionPool::Container::IsFull()
435 {
436 std::unique_lock<decltype(mutex_)> lock(mutex_);
437 return total_ == count_;
438 }
439
Dump(const char * header,int32_t count)440 int32_t ConnectionPool::Container::Dump(const char *header, int32_t count)
441 {
442 std::string info;
443 std::vector<std::shared_ptr<ConnNode>> details;
444 std::string title = "B_M_T_C[" + std::to_string(count) + "," + std::to_string(max_) + "," +
445 std::to_string(total_) + "," + std::to_string(count_) + "]";
446 {
447 std::unique_lock<decltype(mutex_)> lock(mutex_);
448 details.reserve(details_.size());
449 for (auto &detail : details_) {
450 auto node = detail.lock();
451 if (node == nullptr) {
452 continue;
453 }
454 details.push_back(node);
455 }
456 }
457
458 for (auto &node : details) {
459 info.append("<")
460 .append(std::to_string(node->id_))
461 .append(",")
462 .append(std::to_string(node->tid_))
463 .append(",")
464 .append(std::to_string(node->GetUsingTime()))
465 .append(">");
466 // 256 represent that limit to info length
467 if (info.size() > 256) {
468 LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
469 info.clear();
470 }
471 }
472 LOG_WARN("%{public}s %{public}s:%{public}s", header, title.c_str(), info.c_str());
473 return 0;
474 }
475 } // namespace OHOS::DistributedDataAip