1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "ObjectStoreManager"
16
17 #include "object_manager.h"
18
19 #include <regex>
20
21 #include "accesstoken_kit.h"
22 #include "account/account_delegate.h"
23 #include "block_data.h"
24 #include "bootstrap.h"
25 #include "common/bytes.h"
26 #include "common/string_utils.h"
27 #include "datetime_ex.h"
28 #include "distributed_file_daemon_manager.h"
29 #include "kvstore_utils.h"
30 #include "log_print.h"
31 #include "metadata/meta_data_manager.h"
32 #include "metadata/store_meta_data.h"
33 #include "object_dms_handler.h"
34 #include "object_radar_reporter.h"
35 #include "utils/anonymous.h"
36
37 namespace OHOS {
38 namespace DistributedObject {
39 using namespace OHOS::DistributedKv;
40 using namespace Security::AccessToken;
41 using StoreMetaData = OHOS::DistributedData::StoreMetaData;
42 using Account = OHOS::DistributedData::AccountDelegate;
43 using AccessTokenKit = Security::AccessToken::AccessTokenKit;
44 using ValueProxy = OHOS::DistributedData::ValueProxy;
45 using DistributedFileDaemonManager = Storage::DistributedFile::DistributedFileDaemonManager;
// Marker substring of the entry key whose value carries the serialized SaveInfo.
// NotifyChange() searches changed keys for it and GetObjectData() skips keys containing it.
constexpr const char *SAVE_INFO = "p_###SAVEINFO###";
ObjectStoreManager()47 ObjectStoreManager::ObjectStoreManager()
48 {
49 ZLOGI("ObjectStoreManager construct");
50 RegisterAssetsLister();
51 }
52
~ObjectStoreManager()53 ObjectStoreManager::~ObjectStoreManager()
54 {
55 ZLOGI("ObjectStoreManager destroy");
56 if (objectAssetsRecvListener_ != nullptr) {
57 auto status = DistributedFileDaemonManager::GetInstance().UnRegisterAssetCallback(objectAssetsRecvListener_);
58 if (status != DistributedDB::DBStatus::OK) {
59 ZLOGE("UnRegister assetsRecvListener err %{public}d", status);
60 }
61 }
62 }
63
OpenObjectKvStore()64 DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore()
65 {
66 DistributedDB::KvStoreNbDelegate *store = nullptr;
67 DistributedDB::KvStoreNbDelegate::Option option;
68 option.createDirByStoreIdOnly = true;
69 option.syncDualTupleMode = true;
70 option.secOption = { DistributedDB::S1, DistributedDB::ECE };
71 if (objectDataListener_ == nullptr) {
72 objectDataListener_ = new ObjectDataListener();
73 }
74 ZLOGD("start GetKvStore");
75 kvStoreDelegateManager_->GetKvStore(ObjectCommon::OBJECTSTORE_DB_STOREID, option,
76 [&store, this](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) {
77 if (dbStatus != DistributedDB::DBStatus::OK) {
78 ZLOGE("GetKvStore fail %{public}d", dbStatus);
79 return;
80 }
81 ZLOGI("GetKvStore successsfully");
82 store = kvStoreNbDelegate;
83 std::vector<uint8_t> tmpKey;
84 DistributedDB::DBStatus status = store->RegisterObserver(tmpKey,
85 DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN,
86 objectDataListener_);
87 if (status != DistributedDB::DBStatus::OK) {
88 ZLOGE("RegisterObserver err %{public}d", status);
89 }
90 });
91 return store;
92 }
93
RegisterAssetsLister()94 bool ObjectStoreManager::RegisterAssetsLister()
95 {
96 if (objectAssetsSendListener_ == nullptr) {
97 objectAssetsSendListener_ = new ObjectAssetsSendListener();
98 }
99 if (objectAssetsRecvListener_ == nullptr) {
100 objectAssetsRecvListener_ = new ObjectAssetsRecvListener();
101 }
102 auto status = DistributedFileDaemonManager::GetInstance().RegisterAssetCallback(objectAssetsRecvListener_);
103 if (status != DistributedDB::DBStatus::OK) {
104 ZLOGE("Register assetsRecvListener err %{public}d", status);
105 return false;
106 }
107 return true;
108 }
109
ProcessSyncCallback(const std::map<std::string,int32_t> & results,const std::string & appId,const std::string & sessionId,const std::string & deviceId)110 void ObjectStoreManager::ProcessSyncCallback(const std::map<std::string, int32_t> &results, const std::string &appId,
111 const std::string &sessionId, const std::string &deviceId)
112 {
113 if (results.empty() || results.find(LOCAL_DEVICE) != results.end()) {
114 return;
115 }
116 int32_t result = Open();
117 if (result != OBJECT_SUCCESS) {
118 ZLOGE("Open failed, errCode = %{public}d", result);
119 return;
120 }
121 // delete local data
122 result = RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, deviceId));
123 if (result != OBJECT_SUCCESS) {
124 ZLOGE("Save failed, status = %{public}d", result);
125 }
126 Close();
127 return;
128 }
129
Save(const std::string & appId,const std::string & sessionId,const ObjectRecord & data,const std::string & deviceId,sptr<IRemoteObject> callback)130 int32_t ObjectStoreManager::Save(const std::string &appId, const std::string &sessionId,
131 const ObjectRecord &data, const std::string &deviceId, sptr<IRemoteObject> callback)
132 {
133 auto proxy = iface_cast<ObjectSaveCallbackProxy>(callback);
134 if (deviceId.size() == 0) {
135 ZLOGE("DeviceId empty, appId: %{public}s, sessionId: %{public}s", appId.c_str(), sessionId.c_str());
136 proxy->Completed(std::map<std::string, int32_t>());
137 return INVALID_ARGUMENT;
138 }
139 int32_t result = Open();
140 if (result != OBJECT_SUCCESS) {
141 ZLOGE("Open object kvstore failed, result: %{public}d", result);
142 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
143 ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, ObjectStore::GETKV_FAILED, ObjectStore::FINISHED);
144 proxy->Completed(std::map<std::string, int32_t>());
145 return STORE_NOT_OPEN;
146 }
147 SaveUserToMeta();
148 std::string dstBundleName = ObjectDmsHandler::GetInstance().GetDstBundleName(appId, deviceId);
149 result = SaveToStore(dstBundleName, sessionId, deviceId, data);
150 if (result != OBJECT_SUCCESS) {
151 ZLOGE("Save to store failed, result: %{public}d", result);
152 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
153 ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
154 Close();
155 proxy->Completed(std::map<std::string, int32_t>());
156 return result;
157 }
158 ZLOGI("Sync data, bundleName: %{public}s, sessionId: %{public}s, deviceId: %{public}s", dstBundleName.c_str(),
159 sessionId.c_str(), Anonymous::Change(deviceId).c_str());
160 SyncCallBack syncCallback =
161 [proxy, dstBundleName, sessionId, deviceId, this](const std::map<std::string, int32_t> &results) {
162 ProcessSyncCallback(results, dstBundleName, sessionId, deviceId);
163 proxy->Completed(results);
164 };
165 result = SyncOnStore(GetPropertyPrefix(dstBundleName, sessionId, deviceId), {deviceId}, syncCallback);
166 if (result != OBJECT_SUCCESS) {
167 ZLOGE("Sync data failed, result: %{public}d", result);
168 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
169 ObjectStore::SYNC_DATA, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED);
170 Close();
171 proxy->Completed(std::map<std::string, int32_t>());
172 return result;
173 }
174 Close();
175 return PushAssets(appId, dstBundleName, sessionId, data, deviceId);
176 }
177
PushAssets(const std::string & srcBundleName,const std::string & dstBundleName,const std::string & sessionId,const ObjectRecord & data,const std::string & deviceId)178 int32_t ObjectStoreManager::PushAssets(const std::string &srcBundleName, const std::string &dstBundleName,
179 const std::string &sessionId, const ObjectRecord &data, const std::string &deviceId)
180 {
181 Assets assets = GetAssetsFromDBRecords(data);
182 if (assets.empty() || data.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == data.end()) {
183 return OBJECT_SUCCESS;
184 }
185 sptr<AssetObj> assetObj = new AssetObj();
186 assetObj->dstBundleName_ = dstBundleName;
187 assetObj->srcBundleName_ = srcBundleName;
188 assetObj->dstNetworkId_ = deviceId;
189 assetObj->sessionId_ = sessionId;
190 for (const auto& asset : assets) {
191 assetObj->uris_.push_back(asset.uri);
192 }
193 if (objectAssetsSendListener_ == nullptr) {
194 objectAssetsSendListener_ = new ObjectAssetsSendListener();
195 }
196 int userId = std::atoi(GetCurrentUser().c_str());
197 auto status = ObjectAssetLoader::GetInstance()->PushAsset(userId, assetObj, objectAssetsSendListener_);
198 return status;
199 }
200
RevokeSave(const std::string & appId,const std::string & sessionId,sptr<IRemoteObject> callback)201 int32_t ObjectStoreManager::RevokeSave(
202 const std::string &appId, const std::string &sessionId, sptr<IRemoteObject> callback)
203 {
204 auto proxy = iface_cast<ObjectRevokeSaveCallbackProxy>(callback);
205 int32_t result = Open();
206 if (result != OBJECT_SUCCESS) {
207 ZLOGE("Open failed, errCode = %{public}d", result);
208 proxy->Completed(STORE_NOT_OPEN);
209 return STORE_NOT_OPEN;
210 }
211
212 result = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId));
213 if (result != OBJECT_SUCCESS) {
214 ZLOGE("RevokeSave failed, errCode = %{public}d", result);
215 Close();
216 proxy->Completed(result);
217 return result;
218 }
219 std::vector<std::string> deviceList;
220 auto deviceInfos = DmAdaper::GetInstance().GetRemoteDevices();
221 std::for_each(deviceInfos.begin(), deviceInfos.end(),
222 [&deviceList](AppDistributedKv::DeviceInfo info) { deviceList.emplace_back(info.networkId); });
223 if (!deviceList.empty()) {
224 SyncCallBack tmp = [proxy](const std::map<std::string, int32_t> &results) {
225 ZLOGI("revoke save finished");
226 proxy->Completed(OBJECT_SUCCESS);
227 };
228 result = SyncOnStore(GetPropertyPrefix(appId, sessionId), deviceList, tmp);
229 if (result != OBJECT_SUCCESS) {
230 ZLOGE("sync failed, errCode = %{public}d", result);
231 proxy->Completed(result);
232 }
233 } else {
234 proxy->Completed(OBJECT_SUCCESS);
235 };
236 Close();
237 return result;
238 }
239
Retrieve(const std::string & bundleName,const std::string & sessionId,sptr<IRemoteObject> callback,uint32_t tokenId)240 int32_t ObjectStoreManager::Retrieve(
241 const std::string &bundleName, const std::string &sessionId, sptr<IRemoteObject> callback, uint32_t tokenId)
242 {
243 auto proxy = iface_cast<ObjectRetrieveCallbackProxy>(callback);
244 int32_t result = Open();
245 if (result != OBJECT_SUCCESS) {
246 ZLOGE("Open object kvstore failed, result: %{public}d", result);
247 proxy->Completed(ObjectRecord(), false);
248 return ObjectStore::GETKV_FAILED;
249 }
250 ObjectRecord results{};
251 int32_t status = RetrieveFromStore(bundleName, sessionId, results);
252 if (status != OBJECT_SUCCESS) {
253 ZLOGI("Retrieve from store failed, status: %{public}d, close after one minute.", status);
254 CloseAfterMinute();
255 proxy->Completed(ObjectRecord(), false);
256 return status;
257 }
258 bool allReady = false;
259 Assets assets = GetAssetsFromDBRecords(results);
260 if (assets.empty() || results.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == results.end()) {
261 allReady = true;
262 } else {
263 auto objectKey = bundleName + sessionId;
264 restoreStatus_.ComputeIfPresent(objectKey, [&allReady](const auto &key, auto &value) {
265 if (value == RestoreStatus::ALL_READY) {
266 allReady = true;
267 return false;
268 }
269 if (value == RestoreStatus::DATA_READY) {
270 value = RestoreStatus::DATA_NOTIFIED;
271 }
272 return true;
273 });
274 }
275 status = RevokeSaveToStore(GetPrefixWithoutDeviceId(bundleName, sessionId));
276 if (status != OBJECT_SUCCESS) {
277 ZLOGE("Revoke save failed, status: %{public}d", status);
278 Close();
279 proxy->Completed(ObjectRecord(), false);
280 return status;
281 }
282 Close();
283 proxy->Completed(results, allReady);
284 if (allReady) {
285 ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
286 ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
287 }
288 return status;
289 }
290
Clear()291 int32_t ObjectStoreManager::Clear()
292 {
293 ZLOGI("enter");
294 std::string userId = GetCurrentUser();
295 if (userId.empty()) {
296 return OBJECT_INNER_ERROR;
297 }
298 std::vector<StoreMetaData> metaData;
299 std::string appId = DistributedData::Bootstrap::GetInstance().GetProcessLabel();
300 std::string metaKey = GetMetaUserIdKey(userId, appId);
301 if (!DistributedData::MetaDataManager::GetInstance().LoadMeta(metaKey, metaData, true)) {
302 ZLOGE("no store of %{public}s", appId.c_str());
303 return OBJECT_STORE_NOT_FOUND;
304 }
305 for (const auto &storeMeta : metaData) {
306 if (storeMeta.storeType < StoreMetaData::StoreType::STORE_OBJECT_BEGIN
307 || storeMeta.storeType > StoreMetaData::StoreType::STORE_OBJECT_END) {
308 continue;
309 }
310 if (storeMeta.user == userId) {
311 ZLOGI("user is same, not need to change, mate user:%{public}s::user:%{public}s.",
312 storeMeta.user.c_str(), userId.c_str());
313 return OBJECT_SUCCESS;
314 }
315 }
316 ZLOGD("user is change, need to change");
317 int32_t result = Open();
318 if (result != OBJECT_SUCCESS) {
319 ZLOGE("Open failed, errCode = %{public}d", result);
320 return STORE_NOT_OPEN;
321 }
322 result = RevokeSaveToStore("");
323 Close();
324 return result;
325 }
326
DeleteByAppId(const std::string & appId,int32_t user)327 int32_t ObjectStoreManager::DeleteByAppId(const std::string &appId, int32_t user)
328 {
329 int32_t result = Open();
330 if (result != OBJECT_SUCCESS) {
331 ZLOGE("Open store failed, result: %{public}d, appId: %{public}s, user: %{public}d", result,
332 appId.c_str(), user);
333 return STORE_NOT_OPEN;
334 }
335 result = RevokeSaveToStore(appId);
336 if (result != OBJECT_SUCCESS) {
337 ZLOGE("Revoke save failed, result: %{public}d, appId: %{public}s, user: %{public}d", result,
338 appId.c_str(), user);
339 }
340 Close();
341 std::string userId = std::to_string(user);
342 std::string metaKey = GetMetaUserIdKey(userId, appId);
343 auto status = DistributedData::MetaDataManager::GetInstance().DelMeta(metaKey, true);
344 if (!status) {
345 ZLOGE("Delete meta failed, userId: %{public}s, appId: %{public}s", userId.c_str(), appId.c_str());
346 }
347 return result;
348 }
349
RegisterRemoteCallback(const std::string & bundleName,const std::string & sessionId,pid_t pid,uint32_t tokenId,sptr<IRemoteObject> callback)350 void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId,
351 pid_t pid, uint32_t tokenId,
352 sptr<IRemoteObject> callback)
353 {
354 if (bundleName.empty() || sessionId.empty()) {
355 ZLOGD("ObjectStoreManager::RegisterRemoteCallback empty");
356 return;
357 }
358 ZLOGD("ObjectStoreManager::RegisterRemoteCallback start");
359 auto proxy = iface_cast<ObjectChangeCallbackProxy>(callback);
360 std::string prefix = bundleName + sessionId;
361 callbacks_.Compute(tokenId, ([pid, &proxy, &prefix](const uint32_t key, CallbackInfo &value) {
362 if (value.pid != pid) {
363 value = CallbackInfo { pid };
364 }
365 value.observers_.insert_or_assign(prefix, proxy);
366 return !value.observers_.empty();
367 }));
368 }
369
UnregisterRemoteCallback(const std::string & bundleName,pid_t pid,uint32_t tokenId,const std::string & sessionId)370 void ObjectStoreManager::UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId,
371 const std::string &sessionId)
372 {
373 if (bundleName.empty()) {
374 ZLOGD("bundleName is empty");
375 return;
376 }
377 callbacks_.Compute(tokenId, ([pid, &sessionId, &bundleName](const uint32_t key, CallbackInfo &value) {
378 if (value.pid != pid) {
379 return true;
380 }
381 if (sessionId.empty()) {
382 return false;
383 }
384 std::string prefix = bundleName + sessionId;
385 for (auto it = value.observers_.begin(); it != value.observers_.end();) {
386 if ((*it).first == prefix) {
387 it = value.observers_.erase(it);
388 } else {
389 ++it;
390 }
391 }
392 return true;
393 }));
394 }
395
NotifyChange(ObjectRecord & changedData)396 void ObjectStoreManager::NotifyChange(ObjectRecord &changedData)
397 {
398 ZLOGI("OnChange start, size:%{public}zu", changedData.size());
399 bool hasAsset = false;
400 SaveInfo saveInfo;
401 for (const auto &[key, value] : changedData) {
402 if (key.find(SAVE_INFO) != std::string::npos) {
403 DistributedData::Serializable::Unmarshall(std::string(value.begin(), value.end()), saveInfo);
404 break;
405 }
406 }
407 auto data = GetObjectData(changedData, saveInfo, hasAsset);
408 if (!hasAsset) {
409 ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
410 ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName);
411 callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
412 DoNotify(tokenId, value, data, true); // no asset, data ready means all ready
413 return false;
414 });
415 return;
416 }
417 NotifyDataChanged(data, saveInfo);
418 SaveUserToMeta();
419 }
420
GetObjectData(const ObjectRecord & changedData,SaveInfo & saveInfo,bool & hasAsset)421 std::map<std::string, ObjectRecord> ObjectStoreManager::GetObjectData(const ObjectRecord& changedData,
422 SaveInfo& saveInfo, bool& hasAsset)
423 {
424 std::map<std::string, ObjectRecord> data;
425 std::string keyPrefix = saveInfo.ToPropertyPrefix();
426 if (!keyPrefix.empty()) {
427 std::string observerKey = saveInfo.bundleName + saveInfo.sessionId;
428 for (const auto &[key, value] : changedData) {
429 if (key.size() < keyPrefix.size() || key.find(SAVE_INFO) != std::string::npos) {
430 continue;
431 }
432 std::string propertyName = key.substr(keyPrefix.size());
433 data[observerKey].insert_or_assign(propertyName, value);
434 if (!hasAsset && IsAssetKey(propertyName)) {
435 hasAsset = true;
436 }
437 }
438 } else {
439 for (const auto &item : changedData) {
440 std::vector<std::string> splitKeys = SplitEntryKey(item.first);
441 if (splitKeys.size() <= PROPERTY_NAME_INDEX) {
442 continue;
443 }
444 if (saveInfo.sourceDeviceId.empty() || saveInfo.bundleName.empty()) {
445 saveInfo.sourceDeviceId = splitKeys[SOURCE_DEVICE_UDID_INDEX];
446 saveInfo.bundleName = splitKeys[BUNDLE_NAME_INDEX];
447 saveInfo.sessionId = splitKeys[SESSION_ID_INDEX];
448 saveInfo.timestamp = splitKeys[TIME_INDEX];
449 }
450 std::string prefix = splitKeys[BUNDLE_NAME_INDEX] + splitKeys[SESSION_ID_INDEX];
451 std::string propertyName = splitKeys[PROPERTY_NAME_INDEX];
452 data[prefix].insert_or_assign(propertyName, item.second);
453 if (IsAssetKey(propertyName)) {
454 hasAsset = true;
455 }
456 }
457 }
458 return data;
459 }
460
ComputeStatus(const std::string & objectKey,const SaveInfo & saveInfo,const std::map<std::string,ObjectRecord> & data)461 void ObjectStoreManager::ComputeStatus(const std::string& objectKey, const SaveInfo& saveInfo,
462 const std::map<std::string, ObjectRecord>& data)
463 {
464 restoreStatus_.Compute(objectKey, [this, &data, saveInfo] (const auto &key, auto &value) {
465 if (value == RestoreStatus::ASSETS_READY) {
466 value = RestoreStatus::ALL_READY;
467 ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
468 ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS);
469 callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
470 DoNotify(tokenId, value, data, true);
471 return false;
472 });
473 } else {
474 value = RestoreStatus::DATA_READY;
475 ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
476 ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName);
477 callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) {
478 DoNotify(tokenId, value, data, false);
479 return false;
480 });
481 WaitAssets(key, saveInfo, data);
482 }
483 return true;
484 });
485 }
486
NotifyDataChanged(const std::map<std::string,ObjectRecord> & data,const SaveInfo & saveInfo)487 void ObjectStoreManager::NotifyDataChanged(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo)
488 {
489 for (auto const& [objectKey, results] : data) {
490 restoreStatus_.ComputeIfAbsent(
491 objectKey, [](const std::string& key) -> auto {
492 return RestoreStatus::NONE;
493 });
494 ComputeStatus(objectKey, saveInfo, data);
495 }
496 }
497
WaitAssets(const std::string & objectKey,const SaveInfo & saveInfo,const std::map<std::string,ObjectRecord> & data)498 int32_t ObjectStoreManager::WaitAssets(const std::string& objectKey, const SaveInfo& saveInfo,
499 const std::map<std::string, ObjectRecord>& data)
500 {
501 auto taskId = executors_->Schedule(std::chrono::seconds(WAIT_TIME), [this, objectKey, data, saveInfo]() {
502 ZLOGE("wait assets finisehd timeout, try pull assets, objectKey:%{public}s", objectKey.c_str());
503 PullAssets(data, saveInfo);
504 DoNotifyWaitAssetTimeout(objectKey);
505 });
506
507 objectTimer_.ComputeIfAbsent(
508 objectKey, [taskId](const std::string& key) -> auto {
509 return taskId;
510 });
511 return OBJECT_SUCCESS;
512 }
513
PullAssets(const std::map<std::string,ObjectRecord> & data,const SaveInfo & saveInfo)514 void ObjectStoreManager::PullAssets(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo)
515 {
516 std::map<std::string, Assets> changedAssets;
517 for (auto const& [objectId, result] : data) {
518 changedAssets[objectId] = GetAssetsFromDBRecords(result);
519 }
520 for (const auto& [objectId, assets] : changedAssets) {
521 std::string networkId = DmAdaper::GetInstance().ToNetworkID(saveInfo.sourceDeviceId);
522 auto block = std::make_shared<BlockData<std::tuple<bool, bool>>>(WAIT_TIME, std::tuple{ true, true });
523 ObjectAssetLoader::GetInstance()->TransferAssetsAsync(std::stoi(GetCurrentUser()),
524 saveInfo.bundleName, networkId, assets, [this, block](bool success) {
525 block->SetValue({ false, success });
526 });
527 auto [timeout, success] = block->GetValue();
528 ZLOGI("Pull assets end, timeout: %{public}d, success: %{public}d, size:%{public}zu, deviceId: %{public}s",
529 timeout, success, assets.size(), DistributedData::Anonymous::Change(networkId).c_str());
530 }
531 }
532
NotifyAssetsReady(const std::string & objectKey,const std::string & bundleName,const std::string & srcNetworkId)533 void ObjectStoreManager::NotifyAssetsReady(const std::string& objectKey, const std::string& bundleName,
534 const std::string& srcNetworkId)
535 {
536 restoreStatus_.ComputeIfAbsent(
537 objectKey, [](const std::string& key) -> auto {
538 return RestoreStatus::NONE;
539 });
540 restoreStatus_.Compute(objectKey, [this, &bundleName] (const auto &key, auto &value) {
541 if (value == RestoreStatus::DATA_NOTIFIED) {
542 value = RestoreStatus::ALL_READY;
543 ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
544 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS);
545 callbacks_.ForEach([this, key](uint32_t tokenId, const CallbackInfo& value) {
546 DoNotifyAssetsReady(tokenId, value, key, true);
547 return false;
548 });
549 } else if (value == RestoreStatus::DATA_READY) {
550 value = RestoreStatus::ALL_READY;
551 ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
552 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS);
553 auto [has, taskId] = objectTimer_.Find(key);
554 if (has) {
555 executors_->Remove(taskId);
556 objectTimer_.Erase(key);
557 }
558 } else {
559 value = RestoreStatus::ASSETS_READY;
560 ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
561 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, bundleName);
562 }
563 return true;
564 });
565 }
566
NotifyAssetsStart(const std::string & objectKey,const std::string & srcNetworkId)567 void ObjectStoreManager::NotifyAssetsStart(const std::string& objectKey, const std::string& srcNetworkId)
568 {
569 restoreStatus_.ComputeIfAbsent(
570 objectKey, [](const std::string& key) -> auto {
571 return RestoreStatus::NONE;
572 });
573 }
574
IsAssetKey(const std::string & key)575 bool ObjectStoreManager::IsAssetKey(const std::string& key)
576 {
577 return key.find(ObjectStore::ASSET_DOT) != std::string::npos;
578 }
579
IsAssetComplete(const ObjectRecord & result,const std::string & assetPrefix)580 bool ObjectStoreManager::IsAssetComplete(const ObjectRecord& result, const std::string& assetPrefix)
581 {
582 if (result.find(assetPrefix + ObjectStore::NAME_SUFFIX) == result.end() ||
583 result.find(assetPrefix + ObjectStore::URI_SUFFIX) == result.end() ||
584 result.find(assetPrefix + ObjectStore::PATH_SUFFIX) == result.end() ||
585 result.find(assetPrefix + ObjectStore::CREATE_TIME_SUFFIX) == result.end() ||
586 result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX) == result.end() ||
587 result.find(assetPrefix + ObjectStore::SIZE_SUFFIX) == result.end()) {
588 return false;
589 }
590 return true;
591 }
592
GetAssetsFromDBRecords(const ObjectRecord & result)593 Assets ObjectStoreManager::GetAssetsFromDBRecords(const ObjectRecord& result)
594 {
595 Assets assets{};
596 std::set<std::string> assetKey;
597 for (const auto& [key, value] : result) {
598 std::string assetPrefix = key.substr(0, key.find(ObjectStore::ASSET_DOT));
599 if (!IsAssetKey(key) || assetKey.find(assetPrefix) != assetKey.end() ||
600 result.find(assetPrefix + ObjectStore::NAME_SUFFIX) == result.end() ||
601 result.find(assetPrefix + ObjectStore::URI_SUFFIX) == result.end()) {
602 continue;
603 }
604 Asset asset;
605 ObjectStore::StringUtils::BytesToStrWithType(
606 result.find(assetPrefix + ObjectStore::NAME_SUFFIX)->second, asset.name);
607 if (asset.name.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
608 asset.name = asset.name.substr(ObjectStore::STRING_PREFIX_LEN);
609 }
610 ObjectStore::StringUtils::BytesToStrWithType(
611 result.find(assetPrefix + ObjectStore::URI_SUFFIX)->second, asset.uri);
612 if (asset.uri.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
613 asset.uri = asset.uri.substr(ObjectStore::STRING_PREFIX_LEN);
614 }
615 ObjectStore::StringUtils::BytesToStrWithType(
616 result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX)->second, asset.modifyTime);
617 if (asset.modifyTime.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
618 asset.modifyTime = asset.modifyTime.substr(ObjectStore::STRING_PREFIX_LEN);
619 }
620 ObjectStore::StringUtils::BytesToStrWithType(
621 result.find(assetPrefix + ObjectStore::SIZE_SUFFIX)->second, asset.size);
622 if (asset.size.find(ObjectStore::STRING_PREFIX) != std::string::npos) {
623 asset.size = asset.size.substr(ObjectStore::STRING_PREFIX_LEN);
624 }
625 asset.hash = asset.modifyTime + "_" + asset.size;
626 assets.push_back(asset);
627 assetKey.insert(assetPrefix);
628 }
629 return assets;
630 }
631
DoNotify(uint32_t tokenId,const CallbackInfo & value,const std::map<std::string,ObjectRecord> & data,bool allReady)632 void ObjectStoreManager::DoNotify(uint32_t tokenId, const CallbackInfo& value,
633 const std::map<std::string, ObjectRecord>& data, bool allReady)
634 {
635 for (const auto& observer : value.observers_) {
636 auto it = data.find(observer.first);
637 if (it == data.end()) {
638 continue;
639 }
640 observer.second->Completed((*it).second, allReady);
641 if (allReady) {
642 restoreStatus_.Erase(observer.first);
643 ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
644 ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
645 } else {
646 restoreStatus_.ComputeIfPresent(observer.first, [](const auto &key, auto &value) {
647 value = RestoreStatus::DATA_NOTIFIED;
648 return true;
649 });
650 }
651 }
652 }
653
DoNotifyAssetsReady(uint32_t tokenId,const CallbackInfo & value,const std::string & objectKey,bool allReady)654 void ObjectStoreManager::DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value,
655 const std::string& objectKey, bool allReady)
656 {
657 for (const auto& observer : value.observers_) {
658 if (objectKey != observer.first) {
659 continue;
660 }
661 observer.second->Completed(ObjectRecord(), allReady);
662 if (allReady) {
663 restoreStatus_.Erase(objectKey);
664 ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
665 ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
666 }
667 auto [has, taskId] = objectTimer_.Find(objectKey);
668 if (has) {
669 executors_->Remove(taskId);
670 objectTimer_.Erase(objectKey);
671 }
672 }
673 }
674
DoNotifyWaitAssetTimeout(const std::string & objectKey)675 void ObjectStoreManager::DoNotifyWaitAssetTimeout(const std::string &objectKey)
676 {
677 ObjectStore::RadarReporter::ReportStageError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
678 ObjectStore::ASSETS_RECV, ObjectStore::RADAR_FAILED, ObjectStore::TIMEOUT);
679 callbacks_.ForEach([this, &objectKey](uint32_t tokenId, const CallbackInfo &value) {
680 for (const auto& observer : value.observers_) {
681 if (objectKey != observer.first) {
682 continue;
683 }
684 observer.second->Completed(ObjectRecord(), true);
685 restoreStatus_.Erase(objectKey);
686 auto [has, taskId] = objectTimer_.Find(objectKey);
687 if (has) {
688 executors_->Remove(taskId);
689 objectTimer_.Erase(objectKey);
690 }
691 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE,
692 ObjectStore::NOTIFY, ObjectStore::RADAR_FAILED, ObjectStore::TIMEOUT, ObjectStore::FINISHED);
693 }
694 return false;
695 });
696 }
697
SetData(const std::string & dataDir,const std::string & userId)698 void ObjectStoreManager::SetData(const std::string &dataDir, const std::string &userId)
699 {
700 ZLOGI("enter %{public}s", dataDir.c_str());
701 kvStoreDelegateManager_ =
702 new DistributedDB::KvStoreDelegateManager(DistributedData::Bootstrap::GetInstance().GetProcessLabel(), userId);
703 DistributedDB::KvStoreConfig kvStoreConfig { dataDir };
704 kvStoreDelegateManager_->SetKvStoreConfig(kvStoreConfig);
705 userId_ = userId;
706 }
707
Open()708 int32_t ObjectStoreManager::Open()
709 {
710 if (kvStoreDelegateManager_ == nullptr) {
711 ZLOGE("Kvstore delegate manager not init");
712 return OBJECT_INNER_ERROR;
713 }
714 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
715 if (delegate_ == nullptr) {
716 delegate_ = OpenObjectKvStore();
717 if (delegate_ == nullptr) {
718 ZLOGE("Open object kvstore failed");
719 return OBJECT_DBSTATUS_ERROR;
720 }
721 syncCount_ = 1;
722 ZLOGI("Open object kvstore success");
723 } else {
724 syncCount_++;
725 ZLOGI("Object kvstore syncCount: %{public}d", syncCount_);
726 }
727 return OBJECT_SUCCESS;
728 }
729
Close()730 void ObjectStoreManager::Close()
731 {
732 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
733 if (delegate_ == nullptr) {
734 return;
735 }
736 int32_t taskCount = delegate_->GetTaskCount();
737 if (taskCount > 0 && syncCount_ == 1) {
738 CloseAfterMinute();
739 ZLOGW("Store is busy, close after a minute, task count: %{public}d", taskCount);
740 return;
741 }
742 syncCount_--;
743 ZLOGI("closed a store, syncCount = %{public}d", syncCount_);
744 FlushClosedStore();
745 }
746
SyncCompleted(const std::map<std::string,DistributedDB::DBStatus> & results,uint64_t sequenceId)747 void ObjectStoreManager::SyncCompleted(
748 const std::map<std::string, DistributedDB::DBStatus> &results, uint64_t sequenceId)
749 {
750 std::string userId;
751 SequenceSyncManager::Result result = SequenceSyncManager::GetInstance()->Process(sequenceId, results, userId);
752 if (result == SequenceSyncManager::SUCCESS_USER_HAS_FINISHED && userId == userId_) {
753 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
754 SetSyncStatus(false);
755 FlushClosedStore();
756 }
757 for (const auto &item : results) {
758 if (item.second == DistributedDB::DBStatus::OK) {
759 ZLOGI("Sync data success, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
760 ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::SAVE,
761 ObjectStore::SYNC_DATA, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED);
762 } else {
763 ZLOGE("Sync data failed, sequenceId: 0x%{public}" PRIx64 ", status: %{public}d", sequenceId, item.second);
764 ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE,
765 ObjectStore::SYNC_DATA, ObjectStore::RADAR_FAILED, item.second, ObjectStore::FINISHED);
766 }
767 }
768 }
769
FlushClosedStore()770 void ObjectStoreManager::FlushClosedStore()
771 {
772 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
773 if (!isSyncing_ && syncCount_ == 0 && delegate_ != nullptr) {
774 ZLOGD("close store");
775 auto status = kvStoreDelegateManager_->CloseKvStore(delegate_);
776 if (status != DistributedDB::DBStatus::OK) {
777 int timeOut = 1000;
778 executors_->Schedule(std::chrono::milliseconds(timeOut), [this]() {
779 FlushClosedStore();
780 });
781 ZLOGE("GetEntries fail %{public}d", status);
782 return;
783 }
784 delegate_ = nullptr;
785 if (objectDataListener_ != nullptr) {
786 delete objectDataListener_;
787 objectDataListener_ = nullptr;
788 }
789 }
790 }
791
ProcessOldEntry(const std::string & appId)792 void ObjectStoreManager::ProcessOldEntry(const std::string &appId)
793 {
794 std::vector<DistributedDB::Entry> entries;
795 auto status = delegate_->GetEntries(std::vector<uint8_t>(appId.begin(), appId.end()), entries);
796 if (status == DistributedDB::DBStatus::NOT_FOUND) {
797 ZLOGI("Get old entries empty, bundleName: %{public}s", appId.c_str());
798 return;
799 }
800 if (status != DistributedDB::DBStatus::OK) {
801 ZLOGE("Get old entries failed, bundleName: %{public}s, status %{public}d", appId.c_str(), status);
802 return;
803 }
804 std::map<std::string, int64_t> sessionIds;
805 int64_t oldestTime = 0;
806 std::string deleteKey;
807 for (auto &item : entries) {
808 std::string key(item.key.begin(), item.key.end());
809 std::vector<std::string> splitKeys = SplitEntryKey(key);
810 if (splitKeys.empty()) {
811 continue;
812 }
813 std::string bundleName = splitKeys[BUNDLE_NAME_INDEX];
814 std::string sessionId = splitKeys[SESSION_ID_INDEX];
815 if (sessionIds.count(sessionId) == 0) {
816 char *end = nullptr;
817 sessionIds[sessionId] = strtol(splitKeys[TIME_INDEX].c_str(), &end, DECIMAL_BASE);
818 }
819 if (oldestTime == 0 || oldestTime > sessionIds[sessionId]) {
820 oldestTime = sessionIds[sessionId];
821 deleteKey = GetPrefixWithoutDeviceId(bundleName, sessionId);
822 }
823 }
824 if (sessionIds.size() < MAX_OBJECT_SIZE_PER_APP) {
825 return;
826 }
827 int32_t result = RevokeSaveToStore(deleteKey);
828 if (result != OBJECT_SUCCESS) {
829 ZLOGE("Delete old entries failed, deleteKey: %{public}s, status: %{public}d", deleteKey.c_str(), result);
830 return;
831 }
832 ZLOGI("Delete old entries success, deleteKey: %{public}s", deleteKey.c_str());
833 }
834
SaveToStore(const std::string & appId,const std::string & sessionId,const std::string & toDeviceId,const ObjectRecord & data)835 int32_t ObjectStoreManager::SaveToStore(const std::string &appId, const std::string &sessionId,
836 const std::string &toDeviceId, const ObjectRecord &data)
837 {
838 ProcessOldEntry(appId);
839 RevokeSaveToStore(GetPropertyPrefix(appId, sessionId, toDeviceId));
840 std::string timestamp = std::to_string(GetSecondsSince1970ToNow());
841 std::string prefix = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR;
842 DistributedDB::Entry saveInfoEntry;
843 std::string saveInfoKey = prefix + SAVE_INFO;
844 saveInfoEntry.key = std::vector<uint8_t>(saveInfoKey.begin(), saveInfoKey.end());
845 SaveInfo saveInfo(appId, sessionId, DmAdaper::GetInstance().GetLocalDevice().udid, toDeviceId, timestamp);
846 std::string saveInfoValue = DistributedData::Serializable::Marshall(saveInfo);
847 saveInfoEntry.value = std::vector<uint8_t>(saveInfoValue.begin(), saveInfoValue.end());
848 std::vector<DistributedDB::Entry> entries;
849 entries.emplace_back(saveInfoEntry);
850 for (auto &item : data) {
851 DistributedDB::Entry entry;
852 std::string key = GetPropertyPrefix(appId, sessionId, toDeviceId) + timestamp + SEPERATOR + item.first;
853 entry.key = std::vector<uint8_t>(key.begin(), key.end());
854 entry.value = item.second;
855 entries.emplace_back(entry);
856 }
857 auto status = delegate_->PutBatch(entries);
858 if (status != DistributedDB::DBStatus::OK) {
859 ZLOGE("PutBatch failed, bundleName: %{public}s, sessionId: %{public}s, dstNetworkId: %{public}s, "
860 "status: %{public}d", appId.c_str(), sessionId.c_str(), Anonymous::Change(toDeviceId).c_str(), status);
861 return status;
862 }
863 ZLOGI("PutBatch success, bundleName: %{public}s, sessionId: %{public}s, dstNetworkId: %{public}s, "
864 "count: %{public}zu", appId.c_str(), sessionId.c_str(), Anonymous::Change(toDeviceId).c_str(), entries.size());
865 return OBJECT_SUCCESS;
866 }
867
SyncOnStore(const std::string & prefix,const std::vector<std::string> & deviceList,SyncCallBack & callback)868 int32_t ObjectStoreManager::SyncOnStore(
869 const std::string &prefix, const std::vector<std::string> &deviceList, SyncCallBack &callback)
870 {
871 std::vector<std::string> syncDevices;
872 for (auto &device : deviceList) {
873 if (device == LOCAL_DEVICE) {
874 ZLOGI("Save to local, do not need sync, prefix: %{public}s", prefix.c_str());
875 callback({{LOCAL_DEVICE, OBJECT_SUCCESS}});
876 return OBJECT_SUCCESS;
877 }
878 syncDevices.emplace_back(DmAdaper::GetInstance().GetUuidByNetworkId(device));
879 }
880 if (syncDevices.empty()) {
881 ZLOGI("Device list is empty, prefix: %{public}s", Anonymous::Change(prefix).c_str());
882 callback(std::map<std::string, int32_t>());
883 return OBJECT_SUCCESS;
884 }
885 uint64_t sequenceId = SequenceSyncManager::GetInstance()->AddNotifier(userId_, callback);
886 DistributedDB::Query dbQuery = DistributedDB::Query::Select();
887 dbQuery.PrefixKey(std::vector<uint8_t>(prefix.begin(), prefix.end()));
888 ZLOGI("Start sync data, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
889 auto status = delegate_->Sync(syncDevices, DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY,
890 [this, sequenceId](const std::map<std::string, DistributedDB::DBStatus> &devicesMap) {
891 ZLOGI("Sync data finished, sequenceId: 0x%{public}" PRIx64 "", sequenceId);
892 std::map<std::string, DistributedDB::DBStatus> result;
893 for (auto &item : devicesMap) {
894 result[DmAdaper::GetInstance().ToNetworkID(item.first)] = item.second;
895 }
896 SyncCompleted(result, sequenceId);
897 }, dbQuery, false);
898 if (status != DistributedDB::DBStatus::OK) {
899 ZLOGE("Sync data failed, prefix: %{public}s, sequenceId: 0x%{public}" PRIx64 ", status: %{public}d",
900 Anonymous::Change(prefix).c_str(), sequenceId, status);
901 std::string tmp;
902 SequenceSyncManager::GetInstance()->DeleteNotifier(sequenceId, tmp);
903 return status;
904 }
905 SetSyncStatus(true);
906 return OBJECT_SUCCESS;
907 }
908
SetSyncStatus(bool status)909 int32_t ObjectStoreManager::SetSyncStatus(bool status)
910 {
911 std::lock_guard<std::recursive_mutex> lock(kvStoreMutex_);
912 isSyncing_ = status;
913 return OBJECT_SUCCESS;
914 }
915
RevokeSaveToStore(const std::string & prefix)916 int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix)
917 {
918 std::vector<DistributedDB::Entry> entries;
919 auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
920 if (status == DistributedDB::DBStatus::NOT_FOUND) {
921 ZLOGI("Get entries empty, prefix: %{public}s", Anonymous::Change(prefix).c_str());
922 return OBJECT_SUCCESS;
923 }
924 if (status != DistributedDB::DBStatus::OK) {
925 ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status);
926 return DB_ERROR;
927 }
928 std::vector<std::vector<uint8_t>> keys;
929 std::for_each(entries.begin(), entries.end(), [&keys](const DistributedDB::Entry &entry) {
930 keys.emplace_back(entry.key);
931 });
932 if (keys.empty()) {
933 return OBJECT_SUCCESS;
934 }
935 status = delegate_->DeleteBatch(keys);
936 if (status != DistributedDB::DBStatus::OK) {
937 ZLOGE("Delete entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(),
938 status);
939 return DB_ERROR;
940 }
941 ZLOGI("Delete entries success, prefix: %{public}s, count: %{public}zu", Anonymous::Change(prefix).c_str(),
942 keys.size());
943 return OBJECT_SUCCESS;
944 }
945
RetrieveFromStore(const std::string & appId,const std::string & sessionId,ObjectRecord & results)946 int32_t ObjectStoreManager::RetrieveFromStore(const std::string &appId, const std::string &sessionId,
947 ObjectRecord &results)
948 {
949 std::vector<DistributedDB::Entry> entries;
950 std::string prefix = GetPrefixWithoutDeviceId(appId, sessionId);
951 auto status = delegate_->GetEntries(std::vector<uint8_t>(prefix.begin(), prefix.end()), entries);
952 if (status == DistributedDB::DBStatus::NOT_FOUND) {
953 ZLOGI("Get entries empty, prefix: %{public}s, status: %{public}d", prefix.c_str(), status);
954 return KEY_NOT_FOUND;
955 }
956 if (status != DistributedDB::DBStatus::OK) {
957 ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", prefix.c_str(), status);
958 return DB_ERROR;
959 }
960 ZLOGI("Get entries success, prefix: %{public}s, count: %{public}zu", prefix.c_str(), entries.size());
961 for (const auto &entry : entries) {
962 std::string key(entry.key.begin(), entry.key.end());
963 if (key.find(SAVE_INFO) != std::string::npos) {
964 continue;
965 }
966 auto splitKeys = SplitEntryKey(key);
967 if (!splitKeys.empty()) {
968 results[splitKeys[PROPERTY_NAME_INDEX]] = entry.value;
969 }
970 }
971 return OBJECT_SUCCESS;
972 }
973
SaveInfo(const std::string & bundleName,const std::string & sessionId,const std::string & sourceDeviceId,const std::string & targetDeviceId,const std::string & timestamp)974 ObjectStoreManager::SaveInfo::SaveInfo(const std::string &bundleName, const std::string &sessionId,
975 const std::string &sourceDeviceId, const std::string &targetDeviceId, const std::string ×tamp)
976 : bundleName(bundleName), sessionId(sessionId), sourceDeviceId(sourceDeviceId), targetDeviceId(targetDeviceId),
977 timestamp(timestamp) {}
978
Marshal(json & node) const979 bool ObjectStoreManager::SaveInfo::Marshal(json &node) const
980 {
981 SetValue(node[GET_NAME(bundleName)], bundleName);
982 SetValue(node[GET_NAME(sessionId)], sessionId);
983 SetValue(node[GET_NAME(sourceDeviceId)], sourceDeviceId);
984 SetValue(node[GET_NAME(targetDeviceId)], targetDeviceId);
985 SetValue(node[GET_NAME(timestamp)], timestamp);
986 return true;
987 }
988
Unmarshal(const json & node)989 bool ObjectStoreManager::SaveInfo::Unmarshal(const json &node)
990 {
991 GetValue(node, GET_NAME(bundleName), bundleName);
992 GetValue(node, GET_NAME(sessionId), sessionId);
993 GetValue(node, GET_NAME(sourceDeviceId), sourceDeviceId);
994 GetValue(node, GET_NAME(targetDeviceId), targetDeviceId);
995 GetValue(node, GET_NAME(timestamp), timestamp);
996 return true;
997 }
998
ToPropertyPrefix()999 std::string ObjectStoreManager::SaveInfo::ToPropertyPrefix()
1000 {
1001 if (bundleName.empty() || sessionId.empty() || sourceDeviceId.empty() || targetDeviceId.empty() ||
1002 timestamp.empty()) {
1003 return "";
1004 }
1005 return bundleName + SEPERATOR + sessionId + SEPERATOR + sourceDeviceId + SEPERATOR + targetDeviceId + SEPERATOR +
1006 timestamp + SEPERATOR;
1007 }
1008
SplitEntryKey(const std::string & key)1009 std::vector<std::string> ObjectStoreManager::SplitEntryKey(const std::string &key)
1010 {
1011 std::smatch match;
1012 std::regex timeRegex(TIME_REGEX);
1013 if (!std::regex_search(key, match, timeRegex)) {
1014 ZLOGW("Format error, key.size = %{public}zu", key.size());
1015 return {};
1016 }
1017 auto timePos = match.position();
1018 std::string fromTime = key.substr(timePos + 1);
1019 std::string beforeTime = key.substr(0, timePos);
1020
1021 size_t targetDevicePos = beforeTime.find_last_of(SEPERATOR);
1022 if (targetDevicePos == std::string::npos) {
1023 ZLOGW("Format error, key.size = %{public}zu", key.size());
1024 return {};
1025 }
1026 std::string targetDevice = beforeTime.substr(targetDevicePos + 1);
1027 std::string beforeTargetDevice = beforeTime.substr(0, targetDevicePos);
1028
1029 size_t sourceDeviceUdidPos = beforeTargetDevice.find_last_of(SEPERATOR);
1030 if (sourceDeviceUdidPos == std::string::npos) {
1031 ZLOGW("Format error, key.size = %{public}zu", key.size());
1032 return {};
1033 }
1034 std::string sourceDeviceUdid = beforeTargetDevice.substr(sourceDeviceUdidPos + 1);
1035 std::string beforeSourceDeviceUdid = beforeTargetDevice.substr(0, sourceDeviceUdidPos);
1036
1037 size_t sessionIdPos = beforeSourceDeviceUdid.find_last_of(SEPERATOR);
1038 if (sessionIdPos == std::string::npos) {
1039 ZLOGW("Format error, key.size = %{public}zu", key.size());
1040 return {};
1041 }
1042 std::string sessionId = beforeSourceDeviceUdid.substr(sessionIdPos + 1);
1043 std::string bundleName = beforeSourceDeviceUdid.substr(0, sessionIdPos);
1044
1045 size_t propertyNamePos = fromTime.find_first_of(SEPERATOR);
1046 if (propertyNamePos == std::string::npos) {
1047 ZLOGW("Format error, key.size = %{public}zu", key.size());
1048 return {};
1049 }
1050 std::string propertyName = fromTime.substr(propertyNamePos + 1);
1051 std::string time = fromTime.substr(0, propertyNamePos);
1052
1053 return { bundleName, sessionId, sourceDeviceUdid, targetDevice, time, propertyName };
1054 }
1055
GetCurrentUser()1056 std::string ObjectStoreManager::GetCurrentUser()
1057 {
1058 std::vector<int> users;
1059 AccountDelegate::GetInstance()->QueryUsers(users);
1060 if (users.empty()) {
1061 return "";
1062 }
1063 return std::to_string(users[0]);
1064 }
1065
SaveUserToMeta()1066 void ObjectStoreManager::SaveUserToMeta()
1067 {
1068 ZLOGD("start.");
1069 std::string userId = GetCurrentUser();
1070 if (userId.empty()) {
1071 return;
1072 }
1073 std::string appId = DistributedData::Bootstrap::GetInstance().GetProcessLabel();
1074 StoreMetaData userMeta;
1075 userMeta.storeId = DistributedObject::ObjectCommon::OBJECTSTORE_DB_STOREID;
1076 userMeta.user = userId;
1077 userMeta.storeType = ObjectDistributedType::OBJECT_SINGLE_VERSION;
1078 std::string userMetaKey = GetMetaUserIdKey(userId, appId);
1079 auto saved = DistributedData::MetaDataManager::GetInstance().SaveMeta(userMetaKey, userMeta, true);
1080 if (!saved) {
1081 ZLOGE("userMeta save failed");
1082 }
1083 }
1084
CloseAfterMinute()1085 void ObjectStoreManager::CloseAfterMinute()
1086 {
1087 executors_->Schedule(std::chrono::minutes(INTERVAL), std::bind(&ObjectStoreManager::Close, this));
1088 }
1089
SetThreadPool(std::shared_ptr<ExecutorPool> executors)1090 void ObjectStoreManager::SetThreadPool(std::shared_ptr<ExecutorPool> executors)
1091 {
1092 executors_ = executors;
1093 }
1094
AddNotifier(const std::string & userId,SyncCallBack & callback)1095 uint64_t SequenceSyncManager::AddNotifier(const std::string &userId, SyncCallBack &callback)
1096 {
1097 std::lock_guard<std::mutex> lock(notifierLock_);
1098 uint64_t sequenceId = KvStoreUtils::GenerateSequenceId();
1099 userIdSeqIdRelations_[userId].emplace_back(sequenceId);
1100 seqIdCallbackRelations_[sequenceId] = callback;
1101 return sequenceId;
1102 }
1103
Process(uint64_t sequenceId,const std::map<std::string,DistributedDB::DBStatus> & results,std::string & userId)1104 SequenceSyncManager::Result SequenceSyncManager::Process(
1105 uint64_t sequenceId, const std::map<std::string, DistributedDB::DBStatus> &results, std::string &userId)
1106 {
1107 std::lock_guard<std::mutex> lock(notifierLock_);
1108 if (seqIdCallbackRelations_.count(sequenceId) == 0) {
1109 ZLOGE("not exist");
1110 return ERR_SID_NOT_EXIST;
1111 }
1112 std::map<std::string, int32_t> syncResults;
1113 for (auto &item : results) {
1114 syncResults[item.first] = item.second == DistributedDB::DBStatus::OK ? 0 : -1;
1115 }
1116 seqIdCallbackRelations_[sequenceId](syncResults);
1117 ZLOGD("end complete");
1118 return DeleteNotifierNoLock(sequenceId, userId);
1119 }
1120
DeleteNotifier(uint64_t sequenceId,std::string & userId)1121 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifier(uint64_t sequenceId, std::string &userId)
1122 {
1123 std::lock_guard<std::mutex> lock(notifierLock_);
1124 if (seqIdCallbackRelations_.count(sequenceId) == 0) {
1125 ZLOGE("not exist");
1126 return ERR_SID_NOT_EXIST;
1127 }
1128 return DeleteNotifierNoLock(sequenceId, userId);
1129 }
1130
DeleteNotifierNoLock(uint64_t sequenceId,std::string & userId)1131 SequenceSyncManager::Result SequenceSyncManager::DeleteNotifierNoLock(uint64_t sequenceId, std::string &userId)
1132 {
1133 seqIdCallbackRelations_.erase(sequenceId);
1134 auto userIdIter = userIdSeqIdRelations_.begin();
1135 while (userIdIter != userIdSeqIdRelations_.end()) {
1136 auto sIdIter = std::find(userIdIter->second.begin(), userIdIter->second.end(), sequenceId);
1137 if (sIdIter != userIdIter->second.end()) {
1138 userIdIter->second.erase(sIdIter);
1139 if (userIdIter->second.empty()) {
1140 ZLOGD("finished user callback, userId = %{public}s", userIdIter->first.c_str());
1141 userId = userIdIter->first;
1142 userIdSeqIdRelations_.erase(userIdIter);
1143 return SUCCESS_USER_HAS_FINISHED;
1144 } else {
1145 ZLOGD(" finished a callback but user not finished, userId = %{public}s", userIdIter->first.c_str());
1146 return SUCCESS_USER_IN_USE;
1147 }
1148 }
1149 userIdIter++;
1150 }
1151 return SUCCESS_USER_HAS_FINISHED;
1152 }
1153
BindAsset(const uint32_t tokenId,const std::string & appId,const std::string & sessionId,ObjectStore::Asset & asset,ObjectStore::AssetBindInfo & bindInfo)1154 int32_t ObjectStoreManager::BindAsset(const uint32_t tokenId, const std::string& appId, const std::string& sessionId,
1155 ObjectStore::Asset& asset, ObjectStore::AssetBindInfo& bindInfo)
1156 {
1157 auto snapshotKey = appId + SEPERATOR + sessionId;
1158 snapshots_.ComputeIfAbsent(
1159 snapshotKey, [](const std::string& key) -> auto {
1160 return std::make_shared<ObjectSnapshot>();
1161 });
1162 auto storeKey = appId + SEPERATOR + bindInfo.storeName;
1163 bindSnapshots_.ComputeIfAbsent(
1164 storeKey, [](const std::string& key) -> auto {
1165 return std::make_shared<std::map<std::string, std::shared_ptr<Snapshot>>>();
1166 });
1167 auto snapshots = snapshots_.Find(snapshotKey).second;
1168 bindSnapshots_.Compute(storeKey, [this, &asset, snapshots] (const auto &key, auto &value) {
1169 value->emplace(std::pair{asset.uri, snapshots});
1170 return true;
1171 });
1172
1173 HapTokenInfo tokenInfo;
1174 auto status = AccessTokenKit::GetHapTokenInfo(tokenId, tokenInfo);
1175 if (status != RET_SUCCESS) {
1176 ZLOGE("token:0x%{public}x, result:%{public}d, bundleName:%{public}s", tokenId, status, appId.c_str());
1177 return GeneralError::E_ERROR;
1178 }
1179 StoreInfo storeInfo;
1180 storeInfo.bundleName = appId;
1181 storeInfo.tokenId = tokenId;
1182 storeInfo.instanceId = tokenInfo.instIndex;
1183 storeInfo.user = tokenInfo.userID;
1184 storeInfo.storeName = bindInfo.storeName;
1185
1186 snapshots_.Compute(snapshotKey, [this, &asset, &bindInfo, &storeInfo] (const auto &key, auto &value) {
1187 value->BindAsset(ValueProxy::Convert(std::move(asset)), ConvertBindInfo(bindInfo), storeInfo);
1188 return true;
1189 });
1190 return OBJECT_SUCCESS;
1191 }
1192
ConvertBindInfo(ObjectStore::AssetBindInfo & bindInfo)1193 DistributedData::AssetBindInfo ObjectStoreManager::ConvertBindInfo(ObjectStore::AssetBindInfo& bindInfo)
1194 {
1195 return DistributedData::AssetBindInfo{
1196 .storeName = std::move(bindInfo.storeName),
1197 .tableName = std::move(bindInfo.tableName),
1198 .primaryKey = ValueProxy::Convert(std::move(bindInfo.primaryKey)),
1199 .field = std::move(bindInfo.field),
1200 .assetName = std::move(bindInfo.assetName),
1201 };
1202 }
1203
OnAssetChanged(const uint32_t tokenId,const std::string & appId,const std::string & sessionId,const std::string & deviceId,const ObjectStore::Asset & asset)1204 int32_t ObjectStoreManager::OnAssetChanged(const uint32_t tokenId, const std::string& appId,
1205 const std::string& sessionId, const std::string& deviceId, const ObjectStore::Asset& asset)
1206 {
1207 const int32_t userId = AccountDelegate::GetInstance()->GetUserByToken(tokenId);
1208 auto objectAsset = asset;
1209 Asset dataAsset = ValueProxy::Convert(std::move(objectAsset));
1210 auto snapshotKey = appId + SEPERATOR + sessionId;
1211 int32_t res = OBJECT_SUCCESS;
1212 bool exist = snapshots_.ComputeIfPresent(snapshotKey,
1213 [&res, &dataAsset, &deviceId](const std::string &key, std::shared_ptr<Snapshot> snapshot) {
1214 if (snapshot != nullptr) {
1215 res = snapshot->OnDataChanged(dataAsset, deviceId); // needChange
1216 }
1217 return snapshot != nullptr;
1218 });
1219 if (exist) {
1220 return res;
1221 }
1222
1223 auto block = std::make_shared<BlockData<std::tuple<bool, bool>>>(WAIT_TIME, std::tuple{ true, true });
1224 ObjectAssetLoader::GetInstance()->TransferAssetsAsync(userId, appId, deviceId, { dataAsset }, [block](bool ret) {
1225 block->SetValue({ false, ret });
1226 });
1227 auto [timeout, success] = block->GetValue();
1228 if (timeout || !success) {
1229 ZLOGE("transfer failed, timeout: %{public}d, success: %{public}d, name: %{public}s, deviceId: %{public}s ",
1230 timeout, success, asset.name.c_str(), DistributedData::Anonymous::Change(deviceId).c_str());
1231 return OBJECT_INNER_ERROR;
1232 }
1233 return OBJECT_SUCCESS;
1234 }
1235
GetSnapShots(const std::string & bundleName,const std::string & storeName)1236 ObjectStoreManager::UriToSnapshot ObjectStoreManager::GetSnapShots(const std::string& bundleName,
1237 const std::string& storeName)
1238 {
1239 auto storeKey = bundleName + SEPERATOR + storeName;
1240 bindSnapshots_.ComputeIfAbsent(
1241 storeKey, [](const std::string& key) -> auto {
1242 return std::make_shared<std::map<std::string, std::shared_ptr<Snapshot>>>();
1243 });
1244 return bindSnapshots_.Find(storeKey).second;
1245 }
1246
DeleteSnapshot(const std::string & bundleName,const std::string & sessionId)1247 void ObjectStoreManager::DeleteSnapshot(const std::string& bundleName, const std::string& sessionId)
1248 {
1249 auto snapshotKey = bundleName + SEPERATOR + sessionId;
1250 auto snapshot = snapshots_.Find(snapshotKey).second;
1251 if (snapshot == nullptr) {
1252 ZLOGD("Not find snapshot, don't need delete");
1253 return;
1254 }
1255 bindSnapshots_.ForEach([snapshot](auto& key, auto& value) {
1256 for (auto pos = value->begin(); pos != value->end();) {
1257 if (pos->second == snapshot) {
1258 pos = value->erase(pos);
1259 } else {
1260 ++pos;
1261 }
1262 }
1263 return true;
1264 });
1265 snapshots_.Erase(snapshotKey);
1266 }
1267 } // namespace DistributedObject
1268 } // namespace OHOS
1269