1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_operation.h"
17 #include "db_common.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "performance_analysis.h"
21
22 namespace DistributedDB {
SyncOperation(uint32_t syncId,const std::vector<std::string> & devices,int mode,const UserCallback & userCallback,bool isBlockSync)23 SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices,
24 int mode, const UserCallback &userCallback, bool isBlockSync)
25 : devices_(devices),
26 syncId_(syncId),
27 mode_(mode),
28 userCallback_(userCallback),
29 isBlockSync_(isBlockSync),
30 isAutoSync_(false),
31 isFinished_(false),
32 semaphore_(nullptr),
33 query_(QuerySyncObject()),
34 isQuerySync_(false),
35 isAutoSubscribe_(false)
36 {
37 }
38
~SyncOperation()39 SyncOperation::~SyncOperation()
40 {
41 RefObject::DecObjRef(context_);
42 LOGD("SyncOperation::~SyncOperation()");
43 Finalize();
44 }
45
Initialize()46 int SyncOperation::Initialize()
47 {
48 LOGD("[SyncOperation] Init SyncOperation id:%d.", syncId_);
49 std::map<std::string, DeviceSyncProcess> tempSyncProcessMap;
50 {
51 AutoLock lockGuard(this);
52 for (const std::string &deviceId : devices_) {
53 statuses_.insert(std::pair<std::string, int>(deviceId, OP_WAITING));
54 DeviceSyncProcess processInfo;
55 processInfo.errCode = static_cast<DBStatus>(OP_WAITING);
56 processInfo.syncId = syncId_;
57 syncProcessMap_.insert(std::pair<std::string, DeviceSyncProcess>(deviceId, processInfo));
58 }
59
60 if (mode_ == AUTO_PUSH) {
61 mode_ = PUSH;
62 isAutoSync_ = true;
63 } else if (mode_ == AUTO_PULL) {
64 mode_ = PULL;
65 isAutoSync_ = true;
66 } else if (mode_ == AUTO_SUBSCRIBE_QUERY) {
67 mode_ = SUBSCRIBE_QUERY;
68 isAutoSubscribe_ = true;
69 }
70 if (isBlockSync_) {
71 semaphore_ = std::make_unique<SemaphoreUtils>(0);
72 }
73 tempSyncProcessMap = syncProcessMap_;
74 }
75 if (userSyncProcessCallback_) {
76 ExeSyncProcessCallFun(tempSyncProcessMap);
77 }
78
79 return E_OK;
80 }
81
SetOnSyncFinalize(const OnSyncFinalize & callback)82 void SyncOperation::SetOnSyncFinalize(const OnSyncFinalize &callback)
83 {
84 onFinalize_ = callback;
85 }
86
SetOnSyncFinished(const OnSyncFinished & callback)87 void SyncOperation::SetOnSyncFinished(const OnSyncFinished &callback)
88 {
89 onFinished_ = callback;
90 }
91
SetStatus(const std::string & deviceId,int status,int commErrCode)92 void SyncOperation::SetStatus(const std::string &deviceId, int status, int commErrCode)
93 {
94 LOGD("[SyncOperation] SetStatus dev %s{private} status %d commErrCode %d", deviceId.c_str(), status, commErrCode);
95 AutoLock lockGuard(this);
96 if (IsKilled()) {
97 LOGE("[SyncOperation] SetStatus failed, the SyncOperation has been killed!");
98 return;
99 }
100 if (isFinished_) {
101 LOGI("[SyncOperation] SetStatus already finished");
102 return;
103 }
104
105 if (userSyncProcessCallback_) {
106 if (syncProcessMap_[deviceId].errCode < static_cast<DBStatus>(OP_FINISHED_ALL)) {
107 syncProcessMap_[deviceId].errCode = static_cast<DBStatus>(status);
108 }
109 }
110
111 auto iter = statuses_.find(deviceId);
112 if (iter != statuses_.end()) {
113 if (iter->second >= OP_FINISHED_ALL) {
114 return;
115 }
116 iter->second = status;
117 if (((status != OP_COMM_ABNORMAL) && (status != OP_TIMEOUT)) || (commErrCode == E_OK)) {
118 return;
119 }
120 commErrCodeMap_.insert(std::pair<std::string, int>(deviceId, commErrCode));
121 }
122 }
123
SetUnfinishedDevStatus(int status)124 void SyncOperation::SetUnfinishedDevStatus(int status)
125 {
126 LOGD("[SyncOperation] SetUnfinishedDevStatus status %d", status);
127 AutoLock lockGuard(this);
128 if (IsKilled()) {
129 LOGE("[SyncOperation] SetUnfinishedDevStatus failed, the SyncOperation has been killed!");
130 return;
131 }
132 if (isFinished_) {
133 LOGI("[SyncOperation] SetUnfinishedDevStatus already finished");
134 return;
135 }
136 for (auto &item : statuses_) {
137 if (item.second >= OP_FINISHED_ALL) {
138 continue;
139 }
140 item.second = status;
141 }
142 }
143
GetStatus(const std::string & deviceId) const144 int SyncOperation::GetStatus(const std::string &deviceId) const
145 {
146 AutoLock lockGuard(this);
147 auto iter = statuses_.find(deviceId);
148 if (iter != statuses_.end()) {
149 return iter->second;
150 }
151 return -E_INVALID_ARGS;
152 }
153
GetSyncId() const154 uint32_t SyncOperation::GetSyncId() const
155 {
156 return syncId_;
157 }
158
GetMode() const159 int SyncOperation::GetMode() const
160 {
161 return mode_;
162 }
163
ReplaceCommErrCode(std::map<std::string,int> & finishStatus)164 void SyncOperation::ReplaceCommErrCode(std::map<std::string, int> &finishStatus)
165 {
166 for (auto &item : finishStatus) {
167 if ((item.second != OP_COMM_ABNORMAL) && (item.second != OP_TIMEOUT)) {
168 continue;
169 }
170 std::string deviceId = item.first;
171 auto iter = commErrCodeMap_.find(deviceId);
172 if (iter != commErrCodeMap_.end()) {
173 item.second = iter->second;
174 }
175 }
176 }
177
Finished()178 void SyncOperation::Finished()
179 {
180 std::map<std::string, int> tmpStatus;
181 std::map<std::string, DeviceSyncProcess> tmpProcessMap;
182 {
183 AutoLock lockGuard(this);
184 if (IsKilled() || isFinished_) {
185 return;
186 }
187 isFinished_ = true;
188 tmpStatus = statuses_;
189 tmpProcessMap = syncProcessMap_;
190 ReplaceCommErrCode(tmpStatus);
191 }
192 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
193 if (performance != nullptr) {
194 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
195 }
196 if (userCallback_) {
197 std::string msg = GetFinishDetailMsg(tmpStatus);
198 LOGI("[SyncOperation] SyncId=%d finished, %s", syncId_, msg.c_str());
199 if (IsBlockSync()) {
200 userCallback_(tmpStatus);
201 } else {
202 RefObject::IncObjRef(this);
203 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, tmpStatus] {
204 userCallback_(tmpStatus);
205 RefObject::DecObjRef(this);
206 });
207 if (errCode != E_OK) {
208 LOGE("[Finished] SyncOperation Finished userCallback_ retCode:%d", errCode);
209 RefObject::DecObjRef(this);
210 }
211 }
212 }
213
214 if (userSyncProcessCallback_) {
215 ExeSyncProcessCallFun(tmpProcessMap);
216 }
217
218 if (onFinished_) {
219 LOGD("[SyncOperation] Sync %d finished call onFinished.", syncId_);
220 onFinished_(syncId_);
221 }
222 }
223
GetDevices() const224 const std::vector<std::string> &SyncOperation::GetDevices() const
225 {
226 return devices_;
227 }
228
WaitIfNeed()229 void SyncOperation::WaitIfNeed()
230 {
231 if (isBlockSync_ && (semaphore_ != nullptr)) {
232 LOGD("[SyncOperation] Wait.");
233 semaphore_->WaitSemaphore();
234 }
235 }
236
NotifyIfNeed()237 void SyncOperation::NotifyIfNeed()
238 {
239 if (isBlockSync_ && (semaphore_ != nullptr)) {
240 LOGD("[SyncOperation] Notify.");
241 semaphore_->SendSemaphore();
242 }
243 }
244
IsAutoSync() const245 bool SyncOperation::IsAutoSync() const
246 {
247 return isAutoSync_;
248 }
249
IsBlockSync() const250 bool SyncOperation::IsBlockSync() const
251 {
252 return isBlockSync_;
253 }
254
IsAutoControlCmd() const255 bool SyncOperation::IsAutoControlCmd() const
256 {
257 return isAutoSubscribe_;
258 }
259
SetSyncContext(RefObject * context)260 void SyncOperation::SetSyncContext(RefObject *context)
261 {
262 RefObject::DecObjRef(context_);
263 context_ = context;
264 RefObject::IncObjRef(context);
265 }
266
CanCancel()267 bool SyncOperation::CanCancel()
268 {
269 return canCancel_;
270 }
271
SetSyncProcessCallFun(DeviceSyncProcessCallback callBack)272 void SyncOperation::SetSyncProcessCallFun(DeviceSyncProcessCallback callBack)
273 {
274 if (callBack) {
275 canCancel_ = true;
276 this->userSyncProcessCallback_ = callBack;
277 }
278 }
279
ExeSyncProcessCallFun(const std::map<std::string,DeviceSyncProcess> & syncProcessMap)280 void SyncOperation::ExeSyncProcessCallFun(const std::map<std::string, DeviceSyncProcess> &syncProcessMap)
281 {
282 if (IsBlockSync()) {
283 userSyncProcessCallback_(syncProcessMap);
284 } else {
285 RefObject::IncObjRef(this);
286 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, syncProcessMap] {
287 userSyncProcessCallback_(syncProcessMap);
288 RefObject::DecObjRef(this);
289 });
290 if (errCode != E_OK) {
291 LOGE("[SyncOperation] ExeSyncProcessCallFun retCode:%d", errCode);
292 RefObject::DecObjRef(this);
293 }
294 }
295 }
296
UpdateFinishedCount(const std::string & deviceId,uint32_t count)297 void SyncOperation::UpdateFinishedCount(const std::string &deviceId, uint32_t count)
298 {
299 if (this->userSyncProcessCallback_) {
300 std::map<std::string, DeviceSyncProcess> tmpMap;
301 {
302 AutoLock lockGuard(this);
303 if (IsKilled()) {
304 return;
305 }
306 LOGD("[UpdateFinishedCount] deviceId %s{private} count %u", deviceId.c_str(), count);
307 this->syncProcessMap_[deviceId].pullInfo.finishedCount += count;
308 tmpMap = this->syncProcessMap_;
309 }
310 ExeSyncProcessCallFun(tmpMap);
311 }
312 }
313
SetSyncProcessTotal(const std::string & deviceId,uint32_t total)314 void SyncOperation::SetSyncProcessTotal(const std::string &deviceId, uint32_t total)
315 {
316 if (this->userSyncProcessCallback_) {
317 {
318 AutoLock lockGuard(this);
319 if (IsKilled()) {
320 return;
321 }
322 LOGD("[SetSyncProcessTotal] total=%u, syncId=%u, deviceId=%s{private}", total, syncId_, deviceId.c_str());
323 this->syncProcessMap_[deviceId].pullInfo.total = total;
324 }
325 }
326 }
327
CheckIsAllFinished() const328 bool SyncOperation::CheckIsAllFinished() const
329 {
330 AutoLock lockGuard(this);
331 for (const auto &iter : statuses_) {
332 if (iter.second < OP_FINISHED_ALL) {
333 return false;
334 }
335 }
336 return true;
337 }
338
Finalize()339 void SyncOperation::Finalize()
340 {
341 if ((syncId_ > 0) && onFinalize_) {
342 LOGD("[SyncOperation] Callback SyncOperation onFinalize.");
343 onFinalize_();
344 }
345 }
346
SetQuery(const QuerySyncObject & query)347 void SyncOperation::SetQuery(const QuerySyncObject &query)
348 {
349 std::lock_guard<std::mutex> lock(queryMutex_);
350 query_ = query;
351 isQuerySync_ = true;
352 if (mode_ != SyncModeType::SUBSCRIBE_QUERY && mode_ != SyncModeType::UNSUBSCRIBE_QUERY) {
353 mode_ += QUERY_SYNC_MODE_BASE;
354 }
355 }
356
GetQuery(QuerySyncObject & targetObject) const357 void SyncOperation::GetQuery(QuerySyncObject &targetObject) const
358 {
359 std::lock_guard<std::mutex> lock(queryMutex_);
360 targetObject = query_;
361 }
362
IsQuerySync() const363 bool SyncOperation::IsQuerySync() const
364 {
365 return isQuerySync_;
366 }
367
SetIdentifier(const std::vector<uint8_t> & identifier)368 void SyncOperation::SetIdentifier(const std::vector<uint8_t> &identifier)
369 {
370 identifier_.assign(identifier.begin(), identifier.end());
371 }
372
373 namespace {
374 struct SyncTypeNode {
375 int mode = static_cast<int>(SyncModeType::INVALID_MODE);
376 SyncType type = SyncType::INVALID_SYNC_TYPE;
377 };
378 struct SyncOperationStatusNode {
379 int operationStatus = 0;
380 DBStatus status = DBStatus::DB_ERROR;
381 };
382 struct SyncOperationProcessStatus {
383 int operationStatus;
384 ProcessStatus proStatus;
385 };
386 }
387
GetSyncType(int mode)388 SyncType SyncOperation::GetSyncType(int mode)
389 {
390 static const SyncTypeNode syncTypeNodes[] = {
391 {static_cast<int>(SyncModeType::PUSH), SyncType::MANUAL_FULL_SYNC_TYPE},
392 {static_cast<int>(SyncModeType::PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
393 {static_cast<int>(SyncModeType::PUSH_AND_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
394 {static_cast<int>(SyncModeType::RESPONSE_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
395 {static_cast<int>(SyncModeType::AUTO_PULL), SyncType::AUTO_SYNC_TYPE},
396 {static_cast<int>(SyncModeType::AUTO_PUSH), SyncType::AUTO_SYNC_TYPE},
397 {static_cast<int>(SyncModeType::QUERY_PUSH), SyncType::QUERY_SYNC_TYPE},
398 {static_cast<int>(SyncModeType::QUERY_PULL), SyncType::QUERY_SYNC_TYPE},
399 {static_cast<int>(SyncModeType::QUERY_PUSH_PULL), SyncType::QUERY_SYNC_TYPE}
400 };
401 const auto &result = std::find_if(std::begin(syncTypeNodes), std::end(syncTypeNodes), [mode](const auto &node) {
402 return node.mode == mode;
403 });
404 return result == std::end(syncTypeNodes) ? SyncType::INVALID_SYNC_TYPE : result->type;
405 }
406
TransferSyncMode(int mode)407 int SyncOperation::TransferSyncMode(int mode)
408 {
409 // AUTO_PUSH and AUTO_PULL mode is used before sync, RESPONSE_PULL is regarded as push or query push mode.
410 // so for the three mode, it is no need to transferred.
411 if (mode >= SyncModeType::QUERY_PUSH && mode <= SyncModeType::QUERY_PUSH_PULL) {
412 return (mode - QUERY_SYNC_MODE_BASE);
413 }
414 return mode;
415 }
416
GetQueryId() const417 std::string SyncOperation::GetQueryId() const
418 {
419 std::lock_guard<std::mutex> lock(queryMutex_);
420 return query_.GetIdentify();
421 }
422
DBStatusTrans(int operationStatus)423 DBStatus SyncOperation::DBStatusTrans(int operationStatus)
424 {
425 static const SyncOperationStatusNode syncOperationStatusNodes[] = {
426 { static_cast<int>(OP_FINISHED_ALL), OK },
427 { static_cast<int>(OP_WAITING), OK },
428 { static_cast<int>(OP_SYNCING), OK },
429 { static_cast<int>(OP_SEND_FINISHED), OK },
430 { static_cast<int>(OP_RECV_FINISHED), OK },
431 { static_cast<int>(OP_TIMEOUT), TIME_OUT },
432 { static_cast<int>(OP_PERMISSION_CHECK_FAILED), PERMISSION_CHECK_FORBID_SYNC },
433 { static_cast<int>(OP_COMM_ABNORMAL), COMM_FAILURE },
434 { static_cast<int>(OP_SECURITY_OPTION_CHECK_FAILURE), SECURITY_OPTION_CHECK_ERROR },
435 { static_cast<int>(OP_EKEYREVOKED_FAILURE), EKEYREVOKED_ERROR },
436 { static_cast<int>(OP_SCHEMA_INCOMPATIBLE), SCHEMA_MISMATCH },
437 { static_cast<int>(OP_BUSY_FAILURE), BUSY },
438 { static_cast<int>(OP_QUERY_FORMAT_FAILURE), INVALID_QUERY_FORMAT },
439 { static_cast<int>(OP_QUERY_FIELD_FAILURE), INVALID_QUERY_FIELD },
440 { static_cast<int>(OP_NOT_SUPPORT), NOT_SUPPORT },
441 { static_cast<int>(OP_INTERCEPT_DATA_FAIL), INTERCEPT_DATA_FAIL },
442 { static_cast<int>(OP_MAX_LIMITS), OVER_MAX_LIMITS },
443 { static_cast<int>(OP_SCHEMA_CHANGED), DISTRIBUTED_SCHEMA_CHANGED },
444 { static_cast<int>(OP_INVALID_ARGS), INVALID_ARGS },
445 { static_cast<int>(OP_USER_CHANGED), USER_CHANGED },
446 { static_cast<int>(OP_DENIED_SQL), NO_PERMISSION },
447 { static_cast<int>(OP_NOTADB_OR_CORRUPTED), INVALID_PASSWD_OR_CORRUPTED_DB },
448 { static_cast<int>(OP_FAILED), DB_ERROR },
449 };
450 const auto &result = std::find_if(std::begin(syncOperationStatusNodes), std::end(syncOperationStatusNodes),
451 [operationStatus](const auto &node) {
452 return node.operationStatus == operationStatus;
453 });
454 return result == std::end(syncOperationStatusNodes) ? static_cast<DBStatus>(operationStatus) : result->status;
455 }
456
DBStatusTransProcess(int operationStatus)457 ProcessStatus SyncOperation::DBStatusTransProcess(int operationStatus)
458 {
459 static const SyncOperationProcessStatus syncOperationProcessStatus[] = {
460 { static_cast<int>(OP_WAITING), PREPARED },
461 { static_cast<int>(OP_SYNCING), PROCESSING },
462 { static_cast<int>(OP_SEND_FINISHED), PROCESSING },
463 { static_cast<int>(OP_RECV_FINISHED), PROCESSING },
464 { static_cast<int>(OP_FINISHED_ALL), FINISHED },
465 { static_cast<int>(OP_COMM_ABNORMAL), FINISHED },
466 };
467 const auto &result = std::find_if(std::begin(syncOperationProcessStatus), std::end(syncOperationProcessStatus),
468 [operationStatus](const auto &node) {
469 return node.operationStatus == operationStatus;
470 });
471 return result == std::end(syncOperationProcessStatus) ? FINISHED : result->proStatus;
472 }
473
GetFinishDetailMsg(const std::map<std::string,int> & finishStatus)474 std::string SyncOperation::GetFinishDetailMsg(const std::map<std::string, int> &finishStatus)
475 {
476 std::string msg = "Sync detail is:";
477 for (const auto &[dev, status]: finishStatus) {
478 msg += "dev=" + DBCommon::StringMasking(dev);
479 if ((status > static_cast<int>(OP_FINISHED_ALL)) || (status < E_OK)) {
480 msg += " sync failed, reason is " + std::to_string(status);
481 } else {
482 msg += " sync success";
483 }
484 msg += " ";
485 }
486 msg.pop_back();
487 return msg;
488 }
489 DEFINE_OBJECT_TAG_FACILITIES(SyncOperation)
490 } // namespace DistributedDB