• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync_able_kvdb_connection.h"
17 
18 #include "log_print.h"
19 #include "db_errno.h"
20 #include "db_constant.h"
21 #include "kvdb_pragma.h"
22 #include "performance_analysis.h"
23 #include "runtime_context.h"
24 #include "sync_able_kvdb.h"
25 
26 namespace DistributedDB {
SyncAbleKvDBConnection(SyncAbleKvDB * kvDB)27 SyncAbleKvDBConnection::SyncAbleKvDBConnection(SyncAbleKvDB *kvDB)
28     : GenericKvDBConnection(kvDB),
29       remotePushFinishedListener_(nullptr)
30 {
31     OnKill([this]() {
32         auto *db = GetDB<SyncAbleKvDB>();
33         if (db == nullptr) {
34             return;
35         }
36         // Drop the lock before we call RemoveSyncOperation().
37         UnlockObj();
38         db->StopSync(GetConnectionId());
39         LockObj();
40     });
41 }
42 
~SyncAbleKvDBConnection()43 SyncAbleKvDBConnection::~SyncAbleKvDBConnection()
44 {
45     if (remotePushFinishedListener_ != nullptr) {
46         remotePushFinishedListener_->Drop(true);
47     }
48     remotePushFinishedListener_ = nullptr;
49 }
50 
InitPragmaFunc()51 void SyncAbleKvDBConnection::InitPragmaFunc()
52 {
53     if (!pragmaFunc_.empty()) {
54         return;
55     }
56     pragmaFunc_ = {
57         {PRAGMA_SYNC_DEVICES, [this](void *parameter, int &errCode) {
58             errCode = PragmaSyncAction(static_cast<PragmaSync *>(parameter)); }},
59         {PRAGMA_CANCEL_SYNC_DEVICES, [this](void *parameter, int &errCode) {
60             errCode = CancelDeviceSync(*(static_cast<uint32_t *>(parameter))); }},
61         {PRAGMA_AUTO_SYNC, [this](void *parameter, int &errCode) {
62             errCode = EnableAutoSync(*(static_cast<bool *>(parameter))); }},
63         {PRAGMA_PERFORMANCE_ANALYSIS_GET_REPORT, [](void *parameter, int &errCode) {
64             *(static_cast<std::string *>(parameter)) = PerformanceAnalysis::GetInstance()->GetStatistics(); }},
65         {PRAGMA_PERFORMANCE_ANALYSIS_OPEN, [](void *parameter, int &errCode) {
66             PerformanceAnalysis::GetInstance()->OpenPerformanceAnalysis(); }},
67         {PRAGMA_PERFORMANCE_ANALYSIS_CLOSE, [](void *parameter, int &errCode) {
68             PerformanceAnalysis::GetInstance()->ClosePerformanceAnalysis(); }},
69         {PRAGMA_PERFORMANCE_ANALYSIS_SET_REPORTFILENAME,  [](void *parameter, int &errCode) {
70             PerformanceAnalysis::GetInstance()->SetFileName(*(static_cast<std::string *>(parameter))); }},
71         {PRAGMA_GET_QUEUED_SYNC_SIZE, [this](void *parameter, int &errCode) {
72             errCode = GetQueuedSyncSize(static_cast<int *>(parameter)); }},
73         {PRAGMA_SET_QUEUED_SYNC_LIMIT, [this](void *parameter, int &errCode) {
74             errCode = SetQueuedSyncLimit(static_cast<int *>(parameter)); }},
75         {PRAGMA_GET_QUEUED_SYNC_LIMIT, [this](void *parameter, int &errCode) {
76             errCode = GetQueuedSyncLimit(static_cast<int *>(parameter)); }},
77         {PRAGMA_SET_WIPE_POLICY, [this](void *parameter, int &errCode) {
78             errCode = SetStaleDataWipePolicy(static_cast<WipePolicy *>(parameter)); }},
79         {PRAGMA_REMOTE_PUSH_FINISHED_NOTIFY, [this](void *parameter, int &errCode) {
80             errCode = SetRemotePushFinishedNotify(static_cast<PragmaRemotePushNotify *>(parameter)); }},
81         {PRAGMA_SET_SYNC_RETRY, [this](void *parameter, int &errCode) {
82             errCode = SetSyncRetry(*(static_cast<bool *>(parameter))); }},
83         {PRAGMA_ADD_EQUAL_IDENTIFIER, [this](void *parameter, int &errCode) {
84             errCode = SetEqualIdentifier(static_cast<PragmaSetEqualIdentifier *>(parameter)); }},
85         {PRAGMA_INTERCEPT_SYNC_DATA, [this](void *parameter, int &errCode) {
86             errCode = SetPushDataInterceptor(*static_cast<PushDataInterceptor *>(parameter)); }},
87         {PRAGMA_SUBSCRIBE_QUERY, [this](void *parameter, int &errCode) {
88             errCode = PragmaSyncAction(static_cast<PragmaSync *>(parameter)); }},
89     };
90 }
91 
Pragma(int cmd,void * parameter)92 int SyncAbleKvDBConnection::Pragma(int cmd, void *parameter)
93 {
94     int errCode = PragmaParamCheck(cmd, parameter);
95     if (errCode != E_OK) {
96         return -E_INVALID_ARGS;
97     }
98 
99     InitPragmaFunc();
100     auto iter = pragmaFunc_.find(cmd);
101     if (iter != pragmaFunc_.end()) {
102         iter->second(parameter, errCode);
103         return errCode;
104     }
105 
106     // Call Pragma() of super class.
107     return GenericKvDBConnection::Pragma(cmd, parameter);
108 }
109 
PragmaParamCheck(int cmd,const void * parameter)110 int SyncAbleKvDBConnection::PragmaParamCheck(int cmd, const void *parameter)
111 {
112     switch (cmd) {
113         case PRAGMA_AUTO_SYNC:
114         case PRAGMA_PERFORMANCE_ANALYSIS_GET_REPORT:
115         case PRAGMA_PERFORMANCE_ANALYSIS_SET_REPORTFILENAME:
116             if (parameter == nullptr) {
117                 return -E_INVALID_ARGS;
118             }
119             return E_OK;
120         default:
121             return E_OK;
122     }
123 }
124 
PragmaSyncAction(const PragmaSync * syncParameter)125 int SyncAbleKvDBConnection::PragmaSyncAction(const PragmaSync *syncParameter)
126 {
127     if (syncParameter == nullptr) {
128         return -E_INVALID_ARGS;
129     }
130     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
131     if (kvDB == nullptr) {
132         return -E_INVALID_CONNECTION;
133     }
134 
135     if (isExclusive_.load()) {
136         return -E_BUSY;
137     }
138     {
139         AutoLock lockGuard(this);
140         if (IsKilled()) {
141             // If this happens, users are using a closed connection.
142             LOGE("Pragma sync on a closed connection.");
143             return -E_STALE;
144         }
145         IncObjRef(this);
146     }
147 
148     ISyncer::SyncParma syncParam;
149     syncParam.devices = syncParameter->devices_;
150     syncParam.mode = syncParameter->mode_;
151     syncParam.wait = syncParameter->wait_;
152     syncParam.isQuerySync = syncParameter->isQuerySync_;
153     syncParam.syncQuery = syncParameter->query_;
154     syncParam.onFinalize =  [this]() { DecObjRef(this); };
155     if (syncParameter->onComplete_) {
156         syncParam.onComplete = [this, onComplete = syncParameter->onComplete_, wait = syncParameter->wait_](
157             const std::map<std::string, int> &statuses
158         ) {
159             OnSyncComplete(statuses, onComplete, wait);
160         };
161     }
162     if (syncParameter->onSyncProcess_) {
163         syncParam.onSyncProcess = [this, onSyncProcess = syncParameter->onSyncProcess_](
164             const std::map<std::string, DeviceSyncProcess> &syncRecordMap
165         ) {
166             OnDeviceSyncProcess(syncRecordMap, onSyncProcess);
167         };
168     }
169 
170     int errCode = kvDB->Sync(syncParam, GetConnectionId());
171     if (errCode != E_OK) {
172         DecObjRef(this);
173     }
174     return errCode;
175 }
176 
CancelDeviceSync(uint32_t syncId)177 int SyncAbleKvDBConnection::CancelDeviceSync(uint32_t syncId)
178 {
179     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
180     if (kvDB == nullptr) {
181         return -E_INVALID_CONNECTION;
182     }
183 
184     if (isExclusive_.load()) {
185         return -E_BUSY;
186     }
187     {
188         AutoLock lockGuard(this);
189         if (IsKilled()) {
190             // if this happens, users are using a closed connection.
191             LOGE("CancelDeviceSync on a closed connection.");
192             return -E_STALE;
193         }
194         IncObjRef(this);
195     }
196 
197     int errCode = kvDB->CancelSync(syncId);
198     DecObjRef(this);
199     return errCode;
200 }
201 
EnableAutoSync(bool enable)202 int SyncAbleKvDBConnection::EnableAutoSync(bool enable)
203 {
204     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
205     if (kvDB == nullptr) {
206         return -E_INVALID_CONNECTION;
207     }
208     kvDB->EnableAutoSync(enable);
209     return E_OK;
210 }
211 
OnSyncComplete(const std::map<std::string,int> & statuses,const std::function<void (const std::map<std::string,int> & devicesMap)> & onComplete,bool wait)212 void SyncAbleKvDBConnection::OnSyncComplete(const std::map<std::string, int> &statuses,
213     const std::function<void(const std::map<std::string, int> &devicesMap)> &onComplete, bool wait)
214 {
215     AutoLock lockGuard(this);
216     if (!IsKilled() && onComplete) {
217         // Drop the lock before invoking the callback.
218         // Do pragma-sync again in the prev sync callback is supported.
219         UnlockObj();
220         // The connection may be closed after UnlockObj().
221         // RACE: 'KillObj()' against 'onComplete()'.
222         if (!IsKilled()) {
223             onComplete(statuses);
224         }
225         LockObj();
226     }
227 }
228 
OnDeviceSyncProcess(const std::map<std::string,DeviceSyncProcess> & syncRecordMap,const DeviceSyncProcessCallback & onProcess)229 void SyncAbleKvDBConnection::OnDeviceSyncProcess(const std::map<std::string, DeviceSyncProcess> &syncRecordMap,
230     const DeviceSyncProcessCallback &onProcess)
231 {
232     AutoLock lockGuard(this);
233     if (!IsKilled() && onProcess) {
234         // Drop the lock before invoking the callback.
235         // Do pragma-sync again in the prev sync callback is supported.
236         UnlockObj();
237         // The connection may be closed after UnlockObj().
238         // RACE: 'KillObj()' against 'onComplete()'.
239         if (!IsKilled()) {
240             onProcess(syncRecordMap);
241         }
242         LockObj();
243     }
244 }
245 
GetQueuedSyncSize(int * queuedSyncSize) const246 int SyncAbleKvDBConnection::GetQueuedSyncSize(int *queuedSyncSize) const
247 {
248     if (queuedSyncSize == nullptr) {
249         return -E_INVALID_ARGS;
250     }
251     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
252     if (kvDB == nullptr) {
253         return -E_INVALID_CONNECTION;
254     }
255     return kvDB->GetQueuedSyncSize(queuedSyncSize);
256 }
257 
SetQueuedSyncLimit(const int * queuedSyncLimit)258 int SyncAbleKvDBConnection::SetQueuedSyncLimit(const int *queuedSyncLimit)
259 {
260     if (queuedSyncLimit == nullptr) {
261         return -E_INVALID_ARGS;
262     }
263     if ((*queuedSyncLimit > DBConstant::QUEUED_SYNC_LIMIT_MAX) ||
264         (*queuedSyncLimit < DBConstant::QUEUED_SYNC_LIMIT_MIN)) {
265         return -E_INVALID_ARGS;
266     }
267     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
268     if (kvDB == nullptr) {
269         return -E_INVALID_CONNECTION;
270     }
271     return kvDB->SetQueuedSyncLimit(queuedSyncLimit);
272 }
273 
GetQueuedSyncLimit(int * queuedSyncLimit) const274 int SyncAbleKvDBConnection::GetQueuedSyncLimit(int *queuedSyncLimit) const
275 {
276     if (queuedSyncLimit == nullptr) {
277         return -E_INVALID_ARGS;
278     }
279     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
280     if (kvDB == nullptr) {
281         return -E_INVALID_CONNECTION;
282     }
283     return kvDB->GetQueuedSyncLimit(queuedSyncLimit);
284 }
285 
DisableManualSync(void)286 int SyncAbleKvDBConnection::DisableManualSync(void)
287 {
288     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
289     if (kvDB == nullptr) {
290         return -E_INVALID_CONNECTION;
291     }
292     return kvDB->DisableManualSync();
293 }
294 
EnableManualSync(void)295 int SyncAbleKvDBConnection::EnableManualSync(void)
296 {
297     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
298     if (kvDB == nullptr) {
299         return -E_INVALID_CONNECTION;
300     }
301     return kvDB->EnableManualSync();
302 }
303 
SetStaleDataWipePolicy(const WipePolicy * policy)304 int SyncAbleKvDBConnection::SetStaleDataWipePolicy(const WipePolicy *policy)
305 {
306     if (policy == nullptr) {
307         return -E_INVALID_ARGS;
308     }
309     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
310     if (kvDB == nullptr) {
311         return -E_INVALID_CONNECTION;
312     }
313     return kvDB->SetStaleDataWipePolicy(*policy);
314 }
315 
SetRemotePushFinishedNotify(PragmaRemotePushNotify * notifyParma)316 int SyncAbleKvDBConnection::SetRemotePushFinishedNotify(PragmaRemotePushNotify *notifyParma)
317 {
318     if (notifyParma == nullptr) {
319         return -E_INVALID_ARGS;
320     }
321 
322     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
323     if (kvDB == nullptr) {
324         return -E_INVALID_CONNECTION;
325     }
326 
327     int errCode = E_OK;
328     NotificationChain::Listener *tmpListener = nullptr;
329     if (notifyParma->notifier_ != nullptr) {
330         tmpListener = kvDB->AddRemotePushFinishedNotify(notifyParma->notifier_, errCode);
331         if (tmpListener == nullptr) {
332             return errCode;
333         }
334     }
335 
336     std::lock_guard<std::mutex> lock(remotePushFinishedListenerLock_);
337     // Drop old listener and set the new listener
338     if (remotePushFinishedListener_ != nullptr) {
339         errCode = remotePushFinishedListener_->Drop();
340         if (errCode != E_OK) {
341             LOGE("[SyncAbleConnection] Drop Remote push finished listener failed %d", errCode);
342             if (tmpListener != nullptr) {
343                 tmpListener->Drop();
344             }
345             return errCode;
346         }
347     }
348     remotePushFinishedListener_ = tmpListener;
349     return errCode;
350 }
351 
SetSyncRetry(bool isRetry)352 int SyncAbleKvDBConnection::SetSyncRetry(bool isRetry)
353 {
354     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
355     if (kvDB == nullptr) {
356         return -E_INVALID_CONNECTION;
357     }
358     return kvDB->SetSyncRetry(isRetry);
359 }
360 
SetEqualIdentifier(const PragmaSetEqualIdentifier * param)361 int SyncAbleKvDBConnection::SetEqualIdentifier(const PragmaSetEqualIdentifier *param)
362 {
363     if (param == nullptr) {
364         return -E_INVALID_ARGS;
365     }
366 
367     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
368     if (kvDB == nullptr) {
369         return -E_INVALID_CONNECTION;
370     }
371     return kvDB->SetEqualIdentifier(param->identifier_, param->targets_);
372 }
373 
SetPushDataInterceptor(const PushDataInterceptor & interceptor)374 int SyncAbleKvDBConnection::SetPushDataInterceptor(const PushDataInterceptor &interceptor)
375 {
376     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
377     if (kvDB == nullptr) {
378         return -E_INVALID_CONNECTION;
379     }
380     kvDB->SetSendDataInterceptor(interceptor);
381     return E_OK;
382 }
383 
GetSyncDataSize(const std::string & device,size_t & size) const384 int SyncAbleKvDBConnection::GetSyncDataSize(const std::string &device, size_t &size) const
385 {
386     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
387     if (kvDB == nullptr) {
388         return -E_INVALID_CONNECTION;
389     }
390     return kvDB->GetSyncDataSize(device, size);
391 }
392 
GetWatermarkInfo(const std::string & device,WatermarkInfo & info)393 int SyncAbleKvDBConnection::GetWatermarkInfo(const std::string &device, WatermarkInfo &info)
394 {
395     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
396     if (kvDB == nullptr) {
397         return -E_INVALID_CONNECTION;
398     }
399     return kvDB->GetWatermarkInfo(device, info);
400 }
401 
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess)402 int SyncAbleKvDBConnection::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess)
403 {
404     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
405     if (kvDB == nullptr) {
406         return -E_INVALID_CONNECTION;
407     }
408 
409     int securityLabel = INVALID_SEC_LABEL;
410     int securityFlag = INVALID_SEC_FLAG;
411     GetSecurityOption(securityLabel, securityFlag);
412     if (securityLabel == S4) {
413         LOGE("The current data does not support synchronization.");
414         return -E_SECURITY_OPTION_CHECK_ERROR;
415     }
416     return kvDB->Sync(option, onProcess);
417 }
418 
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)419 int SyncAbleKvDBConnection::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
420 {
421     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
422     if (kvDB == nullptr) {
423         return -E_INVALID_CONNECTION;
424     }
425     return kvDB->SetCloudDB(cloudDBs);
426 }
427 
GetTaskCount()428 int32_t SyncAbleKvDBConnection::GetTaskCount()
429 {
430     SyncAbleKvDB *kvDB = GetDB<SyncAbleKvDB>();
431     if (kvDB == nullptr) {
432         LOGW("[SyncAbleKvDBConnection] Get task count with null db");
433         return -1;
434     }
435     return kvDB->GetTaskCount();
436 }
437 
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)438 void SyncAbleKvDBConnection::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
439 {
440     auto *kvDB = GetDB<SyncAbleKvDB>();
441     if (kvDB == nullptr) {
442         LOGW("[SyncAbleKvDBConnection] Set generate cloud version callback with null db");
443         return;
444     }
445     kvDB->SetGenCloudVersionCallback(callback);
446 }
447 
SetReceiveDataInterceptor(const DataInterceptor & interceptor)448 int SyncAbleKvDBConnection::SetReceiveDataInterceptor(const DataInterceptor &interceptor)
449 {
450     auto kvDB = GetDB<SyncAbleKvDB>();
451     if (kvDB == nullptr) {
452         return -E_INVALID_CONNECTION;
453     }
454     kvDB->SetReceiveDataInterceptor(interceptor);
455     return E_OK;
456 }
457 }
458