• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "StoreCache"
16 #include "store_cache.h"
17 
18 #include "account/account_delegate.h"
19 #include "crypto_manager.h"
20 #include "device_matrix.h"
21 #include "directory/directory_manager.h"
22 #include "log_print.h"
23 #include "metadata/meta_data_manager.h"
24 #include "metadata/secret_key_meta_data.h"
25 #include "types.h"
26 #include "utils/anonymous.h"
27 namespace OHOS::DistributedKv {
28 using namespace OHOS::DistributedData;
29 constexpr int64_t StoreCache::INTERVAL;
GetStore(const StoreMetaData & data,std::shared_ptr<Observers> observers,DBStatus & status)30 StoreCache::Store StoreCache::GetStore(const StoreMetaData &data, std::shared_ptr<Observers> observers,
31     DBStatus &status)
32 {
33     Store store = nullptr;
34     status = DBStatus::NOT_FOUND;
35     stores_.Compute(data.tokenId, [&](const auto &key, std::map<std::string, DBStoreDelegate> &stores) {
36         auto it = stores.find(data.storeId);
37         if (it != stores.end()) {
38             it->second.SetObservers(observers);
39             store = it->second;
40             return true;
41         }
42 
43         DBStore *dbStore = nullptr;
44         DBManager manager(data.appId, data.user, data.instanceId);
45         manager.SetKvStoreConfig({ DirectoryManager::GetInstance().GetStorePath(data) });
46         manager.GetKvStore(data.storeId, GetDBOption(data, GetDBPassword(data)),
47             [&status, &dbStore](auto dbStatus, auto *tmpStore) {
48                 status = dbStatus;
49                 dbStore = tmpStore;
50             });
51 
52         if (dbStore == nullptr) {
53             return !stores.empty();
54         }
55 
56         if (data.isAutoSync) {
57             auto code = DeviceMatrix::GetInstance().GetCode(data);
58             dbStore->SetRemotePushFinishedNotify([code](const DistributedDB::RemotePushNotifyInfo &info) {
59                 DeviceMatrix::GetInstance().OnExchanged(info.deviceId, code, true);
60             });
61         }
62 
63         auto result = stores.emplace(std::piecewise_construct, std::forward_as_tuple(data.storeId),
64             std::forward_as_tuple(dbStore, observers));
65         store = result.first->second;
66         return !stores.empty();
67     });
68 
69     executors_->Schedule(std::chrono::minutes(INTERVAL), std::bind(&StoreCache::GarbageCollect, this));
70     return store;
71 }
72 
CloseStore(uint32_t tokenId,const std::string & storeId)73 void StoreCache::CloseStore(uint32_t tokenId, const std::string &storeId)
74 {
75     stores_.ComputeIfPresent(tokenId, [&storeId](auto &key, std::map<std::string, DBStoreDelegate> &delegates) {
76         DBManager manager("", "");
77         auto it = delegates.find(storeId);
78         if (it != delegates.end()) {
79             it->second.Close(manager);
80             delegates.erase(it);
81         }
82         return !delegates.empty();
83     });
84 }
85 
CloseExcept(const std::set<int32_t> & users)86 void StoreCache::CloseExcept(const std::set<int32_t> &users)
87 {
88     DBManager manager("", "");
89     stores_.EraseIf([&manager, &users](const auto &tokenId, std::map<std::string, DBStoreDelegate> &delegates) {
90         auto userId = AccountDelegate::GetInstance()->GetUserByToken(tokenId);
91         if (users.count(userId) != 0) {
92             return delegates.empty();
93         }
94         for (auto it = delegates.begin(); it != delegates.end();) {
95             // if the kv store is BUSY we wait more INTERVAL minutes again
96             if (!it->second.Close(manager)) {
97                 ++it;
98             } else {
99                 it = delegates.erase(it);
100             }
101         }
102         return delegates.empty();
103     });
104 }
105 
SetObserver(uint32_t tokenId,const std::string & storeId,std::shared_ptr<Observers> observers)106 void StoreCache::SetObserver(uint32_t tokenId, const std::string &storeId, std::shared_ptr<Observers> observers)
107 {
108     stores_.ComputeIfPresent(tokenId, [&storeId, &observers](auto &key, auto &stores) {
109         ZLOGD("tokenId:0x%{public}x storeId:%{public}s observers:%{public}zu", key, Anonymous::Change(storeId).c_str(),
110             observers ? observers->size() : size_t(0));
111         auto it = stores.find(storeId);
112         if (it != stores.end()) {
113             it->second.SetObservers(observers);
114         }
115         return true;
116     });
117 }
118 
GarbageCollect()119 void StoreCache::GarbageCollect()
120 {
121     DBManager manager("", "");
122     auto current = std::chrono::steady_clock::now();
123     stores_.EraseIf([&manager, &current](auto &key, std::map<std::string, DBStoreDelegate> &delegates) {
124         for (auto it = delegates.begin(); it != delegates.end();) {
125             // if the kv store is BUSY we wait more INTERVAL minutes again
126             if ((it->second < current) && it->second.Close(manager)) {
127                 it = delegates.erase(it);
128             } else {
129                 ++it;
130             }
131         }
132         return delegates.empty();
133     });
134     if (!stores_.Empty()) {
135         ZLOGD("stores size:%{public}zu", stores_.Size());
136         executors_->Schedule(std::chrono::minutes(INTERVAL), std::bind(&StoreCache::GarbageCollect, this));
137     }
138 }
139 
GetDBOption(const StoreMetaData & data,const DBPassword & password)140 StoreCache::DBOption StoreCache::GetDBOption(const StoreMetaData &data, const DBPassword &password)
141 {
142     DBOption dbOption;
143     dbOption.syncDualTupleMode = true; // tuple of (appid+storeid)
144     dbOption.createIfNecessary = false;
145     dbOption.isMemoryDb = false;
146     dbOption.isEncryptedDb = data.isEncrypt;
147     if (data.isEncrypt) {
148         dbOption.cipher = DistributedDB::CipherType::AES_256_GCM;
149         dbOption.passwd = password;
150     }
151 
152     if (data.storeType == KvStoreType::SINGLE_VERSION) {
153         dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
154     } else if (data.storeType == KvStoreType::DEVICE_COLLABORATION) {
155         dbOption.conflictResolvePolicy = DistributedDB::DEVICE_COLLABORATION;
156     }
157 
158     dbOption.schema = data.schema;
159     dbOption.createDirByStoreIdOnly = true;
160     dbOption.secOption = GetDBSecurity(data.securityLevel);
161     return dbOption;
162 }
163 
GetDBSecurity(int32_t secLevel)164 StoreCache::DBSecurity StoreCache::GetDBSecurity(int32_t secLevel)
165 {
166     if (secLevel < SecurityLevel::NO_LABEL || secLevel > SecurityLevel::S4) {
167         return { DistributedDB::NOT_SET, DistributedDB::ECE };
168     }
169     if (secLevel == SecurityLevel::S3) {
170         return { DistributedDB::S3, DistributedDB::SECE };
171     }
172     if (secLevel == SecurityLevel::S4) {
173         return { DistributedDB::S4, DistributedDB::ECE };
174     }
175     return { secLevel, DistributedDB::ECE };
176 }
177 
GetDBPassword(const StoreMetaData & data)178 StoreCache::DBPassword StoreCache::GetDBPassword(const StoreMetaData &data)
179 {
180     DBPassword dbPassword;
181     if (!data.isEncrypt) {
182         return dbPassword;
183     }
184 
185     SecretKeyMetaData secretKey;
186     secretKey.storeType = data.storeType;
187     auto storeKey = data.GetSecretKey();
188     MetaDataManager::GetInstance().LoadMeta(storeKey, secretKey, true);
189     std::vector<uint8_t> password;
190     CryptoManager::GetInstance().Decrypt(secretKey.sKey, password);
191     dbPassword.SetValue(password.data(), password.size());
192     password.assign(password.size(), 0);
193     return dbPassword;
194 }
195 
SetThreadPool(std::shared_ptr<ExecutorPool> executors)196 void StoreCache::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
197 {
198     executors_ = executors;
199 }
200 
DBStoreDelegate(DBStore * delegate,std::shared_ptr<Observers> observers)201 StoreCache::DBStoreDelegate::DBStoreDelegate(DBStore *delegate, std::shared_ptr<Observers> observers)
202     : time_(std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL)), delegate_(delegate)
203 {
204     SetObservers(std::move(observers));
205 }
206 
~DBStoreDelegate()207 StoreCache::DBStoreDelegate::~DBStoreDelegate()
208 {
209     if (delegate_ != nullptr) {
210         delegate_->UnRegisterObserver(this);
211     }
212     DBManager manager("", "");
213     manager.CloseKvStore(delegate_);
214     delegate_ = nullptr;
215 }
216 
operator std::shared_ptr<DBStore>()217 StoreCache::DBStoreDelegate::operator std::shared_ptr<DBStore> ()
218 {
219     time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
220     mutex_.lock_shared();
221     if (delegate_ == nullptr) {
222         mutex_.unlock_shared();
223         return nullptr;
224     }
225     return std::shared_ptr<DBStore>(delegate_, [this](DBStore *) { mutex_.unlock_shared();});
226 }
227 
operator <(const Time & time) const228 bool StoreCache::DBStoreDelegate::operator<(const Time &time) const
229 {
230     return time_ < time;
231 }
232 
Close(DBManager & manager)233 bool StoreCache::DBStoreDelegate::Close(DBManager &manager)
234 {
235     std::unique_lock<decltype(mutex_)> lock(mutex_);
236     if (delegate_ != nullptr) {
237         delegate_->UnRegisterObserver(this);
238         auto status = manager.CloseKvStore(delegate_);
239         if (status == DBStatus::BUSY) {
240             return false;
241         }
242         delegate_ = nullptr;
243     }
244     return true;
245 }
246 
OnChange(const DistributedDB::KvStoreChangedData & data)247 void StoreCache::DBStoreDelegate::OnChange(const DistributedDB::KvStoreChangedData &data)
248 {
249     if (observers_ == nullptr || delegate_ == nullptr) {
250         ZLOGE("already closed");
251         return;
252     }
253 
254     time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
255     auto observers = observers_;
256     std::vector<uint8_t> key;
257     auto inserts = Convert(data.GetEntriesInserted());
258     auto updates = Convert(data.GetEntriesUpdated());
259     auto deletes = Convert(data.GetEntriesDeleted());
260     ZLOGD("C:%{public}zu U:%{public}zu D:%{public}zu storeId:%{public}s", inserts.size(), updates.size(),
261         deletes.size(), delegate_->GetStoreId().c_str());
262     ChangeNotification change(std::move(inserts), std::move(updates), std::move(deletes), {}, false);
263     for (auto &observer : *observers) {
264         if (observer == nullptr) {
265             continue;
266         }
267         observer->OnChange(change);
268     }
269 }
270 
SetObservers(std::shared_ptr<Observers> observers)271 void StoreCache::DBStoreDelegate::SetObservers(std::shared_ptr<Observers> observers)
272 {
273     if (observers_ == observers || delegate_ == nullptr) {
274         return;
275     }
276 
277     observers_ = observers;
278 
279     if (observers_ != nullptr && !observers_->empty()) {
280         ZLOGD("storeId:%{public}s observers:%{public}zu", delegate_->GetStoreId().c_str(), observers_->size());
281         delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, this);
282     }
283 }
284 
Convert(const std::list<DBEntry> & dbEntries)285 std::vector<Entry> StoreCache::DBStoreDelegate::Convert(const std::list<DBEntry> &dbEntries)
286 {
287     std::vector<Entry> entries;
288     for (const auto &entry : dbEntries) {
289         Entry tmpEntry;
290         tmpEntry.key = entry.key;
291         tmpEntry.value = entry.value;
292         entries.push_back(tmpEntry);
293     }
294     return entries;
295 }
296 }; // namespace OHOS::DistributedKv
297