• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SYNC_ENGINE_H
17 #define SYNC_ENGINE_H
18 
19 #include <map>
20 #include <mutex>
21 #include <queue>
22 
23 #include "communicator_proxy.h"
24 #include "device_manager.h"
25 #include "isync_engine.h"
26 #include "isync_task_context.h"
27 #include "remote_executor.h"
28 #include "subscribe_manager.h"
29 #include "task_pool.h"
30 
31 namespace DistributedDB {
32 
33 class SyncEngine : public ISyncEngine {
34 public:
35     SyncEngine();
36     ~SyncEngine() override;
37 
38     // Do some init things
39     int Initialize(ISyncInterface *syncInterface, const std::shared_ptr<Metadata> &metadata,
40         const InitCallbackParam &callbackParam) override;
41 
42     // Do some things, when db close.
43     int Close() override;
44 
45     // Alloc and Add sync SyncTarget
46     // return E_OK if operator success.
47     int AddSyncOperation(SyncOperation *operation) override;
48 
49     // Clear the SyncTarget matched the syncId.
50     void RemoveSyncOperation(int syncId) override;
51 
52 #ifndef OMIT_MULTI_VER
53     // notify other devices data has changed
54     void BroadCastDataChanged() const override;
55 #endif
56 
57     // Get Online devices
58     void GetOnlineDevices(std::vector<std::string> &devices) const override;
59 
60     // Register the device connect callback, this function must be called after Engine inited
61     void StartCommunicator() override;
62 
63     // Get the queue cache memory size
64     int GetQueueCacheSize() const;
65 
66     // Get the number of message which is discarded
67     unsigned int GetDiscardMsgNum() const;
68 
69     // Get the maximum of executing message number
70     unsigned int GetMaxExecNum() const;
71 
72     // Set the maximum of queue cache memory size
73     void SetMaxQueueCacheSize(int value);
74 
75     std::string GetLabel() const override;
76 
77     bool GetSyncRetry() const;
78     void SetSyncRetry(bool isRetry) override;
79 
80     // Set an equal identifier for this database, After this called, send msg to the target will use this identifier
81     int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override;
82 
83     void SetEqualIdentifier() override;
84 
85     void SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets) override;
86 
87     void OfflineHandleByDevice(const std::string &deviceId, ISyncInterface *storage);
88 
89     void ClearAllSyncTaskByDevice(const std::string &deviceId) override;
90 
91     void GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries);
92 
93     // subscribeQueries item is queryId
94     void GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds);
95 
96     void GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries);
97 
98     void PutUnfinishedSubQueries(const std::string &device, const std::vector<QuerySyncObject> &subscribeQueries);
99 
100     void GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries);
101 
102     // used by SingleVerSyncer when db online
103     int StartAutoSubscribeTimer(const ISyncInterface &syncInterface) override;
104 
105     // used by SingleVerSyncer when remote/local db closed
106     void StopAutoSubscribeTimer() override;
107 
108     int SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const override;
109 
110     bool IsEngineActive() const override;
111 
112     void SchemaChange() override;
113 
114     void Dump(int fd) override;
115 
116     int RemoteQuery(const std::string &device, const RemoteCondition &condition,
117         uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override;
118 
119     void NotifyConnectionClosed(uint64_t connectionId) override;
120 
121     void NotifyUserChange() override;
122 
123     void AbortMachineIfNeed(uint32_t syncId) override;
124 
125     void AddSubscribe(SyncGenericInterface *storage,
126         const std::map<std::string, std::vector<QuerySyncObject>> &subscribeQuery) override;
127 
128     void TimeChange() override;
129 
130     int32_t GetResponseTaskCount() override;
131 protected:
132     // Create a context
133     virtual ISyncTaskContext *CreateSyncTaskContext(const ISyncInterface &syncInterface) = 0;
134 
135     // Find SyncTaskContext from the map
136     ISyncTaskContext *FindSyncTaskContext(const std::string &deviceId);
137     ISyncTaskContext *GetSyncTaskContextAndInc(const std::string &deviceId);
138     void GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
139     void GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
140 
141     void ClearSyncInterface();
142     ISyncInterface *GetAndIncSyncInterface();
143     void SetSyncInterface(ISyncInterface *syncInterface);
144 
145     ISyncTaskContext *GetSyncTaskContext(const std::string &deviceId, int &errCode);
146 
147     std::mutex storageMutex_;
148     ISyncInterface *syncInterface_;
149     // Used to store all send sync task infos (such as pull sync response, and push sync request)
150     std::map<std::string, ISyncTaskContext *> syncTaskContextMap_;
151     std::mutex contextMapLock_;
152     std::shared_ptr<SubscribeManager> subManager_;
153     std::function<void(const InternalSyncParma &param)> queryAutoSyncCallback_;
154 
155 private:
156 
157     // Init DeviceManager set callback and remoteExecutor
158     int InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged,
159         const std::function<void(std::string)> &offlineChanged, ISyncInterface *syncInterface);
160 
161     // Init Comunicator, register callbacks
162     int InitComunicator(const ISyncInterface *syncInterface);
163 
164     // Add the sync task info to the map.
165     int AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation);
166 
167     // Sync Request CallbackTask run at a sub thread.
168     void MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg);
169 
170     void RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg);
171 
172     void ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator);
173 
174     // wrapper of MessageReciveCallbackTask
175     void MessageReciveCallback(const std::string &targetDev, Message *inMsg);
176 
177     // Sync Request Callback
178     int MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg);
179 
180     // Exec the given SyncTarget. and callback onComplete.
181     int ExecSyncTask(ISyncTaskContext *context);
182 
183     // Anti-DOS attack
184     void PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize);
185 
186     // Get message size
187     int GetMsgSize(const Message *inMsg) const;
188 
189     // Do not run MessageReceiveCallbackTask until msgQueue is empty
190     int DealMsgUtilQueueEmpty();
191 
192     // Handle message in order.
193     int ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg);
194 
195     ISyncTaskContext *GetContextForMsg(const std::string &targetDev, int &errCode);
196 
197     ICommunicator *AllocCommunicator(const std::string &identifier, int &errCode);
198 
199     void UnRegCommunicatorsCallback();
200 
201     void ReleaseCommunicators();
202 
203     bool IsSkipCalculateLen(const Message *inMsg);
204 
205     void ClearInnerResource();
206 
207     void IncExecTaskCount();
208 
209     void DecExecTaskCount();
210 
211     RemoteExecutor *GetAndIncRemoteExector();
212 
213     void SetRemoteExector(RemoteExecutor *executor);
214 
215     bool CheckDeviceIdValid(const std::string &checkDeviceId, const std::string &localDeviceId);
216 
217     int GetLocalDeviceId(std::string &deviceId);
218 
219     void WaitingExecTaskExist();
220 
221     int HandleRemoteExecutorMsg(const std::string &targetDev, Message *inMsg);
222 
223     void AddQuerySubscribe(SyncGenericInterface *storage, const std::string &device, const QuerySyncObject &query);
224 
225     ICommunicator *communicator_;
226     DeviceManager *deviceManager_;
227     std::function<void(const std::string &)> onRemoteDataChanged_;
228     std::function<void(const std::string &)> offlineChanged_;
229     std::shared_ptr<Metadata> metadata_;
230     std::deque<Message *> msgQueue_;
231     volatile uint32_t execTaskCount_;
232     std::string label_;
233     volatile bool isSyncRetry_;
234     std::mutex communicatorProxyLock_;
235     CommunicatorProxy *communicatorProxy_;
236     std::mutex equalCommunicatorsLock_;
237     std::map<std::string, ICommunicator *> equalCommunicators_;
238 
239     static int queueCacheSize_;
240     static int maxQueueCacheSize_;
241     static unsigned int discardMsgNum_;
242     static const unsigned int MAX_EXEC_NUM = 7; // Set the maximum of threads as 6 < 7
243     static constexpr int DEFAULT_CACHE_SIZE = 160 * 1024 * 1024; // Initial the default cache size of queue as 160MB
244     static std::mutex queueLock_;
245     std::atomic<bool> isActive_;
246 
247     // key: device value: equalIdentifier
248     std::map<std::string, std::string> equalIdentifierMap_;
249     std::mutex execTaskCountLock_;
250     std::condition_variable execTaskCv_;
251 
252     std::mutex remoteExecutorLock_;
253     RemoteExecutor *remoteExecutor_;
254 };
255 } // namespace DistributedDB
256 
257 #endif // SYNC_ENGINE_H
258