• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SYNC_OPERATION_H
17 #define SYNC_OPERATION_H
18 
19 #include <functional>
20 #include <map>
21 #include <mutex>
22 #include <string>
23 #include <vector>
24 
25 #include "ikvdb_sync_interface.h"
26 #include "notification_chain.h"
27 #include "query_sync_object.h"
28 #include "ref_object.h"
29 #include "runtime_context.h"
30 #include "semaphore_utils.h"
31 #include "sync_types.h"
32 
33 namespace DistributedDB {
// SyncOperation holds the state of one sync request towards a list of devices: the sync id and mode,
// a status value per device, and the callbacks registered by the caller. It derives from RefObject and
// has a protected destructor, so it cannot be destroyed directly by outside code.
34 class SyncOperation : public RefObject {
35 public:
36     enum Status {
37         OP_WAITING = 0,
38         OP_SYNCING,
39         OP_SEND_FINISHED,
40         OP_RECV_FINISHED,
41         OP_FINISHED_ALL, // Any status >= OP_FINISHED_ALL is a final status.
42         OP_FAILED,
43         OP_TIMEOUT,
44         OP_PERMISSION_CHECK_FAILED,
45         OP_COMM_ABNORMAL,
46         OP_SECURITY_OPTION_CHECK_FAILURE, // The remote device's SecurityOption is not equal to the local one
47         OP_EKEYREVOKED_FAILURE, // EKEYREVOKED error
48         OP_BUSY_FAILURE,
49         OP_SCHEMA_INCOMPATIBLE,
50         OP_QUERY_FORMAT_FAILURE,
51         OP_QUERY_FIELD_FAILURE,
52         OP_NOT_SUPPORT,
53         OP_INTERCEPT_DATA_FAIL,
54         OP_MAX_LIMITS,
55         OP_SCHEMA_CHANGED,
56         OP_INVALID_ARGS,
57         OP_USER_CHANGED,
58         OP_DENIED_SQL,
59         OP_NOTADB_OR_CORRUPTED,
60     };
61     // Callback signatures. UserCallback receives a string -> int map (presumably device id -> status; confirm).
62     using UserCallback = std::function<void(std::map<std::string, int>)>;
63     using OnSyncFinished = std::function<void(int)>;
64     using OnSyncFinalize = std::function<void(void)>;
65     // Construct from the sync id, the device list, the sync mode, the user callback and the block-sync flag.
66     SyncOperation(uint32_t syncId, const std::vector<std::string> &devices, int mode,
67         const UserCallback &userCallback, bool isBlockSync);
68     // Macro defined elsewhere; by its name it disables copy, assignment and move (confirm in its definition).
69     DISABLE_COPY_ASSIGN_MOVE(SyncOperation);
70 
71     // Initialize the status used for the callback. Returns an int (presumably an error code; confirm in the .cpp).
72     int Initialize();
73 
74     // Set the OnSyncFinalize callback (the onFinalize_ member is documented as called on destruction).
75     void SetOnSyncFinalize(const OnSyncFinalize &callback);
76 
77     // Set the OnSyncFinished callback; it is called whether the sync succeeded or failed.
78     void SetOnSyncFinished(const OnSyncFinished &callback);
79 
80     // Set the sync status (running or finished) of the given device; commErrCode defaults to E_OK.
81     void SetStatus(const std::string &deviceId, int status, int commErrCode = E_OK);
82 
83     // Set the sync status (running or finished) of the devices that have not finished yet.
84     void SetUnfinishedDevStatus(int status);
85 
86     // Set the identifier used in SyncOperation::Finished. It is passed as bytes; identifier_ is a std::string.
87     void SetIdentifier(const std::vector<uint8_t> &identifier);
88 
89     // Get the sync status (running or finished) of the given device.
90     int GetStatus(const std::string &deviceId) const;
91 
92     // Get the sync id.
93     uint32_t GetSyncId() const;
94 
95     // Get the sync mode.
96     int GetMode() const;
97 
98     // Used to call the onFinished callback and the caller's completion callback.
99     void Finished();
100 
101     // Get the ids of the devices of this sync operation (returned by const reference).
102     const std::vector<std::string> &GetDevices() const;
103 
104     // Wait if this is a block sync.
105     void WaitIfNeed();
106 
107     // Notify if this is a block sync.
108     void NotifyIfNeed();
109 
110     // Return whether this sync is an auto sync.
111     bool IsAutoSync() const;
112 
113     // Return whether this sync is a block sync.
114     bool IsBlockSync() const;
115 
116     // Return whether this sync is AUTO_SUBSCRIBE_QUERY.
117     bool IsAutoControlCmd() const;
118 
119     // Check whether all devices have finished syncing.
120     bool CheckIsAllFinished() const;
121 
122     // Query sync support: set/get the query object, test for query sync and get the query id.
123     void SetQuery(const QuerySyncObject &query);
124     void GetQuery(QuerySyncObject &targetObject) const;
125     bool IsQuerySync() const;
126     std::string GetQueryId() const;
127     static SyncType GetSyncType(int mode);
128     static int TransferSyncMode(int mode);
129     // Translate an operation status value (int) into a DBStatus.
130     static DBStatus DBStatusTrans(int operationStatus);
131     // Translate an operation status value (int) into a ProcessStatus.
132     static ProcessStatus DBStatusTransProcess(int operationStatus);
133     // Set the sync context (presumably stored in context_; confirm in the .cpp).
134     void SetSyncContext(RefObject *context);
135     // Return whether this operation can be cancelled (presumably reflects canCancel_; confirm in the .cpp).
136     bool CanCancel();
137     // Set the sync process callback registered by the caller.
138     void SetSyncProcessCallFun(DeviceSyncProcessCallback callBack);
139     // Set the sync process total for the given device (exact semantics are in the implementation).
140     void SetSyncProcessTotal(const std::string &deviceId, uint32_t total);
141     // Update the finished count for the given device (exact semantics are in the implementation).
142     void UpdateFinishedCount(const std::string &deviceId, uint32_t count);
143 
144 protected:
145     virtual ~SyncOperation();
146     // Sync context pointer, nullptr by default.
147     RefObject *context_ = nullptr;
148 private:
149     DECLARE_OBJECT_TAG(SyncOperation);
150 
151     // Called on destruction.
152     void Finalize();
153     // By its name, builds a detail message from the given finish statuses (confirm in the .cpp).
154     static std::string GetFinishDetailMsg(const std::map<std::string, int> &finishStatus);
155     // By its name, replaces entries of finishStatus with comm error codes (confirm in the .cpp).
156     void ReplaceCommErrCode(std::map<std::string, int> &finishStatus);
157 
158     // The device list.
159     const std::vector<std::string> devices_;
160 
161     // The sync id.
162     uint32_t syncId_;
163 
164     // The sync mode, see SyncMode.
165     int mode_;
166 
167     // The callback registered by the caller.
168     UserCallback userCallback_;
169 
170     // The OnSyncFinished callback registered by the caller; originally documented as called on sync timeout.
171     OnSyncFinished onFinished_;
172 
173     // The callback registered by the caller; it will be called on destruction.
174     OnSyncFinalize onFinalize_;
175 
176     // The sync process callback registered by the caller.
177     DeviceSyncProcessCallback userSyncProcessCallback_;
178 
179     // Sync status per device id we sync with.
180     std::map<std::string, int> statuses_;
181 
182     // Pass-through error codes, keyed by string (presumably the device id; confirm in the .cpp).
183     std::map<std::string, int> commErrCodeMap_;
184     // NOTE(review): volatile gives no thread synchronization in C++; confirm how the volatile flags are shared.
185     // Whether this operation is a block sync.
186     volatile bool isBlockSync_;
187 
188     // Whether this operation is an auto sync.
189     volatile bool isAutoSync_;
190 
191     // Whether this operation has finished.
192     volatile bool isFinished_;
193     // NOTE(review): std::unique_ptr needs <memory>, which this header does not include directly.
194     // Used for block sync.
195     std::unique_ptr<SemaphoreUtils> semaphore_;
196     // Query sync state; queryMutex_ presumably guards query_ (confirm in the .cpp).
197     mutable std::mutex queryMutex_;
198     QuerySyncObject query_;
199     volatile bool isQuerySync_;
200     // Auto-subscribe flag; presumably what IsAutoControlCmd() reports (confirm in the .cpp).
201     volatile bool isAutoSubscribe_;
202 
203     // Record of the identifier used to call ScheduleQueuedTask in SyncOperation::Finished.
204     std::string identifier_;
205 
206     // Sync process information per device id.
207     std::map<std::string, DeviceSyncProcess> syncProcessMap_;
208 
209     // Whether this operation can be cancelled; false by default.
210     bool canCancel_ = false;
211     // By its name, invokes the sync process callback with the given map (confirm in the .cpp).
212     void ExeSyncProcessCallFun(const std::map<std::string, DeviceSyncProcess> &syncProcessMap);
213 };
214 } // namespace DistributedDB
215 
216 #endif  // SYNC_OPERATION_H
217