• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SYNC_ENGINE_H
17 #define SYNC_ENGINE_H
18 
19 #include <map>
20 #include <mutex>
21 #include <queue>
22 
23 #include "communicator_proxy.h"
24 #include "device_manager.h"
25 #include "isync_engine.h"
26 #include "isync_task_context.h"
27 #include "remote_executor.h"
28 #include "subscribe_manager.h"
29 #include "task_pool.h"
30 
31 namespace DistributedDB {
32 
33 class SyncEngine : public ISyncEngine {
34 public:
35     SyncEngine();
36     ~SyncEngine() override;
37 
38     // Do some init things
39     int Initialize(ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata,
40         const InitCallbackParam &callbackParam) override;
41 
42     // Do some things, when db close.
43     int Close() override;
44 
45     // Alloc and Add sync SyncTarget
46     // return E_OK if operator success.
47     int AddSyncOperation(SyncOperation *operation) override;
48 
49     // Clear the SyncTarget matched the syncId.
50     void RemoveSyncOperation(int syncId) override;
51 
52 #ifndef OMIT_MULTI_VER
53     // notify other devices data has changed
54     void BroadCastDataChanged() const override;
55 #endif
56 
57     // Get Online devices
58     void GetOnlineDevices(std::vector<std::string> &devices) const override;
59 
60     // Register the device connect callback, this function must be called after Engine inited
61     void StartCommunicator() override;
62 
63     // Get the queue cache memory size
64     int GetQueueCacheSize() const;
65 
66     // Set the queue cache memory size
67     void SetQueueCacheSize(int size);
68 
69     // Get the number of message which is discarded
70     unsigned int GetDiscardMsgNum() const;
71 
72     // Set the number of message which is discarded
73     void SetDiscardMsgNum(unsigned int num);
74 
75     // Get the maximum of executing message number
76     unsigned int GetMaxExecNum() const;
77 
78     // Get the maximum of queue cache memory size
79     int GetMaxQueueCacheSize() const;
80 
81     // Set the maximum of queue cache memory size
82     void SetMaxQueueCacheSize(int value);
83 
84     std::string GetLabel() const override;
85 
86     bool GetSyncRetry() const;
87     void SetSyncRetry(bool isRetry) override;
88 
89     // Set an equal identifier for this database, After this called, send msg to the target will use this identifier
90     int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override;
91 
92     void SetEqualIdentifier() override;
93 
94     void SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets) override;
95 
96     void OfflineHandleByDevice(const std::string &deviceId, ISyncInterface *storage);
97 
98     void ClearAllSyncTaskByDevice(const std::string &deviceId) override;
99 
100     void GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries);
101 
102     // subscribeQueries item is queryId
103     void GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds);
104 
105     void GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries);
106 
107     void PutUnfinishedSubQueries(const std::string &device, const std::vector<QuerySyncObject> &subscribeQueries);
108 
109     void GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries);
110 
111     // used by SingleVerSyncer when db online
112     int StartAutoSubscribeTimer(const ISyncInterface &syncInterface) override;
113 
114     // used by SingleVerSyncer when remote/local db closed
115     void StopAutoSubscribeTimer() override;
116 
117     int SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const override;
118 
119     bool IsEngineActive() const override;
120 
121     void SchemaChange() override;
122 
123     void Dump(int fd) override;
124 
125     int RemoteQuery(const std::string &device, const RemoteCondition &condition,
126         uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override;
127 
128     void NotifyConnectionClosed(uint64_t connectionId) override;
129 
130     void NotifyUserChange() override;
131 
132     void AbortMachineIfNeed(uint32_t syncId) override;
133 
134     void AddSubscribe(SyncGenericInterface *storage,
135         const std::map<std::string, std::vector<QuerySyncObject>> &subscribeQuery) override;
136 
137     void TimeChange() override;
138 
139     int32_t GetResponseTaskCount() override;
140 
141     int32_t GetRemoteQueryTaskCount() override;
142 
143     bool ExchangeClosePending(bool expected) override;
144 protected:
145     // Create a context
146     virtual ISyncTaskContext *CreateSyncTaskContext(const ISyncInterface &syncInterface) = 0;
147 
148     // Find SyncTaskContext from the map
149     ISyncTaskContext *FindSyncTaskContext(const DeviceSyncTarget &target, bool isNeedCorrectUserId);
150     std::vector<ISyncTaskContext *> GetSyncTaskContextAndInc(const std::string &deviceId);
151     void GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
152     void GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
153 
154     void ClearSyncInterface();
155     ISyncInterface *GetAndIncSyncInterface();
156     void SetSyncInterface(ISyncInterface *syncInterface);
157 
158     ISyncTaskContext *GetSyncTaskContext(const DeviceSyncTarget &target, int &errCode);
159 
160     std::mutex storageMutex_;
161     ISyncInterface *syncInterface_;
162     // Used to store all send sync task infos (such as pull sync response, and push sync request)
163     std::map<DeviceSyncTarget, ISyncTaskContext *> syncTaskContextMap_;
164     std::mutex contextMapLock_;
165     std::shared_ptr<SubscribeManager> subManager_;
166     std::function<void(const InternalSyncParma &param)> queryAutoSyncCallback_;
167 
168 private:
169 
170     // Init DeviceManager set callback and remoteExecutor
171     int InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged,
172         const std::function<void(std::string)> &offlineChanged, ISyncInterface *syncInterface);
173 
174     // Init Comunicator, register callbacks
175     int InitComunicator(const ISyncInterface *syncInterface);
176 
177     // Add the sync task info to the map.
178     int AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation);
179 
180     // Sync Request CallbackTask run at a sub thread.
181     void MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg);
182 
183     void RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg);
184 
185     void ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator);
186 
187     // wrapper of MessageReciveCallbackTask
188     int MessageReciveCallback(const std::string &targetDev, Message *inMsg);
189 
190     // Sync Request Callback
191     int MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg);
192 
193     // Exec the given SyncTarget. and callback onComplete.
194     int ExecSyncTask(ISyncTaskContext *context);
195 
196     // Anti-DOS attack
197     void PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize);
198 
199     // Get message size
200     int GetMsgSize(const Message *inMsg) const;
201 
202     // Do not run MessageReceiveCallbackTask until msgQueue is empty
203     int DealMsgUtilQueueEmpty();
204 
205     // Handle message in order.
206     int ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg);
207 
208     ISyncTaskContext *GetContextForMsg(const DeviceSyncTarget &target, int &errCode, bool isNeedCorrectUserId);
209 
210     ICommunicator *AllocCommunicator(const std::string &identifier, int &errCode, std::string userId = "");
211 
212     void UnRegCommunicatorsCallback();
213 
214     void ReleaseCommunicators();
215 
216     bool IsSkipCalculateLen(const Message *inMsg);
217 
218     void ClearInnerResource();
219 
220     void IncExecTaskCount();
221 
222     void DecExecTaskCount();
223 
224     uint32_t GetExecTaskCount();
225 
226     RemoteExecutor *GetAndIncRemoteExector();
227 
228     void SetRemoteExector(RemoteExecutor *executor);
229 
230     bool CheckDeviceIdValid(const std::string &checkDeviceId, const std::string &localDeviceId);
231 
232     int GetLocalDeviceId(std::string &deviceId);
233 
234     void WaitingExecTaskExist();
235 
236     int HandleRemoteExecutorMsg(const std::string &targetDev, Message *inMsg);
237 
238     void AddQuerySubscribe(SyncGenericInterface *storage, const std::string &device, const QuerySyncObject &query);
239 
240     std::string GetUserId();
241 
242     std::string GetUserId(const ISyncInterface *syncInterface);
243 
244     uint32_t GetTimeout(const std::string &dev);
245 
246     int RegCallbackOnInitComunicator(ICommunicatorAggregator *communicatorAggregator,
247         const ISyncInterface *syncInterface);
248 
249     std::string GetTargetUserId(const std::string &dev);
250 
251     void CorrectTargetUserId(std::map<DeviceSyncTarget, ISyncTaskContext *>::iterator &it, bool isNeedCorrectUserId);
252 
253     ICommunicator *communicator_;
254     DeviceManager *deviceManager_;
255     std::function<void(const std::string &)> onRemoteDataChanged_;
256     std::function<void(const std::string &)> offlineChanged_;
257     std::shared_ptr<Metadata> metadata_;
258     std::deque<Message *> msgQueue_;
259     volatile uint32_t execTaskCount_;
260     std::string label_;
261     volatile bool isSyncRetry_;
262     std::mutex communicatorProxyLock_;
263     CommunicatorProxy *communicatorProxy_;
264     std::mutex equalCommunicatorsLock_;
265     std::map<std::string, ICommunicator *> equalCommunicators_;
266 
267     static int queueCacheSize_;
268     static int maxQueueCacheSize_;
269     static unsigned int discardMsgNum_;
270     static const unsigned int MAX_EXEC_NUM = 7; // Set the maximum of threads as 6 < 7
271     static constexpr int DEFAULT_CACHE_SIZE = 160 * 1024 * 1024; // Initial the default cache size of queue as 160MB
272     static std::mutex queueLock_;
273     std::atomic<bool> isActive_;
274 
275     // key: device value: equalIdentifier
276     std::map<std::string, std::string> equalIdentifierMap_;
277     std::mutex execTaskCountLock_;
278     std::condition_variable execTaskCv_;
279 
280     std::mutex remoteExecutorLock_;
281     RemoteExecutor *remoteExecutor_;
282 };
283 } // namespace DistributedDB
284 
285 #endif // SYNC_ENGINE_H
286