• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SYNC_TASK_CONTEXT_H
17 #define SYNC_TASK_CONTEXT_H
18 
19 #include <list>
20 #include <mutex>
21 
22 #include "icommunicator.h"
23 #include "ikvdb_sync_interface.h"
24 #include "isync_task_context.h"
25 #include "meta_data.h"
26 #include "runtime_context.h"
27 #include "semaphore_utils.h"
28 #include "sync_operation.h"
29 #include "sync_target.h"
30 #include "time_helper.h"
31 
32 namespace DistributedDB {
33 enum SyncDirectionFlag {
34     SEND = 0,
35     RECEIVE = 1,
36 };
37 struct TaskParam {
38     uint32_t timeout = 0;
39 };
40 class ISyncStateMachine;
41 
42 class SyncTaskContext : public ISyncTaskContext {
43 public:
44     SyncTaskContext();
45 
46     // Add a sync task target to the queue
47     int AddSyncTarget(ISyncTarget *target) override;
48 
49     // Set the status of this task
50     void SetOperationStatus(int status) override;
51 
52     // Clear context data
53     void Clear() override;
54 
55     // remove a sync target by syncId
56     int RemoveSyncOperation(int syncId) override;
57 
58     // If the requestTargetQueue is empty
59     bool IsTargetQueueEmpty() const override;
60 
61     // Get the status of this task
62     int GetOperationStatus() const override;
63 
64     // Set the mode of this task
65     void SetMode(int mode) override;
66 
67     // Get the mode of this task
68     int GetMode() const override;
69 
70     // Move to next target to sync
71     void MoveToNextTarget() override;
72 
73     int GetNextTarget() override;
74 
75     // Get the current task syncId
76     uint32_t GetSyncId() const override;
77 
78     // Get the current task deviceId.
79     std::string GetDeviceId() const override;
80 
81     // Set the sync task queue exec status
82     void SetTaskExecStatus(int status) override;
83 
84     // Get the sync task queue exec status
85     int GetTaskExecStatus() const override;
86 
87     // Return if now is doing auto sync
88     bool IsAutoSync() const override;
89 
90     // Set a Timer used for timeout
91     int StartTimer() override;
92 
93     // delete timer
94     void StopTimer() override;
95 
96     // modify timer
97     int ModifyTimer(int milliSeconds) override;
98 
99     // Set a RetryTime for the sync task
100     void SetRetryTime(int retryTime) override;
101 
102     // Get a RetryTime for the sync task
103     int GetRetryTime() const override;
104 
105     // Set Retry status for the sync task
106     void SetRetryStatus(int isNeedRetry) override;
107 
108     // Get Retry status for the sync task
109     int GetRetryStatus() const override;
110 
111     TimerId GetTimerId() const override;
112 
113     // Inc the current message sequenceId
114     void IncSequenceId() override;
115 
116     // Get the current initiactive sync session id
117     uint32_t GetRequestSessionId() const override;
118 
119     // Get the current message sequence id
120     uint32_t GetSequenceId() const override;
121 
122     void ReSetSequenceId() override;
123 
124     void IncPacketId();
125 
126     uint64_t GetPacketId() const;
127 
128     // Get the current watch timeout time
129     int GetTimeoutTime() const override;
130 
131     void SetTimeoutCallback(const TimerAction &timeOutCallback) override;
132 
133     // Start the sync state machine
134     int StartStateMachine() override;
135 
136     // Set the timeoffset with the remote device
137     void SetTimeOffset(TimeOffset offset) override;
138 
139     // Get the timeoffset with the remote device
140     TimeOffset GetTimeOffset() const override;
141 
142     // Used for sync message callback
143     int ReceiveMessageCallback(Message *inMsg) override;
144 
145     // used to register a callback, called when new SyncTarget added
146     void RegOnSyncTask(const std::function<int(void)> &callback) override;
147 
148     // When schedule a new task, should call this function to inc usedcount
149     int IncUsedCount() override;
150 
151     // When schedule task exit, should call this function to dec usedcount
152     void SafeExit() override;
153 
154     // Get current local time from TimeHelper
155     Timestamp GetCurrentLocalTime() const override;
156 
157     // Set the remount software version num
158     void SetRemoteSoftwareVersion(uint32_t version) override;
159 
160     // Get the remount software version num
161     uint32_t GetRemoteSoftwareVersion() const override;
162 
163     // Get the remount software version id, when called GetRemoteSoftwareVersion this id will be increase.
164     // Used to check if the version num is is overdue
165     uint64_t GetRemoteSoftwareVersionId() const override;
166 
167     // Judge if the communicator is normal
168     bool IsCommNormal() const override;
169 
170     void ClearSyncOperation() override;
171 
172     // If ability sync request set version, need call this function.
173     // Should be called with ObjLock
174     virtual void Abort(int status);
175 
176     // Used in send msg, as execution is asynchronous, should use this function to handle result.
177     static void CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId);
178 
179     int GetTaskErrCode() const override;
180 
181     void SetTaskErrCode(int errCode) override;
182 
183     bool IsSyncTaskNeedRetry() const override;
184 
185     void SetSyncRetry(bool isRetry) override;
186 
187     int GetSyncRetryTimes() const override;
188 
189     int GetSyncRetryTimeout(int retryTime) const override;
190 
191     void ClearAllSyncTask() override;
192 
193     bool IsAutoLiftWaterMark() const override;
194 
195     void IncNegotiationCount() override;
196 
197     // check if need trigger query auto sync and get query from inMsg
198     bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override;
199 
200     bool IsAutoSubscribe() const override;
201 
202     bool IsCurrentSyncTaskCanBeSkipped() const override;
203 
204     virtual void ResetLastPushTaskStatus();
205 
206     bool GetIsNeedResetAbilitySync() const;
207 
208     void SetIsNeedResetAbilitySync(bool isNeedReset) override;
209 
210     void SchemaChange() override;
211 
212     void Dump(int fd) override;
213 
214     void AbortMachineIfNeed(uint32_t syncId) override;
215 protected:
216     const static int KILL_WAIT_SECONDS = INT32_MAX;
217 
218     ~SyncTaskContext() override;
219 
220     virtual int TimeOut(TimerId id);
221 
222     virtual void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam);
223 
224     void CommErrHandlerFuncInner(int errCode, uint32_t sessionId);
225 
226     void KillWait();
227 
228     void ClearSyncTarget();
229 
230     void CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId);
231 
232     virtual void SaveLastPushTaskExecStatus(int finalStatus);
233 
234     int RunPermissionCheck(uint8_t flag) const;
235 
236     SyncOperation *GetAndIncSyncOperation() const;
237 
238     uint32_t GenerateRequestSessionId();
239 
240     static uint8_t GetPermissionCheckFlag(bool isAutoSync, int syncMode);
241 
242     mutable std::mutex targetQueueLock_;
243     std::list<ISyncTarget *> requestTargetQueue_;
244     std::list<ISyncTarget *> responseTargetQueue_;
245     SyncOperation *syncOperation_;
246     mutable std::mutex operationLock_;
247     volatile uint32_t syncId_;
248     volatile int mode_;
249     volatile bool isAutoSync_;
250     volatile int status_;
251     volatile int taskExecStatus_;
252     std::string deviceId_;
253     std::string syncActionName_;
254     ISyncInterface *syncInterface_;
255     ICommunicator *communicator_;
256     ISyncStateMachine *stateMachine_;
257     TimeOffset timeOffset_ = 0;
258     volatile int retryTime_ = 0;
259     volatile int isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
260     volatile uint32_t requestSessionId_ = 0;
261     volatile uint32_t lastRequestSessionId_ = 0;
262     volatile uint32_t sequenceId_ = 1;
263     std::function<int(void)> onSyncTaskAdd_;
264 
265     // for safe exit
266     std::condition_variable safeKill_;
267     volatile int usedCount_ = 0;
268 
269     // for timeout callback
270     std::mutex timerLock_;
271     TimerId timerId_ = 0;
272     int timeout_ = 1000; // 1000ms
273     TimerAction timeOutCallback_;
274     std::unique_ptr<TimeHelper> timeHelper_;
275 
276     // for version sync
277     mutable std::mutex remoteSoftwareVersionLock_;
278     volatile uint32_t remoteSoftwareVersion_;
279     volatile uint64_t remoteSoftwareVersionId_; // Check if the remoteSoftwareVersion_ is is overdue
280 
281     volatile bool isCommNormal_;
282     volatile int taskErrCode_;
283     volatile uint64_t packetId_ = 0; // used for assignment to reSendMap_.ReSendInfo.packetId in 103 version or above
284     volatile bool syncTaskRetryStatus_;
285     volatile bool isSyncRetry_;
286     volatile uint32_t negotiationCount_;
287     volatile bool isAutoSubscribe_;
288     // syncFinished_ need to set false if isNeedResetSyncFinished_ is true when start do abilitySync interface
289     std::atomic<bool> isNeedResetAbilitySync_;
290 
291     // For global ISyncTaskContext Set, used by CommErrCallback.
292     static std::mutex synTaskContextSetLock_;
293     static std::set<ISyncTaskContext *> synTaskContextSet_;
294 };
295 } // namespace DistributedDB
296 
297 #endif // SYNC_TASK_CONTEXT_H
298