• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "ObjectStoreManager"
16 
17 #include "object_manager.h"
18 
19 #include <regex>
20 
21 #include "accesstoken_kit.h"
22 #include "account/account_delegate.h"
23 #include "block_data.h"
24 #include "bootstrap.h"
25 #include "common/bytes.h"
26 #include "common/string_utils.h"
27 #include "datetime_ex.h"
28 #include "distributed_file_daemon_manager.h"
29 #include "device_matrix.h"
30 #include "ipc_skeleton.h"
31 #include "metadata/capability_meta_data.h"
32 #include "kvstore_utils.h"
33 #include "log_print.h"
34 #include "metadata/meta_data_manager.h"
35 #include "metadata/object_user_meta_data.h"
36 #include "metadata/store_meta_data.h"
37 #include "object_dms_handler.h"
38 #include "object_radar_reporter.h"
39 #include "utils/anonymous.h"
40 
41 namespace OHOS {
42 namespace DistributedObject {
43 using namespace OHOS::DistributedKv;
44 using namespace Security::AccessToken;
45 using StoreMetaData = OHOS::DistributedData::StoreMetaData;
46 using Account = OHOS::DistributedData::AccountDelegate;
47 using AccessTokenKit = Security::AccessToken::AccessTokenKit;
48 using ValueProxy = OHOS::DistributedData::ValueProxy;
49 using DistributedFileDaemonManager = Storage::DistributedFile::DistributedFileDaemonManager;
50 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
51 
52 constexpr const char *SAVE_INFO = "p_###SAVEINFO###";
53 constexpr int32_t PROGRESS_MAX = 100;
54 constexpr int32_t PROGRESS_INVALID = -1;
55 constexpr uint32_t LOCK_TIMEOUT = 3600; // second
ObjectStoreManager()56 ObjectStoreManager::ObjectStoreManager()
57 {
58     ZLOGI("ObjectStoreManager construct");
59     RegisterAssetsLister();
60 }
61 
~ObjectStoreManager()62 ObjectStoreManager::~ObjectStoreManager()
63 {
64 }
65 
OpenObjectKvStore()66 DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore()
67 {
68     DistributedDB::KvStoreNbDelegate *store = nullptr;
69     DistributedDB::KvStoreNbDelegate::Option option;
70     option.createDirByStoreIdOnly = true;
71     option.syncDualTupleMode = true;
72     option.secOption = { DistributedDB::S1, DistributedDB::ECE };
73     ZLOGD("start GetKvStore");
74     kvStoreDelegateManager_->GetKvStore(ObjectCommon::OBJECTSTORE_DB_STOREID, option,
75         [&store, this](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) {
76             if (dbStatus != DistributedDB::DBStatus::OK) {
77                 ZLOGE("GetKvStore fail %{public}d", dbStatus);
78                 return;
79             }
80             if (kvStoreNbDelegate == nullptr) {
81                 ZLOGE("GetKvStore kvStoreNbDelegate is nullptr");
82                 return;
83             }
84             ZLOGI("GetKvStore successsfully");
85             store = kvStoreNbDelegate;
86             std::vector<uint8_t> tmpKey;
87             DistributedDB::DBStatus status = store->RegisterObserver(tmpKey,
88                 DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN,
89                 &objectDataListener_);
90             if (status != DistributedDB::DBStatus::OK) {
91                 ZLOGE("RegisterObserver err %{public}d", status);
92             }
93         });
94     return store;
95 }
96 
RegisterAssetsLister()97 bool ObjectStoreManager::RegisterAssetsLister()
98 {
99     if (objectAssetsSendListener_ == nullptr) {
100         objectAssetsSendListener_ = new ObjectAssetsSendListener();
101     }
102     if (objectAssetsRecvListener_ == nullptr) {
103         objectAssetsRecvListener_ = new ObjectAssetsRecvListener();
104     }
105     auto status = DistributedFileDaemonManager::GetInstance().RegisterAssetCallback(objectAssetsRecvListener_);
106     if (status != DistributedDB::DBStatus::OK) {
107         ZLOGE("Register assetsRecvListener err %{public}d", status);
108         return false;
109     }
110     return true;
111 }
112 
UnRegisterAssetsLister()113 bool ObjectStoreManager::UnRegisterAssetsLister()
114 {
115     ZLOGI("ObjectStoreManager UnRegisterAssetsLister");
116     if (objectAssetsRecvListener_ != nullptr) {
117         auto status = DistributedFileDaemonManager::GetInstance().UnRegisterAssetCallback(objectAssetsRecvListener_);
118         if (status != DistributedDB::DBStatus::OK) {
119             ZLOGE("UnRegister assetsRecvListener err %{public}d", status);
120             return false;
121         }
122     }
123     return true;
124 }
125 
ProcessSyncCallback(const std::map<std::string,int32_t> & results,const std::string & appId,const std::string & sessionId,const std::string & deviceId)126 void ObjectStoreManager::ProcessSyncCallback(const std::map<std::string, int32_t> &results, const std::string &appId,
127     const std::string &sessionId, const std::string &deviceId)
128 {
129     if (results.empty() || results.find(LOCAL_DEVICE) != results.end()) {
130         return;
131     }
132     int32_t result = Open();
133     if (result != OBJECT_SUCCESS) {
134         ZLOGE("Open failed, errCode = %{public}d", result);
135         return;
136     }
137     // delete local data
138     result = RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, deviceId));
139     if (result != OBJECT_SUCCESS) {
140         ZLOGE("Save failed, status = %{public}d", result);
141     }
142     Close();
143     return;
144 }
145 
Save(const std::string & appId,const std::string & sessionId,const ObjectRecord & data,const std::string & deviceId,sptr<IRemoteObject> callback)146 int32_t ObjectStoreManager::Save(const std::string &appId, const std::string &sessionId,
147     const ObjectRecord &data, const std::string &deviceId, sptr<IRemoteObject> callback)
148 {
149     auto proxy = iface_cast<ObjectSaveCallbackProxy>(callback);
150     if (proxy == nullptr) {
151         ZLOGE("proxy is nullptr, callback is %{public}s.", (callback == nullptr) ? "nullptr" : "not null");
152         return INVALID_ARGUMENT;
153     }
154     if (deviceId.size() == 0) {
155         ZLOGE("DeviceId empty, appId: %{public}s, sessionId: %{public}s", appId.c_str(),
156             Anonymous::Change(sessionId).c_str());
157         proxy->Completed(std::map<std::string, int32_t>());
158         return INVALID_ARGUMENT;
159     }
160     int32_t result = Open();
161     if (result != OBJECT_SUCCESS) {
162         ZLOGE("Open object kvstore failed, result: %{public}d", result);
163         ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
164             ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, ObjectStore::GETKV_FAILED, ObjectStore::FINISHED);
165         proxy->Completed(std::map<std::string, int32_t>());
166         return STORE_NOT_OPEN;
167     }
168     SaveUserToMeta();
169     std::string dstBundleName = ObjectDmsHandler::GetInstance().GetDstBundleName(appId, deviceId);
170     result = SaveToStore(dstBundleName, sessionId, deviceId, data);
171     if (result != OBJECT_SUCCESS) {
172         ZLOGE("Save to store failed, result: %{public}d", result);
173         ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
174             ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
175         Close();
176         proxy->Completed(std::map<std::string, int32_t>());
177         return result;
178     }
179     ZLOGI("Sync data, bundleName: %{public}s, sessionId: %{public}s, deviceId: %{public}s", dstBundleName.c_str(),
180         Anonymous::Change(sessionId).c_str(), Anonymous::Change(deviceId).c_str());
181     SyncCallBack syncCallback =
182         [proxy, dstBundleName, sessionId, deviceId, this](const std::map<std::string, int32_t> &results) {
183             ProcessSyncCallback(results, dstBundleName, sessionId, deviceId);
184             proxy->Completed(results);
185         };
186     result = SyncOnStore(GetPropertyPrefix(dstBundleName, sessionId, deviceId), { deviceId }, syncCallback);
187     if (result != OBJECT_SUCCESS) {
188         ZLOGE("Sync data failed, result: %{public}d", result);
189         ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
190             ObjectStore::SYNC_DATA, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
191         Close();
192         proxy->Completed(std::map<std::string, int32_t>());
193         return result;
194     }
195     Close();
196     return PushAssets(appId, dstBundleName, sessionId, data, deviceId);
197 }
198 
PushAssets(const std::string & srcBundleName,const std::string & dstBundleName,const std::string & sessionId,const ObjectRecord & data,const std::string & deviceId)199 int32_t ObjectStoreManager::PushAssets(const std::string &srcBundleName, const std::string &dstBundleName,
200     const std::string &sessionId, const ObjectRecord &data, const std::string &deviceId)
201 {
202     Assets assets = GetAssetsFromDBRecords(data);
203     if (assets.empty() || data.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == data.end()) {
204         return OBJECT_SUCCESS;
205     }
206     sptr<AssetObj> assetObj = new (std::nothrow) AssetObj();
207     if (assetObj == nullptr) {
208         ZLOGE("New AssetObj failed");
209         return OBJECT_INNER_ERROR;
210     }
211     assetObj->dstBundleName_ = dstBundleName;
212     assetObj->srcBundleName_ = srcBundleName;
213     assetObj->dstNetworkId_ = deviceId;
214     assetObj->sessionId_ = sessionId;
215     for (const auto& asset : assets) {
216         assetObj->uris_.push_back(asset.uri);
217     }
218     if (objectAssetsSendListener_ == nullptr) {
219         objectAssetsSendListener_ = new ObjectAssetsSendListener();
220     }
221     int userId = std::atoi(GetCurrentUser().c_str());
222     auto status = ObjectAssetLoader::GetInstance().PushAsset(userId, assetObj, objectAssetsSendListener_);
223     return status;
224 }
225 
RevokeSave(const std::string & appId,const std::string & sessionId,sptr<IRemoteObject> callback)226 int32_t ObjectStoreManager::RevokeSave(
227     const std::string &appId, const std::string &sessionId, sptr<IRemoteObject> callback)
228 {
229     auto proxy = iface_cast<ObjectRevokeSaveCallbackProxy>(callback);
230     if (proxy == nullptr) {
231         ZLOGE("proxy is nullptr, callback is %{public}s, appId: %{public}s, sessionId: %{public}s.",
232             (callback == nullptr) ? "nullptr" : "not null", appId.c_str(), Anonymous::Change(sessionId).c_str());
233         return INVALID_ARGUMENT;
234     }
235     int32_t result = Open();
236     if (result != OBJECT_SUCCESS) {
237         ZLOGE("Open failed, errCode = %{public}d", result);
238         proxy->Completed(STORE_NOT_OPEN);
239         return STORE_NOT_OPEN;
240     }
241 
242     result = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId));
243     if (result != OBJECT_SUCCESS) {
244         ZLOGE("RevokeSave failed, errCode = %{public}d", result);
245         Close();
246         proxy->Completed(result);
247         return result;
248     }
249     std::vector<std::string> deviceList;
250     auto deviceInfos = DmAdaper::GetInstance().GetRemoteDevices();
251     std::for_each(deviceInfos.begin(), deviceInfos.end(),
252         [&deviceList](AppDistributedKv::DeviceInfo info) { deviceList.emplace_back(info.networkId); });
253     if (!deviceList.empty()) {
254         SyncCallBack tmp = [proxy](const std::map<std::string, int32_t> &results) {
255             ZLOGI("revoke save finished");
256             proxy->Completed(OBJECT_SUCCESS);
257         };
258         result = SyncOnStore(GetPropertyPrefix(appId, sessionId), deviceList, tmp);
259         if (result != OBJECT_SUCCESS) {
260             ZLOGE("sync failed, errCode = %{public}d", result);
261             proxy->Completed(result);
262         }
263     } else {
264         proxy->Completed(OBJECT_SUCCESS);
265     };
266     Close();
267     return result;
268 }
269 
Retrieve(const std::string & bundleName,const std::string & sessionId,sptr<IRemoteObject> callback,uint32_t tokenId)270 int32_t ObjectStoreManager::Retrieve(
271     const std::string &bundleName, const std::string &sessionId, sptr<IRemoteObject> callback, uint32_t tokenId)
272 {
273     auto proxy = iface_cast<ObjectRetrieveCallbackProxy>(callback);
274     if (proxy == nullptr) {
275         ZLOGE("proxy is nullptr, callback is %{public}s.", (callback == nullptr) ? "nullptr" : "not null");
276         return INVALID_ARGUMENT;
277     }
278     int32_t result = Open();
279     if (result != OBJECT_SUCCESS) {
280         ZLOGE("Open object kvstore failed, result: %{public}d", result);
281         proxy->Completed(ObjectRecord(), false);
282         return ObjectStore::GETKV_FAILED;
283     }
284     ObjectRecord results{};
285     int32_t status = RetrieveFromStore(bundleName, sessionId, results);
286     if (status != OBJECT_SUCCESS) {
287         ZLOGE("Retrieve from store failed, status: %{public}d, close after one minute.", status);
288         CloseAfterMinute();
289         proxy->Completed(ObjectRecord(), false);
290         return status;
291     }
292     bool allReady = false;
293     Assets assets = GetAssetsFromDBRecords(results);
294     if (assets.empty() || results.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == results.end()) {
295         allReady = true;
296     } else {
297         restoreStatus_.ComputeIfPresent(bundleName + sessionId, [&allReady](const auto &key, auto &value) {
298             if (value == RestoreStatus::ALL_READY) {
299                 allReady = true;
300                 return false;
301             }
302             if (value == RestoreStatus::DATA_READY) {
303                 value = RestoreStatus::DATA_NOTIFIED;
304             }
305             return true;
306         });
307     }
308     status = RevokeSaveToStore(GetPrefixWithoutDeviceId(bundleName, sessionId));
309     Close();
310     if (status != OBJECT_SUCCESS) {
311         ZLOGE("Revoke save failed, status: %{public}d", status);
312         proxy->Completed(ObjectRecord(), false);
313         return status;
314     }
315     proxy->Completed(results, allReady);
316     if (allReady) {
317         ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
318             ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
319     }
320     return status;
321 }
322 
Clear()323 int32_t ObjectStoreManager::Clear()
324 {
325     ZLOGI("enter");
326     DistributedData::ObjectUserMetaData userMeta;
327     if (!DistributedData::MetaDataManager::GetInstance().LoadMeta(userMeta.GetKey(), userMeta, true)) {
328         ZLOGE("load meta error");
329         return OBJECT_INNER_ERROR;
330     }
331     if (userMeta.userId.empty()) {
332         ZLOGI("no object user meta, don't need clean");
333         return OBJECT_SUCCESS;
334     }
335     std::string userId = GetCurrentUser();
336     if (userId.empty()) {
337         ZLOGE("no user error");
338         return OBJECT_INNER_ERROR;
339     }
340     if (userMeta.userId == userId) {
341         ZLOGI("user is same, don't need clear, user:%{public}s.", userId.c_str());
342         return OBJECT_SUCCESS;
343     }
344     ZLOGI("user changed, need clear, userId:%{public}s", userId.c_str());
345     int32_t result = Open();
346     if (result != OBJECT_SUCCESS) {
347         ZLOGE("Open failed, errCode = %{public}d", result);
348         return STORE_NOT_OPEN;
349     }
350     result = RevokeSaveToStore("");
351     callbacks_.Clear();
352     processCallbacks_.Clear();
353     ForceClose();
354     return result;
355 }
356 
InitUserMeta()357 int32_t ObjectStoreManager::InitUserMeta()
358 {
359     ObjectUserMetaData userMeta;
360     if (DistributedData::MetaDataManager::GetInstance().LoadMeta(userMeta.GetKey(), userMeta, true)) {
361         ZLOGI("userId has been set, don't need clean");
362         return OBJECT_SUCCESS;
363     }
364     std::string userId = GetCurrentUser();
365     if (userId.empty()) {
366         ZLOGI("get userId error");
367         return OBJECT_INNER_ERROR;
368     }
369     userMeta.userId = userId;
370     std::string appId = DistributedData::Bootstrap::GetInstance().GetProcessLabel();
371     std::string metaKey = GetMetaUserIdKey(userId, appId);
372     if (!DistributedData::MetaDataManager::GetInstance().DelMeta(metaKey, true)) {
373         ZLOGE("delete old meta error, userId:%{public}s", userId.c_str());
374         return OBJECT_INNER_ERROR;
375     }
376     if (!DistributedData::MetaDataManager::GetInstance().SaveMeta(DistributedData::ObjectUserMetaData::GetKey(),
377         userMeta, true)) {
378         ZLOGE("save meta error, userId:%{public}s", userId.c_str());
379         return OBJECT_INNER_ERROR;
380     }
381     return OBJECT_SUCCESS;
382 }
383 
DeleteByAppId(const std::string & appId,int32_t user)384 int32_t ObjectStoreManager::DeleteByAppId(const std::string &appId, int32_t user)
385 {
386     int32_t result = Open();
387     if (result != OBJECT_SUCCESS) {
388         ZLOGE("Open store failed, result: %{public}d, appId: %{public}s, user: %{public}d", result,
389             appId.c_str(), user);
390         return STORE_NOT_OPEN;
391     }
392     result = RevokeSaveToStore(appId);
393     if (result != OBJECT_SUCCESS) {
394         ZLOGE("Revoke save failed, result: %{public}d, appId: %{public}s, user: %{public}d", result,
395             appId.c_str(), user);
396     }
397     Close();
398     return result;
399 }
400 
RegisterRemoteCallback(const std::string & bundleName,const std::string & sessionId,pid_t pid,uint32_t tokenId,sptr<IRemoteObject> callback)401 void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId,
402                                                 pid_t pid, uint32_t tokenId,
403                                                 sptr<IRemoteObject> callback)
404 {
405     if (bundleName.empty() || sessionId.empty()) {
406         ZLOGD("ObjectStoreManager::RegisterRemoteCallback empty");
407         return;
408     }
409     ZLOGD("ObjectStoreManager::RegisterRemoteCallback start");
410     auto proxy = iface_cast<ObjectChangeCallbackProxy>(callback);
411     if (proxy == nullptr) {
412         ZLOGE("proxy is nullptr, callback is %{public}s, bundleName: %{public}s, sessionId: %{public}s, pid: "
413               "%{public}d, tokenId: %{public}u.",
414             (callback == nullptr) ? "nullptr" : "not null", bundleName.c_str(), Anonymous::Change(sessionId).c_str(),
415             pid, tokenId);
416         return;
417     }
418     std::string prefix = bundleName + sessionId;
419     callbacks_.Compute(tokenId, ([pid, &proxy, &prefix](const uint32_t key, CallbackInfo &value) {
420         if (value.pid != pid) {
421             value = CallbackInfo { pid };
422         }
423         value.observers_.insert_or_assign(prefix, proxy);
424         return !value.observers_.empty();
425     }));
426 }
427 
UnregisterRemoteCallback(const std::string & bundleName,pid_t pid,uint32_t tokenId,const std::string & sessionId)428 void ObjectStoreManager::UnregisterRemoteCallback(
429     const std::string &bundleName, pid_t pid, uint32_t tokenId, const std::string &sessionId)
430 {
431     if (bundleName.empty()) {
432         ZLOGD("bundleName is empty");
433         return;
434     }
435     callbacks_.Compute(tokenId, ([pid, &sessionId, &bundleName](const uint32_t key, CallbackInfo &value) {
436         if (value.pid != pid) {
437             return true;
438         }
439         if (sessionId.empty()) {
440             return false;
441         }
442         std::string prefix = bundleName + sessionId;
443         for (auto it = value.observers_.begin(); it != value.observers_.end();) {
444             if ((*it).first == prefix) {
445                 it = value.observers_.erase(it);
446             } else {
447                 ++it;
448             }
449         }
450         return true;
451     }));
452 }
453 
RegisterProgressObserverCallback(const std::string & bundleName,const std::string & sessionId,pid_t pid,uint32_t tokenId,sptr<IRemoteObject> callback)454 void ObjectStoreManager::RegisterProgressObserverCallback(const std::string &bundleName, const std::string &sessionId,
455     pid_t pid, uint32_t tokenId, sptr<IRemoteObject> callback)
456 {
457     if (bundleName.empty() || sessionId.empty()) {
458         ZLOGD("ObjectStoreManager::RegisterProgressObserverCallback empty bundleName = %{public}s, sessionId = "
459               "%{public}s",
460             bundleName.c_str(), DistributedData::Anonymous::Change(sessionId).c_str());
461         return;
462     }
463     auto proxy = iface_cast<ObjectProgressCallbackProxy>(callback);
464     if (proxy == nullptr) {
465         ZLOGE("proxy is nullptr, callback is %{public}s, bundleName: %{public}s, sessionId: %{public}s, pid: "
466               "%{public}d, tokenId: %{public}u.",
467             (callback == nullptr) ? "nullptr" : "not null", bundleName.c_str(), Anonymous::Change(sessionId).c_str(),
468             pid, tokenId);
469         return;
470     }
471     std::string objectKey = bundleName + sessionId;
472     sptr<ObjectProgressCallbackProxy> observer;
473     processCallbacks_.Compute(
474         tokenId, ([pid, &proxy, &objectKey, &observer](const uint32_t key, ProgressCallbackInfo &value) {
475             if (value.pid != pid) {
476                 value = ProgressCallbackInfo{ pid };
477             }
478             value.observers_.insert_or_assign(objectKey, proxy);
479             observer = value.observers_[objectKey];
480             return !value.observers_.empty();
481         }));
482     int32_t progress = PROGRESS_INVALID;
483     {
484         std::lock_guard<std::mutex> lock(progressMutex_);
485         auto it = progressInfo_.find(objectKey);
486         if (it == progressInfo_.end()) {
487             return;
488         }
489         progress = it->second;
490     }
491     if (observer != nullptr && progress != PROGRESS_INVALID) {
492         observer->Completed(progress);
493     }
494     if (progress == PROGRESS_MAX) {
495         assetsRecvProgress_.Erase(objectKey);
496         std::lock_guard<std::mutex> lock(progressMutex_);
497         progressInfo_.erase(objectKey);
498     }
499 }
500 
UnregisterProgressObserverCallback(const std::string & bundleName,pid_t pid,uint32_t tokenId,const std::string & sessionId)501 void ObjectStoreManager::UnregisterProgressObserverCallback(
502     const std::string &bundleName, pid_t pid, uint32_t tokenId, const std::string &sessionId)
503 {
504     if (bundleName.empty()) {
505         ZLOGD("bundleName is empty");
506         return;
507     }
508     processCallbacks_.Compute(
509         tokenId, ([pid, &sessionId, &bundleName](const uint32_t key, ProgressCallbackInfo &value) {
510         if (value.pid != pid) {
511             return true;
512         }
513         if (sessionId.empty()) {
514             return false;
515         }
516         std::string prefix = bundleName + sessionId;
517         for (auto it = value.observers_.begin(); it != value.observers_.end();) {
518             if ((*it).first == prefix) {
519                 it = value.observers_.erase(it);
520             } else {
521                 ++it;
522             }
523         }
524         return true;
525     }));
526 }
527 
NotifyChange(const ObjectRecord & changedData)528 void ObjectStoreManager::NotifyChange(const ObjectRecord &changedData)
529 {
530     ZLOGI("OnChange start, size:%{public}zu", changedData.size());
531     bool hasAsset = false;
532     SaveInfo saveInfo;
533     for (const auto &[key, value] : changedData) {
534         if (key.find(SAVE_INFO) != std::string::npos) {
535             DistributedData::Serializable::Unmarshall(std::string(value.begin(), value.end()), saveInfo);
536             break;
537         }
538     }
539     auto data = GetObjectData(changedData, saveInfo, hasAsset);
540     if (!hasAsset) {
541         ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
542             ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName);
543         callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
544             DoNotify(tokenId, value, data, true); // no asset, data ready means all ready
545             return false;
546         });
547         return;
548     }
549     NotifyDataChanged(data, saveInfo);
550     SaveUserToMeta();
551 }
552 
GetObjectData(const ObjectRecord & changedData,SaveInfo & saveInfo,bool & hasAsset)553 std::map<std::string, ObjectRecord> ObjectStoreManager::GetObjectData(const ObjectRecord& changedData,
554     SaveInfo& saveInfo, bool& hasAsset)
555 {
556     std::map<std::string, ObjectRecord> data;
557     std::string keyPrefix = saveInfo.ToPropertyPrefix();
558     if (!keyPrefix.empty()) {
559         std::string observerKey = saveInfo.bundleName + saveInfo.sessionId;
560         for (const auto &[key, value] : changedData) {
561             if (key.size() < keyPrefix.size() || key.find(SAVE_INFO) != std::string::npos) {
562                 continue;
563             }
564             std::string propertyName = key.substr(keyPrefix.size());
565             data[observerKey].insert_or_assign(propertyName, value);
566             if (!hasAsset && IsAssetKey(propertyName)) {
567                 hasAsset = true;
568             }
569         }
570     } else {
571         for (const auto &item : changedData) {
572             std::vector<std::string> splitKeys = SplitEntryKey(item.first);
573             if (splitKeys.size() <= PROPERTY_NAME_INDEX) {
574                 continue;
575             }
576             if (saveInfo.sourceDeviceId.empty() || saveInfo.bundleName.empty()) {
577                 saveInfo.sourceDeviceId = splitKeys[SOURCE_DEVICE_UDID_INDEX];
578                 saveInfo.bundleName = splitKeys[BUNDLE_NAME_INDEX];
579                 saveInfo.sessionId = splitKeys[SESSION_ID_INDEX];
580                 saveInfo.timestamp = splitKeys[TIME_INDEX];
581             }
582             std::string prefix = splitKeys[BUNDLE_NAME_INDEX] + splitKeys[SESSION_ID_INDEX];
583             std::string propertyName = splitKeys[PROPERTY_NAME_INDEX];
584             data[prefix].insert_or_assign(propertyName, item.second);
585             if (IsAssetKey(propertyName)) {
586                 hasAsset = true;
587             }
588         }
589     }
590     return data;
591 }
592 
ComputeStatus(const std::string & objectKey,const SaveInfo & saveInfo,const std::map<std::string,ObjectRecord> & data)593 void ObjectStoreManager::ComputeStatus(const std::string& objectKey, const SaveInfo& saveInfo,
594     const std::map<std::string, ObjectRecord>& data)
595 {
596     restoreStatus_.Compute(objectKey, [this, &data, saveInfo] (const auto &key, auto &value) {
597         if (value == RestoreStatus::ASSETS_READY) {
598             value = RestoreStatus::ALL_READY;
599             ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
600                 ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS);
601             callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
602                 DoNotify(tokenId, value, data, true);
603                 return false;
604             });
605         } else {
606             value = RestoreStatus::DATA_READY;
607             ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
608                 ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName);
609             callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
610                 DoNotify(tokenId, value, data, false);
611                 return false;
612             });
613             WaitAssets(key, saveInfo, data);
614         }
615         return true;
616     });
617 }
618 
IsNeedMetaSync(const StoreMetaData & meta,const std::vector<std::string> & uuids)619 bool ObjectStoreManager::IsNeedMetaSync(const StoreMetaData &meta, const std::vector<std::string> &uuids)
620 {
621     bool isAfterMeta = false;
622     for (const auto &uuid : uuids) {
623         auto metaData = meta;
624         metaData.deviceId = uuid;
625         CapMetaData capMeta;
626         auto capKey = CapMetaRow::GetKeyFor(uuid);
627         bool flag = !MetaDataManager::GetInstance().LoadMeta(std::string(capKey.begin(), capKey.end()), capMeta) ||
628             !MetaDataManager::GetInstance().LoadMeta(metaData.GetKeyWithoutPath(), metaData);
629         if (flag) {
630             isAfterMeta = true;
631             break;
632         }
633         auto [exist, mask] = DeviceMatrix::GetInstance().GetRemoteMask(uuid);
634         if ((mask & DeviceMatrix::META_STORE_MASK) == DeviceMatrix::META_STORE_MASK) {
635             isAfterMeta = true;
636             break;
637         }
638         auto [existLocal, localMask] = DeviceMatrix::GetInstance().GetMask(uuid);
639         if ((localMask & DeviceMatrix::META_STORE_MASK) == DeviceMatrix::META_STORE_MASK) {
640             isAfterMeta = true;
641             break;
642         }
643     }
644     return isAfterMeta;
645 }
646 
NotifyDataChanged(const std::map<std::string,ObjectRecord> & data,const SaveInfo & saveInfo)647 void ObjectStoreManager::NotifyDataChanged(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo)
648 {
649     for (auto const& [objectKey, results] : data) {
650         restoreStatus_.ComputeIfAbsent(
651             objectKey, [](const std::string& key) -> auto {
652             return RestoreStatus::NONE;
653         });
654         ComputeStatus(objectKey, saveInfo, data);
655     }
656 }
657 
WaitAssets(const std::string & objectKey,const SaveInfo & saveInfo,const std::map<std::string,ObjectRecord> & data)658 int32_t ObjectStoreManager::WaitAssets(const std::string& objectKey, const SaveInfo& saveInfo,
659     const std::map<std::string, ObjectRecord>& data)
660 {
661     if (executors_ == nullptr) {
662         ZLOGE("executors_ is null");
663         return OBJECT_INNER_ERROR;
664     }
665     auto taskId = executors_->Schedule(std::chrono::seconds(WAIT_TIME), [this, objectKey, data, saveInfo]() {
666         ZLOGE("wait assets finisehd timeout, try pull assets, objectKey:%{public}s", objectKey.c_str());
667         PullAssets(data, saveInfo);
668         DoNotifyWaitAssetTimeout(objectKey);
669     });
670 
671     objectTimer_.ComputeIfAbsent(
672         objectKey, [taskId](const std::string& key) -> auto {
673             return taskId;
674     });
675     return  OBJECT_SUCCESS;
676 }
677 
PullAssets(const std::map<std::string,ObjectRecord> & data,const SaveInfo & saveInfo)678 void ObjectStoreManager::PullAssets(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo)
679 {
680     std::map<std::string, Assets> changedAssets;
681     for (auto const& [objectId, result] : data) {
682         changedAssets[objectId] = GetAssetsFromDBRecords(result);
683     }
684     for (const auto& [objectId, assets] : changedAssets) {
685         std::string networkId = DmAdaper::GetInstance().ToNetworkID(saveInfo.sourceDeviceId);
686         auto block = std::make_shared<BlockData<std::tuple<bool, bool>>>(WAIT_TIME, std::tuple{ true, true });
687         ObjectAssetLoader::GetInstance().TransferAssetsAsync(std::atoi(GetCurrentUser().c_str()),
688             saveInfo.bundleName, networkId, assets, [this, block](bool success) {
689                 block->SetValue({ false, success });
690         });
691         auto [timeout, success] = block->GetValue();
692         ZLOGI("Pull assets end, timeout: %{public}d, success: %{public}d, size:%{public}zu, deviceId: %{public}s",
693             timeout, success, assets.size(), DistributedData::Anonymous::Change(networkId).c_str());
694     }
695 }
696 
NotifyAssetsRecvProgress(const std::string & objectKey,int32_t progress)697 void ObjectStoreManager::NotifyAssetsRecvProgress(const std::string &objectKey, int32_t progress)
698 {
699     assetsRecvProgress_.InsertOrAssign(objectKey, progress);
700     std::list<sptr<ObjectProgressCallbackProxy>> observers;
701     bool flag = false;
702     processCallbacks_.ForEach(
703         [&objectKey, &observers, &flag](uint32_t tokenId, const ProgressCallbackInfo &value) {
704             if (value.observers_.empty()) {
705                 flag = true;
706                 return false;
707             }
708             auto it = value.observers_.find(objectKey);
709             if (it != value.observers_.end()) {
710                 observers.push_back(it->second);
711             }
712             return false;
713         });
714     if (flag) {
715         std::lock_guard<std::mutex> lock(progressMutex_);
716         progressInfo_.insert_or_assign(objectKey, progress);
717     }
718     for (auto &observer : observers) {
719         if (observer == nullptr) {
720             continue;
721         }
722         observer->Completed(progress);
723     }
724     if (!observers.empty() && progress == PROGRESS_MAX) {
725         assetsRecvProgress_.Erase(objectKey);
726         std::lock_guard<std::mutex> lock(progressMutex_);
727         progressInfo_.erase(objectKey);
728     }
729 }
730 
NotifyAssetsReady(const std::string & objectKey,const std::string & bundleName,const std::string & srcNetworkId)731 void ObjectStoreManager::NotifyAssetsReady(
732     const std::string &objectKey, const std::string &bundleName, const std::string &srcNetworkId)
733 {
734     if (executors_ == nullptr) {
735         ZLOGE("executors_ is nullptr");
736         return;
737     }
738     restoreStatus_.ComputeIfAbsent(
739         objectKey, [](const std::string& key) -> auto {
740         return RestoreStatus::NONE;
741     });
742     restoreStatus_.Compute(objectKey, [this, &bundleName] (const auto &key, auto &value) {
743         if (value == RestoreStatus::DATA_NOTIFIED) {
744             value = RestoreStatus::ALL_READY;
745             ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
746                 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS);
747             callbacks_.ForEach([this, key](uint32_t tokenId, const CallbackInfo& value) {
748                 DoNotifyAssetsReady(tokenId, value,  key, true);
749                 return false;
750             });
751         } else if (value == RestoreStatus::DATA_READY) {
752             value = RestoreStatus::ALL_READY;
753             ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
754                 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS);
755             auto [has, taskId] = objectTimer_.Find(key);
756             if (has) {
757                 executors_->Remove(taskId);
758                 objectTimer_.Erase(key);
759             }
760         } else {
761             value = RestoreStatus::ASSETS_READY;
762             ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
763                 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, bundleName);
764         }
765         return true;
766     });
767 }
768 
NotifyAssetsStart(const std::string & objectKey,const std::string & srcNetworkId)769 void ObjectStoreManager::NotifyAssetsStart(const std::string& objectKey, const std::string& srcNetworkId)
770 {
771     restoreStatus_.ComputeIfAbsent(
772         objectKey, [](const std::string& key) -> auto {
773         return RestoreStatus::NONE;
774     });
775 }
776 
IsAssetKey(const std::string & key)777 bool ObjectStoreManager::IsAssetKey(const std::string& key)
778 {
779     return key.find(ObjectStore::ASSET_DOT) != std::string::npos;
780 }
781 
IsAssetComplete(const ObjectRecord & result,const std::string & assetPrefix)782 bool ObjectStoreManager::IsAssetComplete(const ObjectRecord& result, const std::string& assetPrefix)
783 {
784     if (result.find(assetPrefix + ObjectStore::NAME_SUFFIX) == result.end() ||
785         result.find(assetPrefix + ObjectStore::URI_SUFFIX) == result.end() ||
786         result.find(assetPrefix + ObjectStore::PATH_SUFFIX) == result.end() ||
787         result.find(assetPrefix + ObjectStore::CREATE_TIME_SUFFIX) == result.end() ||
788         result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX) == result.end() ||
789         result.find(assetPrefix + ObjectStore::SIZE_SUFFIX) == result.end()) {
790         return false;
791     }
792     return true;
793 }
794 
GetAssetsFromDBRecords(const ObjectRecord & result)795 Assets ObjectStoreManager::GetAssetsFromDBRecords(const ObjectRecord& result)
796 {
797     Assets assets{};
798     std::set<std::string> assetKey;
799     for (const auto& [key, value] : result) {
800         std::string assetPrefix = key.substr(0, key.find(ObjectStore::ASSET_DOT));
801         if (!IsAssetKey(key) || assetKey.find(assetPrefix) != assetKey.end() ||
802             result.find(assetPrefix + ObjectStore::NAME_SUFFIX) == result.end() ||
803             result.find(assetPrefix + ObjectStore::URI_SUFFIX) == result.end() ||
804             result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX) == result.end() ||
805             result.find(assetPrefix + ObjectStore::SIZE_SUFFIX) == result.end()) {
806             continue;
807         }
808         Asset asset;
809         ObjectStore::StringUtils::BytesToStrWithType(
810             result.find(assetPrefix + ObjectStore::NAME_SUFFIX)->second, asset.name);
811         if (asset.name.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
812             asset.name = asset.name.substr(ObjectStore::STRING_PREFIX_LEN);
813         }
814         ObjectStore::StringUtils::BytesToStrWithType(
815             result.find(assetPrefix + ObjectStore::URI_SUFFIX)->second, asset.uri);
816         if (asset.uri.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
817             asset.uri = asset.uri.substr(ObjectStore::STRING_PREFIX_LEN);
818         }
819         ObjectStore::StringUtils::BytesToStrWithType(
820             result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX)->second, asset.modifyTime);
821         if (asset.modifyTime.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
822             asset.modifyTime = asset.modifyTime.substr(ObjectStore::STRING_PREFIX_LEN);
823         }
824         ObjectStore::StringUtils::BytesToStrWithType(
825             result.find(assetPrefix + ObjectStore::SIZE_SUFFIX)->second, asset.size);
826         if (asset.size.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
827             asset.size = asset.size.substr(ObjectStore::STRING_PREFIX_LEN);
828         }
829         asset.hash = asset.modifyTime + "_" + asset.size;
830         assets.push_back(asset);
831         assetKey.insert(assetPrefix);
832     }
833     return assets;
834 }
835 
DoNotify(uint32_t tokenId,const CallbackInfo & value,const std::map<std::string,ObjectRecord> & data,bool allReady)836 void ObjectStoreManager::DoNotify(uint32_t tokenId, const CallbackInfo& value,
837     const std::map<std::string, ObjectRecord>& data, bool allReady)
838 {
839     for (const auto& observer : value.observers_) {
840         auto it = data.find(observer.first);
841         if (it == data.end()) {
842             continue;
843         }
844         observer.second->Completed((*it).second, allReady);
845         if (allReady) {
846             restoreStatus_.Erase(observer.first);
847             ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
848                 ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
849         } else {
850             restoreStatus_.ComputeIfPresent(observer.first, [](const auto &key, auto &value) {
851                 value = RestoreStatus::DATA_NOTIFIED;
852                 return true;
853             });
854         }
855     }
856 }
857 
DoNotifyAssetsReady(uint32_t tokenId,const CallbackInfo & value,const std::string & objectKey,bool allReady)858 void ObjectStoreManager::DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value,
859     const std::string& objectKey, bool allReady)
860 {
861     if (executors_ == nullptr) {
862         ZLOGE("executors_ is nullptr");
863         return;
864     }
865     for (const auto& observer : value.observers_) {
866         if (objectKey != observer.first) {
867             continue;
868         }
869         observer.second->Completed(ObjectRecord(), allReady);
870         if (allReady) {
871             restoreStatus_.Erase(objectKey);
872             ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
873                 ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
874         }
875         auto [has, taskId] = objectTimer_.Find(objectKey);
876         if (has) {
877             executors_->Remove(taskId);
878             objectTimer_.Erase(objectKey);
879         }
880     }
881 }
882 
DoNotifyWaitAssetTimeout(const std::string & objectKey)883 void ObjectStoreManager::DoNotifyWaitAssetTimeout(const std::string &objectKey)
884 {
885     if (executors_ == nullptr) {
886         ZLOGE("executors_ is nullptr");
887         return;
888     }
889     ObjectStore::RadarReporter::ReportStageError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
890         ObjectStore::ASSETS_RECV, ObjectStore::RADAR_FAILED, ObjectStore::TIMEOUT);
891     callbacks_.ForEach([this, &objectKey](uint32_t tokenId, const CallbackInfo &value) {
892         for (const auto& observer : value.observers_) {
893             if (objectKey != observer.first) {
894                 continue;
895             }
896             observer.second->Completed(ObjectRecord(), true);
897             restoreStatus_.Erase(objectKey);
898             auto [has, taskId] = objectTimer_.Find(objectKey);
899             if (has) {
900                 executors_->Remove(taskId);
901                 objectTimer_.Erase(objectKey);
902             }
903             ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
904                 ObjectStore::NOTIFY, ObjectStore::RADAR_FAILED, ObjectStore::TIMEOUT, ObjectStore::FINISHED);
905         }
906         return false;
907     });
908 }
909 
SetData(const std::string & dataDir,const std::string & userId)910 void ObjectStoreManager::SetData(const std::string &dataDir, const std::string &userId)
911 {
912     ZLOGI("enter, user: %{public}s", userId.c_str());
913     kvStoreDelegateManager_ = std::make_shared<DistributedDB::KvStoreDelegateManager>
914         (DistributedData::Bootstrap::GetInstance().GetProcessLabel(), userId);
915     DistributedDB::KvStoreConfig kvStoreConfig { dataDir };
916     auto status = kvStoreDelegateManager_->SetKvStoreConfig(kvStoreConfig);
917     if (status != DistributedDB::OK) {
918         ZLOGE("Set kvstore config failed, status: %{public}d", status);
919         return;
920     }
921     userId_ = userId;
922 }
923 
Open()924 int32_t ObjectStoreManager::Open()
925 {
926     if (kvStoreDelegateManager_ == nullptr) {
927         ZLOGE("Kvstore delegate manager not init");
928         return OBJECT_INNER_ERROR;
929     }
930     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
931     if (delegate_ == nullptr) {
932         delegate_ = OpenObjectKvStore();
933         if (delegate_ == nullptr) {
934             ZLOGE("Open object kvstore failed");
935             return OBJECT_DBSTATUS_ERROR;
936         }
937         syncCount_ = 1;
938         ZLOGI("Open object kvstore success");
939     } else {
940         syncCount_++;
941         ZLOGI("Object kvstore syncCount: %{public}d", syncCount_.load());
942     }
943     return OBJECT_SUCCESS;
944 }
945 
ForceClose()946 void ObjectStoreManager::ForceClose()
947 {
948     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(LOCK_TIMEOUT));
949     if (delegate_ == nullptr) {
950         return;
951     }
952     auto status = kvStoreDelegateManager_->CloseKvStore(delegate_);
953     if (status != DistributedDB::DBStatus::OK) {
954         ZLOGE("CloseKvStore fail %{public}d", status);
955         return;
956     }
957     delegate_ = nullptr;
958     isSyncing_ = false;
959     syncCount_ = 0;
960 }
961 
Close()962 void ObjectStoreManager::Close()
963 {
964     int32_t taskCount = 0;
965     {
966         std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
967         if (delegate_ == nullptr) {
968             return;
969         }
970         taskCount = delegate_->GetTaskCount();
971     }
972     if (taskCount > 0 && syncCount_ == 1) {
973         CloseAfterMinute();
974         ZLOGW("Store is busy, close after a minute, task count: %{public}d", taskCount);
975         return;
976     }
977     syncCount_--;
978     ZLOGI("closed a store, syncCount = %{public}d", syncCount_.load());
979     FlushClosedStore();
980 }
981 
SyncCompleted(const std::map<std::string,DistributedDB::DBStatus> & results,uint64_t sequenceId)982 void ObjectStoreManager::SyncCompleted(
983     const std::map<std::string, DistributedDB::DBStatus> &results, uint64_t sequenceId)
984 {
985     std::string userId;
986     SequenceSyncManager::Result result = SequenceSyncManager::GetInstance()->Process(sequenceId, results, userId);
987     if (result == SequenceSyncManager::SUCCESS_USER_HAS_FINISHED && userId == userId_) {
988         SetSyncStatus(false);
989         FlushClosedStore();
990     }
991     for (const auto &item : results) {
992         if (item.second == DistributedDB::DBStatus::OK) {
993             ZLOGI("Sync data success, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
994             ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::SAVE,
995                 ObjectStore::SYNC_DATA, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
996         } else {
997             ZLOGE("Sync data failed, sequenceId: 0x%{public}" PRIx64 ", status: %{public}d", sequenceId, item.second);
998             ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
999                 ObjectStore::SYNC_DATA, ObjectStore::RADAR_FAILED, item.second, ObjectStore::FINISHED);
1000         }
1001     }
1002 }
1003 
FlushClosedStore()1004 void ObjectStoreManager::FlushClosedStore()
1005 {
1006     if (executors_ == nullptr) {
1007         ZLOGE("executors_ is nullptr");
1008         return;
1009     }
1010     std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
1011     if (!isSyncing_ && syncCount_ == 0 && delegate_ != nullptr) {
1012         ZLOGD("close store");
1013         auto status = kvStoreDelegateManager_->CloseKvStore(delegate_);
1014         if (status != DistributedDB::DBStatus::OK) {
1015             int timeOut = 1000;
1016             executors_->Schedule(std::chrono::milliseconds(timeOut), [this]() {
1017                 FlushClosedStore();
1018             });
1019             ZLOGE("GetEntries fail %{public}d", status);
1020             return;
1021         }
1022         delegate_ = nullptr;
1023     }
1024 }
1025 
ProcessOldEntry(const std::string & appId)1026 void ObjectStoreManager::ProcessOldEntry(const std::string &appId)
1027 {
1028     std::vector<DistributedDB::Entry> entries;
1029     {
1030         std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1031         if (delegate_ == nullptr) {
1032             ZLOGE("delegate is nullptr.");
1033             return;
1034         }
1035         auto status = delegate_->GetEntries(std::vector<uint8_t>(appId.begin(), appId.end()), entries);
1036         if (status != DistributedDB::DBStatus::OK) {
1037             ZLOGE("Get old entries failed, bundleName: %{public}s, status %{public}d", appId.c_str(), status);
1038             return;
1039         }
1040     }
1041     std::map<std::string, int64_t> sessionIds;
1042     int64_t oldestTime = 0;
1043     std::string deleteKey;
1044     for (auto &item : entries) {
1045         std::string key(item.key.begin(), item.key.end());
1046         std::vector<std::string> splitKeys = SplitEntryKey(key);
1047         if (splitKeys.empty()) {
1048             continue;
1049         }
1050         std::string bundleName = splitKeys[BUNDLE_NAME_INDEX];
1051         std::string sessionId = splitKeys[SESSION_ID_INDEX];
1052         if (sessionIds.count(sessionId) == 0) {
1053             char *end = nullptr;
1054             sessionIds[sessionId] = strtol(splitKeys[TIME_INDEX].c_str(), &end, DECIMAL_BASE);
1055         }
1056         if (oldestTime == 0 || oldestTime > sessionIds[sessionId]) {
1057             oldestTime = sessionIds[sessionId];
1058             deleteKey = GetPrefixWithoutDeviceId(bundleName, sessionId);
1059         }
1060     }
1061     if (sessionIds.size() < MAX_OBJECT_SIZE_PER_APP) {
1062         return;
1063     }
1064     int32_t result = RevokeSaveToStore(deleteKey);
1065     if (result != OBJECT_SUCCESS) {
1066         ZLOGE("Delete old entries failed, deleteKey: %{public}s, status: %{public}d", deleteKey.c_str(), result);
1067         return;
1068     }
1069     ZLOGI("Delete old entries success, deleteKey: %{public}s", deleteKey.c_str());
1070 }
1071 
SaveToStore(const std::string & appId,const std::string & sessionId,const std::string & toDeviceId,const ObjectRecord & data)1072 int32_t ObjectStoreManager::SaveToStore(const std::string &appId, const std::string &sessionId,
1073     const std::string &toDeviceId, const ObjectRecord &data)
1074 {
1075     ProcessOldEntry(appId);
1076     RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, toDeviceId));
1077     std::string timestamp = std::to_string(GetSecondsSince1970ToNow());
1078     std::string prefix = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR;
1079     DistributedDB::Entry saveInfoEntry;
1080     std::string saveInfoKey = prefix + SAVE_INFO;
1081     saveInfoEntry.key = std::vector<uint8_t>(saveInfoKey.begin(), saveInfoKey.end());
1082     SaveInfo saveInfo(appId, sessionId, DmAdaper::GetInstance().GetLocalDevice().udid, toDeviceId, timestamp);
1083     std::string saveInfoValue = DistributedData::Serializable::Marshall(saveInfo);
1084     saveInfoEntry.value = std::vector<uint8_t>(saveInfoValue.begin(), saveInfoValue.end());
1085     std::vector<DistributedDB::Entry> entries;
1086     entries.emplace_back(saveInfoEntry);
1087     for (const auto &item : data) {
1088         DistributedDB::Entry entry;
1089         std::string key = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR + item.first;
1090         entry.key = std::vector<uint8_t>(key.begin(), key.end());
1091         entry.value = item.second;
1092         entries.emplace_back(entry);
1093     }
1094     {
1095         std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1096         if (delegate_ == nullptr) {
1097             ZLOGE("delegate is nullptr.");
1098             return E_DB_ERROR;
1099         }
1100         auto status = delegate_->PutBatch(entries);
1101         if (status != DistributedDB::DBStatus::OK) {
1102             ZLOGE("PutBatch failed, bundleName: %{public}s, sessionId: %{public}s, dstNetworkId: %{public}s, "
1103                   "status: %{public}d",
1104                 appId.c_str(), Anonymous::Change(sessionId).c_str(), Anonymous::Change(toDeviceId).c_str(), status);
1105             return status;
1106         }
1107     }
1108     ZLOGI("PutBatch success, bundleName: %{public}s, sessionId: %{public}s, dstNetworkId: %{public}s, "
1109           "count: %{public}zu",
1110         appId.c_str(), Anonymous::Change(sessionId).c_str(), Anonymous::Change(toDeviceId).c_str(), entries.size());
1111     return OBJECT_SUCCESS;
1112 }
1113 
SyncOnStore(const std::string & prefix,const std::vector<std::string> & deviceList,SyncCallBack & callback)1114 int32_t ObjectStoreManager::SyncOnStore(
1115     const std::string &prefix, const std::vector<std::string> &deviceList, SyncCallBack &callback)
1116 {
1117     std::vector<std::string> syncDevices;
1118     for (auto const &device : deviceList) {
1119         if (device == LOCAL_DEVICE) {
1120             ZLOGI("Save to local, do not need sync, prefix: %{public}s", Anonymous::Change(prefix).c_str());
1121             callback({{LOCAL_DEVICE, OBJECT_SUCCESS}});
1122             return OBJECT_SUCCESS;
1123         }
1124         syncDevices.emplace_back(DmAdaper::GetInstance().GetUuidByNetworkId(device));
1125     }
1126     if (syncDevices.empty()) {
1127         ZLOGI("Device list is empty, prefix: %{public}s", Anonymous::Change(prefix).c_str());
1128         callback(std::map<std::string, int32_t>());
1129         return OBJECT_SUCCESS;
1130     }
1131     uint64_t sequenceId = SequenceSyncManager::GetInstance()->AddNotifier(userId_, callback);
1132 
1133     int32_t id = AccountDelegate::GetInstance()->GetUserByToken(IPCSkeleton::GetCallingFullTokenID());
1134     StoreMetaData meta = StoreMetaData(std::to_string(id), Bootstrap::GetInstance().GetProcessLabel(),
1135         DistributedObject::ObjectCommon::OBJECTSTORE_DB_STOREID);
1136     auto uuids = DmAdapter::GetInstance().ToUUID(syncDevices);
1137     bool isNeedMetaSync = IsNeedMetaSync(meta, uuids);
1138     if (!isNeedMetaSync) {
1139         return DoSync(prefix, syncDevices, sequenceId);
1140     }
1141     bool result = MetaDataManager::GetInstance().Sync(uuids, [this, prefix, syncDevices, sequenceId](auto &results) {
1142         auto status = DoSync(prefix, syncDevices, sequenceId);
1143         ZLOGI("Store sync after meta sync end, status:%{public}d", status);
1144     });
1145     ZLOGI("prefix:%{public}s, meta sync end, result:%{public}d", Anonymous::Change(prefix).c_str(), result);
1146     return result ? OBJECT_SUCCESS : DoSync(prefix, syncDevices, sequenceId);
1147 }
1148 
DoSync(const std::string & prefix,const std::vector<std::string> & deviceList,uint64_t sequenceId)1149 int32_t ObjectStoreManager::DoSync(const std::string &prefix, const std::vector<std::string> &deviceList,
1150     uint64_t sequenceId)
1151 {
1152     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1153     if (delegate_ == nullptr) {
1154         ZLOGE("db store was closed.");
1155         return E_DB_ERROR;
1156     }
1157     DistributedDB::Query dbQuery = DistributedDB::Query::Select();
1158     dbQuery.PrefixKey(std::vector<uint8_t>(prefix.begin(), prefix.end()));
1159     ZLOGI("Start sync data, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
1160     auto status = delegate_->Sync(deviceList, DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY,
1161         [this, sequenceId](const std::map<std::string, DistributedDB::DBStatus> &devicesMap) {
1162             ZLOGI("Sync data finished, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
1163             std::map<std::string, DistributedDB::DBStatus> result;
1164             for (auto &item : devicesMap) {
1165                 result[DmAdaper::GetInstance().ToNetworkID(item.first)] = item.second;
1166             }
1167             SyncCompleted(result, sequenceId);
1168         }, dbQuery, false);
1169     if (status != DistributedDB::DBStatus::OK) {
1170         ZLOGE("Sync data failed, prefix: %{public}s, sequenceId: 0x%{public}" PRIx64 ", status: %{public}d",
1171             Anonymous::Change(prefix).c_str(), sequenceId, status);
1172         std::string tmp;
1173         SequenceSyncManager::GetInstance()->DeleteNotifier(sequenceId, tmp);
1174         return status;
1175     }
1176     SetSyncStatus(true);
1177     return OBJECT_SUCCESS;
1178 }
1179 
SetSyncStatus(bool status)1180 int32_t ObjectStoreManager::SetSyncStatus(bool status)
1181 {
1182     isSyncing_ = status;
1183     return OBJECT_SUCCESS;
1184 }
1185 
RevokeSaveToStore(const std::string & prefix)1186 int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix)
1187 {
1188     std::vector<DistributedDB::Entry> entries;
1189     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1190     if (delegate_ == nullptr) {
1191         ZLOGE("db store was closed.");
1192         return E_DB_ERROR;
1193     }
1194     auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
1195     if (status == DistributedDB::DBStatus::NOT_FOUND) {
1196         ZLOGI("Get entries empty, prefix: %{public}s", Anonymous::Change(prefix).c_str());
1197         return OBJECT_SUCCESS;
1198     }
1199     if (status != DistributedDB::DBStatus::OK) {
1200         ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status);
1201         return DB_ERROR;
1202     }
1203     std::vector<std::vector<uint8_t>> keys;
1204     std::for_each(entries.begin(), entries.end(), [&keys](const DistributedDB::Entry &entry) {
1205         keys.emplace_back(entry.key);
1206     });
1207     if (keys.empty()) {
1208         return OBJECT_SUCCESS;
1209     }
1210     status = delegate_->DeleteBatch(keys);
1211     if (status != DistributedDB::DBStatus::OK) {
1212         ZLOGE("Delete entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(),
1213             status);
1214         return DB_ERROR;
1215     }
1216     ZLOGI("Delete entries success, prefix: %{public}s, count: %{public}zu", Anonymous::Change(prefix).c_str(),
1217         keys.size());
1218     return OBJECT_SUCCESS;
1219 }
1220 
RetrieveFromStore(const std::string & appId,const std::string & sessionId,ObjectRecord & results)1221 int32_t ObjectStoreManager::RetrieveFromStore(const std::string &appId, const std::string &sessionId,
1222     ObjectRecord &results)
1223 {
1224     std::vector<DistributedDB::Entry> entries;
1225     std::string prefix = GetPrefixWithoutDeviceId(appId, sessionId);
1226     std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1227     if (delegate_ == nullptr) {
1228         ZLOGE("db store was closed.");
1229         return E_DB_ERROR;
1230     }
1231     auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
1232     if (status == DistributedDB::DBStatus::NOT_FOUND) {
1233         ZLOGI("Get entries empty, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status);
1234         return KEY_NOT_FOUND;
1235     }
1236     if (status != DistributedDB::DBStatus::OK) {
1237         ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status);
1238         return DB_ERROR;
1239     }
1240     ZLOGI("GetEntries success,prefix:%{public}s,count:%{public}zu", Anonymous::Change(prefix).c_str(), entries.size());
1241     for (const auto &entry : entries) {
1242         std::string key(entry.key.begin(), entry.key.end());
1243         if (key.find(SAVE_INFO) != std::string::npos) {
1244             continue;
1245         }
1246         auto splitKeys = SplitEntryKey(key);
1247         if (!splitKeys.empty()) {
1248             results[splitKeys[PROPERTY_NAME_INDEX]] = entry.value;
1249         }
1250     }
1251     return OBJECT_SUCCESS;
1252 }
1253 
SaveInfo(const std::string & bundleName,const std::string & sessionId,const std::string & sourceDeviceId,const std::string & targetDeviceId,const std::string & timestamp)1254 ObjectStoreManager::SaveInfo::SaveInfo(const std::string &bundleName, const std::string &sessionId,
1255     const std::string &sourceDeviceId, const std::string &targetDeviceId, const std::string &timestamp)
1256     : bundleName(bundleName), sessionId(sessionId), sourceDeviceId(sourceDeviceId), targetDeviceId(targetDeviceId),
1257     timestamp(timestamp) {}
1258 
Marshal(json & node) const1259 bool ObjectStoreManager::SaveInfo::Marshal(json &node) const
1260 {
1261     SetValue(node[GET_NAME(bundleName)], bundleName);
1262     SetValue(node[GET_NAME(sessionId)], sessionId);
1263     SetValue(node[GET_NAME(sourceDeviceId)], sourceDeviceId);
1264     SetValue(node[GET_NAME(targetDeviceId)], targetDeviceId);
1265     SetValue(node[GET_NAME(timestamp)], timestamp);
1266     return true;
1267 }
1268 
Unmarshal(const json & node)1269 bool ObjectStoreManager::SaveInfo::Unmarshal(const json &node)
1270 {
1271     GetValue(node, GET_NAME(bundleName), bundleName);
1272     GetValue(node, GET_NAME(sessionId), sessionId);
1273     GetValue(node, GET_NAME(sourceDeviceId), sourceDeviceId);
1274     GetValue(node, GET_NAME(targetDeviceId), targetDeviceId);
1275     GetValue(node, GET_NAME(timestamp), timestamp);
1276     return true;
1277 }
1278 
ToPropertyPrefix()1279 std::string ObjectStoreManager::SaveInfo::ToPropertyPrefix()
1280 {
1281     if (bundleName.empty() || sessionId.empty() || sourceDeviceId.empty() || targetDeviceId.empty() ||
1282         timestamp.empty()) {
1283         return "";
1284     }
1285     return bundleName + SEPERATOR + sessionId + SEPERATOR + sourceDeviceId + SEPERATOR + targetDeviceId + SEPERATOR +
1286         timestamp + SEPERATOR;
1287 }
1288 
SplitEntryKey(const std::string & key)1289 std::vector<std::string> ObjectStoreManager::SplitEntryKey(const std::string &key)
1290 {
1291     std::smatch match;
1292     std::regex timeRegex(TIME_REGEX);
1293     if (!std::regex_search(key, match, timeRegex)) {
1294         ZLOGW("Format error, key.size = %{public}zu", key.size());
1295         return {};
1296     }
1297     auto timePos = match.position();
1298     std::string fromTime = key.substr(timePos + 1);
1299     std::string beforeTime = key.substr(0, timePos);
1300 
1301     size_t targetDevicePos = beforeTime.find_last_of(SEPERATOR);
1302     if (targetDevicePos == std::string::npos) {
1303         ZLOGW("Format error, key.size = %{public}zu", key.size());
1304         return {};
1305     }
1306     std::string targetDevice = beforeTime.substr(targetDevicePos + 1);
1307     std::string beforeTargetDevice = beforeTime.substr(0, targetDevicePos);
1308 
1309     size_t sourceDeviceUdidPos = beforeTargetDevice.find_last_of(SEPERATOR);
1310     if (sourceDeviceUdidPos == std::string::npos) {
1311         ZLOGW("Format error, key.size = %{public}zu", key.size());
1312         return {};
1313     }
1314     std::string sourceDeviceUdid = beforeTargetDevice.substr(sourceDeviceUdidPos + 1);
1315     std::string beforeSourceDeviceUdid = beforeTargetDevice.substr(0, sourceDeviceUdidPos);
1316 
1317     size_t sessionIdPos = beforeSourceDeviceUdid.find_last_of(SEPERATOR);
1318     if (sessionIdPos == std::string::npos) {
1319         ZLOGW("Format error, key.size = %{public}zu", key.size());
1320         return {};
1321     }
1322     std::string sessionId = beforeSourceDeviceUdid.substr(sessionIdPos + 1);
1323     std::string bundleName = beforeSourceDeviceUdid.substr(0, sessionIdPos);
1324 
1325     size_t propertyNamePos = fromTime.find_first_of(SEPERATOR);
1326     if (propertyNamePos == std::string::npos) {
1327         ZLOGW("Format error, key.size = %{public}zu", key.size());
1328         return {};
1329     }
1330     std::string propertyName = fromTime.substr(propertyNamePos + 1);
1331     std::string time = fromTime.substr(0, propertyNamePos);
1332 
1333     return { bundleName, sessionId, sourceDeviceUdid, targetDevice, time, propertyName };
1334 }
1335 
GetCurrentUser()1336 std::string ObjectStoreManager::GetCurrentUser()
1337 {
1338     std::vector<int> users;
1339     auto ret = AccountDelegate::GetInstance()->QueryUsers(users);
1340     if (!ret || users.empty()) {
1341         ZLOGE("failed to query os accounts, ret:%{public}d", ret);
1342         return "";
1343     }
1344     return std::to_string(users[0]);
1345 }
1346 
SaveUserToMeta()1347 void ObjectStoreManager::SaveUserToMeta()
1348 {
1349     ZLOGD("start.");
1350     std::string userId = GetCurrentUser();
1351     if (userId.empty()) {
1352         return;
1353     }
1354     std::string appId = DistributedData::Bootstrap::GetInstance().GetProcessLabel();
1355     DistributedData::ObjectUserMetaData userMeta;
1356     userMeta.userId = userId;
1357     auto saved = DistributedData::MetaDataManager::GetInstance().SaveMeta(
1358         DistributedData::ObjectUserMetaData::GetKey(), userMeta, true);
1359     if (!saved) {
1360         ZLOGE("userMeta save failed, userId:%{public}s", userId.c_str());
1361     }
1362 }
1363 
CloseAfterMinute()1364 void ObjectStoreManager::CloseAfterMinute()
1365 {
1366     if (executors_ == nullptr) {
1367         ZLOGE("executors_ is nullptr.");
1368         return;
1369     }
1370     executors_->Schedule(std::chrono::minutes(INTERVAL), [this]() {
1371         Close();
1372     });
1373 }
1374 
SetThreadPool(std::shared_ptr<ExecutorPool> executors)1375 void ObjectStoreManager::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
1376 {
1377     executors_ = executors;
1378 }
1379 
AddNotifier(const std::string & userId,SyncCallBack & callback)1380 uint64_t SequenceSyncManager::AddNotifier(const std::string &userId, SyncCallBack &callback)
1381 {
1382     std::lock_guard<std::mutex> lock(notifierLock_);
1383     uint64_t sequenceId = KvStoreUtils::GenerateSequenceId();
1384     userIdSeqIdRelations_[userId].emplace_back(sequenceId);
1385     seqIdCallbackRelations_[sequenceId] = callback;
1386     return sequenceId;
1387 }
1388 
Process(uint64_t sequenceId,const std::map<std::string,DistributedDB::DBStatus> & results,std::string & userId)1389 SequenceSyncManager::Result SequenceSyncManager::Process(
1390     uint64_t sequenceId, const std::map<std::string, DistributedDB::DBStatus> &results, std::string &userId)
1391 {
1392     std::lock_guard<std::mutex> lock(notifierLock_);
1393     if (seqIdCallbackRelations_.count(sequenceId) == 0) {
1394         ZLOGE("not exist");
1395         return ERR_SID_NOT_EXIST;
1396     }
1397     std::map<std::string, int32_t> syncResults;
1398     for (const auto &item : results) {
1399         syncResults[item.first] = item.second == DistributedDB::DBStatus::OK ? 0 : -1;
1400     }
1401     seqIdCallbackRelations_[sequenceId](syncResults);
1402     ZLOGD("end complete");
1403     return DeleteNotifierNoLock(sequenceId, userId);
1404 }
1405 
DeleteNotifier(uint64_t sequenceId,std::string & userId)1406 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifier(uint64_t sequenceId, std::string &userId)
1407 {
1408     std::lock_guard<std::mutex> lock(notifierLock_);
1409     if (seqIdCallbackRelations_.count(sequenceId) == 0) {
1410         ZLOGE("not exist");
1411         return ERR_SID_NOT_EXIST;
1412     }
1413     return DeleteNotifierNoLock(sequenceId, userId);
1414 }
1415 
DeleteNotifierNoLock(uint64_t sequenceId,std::string & userId)1416 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifierNoLock(uint64_t sequenceId, std::string &userId)
1417 {
1418     seqIdCallbackRelations_.erase(sequenceId);
1419     auto userIdIter = userIdSeqIdRelations_.begin();
1420     while (userIdIter != userIdSeqIdRelations_.end()) {
1421         auto sIdIter = std::find(userIdIter->second.begin(), userIdIter->second.end(), sequenceId);
1422         if (sIdIter != userIdIter->second.end()) {
1423             userIdIter->second.erase(sIdIter);
1424             if (userIdIter->second.empty()) {
1425                 ZLOGD("finished user callback, userId = %{public}s", userIdIter->first.c_str());
1426                 userId = userIdIter->first;
1427                 userIdSeqIdRelations_.erase(userIdIter);
1428                 return SUCCESS_USER_HAS_FINISHED;
1429             } else {
1430                 ZLOGD(" finished a callback but user not finished, userId = %{public}s", userIdIter->first.c_str());
1431                 return SUCCESS_USER_IN_USE;
1432             }
1433         }
1434         userIdIter++;
1435     }
1436     return SUCCESS_USER_HAS_FINISHED;
1437 }
1438 
BindAsset(const uint32_t tokenId,const std::string & appId,const std::string & sessionId,ObjectStore::Asset & asset,ObjectStore::AssetBindInfo & bindInfo)1439 int32_t ObjectStoreManager::BindAsset(const uint32_t tokenId, const std::string& appId, const std::string& sessionId,
1440     ObjectStore::Asset& asset, ObjectStore::AssetBindInfo& bindInfo)
1441 {
1442     auto snapshotKey = appId + SEPERATOR + sessionId;
1443     snapshots_.ComputeIfAbsent(
1444         snapshotKey, [](const std::string& key) -> auto {
1445             return std::make_shared<ObjectSnapshot>();
1446         });
1447     auto storeKey = appId + SEPERATOR + bindInfo.storeName;
1448     bindSnapshots_.ComputeIfAbsent(
1449         storeKey, [](const std::string& key) -> auto {
1450             return std::make_shared<std::map<std::string, std::shared_ptr<Snapshot>>>();
1451         });
1452     auto snapshots = snapshots_.Find(snapshotKey).second;
1453     bindSnapshots_.Compute(storeKey, [this, &asset, snapshots] (const auto &key, auto &value) {
1454         value->emplace(std::pair{asset.uri, snapshots});
1455         return true;
1456     });
1457 
1458     if (AccessTokenKit::GetTokenTypeFlag(tokenId) != TOKEN_HAP) {
1459         ZLOGE("TokenType is not TOKEN_HAP, token:0x%{public}x, bundleName:%{public}s", tokenId, appId.c_str());
1460         return GeneralError::E_ERROR;
1461     }
1462     HapTokenInfo tokenInfo;
1463     auto status = AccessTokenKit::GetHapTokenInfo(tokenId, tokenInfo);
1464     if (status != RET_SUCCESS) {
1465         ZLOGE("token:0x%{public}x, result:%{public}d, bundleName:%{public}s", tokenId, status, appId.c_str());
1466         return GeneralError::E_ERROR;
1467     }
1468     StoreInfo storeInfo;
1469     storeInfo.bundleName = appId;
1470     storeInfo.tokenId = tokenId;
1471     storeInfo.instanceId = tokenInfo.instIndex;
1472     storeInfo.user = tokenInfo.userID;
1473     storeInfo.storeName = bindInfo.storeName;
1474 
1475     snapshots_.Compute(snapshotKey, [this, &asset, &bindInfo, &storeInfo] (const auto &key, auto &value) {
1476         value->BindAsset(ValueProxy::Convert(std::move(asset)), ConvertBindInfo(bindInfo), storeInfo);
1477         return true;
1478     });
1479     return OBJECT_SUCCESS;
1480 }
1481 
ConvertBindInfo(ObjectStore::AssetBindInfo & bindInfo)1482 DistributedData::AssetBindInfo ObjectStoreManager::ConvertBindInfo(ObjectStore::AssetBindInfo& bindInfo)
1483 {
1484     return DistributedData::AssetBindInfo{
1485         .storeName = std::move(bindInfo.storeName),
1486         .tableName = std::move(bindInfo.tableName),
1487         .primaryKey = ValueProxy::Convert(std::move(bindInfo.primaryKey)),
1488         .field = std::move(bindInfo.field),
1489         .assetName = std::move(bindInfo.assetName),
1490     };
1491 }
1492 
OnAssetChanged(const uint32_t tokenId,const std::string & appId,const std::string & sessionId,const std::string & deviceId,const ObjectStore::Asset & asset)1493 int32_t ObjectStoreManager::OnAssetChanged(const uint32_t tokenId, const std::string& appId,
1494     const std::string& sessionId, const std::string& deviceId, const ObjectStore::Asset& asset)
1495 {
1496     const int32_t userId = AccountDelegate::GetInstance()->GetUserByToken(tokenId);
1497     auto objectAsset = asset;
1498     Asset dataAsset =  ValueProxy::Convert(std::move(objectAsset));
1499     auto snapshotKey = appId + SEPERATOR + sessionId;
1500     int32_t res = OBJECT_SUCCESS;
1501     bool exist = snapshots_.ComputeIfPresent(snapshotKey,
1502         [&res, &dataAsset, &deviceId](const std::string &key, std::shared_ptr<Snapshot> snapshot) {
1503             if (snapshot != nullptr) {
1504                 res = snapshot->OnDataChanged(dataAsset, deviceId); // needChange
1505             }
1506             return snapshot != nullptr;
1507         });
1508     if (exist) {
1509         return res;
1510     }
1511 
1512     auto block = std::make_shared<BlockData<std::tuple<bool, bool>>>(WAIT_TIME, std::tuple{ true, true });
1513     ObjectAssetLoader::GetInstance().TransferAssetsAsync(userId, appId, deviceId, { dataAsset }, [block](bool ret) {
1514         block->SetValue({ false, ret });
1515     });
1516     auto [timeout, success] = block->GetValue();
1517     if (timeout || !success) {
1518         ZLOGE("transfer failed, timeout: %{public}d, success: %{public}d, name: %{public}s, deviceId: %{public}s ",
1519             timeout, success, asset.name.c_str(), DistributedData::Anonymous::Change(deviceId).c_str());
1520         return OBJECT_INNER_ERROR;
1521     }
1522     return OBJECT_SUCCESS;
1523 }
1524 
GetSnapShots(const std::string & bundleName,const std::string & storeName)1525 ObjectStoreManager::UriToSnapshot ObjectStoreManager::GetSnapShots(const std::string& bundleName,
1526     const std::string& storeName)
1527 {
1528     auto storeKey = bundleName + SEPERATOR + storeName;
1529     bindSnapshots_.ComputeIfAbsent(
1530         storeKey, [](const std::string& key) -> auto {
1531             return std::make_shared<std::map<std::string, std::shared_ptr<Snapshot>>>();
1532         });
1533     return bindSnapshots_.Find(storeKey).second;
1534 }
1535 
DeleteSnapshot(const std::string & bundleName,const std::string & sessionId)1536 void ObjectStoreManager::DeleteSnapshot(const std::string& bundleName, const std::string& sessionId)
1537 {
1538     auto snapshotKey = bundleName + SEPERATOR + sessionId;
1539     auto snapshot = snapshots_.Find(snapshotKey).second;
1540     if (snapshot == nullptr) {
1541         ZLOGD("Not find snapshot, don't need delete");
1542         return;
1543     }
1544     bindSnapshots_.ForEach([snapshot](auto& key, auto& value) {
1545         for (auto pos = value->begin(); pos != value->end();) {
1546             if (pos->second == snapshot) {
1547                 pos = value->erase(pos);
1548             } else {
1549                 ++pos;
1550             }
1551         }
1552         return true;
1553     });
1554     snapshots_.Erase(snapshotKey);
1555 }
1556 
AutoLaunchStore()1557 int32_t ObjectStoreManager::AutoLaunchStore()
1558 {
1559     int32_t status = Open();
1560     if (status != OBJECT_SUCCESS) {
1561         ZLOGE("Open fail %{public}d", status);
1562         return status;
1563     }
1564     CloseAfterMinute();
1565     ZLOGI("Auto launch, close after a minute");
1566     return OBJECT_SUCCESS;
1567 }
1568 } // namespace DistributedObject
1569 } // namespace OHOS
1570