1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "StoreCache"
16 #include "store_cache.h"
17 #include "account/account_delegate.h"
18 #include "crypto_manager.h"
19 #include "device_matrix.h"
20 #include "directory_manager.h"
21 #include "log_print.h"
22 #include "metadata/meta_data_manager.h"
23 #include "metadata/secret_key_meta_data.h"
24 #include "types.h"
25 namespace OHOS::DistributedKv {
26 using namespace OHOS::DistributedData;
// Namespace-scope definitions (no initializer) of the static constexpr members of StoreCache.
// INTERVAL is passed to std::chrono::minutes(), which takes its argument by const reference, so the
// constant is odr-used in this file; before C++17 that requires an out-of-class definition like this.
// NOTE(review): the initial values presumably live in store_cache.h - not visible from this file.
constexpr int64_t StoreCache::INTERVAL;
constexpr size_t StoreCache::TIME_TASK_NUM;
GetStore(const StoreMetaData & data,std::shared_ptr<Observers> observers,DBStatus & status)29 StoreCache::Store StoreCache::GetStore(const StoreMetaData &data, std::shared_ptr<Observers> observers,
30 DBStatus &status)
31 {
32 Store store = nullptr;
33 status = DBStatus::NOT_FOUND;
34 stores_.Compute(data.tokenId, [&](const auto &key, std::map<std::string, DBStoreDelegate> &stores) {
35 auto it = stores.find(data.storeId);
36 if (it != stores.end()) {
37 it->second.SetObservers(observers);
38 store = it->second;
39 return true;
40 }
41
42 DBStore *dbStore = nullptr;
43 DBManager manager(data.appId, data.user, data.instanceId);
44 manager.SetKvStoreConfig({ DirectoryManager::GetInstance().GetStorePath(data) });
45 manager.GetKvStore(data.storeId, GetDBOption(data, GetDBPassword(data)),
46 [&status, &dbStore](auto dbStatus, auto *tmpStore) {
47 status = dbStatus;
48 dbStore = tmpStore;
49 });
50
51 if (dbStore == nullptr) {
52 return !stores.empty();
53 }
54
55 if (data.isAutoSync) {
56 auto code = DeviceMatrix::GetInstance().GetCode(data);
57 dbStore->SetRemotePushFinishedNotify([code](const DistributedDB::RemotePushNotifyInfo &info) {
58 DeviceMatrix::GetInstance().OnExchanged(info.deviceId, code, true);
59 });
60 }
61
62 auto result = stores.emplace(std::piecewise_construct, std::forward_as_tuple(data.storeId),
63 std::forward_as_tuple(dbStore, observers));
64 store = result.first->second;
65 return !stores.empty();
66 });
67
68 scheduler_.At(std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL),
69 std::bind(&StoreCache::GarbageCollect, this));
70
71 return store;
72 }
73
CloseStore(uint32_t tokenId,const std::string & storeId)74 void StoreCache::CloseStore(uint32_t tokenId, const std::string &storeId)
75 {
76 stores_.ComputeIfPresent(tokenId, [&storeId](auto &key, std::map<std::string, DBStoreDelegate> &delegates) {
77 DBManager manager("", "");
78 auto it = delegates.find(storeId);
79 if (it != delegates.end()) {
80 it->second.Close(manager);
81 delegates.erase(it);
82 }
83 return !delegates.empty();
84 });
85 }
86
CloseExcept(const std::set<int32_t> & users)87 void StoreCache::CloseExcept(const std::set<int32_t> &users)
88 {
89 DBManager manager("", "");
90 stores_.EraseIf([&manager, &users](const auto &tokenId, std::map<std::string, DBStoreDelegate> &delegates) {
91 auto userId = AccountDelegate::GetInstance()->GetUserByToken(tokenId);
92 if (users.count(userId) != 0) {
93 return delegates.empty();
94 }
95 for (auto it = delegates.begin(); it != delegates.end();) {
96 // if the kv store is BUSY we wait more INTERVAL minutes again
97 if (!it->second.Close(manager)) {
98 ++it;
99 } else {
100 it = delegates.erase(it);
101 }
102 }
103 return delegates.empty();
104 });
105 }
106
SetObserver(uint32_t tokenId,const std::string & storeId,std::shared_ptr<Observers> observers)107 void StoreCache::SetObserver(uint32_t tokenId, const std::string &storeId, std::shared_ptr<Observers> observers)
108 {
109 stores_.ComputeIfPresent(tokenId, [&storeId, &observers](auto &key, auto &stores) {
110 ZLOGD("tokenId:0x%{public}x storeId:%{public}s observers:%{public}zu", key, storeId.c_str(),
111 observers ? observers->size() : size_t(0));
112 auto it = stores.find(storeId);
113 if (it != stores.end()) {
114 it->second.SetObservers(observers);
115 }
116 return true;
117 });
118 }
119
GarbageCollect()120 void StoreCache::GarbageCollect()
121 {
122 DBManager manager("", "");
123 auto current = std::chrono::steady_clock::now();
124 stores_.EraseIf([&manager, ¤t](auto &key, std::map<std::string, DBStoreDelegate> &delegates) {
125 for (auto it = delegates.begin(); it != delegates.end();) {
126 // if the kv store is BUSY we wait more INTERVAL minutes again
127 if ((it->second < current) && it->second.Close(manager)) {
128 it = delegates.erase(it);
129 } else {
130 ++it;
131 }
132 }
133 return delegates.empty();
134 });
135
136 if (!stores_.Empty()) {
137 scheduler_.At(current + std::chrono::minutes(INTERVAL), std::bind(&StoreCache::GarbageCollect, this));
138 }
139 }
140
GetDBOption(const StoreMetaData & data,const DBPassword & password)141 StoreCache::DBOption StoreCache::GetDBOption(const StoreMetaData &data, const DBPassword &password)
142 {
143 DBOption dbOption;
144 dbOption.syncDualTupleMode = true; // tuple of (appid+storeid)
145 dbOption.createIfNecessary = false;
146 dbOption.isMemoryDb = false;
147 dbOption.isEncryptedDb = data.isEncrypt;
148 if (data.isEncrypt) {
149 dbOption.cipher = DistributedDB::CipherType::AES_256_GCM;
150 dbOption.passwd = password;
151 }
152
153 if (data.storeType == KvStoreType::SINGLE_VERSION) {
154 dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
155 } else if (data.storeType == KvStoreType::DEVICE_COLLABORATION) {
156 dbOption.conflictResolvePolicy = DistributedDB::DEVICE_COLLABORATION;
157 }
158
159 dbOption.schema = data.schema;
160 dbOption.createDirByStoreIdOnly = true;
161 dbOption.secOption = GetDBSecurity(data.securityLevel);
162 return dbOption;
163 }
164
GetDBSecurity(int32_t secLevel)165 StoreCache::DBSecurity StoreCache::GetDBSecurity(int32_t secLevel)
166 {
167 if (secLevel < SecurityLevel::NO_LABEL || secLevel > SecurityLevel::S4) {
168 return { DistributedDB::NOT_SET, DistributedDB::ECE };
169 }
170 if (secLevel == SecurityLevel::S3) {
171 return { DistributedDB::S3, DistributedDB::SECE };
172 }
173 if (secLevel == SecurityLevel::S4) {
174 return { DistributedDB::S4, DistributedDB::ECE };
175 }
176 return { secLevel, DistributedDB::ECE };
177 }
178
GetDBPassword(const StoreMetaData & data)179 StoreCache::DBPassword StoreCache::GetDBPassword(const StoreMetaData &data)
180 {
181 DBPassword dbPassword;
182 if (!data.isEncrypt) {
183 return dbPassword;
184 }
185
186 SecretKeyMetaData secretKey;
187 secretKey.storeType = data.storeType;
188 auto storeKey = data.GetSecretKey();
189 MetaDataManager::GetInstance().LoadMeta(storeKey, secretKey, true);
190 std::vector<uint8_t> password;
191 CryptoManager::GetInstance().Decrypt(secretKey.sKey, password);
192 dbPassword.SetValue(password.data(), password.size());
193 password.assign(password.size(), 0);
194 return dbPassword;
195 }
196
DBStoreDelegate(DBStore * delegate,std::shared_ptr<Observers> observers)197 StoreCache::DBStoreDelegate::DBStoreDelegate(DBStore *delegate, std::shared_ptr<Observers> observers)
198 : delegate_(delegate)
199 {
200 time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
201 SetObservers(std::move(observers));
202 }
203
~DBStoreDelegate()204 StoreCache::DBStoreDelegate::~DBStoreDelegate()
205 {
206 if (delegate_ != nullptr) {
207 delegate_->UnRegisterObserver(this);
208 }
209 DBManager manager("", "");
210 manager.CloseKvStore(delegate_);
211 delegate_ = nullptr;
212 }
213
operator std::shared_ptr<DBStore>()214 StoreCache::DBStoreDelegate::operator std::shared_ptr<DBStore> ()
215 {
216 time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
217 mutex_.lock_shared();
218 if (delegate_ == nullptr) {
219 mutex_.unlock_shared();
220 return nullptr;
221 }
222 return std::shared_ptr<DBStore>(delegate_, [this](DBStore *) { mutex_.unlock_shared();});
223 }
224
operator <(const Time & time) const225 bool StoreCache::DBStoreDelegate::operator<(const Time &time) const
226 {
227 return time_ < time;
228 }
229
Close(DBManager & manager)230 bool StoreCache::DBStoreDelegate::Close(DBManager &manager)
231 {
232 std::unique_lock<decltype(mutex_)> lock(mutex_);
233 if (delegate_ != nullptr) {
234 delegate_->UnRegisterObserver(this);
235 }
236
237 auto status = manager.CloseKvStore(delegate_);
238 if (status == DBStatus::BUSY) {
239 return false;
240 }
241 delegate_ = nullptr;
242 return true;
243 }
244
OnChange(const DistributedDB::KvStoreChangedData & data)245 void StoreCache::DBStoreDelegate::OnChange(const DistributedDB::KvStoreChangedData &data)
246 {
247 if (observers_ == nullptr || delegate_ == nullptr) {
248 ZLOGE("already closed");
249 return;
250 }
251
252 time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
253 auto observers = observers_;
254 std::vector<uint8_t> key;
255 auto inserts = Convert(data.GetEntriesInserted());
256 auto updates = Convert(data.GetEntriesUpdated());
257 auto deletes = Convert(data.GetEntriesDeleted());
258 ZLOGD("C:%{public}zu U:%{public}zu D:%{public}zu storeId:%{public}s", inserts.size(), updates.size(),
259 deletes.size(), delegate_->GetStoreId().c_str());
260 ChangeNotification change(std::move(inserts), std::move(updates), std::move(deletes), {}, false);
261 for (auto &observer : *observers) {
262 if (observer == nullptr) {
263 continue;
264 }
265 observer->OnChange(change);
266 }
267 }
268
SetObservers(std::shared_ptr<Observers> observers)269 void StoreCache::DBStoreDelegate::SetObservers(std::shared_ptr<Observers> observers)
270 {
271 if (observers_ == observers || delegate_ == nullptr) {
272 return;
273 }
274
275 observers_ = observers;
276
277 if (observers_ != nullptr && !observers_->empty()) {
278 ZLOGD("storeId:%{public}s observers:%{public}zu", delegate_->GetStoreId().c_str(), observers_->size());
279 delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, this);
280 }
281 }
282
Convert(const std::list<DBEntry> & dbEntries)283 std::vector<Entry> StoreCache::DBStoreDelegate::Convert(const std::list<DBEntry> &dbEntries)
284 {
285 std::vector<Entry> entries;
286 for (const auto &entry : dbEntries) {
287 Entry tmpEntry;
288 tmpEntry.key = entry.key;
289 tmpEntry.value = entry.value;
290 entries.push_back(tmpEntry);
291 }
292 return entries;
293 }
294 }; // namespace OHOS::DistributedKv
295