• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "process_notifier.h"
16 
17 #include "db_errno.h"
18 #include "kv_store_errno.h"
19 #include "runtime_context.h"
20 namespace DistributedDB {
ProcessNotifier(ICloudSyncer * syncer)21 ProcessNotifier::ProcessNotifier(ICloudSyncer *syncer)
22     : syncer_(syncer)
23 {
24     RefObject::IncObjRef(syncer_);
25 }
26 
~ProcessNotifier()27 ProcessNotifier::~ProcessNotifier()
28 {
29     RefObject::DecObjRef(syncer_);
30 }
31 
Init(const std::vector<std::string> & tableName,const std::vector<std::string> & devices)32 void ProcessNotifier::Init(const std::vector<std::string> &tableName,
33     const std::vector<std::string> &devices)
34 {
35     std::lock_guard<std::mutex> autoLock(processMutex_);
36     syncProcess_.errCode = OK;
37     syncProcess_.process = ProcessStatus::PROCESSING;
38     for (const auto &table: tableName) {
39         TableProcessInfo tableInfo = {
40             .process = ProcessStatus::PREPARED
41         };
42         syncProcess_.tableProcess[table] = tableInfo;
43     }
44     devices_ = devices;
45 }
46 
UpdateProcess(const ICloudSyncer::InnerProcessInfo & process)47 void ProcessNotifier::UpdateProcess(const ICloudSyncer::InnerProcessInfo &process)
48 {
49     if (process.tableName.empty()) {
50         return;
51     }
52     std::lock_guard<std::mutex> autoLock(processMutex_);
53     syncProcess_.tableProcess[process.tableName].process = process.tableStatus;
54     if (process.downLoadInfo.batchIndex != 0u) {
55         LOGD("[ProcessNotifier] update download process index: %" PRIu32, process.downLoadInfo.batchIndex);
56         syncProcess_.tableProcess[process.tableName].downLoadInfo.batchIndex = process.downLoadInfo.batchIndex;
57         syncProcess_.tableProcess[process.tableName].downLoadInfo.total = process.downLoadInfo.total;
58         syncProcess_.tableProcess[process.tableName].downLoadInfo.failCount = process.downLoadInfo.failCount;
59         syncProcess_.tableProcess[process.tableName].downLoadInfo.successCount = process.downLoadInfo.successCount;
60     }
61     if (process.upLoadInfo.batchIndex != 0u) {
62         LOGD("[ProcessNotifier] update upload process index: %" PRIu32, process.upLoadInfo.batchIndex);
63         syncProcess_.tableProcess[process.tableName].upLoadInfo.batchIndex = process.upLoadInfo.batchIndex;
64         syncProcess_.tableProcess[process.tableName].upLoadInfo.total = process.upLoadInfo.total;
65         syncProcess_.tableProcess[process.tableName].upLoadInfo.failCount = process.upLoadInfo.failCount;
66         syncProcess_.tableProcess[process.tableName].upLoadInfo.successCount = process.upLoadInfo.successCount;
67     }
68 }
69 
NotifyProcess(const ICloudSyncer::CloudTaskInfo & taskInfo,const ICloudSyncer::InnerProcessInfo & process,bool notifyWhenError)70 void ProcessNotifier::NotifyProcess(const ICloudSyncer::CloudTaskInfo &taskInfo,
71     const ICloudSyncer::InnerProcessInfo &process, bool notifyWhenError)
72 {
73     UpdateProcess(process);
74     std::map<std::string, SyncProcess> currentProcess;
75     {
76         std::lock_guard<std::mutex> autoLock(processMutex_);
77         if (!notifyWhenError && taskInfo.errCode != E_OK) {
78             LOGD("[ProcessNotifier] task has error, do not notify now");
79             return;
80         }
81         syncProcess_.errCode = TransferDBErrno(taskInfo.errCode);
82         syncProcess_.process = taskInfo.status;
83         for (const auto &device : devices_) {
84             // make sure only one device
85             currentProcess[device] = syncProcess_;
86         }
87     }
88     SyncProcessCallback callback = taskInfo.callback;
89     if (!callback) {
90         LOGD("[ProcessNotifier] task hasn't callback");
91         return;
92     }
93     ICloudSyncer *syncer = syncer_;
94     if (syncer == nullptr) {
95         return; // should not happen
96     }
97     RefObject::IncObjRef(syncer);
98     auto id = syncer->GetIdentify();
99     syncer->IncSyncCallbackTaskCount();
100     int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(id, [callback, currentProcess, syncer]() {
101         LOGD("[ProcessNotifier] begin notify process");
102         callback(currentProcess);
103         syncer->DecSyncCallbackTaskCount();
104         RefObject::DecObjRef(syncer);
105         LOGD("[ProcessNotifier] notify process finish");
106     });
107     if (errCode != E_OK) {
108         LOGW("[ProcessNotifier] schedule notify process failed %d", errCode);
109     }
110 }
111 
GetDevices() const112 std::vector<std::string> ProcessNotifier::GetDevices() const
113 {
114     return devices_;
115 }
116 }