1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef DISTRIBUTEDDATAMGR_OBJECT_MANAGER_H 16 #define DISTRIBUTEDDATAMGR_OBJECT_MANAGER_H 17 18 #include <atomic> 19 20 #include "concurrent_map.h" 21 #include "device_manager_adapter.h" 22 #include "kv_store_delegate_manager.h" 23 #include "kvstore_sync_callback.h" 24 #include "metadata/store_meta_data.h" 25 #include "object_asset_loader.h" 26 #include "object_callback.h" 27 #include "object_callback_proxy.h" 28 #include "object_common.h" 29 #include "object_data_listener.h" 30 #include "object_snapshot.h" 31 #include "object_types.h" 32 #include "serializable/serializable.h" 33 #include "types.h" 34 #include "value_proxy.h" 35 namespace OHOS { 36 namespace DistributedObject { 37 using SyncCallBack = std::function<void(const std::map<std::string, int32_t> &results)>; 38 using ObjectRecord = std::map<std::string, std::vector<uint8_t>>; 39 class SequenceSyncManager { 40 public: 41 enum Result { 42 SUCCESS_USER_IN_USE, 43 SUCCESS_USER_HAS_FINISHED, 44 ERR_SID_NOT_EXIST 45 }; GetInstance()46 static SequenceSyncManager *GetInstance() 47 { 48 static SequenceSyncManager sequenceSyncManager; 49 return &sequenceSyncManager; 50 } 51 52 uint64_t AddNotifier(const std::string &userId, SyncCallBack &callback); 53 Result DeleteNotifier(uint64_t sequenceId, std::string &userId); 54 Result Process( 55 uint64_t sequenceId, const std::map<std::string, DistributedDB::DBStatus> &results, std::string &userId); 56 57 private: 58 Result DeleteNotifierNoLock(uint64_t sequenceId, std::string &userId); 59 std::mutex notifierLock_; 60 std::map<std::string, std::vector<uint64_t>> userIdSeqIdRelations_; 61 std::map<uint64_t, SyncCallBack> seqIdCallbackRelations_; 62 }; 63 64 class ObjectStoreManager { 65 public: 66 using DmAdaper = OHOS::DistributedData::DeviceManagerAdapter; 67 using UriToSnapshot = std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>>; 68 using StoreMetaData = OHOS::DistributedData::StoreMetaData; 69 70 enum RestoreStatus : int32_t { 71 NONE = 0, 72 DATA_READY, 73 DATA_NOTIFIED, 74 ASSETS_READY, 75 ALL_READY, 76 STATUS_BUTT 77 }; 78 79 ObjectStoreManager(); 80 ~ObjectStoreManager(); GetInstance()81 static ObjectStoreManager &GetInstance() 82 { 83 static ObjectStoreManager manager; 84 return manager; 85 } 86 int32_t Save(const std::string &appId, const std::string &sessionId, const ObjectRecord &data, 87 const std::string &deviceId, sptr<IRemoteObject> callback); 88 int32_t RevokeSave( 89 const std::string &appId, const std::string &sessionId, sptr<IRemoteObject> callback); 90 int32_t Retrieve(const std::string &bundleName, const std::string &sessionId, 91 sptr<IRemoteObject> callback, uint32_t tokenId); 92 void SetData(const std::string &dataDir, const std::string &userId); 93 int32_t Clear(); 94 int32_t InitUserMeta(); 95 int32_t DeleteByAppId(const std::string &appId, int32_t user); 96 void RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId, 97 pid_t pid, uint32_t tokenId, sptr<IRemoteObject> callback); 98 void UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId, 99 const std::string &sessionId = ""); 100 void RegisterProgressObserverCallback(const std::string &bundleName, const std::string &sessionId, 101 pid_t pid, uint32_t tokenId, sptr<IRemoteObject> callback); 102 void UnregisterProgressObserverCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId, 103 const std::string &sessionId = ""); 104 void NotifyChange(const ObjectRecord &changedData); 105 void NotifyAssetsReady(const std::string& objectKey, const std::string& bundleName, 106 const std::string& srcNetworkId = ""); 107 void NotifyAssetsStart(const std::string& objectKey, const std::string& srcNetworkId = ""); 108 void NotifyAssetsRecvProgress(const std::string& objectKey, int32_t progress); 109 void SetThreadPool(std::shared_ptr<ExecutorPool> executors); 110 UriToSnapshot GetSnapShots(const std::string &bundleName, const std::string &storeName); 111 int32_t BindAsset(const uint32_t tokenId, const std::string& appId, const std::string& sessionId, 112 ObjectStore::Asset& asset, ObjectStore::AssetBindInfo& bindInfo); 113 int32_t OnAssetChanged(const uint32_t tokenId, const std::string& appId, const std::string& sessionId, 114 const std::string& deviceId, const ObjectStore::Asset& asset); 115 void DeleteSnapshot(const std::string &bundleName, const std::string &sessionId); 116 int32_t AutoLaunchStore(); 117 bool UnRegisterAssetsLister(); 118 private: 119 constexpr static const char *SEPERATOR = "_"; 120 constexpr static const char *TIME_REGEX = "_\\d{10}_p_"; 121 constexpr static int BUNDLE_NAME_INDEX = 0; 122 constexpr static int SESSION_ID_INDEX = 1; 123 constexpr static int SOURCE_DEVICE_UDID_INDEX = 2; 124 constexpr static int TIME_INDEX = 4; 125 constexpr static int PROPERTY_NAME_INDEX = 5; 126 constexpr static const char *LOCAL_DEVICE = "local"; 127 constexpr static const char *USERID = "USERID"; 128 constexpr static int8_t MAX_OBJECT_SIZE_PER_APP = 16; 129 constexpr static int8_t DECIMAL_BASE = 10; 130 constexpr static int WAIT_TIME = 60; 131 static constexpr size_t TIME_TASK_NUM = 1; 132 static constexpr int64_t INTERVAL = 1; 133 struct CallbackInfo { 134 pid_t pid; 135 std::map<std::string, sptr<ObjectChangeCallbackProxy>> observers_; 136 bool operator<(const CallbackInfo &it_) const 137 { 138 return pid < it_.pid; 139 } 140 }; 141 struct ProgressCallbackInfo { 142 pid_t pid; 143 std::map<std::string, sptr<ObjectProgressCallbackProxy>> observers_; 144 bool operator<(const ProgressCallbackInfo &it_) const 145 { 146 return pid < it_.pid; 147 } 148 }; 149 struct SaveInfo : DistributedData::Serializable { 150 std::string bundleName; 151 std::string sessionId; 152 std::string sourceDeviceId; 153 std::string targetDeviceId; 154 std::string timestamp; 155 SaveInfo() = default; 156 SaveInfo(const std::string &bundleName, const std::string &sessionId, const std::string &sourceDeviceId, 157 const std::string &targetDeviceId, const std::string ×tamp); 158 bool Marshal(json &node) const override; 159 bool Unmarshal(const json &node) override; 160 std::string ToPropertyPrefix(); 161 }; 162 DistributedDB::KvStoreNbDelegate *OpenObjectKvStore(); 163 void FlushClosedStore(); 164 void Close(); 165 void ForceClose(); 166 int32_t SetSyncStatus(bool status); 167 int32_t SaveToStore(const std::string &appId, const std::string &sessionId, const std::string &toDeviceId, 168 const ObjectRecord &data); 169 int32_t SyncOnStore(const std::string &prefix, const std::vector<std::string> &deviceList, SyncCallBack &callback); 170 int32_t RevokeSaveToStore(const std::string &prefix); 171 int32_t RetrieveFromStore(const std::string &appId, const std::string &sessionId, ObjectRecord &results); 172 void SyncCompleted(const std::map<std::string, DistributedDB::DBStatus> &results, uint64_t sequenceId); 173 std::vector<std::string> SplitEntryKey(const std::string &key); 174 void ProcessOldEntry(const std::string &appId); 175 void ProcessSyncCallback(const std::map<std::string, int32_t> &results, const std::string &appId, 176 const std::string &sessionId, const std::string &deviceId); 177 void SaveUserToMeta(); 178 bool IsNeedMetaSync(const StoreMetaData &meta, const std::vector<std::string> &networkIds); 179 int32_t DoSync(const std::string &prefix, const std::vector<std::string> &deviceList, uint64_t sequenceId); 180 std::string GetCurrentUser(); 181 void DoNotify(uint32_t tokenId, const CallbackInfo& value, const std::map<std::string, ObjectRecord>& data, 182 bool allReady); 183 void DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value, const std::string& objectKey, bool allReady); 184 void DoNotifyWaitAssetTimeout(const std::string &objectKey); 185 std::map<std::string, std::map<std::string, Assets>> GetAssetsFromStore(const ObjectRecord& changedData); 186 static bool IsAssetKey(const std::string& key); 187 static bool IsAssetComplete(const ObjectRecord& result, const std::string& assetPrefix); 188 Assets GetAssetsFromDBRecords(const ObjectRecord& result); 189 bool RegisterAssetsLister(); 190 void ComputeStatus(const std::string& objectKey, const SaveInfo& saveInfo, 191 const std::map<std::string, ObjectRecord>& data); 192 void NotifyDataChanged(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo); 193 int32_t PushAssets(const std::string &srcBundleName, const std::string &dstBundleName, const std::string &sessionId, 194 const ObjectRecord &data, const std::string &deviceId); 195 int32_t WaitAssets(const std::string& objectKey, const SaveInfo& saveInfo, 196 const std::map<std::string, ObjectRecord>& data); 197 void PullAssets(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo); 198 std::map<std::string, ObjectRecord> GetObjectData(const ObjectRecord& changedData, SaveInfo& saveInfo, 199 bool& hasAsset); 200 void CloseAfterMinute(); 201 int32_t Open(); GetPropertyPrefix(const std::string & appId,const std::string & sessionId)202 inline std::string GetPropertyPrefix(const std::string &appId, const std::string &sessionId) 203 { 204 return appId + SEPERATOR + sessionId + SEPERATOR + DmAdaper::GetInstance().GetLocalDevice().udid + SEPERATOR; 205 }; GetPropertyPrefix(const std::string & appId,const std::string & sessionId,const std::string & toDeviceId)206 inline std::string GetPropertyPrefix( 207 const std::string &appId, const std::string &sessionId, const std::string &toDeviceId) 208 { 209 return appId + SEPERATOR + sessionId + SEPERATOR + DmAdaper::GetInstance().GetLocalDevice().udid 210 + SEPERATOR + toDeviceId + SEPERATOR; 211 }; GetPrefixWithoutDeviceId(const std::string & appId,const std::string & sessionId)212 inline std::string GetPrefixWithoutDeviceId(const std::string &appId, const std::string &sessionId) 213 { 214 return appId + SEPERATOR + sessionId + SEPERATOR; 215 }; GetMetaUserIdKey(const std::string & userId,const std::string & appId)216 inline std::string GetMetaUserIdKey(const std::string &userId, const std::string &appId) 217 { 218 return std::string(USERID) + SEPERATOR + userId + SEPERATOR + appId + SEPERATOR 219 + DmAdaper::GetInstance().GetLocalDevice().udid; 220 }; 221 mutable std::shared_timed_mutex rwMutex_; 222 std::shared_ptr<DistributedDB::KvStoreDelegateManager> kvStoreDelegateManager_ = nullptr; 223 DistributedDB::KvStoreNbDelegate *delegate_ = nullptr; 224 ObjectDataListener objectDataListener_; 225 sptr<ObjectAssetsRecvListener> objectAssetsRecvListener_ = nullptr; 226 sptr<ObjectAssetsSendListener> objectAssetsSendListener_ = nullptr; 227 std::atomic<uint32_t> syncCount_{ 0 }; 228 std::string userId_; 229 std::atomic<bool> isSyncing_{ false }; 230 ConcurrentMap<uint32_t /* tokenId */, CallbackInfo > callbacks_; 231 ConcurrentMap<uint32_t /* tokenId */, ProgressCallbackInfo > processCallbacks_; 232 std::shared_ptr<ExecutorPool> executors_; 233 DistributedData::AssetBindInfo ConvertBindInfo(ObjectStore::AssetBindInfo& bindInfo); 234 ConcurrentMap<std::string, std::shared_ptr<Snapshot>> snapshots_; // key:bundleName_sessionId 235 ConcurrentMap<std::string, UriToSnapshot> bindSnapshots_; // key:bundleName_storeName 236 ConcurrentMap<std::string, RestoreStatus> restoreStatus_; // key:bundleName+sessionId 237 ConcurrentMap<std::string, ExecutorPool::TaskId> objectTimer_; // key:bundleName+sessionId 238 ConcurrentMap<std::string, uint64_t> assetsRecvProgress_; // key:bundleName+sessionId 239 std::map<std::string, int32_t> progressInfo_; 240 std::mutex progressMutex_; 241 }; 242 } // namespace DistributedObject 243 } // namespace OHOS 244 #endif // DISTRIBUTEDDATAMGR_OBJECT_MANAGER_H 245