1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef DISTRIBUTEDDATAMGR_OBJECT_MANAGER_H 17 #define DISTRIBUTEDDATAMGR_OBJECT_MANAGER_H 18 19 #include <atomic> 20 21 #include "concurrent_map.h" 22 #include "device_manager_adapter.h" 23 #include "object_callback.h" 24 #include "object_callback_proxy.h" 25 #include "kv_store_delegate_manager.h" 26 #include "kvstore_sync_callback.h" 27 #include "object_common.h" 28 #include "object_data_listener.h" 29 #include "types.h" 30 #include "object_snapshot.h" 31 #include "object_types.h" 32 #include "value_proxy.h" 33 34 namespace OHOS { 35 namespace DistributedObject { 36 using SyncCallBack = std::function<void(const std::map<std::string, int32_t> &results)>; 37 38 class SequenceSyncManager { 39 public: 40 enum Result { 41 SUCCESS_USER_IN_USE, 42 SUCCESS_USER_HAS_FINISHED, 43 ERR_SID_NOT_EXIST 44 }; GetInstance()45 static SequenceSyncManager *GetInstance() 46 { 47 static SequenceSyncManager sequenceSyncManager; 48 return &sequenceSyncManager; 49 } 50 51 uint64_t AddNotifier(const std::string &userId, SyncCallBack &callback); 52 Result DeleteNotifier(uint64_t sequenceId, std::string &userId); 53 Result Process( 54 uint64_t sequenceId, const std::map<std::string, DistributedDB::DBStatus> &results, std::string &userId); 55 56 private: 57 Result DeleteNotifierNoLock(uint64_t sequenceId, std::string &userId); 58 std::mutex notifierLock_; 59 std::map<std::string, std::vector<uint64_t>> userIdSeqIdRelations_; 60 std::map<uint64_t, SyncCallBack> seqIdCallbackRelations_; 61 }; 62 63 class ObjectStoreManager { 64 public: 65 using DmAdaper = OHOS::DistributedData::DeviceManagerAdapter; 66 using UriToSnapshot = std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>>; 67 ObjectStoreManager(); GetInstance()68 static ObjectStoreManager *GetInstance() 69 { 70 static ObjectStoreManager *manager = new ObjectStoreManager(); 71 return manager; 72 } 73 int32_t Save(const std::string &appId, const std::string &sessionId, 74 const std::map<std::string, std::vector<uint8_t>> &data, const std::string &deviceId, 75 sptr<IRemoteObject> callback); 76 int32_t RevokeSave( 77 const std::string &appId, const std::string &sessionId, sptr<IRemoteObject> callback); 78 int32_t Retrieve(const std::string &bundleName, const std::string &sessionId, 79 sptr<IRemoteObject> callback, uint32_t tokenId); 80 void SetData(const std::string &dataDir, const std::string &userId); 81 int32_t Clear(); 82 int32_t DeleteByAppId(const std::string &appId); 83 void RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId, 84 pid_t pid, uint32_t tokenId, 85 sptr<IRemoteObject> callback); 86 void UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId, 87 const std::string &sessionId = ""); 88 void NotifyChange(std::map<std::string, std::vector<uint8_t>> &changedData); 89 void CloseAfterMinute(); 90 int32_t Open(); 91 void SetThreadPool(std::shared_ptr<ExecutorPool> executors); 92 UriToSnapshot GetSnapShots(const std::string &bundleName, const std::string &storeName); 93 int32_t BindAsset(const uint32_t tokenId, const std::string& appId, const std::string& sessionId, 94 ObjectStore::Asset& asset, ObjectStore::AssetBindInfo& bindInfo); 95 int32_t OnAssetChanged(const uint32_t tokenId, const std::string& appId, const std::string& sessionId, 96 const std::string& deviceId, const ObjectStore::Asset& asset); 97 void DeleteSnapshot(const std::string &bundleName, const std::string &sessionId); 98 private: 99 constexpr static const char *SEPERATOR = "_"; 100 constexpr static const char *LOCAL_DEVICE = "local"; 101 constexpr static const char *USERID = "USERID"; 102 constexpr static int8_t MAX_OBJECT_SIZE_PER_APP = 16; 103 constexpr static int8_t DECIMAL_BASE = 10; 104 constexpr static int8_t SOURCE_DEVICE_ID_INDEX = 2; 105 struct CallbackInfo { 106 pid_t pid; 107 std::map<std::string, sptr<ObjectChangeCallbackProxy>> observers_; 108 bool operator<(const CallbackInfo &it_) const 109 { 110 if (pid < it_.pid) { 111 return true; 112 } 113 return false; 114 } 115 }; 116 DistributedDB::KvStoreNbDelegate *OpenObjectKvStore(); 117 void FlushClosedStore(); 118 void Close(); 119 int32_t SetSyncStatus(bool status); 120 int32_t SaveToStore(const std::string &appId, const std::string &sessionId, const std::string &toDeviceId, 121 const std::map<std::string, std::vector<uint8_t>> &data); 122 int32_t SyncOnStore(const std::string &prefix, const std::vector<std::string> &deviceList, SyncCallBack &callback); 123 int32_t RevokeSaveToStore(const std::string &prefix); 124 int32_t RetrieveFromStore( 125 const std::string &appId, const std::string &sessionId, std::map<std::string, std::vector<uint8_t>> &results); 126 void SyncCompleted(const std::map<std::string, DistributedDB::DBStatus> &results, uint64_t sequenceId); 127 void ProcessKeyByIndex(std::string &key, uint8_t index); 128 std::string GetPropertyName(const std::string &key); 129 std::string GetSessionId(const std::string &key); 130 std::string GetBundleName(const std::string &key); 131 std::string GetNetworkId(const std::string& key); 132 int64_t GetTime(const std::string &key); 133 void ProcessOldEntry(const std::string &appId); 134 void ProcessSyncCallback(const std::map<std::string, int32_t> &results, const std::string &appId, 135 const std::string &sessionId, const std::string &deviceId); 136 void SaveUserToMeta(); 137 std::string GetCurrentUser(); 138 void DoNotify(uint32_t tokenId, const CallbackInfo& value, 139 const std::map<std::string, std::map<std::string, std::vector<uint8_t>>>& data); 140 std::map<std::string, std::map<std::string, Assets>> GetAssetsFromStore( 141 const std::map<std::string, std::vector<uint8_t>>& changedData); 142 static bool isAssetKey(const std::string& key); 143 static bool isAssetComplete(const std::map<std::string, std::vector<uint8_t>>& result, 144 const std::string& assetPrefix); 145 Assets GetAssetsFromDBRecords(const std::map<std::string, std::vector<uint8_t>>& result); GetPropertyPrefix(const std::string & appId,const std::string & sessionId)146 inline std::string GetPropertyPrefix(const std::string &appId, const std::string &sessionId) 147 { 148 return appId + SEPERATOR + sessionId + SEPERATOR + DmAdaper::GetInstance().GetLocalDevice().udid + SEPERATOR; 149 }; GetPropertyPrefix(const std::string & appId,const std::string & sessionId,const std::string & toDeviceId)150 inline std::string GetPropertyPrefix( 151 const std::string &appId, const std::string &sessionId, const std::string &toDeviceId) 152 { 153 return appId + SEPERATOR + sessionId + SEPERATOR + DmAdaper::GetInstance().GetLocalDevice().udid 154 + SEPERATOR + toDeviceId + SEPERATOR; 155 }; GetPrefixWithoutDeviceId(const std::string & appId,const std::string & sessionId)156 inline std::string GetPrefixWithoutDeviceId(const std::string &appId, const std::string &sessionId) 157 { 158 return appId + SEPERATOR + sessionId + SEPERATOR; 159 }; GetMetaUserIdKey(const std::string & userId,const std::string & appId)160 inline std::string GetMetaUserIdKey(const std::string &userId, const std::string &appId) 161 { 162 return std::string(USERID) + SEPERATOR + userId + SEPERATOR + appId + SEPERATOR 163 + DmAdaper::GetInstance().GetLocalDevice().udid; 164 }; 165 std::recursive_mutex kvStoreMutex_; 166 std::mutex mutex_; 167 DistributedDB::KvStoreDelegateManager *kvStoreDelegateManager_ = nullptr; 168 DistributedDB::KvStoreNbDelegate *delegate_ = nullptr; 169 ObjectDataListener *objectDataListener_ = nullptr; 170 uint32_t syncCount_ = 0; 171 std::string userId_; 172 std::atomic<bool> isSyncing_ = false; 173 ConcurrentMap<uint32_t /* tokenId */, CallbackInfo > callbacks_; 174 static constexpr size_t TIME_TASK_NUM = 1; 175 static constexpr int64_t INTERVAL = 1; 176 std::shared_ptr<ExecutorPool> executors_; 177 DistributedData::AssetBindInfo ConvertBindInfo(ObjectStore::AssetBindInfo& bindInfo); 178 VBucket ConvertVBucket(ObjectStore::ValuesBucket &vBucket); 179 ConcurrentMap<std::string, std::shared_ptr<Snapshot>> snapshots_; // key:bundleName_sessionId 180 ConcurrentMap<std::string, UriToSnapshot> bindSnapshots_; // key:bundleName_storeName 181 }; 182 } // namespace DistributedObject 183 } // namespace OHOS 184 #endif // DISTRIBUTEDDATAMGR_OBJECT_MANAGER_H 185