1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_able_kvdb_connection.h"
17
18 #include "log_print.h"
19 #include "db_errno.h"
20 #include "db_constant.h"
21 #include "kvdb_pragma.h"
22 #include "performance_analysis.h"
23 #include "runtime_context.h"
24 #include "sync_able_kvdb.h"
25
26 namespace DistributedDB {
SyncAbleKvDBConnection(SyncAbleKvDB * kvDB)27 SyncAbleKvDBConnection::SyncAbleKvDBConnection(SyncAbleKvDB *kvDB)
28 : GenericKvDBConnection(kvDB),
29 remotePushFinishedListener_(nullptr)
30 {
31 OnKill([this]() {
32 auto *db = GetDB<SyncAbleKvDB>();
33 if (db == nullptr) {
34 return;
35 }
36 // Drop the lock before we call RemoveSyncOperation().
37 UnlockObj();
38 db->StopSync(GetConnectionId());
39 LockObj();
40 });
41 }
42
~SyncAbleKvDBConnection()43 SyncAbleKvDBConnection::~SyncAbleKvDBConnection()
44 {
45 if (remotePushFinishedListener_ != nullptr) {
46 remotePushFinishedListener_->Drop(true);
47 }
48 remotePushFinishedListener_ = nullptr;
49 }
50
InitPragmaFunc()51 void SyncAbleKvDBConnection::InitPragmaFunc()
52 {
53 if (!pragmaFunc_.empty()) {
54 return;
55 }
56 pragmaFunc_ = {
57 {PRAGMA_SYNC_DEVICES, [this](void *parameter, int &errCode) {
58 errCode = PragmaSyncAction(static_cast<PragmaSync *>(parameter)); }},
59 {PRAGMA_CANCEL_SYNC_DEVICES, [this](void *parameter, int &errCode) {
60 errCode = CancelDeviceSync(*(static_cast<uint32_t *>(parameter))); }},
61 {PRAGMA_AUTO_SYNC, [this](void *parameter, int &errCode) {
62 errCode = EnableAutoSync(*(static_cast<bool *>(parameter))); }},
63 {PRAGMA_PERFORMANCE_ANALYSIS_GET_REPORT, [](void *parameter, int &errCode) {
64 *(static_cast<std::string *>(parameter)) = PerformanceAnalysis::GetInstance()->GetStatistics(); }},
65 {PRAGMA_PERFORMANCE_ANALYSIS_OPEN, [](void *parameter, int &errCode) {
66 PerformanceAnalysis::GetInstance()->OpenPerformanceAnalysis(); }},
67 {PRAGMA_PERFORMANCE_ANALYSIS_CLOSE, [](void *parameter, int &errCode) {
68 PerformanceAnalysis::GetInstance()->ClosePerformanceAnalysis(); }},
69 {PRAGMA_PERFORMANCE_ANALYSIS_SET_REPORTFILENAME, [](void *parameter, int &errCode) {
70 PerformanceAnalysis::GetInstance()->SetFileName(*(static_cast<std::string *>(parameter))); }},
71 {PRAGMA_GET_QUEUED_SYNC_SIZE, [this](void *parameter, int &errCode) {
72 errCode = GetQueuedSyncSize(static_cast<int *>(parameter)); }},
73 {PRAGMA_SET_QUEUED_SYNC_LIMIT, [this](void *parameter, int &errCode) {
74 errCode = SetQueuedSyncLimit(static_cast<int *>(parameter)); }},
75 {PRAGMA_GET_QUEUED_SYNC_LIMIT, [this](void *parameter, int &errCode) {
76 errCode = GetQueuedSyncLimit(static_cast<int *>(parameter)); }},
77 {PRAGMA_SET_WIPE_POLICY, [this](void *parameter, int &errCode) {
78 errCode = SetStaleDataWipePolicy(static_cast<WipePolicy *>(parameter)); }},
79 {PRAGMA_REMOTE_PUSH_FINISHED_NOTIFY, [this](void *parameter, int &errCode) {
80 errCode = SetRemotePushFinishedNotify(static_cast<PragmaRemotePushNotify *>(parameter)); }},
81 {PRAGMA_SET_SYNC_RETRY, [this](void *parameter, int &errCode) {
82 errCode = SetSyncRetry(*(static_cast<bool *>(parameter))); }},
83 {PRAGMA_ADD_EQUAL_IDENTIFIER, [this](void *parameter, int &errCode) {
84 errCode = SetEqualIdentifier(static_cast<PragmaSetEqualIdentifier *>(parameter)); }},
85 {PRAGMA_INTERCEPT_SYNC_DATA, [this](void *parameter, int &errCode) {
86 errCode = SetPushDataInterceptor(*static_cast<PushDataInterceptor *>(parameter)); }},
87 {PRAGMA_SUBSCRIBE_QUERY, [this](void *parameter, int &errCode) {
88 errCode = PragmaSyncAction(static_cast<PragmaSync *>(parameter)); }},
89 };
90 }
91
Pragma(int cmd,void * parameter)92 int SyncAbleKvDBConnection::Pragma(int cmd, void *parameter)
93 {
94 int errCode = PragmaParamCheck(cmd, parameter);
95 if (errCode != E_OK) {
96 return -E_INVALID_ARGS;
97 }
98
99 InitPragmaFunc();
100 auto iter = pragmaFunc_.find(cmd);
101 if (iter != pragmaFunc_.end()) {
102 iter->second(parameter, errCode);
103 return errCode;
104 }
105
106 // Call Pragma() of super class.
107 return GenericKvDBConnection::Pragma(cmd, parameter);
108 }
109
PragmaParamCheck(int cmd,const void * parameter)110 int SyncAbleKvDBConnection::PragmaParamCheck(int cmd, const void *parameter)
111 {
112 switch (cmd) {
113 case PRAGMA_AUTO_SYNC:
114 case PRAGMA_PERFORMANCE_ANALYSIS_GET_REPORT:
115 case PRAGMA_PERFORMANCE_ANALYSIS_SET_REPORTFILENAME:
116 if (parameter == nullptr) {
117 return -E_INVALID_ARGS;
118 }
119 return E_OK;
120 default:
121 return E_OK;
122 }
123 }
124
PragmaSyncAction(const PragmaSync * syncParameter)125 int SyncAbleKvDBConnection::PragmaSyncAction(const PragmaSync *syncParameter)
126 {
127 if (syncParameter == nullptr) {
128 return -E_INVALID_ARGS;
129 }
130 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
131 if (kvDB == nullptr) {
132 return -E_INVALID_CONNECTION;
133 }
134
135 if (isExclusive_.load()) {
136 return -E_BUSY;
137 }
138 {
139 AutoLock lockGuard(this);
140 if (IsKilled()) {
141 // If this happens, users are using a closed connection.
142 LOGE("Pragma sync on a closed connection.");
143 return -E_STALE;
144 }
145 IncObjRef(this);
146 }
147
148 ISyncer::SyncParma syncParam;
149 syncParam.devices = syncParameter->devices_;
150 syncParam.mode = syncParameter->mode_;
151 syncParam.wait = syncParameter->wait_;
152 syncParam.isQuerySync = syncParameter->isQuerySync_;
153 syncParam.syncQuery = syncParameter->query_;
154 syncParam.onFinalize = [this]() { DecObjRef(this); };
155 if (syncParameter->onComplete_) {
156 syncParam.onComplete = [this, onComplete = syncParameter->onComplete_, wait = syncParameter->wait_](
157 const std::map<std::string, int> &statuses
158 ) {
159 OnSyncComplete(statuses, onComplete, wait);
160 };
161 }
162 if (syncParameter->onSyncProcess_) {
163 syncParam.onSyncProcess = [this, onSyncProcess = syncParameter->onSyncProcess_](
164 const std::map<std::string, DeviceSyncProcess> &syncRecordMap
165 ) {
166 OnDeviceSyncProcess(syncRecordMap, onSyncProcess);
167 };
168 }
169
170 int errCode = kvDB->Sync(syncParam, GetConnectionId());
171 if (errCode != E_OK) {
172 DecObjRef(this);
173 }
174 return errCode;
175 }
176
CancelDeviceSync(uint32_t syncId)177 int SyncAbleKvDBConnection::CancelDeviceSync(uint32_t syncId)
178 {
179 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
180 if (kvDB == nullptr) {
181 return -E_INVALID_CONNECTION;
182 }
183
184 if (isExclusive_.load()) {
185 return -E_BUSY;
186 }
187 {
188 AutoLock lockGuard(this);
189 if (IsKilled()) {
190 // if this happens, users are using a closed connection.
191 LOGE("CancelDeviceSync on a closed connection.");
192 return -E_STALE;
193 }
194 IncObjRef(this);
195 }
196
197 int errCode = kvDB->CancelSync(syncId);
198 DecObjRef(this);
199 return errCode;
200 }
201
EnableAutoSync(bool enable)202 int SyncAbleKvDBConnection::EnableAutoSync(bool enable)
203 {
204 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
205 if (kvDB == nullptr) {
206 return -E_INVALID_CONNECTION;
207 }
208 kvDB->EnableAutoSync(enable);
209 return E_OK;
210 }
211
OnSyncComplete(const std::map<std::string,int> & statuses,const std::function<void (const std::map<std::string,int> & devicesMap)> & onComplete,bool wait)212 void SyncAbleKvDBConnection::OnSyncComplete(const std::map<std::string, int> &statuses,
213 const std::function<void(const std::map<std::string, int> &devicesMap)> &onComplete, bool wait)
214 {
215 AutoLock lockGuard(this);
216 if (!IsKilled() && onComplete) {
217 // Drop the lock before invoking the callback.
218 // Do pragma-sync again in the prev sync callback is supported.
219 UnlockObj();
220 // The connection may be closed after UnlockObj().
221 // RACE: 'KillObj()' against 'onComplete()'.
222 if (!IsKilled()) {
223 onComplete(statuses);
224 }
225 LockObj();
226 }
227 }
228
OnDeviceSyncProcess(const std::map<std::string,DeviceSyncProcess> & syncRecordMap,const DeviceSyncProcessCallback & onProcess)229 void SyncAbleKvDBConnection::OnDeviceSyncProcess(const std::map<std::string, DeviceSyncProcess> &syncRecordMap,
230 const DeviceSyncProcessCallback &onProcess)
231 {
232 AutoLock lockGuard(this);
233 if (!IsKilled() && onProcess) {
234 // Drop the lock before invoking the callback.
235 // Do pragma-sync again in the prev sync callback is supported.
236 UnlockObj();
237 // The connection may be closed after UnlockObj().
238 // RACE: 'KillObj()' against 'onComplete()'.
239 if (!IsKilled()) {
240 onProcess(syncRecordMap);
241 }
242 LockObj();
243 }
244 }
245
GetQueuedSyncSize(int * queuedSyncSize) const246 int SyncAbleKvDBConnection::GetQueuedSyncSize(int *queuedSyncSize) const
247 {
248 if (queuedSyncSize == nullptr) {
249 return -E_INVALID_ARGS;
250 }
251 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
252 if (kvDB == nullptr) {
253 return -E_INVALID_CONNECTION;
254 }
255 return kvDB->GetQueuedSyncSize(queuedSyncSize);
256 }
257
SetQueuedSyncLimit(const int * queuedSyncLimit)258 int SyncAbleKvDBConnection::SetQueuedSyncLimit(const int *queuedSyncLimit)
259 {
260 if (queuedSyncLimit == nullptr) {
261 return -E_INVALID_ARGS;
262 }
263 if ((*queuedSyncLimit > DBConstant::QUEUED_SYNC_LIMIT_MAX) ||
264 (*queuedSyncLimit < DBConstant::QUEUED_SYNC_LIMIT_MIN)) {
265 return -E_INVALID_ARGS;
266 }
267 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
268 if (kvDB == nullptr) {
269 return -E_INVALID_CONNECTION;
270 }
271 return kvDB->SetQueuedSyncLimit(queuedSyncLimit);
272 }
273
GetQueuedSyncLimit(int * queuedSyncLimit) const274 int SyncAbleKvDBConnection::GetQueuedSyncLimit(int *queuedSyncLimit) const
275 {
276 if (queuedSyncLimit == nullptr) {
277 return -E_INVALID_ARGS;
278 }
279 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
280 if (kvDB == nullptr) {
281 return -E_INVALID_CONNECTION;
282 }
283 return kvDB->GetQueuedSyncLimit(queuedSyncLimit);
284 }
285
DisableManualSync(void)286 int SyncAbleKvDBConnection::DisableManualSync(void)
287 {
288 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
289 if (kvDB == nullptr) {
290 return -E_INVALID_CONNECTION;
291 }
292 return kvDB->DisableManualSync();
293 }
294
EnableManualSync(void)295 int SyncAbleKvDBConnection::EnableManualSync(void)
296 {
297 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
298 if (kvDB == nullptr) {
299 return -E_INVALID_CONNECTION;
300 }
301 return kvDB->EnableManualSync();
302 }
303
SetStaleDataWipePolicy(const WipePolicy * policy)304 int SyncAbleKvDBConnection::SetStaleDataWipePolicy(const WipePolicy *policy)
305 {
306 if (policy == nullptr) {
307 return -E_INVALID_ARGS;
308 }
309 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
310 if (kvDB == nullptr) {
311 return -E_INVALID_CONNECTION;
312 }
313 return kvDB->SetStaleDataWipePolicy(*policy);
314 }
315
SetRemotePushFinishedNotify(PragmaRemotePushNotify * notifyParma)316 int SyncAbleKvDBConnection::SetRemotePushFinishedNotify(PragmaRemotePushNotify *notifyParma)
317 {
318 if (notifyParma == nullptr) {
319 return -E_INVALID_ARGS;
320 }
321
322 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
323 if (kvDB == nullptr) {
324 return -E_INVALID_CONNECTION;
325 }
326
327 int errCode = E_OK;
328 NotificationChain::Listener *tmpListener = nullptr;
329 if (notifyParma->notifier_ != nullptr) {
330 tmpListener = kvDB->AddRemotePushFinishedNotify(notifyParma->notifier_, errCode);
331 if (tmpListener == nullptr) {
332 return errCode;
333 }
334 }
335
336 std::lock_guard<std::mutex> lock(remotePushFinishedListenerLock_);
337 // Drop old listener and set the new listener
338 if (remotePushFinishedListener_ != nullptr) {
339 errCode = remotePushFinishedListener_->Drop();
340 if (errCode != E_OK) {
341 LOGE("[SyncAbleConnection] Drop Remote push finished listener failed %d", errCode);
342 if (tmpListener != nullptr) {
343 tmpListener->Drop();
344 }
345 return errCode;
346 }
347 }
348 remotePushFinishedListener_ = tmpListener;
349 return errCode;
350 }
351
SetSyncRetry(bool isRetry)352 int SyncAbleKvDBConnection::SetSyncRetry(bool isRetry)
353 {
354 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
355 if (kvDB == nullptr) {
356 return -E_INVALID_CONNECTION;
357 }
358 return kvDB->SetSyncRetry(isRetry);
359 }
360
SetEqualIdentifier(const PragmaSetEqualIdentifier * param)361 int SyncAbleKvDBConnection::SetEqualIdentifier(const PragmaSetEqualIdentifier *param)
362 {
363 if (param == nullptr) {
364 return -E_INVALID_ARGS;
365 }
366
367 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
368 if (kvDB == nullptr) {
369 return -E_INVALID_CONNECTION;
370 }
371 return kvDB->SetEqualIdentifier(param->identifier_, param->targets_);
372 }
373
SetPushDataInterceptor(const PushDataInterceptor & interceptor)374 int SyncAbleKvDBConnection::SetPushDataInterceptor(const PushDataInterceptor &interceptor)
375 {
376 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
377 if (kvDB == nullptr) {
378 return -E_INVALID_CONNECTION;
379 }
380 kvDB->SetSendDataInterceptor(interceptor);
381 return E_OK;
382 }
383
GetSyncDataSize(const std::string & device,size_t & size) const384 int SyncAbleKvDBConnection::GetSyncDataSize(const std::string &device, size_t &size) const
385 {
386 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
387 if (kvDB == nullptr) {
388 return -E_INVALID_CONNECTION;
389 }
390 return kvDB->GetSyncDataSize(device, size);
391 }
392
GetWatermarkInfo(const std::string & device,WatermarkInfo & info)393 int SyncAbleKvDBConnection::GetWatermarkInfo(const std::string &device, WatermarkInfo &info)
394 {
395 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
396 if (kvDB == nullptr) {
397 return -E_INVALID_CONNECTION;
398 }
399 return kvDB->GetWatermarkInfo(device, info);
400 }
401
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess)402 int SyncAbleKvDBConnection::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess)
403 {
404 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
405 if (kvDB == nullptr) {
406 return -E_INVALID_CONNECTION;
407 }
408
409 int securityLabel = INVALID_SEC_LABEL;
410 int securityFlag = INVALID_SEC_FLAG;
411 GetSecurityOption(securityLabel, securityFlag);
412 if (securityLabel == S4) {
413 LOGE("The current data does not support synchronization.");
414 return -E_SECURITY_OPTION_CHECK_ERROR;
415 }
416 return kvDB->Sync(option, onProcess);
417 }
418
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)419 int SyncAbleKvDBConnection::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
420 {
421 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
422 if (kvDB == nullptr) {
423 return -E_INVALID_CONNECTION;
424 }
425 return kvDB->SetCloudDB(cloudDBs);
426 }
427
GetTaskCount()428 int32_t SyncAbleKvDBConnection::GetTaskCount()
429 {
430 SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
431 if (kvDB == nullptr) {
432 LOGW("[SyncAbleKvDBConnection] Get task count with null db");
433 return -1;
434 }
435 return kvDB->GetTaskCount();
436 }
437
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)438 void SyncAbleKvDBConnection::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
439 {
440 auto *kvDB = GetDB<SyncAbleKvDB>();
441 if (kvDB == nullptr) {
442 LOGW("[SyncAbleKvDBConnection] Set generate cloud version callback with null db");
443 return;
444 }
445 kvDB->SetGenCloudVersionCallback(callback);
446 }
447
SetReceiveDataInterceptor(const DataInterceptor & interceptor)448 int SyncAbleKvDBConnection::SetReceiveDataInterceptor(const DataInterceptor &interceptor)
449 {
450 auto kvDB = GetDB<SyncAbleKvDB>();
451 if (kvDB == nullptr) {
452 return -E_INVALID_CONNECTION;
453 }
454 kvDB->SetReceiveDataInterceptor(interceptor);
455 return E_OK;
456 }
457 }
458