/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef SYNC_OPERATION_H
#define SYNC_OPERATION_H

#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "ikvdb_sync_interface.h"
#include "isyncer.h"
#include "notification_chain.h"
#include "query_sync_object.h"
#include "ref_object.h"
#include "runtime_context.h"
#include "semaphore_utils.h"
#include "sync_types.h"

34 namespace DistributedDB {
35 class SyncOperation : public RefObject {
36 public:
37     enum Status {
38         OP_WAITING = 0,
39         OP_SYNCING,
40         OP_SEND_FINISHED,
41         OP_RECV_FINISHED,
42         OP_FINISHED_ALL, // status >= OP_FINISHED_ALL is final status.
43         OP_FAILED,
44         OP_TIMEOUT,
45         OP_PERMISSION_CHECK_FAILED,
46         OP_COMM_ABNORMAL,
47         OP_SECURITY_OPTION_CHECK_FAILURE, // remote device's SecurityOption not equal to local
48         OP_EKEYREVOKED_FAILURE, // EKEYREVOKED error
49         OP_BUSY_FAILURE,
50         OP_SCHEMA_INCOMPATIBLE,
51         OP_QUERY_FORMAT_FAILURE,
52         OP_QUERY_FIELD_FAILURE,
53         OP_NOT_SUPPORT,
54         OP_INTERCEPT_DATA_FAIL,
55         OP_MAX_LIMITS,
56         OP_SCHEMA_CHANGED,
57         OP_INVALID_ARGS,
58         OP_USER_CHANGED,
59         OP_DENIED_SQL,
60         OP_NOTADB_OR_CORRUPTED,
61         OP_DB_CLOSING,
62     };
63 
64     using UserCallback = std::function<void(std::map<std::string, int>)>;
65     using OnSyncFinished = std::function<void(int)>;
66     using OnSyncFinalize = std::function<void(void)>;
67 
68     SyncOperation(uint32_t syncId, const std::vector<std::string> &devices, int mode,
69         const UserCallback &userCallback, bool isBlockSync);
70 
71     SyncOperation(uint32_t syncId, const ISyncer::SyncParam &param);
72 
73     DISABLE_COPY_ASSIGN_MOVE(SyncOperation);
74 
75     // Init the status for callback
76     int Initialize();
77 
78     // Set the OnSyncFinalize callback
79     void SetOnSyncFinalize(const OnSyncFinalize &callback);
80 
81     // Set the OnSyncFinished callback, it will be called either success or failed.
82     void SetOnSyncFinished(const OnSyncFinished &callback);
83 
84     // Set the sync status, running or finished
85     void SetStatus(const std::string &deviceId, int status, int commErrCode = E_OK);
86 
87     // Set the unfinished devices sync status, running or finished
88     void SetUnfinishedDevStatus(int status);
89 
90     // Set the identifier, used in SyncOperation::Finished
91     void SetIdentifier(const std::vector<uint8_t> &identifier);
92 
93     // Get the sync status, running or finished
94     int GetStatus(const std::string &deviceId) const;
95 
96     // Get the sync id.
97     uint32_t GetSyncId() const;
98 
99     // Get the sync mode
100     int GetMode() const;
101 
102     // Used to call the onFinished and caller's on complete
103     void Finished();
104 
105     // Get the deviceId of this sync status
106     const std::vector<std::string> &GetDevices() const;
107 
108     // Wait if it's a block sync
109     void WaitIfNeed();
110 
111     // Notify if it's a block sync
112     void NotifyIfNeed();
113 
114     // Return if this sync is auto sync
115     bool IsAutoSync() const;
116 
117     // Return if this sync is block sync
118     bool IsBlockSync() const;
119 
120     // Return if this sync is AUTO_SUBSCRIBE_QUERY
121     bool IsAutoControlCmd() const;
122 
123     // Check if All devices sync finished.
124     bool CheckIsAllFinished() const;
125 
126     bool IsRetryTask() const;
127 
128     // For query sync
129     void SetQuery(const QuerySyncObject &query);
130     void GetQuery(QuerySyncObject &targetObject) const;
131     bool IsQuerySync() const;
132     std::string GetQueryId() const;
133     static SyncType GetSyncType(int mode);
134     static int TransferSyncMode(int mode);
135 
136     static DBStatus DBStatusTrans(int operationStatus);
137 
138     static ProcessStatus DBStatusTransProcess(int operationStatus);
139 
140     void SetSyncContext(RefObject *context);
141 
142     bool CanCancel();
143 
144     void SetSyncProcessCallFun(DeviceSyncProcessCallback callBack);
145 
146     void SetSyncProcessTotal(const std::string &deviceId, uint32_t total);
147 
148     void UpdateFinishedCount(const std::string &deviceId, uint32_t count);
149 
150 protected:
151     virtual ~SyncOperation();
152 
153     RefObject *context_ = nullptr;
154 private:
155     DECLARE_OBJECT_TAG(SyncOperation);
156 
157     // called by destruction
158     void Finalize();
159 
160     static std::string GetFinishDetailMsg(const std::map<std::string, int> &finishStatus);
161 
162     void ReplaceCommErrCode(std::map<std::string, int> &finishStatus);
163 
164     // The device list
165     const std::vector<std::string> devices_;
166 
167     // The Syncid
168     uint32_t syncId_;
169 
170     // The sync mode_ see SyncMode
171     int mode_;
172 
173     // The callback caller registered
174     UserCallback userCallback_;
175 
176     // The callback caller registered, when sync timeout, call
177     OnSyncFinished onFinished_;
178 
179     // The callback caller registered, will be called when destruction.
180     OnSyncFinalize onFinalize_;
181 
182     // The syncProcess callback caller registered
183     DeviceSyncProcessCallback userSyncProcessCallback_;
184 
185     // The device id we sync with
186     std::map<std::string, int> statuses_;
187 
188     // passthrough errCode
189     std::map<std::string, int> commErrCodeMap_;
190 
191     // Is this operation is a block sync
192     volatile bool isBlockSync_;
193 
194     // Is this operation is an auto sync
195     volatile bool isAutoSync_;
196 
197     // Is this operation has finished
198     volatile bool isFinished_;
199 
200     // Used for block sync
201     std::unique_ptr<SemaphoreUtils> semaphore_;
202 
203     mutable std::mutex queryMutex_;
204     QuerySyncObject query_;
205     volatile bool isQuerySync_;
206 
207     volatile bool isAutoSubscribe_;
208 
209     volatile bool isRetry_;
210 
211     // record identifier used to call ScheduleQueuedTask in SyncOperation::Finished
212     std::string identifier_;
213 
214     // The device id we syncProcess with
215     std::map<std::string, DeviceSyncProcess> syncProcessMap_;
216 
217     // Can be cancelled
218     bool canCancel_ = false;
219 
220     void ExeSyncProcessCallFun(const std::map<std::string, DeviceSyncProcess> &syncProcessMap);
221 };
222 } // namespace DistributedDB

#endif  // SYNC_OPERATION_H