1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "data_syncer.h"
17
18 #include <functional>
19
20 #include "data_sync_const.h"
21 #include "data_sync_notifier.h"
22 #include "dfs_error.h"
23 #include "ipc/cloud_sync_callback_manager.h"
24 #include "sdk_helper.h"
25 #include "sync_rule/battery_status.h"
26 #include "utils_log.h"
27
28 namespace OHOS {
29 namespace FileManagement {
30 namespace CloudSync {
31 using namespace std;
32 using namespace placeholders;
33 using namespace DriveKit;
34 using ChangeType = AAFwk::ChangeInfo::ChangeType;
35
DataSyncer(const std::string bundleName,const int32_t userId)36 DataSyncer::DataSyncer(const std::string bundleName, const int32_t userId)
37 : bundleName_(bundleName), userId_(userId)
38 {
39 /* alloc task runner */
40 taskRunner_ = DelayedSingleton<TaskManager>::GetInstance()->AllocRunner(userId,
41 bundleName, bind(&DataSyncer::Schedule, this));
42 }
43
~DataSyncer()44 DataSyncer::~DataSyncer()
45 {
46 /* release task runner */
47 DelayedSingleton<TaskManager>::GetInstance()->ReleaseRunner(userId_, bundleName_);
48 }
49
AsyncRun(std::shared_ptr<TaskContext> context,void (DataSyncer::* f)(std::shared_ptr<TaskContext>))50 int32_t DataSyncer::AsyncRun(std::shared_ptr<TaskContext> context,
51 void(DataSyncer::*f)(std::shared_ptr<TaskContext>))
52 {
53 return taskRunner_->AsyncRun<DataSyncer>(context, f, this);
54 }
55
56 template<typename T, typename RET, typename... ARGS>
AsyncCallback(RET (T::* f)(ARGS...))57 function<RET(ARGS...)> DataSyncer::AsyncCallback(RET(T::*f)(ARGS...))
58 {
59 return taskRunner_->AsyncCallback<DataSyncer>(f, this);
60 }
61
StartSync(bool forceFlag,SyncTriggerType triggerType)62 int32_t DataSyncer::StartSync(bool forceFlag, SyncTriggerType triggerType)
63 {
64 LOGI("%{private}d %{public}s starts sync, isforceSync %{public}d, triggerType %{public}d",
65 userId_, bundleName_.c_str(), forceFlag, triggerType);
66
67 /* only one specific data sycner running at a time */
68 if (syncStateManager_.CheckAndSetPending(forceFlag)) {
69 LOGI("syncing, pending sync");
70 return E_PENDING;
71 }
72
73 /* start data sync */
74 Schedule();
75
76 return E_OK;
77 }
78
StopSync(SyncTriggerType triggerType)79 int32_t DataSyncer::StopSync(SyncTriggerType triggerType)
80 {
81 LOGI("%{private}d %{public}s stops sync, trigger stop sync, type:%{public}d",
82 userId_, bundleName_.c_str(), triggerType);
83
84 syncStateManager_.SetStopSyncFlag();
85 Abort();
86 return E_OK;
87 }
88
Lock()89 int32_t DataSyncer::Lock()
90 {
91 lock_guard<mutex> lock(lock_.mtx);
92 if (lock_.count > 0) {
93 lock_.count++;
94 return E_OK;
95 }
96
97 /* lock: device-reentrant */
98 int32_t ret = sdkHelper_->GetLock(lock_.lock);
99 if (ret != E_OK) {
100 LOGE("sdk helper get lock err %{public}d", ret);
101 lock_.lock = { 0 };
102 return ret;
103 }
104 lock_.count++;
105
106 return ret;
107 }
108
Unlock()109 void DataSyncer::Unlock()
110 {
111 lock_guard<mutex> lock(lock_.mtx);
112 lock_.count--;
113 if (lock_.count > 0) {
114 return;
115 }
116
117 /* sdk unlock */
118 sdkHelper_->DeleteLock(lock_.lock);
119
120 /* reset sdk lock */
121 lock_.lock = { 0 };
122 }
123
ForceUnlock()124 void DataSyncer::ForceUnlock()
125 {
126 lock_guard<mutex> lock(lock_.mtx);
127 if (lock_.count == 0) {
128 return;
129 }
130 sdkHelper_->DeleteLock(lock_.lock);
131 lock_.lock = { 0 };
132 lock_.count = 0;
133 }
134
StartDownloadFile(const std::string path,const int32_t userId)135 int32_t DataSyncer::StartDownloadFile(const std::string path, const int32_t userId)
136 {
137 return E_OK;
138 }
139
StopDownloadFile(const std::string path,const int32_t userId)140 int32_t DataSyncer::StopDownloadFile(const std::string path, const int32_t userId)
141 {
142 downloadCallbackMgr_.StopDonwload(path, userId);
143 return E_OK;
144 }
145
RegisterDownloadFileCallback(const int32_t userId,const sptr<ICloudDownloadCallback> downloadCallback)146 int32_t DataSyncer::RegisterDownloadFileCallback(const int32_t userId,
147 const sptr<ICloudDownloadCallback> downloadCallback)
148 {
149 downloadCallbackMgr_.RegisterCallback(userId, downloadCallback);
150 return E_OK;
151 }
152
UnregisterDownloadFileCallback(const int32_t userId)153 int32_t DataSyncer::UnregisterDownloadFileCallback(const int32_t userId)
154 {
155 downloadCallbackMgr_.UnregisterCallback(userId);
156 return E_OK;
157 }
158
Abort()159 void DataSyncer::Abort()
160 {
161 LOGI("%{private}d %{private}s aborts", userId_, bundleName_.c_str());
162 thread ([this]() {
163 /* stop all the tasks and wait for tasks' termination */
164 if (!taskRunner_->StopAndWaitFor()) {
165 LOGE("wait for tasks stop fail");
166 }
167 /* call the syncer manager's callback for notification */
168 CompleteAll();
169 }).detach();
170 }
171
SetSdkHelper(shared_ptr<SdkHelper> & sdkHelper)172 void DataSyncer::SetSdkHelper(shared_ptr<SdkHelper> &sdkHelper)
173 {
174 sdkHelper_ = sdkHelper;
175 }
176
Pull(shared_ptr<DataHandler> handler)177 int32_t DataSyncer::Pull(shared_ptr<DataHandler> handler)
178 {
179 LOGI("%{private}d %{private}s pull", userId_, bundleName_.c_str());
180
181 shared_ptr<TaskContext> context = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
182
183 RetryDownloadRecords(context);
184 vector<DKRecordId> records;
185 int32_t ret = handler->GetRetryRecords(records);
186 if (ret == E_OK) {
187 DataSyncer::PullRecordsWithId(context, records, true);
188 }
189
190 /* Full synchronization and incremental synchronization */
191 if (handler->IsPullRecords()) {
192 ret = AsyncRun(context, &DataSyncer::PullRecords);
193 } else {
194 ret = AsyncRun(context, &DataSyncer::PullDatabaseChanges);
195 }
196 if (ret != E_OK) {
197 LOGE("asyn run pull records err %{public}d", ret);
198 return ret;
199 }
200
201 return E_OK;
202 }
203
204 /*
205 * Although records from the cloud should be all pulled down before
206 * uploading the local change, conflicts might be rare in most cases,
207 * and the syncer would just move on.
208 */
PullRecords(shared_ptr<TaskContext> context)209 void DataSyncer::PullRecords(shared_ptr<TaskContext> context)
210 {
211 LOGI("%{private}d %{private}s pull records", userId_, bundleName_.c_str());
212
213 /* get query condition here */
214 auto handler = context->GetHandler();
215 if (handler == nullptr) {
216 LOGE("context get handler err");
217 return;
218 }
219
220 FetchCondition cond;
221 handler->GetFetchCondition(cond);
222
223 DKQueryCursor tempStartCursor;
224 handler->GetTempStartCursor(tempStartCursor);
225 if (tempStartCursor.empty()) {
226 sdkHelper_->GetStartCursor(cond.recordType, tempStartCursor);
227 handler->SetTempStartCursor(tempStartCursor);
228 }
229
230 DKQueryCursor nextCursor;
231 handler->GetNextCursor(nextCursor);
232
233 SdkHelper::FetchRecordsCallback callback = nullptr;
234 if (handler->GetCheckFlag()) {
235 callback = AsyncCallback(&DataSyncer::OnFetchCheckRecords);
236 } else {
237 callback = AsyncCallback(&DataSyncer::OnFetchRecords);
238 }
239 if (callback == nullptr) {
240 LOGE("wrap on fetch records fail");
241 return;
242 }
243
244 int32_t ret = sdkHelper_->FetchRecords(context, cond, nextCursor, callback);
245 if (ret != E_OK) {
246 LOGE("sdk fetch records err %{public}d", ret);
247 }
248 }
249
PullDatabaseChanges(shared_ptr<TaskContext> context)250 void DataSyncer::PullDatabaseChanges(shared_ptr<TaskContext> context)
251 {
252 LOGI("%{private}d %{private}s pull database changes", userId_, bundleName_.c_str());
253
254 auto callback = AsyncCallback(&DataSyncer::OnFetchDatabaseChanges);
255 if (callback == nullptr) {
256 LOGE("wrap on fetch records fail");
257 return;
258 }
259
260 auto handler = context->GetHandler();
261 if (handler == nullptr) {
262 LOGE("context get handler err");
263 return;
264 }
265 DKQueryCursor nextCursor;
266 handler->GetNextCursor(nextCursor);
267
268 FetchCondition cond;
269 handler->GetFetchCondition(cond);
270 int32_t ret = sdkHelper_->FetchDatabaseChanges(context, cond, nextCursor, callback);
271 if (ret != E_OK) {
272 LOGE("sdk fetch records err %{public}d", ret);
273 }
274 }
275
276 struct DownloadContext {
277 std::shared_ptr<DKContext> context;
278 vector<DKDownloadAsset> assets;
279 DriveKit::DKDownloadId id;
280 std::function<void(std::shared_ptr<DriveKit::DKContext>,
281 std::shared_ptr<const DriveKit::DKDatabase>,
282 const std::map<DriveKit::DKDownloadAsset, DriveKit::DKDownloadResult> &,
283 const DriveKit::DKError &)>
284 resultCallback;
285 std::function<
286 void(std::shared_ptr<DriveKit::DKContext>, DriveKit::DKDownloadAsset, TotalSize, DriveKit::DownloadSize)>
287 progressCallback;
288 };
289
DownloadAssets(DownloadContext & ctx)290 void DataSyncer::DownloadAssets(DownloadContext &ctx)
291 {
292 if (ctx.resultCallback == nullptr) {
293 LOGE("resultCallback nullptr");
294 return;
295 }
296 if (ctx.progressCallback == nullptr) {
297 LOGE("progressCallback nullptr");
298 return;
299 }
300 sdkHelper_->DownloadAssets(ctx.context, ctx.assets, {}, ctx.id, ctx.resultCallback, ctx.progressCallback);
301 }
302
ThumbDownloadCallback(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,const map<DKDownloadAsset,DKDownloadResult> & resultMap,const DKError & err)303 static void ThumbDownloadCallback(shared_ptr<DKContext> context,
304 shared_ptr<const DKDatabase> database,
305 const map<DKDownloadAsset, DKDownloadResult> &resultMap,
306 const DKError &err)
307 {
308 if (err.HasError()) {
309 LOGE("DKAssetsDownloader err, localErr: %{public}d, serverErr: %{public}d", static_cast<int>(err.dkErrorCode),
310 err.serverErrorCode);
311 } else {
312 LOGI("DKAssetsDownloader ok");
313 }
314
315 auto ctx = static_pointer_cast<TaskContext>(context);
316 auto handler = ctx->GetHandler();
317 handler->OnDownloadThumb(resultMap);
318 }
319
ThumbDownLoadProgress(shared_ptr<DKContext> context,DKDownloadAsset asset,TotalSize total,DownloadSize download)320 void ThumbDownLoadProgress(shared_ptr<DKContext> context, DKDownloadAsset asset, TotalSize total, DownloadSize download)
321 {
322 LOGI("record %s %{public}s download progress", asset.recordId.c_str(), asset.fieldKey.c_str());
323 if (total == download) {
324 auto ctx = static_pointer_cast<TaskContext>(context);
325 auto handler = ctx->GetHandler();
326 handler->OnDownloadThumb(asset);
327 }
328 }
329
HandleOnFetchRecords(const std::shared_ptr<DownloadTaskContext> context,std::shared_ptr<const DKDatabase> database,std::shared_ptr<std::vector<DKRecord>> records,bool checkOrRetry)330 int DataSyncer::HandleOnFetchRecords(const std::shared_ptr<DownloadTaskContext> context,
331 std::shared_ptr<const DKDatabase> database, std::shared_ptr<std::vector<DKRecord>> records, bool checkOrRetry)
332 {
333 if (records->size() == 0) {
334 LOGI("no records to handle");
335 return E_OK;
336 }
337
338 OnFetchParams onFetchParams;
339 auto ctx = static_pointer_cast<TaskContext>(context);
340 auto handler = ctx->GetHandler();
341 if (handler == nullptr) {
342 LOGE("context get handler err");
343 return E_CONTEXT;
344 }
345
346 if (handler->IsPullRecords() && !checkOrRetry) {
347 onFetchParams.totalPullCount = context->GetBatchNo() * handler->GetRecordSize();
348 }
349
350 int32_t ret = handler->OnFetchRecords(records, onFetchParams);
351 DownloadContext dctx = {.context = context,
352 .assets = onFetchParams.assetsToDownload,
353 .id = 0,
354 .resultCallback = ThumbDownloadCallback,
355 .progressCallback = ThumbDownLoadProgress};
356 DownloadAssets(dctx);
357
358 if (ret != E_OK) {
359 LOGE("handler on fetch records err %{public}d", ret);
360 }
361 if (!checkOrRetry) {
362 handler->FinishPull(context->GetBatchNo());
363 }
364 return ret;
365 }
366
OnFetchRecords(const std::shared_ptr<DKContext> context,std::shared_ptr<const DKDatabase> database,std::shared_ptr<std::vector<DKRecord>> records,DKQueryCursor nextCursor,const DKError & err)367 void DataSyncer::OnFetchRecords(const std::shared_ptr<DKContext> context, std::shared_ptr<const DKDatabase> database,
368 std::shared_ptr<std::vector<DKRecord>> records, DKQueryCursor nextCursor, const DKError &err)
369 {
370 if (err.HasError()) {
371 LOGE("OnFetchRecords server err %{public}d and dk errcor %{public}d", err.serverErrorCode, err.dkErrorCode);
372 if (static_cast<DKServerErrorCode>(err.serverErrorCode) == DKServerErrorCode::NETWORK_ERROR) {
373 SetErrorCodeMask(errorCode_, ErrorType::NETWORK_UNAVAILABLE);
374 }
375 return;
376 }
377
378 LOGI("%{private}d %{private}s on fetch records", userId_, bundleName_.c_str());
379 auto ctx = static_pointer_cast<DownloadTaskContext>(context);
380 auto handler = ctx->GetHandler();
381 if (handler == nullptr) {
382 LOGE("context get handler err");
383 return;
384 }
385 if (ctx->GetBatchNo() == 0) {
386 handler->SetRecordSize(records->size());
387 }
388 /* pull more */
389 if (nextCursor.empty()) {
390 LOGI("no more records");
391 handler->GetTempStartCursor(nextCursor);
392 handler->SetTempNextCursor(nextCursor, true);
393 if (records->size() == 0) {
394 handler->FinishPull(ctx->GetBatchNo());
395 }
396 } else {
397 handler->SetTempNextCursor(nextCursor, false);
398 shared_ptr<DownloadTaskContext> nexCtx = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
399 int ret = AsyncRun(nexCtx, &DataSyncer::PullRecords);
400 if (ret != E_OK) {
401 LOGE("asyn run pull records err %{public}d", ret);
402 }
403 }
404
405 if (HandleOnFetchRecords(ctx, database, records, false) != E_OK) {
406 LOGE("HandleOnFetchRecords failed");
407 }
408 }
409
DownloadInner(std::shared_ptr<DataHandler> handler,const std::string path,const int32_t userId)410 int32_t DataSyncer::DownloadInner(std::shared_ptr<DataHandler> handler,
411 const std::string path,
412 const int32_t userId)
413 {
414 auto ctx = std::make_shared<TaskContext>(handler);
415 std::vector<DKDownloadAsset> assetsToDownload;
416 int32_t ret = handler->GetDownloadAsset(path, assetsToDownload);
417 if (ret != E_OK) {
418 LOGE("handler on fetch records err %{public}d", ret);
419 return ret;
420 }
421
422 downloadCallbackMgr_.StartDonwload(path, userId);
423
424 auto downloadResultCallback = [this, path, assetsToDownload, handler](
425 std::shared_ptr<DriveKit::DKContext> context,
426 std::shared_ptr<const DriveKit::DKDatabase> database,
427 const std::map<DriveKit::DKDownloadAsset, DriveKit::DKDownloadResult> &results,
428 const DriveKit::DKError &err) {
429 this->downloadCallbackMgr_.OnDownloadedResult(path, assetsToDownload, handler, context, database, results, err);
430 };
431 auto downloadProcessCallback = [this, path](std::shared_ptr<DKContext> context, DKDownloadAsset asset,
432 TotalSize totalSize, DownloadSize downloadSize) {
433 this->downloadCallbackMgr_.OnDownloadProcess(path, context, asset, totalSize, downloadSize);
434 };
435 DownloadContext dctx = {.context = ctx,
436 .assets = assetsToDownload,
437 .id = 0,
438 .resultCallback = downloadResultCallback,
439 .progressCallback = downloadProcessCallback};
440 DownloadAssets(dctx);
441 return E_OK;
442 }
443
OnFetchDatabaseChanges(const std::shared_ptr<DKContext> context,std::shared_ptr<const DKDatabase> database,std::shared_ptr<std::vector<DriveKit::DKRecord>> records,DKQueryCursor nextCursor,bool hasMore,const DKError & err)444 void DataSyncer::OnFetchDatabaseChanges(const std::shared_ptr<DKContext> context,
445 std::shared_ptr<const DKDatabase> database,
446 std::shared_ptr<std::vector<DriveKit::DKRecord>> records, DKQueryCursor nextCursor,
447 bool hasMore, const DKError &err)
448 {
449 LOGI("%{private}d %{private}s on fetch database changes", userId_, bundleName_.c_str());
450 auto ctx = static_pointer_cast<DownloadTaskContext>(context);
451
452 auto handler = ctx->GetHandler();
453 if (handler == nullptr) {
454 LOGE("context get handler err");
455 return;
456 }
457
458 if (err.HasError()) {
459 LOGE("OnFetchDatabaseChanges server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
460 err.dkErrorCode);
461 if (static_cast<DKServerErrorCode>(err.serverErrorCode) == DKServerErrorCode::NETWORK_ERROR) {
462 SetErrorCodeMask(errorCode_, ErrorType::NETWORK_UNAVAILABLE);
463 } else if (!err.errorDetails.empty()) {
464 DKDetailErrorCode detailCode = static_cast<DKDetailErrorCode>(err.errorDetails[0].detailCode);
465 if (detailCode == DKDetailErrorCode::PARAM_INVALID || detailCode == DKDetailErrorCode::CURSOR_EXPIRED) {
466 handler->SetChecking();
467 Pull(handler);
468 }
469 }
470 return;
471 }
472
473 /* pull more */
474 if (!hasMore) {
475 LOGI("no more records");
476 handler->SetTempNextCursor(nextCursor, true);
477 if (records->size() == 0) {
478 handler->FinishPull(ctx->GetBatchNo());
479 }
480 } else {
481 handler->SetTempNextCursor(nextCursor, false);
482 shared_ptr<DownloadTaskContext> nexCtx = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
483 int ret = AsyncRun(nexCtx, &DataSyncer::PullDatabaseChanges);
484 if (ret != E_OK) {
485 LOGE("asyn run pull database changes err %{public}d", ret);
486 }
487 }
488
489 if (HandleOnFetchRecords(ctx, database, records, false) != E_OK) {
490 LOGE("HandleOnFetchRecords failed");
491 return;
492 }
493 }
494
OnFetchCheckRecords(const shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<std::vector<DKRecord>> records,DKQueryCursor nextCursor,const DKError & err)495 void DataSyncer::OnFetchCheckRecords(const shared_ptr<DKContext> context,
496 shared_ptr<const DKDatabase> database, shared_ptr<std::vector<DKRecord>> records,
497 DKQueryCursor nextCursor, const DKError &err)
498 {
499 if (err.HasError()) {
500 LOGE("OnFetchCheckRecords server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
501 err.dkErrorCode);
502 if (static_cast<DKServerErrorCode>(err.serverErrorCode) == DKServerErrorCode::NETWORK_ERROR) {
503 SetErrorCodeMask(errorCode_, ErrorType::NETWORK_UNAVAILABLE);
504 }
505 return;
506 }
507 LOGI("%{private}d %{private}s on fetch records", userId_, bundleName_.c_str());
508 auto ctx = static_pointer_cast<DownloadTaskContext>(context);
509 auto handler = ctx->GetHandler();
510 if (handler == nullptr) {
511 LOGE("context get handler err");
512 return;
513 }
514
515 int32_t ret = E_OK;
516 /* pull more */
517 if (nextCursor.empty()) {
518 LOGI("no more records");
519 handler->GetTempStartCursor(nextCursor);
520 handler->SetTempNextCursor(nextCursor, true);
521 } else {
522 handler->SetTempNextCursor(nextCursor, false);
523 shared_ptr<DownloadTaskContext> nexCtx = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
524 ret = AsyncRun(nexCtx, &DataSyncer::PullRecords);
525 if (ret != E_OK) {
526 LOGE("asyn run pull records err %{public}d", ret);
527 }
528 }
529
530 std::vector<DriveKit::DKRecordId> checkRecords;
531 ret = handler->GetCheckRecords(checkRecords, records);
532 if (ret != E_OK) {
533 LOGE("handler get check records err %{public}d", ret);
534 return;
535 }
536
537 DataSyncer::PullRecordsWithId(ctx, checkRecords, false);
538 }
539
PullRecordsWithId(shared_ptr<TaskContext> context,const std::vector<DriveKit::DKRecordId> & records,bool retry)540 void DataSyncer::PullRecordsWithId(shared_ptr<TaskContext> context, const std::vector<DriveKit::DKRecordId> &records,
541 bool retry)
542 {
543 auto ctx = static_pointer_cast<DownloadTaskContext>(context);
544 auto handler = ctx->GetHandler();
545 if (handler == nullptr) {
546 LOGE("context get handler err");
547 return;
548 }
549
550 FetchCondition cond;
551 handler->GetFetchCondition(cond);
552 LOGI("retry records count: %{public}u", static_cast<uint32_t>(records.size()));
553 for (auto it : records) {
554 auto callback = AsyncCallback(&DataSyncer::OnFetchRecordWithId);
555 if (callback == nullptr) {
556 LOGE("wrap on fetch records fail");
557 continue;
558 }
559 int32_t ret = sdkHelper_->FetchRecordWithId(context, cond, it, callback);
560 if (ret != E_OK) {
561 LOGE("sdk fetch records err %{public}d", ret);
562 }
563 }
564 if (!retry) {
565 handler->FinishPull(ctx->GetBatchNo());
566 }
567 }
568
OnFetchRecordWithId(shared_ptr<DKContext> context,shared_ptr<DKDatabase> database,DKRecordId recordId,const DKRecord & record,const DKError & error)569 void DataSyncer::OnFetchRecordWithId(shared_ptr<DKContext> context, shared_ptr<DKDatabase> database,
570 DKRecordId recordId, const DKRecord &record, const DKError &error)
571 {
572 auto records = make_shared<std::vector<DKRecord>>();
573 if (error.HasError()) {
574 LOGE("has error, recordId:%s, dkErrorCode :%{public}d, serverErrorCode:%{public}d", recordId.c_str(),
575 error.dkErrorCode, error.serverErrorCode);
576 LOGI("convert to delete record");
577 DKRecord deleteRecord;
578 deleteRecord.SetRecordId(recordId);
579 deleteRecord.SetDelete(true);
580 records->push_back(deleteRecord);
581 } else {
582 LOGI("handle retry record : %s", record.GetRecordId().c_str());
583 records->push_back(record);
584 }
585 auto ctx = static_pointer_cast<DownloadTaskContext>(context);
586 HandleOnFetchRecords(ctx, database, records, true);
587 }
588
RetryDownloadRecords(shared_ptr<TaskContext> context)589 void DataSyncer::RetryDownloadRecords(shared_ptr<TaskContext> context)
590 {
591 auto ctx = static_pointer_cast<TaskContext>(context);
592 auto handler = ctx->GetHandler();
593 if (handler == nullptr) {
594 LOGE("context get handler err");
595 return;
596 }
597
598 vector<DriveKit::DKDownloadAsset> assetsToDownload;
599 int32_t ret = handler->GetAssetsToDownload(assetsToDownload);
600 if (ret != E_OK) {
601 LOGE("get assetsToDownload err %{public}d", ret);
602 return;
603 }
604 if (assetsToDownload.empty()) {
605 return;
606 }
607
608 LOGI("assetsToDownload count: %{public}zu", assetsToDownload.size());
609 DownloadContext dctx = {.context = ctx,
610 .assets = assetsToDownload,
611 .id = 0,
612 .resultCallback = ThumbDownloadCallback,
613 .progressCallback = ThumbDownLoadProgress};
614 DownloadAssets(dctx);
615 }
616
Push(shared_ptr<DataHandler> handler)617 int32_t DataSyncer::Push(shared_ptr<DataHandler> handler)
618 {
619 /*
620 * Although unlikely, if the first callback finds no more records available
621 * and tries to schedule before following tasks commited, the data syncer
622 * will directly schedule to the next stage, while following tasks would be
623 * commited to the next stage mistakenly.
624 * One possible solution: commit dummy task in the beginning and complete
625 * dummy task in the end.
626 */
627 shared_ptr<TaskContext> context = make_shared<TaskContext>(handler);
628
629 /* commit a dummy task */
630 BeginTransaction();
631
632 int32_t ret = AsyncRun(context, &DataSyncer::CreateRecords);
633 if (ret != E_OK) {
634 LOGE("async run create records err %{public}d", ret);
635 return ret;
636 }
637
638 ret = AsyncRun(context, &DataSyncer::DeleteRecords);
639 if (ret != E_OK) {
640 LOGE("async run delete records err %{public}d", ret);
641 return ret;
642 }
643
644 ret = AsyncRun(context, &DataSyncer::ModifyMdirtyRecords);
645 if (ret != E_OK) {
646 LOGE("async run modify mdirty records err %{public}d", ret);
647 return ret;
648 }
649
650 ret = AsyncRun(context, &DataSyncer::ModifyFdirtyRecords);
651 if (ret != E_OK) {
652 LOGE("async run modify fdirty records err %{public}d", ret);
653 return ret;
654 }
655
656 /* complete the dummy task */
657 EndTransaction();
658
659 return E_OK;
660 }
661
Init(const std::string bundleName,const int32_t userId)662 int32_t DataSyncer::Init(const std::string bundleName, const int32_t userId)
663 {
664 return E_OK;
665 }
666
Clean(const int action)667 int32_t DataSyncer::Clean(const int action)
668 {
669 return E_OK;
670 }
671
CleanInner(std::shared_ptr<DataHandler> handler,const int action)672 int32_t DataSyncer::CleanInner(std::shared_ptr<DataHandler> handler, const int action)
673 {
674 LOGD("Enter function DataSyncer::CleanInner");
675 handler->ClearCursor();
676 int res = handler->Clean(action);
677 if (res != E_OK) {
678 LOGE("Clean file failed res:%{public}d", res);
679 }
680 return res;
681 }
682
CreateRecords(shared_ptr<TaskContext> context)683 void DataSyncer::CreateRecords(shared_ptr<TaskContext> context)
684 {
685 LOGI("%{private}d %{private}s creates records", userId_, bundleName_.c_str());
686
687 auto handler = context->GetHandler();
688 if (handler == nullptr) {
689 LOGE("context get handler err");
690 return;
691 }
692
693 /* query local */
694 vector<DKRecord> records;
695 int32_t ret = handler->GetCreatedRecords(records);
696 if (ret != E_OK) {
697 LOGE("handler get created records err %{public}d", ret);
698 return;
699 }
700
701 /* no need upload */
702 if (records.size() == 0) {
703 return;
704 }
705
706 if (!BatteryStatus::IsAllowUpload(syncStateManager_.GetForceFlag())) {
707 LOGE("battery status abnormal, abort upload");
708 SetErrorCodeMask(errorCode_, ErrorType::BATTERY_LEVEL_LOW);
709 return;
710 }
711
712 /* upload */
713 auto callback = AsyncCallback(&DataSyncer::OnCreateRecords);
714 if (callback == nullptr) {
715 LOGE("wrap on create records fail");
716 return;
717 }
718 ret = sdkHelper_->CreateRecords(context, records, callback);
719 if (ret != E_OK) {
720 LOGE("sdk create records err %{public}d", ret);
721 }
722 }
723
DeleteRecords(shared_ptr<TaskContext> context)724 void DataSyncer::DeleteRecords(shared_ptr<TaskContext> context)
725 {
726 LOGI("%{private}d %{private}s deletes records", userId_, bundleName_.c_str());
727
728 auto handler = context->GetHandler();
729 if (handler == nullptr) {
730 LOGE("context get handler err");
731 return;
732 }
733
734 /* query local */
735 vector<DKRecord> records;
736 int32_t ret = handler->GetDeletedRecords(records);
737 if (ret != E_OK) {
738 LOGE("handler get deleted records err %{public}d", ret);
739 return;
740 }
741
742 /* no need upload */
743 if (records.size() == 0) {
744 return;
745 }
746
747 /* upload */
748 auto callback = AsyncCallback(&DataSyncer::OnDeleteRecords);
749 if (callback == nullptr) {
750 LOGE("wrap on delete records fail");
751 return;
752 }
753 ret = sdkHelper_->DeleteRecords(context, records, callback);
754 if (ret != E_OK) {
755 LOGE("sdk delete records err %{public}d", ret);
756 }
757 }
758
ModifyMdirtyRecords(shared_ptr<TaskContext> context)759 void DataSyncer::ModifyMdirtyRecords(shared_ptr<TaskContext> context)
760 {
761 LOGI("%{private}d %{private}s modifies records", userId_, bundleName_.c_str());
762
763 auto handler = context->GetHandler();
764 if (handler == nullptr) {
765 LOGE("context get handler err");
766 return;
767 }
768
769 /* query local */
770 vector<DKRecord> records;
771 int32_t ret = handler->GetMetaModifiedRecords(records);
772 if (ret != E_OK) {
773 LOGE("handler get modified records err %{public}d", ret);
774 return;
775 }
776
777 /* no need upload */
778 if (records.size() == 0) {
779 return;
780 }
781
782 /* upload */
783 auto callback = AsyncCallback(&DataSyncer::OnModifyMdirtyRecords);
784 if (callback == nullptr) {
785 LOGE("wrap on modify records fail");
786 return;
787 }
788 ret = sdkHelper_->ModifyRecords(context, records, callback);
789 if (ret != E_OK) {
790 LOGE("sdk modify records err %{public}d", ret);
791 }
792 }
793
ModifyFdirtyRecords(shared_ptr<TaskContext> context)794 void DataSyncer::ModifyFdirtyRecords(shared_ptr<TaskContext> context)
795 {
796 LOGI("%{private}d %{private}s modifies records", userId_, bundleName_.c_str());
797
798 auto handler = context->GetHandler();
799 if (handler == nullptr) {
800 LOGE("context get handler err");
801 return;
802 }
803
804 /* query local */
805 vector<DKRecord> records;
806 int32_t ret = handler->GetFileModifiedRecords(records);
807 if (ret != E_OK) {
808 LOGE("handler get modified records err %{public}d", ret);
809 return;
810 }
811
812 /* no need upload */
813 if (records.size() == 0) {
814 return;
815 }
816
817 /* upload */
818 auto callback = AsyncCallback(&DataSyncer::OnModifyFdirtyRecords);
819 if (callback == nullptr) {
820 LOGE("wrap on modify records fail");
821 return;
822 }
823 ret = sdkHelper_->ModifyRecords(context, records, callback);
824 if (ret != E_OK) {
825 LOGE("sdk modify records err %{public}d", ret);
826 }
827 }
828
OnCreateRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> map,const DKError & err)829 void DataSyncer::OnCreateRecords(shared_ptr<DKContext> context,
830 shared_ptr<const DKDatabase> database,
831 shared_ptr<const map<DKRecordId, DKRecordOperResult>> map, const DKError &err)
832 {
833 LOGI("%{private}d %{private}s on create records %{public}zu", userId_,
834 bundleName_.c_str(), map->size());
835
836 auto ctx = static_pointer_cast<TaskContext>(context);
837
838 /* update local */
839 auto handler = ctx->GetHandler();
840 int32_t ret = handler->OnCreateRecords(*map);
841 if (ret != E_OK) {
842 LOGE("handler on create records err %{public}d", ret);
843 if (ret == E_SYNC_FAILED_NETWORK_NOT_AVAILABLE) {
844 SetErrorCodeMask(errorCode_, ErrorType::NETWORK_UNAVAILABLE);
845 } else if (ret == E_CLOUD_STORAGE_FULL) {
846 SetErrorCodeMask(errorCode_, ErrorType::CLOUD_STORAGE_FULL);
847 }
848 return;
849 }
850
851 /* push more */
852 ret = AsyncRun(ctx, &DataSyncer::CreateRecords);
853 if (ret != E_OK) {
854 LOGE("async run create records err %{public}d", ret);
855 return;
856 }
857 }
858
OnDeleteRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> map,const DKError & err)859 void DataSyncer::OnDeleteRecords(shared_ptr<DKContext> context,
860 shared_ptr<const DKDatabase> database,
861 shared_ptr<const map<DKRecordId, DKRecordOperResult>> map, const DKError &err)
862 {
863 LOGI("%{private}d %{private}s on delete records %{public}zu", userId_,
864 bundleName_.c_str(), map->size());
865
866 auto ctx = static_pointer_cast<TaskContext>(context);
867
868 /* update local */
869 auto handler = ctx->GetHandler();
870 int32_t ret = handler->OnDeleteRecords(*map);
871 if (ret != E_OK) {
872 LOGE("handler on delete records err %{public}d", ret);
873 if (ret == E_SYNC_FAILED_NETWORK_NOT_AVAILABLE) {
874 SetErrorCodeMask(errorCode_, ErrorType::NETWORK_UNAVAILABLE);
875 } else if (ret == E_CLOUD_STORAGE_FULL) {
876 SetErrorCodeMask(errorCode_, ErrorType::CLOUD_STORAGE_FULL);
877 }
878 return;
879 }
880
881 /* push more */
882 ret = AsyncRun(ctx, &DataSyncer::DeleteRecords);
883 if (ret != E_OK) {
884 LOGE("async run delete records err %{public}d", ret);
885 return;
886 }
887 }
888
OnModifyMdirtyRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> saveMap,shared_ptr<const map<DKRecordId,DKRecordOperResult>> deleteMap,const DKError & err)889 void DataSyncer::OnModifyMdirtyRecords(shared_ptr<DKContext> context,
890 shared_ptr<const DKDatabase> database,
891 shared_ptr<const map<DKRecordId, DKRecordOperResult>> saveMap,
892 shared_ptr<const map<DKRecordId, DKRecordOperResult>> deleteMap,
893 const DKError &err)
894 {
895 LOGI("%{private}d %{private}s on modify mdirty records %{public}zu", userId_,
896 bundleName_.c_str(), saveMap->size());
897
898 auto ctx = static_pointer_cast<TaskContext>(context);
899
900 /* update local */
901 auto handler = ctx->GetHandler();
902 int32_t ret = handler->OnModifyMdirtyRecords(*saveMap);
903 if (ret != E_OK) {
904 LOGE("handler on modify records err %{public}d", ret);
905 if (ret == E_SYNC_FAILED_NETWORK_NOT_AVAILABLE) {
906 SetErrorCodeMask(errorCode_, ErrorType::NETWORK_UNAVAILABLE);
907 } else if (ret == E_CLOUD_STORAGE_FULL) {
908 SetErrorCodeMask(errorCode_, ErrorType::CLOUD_STORAGE_FULL);
909 }
910 return;
911 }
912
913 /* push more */
914 ret = AsyncRun(ctx, &DataSyncer::ModifyMdirtyRecords);
915 if (ret != E_OK) {
916 LOGE("async run modify records err %{public}d", ret);
917 return;
918 }
919 }
920
OnModifyFdirtyRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> saveMap,shared_ptr<const map<DKRecordId,DKRecordOperResult>> deleteMap,const DKError & err)921 void DataSyncer::OnModifyFdirtyRecords(shared_ptr<DKContext> context,
922 shared_ptr<const DKDatabase> database,
923 shared_ptr<const map<DKRecordId, DKRecordOperResult>> saveMap,
924 shared_ptr<const map<DKRecordId, DKRecordOperResult>> deleteMap,
925 const DKError &err)
926 {
927 LOGI("%{private}d %{private}s on modify fdirty records %{public}zu", userId_,
928 bundleName_.c_str(), saveMap->size());
929
930 auto ctx = static_pointer_cast<TaskContext>(context);
931
932 /* update local */
933 auto handler = ctx->GetHandler();
934 int32_t ret = handler->OnModifyFdirtyRecords(*saveMap);
935 if (ret != E_OK) {
936 LOGE("handler on modify records err %{public}d", ret);
937 if (ret == E_SYNC_FAILED_NETWORK_NOT_AVAILABLE) {
938 SetErrorCodeMask(errorCode_, ErrorType::NETWORK_UNAVAILABLE);
939 } else if (ret == E_CLOUD_STORAGE_FULL) {
940 SetErrorCodeMask(errorCode_, ErrorType::CLOUD_STORAGE_FULL);
941 }
942 return;
943 }
944
945 /* push more */
946 ret = AsyncRun(ctx, &DataSyncer::ModifyFdirtyRecords);
947 if (ret != E_OK) {
948 LOGE("async run modify records err %{public}d", ret);
949 return;
950 }
951 }
952
BeginTransaction()953 void DataSyncer::BeginTransaction()
954 {
955 taskRunner_->CommitDummyTask();
956 }
957
EndTransaction()958 void DataSyncer::EndTransaction()
959 {
960 taskRunner_->CompleteDummyTask();
961 }
962
GetBundleName() const963 std::string DataSyncer::GetBundleName() const
964 {
965 return bundleName_;
966 }
967
GetUserId() const968 int32_t DataSyncer::GetUserId() const
969 {
970 return userId_;
971 }
972
GetSyncState() const973 SyncState DataSyncer::GetSyncState() const
974 {
975 return syncStateManager_.GetSyncState();
976 }
977
CompletePull()978 int32_t DataSyncer::CompletePull()
979 {
980 LOGI("%{private}d %{private}s completes pull", userId_, bundleName_.c_str());
981 /* call syncer manager callback */
982 auto error = GetErrorType(errorCode_);
983 if (error) {
984 LOGE("pull failed, errorType:%{public}d", error);
985 SyncStateChangedNotify(CloudSyncState::DOWNLOAD_FAILED, error);
986 CompleteAll(false);
987 } else {
988 /* schedule to next stage */
989 Schedule();
990 }
991 return E_OK;
992 }
993
CompletePush()994 int32_t DataSyncer::CompletePush()
995 {
996 LOGI("%{private}d %{public}s completes push", userId_, bundleName_.c_str());
997 /* call syncer manager callback */
998 auto error = GetErrorType(errorCode_);
999 if (error) {
1000 LOGE("pull failed, errorType:%{public}d", error);
1001 SyncStateChangedNotify(CloudSyncState::UPLOAD_FAILED, error);
1002 CompleteAll(false);
1003 } else {
1004 /* schedule to next stage */
1005 Schedule();
1006 }
1007 return E_OK;
1008 }
1009
CompleteAll(bool isNeedNotify)1010 void DataSyncer::CompleteAll(bool isNeedNotify)
1011 {
1012 LOGI("%{private}d %{private}s completes all", userId_, bundleName_.c_str());
1013
1014 /* guarantee: unlock if locked */
1015 ForceUnlock();
1016
1017 /* reset internal status */
1018 Reset();
1019
1020 SyncState syncState = SyncState::SYNC_SUCCEED;
1021 if (errorCode_ != E_OK) {
1022 syncState = SyncState::SYNC_FAILED;
1023 }
1024
1025 CloudSyncState notifyState = CloudSyncState::COMPLETED;
1026 if (syncStateManager_.GetStopSyncFlag()) {
1027 notifyState = CloudSyncState::STOPPED;
1028 } else {
1029 SaveSubscription();
1030 }
1031
1032 auto nextAction = syncStateManager_.UpdateSyncState(syncState);
1033 if (nextAction == Action::START) {
1034 /* Retrigger sync, clear errorcode */
1035 errorCode_ = E_OK;
1036 StartSync(false, SyncTriggerType::PENDING_TRIGGER);
1037 return;
1038 }
1039 if (nextAction == Action::FORCE_START) {
1040 /* Retrigger sync, clear errorcode */
1041 errorCode_ = E_OK;
1042 StartSync(true, SyncTriggerType::PENDING_TRIGGER);
1043 return;
1044 }
1045
1046 /* notify sync state */
1047 if (isNeedNotify) {
1048 SyncStateChangedNotify(notifyState, ErrorType::NO_ERROR);
1049 }
1050 /* clear errorcode */
1051 errorCode_ = E_OK;
1052 }
1053
SyncStateChangedNotify(const CloudSyncState state,const ErrorType error)1054 void DataSyncer::SyncStateChangedNotify(const CloudSyncState state, const ErrorType error)
1055 {
1056 CurrentSyncState_ = state;
1057 CurrentErrorType_ = error;
1058 CloudSyncCallbackManager::GetInstance().NotifySyncStateChanged(state, error);
1059 }
1060
NotifyCurrentSyncState()1061 void DataSyncer::NotifyCurrentSyncState()
1062 {
1063 CloudSyncCallbackManager::GetInstance().NotifySyncStateChanged(CurrentSyncState_, CurrentErrorType_);
1064 }
1065
SetErrorCodeMask(int32_t & errorCode,ErrorType errorType)1066 void DataSyncer::SetErrorCodeMask(int32_t &errorCode, ErrorType errorType)
1067 {
1068 errorCode |= 1 << errorType;
1069 }
1070
GetErrorType(const int32_t code)1071 ErrorType DataSyncer::GetErrorType(const int32_t code)
1072 {
1073 if (code == E_OK) {
1074 return ErrorType::NO_ERROR;
1075 }
1076
1077 std::vector<ErrorType> errorTypes = {NETWORK_UNAVAILABLE, WIFI_UNAVAILABLE, BATTERY_LEVEL_WARNING,
1078 BATTERY_LEVEL_LOW, CLOUD_STORAGE_FULL, LOCAL_STORAGE_FULL};
1079 for (const auto &errorType : errorTypes) {
1080 if (code & (1 << errorType)) {
1081 return errorType;
1082 }
1083 }
1084 LOGE("errorcode unexpected, errcode: %{public}d", code);
1085 return ErrorType::NO_ERROR;
1086 }
1087
SaveSubscription()1088 void DataSyncer::SaveSubscription()
1089 {
1090 sdkHelper_->SaveSubscription([] (auto, shared_ptr<DriveKit::DKContainer>,
1091 DriveKit::DKSubscriptionResult & res) {
1092 if (!res.IsSuccess()) {
1093 auto err = res.GetDKError();
1094 LOGE("drivekit save subscription server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
1095 err.dkErrorCode);
1096 }
1097 });
1098 }
1099
DeleteSubscription()1100 void DataSyncer::DeleteSubscription()
1101 {
1102 sdkHelper_->DeleteSubscription([] (auto, DriveKit::DKError err) {
1103 if (err.HasError()) {
1104 LOGE("drivekit delete subscription server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
1105 err.dkErrorCode);
1106 }
1107 });
1108 }
1109 } // namespace CloudSync
1110 } // namespace FileManagement
1111 } // namespace OHOS
1112