• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "StoreCache"
16 #include "store_cache.h"
17 
18 #include "account/account_delegate.h"
19 #include "crypto_manager.h"
20 #include "device_matrix.h"
21 #include "directory/directory_manager.h"
22 #include "log_print.h"
23 #include "metadata/meta_data_manager.h"
24 #include "metadata/secret_key_meta_data.h"
25 #include "types.h"
26 #include "utils/anonymous.h"
27 namespace OHOS::DistributedKv {
28 using namespace OHOS::DistributedData;
29 constexpr int64_t StoreCache::INTERVAL;
GetStore(const StoreMetaData & data,std::shared_ptr<Observers> observers,DBStatus & status)30 StoreCache::Store StoreCache::GetStore(const StoreMetaData &data, std::shared_ptr<Observers> observers,
31     DBStatus &status)
32 {
33     Store store = nullptr;
34     status = DBStatus::NOT_FOUND;
35     stores_.Compute(data.tokenId, [&](const auto &key, std::map<std::string, DBStoreDelegate> &stores) {
36         auto it = stores.find(data.storeId);
37         if (it != stores.end()) {
38             it->second.SetObservers(observers);
39             store = it->second;
40             return true;
41         }
42 
43         DBStore *dbStore = nullptr;
44         DBManager manager(data.appId, data.user, data.instanceId);
45         manager.SetKvStoreConfig({ DirectoryManager::GetInstance().GetStorePath(data) });
46         manager.GetKvStore(data.storeId, GetDBOption(data, GetDBPassword(data)),
47             [&status, &dbStore](auto dbStatus, auto *tmpStore) {
48                 status = dbStatus;
49                 dbStore = tmpStore;
50             });
51 
52         if (dbStore == nullptr) {
53             return !stores.empty();
54         }
55 
56         if (data.isAutoSync) {
57             auto code = DeviceMatrix::GetInstance().GetCode(data);
58             dbStore->SetRemotePushFinishedNotify([code](const DistributedDB::RemotePushNotifyInfo &info) {
59                 DeviceMatrix::GetInstance().OnExchanged(info.deviceId, code, true);
60             });
61             bool param = true;
62             auto data = static_cast<DistributedDB::PragmaData>(&param);
63             dbStore->Pragma(DistributedDB::SET_SYNC_RETRY, data);
64         }
65 
66         auto result = stores.emplace(std::piecewise_construct, std::forward_as_tuple(data.storeId),
67             std::forward_as_tuple(dbStore, observers));
68         store = result.first->second;
69         return !stores.empty();
70     });
71 
72     executors_->Schedule(std::chrono::minutes(INTERVAL), std::bind(&StoreCache::GarbageCollect, this));
73     return store;
74 }
75 
CloseStore(uint32_t tokenId,const std::string & storeId)76 void StoreCache::CloseStore(uint32_t tokenId, const std::string &storeId)
77 {
78     stores_.ComputeIfPresent(tokenId, [&storeId](auto &key, std::map<std::string, DBStoreDelegate> &delegates) {
79         DBManager manager("", "");
80         auto it = delegates.find(storeId);
81         if (it != delegates.end()) {
82             it->second.Close(manager);
83             delegates.erase(it);
84         }
85         return !delegates.empty();
86     });
87 }
88 
CloseExcept(const std::set<int32_t> & users)89 void StoreCache::CloseExcept(const std::set<int32_t> &users)
90 {
91     DBManager manager("", "");
92     stores_.EraseIf([&manager, &users](const auto &tokenId, std::map<std::string, DBStoreDelegate> &delegates) {
93         auto userId = AccountDelegate::GetInstance()->GetUserByToken(tokenId);
94         if (users.count(userId) != 0) {
95             return delegates.empty();
96         }
97         for (auto it = delegates.begin(); it != delegates.end();) {
98             // if the kv store is BUSY we wait more INTERVAL minutes again
99             if (!it->second.Close(manager)) {
100                 ++it;
101             } else {
102                 it = delegates.erase(it);
103             }
104         }
105         return delegates.empty();
106     });
107 }
108 
SetObserver(uint32_t tokenId,const std::string & storeId,std::shared_ptr<Observers> observers)109 void StoreCache::SetObserver(uint32_t tokenId, const std::string &storeId, std::shared_ptr<Observers> observers)
110 {
111     stores_.ComputeIfPresent(tokenId, [&storeId, &observers](auto &key, auto &stores) {
112         ZLOGD("tokenId:0x%{public}x storeId:%{public}s observers:%{public}zu", key, Anonymous::Change(storeId).c_str(),
113             observers ? observers->size() : size_t(0));
114         auto it = stores.find(storeId);
115         if (it != stores.end()) {
116             it->second.SetObservers(observers);
117         }
118         return true;
119     });
120 }
121 
GarbageCollect()122 void StoreCache::GarbageCollect()
123 {
124     DBManager manager("", "");
125     auto current = std::chrono::steady_clock::now();
126     stores_.EraseIf([&manager, &current](auto &key, std::map<std::string, DBStoreDelegate> &delegates) {
127         for (auto it = delegates.begin(); it != delegates.end();) {
128             // if the kv store is BUSY we wait more INTERVAL minutes again
129             if ((it->second < current) && it->second.Close(manager)) {
130                 it = delegates.erase(it);
131             } else {
132                 ++it;
133             }
134         }
135         return delegates.empty();
136     });
137     if (!stores_.Empty()) {
138         ZLOGD("stores size:%{public}zu", stores_.Size());
139         executors_->Schedule(std::chrono::minutes(INTERVAL), std::bind(&StoreCache::GarbageCollect, this));
140     }
141 }
142 
GetDBOption(const StoreMetaData & data,const DBPassword & password)143 StoreCache::DBOption StoreCache::GetDBOption(const StoreMetaData &data, const DBPassword &password)
144 {
145     DBOption dbOption;
146     dbOption.syncDualTupleMode = true; // tuple of (appid+storeid)
147     dbOption.createIfNecessary = false;
148     dbOption.isMemoryDb = false;
149     dbOption.isEncryptedDb = data.isEncrypt;
150     dbOption.isNeedCompressOnSync = data.isNeedCompress;
151     if (data.isEncrypt) {
152         dbOption.cipher = DistributedDB::CipherType::AES_256_GCM;
153         dbOption.passwd = password;
154     }
155 
156     if (data.storeType == KvStoreType::SINGLE_VERSION) {
157         dbOption.conflictResolvePolicy = DistributedDB::LAST_WIN;
158     } else if (data.storeType == KvStoreType::DEVICE_COLLABORATION) {
159         dbOption.conflictResolvePolicy = DistributedDB::DEVICE_COLLABORATION;
160     }
161 
162     dbOption.schema = data.schema;
163     dbOption.createDirByStoreIdOnly = true;
164     dbOption.secOption = GetDBSecurity(data.securityLevel);
165     return dbOption;
166 }
167 
GetDBSecurity(int32_t secLevel)168 StoreCache::DBSecurity StoreCache::GetDBSecurity(int32_t secLevel)
169 {
170     if (secLevel < SecurityLevel::NO_LABEL || secLevel > SecurityLevel::S4) {
171         return { DistributedDB::NOT_SET, DistributedDB::ECE };
172     }
173     if (secLevel == SecurityLevel::S3) {
174         return { DistributedDB::S3, DistributedDB::SECE };
175     }
176     if (secLevel == SecurityLevel::S4) {
177         return { DistributedDB::S4, DistributedDB::ECE };
178     }
179     return { secLevel, DistributedDB::ECE };
180 }
181 
GetDBPassword(const StoreMetaData & data)182 StoreCache::DBPassword StoreCache::GetDBPassword(const StoreMetaData &data)
183 {
184     DBPassword dbPassword;
185     if (!data.isEncrypt) {
186         return dbPassword;
187     }
188 
189     SecretKeyMetaData secretKey;
190     secretKey.storeType = data.storeType;
191     auto storeKey = data.GetSecretKey();
192     MetaDataManager::GetInstance().LoadMeta(storeKey, secretKey, true);
193     std::vector<uint8_t> password;
194     CryptoManager::GetInstance().Decrypt(secretKey.sKey, password);
195     dbPassword.SetValue(password.data(), password.size());
196     password.assign(password.size(), 0);
197     return dbPassword;
198 }
199 
SetThreadPool(std::shared_ptr<ExecutorPool> executors)200 void StoreCache::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
201 {
202     executors_ = executors;
203 }
204 
DBStoreDelegate(DBStore * delegate,std::shared_ptr<Observers> observers)205 StoreCache::DBStoreDelegate::DBStoreDelegate(DBStore *delegate, std::shared_ptr<Observers> observers)
206     : time_(std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL)), delegate_(delegate)
207 {
208     SetObservers(std::move(observers));
209 }
210 
~DBStoreDelegate()211 StoreCache::DBStoreDelegate::~DBStoreDelegate()
212 {
213     if (delegate_ != nullptr) {
214         delegate_->UnRegisterObserver(this);
215     }
216     DBManager manager("", "");
217     manager.CloseKvStore(delegate_);
218     delegate_ = nullptr;
219 }
220 
operator std::shared_ptr<DBStore>()221 StoreCache::DBStoreDelegate::operator std::shared_ptr<DBStore> ()
222 {
223     time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
224     mutex_.lock_shared();
225     if (delegate_ == nullptr) {
226         mutex_.unlock_shared();
227         return nullptr;
228     }
229     return std::shared_ptr<DBStore>(delegate_, [this](DBStore *) { mutex_.unlock_shared();});
230 }
231 
operator <(const Time & time) const232 bool StoreCache::DBStoreDelegate::operator<(const Time &time) const
233 {
234     return time_ < time;
235 }
236 
Close(DBManager & manager)237 bool StoreCache::DBStoreDelegate::Close(DBManager &manager)
238 {
239     std::unique_lock<decltype(mutex_)> lock(mutex_);
240     if (delegate_ != nullptr) {
241         delegate_->UnRegisterObserver(this);
242         auto status = manager.CloseKvStore(delegate_);
243         if (status == DBStatus::BUSY) {
244             return false;
245         }
246         delegate_ = nullptr;
247     }
248     return true;
249 }
250 
OnChange(const DistributedDB::KvStoreChangedData & data)251 void StoreCache::DBStoreDelegate::OnChange(const DistributedDB::KvStoreChangedData &data)
252 {
253     if (observers_ == nullptr || delegate_ == nullptr) {
254         ZLOGE("already closed");
255         return;
256     }
257 
258     time_ = std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL);
259     auto observers = observers_;
260     std::vector<uint8_t> key;
261     auto inserts = Convert(data.GetEntriesInserted());
262     auto updates = Convert(data.GetEntriesUpdated());
263     auto deletes = Convert(data.GetEntriesDeleted());
264     ZLOGD("C:%{public}zu U:%{public}zu D:%{public}zu storeId:%{public}s", inserts.size(), updates.size(),
265         deletes.size(), delegate_->GetStoreId().c_str());
266     ChangeNotification change(std::move(inserts), std::move(updates), std::move(deletes), {}, false);
267     for (auto &observer : *observers) {
268         if (observer == nullptr) {
269             continue;
270         }
271         observer->OnChange(change);
272     }
273 }
274 
SetObservers(std::shared_ptr<Observers> observers)275 void StoreCache::DBStoreDelegate::SetObservers(std::shared_ptr<Observers> observers)
276 {
277     if (observers_ == observers || delegate_ == nullptr) {
278         return;
279     }
280 
281     observers_ = observers;
282 
283     if (observers_ != nullptr && !observers_->empty()) {
284         ZLOGD("storeId:%{public}s observers:%{public}zu", delegate_->GetStoreId().c_str(), observers_->size());
285         delegate_->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, this);
286     }
287 }
288 
Convert(const std::list<DBEntry> & dbEntries)289 std::vector<Entry> StoreCache::DBStoreDelegate::Convert(const std::list<DBEntry> &dbEntries)
290 {
291     std::vector<Entry> entries;
292     for (const auto &entry : dbEntries) {
293         Entry tmpEntry;
294         tmpEntry.key = entry.key;
295         tmpEntry.value = entry.value;
296         entries.push_back(tmpEntry);
297     }
298     return entries;
299 }
300 }; // namespace OHOS::DistributedKv
301