1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_operation.h"
17 #include "db_common.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "performance_analysis.h"
21
22 namespace DistributedDB {
SyncOperation(uint32_t syncId,const std::vector<std::string> & devices,int mode,const UserCallback & userCallback,bool isBlockSync)23 SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices,
24 int mode, const UserCallback &userCallback, bool isBlockSync)
25 : devices_(devices),
26 syncId_(syncId),
27 mode_(mode),
28 userCallback_(userCallback),
29 isBlockSync_(isBlockSync),
30 isAutoSync_(false),
31 isFinished_(false),
32 semaphore_(nullptr),
33 query_(QuerySyncObject()),
34 isQuerySync_(false),
35 isAutoSubscribe_(false),
36 isRetry_(true)
37 {
38 }
39
SyncOperation(uint32_t syncId,const ISyncer::SyncParam & param)40 SyncOperation::SyncOperation(uint32_t syncId, const ISyncer::SyncParam ¶m)
41 : devices_(param.devices),
42 syncId_(syncId),
43 mode_(param.mode),
44 userCallback_(param.onComplete),
45 isBlockSync_(param.wait),
46 isAutoSync_(false),
47 isFinished_(false),
48 semaphore_(nullptr),
49 query_(QuerySyncObject()),
50 isQuerySync_(false),
51 isAutoSubscribe_(false),
52 isRetry_(param.isRetry)
53 {
54 }
55
~SyncOperation()56 SyncOperation::~SyncOperation()
57 {
58 RefObject::DecObjRef(context_);
59 LOGD("SyncOperation::~SyncOperation()");
60 Finalize();
61 }
62
Initialize()63 int SyncOperation::Initialize()
64 {
65 LOGD("[SyncOperation] Init SyncOperation id:%d.", syncId_);
66 std::map<std::string, DeviceSyncProcess> tempSyncProcessMap;
67 {
68 AutoLock lockGuard(this);
69 for (const std::string &deviceId : devices_) {
70 statuses_.insert(std::pair<std::string, int>(deviceId, OP_WAITING));
71 DeviceSyncProcess processInfo;
72 processInfo.errCode = static_cast<DBStatus>(OP_WAITING);
73 processInfo.syncId = syncId_;
74 syncProcessMap_.insert(std::pair<std::string, DeviceSyncProcess>(deviceId, processInfo));
75 }
76
77 if (mode_ == AUTO_PUSH) {
78 mode_ = PUSH;
79 isAutoSync_ = true;
80 } else if (mode_ == AUTO_PULL) {
81 mode_ = PULL;
82 isAutoSync_ = true;
83 } else if (mode_ == AUTO_SUBSCRIBE_QUERY) {
84 mode_ = SUBSCRIBE_QUERY;
85 isAutoSubscribe_ = true;
86 }
87 if (isBlockSync_) {
88 semaphore_ = std::make_unique<SemaphoreUtils>(0);
89 }
90 tempSyncProcessMap = syncProcessMap_;
91 }
92 if (userSyncProcessCallback_) {
93 ExeSyncProcessCallFun(tempSyncProcessMap);
94 }
95
96 return E_OK;
97 }
98
SetOnSyncFinalize(const OnSyncFinalize & callback)99 void SyncOperation::SetOnSyncFinalize(const OnSyncFinalize &callback)
100 {
101 onFinalize_ = callback;
102 }
103
SetOnSyncFinished(const OnSyncFinished & callback)104 void SyncOperation::SetOnSyncFinished(const OnSyncFinished &callback)
105 {
106 onFinished_ = callback;
107 }
108
SetStatus(const std::string & deviceId,int status,int commErrCode)109 void SyncOperation::SetStatus(const std::string &deviceId, int status, int commErrCode)
110 {
111 LOGD("[SyncOperation] SetStatus dev %s{private} status %d commErrCode %d", deviceId.c_str(), status, commErrCode);
112 AutoLock lockGuard(this);
113 if (IsKilled()) {
114 LOGE("[SyncOperation] SetStatus failed, the SyncOperation has been killed!");
115 return;
116 }
117 if (isFinished_) {
118 LOGI("[SyncOperation] SetStatus already finished");
119 return;
120 }
121
122 if (userSyncProcessCallback_) {
123 if (syncProcessMap_[deviceId].errCode < static_cast<DBStatus>(OP_FINISHED_ALL)) {
124 syncProcessMap_[deviceId].errCode = static_cast<DBStatus>(status);
125 }
126 }
127
128 auto iter = statuses_.find(deviceId);
129 if (iter != statuses_.end()) {
130 if (iter->second >= OP_FINISHED_ALL) {
131 return;
132 }
133 iter->second = status;
134 if (((status != OP_COMM_ABNORMAL) && (status != OP_TIMEOUT)) || (commErrCode == E_OK)) {
135 return;
136 }
137 commErrCodeMap_.insert(std::pair<std::string, int>(deviceId, commErrCode));
138 }
139 }
140
SetUnfinishedDevStatus(int status)141 void SyncOperation::SetUnfinishedDevStatus(int status)
142 {
143 LOGD("[SyncOperation] SetUnfinishedDevStatus status %d", status);
144 AutoLock lockGuard(this);
145 if (IsKilled()) {
146 LOGE("[SyncOperation] SetUnfinishedDevStatus failed, the SyncOperation has been killed!");
147 return;
148 }
149 if (isFinished_) {
150 LOGI("[SyncOperation] SetUnfinishedDevStatus already finished");
151 return;
152 }
153 for (auto &item : statuses_) {
154 if (item.second >= OP_FINISHED_ALL) {
155 continue;
156 }
157 item.second = status;
158 }
159 }
160
GetStatus(const std::string & deviceId) const161 int SyncOperation::GetStatus(const std::string &deviceId) const
162 {
163 AutoLock lockGuard(this);
164 auto iter = statuses_.find(deviceId);
165 if (iter != statuses_.end()) {
166 return iter->second;
167 }
168 return -E_INVALID_ARGS;
169 }
170
GetSyncId() const171 uint32_t SyncOperation::GetSyncId() const
172 {
173 return syncId_;
174 }
175
GetMode() const176 int SyncOperation::GetMode() const
177 {
178 return mode_;
179 }
180
ReplaceCommErrCode(std::map<std::string,int> & finishStatus)181 void SyncOperation::ReplaceCommErrCode(std::map<std::string, int> &finishStatus)
182 {
183 for (auto &item : finishStatus) {
184 if ((item.second != OP_COMM_ABNORMAL) && (item.second != OP_TIMEOUT)) {
185 continue;
186 }
187 std::string deviceId = item.first;
188 auto iter = commErrCodeMap_.find(deviceId);
189 if (iter != commErrCodeMap_.end()) {
190 item.second = iter->second;
191 }
192 }
193 }
194
Finished()195 void SyncOperation::Finished()
196 {
197 std::map<std::string, int> tmpStatus;
198 std::map<std::string, DeviceSyncProcess> tmpProcessMap;
199 {
200 AutoLock lockGuard(this);
201 if (IsKilled() || isFinished_) {
202 return;
203 }
204 isFinished_ = true;
205 tmpStatus = statuses_;
206 tmpProcessMap = syncProcessMap_;
207 ReplaceCommErrCode(tmpStatus);
208 }
209 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
210 if (performance != nullptr) {
211 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
212 }
213 if (userCallback_) {
214 std::string msg = GetFinishDetailMsg(tmpStatus);
215 LOGI("[SyncOperation] SyncId=%d finished, %s", syncId_, msg.c_str());
216 if (IsBlockSync()) {
217 userCallback_(tmpStatus);
218 } else {
219 RefObject::IncObjRef(this);
220 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, tmpStatus] {
221 userCallback_(tmpStatus);
222 RefObject::DecObjRef(this);
223 });
224 if (errCode != E_OK) {
225 LOGE("[Finished] SyncOperation Finished userCallback_ retCode:%d", errCode);
226 RefObject::DecObjRef(this);
227 }
228 }
229 }
230
231 if (userSyncProcessCallback_) {
232 ExeSyncProcessCallFun(tmpProcessMap);
233 }
234
235 if (onFinished_) {
236 LOGD("[SyncOperation] Sync %d finished call onFinished.", syncId_);
237 onFinished_(syncId_);
238 }
239 }
240
GetDevices() const241 const std::vector<std::string> &SyncOperation::GetDevices() const
242 {
243 return devices_;
244 }
245
WaitIfNeed()246 void SyncOperation::WaitIfNeed()
247 {
248 if (isBlockSync_ && (semaphore_ != nullptr)) {
249 LOGD("[SyncOperation] Wait.");
250 semaphore_->WaitSemaphore();
251 }
252 }
253
NotifyIfNeed()254 void SyncOperation::NotifyIfNeed()
255 {
256 if (isBlockSync_ && (semaphore_ != nullptr)) {
257 LOGD("[SyncOperation] Notify.");
258 semaphore_->SendSemaphore();
259 }
260 }
261
IsAutoSync() const262 bool SyncOperation::IsAutoSync() const
263 {
264 return isAutoSync_;
265 }
266
IsBlockSync() const267 bool SyncOperation::IsBlockSync() const
268 {
269 return isBlockSync_;
270 }
271
IsAutoControlCmd() const272 bool SyncOperation::IsAutoControlCmd() const
273 {
274 return isAutoSubscribe_;
275 }
276
SetSyncContext(RefObject * context)277 void SyncOperation::SetSyncContext(RefObject *context)
278 {
279 RefObject::DecObjRef(context_);
280 context_ = context;
281 RefObject::IncObjRef(context);
282 }
283
CanCancel()284 bool SyncOperation::CanCancel()
285 {
286 return canCancel_;
287 }
288
SetSyncProcessCallFun(DeviceSyncProcessCallback callBack)289 void SyncOperation::SetSyncProcessCallFun(DeviceSyncProcessCallback callBack)
290 {
291 if (callBack) {
292 canCancel_ = true;
293 this->userSyncProcessCallback_ = callBack;
294 }
295 }
296
ExeSyncProcessCallFun(const std::map<std::string,DeviceSyncProcess> & syncProcessMap)297 void SyncOperation::ExeSyncProcessCallFun(const std::map<std::string, DeviceSyncProcess> &syncProcessMap)
298 {
299 if (IsBlockSync()) {
300 userSyncProcessCallback_(syncProcessMap);
301 } else {
302 RefObject::IncObjRef(this);
303 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, syncProcessMap] {
304 userSyncProcessCallback_(syncProcessMap);
305 RefObject::DecObjRef(this);
306 });
307 if (errCode != E_OK) {
308 LOGE("[SyncOperation] ExeSyncProcessCallFun retCode:%d", errCode);
309 RefObject::DecObjRef(this);
310 }
311 }
312 }
313
UpdateFinishedCount(const std::string & deviceId,uint32_t count)314 void SyncOperation::UpdateFinishedCount(const std::string &deviceId, uint32_t count)
315 {
316 if (this->userSyncProcessCallback_) {
317 std::map<std::string, DeviceSyncProcess> tmpMap;
318 {
319 AutoLock lockGuard(this);
320 if (IsKilled()) {
321 return;
322 }
323 LOGD("[UpdateFinishedCount] deviceId %s{private} count %u", deviceId.c_str(), count);
324 this->syncProcessMap_[deviceId].pullInfo.finishedCount += count;
325 tmpMap = this->syncProcessMap_;
326 }
327 ExeSyncProcessCallFun(tmpMap);
328 }
329 }
330
SetSyncProcessTotal(const std::string & deviceId,uint32_t total)331 void SyncOperation::SetSyncProcessTotal(const std::string &deviceId, uint32_t total)
332 {
333 if (this->userSyncProcessCallback_) {
334 {
335 AutoLock lockGuard(this);
336 if (IsKilled()) {
337 return;
338 }
339 LOGD("[SetSyncProcessTotal] total=%u, syncId=%u, deviceId=%s{private}", total, syncId_, deviceId.c_str());
340 this->syncProcessMap_[deviceId].pullInfo.total = total;
341 }
342 }
343 }
344
CheckIsAllFinished() const345 bool SyncOperation::CheckIsAllFinished() const
346 {
347 AutoLock lockGuard(this);
348 for (const auto &iter : statuses_) {
349 if (iter.second < OP_FINISHED_ALL) {
350 return false;
351 }
352 }
353 return true;
354 }
355
Finalize()356 void SyncOperation::Finalize()
357 {
358 if ((syncId_ > 0) && onFinalize_) {
359 LOGD("[SyncOperation] Callback SyncOperation onFinalize.");
360 onFinalize_();
361 }
362 }
363
SetQuery(const QuerySyncObject & query)364 void SyncOperation::SetQuery(const QuerySyncObject &query)
365 {
366 std::lock_guard<std::mutex> lock(queryMutex_);
367 query_ = query;
368 isQuerySync_ = true;
369 if (mode_ != SyncModeType::SUBSCRIBE_QUERY && mode_ != SyncModeType::UNSUBSCRIBE_QUERY) {
370 mode_ += QUERY_SYNC_MODE_BASE;
371 }
372 }
373
GetQuery(QuerySyncObject & targetObject) const374 void SyncOperation::GetQuery(QuerySyncObject &targetObject) const
375 {
376 std::lock_guard<std::mutex> lock(queryMutex_);
377 targetObject = query_;
378 }
379
IsQuerySync() const380 bool SyncOperation::IsQuerySync() const
381 {
382 return isQuerySync_;
383 }
384
SetIdentifier(const std::vector<uint8_t> & identifier)385 void SyncOperation::SetIdentifier(const std::vector<uint8_t> &identifier)
386 {
387 identifier_.assign(identifier.begin(), identifier.end());
388 }
389
390 namespace {
391 struct SyncTypeNode {
392 int mode = static_cast<int>(SyncModeType::INVALID_MODE);
393 SyncType type = SyncType::INVALID_SYNC_TYPE;
394 };
395 struct SyncOperationStatusNode {
396 int operationStatus = 0;
397 DBStatus status = DBStatus::DB_ERROR;
398 };
399 struct SyncOperationProcessStatus {
400 int operationStatus;
401 ProcessStatus proStatus;
402 };
403 }
404
GetSyncType(int mode)405 SyncType SyncOperation::GetSyncType(int mode)
406 {
407 static const SyncTypeNode syncTypeNodes[] = {
408 {static_cast<int>(SyncModeType::PUSH), SyncType::MANUAL_FULL_SYNC_TYPE},
409 {static_cast<int>(SyncModeType::PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
410 {static_cast<int>(SyncModeType::PUSH_AND_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
411 {static_cast<int>(SyncModeType::RESPONSE_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
412 {static_cast<int>(SyncModeType::AUTO_PULL), SyncType::AUTO_SYNC_TYPE},
413 {static_cast<int>(SyncModeType::AUTO_PUSH), SyncType::AUTO_SYNC_TYPE},
414 {static_cast<int>(SyncModeType::QUERY_PUSH), SyncType::QUERY_SYNC_TYPE},
415 {static_cast<int>(SyncModeType::QUERY_PULL), SyncType::QUERY_SYNC_TYPE},
416 {static_cast<int>(SyncModeType::QUERY_PUSH_PULL), SyncType::QUERY_SYNC_TYPE}
417 };
418 const auto &result = std::find_if(std::begin(syncTypeNodes), std::end(syncTypeNodes), [mode](const auto &node) {
419 return node.mode == mode;
420 });
421 return result == std::end(syncTypeNodes) ? SyncType::INVALID_SYNC_TYPE : result->type;
422 }
423
TransferSyncMode(int mode)424 int SyncOperation::TransferSyncMode(int mode)
425 {
426 // AUTO_PUSH and AUTO_PULL mode is used before sync, RESPONSE_PULL is regarded as push or query push mode.
427 // so for the three mode, it is no need to transferred.
428 if (mode >= SyncModeType::QUERY_PUSH && mode <= SyncModeType::QUERY_PUSH_PULL) {
429 return (mode - QUERY_SYNC_MODE_BASE);
430 }
431 return mode;
432 }
433
GetQueryId() const434 std::string SyncOperation::GetQueryId() const
435 {
436 std::lock_guard<std::mutex> lock(queryMutex_);
437 return query_.GetIdentify();
438 }
439
DBStatusTrans(int operationStatus)440 DBStatus SyncOperation::DBStatusTrans(int operationStatus)
441 {
442 static const SyncOperationStatusNode syncOperationStatusNodes[] = {
443 { static_cast<int>(OP_FINISHED_ALL), OK },
444 { static_cast<int>(OP_WAITING), OK },
445 { static_cast<int>(OP_SYNCING), OK },
446 { static_cast<int>(OP_SEND_FINISHED), OK },
447 { static_cast<int>(OP_RECV_FINISHED), OK },
448 { static_cast<int>(OP_TIMEOUT), TIME_OUT },
449 { static_cast<int>(OP_PERMISSION_CHECK_FAILED), PERMISSION_CHECK_FORBID_SYNC },
450 { static_cast<int>(OP_COMM_ABNORMAL), COMM_FAILURE },
451 { static_cast<int>(OP_SECURITY_OPTION_CHECK_FAILURE), SECURITY_OPTION_CHECK_ERROR },
452 { static_cast<int>(OP_EKEYREVOKED_FAILURE), EKEYREVOKED_ERROR },
453 { static_cast<int>(OP_SCHEMA_INCOMPATIBLE), SCHEMA_MISMATCH },
454 { static_cast<int>(OP_BUSY_FAILURE), BUSY },
455 { static_cast<int>(OP_QUERY_FORMAT_FAILURE), INVALID_QUERY_FORMAT },
456 { static_cast<int>(OP_QUERY_FIELD_FAILURE), INVALID_QUERY_FIELD },
457 { static_cast<int>(OP_NOT_SUPPORT), NOT_SUPPORT },
458 { static_cast<int>(OP_INTERCEPT_DATA_FAIL), INTERCEPT_DATA_FAIL },
459 { static_cast<int>(OP_MAX_LIMITS), OVER_MAX_LIMITS },
460 { static_cast<int>(OP_SCHEMA_CHANGED), DISTRIBUTED_SCHEMA_CHANGED },
461 { static_cast<int>(OP_INVALID_ARGS), INVALID_ARGS },
462 { static_cast<int>(OP_USER_CHANGED), USER_CHANGED },
463 { static_cast<int>(OP_DENIED_SQL), NO_PERMISSION },
464 { static_cast<int>(OP_NOTADB_OR_CORRUPTED), INVALID_PASSWD_OR_CORRUPTED_DB },
465 { static_cast<int>(OP_DB_CLOSING), OK },
466 { static_cast<int>(OP_FAILED), DB_ERROR },
467 };
468 const auto &result = std::find_if(std::begin(syncOperationStatusNodes), std::end(syncOperationStatusNodes),
469 [operationStatus](const auto &node) {
470 return node.operationStatus == operationStatus;
471 });
472 return result == std::end(syncOperationStatusNodes) ? static_cast<DBStatus>(operationStatus) : result->status;
473 }
474
DBStatusTransProcess(int operationStatus)475 ProcessStatus SyncOperation::DBStatusTransProcess(int operationStatus)
476 {
477 static const SyncOperationProcessStatus syncOperationProcessStatus[] = {
478 { static_cast<int>(OP_WAITING), PREPARED },
479 { static_cast<int>(OP_SYNCING), PROCESSING },
480 { static_cast<int>(OP_SEND_FINISHED), PROCESSING },
481 { static_cast<int>(OP_RECV_FINISHED), PROCESSING },
482 { static_cast<int>(OP_FINISHED_ALL), FINISHED },
483 { static_cast<int>(OP_COMM_ABNORMAL), FINISHED },
484 };
485 const auto &result = std::find_if(std::begin(syncOperationProcessStatus), std::end(syncOperationProcessStatus),
486 [operationStatus](const auto &node) {
487 return node.operationStatus == operationStatus;
488 });
489 return result == std::end(syncOperationProcessStatus) ? FINISHED : result->proStatus;
490 }
491
GetFinishDetailMsg(const std::map<std::string,int> & finishStatus)492 std::string SyncOperation::GetFinishDetailMsg(const std::map<std::string, int> &finishStatus)
493 {
494 std::string msg = "Sync detail is:";
495 for (const auto &[dev, status]: finishStatus) {
496 msg += "dev=" + DBCommon::StringMasking(dev);
497 if ((status > static_cast<int>(OP_FINISHED_ALL)) || (status < E_OK)) {
498 msg += " sync failed, reason is " + std::to_string(status);
499 } else {
500 msg += " sync success";
501 }
502 msg += " ";
503 }
504 msg.pop_back();
505 return msg;
506 }
507
IsRetryTask() const508 bool SyncOperation::IsRetryTask() const
509 {
510 return isRetry_;
511 }
512 DEFINE_OBJECT_TAG_FACILITIES(SyncOperation)
513 } // namespace DistributedDB