1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "metadata/meta_data_manager.h"
17 #define LOG_TAG "MetaDataManager"
18
19 #include "kv_store_nb_delegate.h"
20 #include "log_print.h"
21 #include "utils/anonymous.h"
22
23 namespace OHOS::DistributedData {
24 class MetaObserver : public DistributedDB::KvStoreObserver {
25 public:
26 using Filter = MetaDataManager::Filter;
27 using MetaStore = MetaDataManager::MetaStore;
28 using Observer = MetaDataManager::Observer;
29 MetaObserver(std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer);
30 virtual ~MetaObserver();
31
32 // Database change callback
33 void OnChange(const DistributedDB::KvStoreChangedData &data) override;
34
35 private:
36 std::shared_ptr<MetaStore> metaStore_;
37 std::shared_ptr<Filter> filter_;
38 Observer observer_;
39 };
40
MetaObserver(std::shared_ptr<MetaStore> metaStore,std::shared_ptr<Filter> filter,Observer observer)41 MetaObserver::MetaObserver(std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer)
42 : metaStore_(std::move(metaStore)), filter_(std::move(filter)), observer_(std::move(observer))
43 {
44 if (metaStore_ != nullptr) {
45 int mode = DistributedDB::OBSERVER_CHANGES_NATIVE | DistributedDB::OBSERVER_CHANGES_FOREIGN;
46 auto status = metaStore_->RegisterObserver(filter_->GetKey(), mode, this);
47 if (status != DistributedDB::DBStatus::OK) {
48 ZLOGE("register meta observer failed :%{public}d.", status);
49 }
50 }
51 }
52
~MetaObserver()53 MetaObserver::~MetaObserver()
54 {
55 if (metaStore_ != nullptr) {
56 metaStore_->UnRegisterObserver(this);
57 }
58 }
59
operator ()(const std::string & key) const60 bool MetaDataManager::Filter::operator()(const std::string &key) const
61 {
62 return key.find(pattern_) == 0;
63 }
64
GetKey() const65 std::vector<uint8_t> MetaDataManager::Filter::GetKey() const
66 {
67 return std::vector<uint8_t>();
68 }
69
Filter(const std::string & pattern)70 MetaDataManager::Filter::Filter(const std::string &pattern)
71 : pattern_(pattern)
72 {
73 }
74
OnChange(const DistributedDB::KvStoreChangedData & data)75 void MetaObserver::OnChange(const DistributedDB::KvStoreChangedData &data)
76 {
77 auto values = { &data.GetEntriesInserted(), &data.GetEntriesUpdated(), &data.GetEntriesDeleted() };
78 int32_t next = MetaDataManager::INSERT;
79 for (auto value : values) {
80 int32_t action = next++;
81 if (value->empty()) {
82 continue;
83 }
84 for (const auto &entry : *value) {
85 std::string key(entry.key.begin(), entry.key.end());
86 if (!(*filter_)(key)) {
87 continue;
88 }
89 observer_(key, { entry.value.begin(), entry.value.end() }, action);
90 }
91 }
92 }
93
GetInstance()94 MetaDataManager &MetaDataManager::GetInstance()
95 {
96 static MetaDataManager instance;
97 return instance;
98 }
99
100 MetaDataManager::MetaDataManager() = default;
101
~MetaDataManager()102 MetaDataManager::~MetaDataManager()
103 {
104 metaObservers_.Clear();
105 }
106
Initialize(std::shared_ptr<MetaStore> metaStore,const Backup & backup,const Syncer & syncer)107 void MetaDataManager::Initialize(std::shared_ptr<MetaStore> metaStore, const Backup &backup, const Syncer &syncer)
108 {
109 if (metaStore == nullptr) {
110 return;
111 }
112
113 std::lock_guard<decltype(mutex_)> lg(mutex_);
114 if (inited_) {
115 return;
116 }
117 metaStore_ = std::move(metaStore);
118 backup_ = backup;
119 syncer_ = syncer;
120 inited_ = true;
121 }
122
SaveMeta(const std::string & key,const Serializable & value,bool isLocal)123 bool MetaDataManager::SaveMeta(const std::string &key, const Serializable &value, bool isLocal)
124 {
125 if (!inited_) {
126 return false;
127 }
128
129 auto data = Serializable::Marshall(value);
130 auto status = isLocal ? metaStore_->PutLocal({ key.begin(), key.end() }, { data.begin(), data.end() })
131 : metaStore_->Put({ key.begin(), key.end() }, { data.begin(), data.end() });
132 if (status == DistributedDB::DBStatus::OK && backup_) {
133 backup_(metaStore_);
134 }
135 if (!isLocal && syncer_) {
136 syncer_(metaStore_, status);
137 }
138 if (status != DistributedDB::DBStatus::OK) {
139 ZLOGE("failed! status:%{public}d isLocal:%{public}d, key:%{public}s",
140 status, isLocal, Anonymous::Change(key).c_str());
141 }
142 return status == DistributedDB::DBStatus::OK;
143 }
144
LoadMeta(const std::string & key,Serializable & value,bool isLocal)145 bool MetaDataManager::LoadMeta(const std::string &key, Serializable &value, bool isLocal)
146 {
147 if (!inited_) {
148 return false;
149 }
150
151 DistributedDB::Value data;
152 auto status = isLocal ? metaStore_->GetLocal({ key.begin(), key.end() }, data)
153 : metaStore_->Get({ key.begin(), key.end() }, data);
154 if (status != DistributedDB::DBStatus::OK) {
155 return false;
156 }
157 Serializable::Unmarshall({ data.begin(), data.end() }, value);
158 if (isLocal) {
159 data.assign(data.size(), 0);
160 }
161 return true;
162 }
163
GetEntries(const std::string & prefix,std::vector<Bytes> & entries,bool isLocal)164 bool MetaDataManager::GetEntries(const std::string &prefix, std::vector<Bytes> &entries, bool isLocal)
165 {
166 std::vector<DistributedDB::Entry> dbEntries;
167 auto status = isLocal ? metaStore_->GetLocalEntries({ prefix.begin(), prefix.end() }, dbEntries)
168 : metaStore_->GetEntries({ prefix.begin(), prefix.end() }, dbEntries);
169 if (status != DistributedDB::DBStatus::OK && status != DistributedDB::DBStatus::NOT_FOUND) {
170 return false;
171 }
172 entries.resize(dbEntries.size());
173 for (size_t i = 0; i < dbEntries.size(); ++i) {
174 entries[i] = std::move(dbEntries[i].value);
175 }
176 return true;
177 }
178
DelMeta(const std::string & key,bool isLocal)179 bool MetaDataManager::DelMeta(const std::string &key, bool isLocal)
180 {
181 if (!inited_) {
182 return false;
183 }
184
185 DistributedDB::Value data;
186 auto status = isLocal ? metaStore_->DeleteLocal({ key.begin(), key.end() })
187 : metaStore_->Delete({ key.begin(), key.end() });
188 if (status == DistributedDB::DBStatus::OK && backup_) {
189 backup_(metaStore_);
190 }
191 if (!isLocal && syncer_) {
192 syncer_(metaStore_, status);
193 }
194 return ((status == DistributedDB::DBStatus::OK) || (status == DistributedDB::DBStatus::NOT_FOUND));
195 }
196
Subscribe(std::shared_ptr<Filter> filter,Observer observer)197 bool MetaDataManager::Subscribe(std::shared_ptr<Filter> filter, Observer observer)
198 {
199 if (!inited_) {
200 return false;
201 }
202
203 return metaObservers_.ComputeIfAbsent(
204 "", [ this, &observer, &filter ](const std::string &key) -> auto {
205 return std::make_shared<MetaObserver>(metaStore_, filter, observer);
206 });
207 }
208
Subscribe(std::string prefix,Observer observer)209 bool MetaDataManager::Subscribe(std::string prefix, Observer observer)
210 {
211 if (!inited_) {
212 return false;
213 }
214
215 return metaObservers_.ComputeIfAbsent(
216 prefix, [ this, &observer, &prefix ](const std::string &key) -> auto {
217 return std::make_shared<MetaObserver>(metaStore_, std::make_shared<Filter>(prefix), observer);
218 });
219 }
220
Unsubscribe(std::string filter)221 bool MetaDataManager::Unsubscribe(std::string filter)
222 {
223 if (!inited_) {
224 return false;
225 }
226
227 return metaObservers_.Erase(filter);
228 }
229 } // namespace OHOS::DistributedData