1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "AutoCache"
16 #include "store/auto_cache.h"
17 #include "utils/anonymous.h"
18
19 #include "log_print.h"
20 namespace OHOS::DistributedData {
GetInstance()21 AutoCache &AutoCache::GetInstance()
22 {
23 static AutoCache cache;
24 return cache;
25 }
26
RegCreator(int32_t type,Creator creator)27 int32_t AutoCache::RegCreator(int32_t type, Creator creator)
28 {
29 if (type >= MAX_CREATOR_NUM) {
30 return E_ERROR;
31 }
32 creators_[type] = creator;
33 return 0;
34 }
35
Bind(std::shared_ptr<Executor> executor)36 void AutoCache::Bind(std::shared_ptr<Executor> executor)
37 {
38 if (executor == nullptr || taskId_ != Executor::INVALID_TASK_ID) {
39 return;
40 }
41 executor_ = executor;
42 taskId_ = executor_->Schedule(std::bind(&AutoCache::GarbageCollect, this, false), std::chrono::minutes(INTERVAL),
43 std::chrono::minutes(INTERVAL));
44 }
45
AutoCache()46 AutoCache::AutoCache() {}
47
~AutoCache()48 AutoCache::~AutoCache()
49 {
50 GarbageCollect(true);
51 if (executor_ != nullptr) {
52 executor_->Remove(taskId_, true);
53 }
54 }
55
GetStore(const StoreMetaData & meta,const Watchers & watchers)56 AutoCache::Store AutoCache::GetStore(const StoreMetaData &meta, const Watchers &watchers)
57 {
58 Store store;
59 if (meta.storeType >= MAX_CREATOR_NUM || meta.storeType < 0 || !creators_[meta.storeType]) {
60 return store;
61 }
62
63 stores_.Compute(meta.tokenId,
64 [this, &meta, &watchers, &store](auto &, std::map<std::string, Delegate> &stores) -> bool {
65 auto it = stores.find(meta.storeId);
66 if (it != stores.end()) {
67 if (!watchers.empty()) {
68 it->second.SetObservers(watchers);
69 }
70 store = it->second;
71 return !stores.empty();
72 }
73 auto *dbStore = creators_[meta.storeType](meta);
74 if (dbStore == nullptr) {
75 return !stores.empty();
76 }
77 auto result = stores.emplace(std::piecewise_construct, std::forward_as_tuple(meta.storeId),
78 std::forward_as_tuple(dbStore, watchers, atoi(meta.user.c_str())));
79 store = result.first->second;
80 return !stores.empty();
81 });
82 return store;
83 }
84
CloseStore(uint32_t tokenId,const std::string & storeId)85 void AutoCache::CloseStore(uint32_t tokenId, const std::string &storeId)
86 {
87 stores_.ComputeIfPresent(tokenId, [&storeId](auto &key, std::map<std::string, Delegate> &delegates) {
88 auto it = delegates.find(storeId);
89 if (it != delegates.end()) {
90 it->second.Close();
91 delegates.erase(it);
92 }
93 return !delegates.empty();
94 });
95 }
96
CloseStore(uint32_t tokenId)97 void AutoCache::CloseStore(uint32_t tokenId)
98 {
99 stores_.Erase(tokenId);
100 }
101
CloseExcept(const std::set<int32_t> & users)102 void AutoCache::CloseExcept(const std::set<int32_t> &users)
103 {
104 stores_.EraseIf([&users](const auto &tokenId, std::map<std::string, Delegate> &delegates) {
105 if (delegates.empty() || users.count(delegates.begin()->second.GetUser()) != 0) {
106 return delegates.empty();
107 }
108
109 for (auto it = delegates.begin(); it != delegates.end();) {
110 // if the kv store is BUSY we wait more INTERVAL minutes again
111 if (!it->second.Close()) {
112 ++it;
113 } else {
114 it = delegates.erase(it);
115 }
116 }
117 return delegates.empty();
118 });
119 }
120
SetObserver(uint32_t tokenId,const std::string & storeId,const AutoCache::Watchers & watchers)121 void AutoCache::SetObserver(uint32_t tokenId, const std::string &storeId, const AutoCache::Watchers &watchers)
122 {
123 stores_.ComputeIfPresent(tokenId, [&storeId, &watchers](auto &key, auto &stores) {
124 ZLOGD("tokenId:0x%{public}x storeId:%{public}s observers:%{public}zu", key, Anonymous::Change(storeId).c_str(),
125 watchers.size());
126 auto it = stores.find(storeId);
127 if (it != stores.end()) {
128 it->second.SetObservers(watchers);
129 }
130 return true;
131 });
132 }
133
GarbageCollect(bool isForce)134 void AutoCache::GarbageCollect(bool isForce)
135 {
136 auto current = std::chrono::steady_clock::now();
137 stores_.EraseIf([¤t, isForce](auto &key, std::map<std::string, Delegate> &delegates) {
138 for (auto it = delegates.begin(); it != delegates.end();) {
139 // if the kv store is BUSY we wait more INTERVAL minutes again
140 if ((isForce || it->second < current) && it->second.Close()) {
141 it = delegates.erase(it);
142 } else {
143 ++it;
144 }
145 }
146 return delegates.empty();
147 });
148 }
149
Delegate(GeneralStore * delegate,const Watchers & watchers,int32_t user)150 AutoCache::Delegate::Delegate(GeneralStore *delegate, const Watchers &watchers, int32_t user)
151 : store_(delegate), watchers_(watchers), user_(user)
152 {
153 time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
154 if (store_ != nullptr) {
155 store_->Watch(Origin::ORIGIN_ALL, *this);
156 }
157 }
158
~Delegate()159 AutoCache::Delegate::~Delegate()
160 {
161 if (store_ != nullptr) {
162 store_->Unwatch(Origin::ORIGIN_ALL, *this);
163 store_->Close();
164 store_->Release();
165 store_ = nullptr;
166 }
167 }
168
operator Store()169 AutoCache::Delegate::operator Store()
170 {
171 time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
172 if (store_ != nullptr) {
173 store_->AddRef();
174 return Store(store_, [](GeneralStore *store) { store->Release(); });
175 }
176 return nullptr;
177 }
178
operator <(const AutoCache::Time & time) const179 bool AutoCache::Delegate::operator<(const AutoCache::Time &time) const
180 {
181 return time_ < time;
182 }
183
Close()184 bool AutoCache::Delegate::Close()
185 {
186 std::unique_lock<decltype(mutex_)> lock(mutex_);
187 if (store_ != nullptr) {
188 store_->Unwatch(Origin::ORIGIN_ALL, *this);
189 auto status = store_->Close();
190 if (status == Error::E_BUSY) {
191 return false;
192 }
193 }
194 return true;
195 }
196
SetObservers(const AutoCache::Watchers & watchers)197 void AutoCache::Delegate::SetObservers(const AutoCache::Watchers &watchers)
198 {
199 std::unique_lock<decltype(mutex_)> lock(mutex_);
200 watchers_ = watchers;
201 }
202
GetUser() const203 int32_t AutoCache::Delegate::GetUser() const
204 {
205 return user_;
206 }
207
OnChange(const Origin & origin,const PRIFields & primaryFields,ChangeInfo && values)208 int32_t AutoCache::Delegate::OnChange(const Origin &origin, const PRIFields &primaryFields, ChangeInfo &&values)
209 {
210 Watchers watchers;
211 {
212 std::unique_lock<decltype(mutex_)> lock(mutex_);
213 watchers = watchers_;
214 }
215 size_t remain = watchers.size();
216 for (auto &watcher : watchers) {
217 remain--;
218 if (watcher == nullptr) {
219 continue;
220 }
221 watcher->OnChange(origin, primaryFields, (remain != 0) ? ChangeInfo(values) : std::move(values));
222 }
223 return Error::E_OK;
224 }
225 } // namespace OHOS::DistributedData
226