1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "AutoSyncTimer"
16 #include "auto_sync_timer.h"
17
18 #include "kvdb_service_client.h"
19 #include "log_print.h"
20
21 namespace OHOS::DistributedKv {
GetInstance()22 AutoSyncTimer &AutoSyncTimer::GetInstance()
23 {
24 static AutoSyncTimer instance;
25 return instance;
26 }
27
StartTimer()28 void AutoSyncTimer::StartTimer()
29 {
30 std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
31 if (forceSyncTaskId_ == TaskScheduler::INVALID_TASK_ID) {
32 auto expiredTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(FORCE_SYNC_INTERVAL);
33 forceSyncTaskId_ = scheduler_.At(expiredTime, ProcessTask());
34 }
35 if (delaySyncTaskId_ == TaskScheduler::INVALID_TASK_ID) {
36 auto expiredTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(AUTO_SYNC_INTERVAL);
37 delaySyncTaskId_ = scheduler_.At(expiredTime, ProcessTask());
38 } else {
39 delaySyncTaskId_ = scheduler_.Reset(delaySyncTaskId_, std::chrono::milliseconds(AUTO_SYNC_INTERVAL));
40 }
41 }
42
DoAutoSync(const std::string & appId,std::set<StoreId> storeIds)43 void AutoSyncTimer::DoAutoSync(const std::string &appId, std::set<StoreId> storeIds)
44 {
45 AddSyncStores(appId, std::move(storeIds));
46 StartTimer();
47 }
48
AddSyncStores(const std::string & appId,std::set<StoreId> storeIds)49 void AutoSyncTimer::AddSyncStores(const std::string &appId, std::set<StoreId> storeIds)
50 {
51 stores_.Compute(appId, [&storeIds](const auto &key, std::set<StoreId> &value) {
52 value.merge(std::move(storeIds));
53 return !value.empty();
54 });
55 }
56
HasSyncStores()57 bool AutoSyncTimer::HasSyncStores()
58 {
59 return !stores_.Empty();
60 }
61
GetStoreIds()62 std::map<std::string, std::set<StoreId>> AutoSyncTimer::GetStoreIds()
63 {
64 std::map<std::string, std::set<StoreId>> stores;
65 int count = SYNC_STORE_NUM;
66 stores_.EraseIf([&stores, &count](const std::string &key, std::set<StoreId> &value) {
67 int size = value.size();
68 if (size <= count) {
69 stores.insert({ key, std::move(value) });
70 count = count - size;
71 return true;
72 }
73 auto &innerStore = stores[key];
74 for (auto it = value.begin(); it != value.end() && count > 0;) {
75 innerStore.insert(*it);
76 it = value.erase(it);
77 count--;
78 }
79 return value.empty();
80 });
81 return stores;
82 }
83
ProcessTask()84 std::function<void()> AutoSyncTimer::ProcessTask()
85 {
86 return [this]() {
87 StopTimer();
88 auto service = KVDBServiceClient::GetInstance();
89 if (service == nullptr) {
90 return;
91 }
92
93 auto storeIds = GetStoreIds();
94 for (const auto &id : storeIds) {
95 ZLOGD("DoSync appId:%{public}s store size:%{public}zu", id.first.c_str(), id.second.size());
96 for (const auto &storeId : id.second) {
97 service->Sync({ id.first }, storeId, {});
98 }
99 }
100 if (HasSyncStores()) {
101 StartTimer();
102 }
103 };
104 }
105
StopTimer()106 void AutoSyncTimer::StopTimer()
107 {
108 std::lock_guard<decltype(mutex_)> lockGuard(mutex_);
109 scheduler_.Clean();
110 forceSyncTaskId_ = TaskScheduler::INVALID_TASK_ID;
111 delaySyncTaskId_ = TaskScheduler::INVALID_TASK_ID;
112 }
113 }