• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_operation.h"
17 #include "db_common.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "performance_analysis.h"
21 
22 namespace DistributedDB {
SyncOperation(uint32_t syncId,const std::vector<std::string> & devices,int mode,const UserCallback & userCallback,bool isBlockSync)23 SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices,
24     int mode, const UserCallback &userCallback, bool isBlockSync)
25     : devices_(devices),
26       syncId_(syncId),
27       mode_(mode),
28       userCallback_(userCallback),
29       isBlockSync_(isBlockSync),
30       isAutoSync_(false),
31       isFinished_(false),
32       semaphore_(nullptr),
33       query_(QuerySyncObject()),
34       isQuerySync_(false),
35       isAutoSubscribe_(false)
36 {
37 }
38 
~SyncOperation()39 SyncOperation::~SyncOperation()
40 {
41     RefObject::DecObjRef(context_);
42     LOGD("SyncOperation::~SyncOperation()");
43     Finalize();
44 }
45 
Initialize()46 int SyncOperation::Initialize()
47 {
48     LOGD("[SyncOperation] Init SyncOperation id:%d.", syncId_);
49     std::map<std::string, DeviceSyncProcess> tempSyncProcessMap;
50     {
51         AutoLock lockGuard(this);
52         for (const std::string &deviceId : devices_) {
53             statuses_.insert(std::pair<std::string, int>(deviceId, OP_WAITING));
54             DeviceSyncProcess processInfo;
55             processInfo.errCode = static_cast<DBStatus>(OP_WAITING);
56             processInfo.syncId = syncId_;
57             syncProcessMap_.insert(std::pair<std::string, DeviceSyncProcess>(deviceId, processInfo));
58         }
59 
60         if (mode_ == AUTO_PUSH) {
61             mode_ = PUSH;
62             isAutoSync_ = true;
63         } else if (mode_ == AUTO_PULL) {
64             mode_ = PULL;
65             isAutoSync_ = true;
66         } else if (mode_ == AUTO_SUBSCRIBE_QUERY) {
67             mode_ = SUBSCRIBE_QUERY;
68             isAutoSubscribe_ = true;
69         }
70         if (isBlockSync_) {
71             semaphore_ = std::make_unique<SemaphoreUtils>(0);
72         }
73         tempSyncProcessMap = syncProcessMap_;
74     }
75     if (userSyncProcessCallback_) {
76         ExeSyncProcessCallFun(tempSyncProcessMap);
77     }
78 
79     return E_OK;
80 }
81 
SetOnSyncFinalize(const OnSyncFinalize & callback)82 void SyncOperation::SetOnSyncFinalize(const OnSyncFinalize &callback)
83 {
84     onFinalize_ = callback;
85 }
86 
SetOnSyncFinished(const OnSyncFinished & callback)87 void SyncOperation::SetOnSyncFinished(const OnSyncFinished &callback)
88 {
89     onFinished_ = callback;
90 }
91 
SetStatus(const std::string & deviceId,int status,int commErrCode)92 void SyncOperation::SetStatus(const std::string &deviceId, int status, int commErrCode)
93 {
94     LOGD("[SyncOperation] SetStatus dev %s{private} status %d commErrCode %d", deviceId.c_str(), status, commErrCode);
95     AutoLock lockGuard(this);
96     if (IsKilled()) {
97         LOGE("[SyncOperation] SetStatus failed, the SyncOperation has been killed!");
98         return;
99     }
100     if (isFinished_) {
101         LOGI("[SyncOperation] SetStatus already finished");
102         return;
103     }
104 
105     if (userSyncProcessCallback_) {
106         if (syncProcessMap_[deviceId].errCode < static_cast<DBStatus>(OP_FINISHED_ALL)) {
107             syncProcessMap_[deviceId].errCode = static_cast<DBStatus>(status);
108         }
109     }
110 
111     auto iter = statuses_.find(deviceId);
112     if (iter != statuses_.end()) {
113         if (iter->second >= OP_FINISHED_ALL) {
114             return;
115         }
116         iter->second = status;
117         if (((status != OP_COMM_ABNORMAL) && (status != OP_TIMEOUT)) || (commErrCode == E_OK)) {
118             return;
119         }
120         commErrCodeMap_.insert(std::pair<std::string, int>(deviceId, commErrCode));
121     }
122 }
123 
SetUnfinishedDevStatus(int status)124 void SyncOperation::SetUnfinishedDevStatus(int status)
125 {
126     LOGD("[SyncOperation] SetUnfinishedDevStatus status %d", status);
127     AutoLock lockGuard(this);
128     if (IsKilled()) {
129         LOGE("[SyncOperation] SetUnfinishedDevStatus failed, the SyncOperation has been killed!");
130         return;
131     }
132     if (isFinished_) {
133         LOGI("[SyncOperation] SetUnfinishedDevStatus already finished");
134         return;
135     }
136     for (auto &item : statuses_) {
137         if (item.second >= OP_FINISHED_ALL) {
138             continue;
139         }
140         item.second = status;
141     }
142 }
143 
GetStatus(const std::string & deviceId) const144 int SyncOperation::GetStatus(const std::string &deviceId) const
145 {
146     AutoLock lockGuard(this);
147     auto iter = statuses_.find(deviceId);
148     if (iter != statuses_.end()) {
149         return iter->second;
150     }
151     return -E_INVALID_ARGS;
152 }
153 
GetSyncId() const154 uint32_t SyncOperation::GetSyncId() const
155 {
156     return syncId_;
157 }
158 
GetMode() const159 int SyncOperation::GetMode() const
160 {
161     return mode_;
162 }
163 
ReplaceCommErrCode(std::map<std::string,int> & finishStatus)164 void SyncOperation::ReplaceCommErrCode(std::map<std::string, int> &finishStatus)
165 {
166     for (auto &item : finishStatus) {
167         if ((item.second != OP_COMM_ABNORMAL) && (item.second != OP_TIMEOUT)) {
168             continue;
169         }
170         std::string deviceId = item.first;
171         auto iter = commErrCodeMap_.find(deviceId);
172         if (iter != commErrCodeMap_.end()) {
173             item.second = iter->second;
174         }
175     }
176 }
177 
Finished()178 void SyncOperation::Finished()
179 {
180     std::map<std::string, int> tmpStatus;
181     std::map<std::string, DeviceSyncProcess> tmpProcessMap;
182     {
183         AutoLock lockGuard(this);
184         if (IsKilled() || isFinished_) {
185             return;
186         }
187         isFinished_ = true;
188         tmpStatus = statuses_;
189         tmpProcessMap = syncProcessMap_;
190         ReplaceCommErrCode(tmpStatus);
191     }
192     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
193     if (performance != nullptr) {
194         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
195     }
196     if (userCallback_) {
197         std::string msg = GetFinishDetailMsg(tmpStatus);
198         LOGI("[SyncOperation] SyncId=%d finished, %s", syncId_, msg.c_str());
199         if (IsBlockSync()) {
200             userCallback_(tmpStatus);
201         } else {
202             RefObject::IncObjRef(this);
203             int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, tmpStatus] {
204                 userCallback_(tmpStatus);
205                 RefObject::DecObjRef(this);
206             });
207             if (errCode != E_OK) {
208                 LOGE("[Finished] SyncOperation Finished userCallback_ retCode:%d", errCode);
209                 RefObject::DecObjRef(this);
210             }
211         }
212     }
213 
214     if (userSyncProcessCallback_) {
215         ExeSyncProcessCallFun(tmpProcessMap);
216     }
217 
218     if (onFinished_) {
219         LOGD("[SyncOperation] Sync %d finished call onFinished.", syncId_);
220         onFinished_(syncId_);
221     }
222 }
223 
GetDevices() const224 const std::vector<std::string> &SyncOperation::GetDevices() const
225 {
226     return devices_;
227 }
228 
WaitIfNeed()229 void SyncOperation::WaitIfNeed()
230 {
231     if (isBlockSync_ && (semaphore_ != nullptr)) {
232         LOGD("[SyncOperation] Wait.");
233         semaphore_->WaitSemaphore();
234     }
235 }
236 
NotifyIfNeed()237 void SyncOperation::NotifyIfNeed()
238 {
239     if (isBlockSync_ && (semaphore_ != nullptr)) {
240         LOGD("[SyncOperation] Notify.");
241         semaphore_->SendSemaphore();
242     }
243 }
244 
IsAutoSync() const245 bool SyncOperation::IsAutoSync() const
246 {
247     return isAutoSync_;
248 }
249 
IsBlockSync() const250 bool SyncOperation::IsBlockSync() const
251 {
252     return isBlockSync_;
253 }
254 
IsAutoControlCmd() const255 bool SyncOperation::IsAutoControlCmd() const
256 {
257     return isAutoSubscribe_;
258 }
259 
SetSyncContext(RefObject * context)260 void SyncOperation::SetSyncContext(RefObject *context)
261 {
262     RefObject::DecObjRef(context_);
263     context_ = context;
264     RefObject::IncObjRef(context);
265 }
266 
CanCancel()267 bool SyncOperation::CanCancel()
268 {
269     return canCancel_;
270 }
271 
SetSyncProcessCallFun(DeviceSyncProcessCallback callBack)272 void SyncOperation::SetSyncProcessCallFun(DeviceSyncProcessCallback callBack)
273 {
274     if (callBack) {
275         canCancel_ = true;
276         this->userSyncProcessCallback_ = callBack;
277     }
278 }
279 
ExeSyncProcessCallFun(const std::map<std::string,DeviceSyncProcess> & syncProcessMap)280 void SyncOperation::ExeSyncProcessCallFun(const std::map<std::string, DeviceSyncProcess> &syncProcessMap)
281 {
282     if (IsBlockSync()) {
283         userSyncProcessCallback_(syncProcessMap);
284     } else {
285         RefObject::IncObjRef(this);
286         int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, syncProcessMap] {
287             userSyncProcessCallback_(syncProcessMap);
288             RefObject::DecObjRef(this);
289         });
290         if (errCode != E_OK) {
291             LOGE("[SyncOperation] ExeSyncProcessCallFun retCode:%d", errCode);
292             RefObject::DecObjRef(this);
293         }
294     }
295 }
296 
UpdateFinishedCount(const std::string & deviceId,uint32_t count)297 void SyncOperation::UpdateFinishedCount(const std::string &deviceId, uint32_t count)
298 {
299     if (this->userSyncProcessCallback_) {
300         std::map<std::string, DeviceSyncProcess> tmpMap;
301         {
302             AutoLock lockGuard(this);
303             if (IsKilled()) {
304                 return;
305             }
306             LOGD("[UpdateFinishedCount] deviceId %s{private} count %u", deviceId.c_str(), count);
307             this->syncProcessMap_[deviceId].pullInfo.finishedCount += count;
308             tmpMap = this->syncProcessMap_;
309         }
310         ExeSyncProcessCallFun(tmpMap);
311     }
312 }
313 
SetSyncProcessTotal(const std::string & deviceId,uint32_t total)314 void SyncOperation::SetSyncProcessTotal(const std::string &deviceId, uint32_t total)
315 {
316     if (this->userSyncProcessCallback_) {
317         {
318             AutoLock lockGuard(this);
319             if (IsKilled()) {
320                 return;
321             }
322             LOGD("[SetSyncProcessTotal] total=%u, syncId=%u, deviceId=%s{private}", total, syncId_, deviceId.c_str());
323             this->syncProcessMap_[deviceId].pullInfo.total = total;
324         }
325     }
326 }
327 
CheckIsAllFinished() const328 bool SyncOperation::CheckIsAllFinished() const
329 {
330     AutoLock lockGuard(this);
331     for (const auto &iter : statuses_) {
332         if (iter.second < OP_FINISHED_ALL) {
333             return false;
334         }
335     }
336     return true;
337 }
338 
Finalize()339 void SyncOperation::Finalize()
340 {
341     if ((syncId_ > 0) && onFinalize_) {
342         LOGD("[SyncOperation] Callback SyncOperation onFinalize.");
343         onFinalize_();
344     }
345 }
346 
SetQuery(const QuerySyncObject & query)347 void SyncOperation::SetQuery(const QuerySyncObject &query)
348 {
349     std::lock_guard<std::mutex> lock(queryMutex_);
350     query_ = query;
351     isQuerySync_ = true;
352     if (mode_ != SyncModeType::SUBSCRIBE_QUERY && mode_ != SyncModeType::UNSUBSCRIBE_QUERY) {
353         mode_ += QUERY_SYNC_MODE_BASE;
354     }
355 }
356 
GetQuery(QuerySyncObject & targetObject) const357 void SyncOperation::GetQuery(QuerySyncObject &targetObject) const
358 {
359     std::lock_guard<std::mutex> lock(queryMutex_);
360     targetObject = query_;
361 }
362 
IsQuerySync() const363 bool SyncOperation::IsQuerySync() const
364 {
365     return isQuerySync_;
366 }
367 
SetIdentifier(const std::vector<uint8_t> & identifier)368 void SyncOperation::SetIdentifier(const std::vector<uint8_t> &identifier)
369 {
370     identifier_.assign(identifier.begin(), identifier.end());
371 }
372 
373 namespace {
374 struct SyncTypeNode {
375     int mode = static_cast<int>(SyncModeType::INVALID_MODE);
376     SyncType type = SyncType::INVALID_SYNC_TYPE;
377 };
378 struct SyncOperationStatusNode {
379     int operationStatus = 0;
380     DBStatus status = DBStatus::DB_ERROR;
381 };
382 struct SyncOperationProcessStatus {
383     int operationStatus;
384     ProcessStatus proStatus;
385 };
386 }
387 
GetSyncType(int mode)388 SyncType SyncOperation::GetSyncType(int mode)
389 {
390     static const SyncTypeNode syncTypeNodes[] = {
391         {static_cast<int>(SyncModeType::PUSH), SyncType::MANUAL_FULL_SYNC_TYPE},
392         {static_cast<int>(SyncModeType::PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
393         {static_cast<int>(SyncModeType::PUSH_AND_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
394         {static_cast<int>(SyncModeType::RESPONSE_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
395         {static_cast<int>(SyncModeType::AUTO_PULL), SyncType::AUTO_SYNC_TYPE},
396         {static_cast<int>(SyncModeType::AUTO_PUSH), SyncType::AUTO_SYNC_TYPE},
397         {static_cast<int>(SyncModeType::QUERY_PUSH), SyncType::QUERY_SYNC_TYPE},
398         {static_cast<int>(SyncModeType::QUERY_PULL), SyncType::QUERY_SYNC_TYPE},
399         {static_cast<int>(SyncModeType::QUERY_PUSH_PULL), SyncType::QUERY_SYNC_TYPE}
400     };
401     const auto &result = std::find_if(std::begin(syncTypeNodes), std::end(syncTypeNodes), [mode](const auto &node) {
402         return node.mode == mode;
403     });
404     return result == std::end(syncTypeNodes) ? SyncType::INVALID_SYNC_TYPE : result->type;
405 }
406 
TransferSyncMode(int mode)407 int SyncOperation::TransferSyncMode(int mode)
408 {
409     // AUTO_PUSH and AUTO_PULL mode is used before sync, RESPONSE_PULL is regarded as push or query push mode.
410     // so for the three mode, it is no need to transferred.
411     if (mode >= SyncModeType::QUERY_PUSH && mode <= SyncModeType::QUERY_PUSH_PULL) {
412         return (mode - QUERY_SYNC_MODE_BASE);
413     }
414     return mode;
415 }
416 
GetQueryId() const417 std::string SyncOperation::GetQueryId() const
418 {
419     std::lock_guard<std::mutex> lock(queryMutex_);
420     return query_.GetIdentify();
421 }
422 
DBStatusTrans(int operationStatus)423 DBStatus SyncOperation::DBStatusTrans(int operationStatus)
424 {
425     static const SyncOperationStatusNode syncOperationStatusNodes[] = {
426         { static_cast<int>(OP_FINISHED_ALL),                  OK },
427         { static_cast<int>(OP_WAITING),                       OK },
428         { static_cast<int>(OP_SYNCING),                       OK },
429         { static_cast<int>(OP_SEND_FINISHED),                 OK },
430         { static_cast<int>(OP_RECV_FINISHED),                 OK },
431         { static_cast<int>(OP_TIMEOUT),                       TIME_OUT },
432         { static_cast<int>(OP_PERMISSION_CHECK_FAILED),       PERMISSION_CHECK_FORBID_SYNC },
433         { static_cast<int>(OP_COMM_ABNORMAL),                 COMM_FAILURE },
434         { static_cast<int>(OP_SECURITY_OPTION_CHECK_FAILURE), SECURITY_OPTION_CHECK_ERROR },
435         { static_cast<int>(OP_EKEYREVOKED_FAILURE),           EKEYREVOKED_ERROR },
436         { static_cast<int>(OP_SCHEMA_INCOMPATIBLE),           SCHEMA_MISMATCH },
437         { static_cast<int>(OP_BUSY_FAILURE),                  BUSY },
438         { static_cast<int>(OP_QUERY_FORMAT_FAILURE),          INVALID_QUERY_FORMAT },
439         { static_cast<int>(OP_QUERY_FIELD_FAILURE),           INVALID_QUERY_FIELD },
440         { static_cast<int>(OP_NOT_SUPPORT),                   NOT_SUPPORT },
441         { static_cast<int>(OP_INTERCEPT_DATA_FAIL),           INTERCEPT_DATA_FAIL },
442         { static_cast<int>(OP_MAX_LIMITS),                    OVER_MAX_LIMITS },
443         { static_cast<int>(OP_SCHEMA_CHANGED),                DISTRIBUTED_SCHEMA_CHANGED },
444         { static_cast<int>(OP_INVALID_ARGS),                  INVALID_ARGS },
445         { static_cast<int>(OP_USER_CHANGED),                  USER_CHANGED },
446         { static_cast<int>(OP_DENIED_SQL),                    NO_PERMISSION },
447         { static_cast<int>(OP_NOTADB_OR_CORRUPTED),           INVALID_PASSWD_OR_CORRUPTED_DB },
448         { static_cast<int>(OP_FAILED),                        DB_ERROR },
449     };
450     const auto &result = std::find_if(std::begin(syncOperationStatusNodes), std::end(syncOperationStatusNodes),
451         [operationStatus](const auto &node) {
452             return node.operationStatus == operationStatus;
453         });
454     return result == std::end(syncOperationStatusNodes) ? static_cast<DBStatus>(operationStatus) : result->status;
455 }
456 
DBStatusTransProcess(int operationStatus)457 ProcessStatus SyncOperation::DBStatusTransProcess(int operationStatus)
458 {
459     static const SyncOperationProcessStatus syncOperationProcessStatus[] = {
460         { static_cast<int>(OP_WAITING),                       PREPARED },
461         { static_cast<int>(OP_SYNCING),                       PROCESSING },
462         { static_cast<int>(OP_SEND_FINISHED),                 PROCESSING },
463         { static_cast<int>(OP_RECV_FINISHED),                 PROCESSING },
464         { static_cast<int>(OP_FINISHED_ALL),                  FINISHED },
465         { static_cast<int>(OP_COMM_ABNORMAL),                 FINISHED },
466     };
467     const auto &result = std::find_if(std::begin(syncOperationProcessStatus), std::end(syncOperationProcessStatus),
468         [operationStatus](const auto &node) {
469             return node.operationStatus == operationStatus;
470         });
471     return result == std::end(syncOperationProcessStatus) ? FINISHED : result->proStatus;
472 }
473 
GetFinishDetailMsg(const std::map<std::string,int> & finishStatus)474 std::string SyncOperation::GetFinishDetailMsg(const std::map<std::string, int> &finishStatus)
475 {
476     std::string msg = "Sync detail is:";
477     for (const auto &[dev, status]: finishStatus) {
478         msg += "dev=" + DBCommon::StringMasking(dev);
479         if ((status > static_cast<int>(OP_FINISHED_ALL)) || (status < E_OK)) {
480             msg += " sync failed, reason is " + std::to_string(status);
481         } else {
482             msg += " sync success";
483         }
484         msg += " ";
485     }
486     msg.pop_back();
487     return msg;
488 }
489 DEFINE_OBJECT_TAG_FACILITIES(SyncOperation)
490 } // namespace DistributedDB