1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "KvSyncManager"
16 #include "kvstore_sync_manager.h"
17 #include "log_print.h"
18
19 namespace OHOS {
20 namespace DistributedKv {
KvStoreSyncManager()21 KvStoreSyncManager::KvStoreSyncManager() {}
~KvStoreSyncManager()22 KvStoreSyncManager::~KvStoreSyncManager() {}
23
AddSyncOperation(uintptr_t syncId,uint32_t delayMs,const SyncFunc & syncFunc,const SyncEnd & syncEnd)24 Status KvStoreSyncManager::AddSyncOperation(uintptr_t syncId, uint32_t delayMs, const SyncFunc &syncFunc,
25 const SyncEnd &syncEnd)
26 {
27 if (syncId == 0 || syncFunc == nullptr) {
28 return Status::INVALID_ARGUMENT;
29 }
30 uint32_t opSeq = ++syncOpSeq_;
31 SyncEnd endFunc;
32 if (syncEnd != nullptr) {
33 endFunc = [opSeq, delayMs, syncEnd, this](const std::map<std::string, DistributedDB::DBStatus> &devices) {
34 RemoveSyncingOp(opSeq, (delayMs == 0) ? realtimeSyncingOps_ : delaySyncingOps_);
35 syncEnd(devices);
36 };
37 }
38
39 auto beginTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(delayMs);
40 KvSyncOperation syncOp{ syncId, opSeq, delayMs, syncFunc, endFunc, beginTime };
41 if (delayMs == 0) {
42 if (endFunc != nullptr) {
43 std::lock_guard<std::mutex> lock(syncOpsMutex_);
44 realtimeSyncingOps_.push_back(syncOp);
45 }
46 return syncFunc(endFunc);
47 }
48
49 std::lock_guard<std::mutex> lock(syncOpsMutex_);
50 scheduleSyncOps_.emplace(beginTime, syncOp);
51 ZLOGD("add op %u delay %u count %zu.", opSeq, delayMs, scheduleSyncOps_.size());
52 if ((scheduleSyncOps_.size() == 1) ||
53 (nextScheduleTime_ > beginTime + std::chrono::milliseconds(GetExpireTimeRange(delayMs)))) {
54 AddTimer(beginTime);
55 }
56 return Status::SUCCESS;
57 }
58
GetExpireTimeRange(uint32_t delayMs) const59 uint32_t KvStoreSyncManager::GetExpireTimeRange(uint32_t delayMs) const
60 {
61 uint32_t range = delayMs / DELAY_TIME_RANGE_DIVISOR;
62 return std::max(range, SYNC_MIN_DELAY_MS >> 1);
63 }
64
RemoveSyncOperation(uintptr_t syncId)65 Status KvStoreSyncManager::RemoveSyncOperation(uintptr_t syncId)
66 {
67 auto pred = [syncId](const KvSyncOperation &op) -> bool { return syncId == op.syncId; };
68
69 std::lock_guard<std::mutex> lock(syncOpsMutex_);
70 uint32_t count = DoRemoveSyncingOp(pred, realtimeSyncingOps_);
71 count += DoRemoveSyncingOp(pred, delaySyncingOps_);
72
73 auto &syncOps = scheduleSyncOps_;
74 for (auto it = syncOps.begin(); it != syncOps.end();) {
75 if (pred(it->second)) {
76 count++;
77 it = syncOps.erase(it);
78 } else {
79 ++it;
80 }
81 }
82 return (count > 0) ? Status::SUCCESS : Status::ERROR;
83 }
84
DoRemoveSyncingOp(OpPred pred,std::list<KvSyncOperation> & syncingOps)85 uint32_t KvStoreSyncManager::DoRemoveSyncingOp(OpPred pred, std::list<KvSyncOperation> &syncingOps)
86 {
87 uint32_t count = 0;
88 for (auto it = syncingOps.begin(); it != syncingOps.end();) {
89 if (pred(*it)) {
90 count++;
91 it = syncingOps.erase(it);
92 } else {
93 ++it;
94 }
95 }
96 return count;
97 }
98
RemoveSyncingOp(uint32_t opSeq,std::list<KvSyncOperation> & syncingOps)99 Status KvStoreSyncManager::RemoveSyncingOp(uint32_t opSeq, std::list<KvSyncOperation> &syncingOps)
100 {
101 auto pred = [opSeq](const KvSyncOperation &op) -> bool { return opSeq == op.opSeq; };
102
103 ZLOGD("remove op %u", opSeq);
104 std::lock_guard<std::mutex> lock(syncOpsMutex_);
105 uint32_t count = DoRemoveSyncingOp(pred, syncingOps);
106 return (count == 1) ? Status::SUCCESS : Status::ERROR;
107 }
108
AddTimer(const TimePoint & expireTime)109 void KvStoreSyncManager::AddTimer(const TimePoint &expireTime)
110 {
111 ZLOGD("time %lld", expireTime.time_since_epoch().count());
112 nextScheduleTime_ = expireTime;
113 executors_->Schedule(
114 expireTime - std::chrono::steady_clock::now(),
115 [time = expireTime, this]() {
116 Schedule(time);
117 });
118 }
119
GetTimeoutSyncOps(const TimePoint & currentTime,std::list<KvSyncOperation> & syncOps)120 bool KvStoreSyncManager::GetTimeoutSyncOps(const TimePoint ¤tTime, std::list<KvSyncOperation> &syncOps)
121 {
122 std::lock_guard<std::mutex> lock(syncOpsMutex_);
123 if ((!realtimeSyncingOps_.empty()) && (!scheduleSyncOps_.empty())) {
124 // the last processing time is less than priorSyncingTime
125 auto priorSyncingTime = std::chrono::milliseconds(REALTIME_PRIOR_SYNCING_MS);
126 if (currentTime < realtimeSyncingOps_.rbegin()->beginTime + priorSyncingTime) {
127 return true;
128 }
129 }
130 for (auto it = scheduleSyncOps_.begin(); it != scheduleSyncOps_.end();) {
131 const auto &expireTime = it->first;
132 const auto &op = it->second;
133 // currentTime is earlier than expireTime minus delayMs
134 if (currentTime + std::chrono::milliseconds(GetExpireTimeRange(op.delayMs)) < expireTime) {
135 break;
136 }
137
138 syncOps.push_back(op);
139 if (op.syncEnd != nullptr) {
140 delaySyncingOps_.push_back(op);
141 }
142 it = scheduleSyncOps_.erase(it);
143 }
144 return false;
145 }
146
DoCheckSyncingTimeout(std::list<KvSyncOperation> & syncingOps)147 void KvStoreSyncManager::DoCheckSyncingTimeout(std::list<KvSyncOperation> &syncingOps)
148 {
149 auto syncingTimeoutPred = [](const KvSyncOperation &op) -> bool {
150 return op.beginTime + std::chrono::milliseconds(SYNCING_TIMEOUT_MS) < std::chrono::steady_clock::now();
151 };
152
153 uint32_t count = DoRemoveSyncingOp(syncingTimeoutPred, syncingOps);
154 if (count > 0) {
155 ZLOGI("remove %u syncing ops by timeout", count);
156 }
157 }
158
Schedule(const TimePoint & time)159 void KvStoreSyncManager::Schedule(const TimePoint &time)
160 {
161 ZLOGD("timeout %lld", time.time_since_epoch().count());
162 std::list<KvSyncOperation> syncOps;
163 bool delaySchedule = GetTimeoutSyncOps(time, syncOps);
164
165 for (const auto &op : syncOps) {
166 op.syncFunc(op.syncEnd);
167 }
168
169 std::lock_guard<std::mutex> lock(syncOpsMutex_);
170 DoCheckSyncingTimeout(realtimeSyncingOps_);
171 DoCheckSyncingTimeout(delaySyncingOps_);
172 if (!scheduleSyncOps_.empty()) {
173 auto nextTime = scheduleSyncOps_.begin()->first;
174 if (delaySchedule) {
175 nextTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(SYNC_MIN_DELAY_MS);
176 }
177 AddTimer(nextTime);
178 }
179 }
SetThreadPool(std::shared_ptr<ExecutorPool> executors)180 void KvStoreSyncManager::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
181 {
182 executors_ = executors;
183 }
184 } // namespace DistributedKv
185 } // namespace OHOS
186