1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SYNC_ENGINE_H 17 #define SYNC_ENGINE_H 18 19 #include <map> 20 #include <mutex> 21 #include <queue> 22 23 #include "communicator_proxy.h" 24 #include "device_manager.h" 25 #include "isync_engine.h" 26 #include "isync_task_context.h" 27 #include "remote_executor.h" 28 #include "subscribe_manager.h" 29 #include "task_pool.h" 30 31 namespace DistributedDB { 32 33 class SyncEngine : public ISyncEngine { 34 public: 35 SyncEngine(); 36 ~SyncEngine() override; 37 38 // Do some init things 39 int Initialize(ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata, 40 const InitCallbackParam &callbackParam) override; 41 42 // Do some things, when db close. 43 int Close() override; 44 45 // Alloc and Add sync SyncTarget 46 // return E_OK if operator success. 47 int AddSyncOperation(SyncOperation *operation) override; 48 49 // Clear the SyncTarget matched the syncId. 50 void RemoveSyncOperation(int syncId) override; 51 52 #ifndef OMIT_MULTI_VER 53 // notify other devices data has changed 54 void BroadCastDataChanged() const override; 55 #endif 56 57 // Get Online devices 58 void GetOnlineDevices(std::vector<std::string> &devices) const override; 59 60 // Register the device connect callback, this function must be called after Engine inited 61 void StartCommunicator() override; 62 63 // Get the queue cache memory size 64 int GetQueueCacheSize() const; 65 66 // Set the queue cache memory size 67 void SetQueueCacheSize(int size); 68 69 // Get the number of message which is discarded 70 unsigned int GetDiscardMsgNum() const; 71 72 // Set the number of message which is discarded 73 void SetDiscardMsgNum(unsigned int num); 74 75 // Get the maximum of executing message number 76 unsigned int GetMaxExecNum() const; 77 78 // Get the maximum of queue cache memory size 79 int GetMaxQueueCacheSize() const; 80 81 // Set the maximum of queue cache memory size 82 void SetMaxQueueCacheSize(int value); 83 84 std::string GetLabel() const override; 85 86 bool GetSyncRetry() const; 87 void SetSyncRetry(bool isRetry) override; 88 89 // Set an equal identifier for this database, After this called, send msg to the target will use this identifier 90 int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override; 91 92 void SetEqualIdentifier() override; 93 94 void SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets) override; 95 96 void OfflineHandleByDevice(const std::string &deviceId, ISyncInterface *storage); 97 98 void ClearAllSyncTaskByDevice(const std::string &deviceId) override; 99 100 void GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries); 101 102 // subscribeQueries item is queryId 103 void GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds); 104 105 void GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries); 106 107 void PutUnfinishedSubQueries(const std::string &device, const std::vector<QuerySyncObject> &subscribeQueries); 108 109 void GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries); 110 111 // used by SingleVerSyncer when db online 112 int StartAutoSubscribeTimer(const ISyncInterface &syncInterface) override; 113 114 // used by SingleVerSyncer when remote/local db closed 115 void StopAutoSubscribeTimer() override; 116 117 int SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const override; 118 119 bool IsEngineActive() const override; 120 121 void SchemaChange() override; 122 123 void Dump(int fd) override; 124 125 int RemoteQuery(const std::string &device, const RemoteCondition &condition, 126 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override; 127 128 void NotifyConnectionClosed(uint64_t connectionId) override; 129 130 void NotifyUserChange() override; 131 132 void AbortMachineIfNeed(uint32_t syncId) override; 133 134 void AddSubscribe(SyncGenericInterface *storage, 135 const std::map<std::string, std::vector<QuerySyncObject>> &subscribeQuery) override; 136 137 void TimeChange() override; 138 139 int32_t GetResponseTaskCount() override; 140 141 int32_t GetRemoteQueryTaskCount() override; 142 143 bool ExchangeClosePending(bool expected) override; 144 protected: 145 // Create a context 146 virtual ISyncTaskContext *CreateSyncTaskContext(const ISyncInterface &syncInterface) = 0; 147 148 // Find SyncTaskContext from the map 149 ISyncTaskContext *FindSyncTaskContext(const DeviceSyncTarget &target, bool isNeedCorrectUserId); 150 std::vector<ISyncTaskContext *> GetSyncTaskContextAndInc(const std::string &deviceId); 151 void GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam); 152 void GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam); 153 154 void ClearSyncInterface(); 155 ISyncInterface *GetAndIncSyncInterface(); 156 void SetSyncInterface(ISyncInterface *syncInterface); 157 158 ISyncTaskContext *GetSyncTaskContext(const DeviceSyncTarget &target, int &errCode); 159 160 std::mutex storageMutex_; 161 ISyncInterface *syncInterface_; 162 // Used to store all send sync task infos (such as pull sync response, and push sync request) 163 std::map<DeviceSyncTarget, ISyncTaskContext *> syncTaskContextMap_; 164 std::mutex contextMapLock_; 165 std::shared_ptr<SubscribeManager> subManager_; 166 std::function<void(const InternalSyncParma ¶m)> queryAutoSyncCallback_; 167 168 private: 169 170 // Init DeviceManager set callback and remoteExecutor 171 int InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged, 172 const std::function<void(std::string)> &offlineChanged, ISyncInterface *syncInterface); 173 174 // Init Comunicator, register callbacks 175 int InitComunicator(const ISyncInterface *syncInterface); 176 177 // Add the sync task info to the map. 178 int AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation); 179 180 // Sync Request CallbackTask run at a sub thread. 181 void MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg); 182 183 void RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg); 184 185 void ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator); 186 187 // wrapper of MessageReciveCallbackTask 188 int MessageReciveCallback(const std::string &targetDev, Message *inMsg); 189 190 // Sync Request Callback 191 int MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg); 192 193 // Exec the given SyncTarget. and callback onComplete. 194 int ExecSyncTask(ISyncTaskContext *context); 195 196 // Anti-DOS attack 197 void PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize); 198 199 // Get message size 200 int GetMsgSize(const Message *inMsg) const; 201 202 // Do not run MessageReceiveCallbackTask until msgQueue is empty 203 int DealMsgUtilQueueEmpty(); 204 205 // Handle message in order. 206 int ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg); 207 208 ISyncTaskContext *GetContextForMsg(const DeviceSyncTarget &target, int &errCode, bool isNeedCorrectUserId); 209 210 ICommunicator *AllocCommunicator(const std::string &identifier, int &errCode, std::string userId = ""); 211 212 void UnRegCommunicatorsCallback(); 213 214 void ReleaseCommunicators(); 215 216 bool IsSkipCalculateLen(const Message *inMsg); 217 218 void ClearInnerResource(); 219 220 void IncExecTaskCount(); 221 222 void DecExecTaskCount(); 223 224 uint32_t GetExecTaskCount(); 225 226 RemoteExecutor *GetAndIncRemoteExector(); 227 228 void SetRemoteExector(RemoteExecutor *executor); 229 230 bool CheckDeviceIdValid(const std::string &checkDeviceId, const std::string &localDeviceId); 231 232 int GetLocalDeviceId(std::string &deviceId); 233 234 void WaitingExecTaskExist(); 235 236 int HandleRemoteExecutorMsg(const std::string &targetDev, Message *inMsg); 237 238 void AddQuerySubscribe(SyncGenericInterface *storage, const std::string &device, const QuerySyncObject &query); 239 240 std::string GetUserId(); 241 242 std::string GetUserId(const ISyncInterface *syncInterface); 243 244 uint32_t GetTimeout(const std::string &dev); 245 246 int RegCallbackOnInitComunicator(ICommunicatorAggregator *communicatorAggregator, 247 const ISyncInterface *syncInterface); 248 249 std::string GetTargetUserId(const std::string &dev); 250 251 void CorrectTargetUserId(std::map<DeviceSyncTarget, ISyncTaskContext *>::iterator &it, bool isNeedCorrectUserId); 252 253 ICommunicator *communicator_; 254 DeviceManager *deviceManager_; 255 std::function<void(const std::string &)> onRemoteDataChanged_; 256 std::function<void(const std::string &)> offlineChanged_; 257 std::shared_ptr<Metadata> metadata_; 258 std::deque<Message *> msgQueue_; 259 volatile uint32_t execTaskCount_; 260 std::string label_; 261 volatile bool isSyncRetry_; 262 std::mutex communicatorProxyLock_; 263 CommunicatorProxy *communicatorProxy_; 264 std::mutex equalCommunicatorsLock_; 265 std::map<std::string, ICommunicator *> equalCommunicators_; 266 267 static int queueCacheSize_; 268 static int maxQueueCacheSize_; 269 static unsigned int discardMsgNum_; 270 static const unsigned int MAX_EXEC_NUM = 7; // Set the maximum of threads as 6 < 7 271 static constexpr int DEFAULT_CACHE_SIZE = 160 * 1024 * 1024; // Initial the default cache size of queue as 160MB 272 static std::mutex queueLock_; 273 std::atomic<bool> isActive_; 274 275 // key: device value: equalIdentifier 276 std::map<std::string, std::string> equalIdentifierMap_; 277 std::mutex execTaskCountLock_; 278 std::condition_variable execTaskCv_; 279 280 std::mutex remoteExecutorLock_; 281 RemoteExecutor *remoteExecutor_; 282 }; 283 } // namespace DistributedDB 284 285 #endif // SYNC_ENGINE_H 286