• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define LOG_TAG "ObjectStoreManager"
17 
18 #include "object_manager.h"
19 
20 #include "bootstrap.h"
21 #include "checker/checker_manager.h"
22 #include "datetime_ex.h"
23 #include "kvstore_utils.h"
24 #include "log_print.h"
25 #include "object_data_listener.h"
26 
27 namespace OHOS {
28 namespace DistributedObject {
ObjectStoreManager()29 ObjectStoreManager::ObjectStoreManager() : timer_("CloseRetryTimer")
30 {
31     timer_.Setup();
32 }
33 
OpenObjectKvStore()34 DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore()
35 {
36     DistributedDB::KvStoreNbDelegate *store = nullptr;
37     DistributedDB::KvStoreNbDelegate::Option option;
38     option.createDirByStoreIdOnly = true;
39     option.syncDualTupleMode = true;
40     option.secOption = { DistributedDB::S1, DistributedDB::ECE };
41     if (objectDataListener_ == nullptr) {
42         objectDataListener_ = new ObjectDataListener();
43     }
44     ZLOGD("start GetKvStore");
45     kvStoreDelegateManager_->GetKvStore(ObjectCommon::OBJECTSTORE_DB_STOREID, option,
46         [&store, this](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) {
47             if (dbStatus != DistributedDB::DBStatus::OK) {
48                 ZLOGE("GetKvStore fail %{public}d", dbStatus);
49                 return;
50             }
51             ZLOGI("GetKvStore successsfully");
52             store = kvStoreNbDelegate;
53             std::vector<uint8_t> tmpKey;
54             DistributedDB::DBStatus status = store->RegisterObserver(tmpKey,
55                 DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN,
56                 objectDataListener_);
57             if (status != DistributedDB::DBStatus::OK) {
58                 ZLOGE("RegisterObserver err %{public}d", status);
59             }
60         });
61     return store;
62 }
63 
ProcessSyncCallback(const std::map<std::string,int32_t> & results,const std::string & appId,const std::string & sessionId,const std::string & deviceId)64 void ObjectStoreManager::ProcessSyncCallback(const std::map<std::string, int32_t> &results, const std::string &appId,
65     const std::string &sessionId, const std::string &deviceId)
66 {
67     if (results.empty() || results.find(LOCAL_DEVICE) != results.end()) {
68         return;
69     }
70     int32_t result = Open();
71     if (result != OBJECT_SUCCESS) {
72         ZLOGE("Open objectStore DB failed,please check DB errCode, errCode = %{public}d", result);
73         return;
74     }
75     // delete local data
76     result = RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, deviceId));
77     if (result != OBJECT_SUCCESS) {
78         ZLOGE("Save to store failed,please check DB status, status = %{public}d", result);
79     }
80     Close();
81     return;
82 }
83 
Save(const std::string & appId,const std::string & sessionId,const std::map<std::string,std::vector<uint8_t>> & data,const std::string & deviceId,sptr<IObjectSaveCallback> & callback)84 int32_t ObjectStoreManager::Save(const std::string &appId, const std::string &sessionId,
85     const std::map<std::string, std::vector<uint8_t>> &data, const std::string &deviceId,
86     sptr<IObjectSaveCallback> &callback)
87 {
88     if (deviceId.size() == 0) {
89         ZLOGE("deviceId empty");
90         callback->Completed(std::map<std::string, int32_t>());
91         return INVALID_ARGUMENT;
92     }
93     int32_t result = Open();
94     if (result != OBJECT_SUCCESS) {
95         ZLOGE("Open objectStore DB failed,please check errCode, errCode = %{public}d", result);
96         callback->Completed(std::map<std::string, int32_t>());
97         return STORE_NOT_OPEN;
98     }
99 
100     ZLOGD("start SaveToStore");
101     result = SaveToStore(appId, sessionId, deviceId, data);
102     if (result != OBJECT_SUCCESS) {
103         ZLOGE("Save to store failed, please check DB errCode, errCode = %{public}d", result);
104         Close();
105         callback->Completed(std::map<std::string, int32_t>());
106         return result;
107     }
108     SyncCallBack tmp = [callback, appId, sessionId, deviceId, this](const std::map<std::string, int32_t> &results) {
109         callback->Completed(results);
110         ProcessSyncCallback(results, appId, sessionId, deviceId);
111     };
112     ZLOGD("start SyncOnStore");
113     std::vector<std::string> deviceList = {deviceId};
114     result = SyncOnStore(GetPropertyPrefix(appId, sessionId, deviceId), deviceList, tmp);
115     if (result != OBJECT_SUCCESS) {
116         ZLOGI("sync on store failed,please check DB errCode, errCode = %{public}d", result);
117         callback->Completed(std::map<std::string, int32_t>());
118     }
119     Close();
120     return result;
121 }
122 
RevokeSave(const std::string & appId,const std::string & sessionId,sptr<IObjectRevokeSaveCallback> & callback)123 int32_t ObjectStoreManager::RevokeSave(
124     const std::string &appId, const std::string &sessionId, sptr<IObjectRevokeSaveCallback> &callback)
125 {
126     int32_t result = Open();
127     if (result != OBJECT_SUCCESS) {
128         ZLOGE("Open objectStore DB failed,please check errCode, errCode = %{public}d", result);
129         callback->Completed(STORE_NOT_OPEN);
130         return STORE_NOT_OPEN;
131     }
132 
133     result = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId));
134     if (result != OBJECT_SUCCESS) {
135         ZLOGE("Save to store failed,please check DB errCode, errCode = %{public}d", result);
136         Close();
137         callback->Completed(result);
138         return result;
139     }
140     std::vector<std::string> deviceList;
141     auto deviceInfos = AppDistributedKv::CommunicationProvider::GetInstance().GetRemoteDevices();
142     std::for_each(deviceInfos.begin(), deviceInfos.end(),
143         [&deviceList](AppDistributedKv::DeviceInfo info) { deviceList.emplace_back(info.networkId); });
144     if (!deviceList.empty()) {
145         SyncCallBack tmp = [callback](const std::map<std::string, int32_t> &results) {
146             ZLOGI("revoke save finished");
147             callback->Completed(OBJECT_SUCCESS);
148         };
149         result = SyncOnStore(GetPropertyPrefix(appId, sessionId), deviceList, tmp);
150         if (result != OBJECT_SUCCESS) {
151             ZLOGE("sync on store failed,please check DB errCode, errCode = %{public}d", result);
152             callback->Completed(result);
153         }
154     } else {
155         callback->Completed(OBJECT_SUCCESS);
156     };
157     Close();
158     return result;
159 }
160 
Retrieve(const std::string & appId,const std::string & sessionId,sptr<IObjectRetrieveCallback> callback)161 int32_t ObjectStoreManager::Retrieve(
162     const std::string &appId, const std::string &sessionId, sptr<IObjectRetrieveCallback> callback)
163 {
164     ZLOGI("enter");
165     int32_t result = Open();
166     if (result != OBJECT_SUCCESS) {
167         ZLOGE("Open objectStore DB failed,please check DB errCode, errCode = %{public}d", result);
168         callback->Completed(std::map<std::string, std::vector<uint8_t>>());
169         return STORE_NOT_OPEN;
170     }
171 
172     std::map<std::string, std::vector<uint8_t>> results;
173     int32_t status = RetrieveFromStore(appId, sessionId, results);
174     if (status != OBJECT_SUCCESS) {
175         ZLOGE("Retrieve from store failed,please check DB status, status = %{public}d", status);
176         Close();
177         callback->Completed(std::map<std::string, std::vector<uint8_t>>());
178         return status;
179     }
180     // delete local data
181     status = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId));
182     if (status != OBJECT_SUCCESS) {
183         ZLOGE("revoke save to store failed,please check DB status, status = %{public}d", status);
184         Close();
185         callback->Completed(std::map<std::string, std::vector<uint8_t>>());
186         return status;
187     }
188     Close();
189     callback->Completed(results);
190     return status;
191 }
192 
Clear()193 int32_t ObjectStoreManager::Clear()
194 {
195     ZLOGI("enter");
196     int32_t result = Open();
197     if (result != OBJECT_SUCCESS) {
198         ZLOGE("Open objectStore DB failed,please check DB status");
199         return STORE_NOT_OPEN;
200     }
201     result = RevokeSaveToStore("");
202     Close();
203     return result;
204 }
205 
DeleteByAppId(const std::string & appId)206 int32_t ObjectStoreManager::DeleteByAppId(const std::string &appId)
207 {
208     ZLOGI("enter, %{public}s", appId.c_str());
209     int32_t result = Open();
210     if (result != OBJECT_SUCCESS) {
211         ZLOGE("Open objectStore DB failed,please check DB errCode, errCode = %{public}d", result);
212         return STORE_NOT_OPEN;
213     }
214     result = RevokeSaveToStore(appId);
215     if (result != OBJECT_SUCCESS) {
216         ZLOGE("RevokeSaveToStore failed");
217     }
218     Close();
219     return result;
220 }
221 
RegisterRemoteCallback(const std::string & bundleName,const std::string & sessionId,pid_t pid,uint32_t tokenId,sptr<IObjectChangeCallback> & callback)222 void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId,
223                                                 pid_t pid, uint32_t tokenId,
224                                                 sptr <IObjectChangeCallback> &callback)
225 {
226     if (bundleName.empty() || sessionId.empty()) {
227         ZLOGD("ObjectStoreManager::RegisterRemoteCallback empty");
228         return;
229     }
230     ZLOGD("ObjectStoreManager::RegisterRemoteCallback start");
231     std::string prefix = bundleName + sessionId;
232     callbacks_.Compute(tokenId, ([pid, &callback, &prefix](const uint32_t key, CallbackInfo &value) {
233         if (value.pid != pid) {
234             value = CallbackInfo { pid };
235         }
236         value.observers_.insert_or_assign(prefix, callback);
237         return !value.observers_.empty();
238     }));
239 }
240 
UnregisterRemoteCallback(const std::string & bundleName,pid_t pid,uint32_t tokenId,const std::string & sessionId)241 void ObjectStoreManager::UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId,
242                                                   const std::string &sessionId)
243 {
244     if (bundleName.empty()) {
245         ZLOGD("bundleName is empty");
246         return;
247     }
248     callbacks_.Compute(tokenId, ([pid, &sessionId, &bundleName](const uint32_t key, CallbackInfo &value) {
249         if (value.pid != pid) {
250             return true;
251         }
252         if (sessionId.empty()) {
253             return false;
254         }
255         std::string prefix = bundleName + sessionId;
256         for (auto it = value.observers_.begin(); it != value.observers_.end();) {
257             if ((*it).first == prefix) {
258                 it = value.observers_.erase(it);
259             } else {
260                 ++it;
261             }
262         }
263         return true;
264     }));
265 }
266 
NotifyChange(std::map<std::string,std::vector<uint8_t>> & changedData)267 void ObjectStoreManager::NotifyChange(std::map<std::string, std::vector<uint8_t>> &changedData)
268 {
269     ZLOGD("ObjectStoreManager::NotifyChange start");
270     std::map<std::string, std::map<std::string, std::vector<uint8_t>>> data;
271     for (const auto &item : changedData) {
272         std::string prefix = GetBundleName(item.first) + GetSessionId(item.first);
273         std::string propertyName = GetPropertyName(item.first);
274         data[prefix].insert_or_assign(std::move(propertyName), std::move(item.second));
275     }
276 
277     callbacks_.ForEach([&data](uint32_t tokenId, CallbackInfo &value) {
278         for (const auto &observer : value.observers_) {
279             auto it = data.find(observer.first);
280             if (it == data.end()) {
281                 continue;
282             }
283             observer.second->Completed((*it).second);
284         }
285         return false;
286     });
287 }
288 
SetData(const std::string & dataDir,const std::string & userId)289 void ObjectStoreManager::SetData(const std::string &dataDir, const std::string &userId)
290 {
291     ZLOGI("enter %{public}s", dataDir.c_str());
292     kvStoreDelegateManager_ =
293         new DistributedDB::KvStoreDelegateManager(DistributedData::Bootstrap::GetInstance().GetProcessLabel(), userId);
294     DistributedDB::KvStoreConfig kvStoreConfig { dataDir };
295     kvStoreDelegateManager_->SetKvStoreConfig(kvStoreConfig);
296     userId_ = userId;
297 }
298 
Open()299 int32_t ObjectStoreManager::Open()
300 {
301     if (kvStoreDelegateManager_ == nullptr) {
302         ZLOGE("not init");
303         return OBJECT_INNER_ERROR;
304     }
305     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
306     if (delegate_ == nullptr) {
307         ZLOGI("open store");
308         delegate_ = OpenObjectKvStore();
309         if (delegate_ == nullptr) {
310             ZLOGE("open failed,please check DB status");
311             return OBJECT_DBSTATUS_ERROR;
312         }
313         syncCount_ = 1;
314     } else {
315         syncCount_++;
316         ZLOGI("syncCount = %{public}d", syncCount_);
317     }
318     return OBJECT_SUCCESS;
319 }
320 
Close()321 void ObjectStoreManager::Close()
322 {
323     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
324     if (delegate_ == nullptr) {
325         return;
326     }
327     syncCount_--;
328     ZLOGI("closed a store, syncCount = %{public}d", syncCount_);
329     FlushClosedStore();
330 }
331 
SyncCompleted(const std::map<std::string,DistributedDB::DBStatus> & results,uint64_t sequenceId)332 void ObjectStoreManager::SyncCompleted(
333     const std::map<std::string, DistributedDB::DBStatus> &results, uint64_t sequenceId)
334 {
335     std::string userId;
336     SequenceSyncManager::Result result = SequenceSyncManager::GetInstance()->Process(sequenceId, results, userId);
337     if (result == SequenceSyncManager::SUCCESS_USER_HAS_FINISHED && userId == userId_) {
338         std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
339         SetSyncStatus(false);
340         FlushClosedStore();
341     }
342 }
343 
FlushClosedStore()344 void ObjectStoreManager::FlushClosedStore()
345 {
346     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
347     if (!isSyncing_ && syncCount_ == 0 && delegate_ != nullptr) {
348         ZLOGD("close store");
349         auto status = kvStoreDelegateManager_->CloseKvStore(delegate_);
350         if (status != DistributedDB::DBStatus::OK) {
351             timer_.Register([this]() { FlushClosedStore(); }, 1000, true); // retry after 1000ms
352             ZLOGE("GetEntries fail %{public}d", status);
353             return;
354         }
355         delegate_ = nullptr;
356         if (objectDataListener_ != nullptr) {
357             delete objectDataListener_;
358             objectDataListener_ = nullptr;
359         }
360     }
361 }
362 
ProcessOldEntry(const std::string & appId)363 void ObjectStoreManager::ProcessOldEntry(const std::string &appId)
364 {
365     std::vector<DistributedDB::Entry> entries;
366     auto status = delegate_->GetEntries(std::vector<uint8_t>(appId.begin(), appId.end()), entries);
367     if (status != DistributedDB::DBStatus::OK) {
368         ZLOGE("GetEntries fail %{public}d", status);
369         return;
370     }
371 
372     std::map<std::string, int64_t> sessionIds;
373     int64_t oldestTime = 0;
374     std::string deleteKey;
375     for (auto &item : entries) {
376         std::string key(item.key.begin(), item.key.end());
377         std::string id = GetSessionId(key);
378         if (sessionIds.count(id) == 0) {
379             sessionIds[id] = GetTime(key);
380         }
381         if (oldestTime == 0 || oldestTime > sessionIds[id]) {
382             oldestTime = sessionIds[id];
383             deleteKey = GetPrefixWithoutDeviceId(appId, id);
384         }
385     }
386     if (sessionIds.size() < MAX_OBJECT_SIZE_PER_APP) {
387         return;
388     }
389     ZLOGI("app object is full, delete oldest one %{public}s", deleteKey.c_str());
390     int32_t result = RevokeSaveToStore(deleteKey);
391     if (result != OBJECT_SUCCESS) {
392         ZLOGE("RevokeSaveToStore fail %{public}d", result);
393         return;
394     }
395 }
396 
SaveToStore(const std::string & appId,const std::string & sessionId,const std::string & toDeviceId,const std::map<std::string,std::vector<uint8_t>> & data)397 int32_t ObjectStoreManager::SaveToStore(const std::string &appId, const std::string &sessionId,
398     const std::string &toDeviceId, const std::map<std::string, std::vector<uint8_t>> &data)
399 {
400     ProcessOldEntry(appId);
401     RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, toDeviceId));
402     std::string timestamp = std::to_string(GetSecondsSince1970ToNow());
403     std::vector<DistributedDB::Entry> entries;
404     for (auto &item : data) {
405         DistributedDB::Entry entry;
406         std::string tmp = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR + item.first;
407         entry.key = std::vector<uint8_t>(tmp.begin(), tmp.end());
408         entry.value = item.second;
409         entries.emplace_back(entry);
410     }
411     auto status = delegate_->PutBatch(entries);
412     if (status != DistributedDB::DBStatus::OK) {
413         ZLOGE("putBatch fail %{public}d", status);
414     }
415     return status;
416 }
417 
SyncOnStore(const std::string & prefix,const std::vector<std::string> & deviceList,SyncCallBack & callback)418 int32_t ObjectStoreManager::SyncOnStore(
419     const std::string &prefix, const std::vector<std::string> &deviceList, SyncCallBack &callback)
420 {
421     std::vector<std::string> syncDevices;
422     for (auto &device : deviceList) {
423         // save to local, do not need sync
424         if (device == LOCAL_DEVICE) {
425             ZLOGI("save to local successful");
426             std::map<std::string, int32_t> result;
427             result[LOCAL_DEVICE] = OBJECT_SUCCESS;
428             callback(result);
429             return OBJECT_SUCCESS;
430         }
431         syncDevices.emplace_back(AppDistributedKv::CommunicationProvider::GetInstance().GetUuidByNodeId(device));
432     }
433     if (!syncDevices.empty()) {
434         uint64_t sequenceId = SequenceSyncManager::GetInstance()->AddNotifier(userId_, callback);
435         DistributedDB::Query dbQuery = DistributedDB::Query::Select();
436         dbQuery.PrefixKey(std::vector<uint8_t>(prefix.begin(), prefix.end()));
437         ZLOGD("start sync");
438         auto status = delegate_->Sync(
439             syncDevices, DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY,
440             [this, sequenceId](const std::map<std::string, DistributedDB::DBStatus> &devicesMap) {
441                 ZLOGI("objectstore sync finished");
442                 std::map<std::string, DistributedDB::DBStatus> result;
443                 for (auto &item : devicesMap) {
444                     result[AppDistributedKv::CommunicationProvider::GetInstance().ToNodeId(item.first)] = item.second;
445                 }
446                 SyncCompleted(result, sequenceId);
447             },
448             dbQuery, false);
449         if (status != DistributedDB::DBStatus::OK) {
450             ZLOGE("sync error %{public}d", status);
451             std::string tmp;
452             SequenceSyncManager::GetInstance()->DeleteNotifier(sequenceId, tmp);
453             return status;
454         }
455         SetSyncStatus(true);
456     } else {
457         ZLOGI("single device");
458         callback(std::map<std::string, int32_t>());
459         return OBJECT_SUCCESS;
460     }
461     return OBJECT_SUCCESS;
462 }
463 
SetSyncStatus(bool status)464 int32_t ObjectStoreManager::SetSyncStatus(bool status)
465 {
466     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
467     isSyncing_ = status;
468     return OBJECT_SUCCESS;
469 }
470 
RevokeSaveToStore(const std::string & prefix)471 int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix)
472 {
473     std::vector<DistributedDB::Entry> entries;
474     auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
475     if (status == DistributedDB::DBStatus::NOT_FOUND) {
476         ZLOGI("not found entry");
477         return OBJECT_SUCCESS;
478     }
479     if (status != DistributedDB::DBStatus::OK) {
480         ZLOGE("GetEntries failed,please check DB status");
481         return DB_ERROR;
482     }
483     std::vector<std::vector<uint8_t>> keys;
484     std::for_each(
485         entries.begin(), entries.end(), [&keys](const DistributedDB::Entry &entry) { keys.emplace_back(entry.key); });
486     if (!keys.empty()) {
487         status = delegate_->DeleteBatch(keys);
488         if (status != DistributedDB::DBStatus::OK) {
489             ZLOGE("DeleteBatch failed,please check DB status, status = %{public}d", status);
490             return DB_ERROR;
491         }
492     }
493     return OBJECT_SUCCESS;
494 }
495 
RetrieveFromStore(const std::string & appId,const std::string & sessionId,std::map<std::string,std::vector<uint8_t>> & results)496 int32_t ObjectStoreManager::RetrieveFromStore(
497     const std::string &appId, const std::string &sessionId, std::map<std::string, std::vector<uint8_t>> &results)
498 {
499     std::vector<DistributedDB::Entry> entries;
500     std::string prefix = GetPrefixWithoutDeviceId(appId, sessionId);
501     auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
502     if (status != DistributedDB::DBStatus::OK) {
503         ZLOGE("GetEntries failed,please check DB status, status = %{public}d", status);
504         return DB_ERROR;
505     }
506     ZLOGI("GetEntries successfully");
507     std::for_each(entries.begin(), entries.end(), [&results, this](const DistributedDB::Entry &entry) {
508         std::string key(entry.key.begin(), entry.key.end());
509         results[GetPropertyName(key)] = entry.value;
510     });
511     return OBJECT_SUCCESS;
512 }
513 
ProcessKeyByIndex(std::string & key,uint8_t index)514 void ObjectStoreManager::ProcessKeyByIndex(std::string &key, uint8_t index)
515 {
516     std::size_t pos;
517     uint8_t i = 0;
518     do {
519         pos = key.find(SEPERATOR);
520         if (pos == std::string::npos) {
521             return;
522         }
523         key.erase(0, pos + 1);
524         i++;
525     } while (i < index);
526 }
527 
GetPropertyName(const std::string & key)528 std::string ObjectStoreManager::GetPropertyName(const std::string &key)
529 {
530     std::string result = key;
531     ProcessKeyByIndex(result, 5); // property name is after 5 '_'
532     return result;
533 }
534 
GetSessionId(const std::string & key)535 std::string ObjectStoreManager::GetSessionId(const std::string &key)
536 {
537     std::string result = key;
538     ProcessKeyByIndex(result, 1); // sessionId is after 1 '_'
539     auto pos = result.find(SEPERATOR);
540     if (pos == std::string::npos) {
541         return result;
542     }
543     result.erase(pos);
544     return result;
545 }
546 
GetTime(const std::string & key)547 int64_t ObjectStoreManager::GetTime(const std::string &key)
548 {
549     std::string result = key;
550     std::size_t pos;
551     int8_t i = 0;
552     do {
553         pos = result.find(SEPERATOR);
554         result.erase(0, pos + 1);
555         i++;
556     } while (pos != std::string::npos && i < 4); // time is after 4 '_'
557     pos = result.find(SEPERATOR);
558     result.erase(pos);
559     char *end = nullptr;
560     return std::strtol(result.c_str(), &end, DECIMAL_BASE);
561 }
562 
CloseAfterMinute()563 void ObjectStoreManager::CloseAfterMinute()
564 {
565     scheduler_.At(std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL),
566                   std::bind(&ObjectStoreManager::Close, this));
567 }
568 
GetBundleName(const std::string & key)569 std::string ObjectStoreManager::GetBundleName(const std::string &key)
570 {
571     std::size_t pos = key.find(SEPERATOR);
572     if (pos == std::string::npos) {
573         return std::string();
574     }
575     std::string result = key;
576     result.erase(pos);
577     return result;
578 }
579 
AddNotifier(const std::string & userId,SyncCallBack & callback)580 uint64_t SequenceSyncManager::AddNotifier(const std::string &userId, SyncCallBack &callback)
581 {
582     std::lock_guard<std::mutex> lock(notifierLock_);
583     uint64_t sequenceId = KvStoreUtils::GenerateSequenceId();
584     userIdSeqIdRelations_[userId].emplace_back(sequenceId);
585     seqIdCallbackRelations_[sequenceId] = callback;
586     return sequenceId;
587 }
588 
Process(uint64_t sequenceId,const std::map<std::string,DistributedDB::DBStatus> & results,std::string & userId)589 SequenceSyncManager::Result SequenceSyncManager::Process(
590     uint64_t sequenceId, const std::map<std::string, DistributedDB::DBStatus> &results, std::string &userId)
591 {
592     std::lock_guard<std::mutex> lock(notifierLock_);
593     if (seqIdCallbackRelations_.count(sequenceId) == 0) {
594         ZLOGE("not exist");
595         return ERR_SID_NOT_EXIST;
596     }
597     std::map<std::string, int32_t> syncResults;
598     for (auto &item : results) {
599         syncResults[item.first] = item.second == DistributedDB::DBStatus::OK ? 0 : -1;
600     }
601     seqIdCallbackRelations_[sequenceId](syncResults);
602     ZLOGD("end complete");
603     return DeleteNotifierNoLock(sequenceId, userId);
604 }
605 
DeleteNotifier(uint64_t sequenceId,std::string & userId)606 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifier(uint64_t sequenceId, std::string &userId)
607 {
608     std::lock_guard<std::mutex> lock(notifierLock_);
609     if (seqIdCallbackRelations_.count(sequenceId) == 0) {
610         ZLOGE("not exist");
611         return ERR_SID_NOT_EXIST;
612     }
613     return DeleteNotifierNoLock(sequenceId, userId);
614 }
615 
DeleteNotifierNoLock(uint64_t sequenceId,std::string & userId)616 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifierNoLock(uint64_t sequenceId, std::string &userId)
617 {
618     seqIdCallbackRelations_.erase(sequenceId);
619     auto userIdIter = userIdSeqIdRelations_.begin();
620     while (userIdIter != userIdSeqIdRelations_.end()) {
621         auto sIdIter = std::find(userIdIter->second.begin(), userIdIter->second.end(), sequenceId);
622         if (sIdIter != userIdIter->second.end()) {
623             userIdIter->second.erase(sIdIter);
624             if (userIdIter->second.empty()) {
625                 ZLOGD("finished user callback, userId = %{public}s", userIdIter->first.c_str());
626                 userId = userIdIter->first;
627                 userIdSeqIdRelations_.erase(userIdIter);
628                 return SUCCESS_USER_HAS_FINISHED;
629             } else {
630                 ZLOGD(" finished a callback but user not finished, userId = %{public}s", userIdIter->first.c_str());
631                 return SUCCESS_USER_IN_USE;
632             }
633         }
634         userIdIter++;
635     }
636     return SUCCESS_USER_HAS_FINISHED;
637 }
638 } // namespace DistributedObject
639 } // namespace OHOS
640