• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "metadata/meta_data_manager.h"
17 #define LOG_TAG "MetaDataManager"
18 
19 #include "kv_store_nb_delegate.h"
20 #include "log_print.h"
21 #include "utils/anonymous.h"
22 
23 namespace OHOS::DistributedData {
24 class MetaObserver : public DistributedDB::KvStoreObserver {
25 public:
26     using Filter = MetaDataManager::Filter;
27     using MetaStore = MetaDataManager::MetaStore;
28     using Observer = MetaDataManager::Observer;
29     MetaObserver(std::shared_ptr<MetaStore> metaStore,
30         std::shared_ptr<Filter> filter, Observer observer, bool isLocal = false);
31     virtual ~MetaObserver();
32 
33     // Database change callback
34     void OnChange(const DistributedDB::KvStoreChangedData &data) override;
35 
36 private:
37     std::shared_ptr<MetaStore> metaStore_;
38     std::shared_ptr<Filter> filter_;
39     Observer observer_;
40 };
41 
MetaObserver(std::shared_ptr<MetaStore> metaStore,std::shared_ptr<Filter> filter,Observer observer,bool isLocal)42 MetaObserver::MetaObserver(std::shared_ptr<MetaStore> metaStore,
43     std::shared_ptr<Filter> filter, Observer observer, bool isLocal)
44     : metaStore_(std::move(metaStore)), filter_(std::move(filter)), observer_(std::move(observer))
45 {
46     if (metaStore_ != nullptr) {
47         int mode = isLocal ? DistributedDB::OBSERVER_CHANGES_LOCAL_ONLY
48                            : (DistributedDB::OBSERVER_CHANGES_NATIVE | DistributedDB::OBSERVER_CHANGES_FOREIGN);
49         auto status = metaStore_->RegisterObserver(filter_->GetKey(), mode, this);
50         if (status != DistributedDB::DBStatus::OK) {
51             ZLOGE("register meta observer failed :%{public}d.", status);
52         }
53     }
54 }
55 
~MetaObserver()56 MetaObserver::~MetaObserver()
57 {
58     if (metaStore_ != nullptr) {
59         metaStore_->UnRegisterObserver(this);
60     }
61 }
62 
operator ()(const std::string & key) const63 bool MetaDataManager::Filter::operator()(const std::string &key) const
64 {
65     return key.find(pattern_) == 0;
66 }
67 
GetKey() const68 std::vector<uint8_t> MetaDataManager::Filter::GetKey() const
69 {
70     return std::vector<uint8_t>();
71 }
72 
Filter(const std::string & pattern)73 MetaDataManager::Filter::Filter(const std::string &pattern)
74     : pattern_(pattern)
75 {
76 }
77 
OnChange(const DistributedDB::KvStoreChangedData & data)78 void MetaObserver::OnChange(const DistributedDB::KvStoreChangedData &data)
79 {
80     auto values = { &data.GetEntriesInserted(), &data.GetEntriesUpdated(), &data.GetEntriesDeleted() };
81     int32_t next = MetaDataManager::INSERT;
82     for (auto value : values) {
83         int32_t action = next++;
84         if (value->empty()) {
85             continue;
86         }
87         for (const auto &entry : *value) {
88             std::string key(entry.key.begin(), entry.key.end());
89             if (!(*filter_)(key)) {
90                 continue;
91             }
92             observer_(key, { entry.value.begin(), entry.value.end() }, action);
93         }
94     }
95 }
96 
GetInstance()97 MetaDataManager &MetaDataManager::GetInstance()
98 {
99     static MetaDataManager instance;
100     return instance;
101 }
102 
103 MetaDataManager::MetaDataManager() = default;
104 
~MetaDataManager()105 MetaDataManager::~MetaDataManager()
106 {
107     metaObservers_.Clear();
108 }
109 
Initialize(std::shared_ptr<MetaStore> metaStore,const Backup & backup)110 void MetaDataManager::Initialize(std::shared_ptr<MetaStore> metaStore, const Backup &backup)
111 {
112     if (metaStore == nullptr) {
113         return;
114     }
115 
116     std::lock_guard<decltype(mutex_)> lg(mutex_);
117     if (inited_) {
118         return;
119     }
120     metaStore_ = std::move(metaStore);
121     backup_ = backup;
122     inited_ = true;
123 }
124 
SetSyncer(const Syncer & syncer)125 void MetaDataManager::SetSyncer(const Syncer &syncer)
126 {
127     if (metaStore_ == nullptr) {
128         return;
129     }
130     syncer_ = syncer;
131 }
132 
SaveMeta(const std::string & key,const Serializable & value,bool isLocal)133 bool MetaDataManager::SaveMeta(const std::string &key, const Serializable &value, bool isLocal)
134 {
135     if (!inited_) {
136         return false;
137     }
138 
139     auto data = Serializable::Marshall(value);
140     auto status = isLocal ? metaStore_->PutLocal({ key.begin(), key.end() }, { data.begin(), data.end() })
141                           : metaStore_->Put({ key.begin(), key.end() }, { data.begin(), data.end() });
142     if (status == DistributedDB::DBStatus::OK && backup_) {
143         backup_(metaStore_);
144     }
145     if (!isLocal && syncer_) {
146         syncer_(metaStore_, status);
147     }
148     if (status != DistributedDB::DBStatus::OK) {
149         ZLOGE("failed! status:%{public}d isLocal:%{public}d, key:%{public}s",
150             status, isLocal, Anonymous::Change(key).c_str());
151     }
152     return status == DistributedDB::DBStatus::OK;
153 }
154 
LoadMeta(const std::string & key,Serializable & value,bool isLocal)155 bool MetaDataManager::LoadMeta(const std::string &key, Serializable &value, bool isLocal)
156 {
157     if (!inited_) {
158         return false;
159     }
160 
161     DistributedDB::Value data;
162     auto status = isLocal ? metaStore_->GetLocal({ key.begin(), key.end() }, data)
163                           : metaStore_->Get({ key.begin(), key.end() }, data);
164     if (status != DistributedDB::DBStatus::OK) {
165         return false;
166     }
167     Serializable::Unmarshall({ data.begin(), data.end() }, value);
168     if (isLocal) {
169         data.assign(data.size(), 0);
170     }
171     return true;
172 }
173 
GetEntries(const std::string & prefix,std::vector<Bytes> & entries,bool isLocal)174 bool MetaDataManager::GetEntries(const std::string &prefix, std::vector<Bytes> &entries, bool isLocal)
175 {
176     std::vector<DistributedDB::Entry> dbEntries;
177     auto status = isLocal ? metaStore_->GetLocalEntries({ prefix.begin(), prefix.end() }, dbEntries)
178                           : metaStore_->GetEntries({ prefix.begin(), prefix.end() }, dbEntries);
179     if (status != DistributedDB::DBStatus::OK && status != DistributedDB::DBStatus::NOT_FOUND) {
180         ZLOGE("failed! prefix:%{public}s status:%{public}d isLocal:%{public}d", Anonymous::Change(prefix).c_str(),
181             status, isLocal);
182         return false;
183     }
184     entries.resize(dbEntries.size());
185     for (size_t i = 0; i < dbEntries.size(); ++i) {
186         entries[i] = std::move(dbEntries[i].value);
187     }
188     return true;
189 }
190 
DelMeta(const std::string & key,bool isLocal)191 bool MetaDataManager::DelMeta(const std::string &key, bool isLocal)
192 {
193     if (!inited_) {
194         return false;
195     }
196 
197     DistributedDB::Value data;
198     auto status = isLocal ? metaStore_->DeleteLocal({ key.begin(), key.end() })
199                           : metaStore_->Delete({ key.begin(), key.end() });
200     if (status == DistributedDB::DBStatus::OK && backup_) {
201         backup_(metaStore_);
202     }
203     if (!isLocal && syncer_) {
204         syncer_(metaStore_, status);
205     }
206     return ((status == DistributedDB::DBStatus::OK) || (status == DistributedDB::DBStatus::NOT_FOUND));
207 }
208 
Sync(const std::vector<std::string> & devices,OnComplete complete)209 bool MetaDataManager::Sync(const std::vector<std::string> &devices, OnComplete complete)
210 {
211     if (!inited_ || devices.empty()) {
212         return false;
213     }
214     auto status = metaStore_->Sync(
215         devices, DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL, [complete](auto &dbResults) {
216         std::map<std::string, int32_t> results;
217         for (auto &[uuid, status] : dbResults) {
218             results.insert_or_assign(uuid, static_cast<int32_t>(status));
219         }
220         complete(results);
221     });
222     if (status != DistributedDB::OK) {
223         ZLOGW("meta data sync error %{public}d.", status);
224     }
225     return status == DistributedDB::OK;
226 }
227 
Subscribe(std::shared_ptr<Filter> filter,Observer observer)228 bool MetaDataManager::Subscribe(std::shared_ptr<Filter> filter, Observer observer)
229 {
230     if (!inited_) {
231         return false;
232     }
233 
234     return metaObservers_.ComputeIfAbsent(
235         "", [ this, &observer, &filter ](const std::string &key) -> auto {
236             return std::make_shared<MetaObserver>(metaStore_, filter, observer);
237         });
238 }
239 
Subscribe(std::string prefix,Observer observer,bool isLocal)240 bool MetaDataManager::Subscribe(std::string prefix, Observer observer, bool isLocal)
241 {
242     if (!inited_) {
243         return false;
244     }
245 
246     return metaObservers_.ComputeIfAbsent(
247         prefix, [ this, isLocal, &observer, &prefix ](const std::string &key) -> auto {
248             return std::make_shared<MetaObserver>(metaStore_, std::make_shared<Filter>(prefix), observer, isLocal);
249         });
250 }
251 
Unsubscribe(std::string filter)252 bool MetaDataManager::Unsubscribe(std::string filter)
253 {
254     if (!inited_) {
255         return false;
256     }
257 
258     return metaObservers_.Erase(filter);
259 }
260 } // namespace OHOS::DistributedData