• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SYNC_TASK_CONTEXT_H
17 #define SYNC_TASK_CONTEXT_H
18 
19 #include <list>
20 #include <mutex>
21 
22 #include "icommunicator.h"
23 #include "ikvdb_sync_interface.h"
24 #include "isync_task_context.h"
25 #include "meta_data.h"
26 #include "runtime_context.h"
27 #include "semaphore_utils.h"
28 #include "sync_operation.h"
29 #include "sync_target.h"
30 #include "time_helper.h"
31 
32 namespace DistributedDB {
33 enum SyncDirectionFlag {
34     SEND = 0,
35     RECEIVE = 1,
36 };
37 struct TaskParam {
38     uint32_t timeout = 0;
39 };
40 class ISyncStateMachine;
41 
42 class SyncTaskContext : public ISyncTaskContext {
43 public:
44     SyncTaskContext();
45 
46     // Add a sync task target to the queue
47     int AddSyncTarget(ISyncTarget *target) override;
48 
49     // Set the status of this task
50     void SetOperationStatus(int status) override;
51 
52     // Clear context data
53     void Clear() override;
54 
55     // remove a sync target by syncId
56     int RemoveSyncOperation(int syncId) override;
57 
58     // If the requestTargetQueue is empty
59     bool IsTargetQueueEmpty() const override;
60 
61     // Get the status of this task
62     int GetOperationStatus() const override;
63 
64     // Set the mode of this task
65     void SetMode(int mode) override;
66 
67     // Get the mode of this task
68     int GetMode() const override;
69 
70     // Move to next target to sync
71     void MoveToNextTarget() override;
72 
73     int GetNextTarget(bool isNeedSetFinished) override;
74 
75     // Get the current task syncId
76     uint32_t GetSyncId() const override;
77 
78     // Get the current task deviceId.
79     std::string GetDeviceId() const override;
80 
81     // Set the sync task queue exec status
82     void SetTaskExecStatus(int status) override;
83 
84     // Get the sync task queue exec status
85     int GetTaskExecStatus() const override;
86 
87     // Return if now is doing auto sync
88     bool IsAutoSync() const override;
89 
90     // Set a Timer used for timeout
91     int StartTimer() override;
92 
93     // delete timer
94     void StopTimer() override;
95 
96     // modify timer
97     int ModifyTimer(int milliSeconds) override;
98 
99     // Set a RetryTime for the sync task
100     void SetRetryTime(int retryTime) override;
101 
102     // Get a RetryTime for the sync task
103     int GetRetryTime() const override;
104 
105     // Set Retry status for the sync task
106     void SetRetryStatus(int isNeedRetry) override;
107 
108     // Get Retry status for the sync task
109     int GetRetryStatus() const override;
110 
111     TimerId GetTimerId() const override;
112 
113     // Inc the current message sequenceId
114     void IncSequenceId() override;
115 
116     // Get the current initiactive sync session id
117     uint32_t GetRequestSessionId() const override;
118 
119     // Get the current message sequence id
120     uint32_t GetSequenceId() const override;
121 
122     void ReSetSequenceId() override;
123 
124     void IncPacketId();
125 
126     uint64_t GetPacketId() const;
127 
128     // Get the current watch timeout time
129     int GetTimeoutTime() const override;
130 
131     void SetTimeoutCallback(const TimerAction &timeOutCallback) override;
132 
133     // Start the sync state machine
134     int StartStateMachine() override;
135 
136     // Set the timeoffset with the remote device
137     void SetTimeOffset(TimeOffset offset) override;
138 
139     // Get the timeoffset with the remote device
140     TimeOffset GetTimeOffset() const override;
141 
142     // Used for sync message callback
143     int ReceiveMessageCallback(Message *inMsg) override;
144 
145     // used to register a callback, called when new SyncTarget added
146     void RegOnSyncTask(const std::function<int(void)> &callback) override;
147 
148     // When schedule a new task, should call this function to inc usedcount
149     int IncUsedCount() override;
150 
151     // When schedule task exit, should call this function to dec usedcount
152     void SafeExit() override;
153 
154     // Get current local time from TimeHelper
155     Timestamp GetCurrentLocalTime() const override;
156 
157     // Set the remount software version num
158     void SetRemoteSoftwareVersion(uint32_t version) override;
159 
160     // Get the remount software version num
161     uint32_t GetRemoteSoftwareVersion() const override;
162 
163     // Get the remount software version id, when called GetRemoteSoftwareVersion this id will be increase.
164     // Used to check if the version num is is overdue
165     uint64_t GetRemoteSoftwareVersionId() const override;
166 
167     // Judge if the communicator is normal
168     bool IsCommNormal() const override;
169 
170     // If ability sync request set version, need call this function.
171     // Should be called with ObjLock
172     virtual void Abort(int status);
173 
174     // Used in send msg, as execution is asynchronous, should use this function to handle result.
175     static void CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId);
176 
177     int GetTaskErrCode() const override;
178 
179     void SetTaskErrCode(int errCode) override;
180 
181     bool IsSyncTaskNeedRetry() const override;
182 
183     void SetSyncRetry(bool isRetry) override;
184 
185     int GetSyncRetryTimes() const override;
186 
187     int GetSyncRetryTimeout(int retryTime) const override;
188 
189     void ClearAllSyncTask() override;
190 
191     bool IsAutoLiftWaterMark() const override;
192 
193     void IncNegotiationCount() override;
194 
195     // check if need trigger query auto sync and get query from inMsg
196     bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override;
197 
198     bool IsAutoSubscribe() const override;
199 
200     bool IsCurrentSyncTaskCanBeSkipped() const override;
201 
202     virtual void ResetLastPushTaskStatus();
203 
204     bool GetIsNeedResetAbilitySync() const;
205 
206     void SetIsNeedResetAbilitySync(bool isNeedReset) override;
207 
208     void SchemaChange() override;
209 
210     void Dump(int fd) override;
211 
212     void AbortMachineIfNeed(uint32_t syncId) override;
213 protected:
214     const static int KILL_WAIT_SECONDS = INT32_MAX;
215 
216     ~SyncTaskContext() override;
217 
218     virtual int TimeOut(TimerId id);
219 
220     virtual void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam);
221 
222     void CommErrHandlerFuncInner(int errCode, uint32_t sessionId);
223 
224     void KillWait();
225 
226     void ClearSyncOperation();
227 
228     void ClearSyncTarget();
229 
230     void CancelCurrentSyncRetryIfNeed(int newTargetMode);
231 
232     virtual void SaveLastPushTaskExecStatus(int finalStatus);
233 
234     int RunPermissionCheck(uint8_t flag) const;
235 
236     SyncOperation *GetAndIncSyncOperation() const;
237 
238     static uint8_t GetPermissionCheckFlag(bool isAutoSync, int syncMode);
239 
240     mutable std::mutex targetQueueLock_;
241     std::list<ISyncTarget *> requestTargetQueue_;
242     std::list<ISyncTarget *> responseTargetQueue_;
243     SyncOperation *syncOperation_;
244     mutable std::mutex operationLock_;
245     uint32_t syncId_;
246     int mode_;
247     bool isAutoSync_;
248     int status_;
249     int taskExecStatus_;
250     std::string deviceId_;
251     std::string syncActionName_;
252     ISyncInterface *syncInterface_;
253     ICommunicator *communicator_;
254     ISyncStateMachine *stateMachine_;
255     TimeOffset timeOffset_ = 0;
256     int retryTime_ = 0;
257     int isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
258     uint32_t requestSessionId_ = 0;
259     uint32_t lastRequestSessionId_ = 0;
260     uint32_t sequenceId_ = 1;
261     std::function<int(void)> onSyncTaskAdd_;
262 
263     // for safe exit
264     std::condition_variable safeKill_;
265     int usedCount_ = 0;
266 
267     // for timeout callback
268     std::mutex timerLock_;
269     TimerId timerId_ = 0;
270     int timeout_ = 1000; // 1000ms
271     TimerAction timeOutCallback_;
272     std::unique_ptr<TimeHelper> timeHelper_;
273 
274     // for version sync
275     mutable std::mutex remoteSoftwareVersionLock_;
276     uint32_t remoteSoftwareVersion_;
277     uint64_t remoteSoftwareVersionId_; // Check if the remoteSoftwareVersion_ is is overdue
278 
279     bool isCommNormal_;
280     int taskErrCode_;
281     uint64_t packetId_ = 0; // used for assignment to reSendMap_.ReSendInfo.packetId in 103 version or above
282     bool syncTaskRetryStatus_;
283     bool isSyncRetry_;
284     uint32_t negotiationCount_;
285     bool isAutoSubscribe_;
286     // syncFinished_ need to set false if isNeedResetSyncFinished_ is true when start do abilitySync interface
287     std::atomic<bool> isNeedResetAbilitySync_;
288 
289     // For global ISyncTaskContext Set, used by CommErrCallback.
290     static std::mutex synTaskContextSetLock_;
291     static std::set<ISyncTaskContext *> synTaskContextSet_;
292 };
293 } // namespace DistributedDB
294 
295 #endif // SYNC_TASK_CONTEXT_H
296