1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_operation.h"
17 #include "db_errno.h"
18 #include "log_print.h"
19 #include "performance_analysis.h"
20
21 namespace DistributedDB {
SyncOperation(uint32_t syncId,const std::vector<std::string> & devices,int mode,const UserCallback & userCallback,bool isBlockSync)22 SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices,
23 int mode, const UserCallback &userCallback, bool isBlockSync)
24 : devices_(devices),
25 syncId_(syncId),
26 mode_(mode),
27 userCallback_(userCallback),
28 isBlockSync_(isBlockSync),
29 isAutoSync_(false),
30 isFinished_(false),
31 semaphore_(nullptr),
32 query_(QuerySyncObject()),
33 isQuerySync_(false),
34 isAutoSubscribe_(false)
35 {
36 }
37
~SyncOperation()38 SyncOperation::~SyncOperation()
39 {
40 RefObject::DecObjRef(context_);
41 LOGD("SyncOperation::~SyncOperation()");
42 Finalize();
43 }
44
Initialize()45 int SyncOperation::Initialize()
46 {
47 LOGD("[SyncOperation] Init SyncOperation id:%d.", syncId_);
48 AutoLock lockGuard(this);
49 for (const std::string &deviceId : devices_) {
50 statuses_.insert(std::pair<std::string, int>(deviceId, OP_WAITING));
51 }
52
53 if (mode_ == AUTO_PUSH) {
54 mode_ = PUSH;
55 isAutoSync_ = true;
56 } else if (mode_ == AUTO_PULL) {
57 mode_ = PULL;
58 isAutoSync_ = true;
59 } else if (mode_ == AUTO_SUBSCRIBE_QUERY) {
60 mode_ = SUBSCRIBE_QUERY;
61 isAutoSubscribe_ = true;
62 }
63 if (isBlockSync_) {
64 semaphore_ = std::make_unique<SemaphoreUtils>(0);
65 }
66
67 return E_OK;
68 }
69
SetOnSyncFinalize(const OnSyncFinalize & callback)70 void SyncOperation::SetOnSyncFinalize(const OnSyncFinalize &callback)
71 {
72 onFinalize_ = callback;
73 }
74
SetOnSyncFinished(const OnSyncFinished & callback)75 void SyncOperation::SetOnSyncFinished(const OnSyncFinished &callback)
76 {
77 onFinished_ = callback;
78 }
79
SetStatus(const std::string & deviceId,int status)80 void SyncOperation::SetStatus(const std::string &deviceId, int status)
81 {
82 LOGD("[SyncOperation] SetStatus dev %s{private} status %d", deviceId.c_str(), status);
83 AutoLock lockGuard(this);
84 if (IsKilled()) {
85 LOGE("[SyncOperation] SetStatus failed, the SyncOperation has been killed!");
86 return;
87 }
88 if (isFinished_) {
89 LOGI("[SyncOperation] SetStatus already finished");
90 return;
91 }
92
93 auto iter = statuses_.find(deviceId);
94 if (iter != statuses_.end()) {
95 if (iter->second >= OP_FINISHED_ALL) {
96 return;
97 }
98 iter->second = status;
99 return;
100 }
101 }
102
SetUnfinishedDevStatus(int status)103 void SyncOperation::SetUnfinishedDevStatus(int status)
104 {
105 LOGD("[SyncOperation] SetUnfinishedDevStatus status %d", status);
106 AutoLock lockGuard(this);
107 if (IsKilled()) {
108 LOGE("[SyncOperation] SetUnfinishedDevStatus failed, the SyncOperation has been killed!");
109 return;
110 }
111 if (isFinished_) {
112 LOGI("[SyncOperation] SetUnfinishedDevStatus already finished");
113 return;
114 }
115 for (auto &item : statuses_) {
116 if (item.second >= OP_FINISHED_ALL) {
117 continue;
118 }
119 item.second = status;
120 }
121 }
122
GetStatus(const std::string & deviceId) const123 int SyncOperation::GetStatus(const std::string &deviceId) const
124 {
125 AutoLock lockGuard(this);
126 auto iter = statuses_.find(deviceId);
127 if (iter != statuses_.end()) {
128 return iter->second;
129 }
130 return -E_INVALID_ARGS;
131 }
132
GetSyncId() const133 uint32_t SyncOperation::GetSyncId() const
134 {
135 return syncId_;
136 }
137
GetMode() const138 int SyncOperation::GetMode() const
139 {
140 return mode_;
141 }
142
Finished()143 void SyncOperation::Finished()
144 {
145 std::map<std::string, int> tmpStatus;
146 {
147 AutoLock lockGuard(this);
148 if (IsKilled() || isFinished_) {
149 return;
150 }
151 isFinished_ = true;
152 tmpStatus = statuses_;
153 }
154 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
155 if (performance != nullptr) {
156 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
157 }
158 if (userCallback_) {
159 LOGI("[SyncOperation] Sync %d finished call onComplete.", syncId_);
160 if (IsBlockSync()) {
161 userCallback_(tmpStatus);
162 } else {
163 RefObject::IncObjRef(this);
164 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, tmpStatus] {
165 userCallback_(tmpStatus);
166 RefObject::DecObjRef(this);
167 });
168 if (errCode != E_OK) {
169 LOGE("[Finished] SyncOperation Finished userCallback_ retCode:%d", errCode);
170 RefObject::DecObjRef(this);
171 }
172 }
173 }
174 if (onFinished_) {
175 LOGD("[SyncOperation] Sync %d finished call onFinished.", syncId_);
176 onFinished_(syncId_);
177 }
178 }
179
GetDevices() const180 const std::vector<std::string> &SyncOperation::GetDevices() const
181 {
182 return devices_;
183 }
184
WaitIfNeed()185 void SyncOperation::WaitIfNeed()
186 {
187 if (isBlockSync_ && (semaphore_ != nullptr)) {
188 LOGD("[SyncOperation] Wait.");
189 semaphore_->WaitSemaphore();
190 }
191 }
192
NotifyIfNeed()193 void SyncOperation::NotifyIfNeed()
194 {
195 if (isBlockSync_ && (semaphore_ != nullptr)) {
196 LOGD("[SyncOperation] Notify.");
197 semaphore_->SendSemaphore();
198 }
199 }
200
IsAutoSync() const201 bool SyncOperation::IsAutoSync() const
202 {
203 return isAutoSync_;
204 }
205
IsBlockSync() const206 bool SyncOperation::IsBlockSync() const
207 {
208 return isBlockSync_;
209 }
210
IsAutoControlCmd() const211 bool SyncOperation::IsAutoControlCmd() const
212 {
213 return isAutoSubscribe_;
214 }
215
SetSyncContext(RefObject * context)216 void SyncOperation::SetSyncContext(RefObject *context)
217 {
218 RefObject::DecObjRef(context_);
219 context_ = context;
220 RefObject::IncObjRef(context);
221 }
222
CheckIsAllFinished() const223 bool SyncOperation::CheckIsAllFinished() const
224 {
225 AutoLock lockGuard(this);
226 for (const auto &iter : statuses_) {
227 if (iter.second < OP_FINISHED_ALL) {
228 return false;
229 }
230 }
231 return true;
232 }
233
Finalize()234 void SyncOperation::Finalize()
235 {
236 if ((syncId_ > 0) && onFinalize_) {
237 LOGD("[SyncOperation] Callback SyncOperation onFinalize.");
238 onFinalize_();
239 }
240 }
241
SetQuery(const QuerySyncObject & query)242 void SyncOperation::SetQuery(const QuerySyncObject &query)
243 {
244 std::lock_guard<std::mutex> lock(queryMutex_);
245 query_ = query;
246 isQuerySync_ = true;
247 if (mode_ != SyncModeType::SUBSCRIBE_QUERY && mode_ != SyncModeType::UNSUBSCRIBE_QUERY) {
248 mode_ += QUERY_SYNC_MODE_BASE;
249 }
250 }
251
GetQuery(QuerySyncObject & targetObject) const252 void SyncOperation::GetQuery(QuerySyncObject &targetObject) const
253 {
254 std::lock_guard<std::mutex> lock(queryMutex_);
255 targetObject = query_;
256 }
257
IsQuerySync() const258 bool SyncOperation::IsQuerySync() const
259 {
260 return isQuerySync_;
261 }
262
SetIdentifier(const std::vector<uint8_t> & identifier)263 void SyncOperation::SetIdentifier(const std::vector<uint8_t> &identifier)
264 {
265 identifier_.assign(identifier.begin(), identifier.end());
266 }
267
268 namespace {
269 struct SyncTypeNode {
270 int mode = static_cast<int>(SyncModeType::INVALID_MODE);
271 SyncType type = SyncType::INVALID_SYNC_TYPE;
272 };
273 struct SyncOperationStatusNode {
274 int operationStatus = 0;
275 DBStatus status = DBStatus::DB_ERROR;
276 };
277 }
278
GetSyncType(int mode)279 SyncType SyncOperation::GetSyncType(int mode)
280 {
281 static const SyncTypeNode syncTypeNodes[] = {
282 {static_cast<int>(SyncModeType::PUSH), SyncType::MANUAL_FULL_SYNC_TYPE},
283 {static_cast<int>(SyncModeType::PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
284 {static_cast<int>(SyncModeType::PUSH_AND_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
285 {static_cast<int>(SyncModeType::RESPONSE_PULL), SyncType::MANUAL_FULL_SYNC_TYPE},
286 {static_cast<int>(SyncModeType::AUTO_PULL), SyncType::AUTO_SYNC_TYPE},
287 {static_cast<int>(SyncModeType::AUTO_PUSH), SyncType::AUTO_SYNC_TYPE},
288 {static_cast<int>(SyncModeType::QUERY_PUSH), SyncType::QUERY_SYNC_TYPE},
289 {static_cast<int>(SyncModeType::QUERY_PULL), SyncType::QUERY_SYNC_TYPE},
290 {static_cast<int>(SyncModeType::QUERY_PUSH_PULL), SyncType::QUERY_SYNC_TYPE}
291 };
292 const auto &result = std::find_if(std::begin(syncTypeNodes), std::end(syncTypeNodes), [mode](const auto &node) {
293 return node.mode == mode;
294 });
295 return result == std::end(syncTypeNodes) ? SyncType::INVALID_SYNC_TYPE : result->type;
296 }
297
TransferSyncMode(int mode)298 int SyncOperation::TransferSyncMode(int mode)
299 {
300 // AUTO_PUSH and AUTO_PULL mode is used before sync, RESPONSE_PULL is regarded as push or query push mode.
301 // so for the three mode, it is no need to transferred.
302 if (mode >= SyncModeType::QUERY_PUSH && mode <= SyncModeType::QUERY_PUSH_PULL) {
303 return (mode - QUERY_SYNC_MODE_BASE);
304 }
305 return mode;
306 }
307
GetQueryId() const308 std::string SyncOperation::GetQueryId() const
309 {
310 std::lock_guard<std::mutex> lock(queryMutex_);
311 return query_.GetIdentify();
312 }
313
DBStatusTrans(int operationStatus)314 DBStatus SyncOperation::DBStatusTrans(int operationStatus)
315 {
316 static const SyncOperationStatusNode syncOperationStatusNodes[] = {
317 { static_cast<int>(OP_FINISHED_ALL), OK },
318 { static_cast<int>(OP_TIMEOUT), TIME_OUT },
319 { static_cast<int>(OP_PERMISSION_CHECK_FAILED), PERMISSION_CHECK_FORBID_SYNC },
320 { static_cast<int>(OP_COMM_ABNORMAL), COMM_FAILURE },
321 { static_cast<int>(OP_SECURITY_OPTION_CHECK_FAILURE), SECURITY_OPTION_CHECK_ERROR },
322 { static_cast<int>(OP_EKEYREVOKED_FAILURE), EKEYREVOKED_ERROR },
323 { static_cast<int>(OP_SCHEMA_INCOMPATIBLE), SCHEMA_MISMATCH },
324 { static_cast<int>(OP_BUSY_FAILURE), BUSY },
325 { static_cast<int>(OP_QUERY_FORMAT_FAILURE), INVALID_QUERY_FORMAT },
326 { static_cast<int>(OP_QUERY_FIELD_FAILURE), INVALID_QUERY_FIELD },
327 { static_cast<int>(OP_NOT_SUPPORT), NOT_SUPPORT },
328 { static_cast<int>(OP_INTERCEPT_DATA_FAIL), INTERCEPT_DATA_FAIL },
329 { static_cast<int>(OP_MAX_LIMITS), OVER_MAX_LIMITS },
330 { static_cast<int>(OP_SCHEMA_CHANGED), DISTRIBUTED_SCHEMA_CHANGED },
331 { static_cast<int>(OP_INVALID_ARGS), INVALID_ARGS },
332 { static_cast<int>(OP_USER_CHANGED), USER_CHANGED },
333 { static_cast<int>(OP_DENIED_SQL), NO_PERMISSION },
334 { static_cast<int>(OP_NOTADB_OR_CORRUPTED), INVALID_PASSWD_OR_CORRUPTED_DB },
335 };
336 const auto &result = std::find_if(std::begin(syncOperationStatusNodes), std::end(syncOperationStatusNodes),
337 [operationStatus](const auto &node) {
338 return node.operationStatus == operationStatus;
339 });
340 return result == std::end(syncOperationStatusNodes) ? DB_ERROR : result->status;
341 }
342 DEFINE_OBJECT_TAG_FACILITIES(SyncOperation)
343 } // namespace DistributedDB