• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SYNC_ENGINE_H
17 #define SYNC_ENGINE_H
18 
19 #include <map>
20 #include <mutex>
21 #include <queue>
22 
23 #include "communicator_proxy.h"
24 #include "device_manager.h"
25 #include "isync_engine.h"
26 #include "isync_task_context.h"
27 #include "remote_executor.h"
28 #include "subscribe_manager.h"
29 #include "task_pool.h"
30 
31 namespace DistributedDB {
32 
33 class SyncEngine : public ISyncEngine {
34 public:
35     SyncEngine();
36     ~SyncEngine() override;
37 
38     // Do some init things
39     int Initialize(ISyncInterface *syncInterface, std::shared_ptr<Metadata> &metadata,
40         const std::function<void(std::string)> &onRemoteDataChanged,
41         const std::function<void(std::string)> &offlineChanged,
42         const std::function<void(const InternalSyncParma &param)> &queryAutoSyncCallback) override;
43 
44     // Do some things, when db close.
45     int Close() override;
46 
47     // Alloc and Add sync SyncTarget
48     // return E_OK if operator success.
49     int AddSyncOperation(SyncOperation *operation) override;
50 
51     // Clear the SyncTarget matched the syncId.
52     void RemoveSyncOperation(int syncId) override;
53 
54     // notify other devices data has changed
55     void BroadCastDataChanged() const override;
56 
57     // Get Online devices
58     void GetOnlineDevices(std::vector<std::string> &devices) const override;
59 
60     // Register the device connect callback, this function must be called after Engine inited
61     void RegConnectCallback() override;
62 
63     // Get the queue cache memory size
64     int GetQueueCacheSize() const;
65 
66     // Get the number of message which is discarded
67     unsigned int GetDiscardMsgNum() const;
68 
69     // Get the maximum of executing message number
70     unsigned int GetMaxExecNum() const;
71 
72     // Set the maximum of queue cache memory size
73     void SetMaxQueueCacheSize(int value);
74 
75     std::string GetLabel() const override;
76 
77     bool GetSyncRetry() const;
78     void SetSyncRetry(bool isRetry) override;
79 
80     // Set an equal identifier for this database, After this called, send msg to the target will use this identifier
81     int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override;
82 
83     void SetEqualIdentifier() override;
84 
85     void SetEqualIdentifierMap(const std::string &identifier, const std::vector<std::string> &targets) override;
86 
87     void OfflineHandleByDevice(const std::string &deviceId);
88 
89     void GetLocalSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries);
90 
91     // subscribeQueries item is queryId
92     void GetRemoteSubscribeQueryIds(const std::string &device, std::vector<std::string> &subscribeQueryIds);
93 
94     void GetRemoteSubscribeQueries(const std::string &device, std::vector<QuerySyncObject> &subscribeQueries);
95 
96     void PutUnfiniedSubQueries(const std::string &device, const std::vector<QuerySyncObject> &subscribeQueries);
97 
98     void GetAllUnFinishSubQueries(std::map<std::string, std::vector<QuerySyncObject>> &allSyncQueries);
99 
100     // used by SingleVerSyncer when db online
101     int StartAutoSubscribeTimer() override;
102 
103     // used by SingleVerSyncer when remote/local db closed
104     void StopAutoSubscribeTimer() override;
105 
106     int SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const override;
107 
108     bool IsEngineActive() const override;
109 
110     void SchemaChange() override;
111 
112     void Dump(int fd) override;
113 
114     int RemoteQuery(const std::string &device, const RemoteCondition &condition,
115         uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override;
116 
117     void NotifyConnectionClosed(uint64_t connectionId) override;
118 
119     void NotifyUserChange() override;
120 
121     void AbortMachineIfNeed(uint32_t syncId) override;
122 
123 protected:
124     // Create a context
125     virtual ISyncTaskContext *CreateSyncTaskContext() = 0;
126 
127     // Find SyncTaskContext from the map
128     ISyncTaskContext *FindSyncTaskContext(const std::string &deviceId);
129     ISyncTaskContext *GetSyncTaskContextAndInc(const std::string &deviceId);
130     void GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
131     void GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
132 
133     ISyncInterface *syncInterface_;
134     // Used to store all send sync task infos (such as pull sync response, and push sync request)
135     std::map<std::string, ISyncTaskContext *> syncTaskContextMap_;
136     std::mutex contextMapLock_;
137     std::shared_ptr<SubscribeManager> subManager_;
138     std::function<void(const InternalSyncParma &param)> queryAutoSyncCallback_;
139 
140 private:
141 
142     // Init DeviceManager set callback and remoteExecutor
143     int InitInnerSource(const std::function<void(std::string)> &onRemoteDataChanged,
144         const std::function<void(std::string)> &offlineChanged);
145 
146     ISyncTaskContext *GetSyncTaskContext(const std::string &deviceId, int &errCode);
147 
148     // Init Comunicator, register callbacks
149     int InitComunicator(const ISyncInterface *syncInterface);
150 
151     // Add the sync task info to the map.
152     int AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation);
153 
154     // Sync Request CallbackTask run at a sub thread.
155     void MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg);
156 
157     void RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg);
158 
159     void ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator);
160 
161     // wrapper of MessageReciveCallbackTask
162     void MessageReciveCallback(const std::string &targetDev, Message *inMsg);
163 
164     // Sync Request Callback
165     int MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg);
166 
167     // Exec the given SyncTarget. and callback onComplete.
168     int ExecSyncTask(ISyncTaskContext *context);
169 
170     // Anti-DOS attack
171     void PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize);
172 
173     // Get message size
174     int GetMsgSize(const Message *inMsg) const;
175 
176     // Do not run MessageReceiveCallbackTask until msgQueue is empty
177     int DealMsgUtilQueueEmpty();
178 
179     // Handle message in order.
180     int ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg);
181 
182     // Schedule Sync Task
183     void ScheduleSyncTask(ISyncTaskContext *context);
184 
185     ISyncTaskContext *GetConextForMsg(const std::string &targetDev, int &errCode);
186 
187     ICommunicator *AllocCommunicator(const std::string &identifier, int &errCode);
188 
189     void UnRegCommunicatorsCallback();
190 
191     void ReleaseCommunicators();
192 
193     bool IsSkipCalculateLen(const Message *inMsg);
194 
195     void ClearInnerResource();
196 
197     void IncExecTaskCount();
198 
199     void DecExecTaskCount();
200 
201     RemoteExecutor *GetAndIncRemoteExector();
202 
203     void SetRemoteExector(RemoteExecutor *executor);
204 
205     bool CheckDeviceIdValid(const std::string &deviceId, const std::string &localDeviceId);
206 
207     int GetLocalDeviceId(std::string &deviceId);
208 
209     void WaitingExecTaskExist();
210 
211     ICommunicator *communicator_;
212     DeviceManager *deviceManager_;
213     std::function<void(const std::string &)> onRemoteDataChanged_;
214     std::function<void(const std::string &)> offlineChanged_;
215     std::shared_ptr<Metadata> metadata_;
216     std::deque<Message *> msgQueue_;
217     uint32_t execTaskCount_;
218     std::string label_;
219     bool isSyncRetry_;
220     CommunicatorProxy *communicatorProxy_;
221     std::mutex equalCommunicatorsLock_;
222     std::map<std::string, ICommunicator *> equalCommunicators_;
223 
224     static int queueCacheSize_;
225     static int maxQueueCacheSize_;
226     static unsigned int discardMsgNum_;
227     static const unsigned int MAX_EXEC_NUM = 7; // Set the maximum of threads as 6 < 7
228     static constexpr int DEFAULT_CACHE_SIZE = 160 * 1024 * 1024; // Initial the default cache size of queue as 160MB
229     static std::mutex queueLock_;
230     std::atomic<bool> isActive_;
231 
232     // key: device value: equalIdentifier
233     std::map<std::string, std::string> equalIdentifierMap_;
234     std::mutex execTaskCountLock_;
235     std::condition_variable execTaskCv_;
236 
237     std::mutex remoteExecutorLock_;
238     RemoteExecutor *remoteExecutor_;
239 };
240 } // namespace DistributedDB
241 
242 #endif // SYNC_ENGINE_H
243