1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "flat_object_storage_engine.h"
16
17 #include "anonymous.h"
18 #include "accesstoken_kit.h"
19 #include "ipc_skeleton.h"
20 #include "objectstore_errors.h"
21 #include "process_communicator_impl.h"
22 #include "softbus_adapter.h"
23 #include "string_utils.h"
24 #include "object_radar_reporter.h"
25
26 namespace OHOS::ObjectStore {
~FlatObjectStorageEngine()27 FlatObjectStorageEngine::~FlatObjectStorageEngine()
28 {
29 if (!isOpened_) {
30 return;
31 }
32 storeManager_ = nullptr;
33 LOG_INFO("FlatObjectStorageEngine::~FlatObjectStorageEngine Crash! end");
34 }
35
Open(const std::string & bundleName)36 uint32_t FlatObjectStorageEngine::Open(const std::string &bundleName)
37 {
38 if (isOpened_) {
39 LOG_INFO("FlatObjectDatabase: No need to reopen it");
40 return SUCCESS;
41 }
42 auto tokenId = IPCSkeleton::GetSelfTokenID();
43 int32_t ret = Security::AccessToken::AccessTokenKit::VerifyAccessToken(tokenId, DISTRIBUTED_DATASYNC);
44 LOG_INFO("bundleName:%{public}s, permission :%{public}d", bundleName.c_str(), ret);
45 if (ret == Security::AccessToken::PermissionState::PERMISSION_GRANTED) {
46 auto status = DistributedDB::KvStoreDelegateManager::SetProcessLabel("objectstoreDB", bundleName);
47 if (status != DistributedDB::DBStatus::OK) {
48 LOG_ERROR("delegate SetProcessLabel failed: %{public}d.", static_cast<int>(status));
49 }
50 status = DistributedDB::KvStoreDelegateManager::SetProcessCommunicator(
51 std::make_shared<ProcessCommunicatorImpl>());
52 if (status != DistributedDB::DBStatus::OK) {
53 LOG_ERROR("set distributed db communicator failed: %{public}d.", static_cast<int>(status));
54 }
55 }
56 storeManager_ = std::make_shared<DistributedDB::KvStoreDelegateManager>(bundleName, "default");
57 DistributedDB::KvStoreConfig config;
58 config.dataDir = "/data/log";
59 storeManager_->SetKvStoreConfig(config);
60 isOpened_ = true;
61 LOG_INFO("FlatObjectDatabase::Open Succeed");
62 return SUCCESS;
63 }
64
Close()65 uint32_t FlatObjectStorageEngine::Close()
66 {
67 if (!isOpened_) {
68 LOG_INFO("FlatObjectStorageEngine::Close has been closed!");
69 return SUCCESS;
70 }
71 std::lock_guard<std::mutex> lock(operationMutex_);
72 storeManager_ = nullptr;
73 isOpened_ = false;
74 return SUCCESS;
75 }
76
OnComplete(const std::string & key,const std::map<std::string,DistributedDB::DBStatus> & devices,std::shared_ptr<StatusWatcher> statusWatcher)77 void FlatObjectStorageEngine::OnComplete(const std::string &key,
78 const std::map<std::string, DistributedDB::DBStatus> &devices, std::shared_ptr<StatusWatcher> statusWatcher)
79 {
80 std::lock_guard<std::mutex> lock(watcherMutex_);
81 LOG_INFO("complete");
82 if (statusWatcher != nullptr) {
83 for (auto item : devices) {
84 statusWatcher->OnChanged(key, SoftBusAdapter::GetInstance()->ToNodeID(item.first),
85 item.second == DistributedDB::OK ? "online" : "offline");
86 }
87 }
88 }
89
CreateTable(const std::string & key)90 uint32_t FlatObjectStorageEngine::CreateTable(const std::string &key)
91 {
92 RadarReporter::ReportStage(std::string(__FUNCTION__), CREATE, CREATE_TABLE, IDLE);
93 if (!isOpened_) {
94 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, CREATE_TABLE,
95 RADAR_FAILED, DB_NOT_INIT, FINISHED);
96 return ERR_DB_NOT_INIT;
97 }
98 {
99 std::lock_guard<std::mutex> lock(operationMutex_);
100 if (delegates_.count(key) != 0) {
101 LOG_ERROR("table: %{public}s already created", Anonymous::Change(key).c_str());
102 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, CREATE_TABLE, RADAR_FAILED,
103 DUPLICATE_CREATE, FINISHED);
104 return ERR_EXIST;
105 }
106 }
107 DistributedDB::KvStoreNbDelegate *kvStore = nullptr;
108 DistributedDB::DBStatus status;
109 DistributedDB::KvStoreNbDelegate::Option option = { true, true, false };
110 LOG_INFO("start create table");
111 storeManager_->GetKvStore(key, option,
112 [&status, &kvStore](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) {
113 status = dbStatus;
114 kvStore = kvStoreNbDelegate;
115 LOG_INFO("create table result %{public}d", status);
116 });
117 if (status != DistributedDB::DBStatus::OK || kvStore == nullptr) {
118 LOG_ERROR("GetKvStore fail[%{public}d], store:%{public}s", status, Anonymous::Change(key).c_str());
119 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, CREATE_TABLE,
120 RADAR_FAILED, status, FINISHED);
121 return ERR_DB_GETKV_FAIL;
122 }
123 bool autoSync = true;
124 DistributedDB::PragmaData data = static_cast<DistributedDB::PragmaData>(&autoSync);
125 LOG_INFO("start Pragma");
126 status = kvStore->Pragma(DistributedDB::AUTO_SYNC, data);
127 if (status != DistributedDB::DBStatus::OK) {
128 LOG_ERROR("Set Pragma fail[%{public}d], store:%{public}s", status, Anonymous::Change(key).c_str());
129 RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, CREATE_TABLE,
130 RADAR_FAILED, status, FINISHED);
131 return ERR_DB_GETKV_FAIL;
132 }
133 LOG_INFO("create table %{public}s success", Anonymous::Change(key).c_str());
134 {
135 std::lock_guard<std::mutex> lock(operationMutex_);
136 delegates_.insert_or_assign(key, kvStore);
137 }
138 auto onComplete = [key, this](const std::map<std::string, DistributedDB::DBStatus> &devices) {
139 OnComplete(key, devices, statusWatcher_);
140 };
141 std::vector<DeviceInfo> devices = SoftBusAdapter::GetInstance()->GetDeviceList();
142 std::vector<std::string> deviceIds;
143 for (auto item : devices) {
144 deviceIds.push_back(item.deviceId);
145 }
146 SyncAllData(key, deviceIds, onComplete);
147 RadarReporter::ReportStateFinished(std::string(__FUNCTION__), CREATE, CREATE_TABLE, RADAR_SUCCESS, FINISHED);
148 return SUCCESS;
149 }
150
GetTable(const std::string & key,std::map<std::string,Value> & result)151 uint32_t FlatObjectStorageEngine::GetTable(const std::string &key, std::map<std::string, Value> &result)
152 {
153 if (!isOpened_) {
154 LOG_ERROR("not opened %{public}s", key.c_str());
155 return ERR_DB_NOT_INIT;
156 }
157 std::lock_guard<std::mutex> lock(operationMutex_);
158 if (delegates_.count(key) == 0) {
159 LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
160 return ERR_DB_NOT_EXIST;
161 }
162 result.clear();
163 DistributedDB::KvStoreResultSet *resultSet = nullptr;
164 Key emptyKey;
165 LOG_DEBUG("start GetEntries");
166 auto delegate = delegates_.at(key);
167 DistributedDB::DBStatus status = delegate->GetEntries(emptyKey, resultSet);
168 if (status != DistributedDB::DBStatus::OK || resultSet == nullptr) {
169 LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s GetEntries fail", key.c_str());
170 return ERR_DB_GET_FAIL;
171 }
172 LOG_DEBUG("end GetEntries");
173 while (resultSet->IsAfterLast()) {
174 DistributedDB::Entry entry;
175 status = resultSet->GetEntry(entry);
176 if (status != DistributedDB::DBStatus::OK) {
177 LOG_INFO("FlatObjectStorageEngine::GetTable GetEntry fail, errcode = %{public}d", status);
178 status = delegate->CloseResultSet(resultSet);
179 if (status != DistributedDB::DBStatus::OK) {
180 LOG_INFO("KvStoreNbDelegate::CloseResultSet fail, errcode = %{public}d", status);
181 return ERR_RESULTSET;
182 }
183 return ERR_DB_ENTRY_FAIL;
184 }
185 result.insert_or_assign(StringUtils::BytesToStr(entry.key), entry.value);
186 resultSet->MoveToNext();
187 }
188 status = delegate->CloseResultSet(resultSet);
189 if (status != DistributedDB::DBStatus::OK) {
190 LOG_INFO("KvStoreNbDelegate::CloseResultSet fail, errcode = %{public}d", status);
191 return ERR_RESULTSET;
192 }
193 return SUCCESS;
194 }
195
UpdateItem(const std::string & key,const std::string & itemKey,Value & value)196 uint32_t FlatObjectStorageEngine::UpdateItem(const std::string &key, const std::string &itemKey, Value &value)
197 {
198 if (!isOpened_) {
199 return ERR_DB_NOT_INIT;
200 }
201 std::lock_guard<std::mutex> lock(operationMutex_);
202 if (delegates_.count(key) == 0) {
203 LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
204 return ERR_DB_NOT_EXIST;
205 }
206 auto delegate = delegates_.at(key);
207 LOG_DEBUG("start Put");
208 auto status = delegate->Put(StringUtils::StrToBytes(itemKey), value);
209 if (status != DistributedDB::DBStatus::OK) {
210 LOG_ERROR("%{public}s Put fail[%{public}d]", key.c_str(), status);
211 return ERR_CLOSE_STORAGE;
212 }
213 LOG_DEBUG("put success");
214 return SUCCESS;
215 }
216
UpdateItems(const std::string & key,const std::map<std::string,std::vector<uint8_t>> & data)217 uint32_t FlatObjectStorageEngine::UpdateItems(
218 const std::string &key, const std::map<std::string, std::vector<uint8_t>> &data)
219 {
220 if (!isOpened_ || data.size() == 0) {
221 return ERR_DB_NOT_INIT;
222 }
223 std::lock_guard<std::mutex> lock(operationMutex_);
224 if (delegates_.count(key) == 0) {
225 LOG_INFO("FlatObjectStorageEngine::UpdateItems %{public}s not exist", key.c_str());
226 return ERR_DB_NOT_EXIST;
227 }
228
229 std::vector<DistributedDB::Entry> entries;
230 for (auto &item : data) {
231 DistributedDB::Entry entry = { .key = StringUtils::StrToBytes(item.first), .value = item.second };
232 entries.emplace_back(entry);
233 }
234 auto delegate = delegates_.at(key);
235 LOG_DEBUG("start PutBatch");
236 auto status = delegate->PutBatch(entries);
237 if (status != DistributedDB::DBStatus::OK) {
238 LOG_ERROR("%{public}s PutBatch fail[%{public}d]", key.c_str(), status);
239 return ERR_CLOSE_STORAGE;
240 }
241 LOG_DEBUG("put success");
242 return SUCCESS;
243 }
244
DeleteTable(const std::string & key)245 uint32_t FlatObjectStorageEngine::DeleteTable(const std::string &key)
246 {
247 if (!isOpened_) {
248 return ERR_DB_NOT_INIT;
249 }
250 std::lock_guard<std::mutex> lock(operationMutex_);
251 if (delegates_.count(key) == 0) {
252 LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
253 return ERR_DB_NOT_EXIST;
254 }
255 LOG_DEBUG("start DeleteTable %{public}s", key.c_str());
256 auto status = storeManager_->CloseKvStore(delegates_.at(key));
257 if (status != DistributedDB::DBStatus::OK) {
258 LOG_ERROR(
259 "FlatObjectStorageEngine::CloseKvStore %{public}s CloseKvStore fail[%{public}d]", key.c_str(), status);
260 return ERR_CLOSE_STORAGE;
261 }
262 LOG_DEBUG("DeleteTable success");
263 delegates_.erase(key);
264 return SUCCESS;
265 }
266
GetItem(const std::string & key,const std::string & itemKey,Value & value)267 uint32_t FlatObjectStorageEngine::GetItem(const std::string &key, const std::string &itemKey, Value &value)
268 {
269 if (!isOpened_) {
270 return ERR_DB_NOT_INIT;
271 }
272 std::lock_guard<std::mutex> lock(operationMutex_);
273 if (delegates_.count(key) == 0) {
274 LOG_ERROR("FlatObjectStorageEngine::GetItem %{public}s not exist", key.c_str());
275 return ERR_DB_NOT_EXIST;
276 }
277 LOG_DEBUG("start Get %{public}s", key.c_str());
278 DistributedDB::DBStatus status = delegates_.at(key)->Get(StringUtils::StrToBytes(itemKey), value);
279 if (status != DistributedDB::DBStatus::OK) {
280 LOG_ERROR("FlatObjectStorageEngine::GetItem %{public}s item fail %{public}d", itemKey.c_str(), status);
281 return status;
282 }
283 LOG_DEBUG("end Get %{public}s", key.c_str());
284 return SUCCESS;
285 }
286
RegisterObserver(const std::string & key,std::shared_ptr<TableWatcher> watcher)287 uint32_t FlatObjectStorageEngine::RegisterObserver(const std::string &key, std::shared_ptr<TableWatcher> watcher)
288 {
289 if (!isOpened_) {
290 LOG_ERROR("FlatObjectStorageEngine::RegisterObserver kvStore has not init");
291 return ERR_DB_NOT_INIT;
292 }
293 std::lock_guard<std::mutex> lock(operationMutex_);
294 if (delegates_.count(key) == 0) {
295 LOG_INFO("FlatObjectStorageEngine::RegisterObserver %{public}s not exist", key.c_str());
296 return ERR_DB_NOT_EXIST;
297 }
298 if (observerMap_.count(key) != 0) {
299 LOG_INFO("FlatObjectStorageEngine::RegisterObserver observer already exist.");
300 return SUCCESS;
301 }
302 auto delegate = delegates_.at(key);
303 std::vector<uint8_t> tmpKey;
304 LOG_DEBUG("start RegisterObserver %{public}s", key.c_str());
305 DistributedDB::DBStatus status =
306 delegate->RegisterObserver(tmpKey, DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN, watcher.get());
307 if (status != DistributedDB::DBStatus::OK) {
308 LOG_ERROR("FlatObjectStorageEngine::RegisterObserver watch err %{public}d", status);
309 return ERR_REGISTER;
310 }
311 LOG_DEBUG("end RegisterObserver %{public}s", key.c_str());
312 observerMap_.insert_or_assign(key, watcher);
313 return SUCCESS;
314 }
315
UnRegisterObserver(const std::string & key)316 uint32_t FlatObjectStorageEngine::UnRegisterObserver(const std::string &key)
317 {
318 if (!isOpened_) {
319 LOG_ERROR("FlatObjectStorageEngine::RegisterObserver kvStore has not init");
320 return ERR_DB_NOT_INIT;
321 }
322 std::lock_guard<std::mutex> lock(operationMutex_);
323 if (delegates_.count(key) == 0) {
324 LOG_INFO("FlatObjectStorageEngine::RegisterObserver %{public}s not exist", key.c_str());
325 return ERR_DB_NOT_EXIST;
326 }
327 auto iter = observerMap_.find(key);
328 if (iter == observerMap_.end()) {
329 LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver observer not exist.");
330 return ERR_NO_OBSERVER;
331 }
332 auto delegate = delegates_.at(key);
333 std::shared_ptr<TableWatcher> watcher = iter->second;
334 LOG_DEBUG("start UnRegisterObserver %{public}s", key.c_str());
335 DistributedDB::DBStatus status = delegate->UnRegisterObserver(watcher.get());
336 if (status != DistributedDB::DBStatus::OK) {
337 LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver unRegister err %{public}d", status);
338 return ERR_UNRIGSTER;
339 }
340 LOG_DEBUG("end UnRegisterObserver %{public}s", key.c_str());
341 observerMap_.erase(key);
342 return SUCCESS;
343 }
344
SetStatusNotifier(std::shared_ptr<StatusWatcher> watcher)345 uint32_t FlatObjectStorageEngine::SetStatusNotifier(std::shared_ptr<StatusWatcher> watcher)
346 {
347 if (!isOpened_) {
348 LOG_ERROR("FlatObjectStorageEngine::SetStatusNotifier kvStore has not init");
349 return ERR_DB_NOT_INIT;
350 }
351 auto databaseStatusNotifyCallback = [this](std::string userId, std::string appId, std::string storeId,
352 const std::string deviceId, bool onlineStatus) -> void {
353 std::lock_guard<std::mutex> lock(watcherMutex_);
354 LOG_INFO("complete");
355 if (statusWatcher_ == nullptr) {
356 LOG_INFO("FlatObjectStorageEngine::statusWatcher_ null");
357 return;
358 }
359 if (onlineStatus) {
360 auto onComplete = [this, storeId](const std::map<std::string, DistributedDB::DBStatus> &devices) {
361 for (auto item : devices) {
362 LOG_INFO("%{public}s pull data result %{public}d in device %{public}s",
363 Anonymous::Change(storeId).c_str(), item.second,
364 Anonymous::Change(SoftBusAdapter::GetInstance()->ToNodeID(item.first)).c_str());
365 }
366 if (statusWatcher_ != nullptr) {
367 for (auto item : devices) {
368 statusWatcher_->OnChanged(storeId, SoftBusAdapter::GetInstance()->ToNodeID(item.first),
369 item.second == DistributedDB::OK ? "online" : "offline");
370 }
371 }
372 };
373 SyncAllData(storeId, std::vector<std::string>({ deviceId }), onComplete);
374 } else {
375 statusWatcher_->OnChanged(storeId, SoftBusAdapter::GetInstance()->ToNodeID(deviceId), "offline");
376 }
377 };
378 storeManager_->SetStoreStatusNotifier(databaseStatusNotifyCallback);
379 LOG_INFO("FlatObjectStorageEngine::SetStatusNotifier success");
380 std::lock_guard<std::mutex> lock(watcherMutex_);
381 statusWatcher_ = watcher;
382 return SUCCESS;
383 }
384
SetProgressNotifier(std::shared_ptr<ProgressWatcher> watcher)385 uint32_t FlatObjectStorageEngine::SetProgressNotifier(std::shared_ptr<ProgressWatcher> watcher)
386 {
387 if (!isOpened_) {
388 LOG_ERROR("FlatObjectStorageEngine::SetProgressNotifier has not init");
389 return ERR_DB_NOT_INIT;
390 }
391 std::lock_guard<std::mutex> lock(progressMutex_);
392 progressWatcher_ = watcher;
393 return SUCCESS;
394 }
395
SyncAllData(const std::string & sessionId,const std::vector<std::string> & deviceIds,const std::function<void (const std::map<std::string,DistributedDB::DBStatus> &)> & onComplete)396 uint32_t FlatObjectStorageEngine::SyncAllData(const std::string &sessionId, const std::vector<std::string> &deviceIds,
397 const std::function<void(const std::map<std::string, DistributedDB::DBStatus> &)> &onComplete)
398 {
399 LOG_INFO("start");
400 std::lock_guard<std::mutex> lock(operationMutex_);
401 if (delegates_.count(sessionId) == 0) {
402 LOG_ERROR("%{public}s already deleted", Anonymous::Change(sessionId).c_str());
403 return ERR_DB_NOT_EXIST;
404 }
405 DistributedDB::KvStoreNbDelegate *kvstore = delegates_.at(sessionId);
406 if (deviceIds.empty()) {
407 LOG_INFO("single device,no need sync");
408 return ERR_SINGLE_DEVICE;
409 }
410 LOG_INFO("start sync %{public}s", Anonymous::Change(sessionId).c_str());
411 DistributedDB::DBStatus status = kvstore->Sync(deviceIds, DistributedDB::SyncMode::SYNC_MODE_PULL_ONLY, onComplete);
412 if (status != DistributedDB::DBStatus::OK) {
413 LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver unRegister err %{public}d", status);
414 return ERR_UNRIGSTER;
415 }
416 LOG_INFO("end sync %{public}s", Anonymous::Change(sessionId).c_str());
417 return SUCCESS;
418 }
419
GetItems(const std::string & key,std::map<std::string,std::vector<uint8_t>> & data)420 uint32_t FlatObjectStorageEngine::GetItems(const std::string &key, std::map<std::string, std::vector<uint8_t>> &data)
421 {
422 if (!isOpened_) {
423 LOG_ERROR("GetItems %{public}s not init", Anonymous::Change(key).c_str());
424 return ERR_DB_NOT_INIT;
425 }
426 std::lock_guard<std::mutex> lock(operationMutex_);
427 if (delegates_.count(key) == 0) {
428 LOG_ERROR("GetItems %{public}s not exist", Anonymous::Change(key).c_str());
429 return ERR_DB_NOT_EXIST;
430 }
431 LOG_INFO("start Get %{public}s", Anonymous::Change(key).c_str());
432 std::vector<DistributedDB::Entry> entries;
433 DistributedDB::DBStatus status = delegates_.at(key)->GetEntries(StringUtils::StrToBytes(""), entries);
434 if (status != DistributedDB::DBStatus::OK) {
435 LOG_ERROR("FlatObjectStorageEngine::GetItems item fail status = %{public}d", status);
436 return status;
437 }
438 for (auto &item : entries) {
439 data[StringUtils::BytesToStr(item.key)] = item.value;
440 }
441 LOG_INFO("end Get %{public}s", Anonymous::Change(key).c_str());
442 return SUCCESS;
443 }
444
NotifyStatus(const std::string & sessionId,const std::string & deviceId,const std::string & status)445 void FlatObjectStorageEngine::NotifyStatus(const std::string &sessionId, const std::string &deviceId,
446 const std::string &status)
447 {
448 std::lock_guard<std::mutex> lock(watcherMutex_);
449 if (statusWatcher_ == nullptr) {
450 return;
451 }
452 statusWatcher_->OnChanged(sessionId, deviceId, status);
453 }
454
NotifyProgress(const std::string & sessionId,int32_t progress)455 bool FlatObjectStorageEngine::NotifyProgress(const std::string &sessionId, int32_t progress)
456 {
457 std::lock_guard<std::mutex> lock(progressMutex_);
458 if (progressWatcher_ == nullptr) {
459 return false;
460 }
461 progressWatcher_->OnChanged(sessionId, progress);
462 return true;
463 }
464
NotifyChange(const std::string & sessionId,const std::map<std::string,std::vector<uint8_t>> & changedData)465 void FlatObjectStorageEngine::NotifyChange(
466 const std::string &sessionId, const std::map<std::string, std::vector<uint8_t>> &changedData)
467 {
468 std::lock_guard<std::mutex> lock(operationMutex_);
469 if (observerMap_.count(sessionId) == 0) {
470 return;
471 }
472 std::vector<std::string> data {};
473 for (const auto &item : changedData) {
474 std::string key = item.first;
475 if (key.compare(0, FIELDS_PREFIX_LEN, FIELDS_PREFIX) == 0) {
476 key = key.substr(FIELDS_PREFIX_LEN);
477 }
478 data.push_back(key);
479 }
480 observerMap_[sessionId]->OnChanged(sessionId, data, false);
481 }
482
OnChange(const DistributedDB::KvStoreChangedData & data)483 void Watcher::OnChange(const DistributedDB::KvStoreChangedData &data)
484 {
485 std::vector<std::string> changedData;
486 std::string tmp;
487 for (DistributedDB::Entry item : data.GetEntriesInserted()) {
488 tmp = StringUtils::BytesToStr(item.key);
489 LOG_INFO("inserted %{public}s", tmp.c_str());
490 // property key start with p_, 2 is p_ size
491 if (tmp.compare(0, FIELDS_PREFIX_LEN, FIELDS_PREFIX) == 0) {
492 changedData.push_back(tmp.substr(FIELDS_PREFIX_LEN));
493 }
494 }
495 for (DistributedDB::Entry item : data.GetEntriesUpdated()) {
496 tmp = StringUtils::BytesToStr(item.key);
497 LOG_INFO("updated %{public}s", tmp.c_str());
498 // property key start with p_, 2 is p_ size
499 if (tmp.compare(0, FIELDS_PREFIX_LEN, FIELDS_PREFIX) == 0) {
500 changedData.push_back(tmp.substr(FIELDS_PREFIX_LEN));
501 }
502 }
503 this->OnChanged(sessionId_, changedData, true);
504 }
505
Watcher(const std::string & sessionId)506 Watcher::Watcher(const std::string &sessionId) : sessionId_(sessionId)
507 {
508 }
509 } // namespace OHOS::ObjectStore
510