/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 #ifndef DISTRIBUTEDDATAMGR_OBJECT_MANAGER_H
16 #define DISTRIBUTEDDATAMGR_OBJECT_MANAGER_H
17 
#include <atomic>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <vector>

#include "concurrent_map.h"
#include "device_manager_adapter.h"
#include "kv_store_delegate_manager.h"
#include "kvstore_sync_callback.h"
#include "metadata/store_meta_data.h"
#include "object_asset_loader.h"
#include "object_callback.h"
#include "object_callback_proxy.h"
#include "object_common.h"
#include "object_data_listener.h"
#include "object_snapshot.h"
#include "object_types.h"
#include "serializable/serializable.h"
#include "types.h"
#include "value_proxy.h"
35 namespace OHOS {
36 namespace DistributedObject {
// Callback invoked with the outcome of a sync request: a map from a string key to an int32_t result code.
// NOTE(review): the keys are presumably device identifiers - confirm against the callers.
using SyncCallBack = std::function<void(const std::map<std::string, int32_t> &results)>;
// Object payload: maps a string key to its value stored as raw bytes.
using ObjectRecord = std::map<std::string, std::vector<uint8_t>>;
39 class SequenceSyncManager {
40 public:
41     enum Result {
42         SUCCESS_USER_IN_USE,
43         SUCCESS_USER_HAS_FINISHED,
44         ERR_SID_NOT_EXIST
45     };
GetInstance()46     static SequenceSyncManager *GetInstance()
47     {
48         static SequenceSyncManager sequenceSyncManager;
49         return &sequenceSyncManager;
50     }
51 
52     uint64_t AddNotifier(const std::string &userId, SyncCallBack &callback);
53     Result DeleteNotifier(uint64_t sequenceId, std::string &userId);
54     Result Process(
55         uint64_t sequenceId, const std::map<std::string, DistributedDB::DBStatus> &results, std::string &userId);
56 
57 private:
58     Result DeleteNotifierNoLock(uint64_t sequenceId, std::string &userId);
59     std::mutex notifierLock_;
60     std::map<std::string, std::vector<uint64_t>> userIdSeqIdRelations_;
61     std::map<uint64_t, SyncCallBack> seqIdCallbackRelations_;
62 };
63 
64 class ObjectStoreManager {
65 public:
66     using DmAdaper = OHOS::DistributedData::DeviceManagerAdapter;
67     using UriToSnapshot = std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>>;
68     using StoreMetaData = OHOS::DistributedData::StoreMetaData;
69 
70     enum RestoreStatus : int32_t {
71         NONE = 0,
72         DATA_READY,
73         DATA_NOTIFIED,
74         ASSETS_READY,
75         ALL_READY,
76         STATUS_BUTT
77     };
78 
79     ObjectStoreManager();
80     ~ObjectStoreManager();
GetInstance()81     static ObjectStoreManager &GetInstance()
82     {
83         static ObjectStoreManager manager;
84         return manager;
85     }
86     int32_t Save(const std::string &appId, const std::string &sessionId, const ObjectRecord &data,
87         const std::string &deviceId, sptr<IRemoteObject> callback);
88     int32_t RevokeSave(
89         const std::string &appId, const std::string &sessionId, sptr<IRemoteObject> callback);
90     int32_t Retrieve(const std::string &bundleName, const std::string &sessionId,
91         sptr<IRemoteObject> callback, uint32_t tokenId);
92     void SetData(const std::string &dataDir, const std::string &userId);
93     int32_t Clear();
94     int32_t InitUserMeta();
95     int32_t DeleteByAppId(const std::string &appId, int32_t user);
96     void RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId,
97         pid_t pid, uint32_t tokenId, sptr<IRemoteObject> callback);
98     void UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId,
99         const std::string &sessionId = "");
100     void RegisterProgressObserverCallback(const std::string &bundleName, const std::string &sessionId,
101         pid_t pid, uint32_t tokenId, sptr<IRemoteObject> callback);
102     void UnregisterProgressObserverCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId,
103         const std::string &sessionId = "");
104     void NotifyChange(const ObjectRecord &changedData);
105     void NotifyAssetsReady(const std::string& objectKey, const std::string& bundleName,
106         const std::string& srcNetworkId = "");
107     void NotifyAssetsStart(const std::string& objectKey, const std::string& srcNetworkId = "");
108     void NotifyAssetsRecvProgress(const std::string& objectKey, int32_t progress);
109     void SetThreadPool(std::shared_ptr<ExecutorPool> executors);
110     UriToSnapshot GetSnapShots(const std::string &bundleName, const std::string &storeName);
111     int32_t BindAsset(const uint32_t tokenId, const std::string& appId, const std::string& sessionId,
112         ObjectStore::Asset& asset, ObjectStore::AssetBindInfo& bindInfo);
113     int32_t OnAssetChanged(const uint32_t tokenId, const std::string& appId, const std::string& sessionId,
114          const std::string& deviceId, const ObjectStore::Asset& asset);
115     void DeleteSnapshot(const std::string &bundleName, const std::string &sessionId);
116     int32_t AutoLaunchStore();
117     bool UnRegisterAssetsLister();
118 private:
119     constexpr static const char *SEPERATOR = "_";
120     constexpr static const char *TIME_REGEX = "_\\d{10}_p_";
121     constexpr static int BUNDLE_NAME_INDEX = 0;
122     constexpr static int SESSION_ID_INDEX = 1;
123     constexpr static int SOURCE_DEVICE_UDID_INDEX = 2;
124     constexpr static int TIME_INDEX = 4;
125     constexpr static int PROPERTY_NAME_INDEX = 5;
126     constexpr static const char *LOCAL_DEVICE = "local";
127     constexpr static const char *USERID = "USERID";
128     constexpr static int8_t MAX_OBJECT_SIZE_PER_APP = 16;
129     constexpr static int8_t DECIMAL_BASE = 10;
130     constexpr static int WAIT_TIME = 60;
131     static constexpr size_t TIME_TASK_NUM = 1;
132     static constexpr int64_t INTERVAL = 1;
133     struct CallbackInfo {
134         pid_t pid;
135         std::map<std::string, sptr<ObjectChangeCallbackProxy>> observers_;
136         bool operator<(const CallbackInfo &it_) const
137         {
138             return pid < it_.pid;
139         }
140     };
141     struct ProgressCallbackInfo {
142         pid_t pid;
143         std::map<std::string, sptr<ObjectProgressCallbackProxy>> observers_;
144         bool operator<(const ProgressCallbackInfo &it_) const
145         {
146             return pid < it_.pid;
147         }
148     };
149     struct SaveInfo : DistributedData::Serializable {
150         std::string bundleName;
151         std::string sessionId;
152         std::string sourceDeviceId;
153         std::string targetDeviceId;
154         std::string timestamp;
155         SaveInfo() = default;
156         SaveInfo(const std::string &bundleName, const std::string &sessionId, const std::string &sourceDeviceId,
157             const std::string &targetDeviceId, const std::string &timestamp);
158         bool Marshal(json &node) const override;
159         bool Unmarshal(const json &node) override;
160         std::string ToPropertyPrefix();
161     };
162     DistributedDB::KvStoreNbDelegate *OpenObjectKvStore();
163     void FlushClosedStore();
164     void Close();
165     void ForceClose();
166     int32_t SetSyncStatus(bool status);
167     int32_t SaveToStore(const std::string &appId, const std::string &sessionId, const std::string &toDeviceId,
168         const ObjectRecord &data);
169     int32_t SyncOnStore(const std::string &prefix, const std::vector<std::string> &deviceList, SyncCallBack &callback);
170     int32_t RevokeSaveToStore(const std::string &prefix);
171     int32_t RetrieveFromStore(const std::string &appId, const std::string &sessionId, ObjectRecord &results);
172     void SyncCompleted(const std::map<std::string, DistributedDB::DBStatus> &results, uint64_t sequenceId);
173     std::vector<std::string> SplitEntryKey(const std::string &key);
174     void ProcessOldEntry(const std::string &appId);
175     void ProcessSyncCallback(const std::map<std::string, int32_t> &results, const std::string &appId,
176         const std::string &sessionId, const std::string &deviceId);
177     void SaveUserToMeta();
178     bool IsNeedMetaSync(const StoreMetaData &meta, const std::vector<std::string> &networkIds);
179     int32_t DoSync(const std::string &prefix, const std::vector<std::string> &deviceList, uint64_t sequenceId);
180     std::string GetCurrentUser();
181     void DoNotify(uint32_t tokenId, const CallbackInfo& value, const std::map<std::string, ObjectRecord>& data,
182         bool allReady);
183     void DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value, const std::string& objectKey, bool allReady);
184     void DoNotifyWaitAssetTimeout(const std::string &objectKey);
185     std::map<std::string, std::map<std::string, Assets>> GetAssetsFromStore(const ObjectRecord& changedData);
186     static bool IsAssetKey(const std::string& key);
187     static bool IsAssetComplete(const ObjectRecord& result, const std::string& assetPrefix);
188     Assets GetAssetsFromDBRecords(const ObjectRecord& result);
189     bool RegisterAssetsLister();
190     void ComputeStatus(const std::string& objectKey, const SaveInfo& saveInfo,
191         const std::map<std::string, ObjectRecord>& data);
192     void NotifyDataChanged(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo);
193     int32_t PushAssets(const std::string &srcBundleName, const std::string &dstBundleName, const std::string &sessionId,
194         const ObjectRecord &data, const std::string &deviceId);
195     int32_t WaitAssets(const std::string& objectKey, const SaveInfo& saveInfo,
196         const std::map<std::string, ObjectRecord>& data);
197     void PullAssets(const std::map<std::string, ObjectRecord>& data, const SaveInfo& saveInfo);
198     std::map<std::string, ObjectRecord> GetObjectData(const ObjectRecord& changedData, SaveInfo& saveInfo,
199         bool& hasAsset);
200     void CloseAfterMinute();
201     int32_t Open();
GetPropertyPrefix(const std::string & appId,const std::string & sessionId)202     inline std::string GetPropertyPrefix(const std::string &appId, const std::string &sessionId)
203     {
204         return appId + SEPERATOR + sessionId + SEPERATOR + DmAdaper::GetInstance().GetLocalDevice().udid + SEPERATOR;
205     };
GetPropertyPrefix(const std::string & appId,const std::string & sessionId,const std::string & toDeviceId)206     inline std::string GetPropertyPrefix(
207         const std::string &appId, const std::string &sessionId, const std::string &toDeviceId)
208     {
209         return appId + SEPERATOR + sessionId + SEPERATOR + DmAdaper::GetInstance().GetLocalDevice().udid
210              + SEPERATOR + toDeviceId + SEPERATOR;
211     };
GetPrefixWithoutDeviceId(const std::string & appId,const std::string & sessionId)212     inline std::string GetPrefixWithoutDeviceId(const std::string &appId, const std::string &sessionId)
213     {
214         return appId + SEPERATOR + sessionId + SEPERATOR;
215     };
GetMetaUserIdKey(const std::string & userId,const std::string & appId)216     inline std::string GetMetaUserIdKey(const std::string &userId, const std::string &appId)
217     {
218         return std::string(USERID) + SEPERATOR + userId + SEPERATOR + appId + SEPERATOR
219              + DmAdaper::GetInstance().GetLocalDevice().udid;
220     };
221     mutable std::shared_timed_mutex rwMutex_;
222     std::shared_ptr<DistributedDB::KvStoreDelegateManager> kvStoreDelegateManager_ = nullptr;
223     DistributedDB::KvStoreNbDelegate *delegate_ = nullptr;
224     ObjectDataListener objectDataListener_;
225     sptr<ObjectAssetsRecvListener> objectAssetsRecvListener_ = nullptr;
226     sptr<ObjectAssetsSendListener> objectAssetsSendListener_ = nullptr;
227     std::atomic<uint32_t> syncCount_{ 0 };
228     std::string userId_;
229     std::atomic<bool> isSyncing_{ false };
230     ConcurrentMap<uint32_t /* tokenId */, CallbackInfo > callbacks_;
231     ConcurrentMap<uint32_t /* tokenId */, ProgressCallbackInfo > processCallbacks_;
232     std::shared_ptr<ExecutorPool> executors_;
233     DistributedData::AssetBindInfo ConvertBindInfo(ObjectStore::AssetBindInfo& bindInfo);
234     ConcurrentMap<std::string, std::shared_ptr<Snapshot>> snapshots_; // key:bundleName_sessionId
235     ConcurrentMap<std::string, UriToSnapshot> bindSnapshots_; // key:bundleName_storeName
236     ConcurrentMap<std::string, RestoreStatus> restoreStatus_; // key:bundleName+sessionId
237     ConcurrentMap<std::string, ExecutorPool::TaskId> objectTimer_; // key:bundleName+sessionId
238     ConcurrentMap<std::string, uint64_t> assetsRecvProgress_; // key:bundleName+sessionId
239     std::map<std::string, int32_t> progressInfo_;
240     std::mutex progressMutex_;
241 };
242 } // namespace DistributedObject
243 } // namespace OHOS
244 #endif // DISTRIBUTEDDATAMGR_OBJECT_MANAGER_H
245