1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "SyncManager"
16 #include "sync_manager.h"
17
18 #include "account/account_delegate.h"
19 #include "cloud/cloud_server.h"
20 #include "cloud/schema_meta.h"
21 #include "cloud/sync_event.h"
22 #include "device_manager_adapter.h"
23 #include "eventcenter/event_center.h"
24 #include "log_print.h"
25 #include "metadata/meta_data_manager.h"
26 #include "store/auto_cache.h"
27 #include "utils/anonymous.h"
28 namespace OHOS::CloudData {
29 using namespace DistributedData;
30 using namespace DistributedKv;
31 using Account = OHOS::DistributedKv::AccountDelegate;
32 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
33 using Defer = EventCenter::Defer;
34 std::atomic<uint32_t> SyncManager::genId_ = 0;
SyncInfo(int32_t user,const std::string & bundleName,const Store & store,const Tables & tables)35 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const Store &store, const Tables &tables)
36 : user_(user), bundleName_(bundleName)
37 {
38 if (!store.empty()) {
39 tables_[store] = tables;
40 }
41 syncId_ = SyncManager::GenerateId(user);
42 }
43
SyncInfo(int32_t user,const std::string & bundleName,const Stores & stores)44 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const Stores &stores)
45 : user_(user), bundleName_(bundleName)
46 {
47 for (auto &store : stores) {
48 tables_[store] = {};
49 }
50 syncId_ = SyncManager::GenerateId(user);
51 }
52
SyncInfo(int32_t user,const std::string & bundleName,const MutliStoreTables & tables)53 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const MutliStoreTables &tables)
54 : user_(user), bundleName_(bundleName), tables_(tables)
55 {
56 tables_ = tables;
57 syncId_ = SyncManager::GenerateId(user);
58 }
59
SetMode(int32_t mode)60 void SyncManager::SyncInfo::SetMode(int32_t mode)
61 {
62 mode_ = mode;
63 }
64
SetWait(int32_t wait)65 void SyncManager::SyncInfo::SetWait(int32_t wait)
66 {
67 wait_ = wait;
68 }
69
SetAsyncDetail(GenAsync asyncDetail)70 void SyncManager::SyncInfo::SetAsyncDetail(GenAsync asyncDetail)
71 {
72 async_ = std::move(asyncDetail);
73 }
74
SetQuery(std::shared_ptr<GenQuery> query)75 void SyncManager::SyncInfo::SetQuery(std::shared_ptr<GenQuery> query)
76 {
77 query_ = query;
78 }
79
SetError(int32_t code) const80 void SyncManager::SyncInfo::SetError(int32_t code) const
81 {
82 if (async_) {
83 GenDetails details;
84 auto &detail = details[id_];
85 detail.progress = SYNC_FINISH;
86 detail.code = code;
87 async_(std::move(details));
88 }
89 }
90
GenerateQuery(const std::string & store,const Tables & tables)91 std::shared_ptr<GenQuery> SyncManager::SyncInfo::GenerateQuery(const std::string &store, const Tables &tables)
92 {
93 if (query_ != nullptr) {
94 return query_;
95 }
96 class SyncQuery final : public GenQuery {
97 public:
98 explicit SyncQuery(const std::vector<std::string> &tables) : tables_(tables)
99 {
100 }
101
102 bool IsEqual(uint64_t tid) override
103 {
104 return false;
105 }
106
107 std::vector<std::string> GetTables() override
108 {
109 return tables_;
110 }
111
112 private:
113 std::vector<std::string> tables_;
114 };
115 auto it = tables_.find(store);
116 return std::make_shared<SyncQuery>(it == tables_.end() || it->second.empty() ? tables : it->second);
117 }
118
Contains(const std::string & storeName)119 bool SyncManager::SyncInfo::Contains(const std::string& storeName)
120 {
121 return tables_.empty() || tables_.find(storeName) != tables_.end();
122 }
123
SyncManager()124 SyncManager::SyncManager()
125 {
126 EventCenter::GetInstance().Subscribe(CloudEvent::LOCAL_CHANGE, GetClientChangeHandler());
127 }
128
~SyncManager()129 SyncManager::~SyncManager()
130 {
131 if (executor_ != nullptr) {
132 actives_.ForEachCopies([this](auto &syncId, auto &taskId) {
133 executor_->Remove(taskId);
134 return false;
135 });
136 executor_ = nullptr;
137 }
138 }
139
Bind(std::shared_ptr<ExecutorPool> executor)140 int32_t SyncManager::Bind(std::shared_ptr<ExecutorPool> executor)
141 {
142 executor_ = executor;
143 return E_OK;
144 }
145
DoCloudSync(SyncInfo syncInfo)146 int32_t SyncManager::DoCloudSync(SyncInfo syncInfo)
147 {
148 if (executor_ == nullptr) {
149 return E_NOT_INIT;
150 }
151 auto syncId = GenerateId(syncInfo.user_);
152 auto ref = GenSyncRef(syncId);
153 actives_.Compute(syncId, [this, &ref, &syncInfo](const uint64_t &key, TaskId &taskId) mutable {
154 taskId = executor_->Execute(GetSyncTask(0, true, ref, std::move(syncInfo)));
155 return true;
156 });
157 return E_OK;
158 }
159
StopCloudSync(int32_t user)160 int32_t SyncManager::StopCloudSync(int32_t user)
161 {
162 if (executor_ == nullptr) {
163 return E_NOT_INIT;
164 }
165 actives_.ForEachCopies([this, user](auto &syncId, auto &taskId) {
166 if (Compare(syncId, user) == 0) {
167 executor_->Remove(taskId);
168 }
169 return false;
170 });
171 return E_OK;
172 }
173
IsValid(SyncInfo & info,CloudInfo & cloud)174 bool SyncManager::IsValid(SyncInfo &info, CloudInfo &cloud)
175 {
176 if (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true) ||
177 (info.id_ != SyncInfo::DEFAULT_ID && cloud.id != info.id_)) {
178 info.SetError(E_CLOUD_DISABLED);
179 ZLOGE("cloudInfo invalid:%{public}d, <syncId:%{public}s, metaId:%{public}s>", cloud.IsValid(),
180 Anonymous::Change(info.id_).c_str(), Anonymous::Change(cloud.id).c_str());
181 return false;
182 }
183 if (!cloud.enableCloud || (!info.bundleName_.empty() && !cloud.IsOn(info.bundleName_))) {
184 info.SetError(E_CLOUD_DISABLED);
185 ZLOGD("enable:%{public}d, bundleName:%{public}s", cloud.enableCloud, info.bundleName_.c_str());
186 return false;
187 }
188 if (!DmAdapter::GetInstance().IsNetworkAvailable()) {
189 info.SetError(E_NETWORK_ERROR);
190 ZLOGD("network unavailable");
191 return false;
192 }
193 if (!Account::GetInstance()->IsVerified(info.user_)) {
194 info.SetError(E_USER_UNLOCK);
195 ZLOGD("user unverified");
196 return false;
197 }
198 return true;
199 }
200
GetSyncTask(int32_t times,bool retry,RefCount ref,SyncInfo && syncInfo)201 ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount ref, SyncInfo &&syncInfo)
202 {
203 times++;
204 return [this, times, retry, keep = std::move(ref), info = std::move(syncInfo)]() mutable {
205 activeInfos_.Erase(info.syncId_);
206 CloudInfo cloud;
207 cloud.user = info.user_;
208 if (!IsValid(info, cloud)) {
209 return;
210 }
211 std::vector<SchemaMeta> schemas;
212 auto key = cloud.GetSchemaPrefix(info.bundleName_);
213 auto retryer = GetRetryer(times, info);
214 if (!MetaDataManager::GetInstance().LoadMeta(key, schemas, true) || schemas.empty()) {
215 UpdateSchema(info);
216 retryer(RETRY_INTERVAL, E_RETRY_TIMEOUT);
217 return;
218 }
219 Defer defer(GetSyncHandler(std::move(retryer)), CloudEvent::CLOUD_SYNC);
220 for (auto &schema : schemas) {
221 if (!cloud.IsOn(schema.bundleName)) {
222 continue;
223 }
224 for (const auto &database : schema.databases) {
225 if (!info.Contains(database.name)) {
226 continue;
227 }
228 CloudEvent::StoreInfo storeInfo = { 0, schema.bundleName, database.name,
229 cloud.apps[schema.bundleName].instanceId, cloud.user };
230 auto query = info.GenerateQuery(database.name, database.GetTableNames());
231 auto evt = std::make_unique<SyncEvent>(std::move(storeInfo),
232 SyncEvent::EventInfo { info.mode_, info.wait_, retry, std::move(query), info.async_ });
233 EventCenter::GetInstance().PostEvent(std::move(evt));
234 }
235 }
236 };
237 }
238
GetSyncHandler(Retryer retryer)239 std::function<void(const Event &)> SyncManager::GetSyncHandler(Retryer retryer)
240 {
241 return [retryer](const Event &event) {
242 auto &evt = static_cast<const SyncEvent &>(event);
243 auto &storeInfo = evt.GetStoreInfo();
244 StoreMetaData meta;
245 meta.storeId = storeInfo.storeName;
246 meta.bundleName = storeInfo.bundleName;
247 meta.user = std::to_string(storeInfo.user);
248 meta.instanceId = storeInfo.instanceId;
249 meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
250 if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
251 ZLOGE("failed, no store meta bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
252 meta.GetStoreAlias().c_str());
253 return;
254 }
255 auto store = GetStore(meta, storeInfo.user);
256 if (store == nullptr) {
257 ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str());
258 return;
259 }
260
261 ZLOGD("database:<%{public}d:%{public}s:%{public}s> sync start", storeInfo.user, storeInfo.bundleName.c_str(),
262 meta.GetStoreAlias().c_str());
263 auto status = store->Sync({ SyncInfo::DEFAULT_ID }, evt.GetMode(), *(evt.GetQuery()), evt.AutoRetry()
264 ? [retryer](const GenDetails &details) {
265 if (details.empty()) {
266 ZLOGE("retry, details empty");
267 return;
268 }
269 int32_t code = details.begin()->second.code;
270 retryer(code == E_LOCKED_BY_OTHERS ? LOCKED_INTERVAL : RETRY_INTERVAL, code);
271 }
272 : evt.GetAsyncDetail(), evt.GetWait());
273 GenAsync async = evt.GetAsyncDetail();
274 if (status != E_OK && async) {
275 GenDetails details;
276 auto &detail = details[SyncInfo::DEFAULT_ID];
277 detail.progress = SYNC_FINISH;
278 detail.code = status;
279 async(std::move(details));
280 }
281 };
282 }
283
GetClientChangeHandler()284 std::function<void(const Event &)> SyncManager::GetClientChangeHandler()
285 {
286 return [this](const Event &event) {
287 auto &evt = static_cast<const SyncEvent &>(event);
288 auto store = evt.GetStoreInfo();
289 SyncInfo syncInfo(store.user, store.bundleName, store.storeName);
290 syncInfo.SetMode(evt.GetMode());
291 syncInfo.SetWait(evt.GetWait());
292 syncInfo.SetAsyncDetail(evt.GetAsyncDetail());
293 syncInfo.SetQuery(evt.GetQuery());
294 auto times = evt.AutoRetry() ? RETRY_TIMES - CLIENT_RETRY_TIMES : RETRY_TIMES;
295 auto task = GetSyncTask(times, evt.AutoRetry(), RefCount(), std::move(syncInfo));
296 task();
297 };
298 }
299
GetRetryer(int32_t times,const SyncInfo & syncInfo)300 SyncManager::Retryer SyncManager::GetRetryer(int32_t times, const SyncInfo &syncInfo)
301 {
302 if (times >= RETRY_TIMES) {
303 return [info = SyncInfo(syncInfo)](Duration, int32_t code) mutable {
304 if (code == E_OK) {
305 return true;
306 }
307 info.SetError(code);
308 return true;
309 };
310 }
311 return [this, times, info = SyncInfo(syncInfo)](Duration interval, int32_t code) mutable {
312 if (code == E_OK) {
313 return true;
314 }
315
316 activeInfos_.ComputeIfAbsent(info.syncId_, [this, times, interval, &info](uint64_t key) mutable {
317 auto syncId = GenerateId(info.user_);
318 auto ref = GenSyncRef(syncId);
319 actives_.Compute(syncId, [this, times, interval, &ref, &info](const uint64_t &key, TaskId &value) mutable {
320 value = executor_->Schedule(interval, GetSyncTask(times, true, ref, std::move(info)));
321 return true;
322 });
323 return syncId;
324 });
325 return true;
326 };
327 }
328
GenerateId(int32_t user)329 uint64_t SyncManager::GenerateId(int32_t user)
330 {
331 uint64_t syncId = static_cast<uint64_t>(user) & 0xFFFFFFFF;
332 return (syncId << MV_BIT) | (++genId_);
333 }
334
GenSyncRef(uint64_t syncId)335 RefCount SyncManager::GenSyncRef(uint64_t syncId)
336 {
337 return RefCount([syncId, this]() {
338 actives_.Erase(syncId);
339 });
340 }
341
Compare(uint64_t syncId,int32_t user)342 int32_t SyncManager::Compare(uint64_t syncId, int32_t user)
343 {
344 uint64_t inner = static_cast<uint64_t>(user) & 0xFFFFFFFF;
345 return (syncId & USER_MARK) == (inner << MV_BIT);
346 }
347
UpdateSchema(const SyncManager::SyncInfo & syncInfo)348 void SyncManager::UpdateSchema(const SyncManager::SyncInfo &syncInfo)
349 {
350 CloudEvent::StoreInfo storeInfo;
351 storeInfo.user = syncInfo.user_;
352 storeInfo.bundleName = syncInfo.bundleName_;
353 EventCenter::GetInstance().PostEvent(std::make_unique<CloudEvent>(CloudEvent::GET_SCHEMA, storeInfo));
354 }
355
GetStore(const StoreMetaData & meta,int32_t user,bool mustBind)356 AutoCache::Store SyncManager::GetStore(const StoreMetaData &meta, int32_t user, bool mustBind)
357 {
358 if (!Account::GetInstance()->IsVerified(user)) {
359 ZLOGW("user:%{public}d is locked!", user);
360 return nullptr;
361 }
362 auto instance = CloudServer::GetInstance();
363 if (instance == nullptr) {
364 ZLOGD("not support cloud sync");
365 return nullptr;
366 }
367
368 auto store = AutoCache::GetInstance().GetStore(meta, {});
369 if (store == nullptr) {
370 ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str());
371 return nullptr;
372 }
373
374 if (!store->IsBound()) {
375 CloudInfo info;
376 info.user = user;
377 SchemaMeta schemaMeta;
378 std::string schemaKey = info.GetSchemaKey(meta.bundleName, meta.instanceId);
379 if (!MetaDataManager::GetInstance().LoadMeta(std::move(schemaKey), schemaMeta, true)) {
380 ZLOGE("failed, no schema bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
381 meta.GetStoreAlias().c_str());
382 return nullptr;
383 }
384 auto dbMeta = schemaMeta.GetDataBase(meta.storeId);
385 auto cloudDB = instance->ConnectCloudDB(meta.tokenId, dbMeta);
386 auto assetLoader = instance->ConnectAssetLoader(meta.tokenId, dbMeta);
387 if (mustBind && (cloudDB == nullptr || assetLoader == nullptr)) {
388 ZLOGE("failed, no cloud DB <0x%{public}x %{public}s<->%{public}s>", meta.tokenId,
389 Anonymous::Change(dbMeta.name).c_str(), Anonymous::Change(dbMeta.alias).c_str());
390 return nullptr;
391 }
392
393 if (cloudDB != nullptr || assetLoader != nullptr) {
394 store->Bind(dbMeta, { std::move(cloudDB), std::move(assetLoader) });
395 }
396 }
397 return store;
398 }
399 } // namespace OHOS::CloudData