1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "ObjectStoreManager"
16
17 #include "object_manager.h"
18
19 #include <regex>
20
21 #include "accesstoken_kit.h"
22 #include "account/account_delegate.h"
23 #include "block_data.h"
24 #include "bootstrap.h"
25 #include "common/bytes.h"
26 #include "common/string_utils.h"
27 #include "datetime_ex.h"
28 #include "distributed_file_daemon_manager.h"
29 #include "device_matrix.h"
30 #include "ipc_skeleton.h"
31 #include "metadata/capability_meta_data.h"
32 #include "kvstore_utils.h"
33 #include "log_print.h"
34 #include "metadata/meta_data_manager.h"
35 #include "metadata/object_user_meta_data.h"
36 #include "metadata/store_meta_data.h"
37 #include "object_dms_handler.h"
38 #include "object_radar_reporter.h"
39 #include "utils/anonymous.h"
40
41 namespace OHOS {
42 namespace DistributedObject {
43 using namespace OHOS::DistributedKv;
44 using namespace Security::AccessToken;
45 using StoreMetaData = OHOS::DistributedData::StoreMetaData;
46 using Account = OHOS::DistributedData::AccountDelegate;
47 using AccessTokenKit = Security::AccessToken::AccessTokenKit;
48 using ValueProxy = OHOS::DistributedData::ValueProxy;
49 using DistributedFileDaemonManager = Storage::DistributedFile::DistributedFileDaemonManager;
50 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
51
52 constexpr const char *SAVE_INFO = "p_###SAVEINFO###";
53 constexpr int32_t PROGRESS_MAX = 100;
54 constexpr int32_t PROGRESS_INVALID = -1;
55 constexpr uint32_t LOCK_TIMEOUT = 3600; // second
ObjectStoreManager()56 ObjectStoreManager::ObjectStoreManager()
57 {
58 ZLOGI("ObjectStoreManager construct");
59 RegisterAssetsLister();
60 }
61
~ObjectStoreManager()62 ObjectStoreManager::~ObjectStoreManager()
63 {
64 }
65
OpenObjectKvStore()66 DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore()
67 {
68 DistributedDB::KvStoreNbDelegate *store = nullptr;
69 DistributedDB::KvStoreNbDelegate::Option option;
70 option.createDirByStoreIdOnly = true;
71 option.syncDualTupleMode = true;
72 option.secOption = { DistributedDB::S1, DistributedDB::ECE };
73 ZLOGD("start GetKvStore");
74 kvStoreDelegateManager_->GetKvStore(ObjectCommon::OBJECTSTORE_DB_STOREID, option,
75 [&store, this](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) {
76 if (dbStatus != DistributedDB::DBStatus::OK) {
77 ZLOGE("GetKvStore fail %{public}d", dbStatus);
78 return;
79 }
80 if (kvStoreNbDelegate == nullptr) {
81 ZLOGE("GetKvStore kvStoreNbDelegate is nullptr");
82 return;
83 }
84 ZLOGI("GetKvStore successsfully");
85 store = kvStoreNbDelegate;
86 std::vector<uint8_t> tmpKey;
87 DistributedDB::DBStatus status = store->RegisterObserver(tmpKey,
88 DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN,
89 &objectDataListener_);
90 if (status != DistributedDB::DBStatus::OK) {
91 ZLOGE("RegisterObserver err %{public}d", status);
92 }
93 });
94 return store;
95 }
96
RegisterAssetsLister()97 bool ObjectStoreManager::RegisterAssetsLister()
98 {
99 if (objectAssetsSendListener_ == nullptr) {
100 objectAssetsSendListener_ = new ObjectAssetsSendListener();
101 }
102 if (objectAssetsRecvListener_ == nullptr) {
103 objectAssetsRecvListener_ = new ObjectAssetsRecvListener();
104 }
105 auto status = DistributedFileDaemonManager::GetInstance().RegisterAssetCallback(objectAssetsRecvListener_);
106 if (status != DistributedDB::DBStatus::OK) {
107 ZLOGE("Register assetsRecvListener err %{public}d", status);
108 return false;
109 }
110 return true;
111 }
112
UnRegisterAssetsLister()113 bool ObjectStoreManager::UnRegisterAssetsLister()
114 {
115 ZLOGI("ObjectStoreManager UnRegisterAssetsLister");
116 if (objectAssetsRecvListener_ != nullptr) {
117 auto status = DistributedFileDaemonManager::GetInstance().UnRegisterAssetCallback(objectAssetsRecvListener_);
118 if (status != DistributedDB::DBStatus::OK) {
119 ZLOGE("UnRegister assetsRecvListener err %{public}d", status);
120 return false;
121 }
122 }
123 return true;
124 }
125
ProcessSyncCallback(const std::map<std::string,int32_t> & results,const std::string & appId,const std::string & sessionId,const std::string & deviceId)126 void ObjectStoreManager::ProcessSyncCallback(const std::map<std::string, int32_t> &results, const std::string &appId,
127 const std::string &sessionId, const std::string &deviceId)
128 {
129 if (results.empty() || results.find(LOCAL_DEVICE) != results.end()) {
130 return;
131 }
132 int32_t result = Open();
133 if (result != OBJECT_SUCCESS) {
134 ZLOGE("Open failed, errCode = %{public}d", result);
135 return;
136 }
137 // delete local data
138 result = RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, deviceId));
139 if (result != OBJECT_SUCCESS) {
140 ZLOGE("Save failed, status = %{public}d", result);
141 }
142 Close();
143 return;
144 }
145
Save(const std::string & appId,const std::string & sessionId,const ObjectRecord & data,const std::string & deviceId,sptr<IRemoteObject> callback)146 int32_t ObjectStoreManager::Save(const std::string &appId, const std::string &sessionId,
147 const ObjectRecord &data, const std::string &deviceId, sptr<IRemoteObject> callback)
148 {
149 auto proxy = iface_cast<ObjectSaveCallbackProxy>(callback);
150 if (proxy == nullptr) {
151 ZLOGE("proxy is nullptr, callback is %{public}s.", (callback == nullptr) ? "nullptr" : "not null");
152 return INVALID_ARGUMENT;
153 }
154 if (deviceId.size() == 0) {
155 ZLOGE("DeviceId empty, appId: %{public}s, sessionId: %{public}s", appId.c_str(),
156 Anonymous::Change(sessionId).c_str());
157 proxy->Completed(std::map<std::string, int32_t>());
158 return INVALID_ARGUMENT;
159 }
160 int32_t result = Open();
161 if (result != OBJECT_SUCCESS) {
162 ZLOGE("Open object kvstore failed, result: %{public}d", result);
163 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
164 ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, ObjectStore::GETKV_FAILED, ObjectStore::FINISHED);
165 proxy->Completed(std::map<std::string, int32_t>());
166 return STORE_NOT_OPEN;
167 }
168 SaveUserToMeta();
169 std::string dstBundleName = ObjectDmsHandler::GetInstance().GetDstBundleName(appId, deviceId);
170 result = SaveToStore(dstBundleName, sessionId, deviceId, data);
171 if (result != OBJECT_SUCCESS) {
172 ZLOGE("Save to store failed, result: %{public}d", result);
173 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
174 ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
175 Close();
176 proxy->Completed(std::map<std::string, int32_t>());
177 return result;
178 }
179 ZLOGI("Sync data, bundleName: %{public}s, sessionId: %{public}s, deviceId: %{public}s", dstBundleName.c_str(),
180 Anonymous::Change(sessionId).c_str(), Anonymous::Change(deviceId).c_str());
181 SyncCallBack syncCallback =
182 [proxy, dstBundleName, sessionId, deviceId, this](const std::map<std::string, int32_t> &results) {
183 ProcessSyncCallback(results, dstBundleName, sessionId, deviceId);
184 proxy->Completed(results);
185 };
186 result = SyncOnStore(GetPropertyPrefix(dstBundleName, sessionId, deviceId), { deviceId }, syncCallback);
187 if (result != OBJECT_SUCCESS) {
188 ZLOGE("Sync data failed, result: %{public}d", result);
189 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
190 ObjectStore::SYNC_DATA, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
191 Close();
192 proxy->Completed(std::map<std::string, int32_t>());
193 return result;
194 }
195 Close();
196 return PushAssets(appId, dstBundleName, sessionId, data, deviceId);
197 }
198
PushAssets(const std::string & srcBundleName,const std::string & dstBundleName,const std::string & sessionId,const ObjectRecord & data,const std::string & deviceId)199 int32_t ObjectStoreManager::PushAssets(const std::string &srcBundleName, const std::string &dstBundleName,
200 const std::string &sessionId, const ObjectRecord &data, const std::string &deviceId)
201 {
202 Assets assets = GetAssetsFromDBRecords(data);
203 if (assets.empty() || data.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == data.end()) {
204 return OBJECT_SUCCESS;
205 }
206 sptr<AssetObj> assetObj = new (std::nothrow) AssetObj();
207 if (assetObj == nullptr) {
208 ZLOGE("New AssetObj failed");
209 return OBJECT_INNER_ERROR;
210 }
211 assetObj->dstBundleName_ = dstBundleName;
212 assetObj->srcBundleName_ = srcBundleName;
213 assetObj->dstNetworkId_ = deviceId;
214 assetObj->sessionId_ = sessionId;
215 for (const auto& asset : assets) {
216 assetObj->uris_.push_back(asset.uri);
217 }
218 if (objectAssetsSendListener_ == nullptr) {
219 objectAssetsSendListener_ = new ObjectAssetsSendListener();
220 }
221 int userId = std::atoi(GetCurrentUser().c_str());
222 auto status = ObjectAssetLoader::GetInstance().PushAsset(userId, assetObj, objectAssetsSendListener_);
223 return status;
224 }
225
RevokeSave(const std::string & appId,const std::string & sessionId,sptr<IRemoteObject> callback)226 int32_t ObjectStoreManager::RevokeSave(
227 const std::string &appId, const std::string &sessionId, sptr<IRemoteObject> callback)
228 {
229 auto proxy = iface_cast<ObjectRevokeSaveCallbackProxy>(callback);
230 if (proxy == nullptr) {
231 ZLOGE("proxy is nullptr, callback is %{public}s, appId: %{public}s, sessionId: %{public}s.",
232 (callback == nullptr) ? "nullptr" : "not null", appId.c_str(), Anonymous::Change(sessionId).c_str());
233 return INVALID_ARGUMENT;
234 }
235 int32_t result = Open();
236 if (result != OBJECT_SUCCESS) {
237 ZLOGE("Open failed, errCode = %{public}d", result);
238 proxy->Completed(STORE_NOT_OPEN);
239 return STORE_NOT_OPEN;
240 }
241
242 result = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId));
243 if (result != OBJECT_SUCCESS) {
244 ZLOGE("RevokeSave failed, errCode = %{public}d", result);
245 Close();
246 proxy->Completed(result);
247 return result;
248 }
249 std::vector<std::string> deviceList;
250 auto deviceInfos = DmAdaper::GetInstance().GetRemoteDevices();
251 std::for_each(deviceInfos.begin(), deviceInfos.end(),
252 [&deviceList](AppDistributedKv::DeviceInfo info) { deviceList.emplace_back(info.networkId); });
253 if (!deviceList.empty()) {
254 SyncCallBack tmp = [proxy](const std::map<std::string, int32_t> &results) {
255 ZLOGI("revoke save finished");
256 proxy->Completed(OBJECT_SUCCESS);
257 };
258 result = SyncOnStore(GetPropertyPrefix(appId, sessionId), deviceList, tmp);
259 if (result != OBJECT_SUCCESS) {
260 ZLOGE("sync failed, errCode = %{public}d", result);
261 proxy->Completed(result);
262 }
263 } else {
264 proxy->Completed(OBJECT_SUCCESS);
265 };
266 Close();
267 return result;
268 }
269
Retrieve(const std::string & bundleName,const std::string & sessionId,sptr<IRemoteObject> callback,uint32_t tokenId)270 int32_t ObjectStoreManager::Retrieve(
271 const std::string &bundleName, const std::string &sessionId, sptr<IRemoteObject> callback, uint32_t tokenId)
272 {
273 auto proxy = iface_cast<ObjectRetrieveCallbackProxy>(callback);
274 if (proxy == nullptr) {
275 ZLOGE("proxy is nullptr, callback is %{public}s.", (callback == nullptr) ? "nullptr" : "not null");
276 return INVALID_ARGUMENT;
277 }
278 int32_t result = Open();
279 if (result != OBJECT_SUCCESS) {
280 ZLOGE("Open object kvstore failed, result: %{public}d", result);
281 proxy->Completed(ObjectRecord(), false);
282 return ObjectStore::GETKV_FAILED;
283 }
284 ObjectRecord results{};
285 int32_t status = RetrieveFromStore(bundleName, sessionId, results);
286 if (status != OBJECT_SUCCESS) {
287 ZLOGE("Retrieve from store failed, status: %{public}d, close after one minute.", status);
288 CloseAfterMinute();
289 proxy->Completed(ObjectRecord(), false);
290 return status;
291 }
292 bool allReady = false;
293 Assets assets = GetAssetsFromDBRecords(results);
294 if (assets.empty() || results.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == results.end()) {
295 allReady = true;
296 } else {
297 restoreStatus_.ComputeIfPresent(bundleName + sessionId, [&allReady](const auto &key, auto &value) {
298 if (value == RestoreStatus::ALL_READY) {
299 allReady = true;
300 return false;
301 }
302 if (value == RestoreStatus::DATA_READY) {
303 value = RestoreStatus::DATA_NOTIFIED;
304 }
305 return true;
306 });
307 }
308 status = RevokeSaveToStore(GetPrefixWithoutDeviceId(bundleName, sessionId));
309 Close();
310 if (status != OBJECT_SUCCESS) {
311 ZLOGE("Revoke save failed, status: %{public}d", status);
312 proxy->Completed(ObjectRecord(), false);
313 return status;
314 }
315 proxy->Completed(results, allReady);
316 if (allReady) {
317 ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
318 ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
319 }
320 return status;
321 }
322
Clear()323 int32_t ObjectStoreManager::Clear()
324 {
325 ZLOGI("enter");
326 DistributedData::ObjectUserMetaData userMeta;
327 if (!DistributedData::MetaDataManager::GetInstance().LoadMeta(userMeta.GetKey(), userMeta, true)) {
328 ZLOGE("load meta error");
329 return OBJECT_INNER_ERROR;
330 }
331 if (userMeta.userId.empty()) {
332 ZLOGI("no object user meta, don't need clean");
333 return OBJECT_SUCCESS;
334 }
335 std::string userId = GetCurrentUser();
336 if (userId.empty()) {
337 ZLOGE("no user error");
338 return OBJECT_INNER_ERROR;
339 }
340 if (userMeta.userId == userId) {
341 ZLOGI("user is same, don't need clear, user:%{public}s.", userId.c_str());
342 return OBJECT_SUCCESS;
343 }
344 ZLOGI("user changed, need clear, userId:%{public}s", userId.c_str());
345 int32_t result = Open();
346 if (result != OBJECT_SUCCESS) {
347 ZLOGE("Open failed, errCode = %{public}d", result);
348 return STORE_NOT_OPEN;
349 }
350 result = RevokeSaveToStore("");
351 callbacks_.Clear();
352 processCallbacks_.Clear();
353 ForceClose();
354 return result;
355 }
356
InitUserMeta()357 int32_t ObjectStoreManager::InitUserMeta()
358 {
359 ObjectUserMetaData userMeta;
360 if (DistributedData::MetaDataManager::GetInstance().LoadMeta(userMeta.GetKey(), userMeta, true)) {
361 ZLOGI("userId has been set, don't need clean");
362 return OBJECT_SUCCESS;
363 }
364 std::string userId = GetCurrentUser();
365 if (userId.empty()) {
366 ZLOGI("get userId error");
367 return OBJECT_INNER_ERROR;
368 }
369 userMeta.userId = userId;
370 std::string appId = DistributedData::Bootstrap::GetInstance().GetProcessLabel();
371 std::string metaKey = GetMetaUserIdKey(userId, appId);
372 if (!DistributedData::MetaDataManager::GetInstance().DelMeta(metaKey, true)) {
373 ZLOGE("delete old meta error, userId:%{public}s", userId.c_str());
374 return OBJECT_INNER_ERROR;
375 }
376 if (!DistributedData::MetaDataManager::GetInstance().SaveMeta(DistributedData::ObjectUserMetaData::GetKey(),
377 userMeta, true)) {
378 ZLOGE("save meta error, userId:%{public}s", userId.c_str());
379 return OBJECT_INNER_ERROR;
380 }
381 return OBJECT_SUCCESS;
382 }
383
DeleteByAppId(const std::string & appId,int32_t user)384 int32_t ObjectStoreManager::DeleteByAppId(const std::string &appId, int32_t user)
385 {
386 int32_t result = Open();
387 if (result != OBJECT_SUCCESS) {
388 ZLOGE("Open store failed, result: %{public}d, appId: %{public}s, user: %{public}d", result,
389 appId.c_str(), user);
390 return STORE_NOT_OPEN;
391 }
392 result = RevokeSaveToStore(appId);
393 if (result != OBJECT_SUCCESS) {
394 ZLOGE("Revoke save failed, result: %{public}d, appId: %{public}s, user: %{public}d", result,
395 appId.c_str(), user);
396 }
397 Close();
398 return result;
399 }
400
RegisterRemoteCallback(const std::string & bundleName,const std::string & sessionId,pid_t pid,uint32_t tokenId,sptr<IRemoteObject> callback)401 void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId,
402 pid_t pid, uint32_t tokenId,
403 sptr<IRemoteObject> callback)
404 {
405 if (bundleName.empty() || sessionId.empty()) {
406 ZLOGD("ObjectStoreManager::RegisterRemoteCallback empty");
407 return;
408 }
409 ZLOGD("ObjectStoreManager::RegisterRemoteCallback start");
410 auto proxy = iface_cast<ObjectChangeCallbackProxy>(callback);
411 if (proxy == nullptr) {
412 ZLOGE("proxy is nullptr, callback is %{public}s, bundleName: %{public}s, sessionId: %{public}s, pid: "
413 "%{public}d, tokenId: %{public}u.",
414 (callback == nullptr) ? "nullptr" : "not null", bundleName.c_str(), Anonymous::Change(sessionId).c_str(),
415 pid, tokenId);
416 return;
417 }
418 std::string prefix = bundleName + sessionId;
419 callbacks_.Compute(tokenId, ([pid, &proxy, &prefix](const uint32_t key, CallbackInfo &value) {
420 if (value.pid != pid) {
421 value = CallbackInfo { pid };
422 }
423 value.observers_.insert_or_assign(prefix, proxy);
424 return !value.observers_.empty();
425 }));
426 }
427
UnregisterRemoteCallback(const std::string & bundleName,pid_t pid,uint32_t tokenId,const std::string & sessionId)428 void ObjectStoreManager::UnregisterRemoteCallback(
429 const std::string &bundleName, pid_t pid, uint32_t tokenId, const std::string &sessionId)
430 {
431 if (bundleName.empty()) {
432 ZLOGD("bundleName is empty");
433 return;
434 }
435 callbacks_.Compute(tokenId, ([pid, &sessionId, &bundleName](const uint32_t key, CallbackInfo &value) {
436 if (value.pid != pid) {
437 return true;
438 }
439 if (sessionId.empty()) {
440 return false;
441 }
442 std::string prefix = bundleName + sessionId;
443 for (auto it = value.observers_.begin(); it != value.observers_.end();) {
444 if ((*it).first == prefix) {
445 it = value.observers_.erase(it);
446 } else {
447 ++it;
448 }
449 }
450 return true;
451 }));
452 }
453
RegisterProgressObserverCallback(const std::string & bundleName,const std::string & sessionId,pid_t pid,uint32_t tokenId,sptr<IRemoteObject> callback)454 void ObjectStoreManager::RegisterProgressObserverCallback(const std::string &bundleName, const std::string &sessionId,
455 pid_t pid, uint32_t tokenId, sptr<IRemoteObject> callback)
456 {
457 if (bundleName.empty() || sessionId.empty()) {
458 ZLOGD("ObjectStoreManager::RegisterProgressObserverCallback empty bundleName = %{public}s, sessionId = "
459 "%{public}s",
460 bundleName.c_str(), DistributedData::Anonymous::Change(sessionId).c_str());
461 return;
462 }
463 auto proxy = iface_cast<ObjectProgressCallbackProxy>(callback);
464 if (proxy == nullptr) {
465 ZLOGE("proxy is nullptr, callback is %{public}s, bundleName: %{public}s, sessionId: %{public}s, pid: "
466 "%{public}d, tokenId: %{public}u.",
467 (callback == nullptr) ? "nullptr" : "not null", bundleName.c_str(), Anonymous::Change(sessionId).c_str(),
468 pid, tokenId);
469 return;
470 }
471 std::string objectKey = bundleName + sessionId;
472 sptr<ObjectProgressCallbackProxy> observer;
473 processCallbacks_.Compute(
474 tokenId, ([pid, &proxy, &objectKey, &observer](const uint32_t key, ProgressCallbackInfo &value) {
475 if (value.pid != pid) {
476 value = ProgressCallbackInfo{ pid };
477 }
478 value.observers_.insert_or_assign(objectKey, proxy);
479 observer = value.observers_[objectKey];
480 return !value.observers_.empty();
481 }));
482 int32_t progress = PROGRESS_INVALID;
483 {
484 std::lock_guard<std::mutex> lock(progressMutex_);
485 auto it = progressInfo_.find(objectKey);
486 if (it == progressInfo_.end()) {
487 return;
488 }
489 progress = it->second;
490 }
491 if (observer != nullptr && progress != PROGRESS_INVALID) {
492 observer->Completed(progress);
493 }
494 if (progress == PROGRESS_MAX) {
495 assetsRecvProgress_.Erase(objectKey);
496 std::lock_guard<std::mutex> lock(progressMutex_);
497 progressInfo_.erase(objectKey);
498 }
499 }
500
UnregisterProgressObserverCallback(const std::string & bundleName,pid_t pid,uint32_t tokenId,const std::string & sessionId)501 void ObjectStoreManager::UnregisterProgressObserverCallback(
502 const std::string &bundleName, pid_t pid, uint32_t tokenId, const std::string &sessionId)
503 {
504 if (bundleName.empty()) {
505 ZLOGD("bundleName is empty");
506 return;
507 }
508 processCallbacks_.Compute(
509 tokenId, ([pid, &sessionId, &bundleName](const uint32_t key, ProgressCallbackInfo &value) {
510 if (value.pid != pid) {
511 return true;
512 }
513 if (sessionId.empty()) {
514 return false;
515 }
516 std::string prefix = bundleName + sessionId;
517 for (auto it = value.observers_.begin(); it != value.observers_.end();) {
518 if ((*it).first == prefix) {
519 it = value.observers_.erase(it);
520 } else {
521 ++it;
522 }
523 }
524 return true;
525 }));
526 }
527
NotifyChange(const ObjectRecord & changedData)528 void ObjectStoreManager::NotifyChange(const ObjectRecord &changedData)
529 {
530 ZLOGI("OnChange start, size:%{public}zu", changedData.size());
531 bool hasAsset = false;
532 SaveInfo saveInfo;
533 for (const auto &[key, value] : changedData) {
534 if (key.find(SAVE_INFO) != std::string::npos) {
535 DistributedData::Serializable::Unmarshall(std::string(value.begin(), value.end()), saveInfo);
536 break;
537 }
538 }
539 auto data = GetObjectData(changedData, saveInfo, hasAsset);
540 if (!hasAsset) {
541 ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
542 ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName);
543 callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
544 DoNotify(tokenId, value, data, true); // no asset, data ready means all ready
545 return false;
546 });
547 return;
548 }
549 NotifyDataChanged(data, saveInfo);
550 SaveUserToMeta();
551 }
552
GetObjectData(const ObjectRecord & changedData,SaveInfo & saveInfo,bool & hasAsset)553 std::map<std::string, ObjectRecord> ObjectStoreManager::GetObjectData(const ObjectRecord& changedData,
554 SaveInfo& saveInfo, bool& hasAsset)
555 {
556 std::map<std::string, ObjectRecord> data;
557 std::string keyPrefix = saveInfo.ToPropertyPrefix();
558 if (!keyPrefix.empty()) {
559 std::string observerKey = saveInfo.bundleName + saveInfo.sessionId;
560 for (const auto &[key, value] : changedData) {
561 if (key.size() < keyPrefix.size() || key.find(SAVE_INFO) != std::string::npos) {
562 continue;
563 }
564 std::string propertyName = key.substr(keyPrefix.size());
565 data[observerKey].insert_or_assign(propertyName, value);
566 if (!hasAsset && IsAssetKey(propertyName)) {
567 hasAsset = true;
568 }
569 }
570 } else {
571 for (const auto &item : changedData) {
572 std::vector<std::string> splitKeys = SplitEntryKey(item.first);
573 if (splitKeys.size() <= PROPERTY_NAME_INDEX) {
574 continue;
575 }
576 if (saveInfo.sourceDeviceId.empty() || saveInfo.bundleName.empty()) {
577 saveInfo.sourceDeviceId = splitKeys[SOURCE_DEVICE_UDID_INDEX];
578 saveInfo.bundleName = splitKeys[BUNDLE_NAME_INDEX];
579 saveInfo.sessionId = splitKeys[SESSION_ID_INDEX];
580 saveInfo.timestamp = splitKeys[TIME_INDEX];
581 }
582 std::string prefix = splitKeys[BUNDLE_NAME_INDEX] + splitKeys[SESSION_ID_INDEX];
583 std::string propertyName = splitKeys[PROPERTY_NAME_INDEX];
584 data[prefix].insert_or_assign(propertyName, item.second);
585 if (IsAssetKey(propertyName)) {
586 hasAsset = true;
587 }
588 }
589 }
590 return data;
591 }
592
ComputeStatus(const std::string & objectKey,const SaveInfo & saveInfo,const std::map<std::string,ObjectRecord> & data)593 void ObjectStoreManager::ComputeStatus(const std::string& objectKey, const SaveInfo& saveInfo,
594 const std::map<std::string, ObjectRecord>& data)
595 {
596 restoreStatus_.Compute(objectKey, [this, &data, saveInfo] (const auto &key, auto &value) {
597 if (value == RestoreStatus::ASSETS_READY) {
598 value = RestoreStatus::ALL_READY;
599 ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
600 ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS);
601 callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
602 DoNotify(tokenId, value, data, true);
603 return false;
604 });
605 } else {
606 value = RestoreStatus::DATA_READY;
607 ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
608 ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName);
609 callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
610 DoNotify(tokenId, value, data, false);
611 return false;
612 });
613 WaitAssets(key, saveInfo, data);
614 }
615 return true;
616 });
617 }
618
IsNeedMetaSync(const StoreMetaData & meta,const std::vector<std::string> & uuids)619 bool ObjectStoreManager::IsNeedMetaSync(const StoreMetaData &meta, const std::vector<std::string> &uuids)
620 {
621 bool isAfterMeta = false;
622 for (const auto &uuid : uuids) {
623 auto metaData = meta;
624 metaData.deviceId = uuid;
625 CapMetaData capMeta;
626 auto capKey = CapMetaRow::GetKeyFor(uuid);
627 bool flag = !MetaDataManager::GetInstance().LoadMeta(std::string(capKey.begin(), capKey.end()), capMeta) ||
628 !MetaDataManager::GetInstance().LoadMeta(metaData.GetKeyWithoutPath(), metaData);
629 if (flag) {
630 isAfterMeta = true;
631 break;
632 }
633 auto [exist, mask] = DeviceMatrix::GetInstance().GetRemoteMask(uuid);
634 if ((mask & DeviceMatrix::META_STORE_MASK) == DeviceMatrix::META_STORE_MASK) {
635 isAfterMeta = true;
636 break;
637 }
638 auto [existLocal, localMask] = DeviceMatrix::GetInstance().GetMask(uuid);
639 if ((localMask & DeviceMatrix::META_STORE_MASK) == DeviceMatrix::META_STORE_MASK) {
640 isAfterMeta = true;
641 break;
642 }
643 }
644 return isAfterMeta;
645 }
646
NotifyDataChanged(const std::map<std::string,ObjectRecord> & data,const SaveInfo & saveInfo)647 void ObjectStoreManager::NotifyDataChanged(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo)
648 {
649 for (auto const& [objectKey, results] : data) {
650 restoreStatus_.ComputeIfAbsent(
651 objectKey, [](const std::string& key) -> auto {
652 return RestoreStatus::NONE;
653 });
654 ComputeStatus(objectKey, saveInfo, data);
655 }
656 }
657
WaitAssets(const std::string & objectKey,const SaveInfo & saveInfo,const std::map<std::string,ObjectRecord> & data)658 int32_t ObjectStoreManager::WaitAssets(const std::string& objectKey, const SaveInfo& saveInfo,
659 const std::map<std::string, ObjectRecord>& data)
660 {
661 if (executors_ == nullptr) {
662 ZLOGE("executors_ is null");
663 return OBJECT_INNER_ERROR;
664 }
665 auto taskId = executors_->Schedule(std::chrono::seconds(WAIT_TIME), [this, objectKey, data, saveInfo]() {
666 ZLOGE("wait assets finisehd timeout, try pull assets, objectKey:%{public}s", objectKey.c_str());
667 PullAssets(data, saveInfo);
668 DoNotifyWaitAssetTimeout(objectKey);
669 });
670
671 objectTimer_.ComputeIfAbsent(
672 objectKey, [taskId](const std::string& key) -> auto {
673 return taskId;
674 });
675 return OBJECT_SUCCESS;
676 }
677
PullAssets(const std::map<std::string,ObjectRecord> & data,const SaveInfo & saveInfo)678 void ObjectStoreManager::PullAssets(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo)
679 {
680 std::map<std::string, Assets> changedAssets;
681 for (auto const& [objectId, result] : data) {
682 changedAssets[objectId] = GetAssetsFromDBRecords(result);
683 }
684 for (const auto& [objectId, assets] : changedAssets) {
685 std::string networkId = DmAdaper::GetInstance().ToNetworkID(saveInfo.sourceDeviceId);
686 auto block = std::make_shared<BlockData<std::tuple<bool, bool>>>(WAIT_TIME, std::tuple{ true, true });
687 ObjectAssetLoader::GetInstance().TransferAssetsAsync(std::atoi(GetCurrentUser().c_str()),
688 saveInfo.bundleName, networkId, assets, [this, block](bool success) {
689 block->SetValue({ false, success });
690 });
691 auto [timeout, success] = block->GetValue();
692 ZLOGI("Pull assets end, timeout: %{public}d, success: %{public}d, size:%{public}zu, deviceId: %{public}s",
693 timeout, success, assets.size(), DistributedData::Anonymous::Change(networkId).c_str());
694 }
695 }
696
NotifyAssetsRecvProgress(const std::string & objectKey,int32_t progress)697 void ObjectStoreManager::NotifyAssetsRecvProgress(const std::string &objectKey, int32_t progress)
698 {
699 assetsRecvProgress_.InsertOrAssign(objectKey, progress);
700 std::list<sptr<ObjectProgressCallbackProxy>> observers;
701 bool flag = false;
702 processCallbacks_.ForEach(
703 [&objectKey, &observers, &flag](uint32_t tokenId, const ProgressCallbackInfo &value) {
704 if (value.observers_.empty()) {
705 flag = true;
706 return false;
707 }
708 auto it = value.observers_.find(objectKey);
709 if (it != value.observers_.end()) {
710 observers.push_back(it->second);
711 }
712 return false;
713 });
714 if (flag) {
715 std::lock_guard<std::mutex> lock(progressMutex_);
716 progressInfo_.insert_or_assign(objectKey, progress);
717 }
718 for (auto &observer : observers) {
719 if (observer == nullptr) {
720 continue;
721 }
722 observer->Completed(progress);
723 }
724 if (!observers.empty() && progress == PROGRESS_MAX) {
725 assetsRecvProgress_.Erase(objectKey);
726 std::lock_guard<std::mutex> lock(progressMutex_);
727 progressInfo_.erase(objectKey);
728 }
729 }
730
NotifyAssetsReady(const std::string & objectKey,const std::string & bundleName,const std::string & srcNetworkId)731 void ObjectStoreManager::NotifyAssetsReady(
732 const std::string &objectKey, const std::string &bundleName, const std::string &srcNetworkId)
733 {
734 if (executors_ == nullptr) {
735 ZLOGE("executors_ is nullptr");
736 return;
737 }
738 restoreStatus_.ComputeIfAbsent(
739 objectKey, [](const std::string& key) -> auto {
740 return RestoreStatus::NONE;
741 });
742 restoreStatus_.Compute(objectKey, [this, &bundleName] (const auto &key, auto &value) {
743 if (value == RestoreStatus::DATA_NOTIFIED) {
744 value = RestoreStatus::ALL_READY;
745 ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
746 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS);
747 callbacks_.ForEach([this, key](uint32_t tokenId, const CallbackInfo& value) {
748 DoNotifyAssetsReady(tokenId, value, key, true);
749 return false;
750 });
751 } else if (value == RestoreStatus::DATA_READY) {
752 value = RestoreStatus::ALL_READY;
753 ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
754 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS);
755 auto [has, taskId] = objectTimer_.Find(key);
756 if (has) {
757 executors_->Remove(taskId);
758 objectTimer_.Erase(key);
759 }
760 } else {
761 value = RestoreStatus::ASSETS_READY;
762 ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
763 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, bundleName);
764 }
765 return true;
766 });
767 }
768
NotifyAssetsStart(const std::string & objectKey,const std::string & srcNetworkId)769 void ObjectStoreManager::NotifyAssetsStart(const std::string& objectKey, const std::string& srcNetworkId)
770 {
771 restoreStatus_.ComputeIfAbsent(
772 objectKey, [](const std::string& key) -> auto {
773 return RestoreStatus::NONE;
774 });
775 }
776
IsAssetKey(const std::string & key)777 bool ObjectStoreManager::IsAssetKey(const std::string& key)
778 {
779 return key.find(ObjectStore::ASSET_DOT) != std::string::npos;
780 }
781
IsAssetComplete(const ObjectRecord & result,const std::string & assetPrefix)782 bool ObjectStoreManager::IsAssetComplete(const ObjectRecord& result, const std::string& assetPrefix)
783 {
784 if (result.find(assetPrefix + ObjectStore::NAME_SUFFIX) == result.end() ||
785 result.find(assetPrefix + ObjectStore::URI_SUFFIX) == result.end() ||
786 result.find(assetPrefix + ObjectStore::PATH_SUFFIX) == result.end() ||
787 result.find(assetPrefix + ObjectStore::CREATE_TIME_SUFFIX) == result.end() ||
788 result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX) == result.end() ||
789 result.find(assetPrefix + ObjectStore::SIZE_SUFFIX) == result.end()) {
790 return false;
791 }
792 return true;
793 }
794
GetAssetsFromDBRecords(const ObjectRecord & result)795 Assets ObjectStoreManager::GetAssetsFromDBRecords(const ObjectRecord& result)
796 {
797 Assets assets{};
798 std::set<std::string> assetKey;
799 for (const auto& [key, value] : result) {
800 std::string assetPrefix = key.substr(0, key.find(ObjectStore::ASSET_DOT));
801 if (!IsAssetKey(key) || assetKey.find(assetPrefix) != assetKey.end() ||
802 result.find(assetPrefix + ObjectStore::NAME_SUFFIX) == result.end() ||
803 result.find(assetPrefix + ObjectStore::URI_SUFFIX) == result.end() ||
804 result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX) == result.end() ||
805 result.find(assetPrefix + ObjectStore::SIZE_SUFFIX) == result.end()) {
806 continue;
807 }
808 Asset asset;
809 ObjectStore::StringUtils::BytesToStrWithType(
810 result.find(assetPrefix + ObjectStore::NAME_SUFFIX)->second, asset.name);
811 if (asset.name.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
812 asset.name = asset.name.substr(ObjectStore::STRING_PREFIX_LEN);
813 }
814 ObjectStore::StringUtils::BytesToStrWithType(
815 result.find(assetPrefix + ObjectStore::URI_SUFFIX)->second, asset.uri);
816 if (asset.uri.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
817 asset.uri = asset.uri.substr(ObjectStore::STRING_PREFIX_LEN);
818 }
819 ObjectStore::StringUtils::BytesToStrWithType(
820 result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX)->second, asset.modifyTime);
821 if (asset.modifyTime.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
822 asset.modifyTime = asset.modifyTime.substr(ObjectStore::STRING_PREFIX_LEN);
823 }
824 ObjectStore::StringUtils::BytesToStrWithType(
825 result.find(assetPrefix + ObjectStore::SIZE_SUFFIX)->second, asset.size);
826 if (asset.size.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
827 asset.size = asset.size.substr(ObjectStore::STRING_PREFIX_LEN);
828 }
829 asset.hash = asset.modifyTime + "_" + asset.size;
830 assets.push_back(asset);
831 assetKey.insert(assetPrefix);
832 }
833 return assets;
834 }
835
DoNotify(uint32_t tokenId,const CallbackInfo & value,const std::map<std::string,ObjectRecord> & data,bool allReady)836 void ObjectStoreManager::DoNotify(uint32_t tokenId, const CallbackInfo& value,
837 const std::map<std::string, ObjectRecord>& data, bool allReady)
838 {
839 for (const auto& observer : value.observers_) {
840 auto it = data.find(observer.first);
841 if (it == data.end()) {
842 continue;
843 }
844 observer.second->Completed((*it).second, allReady);
845 if (allReady) {
846 restoreStatus_.Erase(observer.first);
847 ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
848 ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
849 } else {
850 restoreStatus_.ComputeIfPresent(observer.first, [](const auto &key, auto &value) {
851 value = RestoreStatus::DATA_NOTIFIED;
852 return true;
853 });
854 }
855 }
856 }
857
DoNotifyAssetsReady(uint32_t tokenId,const CallbackInfo & value,const std::string & objectKey,bool allReady)858 void ObjectStoreManager::DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value,
859 const std::string& objectKey, bool allReady)
860 {
861 if (executors_ == nullptr) {
862 ZLOGE("executors_ is nullptr");
863 return;
864 }
865 for (const auto& observer : value.observers_) {
866 if (objectKey != observer.first) {
867 continue;
868 }
869 observer.second->Completed(ObjectRecord(), allReady);
870 if (allReady) {
871 restoreStatus_.Erase(objectKey);
872 ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
873 ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
874 }
875 auto [has, taskId] = objectTimer_.Find(objectKey);
876 if (has) {
877 executors_->Remove(taskId);
878 objectTimer_.Erase(objectKey);
879 }
880 }
881 }
882
DoNotifyWaitAssetTimeout(const std::string & objectKey)883 void ObjectStoreManager::DoNotifyWaitAssetTimeout(const std::string &objectKey)
884 {
885 if (executors_ == nullptr) {
886 ZLOGE("executors_ is nullptr");
887 return;
888 }
889 ObjectStore::RadarReporter::ReportStageError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
890 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_FAILED, ObjectStore::TIMEOUT);
891 callbacks_.ForEach([this, &objectKey](uint32_t tokenId, const CallbackInfo &value) {
892 for (const auto& observer : value.observers_) {
893 if (objectKey != observer.first) {
894 continue;
895 }
896 observer.second->Completed(ObjectRecord(), true);
897 restoreStatus_.Erase(objectKey);
898 auto [has, taskId] = objectTimer_.Find(objectKey);
899 if (has) {
900 executors_->Remove(taskId);
901 objectTimer_.Erase(objectKey);
902 }
903 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
904 ObjectStore::NOTIFY, ObjectStore::RADAR_FAILED, ObjectStore::TIMEOUT, ObjectStore::FINISHED);
905 }
906 return false;
907 });
908 }
909
SetData(const std::string & dataDir,const std::string & userId)910 void ObjectStoreManager::SetData(const std::string &dataDir, const std::string &userId)
911 {
912 ZLOGI("enter, user: %{public}s", userId.c_str());
913 kvStoreDelegateManager_ = std::make_shared<DistributedDB::KvStoreDelegateManager>
914 (DistributedData::Bootstrap::GetInstance().GetProcessLabel(), userId);
915 DistributedDB::KvStoreConfig kvStoreConfig { dataDir };
916 auto status = kvStoreDelegateManager_->SetKvStoreConfig(kvStoreConfig);
917 if (status != DistributedDB::OK) {
918 ZLOGE("Set kvstore config failed, status: %{public}d", status);
919 return;
920 }
921 userId_ = userId;
922 }
923
Open()924 int32_t ObjectStoreManager::Open()
925 {
926 if (kvStoreDelegateManager_ == nullptr) {
927 ZLOGE("Kvstore delegate manager not init");
928 return OBJECT_INNER_ERROR;
929 }
930 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
931 if (delegate_ == nullptr) {
932 delegate_ = OpenObjectKvStore();
933 if (delegate_ == nullptr) {
934 ZLOGE("Open object kvstore failed");
935 return OBJECT_DBSTATUS_ERROR;
936 }
937 syncCount_ = 1;
938 ZLOGI("Open object kvstore success");
939 } else {
940 syncCount_++;
941 ZLOGI("Object kvstore syncCount: %{public}d", syncCount_.load());
942 }
943 return OBJECT_SUCCESS;
944 }
945
ForceClose()946 void ObjectStoreManager::ForceClose()
947 {
948 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_, std::chrono::seconds(LOCK_TIMEOUT));
949 if (delegate_ == nullptr) {
950 return;
951 }
952 auto status = kvStoreDelegateManager_->CloseKvStore(delegate_);
953 if (status != DistributedDB::DBStatus::OK) {
954 ZLOGE("CloseKvStore fail %{public}d", status);
955 return;
956 }
957 delegate_ = nullptr;
958 isSyncing_ = false;
959 syncCount_ = 0;
960 }
961
Close()962 void ObjectStoreManager::Close()
963 {
964 int32_t taskCount = 0;
965 {
966 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
967 if (delegate_ == nullptr) {
968 return;
969 }
970 taskCount = delegate_->GetTaskCount();
971 }
972 if (taskCount > 0 && syncCount_ == 1) {
973 CloseAfterMinute();
974 ZLOGW("Store is busy, close after a minute, task count: %{public}d", taskCount);
975 return;
976 }
977 syncCount_--;
978 ZLOGI("closed a store, syncCount = %{public}d", syncCount_.load());
979 FlushClosedStore();
980 }
981
SyncCompleted(const std::map<std::string,DistributedDB::DBStatus> & results,uint64_t sequenceId)982 void ObjectStoreManager::SyncCompleted(
983 const std::map<std::string, DistributedDB::DBStatus> &results, uint64_t sequenceId)
984 {
985 std::string userId;
986 SequenceSyncManager::Result result = SequenceSyncManager::GetInstance()->Process(sequenceId, results, userId);
987 if (result == SequenceSyncManager::SUCCESS_USER_HAS_FINISHED && userId == userId_) {
988 SetSyncStatus(false);
989 FlushClosedStore();
990 }
991 for (const auto &item : results) {
992 if (item.second == DistributedDB::DBStatus::OK) {
993 ZLOGI("Sync data success, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
994 ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::SAVE,
995 ObjectStore::SYNC_DATA, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
996 } else {
997 ZLOGE("Sync data failed, sequenceId: 0x%{public}" PRIx64 ", status: %{public}d", sequenceId, item.second);
998 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
999 ObjectStore::SYNC_DATA, ObjectStore::RADAR_FAILED, item.second, ObjectStore::FINISHED);
1000 }
1001 }
1002 }
1003
FlushClosedStore()1004 void ObjectStoreManager::FlushClosedStore()
1005 {
1006 if (executors_ == nullptr) {
1007 ZLOGE("executors_ is nullptr");
1008 return;
1009 }
1010 std::unique_lock<decltype(rwMutex_)> lock(rwMutex_);
1011 if (!isSyncing_ && syncCount_ == 0 && delegate_ != nullptr) {
1012 ZLOGD("close store");
1013 auto status = kvStoreDelegateManager_->CloseKvStore(delegate_);
1014 if (status != DistributedDB::DBStatus::OK) {
1015 int timeOut = 1000;
1016 executors_->Schedule(std::chrono::milliseconds(timeOut), [this]() {
1017 FlushClosedStore();
1018 });
1019 ZLOGE("GetEntries fail %{public}d", status);
1020 return;
1021 }
1022 delegate_ = nullptr;
1023 }
1024 }
1025
ProcessOldEntry(const std::string & appId)1026 void ObjectStoreManager::ProcessOldEntry(const std::string &appId)
1027 {
1028 std::vector<DistributedDB::Entry> entries;
1029 {
1030 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1031 if (delegate_ == nullptr) {
1032 ZLOGE("delegate is nullptr.");
1033 return;
1034 }
1035 auto status = delegate_->GetEntries(std::vector<uint8_t>(appId.begin(), appId.end()), entries);
1036 if (status != DistributedDB::DBStatus::OK) {
1037 ZLOGE("Get old entries failed, bundleName: %{public}s, status %{public}d", appId.c_str(), status);
1038 return;
1039 }
1040 }
1041 std::map<std::string, int64_t> sessionIds;
1042 int64_t oldestTime = 0;
1043 std::string deleteKey;
1044 for (auto &item : entries) {
1045 std::string key(item.key.begin(), item.key.end());
1046 std::vector<std::string> splitKeys = SplitEntryKey(key);
1047 if (splitKeys.empty()) {
1048 continue;
1049 }
1050 std::string bundleName = splitKeys[BUNDLE_NAME_INDEX];
1051 std::string sessionId = splitKeys[SESSION_ID_INDEX];
1052 if (sessionIds.count(sessionId) == 0) {
1053 char *end = nullptr;
1054 sessionIds[sessionId] = strtol(splitKeys[TIME_INDEX].c_str(), &end, DECIMAL_BASE);
1055 }
1056 if (oldestTime == 0 || oldestTime > sessionIds[sessionId]) {
1057 oldestTime = sessionIds[sessionId];
1058 deleteKey = GetPrefixWithoutDeviceId(bundleName, sessionId);
1059 }
1060 }
1061 if (sessionIds.size() < MAX_OBJECT_SIZE_PER_APP) {
1062 return;
1063 }
1064 int32_t result = RevokeSaveToStore(deleteKey);
1065 if (result != OBJECT_SUCCESS) {
1066 ZLOGE("Delete old entries failed, deleteKey: %{public}s, status: %{public}d", deleteKey.c_str(), result);
1067 return;
1068 }
1069 ZLOGI("Delete old entries success, deleteKey: %{public}s", deleteKey.c_str());
1070 }
1071
SaveToStore(const std::string & appId,const std::string & sessionId,const std::string & toDeviceId,const ObjectRecord & data)1072 int32_t ObjectStoreManager::SaveToStore(const std::string &appId, const std::string &sessionId,
1073 const std::string &toDeviceId, const ObjectRecord &data)
1074 {
1075 ProcessOldEntry(appId);
1076 RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, toDeviceId));
1077 std::string timestamp = std::to_string(GetSecondsSince1970ToNow());
1078 std::string prefix = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR;
1079 DistributedDB::Entry saveInfoEntry;
1080 std::string saveInfoKey = prefix + SAVE_INFO;
1081 saveInfoEntry.key = std::vector<uint8_t>(saveInfoKey.begin(), saveInfoKey.end());
1082 SaveInfo saveInfo(appId, sessionId, DmAdaper::GetInstance().GetLocalDevice().udid, toDeviceId, timestamp);
1083 std::string saveInfoValue = DistributedData::Serializable::Marshall(saveInfo);
1084 saveInfoEntry.value = std::vector<uint8_t>(saveInfoValue.begin(), saveInfoValue.end());
1085 std::vector<DistributedDB::Entry> entries;
1086 entries.emplace_back(saveInfoEntry);
1087 for (const auto &item : data) {
1088 DistributedDB::Entry entry;
1089 std::string key = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR + item.first;
1090 entry.key = std::vector<uint8_t>(key.begin(), key.end());
1091 entry.value = item.second;
1092 entries.emplace_back(entry);
1093 }
1094 {
1095 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1096 if (delegate_ == nullptr) {
1097 ZLOGE("delegate is nullptr.");
1098 return E_DB_ERROR;
1099 }
1100 auto status = delegate_->PutBatch(entries);
1101 if (status != DistributedDB::DBStatus::OK) {
1102 ZLOGE("PutBatch failed, bundleName: %{public}s, sessionId: %{public}s, dstNetworkId: %{public}s, "
1103 "status: %{public}d",
1104 appId.c_str(), Anonymous::Change(sessionId).c_str(), Anonymous::Change(toDeviceId).c_str(), status);
1105 return status;
1106 }
1107 }
1108 ZLOGI("PutBatch success, bundleName: %{public}s, sessionId: %{public}s, dstNetworkId: %{public}s, "
1109 "count: %{public}zu",
1110 appId.c_str(), Anonymous::Change(sessionId).c_str(), Anonymous::Change(toDeviceId).c_str(), entries.size());
1111 return OBJECT_SUCCESS;
1112 }
1113
SyncOnStore(const std::string & prefix,const std::vector<std::string> & deviceList,SyncCallBack & callback)1114 int32_t ObjectStoreManager::SyncOnStore(
1115 const std::string &prefix, const std::vector<std::string> &deviceList, SyncCallBack &callback)
1116 {
1117 std::vector<std::string> syncDevices;
1118 for (auto const &device : deviceList) {
1119 if (device == LOCAL_DEVICE) {
1120 ZLOGI("Save to local, do not need sync, prefix: %{public}s", Anonymous::Change(prefix).c_str());
1121 callback({{LOCAL_DEVICE, OBJECT_SUCCESS}});
1122 return OBJECT_SUCCESS;
1123 }
1124 syncDevices.emplace_back(DmAdaper::GetInstance().GetUuidByNetworkId(device));
1125 }
1126 if (syncDevices.empty()) {
1127 ZLOGI("Device list is empty, prefix: %{public}s", Anonymous::Change(prefix).c_str());
1128 callback(std::map<std::string, int32_t>());
1129 return OBJECT_SUCCESS;
1130 }
1131 uint64_t sequenceId = SequenceSyncManager::GetInstance()->AddNotifier(userId_, callback);
1132
1133 int32_t id = AccountDelegate::GetInstance()->GetUserByToken(IPCSkeleton::GetCallingFullTokenID());
1134 StoreMetaData meta = StoreMetaData(std::to_string(id), Bootstrap::GetInstance().GetProcessLabel(),
1135 DistributedObject::ObjectCommon::OBJECTSTORE_DB_STOREID);
1136 auto uuids = DmAdapter::GetInstance().ToUUID(syncDevices);
1137 bool isNeedMetaSync = IsNeedMetaSync(meta, uuids);
1138 if (!isNeedMetaSync) {
1139 return DoSync(prefix, syncDevices, sequenceId);
1140 }
1141 bool result = MetaDataManager::GetInstance().Sync(uuids, [this, prefix, syncDevices, sequenceId](auto &results) {
1142 auto status = DoSync(prefix, syncDevices, sequenceId);
1143 ZLOGI("Store sync after meta sync end, status:%{public}d", status);
1144 });
1145 ZLOGI("prefix:%{public}s, meta sync end, result:%{public}d", Anonymous::Change(prefix).c_str(), result);
1146 return result ? OBJECT_SUCCESS : DoSync(prefix, syncDevices, sequenceId);
1147 }
1148
DoSync(const std::string & prefix,const std::vector<std::string> & deviceList,uint64_t sequenceId)1149 int32_t ObjectStoreManager::DoSync(const std::string &prefix, const std::vector<std::string> &deviceList,
1150 uint64_t sequenceId)
1151 {
1152 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1153 if (delegate_ == nullptr) {
1154 ZLOGE("db store was closed.");
1155 return E_DB_ERROR;
1156 }
1157 DistributedDB::Query dbQuery = DistributedDB::Query::Select();
1158 dbQuery.PrefixKey(std::vector<uint8_t>(prefix.begin(), prefix.end()));
1159 ZLOGI("Start sync data, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
1160 auto status = delegate_->Sync(deviceList, DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY,
1161 [this, sequenceId](const std::map<std::string, DistributedDB::DBStatus> &devicesMap) {
1162 ZLOGI("Sync data finished, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
1163 std::map<std::string, DistributedDB::DBStatus> result;
1164 for (auto &item : devicesMap) {
1165 result[DmAdaper::GetInstance().ToNetworkID(item.first)] = item.second;
1166 }
1167 SyncCompleted(result, sequenceId);
1168 }, dbQuery, false);
1169 if (status != DistributedDB::DBStatus::OK) {
1170 ZLOGE("Sync data failed, prefix: %{public}s, sequenceId: 0x%{public}" PRIx64 ", status: %{public}d",
1171 Anonymous::Change(prefix).c_str(), sequenceId, status);
1172 std::string tmp;
1173 SequenceSyncManager::GetInstance()->DeleteNotifier(sequenceId, tmp);
1174 return status;
1175 }
1176 SetSyncStatus(true);
1177 return OBJECT_SUCCESS;
1178 }
1179
SetSyncStatus(bool status)1180 int32_t ObjectStoreManager::SetSyncStatus(bool status)
1181 {
1182 isSyncing_ = status;
1183 return OBJECT_SUCCESS;
1184 }
1185
RevokeSaveToStore(const std::string & prefix)1186 int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix)
1187 {
1188 std::vector<DistributedDB::Entry> entries;
1189 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1190 if (delegate_ == nullptr) {
1191 ZLOGE("db store was closed.");
1192 return E_DB_ERROR;
1193 }
1194 auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
1195 if (status == DistributedDB::DBStatus::NOT_FOUND) {
1196 ZLOGI("Get entries empty, prefix: %{public}s", Anonymous::Change(prefix).c_str());
1197 return OBJECT_SUCCESS;
1198 }
1199 if (status != DistributedDB::DBStatus::OK) {
1200 ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status);
1201 return DB_ERROR;
1202 }
1203 std::vector<std::vector<uint8_t>> keys;
1204 std::for_each(entries.begin(), entries.end(), [&keys](const DistributedDB::Entry &entry) {
1205 keys.emplace_back(entry.key);
1206 });
1207 if (keys.empty()) {
1208 return OBJECT_SUCCESS;
1209 }
1210 status = delegate_->DeleteBatch(keys);
1211 if (status != DistributedDB::DBStatus::OK) {
1212 ZLOGE("Delete entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(),
1213 status);
1214 return DB_ERROR;
1215 }
1216 ZLOGI("Delete entries success, prefix: %{public}s, count: %{public}zu", Anonymous::Change(prefix).c_str(),
1217 keys.size());
1218 return OBJECT_SUCCESS;
1219 }
1220
RetrieveFromStore(const std::string & appId,const std::string & sessionId,ObjectRecord & results)1221 int32_t ObjectStoreManager::RetrieveFromStore(const std::string &appId, const std::string &sessionId,
1222 ObjectRecord &results)
1223 {
1224 std::vector<DistributedDB::Entry> entries;
1225 std::string prefix = GetPrefixWithoutDeviceId(appId, sessionId);
1226 std::shared_lock<decltype(rwMutex_)> lock(rwMutex_);
1227 if (delegate_ == nullptr) {
1228 ZLOGE("db store was closed.");
1229 return E_DB_ERROR;
1230 }
1231 auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
1232 if (status == DistributedDB::DBStatus::NOT_FOUND) {
1233 ZLOGI("Get entries empty, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status);
1234 return KEY_NOT_FOUND;
1235 }
1236 if (status != DistributedDB::DBStatus::OK) {
1237 ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status);
1238 return DB_ERROR;
1239 }
1240 ZLOGI("GetEntries success,prefix:%{public}s,count:%{public}zu", Anonymous::Change(prefix).c_str(), entries.size());
1241 for (const auto &entry : entries) {
1242 std::string key(entry.key.begin(), entry.key.end());
1243 if (key.find(SAVE_INFO) != std::string::npos) {
1244 continue;
1245 }
1246 auto splitKeys = SplitEntryKey(key);
1247 if (!splitKeys.empty()) {
1248 results[splitKeys[PROPERTY_NAME_INDEX]] = entry.value;
1249 }
1250 }
1251 return OBJECT_SUCCESS;
1252 }
1253
SaveInfo(const std::string & bundleName,const std::string & sessionId,const std::string & sourceDeviceId,const std::string & targetDeviceId,const std::string & timestamp)1254 ObjectStoreManager::SaveInfo::SaveInfo(const std::string &bundleName, const std::string &sessionId,
1255 const std::string &sourceDeviceId, const std::string &targetDeviceId, const std::string ×tamp)
1256 : bundleName(bundleName), sessionId(sessionId), sourceDeviceId(sourceDeviceId), targetDeviceId(targetDeviceId),
1257 timestamp(timestamp) {}
1258
Marshal(json & node) const1259 bool ObjectStoreManager::SaveInfo::Marshal(json &node) const
1260 {
1261 SetValue(node[GET_NAME(bundleName)], bundleName);
1262 SetValue(node[GET_NAME(sessionId)], sessionId);
1263 SetValue(node[GET_NAME(sourceDeviceId)], sourceDeviceId);
1264 SetValue(node[GET_NAME(targetDeviceId)], targetDeviceId);
1265 SetValue(node[GET_NAME(timestamp)], timestamp);
1266 return true;
1267 }
1268
Unmarshal(const json & node)1269 bool ObjectStoreManager::SaveInfo::Unmarshal(const json &node)
1270 {
1271 GetValue(node, GET_NAME(bundleName), bundleName);
1272 GetValue(node, GET_NAME(sessionId), sessionId);
1273 GetValue(node, GET_NAME(sourceDeviceId), sourceDeviceId);
1274 GetValue(node, GET_NAME(targetDeviceId), targetDeviceId);
1275 GetValue(node, GET_NAME(timestamp), timestamp);
1276 return true;
1277 }
1278
ToPropertyPrefix()1279 std::string ObjectStoreManager::SaveInfo::ToPropertyPrefix()
1280 {
1281 if (bundleName.empty() || sessionId.empty() || sourceDeviceId.empty() || targetDeviceId.empty() ||
1282 timestamp.empty()) {
1283 return "";
1284 }
1285 return bundleName + SEPERATOR + sessionId + SEPERATOR + sourceDeviceId + SEPERATOR + targetDeviceId + SEPERATOR +
1286 timestamp + SEPERATOR;
1287 }
1288
SplitEntryKey(const std::string & key)1289 std::vector<std::string> ObjectStoreManager::SplitEntryKey(const std::string &key)
1290 {
1291 std::smatch match;
1292 std::regex timeRegex(TIME_REGEX);
1293 if (!std::regex_search(key, match, timeRegex)) {
1294 ZLOGW("Format error, key.size = %{public}zu", key.size());
1295 return {};
1296 }
1297 auto timePos = match.position();
1298 std::string fromTime = key.substr(timePos + 1);
1299 std::string beforeTime = key.substr(0, timePos);
1300
1301 size_t targetDevicePos = beforeTime.find_last_of(SEPERATOR);
1302 if (targetDevicePos == std::string::npos) {
1303 ZLOGW("Format error, key.size = %{public}zu", key.size());
1304 return {};
1305 }
1306 std::string targetDevice = beforeTime.substr(targetDevicePos + 1);
1307 std::string beforeTargetDevice = beforeTime.substr(0, targetDevicePos);
1308
1309 size_t sourceDeviceUdidPos = beforeTargetDevice.find_last_of(SEPERATOR);
1310 if (sourceDeviceUdidPos == std::string::npos) {
1311 ZLOGW("Format error, key.size = %{public}zu", key.size());
1312 return {};
1313 }
1314 std::string sourceDeviceUdid = beforeTargetDevice.substr(sourceDeviceUdidPos + 1);
1315 std::string beforeSourceDeviceUdid = beforeTargetDevice.substr(0, sourceDeviceUdidPos);
1316
1317 size_t sessionIdPos = beforeSourceDeviceUdid.find_last_of(SEPERATOR);
1318 if (sessionIdPos == std::string::npos) {
1319 ZLOGW("Format error, key.size = %{public}zu", key.size());
1320 return {};
1321 }
1322 std::string sessionId = beforeSourceDeviceUdid.substr(sessionIdPos + 1);
1323 std::string bundleName = beforeSourceDeviceUdid.substr(0, sessionIdPos);
1324
1325 size_t propertyNamePos = fromTime.find_first_of(SEPERATOR);
1326 if (propertyNamePos == std::string::npos) {
1327 ZLOGW("Format error, key.size = %{public}zu", key.size());
1328 return {};
1329 }
1330 std::string propertyName = fromTime.substr(propertyNamePos + 1);
1331 std::string time = fromTime.substr(0, propertyNamePos);
1332
1333 return { bundleName, sessionId, sourceDeviceUdid, targetDevice, time, propertyName };
1334 }
1335
GetCurrentUser()1336 std::string ObjectStoreManager::GetCurrentUser()
1337 {
1338 std::vector<int> users;
1339 auto ret = AccountDelegate::GetInstance()->QueryUsers(users);
1340 if (!ret || users.empty()) {
1341 ZLOGE("failed to query os accounts, ret:%{public}d", ret);
1342 return "";
1343 }
1344 return std::to_string(users[0]);
1345 }
1346
SaveUserToMeta()1347 void ObjectStoreManager::SaveUserToMeta()
1348 {
1349 ZLOGD("start.");
1350 std::string userId = GetCurrentUser();
1351 if (userId.empty()) {
1352 return;
1353 }
1354 std::string appId = DistributedData::Bootstrap::GetInstance().GetProcessLabel();
1355 DistributedData::ObjectUserMetaData userMeta;
1356 userMeta.userId = userId;
1357 auto saved = DistributedData::MetaDataManager::GetInstance().SaveMeta(
1358 DistributedData::ObjectUserMetaData::GetKey(), userMeta, true);
1359 if (!saved) {
1360 ZLOGE("userMeta save failed, userId:%{public}s", userId.c_str());
1361 }
1362 }
1363
CloseAfterMinute()1364 void ObjectStoreManager::CloseAfterMinute()
1365 {
1366 if (executors_ == nullptr) {
1367 ZLOGE("executors_ is nullptr.");
1368 return;
1369 }
1370 executors_->Schedule(std::chrono::minutes(INTERVAL), [this]() {
1371 Close();
1372 });
1373 }
1374
SetThreadPool(std::shared_ptr<ExecutorPool> executors)1375 void ObjectStoreManager::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
1376 {
1377 executors_ = executors;
1378 }
1379
AddNotifier(const std::string & userId,SyncCallBack & callback)1380 uint64_t SequenceSyncManager::AddNotifier(const std::string &userId, SyncCallBack &callback)
1381 {
1382 std::lock_guard<std::mutex> lock(notifierLock_);
1383 uint64_t sequenceId = KvStoreUtils::GenerateSequenceId();
1384 userIdSeqIdRelations_[userId].emplace_back(sequenceId);
1385 seqIdCallbackRelations_[sequenceId] = callback;
1386 return sequenceId;
1387 }
1388
Process(uint64_t sequenceId,const std::map<std::string,DistributedDB::DBStatus> & results,std::string & userId)1389 SequenceSyncManager::Result SequenceSyncManager::Process(
1390 uint64_t sequenceId, const std::map<std::string, DistributedDB::DBStatus> &results, std::string &userId)
1391 {
1392 std::lock_guard<std::mutex> lock(notifierLock_);
1393 if (seqIdCallbackRelations_.count(sequenceId) == 0) {
1394 ZLOGE("not exist");
1395 return ERR_SID_NOT_EXIST;
1396 }
1397 std::map<std::string, int32_t> syncResults;
1398 for (const auto &item : results) {
1399 syncResults[item.first] = item.second == DistributedDB::DBStatus::OK ? 0 : -1;
1400 }
1401 seqIdCallbackRelations_[sequenceId](syncResults);
1402 ZLOGD("end complete");
1403 return DeleteNotifierNoLock(sequenceId, userId);
1404 }
1405
DeleteNotifier(uint64_t sequenceId,std::string & userId)1406 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifier(uint64_t sequenceId, std::string &userId)
1407 {
1408 std::lock_guard<std::mutex> lock(notifierLock_);
1409 if (seqIdCallbackRelations_.count(sequenceId) == 0) {
1410 ZLOGE("not exist");
1411 return ERR_SID_NOT_EXIST;
1412 }
1413 return DeleteNotifierNoLock(sequenceId, userId);
1414 }
1415
DeleteNotifierNoLock(uint64_t sequenceId,std::string & userId)1416 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifierNoLock(uint64_t sequenceId, std::string &userId)
1417 {
1418 seqIdCallbackRelations_.erase(sequenceId);
1419 auto userIdIter = userIdSeqIdRelations_.begin();
1420 while (userIdIter != userIdSeqIdRelations_.end()) {
1421 auto sIdIter = std::find(userIdIter->second.begin(), userIdIter->second.end(), sequenceId);
1422 if (sIdIter != userIdIter->second.end()) {
1423 userIdIter->second.erase(sIdIter);
1424 if (userIdIter->second.empty()) {
1425 ZLOGD("finished user callback, userId = %{public}s", userIdIter->first.c_str());
1426 userId = userIdIter->first;
1427 userIdSeqIdRelations_.erase(userIdIter);
1428 return SUCCESS_USER_HAS_FINISHED;
1429 } else {
1430 ZLOGD(" finished a callback but user not finished, userId = %{public}s", userIdIter->first.c_str());
1431 return SUCCESS_USER_IN_USE;
1432 }
1433 }
1434 userIdIter++;
1435 }
1436 return SUCCESS_USER_HAS_FINISHED;
1437 }
1438
BindAsset(const uint32_t tokenId,const std::string & appId,const std::string & sessionId,ObjectStore::Asset & asset,ObjectStore::AssetBindInfo & bindInfo)1439 int32_t ObjectStoreManager::BindAsset(const uint32_t tokenId, const std::string& appId, const std::string& sessionId,
1440 ObjectStore::Asset& asset, ObjectStore::AssetBindInfo& bindInfo)
1441 {
1442 auto snapshotKey = appId + SEPERATOR + sessionId;
1443 snapshots_.ComputeIfAbsent(
1444 snapshotKey, [](const std::string& key) -> auto {
1445 return std::make_shared<ObjectSnapshot>();
1446 });
1447 auto storeKey = appId + SEPERATOR + bindInfo.storeName;
1448 bindSnapshots_.ComputeIfAbsent(
1449 storeKey, [](const std::string& key) -> auto {
1450 return std::make_shared<std::map<std::string, std::shared_ptr<Snapshot>>>();
1451 });
1452 auto snapshots = snapshots_.Find(snapshotKey).second;
1453 bindSnapshots_.Compute(storeKey, [this, &asset, snapshots] (const auto &key, auto &value) {
1454 value->emplace(std::pair{asset.uri, snapshots});
1455 return true;
1456 });
1457
1458 if (AccessTokenKit::GetTokenTypeFlag(tokenId) != TOKEN_HAP) {
1459 ZLOGE("TokenType is not TOKEN_HAP, token:0x%{public}x, bundleName:%{public}s", tokenId, appId.c_str());
1460 return GeneralError::E_ERROR;
1461 }
1462 HapTokenInfo tokenInfo;
1463 auto status = AccessTokenKit::GetHapTokenInfo(tokenId, tokenInfo);
1464 if (status != RET_SUCCESS) {
1465 ZLOGE("token:0x%{public}x, result:%{public}d, bundleName:%{public}s", tokenId, status, appId.c_str());
1466 return GeneralError::E_ERROR;
1467 }
1468 StoreInfo storeInfo;
1469 storeInfo.bundleName = appId;
1470 storeInfo.tokenId = tokenId;
1471 storeInfo.instanceId = tokenInfo.instIndex;
1472 storeInfo.user = tokenInfo.userID;
1473 storeInfo.storeName = bindInfo.storeName;
1474
1475 snapshots_.Compute(snapshotKey, [this, &asset, &bindInfo, &storeInfo] (const auto &key, auto &value) {
1476 value->BindAsset(ValueProxy::Convert(std::move(asset)), ConvertBindInfo(bindInfo), storeInfo);
1477 return true;
1478 });
1479 return OBJECT_SUCCESS;
1480 }
1481
ConvertBindInfo(ObjectStore::AssetBindInfo & bindInfo)1482 DistributedData::AssetBindInfo ObjectStoreManager::ConvertBindInfo(ObjectStore::AssetBindInfo& bindInfo)
1483 {
1484 return DistributedData::AssetBindInfo{
1485 .storeName = std::move(bindInfo.storeName),
1486 .tableName = std::move(bindInfo.tableName),
1487 .primaryKey = ValueProxy::Convert(std::move(bindInfo.primaryKey)),
1488 .field = std::move(bindInfo.field),
1489 .assetName = std::move(bindInfo.assetName),
1490 };
1491 }
1492
OnAssetChanged(const uint32_t tokenId,const std::string & appId,const std::string & sessionId,const std::string & deviceId,const ObjectStore::Asset & asset)1493 int32_t ObjectStoreManager::OnAssetChanged(const uint32_t tokenId, const std::string& appId,
1494 const std::string& sessionId, const std::string& deviceId, const ObjectStore::Asset& asset)
1495 {
1496 const int32_t userId = AccountDelegate::GetInstance()->GetUserByToken(tokenId);
1497 auto objectAsset = asset;
1498 Asset dataAsset = ValueProxy::Convert(std::move(objectAsset));
1499 auto snapshotKey = appId + SEPERATOR + sessionId;
1500 int32_t res = OBJECT_SUCCESS;
1501 bool exist = snapshots_.ComputeIfPresent(snapshotKey,
1502 [&res, &dataAsset, &deviceId](const std::string &key, std::shared_ptr<Snapshot> snapshot) {
1503 if (snapshot != nullptr) {
1504 res = snapshot->OnDataChanged(dataAsset, deviceId); // needChange
1505 }
1506 return snapshot != nullptr;
1507 });
1508 if (exist) {
1509 return res;
1510 }
1511
1512 auto block = std::make_shared<BlockData<std::tuple<bool, bool>>>(WAIT_TIME, std::tuple{ true, true });
1513 ObjectAssetLoader::GetInstance().TransferAssetsAsync(userId, appId, deviceId, { dataAsset }, [block](bool ret) {
1514 block->SetValue({ false, ret });
1515 });
1516 auto [timeout, success] = block->GetValue();
1517 if (timeout || !success) {
1518 ZLOGE("transfer failed, timeout: %{public}d, success: %{public}d, name: %{public}s, deviceId: %{public}s ",
1519 timeout, success, asset.name.c_str(), DistributedData::Anonymous::Change(deviceId).c_str());
1520 return OBJECT_INNER_ERROR;
1521 }
1522 return OBJECT_SUCCESS;
1523 }
1524
GetSnapShots(const std::string & bundleName,const std::string & storeName)1525 ObjectStoreManager::UriToSnapshot ObjectStoreManager::GetSnapShots(const std::string& bundleName,
1526 const std::string& storeName)
1527 {
1528 auto storeKey = bundleName + SEPERATOR + storeName;
1529 bindSnapshots_.ComputeIfAbsent(
1530 storeKey, [](const std::string& key) -> auto {
1531 return std::make_shared<std::map<std::string, std::shared_ptr<Snapshot>>>();
1532 });
1533 return bindSnapshots_.Find(storeKey).second;
1534 }
1535
DeleteSnapshot(const std::string & bundleName,const std::string & sessionId)1536 void ObjectStoreManager::DeleteSnapshot(const std::string& bundleName, const std::string& sessionId)
1537 {
1538 auto snapshotKey = bundleName + SEPERATOR + sessionId;
1539 auto snapshot = snapshots_.Find(snapshotKey).second;
1540 if (snapshot == nullptr) {
1541 ZLOGD("Not find snapshot, don't need delete");
1542 return;
1543 }
1544 bindSnapshots_.ForEach([snapshot](auto& key, auto& value) {
1545 for (auto pos = value->begin(); pos != value->end();) {
1546 if (pos->second == snapshot) {
1547 pos = value->erase(pos);
1548 } else {
1549 ++pos;
1550 }
1551 }
1552 return true;
1553 });
1554 snapshots_.Erase(snapshotKey);
1555 }
1556
AutoLaunchStore()1557 int32_t ObjectStoreManager::AutoLaunchStore()
1558 {
1559 int32_t status = Open();
1560 if (status != OBJECT_SUCCESS) {
1561 ZLOGE("Open fail %{public}d", status);
1562 return status;
1563 }
1564 CloseAfterMinute();
1565 ZLOGI("Auto launch, close after a minute");
1566 return OBJECT_SUCCESS;
1567 }
1568 } // namespace DistributedObject
1569 } // namespace OHOS
1570