1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SYNC_TASK_CONTEXT_H 17 #define SYNC_TASK_CONTEXT_H 18 19 #include <list> 20 #include <mutex> 21 22 #include "icommunicator.h" 23 #include "ikvdb_sync_interface.h" 24 #include "isync_task_context.h" 25 #include "meta_data.h" 26 #include "runtime_context.h" 27 #include "semaphore_utils.h" 28 #include "sync_operation.h" 29 #include "sync_target.h" 30 #include "time_helper.h" 31 32 namespace DistributedDB { 33 enum SyncDirectionFlag { 34 SEND = 0, 35 RECEIVE = 1, 36 }; 37 struct TaskParam { 38 uint32_t timeout = 0; 39 }; 40 class ISyncStateMachine; 41 42 class SyncTaskContext : public ISyncTaskContext { 43 public: 44 SyncTaskContext(); 45 46 // Add a sync task target to the queue 47 int AddSyncTarget(ISyncTarget *target) override; 48 49 // Set the status of this task 50 void SetOperationStatus(int status) override; 51 52 // Clear context data 53 void Clear() override; 54 55 // remove a sync target by syncId 56 int RemoveSyncOperation(int syncId) override; 57 58 // If the requestTargetQueue is empty 59 bool IsTargetQueueEmpty() const override; 60 61 // Get the status of this task 62 int GetOperationStatus() const override; 63 64 // Set the mode of this task 65 void SetMode(int mode) override; 66 67 // Get the mode of this task 68 int GetMode() const override; 69 70 // Move to next target to sync 71 void MoveToNextTarget(uint32_t timeout) override; 72 73 int GetNextTarget(uint32_t timeout) override; 74 75 // Get the current task syncId 76 uint32_t GetSyncId() const override; 77 78 // Get the current task deviceId. 79 std::string GetDeviceId() const override; 80 81 std::string GetTargetUserId() const override; 82 83 void SetTargetUserId(const std::string &userId) override; 84 85 // Set the sync task queue exec status 86 void SetTaskExecStatus(int status) override; 87 88 // Get the sync task queue exec status 89 int GetTaskExecStatus() const override; 90 91 // Return if now is doing auto sync 92 bool IsAutoSync() const override; 93 94 // Set a Timer used for timeout 95 int StartTimer() override; 96 97 // delete timer 98 void StopTimer() override; 99 100 // modify timer 101 int ModifyTimer(int milliSeconds) override; 102 103 // Set a RetryTime for the sync task 104 void SetRetryTime(int retryTime) override; 105 106 // Get a RetryTime for the sync task 107 int GetRetryTime() const override; 108 109 // Set Retry status for the sync task 110 void SetRetryStatus(int isNeedRetry) override; 111 112 // Get Retry status for the sync task 113 int GetRetryStatus() const override; 114 115 TimerId GetTimerId() const override; 116 117 // Inc the current message sequenceId 118 void IncSequenceId() override; 119 120 // Get the current initiactive sync session id 121 uint32_t GetRequestSessionId() const override; 122 123 // Get the current message sequence id 124 uint32_t GetSequenceId() const override; 125 126 void ReSetSequenceId() override; 127 128 void IncPacketId(); 129 130 uint64_t GetPacketId() const; 131 132 // Get the current watch timeout time 133 int GetTimeoutTime() const override; 134 135 void SetTimeoutCallback(const TimerAction &timeOutCallback) override; 136 137 // Start the sync state machine 138 int StartStateMachine() override; 139 140 // Set the timeoffset with the remote device 141 void SetTimeOffset(TimeOffset offset) override; 142 143 // Get the timeoffset with the remote device 144 TimeOffset GetTimeOffset() const override; 145 146 // Used for sync message callback 147 int ReceiveMessageCallback(Message *inMsg) override; 148 149 // used to register a callback, called when new SyncTarget added 150 void RegOnSyncTask(const std::function<int(void)> &callback) override; 151 152 // When schedule a new task, should call this function to inc usedcount 153 int IncUsedCount() override; 154 155 // When schedule task exit, should call this function to dec usedcount 156 void SafeExit() override; 157 158 // Get current local time from TimeHelper 159 Timestamp GetCurrentLocalTime() const override; 160 161 // Set the remount software version num 162 void SetRemoteSoftwareVersion(uint32_t version) override; 163 164 // Get the remount software version num 165 uint32_t GetRemoteSoftwareVersion() const override; 166 167 // Get the remount software version id, when called GetRemoteSoftwareVersion this id will be increase. 168 // Used to check if the version num is is overdue 169 uint64_t GetRemoteSoftwareVersionId() const override; 170 171 // Judge if the communicator is normal 172 bool IsCommNormal() const override; 173 174 void ClearSyncOperation() override; 175 176 // If ability sync request set version, need call this function. 177 // Should be called with ObjLock 178 virtual void Abort(int status); 179 180 // Used in send msg, as execution is asynchronous, should use this function to handle result. 181 static void CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId, bool isDirectEnd = true); 182 183 int GetTaskErrCode() const override; 184 185 void SetTaskErrCode(int errCode) override; 186 187 bool IsSyncTaskNeedRetry() const override; 188 189 void SetSyncRetry(bool isRetry) override; 190 191 int GetSyncRetryTimes() const override; 192 193 int GetSyncRetryTimeout(int retryTime) const override; 194 195 void ClearAllSyncTask() override; 196 197 bool IsAutoLiftWaterMark() const override; 198 199 void IncNegotiationCount() override; 200 201 // check if need trigger query auto sync and get query from inMsg 202 bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override; 203 204 bool IsAutoSubscribe() const override; 205 206 bool IsCurrentSyncTaskCanBeSkipped() const override; 207 208 virtual void ResetLastPushTaskStatus(); 209 210 void SchemaChange() override; 211 212 void Dump(int fd) override; 213 214 void AbortMachineIfNeed(uint32_t syncId) override; 215 216 bool IsSchemaCompatible() const override; 217 218 void SetDbAbility(DbAbility &remoteDbAbility) override; 219 220 void TimeChange() override; 221 222 int32_t GetResponseTaskCount() override; 223 224 int GetCommErrCode() const; 225 226 void SetCommFailErrCode(int errCode); 227 protected: 228 const static int KILL_WAIT_SECONDS = INT32_MAX; 229 230 ~SyncTaskContext() override; 231 232 virtual int TimeOut(TimerId id); 233 234 virtual void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam); 235 236 void CommErrHandlerFuncInner(int errCode, uint32_t sessionId, bool isDirectEnd = true); 237 238 void KillWait(); 239 240 void ClearSyncTarget(); 241 242 void CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId); 243 244 virtual void SaveLastPushTaskExecStatus(int finalStatus); 245 246 int RunPermissionCheck(uint8_t flag) const; 247 248 SyncOperation *GetAndIncSyncOperation() const; 249 250 uint32_t GenerateRequestSessionId(); 251 252 static uint8_t GetPermissionCheckFlag(bool isAutoSync, int syncMode); 253 254 void SetErrCodeWhenWaitTimeOut(int errCode); 255 256 mutable std::mutex targetQueueLock_; 257 std::list<ISyncTarget *> requestTargetQueue_; 258 std::list<ISyncTarget *> responseTargetQueue_; 259 SyncOperation *syncOperation_; 260 mutable std::mutex operationLock_; 261 volatile uint32_t syncId_; 262 volatile int mode_; 263 volatile bool isAutoSync_; 264 volatile int status_; 265 volatile int taskExecStatus_; 266 std::string deviceId_; 267 std::string targetUserId_; 268 std::string syncActionName_; 269 ISyncInterface *syncInterface_; 270 ICommunicator *communicator_; 271 ISyncStateMachine *stateMachine_; 272 TimeOffset timeOffset_ = 0; 273 volatile int retryTime_ = 0; 274 volatile int isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY; 275 volatile uint32_t requestSessionId_ = 0; 276 volatile uint32_t lastRequestSessionId_ = 0; 277 volatile uint32_t sequenceId_ = 1; 278 std::function<int(void)> onSyncTaskAdd_; 279 280 // for safe exit 281 std::condition_variable safeKill_; 282 volatile int usedCount_ = 0; 283 284 // for timeout callback 285 std::mutex timerLock_; 286 TimerId timerId_ = 0; 287 int timeout_ = 1000; // 1000ms 288 TimerAction timeOutCallback_; 289 std::unique_ptr<TimeHelper> timeHelper_; 290 291 // for version sync 292 mutable std::mutex remoteSoftwareVersionLock_; 293 volatile uint32_t remoteSoftwareVersion_; 294 volatile uint64_t remoteSoftwareVersionId_; // Check if the remoteSoftwareVersion_ is is overdue 295 296 volatile bool isCommNormal_; 297 volatile int taskErrCode_; 298 std::atomic<int> commErrCode_; 299 volatile uint64_t packetId_ = 0; // used for assignment to reSendMap_.ReSendInfo.packetId in 103 version or above 300 volatile bool syncTaskRetryStatus_; 301 volatile bool isSyncRetry_; 302 volatile uint32_t negotiationCount_; 303 volatile bool isAutoSubscribe_; 304 305 // For global ISyncTaskContext Set, used by CommErrCallback. 306 static std::mutex synTaskContextSetLock_; 307 static std::set<ISyncTaskContext *> synTaskContextSet_; 308 }; 309 } // namespace DistributedDB 310 311 #endif // SYNC_TASK_CONTEXT_H 312