1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SYNC_ENGINE_H 17 #define SYNC_ENGINE_H 18 19 #include <map> 20 #include <mutex> 21 #include <queue> 22 23 #include "communicator_proxy.h" 24 #include "device_manager.h" 25 #include "isync_engine.h" 26 #include "isync_task_context.h" 27 #include "remote_executor.h" 28 #include "subscribe_manager.h" 29 #include "task_pool.h" 30 31 namespace DistributedDB { 32 33 class SyncEngine : public ISyncEngine { 34 public: 35 SyncEngine(); 36 ~SyncEngine() override; 37 38 // Do some init things 39 int Initialize(ISyncInterface *syncInterface, std::shared_ptr<Metadata> &metadata, 40 const std::function<void(std::string)> &onRemoteDataChanged, 41 const std::function<void(std::string)> &offlineChanged, 42 const std::function<void(const InternalSyncParma ¶m)> &queryAutoSyncCallback) override; 43 44 // Do some things, when db close. 45 int Close() override; 46 47 // Alloc and Add sync SyncTarget 48 // return E_OK if operator success. 49 int AddSyncOperation(SyncOperation *operation) override; 50 51 // Clear the SyncTarget matched the syncId. 52 void RemoveSyncOperation(int syncId) override; 53 54 // notify other devices data has changed 55 void BroadCastDataChanged() const override; 56 57 // Get Online devices 58 void GetOnlineDevices(std::vector<std::string> &devices) const override; 59 60 // Register the device connect callback, this function must be called after Engine inited 61 void RegConnectCallback() override; 62 63 // Get the queue cache memory size 64 int GetQueueCacheSize() const; 65 66 // Get the number of message which is discarded 67 unsigned int GetDiscardMsgNum() const; 68 69 // Get the maximum of executing message number 70 unsigned int GetMaxExecNum() const; 71 72 // Set the maximum of queue cache memory size 73 void SetMaxQueueCacheSize(int value); 74 75 std::string GetLabel() const override; 76 77 bool GetSyncRetry() const; 78 void SetSyncRetry(bool isRetry) override; 79 80 // Set an equal identifier for this database, After this called, send msg to the target will use this identifier 81 int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override; 82 83 void SetEqualIdentifier() override; 84 85 void SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets) override; 86 87 void OfflineHandleByDevice(const std::string &deviceId); 88 89 void GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries); 90 91 // subscribeQueries item is queryId 92 void GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds); 93 94 void GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries); 95 96 void PutUnfiniedSubQueries(const std::string &device, const std::vector<QuerySyncObject> &subscribeQueries); 97 98 void GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries); 99 100 // used by SingleVerSyncer when db online 101 int StartAutoSubscribeTimer() override; 102 103 // used by SingleVerSyncer when remote/local db closed 104 void StopAutoSubscribeTimer() override; 105 106 int SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const override; 107 108 bool IsEngineActive() const override; 109 110 void SchemaChange() override; 111 112 void Dump(int fd) override; 113 114 int RemoteQuery(const std::string &device, const RemoteCondition &condition, 115 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override; 116 117 void NotifyConnectionClosed(uint64_t connectionId) override; 118 119 void NotifyUserChange() override; 120 121 void AbortMachineIfNeed(uint32_t syncId) override; 122 123 protected: 124 // Create a context 125 virtual ISyncTaskContext *CreateSyncTaskContext() = 0; 126 127 // Find SyncTaskContext from the map 128 ISyncTaskContext *FindSyncTaskContext(const std::string &deviceId); 129 ISyncTaskContext *GetSyncTaskContextAndInc(const std::string &deviceId); 130 void GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam); 131 void GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam); 132 133 ISyncInterface *syncInterface_; 134 // Used to store all send sync task infos (such as pull sync response, and push sync request) 135 std::map<std::string, ISyncTaskContext *> syncTaskContextMap_; 136 std::mutex contextMapLock_; 137 std::shared_ptr<SubscribeManager> subManager_; 138 std::function<void(const InternalSyncParma ¶m)> queryAutoSyncCallback_; 139 140 private: 141 142 // Init DeviceManager set callback and remoteExecutor 143 int InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged, 144 const std::function<void(std::string)> &offlineChanged); 145 146 ISyncTaskContext *GetSyncTaskContext(const std::string &deviceId, int &errCode); 147 148 // Init Comunicator, register callbacks 149 int InitComunicator(const ISyncInterface *syncInterface); 150 151 // Add the sync task info to the map. 152 int AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation); 153 154 // Sync Request CallbackTask run at a sub thread. 155 void MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg); 156 157 void RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg); 158 159 void ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator); 160 161 // wrapper of MessageReciveCallbackTask 162 void MessageReciveCallback(const std::string &targetDev, Message *inMsg); 163 164 // Sync Request Callback 165 int MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg); 166 167 // Exec the given SyncTarget. and callback onComplete. 168 int ExecSyncTask(ISyncTaskContext *context); 169 170 // Anti-DOS attack 171 void PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize); 172 173 // Get message size 174 int GetMsgSize(const Message *inMsg) const; 175 176 // Do not run MessageReceiveCallbackTask until msgQueue is empty 177 int DealMsgUtilQueueEmpty(); 178 179 // Handle message in order. 180 int ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg); 181 182 // Schedule Sync Task 183 void ScheduleSyncTask(ISyncTaskContext *context); 184 185 ISyncTaskContext *GetConextForMsg(const std::string &targetDev, int &errCode); 186 187 ICommunicator *AllocCommunicator(const std::string &identifier, int &errCode); 188 189 void UnRegCommunicatorsCallback(); 190 191 void ReleaseCommunicators(); 192 193 bool IsSkipCalculateLen(const Message *inMsg); 194 195 void ClearInnerResource(); 196 197 void IncExecTaskCount(); 198 199 void DecExecTaskCount(); 200 201 RemoteExecutor *GetAndIncRemoteExector(); 202 203 void SetRemoteExector(RemoteExecutor *executor); 204 205 bool CheckDeviceIdValid(const std::string &deviceId, const std::string &localDeviceId); 206 207 int GetLocalDeviceId(std::string &deviceId); 208 209 void WaitingExecTaskExist(); 210 211 ICommunicator *communicator_; 212 DeviceManager *deviceManager_; 213 std::function<void(const std::string &)> onRemoteDataChanged_; 214 std::function<void(const std::string &)> offlineChanged_; 215 std::shared_ptr<Metadata> metadata_; 216 std::deque<Message *> msgQueue_; 217 uint32_t execTaskCount_; 218 std::string label_; 219 bool isSyncRetry_; 220 CommunicatorProxy *communicatorProxy_; 221 std::mutex equalCommunicatorsLock_; 222 std::map<std::string, ICommunicator *> equalCommunicators_; 223 224 static int queueCacheSize_; 225 static int maxQueueCacheSize_; 226 static unsigned int discardMsgNum_; 227 static const unsigned int MAX_EXEC_NUM = 7; // Set the maximum of threads as 6 < 7 228 static constexpr int DEFAULT_CACHE_SIZE = 160 * 1024 * 1024; // Initial the default cache size of queue as 160MB 229 static std::mutex queueLock_; 230 std::atomic<bool> isActive_; 231 232 // key: device value: equalIdentifier 233 std::map<std::string, std::string> equalIdentifierMap_; 234 std::mutex execTaskCountLock_; 235 std::condition_variable execTaskCv_; 236 237 std::mutex remoteExecutorLock_; 238 RemoteExecutor *remoteExecutor_; 239 }; 240 } // namespace DistributedDB 241 242 #endif // SYNC_ENGINE_H 243