• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "metadata/meta_data_manager.h"
17 #include <csignal>
18 #define LOG_TAG "MetaDataManager"
19 
20 #include "directory/directory_manager.h"
21 #include "kv_store_nb_delegate.h"
22 #include "log_print.h"
23 #include "utils/anonymous.h"
24 #include "utils/corrupt_reporter.h"
25 
26 namespace OHOS::DistributedData {
27 class MetaObserver : public DistributedDB::KvStoreObserver {
28 public:
29     using Filter = MetaDataManager::Filter;
30     using MetaStore = MetaDataManager::MetaStore;
31     using Observer = MetaDataManager::Observer;
32     using DBOrigin = DistributedDB::Origin;
33     using DBChangeData = DistributedDB::ChangedData;
34     using Type = DistributedDB::Type;
35     MetaObserver(std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer,
36         bool isLocal = false);
37     virtual ~MetaObserver();
38 
39     // Database change callback
40     void OnChange(const DistributedDB::KvStoreChangedData &data) override;
41     void OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data) override;
42 
43     void HandleChanges(int32_t flag, std::vector<std::vector<Type>> &priData);
44 
45 private:
46     std::shared_ptr<MetaStore> metaStore_;
47     std::shared_ptr<Filter> filter_;
48     Observer observer_;
49 };
50 
MetaObserver(std::shared_ptr<MetaStore> metaStore,std::shared_ptr<Filter> filter,Observer observer,bool isLocal)51 MetaObserver::MetaObserver(
52     std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer, bool isLocal)
53     : metaStore_(std::move(metaStore)), filter_(std::move(filter)), observer_(std::move(observer))
54 {
55     if (metaStore_ != nullptr) {
56         int mode = isLocal ? DistributedDB::OBSERVER_CHANGES_LOCAL_ONLY
57                            : (DistributedDB::OBSERVER_CHANGES_NATIVE | DistributedDB::OBSERVER_CHANGES_FOREIGN);
58         auto status = metaStore_->RegisterObserver(filter_->GetKey(), mode, this);
59         if (!isLocal) {
60             status = metaStore_->RegisterObserver(filter_->GetKey(), DistributedDB::OBSERVER_CHANGES_CLOUD, this);
61         }
62         if (status != DistributedDB::DBStatus::OK) {
63             ZLOGE("register meta observer failed :%{public}d.", status);
64         }
65     }
66 }
67 
~MetaObserver()68 MetaObserver::~MetaObserver()
69 {
70     if (metaStore_ != nullptr) {
71         metaStore_->UnRegisterObserver(this);
72     }
73 }
74 
operator ()(const std::string & key) const75 bool MetaDataManager::Filter::operator()(const std::string &key) const
76 {
77     return key.find(pattern_) == 0;
78 }
79 
GetKey() const80 std::vector<uint8_t> MetaDataManager::Filter::GetKey() const
81 {
82     return std::vector<uint8_t>();
83 }
84 
// Builds a filter that accepts keys beginning with pattern (see operator()).
MetaDataManager::Filter::Filter(const std::string &pattern) : pattern_(pattern)
{
}
88 
OnChange(const DistributedDB::KvStoreChangedData & data)89 void MetaObserver::OnChange(const DistributedDB::KvStoreChangedData &data)
90 {
91     if (filter_ == nullptr) {
92         ZLOGE("filter_ is nullptr!");
93         return;
94     }
95     auto values = { &data.GetEntriesInserted(), &data.GetEntriesUpdated(), &data.GetEntriesDeleted() };
96     int32_t next = MetaDataManager::INSERT;
97     for (auto value : values) {
98         int32_t action = next++;
99         if (value->empty()) {
100             continue;
101         }
102         for (const auto &entry : *value) {
103             std::string key(entry.key.begin(), entry.key.end());
104             if (!(*filter_)(key)) {
105                 continue;
106             }
107             observer_(key, { entry.value.begin(), entry.value.end() }, action);
108         }
109     }
110 }
111 
OnChange(DBOrigin origin,const std::string & originalId,DBChangeData && data)112 void MetaObserver::OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data)
113 {
114     (void)origin;
115     (void)originalId;
116     HandleChanges(MetaDataManager::INSERT, data.primaryData[MetaDataManager::INSERT]);
117     HandleChanges(MetaDataManager::UPDATE, data.primaryData[MetaDataManager::UPDATE]);
118     HandleChanges(MetaDataManager::DELETE, data.primaryData[MetaDataManager::DELETE]);
119 }
120 
HandleChanges(int32_t flag,std::vector<std::vector<Type>> & priData)121 void MetaObserver::HandleChanges(int32_t flag, std::vector<std::vector<Type>> &priData)
122 {
123     if (priData.empty()) {
124         return;
125     }
126     if (filter_ == nullptr) {
127         ZLOGE("filter_ is nullptr!");
128         return;
129     }
130     for (const auto &priKey : priData) {
131         if (priKey.empty()) {
132             continue;
133         }
134         auto strValue = std::get_if<std::string>(&priKey[0]);
135         if (strValue != nullptr) {
136             auto key = *strValue;
137             if (!(*filter_)(key)) {
138                 continue;
139             }
140             observer_(key, "", flag);
141         }
142     }
143 }
144 
GetInstance()145 MetaDataManager &MetaDataManager::GetInstance()
146 {
147     static MetaDataManager instance;
148     return instance;
149 }
150 
// Default construction only; the store is attached later through Initialize().
MetaDataManager::MetaDataManager() = default;
152 
// Drops every observer created through Subscribe() before the remaining members are destroyed.
MetaDataManager::~MetaDataManager()
{
    metaObservers_.Clear();
}
157 
Initialize(std::shared_ptr<MetaStore> metaStore,const Backup & backup,const std::string & storeId)158 void MetaDataManager::Initialize(std::shared_ptr<MetaStore> metaStore, const Backup &backup, const std::string &storeId)
159 {
160     if (metaStore == nullptr) {
161         return;
162     }
163 
164     std::lock_guard<decltype(mutex_)> lg(mutex_);
165     if (inited_) {
166         return;
167     }
168     metaStore_ = std::move(metaStore);
169     backup_ = backup;
170     storeId_ = storeId;
171     inited_ = true;
172 }
173 
SetSyncer(const Syncer & syncer)174 void MetaDataManager::SetSyncer(const Syncer &syncer)
175 {
176     if (metaStore_ == nullptr) {
177         return;
178     }
179     syncer_ = syncer;
180 }
181 
SetCloudSyncer(const CloudSyncer & cloudSyncer)182 void MetaDataManager::SetCloudSyncer(const CloudSyncer &cloudSyncer)
183 {
184     if (metaStore_ == nullptr) {
185         return;
186     }
187     cloudSyncer_ = cloudSyncer;
188 }
189 
SaveMeta(const std::string & key,const Serializable & value,bool isLocal)190 bool MetaDataManager::SaveMeta(const std::string &key, const Serializable &value, bool isLocal)
191 {
192     if (!inited_) {
193         return false;
194     }
195 
196     auto data = Serializable::Marshall(value);
197     auto status = isLocal ? metaStore_->PutLocal({ key.begin(), key.end() }, { data.begin(), data.end() })
198                           : metaStore_->Put({ key.begin(), key.end() }, { data.begin(), data.end() });
199     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
200         ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
201             status, isLocal, Anonymous::Change(key).c_str());
202         CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
203         StopSA();
204         return false;
205     }
206     if (status == DistributedDB::DBStatus::OK && backup_) {
207         backup_(metaStore_);
208     }
209     if (!isLocal && cloudSyncer_) {
210         cloudSyncer_();
211     }
212     if (status != DistributedDB::DBStatus::OK) {
213         ZLOGE("failed! status:%{public}d isLocal:%{public}d, key:%{public}s", status, isLocal,
214             Anonymous::Change(key).c_str());
215     }
216     DelCacheMeta(key, isLocal);
217     return status == DistributedDB::DBStatus::OK;
218 }
219 
SaveMeta(const std::vector<Entry> & values,bool isLocal)220 bool MetaDataManager::SaveMeta(const std::vector<Entry> &values, bool isLocal)
221 {
222     if (!inited_) {
223         return false;
224     }
225     if (values.empty()) {
226         return true;
227     }
228     std::vector<DistributedDB::Entry> entries;
229     entries.reserve(values.size());
230     for (const auto &[key, value] : values) {
231         entries.push_back({ { key.begin(), key.end() }, { value.begin(), value.end() } });
232         DelCacheMeta(key, isLocal);
233     }
234     auto status = isLocal ? metaStore_->PutLocalBatch(entries) : metaStore_->PutBatch(entries);
235     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
236         ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, size:%{public}zu", status, isLocal, values.size());
237         CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
238         StopSA();
239         return false;
240     }
241     if (status == DistributedDB::DBStatus::OK && backup_) {
242         backup_(metaStore_);
243     }
244     if (!isLocal && cloudSyncer_) {
245         cloudSyncer_();
246     }
247     if (status != DistributedDB::DBStatus::OK) {
248         ZLOGE("failed! status:%{public}d isLocal:%{public}d, size:%{public}zu", status, isLocal, values.size());
249     }
250     return status == DistributedDB::DBStatus::OK;
251 }
252 
LoadMeta(const std::string & key,Serializable & value,bool isLocal)253 bool MetaDataManager::LoadMeta(const std::string &key, Serializable &value, bool isLocal)
254 {
255     if (!inited_) {
256         return false;
257     }
258     if (LoadCacheMeta(key, value, isLocal)) {
259         return true;
260     }
261     DistributedDB::Value data;
262     auto status = isLocal ? metaStore_->GetLocal({ key.begin(), key.end() }, data)
263                           : metaStore_->Get({ key.begin(), key.end() }, data);
264     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
265         ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
266             status, isLocal, Anonymous::Change(key).c_str());
267         CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
268         StopSA();
269         return false;
270     }
271     if (status != DistributedDB::DBStatus::OK) {
272         return false;
273     }
274     std::string tempdata(data.begin(), data.end());
275     SaveCacheMeta(key, tempdata, isLocal);
276     Serializable::Unmarshall(tempdata, value);
277     if (isLocal) {
278         data.assign(data.size(), 0);
279     }
280     return true;
281 }
282 
GetEntries(const std::string & prefix,std::vector<Bytes> & entries,bool isLocal)283 bool MetaDataManager::GetEntries(const std::string &prefix, std::vector<Bytes> &entries, bool isLocal)
284 {
285     std::vector<DistributedDB::Entry> dbEntries;
286     auto status = isLocal ? metaStore_->GetLocalEntries({ prefix.begin(), prefix.end() }, dbEntries)
287                           : metaStore_->GetEntries({ prefix.begin(), prefix.end() }, dbEntries);
288     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
289         ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d", status, isLocal);
290         CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
291         StopSA();
292         return false;
293     }
294     if (status != DistributedDB::DBStatus::OK && status != DistributedDB::DBStatus::NOT_FOUND) {
295         ZLOGE("failed! prefix:%{public}s status:%{public}d isLocal:%{public}d", Anonymous::Change(prefix).c_str(),
296             status, isLocal);
297         return false;
298     }
299     entries.resize(dbEntries.size());
300     for (size_t i = 0; i < dbEntries.size(); ++i) {
301         entries[i] = std::move(dbEntries[i].value);
302     }
303     return true;
304 }
305 
DelMeta(const std::string & key,bool isLocal)306 bool MetaDataManager::DelMeta(const std::string &key, bool isLocal)
307 {
308     if (!inited_) {
309         return false;
310     }
311     DelCacheMeta(key, isLocal);
312     auto status = isLocal ? metaStore_->DeleteLocal({ key.begin(), key.end() })
313                           : metaStore_->Delete({ key.begin(), key.end() });
314     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
315         ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
316             status, isLocal, Anonymous::Change(key).c_str());
317         CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
318         StopSA();
319         return false;
320     }
321     if (status == DistributedDB::DBStatus::OK && backup_) {
322         backup_(metaStore_);
323     }
324     if (!isLocal && cloudSyncer_) {
325         cloudSyncer_();
326     }
327     return ((status == DistributedDB::DBStatus::OK) || (status == DistributedDB::DBStatus::NOT_FOUND));
328 }
329 
DelMeta(const std::vector<std::string> & keys,bool isLocal)330 bool MetaDataManager::DelMeta(const std::vector<std::string> &keys, bool isLocal)
331 {
332     if (!inited_) {
333         return false;
334     }
335     if (keys.empty()) {
336         return true;
337     }
338     std::vector<DistributedDB::Key> dbKeys;
339     dbKeys.reserve(keys.size());
340     for (auto &key : keys) {
341         dbKeys.emplace_back(key.begin(), key.end());
342         DelCacheMeta(key, isLocal);
343     }
344     auto status = isLocal ? metaStore_->DeleteLocalBatch(dbKeys) : metaStore_->DeleteBatch(dbKeys);
345     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
346         ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key size:%{public}zu", status, isLocal,
347             dbKeys.size());
348         CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
349         StopSA();
350         return false;
351     }
352     if (status == DistributedDB::DBStatus::OK && backup_) {
353         backup_(metaStore_);
354     }
355     if (!isLocal && cloudSyncer_) {
356         cloudSyncer_();
357     }
358     return ((status == DistributedDB::DBStatus::OK) || (status == DistributedDB::DBStatus::NOT_FOUND));
359 }
360 
Sync(const std::vector<std::string> & devices,OnComplete complete,bool wait,bool isRetry)361 bool MetaDataManager::Sync(const std::vector<std::string> &devices, OnComplete complete, bool wait, bool isRetry)
362 {
363     if (!inited_ || devices.empty()) {
364         return false;
365     }
366     DistributedDB::DeviceSyncOption syncOption;
367     syncOption.devices = devices;
368     syncOption.mode = DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL;
369     syncOption.isWait = wait;
370     syncOption.isRetry = isRetry;
371     auto status = metaStore_->Sync(syncOption,
372         [complete](const std::map<std::string, DistributedDB::DBStatus> &dbResults) {
373             if (complete == nullptr) {
374                 return;
375             }
376             std::map<std::string, int32_t> results;
377             for (auto &[uuid, status] : dbResults) {
378                 results.insert_or_assign(uuid, static_cast<int32_t>(status));
379             }
380             complete(results);
381         });
382     if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
383         ZLOGE("db corrupted! status:%{public}d", status);
384         CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
385         StopSA();
386         return false;
387     }
388     if (status != DistributedDB::OK) {
389         ZLOGW("meta data sync error %{public}d.", status);
390     }
391     return status == DistributedDB::OK;
392 }
393 
Subscribe(std::shared_ptr<Filter> filter,Observer observer)394 bool MetaDataManager::Subscribe(std::shared_ptr<Filter> filter, Observer observer)
395 {
396     if (!inited_) {
397         return false;
398     }
399 
400     return metaObservers_.ComputeIfAbsent("", [this, &observer, &filter](const std::string &key) -> auto {
401         return std::make_shared<MetaObserver>(metaStore_, filter, observer);
402     });
403 }
404 
Subscribe(std::string prefix,Observer observer,bool isLocal)405 bool MetaDataManager::Subscribe(std::string prefix, Observer observer, bool isLocal)
406 {
407     if (!inited_) {
408         return false;
409     }
410 
411     return metaObservers_.ComputeIfAbsent(prefix, [this, isLocal, &observer, &prefix](const std::string &key) -> auto {
412         return std::make_shared<MetaObserver>(metaStore_, std::make_shared<Filter>(prefix), observer, isLocal);
413     });
414 }
415 
Unsubscribe(std::string filter)416 bool MetaDataManager::Unsubscribe(std::string filter)
417 {
418     if (!inited_) {
419         return false;
420     }
421 
422     return metaObservers_.Erase(filter);
423 }
424 
StopSA()425 void MetaDataManager::StopSA()
426 {
427     ZLOGI("stop distributeddata");
428     int err = raise(SIGKILL);
429     if (err < 0) {
430         ZLOGE("stop distributeddata failed, errCode: %{public}d", err);
431     }
432 }
433 } // namespace OHOS::DistributedData