1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define LOG_TAG "ObjectStoreManager"
17
18 #include "object_manager.h"
19
20 #include "bootstrap.h"
21 #include "checker/checker_manager.h"
22 #include "datetime_ex.h"
23 #include "kvstore_utils.h"
24 #include "log_print.h"
25 #include "object_data_listener.h"
26
27 namespace OHOS {
28 namespace DistributedObject {
ObjectStoreManager()29 ObjectStoreManager::ObjectStoreManager() : timer_("CloseRetryTimer")
30 {
31 timer_.Setup();
32 }
33
OpenObjectKvStore()34 DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore()
35 {
36 DistributedDB::KvStoreNbDelegate *store = nullptr;
37 DistributedDB::KvStoreNbDelegate::Option option;
38 option.createDirByStoreIdOnly = true;
39 option.syncDualTupleMode = true;
40 option.secOption = { DistributedDB::S1, DistributedDB::ECE };
41 if (objectDataListener_ == nullptr) {
42 objectDataListener_ = new ObjectDataListener();
43 }
44 ZLOGD("start GetKvStore");
45 kvStoreDelegateManager_->GetKvStore(ObjectCommon::OBJECTSTORE_DB_STOREID, option,
46 [&store, this](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) {
47 if (dbStatus != DistributedDB::DBStatus::OK) {
48 ZLOGE("GetKvStore fail %{public}d", dbStatus);
49 return;
50 }
51 ZLOGI("GetKvStore successsfully");
52 store = kvStoreNbDelegate;
53 std::vector<uint8_t> tmpKey;
54 DistributedDB::DBStatus status = store->RegisterObserver(tmpKey,
55 DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN,
56 objectDataListener_);
57 if (status != DistributedDB::DBStatus::OK) {
58 ZLOGE("RegisterObserver err %{public}d", status);
59 }
60 });
61 return store;
62 }
63
ProcessSyncCallback(const std::map<std::string,int32_t> & results,const std::string & appId,const std::string & sessionId,const std::string & deviceId)64 void ObjectStoreManager::ProcessSyncCallback(const std::map<std::string, int32_t> &results, const std::string &appId,
65 const std::string &sessionId, const std::string &deviceId)
66 {
67 if (results.empty() || results.find(LOCAL_DEVICE) != results.end()) {
68 return;
69 }
70 int32_t result = Open();
71 if (result != OBJECT_SUCCESS) {
72 ZLOGE("Open objectStore DB failed,please check DB errCode, errCode = %{public}d", result);
73 return;
74 }
75 // delete local data
76 result = RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, deviceId));
77 if (result != OBJECT_SUCCESS) {
78 ZLOGE("Save to store failed,please check DB status, status = %{public}d", result);
79 }
80 Close();
81 return;
82 }
83
Save(const std::string & appId,const std::string & sessionId,const std::map<std::string,std::vector<uint8_t>> & data,const std::string & deviceId,sptr<IObjectSaveCallback> & callback)84 int32_t ObjectStoreManager::Save(const std::string &appId, const std::string &sessionId,
85 const std::map<std::string, std::vector<uint8_t>> &data, const std::string &deviceId,
86 sptr<IObjectSaveCallback> &callback)
87 {
88 if (deviceId.size() == 0) {
89 ZLOGE("deviceId empty");
90 callback->Completed(std::map<std::string, int32_t>());
91 return INVALID_ARGUMENT;
92 }
93 int32_t result = Open();
94 if (result != OBJECT_SUCCESS) {
95 ZLOGE("Open objectStore DB failed,please check errCode, errCode = %{public}d", result);
96 callback->Completed(std::map<std::string, int32_t>());
97 return STORE_NOT_OPEN;
98 }
99
100 ZLOGD("start SaveToStore");
101 result = SaveToStore(appId, sessionId, deviceId, data);
102 if (result != OBJECT_SUCCESS) {
103 ZLOGE("Save to store failed, please check DB errCode, errCode = %{public}d", result);
104 Close();
105 callback->Completed(std::map<std::string, int32_t>());
106 return result;
107 }
108 SyncCallBack tmp = [callback, appId, sessionId, deviceId, this](const std::map<std::string, int32_t> &results) {
109 callback->Completed(results);
110 ProcessSyncCallback(results, appId, sessionId, deviceId);
111 };
112 ZLOGD("start SyncOnStore");
113 std::vector<std::string> deviceList = {deviceId};
114 result = SyncOnStore(GetPropertyPrefix(appId, sessionId, deviceId), deviceList, tmp);
115 if (result != OBJECT_SUCCESS) {
116 ZLOGI("sync on store failed,please check DB errCode, errCode = %{public}d", result);
117 callback->Completed(std::map<std::string, int32_t>());
118 }
119 Close();
120 return result;
121 }
122
RevokeSave(const std::string & appId,const std::string & sessionId,sptr<IObjectRevokeSaveCallback> & callback)123 int32_t ObjectStoreManager::RevokeSave(
124 const std::string &appId, const std::string &sessionId, sptr<IObjectRevokeSaveCallback> &callback)
125 {
126 int32_t result = Open();
127 if (result != OBJECT_SUCCESS) {
128 ZLOGE("Open objectStore DB failed,please check errCode, errCode = %{public}d", result);
129 callback->Completed(STORE_NOT_OPEN);
130 return STORE_NOT_OPEN;
131 }
132
133 result = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId));
134 if (result != OBJECT_SUCCESS) {
135 ZLOGE("Save to store failed,please check DB errCode, errCode = %{public}d", result);
136 Close();
137 callback->Completed(result);
138 return result;
139 }
140 std::vector<std::string> deviceList;
141 auto deviceInfos = AppDistributedKv::CommunicationProvider::GetInstance().GetRemoteDevices();
142 std::for_each(deviceInfos.begin(), deviceInfos.end(),
143 [&deviceList](AppDistributedKv::DeviceInfo info) { deviceList.emplace_back(info.networkId); });
144 if (!deviceList.empty()) {
145 SyncCallBack tmp = [callback](const std::map<std::string, int32_t> &results) {
146 ZLOGI("revoke save finished");
147 callback->Completed(OBJECT_SUCCESS);
148 };
149 result = SyncOnStore(GetPropertyPrefix(appId, sessionId), deviceList, tmp);
150 if (result != OBJECT_SUCCESS) {
151 ZLOGE("sync on store failed,please check DB errCode, errCode = %{public}d", result);
152 callback->Completed(result);
153 }
154 } else {
155 callback->Completed(OBJECT_SUCCESS);
156 };
157 Close();
158 return result;
159 }
160
Retrieve(const std::string & appId,const std::string & sessionId,sptr<IObjectRetrieveCallback> callback)161 int32_t ObjectStoreManager::Retrieve(
162 const std::string &appId, const std::string &sessionId, sptr<IObjectRetrieveCallback> callback)
163 {
164 ZLOGI("enter");
165 int32_t result = Open();
166 if (result != OBJECT_SUCCESS) {
167 ZLOGE("Open objectStore DB failed,please check DB errCode, errCode = %{public}d", result);
168 callback->Completed(std::map<std::string, std::vector<uint8_t>>());
169 return STORE_NOT_OPEN;
170 }
171
172 std::map<std::string, std::vector<uint8_t>> results;
173 int32_t status = RetrieveFromStore(appId, sessionId, results);
174 if (status != OBJECT_SUCCESS) {
175 ZLOGE("Retrieve from store failed,please check DB status, status = %{public}d", status);
176 Close();
177 callback->Completed(std::map<std::string, std::vector<uint8_t>>());
178 return status;
179 }
180 // delete local data
181 status = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId));
182 if (status != OBJECT_SUCCESS) {
183 ZLOGE("revoke save to store failed,please check DB status, status = %{public}d", status);
184 Close();
185 callback->Completed(std::map<std::string, std::vector<uint8_t>>());
186 return status;
187 }
188 Close();
189 callback->Completed(results);
190 return status;
191 }
192
Clear()193 int32_t ObjectStoreManager::Clear()
194 {
195 ZLOGI("enter");
196 int32_t result = Open();
197 if (result != OBJECT_SUCCESS) {
198 ZLOGE("Open objectStore DB failed,please check DB status");
199 return STORE_NOT_OPEN;
200 }
201 result = RevokeSaveToStore("");
202 Close();
203 return result;
204 }
205
DeleteByAppId(const std::string & appId)206 int32_t ObjectStoreManager::DeleteByAppId(const std::string &appId)
207 {
208 ZLOGI("enter, %{public}s", appId.c_str());
209 int32_t result = Open();
210 if (result != OBJECT_SUCCESS) {
211 ZLOGE("Open objectStore DB failed,please check DB errCode, errCode = %{public}d", result);
212 return STORE_NOT_OPEN;
213 }
214 result = RevokeSaveToStore(appId);
215 if (result != OBJECT_SUCCESS) {
216 ZLOGE("RevokeSaveToStore failed");
217 }
218 Close();
219 return result;
220 }
221
RegisterRemoteCallback(const std::string & bundleName,const std::string & sessionId,pid_t pid,uint32_t tokenId,sptr<IObjectChangeCallback> & callback)222 void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId,
223 pid_t pid, uint32_t tokenId,
224 sptr <IObjectChangeCallback> &callback)
225 {
226 if (bundleName.empty() || sessionId.empty()) {
227 ZLOGD("ObjectStoreManager::RegisterRemoteCallback empty");
228 return;
229 }
230 ZLOGD("ObjectStoreManager::RegisterRemoteCallback start");
231 std::string prefix = bundleName + sessionId;
232 callbacks_.Compute(tokenId, ([pid, &callback, &prefix](const uint32_t key, CallbackInfo &value) {
233 if (value.pid != pid) {
234 value = CallbackInfo { pid };
235 }
236 value.observers_.insert_or_assign(prefix, callback);
237 return !value.observers_.empty();
238 }));
239 }
240
UnregisterRemoteCallback(const std::string & bundleName,pid_t pid,uint32_t tokenId,const std::string & sessionId)241 void ObjectStoreManager::UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId,
242 const std::string &sessionId)
243 {
244 if (bundleName.empty()) {
245 ZLOGD("bundleName is empty");
246 return;
247 }
248 callbacks_.Compute(tokenId, ([pid, &sessionId, &bundleName](const uint32_t key, CallbackInfo &value) {
249 if (value.pid != pid) {
250 return true;
251 }
252 if (sessionId.empty()) {
253 return false;
254 }
255 std::string prefix = bundleName + sessionId;
256 for (auto it = value.observers_.begin(); it != value.observers_.end();) {
257 if ((*it).first == prefix) {
258 it = value.observers_.erase(it);
259 } else {
260 ++it;
261 }
262 }
263 return true;
264 }));
265 }
266
NotifyChange(std::map<std::string,std::vector<uint8_t>> & changedData)267 void ObjectStoreManager::NotifyChange(std::map<std::string, std::vector<uint8_t>> &changedData)
268 {
269 ZLOGD("ObjectStoreManager::NotifyChange start");
270 std::map<std::string, std::map<std::string, std::vector<uint8_t>>> data;
271 for (const auto &item : changedData) {
272 std::string prefix = GetBundleName(item.first) + GetSessionId(item.first);
273 std::string propertyName = GetPropertyName(item.first);
274 data[prefix].insert_or_assign(std::move(propertyName), std::move(item.second));
275 }
276
277 callbacks_.ForEach([&data](uint32_t tokenId, CallbackInfo &value) {
278 for (const auto &observer : value.observers_) {
279 auto it = data.find(observer.first);
280 if (it == data.end()) {
281 continue;
282 }
283 observer.second->Completed((*it).second);
284 }
285 return false;
286 });
287 }
288
SetData(const std::string & dataDir,const std::string & userId)289 void ObjectStoreManager::SetData(const std::string &dataDir, const std::string &userId)
290 {
291 ZLOGI("enter %{public}s", dataDir.c_str());
292 kvStoreDelegateManager_ =
293 new DistributedDB::KvStoreDelegateManager(DistributedData::Bootstrap::GetInstance().GetProcessLabel(), userId);
294 DistributedDB::KvStoreConfig kvStoreConfig { dataDir };
295 kvStoreDelegateManager_->SetKvStoreConfig(kvStoreConfig);
296 userId_ = userId;
297 }
298
Open()299 int32_t ObjectStoreManager::Open()
300 {
301 if (kvStoreDelegateManager_ == nullptr) {
302 ZLOGE("not init");
303 return OBJECT_INNER_ERROR;
304 }
305 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
306 if (delegate_ == nullptr) {
307 ZLOGI("open store");
308 delegate_ = OpenObjectKvStore();
309 if (delegate_ == nullptr) {
310 ZLOGE("open failed,please check DB status");
311 return OBJECT_DBSTATUS_ERROR;
312 }
313 syncCount_ = 1;
314 } else {
315 syncCount_++;
316 ZLOGI("syncCount = %{public}d", syncCount_);
317 }
318 return OBJECT_SUCCESS;
319 }
320
Close()321 void ObjectStoreManager::Close()
322 {
323 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
324 if (delegate_ == nullptr) {
325 return;
326 }
327 syncCount_--;
328 ZLOGI("closed a store, syncCount = %{public}d", syncCount_);
329 FlushClosedStore();
330 }
331
SyncCompleted(const std::map<std::string,DistributedDB::DBStatus> & results,uint64_t sequenceId)332 void ObjectStoreManager::SyncCompleted(
333 const std::map<std::string, DistributedDB::DBStatus> &results, uint64_t sequenceId)
334 {
335 std::string userId;
336 SequenceSyncManager::Result result = SequenceSyncManager::GetInstance()->Process(sequenceId, results, userId);
337 if (result == SequenceSyncManager::SUCCESS_USER_HAS_FINISHED && userId == userId_) {
338 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
339 SetSyncStatus(false);
340 FlushClosedStore();
341 }
342 }
343
FlushClosedStore()344 void ObjectStoreManager::FlushClosedStore()
345 {
346 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
347 if (!isSyncing_ && syncCount_ == 0 && delegate_ != nullptr) {
348 ZLOGD("close store");
349 auto status = kvStoreDelegateManager_->CloseKvStore(delegate_);
350 if (status != DistributedDB::DBStatus::OK) {
351 timer_.Register([this]() { FlushClosedStore(); }, 1000, true); // retry after 1000ms
352 ZLOGE("GetEntries fail %{public}d", status);
353 return;
354 }
355 delegate_ = nullptr;
356 if (objectDataListener_ != nullptr) {
357 delete objectDataListener_;
358 objectDataListener_ = nullptr;
359 }
360 }
361 }
362
ProcessOldEntry(const std::string & appId)363 void ObjectStoreManager::ProcessOldEntry(const std::string &appId)
364 {
365 std::vector<DistributedDB::Entry> entries;
366 auto status = delegate_->GetEntries(std::vector<uint8_t>(appId.begin(), appId.end()), entries);
367 if (status != DistributedDB::DBStatus::OK) {
368 ZLOGE("GetEntries fail %{public}d", status);
369 return;
370 }
371
372 std::map<std::string, int64_t> sessionIds;
373 int64_t oldestTime = 0;
374 std::string deleteKey;
375 for (auto &item : entries) {
376 std::string key(item.key.begin(), item.key.end());
377 std::string id = GetSessionId(key);
378 if (sessionIds.count(id) == 0) {
379 sessionIds[id] = GetTime(key);
380 }
381 if (oldestTime == 0 || oldestTime > sessionIds[id]) {
382 oldestTime = sessionIds[id];
383 deleteKey = GetPrefixWithoutDeviceId(appId, id);
384 }
385 }
386 if (sessionIds.size() < MAX_OBJECT_SIZE_PER_APP) {
387 return;
388 }
389 ZLOGI("app object is full, delete oldest one %{public}s", deleteKey.c_str());
390 int32_t result = RevokeSaveToStore(deleteKey);
391 if (result != OBJECT_SUCCESS) {
392 ZLOGE("RevokeSaveToStore fail %{public}d", result);
393 return;
394 }
395 }
396
SaveToStore(const std::string & appId,const std::string & sessionId,const std::string & toDeviceId,const std::map<std::string,std::vector<uint8_t>> & data)397 int32_t ObjectStoreManager::SaveToStore(const std::string &appId, const std::string &sessionId,
398 const std::string &toDeviceId, const std::map<std::string, std::vector<uint8_t>> &data)
399 {
400 ProcessOldEntry(appId);
401 RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, toDeviceId));
402 std::string timestamp = std::to_string(GetSecondsSince1970ToNow());
403 std::vector<DistributedDB::Entry> entries;
404 for (auto &item : data) {
405 DistributedDB::Entry entry;
406 std::string tmp = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR + item.first;
407 entry.key = std::vector<uint8_t>(tmp.begin(), tmp.end());
408 entry.value = item.second;
409 entries.emplace_back(entry);
410 }
411 auto status = delegate_->PutBatch(entries);
412 if (status != DistributedDB::DBStatus::OK) {
413 ZLOGE("putBatch fail %{public}d", status);
414 }
415 return status;
416 }
417
SyncOnStore(const std::string & prefix,const std::vector<std::string> & deviceList,SyncCallBack & callback)418 int32_t ObjectStoreManager::SyncOnStore(
419 const std::string &prefix, const std::vector<std::string> &deviceList, SyncCallBack &callback)
420 {
421 std::vector<std::string> syncDevices;
422 for (auto &device : deviceList) {
423 // save to local, do not need sync
424 if (device == LOCAL_DEVICE) {
425 ZLOGI("save to local successful");
426 std::map<std::string, int32_t> result;
427 result[LOCAL_DEVICE] = OBJECT_SUCCESS;
428 callback(result);
429 return OBJECT_SUCCESS;
430 }
431 syncDevices.emplace_back(AppDistributedKv::CommunicationProvider::GetInstance().GetUuidByNodeId(device));
432 }
433 if (!syncDevices.empty()) {
434 uint64_t sequenceId = SequenceSyncManager::GetInstance()->AddNotifier(userId_, callback);
435 DistributedDB::Query dbQuery = DistributedDB::Query::Select();
436 dbQuery.PrefixKey(std::vector<uint8_t>(prefix.begin(), prefix.end()));
437 ZLOGD("start sync");
438 auto status = delegate_->Sync(
439 syncDevices, DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY,
440 [this, sequenceId](const std::map<std::string, DistributedDB::DBStatus> &devicesMap) {
441 ZLOGI("objectstore sync finished");
442 std::map<std::string, DistributedDB::DBStatus> result;
443 for (auto &item : devicesMap) {
444 result[AppDistributedKv::CommunicationProvider::GetInstance().ToNodeId(item.first)] = item.second;
445 }
446 SyncCompleted(result, sequenceId);
447 },
448 dbQuery, false);
449 if (status != DistributedDB::DBStatus::OK) {
450 ZLOGE("sync error %{public}d", status);
451 std::string tmp;
452 SequenceSyncManager::GetInstance()->DeleteNotifier(sequenceId, tmp);
453 return status;
454 }
455 SetSyncStatus(true);
456 } else {
457 ZLOGI("single device");
458 callback(std::map<std::string, int32_t>());
459 return OBJECT_SUCCESS;
460 }
461 return OBJECT_SUCCESS;
462 }
463
SetSyncStatus(bool status)464 int32_t ObjectStoreManager::SetSyncStatus(bool status)
465 {
466 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
467 isSyncing_ = status;
468 return OBJECT_SUCCESS;
469 }
470
RevokeSaveToStore(const std::string & prefix)471 int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix)
472 {
473 std::vector<DistributedDB::Entry> entries;
474 auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
475 if (status == DistributedDB::DBStatus::NOT_FOUND) {
476 ZLOGI("not found entry");
477 return OBJECT_SUCCESS;
478 }
479 if (status != DistributedDB::DBStatus::OK) {
480 ZLOGE("GetEntries failed,please check DB status");
481 return DB_ERROR;
482 }
483 std::vector<std::vector<uint8_t>> keys;
484 std::for_each(
485 entries.begin(), entries.end(), [&keys](const DistributedDB::Entry &entry) { keys.emplace_back(entry.key); });
486 if (!keys.empty()) {
487 status = delegate_->DeleteBatch(keys);
488 if (status != DistributedDB::DBStatus::OK) {
489 ZLOGE("DeleteBatch failed,please check DB status, status = %{public}d", status);
490 return DB_ERROR;
491 }
492 }
493 return OBJECT_SUCCESS;
494 }
495
RetrieveFromStore(const std::string & appId,const std::string & sessionId,std::map<std::string,std::vector<uint8_t>> & results)496 int32_t ObjectStoreManager::RetrieveFromStore(
497 const std::string &appId, const std::string &sessionId, std::map<std::string, std::vector<uint8_t>> &results)
498 {
499 std::vector<DistributedDB::Entry> entries;
500 std::string prefix = GetPrefixWithoutDeviceId(appId, sessionId);
501 auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
502 if (status != DistributedDB::DBStatus::OK) {
503 ZLOGE("GetEntries failed,please check DB status, status = %{public}d", status);
504 return DB_ERROR;
505 }
506 ZLOGI("GetEntries successfully");
507 std::for_each(entries.begin(), entries.end(), [&results, this](const DistributedDB::Entry &entry) {
508 std::string key(entry.key.begin(), entry.key.end());
509 results[GetPropertyName(key)] = entry.value;
510 });
511 return OBJECT_SUCCESS;
512 }
513
ProcessKeyByIndex(std::string & key,uint8_t index)514 void ObjectStoreManager::ProcessKeyByIndex(std::string &key, uint8_t index)
515 {
516 std::size_t pos;
517 uint8_t i = 0;
518 do {
519 pos = key.find(SEPERATOR);
520 if (pos == std::string::npos) {
521 return;
522 }
523 key.erase(0, pos + 1);
524 i++;
525 } while (i < index);
526 }
527
GetPropertyName(const std::string & key)528 std::string ObjectStoreManager::GetPropertyName(const std::string &key)
529 {
530 std::string result = key;
531 ProcessKeyByIndex(result, 5); // property name is after 5 '_'
532 return result;
533 }
534
GetSessionId(const std::string & key)535 std::string ObjectStoreManager::GetSessionId(const std::string &key)
536 {
537 std::string result = key;
538 ProcessKeyByIndex(result, 1); // sessionId is after 1 '_'
539 auto pos = result.find(SEPERATOR);
540 if (pos == std::string::npos) {
541 return result;
542 }
543 result.erase(pos);
544 return result;
545 }
546
GetTime(const std::string & key)547 int64_t ObjectStoreManager::GetTime(const std::string &key)
548 {
549 std::string result = key;
550 std::size_t pos;
551 int8_t i = 0;
552 do {
553 pos = result.find(SEPERATOR);
554 result.erase(0, pos + 1);
555 i++;
556 } while (pos != std::string::npos && i < 4); // time is after 4 '_'
557 pos = result.find(SEPERATOR);
558 result.erase(pos);
559 char *end = nullptr;
560 return std::strtol(result.c_str(), &end, DECIMAL_BASE);
561 }
562
CloseAfterMinute()563 void ObjectStoreManager::CloseAfterMinute()
564 {
565 scheduler_.At(std::chrono::steady_clock::now() + std::chrono::minutes(INTERVAL),
566 std::bind(&ObjectStoreManager::Close, this));
567 }
568
GetBundleName(const std::string & key)569 std::string ObjectStoreManager::GetBundleName(const std::string &key)
570 {
571 std::size_t pos = key.find(SEPERATOR);
572 if (pos == std::string::npos) {
573 return std::string();
574 }
575 std::string result = key;
576 result.erase(pos);
577 return result;
578 }
579
AddNotifier(const std::string & userId,SyncCallBack & callback)580 uint64_t SequenceSyncManager::AddNotifier(const std::string &userId, SyncCallBack &callback)
581 {
582 std::lock_guard<std::mutex> lock(notifierLock_);
583 uint64_t sequenceId = KvStoreUtils::GenerateSequenceId();
584 userIdSeqIdRelations_[userId].emplace_back(sequenceId);
585 seqIdCallbackRelations_[sequenceId] = callback;
586 return sequenceId;
587 }
588
Process(uint64_t sequenceId,const std::map<std::string,DistributedDB::DBStatus> & results,std::string & userId)589 SequenceSyncManager::Result SequenceSyncManager::Process(
590 uint64_t sequenceId, const std::map<std::string, DistributedDB::DBStatus> &results, std::string &userId)
591 {
592 std::lock_guard<std::mutex> lock(notifierLock_);
593 if (seqIdCallbackRelations_.count(sequenceId) == 0) {
594 ZLOGE("not exist");
595 return ERR_SID_NOT_EXIST;
596 }
597 std::map<std::string, int32_t> syncResults;
598 for (auto &item : results) {
599 syncResults[item.first] = item.second == DistributedDB::DBStatus::OK ? 0 : -1;
600 }
601 seqIdCallbackRelations_[sequenceId](syncResults);
602 ZLOGD("end complete");
603 return DeleteNotifierNoLock(sequenceId, userId);
604 }
605
DeleteNotifier(uint64_t sequenceId,std::string & userId)606 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifier(uint64_t sequenceId, std::string &userId)
607 {
608 std::lock_guard<std::mutex> lock(notifierLock_);
609 if (seqIdCallbackRelations_.count(sequenceId) == 0) {
610 ZLOGE("not exist");
611 return ERR_SID_NOT_EXIST;
612 }
613 return DeleteNotifierNoLock(sequenceId, userId);
614 }
615
DeleteNotifierNoLock(uint64_t sequenceId,std::string & userId)616 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifierNoLock(uint64_t sequenceId, std::string &userId)
617 {
618 seqIdCallbackRelations_.erase(sequenceId);
619 auto userIdIter = userIdSeqIdRelations_.begin();
620 while (userIdIter != userIdSeqIdRelations_.end()) {
621 auto sIdIter = std::find(userIdIter->second.begin(), userIdIter->second.end(), sequenceId);
622 if (sIdIter != userIdIter->second.end()) {
623 userIdIter->second.erase(sIdIter);
624 if (userIdIter->second.empty()) {
625 ZLOGD("finished user callback, userId = %{public}s", userIdIter->first.c_str());
626 userId = userIdIter->first;
627 userIdSeqIdRelations_.erase(userIdIter);
628 return SUCCESS_USER_HAS_FINISHED;
629 } else {
630 ZLOGD(" finished a callback but user not finished, userId = %{public}s", userIdIter->first.c_str());
631 return SUCCESS_USER_IN_USE;
632 }
633 }
634 userIdIter++;
635 }
636 return SUCCESS_USER_HAS_FINISHED;
637 }
638 } // namespace DistributedObject
639 } // namespace OHOS
640