1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_operation.h"
17 #include "db_common.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "performance_analysis.h"
21
22 namespace DistributedDB {
SyncOperation(uint32_t syncId,const std::vector<std::string> & devices,int mode,const UserCallback & userCallback,bool isBlockSync)23 SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices,
24 int mode, const UserCallback &userCallback, bool isBlockSync)
25 : devices_(devices),
26 syncId_(syncId),
27 mode_(mode),
28 userCallback_(userCallback),
29 isBlockSync_(isBlockSync),
30 isAutoSync_(false),
31 isFinished_(false),
32 semaphore_(nullptr),
33 query_(QuerySyncObject()),
34 isQuerySync_(false),
35 isAutoSubscribe_(false)
36 {
37 }
38
~SyncOperation()39 SyncOperation::~SyncOperation()
40 {
41 RefObject::DecObjRef(context_);
42 LOGD("SyncOperation::~SyncOperation()");
43 Finalize();
44 }
45
Initialize()46 int SyncOperation::Initialize()
47 {
48 LOGD("[SyncOperation] Init SyncOperation id:%d.", syncId_);
49 AutoLock lockGuard(this);
50 for (const std::string &deviceId : devices_) {
51 statuses_.insert(std::pair<std::string, int>(deviceId, OP_WAITING));
52 }
53
54 if (mode_ == AUTO_PUSH) {
55 mode_ = PUSH;
56 isAutoSync_ = true;
57 } else if (mode_ == AUTO_PULL) {
58 mode_ = PULL;
59 isAutoSync_ = true;
60 } else if (mode_ == AUTO_SUBSCRIBE_QUERY) {
61 mode_ = SUBSCRIBE_QUERY;
62 isAutoSubscribe_ = true;
63 }
64 if (isBlockSync_) {
65 semaphore_ = std::make_unique<SemaphoreUtils>(0);
66 }
67
68 return E_OK;
69 }
70
SetOnSyncFinalize(const OnSyncFinalize & callback)71 void SyncOperation::SetOnSyncFinalize(const OnSyncFinalize &callback)
72 {
73 onFinalize_ = callback;
74 }
75
SetOnSyncFinished(const OnSyncFinished & callback)76 void SyncOperation::SetOnSyncFinished(const OnSyncFinished &callback)
77 {
78 onFinished_ = callback;
79 }
80
SetStatus(const std::string & deviceId,int status)81 void SyncOperation::SetStatus(const std::string &deviceId, int status)
82 {
83 LOGD("[SyncOperation] SetStatus dev %s{private} status %d", deviceId.c_str(), status);
84 AutoLock lockGuard(this);
85 if (IsKilled()) {
86 LOGE("[SyncOperation] SetStatus failed, the SyncOperation has been killed!");
87 return;
88 }
89 if (isFinished_) {
90 LOGI("[SyncOperation] SetStatus already finished");
91 return;
92 }
93
94 auto iter = statuses_.find(deviceId);
95 if (iter != statuses_.end()) {
96 if (iter->second >= OP_FINISHED_ALL) {
97 return;
98 }
99 iter->second = status;
100 return;
101 }
102 }
103
SetUnfinishedDevStatus(int status)104 void SyncOperation::SetUnfinishedDevStatus(int status)
105 {
106 LOGD("[SyncOperation] SetUnfinishedDevStatus status %d", status);
107 AutoLock lockGuard(this);
108 if (IsKilled()) {
109 LOGE("[SyncOperation] SetUnfinishedDevStatus failed, the SyncOperation has been killed!");
110 return;
111 }
112 if (isFinished_) {
113 LOGI("[SyncOperation] SetUnfinishedDevStatus already finished");
114 return;
115 }
116 for (auto &item : statuses_) {
117 if (item.second >= OP_FINISHED_ALL) {
118 continue;
119 }
120 item.second = status;
121 }
122 }
123
GetStatus(const std::string & deviceId) const124 int SyncOperation::GetStatus(const std::string &deviceId) const
125 {
126 AutoLock lockGuard(this);
127 auto iter = statuses_.find(deviceId);
128 if (iter != statuses_.end()) {
129 return iter->second;
130 }
131 return -E_INVALID_ARGS;
132 }
133
GetSyncId() const134 uint32_t SyncOperation::GetSyncId() const
135 {
136 return syncId_;
137 }
138
GetMode() const139 int SyncOperation::GetMode() const
140 {
141 return mode_;
142 }
143
Finished()144 void SyncOperation::Finished()
145 {
146 std::map<std::string, int> tmpStatus;
147 {
148 AutoLock lockGuard(this);
149 if (IsKilled() || isFinished_) {
150 return;
151 }
152 isFinished_ = true;
153 tmpStatus = statuses_;
154 }
155 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
156 if (performance != nullptr) {
157 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
158 }
159 if (userCallback_) {
160 std::string msg = GetFinishDetailMsg(tmpStatus);
161 LOGI("[SyncOperation] SyncId=%d finished, %s", syncId_, msg.c_str());
162 if (IsBlockSync()) {
163 userCallback_(tmpStatus);
164 } else {
165 RefObject::IncObjRef(this);
166 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, tmpStatus] {
167 userCallback_(tmpStatus);
168 RefObject::DecObjRef(this);
169 });
170 if (errCode != E_OK) {
171 LOGE("[Finished] SyncOperation Finished userCallback_ retCode:%d", errCode);
172 RefObject::DecObjRef(this);
173 }
174 }
175 }
176 if (onFinished_) {
177 LOGD("[SyncOperation] Sync %d finished call onFinished.", syncId_);
178 onFinished_(syncId_);
179 }
180 }
181
GetDevices() const182 const std::vector<std::string> &SyncOperation::GetDevices() const
183 {
184 return devices_;
185 }
186
WaitIfNeed()187 void SyncOperation::WaitIfNeed()
188 {
189 if (isBlockSync_ && (semaphore_ != nullptr)) {
190 LOGD("[SyncOperation] Wait.");
191 semaphore_->WaitSemaphore();
192 }
193 }
194
NotifyIfNeed()195 void SyncOperation::NotifyIfNeed()
196 {
197 if (isBlockSync_ && (semaphore_ != nullptr)) {
198 LOGD("[SyncOperation] Notify.");
199 semaphore_->SendSemaphore();
200 }
201 }
202
IsAutoSync() const203 bool SyncOperation::IsAutoSync() const
204 {
205 return isAutoSync_;
206 }
207
IsBlockSync() const208 bool SyncOperation::IsBlockSync() const
209 {
210 return isBlockSync_;
211 }
212
IsAutoControlCmd() const213 bool SyncOperation::IsAutoControlCmd() const
214 {
215 return isAutoSubscribe_;
216 }
217
SetSyncContext(RefObject * context)218 void SyncOperation::SetSyncContext(RefObject *context)
219 {
220 RefObject::DecObjRef(context_);
221 context_ = context;
222 RefObject::IncObjRef(context);
223 }
224
CheckIsAllFinished() const225 bool SyncOperation::CheckIsAllFinished() const
226 {
227 AutoLock lockGuard(this);
228 for (const auto &iter : statuses_) {
229 if (iter.second < OP_FINISHED_ALL) {
230 return false;
231 }
232 }
233 return true;
234 }
235
Finalize()236 void SyncOperation::Finalize()
237 {
238 if ((syncId_ > 0) && onFinalize_) {
239 LOGD("[SyncOperation] Callback SyncOperation onFinalize.");
240 onFinalize_();
241 }
242 }
243
SetQuery(const QuerySyncObject & query)244 void SyncOperation::SetQuery(const QuerySyncObject &query)
245 {
246 std::lock_guard<std::mutex> lock(queryMutex_);
247 query_ = query;
248 isQuerySync_ = true;
249 if (mode_ != SyncModeType::SUBSCRIBE_QUERY && mode_ != SyncModeType::UNSUBSCRIBE_QUERY) {
250 mode_ += QUERY_SYNC_MODE_BASE;
251 }
252 }
253
GetQuery(QuerySyncObject & targetObject) const254 void SyncOperation::GetQuery(QuerySyncObject &targetObject) const
255 {
256 std::lock_guard<std::mutex> lock(queryMutex_);
257 targetObject = query_;
258 }
259
IsQuerySync() const260 bool SyncOperation::IsQuerySync() const
261 {
262 return isQuerySync_;
263 }
264
SetIdentifier(const std::vector<uint8_t> & identifier)265 void SyncOperation::SetIdentifier(const std::vector<uint8_t> &identifier)
266 {
267 identifier_.assign(identifier.begin(), identifier.end());
268 }
269
270 namespace {
271 struct SyncTypeNode {
272 int mode = static_cast<int>(SyncModeType::INVALID_MODE);
273 SyncType type = SyncType::INVALID_SYNC_TYPE;
274 };
275 struct SyncOperationStatusNode {
276 int operationStatus = 0;
277 DBStatus status = DBStatus::DB_ERROR;
278 };
279 }
280
GetSyncType(int mode)281 SyncType SyncOperation::GetSyncType(int mode)
282 {
283 static const SyncTypeNode syncTypeNodes[] = {
284 {static_cast<int>(SyncModeType::PUSH), SyncType::MANUAL_FULL_SYNC_TYPE},
285 {static_cast<int>(SyncModeType::PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
286 {static_cast<int>(SyncModeType::PUSH_AND_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
287 {static_cast<int>(SyncModeType::RESPONSE_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
288 {static_cast<int>(SyncModeType::AUTO_PULL), SyncType::AUTO_SYNC_TYPE},
289 {static_cast<int>(SyncModeType::AUTO_PUSH), SyncType::AUTO_SYNC_TYPE},
290 {static_cast<int>(SyncModeType::QUERY_PUSH), SyncType::QUERY_SYNC_TYPE},
291 {static_cast<int>(SyncModeType::QUERY_PULL), SyncType::QUERY_SYNC_TYPE},
292 {static_cast<int>(SyncModeType::QUERY_PUSH_PULL), SyncType::QUERY_SYNC_TYPE}
293 };
294 const auto &result = std::find_if(std::begin(syncTypeNodes), std::end(syncTypeNodes), [mode](const auto &node) {
295 return node.mode == mode;
296 });
297 return result == std::end(syncTypeNodes) ? SyncType::INVALID_SYNC_TYPE : result->type;
298 }
299
TransferSyncMode(int mode)300 int SyncOperation::TransferSyncMode(int mode)
301 {
302 // AUTO_PUSH and AUTO_PULL mode is used before sync, RESPONSE_PULL is regarded as push or query push mode.
303 // so for the three mode, it is no need to transferred.
304 if (mode >= SyncModeType::QUERY_PUSH && mode <= SyncModeType::QUERY_PUSH_PULL) {
305 return (mode - QUERY_SYNC_MODE_BASE);
306 }
307 return mode;
308 }
309
GetQueryId() const310 std::string SyncOperation::GetQueryId() const
311 {
312 std::lock_guard<std::mutex> lock(queryMutex_);
313 return query_.GetIdentify();
314 }
315
DBStatusTrans(int operationStatus)316 DBStatus SyncOperation::DBStatusTrans(int operationStatus)
317 {
318 static const SyncOperationStatusNode syncOperationStatusNodes[] = {
319 { static_cast<int>(OP_FINISHED_ALL), OK },
320 { static_cast<int>(OP_TIMEOUT), TIME_OUT },
321 { static_cast<int>(OP_PERMISSION_CHECK_FAILED), PERMISSION_CHECK_FORBID_SYNC },
322 { static_cast<int>(OP_COMM_ABNORMAL), COMM_FAILURE },
323 { static_cast<int>(OP_SECURITY_OPTION_CHECK_FAILURE), SECURITY_OPTION_CHECK_ERROR },
324 { static_cast<int>(OP_EKEYREVOKED_FAILURE), EKEYREVOKED_ERROR },
325 { static_cast<int>(OP_SCHEMA_INCOMPATIBLE), SCHEMA_MISMATCH },
326 { static_cast<int>(OP_BUSY_FAILURE), BUSY },
327 { static_cast<int>(OP_QUERY_FORMAT_FAILURE), INVALID_QUERY_FORMAT },
328 { static_cast<int>(OP_QUERY_FIELD_FAILURE), INVALID_QUERY_FIELD },
329 { static_cast<int>(OP_NOT_SUPPORT), NOT_SUPPORT },
330 { static_cast<int>(OP_INTERCEPT_DATA_FAIL), INTERCEPT_DATA_FAIL },
331 { static_cast<int>(OP_MAX_LIMITS), OVER_MAX_LIMITS },
332 { static_cast<int>(OP_SCHEMA_CHANGED), DISTRIBUTED_SCHEMA_CHANGED },
333 { static_cast<int>(OP_INVALID_ARGS), INVALID_ARGS },
334 { static_cast<int>(OP_USER_CHANGED), USER_CHANGED },
335 { static_cast<int>(OP_DENIED_SQL), NO_PERMISSION },
336 { static_cast<int>(OP_NOTADB_OR_CORRUPTED), INVALID_PASSWD_OR_CORRUPTED_DB },
337 };
338 const auto &result = std::find_if(std::begin(syncOperationStatusNodes), std::end(syncOperationStatusNodes),
339 [operationStatus](const auto &node) {
340 return node.operationStatus == operationStatus;
341 });
342 return result == std::end(syncOperationStatusNodes) ? DB_ERROR : result->status;
343 }
344
GetFinishDetailMsg(const std::map<std::string,int> & finishStatus)345 std::string SyncOperation::GetFinishDetailMsg(const std::map<std::string, int> &finishStatus)
346 {
347 std::string msg = "Sync detail is:";
348 for (const auto &[dev, status]: finishStatus) {
349 msg += "dev=" + DBCommon::StringMasking(dev);
350 if (status > static_cast<int>(OP_FINISHED_ALL)) {
351 msg += " sync failed, reason is " + std::to_string(status);
352 } else {
353 msg += " sync success";
354 }
355 msg += " ";
356 }
357 msg.pop_back();
358 return msg;
359 }
360 DEFINE_OBJECT_TAG_FACILITIES(SyncOperation)
361 } // namespace DistributedDB