1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef DISTRIBUTEDDATAMGR_OBJECT_MANAGER_H 17 #define DISTRIBUTEDDATAMGR_OBJECT_MANAGER_H 18 19 #include <atomic> 20 21 #include "concurrent_map.h" 22 #include "device_manager_adapter.h" 23 #include "object_callback.h" 24 #include "object_callback_proxy.h" 25 #include "kv_store_delegate_manager.h" 26 #include "kvstore_sync_callback.h" 27 #include "object_common.h" 28 #include "object_data_listener.h" 29 #include "types.h" 30 31 namespace OHOS { 32 namespace DistributedObject { 33 using SyncCallBack = std::function<void(const std::map<std::string, int32_t> &results)>; 34 35 enum Status { 36 OBJECT_SUCCESS, 37 OBJECT_DBSTATUS_ERROR, 38 OBJECT_INNER_ERROR, 39 OBJECT_PERMISSION_DENIED, 40 OBJECT_STORE_NOT_FOUND 41 }; 42 43 class SequenceSyncManager { 44 public: 45 enum Result { 46 SUCCESS_USER_IN_USE, 47 SUCCESS_USER_HAS_FINISHED, 48 ERR_SID_NOT_EXIST 49 }; GetInstance()50 static SequenceSyncManager *GetInstance() 51 { 52 static SequenceSyncManager sequenceSyncManager; 53 return &sequenceSyncManager; 54 } 55 56 uint64_t AddNotifier(const std::string &userId, SyncCallBack &callback); 57 Result DeleteNotifier(uint64_t sequenceId, std::string &userId); 58 Result Process( 59 uint64_t sequenceId, const std::map<std::string, DistributedDB::DBStatus> &results, std::string &userId); 60 61 private: 62 Result DeleteNotifierNoLock(uint64_t sequenceId, std::string &userId); 63 std::mutex notifierLock_; 64 std::map<std::string, std::vector<uint64_t>> userIdSeqIdRelations_; 65 std::map<uint64_t, SyncCallBack> seqIdCallbackRelations_; 66 }; 67 68 class ObjectStoreManager { 69 public: 70 using DmAdaper = OHOS::DistributedData::DeviceManagerAdapter; 71 ObjectStoreManager(); GetInstance()72 static ObjectStoreManager *GetInstance() 73 { 74 static ObjectStoreManager *manager = new ObjectStoreManager(); 75 return manager; 76 } 77 int32_t Save(const std::string &appId, const std::string &sessionId, 78 const std::map<std::string, std::vector<uint8_t>> &data, const std::string &deviceId, 79 sptr<IRemoteObject> callback); 80 int32_t RevokeSave( 81 const std::string &appId, const std::string &sessionId, sptr<IRemoteObject> callback); 82 int32_t Retrieve(const std::string &appId, const std::string &sessionId, sptr<IRemoteObject> callback); 83 void SetData(const std::string &dataDir, const std::string &userId); 84 int32_t Clear(); 85 int32_t DeleteByAppId(const std::string &appId); 86 void RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId, 87 pid_t pid, uint32_t tokenId, 88 sptr<IRemoteObject> callback); 89 void UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId, 90 const std::string &sessionId = ""); 91 void NotifyChange(std::map<std::string, std::vector<uint8_t>> &changedData); 92 void CloseAfterMinute(); 93 int32_t Open(); 94 void SetThreadPool(std::shared_ptr<ExecutorPool> executors); 95 private: 96 constexpr static const char *SEPERATOR = "_"; 97 constexpr static const char *LOCAL_DEVICE = "local"; 98 constexpr static int8_t MAX_OBJECT_SIZE_PER_APP = 16; 99 constexpr static int8_t DECIMAL_BASE = 10; 100 struct CallbackInfo { 101 pid_t pid; 102 std::map<std::string, sptr<ObjectChangeCallbackProxy>> observers_; 103 bool operator<(const CallbackInfo &it_) const 104 { 105 if (pid < it_.pid) { 106 return true; 107 } 108 return false; 109 } 110 }; 111 DistributedDB::KvStoreNbDelegate *OpenObjectKvStore(); 112 void FlushClosedStore(); 113 void Close(); 114 int32_t SetSyncStatus(bool status); 115 int32_t SaveToStore(const std::string &appId, const std::string &sessionId, const std::string &toDeviceId, 116 const std::map<std::string, std::vector<uint8_t>> &data); 117 int32_t SyncOnStore(const std::string &prefix, const std::vector<std::string> &deviceList, SyncCallBack &callback); 118 int32_t RevokeSaveToStore(const std::string &prefix); 119 int32_t RetrieveFromStore( 120 const std::string &appId, const std::string &sessionId, std::map<std::string, std::vector<uint8_t>> &results); 121 void SyncCompleted(const std::map<std::string, DistributedDB::DBStatus> &results, uint64_t sequenceId); 122 void ProcessKeyByIndex(std::string &key, uint8_t index); 123 std::string GetPropertyName(const std::string &key); 124 std::string GetSessionId(const std::string &key); 125 std::string GetBundleName(const std::string &key); 126 int64_t GetTime(const std::string &key); 127 void ProcessOldEntry(const std::string &appId); 128 void ProcessSyncCallback(const std::map<std::string, int32_t> &results, const std::string &appId, 129 const std::string &sessionId, const std::string &deviceId); GetPropertyPrefix(const std::string & appId,const std::string & sessionId)130 inline std::string GetPropertyPrefix(const std::string &appId, const std::string &sessionId) 131 { 132 return appId + SEPERATOR + sessionId + SEPERATOR + DmAdaper::GetInstance().GetLocalDevice().udid + SEPERATOR; 133 }; GetPropertyPrefix(const std::string & appId,const std::string & sessionId,const std::string & toDeviceId)134 inline std::string GetPropertyPrefix( 135 const std::string &appId, const std::string &sessionId, const std::string &toDeviceId) 136 { 137 return appId + SEPERATOR + sessionId + SEPERATOR + DmAdaper::GetInstance().GetLocalDevice().udid 138 + SEPERATOR + toDeviceId + SEPERATOR; 139 }; GetPrefixWithoutDeviceId(const std::string & appId,const std::string & sessionId)140 inline std::string GetPrefixWithoutDeviceId(const std::string &appId, const std::string &sessionId) 141 { 142 return appId + SEPERATOR + sessionId + SEPERATOR; 143 }; 144 std::recursive_mutex kvStoreMutex_; 145 std::mutex mutex_; 146 DistributedDB::KvStoreDelegateManager *kvStoreDelegateManager_ = nullptr; 147 DistributedDB::KvStoreNbDelegate *delegate_ = nullptr; 148 ObjectDataListener *objectDataListener_ = nullptr; 149 uint32_t syncCount_ = 0; 150 std::string userId_; 151 std::atomic<bool> isSyncing_ = false; 152 ConcurrentMap<uint32_t /* tokenId */, CallbackInfo > callbacks_; 153 static constexpr size_t TIME_TASK_NUM = 1; 154 static constexpr int64_t INTERVAL = 1; 155 std::shared_ptr<ExecutorPool> executors_; 156 }; 157 } // namespace DistributedObject 158 } // namespace OHOS 159 #endif // DISTRIBUTEDDATAMGR_OBJECT_MANAGER_H 160