1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SYNC_ENGINE_H 17 #define SYNC_ENGINE_H 18 19 #include <map> 20 #include <mutex> 21 #include <queue> 22 23 #include "communicator_proxy.h" 24 #include "device_manager.h" 25 #include "isync_engine.h" 26 #include "isync_task_context.h" 27 #include "remote_executor.h" 28 #include "subscribe_manager.h" 29 #include "task_pool.h" 30 31 namespace DistributedDB { 32 33 class SyncEngine : public ISyncEngine { 34 public: 35 SyncEngine(); 36 ~SyncEngine() override; 37 38 // Do some init things 39 int Initialize(ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata, 40 const InitCallbackParam &callbackParam) override; 41 42 // Do some things, when db close. 43 int Close() override; 44 45 // Alloc and Add sync SyncTarget 46 // return E_OK if operator success. 47 int AddSyncOperation(SyncOperation *operation) override; 48 49 // Clear the SyncTarget matched the syncId. 50 void RemoveSyncOperation(int syncId) override; 51 52 #ifndef OMIT_MULTI_VER 53 // notify other devices data has changed 54 void BroadCastDataChanged() const override; 55 #endif 56 57 // Get Online devices 58 void GetOnlineDevices(std::vector<std::string> &devices) const override; 59 60 // Register the device connect callback, this function must be called after Engine inited 61 void StartCommunicator() override; 62 63 // Get the queue cache memory size 64 int GetQueueCacheSize() const; 65 66 // Get the number of message which is discarded 67 unsigned int GetDiscardMsgNum() const; 68 69 // Get the maximum of executing message number 70 unsigned int GetMaxExecNum() const; 71 72 // Set the maximum of queue cache memory size 73 void SetMaxQueueCacheSize(int value); 74 75 std::string GetLabel() const override; 76 77 bool GetSyncRetry() const; 78 void SetSyncRetry(bool isRetry) override; 79 80 // Set an equal identifier for this database, After this called, send msg to the target will use this identifier 81 int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override; 82 83 void SetEqualIdentifier() override; 84 85 void SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets) override; 86 87 void OfflineHandleByDevice(const std::string &deviceId, ISyncInterface *storage); 88 89 void ClearAllSyncTaskByDevice(const std::string &deviceId) override; 90 91 void GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries); 92 93 // subscribeQueries item is queryId 94 void GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds); 95 96 void GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries); 97 98 void PutUnfinishedSubQueries(const std::string &device, const std::vector<QuerySyncObject> &subscribeQueries); 99 100 void GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries); 101 102 // used by SingleVerSyncer when db online 103 int StartAutoSubscribeTimer(const ISyncInterface &syncInterface) override; 104 105 // used by SingleVerSyncer when remote/local db closed 106 void StopAutoSubscribeTimer() override; 107 108 int SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const override; 109 110 bool IsEngineActive() const override; 111 112 void SchemaChange() override; 113 114 void Dump(int fd) override; 115 116 int RemoteQuery(const std::string &device, const RemoteCondition &condition, 117 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override; 118 119 void NotifyConnectionClosed(uint64_t connectionId) override; 120 121 void NotifyUserChange() override; 122 123 void AbortMachineIfNeed(uint32_t syncId) override; 124 125 void AddSubscribe(SyncGenericInterface *storage, 126 const std::map<std::string, std::vector<QuerySyncObject>> &subscribeQuery) override; 127 128 void TimeChange() override; 129 130 int32_t GetResponseTaskCount() override; 131 protected: 132 // Create a context 133 virtual ISyncTaskContext *CreateSyncTaskContext(const ISyncInterface &syncInterface) = 0; 134 135 // Find SyncTaskContext from the map 136 ISyncTaskContext *FindSyncTaskContext(const std::string &deviceId); 137 ISyncTaskContext *GetSyncTaskContextAndInc(const std::string &deviceId); 138 void GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam); 139 void GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam); 140 141 void ClearSyncInterface(); 142 ISyncInterface *GetAndIncSyncInterface(); 143 void SetSyncInterface(ISyncInterface *syncInterface); 144 145 ISyncTaskContext *GetSyncTaskContext(const std::string &deviceId, int &errCode); 146 147 std::mutex storageMutex_; 148 ISyncInterface *syncInterface_; 149 // Used to store all send sync task infos (such as pull sync response, and push sync request) 150 std::map<std::string, ISyncTaskContext *> syncTaskContextMap_; 151 std::mutex contextMapLock_; 152 std::shared_ptr<SubscribeManager> subManager_; 153 std::function<void(const InternalSyncParma ¶m)> queryAutoSyncCallback_; 154 155 private: 156 157 // Init DeviceManager set callback and remoteExecutor 158 int InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged, 159 const std::function<void(std::string)> &offlineChanged, ISyncInterface *syncInterface); 160 161 // Init Comunicator, register callbacks 162 int InitComunicator(const ISyncInterface *syncInterface); 163 164 // Add the sync task info to the map. 165 int AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation); 166 167 // Sync Request CallbackTask run at a sub thread. 168 void MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg); 169 170 void RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg); 171 172 void ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator); 173 174 // wrapper of MessageReciveCallbackTask 175 void MessageReciveCallback(const std::string &targetDev, Message *inMsg); 176 177 // Sync Request Callback 178 int MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg); 179 180 // Exec the given SyncTarget. and callback onComplete. 181 int ExecSyncTask(ISyncTaskContext *context); 182 183 // Anti-DOS attack 184 void PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize); 185 186 // Get message size 187 int GetMsgSize(const Message *inMsg) const; 188 189 // Do not run MessageReceiveCallbackTask until msgQueue is empty 190 int DealMsgUtilQueueEmpty(); 191 192 // Handle message in order. 193 int ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg); 194 195 ISyncTaskContext *GetContextForMsg(const std::string &targetDev, int &errCode); 196 197 ICommunicator *AllocCommunicator(const std::string &identifier, int &errCode); 198 199 void UnRegCommunicatorsCallback(); 200 201 void ReleaseCommunicators(); 202 203 bool IsSkipCalculateLen(const Message *inMsg); 204 205 void ClearInnerResource(); 206 207 void IncExecTaskCount(); 208 209 void DecExecTaskCount(); 210 211 RemoteExecutor *GetAndIncRemoteExector(); 212 213 void SetRemoteExector(RemoteExecutor *executor); 214 215 bool CheckDeviceIdValid(const std::string &checkDeviceId, const std::string &localDeviceId); 216 217 int GetLocalDeviceId(std::string &deviceId); 218 219 void WaitingExecTaskExist(); 220 221 int HandleRemoteExecutorMsg(const std::string &targetDev, Message *inMsg); 222 223 void AddQuerySubscribe(SyncGenericInterface *storage, const std::string &device, const QuerySyncObject &query); 224 225 ICommunicator *communicator_; 226 DeviceManager *deviceManager_; 227 std::function<void(const std::string &)> onRemoteDataChanged_; 228 std::function<void(const std::string &)> offlineChanged_; 229 std::shared_ptr<Metadata> metadata_; 230 std::deque<Message *> msgQueue_; 231 volatile uint32_t execTaskCount_; 232 std::string label_; 233 volatile bool isSyncRetry_; 234 std::mutex communicatorProxyLock_; 235 CommunicatorProxy *communicatorProxy_; 236 std::mutex equalCommunicatorsLock_; 237 std::map<std::string, ICommunicator *> equalCommunicators_; 238 239 static int queueCacheSize_; 240 static int maxQueueCacheSize_; 241 static unsigned int discardMsgNum_; 242 static const unsigned int MAX_EXEC_NUM = 7; // Set the maximum of threads as 6 < 7 243 static constexpr int DEFAULT_CACHE_SIZE = 160 * 1024 * 1024; // Initial the default cache size of queue as 160MB 244 static std::mutex queueLock_; 245 std::atomic<bool> isActive_; 246 247 // key: device value: equalIdentifier 248 std::map<std::string, std::string> equalIdentifierMap_; 249 std::mutex execTaskCountLock_; 250 std::condition_variable execTaskCv_; 251 252 std::mutex remoteExecutorLock_; 253 RemoteExecutor *remoteExecutor_; 254 }; 255 } // namespace DistributedDB 256 257 #endif // SYNC_ENGINE_H 258