• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_operation.h"
17 #include "db_errno.h"
18 #include "log_print.h"
19 #include "performance_analysis.h"
20 
21 namespace DistributedDB {
SyncOperation(uint32_t syncId,const std::vector<std::string> & devices,int mode,const UserCallback & userCallback,bool isBlockSync)22 SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices,
23     int mode, const UserCallback &userCallback, bool isBlockSync)
24     : devices_(devices),
25       syncId_(syncId),
26       mode_(mode),
27       userCallback_(userCallback),
28       isBlockSync_(isBlockSync),
29       isAutoSync_(false),
30       isFinished_(false),
31       semaphore_(nullptr),
32       query_(QuerySyncObject()),
33       isQuerySync_(false),
34       isAutoSubscribe_(false)
35 {
36 }
37 
~SyncOperation()38 SyncOperation::~SyncOperation()
39 {
40     LOGD("SyncOperation::~SyncOperation()");
41     Finalize();
42 }
43 
Initialize()44 int SyncOperation::Initialize()
45 {
46     LOGD("[SyncOperation] Init SyncOperation id:%d.", syncId_);
47     AutoLock lockGuard(this);
48     for (const std::string &deviceId : devices_) {
49         statuses_.insert(std::pair<std::string, int>(deviceId, OP_WAITING));
50     }
51 
52     if (mode_ == AUTO_PUSH) {
53         mode_ = PUSH;
54         isAutoSync_ = true;
55     } else if (mode_ == AUTO_PULL) {
56         mode_ = PULL;
57         isAutoSync_ = true;
58     } else if (mode_ == AUTO_SUBSCRIBE_QUERY) {
59         mode_ = SUBSCRIBE_QUERY;
60         isAutoSubscribe_ = true;
61     }
62     if (isBlockSync_) {
63         semaphore_ = std::make_unique<SemaphoreUtils>(0);
64     }
65 
66     return E_OK;
67 }
68 
SetOnSyncFinalize(const OnSyncFinalize & callback)69 void SyncOperation::SetOnSyncFinalize(const OnSyncFinalize &callback)
70 {
71     onFinalize_ = callback;
72 }
73 
SetOnSyncFinished(const OnSyncFinished & callback)74 void SyncOperation::SetOnSyncFinished(const OnSyncFinished &callback)
75 {
76     onFinished_ = callback;
77 }
78 
SetStatus(const std::string & deviceId,int status)79 void SyncOperation::SetStatus(const std::string &deviceId, int status)
80 {
81     LOGD("[SyncOperation] SetStatus dev %s{private} status %d", deviceId.c_str(), status);
82     AutoLock lockGuard(this);
83     if (IsKilled()) {
84         LOGE("[SyncOperation] SetStatus failed, the SyncOperation has been killed!");
85         return;
86     }
87     if (isFinished_) {
88         LOGI("[SyncOperation] SetStatus already finished");
89         return;
90     }
91 
92     auto iter = statuses_.find(deviceId);
93     if (iter != statuses_.end()) {
94         if (iter->second >= OP_FINISHED_ALL) {
95             return;
96         }
97         iter->second = status;
98         return;
99     }
100 }
101 
SetUnfinishedDevStatus(int status)102 void SyncOperation::SetUnfinishedDevStatus(int status)
103 {
104     LOGD("[SyncOperation] SetUnfinishedDevStatus status %d", status);
105     AutoLock lockGuard(this);
106     if (IsKilled()) {
107         LOGE("[SyncOperation] SetUnfinishedDevStatus failed, the SyncOperation has been killed!");
108         return;
109     }
110     if (isFinished_) {
111         LOGI("[SyncOperation] SetUnfinishedDevStatus already finished");
112         return;
113     }
114     for (auto &item : statuses_) {
115         if (item.second >= OP_FINISHED_ALL) {
116             continue;
117         }
118         item.second = status;
119     }
120 }
121 
GetStatus(const std::string & deviceId) const122 int SyncOperation::GetStatus(const std::string &deviceId) const
123 {
124     AutoLock lockGuard(this);
125     auto iter = statuses_.find(deviceId);
126     if (iter != statuses_.end()) {
127         return iter->second;
128     }
129     return -E_INVALID_ARGS;
130 }
131 
GetSyncId() const132 uint32_t SyncOperation::GetSyncId() const
133 {
134     return syncId_;
135 }
136 
GetMode() const137 int SyncOperation::GetMode() const
138 {
139     return mode_;
140 }
141 
Finished()142 void SyncOperation::Finished()
143 {
144     std::map<std::string, int> tmpStatus;
145     {
146         AutoLock lockGuard(this);
147         if (IsKilled() || isFinished_) {
148             return;
149         }
150         isFinished_ = true;
151         tmpStatus = statuses_;
152     }
153     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
154     if (performance != nullptr) {
155         performance->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_ACK_RECV_TO_USER_CALL_BACK);
156     }
157     if (userCallback_) {
158         LOGI("[SyncOperation] Sync %d finished call onComplete.", syncId_);
159         if (IsBlockSync()) {
160             userCallback_(tmpStatus);
161         } else {
162             RefObject::IncObjRef(this);
163             int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, tmpStatus] {
164                 userCallback_(tmpStatus);
165                 RefObject::DecObjRef(this);
166             });
167             if (errCode != E_OK) {
168                 LOGE("[Finished] SyncOperation Finished userCallback_ retCode:%d", errCode);
169                 RefObject::DecObjRef(this);
170             }
171         }
172     }
173     if (onFinished_) {
174         LOGD("[SyncOperation] Sync %d finished call onFinished.", syncId_);
175         onFinished_(syncId_);
176     }
177 }
178 
GetDevices() const179 const std::vector<std::string> &SyncOperation::GetDevices() const
180 {
181     return devices_;
182 }
183 
WaitIfNeed()184 void SyncOperation::WaitIfNeed()
185 {
186     if (isBlockSync_ && (semaphore_ != nullptr)) {
187         LOGD("[SyncOperation] Wait.");
188         semaphore_->WaitSemaphore();
189     }
190 }
191 
NotifyIfNeed()192 void SyncOperation::NotifyIfNeed()
193 {
194     if (isBlockSync_ && (semaphore_ != nullptr)) {
195         LOGD("[SyncOperation] Notify.");
196         semaphore_->SendSemaphore();
197     }
198 }
199 
IsAutoSync() const200 bool SyncOperation::IsAutoSync() const
201 {
202     return isAutoSync_;
203 }
204 
IsBlockSync() const205 bool SyncOperation::IsBlockSync() const
206 {
207     return isBlockSync_;
208 }
209 
IsAutoControlCmd() const210 bool SyncOperation::IsAutoControlCmd() const
211 {
212     return isAutoSubscribe_;
213 }
214 
CheckIsAllFinished() const215 bool SyncOperation::CheckIsAllFinished() const
216 {
217     AutoLock lockGuard(this);
218     for (const auto &iter : statuses_) {
219         if (iter.second < OP_FINISHED_ALL) {
220             return false;
221         }
222     }
223     return true;
224 }
225 
Finalize()226 void SyncOperation::Finalize()
227 {
228     if ((syncId_ > 0) && onFinalize_) {
229         LOGD("[SyncOperation] Callback SyncOperation onFinalize.");
230         onFinalize_();
231     }
232 }
233 
SetQuery(const QuerySyncObject & query)234 void SyncOperation::SetQuery(const QuerySyncObject &query)
235 {
236     std::lock_guard<std::mutex> lock(queryMutex_);
237     query_ = query;
238     isQuerySync_ = true;
239     if (mode_ != SyncModeType::SUBSCRIBE_QUERY && mode_ != SyncModeType::UNSUBSCRIBE_QUERY) {
240         mode_ += QUERY_SYNC_MODE_BASE;
241     }
242 }
243 
GetQuery(QuerySyncObject & targetObject) const244 void SyncOperation::GetQuery(QuerySyncObject &targetObject) const
245 {
246     std::lock_guard<std::mutex> lock(queryMutex_);
247     targetObject = query_;
248 }
249 
IsQuerySync() const250 bool SyncOperation::IsQuerySync() const
251 {
252     return isQuerySync_;
253 }
254 
SetIdentifier(const std::vector<uint8_t> & identifier)255 void SyncOperation::SetIdentifier(const std::vector<uint8_t> &identifier)
256 {
257     identifier_.assign(identifier.begin(), identifier.end());
258 }
259 
GetSyncType(int mode)260 SyncType SyncOperation::GetSyncType(int mode)
261 {
262     static const std::map<int, SyncType> syncTypeMap = {
263         {SyncModeType::PUSH, SyncType::MANUAL_FULL_SYNC_TYPE},
264         {SyncModeType::PULL, SyncType::MANUAL_FULL_SYNC_TYPE},
265         {SyncModeType::PUSH_AND_PULL, SyncType::MANUAL_FULL_SYNC_TYPE},
266         {SyncModeType::RESPONSE_PULL, SyncType::MANUAL_FULL_SYNC_TYPE},
267         {SyncModeType::AUTO_PULL, SyncType::AUTO_SYNC_TYPE},
268         {SyncModeType::AUTO_PUSH, SyncType::AUTO_SYNC_TYPE},
269         {SyncModeType::QUERY_PUSH, SyncType::QUERY_SYNC_TYPE},
270         {SyncModeType::QUERY_PULL, SyncType::QUERY_SYNC_TYPE},
271         {SyncModeType::QUERY_PUSH_PULL, SyncType::QUERY_SYNC_TYPE},
272     };
273     auto iter = syncTypeMap.find(mode);
274     if (iter != syncTypeMap.end()) {
275         return iter->second;
276     }
277     return SyncType::INVALID_SYNC_TYPE;
278 }
279 
TransferSyncMode(int mode)280 int SyncOperation::TransferSyncMode(int mode)
281 {
282     // AUTO_PUSH and AUTO_PULL mode is used before sync, RESPONSE_PULL is regarded as push or query push mode.
283     // so for the three mode, it is no need to transferred.
284     if (mode >= SyncModeType::QUERY_PUSH && mode <= SyncModeType::QUERY_PUSH_PULL) {
285         return (mode - QUERY_SYNC_MODE_BASE);
286     }
287     return mode;
288 }
289 
GetQueryId() const290 std::string SyncOperation::GetQueryId() const
291 {
292     std::lock_guard<std::mutex> lock(queryMutex_);
293     return query_.GetIdentify();
294 }
295 
DBStatusTransMap()296 const std::map<int, DBStatus> &SyncOperation::DBStatusTransMap()
297 {
298     static const std::map<int, DBStatus> transMap = {
299         { static_cast<int>(OP_FINISHED_ALL),                  OK },
300         { static_cast<int>(OP_TIMEOUT),                       TIME_OUT },
301         { static_cast<int>(OP_PERMISSION_CHECK_FAILED),       PERMISSION_CHECK_FORBID_SYNC },
302         { static_cast<int>(OP_COMM_ABNORMAL),                 COMM_FAILURE },
303         { static_cast<int>(OP_SECURITY_OPTION_CHECK_FAILURE), SECURITY_OPTION_CHECK_ERROR },
304         { static_cast<int>(OP_EKEYREVOKED_FAILURE),           EKEYREVOKED_ERROR },
305         { static_cast<int>(OP_SCHEMA_INCOMPATIBLE),           SCHEMA_MISMATCH },
306         { static_cast<int>(OP_BUSY_FAILURE),                  BUSY },
307         { static_cast<int>(OP_QUERY_FORMAT_FAILURE),          INVALID_QUERY_FORMAT },
308         { static_cast<int>(OP_QUERY_FIELD_FAILURE),           INVALID_QUERY_FIELD },
309         { static_cast<int>(OP_NOT_SUPPORT),                   NOT_SUPPORT },
310         { static_cast<int>(OP_INTERCEPT_DATA_FAIL),           INTERCEPT_DATA_FAIL },
311         { static_cast<int>(OP_MAX_LIMITS),                    OVER_MAX_LIMITS },
312         { static_cast<int>(OP_SCHEMA_CHANGED),                DISTRIBUTED_SCHEMA_CHANGED },
313         { static_cast<int>(OP_INVALID_ARGS),                  INVALID_ARGS },
314         { static_cast<int>(OP_USER_CHANGED),                  USER_CHANGED },
315         { static_cast<int>(OP_DENIED_SQL),                    NO_PERMISSION },
316         { static_cast<int>(OP_NOTADB_OR_CORRUPTED),           INVALID_PASSWD_OR_CORRUPTED_DB },
317     };
318     return transMap;
319 }
320 DEFINE_OBJECT_TAG_FACILITIES(SyncOperation)
321 } // namespace DistributedDB