• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef GENRIC_SYNCER_H
17 #define GENRIC_SYNCER_H
18 
19 #include <functional>
20 #include <mutex>
21 #include <map>
22 
23 #include "isyncer.h"
24 #include "isync_engine.h"
25 #include "meta_data.h"
26 #include "sync_operation.h"
27 #include "time_helper.h"
28 
29 namespace DistributedDB {
30 class GenericSyncer : public virtual ISyncer {
31 using DataChangedFunc = std::function<void(const std::string &device)>;
32 
33 public:
34     GenericSyncer();
35     ~GenericSyncer() override;
36 
37     // Init the Syncer modules
38     int Initialize(ISyncInterface *syncInterface, bool isNeedActive) override;
39 
40     // Close
41     int Close(bool isClosedOperation) override;
42 
43     // Sync function.
44     // param devices: The device id list.
45     // param mode: Sync mode, see SyncMode.
46     // param onComplete: The syncer finish callback. set by caller
47     // param onFinalize: will be callback when this Sync Operation finalized.
48     // return a Sync id. It will return a positive value if failed,
49     int Sync(const std::vector<std::string> &devices, int mode,
50         const std::function<void(const std::map<std::string, int> &)> &onComplete,
51         const std::function<void(void)> &onFinalize, bool wait) override;
52 
53     // Sync function. use SyncParam to reduce parameter.
54     int Sync(const SyncParam &param);
55 
56     int Sync(const SyncParam &param, uint64_t connectionId) override;
57 
58     // Cancel sync function.
59     int CancelSync(uint32_t syncId) override;
60 
61     // Remove the operation, with the given syncId, used to clean resource if sync finished or failed.
62     int RemoveSyncOperation(int syncId) override;
63 
64     int StopSync(uint64_t connectionId) override;
65 
66     // Get The current virtual timestamp
67     uint64_t GetTimestamp() override;
68 
69     // Get manual sync queue size
70     int GetQueuedSyncSize(int *queuedSyncSize) const override;
71 
72     // Set manual sync queue limit
73     int SetQueuedSyncLimit(const int *queuedSyncLimit) override;
74 
75     // Get manual sync queue limit
76     int GetQueuedSyncLimit(int *queuedSyncLimit) const override;
77 
78     // Disable add new manual sync, for rekey
79     int DisableManualSync(void) override;
80 
81     // Enable add new manual sync, for rekey
82     int EnableManualSync(void) override;
83 
84     // Get local deviceId, is hashed
85     int GetLocalIdentity(std::string &outTarget) const override;
86 
87     // Set Manual Sync retry config
88     int SetSyncRetry(bool isRetry) override;
89 
90     // Set an equal identifier for this database, After this called, send msg to the target will use this identifier
91     int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override;
92 
93     // Inner function, Used for subscribe sync
94     int Sync(const InternalSyncParma &param);
95 
96     // Remote data changed callback
97     virtual void RemoteDataChanged(const std::string &device) = 0;
98 
99     virtual void RemoteDeviceOffline(const std::string &device) = 0;
100 
101     void Dump(int fd) override;
102 
103     SyncerBasicInfo DumpSyncerBasicInfo() override;
104 
105     int RemoteQuery(const std::string &device, const RemoteCondition &condition,
106         uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override;
107 
108     int GetSyncDataSize(const std::string &device, size_t &size) const override;
109 
110     int GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const override;
111 
112     int GetWatermarkInfo(const std::string &device, WatermarkInfo &info) override;
113 
114     int UpgradeSchemaVerInMeta() override;
115 
116     void ResetSyncStatus() override;
117 
118     int64_t GetLocalTimeOffset() override;
119 
120     int32_t GetTaskCount() override;
121 
122     bool ExchangeClosePending(bool expected) override;
123 protected:
124 
125     // trigger query auto sync or auto subscribe
126     // trigger auto subscribe only when subscribe task is failed triggered by remote db opened
127     // it won't be triggered again when subscribe task success
128     virtual void QueryAutoSync(const InternalSyncParma &param);
129 
130     // Create a sync engine, if has memory error, will return nullptr.
131     virtual ISyncEngine *CreateSyncEngine() = 0;
132 
133     virtual int PrepareSync(const SyncParam &param, uint32_t syncId, uint64_t connectionId);
134 
135     // Add a Sync Operation, after call this function, the operation will be start
136     virtual void AddSyncOperation(ISyncEngine *engine, SyncOperation *operation);
137 
138     // Used to set to the SyncOperation Onkill
139     virtual void SyncOperationKillCallbackInner(int syncId);
140 
141     // Used to set to the SyncOperation Onkill
142     void SyncOperationKillCallback(int syncId);
143 
144     // Init the metadata
145     int InitMetaData(ISyncInterface *syncInterface);
146 
147     // Init the TimeHelper
148     int InitTimeHelper(ISyncInterface *syncInterface);
149 
150     // Init the Sync engine
151     virtual int InitSyncEngine(ISyncInterface *syncInterface);
152 
153     int CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive);
154 
155     // Used to general a sync id, maybe it is currentSyncId++;
156     // The return value is sync id.
157     uint32_t GenerateSyncId();
158 
159     // Check if the mode arg is valid
160     bool IsValidMode(int mode) const;
161 
162     virtual int SyncConditionCheck(const SyncParam &param, const ISyncEngine *engine, ISyncInterface *storage) const;
163 
164     // Check if the devices arg is valid
165     bool IsValidDevices(const std::vector<std::string> &devices) const;
166 
167     // Used Clear all SyncOperations.
168     // isClosedOperation is false while userChanged
169     void ClearSyncOperations(bool isClosedOperation);
170 
171     void ClearInnerResource(bool isClosedOperation);
172 
173     void TriggerSyncFinished(SyncOperation *operation);
174 
175     // Callback when the special sync finished.
176     void OnSyncFinished(int syncId);
177 
178     bool IsManualSync(int inMode) const;
179 
180     virtual int AddQueuedManualSyncSize(int mode, bool wait);
181 
182     bool IsQueuedManualSyncFull(int mode, bool wait) const;
183 
184     void SubQueuedSyncSize(void);
185 
186     void GetOnlineDevices(std::vector<std::string> &devices) const;
187 
188     std::string GetSyncDevicesStr(const std::vector<std::string> &devices) const;
189 
190     void InitSyncOperation(SyncOperation *operation, const SyncParam &param);
191 
192     int StatusCheck() const;
193 
194     int SyncPreCheck(const SyncParam &param) const;
195 
196     int BuildSyncEngine();
197 
198     int InitTimeChangedListener();
199 
200     void ReleaseInnerResource();
201 
202     void RecordTimeChangeOffset(void *changedOffset);
203 
204     int CloseInner(bool isClosedOperation);
205 
206     int InitStorageResource(ISyncInterface *syncInterface);
207 
208     void ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> &metadata, ISyncInterface &storage);
209 
210     static int SyncModuleInit();
211 
212     static int SyncResourceInit();
213 
214     static bool IsNeedActive(ISyncInterface *syncInterface);
215 
216     static const int MIN_VALID_SYNC_ID;
217     static std::mutex moduleInitLock_;
218 
219     // Used to general the next sync id.
220     static int currentSyncId_;
221     static std::mutex syncIdLock_;
222     // For sync in progress.
223     std::map<uint64_t, std::list<int>> connectionIdMap_;
224     std::map<int, uint64_t> syncIdMap_;
225 
226     ISyncEngine *syncEngine_;
227     ISyncInterface *syncInterface_;
228     std::shared_ptr<TimeHelper> timeHelper_;
229     std::shared_ptr<Metadata> metadata_;
230     bool initialized_;
231     std::mutex operationMapLock_;
232     std::map<int, SyncOperation *> syncOperationMap_;
233     int queuedManualSyncSize_;
234     int queuedManualSyncLimit_;
235     bool manualSyncEnable_;
236     bool closing_;
237     mutable std::mutex queuedManualSyncLock_;
238     mutable std::mutex syncerLock_;
239     std::string label_;
240     std::mutex engineMutex_;
241     bool engineFinalize_;
242     std::condition_variable engineFinalizeCv_;
243 
244     std::mutex timeChangeListenerMutex_;
245     bool timeChangeListenerFinalize_;
246     std::condition_variable timeChangeCv_;
247     NotificationChain::Listener *timeChangedListener_;
248 };
249 } // namespace DistributedDB
250 
251 #endif  // GENRIC_SYNCER_H
252