• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "metadata/meta_data_manager.h"
17 #include <csignal>
18 #define LOG_TAG "MetaDataManager"
19 
20 #include "directory/directory_manager.h"
21 #include "kv_store_nb_delegate.h"
22 #include "log_print.h"
23 #include "utils/anonymous.h"
24 #include "utils/corrupt_reporter.h"
25 
26 namespace OHOS::DistributedData {
27 class MetaObserver : public DistributedDB::KvStoreObserver {
28 public:
29     using Filter = MetaDataManager::Filter;
30     using MetaStore = MetaDataManager::MetaStore;
31     using Observer = MetaDataManager::Observer;
32     using DBOrigin = DistributedDB::Origin;
33     using DBChangeData = DistributedDB::ChangedData;
34     using Type = DistributedDB::Type;
35     MetaObserver(std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer,
36         bool isLocal = false);
37     virtual ~MetaObserver();
38 
39     // Database change callback
40     void OnChange(const DistributedDB::KvStoreChangedData &data) override;
41     void OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data) override;
42 
43     void HandleChanges(int32_t flag, std::vector<std::vector<Type>> &priData);
44 
45 private:
46     std::shared_ptr<MetaStore> metaStore_;
47     std::shared_ptr<Filter> filter_;
48     Observer observer_;
49 };
50 
MetaObserver(std::shared_ptr<MetaStore> metaStore,std::shared_ptr<Filter> filter,Observer observer,bool isLocal)51 MetaObserver::MetaObserver(
52     std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer, bool isLocal)
53     : metaStore_(std::move(metaStore)), filter_(std::move(filter)), observer_(std::move(observer))
54 {
55     if (metaStore_ != nullptr) {
56         int mode = isLocal ? DistributedDB::OBSERVER_CHANGES_LOCAL_ONLY
57                            : (DistributedDB::OBSERVER_CHANGES_NATIVE | DistributedDB::OBSERVER_CHANGES_FOREIGN);
58         auto status = metaStore_->RegisterObserver(filter_->GetKey(), mode, this);
59         if (!isLocal) {
60             status = metaStore_->RegisterObserver(filter_->GetKey(), DistributedDB::OBSERVER_CHANGES_CLOUD, this);
61         }
62         if (status != DistributedDB::DBStatus::OK) {
63             ZLOGE("register meta observer failed :%{public}d.", status);
64         }
65     }
66 }
67 
~MetaObserver()68 MetaObserver::~MetaObserver()
69 {
70     if (metaStore_ != nullptr) {
71         metaStore_->UnRegisterObserver(this);
72     }
73 }
74 
operator ()(const std::string & key) const75 bool MetaDataManager::Filter::operator()(const std::string &key) const
76 {
77     return key.find(pattern_) == 0;
78 }
79 
GetKey() const80 std::vector<uint8_t> MetaDataManager::Filter::GetKey() const
81 {
82     return std::vector<uint8_t>();
83 }
84 
Filter(const std::string & pattern)85 MetaDataManager::Filter::Filter(const std::string &pattern) : pattern_(pattern)
86 {
87 }
88 
OnChange(const DistributedDB::KvStoreChangedData & data)89 void MetaObserver::OnChange(const DistributedDB::KvStoreChangedData &data)
90 {
91     if (filter_ == nullptr) {
92         ZLOGE("filter_ is nullptr!");
93         return;
94     }
95     auto values = { &data.GetEntriesInserted(), &data.GetEntriesUpdated(), &data.GetEntriesDeleted() };
96     int32_t next = MetaDataManager::INSERT;
97     for (auto value : values) {
98         int32_t action = next++;
99         if (value->empty()) {
100             continue;
101         }
102         for (const auto &entry : *value) {
103             std::string key(entry.key.begin(), entry.key.end());
104             if (!(*filter_)(key)) {
105                 continue;
106             }
107             observer_(key, { entry.value.begin(), entry.value.end() }, action);
108         }
109     }
110 }
111 
OnChange(DBOrigin origin,const std::string & originalId,DBChangeData && data)112 void MetaObserver::OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data)
113 {
114     (void)origin;
115     (void)originalId;
116     HandleChanges(MetaDataManager::INSERT, data.primaryData[MetaDataManager::INSERT]);
117     HandleChanges(MetaDataManager::UPDATE, data.primaryData[MetaDataManager::UPDATE]);
118     HandleChanges(MetaDataManager::DELETE, data.primaryData[MetaDataManager::DELETE]);
119 }
120 
HandleChanges(int32_t flag,std::vector<std::vector<Type>> & priData)121 void MetaObserver::HandleChanges(int32_t flag, std::vector<std::vector<Type>> &priData)
122 {
123     if (priData.empty()) {
124         return;
125     }
126     if (filter_ == nullptr) {
127         ZLOGE("filter_ is nullptr!");
128         return;
129     }
130     for (const auto &priKey : priData) {
131         if (priKey.empty()) {
132             continue;
133         }
134         auto strValue = std::get_if<std::string>(&priKey[0]);
135         if (strValue != nullptr) {
136             auto key = *strValue;
137             if (!(*filter_)(key)) {
138                 continue;
139             }
140             observer_(key, "", flag);
141         }
142     }
143 }
144 
GetInstance()145 MetaDataManager &MetaDataManager::GetInstance()
146 {
147     static MetaDataManager instance;
148     return instance;
149 }
150 
151 MetaDataManager::MetaDataManager() = default;
152 
~MetaDataManager()153 MetaDataManager::~MetaDataManager()
154 {
155     metaObservers_.Clear();
156 }
157 
Initialize(std::shared_ptr<MetaStore> metaStore,const Backup & backup,const std::string & storeId)158 void MetaDataManager::Initialize(std::shared_ptr<MetaStore> metaStore, const Backup &backup, const std::string &storeId)
159 {
160     if (metaStore == nullptr) {
161         return;
162     }
163 
164     std::lock_guard<decltype(mutex_)> lg(mutex_);
165     if (inited_) {
166         return;
167     }
168     metaStore_ = std::move(metaStore);
169     backup_ = backup;
170     storeId_ = storeId;
171     inited_ = true;
172 }
173 
SetSyncer(const Syncer & syncer)174 void MetaDataManager::SetSyncer(const Syncer &syncer)
175 {
176     if (metaStore_ == nullptr) {
177         return;
178     }
179     syncer_ = syncer;
180 }
181 
SetCloudSyncer(const CloudSyncer & cloudSyncer)182 void MetaDataManager::SetCloudSyncer(const CloudSyncer &cloudSyncer)
183 {
184     if (metaStore_ == nullptr) {
185         return;
186     }
187     cloudSyncer_ = cloudSyncer;
188 }
189 
SaveMeta(const std::string & key,const Serializable & value,bool isLocal)190 bool MetaDataManager::SaveMeta(const std::string &key, const Serializable &value, bool isLocal)
191 {
192     if (!inited_) {
193         return false;
194     }
195 
196     auto data = Serializable::Marshall(value);
197     auto status = isLocal ? metaStore_->PutLocal({ key.begin(), key.end() }, { data.begin(), data.end() })
198                           : metaStore_->Put({ key.begin(), key.end() }, { data.begin(), data.end() });
199     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
200         ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
201             status, isLocal, Anonymous::Change(key).c_str());
202         CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
203         StopSA();
204         return false;
205     }
206     if (status == DistributedDB::DBStatus::OK && backup_) {
207         backup_(metaStore_);
208     }
209     if (!isLocal && cloudSyncer_) {
210         cloudSyncer_();
211     }
212     if (status != DistributedDB::DBStatus::OK) {
213         ZLOGE("failed! status:%{public}d isLocal:%{public}d, key:%{public}s", status, isLocal,
214             Anonymous::Change(key).c_str());
215     }
216     return status == DistributedDB::DBStatus::OK;
217 }
218 
LoadMeta(const std::string & key,Serializable & value,bool isLocal)219 bool MetaDataManager::LoadMeta(const std::string &key, Serializable &value, bool isLocal)
220 {
221     if (!inited_) {
222         return false;
223     }
224 
225     DistributedDB::Value data;
226     auto status = isLocal ? metaStore_->GetLocal({ key.begin(), key.end() }, data)
227                           : metaStore_->Get({ key.begin(), key.end() }, data);
228     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
229         ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
230             status, isLocal, Anonymous::Change(key).c_str());
231         CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
232         StopSA();
233         return false;
234     }
235     if (status != DistributedDB::DBStatus::OK) {
236         return false;
237     }
238     Serializable::Unmarshall({ data.begin(), data.end() }, value);
239     if (isLocal) {
240         data.assign(data.size(), 0);
241     }
242     return true;
243 }
244 
GetEntries(const std::string & prefix,std::vector<Bytes> & entries,bool isLocal)245 bool MetaDataManager::GetEntries(const std::string &prefix, std::vector<Bytes> &entries, bool isLocal)
246 {
247     std::vector<DistributedDB::Entry> dbEntries;
248     auto status = isLocal ? metaStore_->GetLocalEntries({ prefix.begin(), prefix.end() }, dbEntries)
249                           : metaStore_->GetEntries({ prefix.begin(), prefix.end() }, dbEntries);
250     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
251         ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d", status, isLocal);
252         CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
253         StopSA();
254         return false;
255     }
256     if (status != DistributedDB::DBStatus::OK && status != DistributedDB::DBStatus::NOT_FOUND) {
257         ZLOGE("failed! prefix:%{public}s status:%{public}d isLocal:%{public}d", Anonymous::Change(prefix).c_str(),
258             status, isLocal);
259         return false;
260     }
261     entries.resize(dbEntries.size());
262     for (size_t i = 0; i < dbEntries.size(); ++i) {
263         entries[i] = std::move(dbEntries[i].value);
264     }
265     return true;
266 }
267 
DelMeta(const std::string & key,bool isLocal)268 bool MetaDataManager::DelMeta(const std::string &key, bool isLocal)
269 {
270     if (!inited_) {
271         return false;
272     }
273 
274     DistributedDB::Value data;
275     auto status = isLocal ? metaStore_->DeleteLocal({ key.begin(), key.end() })
276                           : metaStore_->Delete({ key.begin(), key.end() });
277     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
278         ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
279             status, isLocal, Anonymous::Change(key).c_str());
280         CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
281         StopSA();
282         return false;
283     }
284     if (status == DistributedDB::DBStatus::OK && backup_) {
285         backup_(metaStore_);
286     }
287     if (!isLocal && cloudSyncer_) {
288         cloudSyncer_();
289     }
290     return ((status == DistributedDB::DBStatus::OK) || (status == DistributedDB::DBStatus::NOT_FOUND));
291 }
292 
Sync(const std::vector<std::string> & devices,OnComplete complete,bool wait)293 bool MetaDataManager::Sync(const std::vector<std::string> &devices, OnComplete complete, bool wait)
294 {
295     if (!inited_ || devices.empty()) {
296         return false;
297     }
298     auto status = metaStore_->Sync(devices, DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL, [complete](auto &dbResults) {
299         std::map<std::string, int32_t> results;
300         for (auto &[uuid, status] : dbResults) {
301             results.insert_or_assign(uuid, static_cast<int32_t>(status));
302         }
303         complete(results);
304     }, wait);
305     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
306         ZLOGE("db corrupted! status:%{public}d", status);
307         CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
308         StopSA();
309         return false;
310     }
311     if (status != DistributedDB::OK) {
312         ZLOGW("meta data sync error %{public}d.", status);
313     }
314     return status == DistributedDB::OK;
315 }
316 
Subscribe(std::shared_ptr<Filter> filter,Observer observer)317 bool MetaDataManager::Subscribe(std::shared_ptr<Filter> filter, Observer observer)
318 {
319     if (!inited_) {
320         return false;
321     }
322 
323     return metaObservers_.ComputeIfAbsent("", [this, &observer, &filter](const std::string &key) -> auto {
324         return std::make_shared<MetaObserver>(metaStore_, filter, observer);
325     });
326 }
327 
Subscribe(std::string prefix,Observer observer,bool isLocal)328 bool MetaDataManager::Subscribe(std::string prefix, Observer observer, bool isLocal)
329 {
330     if (!inited_) {
331         return false;
332     }
333 
334     return metaObservers_.ComputeIfAbsent(prefix, [this, isLocal, &observer, &prefix](const std::string &key) -> auto {
335         return std::make_shared<MetaObserver>(metaStore_, std::make_shared<Filter>(prefix), observer, isLocal);
336     });
337 }
338 
Unsubscribe(std::string filter)339 bool MetaDataManager::Unsubscribe(std::string filter)
340 {
341     if (!inited_) {
342         return false;
343     }
344 
345     return metaObservers_.Erase(filter);
346 }
347 
StopSA()348 void MetaDataManager::StopSA()
349 {
350     ZLOGI("stop distributeddata");
351     int err = raise(SIGKILL);
352     if (err < 0) {
353         ZLOGE("stop distributeddata failed, errCode: %{public}d", err);
354     }
355 }
356 } // namespace OHOS::DistributedData