1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "StoreCache"
16 #include "store_cache.h"
17
18 #include "account/account_delegate.h"
19 #include "crypto_manager.h"
20 #include "device_matrix.h"
21 #include "directory/directory_manager.h"
22 #include "log_print.h"
23 #include "metadata/meta_data_manager.h"
24 #include "metadata/secret_key_meta_data.h"
25 #include "types.h"
26 #include "utils/anonymous.h"
27 namespace OHOS::DistributedKv {
28 using namespace OHOS::DistributedData;
29 constexpr int64_t StoreCache::INTERVAL;
GetStore(const StoreMetaData & data,std::shared_ptr<Observers> observers,DBStatus & status)30 StoreCache::Store StoreCache::GetStore(const StoreMetaData &data, std::shared_ptr<Observers> observers,
31 DBStatus &status)
32 {
33 Store store = nullptr;
34 status = DBStatus::NOT_FOUND;
35 stores_.Compute(data.tokenId, [&](const auto &key, std::map<std::string, DBStoreDelegate> &stores) {
36 auto it = stores.find(data.storeId);
37 if (it != stores.end()) {
38 it->second.SetObservers(observers);
39 store = it->second;
40 return true;
41 }
42
43 DBStore *dbStore = nullptr;
44 DBManager manager(data.appId, data.user, data.instanceId);
45 manager.SetKvStoreConfig({ DirectoryManager::GetInstance().GetStorePath(data) });
46 manager.GetKvStore(data.storeId, GetDBOption(data, GetDBPassword(data)),
47 [&status, &dbStore](auto dbStatus, auto *tmpStore) {
48 status = dbStatus;
49 dbStore = tmpStore;
50 });
51
52 if (dbStore == nullptr) {
53 return !stores.empty();
54 }
55
56 if (data.isAutoSync) {
57 auto code = DeviceMatrix::GetInstance().GetCode(data);
58 dbStore->SetRemotePushFinishedNotify([code](const DistributedDB::RemotePushNotifyInfo &info) {
59 DeviceMatrix::GetInstance().OnExchanged(info.deviceId, code, true);
60 });
61 }
62
63 auto result = stores.emplace(std::piecewise_construct, std::forward_as_tuple(data.storeId),
64 std::forward_as_tuple(dbStore, observers));
65 store = result.first->second;
66 return !stores.empty();
67 });
68
69 executors_->Schedule(std::chrono::minutes(INTERVAL), std::bind(&StoreCache::GarbageCollect, this));
70 return store;
71 }
72
CloseStore(uint32_t tokenId,const std::string & storeId)73 void StoreCache::CloseStore(uint32_t tokenId, const std::string &storeId)
74 {
75 stores_.ComputeIfPresent(tokenId, [&storeId](auto &key, std::map<std::string, DBStoreDelegate> &delegates) {
76 DBManager manager("", "");
77 auto it = delegates.find(storeId);
78 if (it != delegates.end()) {
79 it->second.Close(manager);
80 delegates.erase(it);
81 }
82 return !delegates.empty();
83 });
84 }
85
CloseExcept(const std::set<int32_t> & users)86 void StoreCache::CloseExcept(const std::set<int32_t> &users)
87 {
88 DBManager manager("", "");
89 stores_.EraseIf([&manager, &users](const auto &tokenId, std::map<std::string, DBStoreDelegate> &delegates) {
90 auto userId = AccountDelegate::GetInstance()->GetUserByToken(tokenId);
91 if (users.count(userId) != 0) {
92 return delegates.empty();
93 }
94 for (auto it = delegates.begin(); it != delegates.end();) {
95 // if the kv store is BUSY we wait more INTERVAL minutes again
96 if (!it->second.Close(manager)) {
97 ++it;
98 } else {
99 it = delegates.erase(it);
100 }
101 }
102 return delegates.empty();
103 });
104 }
105
SetObserver(uint32_t tokenId,const std::string & storeId,std::shared_ptr<Observers> observers)106 void StoreCache::SetObserver(uint32_t tokenId, const std::string &storeId, std::shared_ptr<Observers> observers)
107 {
108 stores_.ComputeIfPresent(tokenId, [&storeId, &observers](auto &key, auto &stores) {
109 ZLOGD("tokenId:0x%{public}x storeId:%{public}s observers:%{public}zu", key, Anonymous::Change(storeId).c_str(),
110 observers ? observers->size() : size_t(0));
111 auto it = stores.find(storeId);
112 if (it != stores.end()) {
113 it->second.SetObservers(observers);
114 }
115 return true;
116 });
117 }
118
GarbageCollect()119 void StoreCache::GarbageCollect()
120 {
121 DBManager manager("", "");
122 auto current = std::chrono::steady_clock::now();
123 stores_.EraseIf([&manager, ¤t](auto &key, std::map<std::string, DBStoreDelegate> &delegates) {
124 for (auto it = delegates.begin(); it != delegates.end();) {
125 // if the kv store is BUSY we wait more INTERVAL minutes again
126 if ((it->second < current) && it->second.Close(manager)) {
127 it = delegates.erase(it);
128 } else {
129 ++it;
130 }
131 }
132 return delegates.empty();
133 });
134 if (!stores_.Empty()) {
135 ZLOGD("stores size:%{public}zu", stores_.Size());
136 executors_->Schedule(std::chrono::minutes(INTERVAL), std::bind(&StoreCache::GarbageCollect, this));
137 }
138 }
139
GetDBOption(const StoreMetaData & data,const DBPassword & password)140 StoreCache::DBOption StoreCache::GetDBOption(const StoreMetaData &data, const DBPassword &password)
141 {
142 DBOption dbOption;
143 dbOption.syncDualTupleMode = true; // tuple of (appid+storeid)
144 dbOption.createIfNecessary = false;
145 dbOption.isMemoryDb = false;
146 dbOption.isEncryptedDb = data.isEncrypt;
147 if (data.isEncrypt) {
148 dbOption.cipher = DistributedDB::CipherType::AES_256_GCM;
149 dbOption.passwd = password;
150 }
151
152 if (data.storeType == KvStoreType::SINGLE_VERSION) {
153 dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
154 } else if (data.storeType == KvStoreType::DEVICE_COLLABORATION) {
155 dbOption.conflictResolvePolicy = DistributedDB::DEVICE_COLLABORATION;
156 }
157
158 dbOption.schema = data.schema;
159 dbOption.createDirByStoreIdOnly = true;
160 dbOption.secOption = GetDBSecurity(data.securityLevel);
161 return dbOption;
162 }
163
GetDBSecurity(int32_t secLevel)164 StoreCache::DBSecurity StoreCache::GetDBSecurity(int32_t secLevel)
165 {
166 if (secLevel < SecurityLevel::NO_LABEL || secLevel > SecurityLevel::S4) {
167 return { DistributedDB::NOT_SET, DistributedDB::ECE };
168 }
169 if (secLevel == SecurityLevel::S3) {
170 return { DistributedDB::S3, DistributedDB::SECE };
171 }
172 if (secLevel == SecurityLevel::S4) {
173 return { DistributedDB::S4, DistributedDB::ECE };
174 }
175 return { secLevel, DistributedDB::ECE };
176 }
177
GetDBPassword(const StoreMetaData & data)178 StoreCache::DBPassword StoreCache::GetDBPassword(const StoreMetaData &data)
179 {
180 DBPassword dbPassword;
181 if (!data.isEncrypt) {
182 return dbPassword;
183 }
184
185 SecretKeyMetaData secretKey;
186 secretKey.storeType = data.storeType;
187 auto storeKey = data.GetSecretKey();
188 MetaDataManager::GetInstance().LoadMeta(storeKey, secretKey, true);
189 std::vector<uint8_t> password;
190 CryptoManager::GetInstance().Decrypt(secretKey.sKey, password);
191 dbPassword.SetValue(password.data(), password.size());
192 password.assign(password.size(), 0);
193 return dbPassword;
194 }
195
SetThreadPool(std::shared_ptr<ExecutorPool> executors)196 void StoreCache::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
197 {
198 executors_ = executors;
199 }
200
DBStoreDelegate(DBStore * delegate,std::shared_ptr<Observers> observers)201 StoreCache::DBStoreDelegate::DBStoreDelegate(DBStore *delegate, std::shared_ptr<Observers> observers)
202 : time_(std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL)), delegate_(delegate)
203 {
204 SetObservers(std::move(observers));
205 }
206
~DBStoreDelegate()207 StoreCache::DBStoreDelegate::~DBStoreDelegate()
208 {
209 if (delegate_ != nullptr) {
210 delegate_->UnRegisterObserver(this);
211 }
212 DBManager manager("", "");
213 manager.CloseKvStore(delegate_);
214 delegate_ = nullptr;
215 }
216
operator std::shared_ptr<DBStore>()217 StoreCache::DBStoreDelegate::operator std::shared_ptr<DBStore> ()
218 {
219 time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
220 mutex_.lock_shared();
221 if (delegate_ == nullptr) {
222 mutex_.unlock_shared();
223 return nullptr;
224 }
225 return std::shared_ptr<DBStore>(delegate_, [this](DBStore *) { mutex_.unlock_shared();});
226 }
227
operator <(const Time & time) const228 bool StoreCache::DBStoreDelegate::operator<(const Time &time) const
229 {
230 return time_ < time;
231 }
232
Close(DBManager & manager)233 bool StoreCache::DBStoreDelegate::Close(DBManager &manager)
234 {
235 std::unique_lock<decltype(mutex_)> lock(mutex_);
236 if (delegate_ != nullptr) {
237 delegate_->UnRegisterObserver(this);
238 auto status = manager.CloseKvStore(delegate_);
239 if (status == DBStatus::BUSY) {
240 return false;
241 }
242 delegate_ = nullptr;
243 }
244 return true;
245 }
246
OnChange(const DistributedDB::KvStoreChangedData & data)247 void StoreCache::DBStoreDelegate::OnChange(const DistributedDB::KvStoreChangedData &data)
248 {
249 if (observers_ == nullptr || delegate_ == nullptr) {
250 ZLOGE("already closed");
251 return;
252 }
253
254 time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
255 auto observers = observers_;
256 std::vector<uint8_t> key;
257 auto inserts = Convert(data.GetEntriesInserted());
258 auto updates = Convert(data.GetEntriesUpdated());
259 auto deletes = Convert(data.GetEntriesDeleted());
260 ZLOGD("C:%{public}zu U:%{public}zu D:%{public}zu storeId:%{public}s", inserts.size(), updates.size(),
261 deletes.size(), delegate_->GetStoreId().c_str());
262 ChangeNotification change(std::move(inserts), std::move(updates), std::move(deletes), {}, false);
263 for (auto &observer : *observers) {
264 if (observer == nullptr) {
265 continue;
266 }
267 observer->OnChange(change);
268 }
269 }
270
SetObservers(std::shared_ptr<Observers> observers)271 void StoreCache::DBStoreDelegate::SetObservers(std::shared_ptr<Observers> observers)
272 {
273 if (observers_ == observers || delegate_ == nullptr) {
274 return;
275 }
276
277 observers_ = observers;
278
279 if (observers_ != nullptr && !observers_->empty()) {
280 ZLOGD("storeId:%{public}s observers:%{public}zu", delegate_->GetStoreId().c_str(), observers_->size());
281 delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, this);
282 }
283 }
284
Convert(const std::list<DBEntry> & dbEntries)285 std::vector<Entry> StoreCache::DBStoreDelegate::Convert(const std::list<DBEntry> &dbEntries)
286 {
287 std::vector<Entry> entries;
288 for (const auto &entry : dbEntries) {
289 Entry tmpEntry;
290 tmpEntry.key = entry.key;
291 tmpEntry.value = entry.value;
292 entries.push_back(tmpEntry);
293 }
294 return entries;
295 }
296 }; // namespace OHOS::DistributedKv
297