1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef GENRIC_SYNCER_H 17 #define GENRIC_SYNCER_H 18 19 #include <functional> 20 #include <mutex> 21 #include <map> 22 23 #include "isyncer.h" 24 #include "isync_engine.h" 25 #include "meta_data.h" 26 #include "sync_operation.h" 27 #include "time_helper.h" 28 29 namespace DistributedDB { 30 class GenericSyncer : public virtual ISyncer { 31 using DataChangedFunc = std::function<void(const std::string &device)>; 32 33 public: 34 GenericSyncer(); 35 ~GenericSyncer() override; 36 37 // Init the Syncer modules 38 int Initialize(ISyncInterface *syncInterface, bool isNeedActive) override; 39 40 // Close 41 int Close(bool isClosedOperation) override; 42 43 // Sync function. 44 // param devices: The device id list. 45 // param mode: Sync mode, see SyncMode. 46 // param onComplete: The syncer finish callback. set by caller 47 // param onFinalize: will be callback when this Sync Operation finalized. 48 // return a Sync id. It will return a positive value if failed, 49 int Sync(const std::vector<std::string> &devices, int mode, 50 const std::function<void(const std::map<std::string, int> &)> &onComplete, 51 const std::function<void(void)> &onFinalize, bool wait) override; 52 53 // Sync function. use SyncParam to reduce parameter. 54 int Sync(const SyncParam ¶m); 55 56 int Sync(const SyncParam ¶m, uint64_t connectionId) override; 57 58 // Cancel sync function. 59 int CancelSync(uint32_t syncId) override; 60 61 // Remove the operation, with the given syncId, used to clean resource if sync finished or failed. 62 int RemoveSyncOperation(int syncId) override; 63 64 int StopSync(uint64_t connectionId) override; 65 66 // Get The current virtual timestamp 67 uint64_t GetTimestamp() override; 68 69 // Get manual sync queue size 70 int GetQueuedSyncSize(int *queuedSyncSize) const override; 71 72 // Set manual sync queue limit 73 int SetQueuedSyncLimit(const int *queuedSyncLimit) override; 74 75 // Get manual sync queue limit 76 int GetQueuedSyncLimit(int *queuedSyncLimit) const override; 77 78 // Disable add new manual sync, for rekey 79 int DisableManualSync(void) override; 80 81 // Enable add new manual sync, for rekey 82 int EnableManualSync(void) override; 83 84 // Get local deviceId, is hashed 85 int GetLocalIdentity(std::string &outTarget) const override; 86 87 // Set Manual Sync retry config 88 int SetSyncRetry(bool isRetry) override; 89 90 // Set an equal identifier for this database, After this called, send msg to the target will use this identifier 91 int SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets) override; 92 93 // Inner function, Used for subscribe sync 94 int Sync(const InternalSyncParma ¶m); 95 96 // Remote data changed callback 97 virtual void RemoteDataChanged(const std::string &device) = 0; 98 99 virtual void RemoteDeviceOffline(const std::string &device) = 0; 100 101 void Dump(int fd) override; 102 103 SyncerBasicInfo DumpSyncerBasicInfo() override; 104 105 int RemoteQuery(const std::string &device, const RemoteCondition &condition, 106 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result) override; 107 108 int GetSyncDataSize(const std::string &device, size_t &size) const override; 109 110 int GetHashDeviceId(const std::string &clientId, std::string &hashDevId) const override; 111 112 int GetWatermarkInfo(const std::string &device, WatermarkInfo &info) override; 113 114 int UpgradeSchemaVerInMeta() override; 115 116 void ResetSyncStatus() override; 117 118 int64_t GetLocalTimeOffset() override; 119 120 int32_t GetTaskCount() override; 121 122 bool ExchangeClosePending(bool expected) override; 123 protected: 124 125 // trigger query auto sync or auto subscribe 126 // trigger auto subscribe only when subscribe task is failed triggered by remote db opened 127 // it won't be triggered again when subscribe task success 128 virtual void QueryAutoSync(const InternalSyncParma ¶m); 129 130 // Create a sync engine, if has memory error, will return nullptr. 131 virtual ISyncEngine *CreateSyncEngine() = 0; 132 133 virtual int PrepareSync(const SyncParam ¶m, uint32_t syncId, uint64_t connectionId); 134 135 // Add a Sync Operation, after call this function, the operation will be start 136 virtual void AddSyncOperation(ISyncEngine *engine, SyncOperation *operation); 137 138 // Used to set to the SyncOperation Onkill 139 virtual void SyncOperationKillCallbackInner(int syncId); 140 141 // Used to set to the SyncOperation Onkill 142 void SyncOperationKillCallback(int syncId); 143 144 // Init the metadata 145 int InitMetaData(ISyncInterface *syncInterface); 146 147 // Init the TimeHelper 148 int InitTimeHelper(ISyncInterface *syncInterface); 149 150 // Init the Sync engine 151 virtual int InitSyncEngine(ISyncInterface *syncInterface); 152 153 int CheckSyncActive(ISyncInterface *syncInterface, bool isNeedActive); 154 155 // Used to general a sync id, maybe it is currentSyncId++; 156 // The return value is sync id. 157 uint32_t GenerateSyncId(); 158 159 // Check if the mode arg is valid 160 bool IsValidMode(int mode) const; 161 162 virtual int SyncConditionCheck(const SyncParam ¶m, const ISyncEngine *engine, ISyncInterface *storage) const; 163 164 // Check if the devices arg is valid 165 bool IsValidDevices(const std::vector<std::string> &devices) const; 166 167 // Used Clear all SyncOperations. 168 // isClosedOperation is false while userChanged 169 void ClearSyncOperations(bool isClosedOperation); 170 171 void ClearInnerResource(bool isClosedOperation); 172 173 void TriggerSyncFinished(SyncOperation *operation); 174 175 // Callback when the special sync finished. 176 void OnSyncFinished(int syncId); 177 178 bool IsManualSync(int inMode) const; 179 180 virtual int AddQueuedManualSyncSize(int mode, bool wait); 181 182 bool IsQueuedManualSyncFull(int mode, bool wait) const; 183 184 void SubQueuedSyncSize(void); 185 186 void GetOnlineDevices(std::vector<std::string> &devices) const; 187 188 std::string GetSyncDevicesStr(const std::vector<std::string> &devices) const; 189 190 void InitSyncOperation(SyncOperation *operation, const SyncParam ¶m); 191 192 int StatusCheck() const; 193 194 int SyncPreCheck(const SyncParam ¶m) const; 195 196 int BuildSyncEngine(); 197 198 int InitTimeChangedListener(); 199 200 void ReleaseInnerResource(); 201 202 void RecordTimeChangeOffset(void *changedOffset); 203 204 int CloseInner(bool isClosedOperation); 205 206 int InitStorageResource(ISyncInterface *syncInterface); 207 208 void ResetTimeSyncMarkByTimeChange(std::shared_ptr<Metadata> &metadata, ISyncInterface &storage); 209 210 static int SyncModuleInit(); 211 212 static int SyncResourceInit(); 213 214 static bool IsNeedActive(ISyncInterface *syncInterface); 215 216 static const int MIN_VALID_SYNC_ID; 217 static std::mutex moduleInitLock_; 218 219 // Used to general the next sync id. 220 static int currentSyncId_; 221 static std::mutex syncIdLock_; 222 // For sync in progress. 223 std::map<uint64_t, std::list<int>> connectionIdMap_; 224 std::map<int, uint64_t> syncIdMap_; 225 226 ISyncEngine *syncEngine_; 227 ISyncInterface *syncInterface_; 228 std::shared_ptr<TimeHelper> timeHelper_; 229 std::shared_ptr<Metadata> metadata_; 230 bool initialized_; 231 std::mutex operationMapLock_; 232 std::map<int, SyncOperation *> syncOperationMap_; 233 int queuedManualSyncSize_; 234 int queuedManualSyncLimit_; 235 bool manualSyncEnable_; 236 bool closing_; 237 mutable std::mutex queuedManualSyncLock_; 238 mutable std::mutex syncerLock_; 239 std::string label_; 240 std::mutex engineMutex_; 241 bool engineFinalize_; 242 std::condition_variable engineFinalizeCv_; 243 244 std::mutex timeChangeListenerMutex_; 245 bool timeChangeListenerFinalize_; 246 std::condition_variable timeChangeCv_; 247 NotificationChain::Listener *timeChangedListener_; 248 }; 249 } // namespace DistributedDB 250 251 #endif // GENRIC_SYNCER_H 252