1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_operation.h"
17 #include "db_errno.h"
18 #include "log_print.h"
19 #include "performance_analysis.h"
20
21 namespace DistributedDB {
SyncOperation(uint32_t syncId,const std::vector<std::string> & devices,int mode,const UserCallback & userCallback,bool isBlockSync)22 SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices,
23 int mode, const UserCallback &userCallback, bool isBlockSync)
24 : devices_(devices),
25 syncId_(syncId),
26 mode_(mode),
27 userCallback_(userCallback),
28 isBlockSync_(isBlockSync),
29 isAutoSync_(false),
30 isFinished_(false),
31 semaphore_(nullptr),
32 query_(QuerySyncObject()),
33 isQuerySync_(false),
34 isAutoSubscribe_(false)
35 {
36 }
37
~SyncOperation()38 SyncOperation::~SyncOperation()
39 {
40 LOGD("SyncOperation::~SyncOperation()");
41 Finalize();
42 }
43
Initialize()44 int SyncOperation::Initialize()
45 {
46 LOGD("[SyncOperation] Init SyncOperation id:%d.", syncId_);
47 AutoLock lockGuard(this);
48 for (const std::string &deviceId : devices_) {
49 statuses_.insert(std::pair<std::string, int>(deviceId, OP_WAITING));
50 }
51
52 if (mode_ == AUTO_PUSH) {
53 mode_ = PUSH;
54 isAutoSync_ = true;
55 } else if (mode_ == AUTO_PULL) {
56 mode_ = PULL;
57 isAutoSync_ = true;
58 } else if (mode_ == AUTO_SUBSCRIBE_QUERY) {
59 mode_ = SUBSCRIBE_QUERY;
60 isAutoSubscribe_ = true;
61 }
62 if (isBlockSync_) {
63 semaphore_ = std::make_unique<SemaphoreUtils>(0);
64 }
65
66 return E_OK;
67 }
68
SetOnSyncFinalize(const OnSyncFinalize & callback)69 void SyncOperation::SetOnSyncFinalize(const OnSyncFinalize &callback)
70 {
71 onFinalize_ = callback;
72 }
73
SetOnSyncFinished(const OnSyncFinished & callback)74 void SyncOperation::SetOnSyncFinished(const OnSyncFinished &callback)
75 {
76 onFinished_ = callback;
77 }
78
SetStatus(const std::string & deviceId,int status)79 void SyncOperation::SetStatus(const std::string &deviceId, int status)
80 {
81 LOGD("[SyncOperation] SetStatus dev %s{private} status %d", deviceId.c_str(), status);
82 AutoLock lockGuard(this);
83 if (IsKilled()) {
84 LOGE("[SyncOperation] SetStatus failed, the SyncOperation has been killed!");
85 return;
86 }
87 if (isFinished_) {
88 LOGI("[SyncOperation] SetStatus already finished");
89 return;
90 }
91
92 auto iter = statuses_.find(deviceId);
93 if (iter != statuses_.end()) {
94 if (iter->second >= OP_FINISHED_ALL) {
95 return;
96 }
97 iter->second = status;
98 return;
99 }
100 }
101
SetUnfinishedDevStatus(int status)102 void SyncOperation::SetUnfinishedDevStatus(int status)
103 {
104 LOGD("[SyncOperation] SetUnfinishedDevStatus status %d", status);
105 AutoLock lockGuard(this);
106 if (IsKilled()) {
107 LOGE("[SyncOperation] SetUnfinishedDevStatus failed, the SyncOperation has been killed!");
108 return;
109 }
110 if (isFinished_) {
111 LOGI("[SyncOperation] SetUnfinishedDevStatus already finished");
112 return;
113 }
114 for (auto &item : statuses_) {
115 if (item.second >= OP_FINISHED_ALL) {
116 continue;
117 }
118 item.second = status;
119 }
120 }
121
GetStatus(const std::string & deviceId) const122 int SyncOperation::GetStatus(const std::string &deviceId) const
123 {
124 AutoLock lockGuard(this);
125 auto iter = statuses_.find(deviceId);
126 if (iter != statuses_.end()) {
127 return iter->second;
128 }
129 return -E_INVALID_ARGS;
130 }
131
GetSyncId() const132 uint32_t SyncOperation::GetSyncId() const
133 {
134 return syncId_;
135 }
136
GetMode() const137 int SyncOperation::GetMode() const
138 {
139 return mode_;
140 }
141
Finished()142 void SyncOperation::Finished()
143 {
144 std::map<std::string, int> tmpStatus;
145 {
146 AutoLock lockGuard(this);
147 if (IsKilled() || isFinished_) {
148 return;
149 }
150 isFinished_ = true;
151 tmpStatus = statuses_;
152 }
153 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
154 if (performance != nullptr) {
155 performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
156 }
157 if (userCallback_) {
158 LOGI("[SyncOperation] Sync %d finished call onComplete.", syncId_);
159 if (IsBlockSync()) {
160 userCallback_(tmpStatus);
161 } else {
162 RefObject::IncObjRef(this);
163 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, tmpStatus] {
164 userCallback_(tmpStatus);
165 RefObject::DecObjRef(this);
166 });
167 if (errCode != E_OK) {
168 LOGE("[Finished] SyncOperation Finished userCallback_ retCode:%d", errCode);
169 RefObject::DecObjRef(this);
170 }
171 }
172 }
173 if (onFinished_) {
174 LOGD("[SyncOperation] Sync %d finished call onFinished.", syncId_);
175 onFinished_(syncId_);
176 }
177 }
178
GetDevices() const179 const std::vector<std::string> &SyncOperation::GetDevices() const
180 {
181 return devices_;
182 }
183
WaitIfNeed()184 void SyncOperation::WaitIfNeed()
185 {
186 if (isBlockSync_ && (semaphore_ != nullptr)) {
187 LOGD("[SyncOperation] Wait.");
188 semaphore_->WaitSemaphore();
189 }
190 }
191
NotifyIfNeed()192 void SyncOperation::NotifyIfNeed()
193 {
194 if (isBlockSync_ && (semaphore_ != nullptr)) {
195 LOGD("[SyncOperation] Notify.");
196 semaphore_->SendSemaphore();
197 }
198 }
199
IsAutoSync() const200 bool SyncOperation::IsAutoSync() const
201 {
202 return isAutoSync_;
203 }
204
IsBlockSync() const205 bool SyncOperation::IsBlockSync() const
206 {
207 return isBlockSync_;
208 }
209
IsAutoControlCmd() const210 bool SyncOperation::IsAutoControlCmd() const
211 {
212 return isAutoSubscribe_;
213 }
214
CheckIsAllFinished() const215 bool SyncOperation::CheckIsAllFinished() const
216 {
217 AutoLock lockGuard(this);
218 for (const auto &iter : statuses_) {
219 if (iter.second < OP_FINISHED_ALL) {
220 return false;
221 }
222 }
223 return true;
224 }
225
Finalize()226 void SyncOperation::Finalize()
227 {
228 if ((syncId_ > 0) && onFinalize_) {
229 LOGD("[SyncOperation] Callback SyncOperation onFinalize.");
230 onFinalize_();
231 }
232 }
233
SetQuery(const QuerySyncObject & query)234 void SyncOperation::SetQuery(const QuerySyncObject &query)
235 {
236 std::lock_guard<std::mutex> lock(queryMutex_);
237 query_ = query;
238 isQuerySync_ = true;
239 if (mode_ != SyncModeType::SUBSCRIBE_QUERY && mode_ != SyncModeType::UNSUBSCRIBE_QUERY) {
240 mode_ += QUERY_SYNC_MODE_BASE;
241 }
242 }
243
GetQuery(QuerySyncObject & targetObject) const244 void SyncOperation::GetQuery(QuerySyncObject &targetObject) const
245 {
246 std::lock_guard<std::mutex> lock(queryMutex_);
247 targetObject = query_;
248 }
249
IsQuerySync() const250 bool SyncOperation::IsQuerySync() const
251 {
252 return isQuerySync_;
253 }
254
SetIdentifier(const std::vector<uint8_t> & identifier)255 void SyncOperation::SetIdentifier(const std::vector<uint8_t> &identifier)
256 {
257 identifier_.assign(identifier.begin(), identifier.end());
258 }
259
GetSyncType(int mode)260 SyncType SyncOperation::GetSyncType(int mode)
261 {
262 static const std::map<int, SyncType> syncTypeMap = {
263 {SyncModeType::PUSH, SyncType::MANUAL_FULL_SYNC_TYPE},
264 {SyncModeType::PULL, SyncType::MANUAL_FULL_SYNC_TYPE},
265 {SyncModeType::PUSH_AND_PULL, SyncType::MANUAL_FULL_SYNC_TYPE},
266 {SyncModeType::RESPONSE_PULL, SyncType::MANUAL_FULL_SYNC_TYPE},
267 {SyncModeType::AUTO_PULL, SyncType::AUTO_SYNC_TYPE},
268 {SyncModeType::AUTO_PUSH, SyncType::AUTO_SYNC_TYPE},
269 {SyncModeType::QUERY_PUSH, SyncType::QUERY_SYNC_TYPE},
270 {SyncModeType::QUERY_PULL, SyncType::QUERY_SYNC_TYPE},
271 {SyncModeType::QUERY_PUSH_PULL, SyncType::QUERY_SYNC_TYPE},
272 };
273 auto iter = syncTypeMap.find(mode);
274 if (iter != syncTypeMap.end()) {
275 return iter->second;
276 }
277 return SyncType::INVALID_SYNC_TYPE;
278 }
279
TransferSyncMode(int mode)280 int SyncOperation::TransferSyncMode(int mode)
281 {
282 // AUTO_PUSH and AUTO_PULL mode is used before sync, RESPONSE_PULL is regarded as push or query push mode.
283 // so for the three mode, it is no need to transferred.
284 if (mode >= SyncModeType::QUERY_PUSH && mode <= SyncModeType::QUERY_PUSH_PULL) {
285 return (mode - QUERY_SYNC_MODE_BASE);
286 }
287 return mode;
288 }
289
GetQueryId() const290 std::string SyncOperation::GetQueryId() const
291 {
292 std::lock_guard<std::mutex> lock(queryMutex_);
293 return query_.GetIdentify();
294 }
295
DBStatusTransMap()296 const std::map<int, DBStatus> &SyncOperation::DBStatusTransMap()
297 {
298 static const std::map<int, DBStatus> transMap = {
299 { static_cast<int>(OP_FINISHED_ALL), OK },
300 { static_cast<int>(OP_TIMEOUT), TIME_OUT },
301 { static_cast<int>(OP_PERMISSION_CHECK_FAILED), PERMISSION_CHECK_FORBID_SYNC },
302 { static_cast<int>(OP_COMM_ABNORMAL), COMM_FAILURE },
303 { static_cast<int>(OP_SECURITY_OPTION_CHECK_FAILURE), SECURITY_OPTION_CHECK_ERROR },
304 { static_cast<int>(OP_EKEYREVOKED_FAILURE), EKEYREVOKED_ERROR },
305 { static_cast<int>(OP_SCHEMA_INCOMPATIBLE), SCHEMA_MISMATCH },
306 { static_cast<int>(OP_BUSY_FAILURE), BUSY },
307 { static_cast<int>(OP_QUERY_FORMAT_FAILURE), INVALID_QUERY_FORMAT },
308 { static_cast<int>(OP_QUERY_FIELD_FAILURE), INVALID_QUERY_FIELD },
309 { static_cast<int>(OP_NOT_SUPPORT), NOT_SUPPORT },
310 { static_cast<int>(OP_INTERCEPT_DATA_FAIL), INTERCEPT_DATA_FAIL },
311 { static_cast<int>(OP_MAX_LIMITS), OVER_MAX_LIMITS },
312 { static_cast<int>(OP_SCHEMA_CHANGED), DISTRIBUTED_SCHEMA_CHANGED },
313 { static_cast<int>(OP_INVALID_ARGS), INVALID_ARGS },
314 { static_cast<int>(OP_USER_CHANGED), USER_CHANGED },
315 { static_cast<int>(OP_DENIED_SQL), NO_PERMISSION },
316 { static_cast<int>(OP_NOTADB_OR_CORRUPTED), INVALID_PASSWD_OR_CORRUPTED_DB },
317 };
318 return transMap;
319 }
320 DEFINE_OBJECT_TAG_FACILITIES(SyncOperation)
321 } // namespace DistributedDB