• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SINGLE_VER_SYNC_TASK_CONTEXT_H
17 #define SINGLE_VER_SYNC_TASK_CONTEXT_H
18 
19 #include <list>
20 #include <mutex>
21 #include <string>
22 #include <unordered_map>
23 
24 #include "db_ability.h"
25 #include "query_sync_object.h"
26 #include "schema_negotiate.h"
27 #include "single_ver_kvdb_sync_interface.h"
28 #include "single_ver_sync_target.h"
29 #include "subscribe_manager.h"
30 #include "sync_target.h"
31 #include "sync_task_context.h"
32 #include "time_helper.h"
33 
34 
35 namespace DistributedDB {
36 class SingleVerSyncTaskContext : public SyncTaskContext {
37 public:
38 
39     explicit SingleVerSyncTaskContext();
40 
41     DISABLE_COPY_ASSIGN_MOVE(SingleVerSyncTaskContext);
42 
43     // Init SingleVerSyncTaskContext
44     int Initialize(const std::string &deviceId, ISyncInterface *syncInterface, std::shared_ptr<Metadata> &metadata,
45         ICommunicator *communicator) override;
46 
47     // Add a sync task target with the operation to the queue
48     int AddSyncOperation(SyncOperation *operation) override;
49 
50     bool IsCurrentSyncTaskCanBeSkipped() const override;
51 
52     // Set the end water mark of this task
53     void SetEndMark(WaterMark endMark);
54 
55     // Get the end water mark of this task
56     WaterMark GetEndMark() const;
57 
58     void GetContinueToken(ContinueToken &outToken) const;
59 
60     void SetContinueToken(ContinueToken token);
61 
62     void ReleaseContinueToken();
63 
64     int PopResponseTarget(SingleVerSyncTarget &target);
65 
66     int GetRspTargetQueueSize() const;
67 
68     // responseSessionId used for mark the pull response task
69     void SetResponseSessionId(uint32_t responseSessionId);
70 
71     // responseSessionId used for mark the pull response task
72     uint32_t GetResponseSessionId() const;
73 
74     void Clear() override;
75 
76     void Abort(int status) override;
77 
78     void ClearAllSyncTask() override;
79 
80     // If set true, remote stale data will be clear when remote db rebuiled.
81     void EnableClearRemoteStaleData(bool enable);
82 
83     // Check if need to clear remote device stale data in syncing, when the remote db rebuilt.
84     bool IsNeedClearRemoteStaleData() const;
85 
86     // start a timer to ResetWatchDog when sync data one (key,value) size bigger than mtu
87     bool StartFeedDogForSync(uint32_t time, SyncDirectionFlag flag);
88 
89     // stop timer to ResetWatchDog when sync data one (key,value) size bigger than mtu
90     void StopFeedDogForSync(SyncDirectionFlag flag);
91 
92     virtual int HandleDataRequestRecv(const Message *msg);
93 
94     // is receive warterMark err
95     bool IsReceiveWaterMarkErr() const;
96 
97     // set receive warterMark err
98     void SetReceiveWaterMarkErr(bool isErr);
99 
100     void SetRemoteSeccurityOption(SecurityOption secOption);
101 
102     SecurityOption GetRemoteSeccurityOption() const;
103 
104     void SetReceivcPermitCheck(bool isChecked);
105 
106     bool GetReceivcPermitCheck() const;
107 
108     void SetSendPermitCheck(bool isChecked);
109 
110     bool GetSendPermitCheck() const;
111 
112     virtual SyncStrategy GetSyncStrategy(QuerySyncObject &querySyncObject) const = 0;
113 
114     void SetIsSchemaSync(bool isSchemaSync);
115 
116     bool GetIsSchemaSync() const;
117 
118     bool IsSkipTimeoutError(int errCode) const;
119 
120     bool FindResponseSyncTarget(uint32_t responseSessionId) const;
121 
122     // For query sync
123     void SetQuery(const QuerySyncObject &query);
124     const QuerySyncObject &GetQuery() const;
125     void SetQuerySync(bool isQuerySync);
126     bool IsQuerySync() const;
127     std::set<CompressAlgorithm> GetRemoteCompressAlgo() const;
128     std::string GetRemoteCompressAlgoStr() const;
129     void SetDbAbility(DbAbility &remoteDbAbility);
130     CompressAlgorithm ChooseCompressAlgo() const;
131     bool IsNotSupportAbility(const AbilityItem &abilityItem) const;
132 
133     void SetSubscribeManager(std::shared_ptr<SubscribeManager> &subManager);
134     std::shared_ptr<SubscribeManager> GetSubscribeManager() const;
135 
136     void SaveLastPushTaskExecStatus(int finalStatus) override;
137     void ResetLastPushTaskStatus() override;
138 
139     virtual std::string GetQuerySyncId() const = 0;
140     virtual std::string GetDeleteSyncId() const = 0;
141 
142     void SetCommNormal(bool isCommNormal);
143 
144     void StartFeedDogForGetData(uint32_t sessionId);
145     void StopFeedDogForGetData();
146 protected:
147     ~SingleVerSyncTaskContext() override;
148     void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam) override;
149 
150     // For querySync
151     QuerySyncObject query_;
152     bool isQuerySync_ = false;
153 
154     // for merge sync task
155     volatile int lastFullSyncTaskStatus_ = SyncOperation::Status::OP_WAITING;
156 private:
157     int GetCorrectedSendWaterMarkForCurrentTask(const SyncOperation *operation, uint64_t &waterMark) const;
158 
159     bool IsCurrentSyncTaskCanBeSkippedInner(const SyncOperation *operation) const;
160 
161     constexpr static int64_t REDUNDACE_WATER_MARK = 1 * 1000LL * 1000LL * 10LL; // 1s
162 
163     DECLARE_OBJECT_TAG(SingleVerSyncTaskContext);
164 
165     ContinueToken token_;
166     WaterMark endMark_;
167     uint32_t responseSessionId_ = 0;
168 
169     bool needClearRemoteStaleData_;
170     SecurityOption remoteSecOption_ = {0, 0}; // remote targe can handle secOption data or not.
171     bool isReceivcPermitChecked_ = false;
172     bool isSendPermitChecked_ = false;
173     std::atomic<bool> isSchemaSync_ = false;
174 
175     // is receive waterMark err, peerWaterMark bigger than remote localWaterMark
176     bool isReceiveWaterMarkErr_ = false;
177 
178     // For db ability
179     mutable std::mutex remoteDbAbilityLock_;
180     DbAbility remoteDbAbility_;
181 
182     // For subscribe manager
183     std::shared_ptr<SubscribeManager> subManager_;
184 
185     mutable std::mutex queryTaskStatusMutex_;
186     // <queryId, lastExcuStatus>
187     std::unordered_map<std::string, int> lastQuerySyncTaskStatusMap_;
188 };
189 } // namespace DistributedDB
190 
191 #endif // SYNC_TASK_CONTEXT_H
192