1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "AutoCache"
16 #include <cinttypes>
17 #include "utils/anonymous.h"
18 #include "store/auto_cache.h"
19
20 #include "log_print.h"
21 namespace OHOS::DistributedData {
GetInstance()22 AutoCache &AutoCache::GetInstance()
23 {
24 static AutoCache cache;
25 return cache;
26 }
27
RegCreator(int32_t type,Creator creator)28 int32_t AutoCache::RegCreator(int32_t type, Creator creator)
29 {
30 if (type >= MAX_CREATOR_NUM) {
31 return E_ERROR;
32 }
33 creators_[type] = creator;
34 return 0;
35 }
36
Bind(std::shared_ptr<Executor> executor)37 void AutoCache::Bind(std::shared_ptr<Executor> executor)
38 {
39 if (executor == nullptr || taskId_ != Executor::INVALID_TASK_ID) {
40 return;
41 }
42 executor_ = executor;
43 }
44
AutoCache()45 AutoCache::AutoCache() {}
46
~AutoCache()47 AutoCache::~AutoCache()
48 {
49 GarbageCollect(true);
50 if (executor_ != nullptr) {
51 executor_->Remove(taskId_, true);
52 }
53 }
54
GetStore(const StoreMetaData & meta,const Watchers & watchers)55 AutoCache::Store AutoCache::GetStore(const StoreMetaData &meta, const Watchers &watchers)
56 {
57 Store store;
58 if (meta.storeType >= MAX_CREATOR_NUM || meta.storeType < 0 || !creators_[meta.storeType]) {
59 return store;
60 }
61
62 stores_.Compute(meta.tokenId,
63 [this, &meta, &watchers, &store](auto &, std::map<std::string, Delegate> &stores) -> bool {
64 auto it = stores.find(meta.storeId);
65 if (it != stores.end()) {
66 if (!watchers.empty()) {
67 it->second.SetObservers(watchers);
68 }
69 store = it->second;
70 return !stores.empty();
71 }
72 auto *dbStore = creators_[meta.storeType](meta);
73 if (dbStore == nullptr) {
74 ZLOGE("creator failed. storeName:%{public}s", meta.GetStoreAlias().c_str());
75 return !stores.empty();
76 }
77 auto result = stores.emplace(std::piecewise_construct, std::forward_as_tuple(meta.storeId),
78 std::forward_as_tuple(dbStore, watchers, atoi(meta.user.c_str())));
79 store = result.first->second;
80 StartTimer();
81 return !stores.empty();
82 });
83 return store;
84 }
85
GetStoresIfPresent(uint32_t tokenId,const std::string & storeName)86 AutoCache::Stores AutoCache::GetStoresIfPresent(uint32_t tokenId, const std::string& storeName)
87 {
88 Stores stores;
89 stores_.ComputeIfPresent(tokenId, [&stores, &storeName](auto&, std::map<std::string, Delegate>& delegates) -> bool {
90 if (storeName.empty()) {
91 for (auto& [_, delegate] : delegates) {
92 stores.push_back(delegate);
93 }
94 } else {
95 auto it = delegates.find(storeName);
96 if (it != delegates.end()) {
97 stores.push_back(it->second);
98 }
99 }
100 return !stores.empty();
101 });
102 return stores;
103 }
104
105 // Should be used within stores_'s thread safe methods
StartTimer()106 void AutoCache::StartTimer()
107 {
108 if (executor_ == nullptr || taskId_ != Executor::INVALID_TASK_ID) {
109 return;
110 }
111 taskId_ = executor_->Schedule(
112 [this]() {
113 GarbageCollect(false);
114 stores_.DoActionIfEmpty([this]() {
115 if (executor_ == nullptr || taskId_ == Executor::INVALID_TASK_ID) {
116 return;
117 }
118 executor_->Remove(taskId_);
119 ZLOGD("remove timer,taskId: %{public}" PRIu64, taskId_);
120 taskId_ = Executor::INVALID_TASK_ID;
121 });
122 },
123 std::chrono::minutes(INTERVAL), std::chrono::minutes(INTERVAL));
124 ZLOGD("start timer,taskId: %{public}" PRIu64, taskId_);
125 }
126
CloseStore(uint32_t tokenId,const std::string & storeId)127 void AutoCache::CloseStore(uint32_t tokenId, const std::string &storeId)
128 {
129 stores_.ComputeIfPresent(tokenId, [&storeId](auto &key, std::map<std::string, Delegate> &delegates) {
130 auto it = delegates.find(storeId);
131 if (it != delegates.end()) {
132 it->second.Close();
133 delegates.erase(it);
134 }
135 return !delegates.empty();
136 });
137 }
138
CloseStore(uint32_t tokenId)139 void AutoCache::CloseStore(uint32_t tokenId)
140 {
141 stores_.Erase(tokenId);
142 }
143
CloseExcept(const std::set<int32_t> & users)144 void AutoCache::CloseExcept(const std::set<int32_t> &users)
145 {
146 stores_.EraseIf([&users](const auto &tokenId, std::map<std::string, Delegate> &delegates) {
147 if (delegates.empty() || users.count(delegates.begin()->second.GetUser()) != 0) {
148 return delegates.empty();
149 }
150
151 for (auto it = delegates.begin(); it != delegates.end();) {
152 // if the kv store is BUSY we wait more INTERVAL minutes again
153 if (!it->second.Close()) {
154 ++it;
155 } else {
156 it = delegates.erase(it);
157 }
158 }
159 return delegates.empty();
160 });
161 }
162
SetObserver(uint32_t tokenId,const std::string & storeId,const AutoCache::Watchers & watchers)163 void AutoCache::SetObserver(uint32_t tokenId, const std::string &storeId, const AutoCache::Watchers &watchers)
164 {
165 stores_.ComputeIfPresent(tokenId, [&storeId, &watchers](auto &key, auto &stores) {
166 ZLOGD("tokenId:0x%{public}x storeId:%{public}s observers:%{public}zu", key, Anonymous::Change(storeId).c_str(),
167 watchers.size());
168 auto it = stores.find(storeId);
169 if (it != stores.end()) {
170 it->second.SetObservers(watchers);
171 }
172 return true;
173 });
174 }
175
GarbageCollect(bool isForce)176 void AutoCache::GarbageCollect(bool isForce)
177 {
178 auto current = std::chrono::steady_clock::now();
179 stores_.EraseIf([¤t, isForce](auto &key, std::map<std::string, Delegate> &delegates) {
180 for (auto it = delegates.begin(); it != delegates.end();) {
181 // if the store is BUSY we wait more INTERVAL minutes again
182 if ((isForce || it->second < current) && it->second.Close()) {
183 it = delegates.erase(it);
184 } else {
185 ++it;
186 }
187 }
188 return delegates.empty();
189 });
190 }
191
Delegate(GeneralStore * delegate,const Watchers & watchers,int32_t user)192 AutoCache::Delegate::Delegate(GeneralStore *delegate, const Watchers &watchers, int32_t user)
193 : store_(delegate), watchers_(watchers), user_(user)
194 {
195 time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
196 if (store_ != nullptr) {
197 store_->Watch(Origin::ORIGIN_ALL, *this);
198 }
199 }
200
~Delegate()201 AutoCache::Delegate::~Delegate()
202 {
203 if (store_ != nullptr) {
204 store_->Unwatch(Origin::ORIGIN_ALL, *this);
205 store_->Close();
206 store_->Release();
207 store_ = nullptr;
208 }
209 }
210
operator Store()211 AutoCache::Delegate::operator Store()
212 {
213 time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
214 if (store_ != nullptr) {
215 store_->AddRef();
216 return Store(store_, [](GeneralStore *store) { store->Release(); });
217 }
218 return nullptr;
219 }
220
operator <(const AutoCache::Time & time) const221 bool AutoCache::Delegate::operator<(const AutoCache::Time &time) const
222 {
223 return time_ < time;
224 }
225
Close()226 bool AutoCache::Delegate::Close()
227 {
228 std::unique_lock<decltype(mutex_)> lock(mutex_);
229 if (store_ != nullptr) {
230 store_->Unwatch(Origin::ORIGIN_ALL, *this);
231 auto status = store_->Close();
232 if (status == Error::E_BUSY) {
233 return false;
234 }
235 }
236 return true;
237 }
238
SetObservers(const AutoCache::Watchers & watchers)239 void AutoCache::Delegate::SetObservers(const AutoCache::Watchers &watchers)
240 {
241 std::unique_lock<decltype(mutex_)> lock(mutex_);
242 watchers_ = watchers;
243 }
244
GetUser() const245 int32_t AutoCache::Delegate::GetUser() const
246 {
247 return user_;
248 }
249
OnChange(const Origin & origin,const PRIFields & primaryFields,ChangeInfo && values)250 int32_t AutoCache::Delegate::OnChange(const Origin &origin, const PRIFields &primaryFields, ChangeInfo &&values)
251 {
252 Watchers watchers;
253 {
254 std::unique_lock<decltype(mutex_)> lock(mutex_);
255 watchers = watchers_;
256 }
257 size_t remain = watchers.size();
258 for (auto &watcher : watchers) {
259 remain--;
260 if (watcher == nullptr) {
261 continue;
262 }
263 watcher->OnChange(origin, primaryFields, (remain != 0) ? ChangeInfo(values) : std::move(values));
264 }
265 return Error::E_OK;
266 }
267 } // namespace OHOS::DistributedData
268