1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SYNC_TASK_CONTEXT_H 17 #define SYNC_TASK_CONTEXT_H 18 19 #include <list> 20 #include <mutex> 21 22 #include "icommunicator.h" 23 #include "ikvdb_sync_interface.h" 24 #include "isync_task_context.h" 25 #include "meta_data.h" 26 #include "runtime_context.h" 27 #include "semaphore_utils.h" 28 #include "sync_operation.h" 29 #include "sync_target.h" 30 #include "time_helper.h" 31 32 namespace DistributedDB { 33 enum SyncDirectionFlag { 34 SEND = 0, 35 RECEIVE = 1, 36 }; 37 struct TaskParam { 38 uint32_t timeout = 0; 39 }; 40 class ISyncStateMachine; 41 42 class SyncTaskContext : public ISyncTaskContext { 43 public: 44 SyncTaskContext(); 45 46 // Add a sync task target to the queue 47 int AddSyncTarget(ISyncTarget *target) override; 48 49 // Set the status of this task 50 void SetOperationStatus(int status) override; 51 52 // Clear context data 53 void Clear() override; 54 55 // remove a sync target by syncId 56 int RemoveSyncOperation(int syncId) override; 57 58 // If the requestTargetQueue is empty 59 bool IsTargetQueueEmpty() const override; 60 61 // Get the status of this task 62 int GetOperationStatus() const override; 63 64 // Set the mode of this task 65 void SetMode(int mode) override; 66 67 // Get the mode of this task 68 int GetMode() const override; 69 70 // Move to next target to sync 71 void MoveToNextTarget() override; 72 73 int GetNextTarget(bool isNeedSetFinished) override; 74 75 // Get the current task syncId 76 uint32_t GetSyncId() const override; 77 78 // Get the current task deviceId. 79 std::string GetDeviceId() const override; 80 81 // Set the sync task queue exec status 82 void SetTaskExecStatus(int status) override; 83 84 // Get the sync task queue exec status 85 int GetTaskExecStatus() const override; 86 87 // Return if now is doing auto sync 88 bool IsAutoSync() const override; 89 90 // Set a Timer used for timeout 91 int StartTimer() override; 92 93 // delete timer 94 void StopTimer() override; 95 96 // modify timer 97 int ModifyTimer(int milliSeconds) override; 98 99 // Set a RetryTime for the sync task 100 void SetRetryTime(int retryTime) override; 101 102 // Get a RetryTime for the sync task 103 int GetRetryTime() const override; 104 105 // Set Retry status for the sync task 106 void SetRetryStatus(int isNeedRetry) override; 107 108 // Get Retry status for the sync task 109 int GetRetryStatus() const override; 110 111 TimerId GetTimerId() const override; 112 113 // Inc the current message sequenceId 114 void IncSequenceId() override; 115 116 // Get the current initiactive sync session id 117 uint32_t GetRequestSessionId() const override; 118 119 // Get the current message sequence id 120 uint32_t GetSequenceId() const override; 121 122 void ReSetSequenceId() override; 123 124 void IncPacketId(); 125 126 uint64_t GetPacketId() const; 127 128 // Get the current watch timeout time 129 int GetTimeoutTime() const override; 130 131 void SetTimeoutCallback(const TimerAction &timeOutCallback) override; 132 133 // Start the sync state machine 134 int StartStateMachine() override; 135 136 // Set the timeoffset with the remote device 137 void SetTimeOffset(TimeOffset offset) override; 138 139 // Get the timeoffset with the remote device 140 TimeOffset GetTimeOffset() const override; 141 142 // Used for sync message callback 143 int ReceiveMessageCallback(Message *inMsg) override; 144 145 // used to register a callback, called when new SyncTarget added 146 void RegOnSyncTask(const std::function<int(void)> &callback) override; 147 148 // When schedule a new task, should call this function to inc usedcount 149 int IncUsedCount() override; 150 151 // When schedule task exit, should call this function to dec usedcount 152 void SafeExit() override; 153 154 // Get current local time from TimeHelper 155 Timestamp GetCurrentLocalTime() const override; 156 157 // Set the remount software version num 158 void SetRemoteSoftwareVersion(uint32_t version) override; 159 160 // Get the remount software version num 161 uint32_t GetRemoteSoftwareVersion() const override; 162 163 // Get the remount software version id, when called GetRemoteSoftwareVersion this id will be increase. 164 // Used to check if the version num is is overdue 165 uint64_t GetRemoteSoftwareVersionId() const override; 166 167 // Judge if the communicator is normal 168 bool IsCommNormal() const override; 169 170 // If ability sync request set version, need call this function. 171 // Should be called with ObjLock 172 virtual void Abort(int status); 173 174 // Used in send msg, as execution is asynchronous, should use this function to handle result. 175 static void CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId); 176 177 int GetTaskErrCode() const override; 178 179 void SetTaskErrCode(int errCode) override; 180 181 bool IsSyncTaskNeedRetry() const override; 182 183 void SetSyncRetry(bool isRetry) override; 184 185 int GetSyncRetryTimes() const override; 186 187 int GetSyncRetryTimeout(int retryTime) const override; 188 189 void ClearAllSyncTask() override; 190 191 bool IsAutoLiftWaterMark() const override; 192 193 void IncNegotiationCount() override; 194 195 // check if need trigger query auto sync and get query from inMsg 196 bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override; 197 198 bool IsAutoSubscribe() const override; 199 200 bool IsCurrentSyncTaskCanBeSkipped() const override; 201 202 virtual void ResetLastPushTaskStatus(); 203 204 bool GetIsNeedResetAbilitySync() const; 205 206 void SetIsNeedResetAbilitySync(bool isNeedReset) override; 207 208 void SchemaChange() override; 209 210 void Dump(int fd) override; 211 212 void AbortMachineIfNeed(uint32_t syncId) override; 213 protected: 214 const static int KILL_WAIT_SECONDS = INT32_MAX; 215 216 ~SyncTaskContext() override; 217 218 virtual int TimeOut(TimerId id); 219 220 virtual void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam); 221 222 void CommErrHandlerFuncInner(int errCode, uint32_t sessionId); 223 224 void KillWait(); 225 226 void ClearSyncOperation(); 227 228 void ClearSyncTarget(); 229 230 void CancelCurrentSyncRetryIfNeed(int newTargetMode); 231 232 virtual void SaveLastPushTaskExecStatus(int finalStatus); 233 234 int RunPermissionCheck(uint8_t flag) const; 235 236 SyncOperation *GetAndIncSyncOperation() const; 237 238 static uint8_t GetPermissionCheckFlag(bool isAutoSync, int syncMode); 239 240 mutable std::mutex targetQueueLock_; 241 std::list<ISyncTarget *> requestTargetQueue_; 242 std::list<ISyncTarget *> responseTargetQueue_; 243 SyncOperation *syncOperation_; 244 mutable std::mutex operationLock_; 245 uint32_t syncId_; 246 int mode_; 247 bool isAutoSync_; 248 int status_; 249 int taskExecStatus_; 250 std::string deviceId_; 251 std::string syncActionName_; 252 ISyncInterface *syncInterface_; 253 ICommunicator *communicator_; 254 ISyncStateMachine *stateMachine_; 255 TimeOffset timeOffset_ = 0; 256 int retryTime_ = 0; 257 int isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY; 258 uint32_t requestSessionId_ = 0; 259 uint32_t lastRequestSessionId_ = 0; 260 uint32_t sequenceId_ = 1; 261 std::function<int(void)> onSyncTaskAdd_; 262 263 // for safe exit 264 std::condition_variable safeKill_; 265 int usedCount_ = 0; 266 267 // for timeout callback 268 std::mutex timerLock_; 269 TimerId timerId_ = 0; 270 int timeout_ = 1000; // 1000ms 271 TimerAction timeOutCallback_; 272 std::unique_ptr<TimeHelper> timeHelper_; 273 274 // for version sync 275 mutable std::mutex remoteSoftwareVersionLock_; 276 uint32_t remoteSoftwareVersion_; 277 uint64_t remoteSoftwareVersionId_; // Check if the remoteSoftwareVersion_ is is overdue 278 279 bool isCommNormal_; 280 int taskErrCode_; 281 uint64_t packetId_ = 0; // used for assignment to reSendMap_.ReSendInfo.packetId in 103 version or above 282 bool syncTaskRetryStatus_; 283 bool isSyncRetry_; 284 uint32_t negotiationCount_; 285 bool isAutoSubscribe_; 286 // syncFinished_ need to set false if isNeedResetSyncFinished_ is true when start do abilitySync interface 287 std::atomic<bool> isNeedResetAbilitySync_; 288 289 // For global ISyncTaskContext Set, used by CommErrCallback. 290 static std::mutex synTaskContextSetLock_; 291 static std::set<ISyncTaskContext *> synTaskContextSet_; 292 }; 293 } // namespace DistributedDB 294 295 #endif // SYNC_TASK_CONTEXT_H 296