/* * Copyright (c) 2022 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef DISTRIBUTEDDATAMGR_OBJECT_MANAGER_H #define DISTRIBUTEDDATAMGR_OBJECT_MANAGER_H #include #include "concurrent_map.h" #include "device_manager_adapter.h" #include "object_callback.h" #include "object_callback_proxy.h" #include "kv_store_delegate_manager.h" #include "kvstore_sync_callback.h" #include "object_common.h" #include "object_data_listener.h" #include "types.h" #include "object_snapshot.h" #include "object_types.h" #include "value_proxy.h" namespace OHOS { namespace DistributedObject { using SyncCallBack = std::function &results)>; class SequenceSyncManager { public: enum Result { SUCCESS_USER_IN_USE, SUCCESS_USER_HAS_FINISHED, ERR_SID_NOT_EXIST }; static SequenceSyncManager *GetInstance() { static SequenceSyncManager sequenceSyncManager; return &sequenceSyncManager; } uint64_t AddNotifier(const std::string &userId, SyncCallBack &callback); Result DeleteNotifier(uint64_t sequenceId, std::string &userId); Result Process( uint64_t sequenceId, const std::map &results, std::string &userId); private: Result DeleteNotifierNoLock(uint64_t sequenceId, std::string &userId); std::mutex notifierLock_; std::map> userIdSeqIdRelations_; std::map seqIdCallbackRelations_; }; class ObjectStoreManager { public: using DmAdaper = OHOS::DistributedData::DeviceManagerAdapter; using UriToSnapshot = std::shared_ptr>>; ObjectStoreManager(); static ObjectStoreManager *GetInstance() { static ObjectStoreManager *manager = new ObjectStoreManager(); return manager; } int32_t Save(const std::string &appId, const std::string &sessionId, const std::map> &data, const std::string &deviceId, sptr callback); int32_t RevokeSave( const std::string &appId, const std::string &sessionId, sptr callback); int32_t Retrieve(const std::string &bundleName, const std::string &sessionId, sptr callback, uint32_t tokenId); void SetData(const std::string &dataDir, const std::string &userId); int32_t Clear(); int32_t DeleteByAppId(const std::string &appId); void RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId, pid_t pid, uint32_t tokenId, sptr callback); void UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId, const std::string &sessionId = ""); void NotifyChange(std::map> &changedData); void CloseAfterMinute(); int32_t Open(); void SetThreadPool(std::shared_ptr executors); UriToSnapshot GetSnapShots(const std::string &bundleName, const std::string &storeName); int32_t BindAsset(const uint32_t tokenId, const std::string& appId, const std::string& sessionId, ObjectStore::Asset& asset, ObjectStore::AssetBindInfo& bindInfo); int32_t OnAssetChanged(const uint32_t tokenId, const std::string& appId, const std::string& sessionId, const std::string& deviceId, const ObjectStore::Asset& asset); void DeleteSnapshot(const std::string &bundleName, const std::string &sessionId); private: constexpr static const char *SEPERATOR = "_"; constexpr static const char *LOCAL_DEVICE = "local"; constexpr static const char *USERID = "USERID"; constexpr static int8_t MAX_OBJECT_SIZE_PER_APP = 16; constexpr static int8_t DECIMAL_BASE = 10; constexpr static int8_t SOURCE_DEVICE_ID_INDEX = 2; struct CallbackInfo { pid_t pid; std::map> observers_; bool operator<(const CallbackInfo &it_) const { if (pid < it_.pid) { return true; } return false; } }; DistributedDB::KvStoreNbDelegate *OpenObjectKvStore(); void FlushClosedStore(); void Close(); int32_t SetSyncStatus(bool status); int32_t SaveToStore(const std::string &appId, const std::string &sessionId, const std::string &toDeviceId, const std::map> &data); int32_t SyncOnStore(const std::string &prefix, const std::vector &deviceList, SyncCallBack &callback); int32_t RevokeSaveToStore(const std::string &prefix); int32_t RetrieveFromStore( const std::string &appId, const std::string &sessionId, std::map> &results); void SyncCompleted(const std::map &results, uint64_t sequenceId); void ProcessKeyByIndex(std::string &key, uint8_t index); std::string GetPropertyName(const std::string &key); std::string GetSessionId(const std::string &key); std::string GetBundleName(const std::string &key); std::string GetNetworkId(const std::string& key); int64_t GetTime(const std::string &key); void ProcessOldEntry(const std::string &appId); void ProcessSyncCallback(const std::map &results, const std::string &appId, const std::string &sessionId, const std::string &deviceId); void SaveUserToMeta(); std::string GetCurrentUser(); void DoNotify(uint32_t tokenId, const CallbackInfo& value, const std::map>>& data); std::map> GetAssetsFromStore( const std::map>& changedData); static bool isAssetKey(const std::string& key); static bool isAssetComplete(const std::map>& result, const std::string& assetPrefix); Assets GetAssetsFromDBRecords(const std::map>& result); inline std::string GetPropertyPrefix(const std::string &appId, const std::string &sessionId) { return appId + SEPERATOR + sessionId + SEPERATOR + DmAdaper::GetInstance().GetLocalDevice().udid + SEPERATOR; }; inline std::string GetPropertyPrefix( const std::string &appId, const std::string &sessionId, const std::string &toDeviceId) { return appId + SEPERATOR + sessionId + SEPERATOR + DmAdaper::GetInstance().GetLocalDevice().udid + SEPERATOR + toDeviceId + SEPERATOR; }; inline std::string GetPrefixWithoutDeviceId(const std::string &appId, const std::string &sessionId) { return appId + SEPERATOR + sessionId + SEPERATOR; }; inline std::string GetMetaUserIdKey(const std::string &userId, const std::string &appId) { return std::string(USERID) + SEPERATOR + userId + SEPERATOR + appId + SEPERATOR + DmAdaper::GetInstance().GetLocalDevice().udid; }; std::recursive_mutex kvStoreMutex_; std::mutex mutex_; DistributedDB::KvStoreDelegateManager *kvStoreDelegateManager_ = nullptr; DistributedDB::KvStoreNbDelegate *delegate_ = nullptr; ObjectDataListener *objectDataListener_ = nullptr; uint32_t syncCount_ = 0; std::string userId_; std::atomic isSyncing_ = false; ConcurrentMap callbacks_; static constexpr size_t TIME_TASK_NUM = 1; static constexpr int64_t INTERVAL = 1; std::shared_ptr executors_; DistributedData::AssetBindInfo ConvertBindInfo(ObjectStore::AssetBindInfo& bindInfo); VBucket ConvertVBucket(ObjectStore::ValuesBucket &vBucket); ConcurrentMap> snapshots_; // key:bundleName_sessionId ConcurrentMap bindSnapshots_; // key:bundleName_storeName }; } // namespace DistributedObject } // namespace OHOS #endif // DISTRIBUTEDDATAMGR_OBJECT_MANAGER_H