• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "process_notifier.h"
16 
17 #include "db_errno.h"
18 #include "kv_store_errno.h"
19 #include "runtime_context.h"
20 namespace DistributedDB {
ProcessNotifier(ICloudSyncer * syncer)21 ProcessNotifier::ProcessNotifier(ICloudSyncer *syncer)
22     : syncer_(syncer)
23 {
24     RefObject::IncObjRef(syncer_);
25 }
26 
~ProcessNotifier()27 ProcessNotifier::~ProcessNotifier()
28 {
29     RefObject::DecObjRef(syncer_);
30 }
31 
Init(const std::vector<std::string> & tableName,const std::vector<std::string> & devices,const std::vector<std::string> & users)32 void ProcessNotifier::Init(const std::vector<std::string> &tableName,
33     const std::vector<std::string> &devices, const std::vector<std::string> &users)
34 {
35     std::lock_guard<std::mutex> autoLock(processMutex_);
36     InitSyncProcess(tableName, syncProcess_);
37     for (const auto &user : users) {
38         SyncProcess syncProcess;
39         InitSyncProcess(tableName, syncProcess);
40         multiSyncProcess_[user] = syncProcess;
41     }
42     devices_ = devices;
43 }
44 
InitSyncProcess(const std::vector<std::string> & tableName,SyncProcess & syncProcess)45 void ProcessNotifier::InitSyncProcess(const std::vector<std::string> &tableName, SyncProcess &syncProcess)
46 {
47     syncProcess.errCode = OK;
48     syncProcess.process = ProcessStatus::PROCESSING;
49     for (const auto &table: tableName) {
50         TableProcessInfo tableInfo;
51         tableInfo.process = ProcessStatus::PREPARED;
52         syncProcess.tableProcess[table] = tableInfo;
53     }
54 }
55 
UpdateUploadRetryInfo(const ICloudSyncer::InnerProcessInfo & process)56 void ProcessNotifier::UpdateUploadRetryInfo(const ICloudSyncer::InnerProcessInfo &process)
57 {
58     if (process.tableName.empty()) {
59         return;
60     }
61     std::lock_guard<std::mutex> autoLock(processMutex_);
62     processRetryInfo_[process.tableName] = process.retryInfo.uploadBatchRetryCount;
63 }
64 
UpdateProcess(const ICloudSyncer::InnerProcessInfo & process)65 void ProcessNotifier::UpdateProcess(const ICloudSyncer::InnerProcessInfo &process)
66 {
67     if (process.tableName.empty()) {
68         return;
69     }
70     std::lock_guard<std::mutex> autoLock(processMutex_);
71     auto &syncProcess = user_.empty() ? syncProcess_ : multiSyncProcess_[user_];
72     syncProcess.tableProcess[process.tableName].process = process.tableStatus;
73     if (process.downLoadInfo.batchIndex != 0u) {
74         LOGD("[ProcessNotifier] update download process index: %" PRIu32, process.downLoadInfo.batchIndex);
75         syncProcess.tableProcess[process.tableName].downLoadInfo = process.downLoadInfo;
76     }
77     if (process.upLoadInfo.batchIndex != 0u) {
78         LOGD("[ProcessNotifier] update upload process index: %" PRIu32, process.upLoadInfo.batchIndex);
79         syncProcess.tableProcess[process.tableName].upLoadInfo = process.upLoadInfo;
80     }
81 }
82 
NotifyProcess(const ICloudSyncer::CloudTaskInfo & taskInfo,const ICloudSyncer::InnerProcessInfo & process,bool notifyWhenError)83 void ProcessNotifier::NotifyProcess(const ICloudSyncer::CloudTaskInfo &taskInfo,
84     const ICloudSyncer::InnerProcessInfo &process, bool notifyWhenError)
85 {
86     UpdateProcess(process);
87     std::map<std::string, SyncProcess> currentProcess;
88     {
89         std::lock_guard<std::mutex> autoLock(processMutex_);
90         if (!notifyWhenError && taskInfo.errCode != E_OK) {
91             LOGD("[ProcessNotifier] task has error, do not notify now");
92             return;
93         }
94         syncProcess_.errCode = TransferDBErrno(taskInfo.errCode, true);
95         syncProcess_.process = taskInfo.status;
96         multiSyncProcess_[user_].errCode = TransferDBErrno(taskInfo.errCode, true);
97         multiSyncProcess_[user_].process = taskInfo.status;
98         UpdateUploadInfoIfNeeded(process);
99         if (user_.empty()) {
100             for (const auto &device : devices_) {
101                 // make sure only one device
102                 currentProcess[device] = syncProcess_;
103             }
104         } else {
105             currentProcess = multiSyncProcess_;
106         }
107     }
108     SyncProcessCallback callback = taskInfo.callback;
109     if (!callback) {
110         LOGD("[ProcessNotifier] task hasn't callback");
111         return;
112     }
113     ICloudSyncer *syncer = syncer_;
114     if (syncer == nullptr) {
115         LOGW("[ProcessNotifier] cancel notify because syncer is nullptr");
116         return; // should not happen
117     }
118     RefObject::IncObjRef(syncer);
119     auto id = syncer->GetIdentify();
120     int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(id, [callback, currentProcess, syncer]() {
121         LOGD("[ProcessNotifier] begin notify process");
122         if (syncer->IsClosed()) {
123             LOGI("[ProcessNotifier] db has closed, process return");
124             RefObject::DecObjRef(syncer);
125             return;
126         }
127         callback(currentProcess);
128         RefObject::DecObjRef(syncer);
129         LOGD("[ProcessNotifier] notify process finish");
130     });
131     if (errCode != E_OK) {
132         LOGW("[ProcessNotifier] schedule notify process failed %d", errCode);
133     }
134 }
135 
GetDevices() const136 std::vector<std::string> ProcessNotifier::GetDevices() const
137 {
138     return devices_;
139 }
140 
GetUploadBatchIndex(const std::string & tableName) const141 uint32_t ProcessNotifier::GetUploadBatchIndex(const std::string &tableName) const
142 {
143     std::lock_guard<std::mutex> autoLock(processMutex_);
144     auto &syncProcess = IsMultiUser() ? multiSyncProcess_.at(user_) : syncProcess_;
145     if (syncProcess.tableProcess.find(tableName) == syncProcess.tableProcess.end()) {
146         return 0u;
147     }
148     return syncProcess.tableProcess.at(tableName).upLoadInfo.batchIndex;
149 }
150 
ResetUploadBatchIndex(const std::string & tableName)151 void ProcessNotifier::ResetUploadBatchIndex(const std::string &tableName)
152 {
153     if (tableName.empty()) {
154         return;
155     }
156     std::lock_guard<std::mutex> autoLock(processMutex_);
157     auto &syncProcess = IsMultiUser() ? multiSyncProcess_.at(user_) : syncProcess_;
158     if (syncProcess.tableProcess.find(tableName) == syncProcess.tableProcess.end()) {
159         LOGW("[ProcessNotifier] The specified table was not found when reset UploadBatchIndex");
160         return;
161     }
162     if (syncProcess.tableProcess[tableName].upLoadInfo.total == 0) {
163         syncProcess.tableProcess[tableName].upLoadInfo.batchIndex = 0;
164     }
165 }
166 
GetLastUploadInfo(const std::string & tableName,Info & lastUploadInfo,ICloudSyncer::UploadRetryInfo & retryInfo) const167 void ProcessNotifier::GetLastUploadInfo(const std::string &tableName, Info &lastUploadInfo,
168     ICloudSyncer::UploadRetryInfo &retryInfo) const
169 {
170     Info lastInfo;
171     std::lock_guard<std::mutex> autoLock(processMutex_);
172     auto &syncProcess = IsMultiUser() ? multiSyncProcess_.at(user_) : syncProcess_;
173     if (processRetryInfo_.find(tableName) != processRetryInfo_.end()) {
174         retryInfo.uploadBatchRetryCount = processRetryInfo_.at(tableName);
175     }
176     if (syncProcess.tableProcess.find(tableName) != syncProcess_.tableProcess.end()) {
177         lastInfo = syncProcess.tableProcess.at(tableName).upLoadInfo;
178     }
179     lastUploadInfo = lastInfo;
180 }
181 
GetDownloadInfoByTableName(ICloudSyncer::InnerProcessInfo & process)182 void ProcessNotifier::GetDownloadInfoByTableName(ICloudSyncer::InnerProcessInfo &process)
183 {
184     if (process.tableName.empty()) {
185         return;
186     }
187     std::lock_guard<std::mutex> autoLock(processMutex_);
188     SyncProcess syncProcess;
189     if (user_.empty()) {
190         syncProcess = syncProcess_;
191     } else {
192         syncProcess = multiSyncProcess_[user_];
193     }
194 
195     if (syncProcess.tableProcess.find(process.tableName) != syncProcess.tableProcess.end()) {
196         process.downLoadInfo = syncProcess.tableProcess[process.tableName].downLoadInfo;
197     }
198 }
199 
SetUser(const std::string & user)200 void ProcessNotifier::SetUser(const std::string &user)
201 {
202     user_ = user;
203 }
204 
SetAllTableFinish()205 void ProcessNotifier::SetAllTableFinish()
206 {
207     std::lock_guard<std::mutex> autoLock(processMutex_);
208     for (auto &item : syncProcess_.tableProcess) {
209         item.second.process = ProcessStatus::FINISHED;
210     }
211     for (auto &syncProcess : multiSyncProcess_) {
212         for (auto &item : syncProcess.second.tableProcess) {
213             item.second.process = ProcessStatus::FINISHED;
214         }
215     }
216 }
217 
IsMultiUser() const218 bool ProcessNotifier::IsMultiUser() const
219 {
220     return !user_.empty() && multiSyncProcess_.find(user_) != multiSyncProcess_.end();
221 }
222 
GetCurrentTableProcess() const223 std::map<std::string, TableProcessInfo> ProcessNotifier::GetCurrentTableProcess() const
224 {
225     std::lock_guard<std::mutex> autoLock(processMutex_);
226     return syncProcess_.tableProcess;
227 }
228 
UpdateUploadInfoIfNeeded(const ICloudSyncer::InnerProcessInfo & process)229 void ProcessNotifier::UpdateUploadInfoIfNeeded(const ICloudSyncer::InnerProcessInfo &process)
230 {
231     if (process.tableName.empty()) {
232         return;
233     }
234     auto &syncProcess = IsMultiUser() ? multiSyncProcess_.at(user_) : syncProcess_;
235     auto tableProcess = syncProcess.tableProcess.find(process.tableName);
236     auto retryInfo = processRetryInfo_.find(process.tableName);
237     if (tableProcess != syncProcess.tableProcess.end() && retryInfo != processRetryInfo_.end()) {
238         uint32_t downloadOpCount = process.retryInfo.downloadBatchOpCount;
239         uint32_t uploadRetryCount = retryInfo->second;
240         tableProcess->second.upLoadInfo.successCount += std::min(uploadRetryCount, downloadOpCount);
241         processRetryInfo_.erase(retryInfo);
242     }
243 }
244 
UpdateAllTablesFinally()245 void ProcessNotifier::UpdateAllTablesFinally()
246 {
247     std::lock_guard<std::mutex> autoLock(processMutex_);
248     UpdateTableInfoFinally(syncProcess_.tableProcess);
249     for (auto &syncProcess : multiSyncProcess_) {
250         UpdateTableInfoFinally(syncProcess.second.tableProcess);
251     }
252 }
253 
UpdateTableInfoFinally(std::map<std::string,TableProcessInfo> & processInfo)254 void ProcessNotifier::UpdateTableInfoFinally(std::map<std::string, TableProcessInfo> &processInfo)
255 {
256     for (auto &item : processInfo) {
257         uint32_t uploadOpCount = item.second.upLoadInfo.successCount + item.second.upLoadInfo.failCount;
258         if (item.second.upLoadInfo.total > uploadOpCount) {
259             item.second.upLoadInfo.successCount = item.second.upLoadInfo.total - item.second.upLoadInfo.failCount;
260         } else {
261             item.second.upLoadInfo.total = uploadOpCount;
262         }
263 
264         uint32_t downloadOpCount = item.second.downLoadInfo.successCount + item.second.downLoadInfo.failCount;
265         if (item.second.downLoadInfo.total > downloadOpCount) {
266             item.second.downLoadInfo.successCount = item.second.downLoadInfo.total - item.second.downLoadInfo.failCount;
267         } else {
268             item.second.downLoadInfo.total = downloadOpCount;
269         }
270     }
271 }
272 }
273