1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "metadata/meta_data_manager.h"
17 #include <csignal>
18 #define LOG_TAG "MetaDataManager"
19
20 #include "directory/directory_manager.h"
21 #include "kv_store_nb_delegate.h"
22 #include "log_print.h"
23 #include "utils/anonymous.h"
24 #include "utils/corrupt_reporter.h"
25
26 namespace OHOS::DistributedData {
27 class MetaObserver : public DistributedDB::KvStoreObserver {
28 public:
29 using Filter = MetaDataManager::Filter;
30 using MetaStore = MetaDataManager::MetaStore;
31 using Observer = MetaDataManager::Observer;
32 using DBOrigin = DistributedDB::Origin;
33 using DBChangeData = DistributedDB::ChangedData;
34 using Type = DistributedDB::Type;
35 MetaObserver(std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer,
36 bool isLocal = false);
37 virtual ~MetaObserver();
38
39 // Database change callback
40 void OnChange(const DistributedDB::KvStoreChangedData &data) override;
41 void OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data) override;
42
43 void HandleChanges(int32_t flag, std::vector<std::vector<Type>> &priData);
44
45 private:
46 std::shared_ptr<MetaStore> metaStore_;
47 std::shared_ptr<Filter> filter_;
48 Observer observer_;
49 };
50
MetaObserver(std::shared_ptr<MetaStore> metaStore,std::shared_ptr<Filter> filter,Observer observer,bool isLocal)51 MetaObserver::MetaObserver(
52 std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer, bool isLocal)
53 : metaStore_(std::move(metaStore)), filter_(std::move(filter)), observer_(std::move(observer))
54 {
55 if (metaStore_ != nullptr) {
56 int mode = isLocal ? DistributedDB::OBSERVER_CHANGES_LOCAL_ONLY
57 : (DistributedDB::OBSERVER_CHANGES_NATIVE | DistributedDB::OBSERVER_CHANGES_FOREIGN);
58 auto status = metaStore_->RegisterObserver(filter_->GetKey(), mode, this);
59 if (!isLocal) {
60 status = metaStore_->RegisterObserver(filter_->GetKey(), DistributedDB::OBSERVER_CHANGES_CLOUD, this);
61 }
62 if (status != DistributedDB::DBStatus::OK) {
63 ZLOGE("register meta observer failed :%{public}d.", status);
64 }
65 }
66 }
67
~MetaObserver()68 MetaObserver::~MetaObserver()
69 {
70 if (metaStore_ != nullptr) {
71 metaStore_->UnRegisterObserver(this);
72 }
73 }
74
operator ()(const std::string & key) const75 bool MetaDataManager::Filter::operator()(const std::string &key) const
76 {
77 return key.find(pattern_) == 0;
78 }
79
GetKey() const80 std::vector<uint8_t> MetaDataManager::Filter::GetKey() const
81 {
82 return std::vector<uint8_t>();
83 }
84
Filter(const std::string & pattern)85 MetaDataManager::Filter::Filter(const std::string &pattern) : pattern_(pattern)
86 {
87 }
88
OnChange(const DistributedDB::KvStoreChangedData & data)89 void MetaObserver::OnChange(const DistributedDB::KvStoreChangedData &data)
90 {
91 if (filter_ == nullptr) {
92 ZLOGE("filter_ is nullptr!");
93 return;
94 }
95 auto values = { &data.GetEntriesInserted(), &data.GetEntriesUpdated(), &data.GetEntriesDeleted() };
96 int32_t next = MetaDataManager::INSERT;
97 for (auto value : values) {
98 int32_t action = next++;
99 if (value->empty()) {
100 continue;
101 }
102 for (const auto &entry : *value) {
103 std::string key(entry.key.begin(), entry.key.end());
104 if (!(*filter_)(key)) {
105 continue;
106 }
107 observer_(key, { entry.value.begin(), entry.value.end() }, action);
108 }
109 }
110 }
111
OnChange(DBOrigin origin,const std::string & originalId,DBChangeData && data)112 void MetaObserver::OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data)
113 {
114 (void)origin;
115 (void)originalId;
116 HandleChanges(MetaDataManager::INSERT, data.primaryData[MetaDataManager::INSERT]);
117 HandleChanges(MetaDataManager::UPDATE, data.primaryData[MetaDataManager::UPDATE]);
118 HandleChanges(MetaDataManager::DELETE, data.primaryData[MetaDataManager::DELETE]);
119 }
120
HandleChanges(int32_t flag,std::vector<std::vector<Type>> & priData)121 void MetaObserver::HandleChanges(int32_t flag, std::vector<std::vector<Type>> &priData)
122 {
123 if (priData.empty()) {
124 return;
125 }
126 if (filter_ == nullptr) {
127 ZLOGE("filter_ is nullptr!");
128 return;
129 }
130 for (const auto &priKey : priData) {
131 if (priKey.empty()) {
132 continue;
133 }
134 auto strValue = std::get_if<std::string>(&priKey[0]);
135 if (strValue != nullptr) {
136 auto key = *strValue;
137 if (!(*filter_)(key)) {
138 continue;
139 }
140 observer_(key, "", flag);
141 }
142 }
143 }
144
GetInstance()145 MetaDataManager &MetaDataManager::GetInstance()
146 {
147 static MetaDataManager instance;
148 return instance;
149 }
150
151 MetaDataManager::MetaDataManager() = default;
152
~MetaDataManager()153 MetaDataManager::~MetaDataManager()
154 {
155 metaObservers_.Clear();
156 }
157
Initialize(std::shared_ptr<MetaStore> metaStore,const Backup & backup,const std::string & storeId)158 void MetaDataManager::Initialize(std::shared_ptr<MetaStore> metaStore, const Backup &backup, const std::string &storeId)
159 {
160 if (metaStore == nullptr) {
161 return;
162 }
163
164 std::lock_guard<decltype(mutex_)> lg(mutex_);
165 if (inited_) {
166 return;
167 }
168 metaStore_ = std::move(metaStore);
169 backup_ = backup;
170 storeId_ = storeId;
171 inited_ = true;
172 }
173
SetSyncer(const Syncer & syncer)174 void MetaDataManager::SetSyncer(const Syncer &syncer)
175 {
176 if (metaStore_ == nullptr) {
177 return;
178 }
179 syncer_ = syncer;
180 }
181
SetCloudSyncer(const CloudSyncer & cloudSyncer)182 void MetaDataManager::SetCloudSyncer(const CloudSyncer &cloudSyncer)
183 {
184 if (metaStore_ == nullptr) {
185 return;
186 }
187 cloudSyncer_ = cloudSyncer;
188 }
189
SaveMeta(const std::string & key,const Serializable & value,bool isLocal)190 bool MetaDataManager::SaveMeta(const std::string &key, const Serializable &value, bool isLocal)
191 {
192 if (!inited_) {
193 return false;
194 }
195
196 auto data = Serializable::Marshall(value);
197 auto status = isLocal ? metaStore_->PutLocal({ key.begin(), key.end() }, { data.begin(), data.end() })
198 : metaStore_->Put({ key.begin(), key.end() }, { data.begin(), data.end() });
199 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
200 ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
201 status, isLocal, Anonymous::Change(key).c_str());
202 CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
203 StopSA();
204 return false;
205 }
206 if (status == DistributedDB::DBStatus::OK && backup_) {
207 backup_(metaStore_);
208 }
209 if (!isLocal && cloudSyncer_) {
210 cloudSyncer_();
211 }
212 if (status != DistributedDB::DBStatus::OK) {
213 ZLOGE("failed! status:%{public}d isLocal:%{public}d, key:%{public}s", status, isLocal,
214 Anonymous::Change(key).c_str());
215 }
216 return status == DistributedDB::DBStatus::OK;
217 }
218
LoadMeta(const std::string & key,Serializable & value,bool isLocal)219 bool MetaDataManager::LoadMeta(const std::string &key, Serializable &value, bool isLocal)
220 {
221 if (!inited_) {
222 return false;
223 }
224
225 DistributedDB::Value data;
226 auto status = isLocal ? metaStore_->GetLocal({ key.begin(), key.end() }, data)
227 : metaStore_->Get({ key.begin(), key.end() }, data);
228 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
229 ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
230 status, isLocal, Anonymous::Change(key).c_str());
231 CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
232 StopSA();
233 return false;
234 }
235 if (status != DistributedDB::DBStatus::OK) {
236 return false;
237 }
238 Serializable::Unmarshall({ data.begin(), data.end() }, value);
239 if (isLocal) {
240 data.assign(data.size(), 0);
241 }
242 return true;
243 }
244
GetEntries(const std::string & prefix,std::vector<Bytes> & entries,bool isLocal)245 bool MetaDataManager::GetEntries(const std::string &prefix, std::vector<Bytes> &entries, bool isLocal)
246 {
247 std::vector<DistributedDB::Entry> dbEntries;
248 auto status = isLocal ? metaStore_->GetLocalEntries({ prefix.begin(), prefix.end() }, dbEntries)
249 : metaStore_->GetEntries({ prefix.begin(), prefix.end() }, dbEntries);
250 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
251 ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d", status, isLocal);
252 CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
253 StopSA();
254 return false;
255 }
256 if (status != DistributedDB::DBStatus::OK && status != DistributedDB::DBStatus::NOT_FOUND) {
257 ZLOGE("failed! prefix:%{public}s status:%{public}d isLocal:%{public}d", Anonymous::Change(prefix).c_str(),
258 status, isLocal);
259 return false;
260 }
261 entries.resize(dbEntries.size());
262 for (size_t i = 0; i < dbEntries.size(); ++i) {
263 entries[i] = std::move(dbEntries[i].value);
264 }
265 return true;
266 }
267
DelMeta(const std::string & key,bool isLocal)268 bool MetaDataManager::DelMeta(const std::string &key, bool isLocal)
269 {
270 if (!inited_) {
271 return false;
272 }
273
274 DistributedDB::Value data;
275 auto status = isLocal ? metaStore_->DeleteLocal({ key.begin(), key.end() })
276 : metaStore_->Delete({ key.begin(), key.end() });
277 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
278 ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
279 status, isLocal, Anonymous::Change(key).c_str());
280 CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
281 StopSA();
282 return false;
283 }
284 if (status == DistributedDB::DBStatus::OK && backup_) {
285 backup_(metaStore_);
286 }
287 if (!isLocal && cloudSyncer_) {
288 cloudSyncer_();
289 }
290 return ((status == DistributedDB::DBStatus::OK) || (status == DistributedDB::DBStatus::NOT_FOUND));
291 }
292
Sync(const std::vector<std::string> & devices,OnComplete complete,bool wait)293 bool MetaDataManager::Sync(const std::vector<std::string> &devices, OnComplete complete, bool wait)
294 {
295 if (!inited_ || devices.empty()) {
296 return false;
297 }
298 auto status = metaStore_->Sync(devices, DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL, [complete](auto &dbResults) {
299 std::map<std::string, int32_t> results;
300 for (auto &[uuid, status] : dbResults) {
301 results.insert_or_assign(uuid, static_cast<int32_t>(status));
302 }
303 complete(results);
304 }, wait);
305 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
306 ZLOGE("db corrupted! status:%{public}d", status);
307 CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
308 StopSA();
309 return false;
310 }
311 if (status != DistributedDB::OK) {
312 ZLOGW("meta data sync error %{public}d.", status);
313 }
314 return status == DistributedDB::OK;
315 }
316
Subscribe(std::shared_ptr<Filter> filter,Observer observer)317 bool MetaDataManager::Subscribe(std::shared_ptr<Filter> filter, Observer observer)
318 {
319 if (!inited_) {
320 return false;
321 }
322
323 return metaObservers_.ComputeIfAbsent("", [this, &observer, &filter](const std::string &key) -> auto {
324 return std::make_shared<MetaObserver>(metaStore_, filter, observer);
325 });
326 }
327
Subscribe(std::string prefix,Observer observer,bool isLocal)328 bool MetaDataManager::Subscribe(std::string prefix, Observer observer, bool isLocal)
329 {
330 if (!inited_) {
331 return false;
332 }
333
334 return metaObservers_.ComputeIfAbsent(prefix, [this, isLocal, &observer, &prefix](const std::string &key) -> auto {
335 return std::make_shared<MetaObserver>(metaStore_, std::make_shared<Filter>(prefix), observer, isLocal);
336 });
337 }
338
Unsubscribe(std::string filter)339 bool MetaDataManager::Unsubscribe(std::string filter)
340 {
341 if (!inited_) {
342 return false;
343 }
344
345 return metaObservers_.Erase(filter);
346 }
347
StopSA()348 void MetaDataManager::StopSA()
349 {
350 ZLOGI("stop distributeddata");
351 int err = raise(SIGKILL);
352 if (err < 0) {
353 ZLOGE("stop distributeddata failed, errCode: %{public}d", err);
354 }
355 }
356 } // namespace OHOS::DistributedData