• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 #define LOG_TAG "KvSyncManager"
16 #include "kvstore_sync_manager.h"
17 #include "log_print.h"
18 
19 namespace OHOS {
20 namespace DistributedKv {
KvStoreSyncManager()21 KvStoreSyncManager::KvStoreSyncManager() : syncScheduler_("SyncMgr")
22 {}
23 
~KvStoreSyncManager()24 KvStoreSyncManager::~KvStoreSyncManager() {}
25 
AddSyncOperation(uintptr_t syncId,uint32_t delayMs,const SyncFunc & syncFunc,const SyncEnd & syncEnd)26 Status KvStoreSyncManager::AddSyncOperation(uintptr_t syncId, uint32_t delayMs, const SyncFunc &syncFunc,
27                                             const SyncEnd &syncEnd)
28 {
29     if (syncId == 0 || syncFunc == nullptr) {
30         return Status::INVALID_ARGUMENT;
31     }
32     uint32_t opSeq = ++syncOpSeq_;
33     SyncEnd endFunc;
34     if (syncEnd != nullptr) {
35         endFunc = [opSeq, delayMs, syncEnd, this](const std::map<std::string, DistributedDB::DBStatus> &devices) {
36             RemoveSyncingOp(opSeq, (delayMs == 0) ? realtimeSyncingOps_ : delaySyncingOps_);
37             syncEnd(devices);
38         };
39     }
40 
41     auto beginTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(delayMs);
42     KvSyncOperation syncOp{ syncId, opSeq, delayMs, syncFunc, endFunc, beginTime };
43     if (delayMs == 0) {
44         if (endFunc != nullptr) {
45             std::lock_guard<std::mutex> lock(syncOpsMutex_);
46             realtimeSyncingOps_.push_back(syncOp);
47         }
48         return syncFunc(endFunc);
49     }
50 
51     std::lock_guard<std::mutex> lock(syncOpsMutex_);
52     scheduleSyncOps_.emplace(beginTime, syncOp);
53     ZLOGD("add op %u delay %u count %zu.", opSeq, delayMs, scheduleSyncOps_.size());
54     if ((scheduleSyncOps_.size() == 1) ||
55         (nextScheduleTime_ > beginTime + std::chrono::milliseconds(GetExpireTimeRange(delayMs)))) {
56         AddTimer(beginTime);
57     }
58     return Status::SUCCESS;
59 }
60 
GetExpireTimeRange(uint32_t delayMs) const61 uint32_t KvStoreSyncManager::GetExpireTimeRange(uint32_t delayMs) const
62 {
63     uint32_t range = delayMs / DELAY_TIME_RANGE_DIVISOR;
64     return std::max(range, SYNC_MIN_DELAY_MS >> 1);
65 }
66 
RemoveSyncOperation(uintptr_t syncId)67 Status KvStoreSyncManager::RemoveSyncOperation(uintptr_t syncId)
68 {
69     auto pred = [syncId](const KvSyncOperation &op) -> bool { return syncId == op.syncId; };
70 
71     std::lock_guard<std::mutex> lock(syncOpsMutex_);
72     uint32_t count = DoRemoveSyncingOp(pred, realtimeSyncingOps_);
73     count += DoRemoveSyncingOp(pred, delaySyncingOps_);
74 
75     auto &syncOps = scheduleSyncOps_;
76     for (auto it = syncOps.begin(); it != syncOps.end();) {
77         if (pred(it->second)) {
78             count++;
79             it = syncOps.erase(it);
80         } else {
81             ++it;
82         }
83     }
84     return (count > 0) ? Status::SUCCESS : Status::ERROR;
85 }
86 
DoRemoveSyncingOp(OpPred pred,std::list<KvSyncOperation> & syncingOps)87 uint32_t KvStoreSyncManager::DoRemoveSyncingOp(OpPred pred, std::list<KvSyncOperation> &syncingOps)
88 {
89     uint32_t count = 0;
90     for (auto it = syncingOps.begin(); it != syncingOps.end();) {
91         if (pred(*it)) {
92             count++;
93             it = syncingOps.erase(it);
94         } else {
95             ++it;
96         }
97     }
98     return count;
99 }
100 
RemoveSyncingOp(uint32_t opSeq,std::list<KvSyncOperation> & syncingOps)101 Status KvStoreSyncManager::RemoveSyncingOp(uint32_t opSeq, std::list<KvSyncOperation> &syncingOps)
102 {
103     auto pred = [opSeq](const KvSyncOperation &op) -> bool { return opSeq == op.opSeq; };
104 
105     ZLOGD("remove op %u", opSeq);
106     std::lock_guard<std::mutex> lock(syncOpsMutex_);
107     uint32_t count = DoRemoveSyncingOp(pred, syncingOps);
108     return (count == 1) ? Status::SUCCESS : Status::ERROR;
109 }
110 
AddTimer(const TimePoint & expireTime)111 void KvStoreSyncManager::AddTimer(const TimePoint &expireTime)
112 {
113     ZLOGD("time %lld", expireTime.time_since_epoch().count());
114     nextScheduleTime_ = expireTime;
115     syncScheduler_.At(expireTime, [time = expireTime, this]() { Schedule(time); });
116 }
117 
GetTimeoutSyncOps(const TimePoint & currentTime,std::list<KvSyncOperation> & syncOps)118 bool KvStoreSyncManager::GetTimeoutSyncOps(const TimePoint &currentTime, std::list<KvSyncOperation> &syncOps)
119 {
120     std::lock_guard<std::mutex> lock(syncOpsMutex_);
121     if ((!realtimeSyncingOps_.empty()) && (!scheduleSyncOps_.empty())) {
122         // the last processing time is less than priorSyncingTime
123         auto priorSyncingTime = std::chrono::milliseconds(REALTIME_PRIOR_SYNCING_MS);
124         if (currentTime < realtimeSyncingOps_.rbegin()->beginTime + priorSyncingTime) {
125             return true;
126         }
127     }
128     for (auto it = scheduleSyncOps_.begin(); it != scheduleSyncOps_.end();) {
129         const auto &expireTime = it->first;
130         const auto &op = it->second;
131         // currentTime is earlier than expireTime minus delayMs
132         if (currentTime + std::chrono::milliseconds(GetExpireTimeRange(op.delayMs)) < expireTime) {
133             break;
134         }
135 
136         syncOps.push_back(op);
137         if (op.syncEnd != nullptr) {
138             delaySyncingOps_.push_back(op);
139         }
140         it = scheduleSyncOps_.erase(it);
141     }
142     return false;
143 }
144 
DoCheckSyncingTimeout(std::list<KvSyncOperation> & syncingOps)145 void KvStoreSyncManager::DoCheckSyncingTimeout(std::list<KvSyncOperation> &syncingOps)
146 {
147     auto syncingTimeoutPred = [](const KvSyncOperation &op) -> bool {
148         return op.beginTime + std::chrono::milliseconds(SYNCING_TIMEOUT_MS) < std::chrono::steady_clock::now();
149     };
150 
151     uint32_t count = DoRemoveSyncingOp(syncingTimeoutPred, syncingOps);
152     if (count > 0) {
153         ZLOGI("remove %u syncing ops by timeout", count);
154     }
155 }
156 
Schedule(const TimePoint & time)157 void KvStoreSyncManager::Schedule(const TimePoint &time)
158 {
159     ZLOGD("timeout %lld", time.time_since_epoch().count());
160     std::list<KvSyncOperation> syncOps;
161     bool delaySchedule = GetTimeoutSyncOps(time, syncOps);
162 
163     for (const auto &op : syncOps) {
164         op.syncFunc(op.syncEnd);
165     }
166 
167     std::lock_guard<std::mutex> lock(syncOpsMutex_);
168     DoCheckSyncingTimeout(realtimeSyncingOps_);
169     DoCheckSyncingTimeout(delaySyncingOps_);
170     if (!scheduleSyncOps_.empty()) {
171         auto nextTime = scheduleSyncOps_.begin()->first;
172         if (delaySchedule) {
173             nextTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(SYNC_MIN_DELAY_MS);
174         }
175         AddTimer(nextTime);
176     }
177 }
178 }  // namespace DistributedKv
179 }  // namespace OHOS
180