/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #ifndef SYNC_TASK_CONTEXT_H
17 #define SYNC_TASK_CONTEXT_H
18 
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <set>
#include <string>

#include "icommunicator.h"
#include "ikvdb_sync_interface.h"
#include "isync_task_context.h"
#include "meta_data.h"
#include "runtime_context.h"
#include "semaphore_utils.h"
#include "sync_operation.h"
#include "sync_target.h"
#include "time_helper.h"
31 
32 namespace DistributedDB {
// Two-valued direction flag with fixed numeric values (SEND == 0, RECEIVE == 1).
// NOTE(review): presumably tells the sending side of a sync apart from the
// receiving side; no user of this enum is visible in this header - confirm.
// Kept as an unscoped enum with explicit values so existing callers that use
// the bare names or rely on the integer values keep working.
enum SyncDirectionFlag {
    SEND = 0,
    RECEIVE = 1,
};
// Extra parameters handed to SyncTaskContext::CopyTargetData() together with
// the sync target.
struct TaskParam {
    // Timeout for the task; defaults to 0.
    // NOTE(review): the unit is not stated in this header - confirm.
    uint32_t timeout = 0;
};
// Forward declaration only: this header stores just a pointer to the state
// machine (SyncTaskContext::stateMachine_), so the full definition is not needed here.
class ISyncStateMachine;
41 
42 class SyncTaskContext : public ISyncTaskContext {
43 public:
44     SyncTaskContext();
45 
46     // Add a sync task target to the queue
47     int AddSyncTarget(ISyncTarget *target) override;
48 
49     // Set the status of this task
50     void SetOperationStatus(int status) override;
51 
52     // Clear context data
53     void Clear() override;
54 
55     // remove a sync target by syncId
56     int RemoveSyncOperation(int syncId) override;
57 
58     // If the requestTargetQueue is empty
59     bool IsTargetQueueEmpty() const override;
60 
61     // Get the status of this task
62     int GetOperationStatus() const override;
63 
64     // Set the mode of this task
65     void SetMode(int mode) override;
66 
67     // Get the mode of this task
68     int GetMode() const override;
69 
70     // Move to next target to sync
71     void MoveToNextTarget(uint32_t timeout) override;
72 
73     int GetNextTarget(uint32_t timeout) override;
74 
75     // Get the current task syncId
76     uint32_t GetSyncId() const override;
77 
78     // Get the current task deviceId.
79     std::string GetDeviceId() const override;
80 
81     std::string GetTargetUserId() const override;
82 
83     void SetTargetUserId(const std::string &userId) override;
84 
85     // Set the sync task queue exec status
86     void SetTaskExecStatus(int status) override;
87 
88     // Get the sync task queue exec status
89     int GetTaskExecStatus() const override;
90 
91     // Return if now is doing auto sync
92     bool IsAutoSync() const override;
93 
94     // Set a Timer used for timeout
95     int StartTimer() override;
96 
97     // delete timer
98     void StopTimer() override;
99 
100     // modify timer
101     int ModifyTimer(int milliSeconds) override;
102 
103     // Set a RetryTime for the sync task
104     void SetRetryTime(int retryTime) override;
105 
106     // Get a RetryTime for the sync task
107     int GetRetryTime() const override;
108 
109     // Set Retry status for the sync task
110     void SetRetryStatus(int isNeedRetry) override;
111 
112     // Get Retry status for the sync task
113     int GetRetryStatus() const override;
114 
115     TimerId GetTimerId() const override;
116 
117     // Inc the current message sequenceId
118     void IncSequenceId() override;
119 
120     // Get the current initiactive sync session id
121     uint32_t GetRequestSessionId() const override;
122 
123     // Get the current message sequence id
124     uint32_t GetSequenceId() const override;
125 
126     void ReSetSequenceId() override;
127 
128     void IncPacketId();
129 
130     uint64_t GetPacketId() const;
131 
132     // Get the current watch timeout time
133     int GetTimeoutTime() const override;
134 
135     void SetTimeoutCallback(const TimerAction &timeOutCallback) override;
136 
137     // Start the sync state machine
138     int StartStateMachine() override;
139 
140     // Set the timeoffset with the remote device
141     void SetTimeOffset(TimeOffset offset) override;
142 
143     // Get the timeoffset with the remote device
144     TimeOffset GetTimeOffset() const override;
145 
146     // Used for sync message callback
147     int ReceiveMessageCallback(Message *inMsg) override;
148 
149     // used to register a callback, called when new SyncTarget added
150     void RegOnSyncTask(const std::function<int(void)> &callback) override;
151 
152     // When schedule a new task, should call this function to inc usedcount
153     int IncUsedCount() override;
154 
155     // When schedule task exit, should call this function to dec usedcount
156     void SafeExit() override;
157 
158     // Get current local time from TimeHelper
159     Timestamp GetCurrentLocalTime() const override;
160 
161     // Set the remount software version num
162     void SetRemoteSoftwareVersion(uint32_t version) override;
163 
164     // Get the remount software version num
165     uint32_t GetRemoteSoftwareVersion() const override;
166 
167     // Get the remount software version id, when called GetRemoteSoftwareVersion this id will be increase.
168     // Used to check if the version num is is overdue
169     uint64_t GetRemoteSoftwareVersionId() const override;
170 
171     // Judge if the communicator is normal
172     bool IsCommNormal() const override;
173 
174     void ClearSyncOperation() override;
175 
176     // If ability sync request set version, need call this function.
177     // Should be called with ObjLock
178     virtual void Abort(int status);
179 
180     // Used in send msg, as execution is asynchronous, should use this function to handle result.
181     static void CommErrHandlerFunc(int errCode, ISyncTaskContext *context, int32_t sessionId, bool isDirectEnd = true);
182 
183     int GetTaskErrCode() const override;
184 
185     void SetTaskErrCode(int errCode) override;
186 
187     bool IsSyncTaskNeedRetry() const override;
188 
189     void SetSyncRetry(bool isRetry) override;
190 
191     int GetSyncRetryTimes() const override;
192 
193     int GetSyncRetryTimeout(int retryTime) const override;
194 
195     void ClearAllSyncTask() override;
196 
197     bool IsAutoLiftWaterMark() const override;
198 
199     void IncNegotiationCount() override;
200 
201     // check if need trigger query auto sync and get query from inMsg
202     bool IsNeedTriggerQueryAutoSync(Message *inMsg, QuerySyncObject &query) override;
203 
204     bool IsAutoSubscribe() const override;
205 
206     bool IsCurrentSyncTaskCanBeSkipped() const override;
207 
208     virtual void ResetLastPushTaskStatus();
209 
210     void SchemaChange() override;
211 
212     void Dump(int fd) override;
213 
214     void AbortMachineIfNeed(uint32_t syncId) override;
215 
216     bool IsSchemaCompatible() const override;
217 
218     void SetDbAbility(DbAbility &remoteDbAbility) override;
219 
220     void TimeChange() override;
221 
222     int32_t GetResponseTaskCount() override;
223 
224     int GetCommErrCode() const;
225 
226     void SetCommFailErrCode(int errCode);
227 protected:
228     const static int KILL_WAIT_SECONDS = INT32_MAX;
229 
230     ~SyncTaskContext() override;
231 
232     virtual int TimeOut(TimerId id);
233 
234     virtual void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam);
235 
236     void CommErrHandlerFuncInner(int errCode, uint32_t sessionId, bool isDirectEnd = true);
237 
238     void KillWait();
239 
240     void ClearSyncTarget();
241 
242     void CancelCurrentSyncRetryIfNeed(int newTargetMode, uint32_t syncId);
243 
244     virtual void SaveLastPushTaskExecStatus(int finalStatus);
245 
246     int RunPermissionCheck(uint8_t flag) const;
247 
248     SyncOperation *GetAndIncSyncOperation() const;
249 
250     uint32_t GenerateRequestSessionId();
251 
252     static uint8_t GetPermissionCheckFlag(bool isAutoSync, int syncMode);
253 
254     void SetErrCodeWhenWaitTimeOut(int errCode);
255 
256     mutable std::mutex targetQueueLock_;
257     std::list<ISyncTarget *> requestTargetQueue_;
258     std::list<ISyncTarget *> responseTargetQueue_;
259     SyncOperation *syncOperation_;
260     mutable std::mutex operationLock_;
261     volatile uint32_t syncId_;
262     volatile int mode_;
263     volatile bool isAutoSync_;
264     volatile int status_;
265     volatile int taskExecStatus_;
266     std::string deviceId_;
267     std::string targetUserId_;
268     std::string syncActionName_;
269     ISyncInterface *syncInterface_;
270     ICommunicator *communicator_;
271     ISyncStateMachine *stateMachine_;
272     TimeOffset timeOffset_ = 0;
273     volatile int retryTime_ = 0;
274     volatile int isNeedRetry_ = SyncTaskContext::NO_NEED_RETRY;
275     volatile uint32_t requestSessionId_ = 0;
276     volatile uint32_t lastRequestSessionId_ = 0;
277     volatile uint32_t sequenceId_ = 1;
278     std::function<int(void)> onSyncTaskAdd_;
279 
280     // for safe exit
281     std::condition_variable safeKill_;
282     volatile int usedCount_ = 0;
283 
284     // for timeout callback
285     std::mutex timerLock_;
286     TimerId timerId_ = 0;
287     int timeout_ = 1000; // 1000ms
288     TimerAction timeOutCallback_;
289     std::unique_ptr<TimeHelper> timeHelper_;
290 
291     // for version sync
292     mutable std::mutex remoteSoftwareVersionLock_;
293     volatile uint32_t remoteSoftwareVersion_;
294     volatile uint64_t remoteSoftwareVersionId_; // Check if the remoteSoftwareVersion_ is is overdue
295 
296     volatile bool isCommNormal_;
297     volatile int taskErrCode_;
298     std::atomic<int> commErrCode_;
299     volatile uint64_t packetId_ = 0; // used for assignment to reSendMap_.ReSendInfo.packetId in 103 version or above
300     volatile bool syncTaskRetryStatus_;
301     volatile bool isSyncRetry_;
302     volatile uint32_t negotiationCount_;
303     volatile bool isAutoSubscribe_;
304 
305     // For global ISyncTaskContext Set, used by CommErrCallback.
306     static std::mutex synTaskContextSetLock_;
307     static std::set<ISyncTaskContext *> synTaskContextSet_;
308 };
309 } // namespace DistributedDB
310 
311 #endif // SYNC_TASK_CONTEXT_H
312