• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define LOG_TAG "ObjectStoreManager"
17 
18 #include "object_manager.h"
19 
20 #include "bootstrap.h"
21 #include "checker/checker_manager.h"
22 #include "datetime_ex.h"
23 #include "kvstore_utils.h"
24 #include "log_print.h"
25 #include "object_data_listener.h"
26 
27 namespace OHOS {
28 namespace DistributedObject {
29 using namespace OHOS::DistributedKv;
ObjectStoreManager()30 ObjectStoreManager::ObjectStoreManager() {}
31 
OpenObjectKvStore()32 DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore()
33 {
34     DistributedDB::KvStoreNbDelegate *store = nullptr;
35     DistributedDB::KvStoreNbDelegate::Option option;
36     option.createDirByStoreIdOnly = true;
37     option.syncDualTupleMode = true;
38     option.secOption = { DistributedDB::S1, DistributedDB::ECE };
39     if (objectDataListener_ == nullptr) {
40         objectDataListener_ = new ObjectDataListener();
41     }
42     ZLOGD("start GetKvStore");
43     kvStoreDelegateManager_->GetKvStore(ObjectCommon::OBJECTSTORE_DB_STOREID, option,
44         [&store, this](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) {
45             if (dbStatus != DistributedDB::DBStatus::OK) {
46                 ZLOGE("GetKvStore fail %{public}d", dbStatus);
47                 return;
48             }
49             ZLOGI("GetKvStore successsfully");
50             store = kvStoreNbDelegate;
51             std::vector<uint8_t> tmpKey;
52             DistributedDB::DBStatus status = store->RegisterObserver(tmpKey,
53                 DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN,
54                 objectDataListener_);
55             if (status != DistributedDB::DBStatus::OK) {
56                 ZLOGE("RegisterObserver err %{public}d", status);
57             }
58         });
59     return store;
60 }
61 
ProcessSyncCallback(const std::map<std::string,int32_t> & results,const std::string & appId,const std::string & sessionId,const std::string & deviceId)62 void ObjectStoreManager::ProcessSyncCallback(const std::map<std::string, int32_t> &results, const std::string &appId,
63     const std::string &sessionId, const std::string &deviceId)
64 {
65     if (results.empty() || results.find(LOCAL_DEVICE) != results.end()) {
66         return;
67     }
68     int32_t result = Open();
69     if (result != OBJECT_SUCCESS) {
70         ZLOGE("Open objectStore DB failed,please check DB errCode, errCode = %{public}d", result);
71         return;
72     }
73     // delete local data
74     result = RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, deviceId));
75     if (result != OBJECT_SUCCESS) {
76         ZLOGE("Save to store failed,please check DB status, status = %{public}d", result);
77     }
78     Close();
79     return;
80 }
81 
Save(const std::string & appId,const std::string & sessionId,const std::map<std::string,std::vector<uint8_t>> & data,const std::string & deviceId,sptr<IRemoteObject> callback)82 int32_t ObjectStoreManager::Save(const std::string &appId, const std::string &sessionId,
83     const std::map<std::string, std::vector<uint8_t>> &data, const std::string &deviceId,
84     sptr<IRemoteObject> callback)
85 {
86     auto proxy = iface_cast<ObjectSaveCallbackProxy>(callback);
87     if (deviceId.size() == 0) {
88         ZLOGE("deviceId empty");
89         proxy->Completed(std::map<std::string, int32_t>());
90         return INVALID_ARGUMENT;
91     }
92     int32_t result = Open();
93     if (result != OBJECT_SUCCESS) {
94         ZLOGE("Open objectStore DB failed,please check errCode, errCode = %{public}d", result);
95         proxy->Completed(std::map<std::string, int32_t>());
96         return STORE_NOT_OPEN;
97     }
98 
99     ZLOGD("start SaveToStore");
100     result = SaveToStore(appId, sessionId, deviceId, data);
101     if (result != OBJECT_SUCCESS) {
102         ZLOGE("Save to store failed, please check DB errCode, errCode = %{public}d", result);
103         Close();
104         proxy->Completed(std::map<std::string, int32_t>());
105         return result;
106     }
107     SyncCallBack tmp = [proxy, appId, sessionId, deviceId, this](const std::map<std::string, int32_t> &results) {
108         proxy->Completed(results);
109         ProcessSyncCallback(results, appId, sessionId, deviceId);
110     };
111     ZLOGD("start SyncOnStore");
112     std::vector<std::string> deviceList = {deviceId};
113     result = SyncOnStore(GetPropertyPrefix(appId, sessionId, deviceId), deviceList, tmp);
114     if (result != OBJECT_SUCCESS) {
115         ZLOGI("sync on store failed,please check DB errCode, errCode = %{public}d", result);
116         proxy->Completed(std::map<std::string, int32_t>());
117     }
118     Close();
119     return result;
120 }
121 
RevokeSave(const std::string & appId,const std::string & sessionId,sptr<IRemoteObject> callback)122 int32_t ObjectStoreManager::RevokeSave(
123     const std::string &appId, const std::string &sessionId, sptr<IRemoteObject> callback)
124 {
125     auto proxy = iface_cast<ObjectRevokeSaveCallbackProxy>(callback);
126     int32_t result = Open();
127     if (result != OBJECT_SUCCESS) {
128         ZLOGE("Open objectStore DB failed,please check errCode, errCode = %{public}d", result);
129         proxy->Completed(STORE_NOT_OPEN);
130         return STORE_NOT_OPEN;
131     }
132 
133     result = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId));
134     if (result != OBJECT_SUCCESS) {
135         ZLOGE("Save to store failed,please check DB errCode, errCode = %{public}d", result);
136         Close();
137         proxy->Completed(result);
138         return result;
139     }
140     std::vector<std::string> deviceList;
141     auto deviceInfos = DmAdaper::GetInstance().GetRemoteDevices();
142     std::for_each(deviceInfos.begin(), deviceInfos.end(),
143         [&deviceList](AppDistributedKv::DeviceInfo info) { deviceList.emplace_back(info.networkId); });
144     if (!deviceList.empty()) {
145         SyncCallBack tmp = [proxy](const std::map<std::string, int32_t> &results) {
146             ZLOGI("revoke save finished");
147             proxy->Completed(OBJECT_SUCCESS);
148         };
149         result = SyncOnStore(GetPropertyPrefix(appId, sessionId), deviceList, tmp);
150         if (result != OBJECT_SUCCESS) {
151             ZLOGE("sync on store failed,please check DB errCode, errCode = %{public}d", result);
152             proxy->Completed(result);
153         }
154     } else {
155         proxy->Completed(OBJECT_SUCCESS);
156     };
157     Close();
158     return result;
159 }
160 
Retrieve(const std::string & appId,const std::string & sessionId,sptr<IRemoteObject> callback)161 int32_t ObjectStoreManager::Retrieve(
162     const std::string &appId, const std::string &sessionId, sptr<IRemoteObject> callback)
163 {
164     auto proxy = iface_cast<ObjectRetrieveCallbackProxy>(callback);
165     ZLOGI("enter");
166     int32_t result = Open();
167     if (result != OBJECT_SUCCESS) {
168         ZLOGE("Open objectStore DB failed,please check DB errCode, errCode = %{public}d", result);
169         proxy->Completed(std::map<std::string, std::vector<uint8_t>>());
170         return STORE_NOT_OPEN;
171     }
172 
173     std::map<std::string, std::vector<uint8_t>> results;
174     int32_t status = RetrieveFromStore(appId, sessionId, results);
175     if (status != OBJECT_SUCCESS) {
176         ZLOGE("Retrieve from store failed,please check DB status, status = %{public}d", status);
177         Close();
178         proxy->Completed(std::map<std::string, std::vector<uint8_t>>());
179         return status;
180     }
181     // delete local data
182     status = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId));
183     if (status != OBJECT_SUCCESS) {
184         ZLOGE("revoke save to store failed,please check DB status, status = %{public}d", status);
185         Close();
186         proxy->Completed(std::map<std::string, std::vector<uint8_t>>());
187         return status;
188     }
189     Close();
190     proxy->Completed(results);
191     return status;
192 }
193 
Clear()194 int32_t ObjectStoreManager::Clear()
195 {
196     ZLOGI("enter");
197     int32_t result = Open();
198     if (result != OBJECT_SUCCESS) {
199         ZLOGE("Open objectStore DB failed,please check DB status");
200         return STORE_NOT_OPEN;
201     }
202     result = RevokeSaveToStore("");
203     Close();
204     return result;
205 }
206 
DeleteByAppId(const std::string & appId)207 int32_t ObjectStoreManager::DeleteByAppId(const std::string &appId)
208 {
209     ZLOGI("enter, %{public}s", appId.c_str());
210     int32_t result = Open();
211     if (result != OBJECT_SUCCESS) {
212         ZLOGE("Open objectStore DB failed,please check DB errCode, errCode = %{public}d", result);
213         return STORE_NOT_OPEN;
214     }
215     result = RevokeSaveToStore(appId);
216     if (result != OBJECT_SUCCESS) {
217         ZLOGE("RevokeSaveToStore failed");
218     }
219     Close();
220     return result;
221 }
222 
RegisterRemoteCallback(const std::string & bundleName,const std::string & sessionId,pid_t pid,uint32_t tokenId,sptr<IRemoteObject> callback)223 void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId,
224                                                 pid_t pid, uint32_t tokenId,
225                                                 sptr<IRemoteObject> callback)
226 {
227     if (bundleName.empty() || sessionId.empty()) {
228         ZLOGD("ObjectStoreManager::RegisterRemoteCallback empty");
229         return;
230     }
231     ZLOGD("ObjectStoreManager::RegisterRemoteCallback start");
232     auto proxy = iface_cast<ObjectChangeCallbackProxy>(callback);
233     std::string prefix = bundleName + sessionId;
234     callbacks_.Compute(tokenId, ([pid, &proxy, &prefix](const uint32_t key, CallbackInfo &value) {
235         if (value.pid != pid) {
236             value = CallbackInfo { pid };
237         }
238         value.observers_.insert_or_assign(prefix, proxy);
239         return !value.observers_.empty();
240     }));
241 }
242 
UnregisterRemoteCallback(const std::string & bundleName,pid_t pid,uint32_t tokenId,const std::string & sessionId)243 void ObjectStoreManager::UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId,
244                                                   const std::string &sessionId)
245 {
246     if (bundleName.empty()) {
247         ZLOGD("bundleName is empty");
248         return;
249     }
250     callbacks_.Compute(tokenId, ([pid, &sessionId, &bundleName](const uint32_t key, CallbackInfo &value) {
251         if (value.pid != pid) {
252             return true;
253         }
254         if (sessionId.empty()) {
255             return false;
256         }
257         std::string prefix = bundleName + sessionId;
258         for (auto it = value.observers_.begin(); it != value.observers_.end();) {
259             if ((*it).first == prefix) {
260                 it = value.observers_.erase(it);
261             } else {
262                 ++it;
263             }
264         }
265         return true;
266     }));
267 }
268 
NotifyChange(std::map<std::string,std::vector<uint8_t>> & changedData)269 void ObjectStoreManager::NotifyChange(std::map<std::string, std::vector<uint8_t>> &changedData)
270 {
271     ZLOGD("ObjectStoreManager::NotifyChange start");
272     std::map<std::string, std::map<std::string, std::vector<uint8_t>>> data;
273     for (const auto &item : changedData) {
274         std::string prefix = GetBundleName(item.first) + GetSessionId(item.first);
275         std::string propertyName = GetPropertyName(item.first);
276         data[prefix].insert_or_assign(std::move(propertyName), std::move(item.second));
277     }
278 
279     callbacks_.ForEach([&data](uint32_t tokenId, CallbackInfo &value) {
280         for (const auto &observer : value.observers_) {
281             auto it = data.find(observer.first);
282             if (it == data.end()) {
283                 continue;
284             }
285             observer.second->Completed((*it).second);
286         }
287         return false;
288     });
289 }
290 
SetData(const std::string & dataDir,const std::string & userId)291 void ObjectStoreManager::SetData(const std::string &dataDir, const std::string &userId)
292 {
293     ZLOGI("enter %{public}s", dataDir.c_str());
294     kvStoreDelegateManager_ =
295         new DistributedDB::KvStoreDelegateManager(DistributedData::Bootstrap::GetInstance().GetProcessLabel(), userId);
296     DistributedDB::KvStoreConfig kvStoreConfig { dataDir };
297     kvStoreDelegateManager_->SetKvStoreConfig(kvStoreConfig);
298     userId_ = userId;
299 }
300 
Open()301 int32_t ObjectStoreManager::Open()
302 {
303     if (kvStoreDelegateManager_ == nullptr) {
304         ZLOGE("not init");
305         return OBJECT_INNER_ERROR;
306     }
307     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
308     if (delegate_ == nullptr) {
309         ZLOGI("open store");
310         delegate_ = OpenObjectKvStore();
311         if (delegate_ == nullptr) {
312             ZLOGE("open failed,please check DB status");
313             return OBJECT_DBSTATUS_ERROR;
314         }
315         syncCount_ = 1;
316     } else {
317         syncCount_++;
318         ZLOGI("syncCount = %{public}d", syncCount_);
319     }
320     return OBJECT_SUCCESS;
321 }
322 
Close()323 void ObjectStoreManager::Close()
324 {
325     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
326     if (delegate_ == nullptr) {
327         return;
328     }
329     syncCount_--;
330     ZLOGI("closed a store, syncCount = %{public}d", syncCount_);
331     FlushClosedStore();
332 }
333 
SyncCompleted(const std::map<std::string,DistributedDB::DBStatus> & results,uint64_t sequenceId)334 void ObjectStoreManager::SyncCompleted(
335     const std::map<std::string, DistributedDB::DBStatus> &results, uint64_t sequenceId)
336 {
337     std::string userId;
338     SequenceSyncManager::Result result = SequenceSyncManager::GetInstance()->Process(sequenceId, results, userId);
339     if (result == SequenceSyncManager::SUCCESS_USER_HAS_FINISHED && userId == userId_) {
340         std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
341         SetSyncStatus(false);
342         FlushClosedStore();
343     }
344 }
345 
FlushClosedStore()346 void ObjectStoreManager::FlushClosedStore()
347 {
348     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
349     if (!isSyncing_ && syncCount_ == 0 && delegate_ != nullptr) {
350         ZLOGD("close store");
351         auto status = kvStoreDelegateManager_->CloseKvStore(delegate_);
352         if (status != DistributedDB::DBStatus::OK) {
353             int timeOut = 1000;
354             executors_->Schedule(std::chrono::milliseconds(timeOut), [this]() {
355                 FlushClosedStore();
356             });
357             ZLOGE("GetEntries fail %{public}d", status);
358             return;
359         }
360         delegate_ = nullptr;
361         if (objectDataListener_ != nullptr) {
362             delete objectDataListener_;
363             objectDataListener_ = nullptr;
364         }
365     }
366 }
367 
ProcessOldEntry(const std::string & appId)368 void ObjectStoreManager::ProcessOldEntry(const std::string &appId)
369 {
370     std::vector<DistributedDB::Entry> entries;
371     auto status = delegate_->GetEntries(std::vector<uint8_t>(appId.begin(), appId.end()), entries);
372     if (status != DistributedDB::DBStatus::OK) {
373         ZLOGE("GetEntries fail %{public}d", status);
374         return;
375     }
376 
377     std::map<std::string, int64_t> sessionIds;
378     int64_t oldestTime = 0;
379     std::string deleteKey;
380     for (auto &item : entries) {
381         std::string key(item.key.begin(), item.key.end());
382         std::string id = GetSessionId(key);
383         if (sessionIds.count(id) == 0) {
384             sessionIds[id] = GetTime(key);
385         }
386         if (oldestTime == 0 || oldestTime > sessionIds[id]) {
387             oldestTime = sessionIds[id];
388             deleteKey = GetPrefixWithoutDeviceId(appId, id);
389         }
390     }
391     if (sessionIds.size() < MAX_OBJECT_SIZE_PER_APP) {
392         return;
393     }
394     ZLOGI("app object is full, delete oldest one %{public}s", deleteKey.c_str());
395     int32_t result = RevokeSaveToStore(deleteKey);
396     if (result != OBJECT_SUCCESS) {
397         ZLOGE("RevokeSaveToStore fail %{public}d", result);
398         return;
399     }
400 }
401 
SaveToStore(const std::string & appId,const std::string & sessionId,const std::string & toDeviceId,const std::map<std::string,std::vector<uint8_t>> & data)402 int32_t ObjectStoreManager::SaveToStore(const std::string &appId, const std::string &sessionId,
403     const std::string &toDeviceId, const std::map<std::string, std::vector<uint8_t>> &data)
404 {
405     ProcessOldEntry(appId);
406     RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, toDeviceId));
407     std::string timestamp = std::to_string(GetSecondsSince1970ToNow());
408     std::vector<DistributedDB::Entry> entries;
409     for (auto &item : data) {
410         DistributedDB::Entry entry;
411         std::string tmp = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR + item.first;
412         entry.key = std::vector<uint8_t>(tmp.begin(), tmp.end());
413         entry.value = item.second;
414         entries.emplace_back(entry);
415     }
416     auto status = delegate_->PutBatch(entries);
417     if (status != DistributedDB::DBStatus::OK) {
418         ZLOGE("putBatch fail %{public}d", status);
419     }
420     return status;
421 }
422 
SyncOnStore(const std::string & prefix,const std::vector<std::string> & deviceList,SyncCallBack & callback)423 int32_t ObjectStoreManager::SyncOnStore(
424     const std::string &prefix, const std::vector<std::string> &deviceList, SyncCallBack &callback)
425 {
426     std::vector<std::string> syncDevices;
427     for (auto &device : deviceList) {
428         // save to local, do not need sync
429         if (device == LOCAL_DEVICE) {
430             ZLOGI("save to local successful");
431             std::map<std::string, int32_t> result;
432             result[LOCAL_DEVICE] = OBJECT_SUCCESS;
433             callback(result);
434             return OBJECT_SUCCESS;
435         }
436         syncDevices.emplace_back(DmAdaper::GetInstance().GetUuidByNetworkId(device));
437     }
438     if (!syncDevices.empty()) {
439         uint64_t sequenceId = SequenceSyncManager::GetInstance()->AddNotifier(userId_, callback);
440         DistributedDB::Query dbQuery = DistributedDB::Query::Select();
441         dbQuery.PrefixKey(std::vector<uint8_t>(prefix.begin(), prefix.end()));
442         ZLOGD("start sync");
443         auto status = delegate_->Sync(
444             syncDevices, DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY,
445             [this, sequenceId](const std::map<std::string, DistributedDB::DBStatus> &devicesMap) {
446                 ZLOGI("objectstore sync finished");
447                 std::map<std::string, DistributedDB::DBStatus> result;
448                 for (auto &item : devicesMap) {
449                     result[DmAdaper::GetInstance().ToNetworkID(item.first)] = item.second;
450                 }
451                 SyncCompleted(result, sequenceId);
452             },
453             dbQuery, false);
454         if (status != DistributedDB::DBStatus::OK) {
455             ZLOGE("sync error %{public}d", status);
456             std::string tmp;
457             SequenceSyncManager::GetInstance()->DeleteNotifier(sequenceId, tmp);
458             return status;
459         }
460         SetSyncStatus(true);
461     } else {
462         ZLOGI("single device");
463         callback(std::map<std::string, int32_t>());
464         return OBJECT_SUCCESS;
465     }
466     return OBJECT_SUCCESS;
467 }
468 
SetSyncStatus(bool status)469 int32_t ObjectStoreManager::SetSyncStatus(bool status)
470 {
471     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
472     isSyncing_ = status;
473     return OBJECT_SUCCESS;
474 }
475 
RevokeSaveToStore(const std::string & prefix)476 int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix)
477 {
478     std::vector<DistributedDB::Entry> entries;
479     auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
480     if (status == DistributedDB::DBStatus::NOT_FOUND) {
481         ZLOGI("not found entry");
482         return OBJECT_SUCCESS;
483     }
484     if (status != DistributedDB::DBStatus::OK) {
485         ZLOGE("GetEntries failed,please check DB status");
486         return DB_ERROR;
487     }
488     std::vector<std::vector<uint8_t>> keys;
489     std::for_each(
490         entries.begin(), entries.end(), [&keys](const DistributedDB::Entry &entry) { keys.emplace_back(entry.key); });
491     if (!keys.empty()) {
492         status = delegate_->DeleteBatch(keys);
493         if (status != DistributedDB::DBStatus::OK) {
494             ZLOGE("DeleteBatch failed,please check DB status, status = %{public}d", status);
495             return DB_ERROR;
496         }
497     }
498     return OBJECT_SUCCESS;
499 }
500 
RetrieveFromStore(const std::string & appId,const std::string & sessionId,std::map<std::string,std::vector<uint8_t>> & results)501 int32_t ObjectStoreManager::RetrieveFromStore(
502     const std::string &appId, const std::string &sessionId, std::map<std::string, std::vector<uint8_t>> &results)
503 {
504     std::vector<DistributedDB::Entry> entries;
505     std::string prefix = GetPrefixWithoutDeviceId(appId, sessionId);
506     auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
507     if (status != DistributedDB::DBStatus::OK) {
508         ZLOGE("GetEntries failed,please check DB status, status = %{public}d", status);
509         return DB_ERROR;
510     }
511     ZLOGI("GetEntries successfully");
512     std::for_each(entries.begin(), entries.end(), [&results, this](const DistributedDB::Entry &entry) {
513         std::string key(entry.key.begin(), entry.key.end());
514         results[GetPropertyName(key)] = entry.value;
515     });
516     return OBJECT_SUCCESS;
517 }
518 
ProcessKeyByIndex(std::string & key,uint8_t index)519 void ObjectStoreManager::ProcessKeyByIndex(std::string &key, uint8_t index)
520 {
521     std::size_t pos;
522     uint8_t i = 0;
523     do {
524         pos = key.find(SEPERATOR);
525         if (pos == std::string::npos) {
526             return;
527         }
528         key.erase(0, pos + 1);
529         i++;
530     } while (i < index);
531 }
532 
GetPropertyName(const std::string & key)533 std::string ObjectStoreManager::GetPropertyName(const std::string &key)
534 {
535     std::string result = key;
536     ProcessKeyByIndex(result, 5); // property name is after 5 '_'
537     return result;
538 }
539 
GetSessionId(const std::string & key)540 std::string ObjectStoreManager::GetSessionId(const std::string &key)
541 {
542     std::string result = key;
543     ProcessKeyByIndex(result, 1); // sessionId is after 1 '_'
544     auto pos = result.find(SEPERATOR);
545     if (pos == std::string::npos) {
546         return result;
547     }
548     result.erase(pos);
549     return result;
550 }
551 
GetTime(const std::string & key)552 int64_t ObjectStoreManager::GetTime(const std::string &key)
553 {
554     std::string result = key;
555     std::size_t pos;
556     int8_t i = 0;
557     do {
558         pos = result.find(SEPERATOR);
559         result.erase(0, pos + 1);
560         i++;
561     } while (pos != std::string::npos && i < 4); // time is after 4 '_'
562     pos = result.find(SEPERATOR);
563     result.erase(pos);
564     char *end = nullptr;
565     return std::strtol(result.c_str(), &end, DECIMAL_BASE);
566 }
567 
CloseAfterMinute()568 void ObjectStoreManager::CloseAfterMinute()
569 {
570     executors_->Schedule(std::chrono::minutes(INTERVAL), std::bind(&ObjectStoreManager::Close, this));
571 }
572 
GetBundleName(const std::string & key)573 std::string ObjectStoreManager::GetBundleName(const std::string &key)
574 {
575     std::size_t pos = key.find(SEPERATOR);
576     if (pos == std::string::npos) {
577         return std::string();
578     }
579     std::string result = key;
580     result.erase(pos);
581     return result;
582 }
583 
SetThreadPool(std::shared_ptr<ExecutorPool> executors)584 void ObjectStoreManager::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
585 {
586     executors_ = executors;
587 }
588 
AddNotifier(const std::string & userId,SyncCallBack & callback)589 uint64_t SequenceSyncManager::AddNotifier(const std::string &userId, SyncCallBack &callback)
590 {
591     std::lock_guard<std::mutex> lock(notifierLock_);
592     uint64_t sequenceId = KvStoreUtils::GenerateSequenceId();
593     userIdSeqIdRelations_[userId].emplace_back(sequenceId);
594     seqIdCallbackRelations_[sequenceId] = callback;
595     return sequenceId;
596 }
597 
Process(uint64_t sequenceId,const std::map<std::string,DistributedDB::DBStatus> & results,std::string & userId)598 SequenceSyncManager::Result SequenceSyncManager::Process(
599     uint64_t sequenceId, const std::map<std::string, DistributedDB::DBStatus> &results, std::string &userId)
600 {
601     std::lock_guard<std::mutex> lock(notifierLock_);
602     if (seqIdCallbackRelations_.count(sequenceId) == 0) {
603         ZLOGE("not exist");
604         return ERR_SID_NOT_EXIST;
605     }
606     std::map<std::string, int32_t> syncResults;
607     for (auto &item : results) {
608         syncResults[item.first] = item.second == DistributedDB::DBStatus::OK ? 0 : -1;
609     }
610     seqIdCallbackRelations_[sequenceId](syncResults);
611     ZLOGD("end complete");
612     return DeleteNotifierNoLock(sequenceId, userId);
613 }
614 
DeleteNotifier(uint64_t sequenceId,std::string & userId)615 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifier(uint64_t sequenceId, std::string &userId)
616 {
617     std::lock_guard<std::mutex> lock(notifierLock_);
618     if (seqIdCallbackRelations_.count(sequenceId) == 0) {
619         ZLOGE("not exist");
620         return ERR_SID_NOT_EXIST;
621     }
622     return DeleteNotifierNoLock(sequenceId, userId);
623 }
624 
DeleteNotifierNoLock(uint64_t sequenceId,std::string & userId)625 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifierNoLock(uint64_t sequenceId, std::string &userId)
626 {
627     seqIdCallbackRelations_.erase(sequenceId);
628     auto userIdIter = userIdSeqIdRelations_.begin();
629     while (userIdIter != userIdSeqIdRelations_.end()) {
630         auto sIdIter = std::find(userIdIter->second.begin(), userIdIter->second.end(), sequenceId);
631         if (sIdIter != userIdIter->second.end()) {
632             userIdIter->second.erase(sIdIter);
633             if (userIdIter->second.empty()) {
634                 ZLOGD("finished user callback, userId = %{public}s", userIdIter->first.c_str());
635                 userId = userIdIter->first;
636                 userIdSeqIdRelations_.erase(userIdIter);
637                 return SUCCESS_USER_HAS_FINISHED;
638             } else {
639                 ZLOGD(" finished a callback but user not finished, userId = %{public}s", userIdIter->first.c_str());
640                 return SUCCESS_USER_IN_USE;
641             }
642         }
643         userIdIter++;
644     }
645     return SUCCESS_USER_HAS_FINISHED;
646 }
647 } // namespace DistributedObject
648 } // namespace OHOS
649