1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SINGLE_VER_SYNC_TASK_CONTEXT_H 17 #define SINGLE_VER_SYNC_TASK_CONTEXT_H 18 19 #include <list> 20 #include <mutex> 21 #include <string> 22 #include <unordered_map> 23 24 #include "db_ability.h" 25 #include "query_sync_object.h" 26 #include "schema_negotiate.h" 27 #include "single_ver_kvdb_sync_interface.h" 28 #include "single_ver_sync_target.h" 29 #include "subscribe_manager.h" 30 #include "sync_target.h" 31 #include "sync_task_context.h" 32 #include "time_helper.h" 33 34 35 namespace DistributedDB { 36 class SingleVerSyncTaskContext : public SyncTaskContext { 37 public: 38 39 explicit SingleVerSyncTaskContext(); 40 41 DISABLE_COPY_ASSIGN_MOVE(SingleVerSyncTaskContext); 42 43 // Init SingleVerSyncTaskContext 44 int Initialize(const std::string &deviceId, ISyncInterface *syncInterface, std::shared_ptr<Metadata> &metadata, 45 ICommunicator *communicator) override; 46 47 // Add a sync task target with the operation to the queue 48 int AddSyncOperation(SyncOperation *operation) override; 49 50 bool IsCurrentSyncTaskCanBeSkipped() const override; 51 52 // Set the end water mark of this task 53 void SetEndMark(WaterMark endMark); 54 55 // Get the end water mark of this task 56 WaterMark GetEndMark() const; 57 58 void GetContinueToken(ContinueToken &outToken) const; 59 60 void SetContinueToken(ContinueToken token); 61 62 void ReleaseContinueToken(); 63 64 int PopResponseTarget(SingleVerSyncTarget &target); 65 66 int GetRspTargetQueueSize() const; 67 68 // responseSessionId used for mark the pull response task 69 void SetResponseSessionId(uint32_t responseSessionId); 70 71 // responseSessionId used for mark the pull response task 72 uint32_t GetResponseSessionId() const; 73 74 void Clear() override; 75 76 void Abort(int status) override; 77 78 void ClearAllSyncTask() override; 79 80 // If set true, remote stale data will be clear when remote db rebuiled. 81 void EnableClearRemoteStaleData(bool enable); 82 83 // Check if need to clear remote device stale data in syncing, when the remote db rebuilt. 84 bool IsNeedClearRemoteStaleData() const; 85 86 // start a timer to ResetWatchDog when sync data one (key,value) size bigger than mtu 87 bool StartFeedDogForSync(uint32_t time, SyncDirectionFlag flag); 88 89 // stop timer to ResetWatchDog when sync data one (key,value) size bigger than mtu 90 void StopFeedDogForSync(SyncDirectionFlag flag); 91 92 virtual int HandleDataRequestRecv(const Message *msg); 93 94 // is receive warterMark err 95 bool IsReceiveWaterMarkErr() const; 96 97 // set receive warterMark err 98 void SetReceiveWaterMarkErr(bool isErr); 99 100 void SetRemoteSeccurityOption(SecurityOption secOption); 101 102 SecurityOption GetRemoteSeccurityOption() const; 103 104 void SetReceivcPermitCheck(bool isChecked); 105 106 bool GetReceivcPermitCheck() const; 107 108 void SetSendPermitCheck(bool isChecked); 109 110 bool GetSendPermitCheck() const; 111 112 virtual SyncStrategy GetSyncStrategy(QuerySyncObject &querySyncObject) const = 0; 113 114 void SetIsSchemaSync(bool isSchemaSync); 115 116 bool GetIsSchemaSync() const; 117 118 bool IsSkipTimeoutError(int errCode) const; 119 120 bool FindResponseSyncTarget(uint32_t responseSessionId) const; 121 122 // For query sync 123 void SetQuery(const QuerySyncObject &query); 124 const QuerySyncObject &GetQuery() const; 125 void SetQuerySync(bool isQuerySync); 126 bool IsQuerySync() const; 127 std::set<CompressAlgorithm> GetRemoteCompressAlgo() const; 128 std::string GetRemoteCompressAlgoStr() const; 129 void SetDbAbility(DbAbility &remoteDbAbility); 130 CompressAlgorithm ChooseCompressAlgo() const; 131 bool IsNotSupportAbility(const AbilityItem &abilityItem) const; 132 133 void SetSubscribeManager(std::shared_ptr<SubscribeManager> &subManager); 134 std::shared_ptr<SubscribeManager> GetSubscribeManager() const; 135 136 void SaveLastPushTaskExecStatus(int finalStatus) override; 137 void ResetLastPushTaskStatus() override; 138 139 virtual std::string GetQuerySyncId() const = 0; 140 virtual std::string GetDeleteSyncId() const = 0; 141 142 void SetCommNormal(bool isCommNormal); 143 144 void StartFeedDogForGetData(uint32_t sessionId); 145 void StopFeedDogForGetData(); 146 protected: 147 ~SingleVerSyncTaskContext() override; 148 void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam) override; 149 150 // For querySync 151 QuerySyncObject query_; 152 bool isQuerySync_ = false; 153 154 // for merge sync task 155 volatile int lastFullSyncTaskStatus_ = SyncOperation::Status::OP_WAITING; 156 private: 157 int GetCorrectedSendWaterMarkForCurrentTask(const SyncOperation *operation, uint64_t &waterMark) const; 158 159 bool IsCurrentSyncTaskCanBeSkippedInner(const SyncOperation *operation) const; 160 161 constexpr static int64_t REDUNDACE_WATER_MARK = 1 * 1000LL * 1000LL * 10LL; // 1s 162 163 DECLARE_OBJECT_TAG(SingleVerSyncTaskContext); 164 165 ContinueToken token_; 166 WaterMark endMark_; 167 uint32_t responseSessionId_ = 0; 168 169 bool needClearRemoteStaleData_; 170 SecurityOption remoteSecOption_ = {0, 0}; // remote targe can handle secOption data or not. 171 bool isReceivcPermitChecked_ = false; 172 bool isSendPermitChecked_ = false; 173 std::atomic<bool> isSchemaSync_ = false; 174 175 // is receive waterMark err, peerWaterMark bigger than remote localWaterMark 176 bool isReceiveWaterMarkErr_ = false; 177 178 // For db ability 179 mutable std::mutex remoteDbAbilityLock_; 180 DbAbility remoteDbAbility_; 181 182 // For subscribe manager 183 std::shared_ptr<SubscribeManager> subManager_; 184 185 mutable std::mutex queryTaskStatusMutex_; 186 // <queryId, lastExcuStatus> 187 std::unordered_map<std::string, int> lastQuerySyncTaskStatusMap_; 188 }; 189 } // namespace DistributedDB 190 191 #endif // SYNC_TASK_CONTEXT_H 192