• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef GENRIC_SYNCER_H
17 #define GENRIC_SYNCER_H
18 
19 #include <functional>
20 #include <mutex>
21 #include <map>
22 
23 #include "isyncer.h"
24 #include "isync_engine.h"
25 #include "meta_data.h"
26 #include "sync_operation.h"
27 #include "time_helper.h"
28 
29 namespace DistributedDB {
// GenericSyncer: base ISyncer implementation for the DistributedDB sync module.
// The class is abstract: RemoteDataChanged(), RemoteDeviceOffline() and CreateSyncEngine() are
// pure virtual and must be provided by a derived class. Only declarations are visible in this
// header; notes marked NOTE(review) are assumptions to confirm against the implementation.
30 class GenericSyncer : public virtual ISyncer {
// Callback type that receives a device string. It is not referenced by any other declaration in this class.
31 using DataChangedFunc = std::function<void(const std::string &device)>;
32 
33 public:
34     GenericSyncer();
35     ~GenericSyncer() override;
36 
37     // Initialize the syncer modules.
38     int Initialize(ISyncInterface *syncInterface, bool isNeedActive) override;
39 
40     // Close the syncer.
41     int Close(bool isClosedOperation) override;
42 
43     // Sync function.
44     // param devices: The device id list.
45     // param mode: Sync mode, see SyncMode.
46     // param onComplete: The sync-finished callback, set by the caller.
47     // param onFinalize: Called back when this sync operation is finalized.
    // param wait: NOTE(review): not described originally; presumably whether the caller blocks until the sync ends - confirm.
48     // return a sync id. It will return a positive value if failed (NOTE(review): confirm the sign convention).
49     int Sync(const std::vector<std::string> &devices, int mode,
50         const std::function<void(const std::map<std::string, int> &)> &onComplete,
51         const std::function<void(void)> &onFinalize, bool wait) override;
52 
53     // Sync function. Uses SyncParma to reduce the number of parameters.
54     int Sync(const SyncParma &param);
55 
    // Sync overload that additionally takes a connectionId (this one overrides ISyncer).
56     int Sync(const SyncParma &param, uint64_t connectionId) override;
57 
58     // Cancel sync function.
    // NOTE(review): syncId is uint32_t here but int in RemoveSyncOperation/OnSyncFinished - confirm this is intended.
59     int CancelSync(uint32_t syncId) override;
60 
61     // Remove the operation with the given syncId; used to clean up resources if the sync finished or failed.
62     int RemoveSyncOperation(int syncId) override;
63 
    // NOTE(review): undocumented; presumably stops the syncs started through the given connection - confirm.
64     int StopSync(uint64_t connectionId) override;
65 
66     // Get the current virtual timestamp.
67     uint64_t GetTimestamp() override;
68 
69     // Get the manual sync queue size.
70     int GetQueuedSyncSize(int *queuedSyncSize) const override;
71 
72     // Set the manual sync queue limit.
73     int SetQueuedSyncLimit(const int *queuedSyncLimit) override;
74 
75     // Get the manual sync queue limit.
76     int GetQueuedSyncLimit(int *queuedSyncLimit) const override;
77 
78     // Disable adding new manual syncs; used for rekey.
79     int DisableManualSync(void) override;
80 
81     // Enable adding new manual syncs; used for rekey.
82     int EnableManualSync(void) override;
83 
84     // Get the local device id, which is hashed.
85     int GetLocalIdentity(std::string &outTarget) const override;
86 
87     // Set the manual sync retry config.
88     int SetSyncRetry(bool isRetry) override;
89 
90     // Set an equal identifier for this database. After this is called, messages sent to the targets use this identifier.
91     int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override;
92 
93     // Inner function, used for subscribe sync.
94     int Sync(const InternalSyncParma &param);
95 
96     // Remote data changed callback. Pure virtual: a derived class must implement it.
97     virtual void RemoteDataChanged(const std::string &device) = 0;
98 
    // Pure virtual; by its name, the callback for a remote device going offline (confirm with implementers).
99     virtual void RemoteDeviceOffline(const std::string &device) = 0;
100 
    // NOTE(review): fd is presumably a file descriptor that the dump output is written to - confirm.
101     void Dump(int fd) override;
102 
103     SyncerBasicInfo DumpSyncerBasicInfo() override;
104 
    // The remaining public overrides are undocumented in this header; see ISyncer for their contracts.
105     int RemoteQuery(const std::string &device, const RemoteCondition &condition,
106         uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override;
107 
108     int GetSyncDataSize(const std::string &device, size_t &size) const override;
109 
110     int GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const override;
111 
112     int GetWatermarkInfo(const std::string &device, WatermarkInfo &info) override;
113 
114     int UpgradeSchemaVerInMeta() override;
115 
116     void ResetSyncStatus() override;
117 
118     int64_t GetLocalTimeOffset() override;
119 
120     int32_t GetTaskCount() override;
121 protected:
122 
123     // Trigger query auto sync or auto subscribe.
124     // Auto subscribe is triggered only when a subscribe task, triggered by the remote db being opened, has failed;
125     // it won't be triggered again once the subscribe task succeeds.
126     virtual void QueryAutoSync(const InternalSyncParma &param);
127 
128     // Create a sync engine; returns nullptr if a memory error occurs.
129     virtual ISyncEngine *CreateSyncEngine() = 0;
130 
131     virtual int PrepareSync(const SyncParma &param, uint32_t syncId, uint64_t connectionId);
132 
133     // Add a sync operation. After this function is called, the operation will be started.
134     virtual void AddSyncOperation(ISyncEngine *engine, SyncOperation *operation);
135 
136     // Used as the SyncOperation on-kill callback (virtual variant, overridable by derived classes).
137     virtual void SyncOperationKillCallbackInner(int syncId);
138 
139     // Used as the SyncOperation on-kill callback.
140     void SyncOperationKillCallback(int syncId);
141 
142     // Init the metadata.
143     int InitMetaData(ISyncInterface *syncInterface);
144 
145     // Init the TimeHelper.
146     int InitTimeHelper(ISyncInterface *syncInterface);
147 
148     // Init the sync engine.
149     virtual int InitSyncEngine(ISyncInterface *syncInterface);
150 
151     int CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive);
152 
153     // Used to generate a sync id; maybe it is currentSyncId++.
154     // The return value is the sync id.
155     uint32_t GenerateSyncId();
156 
157     // Check if the mode arg is valid.
158     bool IsValidMode(int mode) const;
159 
160     virtual int SyncConditionCheck(const SyncParma &param, const ISyncEngine *engine, ISyncInterface *storage) const;
161 
162     // Check if the devices arg is valid.
163     bool IsValidDevices(const std::vector<std::string> &devices) const;
164 
165     // Used to clear all SyncOperations.
166     // isClosedOperation is false when the clear is caused by a user change.
167     void ClearSyncOperations(bool isClosedOperation);
168 
169     void ClearInnerResource(bool isClosedOperation);
170 
171     void TriggerSyncFinished(SyncOperation *operation);
172 
173     // Callback invoked when the sync with the given syncId has finished.
174     void OnSyncFinished(int syncId);
175 
176     bool IsManualSync(int inMode) const;
177 
178     virtual int AddQueuedManualSyncSize(int mode, bool wait);
179 
180     bool IsQueuedManualSyncFull(int mode, bool wait) const;
181 
182     void SubQueuedSyncSize(void);
183 
184     void GetOnlineDevices(std::vector<std::string> &devices) const;
185 
186     std::string GetSyncDevicesStr(const std::vector<std::string> &devices) const;
187 
188     void InitSyncOperation(SyncOperation *operation, const SyncParma &param);
189 
190     int StatusCheck() const;
191 
192     int SyncPreCheck(const SyncParma &param) const;
193 
194     int BuildSyncEngine();
195 
196     int InitTimeChangedListener();
197 
198     void ReleaseInnerResource();
199 
200     void RecordTimeChangeOffset(void *changedOffset);
201 
202     int CloseInner(bool isClosedOperation);
203 
204     int InitStorageResource(ISyncInterface *syncInterface);
205 
206     void ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> &metadata, ISyncInterface &storage);
207 
208     static int SyncModuleInit();
209 
210     static int SyncResourceInit();
211 
212     static bool IsNeedActive(ISyncInterface *syncInterface);
213 
    // Static state shared by all GenericSyncer instances (declared here, defined elsewhere).
214     static const int MIN_VALID_SYNC_ID;
215     static std::mutex moduleInitLock_;
216 
217     // Used to generate the next sync id.
218     static int currentSyncId_;
219     static std::mutex syncIdLock_;
    // NOTE(review): std::list and std::condition_variable are used below, but <list> and <condition_variable>
    // are not included directly by this header; presumably they arrive transitively - consider including them.
220     // For sync in progress.
221     std::map<uint64_t, std::list<int>> connectionIdMap_;
222     std::map<int, uint64_t> syncIdMap_;
223 
    // Raw pointers; ownership is not expressed by the types - see the implementation for lifetime rules.
224     ISyncEngine *syncEngine_;
225     ISyncInterface *syncInterface_;
226     std::shared_ptr<TimeHelper> timeHelper_;
227     std::shared_ptr<Metadata> metadata_;
228     bool initialized_;
229     std::mutex operationMapLock_;
    // Sync operations keyed by an int id (presumably the sync id - confirm).
230     std::map<int, SyncOperation *> syncOperationMap_;
231     int queuedManualSyncSize_;
232     int queuedManualSyncLimit_;
233     bool manualSyncEnable_;
234     bool closing_;
    // Declared mutable so that const member functions can lock them.
235     mutable std::mutex queuedManualSyncLock_;
236     mutable std::mutex syncerLock_;
237     std::string label_;
238     std::mutex engineMutex_;
239     bool engineFinalize_;
240     std::condition_variable engineFinalizeCv_;
241 
242     std::mutex timeChangeListenerMutex_;
243     bool timeChangeListenerFinalize_;
244     std::condition_variable timeChangeCv_;
245     NotificationChain::Listener *timeChangedListener_;
246 };
247 } // namespace DistributedDB
248 
249 #endif  // GENRIC_SYNCER_H
250