• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SINGLE_VER_SYNC_TASK_CONTEXT_H
17 #define SINGLE_VER_SYNC_TASK_CONTEXT_H
18 
19 #include <list>
20 #include <mutex>
21 #include <string>
22 #include <unordered_map>
23 
24 #include "db_ability.h"
25 #include "query_sync_object.h"
26 #include "schema_negotiate.h"
27 #include "single_ver_kvdb_sync_interface.h"
28 #include "single_ver_sync_target.h"
29 #include "subscribe_manager.h"
30 #include "sync_target.h"
31 #include "sync_task_context.h"
32 #include "time_helper.h"
33 
34 
35 namespace DistributedDB {
36 class SingleVerSyncTaskContext : public SyncTaskContext {
37 public:
38     explicit SingleVerSyncTaskContext();
39 
40     DISABLE_COPY_ASSIGN_MOVE(SingleVerSyncTaskContext);
41 
42     // Init SingleVerSyncTaskContext
43     int Initialize(const std::string &deviceId, ISyncInterface *syncInterface,
44         const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator) override;
45 
46     // Add a sync task target with the operation to the queue
47     int AddSyncOperation(SyncOperation *operation) override;
48 
49     bool IsCurrentSyncTaskCanBeSkipped() const override;
50 
51     // Set the end watermark of this task
52     void SetEndMark(WaterMark endMark);
53 
54     // Get the end watermark of this task
55     WaterMark GetEndMark() const;
56 
57     void GetContinueToken(ContinueToken &outToken) const;
58 
59     void SetContinueToken(ContinueToken token);
60 
61     void ReleaseContinueToken();
62 
63     int PopResponseTarget(SingleVerSyncTarget &target);
64 
65     int GetRspTargetQueueSize() const;
66 
67     // responseSessionId used for mark the pull response task
68     void SetResponseSessionId(uint32_t responseSessionId);
69 
70     // responseSessionId used for mark the pull response task
71     uint32_t GetResponseSessionId() const;
72 
73     void Clear() override;
74 
75     void Abort(int status) override;
76 
77     void ClearAllSyncTask() override;
78 
79     // If set true, remote stale data will be clear when remote db rebuilt.
80     void EnableClearRemoteStaleData(bool enable);
81 
82     // Check if need to clear remote device stale data in syncing, when the remote db rebuilt.
83     bool IsNeedClearRemoteStaleData() const;
84 
85     // start a timer to ResetWatchDog when sync data one (key,value) size bigger than mtu
86     bool StartFeedDogForSync(uint32_t time, SyncDirectionFlag flag);
87 
88     // stop timer to ResetWatchDog when sync data one (key,value) size bigger than mtu
89     void StopFeedDogForSync(SyncDirectionFlag flag);
90 
91     // is receive waterMark err
92     bool IsReceiveWaterMarkErr() const;
93 
94     // set receive waterMark err
95     void SetReceiveWaterMarkErr(bool isErr);
96 
97     void SetRemoteSeccurityOption(SecurityOption secOption);
98 
99     SecurityOption GetRemoteSeccurityOption() const;
100 
101     void SetReceivcPermitCheck(bool isChecked);
102 
103     bool GetReceivcPermitCheck() const;
104 
105     void SetSendPermitCheck(bool isChecked);
106 
107     bool GetSendPermitCheck() const;
108     // pair<bool, bool>: first:SyncStrategy.permitSync, second: isSchemaSync_
109     virtual std::pair<bool, bool> GetSchemaSyncStatus(QuerySyncObject &querySyncObject) const = 0;
110 
111     bool IsSkipTimeoutError(int errCode) const;
112 
113     bool FindResponseSyncTarget(uint32_t responseSessionId) const;
114 
115     // For query sync
116     void SetQuery(const QuerySyncObject &query);
117     QuerySyncObject GetQuery() const;
118     void SetQuerySync(bool isQuerySync);
119     bool IsQuerySync() const;
120     std::set<CompressAlgorithm> GetRemoteCompressAlgo() const;
121     std::string GetRemoteCompressAlgoStr() const;
122     void SetDbAbility(DbAbility &remoteDbAbility) override;
123     CompressAlgorithm ChooseCompressAlgo() const;
124     bool IsNotSupportAbility(const AbilityItem &abilityItem) const;
125 
126     void SetSubscribeManager(std::shared_ptr<SubscribeManager> &subManager);
127     std::shared_ptr<SubscribeManager> GetSubscribeManager() const;
128 
129     void SaveLastPushTaskExecStatus(int finalStatus) override;
130     void ResetLastPushTaskStatus() override;
131 
132     virtual std::string GetQuerySyncId() const = 0;
133     virtual std::string GetDeleteSyncId() const = 0;
134 
135     void SetCommNormal(bool isCommNormal);
136 
137     void StartFeedDogForGetData(uint32_t sessionId);
138     void StopFeedDogForGetData();
139 
140     void UpdateOperationFinishedCount(const std::string &deviceId, uint32_t count);
141     void SetOperationSyncProcessTotal(const std::string &deviceId, uint32_t total);
142 
143     void SetInitWaterMark(WaterMark waterMark);
144     WaterMark GetInitWaterMark() const;
145     void SetInitDeletedMark(WaterMark waterMark);
146     WaterMark GetInitDeletedMark() const;
147 
148     int32_t GetResponseTaskCount() override;
149 
150 protected:
151     ~SingleVerSyncTaskContext() override;
152     void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam) override;
153 
154     // For querySync
155     mutable std::mutex queryMutex_;
156     QuerySyncObject query_;
157     bool isQuerySync_ = false;
158 
159     // for merge sync task
160     volatile int lastFullSyncTaskStatus_ = SyncOperation::Status::OP_WAITING;
161 private:
162     int GetCorrectedSendWaterMarkForCurrentTask(const SyncOperation *operation, uint64_t &waterMark) const;
163 
164     bool IsCurrentSyncTaskCanBeSkippedInner(const SyncOperation *operation) const;
165 
166     DECLARE_OBJECT_TAG(SingleVerSyncTaskContext);
167 
168     ContinueToken token_;
169     WaterMark endMark_;
170     volatile uint32_t responseSessionId_ = 0;
171 
172     bool needClearRemoteStaleData_;
173     mutable std::mutex securityOptionMutex_;
174     SecurityOption remoteSecOption_ = {0, 0}; // remote targe can handle secOption data or not.
175     volatile bool isReceivcPermitChecked_ = false;
176     volatile bool isSendPermitChecked_ = false;
177 
178     // is receive waterMark err, peerWaterMark bigger than remote localWaterMark
179     volatile bool isReceiveWaterMarkErr_ = false;
180 
181     // For db ability
182     mutable std::mutex remoteDbAbilityLock_;
183     DbAbility remoteDbAbility_;
184 
185     // For subscribe manager
186     std::shared_ptr<SubscribeManager> subManager_;
187 
188     mutable std::mutex queryTaskStatusMutex_;
189     // <queryId, lastExcuStatus>
190     std::unordered_map<std::string, int> lastQuerySyncTaskStatusMap_;
191     // Initial Water Mark when the sync task launched.
192     WaterMark initWaterMark_ = 0;
193     WaterMark initDeletedMark_ = 0;
194 };
195 } // namespace DistributedDB
196 
197 #endif // SYNC_TASK_CONTEXT_H
198