1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define LOG_TAG "ObjectStoreManager"
17
#include "object_manager.h"

#include <algorithm>
#include <cstdlib>

#include "bootstrap.h"
#include "checker/checker_manager.h"
#include "datetime_ex.h"
#include "kvstore_utils.h"
#include "log_print.h"
#include "object_data_listener.h"
26
27 namespace OHOS {
28 namespace DistributedObject {
29 using namespace OHOS::DistributedKv;
ObjectStoreManager()30 ObjectStoreManager::ObjectStoreManager() {}
31
OpenObjectKvStore()32 DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore()
33 {
34 DistributedDB::KvStoreNbDelegate *store = nullptr;
35 DistributedDB::KvStoreNbDelegate::Option option;
36 option.createDirByStoreIdOnly = true;
37 option.syncDualTupleMode = true;
38 option.secOption = { DistributedDB::S1, DistributedDB::ECE };
39 if (objectDataListener_ == nullptr) {
40 objectDataListener_ = new ObjectDataListener();
41 }
42 ZLOGD("start GetKvStore");
43 kvStoreDelegateManager_->GetKvStore(ObjectCommon::OBJECTSTORE_DB_STOREID, option,
44 [&store, this](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) {
45 if (dbStatus != DistributedDB::DBStatus::OK) {
46 ZLOGE("GetKvStore fail %{public}d", dbStatus);
47 return;
48 }
49 ZLOGI("GetKvStore successsfully");
50 store = kvStoreNbDelegate;
51 std::vector<uint8_t> tmpKey;
52 DistributedDB::DBStatus status = store->RegisterObserver(tmpKey,
53 DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN,
54 objectDataListener_);
55 if (status != DistributedDB::DBStatus::OK) {
56 ZLOGE("RegisterObserver err %{public}d", status);
57 }
58 });
59 return store;
60 }
61
ProcessSyncCallback(const std::map<std::string,int32_t> & results,const std::string & appId,const std::string & sessionId,const std::string & deviceId)62 void ObjectStoreManager::ProcessSyncCallback(const std::map<std::string, int32_t> &results, const std::string &appId,
63 const std::string &sessionId, const std::string &deviceId)
64 {
65 if (results.empty() || results.find(LOCAL_DEVICE) != results.end()) {
66 return;
67 }
68 int32_t result = Open();
69 if (result != OBJECT_SUCCESS) {
70 ZLOGE("Open objectStore DB failed,please check DB errCode, errCode = %{public}d", result);
71 return;
72 }
73 // delete local data
74 result = RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, deviceId));
75 if (result != OBJECT_SUCCESS) {
76 ZLOGE("Save to store failed,please check DB status, status = %{public}d", result);
77 }
78 Close();
79 return;
80 }
81
Save(const std::string & appId,const std::string & sessionId,const std::map<std::string,std::vector<uint8_t>> & data,const std::string & deviceId,sptr<IRemoteObject> callback)82 int32_t ObjectStoreManager::Save(const std::string &appId, const std::string &sessionId,
83 const std::map<std::string, std::vector<uint8_t>> &data, const std::string &deviceId,
84 sptr<IRemoteObject> callback)
85 {
86 auto proxy = iface_cast<ObjectSaveCallbackProxy>(callback);
87 if (deviceId.size() == 0) {
88 ZLOGE("deviceId empty");
89 proxy->Completed(std::map<std::string, int32_t>());
90 return INVALID_ARGUMENT;
91 }
92 int32_t result = Open();
93 if (result != OBJECT_SUCCESS) {
94 ZLOGE("Open objectStore DB failed,please check errCode, errCode = %{public}d", result);
95 proxy->Completed(std::map<std::string, int32_t>());
96 return STORE_NOT_OPEN;
97 }
98
99 ZLOGD("start SaveToStore");
100 result = SaveToStore(appId, sessionId, deviceId, data);
101 if (result != OBJECT_SUCCESS) {
102 ZLOGE("Save to store failed, please check DB errCode, errCode = %{public}d", result);
103 Close();
104 proxy->Completed(std::map<std::string, int32_t>());
105 return result;
106 }
107 SyncCallBack tmp = [proxy, appId, sessionId, deviceId, this](const std::map<std::string, int32_t> &results) {
108 proxy->Completed(results);
109 ProcessSyncCallback(results, appId, sessionId, deviceId);
110 };
111 ZLOGD("start SyncOnStore");
112 std::vector<std::string> deviceList = {deviceId};
113 result = SyncOnStore(GetPropertyPrefix(appId, sessionId, deviceId), deviceList, tmp);
114 if (result != OBJECT_SUCCESS) {
115 ZLOGI("sync on store failed,please check DB errCode, errCode = %{public}d", result);
116 proxy->Completed(std::map<std::string, int32_t>());
117 }
118 Close();
119 return result;
120 }
121
RevokeSave(const std::string & appId,const std::string & sessionId,sptr<IRemoteObject> callback)122 int32_t ObjectStoreManager::RevokeSave(
123 const std::string &appId, const std::string &sessionId, sptr<IRemoteObject> callback)
124 {
125 auto proxy = iface_cast<ObjectRevokeSaveCallbackProxy>(callback);
126 int32_t result = Open();
127 if (result != OBJECT_SUCCESS) {
128 ZLOGE("Open objectStore DB failed,please check errCode, errCode = %{public}d", result);
129 proxy->Completed(STORE_NOT_OPEN);
130 return STORE_NOT_OPEN;
131 }
132
133 result = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId));
134 if (result != OBJECT_SUCCESS) {
135 ZLOGE("Save to store failed,please check DB errCode, errCode = %{public}d", result);
136 Close();
137 proxy->Completed(result);
138 return result;
139 }
140 std::vector<std::string> deviceList;
141 auto deviceInfos = DmAdaper::GetInstance().GetRemoteDevices();
142 std::for_each(deviceInfos.begin(), deviceInfos.end(),
143 [&deviceList](AppDistributedKv::DeviceInfo info) { deviceList.emplace_back(info.networkId); });
144 if (!deviceList.empty()) {
145 SyncCallBack tmp = [proxy](const std::map<std::string, int32_t> &results) {
146 ZLOGI("revoke save finished");
147 proxy->Completed(OBJECT_SUCCESS);
148 };
149 result = SyncOnStore(GetPropertyPrefix(appId, sessionId), deviceList, tmp);
150 if (result != OBJECT_SUCCESS) {
151 ZLOGE("sync on store failed,please check DB errCode, errCode = %{public}d", result);
152 proxy->Completed(result);
153 }
154 } else {
155 proxy->Completed(OBJECT_SUCCESS);
156 };
157 Close();
158 return result;
159 }
160
Retrieve(const std::string & appId,const std::string & sessionId,sptr<IRemoteObject> callback)161 int32_t ObjectStoreManager::Retrieve(
162 const std::string &appId, const std::string &sessionId, sptr<IRemoteObject> callback)
163 {
164 auto proxy = iface_cast<ObjectRetrieveCallbackProxy>(callback);
165 ZLOGI("enter");
166 int32_t result = Open();
167 if (result != OBJECT_SUCCESS) {
168 ZLOGE("Open objectStore DB failed,please check DB errCode, errCode = %{public}d", result);
169 proxy->Completed(std::map<std::string, std::vector<uint8_t>>());
170 return STORE_NOT_OPEN;
171 }
172
173 std::map<std::string, std::vector<uint8_t>> results;
174 int32_t status = RetrieveFromStore(appId, sessionId, results);
175 if (status != OBJECT_SUCCESS) {
176 ZLOGE("Retrieve from store failed,please check DB status, status = %{public}d", status);
177 Close();
178 proxy->Completed(std::map<std::string, std::vector<uint8_t>>());
179 return status;
180 }
181 // delete local data
182 status = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId));
183 if (status != OBJECT_SUCCESS) {
184 ZLOGE("revoke save to store failed,please check DB status, status = %{public}d", status);
185 Close();
186 proxy->Completed(std::map<std::string, std::vector<uint8_t>>());
187 return status;
188 }
189 Close();
190 proxy->Completed(results);
191 return status;
192 }
193
Clear()194 int32_t ObjectStoreManager::Clear()
195 {
196 ZLOGI("enter");
197 int32_t result = Open();
198 if (result != OBJECT_SUCCESS) {
199 ZLOGE("Open objectStore DB failed,please check DB status");
200 return STORE_NOT_OPEN;
201 }
202 result = RevokeSaveToStore("");
203 Close();
204 return result;
205 }
206
DeleteByAppId(const std::string & appId)207 int32_t ObjectStoreManager::DeleteByAppId(const std::string &appId)
208 {
209 ZLOGI("enter, %{public}s", appId.c_str());
210 int32_t result = Open();
211 if (result != OBJECT_SUCCESS) {
212 ZLOGE("Open objectStore DB failed,please check DB errCode, errCode = %{public}d", result);
213 return STORE_NOT_OPEN;
214 }
215 result = RevokeSaveToStore(appId);
216 if (result != OBJECT_SUCCESS) {
217 ZLOGE("RevokeSaveToStore failed");
218 }
219 Close();
220 return result;
221 }
222
RegisterRemoteCallback(const std::string & bundleName,const std::string & sessionId,pid_t pid,uint32_t tokenId,sptr<IRemoteObject> callback)223 void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId,
224 pid_t pid, uint32_t tokenId,
225 sptr<IRemoteObject> callback)
226 {
227 if (bundleName.empty() || sessionId.empty()) {
228 ZLOGD("ObjectStoreManager::RegisterRemoteCallback empty");
229 return;
230 }
231 ZLOGD("ObjectStoreManager::RegisterRemoteCallback start");
232 auto proxy = iface_cast<ObjectChangeCallbackProxy>(callback);
233 std::string prefix = bundleName + sessionId;
234 callbacks_.Compute(tokenId, ([pid, &proxy, &prefix](const uint32_t key, CallbackInfo &value) {
235 if (value.pid != pid) {
236 value = CallbackInfo { pid };
237 }
238 value.observers_.insert_or_assign(prefix, proxy);
239 return !value.observers_.empty();
240 }));
241 }
242
UnregisterRemoteCallback(const std::string & bundleName,pid_t pid,uint32_t tokenId,const std::string & sessionId)243 void ObjectStoreManager::UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId,
244 const std::string &sessionId)
245 {
246 if (bundleName.empty()) {
247 ZLOGD("bundleName is empty");
248 return;
249 }
250 callbacks_.Compute(tokenId, ([pid, &sessionId, &bundleName](const uint32_t key, CallbackInfo &value) {
251 if (value.pid != pid) {
252 return true;
253 }
254 if (sessionId.empty()) {
255 return false;
256 }
257 std::string prefix = bundleName + sessionId;
258 for (auto it = value.observers_.begin(); it != value.observers_.end();) {
259 if ((*it).first == prefix) {
260 it = value.observers_.erase(it);
261 } else {
262 ++it;
263 }
264 }
265 return true;
266 }));
267 }
268
NotifyChange(std::map<std::string,std::vector<uint8_t>> & changedData)269 void ObjectStoreManager::NotifyChange(std::map<std::string, std::vector<uint8_t>> &changedData)
270 {
271 ZLOGD("ObjectStoreManager::NotifyChange start");
272 std::map<std::string, std::map<std::string, std::vector<uint8_t>>> data;
273 for (const auto &item : changedData) {
274 std::string prefix = GetBundleName(item.first) + GetSessionId(item.first);
275 std::string propertyName = GetPropertyName(item.first);
276 data[prefix].insert_or_assign(std::move(propertyName), std::move(item.second));
277 }
278
279 callbacks_.ForEach([&data](uint32_t tokenId, CallbackInfo &value) {
280 for (const auto &observer : value.observers_) {
281 auto it = data.find(observer.first);
282 if (it == data.end()) {
283 continue;
284 }
285 observer.second->Completed((*it).second);
286 }
287 return false;
288 });
289 }
290
SetData(const std::string & dataDir,const std::string & userId)291 void ObjectStoreManager::SetData(const std::string &dataDir, const std::string &userId)
292 {
293 ZLOGI("enter %{public}s", dataDir.c_str());
294 kvStoreDelegateManager_ =
295 new DistributedDB::KvStoreDelegateManager(DistributedData::Bootstrap::GetInstance().GetProcessLabel(), userId);
296 DistributedDB::KvStoreConfig kvStoreConfig { dataDir };
297 kvStoreDelegateManager_->SetKvStoreConfig(kvStoreConfig);
298 userId_ = userId;
299 }
300
Open()301 int32_t ObjectStoreManager::Open()
302 {
303 if (kvStoreDelegateManager_ == nullptr) {
304 ZLOGE("not init");
305 return OBJECT_INNER_ERROR;
306 }
307 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
308 if (delegate_ == nullptr) {
309 ZLOGI("open store");
310 delegate_ = OpenObjectKvStore();
311 if (delegate_ == nullptr) {
312 ZLOGE("open failed,please check DB status");
313 return OBJECT_DBSTATUS_ERROR;
314 }
315 syncCount_ = 1;
316 } else {
317 syncCount_++;
318 ZLOGI("syncCount = %{public}d", syncCount_);
319 }
320 return OBJECT_SUCCESS;
321 }
322
Close()323 void ObjectStoreManager::Close()
324 {
325 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
326 if (delegate_ == nullptr) {
327 return;
328 }
329 syncCount_--;
330 ZLOGI("closed a store, syncCount = %{public}d", syncCount_);
331 FlushClosedStore();
332 }
333
SyncCompleted(const std::map<std::string,DistributedDB::DBStatus> & results,uint64_t sequenceId)334 void ObjectStoreManager::SyncCompleted(
335 const std::map<std::string, DistributedDB::DBStatus> &results, uint64_t sequenceId)
336 {
337 std::string userId;
338 SequenceSyncManager::Result result = SequenceSyncManager::GetInstance()->Process(sequenceId, results, userId);
339 if (result == SequenceSyncManager::SUCCESS_USER_HAS_FINISHED && userId == userId_) {
340 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
341 SetSyncStatus(false);
342 FlushClosedStore();
343 }
344 }
345
FlushClosedStore()346 void ObjectStoreManager::FlushClosedStore()
347 {
348 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
349 if (!isSyncing_ && syncCount_ == 0 && delegate_ != nullptr) {
350 ZLOGD("close store");
351 auto status = kvStoreDelegateManager_->CloseKvStore(delegate_);
352 if (status != DistributedDB::DBStatus::OK) {
353 int timeOut = 1000;
354 executors_->Schedule(std::chrono::milliseconds(timeOut), [this]() {
355 FlushClosedStore();
356 });
357 ZLOGE("GetEntries fail %{public}d", status);
358 return;
359 }
360 delegate_ = nullptr;
361 if (objectDataListener_ != nullptr) {
362 delete objectDataListener_;
363 objectDataListener_ = nullptr;
364 }
365 }
366 }
367
ProcessOldEntry(const std::string & appId)368 void ObjectStoreManager::ProcessOldEntry(const std::string &appId)
369 {
370 std::vector<DistributedDB::Entry> entries;
371 auto status = delegate_->GetEntries(std::vector<uint8_t>(appId.begin(), appId.end()), entries);
372 if (status != DistributedDB::DBStatus::OK) {
373 ZLOGE("GetEntries fail %{public}d", status);
374 return;
375 }
376
377 std::map<std::string, int64_t> sessionIds;
378 int64_t oldestTime = 0;
379 std::string deleteKey;
380 for (auto &item : entries) {
381 std::string key(item.key.begin(), item.key.end());
382 std::string id = GetSessionId(key);
383 if (sessionIds.count(id) == 0) {
384 sessionIds[id] = GetTime(key);
385 }
386 if (oldestTime == 0 || oldestTime > sessionIds[id]) {
387 oldestTime = sessionIds[id];
388 deleteKey = GetPrefixWithoutDeviceId(appId, id);
389 }
390 }
391 if (sessionIds.size() < MAX_OBJECT_SIZE_PER_APP) {
392 return;
393 }
394 ZLOGI("app object is full, delete oldest one %{public}s", deleteKey.c_str());
395 int32_t result = RevokeSaveToStore(deleteKey);
396 if (result != OBJECT_SUCCESS) {
397 ZLOGE("RevokeSaveToStore fail %{public}d", result);
398 return;
399 }
400 }
401
SaveToStore(const std::string & appId,const std::string & sessionId,const std::string & toDeviceId,const std::map<std::string,std::vector<uint8_t>> & data)402 int32_t ObjectStoreManager::SaveToStore(const std::string &appId, const std::string &sessionId,
403 const std::string &toDeviceId, const std::map<std::string, std::vector<uint8_t>> &data)
404 {
405 ProcessOldEntry(appId);
406 RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, toDeviceId));
407 std::string timestamp = std::to_string(GetSecondsSince1970ToNow());
408 std::vector<DistributedDB::Entry> entries;
409 for (auto &item : data) {
410 DistributedDB::Entry entry;
411 std::string tmp = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR + item.first;
412 entry.key = std::vector<uint8_t>(tmp.begin(), tmp.end());
413 entry.value = item.second;
414 entries.emplace_back(entry);
415 }
416 auto status = delegate_->PutBatch(entries);
417 if (status != DistributedDB::DBStatus::OK) {
418 ZLOGE("putBatch fail %{public}d", status);
419 }
420 return status;
421 }
422
SyncOnStore(const std::string & prefix,const std::vector<std::string> & deviceList,SyncCallBack & callback)423 int32_t ObjectStoreManager::SyncOnStore(
424 const std::string &prefix, const std::vector<std::string> &deviceList, SyncCallBack &callback)
425 {
426 std::vector<std::string> syncDevices;
427 for (auto &device : deviceList) {
428 // save to local, do not need sync
429 if (device == LOCAL_DEVICE) {
430 ZLOGI("save to local successful");
431 std::map<std::string, int32_t> result;
432 result[LOCAL_DEVICE] = OBJECT_SUCCESS;
433 callback(result);
434 return OBJECT_SUCCESS;
435 }
436 syncDevices.emplace_back(DmAdaper::GetInstance().GetUuidByNetworkId(device));
437 }
438 if (!syncDevices.empty()) {
439 uint64_t sequenceId = SequenceSyncManager::GetInstance()->AddNotifier(userId_, callback);
440 DistributedDB::Query dbQuery = DistributedDB::Query::Select();
441 dbQuery.PrefixKey(std::vector<uint8_t>(prefix.begin(), prefix.end()));
442 ZLOGD("start sync");
443 auto status = delegate_->Sync(
444 syncDevices, DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY,
445 [this, sequenceId](const std::map<std::string, DistributedDB::DBStatus> &devicesMap) {
446 ZLOGI("objectstore sync finished");
447 std::map<std::string, DistributedDB::DBStatus> result;
448 for (auto &item : devicesMap) {
449 result[DmAdaper::GetInstance().ToNetworkID(item.first)] = item.second;
450 }
451 SyncCompleted(result, sequenceId);
452 },
453 dbQuery, false);
454 if (status != DistributedDB::DBStatus::OK) {
455 ZLOGE("sync error %{public}d", status);
456 std::string tmp;
457 SequenceSyncManager::GetInstance()->DeleteNotifier(sequenceId, tmp);
458 return status;
459 }
460 SetSyncStatus(true);
461 } else {
462 ZLOGI("single device");
463 callback(std::map<std::string, int32_t>());
464 return OBJECT_SUCCESS;
465 }
466 return OBJECT_SUCCESS;
467 }
468
SetSyncStatus(bool status)469 int32_t ObjectStoreManager::SetSyncStatus(bool status)
470 {
471 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
472 isSyncing_ = status;
473 return OBJECT_SUCCESS;
474 }
475
RevokeSaveToStore(const std::string & prefix)476 int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix)
477 {
478 std::vector<DistributedDB::Entry> entries;
479 auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
480 if (status == DistributedDB::DBStatus::NOT_FOUND) {
481 ZLOGI("not found entry");
482 return OBJECT_SUCCESS;
483 }
484 if (status != DistributedDB::DBStatus::OK) {
485 ZLOGE("GetEntries failed,please check DB status");
486 return DB_ERROR;
487 }
488 std::vector<std::vector<uint8_t>> keys;
489 std::for_each(
490 entries.begin(), entries.end(), [&keys](const DistributedDB::Entry &entry) { keys.emplace_back(entry.key); });
491 if (!keys.empty()) {
492 status = delegate_->DeleteBatch(keys);
493 if (status != DistributedDB::DBStatus::OK) {
494 ZLOGE("DeleteBatch failed,please check DB status, status = %{public}d", status);
495 return DB_ERROR;
496 }
497 }
498 return OBJECT_SUCCESS;
499 }
500
RetrieveFromStore(const std::string & appId,const std::string & sessionId,std::map<std::string,std::vector<uint8_t>> & results)501 int32_t ObjectStoreManager::RetrieveFromStore(
502 const std::string &appId, const std::string &sessionId, std::map<std::string, std::vector<uint8_t>> &results)
503 {
504 std::vector<DistributedDB::Entry> entries;
505 std::string prefix = GetPrefixWithoutDeviceId(appId, sessionId);
506 auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
507 if (status != DistributedDB::DBStatus::OK) {
508 ZLOGE("GetEntries failed,please check DB status, status = %{public}d", status);
509 return DB_ERROR;
510 }
511 ZLOGI("GetEntries successfully");
512 std::for_each(entries.begin(), entries.end(), [&results, this](const DistributedDB::Entry &entry) {
513 std::string key(entry.key.begin(), entry.key.end());
514 results[GetPropertyName(key)] = entry.value;
515 });
516 return OBJECT_SUCCESS;
517 }
518
ProcessKeyByIndex(std::string & key,uint8_t index)519 void ObjectStoreManager::ProcessKeyByIndex(std::string &key, uint8_t index)
520 {
521 std::size_t pos;
522 uint8_t i = 0;
523 do {
524 pos = key.find(SEPERATOR);
525 if (pos == std::string::npos) {
526 return;
527 }
528 key.erase(0, pos + 1);
529 i++;
530 } while (i < index);
531 }
532
GetPropertyName(const std::string & key)533 std::string ObjectStoreManager::GetPropertyName(const std::string &key)
534 {
535 std::string result = key;
536 ProcessKeyByIndex(result, 5); // property name is after 5 '_'
537 return result;
538 }
539
GetSessionId(const std::string & key)540 std::string ObjectStoreManager::GetSessionId(const std::string &key)
541 {
542 std::string result = key;
543 ProcessKeyByIndex(result, 1); // sessionId is after 1 '_'
544 auto pos = result.find(SEPERATOR);
545 if (pos == std::string::npos) {
546 return result;
547 }
548 result.erase(pos);
549 return result;
550 }
551
GetTime(const std::string & key)552 int64_t ObjectStoreManager::GetTime(const std::string &key)
553 {
554 std::string result = key;
555 std::size_t pos;
556 int8_t i = 0;
557 do {
558 pos = result.find(SEPERATOR);
559 result.erase(0, pos + 1);
560 i++;
561 } while (pos != std::string::npos && i < 4); // time is after 4 '_'
562 pos = result.find(SEPERATOR);
563 result.erase(pos);
564 char *end = nullptr;
565 return std::strtol(result.c_str(), &end, DECIMAL_BASE);
566 }
567
CloseAfterMinute()568 void ObjectStoreManager::CloseAfterMinute()
569 {
570 executors_->Schedule(std::chrono::minutes(INTERVAL), std::bind(&ObjectStoreManager::Close, this));
571 }
572
GetBundleName(const std::string & key)573 std::string ObjectStoreManager::GetBundleName(const std::string &key)
574 {
575 std::size_t pos = key.find(SEPERATOR);
576 if (pos == std::string::npos) {
577 return std::string();
578 }
579 std::string result = key;
580 result.erase(pos);
581 return result;
582 }
583
SetThreadPool(std::shared_ptr<ExecutorPool> executors)584 void ObjectStoreManager::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
585 {
586 executors_ = executors;
587 }
588
AddNotifier(const std::string & userId,SyncCallBack & callback)589 uint64_t SequenceSyncManager::AddNotifier(const std::string &userId, SyncCallBack &callback)
590 {
591 std::lock_guard<std::mutex> lock(notifierLock_);
592 uint64_t sequenceId = KvStoreUtils::GenerateSequenceId();
593 userIdSeqIdRelations_[userId].emplace_back(sequenceId);
594 seqIdCallbackRelations_[sequenceId] = callback;
595 return sequenceId;
596 }
597
Process(uint64_t sequenceId,const std::map<std::string,DistributedDB::DBStatus> & results,std::string & userId)598 SequenceSyncManager::Result SequenceSyncManager::Process(
599 uint64_t sequenceId, const std::map<std::string, DistributedDB::DBStatus> &results, std::string &userId)
600 {
601 std::lock_guard<std::mutex> lock(notifierLock_);
602 if (seqIdCallbackRelations_.count(sequenceId) == 0) {
603 ZLOGE("not exist");
604 return ERR_SID_NOT_EXIST;
605 }
606 std::map<std::string, int32_t> syncResults;
607 for (auto &item : results) {
608 syncResults[item.first] = item.second == DistributedDB::DBStatus::OK ? 0 : -1;
609 }
610 seqIdCallbackRelations_[sequenceId](syncResults);
611 ZLOGD("end complete");
612 return DeleteNotifierNoLock(sequenceId, userId);
613 }
614
DeleteNotifier(uint64_t sequenceId,std::string & userId)615 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifier(uint64_t sequenceId, std::string &userId)
616 {
617 std::lock_guard<std::mutex> lock(notifierLock_);
618 if (seqIdCallbackRelations_.count(sequenceId) == 0) {
619 ZLOGE("not exist");
620 return ERR_SID_NOT_EXIST;
621 }
622 return DeleteNotifierNoLock(sequenceId, userId);
623 }
624
DeleteNotifierNoLock(uint64_t sequenceId,std::string & userId)625 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifierNoLock(uint64_t sequenceId, std::string &userId)
626 {
627 seqIdCallbackRelations_.erase(sequenceId);
628 auto userIdIter = userIdSeqIdRelations_.begin();
629 while (userIdIter != userIdSeqIdRelations_.end()) {
630 auto sIdIter = std::find(userIdIter->second.begin(), userIdIter->second.end(), sequenceId);
631 if (sIdIter != userIdIter->second.end()) {
632 userIdIter->second.erase(sIdIter);
633 if (userIdIter->second.empty()) {
634 ZLOGD("finished user callback, userId = %{public}s", userIdIter->first.c_str());
635 userId = userIdIter->first;
636 userIdSeqIdRelations_.erase(userIdIter);
637 return SUCCESS_USER_HAS_FINISHED;
638 } else {
639 ZLOGD(" finished a callback but user not finished, userId = %{public}s", userIdIter->first.c_str());
640 return SUCCESS_USER_IN_USE;
641 }
642 }
643 userIdIter++;
644 }
645 return SUCCESS_USER_HAS_FINISHED;
646 }
647 } // namespace DistributedObject
648 } // namespace OHOS
649