1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "StoreCache"
16 #include "store_cache.h"
17
18 #include "account/account_delegate.h"
19 #include "crypto_manager.h"
20 #include "device_matrix.h"
21 #include "directory/directory_manager.h"
22 #include "log_print.h"
23 #include "metadata/meta_data_manager.h"
24 #include "metadata/secret_key_meta_data.h"
25 #include "types.h"
26 #include "utils/anonymous.h"
27 namespace OHOS::DistributedKv {
28 using namespace OHOS::DistributedData;
// Out-of-class definition of the static constexpr member declared in the class.
// Before C++17 this is required whenever the constant is ODR-used (e.g. bound to a const reference);
// since C++17 the member is implicitly inline and this line is redundant but harmless.
constexpr int64_t StoreCache::INTERVAL;
GetStore(const StoreMetaData & data,std::shared_ptr<Observers> observers,DBStatus & status)30 StoreCache::Store StoreCache::GetStore(const StoreMetaData &data, std::shared_ptr<Observers> observers,
31 DBStatus &status)
32 {
33 Store store = nullptr;
34 status = DBStatus::NOT_FOUND;
35 stores_.Compute(data.tokenId, [&](const auto &key, std::map<std::string, DBStoreDelegate> &stores) {
36 auto it = stores.find(data.storeId);
37 if (it != stores.end()) {
38 it->second.SetObservers(observers);
39 store = it->second;
40 return true;
41 }
42
43 DBStore *dbStore = nullptr;
44 DBManager manager(data.appId, data.user, data.instanceId);
45 manager.SetKvStoreConfig({ DirectoryManager::GetInstance().GetStorePath(data) });
46 manager.GetKvStore(data.storeId, GetDBOption(data, GetDBPassword(data)),
47 [&status, &dbStore](auto dbStatus, auto *tmpStore) {
48 status = dbStatus;
49 dbStore = tmpStore;
50 });
51
52 if (dbStore == nullptr) {
53 return !stores.empty();
54 }
55
56 if (data.isAutoSync) {
57 auto code = DeviceMatrix::GetInstance().GetCode(data);
58 dbStore->SetRemotePushFinishedNotify([code](const DistributedDB::RemotePushNotifyInfo &info) {
59 DeviceMatrix::GetInstance().OnExchanged(info.deviceId, code, true);
60 });
61 bool param = true;
62 auto data = static_cast<DistributedDB::PragmaData>(¶m);
63 dbStore->Pragma(DistributedDB::SET_SYNC_RETRY, data);
64 }
65
66 auto result = stores.emplace(std::piecewise_construct, std::forward_as_tuple(data.storeId),
67 std::forward_as_tuple(dbStore, observers));
68 store = result.first->second;
69 return !stores.empty();
70 });
71
72 executors_->Schedule(std::chrono::minutes(INTERVAL), std::bind(&StoreCache::GarbageCollect, this));
73 return store;
74 }
75
CloseStore(uint32_t tokenId,const std::string & storeId)76 void StoreCache::CloseStore(uint32_t tokenId, const std::string &storeId)
77 {
78 stores_.ComputeIfPresent(tokenId, [&storeId](auto &key, std::map<std::string, DBStoreDelegate> &delegates) {
79 DBManager manager("", "");
80 auto it = delegates.find(storeId);
81 if (it != delegates.end()) {
82 it->second.Close(manager);
83 delegates.erase(it);
84 }
85 return !delegates.empty();
86 });
87 }
88
CloseExcept(const std::set<int32_t> & users)89 void StoreCache::CloseExcept(const std::set<int32_t> &users)
90 {
91 DBManager manager("", "");
92 stores_.EraseIf([&manager, &users](const auto &tokenId, std::map<std::string, DBStoreDelegate> &delegates) {
93 auto userId = AccountDelegate::GetInstance()->GetUserByToken(tokenId);
94 if (users.count(userId) != 0) {
95 return delegates.empty();
96 }
97 for (auto it = delegates.begin(); it != delegates.end();) {
98 // if the kv store is BUSY we wait more INTERVAL minutes again
99 if (!it->second.Close(manager)) {
100 ++it;
101 } else {
102 it = delegates.erase(it);
103 }
104 }
105 return delegates.empty();
106 });
107 }
108
SetObserver(uint32_t tokenId,const std::string & storeId,std::shared_ptr<Observers> observers)109 void StoreCache::SetObserver(uint32_t tokenId, const std::string &storeId, std::shared_ptr<Observers> observers)
110 {
111 stores_.ComputeIfPresent(tokenId, [&storeId, &observers](auto &key, auto &stores) {
112 ZLOGD("tokenId:0x%{public}x storeId:%{public}s observers:%{public}zu", key, Anonymous::Change(storeId).c_str(),
113 observers ? observers->size() : size_t(0));
114 auto it = stores.find(storeId);
115 if (it != stores.end()) {
116 it->second.SetObservers(observers);
117 }
118 return true;
119 });
120 }
121
GarbageCollect()122 void StoreCache::GarbageCollect()
123 {
124 DBManager manager("", "");
125 auto current = std::chrono::steady_clock::now();
126 stores_.EraseIf([&manager, ¤t](auto &key, std::map<std::string, DBStoreDelegate> &delegates) {
127 for (auto it = delegates.begin(); it != delegates.end();) {
128 // if the kv store is BUSY we wait more INTERVAL minutes again
129 if ((it->second < current) && it->second.Close(manager)) {
130 it = delegates.erase(it);
131 } else {
132 ++it;
133 }
134 }
135 return delegates.empty();
136 });
137 if (!stores_.Empty()) {
138 ZLOGD("stores size:%{public}zu", stores_.Size());
139 executors_->Schedule(std::chrono::minutes(INTERVAL), std::bind(&StoreCache::GarbageCollect, this));
140 }
141 }
142
GetDBOption(const StoreMetaData & data,const DBPassword & password)143 StoreCache::DBOption StoreCache::GetDBOption(const StoreMetaData &data, const DBPassword &password)
144 {
145 DBOption dbOption;
146 dbOption.syncDualTupleMode = true; // tuple of (appid+storeid)
147 dbOption.createIfNecessary = false;
148 dbOption.isMemoryDb = false;
149 dbOption.isEncryptedDb = data.isEncrypt;
150 dbOption.isNeedCompressOnSync = data.isNeedCompress;
151 if (data.isEncrypt) {
152 dbOption.cipher = DistributedDB::CipherType::AES_256_GCM;
153 dbOption.passwd = password;
154 }
155
156 if (data.storeType == KvStoreType::SINGLE_VERSION) {
157 dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
158 } else if (data.storeType == KvStoreType::DEVICE_COLLABORATION) {
159 dbOption.conflictResolvePolicy = DistributedDB::DEVICE_COLLABORATION;
160 }
161
162 dbOption.schema = data.schema;
163 dbOption.createDirByStoreIdOnly = true;
164 dbOption.secOption = GetDBSecurity(data.securityLevel);
165 return dbOption;
166 }
167
GetDBSecurity(int32_t secLevel)168 StoreCache::DBSecurity StoreCache::GetDBSecurity(int32_t secLevel)
169 {
170 if (secLevel < SecurityLevel::NO_LABEL || secLevel > SecurityLevel::S4) {
171 return { DistributedDB::NOT_SET, DistributedDB::ECE };
172 }
173 if (secLevel == SecurityLevel::S3) {
174 return { DistributedDB::S3, DistributedDB::SECE };
175 }
176 if (secLevel == SecurityLevel::S4) {
177 return { DistributedDB::S4, DistributedDB::ECE };
178 }
179 return { secLevel, DistributedDB::ECE };
180 }
181
GetDBPassword(const StoreMetaData & data)182 StoreCache::DBPassword StoreCache::GetDBPassword(const StoreMetaData &data)
183 {
184 DBPassword dbPassword;
185 if (!data.isEncrypt) {
186 return dbPassword;
187 }
188
189 SecretKeyMetaData secretKey;
190 secretKey.storeType = data.storeType;
191 auto storeKey = data.GetSecretKey();
192 MetaDataManager::GetInstance().LoadMeta(storeKey, secretKey, true);
193 std::vector<uint8_t> password;
194 CryptoManager::GetInstance().Decrypt(secretKey.sKey, password);
195 dbPassword.SetValue(password.data(), password.size());
196 password.assign(password.size(), 0);
197 return dbPassword;
198 }
199
SetThreadPool(std::shared_ptr<ExecutorPool> executors)200 void StoreCache::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
201 {
202 executors_ = executors;
203 }
204
DBStoreDelegate(DBStore * delegate,std::shared_ptr<Observers> observers)205 StoreCache::DBStoreDelegate::DBStoreDelegate(DBStore *delegate, std::shared_ptr<Observers> observers)
206 : time_(std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL)), delegate_(delegate)
207 {
208 SetObservers(std::move(observers));
209 }
210
~DBStoreDelegate()211 StoreCache::DBStoreDelegate::~DBStoreDelegate()
212 {
213 if (delegate_ != nullptr) {
214 delegate_->UnRegisterObserver(this);
215 }
216 DBManager manager("", "");
217 manager.CloseKvStore(delegate_);
218 delegate_ = nullptr;
219 }
220
operator std::shared_ptr<DBStore>()221 StoreCache::DBStoreDelegate::operator std::shared_ptr<DBStore> ()
222 {
223 time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
224 mutex_.lock_shared();
225 if (delegate_ == nullptr) {
226 mutex_.unlock_shared();
227 return nullptr;
228 }
229 return std::shared_ptr<DBStore>(delegate_, [this](DBStore *) { mutex_.unlock_shared();});
230 }
231
operator <(const Time & time) const232 bool StoreCache::DBStoreDelegate::operator<(const Time &time) const
233 {
234 return time_ < time;
235 }
236
Close(DBManager & manager)237 bool StoreCache::DBStoreDelegate::Close(DBManager &manager)
238 {
239 std::unique_lock<decltype(mutex_)> lock(mutex_);
240 if (delegate_ != nullptr) {
241 delegate_->UnRegisterObserver(this);
242 auto status = manager.CloseKvStore(delegate_);
243 if (status == DBStatus::BUSY) {
244 return false;
245 }
246 delegate_ = nullptr;
247 }
248 return true;
249 }
250
OnChange(const DistributedDB::KvStoreChangedData & data)251 void StoreCache::DBStoreDelegate::OnChange(const DistributedDB::KvStoreChangedData &data)
252 {
253 if (observers_ == nullptr || delegate_ == nullptr) {
254 ZLOGE("already closed");
255 return;
256 }
257
258 time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
259 auto observers = observers_;
260 std::vector<uint8_t> key;
261 auto inserts = Convert(data.GetEntriesInserted());
262 auto updates = Convert(data.GetEntriesUpdated());
263 auto deletes = Convert(data.GetEntriesDeleted());
264 ZLOGD("C:%{public}zu U:%{public}zu D:%{public}zu storeId:%{public}s", inserts.size(), updates.size(),
265 deletes.size(), delegate_->GetStoreId().c_str());
266 ChangeNotification change(std::move(inserts), std::move(updates), std::move(deletes), {}, false);
267 for (auto &observer : *observers) {
268 if (observer == nullptr) {
269 continue;
270 }
271 observer->OnChange(change);
272 }
273 }
274
SetObservers(std::shared_ptr<Observers> observers)275 void StoreCache::DBStoreDelegate::SetObservers(std::shared_ptr<Observers> observers)
276 {
277 if (observers_ == observers || delegate_ == nullptr) {
278 return;
279 }
280
281 observers_ = observers;
282
283 if (observers_ != nullptr && !observers_->empty()) {
284 ZLOGD("storeId:%{public}s observers:%{public}zu", delegate_->GetStoreId().c_str(), observers_->size());
285 delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, this);
286 }
287 }
288
Convert(const std::list<DBEntry> & dbEntries)289 std::vector<Entry> StoreCache::DBStoreDelegate::Convert(const std::list<DBEntry> &dbEntries)
290 {
291 std::vector<Entry> entries;
292 for (const auto &entry : dbEntries) {
293 Entry tmpEntry;
294 tmpEntry.key = entry.key;
295 tmpEntry.value = entry.value;
296 entries.push_back(tmpEntry);
297 }
298 return entries;
299 }
300 }; // namespace OHOS::DistributedKv
301