• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "flat_object_storage_engine.h"
16 
17 #include "anonymous.h"
18 #include "accesstoken_kit.h"
19 #include "ipc_skeleton.h"
20 #include "objectstore_errors.h"
21 #include "process_communicator_impl.h"
22 #include "softbus_adapter.h"
23 #include "string_utils.h"
24 #include "object_radar_reporter.h"
25 
26 namespace OHOS::ObjectStore {
~FlatObjectStorageEngine()27 FlatObjectStorageEngine::~FlatObjectStorageEngine()
28 {
29     if (!isOpened_) {
30         return;
31     }
32     storeManager_ = nullptr;
33     LOG_INFO("FlatObjectStorageEngine::~FlatObjectStorageEngine Crash! end");
34 }
35 
Open(const std::string & bundleName)36 uint32_t FlatObjectStorageEngine::Open(const std::string &bundleName)
37 {
38     if (isOpened_) {
39         LOG_INFO("FlatObjectDatabase: No need to reopen it");
40         return SUCCESS;
41     }
42     auto tokenId = IPCSkeleton::GetSelfTokenID();
43     int32_t ret = Security::AccessToken::AccessTokenKit::VerifyAccessToken(tokenId, DISTRIBUTED_DATASYNC);
44     LOG_INFO("bundleName:%{public}s, permission :%{public}d", bundleName.c_str(), ret);
45     if (ret == Security::AccessToken::PermissionState::PERMISSION_GRANTED) {
46         auto status = DistributedDB::KvStoreDelegateManager::SetProcessLabel("objectstoreDB", bundleName);
47         if (status != DistributedDB::DBStatus::OK) {
48             LOG_ERROR("delegate SetProcessLabel failed: %{public}d.", static_cast<int>(status));
49         }
50         status = DistributedDB::KvStoreDelegateManager::SetProcessCommunicator(
51             std::make_shared<ProcessCommunicatorImpl>());
52         if (status != DistributedDB::DBStatus::OK) {
53             LOG_ERROR("set distributed db communicator failed: %{public}d.", static_cast<int>(status));
54         }
55     }
56     storeManager_ = std::make_shared<DistributedDB::KvStoreDelegateManager>(bundleName, "default");
57     DistributedDB::KvStoreConfig config;
58     config.dataDir = "/data/log";
59     storeManager_->SetKvStoreConfig(config);
60     isOpened_ = true;
61     LOG_INFO("FlatObjectDatabase::Open Succeed");
62     return SUCCESS;
63 }
64 
Close()65 uint32_t FlatObjectStorageEngine::Close()
66 {
67     if (!isOpened_) {
68         LOG_INFO("FlatObjectStorageEngine::Close has been closed!");
69         return SUCCESS;
70     }
71     std::lock_guard<std::mutex> lock(operationMutex_);
72     storeManager_ = nullptr;
73     isOpened_ = false;
74     return SUCCESS;
75 }
76 
OnComplete(const std::string & key,const std::map<std::string,DistributedDB::DBStatus> & devices,std::shared_ptr<StatusWatcher> statusWatcher)77 void FlatObjectStorageEngine::OnComplete(const std::string &key,
78     const std::map<std::string, DistributedDB::DBStatus> &devices, std::shared_ptr<StatusWatcher> statusWatcher)
79 {
80     std::lock_guard<std::mutex> lock(watcherMutex_);
81     LOG_INFO("complete");
82     if (statusWatcher != nullptr) {
83         for (auto item : devices) {
84             statusWatcher->OnChanged(key, SoftBusAdapter::GetInstance()->ToNodeID(item.first),
85                 item.second == DistributedDB::OK ? "online" : "offline");
86         }
87     }
88 }
89 
CreateTable(const std::string & key)90 uint32_t FlatObjectStorageEngine::CreateTable(const std::string &key)
91 {
92     RadarReporter::ReportStage(std::string(__FUNCTION__), CREATE, CREATE_TABLE, IDLE);
93     if (!isOpened_) {
94         RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, CREATE_TABLE,
95             RADAR_FAILED, DB_NOT_INIT, FINISHED);
96         return ERR_DB_NOT_INIT;
97     }
98     {
99         std::lock_guard<std::mutex> lock(operationMutex_);
100         if (delegates_.count(key) != 0) {
101             LOG_ERROR("table: %{public}s already created", Anonymous::Change(key).c_str());
102             RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, CREATE_TABLE, RADAR_FAILED,
103                 DUPLICATE_CREATE, FINISHED);
104             return ERR_EXIST;
105         }
106     }
107     DistributedDB::KvStoreNbDelegate *kvStore = nullptr;
108     DistributedDB::DBStatus status;
109     DistributedDB::KvStoreNbDelegate::Option option = { true, true, false };
110     LOG_INFO("start create table");
111     storeManager_->GetKvStore(key, option,
112         [&status, &kvStore](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) {
113             status = dbStatus;
114             kvStore = kvStoreNbDelegate;
115             LOG_INFO("create table result %{public}d", status);
116         });
117     if (status != DistributedDB::DBStatus::OK || kvStore == nullptr) {
118         LOG_ERROR("GetKvStore fail[%{public}d], store:%{public}s", status, Anonymous::Change(key).c_str());
119         RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, CREATE_TABLE,
120             RADAR_FAILED, status, FINISHED);
121         return ERR_DB_GETKV_FAIL;
122     }
123     bool autoSync = true;
124     DistributedDB::PragmaData data = static_cast<DistributedDB::PragmaData>(&autoSync);
125     LOG_INFO("start Pragma");
126     status = kvStore->Pragma(DistributedDB::AUTO_SYNC, data);
127     if (status != DistributedDB::DBStatus::OK) {
128         LOG_ERROR("Set Pragma fail[%{public}d], store:%{public}s", status, Anonymous::Change(key).c_str());
129         RadarReporter::ReportStateError(std::string(__FUNCTION__), CREATE, CREATE_TABLE,
130             RADAR_FAILED, status, FINISHED);
131         return ERR_DB_GETKV_FAIL;
132     }
133     LOG_INFO("create table %{public}s success", Anonymous::Change(key).c_str());
134     {
135         std::lock_guard<std::mutex> lock(operationMutex_);
136         delegates_.insert_or_assign(key, kvStore);
137     }
138     auto onComplete = [key, this](const std::map<std::string, DistributedDB::DBStatus> &devices) {
139         OnComplete(key, devices, statusWatcher_);
140     };
141     std::vector<DeviceInfo> devices = SoftBusAdapter::GetInstance()->GetDeviceList();
142     std::vector<std::string> deviceIds;
143     for (auto item : devices) {
144         deviceIds.push_back(item.deviceId);
145     }
146     SyncAllData(key, deviceIds, onComplete);
147     RadarReporter::ReportStateFinished(std::string(__FUNCTION__), CREATE, CREATE_TABLE, RADAR_SUCCESS, FINISHED);
148     return SUCCESS;
149 }
150 
GetTable(const std::string & key,std::map<std::string,Value> & result)151 uint32_t FlatObjectStorageEngine::GetTable(const std::string &key, std::map<std::string, Value> &result)
152 {
153     if (!isOpened_) {
154         LOG_ERROR("not opened %{public}s", key.c_str());
155         return ERR_DB_NOT_INIT;
156     }
157     std::lock_guard<std::mutex> lock(operationMutex_);
158     if (delegates_.count(key) == 0) {
159         LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
160         return ERR_DB_NOT_EXIST;
161     }
162     result.clear();
163     DistributedDB::KvStoreResultSet *resultSet = nullptr;
164     Key emptyKey;
165     LOG_DEBUG("start GetEntries");
166     auto delegate = delegates_.at(key);
167     DistributedDB::DBStatus status = delegate->GetEntries(emptyKey, resultSet);
168     if (status != DistributedDB::DBStatus::OK || resultSet == nullptr) {
169         LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s GetEntries fail", key.c_str());
170         return ERR_DB_GET_FAIL;
171     }
172     LOG_DEBUG("end GetEntries");
173     while (resultSet->IsAfterLast()) {
174         DistributedDB::Entry entry;
175         status = resultSet->GetEntry(entry);
176         if (status != DistributedDB::DBStatus::OK) {
177             LOG_INFO("FlatObjectStorageEngine::GetTable GetEntry fail, errcode = %{public}d", status);
178             status = delegate->CloseResultSet(resultSet);
179             if (status != DistributedDB::DBStatus::OK) {
180                 LOG_INFO("KvStoreNbDelegate::CloseResultSet fail, errcode = %{public}d", status);
181                 return ERR_RESULTSET;
182             }
183             return ERR_DB_ENTRY_FAIL;
184         }
185         result.insert_or_assign(StringUtils::BytesToStr(entry.key), entry.value);
186         resultSet->MoveToNext();
187     }
188     status = delegate->CloseResultSet(resultSet);
189     if (status != DistributedDB::DBStatus::OK) {
190         LOG_INFO("KvStoreNbDelegate::CloseResultSet fail, errcode = %{public}d", status);
191         return ERR_RESULTSET;
192     }
193     return SUCCESS;
194 }
195 
UpdateItem(const std::string & key,const std::string & itemKey,Value & value)196 uint32_t FlatObjectStorageEngine::UpdateItem(const std::string &key, const std::string &itemKey, Value &value)
197 {
198     if (!isOpened_) {
199         return ERR_DB_NOT_INIT;
200     }
201     std::lock_guard<std::mutex> lock(operationMutex_);
202     if (delegates_.count(key) == 0) {
203         LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
204         return ERR_DB_NOT_EXIST;
205     }
206     auto delegate = delegates_.at(key);
207     LOG_DEBUG("start Put");
208     auto status = delegate->Put(StringUtils::StrToBytes(itemKey), value);
209     if (status != DistributedDB::DBStatus::OK) {
210         LOG_ERROR("%{public}s Put fail[%{public}d]", key.c_str(), status);
211         return ERR_CLOSE_STORAGE;
212     }
213     LOG_DEBUG("put success");
214     return SUCCESS;
215 }
216 
UpdateItems(const std::string & key,const std::map<std::string,std::vector<uint8_t>> & data)217 uint32_t FlatObjectStorageEngine::UpdateItems(
218     const std::string &key, const std::map<std::string, std::vector<uint8_t>> &data)
219 {
220     if (!isOpened_ || data.size() == 0) {
221         return ERR_DB_NOT_INIT;
222     }
223     std::lock_guard<std::mutex> lock(operationMutex_);
224     if (delegates_.count(key) == 0) {
225         LOG_INFO("FlatObjectStorageEngine::UpdateItems %{public}s not exist", key.c_str());
226         return ERR_DB_NOT_EXIST;
227     }
228 
229     std::vector<DistributedDB::Entry> entries;
230     for (auto &item : data) {
231         DistributedDB::Entry entry = { .key = StringUtils::StrToBytes(item.first), .value = item.second };
232         entries.emplace_back(entry);
233     }
234     auto delegate = delegates_.at(key);
235     LOG_DEBUG("start PutBatch");
236     auto status = delegate->PutBatch(entries);
237     if (status != DistributedDB::DBStatus::OK) {
238         LOG_ERROR("%{public}s PutBatch fail[%{public}d]", key.c_str(), status);
239         return ERR_CLOSE_STORAGE;
240     }
241     LOG_DEBUG("put success");
242     return SUCCESS;
243 }
244 
DeleteTable(const std::string & key)245 uint32_t FlatObjectStorageEngine::DeleteTable(const std::string &key)
246 {
247     if (!isOpened_) {
248         return ERR_DB_NOT_INIT;
249     }
250     std::lock_guard<std::mutex> lock(operationMutex_);
251     if (delegates_.count(key) == 0) {
252         LOG_INFO("FlatObjectStorageEngine::GetTable %{public}s not exist", key.c_str());
253         return ERR_DB_NOT_EXIST;
254     }
255     LOG_DEBUG("start DeleteTable %{public}s", key.c_str());
256     auto status = storeManager_->CloseKvStore(delegates_.at(key));
257     if (status != DistributedDB::DBStatus::OK) {
258         LOG_ERROR(
259             "FlatObjectStorageEngine::CloseKvStore %{public}s CloseKvStore fail[%{public}d]", key.c_str(), status);
260         return ERR_CLOSE_STORAGE;
261     }
262     LOG_DEBUG("DeleteTable success");
263     delegates_.erase(key);
264     return SUCCESS;
265 }
266 
GetItem(const std::string & key,const std::string & itemKey,Value & value)267 uint32_t FlatObjectStorageEngine::GetItem(const std::string &key, const std::string &itemKey, Value &value)
268 {
269     if (!isOpened_) {
270         return ERR_DB_NOT_INIT;
271     }
272     std::lock_guard<std::mutex> lock(operationMutex_);
273     if (delegates_.count(key) == 0) {
274         LOG_ERROR("FlatObjectStorageEngine::GetItem %{public}s not exist", key.c_str());
275         return ERR_DB_NOT_EXIST;
276     }
277     LOG_DEBUG("start Get %{public}s", key.c_str());
278     DistributedDB::DBStatus status = delegates_.at(key)->Get(StringUtils::StrToBytes(itemKey), value);
279     if (status != DistributedDB::DBStatus::OK) {
280         LOG_ERROR("FlatObjectStorageEngine::GetItem %{public}s item fail %{public}d", itemKey.c_str(), status);
281         return status;
282     }
283     LOG_DEBUG("end Get %{public}s", key.c_str());
284     return SUCCESS;
285 }
286 
RegisterObserver(const std::string & key,std::shared_ptr<TableWatcher> watcher)287 uint32_t FlatObjectStorageEngine::RegisterObserver(const std::string &key, std::shared_ptr<TableWatcher> watcher)
288 {
289     if (!isOpened_) {
290         LOG_ERROR("FlatObjectStorageEngine::RegisterObserver kvStore has not init");
291         return ERR_DB_NOT_INIT;
292     }
293     std::lock_guard<std::mutex> lock(operationMutex_);
294     if (delegates_.count(key) == 0) {
295         LOG_INFO("FlatObjectStorageEngine::RegisterObserver %{public}s not exist", key.c_str());
296         return ERR_DB_NOT_EXIST;
297     }
298     if (observerMap_.count(key) != 0) {
299         LOG_INFO("FlatObjectStorageEngine::RegisterObserver observer already exist.");
300         return SUCCESS;
301     }
302     auto delegate = delegates_.at(key);
303     std::vector<uint8_t> tmpKey;
304     LOG_DEBUG("start RegisterObserver %{public}s", key.c_str());
305     DistributedDB::DBStatus status =
306         delegate->RegisterObserver(tmpKey, DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN, watcher.get());
307     if (status != DistributedDB::DBStatus::OK) {
308         LOG_ERROR("FlatObjectStorageEngine::RegisterObserver watch err %{public}d", status);
309         return ERR_REGISTER;
310     }
311     LOG_DEBUG("end RegisterObserver %{public}s", key.c_str());
312     observerMap_.insert_or_assign(key, watcher);
313     return SUCCESS;
314 }
315 
UnRegisterObserver(const std::string & key)316 uint32_t FlatObjectStorageEngine::UnRegisterObserver(const std::string &key)
317 {
318     if (!isOpened_) {
319         LOG_ERROR("FlatObjectStorageEngine::RegisterObserver kvStore has not init");
320         return ERR_DB_NOT_INIT;
321     }
322     std::lock_guard<std::mutex> lock(operationMutex_);
323     if (delegates_.count(key) == 0) {
324         LOG_INFO("FlatObjectStorageEngine::RegisterObserver %{public}s not exist", key.c_str());
325         return ERR_DB_NOT_EXIST;
326     }
327     auto iter = observerMap_.find(key);
328     if (iter == observerMap_.end()) {
329         LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver observer not exist.");
330         return ERR_NO_OBSERVER;
331     }
332     auto delegate = delegates_.at(key);
333     std::shared_ptr<TableWatcher> watcher = iter->second;
334     LOG_DEBUG("start UnRegisterObserver %{public}s", key.c_str());
335     DistributedDB::DBStatus status = delegate->UnRegisterObserver(watcher.get());
336     if (status != DistributedDB::DBStatus::OK) {
337         LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver unRegister err %{public}d", status);
338         return ERR_UNRIGSTER;
339     }
340     LOG_DEBUG("end UnRegisterObserver %{public}s", key.c_str());
341     observerMap_.erase(key);
342     return SUCCESS;
343 }
344 
SetStatusNotifier(std::shared_ptr<StatusWatcher> watcher)345 uint32_t FlatObjectStorageEngine::SetStatusNotifier(std::shared_ptr<StatusWatcher> watcher)
346 {
347     if (!isOpened_) {
348         LOG_ERROR("FlatObjectStorageEngine::SetStatusNotifier kvStore has not init");
349         return ERR_DB_NOT_INIT;
350     }
351     auto databaseStatusNotifyCallback = [this](std::string userId, std::string appId, std::string storeId,
352                                             const std::string deviceId, bool onlineStatus) -> void {
353         std::lock_guard<std::mutex> lock(watcherMutex_);
354         LOG_INFO("complete");
355         if (statusWatcher_ == nullptr) {
356             LOG_INFO("FlatObjectStorageEngine::statusWatcher_ null");
357             return;
358         }
359         if (onlineStatus) {
360             auto onComplete = [this, storeId](const std::map<std::string, DistributedDB::DBStatus> &devices) {
361                 for (auto item : devices) {
362                     LOG_INFO("%{public}s pull data result %{public}d in device %{public}s",
363                         Anonymous::Change(storeId).c_str(), item.second,
364                         Anonymous::Change(SoftBusAdapter::GetInstance()->ToNodeID(item.first)).c_str());
365                 }
366                 if (statusWatcher_ != nullptr) {
367                     for (auto item : devices) {
368                         statusWatcher_->OnChanged(storeId, SoftBusAdapter::GetInstance()->ToNodeID(item.first),
369                             item.second == DistributedDB::OK ? "online" : "offline");
370                     }
371                 }
372             };
373             SyncAllData(storeId, std::vector<std::string>({ deviceId }), onComplete);
374         } else {
375             statusWatcher_->OnChanged(storeId, SoftBusAdapter::GetInstance()->ToNodeID(deviceId), "offline");
376         }
377     };
378     storeManager_->SetStoreStatusNotifier(databaseStatusNotifyCallback);
379     LOG_INFO("FlatObjectStorageEngine::SetStatusNotifier success");
380     std::lock_guard<std::mutex> lock(watcherMutex_);
381     statusWatcher_ = watcher;
382     return SUCCESS;
383 }
384 
SetProgressNotifier(std::shared_ptr<ProgressWatcher> watcher)385 uint32_t FlatObjectStorageEngine::SetProgressNotifier(std::shared_ptr<ProgressWatcher> watcher)
386 {
387     if (!isOpened_) {
388         LOG_ERROR("FlatObjectStorageEngine::SetProgressNotifier has not init");
389         return ERR_DB_NOT_INIT;
390     }
391     std::lock_guard<std::mutex> lock(progressMutex_);
392     progressWatcher_ = watcher;
393     return SUCCESS;
394 }
395 
SyncAllData(const std::string & sessionId,const std::vector<std::string> & deviceIds,const std::function<void (const std::map<std::string,DistributedDB::DBStatus> &)> & onComplete)396 uint32_t FlatObjectStorageEngine::SyncAllData(const std::string &sessionId, const std::vector<std::string> &deviceIds,
397     const std::function<void(const std::map<std::string, DistributedDB::DBStatus> &)> &onComplete)
398 {
399     LOG_INFO("start");
400     std::lock_guard<std::mutex> lock(operationMutex_);
401     if (delegates_.count(sessionId) == 0) {
402         LOG_ERROR("%{public}s already deleted", Anonymous::Change(sessionId).c_str());
403         return ERR_DB_NOT_EXIST;
404     }
405     DistributedDB::KvStoreNbDelegate *kvstore = delegates_.at(sessionId);
406     if (deviceIds.empty()) {
407         LOG_INFO("single device,no need sync");
408         return ERR_SINGLE_DEVICE;
409     }
410     LOG_INFO("start sync %{public}s", Anonymous::Change(sessionId).c_str());
411     DistributedDB::DBStatus status = kvstore->Sync(deviceIds, DistributedDB::SyncMode::SYNC_MODE_PULL_ONLY, onComplete);
412     if (status != DistributedDB::DBStatus::OK) {
413         LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver unRegister err %{public}d", status);
414         return ERR_UNRIGSTER;
415     }
416     LOG_INFO("end sync %{public}s", Anonymous::Change(sessionId).c_str());
417     return SUCCESS;
418 }
419 
GetItems(const std::string & key,std::map<std::string,std::vector<uint8_t>> & data)420 uint32_t FlatObjectStorageEngine::GetItems(const std::string &key, std::map<std::string, std::vector<uint8_t>> &data)
421 {
422     if (!isOpened_) {
423         LOG_ERROR("GetItems %{public}s not init", Anonymous::Change(key).c_str());
424         return ERR_DB_NOT_INIT;
425     }
426     std::lock_guard<std::mutex> lock(operationMutex_);
427     if (delegates_.count(key) == 0) {
428         LOG_ERROR("GetItems %{public}s not exist", Anonymous::Change(key).c_str());
429         return ERR_DB_NOT_EXIST;
430     }
431     LOG_INFO("start Get %{public}s", Anonymous::Change(key).c_str());
432     std::vector<DistributedDB::Entry> entries;
433     DistributedDB::DBStatus status = delegates_.at(key)->GetEntries(StringUtils::StrToBytes(""), entries);
434     if (status != DistributedDB::DBStatus::OK) {
435         LOG_ERROR("FlatObjectStorageEngine::GetItems item fail status = %{public}d", status);
436         return status;
437     }
438     for (auto &item : entries) {
439         data[StringUtils::BytesToStr(item.key)] = item.value;
440     }
441     LOG_INFO("end Get %{public}s", Anonymous::Change(key).c_str());
442     return SUCCESS;
443 }
444 
NotifyStatus(const std::string & sessionId,const std::string & deviceId,const std::string & status)445 void FlatObjectStorageEngine::NotifyStatus(const std::string &sessionId, const std::string &deviceId,
446                                            const std::string &status)
447 {
448     std::lock_guard<std::mutex> lock(watcherMutex_);
449     if (statusWatcher_ == nullptr) {
450         return;
451     }
452     statusWatcher_->OnChanged(sessionId, deviceId, status);
453 }
454 
NotifyProgress(const std::string & sessionId,int32_t progress)455 bool FlatObjectStorageEngine::NotifyProgress(const std::string &sessionId, int32_t progress)
456 {
457     std::lock_guard<std::mutex> lock(progressMutex_);
458     if (progressWatcher_ == nullptr) {
459         return false;
460     }
461     progressWatcher_->OnChanged(sessionId, progress);
462     return true;
463 }
464 
NotifyChange(const std::string & sessionId,const std::map<std::string,std::vector<uint8_t>> & changedData)465 void FlatObjectStorageEngine::NotifyChange(
466     const std::string &sessionId, const std::map<std::string, std::vector<uint8_t>> &changedData)
467 {
468     std::lock_guard<std::mutex> lock(operationMutex_);
469     if (observerMap_.count(sessionId) == 0) {
470         return;
471     }
472     std::vector<std::string> data {};
473     for (const auto &item : changedData) {
474         std::string key = item.first;
475         if (key.compare(0, FIELDS_PREFIX_LEN, FIELDS_PREFIX) == 0) {
476             key = key.substr(FIELDS_PREFIX_LEN);
477         }
478         data.push_back(key);
479     }
480     observerMap_[sessionId]->OnChanged(sessionId, data, false);
481 }
482 
OnChange(const DistributedDB::KvStoreChangedData & data)483 void Watcher::OnChange(const DistributedDB::KvStoreChangedData &data)
484 {
485     std::vector<std::string> changedData;
486     std::string tmp;
487     for (DistributedDB::Entry item : data.GetEntriesInserted()) {
488         tmp = StringUtils::BytesToStr(item.key);
489         LOG_INFO("inserted %{public}s", tmp.c_str());
490         // property key start with p_, 2 is p_ size
491         if (tmp.compare(0, FIELDS_PREFIX_LEN, FIELDS_PREFIX) == 0) {
492             changedData.push_back(tmp.substr(FIELDS_PREFIX_LEN));
493         }
494     }
495     for (DistributedDB::Entry item : data.GetEntriesUpdated()) {
496         tmp = StringUtils::BytesToStr(item.key);
497         LOG_INFO("updated %{public}s", tmp.c_str());
498         // property key start with p_, 2 is p_ size
499         if (tmp.compare(0, FIELDS_PREFIX_LEN, FIELDS_PREFIX) == 0) {
500             changedData.push_back(tmp.substr(FIELDS_PREFIX_LEN));
501         }
502     }
503     this->OnChanged(sessionId_, changedData, true);
504 }
505 
Watcher(const std::string & sessionId)506 Watcher::Watcher(const std::string &sessionId) : sessionId_(sessionId)
507 {
508 }
509 } // namespace OHOS::ObjectStore
510