• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "metadata/meta_data_manager.h"
17 #define LOG_TAG "MetaDataManager"
18 
19 #include "kv_store_nb_delegate.h"
20 #include "log_print.h"
21 #include "utils/anonymous.h"
22 
23 namespace OHOS::DistributedData {
24 class MetaObserver : public DistributedDB::KvStoreObserver {
25 public:
26     using Filter = MetaDataManager::Filter;
27     using MetaStore = MetaDataManager::MetaStore;
28     using Observer = MetaDataManager::Observer;
29     MetaObserver(std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer);
30     virtual ~MetaObserver();
31 
32     // Database change callback
33     void OnChange(const DistributedDB::KvStoreChangedData &data) override;
34 
35 private:
36     std::shared_ptr<MetaStore> metaStore_;
37     std::shared_ptr<Filter> filter_;
38     Observer observer_;
39 };
40 
MetaObserver(std::shared_ptr<MetaStore> metaStore,std::shared_ptr<Filter> filter,Observer observer)41 MetaObserver::MetaObserver(std::shared_ptr<MetaStore> metaStore, std::shared_ptr<Filter> filter, Observer observer)
42     : metaStore_(std::move(metaStore)), filter_(std::move(filter)), observer_(std::move(observer))
43 {
44     if (metaStore_ != nullptr) {
45         int mode = DistributedDB::OBSERVER_CHANGES_NATIVE | DistributedDB::OBSERVER_CHANGES_FOREIGN;
46         auto status = metaStore_->RegisterObserver(filter_->GetKey(), mode, this);
47         if (status != DistributedDB::DBStatus::OK) {
48             ZLOGE("register meta observer failed :%{public}d.", status);
49         }
50     }
51 }
52 
~MetaObserver()53 MetaObserver::~MetaObserver()
54 {
55     if (metaStore_ != nullptr) {
56         metaStore_->UnRegisterObserver(this);
57     }
58 }
59 
operator ()(const std::string & key) const60 bool MetaDataManager::Filter::operator()(const std::string &key) const
61 {
62     return key.find(pattern_) == 0;
63 }
64 
GetKey() const65 std::vector<uint8_t> MetaDataManager::Filter::GetKey() const
66 {
67     return std::vector<uint8_t>();
68 }
69 
Filter(const std::string & pattern)70 MetaDataManager::Filter::Filter(const std::string &pattern)
71     : pattern_(pattern)
72 {
73 }
74 
OnChange(const DistributedDB::KvStoreChangedData & data)75 void MetaObserver::OnChange(const DistributedDB::KvStoreChangedData &data)
76 {
77     auto values = { &data.GetEntriesInserted(), &data.GetEntriesUpdated(), &data.GetEntriesDeleted() };
78     int32_t next = MetaDataManager::INSERT;
79     for (auto value : values) {
80         int32_t action = next++;
81         if (value->empty()) {
82             continue;
83         }
84         for (const auto &entry : *value) {
85             std::string key(entry.key.begin(), entry.key.end());
86             if (!(*filter_)(key)) {
87                 continue;
88             }
89             observer_(key, { entry.value.begin(), entry.value.end() }, action);
90         }
91     }
92 }
93 
GetInstance()94 MetaDataManager &MetaDataManager::GetInstance()
95 {
96     static MetaDataManager instance;
97     return instance;
98 }
99 
// Nothing to set up at construction; the store and callbacks are supplied later via Initialize().
MetaDataManager::MetaDataManager() = default;
101 
~MetaDataManager()102 MetaDataManager::~MetaDataManager()
103 {
104     metaObservers_.Clear();
105 }
106 
Initialize(std::shared_ptr<MetaStore> metaStore,const Backup & backup,const Syncer & syncer)107 void MetaDataManager::Initialize(std::shared_ptr<MetaStore> metaStore, const Backup &backup, const Syncer &syncer)
108 {
109     if (metaStore == nullptr) {
110         return;
111     }
112 
113     std::lock_guard<decltype(mutex_)> lg(mutex_);
114     if (inited_) {
115         return;
116     }
117     metaStore_ = std::move(metaStore);
118     backup_ = backup;
119     syncer_ = syncer;
120     inited_ = true;
121 }
122 
SaveMeta(const std::string & key,const Serializable & value,bool isLocal)123 bool MetaDataManager::SaveMeta(const std::string &key, const Serializable &value, bool isLocal)
124 {
125     if (!inited_) {
126         return false;
127     }
128 
129     auto data = Serializable::Marshall(value);
130     auto status = isLocal ? metaStore_->PutLocal({ key.begin(), key.end() }, { data.begin(), data.end() })
131                           : metaStore_->Put({ key.begin(), key.end() }, { data.begin(), data.end() });
132     if (status == DistributedDB::DBStatus::OK && backup_) {
133         backup_(metaStore_);
134     }
135     if (!isLocal && syncer_) {
136         syncer_(metaStore_, status);
137     }
138     if (status != DistributedDB::DBStatus::OK) {
139         ZLOGE("failed! status:%{public}d isLocal:%{public}d, key:%{public}s",
140             status, isLocal, Anonymous::Change(key).c_str());
141     }
142     return status == DistributedDB::DBStatus::OK;
143 }
144 
LoadMeta(const std::string & key,Serializable & value,bool isLocal)145 bool MetaDataManager::LoadMeta(const std::string &key, Serializable &value, bool isLocal)
146 {
147     if (!inited_) {
148         return false;
149     }
150 
151     DistributedDB::Value data;
152     auto status = isLocal ? metaStore_->GetLocal({ key.begin(), key.end() }, data)
153                           : metaStore_->Get({ key.begin(), key.end() }, data);
154     if (status != DistributedDB::DBStatus::OK) {
155         return false;
156     }
157     Serializable::Unmarshall({ data.begin(), data.end() }, value);
158     if (isLocal) {
159         data.assign(data.size(), 0);
160     }
161     return true;
162 }
163 
GetEntries(const std::string & prefix,std::vector<Bytes> & entries,bool isLocal)164 bool MetaDataManager::GetEntries(const std::string &prefix, std::vector<Bytes> &entries, bool isLocal)
165 {
166     std::vector<DistributedDB::Entry> dbEntries;
167     auto status = isLocal ? metaStore_->GetLocalEntries({ prefix.begin(), prefix.end() }, dbEntries)
168                           : metaStore_->GetEntries({ prefix.begin(), prefix.end() }, dbEntries);
169     if (status != DistributedDB::DBStatus::OK && status != DistributedDB::DBStatus::NOT_FOUND) {
170         ZLOGE("failed! prefix:%{public}s status:%{public}d isLocal:%{public}d", Anonymous::Change(prefix).c_str(),
171             status, isLocal);
172         return false;
173     }
174     entries.resize(dbEntries.size());
175     for (size_t i = 0; i < dbEntries.size(); ++i) {
176         entries[i] = std::move(dbEntries[i].value);
177     }
178     return true;
179 }
180 
DelMeta(const std::string & key,bool isLocal)181 bool MetaDataManager::DelMeta(const std::string &key, bool isLocal)
182 {
183     if (!inited_) {
184         return false;
185     }
186 
187     DistributedDB::Value data;
188     auto status = isLocal ? metaStore_->DeleteLocal({ key.begin(), key.end() })
189                           : metaStore_->Delete({ key.begin(), key.end() });
190     if (status == DistributedDB::DBStatus::OK && backup_) {
191         backup_(metaStore_);
192     }
193     if (!isLocal && syncer_) {
194         syncer_(metaStore_, status);
195     }
196     return ((status == DistributedDB::DBStatus::OK) || (status == DistributedDB::DBStatus::NOT_FOUND));
197 }
198 
Subscribe(std::shared_ptr<Filter> filter,Observer observer)199 bool MetaDataManager::Subscribe(std::shared_ptr<Filter> filter, Observer observer)
200 {
201     if (!inited_) {
202         return false;
203     }
204 
205     return metaObservers_.ComputeIfAbsent(
206         "", [ this, &observer, &filter ](const std::string &key) -> auto {
207             return std::make_shared<MetaObserver>(metaStore_, filter, observer);
208         });
209 }
210 
Subscribe(std::string prefix,Observer observer)211 bool MetaDataManager::Subscribe(std::string prefix, Observer observer)
212 {
213     if (!inited_) {
214         return false;
215     }
216 
217     return metaObservers_.ComputeIfAbsent(
218         prefix, [ this, &observer, &prefix ](const std::string &key) -> auto {
219             return std::make_shared<MetaObserver>(metaStore_, std::make_shared<Filter>(prefix), observer);
220         });
221 }
222 
Unsubscribe(std::string filter)223 bool MetaDataManager::Unsubscribe(std::string filter)
224 {
225     if (!inited_) {
226         return false;
227     }
228 
229     return metaObservers_.Erase(filter);
230 }
231 } // namespace OHOS::DistributedData