• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_operation.h"
17 #include "db_common.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "performance_analysis.h"
21 
22 namespace DistributedDB {
SyncOperation(uint32_t syncId,const std::vector<std::string> & devices,int mode,const UserCallback & userCallback,bool isBlockSync)23 SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices,
24     int mode, const UserCallback &userCallback, bool isBlockSync)
25     : devices_(devices),
26       syncId_(syncId),
27       mode_(mode),
28       userCallback_(userCallback),
29       isBlockSync_(isBlockSync),
30       isAutoSync_(false),
31       isFinished_(false),
32       semaphore_(nullptr),
33       query_(QuerySyncObject()),
34       isQuerySync_(false),
35       isAutoSubscribe_(false),
36       isRetry_(true)
37 {
38 }
39 
SyncOperation(uint32_t syncId,const ISyncer::SyncParam & param)40 SyncOperation::SyncOperation(uint32_t syncId, const ISyncer::SyncParam &param)
41     : devices_(param.devices),
42       syncId_(syncId),
43       mode_(param.mode),
44       userCallback_(param.onComplete),
45       isBlockSync_(param.wait),
46       isAutoSync_(false),
47       isFinished_(false),
48       semaphore_(nullptr),
49       query_(QuerySyncObject()),
50       isQuerySync_(false),
51       isAutoSubscribe_(false),
52       isRetry_(param.isRetry)
53 {
54 }
55 
~SyncOperation()56 SyncOperation::~SyncOperation()
57 {
58     RefObject::DecObjRef(context_);
59     LOGD("SyncOperation::~SyncOperation()");
60     Finalize();
61 }
62 
Initialize()63 int SyncOperation::Initialize()
64 {
65     LOGD("[SyncOperation] Init SyncOperation id:%d.", syncId_);
66     std::map<std::string, DeviceSyncProcess> tempSyncProcessMap;
67     {
68         AutoLock lockGuard(this);
69         for (const std::string &deviceId : devices_) {
70             statuses_.insert(std::pair<std::string, int>(deviceId, OP_WAITING));
71             DeviceSyncProcess processInfo;
72             processInfo.errCode = static_cast<DBStatus>(OP_WAITING);
73             processInfo.syncId = syncId_;
74             syncProcessMap_.insert(std::pair<std::string, DeviceSyncProcess>(deviceId, processInfo));
75         }
76 
77         if (mode_ == AUTO_PUSH) {
78             mode_ = PUSH;
79             isAutoSync_ = true;
80         } else if (mode_ == AUTO_PULL) {
81             mode_ = PULL;
82             isAutoSync_ = true;
83         } else if (mode_ == AUTO_SUBSCRIBE_QUERY) {
84             mode_ = SUBSCRIBE_QUERY;
85             isAutoSubscribe_ = true;
86         }
87         if (isBlockSync_) {
88             semaphore_ = std::make_unique<SemaphoreUtils>(0);
89         }
90         tempSyncProcessMap = syncProcessMap_;
91     }
92     if (userSyncProcessCallback_) {
93         ExeSyncProcessCallFun(tempSyncProcessMap);
94     }
95 
96     return E_OK;
97 }
98 
SetOnSyncFinalize(const OnSyncFinalize & callback)99 void SyncOperation::SetOnSyncFinalize(const OnSyncFinalize &callback)
100 {
101     onFinalize_ = callback;
102 }
103 
SetOnSyncFinished(const OnSyncFinished & callback)104 void SyncOperation::SetOnSyncFinished(const OnSyncFinished &callback)
105 {
106     onFinished_ = callback;
107 }
108 
SetStatus(const std::string & deviceId,int status,int commErrCode)109 void SyncOperation::SetStatus(const std::string &deviceId, int status, int commErrCode)
110 {
111     LOGD("[SyncOperation] SetStatus dev %s{private} status %d commErrCode %d", deviceId.c_str(), status, commErrCode);
112     AutoLock lockGuard(this);
113     if (IsKilled()) {
114         LOGE("[SyncOperation] SetStatus failed, the SyncOperation has been killed!");
115         return;
116     }
117     if (isFinished_) {
118         LOGI("[SyncOperation] SetStatus already finished");
119         return;
120     }
121 
122     if (userSyncProcessCallback_) {
123         if (syncProcessMap_[deviceId].errCode < static_cast<DBStatus>(OP_FINISHED_ALL)) {
124             syncProcessMap_[deviceId].errCode = static_cast<DBStatus>(status);
125         }
126     }
127 
128     auto iter = statuses_.find(deviceId);
129     if (iter != statuses_.end()) {
130         if (iter->second >= OP_FINISHED_ALL) {
131             return;
132         }
133         iter->second = status;
134         if (((status != OP_COMM_ABNORMAL) && (status != OP_TIMEOUT)) || (commErrCode == E_OK)) {
135             return;
136         }
137         commErrCodeMap_.insert(std::pair<std::string, int>(deviceId, commErrCode));
138     }
139 }
140 
SetUnfinishedDevStatus(int status)141 void SyncOperation::SetUnfinishedDevStatus(int status)
142 {
143     LOGD("[SyncOperation] SetUnfinishedDevStatus status %d", status);
144     AutoLock lockGuard(this);
145     if (IsKilled()) {
146         LOGE("[SyncOperation] SetUnfinishedDevStatus failed, the SyncOperation has been killed!");
147         return;
148     }
149     if (isFinished_) {
150         LOGI("[SyncOperation] SetUnfinishedDevStatus already finished");
151         return;
152     }
153     for (auto &item : statuses_) {
154         if (item.second >= OP_FINISHED_ALL) {
155             continue;
156         }
157         item.second = status;
158     }
159 }
160 
GetStatus(const std::string & deviceId) const161 int SyncOperation::GetStatus(const std::string &deviceId) const
162 {
163     AutoLock lockGuard(this);
164     auto iter = statuses_.find(deviceId);
165     if (iter != statuses_.end()) {
166         return iter->second;
167     }
168     return -E_INVALID_ARGS;
169 }
170 
GetSyncId() const171 uint32_t SyncOperation::GetSyncId() const
172 {
173     return syncId_;
174 }
175 
GetMode() const176 int SyncOperation::GetMode() const
177 {
178     return mode_;
179 }
180 
ReplaceCommErrCode(std::map<std::string,int> & finishStatus)181 void SyncOperation::ReplaceCommErrCode(std::map<std::string, int> &finishStatus)
182 {
183     for (auto &item : finishStatus) {
184         if ((item.second != OP_COMM_ABNORMAL) && (item.second != OP_TIMEOUT)) {
185             continue;
186         }
187         std::string deviceId = item.first;
188         auto iter = commErrCodeMap_.find(deviceId);
189         if (iter != commErrCodeMap_.end()) {
190             item.second = iter->second;
191         }
192     }
193 }
194 
Finished()195 void SyncOperation::Finished()
196 {
197     std::map<std::string, int> tmpStatus;
198     std::map<std::string, DeviceSyncProcess> tmpProcessMap;
199     {
200         AutoLock lockGuard(this);
201         if (IsKilled() || isFinished_) {
202             return;
203         }
204         isFinished_ = true;
205         tmpStatus = statuses_;
206         tmpProcessMap = syncProcessMap_;
207         ReplaceCommErrCode(tmpStatus);
208     }
209     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
210     if (performance != nullptr) {
211         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
212     }
213     if (userCallback_) {
214         std::string msg = GetFinishDetailMsg(tmpStatus);
215         LOGI("[SyncOperation] SyncId=%d finished, %s", syncId_, msg.c_str());
216         if (IsBlockSync()) {
217             userCallback_(tmpStatus);
218         } else {
219             RefObject::IncObjRef(this);
220             int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, tmpStatus] {
221                 userCallback_(tmpStatus);
222                 RefObject::DecObjRef(this);
223             });
224             if (errCode != E_OK) {
225                 LOGE("[Finished] SyncOperation Finished userCallback_ retCode:%d", errCode);
226                 RefObject::DecObjRef(this);
227             }
228         }
229     }
230 
231     if (userSyncProcessCallback_) {
232         ExeSyncProcessCallFun(tmpProcessMap);
233     }
234 
235     if (onFinished_) {
236         LOGD("[SyncOperation] Sync %d finished call onFinished.", syncId_);
237         onFinished_(syncId_);
238     }
239 }
240 
GetDevices() const241 const std::vector<std::string> &SyncOperation::GetDevices() const
242 {
243     return devices_;
244 }
245 
WaitIfNeed()246 void SyncOperation::WaitIfNeed()
247 {
248     if (isBlockSync_ && (semaphore_ != nullptr)) {
249         LOGD("[SyncOperation] Wait.");
250         semaphore_->WaitSemaphore();
251     }
252 }
253 
NotifyIfNeed()254 void SyncOperation::NotifyIfNeed()
255 {
256     if (isBlockSync_ && (semaphore_ != nullptr)) {
257         LOGD("[SyncOperation] Notify.");
258         semaphore_->SendSemaphore();
259     }
260 }
261 
IsAutoSync() const262 bool SyncOperation::IsAutoSync() const
263 {
264     return isAutoSync_;
265 }
266 
IsBlockSync() const267 bool SyncOperation::IsBlockSync() const
268 {
269     return isBlockSync_;
270 }
271 
IsAutoControlCmd() const272 bool SyncOperation::IsAutoControlCmd() const
273 {
274     return isAutoSubscribe_;
275 }
276 
SetSyncContext(RefObject * context)277 void SyncOperation::SetSyncContext(RefObject *context)
278 {
279     RefObject::DecObjRef(context_);
280     context_ = context;
281     RefObject::IncObjRef(context);
282 }
283 
CanCancel()284 bool SyncOperation::CanCancel()
285 {
286     return canCancel_;
287 }
288 
SetSyncProcessCallFun(DeviceSyncProcessCallback callBack)289 void SyncOperation::SetSyncProcessCallFun(DeviceSyncProcessCallback callBack)
290 {
291     if (callBack) {
292         canCancel_ = true;
293         this->userSyncProcessCallback_ = callBack;
294     }
295 }
296 
ExeSyncProcessCallFun(const std::map<std::string,DeviceSyncProcess> & syncProcessMap)297 void SyncOperation::ExeSyncProcessCallFun(const std::map<std::string, DeviceSyncProcess> &syncProcessMap)
298 {
299     if (IsBlockSync()) {
300         userSyncProcessCallback_(syncProcessMap);
301     } else {
302         RefObject::IncObjRef(this);
303         int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, syncProcessMap] {
304             userSyncProcessCallback_(syncProcessMap);
305             RefObject::DecObjRef(this);
306         });
307         if (errCode != E_OK) {
308             LOGE("[SyncOperation] ExeSyncProcessCallFun retCode:%d", errCode);
309             RefObject::DecObjRef(this);
310         }
311     }
312 }
313 
UpdateFinishedCount(const std::string & deviceId,uint32_t count)314 void SyncOperation::UpdateFinishedCount(const std::string &deviceId, uint32_t count)
315 {
316     if (this->userSyncProcessCallback_) {
317         std::map<std::string, DeviceSyncProcess> tmpMap;
318         {
319             AutoLock lockGuard(this);
320             if (IsKilled()) {
321                 return;
322             }
323             LOGD("[UpdateFinishedCount] deviceId %s{private} count %u", deviceId.c_str(), count);
324             this->syncProcessMap_[deviceId].pullInfo.finishedCount += count;
325             tmpMap = this->syncProcessMap_;
326         }
327         ExeSyncProcessCallFun(tmpMap);
328     }
329 }
330 
SetSyncProcessTotal(const std::string & deviceId,uint32_t total)331 void SyncOperation::SetSyncProcessTotal(const std::string &deviceId, uint32_t total)
332 {
333     if (this->userSyncProcessCallback_) {
334         {
335             AutoLock lockGuard(this);
336             if (IsKilled()) {
337                 return;
338             }
339             LOGD("[SetSyncProcessTotal] total=%u, syncId=%u, deviceId=%s{private}", total, syncId_, deviceId.c_str());
340             this->syncProcessMap_[deviceId].pullInfo.total = total;
341         }
342     }
343 }
344 
CheckIsAllFinished() const345 bool SyncOperation::CheckIsAllFinished() const
346 {
347     AutoLock lockGuard(this);
348     for (const auto &iter : statuses_) {
349         if (iter.second < OP_FINISHED_ALL) {
350             return false;
351         }
352     }
353     return true;
354 }
355 
Finalize()356 void SyncOperation::Finalize()
357 {
358     if ((syncId_ > 0) && onFinalize_) {
359         LOGD("[SyncOperation] Callback SyncOperation onFinalize.");
360         onFinalize_();
361     }
362 }
363 
SetQuery(const QuerySyncObject & query)364 void SyncOperation::SetQuery(const QuerySyncObject &query)
365 {
366     std::lock_guard<std::mutex> lock(queryMutex_);
367     query_ = query;
368     isQuerySync_ = true;
369     if (mode_ != SyncModeType::SUBSCRIBE_QUERY && mode_ != SyncModeType::UNSUBSCRIBE_QUERY) {
370         mode_ += QUERY_SYNC_MODE_BASE;
371     }
372 }
373 
GetQuery(QuerySyncObject & targetObject) const374 void SyncOperation::GetQuery(QuerySyncObject &targetObject) const
375 {
376     std::lock_guard<std::mutex> lock(queryMutex_);
377     targetObject = query_;
378 }
379 
IsQuerySync() const380 bool SyncOperation::IsQuerySync() const
381 {
382     return isQuerySync_;
383 }
384 
SetIdentifier(const std::vector<uint8_t> & identifier)385 void SyncOperation::SetIdentifier(const std::vector<uint8_t> &identifier)
386 {
387     identifier_.assign(identifier.begin(), identifier.end());
388 }
389 
390 namespace {
391 struct SyncTypeNode {
392     int mode = static_cast<int>(SyncModeType::INVALID_MODE);
393     SyncType type = SyncType::INVALID_SYNC_TYPE;
394 };
395 struct SyncOperationStatusNode {
396     int operationStatus = 0;
397     DBStatus status = DBStatus::DB_ERROR;
398 };
399 struct SyncOperationProcessStatus {
400     int operationStatus;
401     ProcessStatus proStatus;
402 };
403 }
404 
GetSyncType(int mode)405 SyncType SyncOperation::GetSyncType(int mode)
406 {
407     static const SyncTypeNode syncTypeNodes[] = {
408         {static_cast<int>(SyncModeType::PUSH), SyncType::MANUAL_FULL_SYNC_TYPE},
409         {static_cast<int>(SyncModeType::PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
410         {static_cast<int>(SyncModeType::PUSH_AND_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
411         {static_cast<int>(SyncModeType::RESPONSE_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
412         {static_cast<int>(SyncModeType::AUTO_PULL), SyncType::AUTO_SYNC_TYPE},
413         {static_cast<int>(SyncModeType::AUTO_PUSH), SyncType::AUTO_SYNC_TYPE},
414         {static_cast<int>(SyncModeType::QUERY_PUSH), SyncType::QUERY_SYNC_TYPE},
415         {static_cast<int>(SyncModeType::QUERY_PULL), SyncType::QUERY_SYNC_TYPE},
416         {static_cast<int>(SyncModeType::QUERY_PUSH_PULL), SyncType::QUERY_SYNC_TYPE}
417     };
418     const auto &result = std::find_if(std::begin(syncTypeNodes), std::end(syncTypeNodes), [mode](const auto &node) {
419         return node.mode == mode;
420     });
421     return result == std::end(syncTypeNodes) ? SyncType::INVALID_SYNC_TYPE : result->type;
422 }
423 
TransferSyncMode(int mode)424 int SyncOperation::TransferSyncMode(int mode)
425 {
426     // AUTO_PUSH and AUTO_PULL mode is used before sync, RESPONSE_PULL is regarded as push or query push mode.
427     // so for the three mode, it is no need to transferred.
428     if (mode >= SyncModeType::QUERY_PUSH && mode <= SyncModeType::QUERY_PUSH_PULL) {
429         return (mode - QUERY_SYNC_MODE_BASE);
430     }
431     return mode;
432 }
433 
GetQueryId() const434 std::string SyncOperation::GetQueryId() const
435 {
436     std::lock_guard<std::mutex> lock(queryMutex_);
437     return query_.GetIdentify();
438 }
439 
DBStatusTrans(int operationStatus)440 DBStatus SyncOperation::DBStatusTrans(int operationStatus)
441 {
442     static const SyncOperationStatusNode syncOperationStatusNodes[] = {
443         { static_cast<int>(OP_FINISHED_ALL),                  OK },
444         { static_cast<int>(OP_WAITING),                       OK },
445         { static_cast<int>(OP_SYNCING),                       OK },
446         { static_cast<int>(OP_SEND_FINISHED),                 OK },
447         { static_cast<int>(OP_RECV_FINISHED),                 OK },
448         { static_cast<int>(OP_TIMEOUT),                       TIME_OUT },
449         { static_cast<int>(OP_PERMISSION_CHECK_FAILED),       PERMISSION_CHECK_FORBID_SYNC },
450         { static_cast<int>(OP_COMM_ABNORMAL),                 COMM_FAILURE },
451         { static_cast<int>(OP_SECURITY_OPTION_CHECK_FAILURE), SECURITY_OPTION_CHECK_ERROR },
452         { static_cast<int>(OP_EKEYREVOKED_FAILURE),           EKEYREVOKED_ERROR },
453         { static_cast<int>(OP_SCHEMA_INCOMPATIBLE),           SCHEMA_MISMATCH },
454         { static_cast<int>(OP_BUSY_FAILURE),                  BUSY },
455         { static_cast<int>(OP_QUERY_FORMAT_FAILURE),          INVALID_QUERY_FORMAT },
456         { static_cast<int>(OP_QUERY_FIELD_FAILURE),           INVALID_QUERY_FIELD },
457         { static_cast<int>(OP_NOT_SUPPORT),                   NOT_SUPPORT },
458         { static_cast<int>(OP_INTERCEPT_DATA_FAIL),           INTERCEPT_DATA_FAIL },
459         { static_cast<int>(OP_MAX_LIMITS),                    OVER_MAX_LIMITS },
460         { static_cast<int>(OP_SCHEMA_CHANGED),                DISTRIBUTED_SCHEMA_CHANGED },
461         { static_cast<int>(OP_INVALID_ARGS),                  INVALID_ARGS },
462         { static_cast<int>(OP_USER_CHANGED),                  USER_CHANGED },
463         { static_cast<int>(OP_DENIED_SQL),                    NO_PERMISSION },
464         { static_cast<int>(OP_NOTADB_OR_CORRUPTED),           INVALID_PASSWD_OR_CORRUPTED_DB },
465         { static_cast<int>(OP_DB_CLOSING),                    OK },
466         { static_cast<int>(OP_FAILED),                        DB_ERROR },
467     };
468     const auto &result = std::find_if(std::begin(syncOperationStatusNodes), std::end(syncOperationStatusNodes),
469         [operationStatus](const auto &node) {
470             return node.operationStatus == operationStatus;
471         });
472     return result == std::end(syncOperationStatusNodes) ? static_cast<DBStatus>(operationStatus) : result->status;
473 }
474 
DBStatusTransProcess(int operationStatus)475 ProcessStatus SyncOperation::DBStatusTransProcess(int operationStatus)
476 {
477     static const SyncOperationProcessStatus syncOperationProcessStatus[] = {
478         { static_cast<int>(OP_WAITING),                       PREPARED },
479         { static_cast<int>(OP_SYNCING),                       PROCESSING },
480         { static_cast<int>(OP_SEND_FINISHED),                 PROCESSING },
481         { static_cast<int>(OP_RECV_FINISHED),                 PROCESSING },
482         { static_cast<int>(OP_FINISHED_ALL),                  FINISHED },
483         { static_cast<int>(OP_COMM_ABNORMAL),                 FINISHED },
484     };
485     const auto &result = std::find_if(std::begin(syncOperationProcessStatus), std::end(syncOperationProcessStatus),
486         [operationStatus](const auto &node) {
487             return node.operationStatus == operationStatus;
488         });
489     return result == std::end(syncOperationProcessStatus) ? FINISHED : result->proStatus;
490 }
491 
GetFinishDetailMsg(const std::map<std::string,int> & finishStatus)492 std::string SyncOperation::GetFinishDetailMsg(const std::map<std::string, int> &finishStatus)
493 {
494     std::string msg = "Sync detail is:";
495     for (const auto &[dev, status]: finishStatus) {
496         msg += "dev=" + DBCommon::StringMasking(dev);
497         if ((status > static_cast<int>(OP_FINISHED_ALL)) || (status < E_OK)) {
498             msg += " sync failed, reason is " + std::to_string(status);
499         } else {
500             msg += " sync success";
501         }
502         msg += " ";
503     }
504     msg.pop_back();
505     return msg;
506 }
507 
IsRetryTask() const508 bool SyncOperation::IsRetryTask() const
509 {
510     return isRetry_;
511 }
512 DEFINE_OBJECT_TAG_FACILITIES(SyncOperation)
513 } // namespace DistributedDB