1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "process_notifier.h"
16
17 #include "db_errno.h"
18 #include "kv_store_errno.h"
19 #include "runtime_context.h"
20 namespace DistributedDB {
ProcessNotifier(ICloudSyncer * syncer)21 ProcessNotifier::ProcessNotifier(ICloudSyncer *syncer)
22 : syncer_(syncer)
23 {
24 RefObject::IncObjRef(syncer_);
25 }
26
~ProcessNotifier()27 ProcessNotifier::~ProcessNotifier()
28 {
29 RefObject::DecObjRef(syncer_);
30 }
31
Init(const std::vector<std::string> & tableName,const std::vector<std::string> & devices)32 void ProcessNotifier::Init(const std::vector<std::string> &tableName,
33 const std::vector<std::string> &devices)
34 {
35 std::lock_guard<std::mutex> autoLock(processMutex_);
36 syncProcess_.errCode = OK;
37 syncProcess_.process = ProcessStatus::PROCESSING;
38 for (const auto &table: tableName) {
39 TableProcessInfo tableInfo = {
40 .process = ProcessStatus::PREPARED
41 };
42 syncProcess_.tableProcess[table] = tableInfo;
43 }
44 devices_ = devices;
45 }
46
UpdateProcess(const ICloudSyncer::InnerProcessInfo & process)47 void ProcessNotifier::UpdateProcess(const ICloudSyncer::InnerProcessInfo &process)
48 {
49 if (process.tableName.empty()) {
50 return;
51 }
52 std::lock_guard<std::mutex> autoLock(processMutex_);
53 syncProcess_.tableProcess[process.tableName].process = process.tableStatus;
54 if (process.downLoadInfo.batchIndex != 0u) {
55 LOGD("[ProcessNotifier] update download process index: %" PRIu32, process.downLoadInfo.batchIndex);
56 syncProcess_.tableProcess[process.tableName].downLoadInfo.batchIndex = process.downLoadInfo.batchIndex;
57 syncProcess_.tableProcess[process.tableName].downLoadInfo.total = process.downLoadInfo.total;
58 syncProcess_.tableProcess[process.tableName].downLoadInfo.failCount = process.downLoadInfo.failCount;
59 syncProcess_.tableProcess[process.tableName].downLoadInfo.successCount = process.downLoadInfo.successCount;
60 }
61 if (process.upLoadInfo.batchIndex != 0u) {
62 LOGD("[ProcessNotifier] update upload process index: %" PRIu32, process.upLoadInfo.batchIndex);
63 syncProcess_.tableProcess[process.tableName].upLoadInfo.batchIndex = process.upLoadInfo.batchIndex;
64 syncProcess_.tableProcess[process.tableName].upLoadInfo.total = process.upLoadInfo.total;
65 syncProcess_.tableProcess[process.tableName].upLoadInfo.failCount = process.upLoadInfo.failCount;
66 syncProcess_.tableProcess[process.tableName].upLoadInfo.successCount = process.upLoadInfo.successCount;
67 }
68 }
69
NotifyProcess(const ICloudSyncer::CloudTaskInfo & taskInfo,const ICloudSyncer::InnerProcessInfo & process,bool notifyWhenError)70 void ProcessNotifier::NotifyProcess(const ICloudSyncer::CloudTaskInfo &taskInfo,
71 const ICloudSyncer::InnerProcessInfo &process, bool notifyWhenError)
72 {
73 UpdateProcess(process);
74 std::map<std::string, SyncProcess> currentProcess;
75 {
76 std::lock_guard<std::mutex> autoLock(processMutex_);
77 if (!notifyWhenError && taskInfo.errCode != E_OK) {
78 LOGD("[ProcessNotifier] task has error, do not notify now");
79 return;
80 }
81 syncProcess_.errCode = TransferDBErrno(taskInfo.errCode);
82 syncProcess_.process = taskInfo.status;
83 for (const auto &device : devices_) {
84 // make sure only one device
85 currentProcess[device] = syncProcess_;
86 }
87 }
88 SyncProcessCallback callback = taskInfo.callback;
89 if (!callback) {
90 LOGD("[ProcessNotifier] task hasn't callback");
91 return;
92 }
93 ICloudSyncer *syncer = syncer_;
94 if (syncer == nullptr) {
95 return; // should not happen
96 }
97 RefObject::IncObjRef(syncer);
98 auto id = syncer->GetIdentify();
99 syncer->IncSyncCallbackTaskCount();
100 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(id, [callback, currentProcess, syncer]() {
101 LOGD("[ProcessNotifier] begin notify process");
102 callback(currentProcess);
103 syncer->DecSyncCallbackTaskCount();
104 RefObject::DecObjRef(syncer);
105 LOGD("[ProcessNotifier] notify process finish");
106 });
107 if (errCode != E_OK) {
108 LOGW("[ProcessNotifier] schedule notify process failed %d", errCode);
109 }
110 }
111
GetDevices() const112 std::vector<std::string> ProcessNotifier::GetDevices() const
113 {
114 return devices_;
115 }
116 }