/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "metadata/meta_data_manager.h"
#include <csignal>
#define LOG_TAG "MetaDataManager"

#include "kv_store_nb_delegate.h"
#include "log_print.h"
#include "utils/anonymous.h"

namespace OHOS::DistributedData {
// Bridges change notifications of the meta store to a MetaDataManager::Observer callback.
// Only keys accepted by the filter are forwarded. The observer registers itself on the
// store in the constructor and unregisters in the destructor.
class MetaObserver : public DistributedDB::KvStoreObserver {
public:
    using Filter = MetaDataManager::Filter;
    using MetaStore = MetaDataManager::MetaStore;
    using Observer = MetaDataManager::Observer;
    using DBOrigin = DistributedDB::Origin;
    using DBChangeData = DistributedDB::ChangedData;
    using Type = DistributedDB::Type;
    MetaObserver(std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer,
        bool isLocal = false);
    virtual ~MetaObserver();

    // Database change callback
    void OnChange(const DistributedDB::KvStoreChangedData &data) override;
    void OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data) override;

    // Forwards every primary key in priData that passes the filter to the observer with the given action flag.
    void HandleChanges(int32_t flag, std::vector<std::vector<Type>> &priData);

private:
    std::shared_ptr<MetaStore> metaStore_;
    std::shared_ptr<Filter> filter_;
    Observer observer_;
};

// Registers this observer on metaStore for the key returned by filter->GetKey().
// isLocal == true  : registered once with OBSERVER_CHANGES_LOCAL_ONLY.
// isLocal == false : registered with NATIVE | FOREIGN and, additionally, with OBSERVER_CHANGES_CLOUD.
// Nothing is registered when either metaStore or filter is null.
MetaObserver::MetaObserver(
    std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer, bool isLocal)
    : metaStore_(std::move(metaStore)), filter_(std::move(filter)), observer_(std::move(observer))
{
    if (metaStore_ == nullptr) {
        return;
    }
    // The change callbacks already treat a null filter as possible; never dereference it here either.
    if (filter_ == nullptr) {
        ZLOGE("filter_ is nullptr!");
        return;
    }

    const auto key = filter_->GetKey();
    int mode = isLocal ? DistributedDB::OBSERVER_CHANGES_LOCAL_ONLY
                       : (DistributedDB::OBSERVER_CHANGES_NATIVE | DistributedDB::OBSERVER_CHANGES_FOREIGN);
    auto status = metaStore_->RegisterObserver(key, mode, this);
    // Check each registration on its own so a failure of the first is not hidden by the second.
    if (status != DistributedDB::DBStatus::OK) {
        ZLOGE("register meta observer failed :%{public}d.", status);
    }
    if (!isLocal) {
        status = metaStore_->RegisterObserver(key, DistributedDB::OBSERVER_CHANGES_CLOUD, this);
        if (status != DistributedDB::DBStatus::OK) {
            ZLOGE("register meta observer failed :%{public}d.", status);
        }
    }
}

// Unregisters this observer from the store it was constructed with, if any.
MetaObserver::~MetaObserver()
{
    if (metaStore_ != nullptr) {
        metaStore_->UnRegisterObserver(this);
    }
}

// Returns true when key starts with the filter pattern (an empty pattern matches every key).
bool MetaDataManager::Filter::operator()(const std::string &key) const
{
    // Anchored comparison: same result as key.find(pattern_) == 0 without scanning the rest of the key.
    return key.compare(0, pattern_.size(), pattern_) == 0;
}

// Key used when registering the observer on the store; this implementation returns an empty key.
std::vector<uint8_t> MetaDataManager::Filter::GetKey() const
{
    return std::vector<uint8_t>();
}

MetaDataManager::Filter::Filter(const std::string &pattern) : pattern_(pattern)
{
}

// Walks the inserted, updated and deleted entries in that order and forwards every entry whose key
// passes the filter to the observer as (key, value, action).
void MetaObserver::OnChange(const DistributedDB::KvStoreChangedData &data)
{
    if (filter_ == nullptr) {
        ZLOGE("filter_ is nullptr!");
        return;
    }
    auto values = { &data.GetEntriesInserted(), &data.GetEntriesUpdated(), &data.GetEntriesDeleted() };
    // The action starts at INSERT and is incremented per list.
    // NOTE(review): this assumes INSERT, UPDATE and DELETE are consecutive values - confirm against the header.
    int32_t next = MetaDataManager::INSERT;
    for (auto value : values) {
        int32_t action = next++;
        if (value->empty()) {
            continue;
        }
        for (const auto &entry : *value) {
            std::string key(entry.key.begin(), entry.key.end());
            if (!(*filter_)(key)) {
                continue;
            }
            observer_(key, { entry.value.begin(), entry.value.end() }, action);
        }
    }
}

// Handles the primary-key based change notification; origin and originalId are not used.
void MetaObserver::OnChange(DBOrigin origin, const std::string &originalId, DBChangeData &&data)
{
    (void)origin;
    (void)originalId;
    HandleChanges(MetaDataManager::INSERT, data.primaryData[MetaDataManager::INSERT]);
    HandleChanges(MetaDataManager::UPDATE, data.primaryData[MetaDataManager::UPDATE]);
    HandleChanges(MetaDataManager::DELETE, data.primaryData[MetaDataManager::DELETE]);
}

// For each non-empty row of priData whose first column holds a string accepted by the filter,
// invokes the observer with that string as key, an empty value and the given flag.
void MetaObserver::HandleChanges(int32_t flag, std::vector<std::vector<Type>> &priData)
{
    if (priData.empty()) {
        return;
    }
    if (filter_ == nullptr) {
        ZLOGE("filter_ is nullptr!");
        return;
    }
    for (const auto &priKey : priData) {
        if (priKey.empty()) {
            continue;
        }
        // Rows whose first column is not a string are ignored.
        auto strValue = std::get_if<std::string>(&priKey[0]);
        if (strValue == nullptr) {
            continue;
        }
        const std::string &key = *strValue;
        if (!(*filter_)(key)) {
            continue;
        }
        observer_(key, "", flag);
    }
}

MetaDataManager &MetaDataManager::GetInstance()
{
    static MetaDataManager instance;
    return instance;
}

MetaDataManager::MetaDataManager() = default;

MetaDataManager::~MetaDataManager()
{
    metaObservers_.Clear();
}

// Stores the meta store and backup callback. Only the first call with a non-null store takes effect.
void MetaDataManager::Initialize(std::shared_ptr<MetaStore> metaStore, const Backup &backup)
{
    if (metaStore == nullptr) {
        return;
    }

    std::lock_guard<decltype(mutex_)> lg(mutex_);
    if (inited_) {
        return;
    }
    metaStore_ = std::move(metaStore);
    backup_ = backup;
    inited_ = true;
}

// Stores the syncer callback; ignored while no meta store has been set.
void MetaDataManager::SetSyncer(const Syncer &syncer)
{
    if (metaStore_ == nullptr) {
        return;
    }
    syncer_ = syncer;
}

// Stores the cloud syncer callback; ignored while no meta store has been set.
void MetaDataManager::SetCloudSyncer(const CloudSyncer &cloudSyncer)
{
    if (metaStore_ == nullptr) {
        return;
    }
    cloudSyncer_ = cloudSyncer;
}

// Marshalls value and writes it under key (PutLocal when isLocal, Put otherwise).
// Returns true only when the store reports OK. A corrupted-database status stops the process via StopSA().
bool MetaDataManager::SaveMeta(const std::string &key, const Serializable &value, bool isLocal)
{
    if (!inited_) {
        return false;
    }

    auto data = Serializable::Marshall(value);
    auto status = isLocal ? metaStore_->PutLocal({ key.begin(), key.end() }, { data.begin(), data.end() })
                          : metaStore_->Put({ key.begin(), key.end() }, { data.begin(), data.end() });
    if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
        ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
            status, isLocal, Anonymous::Change(key).c_str());
        StopSA();
        return false;
    }
    if (status == DistributedDB::DBStatus::OK && backup_) {
        backup_(metaStore_);
    }
    // The cloud syncer is triggered for every non-local write, independent of the write status.
    if (!isLocal && cloudSyncer_) {
        cloudSyncer_();
    }
    if (status != DistributedDB::DBStatus::OK) {
        ZLOGE("failed! status:%{public}d isLocal:%{public}d, key:%{public}s",
            status, isLocal, Anonymous::Change(key).c_str());
    }
    return status == DistributedDB::DBStatus::OK;
}

// Reads the entry stored under key (GetLocal when isLocal, Get otherwise) and unmarshalls it into value.
// Returns false when the manager is not initialized or the store does not report OK.
// The result of Unmarshall itself is not reflected in the return value.
bool MetaDataManager::LoadMeta(const std::string &key, Serializable &value, bool isLocal)
{
    if (!inited_) {
        return false;
    }

    DistributedDB::Value data;
    auto status = isLocal ? metaStore_->GetLocal({ key.begin(), key.end() }, data)
                          : metaStore_->Get({ key.begin(), key.end() }, data);
    if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
        ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
            status, isLocal, Anonymous::Change(key).c_str());
        StopSA();
        return false;
    }
    if (status != DistributedDB::DBStatus::OK) {
        return false;
    }
    Serializable::Unmarshall({ data.begin(), data.end() }, value);
    if (isLocal) {
        // Overwrite the raw buffer of local entries with zeros.
        // NOTE(review): presumably scrubs sensitive local data from memory - confirm.
        data.assign(data.size(), 0);
    }
    return true;
}

// Collects the values of all entries whose key starts with prefix (GetLocalEntries when isLocal,
// GetEntries otherwise). NOT_FOUND is treated as success with an empty result.
bool MetaDataManager::GetEntries(const std::string &prefix, std::vector<Bytes> &entries, bool isLocal)
{
    // Same guard as the other store accessors: metaStore_ is only set once Initialize() succeeded.
    if (!inited_) {
        return false;
    }

    std::vector<DistributedDB::Entry> dbEntries;
    auto status = isLocal ? metaStore_->GetLocalEntries({ prefix.begin(), prefix.end() }, dbEntries)
                          : metaStore_->GetEntries({ prefix.begin(), prefix.end() }, dbEntries);
    if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
        ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d", status, isLocal);
        StopSA();
        return false;
    }
    if (status != DistributedDB::DBStatus::OK && status != DistributedDB::DBStatus::NOT_FOUND) {
        ZLOGE("failed! prefix:%{public}s status:%{public}d isLocal:%{public}d",
            Anonymous::Change(prefix).c_str(), status, isLocal);
        return false;
    }

    entries.resize(dbEntries.size());
    for (size_t i = 0; i < dbEntries.size(); ++i) {
        entries[i] = std::move(dbEntries[i].value);
    }
    return true;
}

// Deletes the entry stored under key (DeleteLocal when isLocal, Delete otherwise).
// Returns true when the store reports OK or NOT_FOUND.
bool MetaDataManager::DelMeta(const std::string &key, bool isLocal)
{
    if (!inited_) {
        return false;
    }

    auto status = isLocal ? metaStore_->DeleteLocal({ key.begin(), key.end() })
                          : metaStore_->Delete({ key.begin(), key.end() });
    if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
        ZLOGE("db corrupted! status:%{public}d isLocal:%{public}d, key:%{public}s",
            status, isLocal, Anonymous::Change(key).c_str());
        StopSA();
        return false;
    }
    if (status == DistributedDB::DBStatus::OK && backup_) {
        backup_(metaStore_);
    }
    // The cloud syncer is triggered for every non-local delete, independent of the delete status.
    if (!isLocal && cloudSyncer_) {
        cloudSyncer_();
    }
    return ((status == DistributedDB::DBStatus::OK) || (status == DistributedDB::DBStatus::NOT_FOUND));
}

// Starts a push-pull sync of the meta store with the given devices.
// complete, when set, receives the per-device result codes converted to int32_t.
// Returns true when the sync request was accepted by the store.
bool MetaDataManager::Sync(const std::vector<std::string> &devices, OnComplete complete)
{
    if (!inited_ || devices.empty()) {
        return false;
    }
    auto status = metaStore_->Sync(devices, DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL, [complete](auto &dbResults) {
        // An empty callback must not be invoked.
        if (!complete) {
            return;
        }
        std::map<std::string, int32_t> results;
        for (auto &[uuid, dbStatus] : dbResults) {
            results.insert_or_assign(uuid, static_cast<int32_t>(dbStatus));
        }
        complete(results);
    });
    if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) {
        ZLOGE("db corrupted! status:%{public}d", status);
        StopSA();
        return false;
    }
    if (status != DistributedDB::OK) {
        ZLOGW("meta data sync error %{public}d.", status);
    }
    return status == DistributedDB::OK;
}

// Registers observer with a custom filter under the empty-string slot of metaObservers_.
// Returns false when the manager is not initialized or filter is null.
bool MetaDataManager::Subscribe(std::shared_ptr<Filter> filter, Observer observer)
{
    if (!inited_ || filter == nullptr) {
        return false;
    }

    return metaObservers_.ComputeIfAbsent("", [this, &observer, &filter](const std::string &key) -> auto {
        return std::make_shared<MetaObserver>(metaStore_, filter, observer);
    });
}

// Registers observer for keys starting with prefix; the prefix is also the slot key in metaObservers_.
bool MetaDataManager::Subscribe(std::string prefix, Observer observer, bool isLocal)
{
    if (!inited_) {
        return false;
    }

    return metaObservers_.ComputeIfAbsent(prefix, [this, isLocal, &observer, &prefix](const std::string &key) -> auto {
        return std::make_shared<MetaObserver>(metaStore_, std::make_shared<Filter>(prefix), observer, isLocal);
    });
}

// Removes the observer stored under the given slot key.
bool MetaDataManager::Unsubscribe(std::string filter)
{
    if (!inited_) {
        return false;
    }

    return metaObservers_.Erase(filter);
}

// Terminates the current process by raising SIGKILL; used when the meta database is reported as corrupted.
void MetaDataManager::StopSA()
{
    ZLOGI("stop distributeddata");
    int err = raise(SIGKILL);
    // raise() returns 0 on success and a nonzero value on failure, which is not necessarily negative.
    if (err != 0) {
        ZLOGE("stop distributeddata failed, errCode: %{public}d", err);
    }
}
} // namespace OHOS::DistributedData