1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SYNC_OPERATION_H 17 #define SYNC_OPERATION_H 18 19 #include <functional> 20 #include <map> 21 #include <mutex> 22 #include <string> 23 #include <vector> 24 25 #include "ikvdb_sync_interface.h" 26 #include "isyncer.h" 27 #include "notification_chain.h" 28 #include "query_sync_object.h" 29 #include "ref_object.h" 30 #include "runtime_context.h" 31 #include "semaphore_utils.h" 32 #include "sync_types.h" 33 34 namespace DistributedDB { 35 class SyncOperation : public RefObject { 36 public: 37 enum Status { 38 OP_WAITING = 0, 39 OP_SYNCING, 40 OP_SEND_FINISHED, 41 OP_RECV_FINISHED, 42 OP_FINISHED_ALL, // status >= OP_FINISHED_ALL is final status. 43 OP_FAILED, 44 OP_TIMEOUT, 45 OP_PERMISSION_CHECK_FAILED, 46 OP_COMM_ABNORMAL, 47 OP_SECURITY_OPTION_CHECK_FAILURE, // remote device's SecurityOption not equal to local 48 OP_EKEYREVOKED_FAILURE, // EKEYREVOKED error 49 OP_BUSY_FAILURE, 50 OP_SCHEMA_INCOMPATIBLE, 51 OP_QUERY_FORMAT_FAILURE, 52 OP_QUERY_FIELD_FAILURE, 53 OP_NOT_SUPPORT, 54 OP_INTERCEPT_DATA_FAIL, 55 OP_MAX_LIMITS, 56 OP_SCHEMA_CHANGED, 57 OP_INVALID_ARGS, 58 OP_USER_CHANGED, 59 OP_DENIED_SQL, 60 OP_NOTADB_OR_CORRUPTED, 61 OP_DB_CLOSING, 62 }; 63 64 using UserCallback = std::function<void(std::map<std::string, int>)>; 65 using OnSyncFinished = std::function<void(int)>; 66 using OnSyncFinalize = std::function<void(void)>; 67 68 SyncOperation(uint32_t syncId, const std::vector<std::string> &devices, int mode, 69 const UserCallback &userCallback, bool isBlockSync); 70 71 SyncOperation(uint32_t syncId, const ISyncer::SyncParam ¶m); 72 73 DISABLE_COPY_ASSIGN_MOVE(SyncOperation); 74 75 // Init the status for callback 76 int Initialize(); 77 78 // Set the OnSyncFinalize callback 79 void SetOnSyncFinalize(const OnSyncFinalize &callback); 80 81 // Set the OnSyncFinished callback, it will be called either success or failed. 82 void SetOnSyncFinished(const OnSyncFinished &callback); 83 84 // Set the sync status, running or finished 85 void SetStatus(const std::string &deviceId, int status, int commErrCode = E_OK); 86 87 // Set the unfinished devices sync status, running or finished 88 void SetUnfinishedDevStatus(int status); 89 90 // Set the identifier, used in SyncOperation::Finished 91 void SetIdentifier(const std::vector<uint8_t> &identifier); 92 93 // Get the sync status, running or finished 94 int GetStatus(const std::string &deviceId) const; 95 96 // Get the sync id. 97 uint32_t GetSyncId() const; 98 99 // Get the sync mode 100 int GetMode() const; 101 102 // Used to call the onFinished and caller's on complete 103 void Finished(); 104 105 // Get the deviceId of this sync status 106 const std::vector<std::string> &GetDevices() const; 107 108 // Wait if it's a block sync 109 void WaitIfNeed(); 110 111 // Notify if it's a block sync 112 void NotifyIfNeed(); 113 114 // Return if this sync is auto sync 115 bool IsAutoSync() const; 116 117 // Return if this sync is block sync 118 bool IsBlockSync() const; 119 120 // Return if this sync is AUTO_SUBSCRIBE_QUERY 121 bool IsAutoControlCmd() const; 122 123 // Check if All devices sync finished. 124 bool CheckIsAllFinished() const; 125 126 bool IsRetryTask() const; 127 128 // For query sync 129 void SetQuery(const QuerySyncObject &query); 130 void GetQuery(QuerySyncObject &targetObject) const; 131 bool IsQuerySync() const; 132 std::string GetQueryId() const; 133 static SyncType GetSyncType(int mode); 134 static int TransferSyncMode(int mode); 135 136 static DBStatus DBStatusTrans(int operationStatus); 137 138 static ProcessStatus DBStatusTransProcess(int operationStatus); 139 140 void SetSyncContext(RefObject *context); 141 142 bool CanCancel(); 143 144 void SetSyncProcessCallFun(DeviceSyncProcessCallback callBack); 145 146 void SetSyncProcessTotal(const std::string &deviceId, uint32_t total); 147 148 void UpdateFinishedCount(const std::string &deviceId, uint32_t count); 149 150 protected: 151 virtual ~SyncOperation(); 152 153 RefObject *context_ = nullptr; 154 private: 155 DECLARE_OBJECT_TAG(SyncOperation); 156 157 // called by destruction 158 void Finalize(); 159 160 static std::string GetFinishDetailMsg(const std::map<std::string, int> &finishStatus); 161 162 void ReplaceCommErrCode(std::map<std::string, int> &finishStatus); 163 164 // The device list 165 const std::vector<std::string> devices_; 166 167 // The Syncid 168 uint32_t syncId_; 169 170 // The sync mode_ see SyncMode 171 int mode_; 172 173 // The callback caller registered 174 UserCallback userCallback_; 175 176 // The callback caller registered, when sync timeout, call 177 OnSyncFinished onFinished_; 178 179 // The callback caller registered, will be called when destruction. 180 OnSyncFinalize onFinalize_; 181 182 // The syncProcess callback caller registered 183 DeviceSyncProcessCallback userSyncProcessCallback_; 184 185 // The device id we sync with 186 std::map<std::string, int> statuses_; 187 188 // passthrough errCode 189 std::map<std::string, int> commErrCodeMap_; 190 191 // Is this operation is a block sync 192 volatile bool isBlockSync_; 193 194 // Is this operation is an auto sync 195 volatile bool isAutoSync_; 196 197 // Is this operation has finished 198 volatile bool isFinished_; 199 200 // Used for block sync 201 std::unique_ptr<SemaphoreUtils> semaphore_; 202 203 mutable std::mutex queryMutex_; 204 QuerySyncObject query_; 205 volatile bool isQuerySync_; 206 207 volatile bool isAutoSubscribe_; 208 209 volatile bool isRetry_; 210 211 // record identifier used to call ScheduleQueuedTask in SyncOperation::Finished 212 std::string identifier_; 213 214 // The device id we syncProcess with 215 std::map<std::string, DeviceSyncProcess> syncProcessMap_; 216 217 // Can be cancelled 218 bool canCancel_ = false; 219 220 void ExeSyncProcessCallFun(const std::map<std::string, DeviceSyncProcess> &syncProcessMap); 221 }; 222 } // namespace DistributedDB 223 224 #endif // SYNC_OPERATION_H 225