• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef GENRIC_SYNCER_H
17 #define GENRIC_SYNCER_H
18 
19 #include <functional>
20 #include <mutex>
21 #include <map>
22 
23 #include "isyncer.h"
24 #include "isync_engine.h"
25 #include "meta_data.h"
26 #include "sync_operation.h"
27 #include "time_helper.h"
28 
29 namespace DistributedDB {
30 class GenericSyncer : public virtual ISyncer {
31 using DataChangedFunc = std::function<void(const std::string &device)>;
32 
33 public:
34     GenericSyncer();
35     ~GenericSyncer() override;
36 
37     // Init the Syncer modules
38     int Initialize(ISyncInterface *syncInterface, bool isNeedActive) override;
39 
40     // Close
41     int Close(bool isClosedOperation) override;
42 
43     // Sync function.
44     // param devices: The device id list.
45     // param mode: Sync mode, see SyncMode.
46     // param onComplete: The syncer finish callback. set by caller
47     // param onFinalize: will be callback when this Sync Operation finalized.
48     // return a Sync id. It will return a positive value if failed,
49     int Sync(const std::vector<std::string> &devices, int mode,
50         const std::function<void(const std::map<std::string, int> &)> &onComplete,
51         const std::function<void(void)> &onFinalize, bool wait) override;
52 
53     // Sync function. use SyncParma to reduce parameter.
54     int Sync(const SyncParma &param);
55 
56     int Sync(const SyncParma &param, uint64_t connectionId) override;
57 
58     // Remove the operation, with the given syncId, used to clean resource if sync finished or failed.
59     int RemoveSyncOperation(int syncId) override;
60 
61     int StopSync(uint64_t connectionId) override;
62 
63     // Get The current virtual timestamp
64     uint64_t GetTimestamp() override;
65 
66     // Get manual sync queue size
67     int GetQueuedSyncSize(int *queuedSyncSize) const override;
68 
69     // Set manual sync queue limit
70     int SetQueuedSyncLimit(const int *queuedSyncLimit) override;
71 
72     // Get manual sync queue limit
73     int GetQueuedSyncLimit(int *queuedSyncLimit) const override;
74 
75     // Disable add new manual sync, for rekey
76     int DisableManualSync(void) override;
77 
78     // Enable add new manual sync, for rekey
79     int EnableManualSync(void) override;
80 
81     // Get local deviceId, is hashed
82     int GetLocalIdentity(std::string &outTarget) const override;
83 
84     // Set Manual Sync retry config
85     int SetSyncRetry(bool isRetry) override;
86 
87     // Set an equal identifier for this database, After this called, send msg to the target will use this identifier
88     int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override;
89 
90     // Inner function, Used for subscribe sync
91     int Sync(const InternalSyncParma &param);
92 
93     // Remote data changed callback
94     virtual void RemoteDataChanged(const std::string &device) = 0;
95 
96     virtual void RemoteDeviceOffline(const std::string &device) = 0;
97 
98     void Dump(int fd) override;
99 
100     SyncerBasicInfo DumpSyncerBasicInfo() override;
101 
102     int RemoteQuery(const std::string &device, const RemoteCondition &condition,
103         uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override;
104 
105     int GetSyncDataSize(const std::string &device, size_t &size) const override;
106 
107     int GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const override;
108 protected:
109 
110     // trigger query auto sync or auto subscribe
111     // trigger auto subscribe only when subscribe task is failed triggered by remote db opened
112     // it won't be triggered again when subscribe task success
113     virtual void QueryAutoSync(const InternalSyncParma &param);
114 
115     // Create a sync engine, if has memory error, will return nullptr.
116     virtual ISyncEngine *CreateSyncEngine() = 0;
117 
118     virtual int PrepareSync(const SyncParma &param, uint32_t syncId, uint64_t connectionId);
119 
120     // Add a Sync Operation, after call this function, the operation will be start
121     virtual void AddSyncOperation(SyncOperation *operation);
122 
123     // Used to set to the SyncOperation Onkill
124     virtual void SyncOperationKillCallbackInner(int syncId);
125 
126     // Used to set to the SyncOperation Onkill
127     void SyncOperationKillCallback(int syncId);
128 
129     // Init the metadata
130     int InitMetaData(ISyncInterface *syncInterface);
131 
132     // Init the TimeHelper
133     int InitTimeHelper(ISyncInterface *syncInterface);
134 
135     // Init the Sync engine
136     int InitSyncEngine(ISyncInterface *syncInterface);
137 
138     int CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive);
139 
140     // Used to general a sync id, maybe it is currentSyncId++;
141     // The return value is sync id.
142     uint32_t GenerateSyncId();
143 
144     // Check if the mode arg is valid
145     bool IsValidMode(int mode) const;
146 
147     virtual int SyncConditionCheck(const SyncParma &param, const ISyncEngine *engine, ISyncInterface *storage) const;
148 
149     // Check if the devices arg is valid
150     bool IsValidDevices(const std::vector<std::string> &devices) const;
151 
152     // Used Clear all SyncOperations.
153     // isClosedOperation is false while userChanged
154     void ClearSyncOperations(bool isClosedOperation);
155 
156     void ClearInnerResource(bool isClosedOperation);
157 
158     void TriggerSyncFinished(SyncOperation *operation);
159 
160     // Callback when the special sync finished.
161     void OnSyncFinished(int syncId);
162 
163     bool IsManualSync(int inMode) const;
164 
165     virtual int AddQueuedManualSyncSize(int mode, bool wait);
166 
167     bool IsQueuedManualSyncFull(int mode, bool wait) const;
168 
169     void SubQueuedSyncSize(void);
170 
171     void GetOnlineDevices(std::vector<std::string> &devices) const;
172 
173     std::string GetSyncDevicesStr(const std::vector<std::string> &devices) const;
174 
175     void InitSyncOperation(SyncOperation *operation, const SyncParma &param);
176 
177     int StatusCheck() const;
178 
179     int SyncPreCheck(const SyncParma &param) const;
180 
181     int BuildSyncEngine();
182 
183     int InitTimeChangedListener();
184 
185     void ReleaseInnerResource();
186 
187     void RecordTimeChangeOffset(void *changedOffset);
188 
189     int CloseInner(bool isClosedOperation);
190 
191     int InitStorageResource(ISyncInterface *syncInterface);
192 
193     static int SyncModuleInit();
194 
195     static int SyncResourceInit();
196 
197     static bool IsNeedActive(ISyncInterface *syncInterface);
198 
199     static const int MIN_VALID_SYNC_ID;
200     static std::mutex moduleInitLock_;
201 
202     // Used to general the next sync id.
203     static int currentSyncId_;
204     static std::mutex syncIdLock_;
205     // For sync in progress.
206     std::map<uint64_t, std::list<int>> connectionIdMap_;
207     std::map<int, uint64_t> syncIdMap_;
208 
209     ISyncEngine *syncEngine_;
210     ISyncInterface *syncInterface_;
211     std::shared_ptr<TimeHelper> timeHelper_;
212     std::shared_ptr<Metadata> metadata_;
213     bool initialized_;
214     std::mutex operationMapLock_;
215     std::map<int, SyncOperation *> syncOperationMap_;
216     int queuedManualSyncSize_;
217     int queuedManualSyncLimit_;
218     bool manualSyncEnable_;
219     bool closing_;
220     mutable std::mutex queuedManualSyncLock_;
221     mutable std::mutex syncerLock_;
222     std::string label_;
223     std::mutex engineMutex_;
224     bool engineFinalize_;
225     std::condition_variable engineFinalizeCv_;
226 
227     std::mutex timeChangeListenerMutex_;
228     bool timeChangeListenerFinalize_;
229     std::condition_variable timeChangeCv_;
230     NotificationChain::Listener *timeChangedListener_;
231 };
232 } // namespace DistributedDB
233 
234 #endif  // GENRIC_SYNCER_H
235