1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "metadata/meta_data_manager.h"
17 #define LOG_TAG "MetaDataManager"
18
19 #include "kv_store_nb_delegate.h"
20 #include "log_print.h"
21 #include "utils/anonymous.h"
22
23 namespace OHOS::DistributedData {
24 class MetaObserver : public DistributedDB::KvStoreObserver {
25 public:
26 using Filter = MetaDataManager::Filter;
27 using MetaStore = MetaDataManager::MetaStore;
28 using Observer = MetaDataManager::Observer;
29 MetaObserver(std::shared_ptr<MetaStore> metaStore,
30 std::shared_ptr<Filter> filter, Observer observer, bool isLocal = false);
31 virtual ~MetaObserver();
32
33 // Database change callback
34 void OnChange(const DistributedDB::KvStoreChangedData &data) override;
35
36 private:
37 std::shared_ptr<MetaStore> metaStore_;
38 std::shared_ptr<Filter> filter_;
39 Observer observer_;
40 };
41
MetaObserver(std::shared_ptr<MetaStore> metaStore,std::shared_ptr<Filter> filter,Observer observer,bool isLocal)42 MetaObserver::MetaObserver(std::shared_ptr<MetaStore> metaStore,
43 std::shared_ptr<Filter> filter, Observer observer, bool isLocal)
44 : metaStore_(std::move(metaStore)), filter_(std::move(filter)), observer_(std::move(observer))
45 {
46 if (metaStore_ != nullptr) {
47 int mode = isLocal ? DistributedDB::OBSERVER_CHANGES_LOCAL_ONLY
48 : (DistributedDB::OBSERVER_CHANGES_NATIVE | DistributedDB::OBSERVER_CHANGES_FOREIGN);
49 auto status = metaStore_->RegisterObserver(filter_->GetKey(), mode, this);
50 if (status != DistributedDB::DBStatus::OK) {
51 ZLOGE("register meta observer failed :%{public}d.", status);
52 }
53 }
54 }
55
~MetaObserver()56 MetaObserver::~MetaObserver()
57 {
58 if (metaStore_ != nullptr) {
59 metaStore_->UnRegisterObserver(this);
60 }
61 }
62
operator ()(const std::string & key) const63 bool MetaDataManager::Filter::operator()(const std::string &key) const
64 {
65 return key.find(pattern_) == 0;
66 }
67
GetKey() const68 std::vector<uint8_t> MetaDataManager::Filter::GetKey() const
69 {
70 return std::vector<uint8_t>();
71 }
72
Filter(const std::string & pattern)73 MetaDataManager::Filter::Filter(const std::string &pattern)
74 : pattern_(pattern)
75 {
76 }
77
OnChange(const DistributedDB::KvStoreChangedData & data)78 void MetaObserver::OnChange(const DistributedDB::KvStoreChangedData &data)
79 {
80 auto values = { &data.GetEntriesInserted(), &data.GetEntriesUpdated(), &data.GetEntriesDeleted() };
81 int32_t next = MetaDataManager::INSERT;
82 for (auto value : values) {
83 int32_t action = next++;
84 if (value->empty()) {
85 continue;
86 }
87 for (const auto &entry : *value) {
88 std::string key(entry.key.begin(), entry.key.end());
89 if (!(*filter_)(key)) {
90 continue;
91 }
92 observer_(key, { entry.value.begin(), entry.value.end() }, action);
93 }
94 }
95 }
96
GetInstance()97 MetaDataManager &MetaDataManager::GetInstance()
98 {
99 static MetaDataManager instance;
100 return instance;
101 }
102
103 MetaDataManager::MetaDataManager() = default;
104
~MetaDataManager()105 MetaDataManager::~MetaDataManager()
106 {
107 metaObservers_.Clear();
108 }
109
Initialize(std::shared_ptr<MetaStore> metaStore,const Backup & backup)110 void MetaDataManager::Initialize(std::shared_ptr<MetaStore> metaStore, const Backup &backup)
111 {
112 if (metaStore == nullptr) {
113 return;
114 }
115
116 std::lock_guard<decltype(mutex_)> lg(mutex_);
117 if (inited_) {
118 return;
119 }
120 metaStore_ = std::move(metaStore);
121 backup_ = backup;
122 inited_ = true;
123 }
124
SetSyncer(const Syncer & syncer)125 void MetaDataManager::SetSyncer(const Syncer &syncer)
126 {
127 if (metaStore_ == nullptr) {
128 return;
129 }
130 syncer_ = syncer;
131 }
132
SaveMeta(const std::string & key,const Serializable & value,bool isLocal)133 bool MetaDataManager::SaveMeta(const std::string &key, const Serializable &value, bool isLocal)
134 {
135 if (!inited_) {
136 return false;
137 }
138
139 auto data = Serializable::Marshall(value);
140 auto status = isLocal ? metaStore_->PutLocal({ key.begin(), key.end() }, { data.begin(), data.end() })
141 : metaStore_->Put({ key.begin(), key.end() }, { data.begin(), data.end() });
142 if (status == DistributedDB::DBStatus::OK && backup_) {
143 backup_(metaStore_);
144 }
145 if (!isLocal && syncer_) {
146 syncer_(metaStore_, status);
147 }
148 if (status != DistributedDB::DBStatus::OK) {
149 ZLOGE("failed! status:%{public}d isLocal:%{public}d, key:%{public}s",
150 status, isLocal, Anonymous::Change(key).c_str());
151 }
152 return status == DistributedDB::DBStatus::OK;
153 }
154
LoadMeta(const std::string & key,Serializable & value,bool isLocal)155 bool MetaDataManager::LoadMeta(const std::string &key, Serializable &value, bool isLocal)
156 {
157 if (!inited_) {
158 return false;
159 }
160
161 DistributedDB::Value data;
162 auto status = isLocal ? metaStore_->GetLocal({ key.begin(), key.end() }, data)
163 : metaStore_->Get({ key.begin(), key.end() }, data);
164 if (status != DistributedDB::DBStatus::OK) {
165 return false;
166 }
167 Serializable::Unmarshall({ data.begin(), data.end() }, value);
168 if (isLocal) {
169 data.assign(data.size(), 0);
170 }
171 return true;
172 }
173
GetEntries(const std::string & prefix,std::vector<Bytes> & entries,bool isLocal)174 bool MetaDataManager::GetEntries(const std::string &prefix, std::vector<Bytes> &entries, bool isLocal)
175 {
176 std::vector<DistributedDB::Entry> dbEntries;
177 auto status = isLocal ? metaStore_->GetLocalEntries({ prefix.begin(), prefix.end() }, dbEntries)
178 : metaStore_->GetEntries({ prefix.begin(), prefix.end() }, dbEntries);
179 if (status != DistributedDB::DBStatus::OK && status != DistributedDB::DBStatus::NOT_FOUND) {
180 ZLOGE("failed! prefix:%{public}s status:%{public}d isLocal:%{public}d", Anonymous::Change(prefix).c_str(),
181 status, isLocal);
182 return false;
183 }
184 entries.resize(dbEntries.size());
185 for (size_t i = 0; i < dbEntries.size(); ++i) {
186 entries[i] = std::move(dbEntries[i].value);
187 }
188 return true;
189 }
190
DelMeta(const std::string & key,bool isLocal)191 bool MetaDataManager::DelMeta(const std::string &key, bool isLocal)
192 {
193 if (!inited_) {
194 return false;
195 }
196
197 DistributedDB::Value data;
198 auto status = isLocal ? metaStore_->DeleteLocal({ key.begin(), key.end() })
199 : metaStore_->Delete({ key.begin(), key.end() });
200 if (status == DistributedDB::DBStatus::OK && backup_) {
201 backup_(metaStore_);
202 }
203 if (!isLocal && syncer_) {
204 syncer_(metaStore_, status);
205 }
206 return ((status == DistributedDB::DBStatus::OK) || (status == DistributedDB::DBStatus::NOT_FOUND));
207 }
208
Sync(const std::vector<std::string> & devices,OnComplete complete)209 bool MetaDataManager::Sync(const std::vector<std::string> &devices, OnComplete complete)
210 {
211 if (!inited_ || devices.empty()) {
212 return false;
213 }
214 auto status = metaStore_->Sync(
215 devices, DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL, [complete](auto &dbResults) {
216 std::map<std::string, int32_t> results;
217 for (auto &[uuid, status] : dbResults) {
218 results.insert_or_assign(uuid, static_cast<int32_t>(status));
219 }
220 complete(results);
221 });
222 if (status != DistributedDB::OK) {
223 ZLOGW("meta data sync error %{public}d.", status);
224 }
225 return status == DistributedDB::OK;
226 }
227
Subscribe(std::shared_ptr<Filter> filter,Observer observer)228 bool MetaDataManager::Subscribe(std::shared_ptr<Filter> filter, Observer observer)
229 {
230 if (!inited_) {
231 return false;
232 }
233
234 return metaObservers_.ComputeIfAbsent(
235 "", [ this, &observer, &filter ](const std::string &key) -> auto {
236 return std::make_shared<MetaObserver>(metaStore_, filter, observer);
237 });
238 }
239
Subscribe(std::string prefix,Observer observer,bool isLocal)240 bool MetaDataManager::Subscribe(std::string prefix, Observer observer, bool isLocal)
241 {
242 if (!inited_) {
243 return false;
244 }
245
246 return metaObservers_.ComputeIfAbsent(
247 prefix, [ this, isLocal, &observer, &prefix ](const std::string &key) -> auto {
248 return std::make_shared<MetaObserver>(metaStore_, std::make_shared<Filter>(prefix), observer, isLocal);
249 });
250 }
251
Unsubscribe(std::string filter)252 bool MetaDataManager::Unsubscribe(std::string filter)
253 {
254 if (!inited_) {
255 return false;
256 }
257
258 return metaObservers_.Erase(filter);
259 }
260 } // namespace OHOS::DistributedData