• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_operation.h"
17 #include "db_common.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "performance_analysis.h"
21 
22 namespace DistributedDB {
SyncOperation(uint32_t syncId,const std::vector<std::string> & devices,int mode,const UserCallback & userCallback,bool isBlockSync)23 SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices,
24     int mode, const UserCallback &userCallback, bool isBlockSync)
25     : devices_(devices),
26       syncId_(syncId),
27       mode_(mode),
28       userCallback_(userCallback),
29       isBlockSync_(isBlockSync),
30       isAutoSync_(false),
31       isFinished_(false),
32       semaphore_(nullptr),
33       query_(QuerySyncObject()),
34       isQuerySync_(false),
35       isAutoSubscribe_(false)
36 {
37 }
38 
~SyncOperation()39 SyncOperation::~SyncOperation()
40 {
41     RefObject::DecObjRef(context_);
42     LOGD("SyncOperation::~SyncOperation()");
43     Finalize();
44 }
45 
Initialize()46 int SyncOperation::Initialize()
47 {
48     LOGD("[SyncOperation] Init SyncOperation id:%d.", syncId_);
49     AutoLock lockGuard(this);
50     for (const std::string &deviceId : devices_) {
51         statuses_.insert(std::pair<std::string, int>(deviceId, OP_WAITING));
52     }
53 
54     if (mode_ == AUTO_PUSH) {
55         mode_ = PUSH;
56         isAutoSync_ = true;
57     } else if (mode_ == AUTO_PULL) {
58         mode_ = PULL;
59         isAutoSync_ = true;
60     } else if (mode_ == AUTO_SUBSCRIBE_QUERY) {
61         mode_ = SUBSCRIBE_QUERY;
62         isAutoSubscribe_ = true;
63     }
64     if (isBlockSync_) {
65         semaphore_ = std::make_unique<SemaphoreUtils>(0);
66     }
67 
68     return E_OK;
69 }
70 
SetOnSyncFinalize(const OnSyncFinalize & callback)71 void SyncOperation::SetOnSyncFinalize(const OnSyncFinalize &callback)
72 {
73     onFinalize_ = callback;
74 }
75 
SetOnSyncFinished(const OnSyncFinished & callback)76 void SyncOperation::SetOnSyncFinished(const OnSyncFinished &callback)
77 {
78     onFinished_ = callback;
79 }
80 
SetStatus(const std::string & deviceId,int status)81 void SyncOperation::SetStatus(const std::string &deviceId, int status)
82 {
83     LOGD("[SyncOperation] SetStatus dev %s{private} status %d", deviceId.c_str(), status);
84     AutoLock lockGuard(this);
85     if (IsKilled()) {
86         LOGE("[SyncOperation] SetStatus failed, the SyncOperation has been killed!");
87         return;
88     }
89     if (isFinished_) {
90         LOGI("[SyncOperation] SetStatus already finished");
91         return;
92     }
93 
94     auto iter = statuses_.find(deviceId);
95     if (iter != statuses_.end()) {
96         if (iter->second >= OP_FINISHED_ALL) {
97             return;
98         }
99         iter->second = status;
100         return;
101     }
102 }
103 
SetUnfinishedDevStatus(int status)104 void SyncOperation::SetUnfinishedDevStatus(int status)
105 {
106     LOGD("[SyncOperation] SetUnfinishedDevStatus status %d", status);
107     AutoLock lockGuard(this);
108     if (IsKilled()) {
109         LOGE("[SyncOperation] SetUnfinishedDevStatus failed, the SyncOperation has been killed!");
110         return;
111     }
112     if (isFinished_) {
113         LOGI("[SyncOperation] SetUnfinishedDevStatus already finished");
114         return;
115     }
116     for (auto &item : statuses_) {
117         if (item.second >= OP_FINISHED_ALL) {
118             continue;
119         }
120         item.second = status;
121     }
122 }
123 
GetStatus(const std::string & deviceId) const124 int SyncOperation::GetStatus(const std::string &deviceId) const
125 {
126     AutoLock lockGuard(this);
127     auto iter = statuses_.find(deviceId);
128     if (iter != statuses_.end()) {
129         return iter->second;
130     }
131     return -E_INVALID_ARGS;
132 }
133 
GetSyncId() const134 uint32_t SyncOperation::GetSyncId() const
135 {
136     return syncId_;
137 }
138 
GetMode() const139 int SyncOperation::GetMode() const
140 {
141     return mode_;
142 }
143 
Finished()144 void SyncOperation::Finished()
145 {
146     std::map<std::string, int> tmpStatus;
147     {
148         AutoLock lockGuard(this);
149         if (IsKilled() || isFinished_) {
150             return;
151         }
152         isFinished_ = true;
153         tmpStatus = statuses_;
154     }
155     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
156     if (performance != nullptr) {
157         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
158     }
159     if (userCallback_) {
160         std::string msg = GetFinishDetailMsg(tmpStatus);
161         LOGI("[SyncOperation] SyncId=%d finished, %s", syncId_, msg.c_str());
162         if (IsBlockSync()) {
163             userCallback_(tmpStatus);
164         } else {
165             RefObject::IncObjRef(this);
166             int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, tmpStatus] {
167                 userCallback_(tmpStatus);
168                 RefObject::DecObjRef(this);
169             });
170             if (errCode != E_OK) {
171                 LOGE("[Finished] SyncOperation Finished userCallback_ retCode:%d", errCode);
172                 RefObject::DecObjRef(this);
173             }
174         }
175     }
176     if (onFinished_) {
177         LOGD("[SyncOperation] Sync %d finished call onFinished.", syncId_);
178         onFinished_(syncId_);
179     }
180 }
181 
GetDevices() const182 const std::vector<std::string> &SyncOperation::GetDevices() const
183 {
184     return devices_;
185 }
186 
WaitIfNeed()187 void SyncOperation::WaitIfNeed()
188 {
189     if (isBlockSync_ && (semaphore_ != nullptr)) {
190         LOGD("[SyncOperation] Wait.");
191         semaphore_->WaitSemaphore();
192     }
193 }
194 
NotifyIfNeed()195 void SyncOperation::NotifyIfNeed()
196 {
197     if (isBlockSync_ && (semaphore_ != nullptr)) {
198         LOGD("[SyncOperation] Notify.");
199         semaphore_->SendSemaphore();
200     }
201 }
202 
IsAutoSync() const203 bool SyncOperation::IsAutoSync() const
204 {
205     return isAutoSync_;
206 }
207 
IsBlockSync() const208 bool SyncOperation::IsBlockSync() const
209 {
210     return isBlockSync_;
211 }
212 
IsAutoControlCmd() const213 bool SyncOperation::IsAutoControlCmd() const
214 {
215     return isAutoSubscribe_;
216 }
217 
SetSyncContext(RefObject * context)218 void SyncOperation::SetSyncContext(RefObject *context)
219 {
220     RefObject::DecObjRef(context_);
221     context_ = context;
222     RefObject::IncObjRef(context);
223 }
224 
CheckIsAllFinished() const225 bool SyncOperation::CheckIsAllFinished() const
226 {
227     AutoLock lockGuard(this);
228     for (const auto &iter : statuses_) {
229         if (iter.second < OP_FINISHED_ALL) {
230             return false;
231         }
232     }
233     return true;
234 }
235 
Finalize()236 void SyncOperation::Finalize()
237 {
238     if ((syncId_ > 0) && onFinalize_) {
239         LOGD("[SyncOperation] Callback SyncOperation onFinalize.");
240         onFinalize_();
241     }
242 }
243 
SetQuery(const QuerySyncObject & query)244 void SyncOperation::SetQuery(const QuerySyncObject &query)
245 {
246     std::lock_guard<std::mutex> lock(queryMutex_);
247     query_ = query;
248     isQuerySync_ = true;
249     if (mode_ != SyncModeType::SUBSCRIBE_QUERY && mode_ != SyncModeType::UNSUBSCRIBE_QUERY) {
250         mode_ += QUERY_SYNC_MODE_BASE;
251     }
252 }
253 
GetQuery(QuerySyncObject & targetObject) const254 void SyncOperation::GetQuery(QuerySyncObject &targetObject) const
255 {
256     std::lock_guard<std::mutex> lock(queryMutex_);
257     targetObject = query_;
258 }
259 
IsQuerySync() const260 bool SyncOperation::IsQuerySync() const
261 {
262     return isQuerySync_;
263 }
264 
SetIdentifier(const std::vector<uint8_t> & identifier)265 void SyncOperation::SetIdentifier(const std::vector<uint8_t> &identifier)
266 {
267     identifier_.assign(identifier.begin(), identifier.end());
268 }
269 
270 namespace {
271 struct SyncTypeNode {
272     int mode = static_cast<int>(SyncModeType::INVALID_MODE);
273     SyncType type = SyncType::INVALID_SYNC_TYPE;
274 };
275 struct SyncOperationStatusNode {
276     int operationStatus = 0;
277     DBStatus status = DBStatus::DB_ERROR;
278 };
279 }
280 
GetSyncType(int mode)281 SyncType SyncOperation::GetSyncType(int mode)
282 {
283     static const SyncTypeNode syncTypeNodes[] = {
284         {static_cast<int>(SyncModeType::PUSH), SyncType::MANUAL_FULL_SYNC_TYPE},
285         {static_cast<int>(SyncModeType::PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
286         {static_cast<int>(SyncModeType::PUSH_AND_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
287         {static_cast<int>(SyncModeType::RESPONSE_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
288         {static_cast<int>(SyncModeType::AUTO_PULL), SyncType::AUTO_SYNC_TYPE},
289         {static_cast<int>(SyncModeType::AUTO_PUSH), SyncType::AUTO_SYNC_TYPE},
290         {static_cast<int>(SyncModeType::QUERY_PUSH), SyncType::QUERY_SYNC_TYPE},
291         {static_cast<int>(SyncModeType::QUERY_PULL), SyncType::QUERY_SYNC_TYPE},
292         {static_cast<int>(SyncModeType::QUERY_PUSH_PULL), SyncType::QUERY_SYNC_TYPE}
293     };
294     const auto &result = std::find_if(std::begin(syncTypeNodes), std::end(syncTypeNodes), [mode](const auto &node) {
295         return node.mode == mode;
296     });
297     return result == std::end(syncTypeNodes) ? SyncType::INVALID_SYNC_TYPE : result->type;
298 }
299 
TransferSyncMode(int mode)300 int SyncOperation::TransferSyncMode(int mode)
301 {
302     // AUTO_PUSH and AUTO_PULL mode is used before sync, RESPONSE_PULL is regarded as push or query push mode.
303     // so for the three mode, it is no need to transferred.
304     if (mode >= SyncModeType::QUERY_PUSH && mode <= SyncModeType::QUERY_PUSH_PULL) {
305         return (mode - QUERY_SYNC_MODE_BASE);
306     }
307     return mode;
308 }
309 
GetQueryId() const310 std::string SyncOperation::GetQueryId() const
311 {
312     std::lock_guard<std::mutex> lock(queryMutex_);
313     return query_.GetIdentify();
314 }
315 
DBStatusTrans(int operationStatus)316 DBStatus SyncOperation::DBStatusTrans(int operationStatus)
317 {
318     static const SyncOperationStatusNode syncOperationStatusNodes[] = {
319         { static_cast<int>(OP_FINISHED_ALL),                  OK },
320         { static_cast<int>(OP_TIMEOUT),                       TIME_OUT },
321         { static_cast<int>(OP_PERMISSION_CHECK_FAILED),       PERMISSION_CHECK_FORBID_SYNC },
322         { static_cast<int>(OP_COMM_ABNORMAL),                 COMM_FAILURE },
323         { static_cast<int>(OP_SECURITY_OPTION_CHECK_FAILURE), SECURITY_OPTION_CHECK_ERROR },
324         { static_cast<int>(OP_EKEYREVOKED_FAILURE),           EKEYREVOKED_ERROR },
325         { static_cast<int>(OP_SCHEMA_INCOMPATIBLE),           SCHEMA_MISMATCH },
326         { static_cast<int>(OP_BUSY_FAILURE),                  BUSY },
327         { static_cast<int>(OP_QUERY_FORMAT_FAILURE),          INVALID_QUERY_FORMAT },
328         { static_cast<int>(OP_QUERY_FIELD_FAILURE),           INVALID_QUERY_FIELD },
329         { static_cast<int>(OP_NOT_SUPPORT),                   NOT_SUPPORT },
330         { static_cast<int>(OP_INTERCEPT_DATA_FAIL),           INTERCEPT_DATA_FAIL },
331         { static_cast<int>(OP_MAX_LIMITS),                    OVER_MAX_LIMITS },
332         { static_cast<int>(OP_SCHEMA_CHANGED),                DISTRIBUTED_SCHEMA_CHANGED },
333         { static_cast<int>(OP_INVALID_ARGS),                  INVALID_ARGS },
334         { static_cast<int>(OP_USER_CHANGED),                  USER_CHANGED },
335         { static_cast<int>(OP_DENIED_SQL),                    NO_PERMISSION },
336         { static_cast<int>(OP_NOTADB_OR_CORRUPTED),           INVALID_PASSWD_OR_CORRUPTED_DB },
337     };
338     const auto &result = std::find_if(std::begin(syncOperationStatusNodes), std::end(syncOperationStatusNodes),
339         [operationStatus](const auto &node) {
340             return node.operationStatus == operationStatus;
341         });
342     return result == std::end(syncOperationStatusNodes) ? DB_ERROR : result->status;
343 }
344 
GetFinishDetailMsg(const std::map<std::string,int> & finishStatus)345 std::string SyncOperation::GetFinishDetailMsg(const std::map<std::string, int> &finishStatus)
346 {
347     std::string msg = "Sync detail is:";
348     for (const auto &[dev, status]: finishStatus) {
349         msg += "dev=" + DBCommon::StringMasking(dev);
350         if (status > static_cast<int>(OP_FINISHED_ALL)) {
351             msg += " sync failed, reason is " + std::to_string(status);
352         } else {
353             msg += " sync success";
354         }
355         msg += " ";
356     }
357     msg.pop_back();
358     return msg;
359 }
360 DEFINE_OBJECT_TAG_FACILITIES(SyncOperation)
361 } // namespace DistributedDB