• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SYNC_TASK_CONTEXT_H
17 #define SYNC_TASK_CONTEXT_H
18 
19 #include <list>
20 #include <mutex>
21 
22 #include "icommunicator.h"
23 #include "ikvdb_sync_interface.h"
24 #include "isync_task_context.h"
25 #include "meta_data.h"
26 #include "runtime_context.h"
27 #include "semaphore_utils.h"
28 #include "sync_operation.h"
29 #include "sync_target.h"
30 #include "time_helper.h"
31 
32 namespace DistributedDB {
33 enum SyncDirectionFlag {
34     SEND = 0,
35     RECEIVE = 1,
36 };
37 struct TaskParam {
38     uint32_t timeout = 0;
39 };
40 class ISyncStateMachine;
41 
42 class SyncTaskContext : public ISyncTaskContext {
43 public:
44     SyncTaskContext();
45 
46     // Add a sync task target to the queue
47     int AddSyncTarget(ISyncTarget *target) override;
48 
49     // Set the status of this task
50     void SetOperationStatus(int status) override;
51 
52     // Clear context data
53     void Clear() override;
54 
55     // remove a sync target by syncId
56     int RemoveSyncOperation(int syncId) override;
57 
58     // If the requestTargetQueue is empty
59     bool IsTargetQueueEmpty() const override;
60 
61     // Get the status of this task
62     int GetOperationStatus() const override;
63 
64     // Set the mode of this task
65     void SetMode(int mode) override;
66 
67     // Get the mode of this task
68     int GetMode() const override;
69 
70     // Move to next target to sync
71     void MoveToNextTarget() override;
72 
73     int GetNextTarget() override;
74 
75     // Get the current task syncId
76     uint32_t GetSyncId() const override;
77 
78     // Get the current task deviceId.
79     std::string GetDeviceId() const override;
80 
81     // Set the sync task queue exec status
82     void SetTaskExecStatus(int status) override;
83 
84     // Get the sync task queue exec status
85     int GetTaskExecStatus() const override;
86 
87     // Return if now is doing auto sync
88     bool IsAutoSync() const override;
89 
90     // Set a Timer used for timeout
91     int StartTimer() override;
92 
93     // delete timer
94     void StopTimer() override;
95 
96     // modify timer
97     int ModifyTimer(int milliSeconds) override;
98 
99     // Set a RetryTime for the sync task
100     void SetRetryTime(int retryTime) override;
101 
102     // Get a RetryTime for the sync task
103     int GetRetryTime() const override;
104 
105     // Set Retry status for the sync task
106     void SetRetryStatus(int isNeedRetry) override;
107 
108     // Get Retry status for the sync task
109     int GetRetryStatus() const override;
110 
111     TimerId GetTimerId() const override;
112 
113     // Inc the current message sequenceId
114     void IncSequenceId() override;
115 
116     // Get the current initiactive sync session id
117     uint32_t GetRequestSessionId() const override;
118 
119     // Get the current message sequence id
120     uint32_t GetSequenceId() const override;
121 
122     void ReSetSequenceId() override;
123 
124     void IncPacketId();
125 
126     uint64_t GetPacketId() const;
127 
128     // Get the current watch timeout time
129     int GetTimeoutTime() const override;
130 
131     void SetTimeoutCallback(const TimerAction &timeOutCallback) override;
132 
133     // Start the sync state machine
134     int StartStateMachine() override;
135 
136     // Set the timeoffset with the remote device
137     void SetTimeOffset(TimeOffset offset) override;
138 
139     // Get the timeoffset with the remote device
140     TimeOffset GetTimeOffset() const override;
141 
142     // Used for sync message callback
143     int ReceiveMessageCallback(Message *inMsg) override;
144 
145     // used to register a callback, called when new SyncTarget added
146     void RegOnSyncTask(const std::function<int(void)> &callback) override;
147 
148     // When schedule a new task, should call this function to inc usedcount
149     int IncUsedCount() override;
150 
151     // When schedule task exit, should call this function to dec usedcount
152     void SafeExit() override;
153 
154     // Get current local time from TimeHelper
155     Timestamp GetCurrentLocalTime() const override;
156 
157     // Set the remount software version num
158     void SetRemoteSoftwareVersion(uint32_t version) override;
159 
160     // Get the remount software version num
161     uint32_t GetRemoteSoftwareVersion() const override;
162 
163     // Get the remount software version id, when called GetRemoteSoftwareVersion this id will be increase.
164     // Used to check if the version num is is overdue
165     uint64_t GetRemoteSoftwareVersionId() const override;
166 
167     // Judge if the communicator is normal
168     bool IsCommNormal() const override;
169 
170     void ClearSyncOperation() override;
171 
172     // If ability sync request set version, need call this function.
173     // Should be called with ObjLock
174     virtual void Abort(int status);
175 
176     // Used in send msg, as execution is asynchronous, should use this function to handle result.
177     static void CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId);
178 
179     int GetTaskErrCode() const override;
180 
181     void SetTaskErrCode(int errCode) override;
182 
183     bool IsSyncTaskNeedRetry() const override;
184 
185     void SetSyncRetry(bool isRetry) override;
186 
187     int GetSyncRetryTimes() const override;
188 
189     int GetSyncRetryTimeout(int retryTime) const override;
190 
191     void ClearAllSyncTask() override;
192 
193     bool IsAutoLiftWaterMark() const override;
194 
195     void IncNegotiationCount() override;
196 
197     // check if need trigger query auto sync and get query from inMsg
198     bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override;
199 
200     bool IsAutoSubscribe() const override;
201 
202     bool IsCurrentSyncTaskCanBeSkipped() const override;
203 
204     virtual void ResetLastPushTaskStatus();
205 
206     void SchemaChange() override;
207 
208     void Dump(int fd) override;
209 
210     void AbortMachineIfNeed(uint32_t syncId) override;
211 
212     bool IsSchemaCompatible() const override;
213 
214     void SetDbAbility(DbAbility &remoteDbAbility) override;
215 
216     void TimeChange() override;
217 
218     int32_t GetResponseTaskCount() override;
219 protected:
220     const static int KILL_WAIT_SECONDS = INT32_MAX;
221 
222     ~SyncTaskContext() override;
223 
224     virtual int TimeOut(TimerId id);
225 
226     virtual void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam);
227 
228     void CommErrHandlerFuncInner(int errCode, uint32_t sessionId);
229 
230     void KillWait();
231 
232     void ClearSyncTarget();
233 
234     void CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId);
235 
236     virtual void SaveLastPushTaskExecStatus(int finalStatus);
237 
238     int RunPermissionCheck(uint8_t flag) const;
239 
240     SyncOperation *GetAndIncSyncOperation() const;
241 
242     uint32_t GenerateRequestSessionId();
243 
244     static uint8_t GetPermissionCheckFlag(bool isAutoSync, int syncMode);
245 
246     mutable std::mutex targetQueueLock_;
247     std::list<ISyncTarget *> requestTargetQueue_;
248     std::list<ISyncTarget *> responseTargetQueue_;
249     SyncOperation *syncOperation_;
250     mutable std::mutex operationLock_;
251     volatile uint32_t syncId_;
252     volatile int mode_;
253     volatile bool isAutoSync_;
254     volatile int status_;
255     volatile int taskExecStatus_;
256     std::string deviceId_;
257     std::string syncActionName_;
258     ISyncInterface *syncInterface_;
259     ICommunicator *communicator_;
260     ISyncStateMachine *stateMachine_;
261     TimeOffset timeOffset_ = 0;
262     volatile int retryTime_ = 0;
263     volatile int isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
264     volatile uint32_t requestSessionId_ = 0;
265     volatile uint32_t lastRequestSessionId_ = 0;
266     volatile uint32_t sequenceId_ = 1;
267     std::function<int(void)> onSyncTaskAdd_;
268 
269     // for safe exit
270     std::condition_variable safeKill_;
271     volatile int usedCount_ = 0;
272 
273     // for timeout callback
274     std::mutex timerLock_;
275     TimerId timerId_ = 0;
276     int timeout_ = 1000; // 1000ms
277     TimerAction timeOutCallback_;
278     std::unique_ptr<TimeHelper> timeHelper_;
279 
280     // for version sync
281     mutable std::mutex remoteSoftwareVersionLock_;
282     volatile uint32_t remoteSoftwareVersion_;
283     volatile uint64_t remoteSoftwareVersionId_; // Check if the remoteSoftwareVersion_ is is overdue
284 
285     volatile bool isCommNormal_;
286     volatile int taskErrCode_;
287     volatile uint64_t packetId_ = 0; // used for assignment to reSendMap_.ReSendInfo.packetId in 103 version or above
288     volatile bool syncTaskRetryStatus_;
289     volatile bool isSyncRetry_;
290     volatile uint32_t negotiationCount_;
291     volatile bool isAutoSubscribe_;
292 
293     // For global ISyncTaskContext Set, used by CommErrCallback.
294     static std::mutex synTaskContextSetLock_;
295     static std::set<ISyncTaskContext *> synTaskContextSet_;
296 };
297 } // namespace DistributedDB
298 
299 #endif // SYNC_TASK_CONTEXT_H
300