1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_operation.h"
17 #include "db_errno.h"
18 #include "log_print.h"
19 #include "performance_analysis.h"
20
21 namespace DistributedDB {
SyncOperation(uint32_t syncId,const std::vector<std::string> & devices,int mode,const UserCallback & userCallback,bool isBlockSync)22 SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices,
23 int mode, const UserCallback &userCallback, bool isBlockSync)
24 : devices_(devices),
25 syncId_(syncId),
26 mode_(mode),
27 userCallback_(userCallback),
28 isBlockSync_(isBlockSync),
29 isAutoSync_(false),
30 isFinished_(false),
31 semaphore_(nullptr),
32 query_(QuerySyncObject()),
33 isQuerySync_(false),
34 isAutoSubscribe_(false)
35 {
36 }
37
~SyncOperation()38 SyncOperation::~SyncOperation()
39 {
40 LOGD("SyncOperation::~SyncOperation()");
41 Finalize();
42 }
43
Initialize()44 int SyncOperation::Initialize()
45 {
46 LOGD("[SyncOperation] Init SyncOperation id:%d.", syncId_);
47 AutoLock lockGuard(this);
48 for (const std::string &deviceId : devices_) {
49 statuses_.insert(std::pair<std::string, int>(deviceId, OP_WAITING));
50 }
51
52 if (mode_ == AUTO_PUSH) {
53 mode_ = PUSH;
54 isAutoSync_ = true;
55 } else if (mode_ == AUTO_PULL) {
56 mode_ = PULL;
57 isAutoSync_ = true;
58 } else if (mode_ == AUTO_SUBSCRIBE_QUERY) {
59 mode_ = SUBSCRIBE_QUERY;
60 isAutoSubscribe_ = true;
61 }
62 if (isBlockSync_) {
63 semaphore_ = std::make_unique<SemaphoreUtils>(0);
64 }
65
66 return E_OK;
67 }
68
SetOnSyncFinalize(const OnSyncFinalize & callback)69 void SyncOperation::SetOnSyncFinalize(const OnSyncFinalize &callback)
70 {
71 onFinalize_ = callback;
72 }
73
SetOnSyncFinished(const OnSyncFinished & callback)74 void SyncOperation::SetOnSyncFinished(const OnSyncFinished &callback)
75 {
76 onFinished_ = callback;
77 }
78
SetStatus(const std::string & deviceId,int status)79 void SyncOperation::SetStatus(const std::string &deviceId, int status)
80 {
81 LOGD("[SyncOperation] SetStatus dev %s{private} status %d", deviceId.c_str(), status);
82 AutoLock lockGuard(this);
83 if (IsKilled()) {
84 LOGE("[SyncOperation] SetStatus failed, the SyncOperation has been killed!");
85 return;
86 }
87 if (isFinished_) {
88 LOGI("[SyncOperation] SetStatus already finished");
89 return;
90 }
91
92 auto iter = statuses_.find(deviceId);
93 if (iter != statuses_.end()) {
94 if (iter->second >= OP_FINISHED_ALL) {
95 return;
96 }
97 iter->second = status;
98 return;
99 }
100 }
101
SetUnfinishedDevStatus(int status)102 void SyncOperation::SetUnfinishedDevStatus(int status)
103 {
104 LOGD("[SyncOperation] SetUnfinishedDevStatus status %d", status);
105 AutoLock lockGuard(this);
106 if (IsKilled()) {
107 LOGE("[SyncOperation] SetUnfinishedDevStatus failed, the SyncOperation has been killed!");
108 return;
109 }
110 if (isFinished_) {
111 LOGI("[SyncOperation] SetUnfinishedDevStatus already finished");
112 return;
113 }
114 for (auto &item : statuses_) {
115 if (item.second >= OP_FINISHED_ALL) {
116 continue;
117 }
118 item.second = status;
119 }
120 }
121
GetStatus(const std::string & deviceId) const122 int SyncOperation::GetStatus(const std::string &deviceId) const
123 {
124 AutoLock lockGuard(this);
125 auto iter = statuses_.find(deviceId);
126 if (iter != statuses_.end()) {
127 return iter->second;
128 }
129 return -E_INVALID_ARGS;
130 }
131
GetSyncId() const132 uint32_t SyncOperation::GetSyncId() const
133 {
134 return syncId_;
135 }
136
GetMode() const137 int SyncOperation::GetMode() const
138 {
139 return mode_;
140 }
141
Finished()142 void SyncOperation::Finished()
143 {
144 std::map<std::string, int> tmpStatus;
145 {
146 AutoLock lockGuard(this);
147 if (IsKilled() || isFinished_) {
148 return;
149 }
150 isFinished_ = true;
151 tmpStatus = statuses_;
152 }
153 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
154 if (performance != nullptr) {
155 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
156 }
157 if (userCallback_) {
158 LOGI("[SyncOperation] Sync %d finished call onComplete.", syncId_);
159 if (IsBlockSync()) {
160 userCallback_(tmpStatus);
161 } else {
162 RefObject::IncObjRef(this);
163 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, tmpStatus] {
164 userCallback_(tmpStatus);
165 RefObject::DecObjRef(this);
166 });
167 if (errCode != E_OK) {
168 LOGE("[Finished] SyncOperation Finished userCallback_ retCode:%d", errCode);
169 RefObject::DecObjRef(this);
170 }
171 }
172 }
173 if (onFinished_) {
174 LOGD("[SyncOperation] Sync %d finished call onFinished.", syncId_);
175 onFinished_(syncId_);
176 }
177 }
178
GetDevices() const179 const std::vector<std::string> &SyncOperation::GetDevices() const
180 {
181 return devices_;
182 }
183
WaitIfNeed()184 void SyncOperation::WaitIfNeed()
185 {
186 if (isBlockSync_ && (semaphore_ != nullptr)) {
187 LOGD("[SyncOperation] Wait.");
188 semaphore_->WaitSemaphore();
189 }
190 }
191
NotifyIfNeed()192 void SyncOperation::NotifyIfNeed()
193 {
194 if (isBlockSync_ && (semaphore_ != nullptr)) {
195 LOGD("[SyncOperation] Notify.");
196 semaphore_->SendSemaphore();
197 }
198 }
199
IsAutoSync() const200 bool SyncOperation::IsAutoSync() const
201 {
202 return isAutoSync_;
203 }
204
IsBlockSync() const205 bool SyncOperation::IsBlockSync() const
206 {
207 return isBlockSync_;
208 }
209
IsAutoControlCmd() const210 bool SyncOperation::IsAutoControlCmd() const
211 {
212 return isAutoSubscribe_;
213 }
214
CheckIsAllFinished() const215 bool SyncOperation::CheckIsAllFinished() const
216 {
217 AutoLock lockGuard(this);
218 for (const auto &iter : statuses_) {
219 if (iter.second < OP_FINISHED_ALL) {
220 return false;
221 }
222 }
223 return true;
224 }
225
Finalize()226 void SyncOperation::Finalize()
227 {
228 if ((syncId_ > 0) && onFinalize_) {
229 LOGD("[SyncOperation] Callback SyncOperation onFinalize.");
230 onFinalize_();
231 }
232 }
233
SetQuery(const QuerySyncObject & query)234 void SyncOperation::SetQuery(const QuerySyncObject &query)
235 {
236 query_ = query;
237 isQuerySync_ = true;
238 if (mode_ != SyncModeType::SUBSCRIBE_QUERY && mode_ != SyncModeType::UNSUBSCRIBE_QUERY) {
239 mode_ += QUERY_SYNC_MODE_BASE;
240 }
241 }
242
GetQuery() const243 QuerySyncObject SyncOperation::GetQuery() const
244 {
245 return query_;
246 }
247
IsQuerySync() const248 bool SyncOperation::IsQuerySync() const
249 {
250 return isQuerySync_;
251 }
252
SetIdentifier(const std::vector<uint8_t> & identifier)253 void SyncOperation::SetIdentifier(const std::vector<uint8_t> &identifier)
254 {
255 identifier_.assign(identifier.begin(), identifier.end());
256 }
257
GetSyncType(int mode)258 SyncType SyncOperation::GetSyncType(int mode)
259 {
260 static const std::map<int, SyncType> syncTypeMap = {
261 {SyncModeType::PUSH, SyncType::MANUAL_FULL_SYNC_TYPE},
262 {SyncModeType::PULL, SyncType::MANUAL_FULL_SYNC_TYPE},
263 {SyncModeType::PUSH_AND_PULL, SyncType::MANUAL_FULL_SYNC_TYPE},
264 {SyncModeType::RESPONSE_PULL, SyncType::MANUAL_FULL_SYNC_TYPE},
265 {SyncModeType::AUTO_PULL, SyncType::AUTO_SYNC_TYPE},
266 {SyncModeType::AUTO_PUSH, SyncType::AUTO_SYNC_TYPE},
267 {SyncModeType::QUERY_PUSH, SyncType::QUERY_SYNC_TYPE},
268 {SyncModeType::QUERY_PULL, SyncType::QUERY_SYNC_TYPE},
269 {SyncModeType::QUERY_PUSH_PULL, SyncType::QUERY_SYNC_TYPE},
270 };
271 auto iter = syncTypeMap.find(mode);
272 if (iter != syncTypeMap.end()) {
273 return iter->second;
274 }
275 return SyncType::INVALID_SYNC_TYPE;
276 }
277
TransferSyncMode(int mode)278 int SyncOperation::TransferSyncMode(int mode)
279 {
280 // AUTO_PUSH and AUTO_PULL mode is used before sync, RESPONSE_PULL is regarded as push or query push mode.
281 // so for the three mode, it is no need to transferred.
282 if (mode >= SyncModeType::QUERY_PUSH && mode <= SyncModeType::QUERY_PUSH_PULL) {
283 return (mode - QUERY_SYNC_MODE_BASE);
284 }
285 return mode;
286 }
287
GetQueryId() const288 std::string SyncOperation::GetQueryId() const
289 {
290 return query_.GetIdentify();
291 }
292
DBStatusTransMap()293 const std::map<int, DBStatus> &SyncOperation::DBStatusTransMap()
294 {
295 static const std::map<int, DBStatus> transMap = {
296 { static_cast<int>(OP_FINISHED_ALL), OK },
297 { static_cast<int>(OP_TIMEOUT), TIME_OUT },
298 { static_cast<int>(OP_PERMISSION_CHECK_FAILED), PERMISSION_CHECK_FORBID_SYNC },
299 { static_cast<int>(OP_COMM_ABNORMAL), COMM_FAILURE },
300 { static_cast<int>(OP_SECURITY_OPTION_CHECK_FAILURE), SECURITY_OPTION_CHECK_ERROR },
301 { static_cast<int>(OP_EKEYREVOKED_FAILURE), EKEYREVOKED_ERROR },
302 { static_cast<int>(OP_SCHEMA_INCOMPATIBLE), SCHEMA_MISMATCH },
303 { static_cast<int>(OP_BUSY_FAILURE), BUSY },
304 { static_cast<int>(OP_QUERY_FORMAT_FAILURE), INVALID_QUERY_FORMAT },
305 { static_cast<int>(OP_QUERY_FIELD_FAILURE), INVALID_QUERY_FIELD },
306 { static_cast<int>(OP_NOT_SUPPORT), NOT_SUPPORT },
307 { static_cast<int>(OP_INTERCEPT_DATA_FAIL), INTERCEPT_DATA_FAIL },
308 { static_cast<int>(OP_MAX_LIMITS), OVER_MAX_LIMITS },
309 { static_cast<int>(OP_SCHEMA_CHANGED), DISTRIBUTED_SCHEMA_CHANGED },
310 { static_cast<int>(OP_INVALID_ARGS), INVALID_ARGS },
311 { static_cast<int>(OP_USER_CHANGED), USER_CHANGED},
312 };
313 return transMap;
314 }
315 DEFINE_OBJECT_TAG_FACILITIES(SyncOperation)
316 } // namespace DistributedDB