• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_operation.h"
17 #include "db_errno.h"
18 #include "log_print.h"
19 #include "performance_analysis.h"
20 
21 namespace DistributedDB {
SyncOperation(uint32_t syncId,const std::vector<std::string> & devices,int mode,const UserCallback & userCallback,bool isBlockSync)22 SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices,
23     int mode, const UserCallback &userCallback, bool isBlockSync)
24     : devices_(devices),
25       syncId_(syncId),
26       mode_(mode),
27       userCallback_(userCallback),
28       isBlockSync_(isBlockSync),
29       isAutoSync_(false),
30       isFinished_(false),
31       semaphore_(nullptr),
32       query_(QuerySyncObject()),
33       isQuerySync_(false),
34       isAutoSubscribe_(false)
35 {
36 }
37 
~SyncOperation()38 SyncOperation::~SyncOperation()
39 {
40     RefObject::DecObjRef(context_);
41     LOGD("SyncOperation::~SyncOperation()");
42     Finalize();
43 }
44 
Initialize()45 int SyncOperation::Initialize()
46 {
47     LOGD("[SyncOperation] Init SyncOperation id:%d.", syncId_);
48     AutoLock lockGuard(this);
49     for (const std::string &deviceId : devices_) {
50         statuses_.insert(std::pair<std::string, int>(deviceId, OP_WAITING));
51     }
52 
53     if (mode_ == AUTO_PUSH) {
54         mode_ = PUSH;
55         isAutoSync_ = true;
56     } else if (mode_ == AUTO_PULL) {
57         mode_ = PULL;
58         isAutoSync_ = true;
59     } else if (mode_ == AUTO_SUBSCRIBE_QUERY) {
60         mode_ = SUBSCRIBE_QUERY;
61         isAutoSubscribe_ = true;
62     }
63     if (isBlockSync_) {
64         semaphore_ = std::make_unique<SemaphoreUtils>(0);
65     }
66 
67     return E_OK;
68 }
69 
SetOnSyncFinalize(const OnSyncFinalize & callback)70 void SyncOperation::SetOnSyncFinalize(const OnSyncFinalize &callback)
71 {
72     onFinalize_ = callback;
73 }
74 
SetOnSyncFinished(const OnSyncFinished & callback)75 void SyncOperation::SetOnSyncFinished(const OnSyncFinished &callback)
76 {
77     onFinished_ = callback;
78 }
79 
SetStatus(const std::string & deviceId,int status)80 void SyncOperation::SetStatus(const std::string &deviceId, int status)
81 {
82     LOGD("[SyncOperation] SetStatus dev %s{private} status %d", deviceId.c_str(), status);
83     AutoLock lockGuard(this);
84     if (IsKilled()) {
85         LOGE("[SyncOperation] SetStatus failed, the SyncOperation has been killed!");
86         return;
87     }
88     if (isFinished_) {
89         LOGI("[SyncOperation] SetStatus already finished");
90         return;
91     }
92 
93     auto iter = statuses_.find(deviceId);
94     if (iter != statuses_.end()) {
95         if (iter->second >= OP_FINISHED_ALL) {
96             return;
97         }
98         iter->second = status;
99         return;
100     }
101 }
102 
SetUnfinishedDevStatus(int status)103 void SyncOperation::SetUnfinishedDevStatus(int status)
104 {
105     LOGD("[SyncOperation] SetUnfinishedDevStatus status %d", status);
106     AutoLock lockGuard(this);
107     if (IsKilled()) {
108         LOGE("[SyncOperation] SetUnfinishedDevStatus failed, the SyncOperation has been killed!");
109         return;
110     }
111     if (isFinished_) {
112         LOGI("[SyncOperation] SetUnfinishedDevStatus already finished");
113         return;
114     }
115     for (auto &item : statuses_) {
116         if (item.second >= OP_FINISHED_ALL) {
117             continue;
118         }
119         item.second = status;
120     }
121 }
122 
GetStatus(const std::string & deviceId) const123 int SyncOperation::GetStatus(const std::string &deviceId) const
124 {
125     AutoLock lockGuard(this);
126     auto iter = statuses_.find(deviceId);
127     if (iter != statuses_.end()) {
128         return iter->second;
129     }
130     return -E_INVALID_ARGS;
131 }
132 
GetSyncId() const133 uint32_t SyncOperation::GetSyncId() const
134 {
135     return syncId_;
136 }
137 
GetMode() const138 int SyncOperation::GetMode() const
139 {
140     return mode_;
141 }
142 
Finished()143 void SyncOperation::Finished()
144 {
145     std::map<std::string, int> tmpStatus;
146     {
147         AutoLock lockGuard(this);
148         if (IsKilled() || isFinished_) {
149             return;
150         }
151         isFinished_ = true;
152         tmpStatus = statuses_;
153     }
154     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
155     if (performance != nullptr) {
156         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
157     }
158     if (userCallback_) {
159         LOGI("[SyncOperation] Sync %d finished call onComplete.", syncId_);
160         if (IsBlockSync()) {
161             userCallback_(tmpStatus);
162         } else {
163             RefObject::IncObjRef(this);
164             int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, tmpStatus] {
165                 userCallback_(tmpStatus);
166                 RefObject::DecObjRef(this);
167             });
168             if (errCode != E_OK) {
169                 LOGE("[Finished] SyncOperation Finished userCallback_ retCode:%d", errCode);
170                 RefObject::DecObjRef(this);
171             }
172         }
173     }
174     if (onFinished_) {
175         LOGD("[SyncOperation] Sync %d finished call onFinished.", syncId_);
176         onFinished_(syncId_);
177     }
178 }
179 
GetDevices() const180 const std::vector<std::string> &SyncOperation::GetDevices() const
181 {
182     return devices_;
183 }
184 
WaitIfNeed()185 void SyncOperation::WaitIfNeed()
186 {
187     if (isBlockSync_ && (semaphore_ != nullptr)) {
188         LOGD("[SyncOperation] Wait.");
189         semaphore_->WaitSemaphore();
190     }
191 }
192 
NotifyIfNeed()193 void SyncOperation::NotifyIfNeed()
194 {
195     if (isBlockSync_ && (semaphore_ != nullptr)) {
196         LOGD("[SyncOperation] Notify.");
197         semaphore_->SendSemaphore();
198     }
199 }
200 
IsAutoSync() const201 bool SyncOperation::IsAutoSync() const
202 {
203     return isAutoSync_;
204 }
205 
IsBlockSync() const206 bool SyncOperation::IsBlockSync() const
207 {
208     return isBlockSync_;
209 }
210 
IsAutoControlCmd() const211 bool SyncOperation::IsAutoControlCmd() const
212 {
213     return isAutoSubscribe_;
214 }
215 
SetSyncContext(RefObject * context)216 void SyncOperation::SetSyncContext(RefObject *context)
217 {
218     RefObject::DecObjRef(context_);
219     context_ = context;
220     RefObject::IncObjRef(context);
221 }
222 
CheckIsAllFinished() const223 bool SyncOperation::CheckIsAllFinished() const
224 {
225     AutoLock lockGuard(this);
226     for (const auto &iter : statuses_) {
227         if (iter.second < OP_FINISHED_ALL) {
228             return false;
229         }
230     }
231     return true;
232 }
233 
Finalize()234 void SyncOperation::Finalize()
235 {
236     if ((syncId_ > 0) && onFinalize_) {
237         LOGD("[SyncOperation] Callback SyncOperation onFinalize.");
238         onFinalize_();
239     }
240 }
241 
SetQuery(const QuerySyncObject & query)242 void SyncOperation::SetQuery(const QuerySyncObject &query)
243 {
244     std::lock_guard<std::mutex> lock(queryMutex_);
245     query_ = query;
246     isQuerySync_ = true;
247     if (mode_ != SyncModeType::SUBSCRIBE_QUERY && mode_ != SyncModeType::UNSUBSCRIBE_QUERY) {
248         mode_ += QUERY_SYNC_MODE_BASE;
249     }
250 }
251 
GetQuery(QuerySyncObject & targetObject) const252 void SyncOperation::GetQuery(QuerySyncObject &targetObject) const
253 {
254     std::lock_guard<std::mutex> lock(queryMutex_);
255     targetObject = query_;
256 }
257 
IsQuerySync() const258 bool SyncOperation::IsQuerySync() const
259 {
260     return isQuerySync_;
261 }
262 
SetIdentifier(const std::vector<uint8_t> & identifier)263 void SyncOperation::SetIdentifier(const std::vector<uint8_t> &identifier)
264 {
265     identifier_.assign(identifier.begin(), identifier.end());
266 }
267 
268 namespace {
269 struct SyncTypeNode {
270     int mode = static_cast<int>(SyncModeType::INVALID_MODE);
271     SyncType type = SyncType::INVALID_SYNC_TYPE;
272 };
273 struct SyncOperationStatusNode {
274     int operationStatus = 0;
275     DBStatus status = DBStatus::DB_ERROR;
276 };
277 }
278 
GetSyncType(int mode)279 SyncType SyncOperation::GetSyncType(int mode)
280 {
281     static const SyncTypeNode syncTypeNodes[] = {
282         {static_cast<int>(SyncModeType::PUSH), SyncType::MANUAL_FULL_SYNC_TYPE},
283         {static_cast<int>(SyncModeType::PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
284         {static_cast<int>(SyncModeType::PUSH_AND_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
285         {static_cast<int>(SyncModeType::RESPONSE_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
286         {static_cast<int>(SyncModeType::AUTO_PULL), SyncType::AUTO_SYNC_TYPE},
287         {static_cast<int>(SyncModeType::AUTO_PUSH), SyncType::AUTO_SYNC_TYPE},
288         {static_cast<int>(SyncModeType::QUERY_PUSH), SyncType::QUERY_SYNC_TYPE},
289         {static_cast<int>(SyncModeType::QUERY_PULL), SyncType::QUERY_SYNC_TYPE},
290         {static_cast<int>(SyncModeType::QUERY_PUSH_PULL), SyncType::QUERY_SYNC_TYPE}
291     };
292     const auto &result = std::find_if(std::begin(syncTypeNodes), std::end(syncTypeNodes), [mode](const auto &node) {
293         return node.mode == mode;
294     });
295     return result == std::end(syncTypeNodes) ? SyncType::INVALID_SYNC_TYPE : result->type;
296 }
297 
TransferSyncMode(int mode)298 int SyncOperation::TransferSyncMode(int mode)
299 {
300     // AUTO_PUSH and AUTO_PULL mode is used before sync, RESPONSE_PULL is regarded as push or query push mode.
301     // so for the three mode, it is no need to transferred.
302     if (mode >= SyncModeType::QUERY_PUSH && mode <= SyncModeType::QUERY_PUSH_PULL) {
303         return (mode - QUERY_SYNC_MODE_BASE);
304     }
305     return mode;
306 }
307 
GetQueryId() const308 std::string SyncOperation::GetQueryId() const
309 {
310     std::lock_guard<std::mutex> lock(queryMutex_);
311     return query_.GetIdentify();
312 }
313 
DBStatusTrans(int operationStatus)314 DBStatus SyncOperation::DBStatusTrans(int operationStatus)
315 {
316     static const SyncOperationStatusNode syncOperationStatusNodes[] = {
317         { static_cast<int>(OP_FINISHED_ALL),                  OK },
318         { static_cast<int>(OP_TIMEOUT),                       TIME_OUT },
319         { static_cast<int>(OP_PERMISSION_CHECK_FAILED),       PERMISSION_CHECK_FORBID_SYNC },
320         { static_cast<int>(OP_COMM_ABNORMAL),                 COMM_FAILURE },
321         { static_cast<int>(OP_SECURITY_OPTION_CHECK_FAILURE), SECURITY_OPTION_CHECK_ERROR },
322         { static_cast<int>(OP_EKEYREVOKED_FAILURE),           EKEYREVOKED_ERROR },
323         { static_cast<int>(OP_SCHEMA_INCOMPATIBLE),           SCHEMA_MISMATCH },
324         { static_cast<int>(OP_BUSY_FAILURE),                  BUSY },
325         { static_cast<int>(OP_QUERY_FORMAT_FAILURE),          INVALID_QUERY_FORMAT },
326         { static_cast<int>(OP_QUERY_FIELD_FAILURE),           INVALID_QUERY_FIELD },
327         { static_cast<int>(OP_NOT_SUPPORT),                   NOT_SUPPORT },
328         { static_cast<int>(OP_INTERCEPT_DATA_FAIL),           INTERCEPT_DATA_FAIL },
329         { static_cast<int>(OP_MAX_LIMITS),                    OVER_MAX_LIMITS },
330         { static_cast<int>(OP_SCHEMA_CHANGED),                DISTRIBUTED_SCHEMA_CHANGED },
331         { static_cast<int>(OP_INVALID_ARGS),                  INVALID_ARGS },
332         { static_cast<int>(OP_USER_CHANGED),                  USER_CHANGED },
333         { static_cast<int>(OP_DENIED_SQL),                    NO_PERMISSION },
334         { static_cast<int>(OP_NOTADB_OR_CORRUPTED),           INVALID_PASSWD_OR_CORRUPTED_DB },
335     };
336     const auto &result = std::find_if(std::begin(syncOperationStatusNodes), std::end(syncOperationStatusNodes),
337         [operationStatus](const auto &node) {
338             return node.operationStatus == operationStatus;
339         });
340     return result == std::end(syncOperationStatusNodes) ? DB_ERROR : result->status;
341 }
342 DEFINE_OBJECT_TAG_FACILITIES(SyncOperation)
343 } // namespace DistributedDB