1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "process_notifier.h"
16
17 #include "db_errno.h"
18 #include "kv_store_errno.h"
19 #include "runtime_context.h"
20 namespace DistributedDB {
ProcessNotifier(ICloudSyncer * syncer)21 ProcessNotifier::ProcessNotifier(ICloudSyncer *syncer)
22 : syncer_(syncer)
23 {
24 RefObject::IncObjRef(syncer_);
25 }
26
~ProcessNotifier()27 ProcessNotifier::~ProcessNotifier()
28 {
29 RefObject::DecObjRef(syncer_);
30 }
31
Init(const std::vector<std::string> & tableName,const std::vector<std::string> & devices,const std::vector<std::string> & users)32 void ProcessNotifier::Init(const std::vector<std::string> &tableName,
33 const std::vector<std::string> &devices, const std::vector<std::string> &users)
34 {
35 std::lock_guard<std::mutex> autoLock(processMutex_);
36 InitSyncProcess(tableName, syncProcess_);
37 for (const auto &user : users) {
38 SyncProcess syncProcess;
39 InitSyncProcess(tableName, syncProcess);
40 multiSyncProcess_[user] = syncProcess;
41 }
42 devices_ = devices;
43 }
44
InitSyncProcess(const std::vector<std::string> & tableName,SyncProcess & syncProcess)45 void ProcessNotifier::InitSyncProcess(const std::vector<std::string> &tableName, SyncProcess &syncProcess)
46 {
47 syncProcess.errCode = OK;
48 syncProcess.process = ProcessStatus::PROCESSING;
49 for (const auto &table: tableName) {
50 TableProcessInfo tableInfo;
51 tableInfo.process = ProcessStatus::PREPARED;
52 syncProcess.tableProcess[table] = tableInfo;
53 }
54 }
55
UpdateUploadRetryInfo(const ICloudSyncer::InnerProcessInfo & process)56 void ProcessNotifier::UpdateUploadRetryInfo(const ICloudSyncer::InnerProcessInfo &process)
57 {
58 if (process.tableName.empty()) {
59 return;
60 }
61 std::lock_guard<std::mutex> autoLock(processMutex_);
62 processRetryInfo_[process.tableName] = process.retryInfo.uploadBatchRetryCount;
63 }
64
UpdateProcess(const ICloudSyncer::InnerProcessInfo & process)65 void ProcessNotifier::UpdateProcess(const ICloudSyncer::InnerProcessInfo &process)
66 {
67 if (process.tableName.empty()) {
68 return;
69 }
70 std::lock_guard<std::mutex> autoLock(processMutex_);
71 auto &syncProcess = user_.empty() ? syncProcess_ : multiSyncProcess_[user_];
72 syncProcess.tableProcess[process.tableName].process = process.tableStatus;
73 if (process.downLoadInfo.batchIndex != 0u) {
74 LOGD("[ProcessNotifier] update download process index: %" PRIu32, process.downLoadInfo.batchIndex);
75 syncProcess.tableProcess[process.tableName].downLoadInfo = process.downLoadInfo;
76 }
77 if (process.upLoadInfo.batchIndex != 0u) {
78 LOGD("[ProcessNotifier] update upload process index: %" PRIu32, process.upLoadInfo.batchIndex);
79 syncProcess.tableProcess[process.tableName].upLoadInfo = process.upLoadInfo;
80 }
81 }
82
NotifyProcess(const ICloudSyncer::CloudTaskInfo & taskInfo,const ICloudSyncer::InnerProcessInfo & process,bool notifyWhenError)83 void ProcessNotifier::NotifyProcess(const ICloudSyncer::CloudTaskInfo &taskInfo,
84 const ICloudSyncer::InnerProcessInfo &process, bool notifyWhenError)
85 {
86 UpdateProcess(process);
87 std::map<std::string, SyncProcess> currentProcess;
88 {
89 std::lock_guard<std::mutex> autoLock(processMutex_);
90 if (!notifyWhenError && taskInfo.errCode != E_OK) {
91 LOGD("[ProcessNotifier] task has error, do not notify now");
92 return;
93 }
94 syncProcess_.errCode = TransferDBErrno(taskInfo.errCode, true);
95 syncProcess_.process = taskInfo.status;
96 multiSyncProcess_[user_].errCode = TransferDBErrno(taskInfo.errCode, true);
97 multiSyncProcess_[user_].process = taskInfo.status;
98 UpdateUploadInfoIfNeeded(process);
99 if (user_.empty()) {
100 for (const auto &device : devices_) {
101 // make sure only one device
102 currentProcess[device] = syncProcess_;
103 }
104 } else {
105 currentProcess = multiSyncProcess_;
106 }
107 }
108 SyncProcessCallback callback = taskInfo.callback;
109 if (!callback) {
110 LOGD("[ProcessNotifier] task hasn't callback");
111 return;
112 }
113 ICloudSyncer *syncer = syncer_;
114 if (syncer == nullptr) {
115 LOGW("[ProcessNotifier] cancel notify because syncer is nullptr");
116 return; // should not happen
117 }
118 RefObject::IncObjRef(syncer);
119 auto id = syncer->GetIdentify();
120 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(id, [callback, currentProcess, syncer]() {
121 LOGD("[ProcessNotifier] begin notify process");
122 if (syncer->IsClosed()) {
123 LOGI("[ProcessNotifier] db has closed, process return");
124 RefObject::DecObjRef(syncer);
125 return;
126 }
127 callback(currentProcess);
128 RefObject::DecObjRef(syncer);
129 LOGD("[ProcessNotifier] notify process finish");
130 });
131 if (errCode != E_OK) {
132 LOGW("[ProcessNotifier] schedule notify process failed %d", errCode);
133 }
134 }
135
GetDevices() const136 std::vector<std::string> ProcessNotifier::GetDevices() const
137 {
138 return devices_;
139 }
140
GetUploadBatchIndex(const std::string & tableName) const141 uint32_t ProcessNotifier::GetUploadBatchIndex(const std::string &tableName) const
142 {
143 std::lock_guard<std::mutex> autoLock(processMutex_);
144 auto &syncProcess = IsMultiUser() ? multiSyncProcess_.at(user_) : syncProcess_;
145 if (syncProcess.tableProcess.find(tableName) == syncProcess.tableProcess.end()) {
146 return 0u;
147 }
148 return syncProcess.tableProcess.at(tableName).upLoadInfo.batchIndex;
149 }
150
ResetUploadBatchIndex(const std::string & tableName)151 void ProcessNotifier::ResetUploadBatchIndex(const std::string &tableName)
152 {
153 if (tableName.empty()) {
154 return;
155 }
156 std::lock_guard<std::mutex> autoLock(processMutex_);
157 auto &syncProcess = IsMultiUser() ? multiSyncProcess_.at(user_) : syncProcess_;
158 if (syncProcess.tableProcess.find(tableName) == syncProcess.tableProcess.end()) {
159 LOGW("[ProcessNotifier] The specified table was not found when reset UploadBatchIndex");
160 return;
161 }
162 if (syncProcess.tableProcess[tableName].upLoadInfo.total == 0) {
163 syncProcess.tableProcess[tableName].upLoadInfo.batchIndex = 0;
164 }
165 }
166
GetLastUploadInfo(const std::string & tableName,Info & lastUploadInfo,ICloudSyncer::UploadRetryInfo & retryInfo) const167 void ProcessNotifier::GetLastUploadInfo(const std::string &tableName, Info &lastUploadInfo,
168 ICloudSyncer::UploadRetryInfo &retryInfo) const
169 {
170 Info lastInfo;
171 std::lock_guard<std::mutex> autoLock(processMutex_);
172 auto &syncProcess = IsMultiUser() ? multiSyncProcess_.at(user_) : syncProcess_;
173 if (processRetryInfo_.find(tableName) != processRetryInfo_.end()) {
174 retryInfo.uploadBatchRetryCount = processRetryInfo_.at(tableName);
175 }
176 if (syncProcess.tableProcess.find(tableName) != syncProcess_.tableProcess.end()) {
177 lastInfo = syncProcess.tableProcess.at(tableName).upLoadInfo;
178 }
179 lastUploadInfo = lastInfo;
180 }
181
GetDownloadInfoByTableName(ICloudSyncer::InnerProcessInfo & process)182 void ProcessNotifier::GetDownloadInfoByTableName(ICloudSyncer::InnerProcessInfo &process)
183 {
184 if (process.tableName.empty()) {
185 return;
186 }
187 std::lock_guard<std::mutex> autoLock(processMutex_);
188 SyncProcess syncProcess;
189 if (user_.empty()) {
190 syncProcess = syncProcess_;
191 } else {
192 syncProcess = multiSyncProcess_[user_];
193 }
194
195 if (syncProcess.tableProcess.find(process.tableName) != syncProcess.tableProcess.end()) {
196 process.downLoadInfo = syncProcess.tableProcess[process.tableName].downLoadInfo;
197 }
198 }
199
SetUser(const std::string & user)200 void ProcessNotifier::SetUser(const std::string &user)
201 {
202 user_ = user;
203 }
204
SetAllTableFinish()205 void ProcessNotifier::SetAllTableFinish()
206 {
207 std::lock_guard<std::mutex> autoLock(processMutex_);
208 for (auto &item : syncProcess_.tableProcess) {
209 item.second.process = ProcessStatus::FINISHED;
210 }
211 for (auto &syncProcess : multiSyncProcess_) {
212 for (auto &item : syncProcess.second.tableProcess) {
213 item.second.process = ProcessStatus::FINISHED;
214 }
215 }
216 }
217
IsMultiUser() const218 bool ProcessNotifier::IsMultiUser() const
219 {
220 return !user_.empty() && multiSyncProcess_.find(user_) != multiSyncProcess_.end();
221 }
222
GetCurrentTableProcess() const223 std::map<std::string, TableProcessInfo> ProcessNotifier::GetCurrentTableProcess() const
224 {
225 std::lock_guard<std::mutex> autoLock(processMutex_);
226 return syncProcess_.tableProcess;
227 }
228
UpdateUploadInfoIfNeeded(const ICloudSyncer::InnerProcessInfo & process)229 void ProcessNotifier::UpdateUploadInfoIfNeeded(const ICloudSyncer::InnerProcessInfo &process)
230 {
231 if (process.tableName.empty()) {
232 return;
233 }
234 auto &syncProcess = IsMultiUser() ? multiSyncProcess_.at(user_) : syncProcess_;
235 auto tableProcess = syncProcess.tableProcess.find(process.tableName);
236 auto retryInfo = processRetryInfo_.find(process.tableName);
237 if (tableProcess != syncProcess.tableProcess.end() && retryInfo != processRetryInfo_.end()) {
238 uint32_t downloadOpCount = process.retryInfo.downloadBatchOpCount;
239 uint32_t uploadRetryCount = retryInfo->second;
240 tableProcess->second.upLoadInfo.successCount += std::min(uploadRetryCount, downloadOpCount);
241 processRetryInfo_.erase(retryInfo);
242 }
243 }
244
UpdateAllTablesFinally()245 void ProcessNotifier::UpdateAllTablesFinally()
246 {
247 std::lock_guard<std::mutex> autoLock(processMutex_);
248 UpdateTableInfoFinally(syncProcess_.tableProcess);
249 for (auto &syncProcess : multiSyncProcess_) {
250 UpdateTableInfoFinally(syncProcess.second.tableProcess);
251 }
252 }
253
UpdateTableInfoFinally(std::map<std::string,TableProcessInfo> & processInfo)254 void ProcessNotifier::UpdateTableInfoFinally(std::map<std::string, TableProcessInfo> &processInfo)
255 {
256 for (auto &item : processInfo) {
257 uint32_t uploadOpCount = item.second.upLoadInfo.successCount + item.second.upLoadInfo.failCount;
258 if (item.second.upLoadInfo.total > uploadOpCount) {
259 item.second.upLoadInfo.successCount = item.second.upLoadInfo.total - item.second.upLoadInfo.failCount;
260 } else {
261 item.second.upLoadInfo.total = uploadOpCount;
262 }
263
264 uint32_t downloadOpCount = item.second.downLoadInfo.successCount + item.second.downLoadInfo.failCount;
265 if (item.second.downLoadInfo.total > downloadOpCount) {
266 item.second.downLoadInfo.successCount = item.second.downLoadInfo.total - item.second.downLoadInfo.failCount;
267 } else {
268 item.second.downLoadInfo.total = downloadOpCount;
269 }
270 }
271 }
272 }
273