1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SYNC_OPERATION_H 17 #define SYNC_OPERATION_H 18 19 #include <functional> 20 #include <string> 21 #include <vector> 22 #include <map> 23 #include <mutex> 24 25 #include "ikvdb_sync_interface.h" 26 #include "notification_chain.h" 27 #include "query_sync_object.h" 28 #include "ref_object.h" 29 #include "runtime_context.h" 30 #include "semaphore_utils.h" 31 #include "sync_types.h" 32 33 namespace DistributedDB { 34 class SyncOperation : public RefObject { 35 public: 36 enum Status { 37 OP_WAITING = 0, 38 OP_SYNCING, 39 OP_SEND_FINISHED, 40 OP_RECV_FINISHED, 41 OP_FINISHED_ALL, // status >= OP_FINISHED_ALL is final status. 42 OP_FAILED, 43 OP_TIMEOUT, 44 OP_PERMISSION_CHECK_FAILED, 45 OP_COMM_ABNORMAL, 46 OP_SECURITY_OPTION_CHECK_FAILURE, // remote device's SecurityOption not equal to local 47 OP_EKEYREVOKED_FAILURE, // EKEYREVOKED error 48 OP_BUSY_FAILURE, 49 OP_SCHEMA_INCOMPATIBLE, 50 OP_QUERY_FORMAT_FAILURE, 51 OP_QUERY_FIELD_FAILURE, 52 OP_NOT_SUPPORT, 53 OP_INTERCEPT_DATA_FAIL, 54 OP_MAX_LIMITS, 55 OP_SCHEMA_CHANGED, 56 OP_INVALID_ARGS, 57 OP_USER_CHANGED 58 }; 59 60 using UserCallback = std::function<void(std::map<std::string, int>)>; 61 using OnSyncFinished = std::function<void(int)>; 62 using OnSyncFinalize = std::function<void(void)>; 63 64 SyncOperation(uint32_t syncId, const std::vector<std::string> &devices, int mode, 65 const UserCallback &userCallback, bool isBlockSync); 66 67 DISABLE_COPY_ASSIGN_MOVE(SyncOperation); 68 69 // Init the status for callback 70 int Initialize(); 71 72 // Set the OnSyncFinalize callback 73 void SetOnSyncFinalize(const OnSyncFinalize &callback); 74 75 // Set the OnSyncFinished callback, it will be called either success or failed. 76 void SetOnSyncFinished(const OnSyncFinished &callback); 77 78 // Set the sync status, running or finished 79 void SetStatus(const std::string &deviceId, int status); 80 81 // Set the unfinished devices sync status, running or finished 82 void SetUnfinishedDevStatus(int status); 83 84 // Set the identifier, used in SyncOperation::Finished 85 void SetIdentifier(const std::vector<uint8_t> &identifier); 86 87 // Get the sync status, running or finished 88 int GetStatus(const std::string &deviceId) const; 89 90 // Get the sync id. 91 uint32_t GetSyncId() const; 92 93 // Get the sync mode 94 int GetMode() const; 95 96 // Used to call the onFinished and caller's on complete 97 void Finished(); 98 99 // Get the deviceId of this sync status 100 const std::vector<std::string> &GetDevices() const; 101 102 // Wait if it's a block sync 103 void WaitIfNeed(); 104 105 // Notify if it's a block sync 106 void NotifyIfNeed(); 107 108 // Return if this sync is auto sync 109 bool IsAutoSync() const; 110 111 // Return if this sync is block sync 112 bool IsBlockSync() const; 113 114 // Return if this sync is AUTO_SUBSCRIBE_QUERY 115 bool IsAutoControlCmd() const; 116 117 // Check if All devices sync finished. 118 bool CheckIsAllFinished() const; 119 120 // For query sync 121 void SetQuery(const QuerySyncObject &query); 122 QuerySyncObject GetQuery() const; 123 bool IsQuerySync() const; 124 std::string GetQueryId() const; 125 static SyncType GetSyncType(int mode); 126 static int TransferSyncMode(int mode); 127 128 static const std::map<int, DBStatus> &DBStatusTransMap(); 129 130 protected: 131 virtual ~SyncOperation(); 132 133 private: 134 DECLARE_OBJECT_TAG(SyncOperation); 135 136 // called by destruction 137 void Finalize(); 138 139 // Transfer sync mode from interface to inner 140 void TransferQuerySyncMode(); 141 142 // The device list 143 const std::vector<std::string> devices_; 144 145 // The Syncid 146 uint32_t syncId_; 147 148 // The sync mode_ see SyncMode 149 int mode_; 150 151 // The callback caller registered 152 UserCallback userCallback_; 153 154 // The callback caller registered, when sync timeout, call 155 OnSyncFinished onFinished_; 156 157 // The callback caller registered, will be called when destruction. 158 OnSyncFinalize onFinalize_; 159 160 // The device id we sync with 161 std::map<std::string, int> statuses_; 162 163 // Is this operation is a block sync 164 bool isBlockSync_; 165 166 // Is this operation is an auto sync 167 bool isAutoSync_; 168 169 // Is this operation has finished 170 bool isFinished_; 171 172 // Used for block sync 173 std::unique_ptr<SemaphoreUtils> semaphore_; 174 175 QuerySyncObject query_; 176 bool isQuerySync_; 177 178 bool isAutoSubscribe_; 179 180 // record identifier used to call ScheduleQueuedTask in SyncOperation::Finished 181 std::string identifier_; 182 }; 183 } // namespace DistributedDB 184 185 #endif // SYNC_OPERATION_H 186