• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "AutoCache"
16 #include <cinttypes>
17 #include "utils/anonymous.h"
18 #include "store/auto_cache.h"
19 
20 #include "log_print.h"
21 namespace OHOS::DistributedData {
GetInstance()22 AutoCache &AutoCache::GetInstance()
23 {
24     static AutoCache cache;
25     return cache;
26 }
27 
RegCreator(int32_t type,Creator creator)28 int32_t AutoCache::RegCreator(int32_t type, Creator creator)
29 {
30     if (type >= MAX_CREATOR_NUM) {
31         return E_ERROR;
32     }
33     creators_[type] = creator;
34     return 0;
35 }
36 
Bind(std::shared_ptr<Executor> executor)37 void AutoCache::Bind(std::shared_ptr<Executor> executor)
38 {
39     if (executor == nullptr || taskId_ != Executor::INVALID_TASK_ID) {
40         return;
41     }
42     executor_ = executor;
43 }
44 
AutoCache()45 AutoCache::AutoCache() {}
46 
~AutoCache()47 AutoCache::~AutoCache()
48 {
49     GarbageCollect(true);
50     if (executor_ != nullptr) {
51         executor_->Remove(taskId_, true);
52     }
53 }
54 
GetStore(const StoreMetaData & meta,const Watchers & watchers)55 AutoCache::Store AutoCache::GetStore(const StoreMetaData &meta, const Watchers &watchers)
56 {
57     Store store;
58     if (meta.storeType >= MAX_CREATOR_NUM || meta.storeType < 0 || !creators_[meta.storeType]) {
59         return store;
60     }
61 
62     stores_.Compute(meta.tokenId,
63         [this, &meta, &watchers, &store](auto &, std::map<std::string, Delegate> &stores) -> bool {
64             auto it = stores.find(meta.storeId);
65             if (it != stores.end()) {
66                 if (!watchers.empty()) {
67                     it->second.SetObservers(watchers);
68                 }
69                 store = it->second;
70                 return !stores.empty();
71             }
72             auto *dbStore = creators_[meta.storeType](meta);
73             if (dbStore == nullptr) {
74                 ZLOGE("creator failed. storeName:%{public}s", meta.GetStoreAlias().c_str());
75                 return !stores.empty();
76             }
77             auto result = stores.emplace(std::piecewise_construct, std::forward_as_tuple(meta.storeId),
78                 std::forward_as_tuple(dbStore, watchers, atoi(meta.user.c_str())));
79             store = result.first->second;
80             StartTimer();
81             return !stores.empty();
82         });
83     return store;
84 }
85 
GetStoresIfPresent(uint32_t tokenId,const std::string & storeName)86 AutoCache::Stores AutoCache::GetStoresIfPresent(uint32_t tokenId, const std::string& storeName)
87 {
88     Stores stores;
89     stores_.ComputeIfPresent(tokenId, [&stores, &storeName](auto&, std::map<std::string, Delegate>& delegates) -> bool {
90         if (storeName.empty()) {
91             for (auto& [_, delegate] : delegates) {
92                 stores.push_back(delegate);
93             }
94         } else {
95             auto it = delegates.find(storeName);
96             if (it != delegates.end()) {
97                 stores.push_back(it->second);
98             }
99         }
100         return !stores.empty();
101     });
102     return stores;
103 }
104 
105 // Should be used within stores_'s thread safe methods
StartTimer()106 void AutoCache::StartTimer()
107 {
108     if (executor_ == nullptr || taskId_ != Executor::INVALID_TASK_ID) {
109         return;
110     }
111     taskId_ = executor_->Schedule(
112         [this]() {
113             GarbageCollect(false);
114             stores_.DoActionIfEmpty([this]() {
115                 if (executor_ == nullptr || taskId_ == Executor::INVALID_TASK_ID) {
116                     return;
117                 }
118                 executor_->Remove(taskId_);
119                 ZLOGD("remove timer,taskId: %{public}" PRIu64, taskId_);
120                 taskId_ = Executor::INVALID_TASK_ID;
121             });
122         },
123         std::chrono::minutes(INTERVAL), std::chrono::minutes(INTERVAL));
124     ZLOGD("start timer,taskId: %{public}" PRIu64, taskId_);
125 }
126 
CloseStore(uint32_t tokenId,const std::string & storeId)127 void AutoCache::CloseStore(uint32_t tokenId, const std::string &storeId)
128 {
129     stores_.ComputeIfPresent(tokenId, [&storeId](auto &key, std::map<std::string, Delegate> &delegates) {
130         auto it = delegates.find(storeId);
131         if (it != delegates.end()) {
132             it->second.Close();
133             delegates.erase(it);
134         }
135         return !delegates.empty();
136     });
137 }
138 
CloseStore(uint32_t tokenId)139 void AutoCache::CloseStore(uint32_t tokenId)
140 {
141     stores_.Erase(tokenId);
142 }
143 
CloseExcept(const std::set<int32_t> & users)144 void AutoCache::CloseExcept(const std::set<int32_t> &users)
145 {
146     stores_.EraseIf([&users](const auto &tokenId, std::map<std::string, Delegate> &delegates) {
147         if (delegates.empty() || users.count(delegates.begin()->second.GetUser()) != 0) {
148             return delegates.empty();
149         }
150 
151         for (auto it = delegates.begin(); it != delegates.end();) {
152             // if the kv store is BUSY we wait more INTERVAL minutes again
153             if (!it->second.Close()) {
154                 ++it;
155             } else {
156                 it = delegates.erase(it);
157             }
158         }
159         return delegates.empty();
160     });
161 }
162 
SetObserver(uint32_t tokenId,const std::string & storeId,const AutoCache::Watchers & watchers)163 void AutoCache::SetObserver(uint32_t tokenId, const std::string &storeId, const AutoCache::Watchers &watchers)
164 {
165     stores_.ComputeIfPresent(tokenId, [&storeId, &watchers](auto &key, auto &stores) {
166         ZLOGD("tokenId:0x%{public}x storeId:%{public}s observers:%{public}zu", key, Anonymous::Change(storeId).c_str(),
167             watchers.size());
168         auto it = stores.find(storeId);
169         if (it != stores.end()) {
170             it->second.SetObservers(watchers);
171         }
172         return true;
173     });
174 }
175 
GarbageCollect(bool isForce)176 void AutoCache::GarbageCollect(bool isForce)
177 {
178     auto current = std::chrono::steady_clock::now();
179     stores_.EraseIf([&current, isForce](auto &key, std::map<std::string, Delegate> &delegates) {
180         for (auto it = delegates.begin(); it != delegates.end();) {
181             // if the store is BUSY we wait more INTERVAL minutes again
182             if ((isForce || it->second < current) && it->second.Close()) {
183                 it = delegates.erase(it);
184             } else {
185                 ++it;
186             }
187         }
188         return delegates.empty();
189     });
190 }
191 
Delegate(GeneralStore * delegate,const Watchers & watchers,int32_t user)192 AutoCache::Delegate::Delegate(GeneralStore *delegate, const Watchers &watchers, int32_t user)
193     : store_(delegate), watchers_(watchers), user_(user)
194 {
195     time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
196     if (store_ != nullptr) {
197         store_->Watch(Origin::ORIGIN_ALL, *this);
198     }
199 }
200 
~Delegate()201 AutoCache::Delegate::~Delegate()
202 {
203     if (store_ != nullptr) {
204         store_->Unwatch(Origin::ORIGIN_ALL, *this);
205         store_->Close();
206         store_->Release();
207         store_ = nullptr;
208     }
209 }
210 
operator Store()211 AutoCache::Delegate::operator Store()
212 {
213     time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
214     if (store_ != nullptr) {
215         store_->AddRef();
216         return Store(store_, [](GeneralStore *store) { store->Release(); });
217     }
218     return nullptr;
219 }
220 
operator <(const AutoCache::Time & time) const221 bool AutoCache::Delegate::operator<(const AutoCache::Time &time) const
222 {
223     return time_ < time;
224 }
225 
Close()226 bool AutoCache::Delegate::Close()
227 {
228     std::unique_lock<decltype(mutex_)> lock(mutex_);
229     if (store_ != nullptr) {
230         store_->Unwatch(Origin::ORIGIN_ALL, *this);
231         auto status = store_->Close();
232         if (status == Error::E_BUSY) {
233             return false;
234         }
235     }
236     return true;
237 }
238 
SetObservers(const AutoCache::Watchers & watchers)239 void AutoCache::Delegate::SetObservers(const AutoCache::Watchers &watchers)
240 {
241     std::unique_lock<decltype(mutex_)> lock(mutex_);
242     watchers_ = watchers;
243 }
244 
GetUser() const245 int32_t AutoCache::Delegate::GetUser() const
246 {
247     return user_;
248 }
249 
OnChange(const Origin & origin,const PRIFields & primaryFields,ChangeInfo && values)250 int32_t AutoCache::Delegate::OnChange(const Origin &origin, const PRIFields &primaryFields, ChangeInfo &&values)
251 {
252     Watchers watchers;
253     {
254         std::unique_lock<decltype(mutex_)> lock(mutex_);
255         watchers = watchers_;
256     }
257     size_t remain = watchers.size();
258     for (auto &watcher : watchers) {
259         remain--;
260         if (watcher == nullptr) {
261             continue;
262         }
263         watcher->OnChange(origin, primaryFields, (remain != 0) ? ChangeInfo(values) : std::move(values));
264     }
265     return Error::E_OK;
266 }
267 } // namespace OHOS::DistributedData
268