1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SYNC_TASK_CONTEXT_H 17 #define SYNC_TASK_CONTEXT_H 18 19 #include <list> 20 #include <mutex> 21 22 #include "icommunicator.h" 23 #include "ikvdb_sync_interface.h" 24 #include "isync_task_context.h" 25 #include "meta_data.h" 26 #include "runtime_context.h" 27 #include "semaphore_utils.h" 28 #include "sync_operation.h" 29 #include "sync_target.h" 30 #include "time_helper.h" 31 32 namespace DistributedDB { 33 enum SyncDirectionFlag { 34 SEND = 0, 35 RECEIVE = 1, 36 }; 37 struct TaskParam { 38 uint32_t timeout = 0; 39 }; 40 class ISyncStateMachine; 41 42 class SyncTaskContext : public ISyncTaskContext { 43 public: 44 SyncTaskContext(); 45 46 // Add a sync task target to the queue 47 int AddSyncTarget(ISyncTarget *target) override; 48 49 // Set the status of this task 50 void SetOperationStatus(int status) override; 51 52 // Clear context data 53 void Clear() override; 54 55 // remove a sync target by syncId 56 int RemoveSyncOperation(int syncId) override; 57 58 // If the requestTargetQueue is empty 59 bool IsTargetQueueEmpty() const override; 60 61 // Get the status of this task 62 int GetOperationStatus() const override; 63 64 // Set the mode of this task 65 void SetMode(int mode) override; 66 67 // Get the mode of this task 68 int GetMode() const override; 69 70 // Move to next target to sync 71 void MoveToNextTarget() override; 72 73 int GetNextTarget() override; 74 75 // Get the current task syncId 76 uint32_t GetSyncId() const override; 77 78 // Get the current task deviceId. 79 std::string GetDeviceId() const override; 80 81 // Set the sync task queue exec status 82 void SetTaskExecStatus(int status) override; 83 84 // Get the sync task queue exec status 85 int GetTaskExecStatus() const override; 86 87 // Return if now is doing auto sync 88 bool IsAutoSync() const override; 89 90 // Set a Timer used for timeout 91 int StartTimer() override; 92 93 // delete timer 94 void StopTimer() override; 95 96 // modify timer 97 int ModifyTimer(int milliSeconds) override; 98 99 // Set a RetryTime for the sync task 100 void SetRetryTime(int retryTime) override; 101 102 // Get a RetryTime for the sync task 103 int GetRetryTime() const override; 104 105 // Set Retry status for the sync task 106 void SetRetryStatus(int isNeedRetry) override; 107 108 // Get Retry status for the sync task 109 int GetRetryStatus() const override; 110 111 TimerId GetTimerId() const override; 112 113 // Inc the current message sequenceId 114 void IncSequenceId() override; 115 116 // Get the current initiactive sync session id 117 uint32_t GetRequestSessionId() const override; 118 119 // Get the current message sequence id 120 uint32_t GetSequenceId() const override; 121 122 void ReSetSequenceId() override; 123 124 void IncPacketId(); 125 126 uint64_t GetPacketId() const; 127 128 // Get the current watch timeout time 129 int GetTimeoutTime() const override; 130 131 void SetTimeoutCallback(const TimerAction &timeOutCallback) override; 132 133 // Start the sync state machine 134 int StartStateMachine() override; 135 136 // Set the timeoffset with the remote device 137 void SetTimeOffset(TimeOffset offset) override; 138 139 // Get the timeoffset with the remote device 140 TimeOffset GetTimeOffset() const override; 141 142 // Used for sync message callback 143 int ReceiveMessageCallback(Message *inMsg) override; 144 145 // used to register a callback, called when new SyncTarget added 146 void RegOnSyncTask(const std::function<int(void)> &callback) override; 147 148 // When schedule a new task, should call this function to inc usedcount 149 int IncUsedCount() override; 150 151 // When schedule task exit, should call this function to dec usedcount 152 void SafeExit() override; 153 154 // Get current local time from TimeHelper 155 Timestamp GetCurrentLocalTime() const override; 156 157 // Set the remount software version num 158 void SetRemoteSoftwareVersion(uint32_t version) override; 159 160 // Get the remount software version num 161 uint32_t GetRemoteSoftwareVersion() const override; 162 163 // Get the remount software version id, when called GetRemoteSoftwareVersion this id will be increase. 164 // Used to check if the version num is is overdue 165 uint64_t GetRemoteSoftwareVersionId() const override; 166 167 // Judge if the communicator is normal 168 bool IsCommNormal() const override; 169 170 void ClearSyncOperation() override; 171 172 // If ability sync request set version, need call this function. 173 // Should be called with ObjLock 174 virtual void Abort(int status); 175 176 // Used in send msg, as execution is asynchronous, should use this function to handle result. 177 static void CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId); 178 179 int GetTaskErrCode() const override; 180 181 void SetTaskErrCode(int errCode) override; 182 183 bool IsSyncTaskNeedRetry() const override; 184 185 void SetSyncRetry(bool isRetry) override; 186 187 int GetSyncRetryTimes() const override; 188 189 int GetSyncRetryTimeout(int retryTime) const override; 190 191 void ClearAllSyncTask() override; 192 193 bool IsAutoLiftWaterMark() const override; 194 195 void IncNegotiationCount() override; 196 197 // check if need trigger query auto sync and get query from inMsg 198 bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override; 199 200 bool IsAutoSubscribe() const override; 201 202 bool IsCurrentSyncTaskCanBeSkipped() const override; 203 204 virtual void ResetLastPushTaskStatus(); 205 206 bool GetIsNeedResetAbilitySync() const; 207 208 void SetIsNeedResetAbilitySync(bool isNeedReset) override; 209 210 void SchemaChange() override; 211 212 void Dump(int fd) override; 213 214 void AbortMachineIfNeed(uint32_t syncId) override; 215 protected: 216 const static int KILL_WAIT_SECONDS = INT32_MAX; 217 218 ~SyncTaskContext() override; 219 220 virtual int TimeOut(TimerId id); 221 222 virtual void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam); 223 224 void CommErrHandlerFuncInner(int errCode, uint32_t sessionId); 225 226 void KillWait(); 227 228 void ClearSyncTarget(); 229 230 void CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId); 231 232 virtual void SaveLastPushTaskExecStatus(int finalStatus); 233 234 int RunPermissionCheck(uint8_t flag) const; 235 236 SyncOperation *GetAndIncSyncOperation() const; 237 238 uint32_t GenerateRequestSessionId(); 239 240 static uint8_t GetPermissionCheckFlag(bool isAutoSync, int syncMode); 241 242 mutable std::mutex targetQueueLock_; 243 std::list<ISyncTarget *> requestTargetQueue_; 244 std::list<ISyncTarget *> responseTargetQueue_; 245 SyncOperation *syncOperation_; 246 mutable std::mutex operationLock_; 247 volatile uint32_t syncId_; 248 volatile int mode_; 249 volatile bool isAutoSync_; 250 volatile int status_; 251 volatile int taskExecStatus_; 252 std::string deviceId_; 253 std::string syncActionName_; 254 ISyncInterface *syncInterface_; 255 ICommunicator *communicator_; 256 ISyncStateMachine *stateMachine_; 257 TimeOffset timeOffset_ = 0; 258 volatile int retryTime_ = 0; 259 volatile int isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY; 260 volatile uint32_t requestSessionId_ = 0; 261 volatile uint32_t lastRequestSessionId_ = 0; 262 volatile uint32_t sequenceId_ = 1; 263 std::function<int(void)> onSyncTaskAdd_; 264 265 // for safe exit 266 std::condition_variable safeKill_; 267 volatile int usedCount_ = 0; 268 269 // for timeout callback 270 std::mutex timerLock_; 271 TimerId timerId_ = 0; 272 int timeout_ = 1000; // 1000ms 273 TimerAction timeOutCallback_; 274 std::unique_ptr<TimeHelper> timeHelper_; 275 276 // for version sync 277 mutable std::mutex remoteSoftwareVersionLock_; 278 volatile uint32_t remoteSoftwareVersion_; 279 volatile uint64_t remoteSoftwareVersionId_; // Check if the remoteSoftwareVersion_ is is overdue 280 281 volatile bool isCommNormal_; 282 volatile int taskErrCode_; 283 volatile uint64_t packetId_ = 0; // used for assignment to reSendMap_.ReSendInfo.packetId in 103 version or above 284 volatile bool syncTaskRetryStatus_; 285 volatile bool isSyncRetry_; 286 volatile uint32_t negotiationCount_; 287 volatile bool isAutoSubscribe_; 288 // syncFinished_ need to set false if isNeedResetSyncFinished_ is true when start do abilitySync interface 289 std::atomic<bool> isNeedResetAbilitySync_; 290 291 // For global ISyncTaskContext Set, used by CommErrCallback. 292 static std::mutex synTaskContextSetLock_; 293 static std::set<ISyncTaskContext *> synTaskContextSet_; 294 }; 295 } // namespace DistributedDB 296 297 #endif // SYNC_TASK_CONTEXT_H 298