1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef GENRIC_SYNCER_H 17 #define GENRIC_SYNCER_H 18 19 #include <functional> 20 #include <mutex> 21 #include <map> 22 23 #include "isyncer.h" 24 #include "isync_engine.h" 25 #include "meta_data.h" 26 #include "sync_operation.h" 27 #include "time_helper.h" 28 29 namespace DistributedDB { 30 class GenericSyncer : public virtual ISyncer { 31 using DataChangedFunc = std::function<void(const std::string &device)>; 32 33 public: 34 GenericSyncer(); 35 ~GenericSyncer() override; 36 37 // Init the Syncer modules 38 int Initialize(ISyncInterface *syncInterface, bool isNeedActive) override; 39 40 // Close 41 int Close(bool isClosedOperation) override; 42 43 // Sync function. 44 // param devices: The device id list. 45 // param mode: Sync mode, see SyncMode. 46 // param onComplete: The syncer finish callback. set by caller 47 // param onFinalize: will be callback when this Sync Operation finalized. 48 // return a Sync id. It will return a positive value if failed, 49 int Sync(const std::vector<std::string> &devices, int mode, 50 const std::function<void(const std::map<std::string, int> &)> &onComplete, 51 const std::function<void(void)> &onFinalize, bool wait) override; 52 53 // Sync function. use SyncParma to reduce parameter. 54 int Sync(const SyncParma ¶m); 55 56 int Sync(const SyncParma ¶m, uint64_t connectionId) override; 57 58 // Cancel sync function. 59 int CancelSync(uint32_t syncId) override; 60 61 // Remove the operation, with the given syncId, used to clean resource if sync finished or failed. 62 int RemoveSyncOperation(int syncId) override; 63 64 int StopSync(uint64_t connectionId) override; 65 66 // Get The current virtual timestamp 67 uint64_t GetTimestamp() override; 68 69 // Get manual sync queue size 70 int GetQueuedSyncSize(int *queuedSyncSize) const override; 71 72 // Set manual sync queue limit 73 int SetQueuedSyncLimit(const int *queuedSyncLimit) override; 74 75 // Get manual sync queue limit 76 int GetQueuedSyncLimit(int *queuedSyncLimit) const override; 77 78 // Disable add new manual sync, for rekey 79 int DisableManualSync(void) override; 80 81 // Enable add new manual sync, for rekey 82 int EnableManualSync(void) override; 83 84 // Get local deviceId, is hashed 85 int GetLocalIdentity(std::string &outTarget) const override; 86 87 // Set Manual Sync retry config 88 int SetSyncRetry(bool isRetry) override; 89 90 // Set an equal identifier for this database, After this called, send msg to the target will use this identifier 91 int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override; 92 93 // Inner function, Used for subscribe sync 94 int Sync(const InternalSyncParma ¶m); 95 96 // Remote data changed callback 97 virtual void RemoteDataChanged(const std::string &device) = 0; 98 99 virtual void RemoteDeviceOffline(const std::string &device) = 0; 100 101 void Dump(int fd) override; 102 103 SyncerBasicInfo DumpSyncerBasicInfo() override; 104 105 int RemoteQuery(const std::string &device, const RemoteCondition &condition, 106 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override; 107 108 int GetSyncDataSize(const std::string &device, size_t &size) const override; 109 110 int GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const override; 111 112 int GetWatermarkInfo(const std::string &device, WatermarkInfo &info) override; 113 114 int UpgradeSchemaVerInMeta() override; 115 116 void ResetSyncStatus() override; 117 118 int64_t GetLocalTimeOffset() override; 119 120 int32_t GetTaskCount() override; 121 protected: 122 123 // trigger query auto sync or auto subscribe 124 // trigger auto subscribe only when subscribe task is failed triggered by remote db opened 125 // it won't be triggered again when subscribe task success 126 virtual void QueryAutoSync(const InternalSyncParma ¶m); 127 128 // Create a sync engine, if has memory error, will return nullptr. 129 virtual ISyncEngine *CreateSyncEngine() = 0; 130 131 virtual int PrepareSync(const SyncParma ¶m, uint32_t syncId, uint64_t connectionId); 132 133 // Add a Sync Operation, after call this function, the operation will be start 134 virtual void AddSyncOperation(ISyncEngine *engine, SyncOperation *operation); 135 136 // Used to set to the SyncOperation Onkill 137 virtual void SyncOperationKillCallbackInner(int syncId); 138 139 // Used to set to the SyncOperation Onkill 140 void SyncOperationKillCallback(int syncId); 141 142 // Init the metadata 143 int InitMetaData(ISyncInterface *syncInterface); 144 145 // Init the TimeHelper 146 int InitTimeHelper(ISyncInterface *syncInterface); 147 148 // Init the Sync engine 149 virtual int InitSyncEngine(ISyncInterface *syncInterface); 150 151 int CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive); 152 153 // Used to general a sync id, maybe it is currentSyncId++; 154 // The return value is sync id. 155 uint32_t GenerateSyncId(); 156 157 // Check if the mode arg is valid 158 bool IsValidMode(int mode) const; 159 160 virtual int SyncConditionCheck(const SyncParma ¶m, const ISyncEngine *engine, ISyncInterface *storage) const; 161 162 // Check if the devices arg is valid 163 bool IsValidDevices(const std::vector<std::string> &devices) const; 164 165 // Used Clear all SyncOperations. 166 // isClosedOperation is false while userChanged 167 void ClearSyncOperations(bool isClosedOperation); 168 169 void ClearInnerResource(bool isClosedOperation); 170 171 void TriggerSyncFinished(SyncOperation *operation); 172 173 // Callback when the special sync finished. 174 void OnSyncFinished(int syncId); 175 176 bool IsManualSync(int inMode) const; 177 178 virtual int AddQueuedManualSyncSize(int mode, bool wait); 179 180 bool IsQueuedManualSyncFull(int mode, bool wait) const; 181 182 void SubQueuedSyncSize(void); 183 184 void GetOnlineDevices(std::vector<std::string> &devices) const; 185 186 std::string GetSyncDevicesStr(const std::vector<std::string> &devices) const; 187 188 void InitSyncOperation(SyncOperation *operation, const SyncParma ¶m); 189 190 int StatusCheck() const; 191 192 int SyncPreCheck(const SyncParma ¶m) const; 193 194 int BuildSyncEngine(); 195 196 int InitTimeChangedListener(); 197 198 void ReleaseInnerResource(); 199 200 void RecordTimeChangeOffset(void *changedOffset); 201 202 int CloseInner(bool isClosedOperation); 203 204 int InitStorageResource(ISyncInterface *syncInterface); 205 206 void ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> &metadata, ISyncInterface &storage); 207 208 static int SyncModuleInit(); 209 210 static int SyncResourceInit(); 211 212 static bool IsNeedActive(ISyncInterface *syncInterface); 213 214 static const int MIN_VALID_SYNC_ID; 215 static std::mutex moduleInitLock_; 216 217 // Used to general the next sync id. 218 static int currentSyncId_; 219 static std::mutex syncIdLock_; 220 // For sync in progress. 221 std::map<uint64_t, std::list<int>> connectionIdMap_; 222 std::map<int, uint64_t> syncIdMap_; 223 224 ISyncEngine *syncEngine_; 225 ISyncInterface *syncInterface_; 226 std::shared_ptr<TimeHelper> timeHelper_; 227 std::shared_ptr<Metadata> metadata_; 228 bool initialized_; 229 std::mutex operationMapLock_; 230 std::map<int, SyncOperation *> syncOperationMap_; 231 int queuedManualSyncSize_; 232 int queuedManualSyncLimit_; 233 bool manualSyncEnable_; 234 bool closing_; 235 mutable std::mutex queuedManualSyncLock_; 236 mutable std::mutex syncerLock_; 237 std::string label_; 238 std::mutex engineMutex_; 239 bool engineFinalize_; 240 std::condition_variable engineFinalizeCv_; 241 242 std::mutex timeChangeListenerMutex_; 243 bool timeChangeListenerFinalize_; 244 std::condition_variable timeChangeCv_; 245 NotificationChain::Listener *timeChangedListener_; 246 }; 247 } // namespace DistributedDB 248 249 #endif // GENRIC_SYNCER_H 250