• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SYNC_ENGINE_H
17 #define SYNC_ENGINE_H
18 
19 #include <map>
20 #include <mutex>
21 #include <queue>
22 
23 #include "communicator_proxy.h"
24 #include "device_manager.h"
25 #include "isync_engine.h"
26 #include "isync_task_context.h"
27 #include "remote_executor.h"
28 #include "subscribe_manager.h"
29 #include "task_pool.h"
30 
31 namespace DistributedDB {
32 
33 class SyncEngine : public ISyncEngine {
34 public:
35     SyncEngine();
36     ~SyncEngine() override;
37 
38     // Do some init things
39     int Initialize(ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata,
40         const InitCallbackParam &callbackParam) override;
41 
42     // Do some things, when db close.
43     int Close() override;
44 
45     // Alloc and Add sync SyncTarget
46     // return E_OK if operator success.
47     int AddSyncOperation(SyncOperation *operation) override;
48 
49     // Clear the SyncTarget matched the syncId.
50     void RemoveSyncOperation(int syncId) override;
51 
52 #ifndef OMIT_MULTI_VER
53     // notify other devices data has changed
54     void BroadCastDataChanged() const override;
55 #endif
56 
57     // Get Online devices
58     void GetOnlineDevices(std::vector<std::string> &devices) const override;
59 
60     // Register the device connect callback, this function must be called after Engine inited
61     void StartCommunicator() override;
62 
63     // Get the queue cache memory size
64     int GetQueueCacheSize() const;
65 
66     // Set the queue cache memory size
67     void SetQueueCacheSize(int size);
68 
69     // Get the number of message which is discarded
70     unsigned int GetDiscardMsgNum() const;
71 
72     // Set the number of message which is discarded
73     void SetDiscardMsgNum(unsigned int num);
74 
75     // Get the maximum of executing message number
76     unsigned int GetMaxExecNum() const;
77 
78     // Get the maximum of queue cache memory size
79     int GetMaxQueueCacheSize() const;
80 
81     // Set the maximum of queue cache memory size
82     void SetMaxQueueCacheSize(int value);
83 
84     std::string GetLabel() const override;
85 
86     bool GetSyncRetry() const;
87     void SetSyncRetry(bool isRetry) override;
88 
89     // Set an equal identifier for this database, After this called, send msg to the target will use this identifier
90     int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override;
91 
92     void SetEqualIdentifier() override;
93 
94     void SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets) override;
95 
96     void OfflineHandleByDevice(const std::string &deviceId, ISyncInterface *storage);
97 
98     void ClearAllSyncTaskByDevice(const std::string &deviceId) override;
99 
100     void GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries);
101 
102     // subscribeQueries item is queryId
103     void GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds);
104 
105     void GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries);
106 
107     void PutUnfinishedSubQueries(const std::string &device, const std::vector<QuerySyncObject> &subscribeQueries);
108 
109     void GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries);
110 
111     // used by SingleVerSyncer when db online
112     int StartAutoSubscribeTimer(const ISyncInterface &syncInterface) override;
113 
114     // used by SingleVerSyncer when remote/local db closed
115     void StopAutoSubscribeTimer() override;
116 
117     int SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const override;
118 
119     bool IsEngineActive() const override;
120 
121     void SchemaChange() override;
122 
123     void Dump(int fd) override;
124 
125     int RemoteQuery(const std::string &device, const RemoteCondition &condition,
126         uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override;
127 
128     void NotifyConnectionClosed(uint64_t connectionId) override;
129 
130     void NotifyUserChange() override;
131 
132     void AbortMachineIfNeed(uint32_t syncId) override;
133 
134     void AddSubscribe(SyncGenericInterface *storage,
135         const std::map<std::string, std::vector<QuerySyncObject>> &subscribeQuery) override;
136 
137     void TimeChange() override;
138 
139     int32_t GetResponseTaskCount() override;
140 protected:
141     // Create a context
142     virtual ISyncTaskContext *CreateSyncTaskContext(const ISyncInterface &syncInterface) = 0;
143 
144     // Find SyncTaskContext from the map
145     ISyncTaskContext *FindSyncTaskContext(const std::string &deviceId);
146     ISyncTaskContext *GetSyncTaskContextAndInc(const std::string &deviceId);
147     void GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
148     void GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
149 
150     void ClearSyncInterface();
151     ISyncInterface *GetAndIncSyncInterface();
152     void SetSyncInterface(ISyncInterface *syncInterface);
153 
154     ISyncTaskContext *GetSyncTaskContext(const std::string &deviceId, int &errCode);
155 
156     std::mutex storageMutex_;
157     ISyncInterface *syncInterface_;
158     // Used to store all send sync task infos (such as pull sync response, and push sync request)
159     std::map<std::string, ISyncTaskContext *> syncTaskContextMap_;
160     std::mutex contextMapLock_;
161     std::shared_ptr<SubscribeManager> subManager_;
162     std::function<void(const InternalSyncParma &param)> queryAutoSyncCallback_;
163 
164 private:
165 
166     // Init DeviceManager set callback and remoteExecutor
167     int InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged,
168         const std::function<void(std::string)> &offlineChanged, ISyncInterface *syncInterface);
169 
170     // Init Comunicator, register callbacks
171     int InitComunicator(const ISyncInterface *syncInterface);
172 
173     // Add the sync task info to the map.
174     int AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation);
175 
176     // Sync Request CallbackTask run at a sub thread.
177     void MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg);
178 
179     void RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg);
180 
181     void ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator);
182 
183     // wrapper of MessageReciveCallbackTask
184     void MessageReciveCallback(const std::string &targetDev, Message *inMsg);
185 
186     // Sync Request Callback
187     int MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg);
188 
189     // Exec the given SyncTarget. and callback onComplete.
190     int ExecSyncTask(ISyncTaskContext *context);
191 
192     // Anti-DOS attack
193     void PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize);
194 
195     // Get message size
196     int GetMsgSize(const Message *inMsg) const;
197 
198     // Do not run MessageReceiveCallbackTask until msgQueue is empty
199     int DealMsgUtilQueueEmpty();
200 
201     // Handle message in order.
202     int ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg);
203 
204     ISyncTaskContext *GetContextForMsg(const std::string &targetDev, int &errCode);
205 
206     ICommunicator *AllocCommunicator(const std::string &identifier, int &errCode, std::string userId = "");
207 
208     void UnRegCommunicatorsCallback();
209 
210     void ReleaseCommunicators();
211 
212     bool IsSkipCalculateLen(const Message *inMsg);
213 
214     void ClearInnerResource();
215 
216     void IncExecTaskCount();
217 
218     void DecExecTaskCount();
219 
220     RemoteExecutor *GetAndIncRemoteExector();
221 
222     void SetRemoteExector(RemoteExecutor *executor);
223 
224     bool CheckDeviceIdValid(const std::string &checkDeviceId, const std::string &localDeviceId);
225 
226     int GetLocalDeviceId(std::string &deviceId);
227 
228     void WaitingExecTaskExist();
229 
230     int HandleRemoteExecutorMsg(const std::string &targetDev, Message *inMsg);
231 
232     void AddQuerySubscribe(SyncGenericInterface *storage, const std::string &device, const QuerySyncObject &query);
233 
234     std::string GetUserId();
235 
236     std::string GetUserId(const ISyncInterface *syncInterface);
237 
238     uint32_t GetTimeout(const std::string &dev);
239 
240     ICommunicator *communicator_;
241     DeviceManager *deviceManager_;
242     std::function<void(const std::string &)> onRemoteDataChanged_;
243     std::function<void(const std::string &)> offlineChanged_;
244     std::shared_ptr<Metadata> metadata_;
245     std::deque<Message *> msgQueue_;
246     volatile uint32_t execTaskCount_;
247     std::string label_;
248     volatile bool isSyncRetry_;
249     std::mutex communicatorProxyLock_;
250     CommunicatorProxy *communicatorProxy_;
251     std::mutex equalCommunicatorsLock_;
252     std::map<std::string, ICommunicator *> equalCommunicators_;
253 
254     static int queueCacheSize_;
255     static int maxQueueCacheSize_;
256     static unsigned int discardMsgNum_;
257     static const unsigned int MAX_EXEC_NUM = 7; // Set the maximum of threads as 6 < 7
258     static constexpr int DEFAULT_CACHE_SIZE = 160 * 1024 * 1024; // Initial the default cache size of queue as 160MB
259     static std::mutex queueLock_;
260     std::atomic<bool> isActive_;
261 
262     // key: device value: equalIdentifier
263     std::map<std::string, std::string> equalIdentifierMap_;
264     std::mutex execTaskCountLock_;
265     std::condition_variable execTaskCv_;
266 
267     std::mutex remoteExecutorLock_;
268     RemoteExecutor *remoteExecutor_;
269 };
270 } // namespace DistributedDB
271 
272 #endif // SYNC_ENGINE_H
273