• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "ObjectStoreManager"
16 
17 #include "object_manager.h"
18 
19 #include <regex>
20 
21 #include "accesstoken_kit.h"
22 #include "account/account_delegate.h"
23 #include "block_data.h"
24 #include "bootstrap.h"
25 #include "common/bytes.h"
26 #include "common/string_utils.h"
27 #include "datetime_ex.h"
28 #include "distributed_file_daemon_manager.h"
29 #include "kvstore_utils.h"
30 #include "log_print.h"
31 #include "metadata/meta_data_manager.h"
32 #include "metadata/store_meta_data.h"
33 #include "object_dms_handler.h"
34 #include "object_radar_reporter.h"
35 #include "utils/anonymous.h"
36 
37 namespace OHOS {
38 namespace DistributedObject {
39 using namespace OHOS::DistributedKv;
40 using namespace Security::AccessToken;
41 using StoreMetaData = OHOS::DistributedData::StoreMetaData;
42 using Account = OHOS::DistributedData::AccountDelegate;
43 using AccessTokenKit = Security::AccessToken::AccessTokenKit;
44 using ValueProxy = OHOS::DistributedData::ValueProxy;
45 using DistributedFileDaemonManager = Storage::DistributedFile::DistributedFileDaemonManager;
46 constexpr const char *SAVE_INFO = "p_###SAVEINFO###";
ObjectStoreManager()47 ObjectStoreManager::ObjectStoreManager()
48 {
49     ZLOGI("ObjectStoreManager construct");
50     RegisterAssetsLister();
51 }
52 
~ObjectStoreManager()53 ObjectStoreManager::~ObjectStoreManager()
54 {
55     ZLOGI("ObjectStoreManager destroy");
56     if (objectAssetsRecvListener_ != nullptr) {
57         auto status = DistributedFileDaemonManager::GetInstance().UnRegisterAssetCallback(objectAssetsRecvListener_);
58         if (status != DistributedDB::DBStatus::OK) {
59             ZLOGE("UnRegister assetsRecvListener err %{public}d", status);
60         }
61     }
62 }
63 
OpenObjectKvStore()64 DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore()
65 {
66     DistributedDB::KvStoreNbDelegate *store = nullptr;
67     DistributedDB::KvStoreNbDelegate::Option option;
68     option.createDirByStoreIdOnly = true;
69     option.syncDualTupleMode = true;
70     option.secOption = { DistributedDB::S1, DistributedDB::ECE };
71     if (objectDataListener_ == nullptr) {
72         objectDataListener_ = new ObjectDataListener();
73     }
74     ZLOGD("start GetKvStore");
75     kvStoreDelegateManager_->GetKvStore(ObjectCommon::OBJECTSTORE_DB_STOREID, option,
76         [&store, this](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) {
77             if (dbStatus != DistributedDB::DBStatus::OK) {
78                 ZLOGE("GetKvStore fail %{public}d", dbStatus);
79                 return;
80             }
81             ZLOGI("GetKvStore successsfully");
82             store = kvStoreNbDelegate;
83             std::vector<uint8_t> tmpKey;
84             DistributedDB::DBStatus status = store->RegisterObserver(tmpKey,
85                 DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN,
86                 objectDataListener_);
87             if (status != DistributedDB::DBStatus::OK) {
88                 ZLOGE("RegisterObserver err %{public}d", status);
89             }
90         });
91     return store;
92 }
93 
RegisterAssetsLister()94 bool ObjectStoreManager::RegisterAssetsLister()
95 {
96     if (objectAssetsSendListener_ == nullptr) {
97         objectAssetsSendListener_ = new ObjectAssetsSendListener();
98     }
99     if (objectAssetsRecvListener_ == nullptr) {
100         objectAssetsRecvListener_ = new ObjectAssetsRecvListener();
101     }
102     auto status = DistributedFileDaemonManager::GetInstance().RegisterAssetCallback(objectAssetsRecvListener_);
103     if (status != DistributedDB::DBStatus::OK) {
104         ZLOGE("Register assetsRecvListener err %{public}d", status);
105         return false;
106     }
107     return true;
108 }
109 
ProcessSyncCallback(const std::map<std::string,int32_t> & results,const std::string & appId,const std::string & sessionId,const std::string & deviceId)110 void ObjectStoreManager::ProcessSyncCallback(const std::map<std::string, int32_t> &results, const std::string &appId,
111     const std::string &sessionId, const std::string &deviceId)
112 {
113     if (results.empty() || results.find(LOCAL_DEVICE) != results.end()) {
114         return;
115     }
116     int32_t result = Open();
117     if (result != OBJECT_SUCCESS) {
118         ZLOGE("Open failed, errCode = %{public}d", result);
119         return;
120     }
121     // delete local data
122     result = RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, deviceId));
123     if (result != OBJECT_SUCCESS) {
124         ZLOGE("Save failed, status = %{public}d", result);
125     }
126     Close();
127     return;
128 }
129 
Save(const std::string & appId,const std::string & sessionId,const ObjectRecord & data,const std::string & deviceId,sptr<IRemoteObject> callback)130 int32_t ObjectStoreManager::Save(const std::string &appId, const std::string &sessionId,
131     const ObjectRecord &data, const std::string &deviceId, sptr<IRemoteObject> callback)
132 {
133     auto proxy = iface_cast<ObjectSaveCallbackProxy>(callback);
134     if (deviceId.size() == 0) {
135         ZLOGE("DeviceId empty, appId: %{public}s, sessionId: %{public}s", appId.c_str(), sessionId.c_str());
136         proxy->Completed(std::map<std::string, int32_t>());
137         return INVALID_ARGUMENT;
138     }
139     int32_t result = Open();
140     if (result != OBJECT_SUCCESS) {
141         ZLOGE("Open object kvstore failed, result: %{public}d", result);
142         ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
143             ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, ObjectStore::GETKV_FAILED, ObjectStore::FINISHED);
144         proxy->Completed(std::map<std::string, int32_t>());
145         return STORE_NOT_OPEN;
146     }
147     SaveUserToMeta();
148     std::string dstBundleName = ObjectDmsHandler::GetInstance().GetDstBundleName(appId, deviceId);
149     result = SaveToStore(dstBundleName, sessionId, deviceId, data);
150     if (result != OBJECT_SUCCESS) {
151         ZLOGE("Save to store failed, result: %{public}d", result);
152         ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
153             ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
154         Close();
155         proxy->Completed(std::map<std::string, int32_t>());
156         return result;
157     }
158     ZLOGI("Sync data, bundleName: %{public}s, sessionId: %{public}s, deviceId: %{public}s", dstBundleName.c_str(),
159         sessionId.c_str(), Anonymous::Change(deviceId).c_str());
160     SyncCallBack syncCallback =
161         [proxy, dstBundleName, sessionId, deviceId, this](const std::map<std::string, int32_t> &results) {
162             ProcessSyncCallback(results, dstBundleName, sessionId, deviceId);
163             proxy->Completed(results);
164         };
165     result = SyncOnStore(GetPropertyPrefix(dstBundleName, sessionId, deviceId), {deviceId}, syncCallback);
166     if (result != OBJECT_SUCCESS) {
167         ZLOGE("Sync data failed, result: %{public}d", result);
168         ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
169             ObjectStore::SYNC_DATA, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
170         Close();
171         proxy->Completed(std::map<std::string, int32_t>());
172         return result;
173     }
174     Close();
175     return PushAssets(appId, dstBundleName, sessionId, data, deviceId);
176 }
177 
PushAssets(const std::string & srcBundleName,const std::string & dstBundleName,const std::string & sessionId,const ObjectRecord & data,const std::string & deviceId)178 int32_t ObjectStoreManager::PushAssets(const std::string &srcBundleName, const std::string &dstBundleName,
179     const std::string &sessionId, const ObjectRecord &data, const std::string &deviceId)
180 {
181     Assets assets = GetAssetsFromDBRecords(data);
182     if (assets.empty() || data.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == data.end()) {
183         return OBJECT_SUCCESS;
184     }
185     sptr<AssetObj> assetObj = new AssetObj();
186     assetObj->dstBundleName_ = dstBundleName;
187     assetObj->srcBundleName_ = srcBundleName;
188     assetObj->dstNetworkId_ = deviceId;
189     assetObj->sessionId_ = sessionId;
190     for (const auto& asset : assets) {
191         assetObj->uris_.push_back(asset.uri);
192     }
193     if (objectAssetsSendListener_ == nullptr) {
194         objectAssetsSendListener_ = new ObjectAssetsSendListener();
195     }
196     int userId = std::atoi(GetCurrentUser().c_str());
197     auto status =  ObjectAssetLoader::GetInstance()->PushAsset(userId, assetObj, objectAssetsSendListener_);
198     return status;
199 }
200 
RevokeSave(const std::string & appId,const std::string & sessionId,sptr<IRemoteObject> callback)201 int32_t ObjectStoreManager::RevokeSave(
202     const std::string &appId, const std::string &sessionId, sptr<IRemoteObject> callback)
203 {
204     auto proxy = iface_cast<ObjectRevokeSaveCallbackProxy>(callback);
205     int32_t result = Open();
206     if (result != OBJECT_SUCCESS) {
207         ZLOGE("Open failed, errCode = %{public}d", result);
208         proxy->Completed(STORE_NOT_OPEN);
209         return STORE_NOT_OPEN;
210     }
211 
212     result = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId));
213     if (result != OBJECT_SUCCESS) {
214         ZLOGE("RevokeSave failed, errCode = %{public}d", result);
215         Close();
216         proxy->Completed(result);
217         return result;
218     }
219     std::vector<std::string> deviceList;
220     auto deviceInfos = DmAdaper::GetInstance().GetRemoteDevices();
221     std::for_each(deviceInfos.begin(), deviceInfos.end(),
222         [&deviceList](AppDistributedKv::DeviceInfo info) { deviceList.emplace_back(info.networkId); });
223     if (!deviceList.empty()) {
224         SyncCallBack tmp = [proxy](const std::map<std::string, int32_t> &results) {
225             ZLOGI("revoke save finished");
226             proxy->Completed(OBJECT_SUCCESS);
227         };
228         result = SyncOnStore(GetPropertyPrefix(appId, sessionId), deviceList, tmp);
229         if (result != OBJECT_SUCCESS) {
230             ZLOGE("sync failed, errCode = %{public}d", result);
231             proxy->Completed(result);
232         }
233     } else {
234         proxy->Completed(OBJECT_SUCCESS);
235     };
236     Close();
237     return result;
238 }
239 
Retrieve(const std::string & bundleName,const std::string & sessionId,sptr<IRemoteObject> callback,uint32_t tokenId)240 int32_t ObjectStoreManager::Retrieve(
241     const std::string &bundleName, const std::string &sessionId, sptr<IRemoteObject> callback, uint32_t tokenId)
242 {
243     auto proxy = iface_cast<ObjectRetrieveCallbackProxy>(callback);
244     int32_t result = Open();
245     if (result != OBJECT_SUCCESS) {
246         ZLOGE("Open object kvstore failed, result: %{public}d", result);
247         proxy->Completed(ObjectRecord(), false);
248         return ObjectStore::GETKV_FAILED;
249     }
250     ObjectRecord results{};
251     int32_t status = RetrieveFromStore(bundleName, sessionId, results);
252     if (status != OBJECT_SUCCESS) {
253         ZLOGI("Retrieve from store failed, status: %{public}d, close after one minute.", status);
254         CloseAfterMinute();
255         proxy->Completed(ObjectRecord(), false);
256         return status;
257     }
258     bool allReady = false;
259     Assets assets = GetAssetsFromDBRecords(results);
260     if (assets.empty() || results.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == results.end()) {
261         allReady = true;
262     } else {
263         auto objectKey = bundleName + sessionId;
264         restoreStatus_.ComputeIfPresent(objectKey, [&allReady](const auto &key, auto &value) {
265             if (value == RestoreStatus::ALL_READY) {
266                 allReady = true;
267                 return false;
268             }
269             if (value == RestoreStatus::DATA_READY) {
270                 value = RestoreStatus::DATA_NOTIFIED;
271             }
272             return true;
273         });
274     }
275     status = RevokeSaveToStore(GetPrefixWithoutDeviceId(bundleName, sessionId));
276     if (status != OBJECT_SUCCESS) {
277         ZLOGE("Revoke save failed, status: %{public}d", status);
278         Close();
279         proxy->Completed(ObjectRecord(), false);
280         return status;
281     }
282     Close();
283     proxy->Completed(results, allReady);
284     if (allReady) {
285         ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
286             ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
287     }
288     return status;
289 }
290 
Clear()291 int32_t ObjectStoreManager::Clear()
292 {
293     ZLOGI("enter");
294     std::string userId = GetCurrentUser();
295     if (userId.empty()) {
296         return OBJECT_INNER_ERROR;
297     }
298     std::vector<StoreMetaData> metaData;
299     std::string appId = DistributedData::Bootstrap::GetInstance().GetProcessLabel();
300     std::string metaKey = GetMetaUserIdKey(userId, appId);
301     if (!DistributedData::MetaDataManager::GetInstance().LoadMeta(metaKey, metaData, true)) {
302         ZLOGE("no store of %{public}s", appId.c_str());
303         return OBJECT_STORE_NOT_FOUND;
304     }
305     for (const auto &storeMeta : metaData) {
306         if (storeMeta.storeType < StoreMetaData::StoreType::STORE_OBJECT_BEGIN
307             || storeMeta.storeType > StoreMetaData::StoreType::STORE_OBJECT_END) {
308             continue;
309         }
310         if (storeMeta.user == userId) {
311             ZLOGI("user is same, not need to change, mate user:%{public}s::user:%{public}s.",
312                 storeMeta.user.c_str(), userId.c_str());
313             return OBJECT_SUCCESS;
314         }
315     }
316     ZLOGD("user is change, need to change");
317     int32_t result = Open();
318     if (result != OBJECT_SUCCESS) {
319         ZLOGE("Open failed, errCode = %{public}d", result);
320         return STORE_NOT_OPEN;
321     }
322     result = RevokeSaveToStore("");
323     Close();
324     return result;
325 }
326 
DeleteByAppId(const std::string & appId,int32_t user)327 int32_t ObjectStoreManager::DeleteByAppId(const std::string &appId, int32_t user)
328 {
329     int32_t result = Open();
330     if (result != OBJECT_SUCCESS) {
331         ZLOGE("Open store failed, result: %{public}d, appId: %{public}s, user: %{public}d", result,
332             appId.c_str(), user);
333         return STORE_NOT_OPEN;
334     }
335     result = RevokeSaveToStore(appId);
336     if (result != OBJECT_SUCCESS) {
337         ZLOGE("Revoke save failed, result: %{public}d, appId: %{public}s, user: %{public}d", result,
338             appId.c_str(), user);
339     }
340     Close();
341     std::string userId = std::to_string(user);
342     std::string metaKey = GetMetaUserIdKey(userId, appId);
343     auto status = DistributedData::MetaDataManager::GetInstance().DelMeta(metaKey, true);
344     if (!status) {
345         ZLOGE("Delete meta failed, userId: %{public}s, appId: %{public}s", userId.c_str(), appId.c_str());
346     }
347     return result;
348 }
349 
RegisterRemoteCallback(const std::string & bundleName,const std::string & sessionId,pid_t pid,uint32_t tokenId,sptr<IRemoteObject> callback)350 void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId,
351                                                 pid_t pid, uint32_t tokenId,
352                                                 sptr<IRemoteObject> callback)
353 {
354     if (bundleName.empty() || sessionId.empty()) {
355         ZLOGD("ObjectStoreManager::RegisterRemoteCallback empty");
356         return;
357     }
358     ZLOGD("ObjectStoreManager::RegisterRemoteCallback start");
359     auto proxy = iface_cast<ObjectChangeCallbackProxy>(callback);
360     std::string prefix = bundleName + sessionId;
361     callbacks_.Compute(tokenId, ([pid, &proxy, &prefix](const uint32_t key, CallbackInfo &value) {
362         if (value.pid != pid) {
363             value = CallbackInfo { pid };
364         }
365         value.observers_.insert_or_assign(prefix, proxy);
366         return !value.observers_.empty();
367     }));
368 }
369 
UnregisterRemoteCallback(const std::string & bundleName,pid_t pid,uint32_t tokenId,const std::string & sessionId)370 void ObjectStoreManager::UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId,
371                                                   const std::string &sessionId)
372 {
373     if (bundleName.empty()) {
374         ZLOGD("bundleName is empty");
375         return;
376     }
377     callbacks_.Compute(tokenId, ([pid, &sessionId, &bundleName](const uint32_t key, CallbackInfo &value) {
378         if (value.pid != pid) {
379             return true;
380         }
381         if (sessionId.empty()) {
382             return false;
383         }
384         std::string prefix = bundleName + sessionId;
385         for (auto it = value.observers_.begin(); it != value.observers_.end();) {
386             if ((*it).first == prefix) {
387                 it = value.observers_.erase(it);
388             } else {
389                 ++it;
390             }
391         }
392         return true;
393     }));
394 }
395 
NotifyChange(ObjectRecord & changedData)396 void ObjectStoreManager::NotifyChange(ObjectRecord &changedData)
397 {
398     ZLOGI("OnChange start, size:%{public}zu", changedData.size());
399     bool hasAsset = false;
400     SaveInfo saveInfo;
401     for (const auto &[key, value] : changedData) {
402         if (key.find(SAVE_INFO) != std::string::npos) {
403             DistributedData::Serializable::Unmarshall(std::string(value.begin(), value.end()), saveInfo);
404             break;
405         }
406     }
407     auto data = GetObjectData(changedData, saveInfo, hasAsset);
408     if (!hasAsset) {
409         ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
410             ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName);
411         callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
412             DoNotify(tokenId, value, data, true); // no asset, data ready means all ready
413             return false;
414         });
415         return;
416     }
417     NotifyDataChanged(data, saveInfo);
418     SaveUserToMeta();
419 }
420 
GetObjectData(const ObjectRecord & changedData,SaveInfo & saveInfo,bool & hasAsset)421 std::map<std::string, ObjectRecord> ObjectStoreManager::GetObjectData(const ObjectRecord& changedData,
422     SaveInfo& saveInfo, bool& hasAsset)
423 {
424     std::map<std::string, ObjectRecord> data;
425     std::string keyPrefix = saveInfo.ToPropertyPrefix();
426     if (!keyPrefix.empty()) {
427         std::string observerKey = saveInfo.bundleName + saveInfo.sessionId;
428         for (const auto &[key, value] : changedData) {
429             if (key.size() < keyPrefix.size() || key.find(SAVE_INFO) != std::string::npos) {
430                 continue;
431             }
432             std::string propertyName = key.substr(keyPrefix.size());
433             data[observerKey].insert_or_assign(propertyName, value);
434             if (!hasAsset && IsAssetKey(propertyName)) {
435                 hasAsset = true;
436             }
437         }
438     } else {
439         for (const auto &item : changedData) {
440             std::vector<std::string> splitKeys = SplitEntryKey(item.first);
441             if (splitKeys.size() <= PROPERTY_NAME_INDEX) {
442                 continue;
443             }
444             if (saveInfo.sourceDeviceId.empty() || saveInfo.bundleName.empty()) {
445                 saveInfo.sourceDeviceId = splitKeys[SOURCE_DEVICE_UDID_INDEX];
446                 saveInfo.bundleName = splitKeys[BUNDLE_NAME_INDEX];
447                 saveInfo.sessionId = splitKeys[SESSION_ID_INDEX];
448                 saveInfo.timestamp = splitKeys[TIME_INDEX];
449             }
450             std::string prefix = splitKeys[BUNDLE_NAME_INDEX] + splitKeys[SESSION_ID_INDEX];
451             std::string propertyName = splitKeys[PROPERTY_NAME_INDEX];
452             data[prefix].insert_or_assign(propertyName, item.second);
453             if (IsAssetKey(propertyName)) {
454                 hasAsset = true;
455             }
456         }
457     }
458     return data;
459 }
460 
ComputeStatus(const std::string & objectKey,const SaveInfo & saveInfo,const std::map<std::string,ObjectRecord> & data)461 void ObjectStoreManager::ComputeStatus(const std::string& objectKey, const SaveInfo& saveInfo,
462     const std::map<std::string, ObjectRecord>& data)
463 {
464     restoreStatus_.Compute(objectKey, [this, &data, saveInfo] (const auto &key, auto &value) {
465         if (value == RestoreStatus::ASSETS_READY) {
466             value = RestoreStatus::ALL_READY;
467             ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
468                 ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS);
469             callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
470                 DoNotify(tokenId, value, data, true);
471                 return false;
472             });
473         } else {
474             value = RestoreStatus::DATA_READY;
475             ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
476                 ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName);
477             callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
478                 DoNotify(tokenId, value, data, false);
479                 return false;
480             });
481             WaitAssets(key, saveInfo, data);
482         }
483         return true;
484     });
485 }
486 
NotifyDataChanged(const std::map<std::string,ObjectRecord> & data,const SaveInfo & saveInfo)487 void ObjectStoreManager::NotifyDataChanged(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo)
488 {
489     for (auto const& [objectKey, results] : data) {
490         restoreStatus_.ComputeIfAbsent(
491             objectKey, [](const std::string& key) -> auto {
492             return RestoreStatus::NONE;
493         });
494         ComputeStatus(objectKey, saveInfo, data);
495     }
496 }
497 
WaitAssets(const std::string & objectKey,const SaveInfo & saveInfo,const std::map<std::string,ObjectRecord> & data)498 int32_t ObjectStoreManager::WaitAssets(const std::string& objectKey, const SaveInfo& saveInfo,
499     const std::map<std::string, ObjectRecord>& data)
500 {
501     auto taskId = executors_->Schedule(std::chrono::seconds(WAIT_TIME), [this, objectKey, data, saveInfo]() {
502         ZLOGE("wait assets finisehd timeout, try pull assets, objectKey:%{public}s", objectKey.c_str());
503         PullAssets(data, saveInfo);
504         DoNotifyWaitAssetTimeout(objectKey);
505     });
506 
507     objectTimer_.ComputeIfAbsent(
508         objectKey, [taskId](const std::string& key) -> auto {
509             return taskId;
510     });
511     return  OBJECT_SUCCESS;
512 }
513 
PullAssets(const std::map<std::string,ObjectRecord> & data,const SaveInfo & saveInfo)514 void ObjectStoreManager::PullAssets(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo)
515 {
516     std::map<std::string, Assets> changedAssets;
517     for (auto const& [objectId, result] : data) {
518         changedAssets[objectId] = GetAssetsFromDBRecords(result);
519     }
520     for (const auto& [objectId, assets] : changedAssets) {
521         std::string networkId = DmAdaper::GetInstance().ToNetworkID(saveInfo.sourceDeviceId);
522         auto block = std::make_shared<BlockData<std::tuple<bool, bool>>>(WAIT_TIME, std::tuple{ true, true });
523         ObjectAssetLoader::GetInstance()->TransferAssetsAsync(std::stoi(GetCurrentUser()),
524             saveInfo.bundleName, networkId, assets, [this, block](bool success) {
525                 block->SetValue({ false, success });
526         });
527         auto [timeout, success] = block->GetValue();
528         ZLOGI("Pull assets end, timeout: %{public}d, success: %{public}d, size:%{public}zu, deviceId: %{public}s",
529             timeout, success, assets.size(), DistributedData::Anonymous::Change(networkId).c_str());
530     }
531 }
532 
NotifyAssetsReady(const std::string & objectKey,const std::string & bundleName,const std::string & srcNetworkId)533 void ObjectStoreManager::NotifyAssetsReady(const std::string& objectKey, const std::string& bundleName,
534     const std::string& srcNetworkId)
535 {
536     restoreStatus_.ComputeIfAbsent(
537         objectKey, [](const std::string& key) -> auto {
538         return RestoreStatus::NONE;
539     });
540     restoreStatus_.Compute(objectKey, [this, &bundleName] (const auto &key, auto &value) {
541         if (value == RestoreStatus::DATA_NOTIFIED) {
542             value = RestoreStatus::ALL_READY;
543             ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
544                 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS);
545             callbacks_.ForEach([this, key](uint32_t tokenId, const CallbackInfo& value) {
546                 DoNotifyAssetsReady(tokenId, value,  key, true);
547                 return false;
548             });
549         } else if (value == RestoreStatus::DATA_READY) {
550             value = RestoreStatus::ALL_READY;
551             ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
552                 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS);
553             auto [has, taskId] = objectTimer_.Find(key);
554             if (has) {
555                 executors_->Remove(taskId);
556                 objectTimer_.Erase(key);
557             }
558         } else {
559             value = RestoreStatus::ASSETS_READY;
560             ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
561                 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, bundleName);
562         }
563         return true;
564     });
565 }
566 
NotifyAssetsStart(const std::string & objectKey,const std::string & srcNetworkId)567 void ObjectStoreManager::NotifyAssetsStart(const std::string& objectKey, const std::string& srcNetworkId)
568 {
569     restoreStatus_.ComputeIfAbsent(
570         objectKey, [](const std::string& key) -> auto {
571         return RestoreStatus::NONE;
572     });
573 }
574 
IsAssetKey(const std::string & key)575 bool ObjectStoreManager::IsAssetKey(const std::string& key)
576 {
577     return key.find(ObjectStore::ASSET_DOT) != std::string::npos;
578 }
579 
IsAssetComplete(const ObjectRecord & result,const std::string & assetPrefix)580 bool ObjectStoreManager::IsAssetComplete(const ObjectRecord& result, const std::string& assetPrefix)
581 {
582     if (result.find(assetPrefix + ObjectStore::NAME_SUFFIX) == result.end() ||
583         result.find(assetPrefix + ObjectStore::URI_SUFFIX) == result.end() ||
584         result.find(assetPrefix + ObjectStore::PATH_SUFFIX) == result.end() ||
585         result.find(assetPrefix + ObjectStore::CREATE_TIME_SUFFIX) == result.end() ||
586         result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX) == result.end() ||
587         result.find(assetPrefix + ObjectStore::SIZE_SUFFIX) == result.end()) {
588         return false;
589     }
590     return true;
591 }
592 
GetAssetsFromDBRecords(const ObjectRecord & result)593 Assets ObjectStoreManager::GetAssetsFromDBRecords(const ObjectRecord& result)
594 {
595     Assets assets{};
596     std::set<std::string> assetKey;
597     for (const auto& [key, value] : result) {
598         std::string assetPrefix = key.substr(0, key.find(ObjectStore::ASSET_DOT));
599         if (!IsAssetKey(key) || assetKey.find(assetPrefix) != assetKey.end() ||
600             result.find(assetPrefix + ObjectStore::NAME_SUFFIX) == result.end() ||
601             result.find(assetPrefix + ObjectStore::URI_SUFFIX) == result.end()) {
602             continue;
603         }
604         Asset asset;
605         ObjectStore::StringUtils::BytesToStrWithType(
606             result.find(assetPrefix + ObjectStore::NAME_SUFFIX)->second, asset.name);
607         if (asset.name.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
608             asset.name = asset.name.substr(ObjectStore::STRING_PREFIX_LEN);
609         }
610         ObjectStore::StringUtils::BytesToStrWithType(
611             result.find(assetPrefix + ObjectStore::URI_SUFFIX)->second, asset.uri);
612         if (asset.uri.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
613             asset.uri = asset.uri.substr(ObjectStore::STRING_PREFIX_LEN);
614         }
615         ObjectStore::StringUtils::BytesToStrWithType(
616             result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX)->second, asset.modifyTime);
617         if (asset.modifyTime.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
618             asset.modifyTime = asset.modifyTime.substr(ObjectStore::STRING_PREFIX_LEN);
619         }
620         ObjectStore::StringUtils::BytesToStrWithType(
621             result.find(assetPrefix + ObjectStore::SIZE_SUFFIX)->second, asset.size);
622         if (asset.size.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
623             asset.size = asset.size.substr(ObjectStore::STRING_PREFIX_LEN);
624         }
625         asset.hash = asset.modifyTime + "_" + asset.size;
626         assets.push_back(asset);
627         assetKey.insert(assetPrefix);
628     }
629     return assets;
630 }
631 
DoNotify(uint32_t tokenId,const CallbackInfo & value,const std::map<std::string,ObjectRecord> & data,bool allReady)632 void ObjectStoreManager::DoNotify(uint32_t tokenId, const CallbackInfo& value,
633     const std::map<std::string, ObjectRecord>& data, bool allReady)
634 {
635     for (const auto& observer : value.observers_) {
636         auto it = data.find(observer.first);
637         if (it == data.end()) {
638             continue;
639         }
640         observer.second->Completed((*it).second, allReady);
641         if (allReady) {
642             restoreStatus_.Erase(observer.first);
643             ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
644                 ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
645         } else {
646             restoreStatus_.ComputeIfPresent(observer.first, [](const auto &key, auto &value) {
647                 value = RestoreStatus::DATA_NOTIFIED;
648                 return true;
649             });
650         }
651     }
652 }
653 
DoNotifyAssetsReady(uint32_t tokenId,const CallbackInfo & value,const std::string & objectKey,bool allReady)654 void ObjectStoreManager::DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value,
655     const std::string& objectKey, bool allReady)
656 {
657     for (const auto& observer : value.observers_) {
658         if (objectKey != observer.first) {
659             continue;
660         }
661         observer.second->Completed(ObjectRecord(), allReady);
662         if (allReady) {
663             restoreStatus_.Erase(objectKey);
664             ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
665                 ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
666         }
667         auto [has, taskId] = objectTimer_.Find(objectKey);
668         if (has) {
669             executors_->Remove(taskId);
670             objectTimer_.Erase(objectKey);
671         }
672     }
673 }
674 
DoNotifyWaitAssetTimeout(const std::string & objectKey)675 void ObjectStoreManager::DoNotifyWaitAssetTimeout(const std::string &objectKey)
676 {
677     ObjectStore::RadarReporter::ReportStageError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
678         ObjectStore::ASSETS_RECV, ObjectStore::RADAR_FAILED, ObjectStore::TIMEOUT);
679     callbacks_.ForEach([this, &objectKey](uint32_t tokenId, const CallbackInfo &value) {
680         for (const auto& observer : value.observers_) {
681             if (objectKey != observer.first) {
682                 continue;
683             }
684             observer.second->Completed(ObjectRecord(), true);
685             restoreStatus_.Erase(objectKey);
686             auto [has, taskId] = objectTimer_.Find(objectKey);
687             if (has) {
688                 executors_->Remove(taskId);
689                 objectTimer_.Erase(objectKey);
690             }
691             ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
692                 ObjectStore::NOTIFY, ObjectStore::RADAR_FAILED, ObjectStore::TIMEOUT, ObjectStore::FINISHED);
693         }
694         return false;
695     });
696 }
697 
SetData(const std::string & dataDir,const std::string & userId)698 void ObjectStoreManager::SetData(const std::string &dataDir, const std::string &userId)
699 {
700     ZLOGI("enter %{public}s", dataDir.c_str());
701     kvStoreDelegateManager_ =
702         new DistributedDB::KvStoreDelegateManager(DistributedData::Bootstrap::GetInstance().GetProcessLabel(), userId);
703     DistributedDB::KvStoreConfig kvStoreConfig { dataDir };
704     kvStoreDelegateManager_->SetKvStoreConfig(kvStoreConfig);
705     userId_ = userId;
706 }
707 
Open()708 int32_t ObjectStoreManager::Open()
709 {
710     if (kvStoreDelegateManager_ == nullptr) {
711         ZLOGE("Kvstore delegate manager not init");
712         return OBJECT_INNER_ERROR;
713     }
714     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
715     if (delegate_ == nullptr) {
716         delegate_ = OpenObjectKvStore();
717         if (delegate_ == nullptr) {
718             ZLOGE("Open object kvstore failed");
719             return OBJECT_DBSTATUS_ERROR;
720         }
721         syncCount_ = 1;
722         ZLOGI("Open object kvstore success");
723     } else {
724         syncCount_++;
725         ZLOGI("Object kvstore syncCount: %{public}d", syncCount_);
726     }
727     return OBJECT_SUCCESS;
728 }
729 
Close()730 void ObjectStoreManager::Close()
731 {
732     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
733     if (delegate_ == nullptr) {
734         return;
735     }
736     int32_t taskCount = delegate_->GetTaskCount();
737     if (taskCount > 0 && syncCount_ == 1) {
738         CloseAfterMinute();
739         ZLOGW("Store is busy, close after a minute, task count: %{public}d", taskCount);
740         return;
741     }
742     syncCount_--;
743     ZLOGI("closed a store, syncCount = %{public}d", syncCount_);
744     FlushClosedStore();
745 }
746 
SyncCompleted(const std::map<std::string,DistributedDB::DBStatus> & results,uint64_t sequenceId)747 void ObjectStoreManager::SyncCompleted(
748     const std::map<std::string, DistributedDB::DBStatus> &results, uint64_t sequenceId)
749 {
750     std::string userId;
751     SequenceSyncManager::Result result = SequenceSyncManager::GetInstance()->Process(sequenceId, results, userId);
752     if (result == SequenceSyncManager::SUCCESS_USER_HAS_FINISHED && userId == userId_) {
753         std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
754         SetSyncStatus(false);
755         FlushClosedStore();
756     }
757     for (const auto &item : results) {
758         if (item.second == DistributedDB::DBStatus::OK) {
759             ZLOGI("Sync data success, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
760             ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::SAVE,
761                 ObjectStore::SYNC_DATA, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
762         } else {
763             ZLOGE("Sync data failed, sequenceId: 0x%{public}" PRIx64 ", status: %{public}d", sequenceId, item.second);
764             ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
765                 ObjectStore::SYNC_DATA, ObjectStore::RADAR_FAILED, item.second, ObjectStore::FINISHED);
766         }
767     }
768 }
769 
FlushClosedStore()770 void ObjectStoreManager::FlushClosedStore()
771 {
772     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
773     if (!isSyncing_ && syncCount_ == 0 && delegate_ != nullptr) {
774         ZLOGD("close store");
775         auto status = kvStoreDelegateManager_->CloseKvStore(delegate_);
776         if (status != DistributedDB::DBStatus::OK) {
777             int timeOut = 1000;
778             executors_->Schedule(std::chrono::milliseconds(timeOut), [this]() {
779                 FlushClosedStore();
780             });
781             ZLOGE("GetEntries fail %{public}d", status);
782             return;
783         }
784         delegate_ = nullptr;
785         if (objectDataListener_ != nullptr) {
786             delete objectDataListener_;
787             objectDataListener_ = nullptr;
788         }
789     }
790 }
791 
ProcessOldEntry(const std::string & appId)792 void ObjectStoreManager::ProcessOldEntry(const std::string &appId)
793 {
794     std::vector<DistributedDB::Entry> entries;
795     auto status = delegate_->GetEntries(std::vector<uint8_t>(appId.begin(), appId.end()), entries);
796     if (status == DistributedDB::DBStatus::NOT_FOUND) {
797         ZLOGI("Get old entries empty, bundleName: %{public}s", appId.c_str());
798         return;
799     }
800     if (status != DistributedDB::DBStatus::OK) {
801         ZLOGE("Get old entries failed, bundleName: %{public}s, status %{public}d", appId.c_str(), status);
802         return;
803     }
804     std::map<std::string, int64_t> sessionIds;
805     int64_t oldestTime = 0;
806     std::string deleteKey;
807     for (auto &item : entries) {
808         std::string key(item.key.begin(), item.key.end());
809         std::vector<std::string> splitKeys = SplitEntryKey(key);
810         if (splitKeys.empty()) {
811             continue;
812         }
813         std::string bundleName = splitKeys[BUNDLE_NAME_INDEX];
814         std::string sessionId = splitKeys[SESSION_ID_INDEX];
815         if (sessionIds.count(sessionId) == 0) {
816             char *end = nullptr;
817             sessionIds[sessionId] = strtol(splitKeys[TIME_INDEX].c_str(), &end, DECIMAL_BASE);
818         }
819         if (oldestTime == 0 || oldestTime > sessionIds[sessionId]) {
820             oldestTime = sessionIds[sessionId];
821             deleteKey = GetPrefixWithoutDeviceId(bundleName, sessionId);
822         }
823     }
824     if (sessionIds.size() < MAX_OBJECT_SIZE_PER_APP) {
825         return;
826     }
827     int32_t result = RevokeSaveToStore(deleteKey);
828     if (result != OBJECT_SUCCESS) {
829         ZLOGE("Delete old entries failed, deleteKey: %{public}s, status: %{public}d", deleteKey.c_str(), result);
830         return;
831     }
832     ZLOGI("Delete old entries success, deleteKey: %{public}s", deleteKey.c_str());
833 }
834 
SaveToStore(const std::string & appId,const std::string & sessionId,const std::string & toDeviceId,const ObjectRecord & data)835 int32_t ObjectStoreManager::SaveToStore(const std::string &appId, const std::string &sessionId,
836     const std::string &toDeviceId, const ObjectRecord &data)
837 {
838     ProcessOldEntry(appId);
839     RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, toDeviceId));
840     std::string timestamp = std::to_string(GetSecondsSince1970ToNow());
841     std::string prefix = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR;
842     DistributedDB::Entry saveInfoEntry;
843     std::string saveInfoKey = prefix + SAVE_INFO;
844     saveInfoEntry.key = std::vector<uint8_t>(saveInfoKey.begin(), saveInfoKey.end());
845     SaveInfo saveInfo(appId, sessionId, DmAdaper::GetInstance().GetLocalDevice().udid, toDeviceId, timestamp);
846     std::string saveInfoValue = DistributedData::Serializable::Marshall(saveInfo);
847     saveInfoEntry.value = std::vector<uint8_t>(saveInfoValue.begin(), saveInfoValue.end());
848     std::vector<DistributedDB::Entry> entries;
849     entries.emplace_back(saveInfoEntry);
850     for (auto &item : data) {
851         DistributedDB::Entry entry;
852         std::string key = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR + item.first;
853         entry.key = std::vector<uint8_t>(key.begin(), key.end());
854         entry.value = item.second;
855         entries.emplace_back(entry);
856     }
857     auto status = delegate_->PutBatch(entries);
858     if (status != DistributedDB::DBStatus::OK) {
859         ZLOGE("PutBatch failed, bundleName: %{public}s, sessionId: %{public}s, dstNetworkId: %{public}s, "
860             "status: %{public}d", appId.c_str(), sessionId.c_str(), Anonymous::Change(toDeviceId).c_str(), status);
861         return status;
862     }
863     ZLOGI("PutBatch success, bundleName: %{public}s, sessionId: %{public}s, dstNetworkId: %{public}s, "
864         "count: %{public}zu", appId.c_str(), sessionId.c_str(), Anonymous::Change(toDeviceId).c_str(), entries.size());
865     return OBJECT_SUCCESS;
866 }
867 
SyncOnStore(const std::string & prefix,const std::vector<std::string> & deviceList,SyncCallBack & callback)868 int32_t ObjectStoreManager::SyncOnStore(
869     const std::string &prefix, const std::vector<std::string> &deviceList, SyncCallBack &callback)
870 {
871     std::vector<std::string> syncDevices;
872     for (auto &device : deviceList) {
873         if (device == LOCAL_DEVICE) {
874             ZLOGI("Save to local, do not need sync, prefix: %{public}s", prefix.c_str());
875             callback({{LOCAL_DEVICE, OBJECT_SUCCESS}});
876             return OBJECT_SUCCESS;
877         }
878         syncDevices.emplace_back(DmAdaper::GetInstance().GetUuidByNetworkId(device));
879     }
880     if (syncDevices.empty()) {
881         ZLOGI("Device list is empty, prefix: %{public}s", Anonymous::Change(prefix).c_str());
882         callback(std::map<std::string, int32_t>());
883         return OBJECT_SUCCESS;
884     }
885     uint64_t sequenceId = SequenceSyncManager::GetInstance()->AddNotifier(userId_, callback);
886     DistributedDB::Query dbQuery = DistributedDB::Query::Select();
887     dbQuery.PrefixKey(std::vector<uint8_t>(prefix.begin(), prefix.end()));
888     ZLOGI("Start sync data, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
889     auto status = delegate_->Sync(syncDevices, DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY,
890         [this, sequenceId](const std::map<std::string, DistributedDB::DBStatus> &devicesMap) {
891             ZLOGI("Sync data finished, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
892             std::map<std::string, DistributedDB::DBStatus> result;
893             for (auto &item : devicesMap) {
894                 result[DmAdaper::GetInstance().ToNetworkID(item.first)] = item.second;
895             }
896             SyncCompleted(result, sequenceId);
897         }, dbQuery, false);
898     if (status != DistributedDB::DBStatus::OK) {
899         ZLOGE("Sync data failed, prefix: %{public}s, sequenceId: 0x%{public}" PRIx64 ", status: %{public}d",
900             Anonymous::Change(prefix).c_str(), sequenceId, status);
901         std::string tmp;
902         SequenceSyncManager::GetInstance()->DeleteNotifier(sequenceId, tmp);
903         return status;
904     }
905     SetSyncStatus(true);
906     return OBJECT_SUCCESS;
907 }
908 
SetSyncStatus(bool status)909 int32_t ObjectStoreManager::SetSyncStatus(bool status)
910 {
911     std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
912     isSyncing_ = status;
913     return OBJECT_SUCCESS;
914 }
915 
RevokeSaveToStore(const std::string & prefix)916 int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix)
917 {
918     std::vector<DistributedDB::Entry> entries;
919     auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
920     if (status == DistributedDB::DBStatus::NOT_FOUND) {
921         ZLOGI("Get entries empty, prefix: %{public}s", Anonymous::Change(prefix).c_str());
922         return OBJECT_SUCCESS;
923     }
924     if (status != DistributedDB::DBStatus::OK) {
925         ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status);
926         return DB_ERROR;
927     }
928     std::vector<std::vector<uint8_t>> keys;
929     std::for_each(entries.begin(), entries.end(), [&keys](const DistributedDB::Entry &entry) {
930         keys.emplace_back(entry.key);
931     });
932     if (keys.empty()) {
933         return OBJECT_SUCCESS;
934     }
935     status = delegate_->DeleteBatch(keys);
936     if (status != DistributedDB::DBStatus::OK) {
937         ZLOGE("Delete entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(),
938             status);
939         return DB_ERROR;
940     }
941     ZLOGI("Delete entries success, prefix: %{public}s, count: %{public}zu", Anonymous::Change(prefix).c_str(),
942         keys.size());
943     return OBJECT_SUCCESS;
944 }
945 
RetrieveFromStore(const std::string & appId,const std::string & sessionId,ObjectRecord & results)946 int32_t ObjectStoreManager::RetrieveFromStore(const std::string &appId, const std::string &sessionId,
947     ObjectRecord &results)
948 {
949     std::vector<DistributedDB::Entry> entries;
950     std::string prefix = GetPrefixWithoutDeviceId(appId, sessionId);
951     auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
952     if (status == DistributedDB::DBStatus::NOT_FOUND) {
953         ZLOGI("Get entries empty, prefix: %{public}s, status: %{public}d", prefix.c_str(), status);
954         return KEY_NOT_FOUND;
955     }
956     if (status != DistributedDB::DBStatus::OK) {
957         ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", prefix.c_str(), status);
958         return DB_ERROR;
959     }
960     ZLOGI("Get entries success, prefix: %{public}s, count: %{public}zu", prefix.c_str(), entries.size());
961     for (const auto &entry : entries) {
962         std::string key(entry.key.begin(), entry.key.end());
963         if (key.find(SAVE_INFO) != std::string::npos) {
964             continue;
965         }
966         auto splitKeys = SplitEntryKey(key);
967         if (!splitKeys.empty()) {
968             results[splitKeys[PROPERTY_NAME_INDEX]] = entry.value;
969         }
970     }
971     return OBJECT_SUCCESS;
972 }
973 
SaveInfo(const std::string & bundleName,const std::string & sessionId,const std::string & sourceDeviceId,const std::string & targetDeviceId,const std::string & timestamp)974 ObjectStoreManager::SaveInfo::SaveInfo(const std::string &bundleName, const std::string &sessionId,
975     const std::string &sourceDeviceId, const std::string &targetDeviceId, const std::string &timestamp)
976     : bundleName(bundleName), sessionId(sessionId), sourceDeviceId(sourceDeviceId), targetDeviceId(targetDeviceId),
977     timestamp(timestamp) {}
978 
Marshal(json & node) const979 bool ObjectStoreManager::SaveInfo::Marshal(json &node) const
980 {
981     SetValue(node[GET_NAME(bundleName)], bundleName);
982     SetValue(node[GET_NAME(sessionId)], sessionId);
983     SetValue(node[GET_NAME(sourceDeviceId)], sourceDeviceId);
984     SetValue(node[GET_NAME(targetDeviceId)], targetDeviceId);
985     SetValue(node[GET_NAME(timestamp)], timestamp);
986     return true;
987 }
988 
Unmarshal(const json & node)989 bool ObjectStoreManager::SaveInfo::Unmarshal(const json &node)
990 {
991     GetValue(node, GET_NAME(bundleName), bundleName);
992     GetValue(node, GET_NAME(sessionId), sessionId);
993     GetValue(node, GET_NAME(sourceDeviceId), sourceDeviceId);
994     GetValue(node, GET_NAME(targetDeviceId), targetDeviceId);
995     GetValue(node, GET_NAME(timestamp), timestamp);
996     return true;
997 }
998 
ToPropertyPrefix()999 std::string ObjectStoreManager::SaveInfo::ToPropertyPrefix()
1000 {
1001     if (bundleName.empty() || sessionId.empty() || sourceDeviceId.empty() || targetDeviceId.empty() ||
1002         timestamp.empty()) {
1003         return "";
1004     }
1005     return bundleName + SEPERATOR + sessionId + SEPERATOR + sourceDeviceId + SEPERATOR + targetDeviceId + SEPERATOR +
1006         timestamp + SEPERATOR;
1007 }
1008 
SplitEntryKey(const std::string & key)1009 std::vector<std::string> ObjectStoreManager::SplitEntryKey(const std::string &key)
1010 {
1011     std::smatch match;
1012     std::regex timeRegex(TIME_REGEX);
1013     if (!std::regex_search(key, match, timeRegex)) {
1014         ZLOGW("Format error, key.size = %{public}zu", key.size());
1015         return {};
1016     }
1017     auto timePos = match.position();
1018     std::string fromTime = key.substr(timePos + 1);
1019     std::string beforeTime = key.substr(0, timePos);
1020 
1021     size_t targetDevicePos = beforeTime.find_last_of(SEPERATOR);
1022     if (targetDevicePos == std::string::npos) {
1023         ZLOGW("Format error, key.size = %{public}zu", key.size());
1024         return {};
1025     }
1026     std::string targetDevice = beforeTime.substr(targetDevicePos + 1);
1027     std::string beforeTargetDevice = beforeTime.substr(0, targetDevicePos);
1028 
1029     size_t sourceDeviceUdidPos = beforeTargetDevice.find_last_of(SEPERATOR);
1030     if (sourceDeviceUdidPos == std::string::npos) {
1031         ZLOGW("Format error, key.size = %{public}zu", key.size());
1032         return {};
1033     }
1034     std::string sourceDeviceUdid = beforeTargetDevice.substr(sourceDeviceUdidPos + 1);
1035     std::string beforeSourceDeviceUdid = beforeTargetDevice.substr(0, sourceDeviceUdidPos);
1036 
1037     size_t sessionIdPos = beforeSourceDeviceUdid.find_last_of(SEPERATOR);
1038     if (sessionIdPos == std::string::npos) {
1039         ZLOGW("Format error, key.size = %{public}zu", key.size());
1040         return {};
1041     }
1042     std::string sessionId = beforeSourceDeviceUdid.substr(sessionIdPos + 1);
1043     std::string bundleName = beforeSourceDeviceUdid.substr(0, sessionIdPos);
1044 
1045     size_t propertyNamePos = fromTime.find_first_of(SEPERATOR);
1046     if (propertyNamePos == std::string::npos) {
1047         ZLOGW("Format error, key.size = %{public}zu", key.size());
1048         return {};
1049     }
1050     std::string propertyName = fromTime.substr(propertyNamePos + 1);
1051     std::string time = fromTime.substr(0, propertyNamePos);
1052 
1053     return { bundleName, sessionId, sourceDeviceUdid, targetDevice, time, propertyName };
1054 }
1055 
GetCurrentUser()1056 std::string ObjectStoreManager::GetCurrentUser()
1057 {
1058     std::vector<int> users;
1059     AccountDelegate::GetInstance()->QueryUsers(users);
1060     if (users.empty()) {
1061         return "";
1062     }
1063     return std::to_string(users[0]);
1064 }
1065 
SaveUserToMeta()1066 void ObjectStoreManager::SaveUserToMeta()
1067 {
1068     ZLOGD("start.");
1069     std::string userId = GetCurrentUser();
1070     if (userId.empty()) {
1071         return;
1072     }
1073     std::string appId = DistributedData::Bootstrap::GetInstance().GetProcessLabel();
1074     StoreMetaData userMeta;
1075     userMeta.storeId = DistributedObject::ObjectCommon::OBJECTSTORE_DB_STOREID;
1076     userMeta.user = userId;
1077     userMeta.storeType = ObjectDistributedType::OBJECT_SINGLE_VERSION;
1078     std::string userMetaKey = GetMetaUserIdKey(userId, appId);
1079     auto saved = DistributedData::MetaDataManager::GetInstance().SaveMeta(userMetaKey, userMeta, true);
1080     if (!saved) {
1081         ZLOGE("userMeta save failed");
1082     }
1083 }
1084 
CloseAfterMinute()1085 void ObjectStoreManager::CloseAfterMinute()
1086 {
1087     executors_->Schedule(std::chrono::minutes(INTERVAL), std::bind(&ObjectStoreManager::Close, this));
1088 }
1089 
SetThreadPool(std::shared_ptr<ExecutorPool> executors)1090 void ObjectStoreManager::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
1091 {
1092     executors_ = executors;
1093 }
1094 
AddNotifier(const std::string & userId,SyncCallBack & callback)1095 uint64_t SequenceSyncManager::AddNotifier(const std::string &userId, SyncCallBack &callback)
1096 {
1097     std::lock_guard<std::mutex> lock(notifierLock_);
1098     uint64_t sequenceId = KvStoreUtils::GenerateSequenceId();
1099     userIdSeqIdRelations_[userId].emplace_back(sequenceId);
1100     seqIdCallbackRelations_[sequenceId] = callback;
1101     return sequenceId;
1102 }
1103 
Process(uint64_t sequenceId,const std::map<std::string,DistributedDB::DBStatus> & results,std::string & userId)1104 SequenceSyncManager::Result SequenceSyncManager::Process(
1105     uint64_t sequenceId, const std::map<std::string, DistributedDB::DBStatus> &results, std::string &userId)
1106 {
1107     std::lock_guard<std::mutex> lock(notifierLock_);
1108     if (seqIdCallbackRelations_.count(sequenceId) == 0) {
1109         ZLOGE("not exist");
1110         return ERR_SID_NOT_EXIST;
1111     }
1112     std::map<std::string, int32_t> syncResults;
1113     for (auto &item : results) {
1114         syncResults[item.first] = item.second == DistributedDB::DBStatus::OK ? 0 : -1;
1115     }
1116     seqIdCallbackRelations_[sequenceId](syncResults);
1117     ZLOGD("end complete");
1118     return DeleteNotifierNoLock(sequenceId, userId);
1119 }
1120 
DeleteNotifier(uint64_t sequenceId,std::string & userId)1121 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifier(uint64_t sequenceId, std::string &userId)
1122 {
1123     std::lock_guard<std::mutex> lock(notifierLock_);
1124     if (seqIdCallbackRelations_.count(sequenceId) == 0) {
1125         ZLOGE("not exist");
1126         return ERR_SID_NOT_EXIST;
1127     }
1128     return DeleteNotifierNoLock(sequenceId, userId);
1129 }
1130 
DeleteNotifierNoLock(uint64_t sequenceId,std::string & userId)1131 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifierNoLock(uint64_t sequenceId, std::string &userId)
1132 {
1133     seqIdCallbackRelations_.erase(sequenceId);
1134     auto userIdIter = userIdSeqIdRelations_.begin();
1135     while (userIdIter != userIdSeqIdRelations_.end()) {
1136         auto sIdIter = std::find(userIdIter->second.begin(), userIdIter->second.end(), sequenceId);
1137         if (sIdIter != userIdIter->second.end()) {
1138             userIdIter->second.erase(sIdIter);
1139             if (userIdIter->second.empty()) {
1140                 ZLOGD("finished user callback, userId = %{public}s", userIdIter->first.c_str());
1141                 userId = userIdIter->first;
1142                 userIdSeqIdRelations_.erase(userIdIter);
1143                 return SUCCESS_USER_HAS_FINISHED;
1144             } else {
1145                 ZLOGD(" finished a callback but user not finished, userId = %{public}s", userIdIter->first.c_str());
1146                 return SUCCESS_USER_IN_USE;
1147             }
1148         }
1149         userIdIter++;
1150     }
1151     return SUCCESS_USER_HAS_FINISHED;
1152 }
1153 
BindAsset(const uint32_t tokenId,const std::string & appId,const std::string & sessionId,ObjectStore::Asset & asset,ObjectStore::AssetBindInfo & bindInfo)1154 int32_t ObjectStoreManager::BindAsset(const uint32_t tokenId, const std::string& appId, const std::string& sessionId,
1155     ObjectStore::Asset& asset, ObjectStore::AssetBindInfo& bindInfo)
1156 {
1157     auto snapshotKey = appId + SEPERATOR + sessionId;
1158     snapshots_.ComputeIfAbsent(
1159         snapshotKey, [](const std::string& key) -> auto {
1160             return std::make_shared<ObjectSnapshot>();
1161         });
1162     auto storeKey = appId + SEPERATOR + bindInfo.storeName;
1163     bindSnapshots_.ComputeIfAbsent(
1164         storeKey, [](const std::string& key) -> auto {
1165             return std::make_shared<std::map<std::string, std::shared_ptr<Snapshot>>>();
1166         });
1167     auto snapshots = snapshots_.Find(snapshotKey).second;
1168     bindSnapshots_.Compute(storeKey, [this, &asset, snapshots] (const auto &key, auto &value) {
1169         value->emplace(std::pair{asset.uri, snapshots});
1170         return true;
1171     });
1172 
1173     HapTokenInfo tokenInfo;
1174     auto status = AccessTokenKit::GetHapTokenInfo(tokenId, tokenInfo);
1175     if (status != RET_SUCCESS) {
1176         ZLOGE("token:0x%{public}x, result:%{public}d, bundleName:%{public}s", tokenId, status, appId.c_str());
1177         return GeneralError::E_ERROR;
1178     }
1179     StoreInfo storeInfo;
1180     storeInfo.bundleName = appId;
1181     storeInfo.tokenId = tokenId;
1182     storeInfo.instanceId = tokenInfo.instIndex;
1183     storeInfo.user = tokenInfo.userID;
1184     storeInfo.storeName = bindInfo.storeName;
1185 
1186     snapshots_.Compute(snapshotKey, [this, &asset, &bindInfo, &storeInfo] (const auto &key, auto &value) {
1187         value->BindAsset(ValueProxy::Convert(std::move(asset)), ConvertBindInfo(bindInfo), storeInfo);
1188         return true;
1189     });
1190     return OBJECT_SUCCESS;
1191 }
1192 
ConvertBindInfo(ObjectStore::AssetBindInfo & bindInfo)1193 DistributedData::AssetBindInfo ObjectStoreManager::ConvertBindInfo(ObjectStore::AssetBindInfo& bindInfo)
1194 {
1195     return DistributedData::AssetBindInfo{
1196         .storeName = std::move(bindInfo.storeName),
1197         .tableName = std::move(bindInfo.tableName),
1198         .primaryKey = ValueProxy::Convert(std::move(bindInfo.primaryKey)),
1199         .field = std::move(bindInfo.field),
1200         .assetName = std::move(bindInfo.assetName),
1201     };
1202 }
1203 
OnAssetChanged(const uint32_t tokenId,const std::string & appId,const std::string & sessionId,const std::string & deviceId,const ObjectStore::Asset & asset)1204 int32_t ObjectStoreManager::OnAssetChanged(const uint32_t tokenId, const std::string& appId,
1205     const std::string& sessionId, const std::string& deviceId, const ObjectStore::Asset& asset)
1206 {
1207     const int32_t userId = AccountDelegate::GetInstance()->GetUserByToken(tokenId);
1208     auto objectAsset = asset;
1209     Asset dataAsset =  ValueProxy::Convert(std::move(objectAsset));
1210     auto snapshotKey = appId + SEPERATOR + sessionId;
1211     int32_t res = OBJECT_SUCCESS;
1212     bool exist = snapshots_.ComputeIfPresent(snapshotKey,
1213         [&res, &dataAsset, &deviceId](const std::string &key, std::shared_ptr<Snapshot> snapshot) {
1214             if (snapshot != nullptr) {
1215                 res = snapshot->OnDataChanged(dataAsset, deviceId); // needChange
1216             }
1217             return snapshot != nullptr;
1218         });
1219     if (exist) {
1220         return res;
1221     }
1222 
1223     auto block = std::make_shared<BlockData<std::tuple<bool, bool>>>(WAIT_TIME, std::tuple{ true, true });
1224     ObjectAssetLoader::GetInstance()->TransferAssetsAsync(userId, appId, deviceId, { dataAsset }, [block](bool ret) {
1225         block->SetValue({ false, ret });
1226     });
1227     auto [timeout, success] = block->GetValue();
1228     if (timeout || !success) {
1229         ZLOGE("transfer failed, timeout: %{public}d, success: %{public}d, name: %{public}s, deviceId: %{public}s ",
1230             timeout, success, asset.name.c_str(), DistributedData::Anonymous::Change(deviceId).c_str());
1231         return OBJECT_INNER_ERROR;
1232     }
1233     return OBJECT_SUCCESS;
1234 }
1235 
GetSnapShots(const std::string & bundleName,const std::string & storeName)1236 ObjectStoreManager::UriToSnapshot ObjectStoreManager::GetSnapShots(const std::string& bundleName,
1237     const std::string& storeName)
1238 {
1239     auto storeKey = bundleName + SEPERATOR + storeName;
1240     bindSnapshots_.ComputeIfAbsent(
1241         storeKey, [](const std::string& key) -> auto {
1242             return std::make_shared<std::map<std::string, std::shared_ptr<Snapshot>>>();
1243         });
1244     return bindSnapshots_.Find(storeKey).second;
1245 }
1246 
DeleteSnapshot(const std::string & bundleName,const std::string & sessionId)1247 void ObjectStoreManager::DeleteSnapshot(const std::string& bundleName, const std::string& sessionId)
1248 {
1249     auto snapshotKey = bundleName + SEPERATOR + sessionId;
1250     auto snapshot = snapshots_.Find(snapshotKey).second;
1251     if (snapshot == nullptr) {
1252         ZLOGD("Not find snapshot, don't need delete");
1253         return;
1254     }
1255     bindSnapshots_.ForEach([snapshot](auto& key, auto& value) {
1256         for (auto pos = value->begin(); pos != value->end();) {
1257             if (pos->second == snapshot) {
1258                 pos = value->erase(pos);
1259             } else {
1260                 ++pos;
1261             }
1262         }
1263         return true;
1264     });
1265     snapshots_.Erase(snapshotKey);
1266 }
1267 } // namespace DistributedObject
1268 } // namespace OHOS
1269