1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SYNC_ENGINE_H 17 #define SYNC_ENGINE_H 18 19 #include <map> 20 #include <mutex> 21 #include <queue> 22 23 #include "communicator_proxy.h" 24 #include "device_manager.h" 25 #include "isync_engine.h" 26 #include "isync_task_context.h" 27 #include "remote_executor.h" 28 #include "subscribe_manager.h" 29 #include "task_pool.h" 30 31 namespace DistributedDB { 32 33 class SyncEngine : public ISyncEngine { 34 public: 35 SyncEngine(); 36 ~SyncEngine() override; 37 38 // Do some init things 39 int Initialize(ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata, 40 const InitCallbackParam &callbackParam) override; 41 42 // Do some things, when db close. 43 int Close() override; 44 45 // Alloc and Add sync SyncTarget 46 // return E_OK if operator success. 47 int AddSyncOperation(SyncOperation *operation) override; 48 49 // Clear the SyncTarget matched the syncId. 50 void RemoveSyncOperation(int syncId) override; 51 52 #ifndef OMIT_MULTI_VER 53 // notify other devices data has changed 54 void BroadCastDataChanged() const override; 55 #endif 56 57 // Get Online devices 58 void GetOnlineDevices(std::vector<std::string> &devices) const override; 59 60 // Register the device connect callback, this function must be called after Engine inited 61 void StartCommunicator() override; 62 63 // Get the queue cache memory size 64 int GetQueueCacheSize() const; 65 66 // Set the queue cache memory size 67 void SetQueueCacheSize(int size); 68 69 // Get the number of message which is discarded 70 unsigned int GetDiscardMsgNum() const; 71 72 // Set the number of message which is discarded 73 void SetDiscardMsgNum(unsigned int num); 74 75 // Get the maximum of executing message number 76 unsigned int GetMaxExecNum() const; 77 78 // Get the maximum of queue cache memory size 79 int GetMaxQueueCacheSize() const; 80 81 // Set the maximum of queue cache memory size 82 void SetMaxQueueCacheSize(int value); 83 84 std::string GetLabel() const override; 85 86 bool GetSyncRetry() const; 87 void SetSyncRetry(bool isRetry) override; 88 89 // Set an equal identifier for this database, After this called, send msg to the target will use this identifier 90 int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override; 91 92 void SetEqualIdentifier() override; 93 94 void SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets) override; 95 96 void OfflineHandleByDevice(const std::string &deviceId, ISyncInterface *storage); 97 98 void ClearAllSyncTaskByDevice(const std::string &deviceId) override; 99 100 void GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries); 101 102 // subscribeQueries item is queryId 103 void GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds); 104 105 void GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries); 106 107 void PutUnfinishedSubQueries(const std::string &device, const std::vector<QuerySyncObject> &subscribeQueries); 108 109 void GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries); 110 111 // used by SingleVerSyncer when db online 112 int StartAutoSubscribeTimer(const ISyncInterface &syncInterface) override; 113 114 // used by SingleVerSyncer when remote/local db closed 115 void StopAutoSubscribeTimer() override; 116 117 int SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const override; 118 119 bool IsEngineActive() const override; 120 121 void SchemaChange() override; 122 123 void Dump(int fd) override; 124 125 int RemoteQuery(const std::string &device, const RemoteCondition &condition, 126 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override; 127 128 void NotifyConnectionClosed(uint64_t connectionId) override; 129 130 void NotifyUserChange() override; 131 132 void AbortMachineIfNeed(uint32_t syncId) override; 133 134 void AddSubscribe(SyncGenericInterface *storage, 135 const std::map<std::string, std::vector<QuerySyncObject>> &subscribeQuery) override; 136 137 void TimeChange() override; 138 139 int32_t GetResponseTaskCount() override; 140 protected: 141 // Create a context 142 virtual ISyncTaskContext *CreateSyncTaskContext(const ISyncInterface &syncInterface) = 0; 143 144 // Find SyncTaskContext from the map 145 ISyncTaskContext *FindSyncTaskContext(const std::string &deviceId); 146 ISyncTaskContext *GetSyncTaskContextAndInc(const std::string &deviceId); 147 void GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam); 148 void GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam); 149 150 void ClearSyncInterface(); 151 ISyncInterface *GetAndIncSyncInterface(); 152 void SetSyncInterface(ISyncInterface *syncInterface); 153 154 ISyncTaskContext *GetSyncTaskContext(const std::string &deviceId, int &errCode); 155 156 std::mutex storageMutex_; 157 ISyncInterface *syncInterface_; 158 // Used to store all send sync task infos (such as pull sync response, and push sync request) 159 std::map<std::string, ISyncTaskContext *> syncTaskContextMap_; 160 std::mutex contextMapLock_; 161 std::shared_ptr<SubscribeManager> subManager_; 162 std::function<void(const InternalSyncParma ¶m)> queryAutoSyncCallback_; 163 164 private: 165 166 // Init DeviceManager set callback and remoteExecutor 167 int InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged, 168 const std::function<void(std::string)> &offlineChanged, ISyncInterface *syncInterface); 169 170 // Init Comunicator, register callbacks 171 int InitComunicator(const ISyncInterface *syncInterface); 172 173 // Add the sync task info to the map. 174 int AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation); 175 176 // Sync Request CallbackTask run at a sub thread. 177 void MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg); 178 179 void RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg); 180 181 void ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator); 182 183 // wrapper of MessageReciveCallbackTask 184 void MessageReciveCallback(const std::string &targetDev, Message *inMsg); 185 186 // Sync Request Callback 187 int MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg); 188 189 // Exec the given SyncTarget. and callback onComplete. 190 int ExecSyncTask(ISyncTaskContext *context); 191 192 // Anti-DOS attack 193 void PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize); 194 195 // Get message size 196 int GetMsgSize(const Message *inMsg) const; 197 198 // Do not run MessageReceiveCallbackTask until msgQueue is empty 199 int DealMsgUtilQueueEmpty(); 200 201 // Handle message in order. 202 int ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg); 203 204 ISyncTaskContext *GetContextForMsg(const std::string &targetDev, int &errCode); 205 206 ICommunicator *AllocCommunicator(const std::string &identifier, int &errCode, std::string userId = ""); 207 208 void UnRegCommunicatorsCallback(); 209 210 void ReleaseCommunicators(); 211 212 bool IsSkipCalculateLen(const Message *inMsg); 213 214 void ClearInnerResource(); 215 216 void IncExecTaskCount(); 217 218 void DecExecTaskCount(); 219 220 RemoteExecutor *GetAndIncRemoteExector(); 221 222 void SetRemoteExector(RemoteExecutor *executor); 223 224 bool CheckDeviceIdValid(const std::string &checkDeviceId, const std::string &localDeviceId); 225 226 int GetLocalDeviceId(std::string &deviceId); 227 228 void WaitingExecTaskExist(); 229 230 int HandleRemoteExecutorMsg(const std::string &targetDev, Message *inMsg); 231 232 void AddQuerySubscribe(SyncGenericInterface *storage, const std::string &device, const QuerySyncObject &query); 233 234 std::string GetUserId(); 235 236 std::string GetUserId(const ISyncInterface *syncInterface); 237 238 uint32_t GetTimeout(const std::string &dev); 239 240 ICommunicator *communicator_; 241 DeviceManager *deviceManager_; 242 std::function<void(const std::string &)> onRemoteDataChanged_; 243 std::function<void(const std::string &)> offlineChanged_; 244 std::shared_ptr<Metadata> metadata_; 245 std::deque<Message *> msgQueue_; 246 volatile uint32_t execTaskCount_; 247 std::string label_; 248 volatile bool isSyncRetry_; 249 std::mutex communicatorProxyLock_; 250 CommunicatorProxy *communicatorProxy_; 251 std::mutex equalCommunicatorsLock_; 252 std::map<std::string, ICommunicator *> equalCommunicators_; 253 254 static int queueCacheSize_; 255 static int maxQueueCacheSize_; 256 static unsigned int discardMsgNum_; 257 static const unsigned int MAX_EXEC_NUM = 7; // Set the maximum of threads as 6 < 7 258 static constexpr int DEFAULT_CACHE_SIZE = 160 * 1024 * 1024; // Initial the default cache size of queue as 160MB 259 static std::mutex queueLock_; 260 std::atomic<bool> isActive_; 261 262 // key: device value: equalIdentifier 263 std::map<std::string, std::string> equalIdentifierMap_; 264 std::mutex execTaskCountLock_; 265 std::condition_variable execTaskCv_; 266 267 std::mutex remoteExecutorLock_; 268 RemoteExecutor *remoteExecutor_; 269 }; 270 } // namespace DistributedDB 271 272 #endif // SYNC_ENGINE_H 273