1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "metadata/meta_data_manager.h"
17 #include <csignal>
18 #define LOG_TAG "MetaDataManager"
19
20 #include "directory/directory_manager.h"
21 #include "kv_store_nb_delegate.h"
22 #include "log_print.h"
23 #include "utils/anonymous.h"
24 #include "utils/corrupt_reporter.h"
25
26 namespace OHOS::DistributedData {
27 class MetaObserver : public DistributedDB::KvStoreObserver {
28 public:
29 using Filter = MetaDataManager::Filter;
30 using MetaStore = MetaDataManager::MetaStore;
31 using Observer = MetaDataManager::Observer;
32 using DBOrigin = DistributedDB::Origin;
33 using DBChangeData = DistributedDB::ChangedData;
34 using Type = DistributedDB::Type;
35 MetaObserver(std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer,
36 bool isLocal = false);
37 virtual ~MetaObserver();
38
39 // Database change callback
40 void OnChange(const DistributedDB::KvStoreChangedData &data) override;
41 void OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data) override;
42
43 void HandleChanges(int32_t flag, std::vector<std::vector<Type>> &priData);
44
45 private:
46 std::shared_ptr<MetaStore> metaStore_;
47 std::shared_ptr<Filter> filter_;
48 Observer observer_;
49 };
50
MetaObserver(std::shared_ptr<MetaStore> metaStore,std::shared_ptr<Filter> filter,Observer observer,bool isLocal)51 MetaObserver::MetaObserver(
52 std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer, bool isLocal)
53 : metaStore_(std::move(metaStore)), filter_(std::move(filter)), observer_(std::move(observer))
54 {
55 if (metaStore_ != nullptr) {
56 int mode = isLocal ? DistributedDB::OBSERVER_CHANGES_LOCAL_ONLY
57 : (DistributedDB::OBSERVER_CHANGES_NATIVE | DistributedDB::OBSERVER_CHANGES_FOREIGN);
58 auto status = metaStore_->RegisterObserver(filter_->GetKey(), mode, this);
59 if (!isLocal) {
60 status = metaStore_->RegisterObserver(filter_->GetKey(), DistributedDB::OBSERVER_CHANGES_CLOUD, this);
61 }
62 if (status != DistributedDB::DBStatus::OK) {
63 ZLOGE("register meta observer failed :%{public}d.", status);
64 }
65 }
66 }
67
~MetaObserver()68 MetaObserver::~MetaObserver()
69 {
70 if (metaStore_ != nullptr) {
71 metaStore_->UnRegisterObserver(this);
72 }
73 }
74
operator ()(const std::string & key) const75 bool MetaDataManager::Filter::operator()(const std::string &key) const
76 {
77 return key.find(pattern_) == 0;
78 }
79
GetKey() const80 std::vector<uint8_t> MetaDataManager::Filter::GetKey() const
81 {
82 return std::vector<uint8_t>();
83 }
84
Filter(const std::string & pattern)85 MetaDataManager::Filter::Filter(const std::string &pattern) : pattern_(pattern)
86 {
87 }
88
OnChange(const DistributedDB::KvStoreChangedData & data)89 void MetaObserver::OnChange(const DistributedDB::KvStoreChangedData &data)
90 {
91 if (filter_ == nullptr) {
92 ZLOGE("filter_ is nullptr!");
93 return;
94 }
95 auto values = { &data.GetEntriesInserted(), &data.GetEntriesUpdated(), &data.GetEntriesDeleted() };
96 int32_t next = MetaDataManager::INSERT;
97 for (auto value : values) {
98 int32_t action = next++;
99 if (value->empty()) {
100 continue;
101 }
102 for (const auto &entry : *value) {
103 std::string key(entry.key.begin(), entry.key.end());
104 if (!(*filter_)(key)) {
105 continue;
106 }
107 observer_(key, { entry.value.begin(), entry.value.end() }, action);
108 }
109 }
110 }
111
OnChange(DBOrigin origin,const std::string & originalId,DBChangeData && data)112 void MetaObserver::OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data)
113 {
114 (void)origin;
115 (void)originalId;
116 HandleChanges(MetaDataManager::INSERT, data.primaryData[MetaDataManager::INSERT]);
117 HandleChanges(MetaDataManager::UPDATE, data.primaryData[MetaDataManager::UPDATE]);
118 HandleChanges(MetaDataManager::DELETE, data.primaryData[MetaDataManager::DELETE]);
119 }
120
HandleChanges(int32_t flag,std::vector<std::vector<Type>> & priData)121 void MetaObserver::HandleChanges(int32_t flag, std::vector<std::vector<Type>> &priData)
122 {
123 if (priData.empty()) {
124 return;
125 }
126 if (filter_ == nullptr) {
127 ZLOGE("filter_ is nullptr!");
128 return;
129 }
130 for (const auto &priKey : priData) {
131 if (priKey.empty()) {
132 continue;
133 }
134 auto strValue = std::get_if<std::string>(&priKey[0]);
135 if (strValue != nullptr) {
136 auto key = *strValue;
137 if (!(*filter_)(key)) {
138 continue;
139 }
140 observer_(key, "", flag);
141 }
142 }
143 }
144
GetInstance()145 MetaDataManager &MetaDataManager::GetInstance()
146 {
147 static MetaDataManager instance;
148 return instance;
149 }
150
151 MetaDataManager::MetaDataManager() = default;
152
~MetaDataManager()153 MetaDataManager::~MetaDataManager()
154 {
155 metaObservers_.Clear();
156 }
157
Initialize(std::shared_ptr<MetaStore> metaStore,const Backup & backup,const std::string & storeId)158 void MetaDataManager::Initialize(std::shared_ptr<MetaStore> metaStore, const Backup &backup, const std::string &storeId)
159 {
160 if (metaStore == nullptr) {
161 return;
162 }
163
164 std::lock_guard<decltype(mutex_)> lg(mutex_);
165 if (inited_) {
166 return;
167 }
168 metaStore_ = std::move(metaStore);
169 backup_ = backup;
170 storeId_ = storeId;
171 inited_ = true;
172 }
173
SetSyncer(const Syncer & syncer)174 void MetaDataManager::SetSyncer(const Syncer &syncer)
175 {
176 if (metaStore_ == nullptr) {
177 return;
178 }
179 syncer_ = syncer;
180 }
181
SetCloudSyncer(const CloudSyncer & cloudSyncer)182 void MetaDataManager::SetCloudSyncer(const CloudSyncer &cloudSyncer)
183 {
184 if (metaStore_ == nullptr) {
185 return;
186 }
187 cloudSyncer_ = cloudSyncer;
188 }
189
SaveMeta(const std::string & key,const Serializable & value,bool isLocal)190 bool MetaDataManager::SaveMeta(const std::string &key, const Serializable &value, bool isLocal)
191 {
192 if (!inited_) {
193 return false;
194 }
195
196 auto data = Serializable::Marshall(value);
197 auto status = isLocal ? metaStore_->PutLocal({ key.begin(), key.end() }, { data.begin(), data.end() })
198 : metaStore_->Put({ key.begin(), key.end() }, { data.begin(), data.end() });
199 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
200 ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
201 status, isLocal, Anonymous::Change(key).c_str());
202 CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
203 StopSA();
204 return false;
205 }
206 if (status == DistributedDB::DBStatus::OK && backup_) {
207 backup_(metaStore_);
208 }
209 if (!isLocal && cloudSyncer_) {
210 cloudSyncer_();
211 }
212 if (status != DistributedDB::DBStatus::OK) {
213 ZLOGE("failed! status:%{public}d isLocal:%{public}d, key:%{public}s", status, isLocal,
214 Anonymous::Change(key).c_str());
215 }
216 DelCacheMeta(key, isLocal);
217 return status == DistributedDB::DBStatus::OK;
218 }
219
SaveMeta(const std::vector<Entry> & values,bool isLocal)220 bool MetaDataManager::SaveMeta(const std::vector<Entry> &values, bool isLocal)
221 {
222 if (!inited_) {
223 return false;
224 }
225 if (values.empty()) {
226 return true;
227 }
228 std::vector<DistributedDB::Entry> entries;
229 entries.reserve(values.size());
230 for (const auto &[key, value] : values) {
231 entries.push_back({ { key.begin(), key.end() }, { value.begin(), value.end() } });
232 DelCacheMeta(key, isLocal);
233 }
234 auto status = isLocal ? metaStore_->PutLocalBatch(entries) : metaStore_->PutBatch(entries);
235 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
236 ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, size:%{public}zu", status, isLocal, values.size());
237 CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
238 StopSA();
239 return false;
240 }
241 if (status == DistributedDB::DBStatus::OK && backup_) {
242 backup_(metaStore_);
243 }
244 if (!isLocal && cloudSyncer_) {
245 cloudSyncer_();
246 }
247 if (status != DistributedDB::DBStatus::OK) {
248 ZLOGE("failed! status:%{public}d isLocal:%{public}d, size:%{public}zu", status, isLocal, values.size());
249 }
250 return status == DistributedDB::DBStatus::OK;
251 }
252
LoadMeta(const std::string & key,Serializable & value,bool isLocal)253 bool MetaDataManager::LoadMeta(const std::string &key, Serializable &value, bool isLocal)
254 {
255 if (!inited_) {
256 return false;
257 }
258 if (LoadCacheMeta(key, value, isLocal)) {
259 return true;
260 }
261 DistributedDB::Value data;
262 auto status = isLocal ? metaStore_->GetLocal({ key.begin(), key.end() }, data)
263 : metaStore_->Get({ key.begin(), key.end() }, data);
264 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
265 ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
266 status, isLocal, Anonymous::Change(key).c_str());
267 CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
268 StopSA();
269 return false;
270 }
271 if (status != DistributedDB::DBStatus::OK) {
272 return false;
273 }
274 std::string tempdata(data.begin(), data.end());
275 SaveCacheMeta(key, tempdata, isLocal);
276 Serializable::Unmarshall(tempdata, value);
277 if (isLocal) {
278 data.assign(data.size(), 0);
279 }
280 return true;
281 }
282
GetEntries(const std::string & prefix,std::vector<Bytes> & entries,bool isLocal)283 bool MetaDataManager::GetEntries(const std::string &prefix, std::vector<Bytes> &entries, bool isLocal)
284 {
285 std::vector<DistributedDB::Entry> dbEntries;
286 auto status = isLocal ? metaStore_->GetLocalEntries({ prefix.begin(), prefix.end() }, dbEntries)
287 : metaStore_->GetEntries({ prefix.begin(), prefix.end() }, dbEntries);
288 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
289 ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d", status, isLocal);
290 CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
291 StopSA();
292 return false;
293 }
294 if (status != DistributedDB::DBStatus::OK && status != DistributedDB::DBStatus::NOT_FOUND) {
295 ZLOGE("failed! prefix:%{public}s status:%{public}d isLocal:%{public}d", Anonymous::Change(prefix).c_str(),
296 status, isLocal);
297 return false;
298 }
299 entries.resize(dbEntries.size());
300 for (size_t i = 0; i < dbEntries.size(); ++i) {
301 entries[i] = std::move(dbEntries[i].value);
302 }
303 return true;
304 }
305
DelMeta(const std::string & key,bool isLocal)306 bool MetaDataManager::DelMeta(const std::string &key, bool isLocal)
307 {
308 if (!inited_) {
309 return false;
310 }
311 DelCacheMeta(key, isLocal);
312 auto status = isLocal ? metaStore_->DeleteLocal({ key.begin(), key.end() })
313 : metaStore_->Delete({ key.begin(), key.end() });
314 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
315 ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
316 status, isLocal, Anonymous::Change(key).c_str());
317 CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
318 StopSA();
319 return false;
320 }
321 if (status == DistributedDB::DBStatus::OK && backup_) {
322 backup_(metaStore_);
323 }
324 if (!isLocal && cloudSyncer_) {
325 cloudSyncer_();
326 }
327 return ((status == DistributedDB::DBStatus::OK) || (status == DistributedDB::DBStatus::NOT_FOUND));
328 }
329
DelMeta(const std::vector<std::string> & keys,bool isLocal)330 bool MetaDataManager::DelMeta(const std::vector<std::string> &keys, bool isLocal)
331 {
332 if (!inited_) {
333 return false;
334 }
335 if (keys.empty()) {
336 return true;
337 }
338 std::vector<DistributedDB::Key> dbKeys;
339 dbKeys.reserve(keys.size());
340 for (auto &key : keys) {
341 dbKeys.emplace_back(key.begin(), key.end());
342 DelCacheMeta(key, isLocal);
343 }
344 auto status = isLocal ? metaStore_->DeleteLocalBatch(dbKeys) : metaStore_->DeleteBatch(dbKeys);
345 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
346 ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key size:%{public}zu", status, isLocal,
347 dbKeys.size());
348 CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
349 StopSA();
350 return false;
351 }
352 if (status == DistributedDB::DBStatus::OK && backup_) {
353 backup_(metaStore_);
354 }
355 if (!isLocal && cloudSyncer_) {
356 cloudSyncer_();
357 }
358 return ((status == DistributedDB::DBStatus::OK) || (status == DistributedDB::DBStatus::NOT_FOUND));
359 }
360
Sync(const std::vector<std::string> & devices,OnComplete complete,bool wait,bool isRetry)361 bool MetaDataManager::Sync(const std::vector<std::string> &devices, OnComplete complete, bool wait, bool isRetry)
362 {
363 if (!inited_ || devices.empty()) {
364 return false;
365 }
366 DistributedDB::DeviceSyncOption syncOption;
367 syncOption.devices = devices;
368 syncOption.mode = DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL;
369 syncOption.isWait = wait;
370 syncOption.isRetry = isRetry;
371 auto status = metaStore_->Sync(syncOption,
372 [complete](const std::map<std::string, DistributedDB::DBStatus> &dbResults) {
373 if (complete == nullptr) {
374 return;
375 }
376 std::map<std::string, int32_t> results;
377 for (auto &[uuid, status] : dbResults) {
378 results.insert_or_assign(uuid, static_cast<int32_t>(status));
379 }
380 complete(results);
381 });
382 if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
383 ZLOGE("db corrupted! status:%{public}d", status);
384 CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_);
385 StopSA();
386 return false;
387 }
388 if (status != DistributedDB::OK) {
389 ZLOGW("meta data sync error %{public}d.", status);
390 }
391 return status == DistributedDB::OK;
392 }
393
Subscribe(std::shared_ptr<Filter> filter,Observer observer)394 bool MetaDataManager::Subscribe(std::shared_ptr<Filter> filter, Observer observer)
395 {
396 if (!inited_) {
397 return false;
398 }
399
400 return metaObservers_.ComputeIfAbsent("", [this, &observer, &filter](const std::string &key) -> auto {
401 return std::make_shared<MetaObserver>(metaStore_, filter, observer);
402 });
403 }
404
Subscribe(std::string prefix,Observer observer,bool isLocal)405 bool MetaDataManager::Subscribe(std::string prefix, Observer observer, bool isLocal)
406 {
407 if (!inited_) {
408 return false;
409 }
410
411 return metaObservers_.ComputeIfAbsent(prefix, [this, isLocal, &observer, &prefix](const std::string &key) -> auto {
412 return std::make_shared<MetaObserver>(metaStore_, std::make_shared<Filter>(prefix), observer, isLocal);
413 });
414 }
415
Unsubscribe(std::string filter)416 bool MetaDataManager::Unsubscribe(std::string filter)
417 {
418 if (!inited_) {
419 return false;
420 }
421
422 return metaObservers_.Erase(filter);
423 }
424
StopSA()425 void MetaDataManager::StopSA()
426 {
427 ZLOGI("stop distributeddata");
428 int err = raise(SIGKILL);
429 if (err < 0) {
430 ZLOGE("stop distributeddata failed, errCode: %{public}d", err);
431 }
432 }
433 } // namespace OHOS::DistributedData