• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "SyncManager"
16 #include "sync_manager.h"
17 
18 #include "account/account_delegate.h"
19 #include "cloud/cloud_server.h"
20 #include "cloud/schema_meta.h"
21 #include "cloud/sync_event.h"
22 #include "device_manager_adapter.h"
23 #include "eventcenter/event_center.h"
24 #include "log_print.h"
25 #include "metadata/meta_data_manager.h"
26 #include "store/auto_cache.h"
27 #include "utils/anonymous.h"
28 namespace OHOS::CloudData {
29 using namespace DistributedData;
30 using namespace DistributedKv;
31 using Account = OHOS::DistributedKv::AccountDelegate;
32 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
33 using Defer = EventCenter::Defer;
34 std::atomic<uint32_t> SyncManager::genId_ = 0;
SyncInfo(int32_t user,const std::string & bundleName,const Store & store,const Tables & tables)35 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const Store &store, const Tables &tables)
36     : user_(user), bundleName_(bundleName)
37 {
38     if (!store.empty()) {
39         tables_[store] = tables;
40     }
41     syncId_ = SyncManager::GenerateId(user);
42 }
43 
SyncInfo(int32_t user,const std::string & bundleName,const Stores & stores)44 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const Stores &stores)
45     : user_(user), bundleName_(bundleName)
46 {
47     for (auto &store : stores) {
48         tables_[store] = {};
49     }
50     syncId_ = SyncManager::GenerateId(user);
51 }
52 
SyncInfo(int32_t user,const std::string & bundleName,const MutliStoreTables & tables)53 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const MutliStoreTables &tables)
54     : user_(user), bundleName_(bundleName), tables_(tables)
55 {
56     tables_ = tables;
57     syncId_ = SyncManager::GenerateId(user);
58 }
59 
SetMode(int32_t mode)60 void SyncManager::SyncInfo::SetMode(int32_t mode)
61 {
62     mode_ = mode;
63 }
64 
SetWait(int32_t wait)65 void SyncManager::SyncInfo::SetWait(int32_t wait)
66 {
67     wait_ = wait;
68 }
69 
SetAsyncDetail(GenAsync asyncDetail)70 void SyncManager::SyncInfo::SetAsyncDetail(GenAsync asyncDetail)
71 {
72     async_ = std::move(asyncDetail);
73 }
74 
SetQuery(std::shared_ptr<GenQuery> query)75 void SyncManager::SyncInfo::SetQuery(std::shared_ptr<GenQuery> query)
76 {
77     query_ = query;
78 }
79 
SetError(int32_t code) const80 void SyncManager::SyncInfo::SetError(int32_t code) const
81 {
82     if (async_) {
83         GenDetails details;
84         auto &detail = details[id_];
85         detail.progress = SYNC_FINISH;
86         detail.code = code;
87         async_(std::move(details));
88     }
89 }
90 
GenerateQuery(const std::string & store,const Tables & tables)91 std::shared_ptr<GenQuery> SyncManager::SyncInfo::GenerateQuery(const std::string &store, const Tables &tables)
92 {
93     if (query_ != nullptr) {
94         return query_;
95     }
96     class SyncQuery final : public GenQuery {
97     public:
98         explicit SyncQuery(const std::vector<std::string> &tables) : tables_(tables)
99         {
100         }
101 
102         bool IsEqual(uint64_t tid) override
103         {
104             return false;
105         }
106 
107         std::vector<std::string> GetTables() override
108         {
109             return tables_;
110         }
111 
112     private:
113         std::vector<std::string> tables_;
114     };
115     auto it = tables_.find(store);
116     return std::make_shared<SyncQuery>(it == tables_.end() || it->second.empty() ? tables : it->second);
117 }
118 
Contains(const std::string & storeName)119 bool SyncManager::SyncInfo::Contains(const std::string& storeName)
120 {
121     return tables_.empty() || tables_.find(storeName) != tables_.end();
122 }
123 
SyncManager()124 SyncManager::SyncManager()
125 {
126     EventCenter::GetInstance().Subscribe(CloudEvent::LOCAL_CHANGE, GetClientChangeHandler());
127 }
128 
~SyncManager()129 SyncManager::~SyncManager()
130 {
131     if (executor_ != nullptr) {
132         actives_.ForEachCopies([this](auto &syncId, auto &taskId) {
133             executor_->Remove(taskId);
134             return false;
135         });
136         executor_ = nullptr;
137     }
138 }
139 
Bind(std::shared_ptr<ExecutorPool> executor)140 int32_t SyncManager::Bind(std::shared_ptr<ExecutorPool> executor)
141 {
142     executor_ = executor;
143     return E_OK;
144 }
145 
DoCloudSync(SyncInfo syncInfo)146 int32_t SyncManager::DoCloudSync(SyncInfo syncInfo)
147 {
148     if (executor_ == nullptr) {
149         return E_NOT_INIT;
150     }
151     auto syncId = GenerateId(syncInfo.user_);
152     auto ref = GenSyncRef(syncId);
153     actives_.Compute(syncId, [this, &ref, &syncInfo](const uint64_t &key, TaskId &taskId) mutable {
154         taskId = executor_->Execute(GetSyncTask(0, true, ref, std::move(syncInfo)));
155         return true;
156     });
157     return E_OK;
158 }
159 
StopCloudSync(int32_t user)160 int32_t SyncManager::StopCloudSync(int32_t user)
161 {
162     if (executor_ == nullptr) {
163         return E_NOT_INIT;
164     }
165     actives_.ForEachCopies([this, user](auto &syncId, auto &taskId) {
166         if (Compare(syncId, user) == 0) {
167             executor_->Remove(taskId);
168         }
169         return false;
170     });
171     return E_OK;
172 }
173 
IsValid(SyncInfo & info,CloudInfo & cloud)174 bool SyncManager::IsValid(SyncInfo &info, CloudInfo &cloud)
175 {
176     if (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true) ||
177         (info.id_ != SyncInfo::DEFAULT_ID && cloud.id != info.id_)) {
178         info.SetError(E_CLOUD_DISABLED);
179         ZLOGE("cloudInfo invalid:%{public}d, <syncId:%{public}s, metaId:%{public}s>", cloud.IsValid(),
180               Anonymous::Change(info.id_).c_str(), Anonymous::Change(cloud.id).c_str());
181         return false;
182     }
183     if (!cloud.enableCloud || (!info.bundleName_.empty() && !cloud.IsOn(info.bundleName_))) {
184         info.SetError(E_CLOUD_DISABLED);
185         ZLOGD("enable:%{public}d, bundleName:%{public}s", cloud.enableCloud, info.bundleName_.c_str());
186         return false;
187     }
188     if (!DmAdapter::GetInstance().IsNetworkAvailable()) {
189         info.SetError(E_NETWORK_ERROR);
190         ZLOGD("network unavailable");
191         return false;
192     }
193     if (!Account::GetInstance()->IsVerified(info.user_)) {
194         info.SetError(E_USER_UNLOCK);
195         ZLOGD("user unverified");
196         return false;
197     }
198     return true;
199 }
200 
GetSyncTask(int32_t times,bool retry,RefCount ref,SyncInfo && syncInfo)201 ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount ref, SyncInfo &&syncInfo)
202 {
203     times++;
204     return [this, times, retry, keep = std::move(ref), info = std::move(syncInfo)]() mutable {
205         activeInfos_.Erase(info.syncId_);
206         CloudInfo cloud;
207         cloud.user = info.user_;
208         if (!IsValid(info, cloud)) {
209             return;
210         }
211         std::vector<SchemaMeta> schemas;
212         auto key = cloud.GetSchemaPrefix(info.bundleName_);
213         auto retryer = GetRetryer(times, info);
214         if (!MetaDataManager::GetInstance().LoadMeta(key, schemas, true) || schemas.empty()) {
215             UpdateSchema(info);
216             retryer(RETRY_INTERVAL, E_RETRY_TIMEOUT);
217             return;
218         }
219         Defer defer(GetSyncHandler(std::move(retryer)), CloudEvent::CLOUD_SYNC);
220         for (auto &schema : schemas) {
221             if (!cloud.IsOn(schema.bundleName)) {
222                 continue;
223             }
224             for (const auto &database : schema.databases) {
225                 if (!info.Contains(database.name)) {
226                     continue;
227                 }
228                 CloudEvent::StoreInfo storeInfo = { 0, schema.bundleName, database.name,
229                     cloud.apps[schema.bundleName].instanceId, cloud.user };
230                 auto query = info.GenerateQuery(database.name, database.GetTableNames());
231                 auto evt = std::make_unique<SyncEvent>(std::move(storeInfo),
232                     SyncEvent::EventInfo { info.mode_, info.wait_, retry, std::move(query), info.async_ });
233                 EventCenter::GetInstance().PostEvent(std::move(evt));
234             }
235         }
236     };
237 }
238 
GetSyncHandler(Retryer retryer)239 std::function<void(const Event &)> SyncManager::GetSyncHandler(Retryer retryer)
240 {
241     return [retryer](const Event &event) {
242         auto &evt = static_cast<const SyncEvent &>(event);
243         auto &storeInfo = evt.GetStoreInfo();
244         StoreMetaData meta;
245         meta.storeId = storeInfo.storeName;
246         meta.bundleName = storeInfo.bundleName;
247         meta.user = std::to_string(storeInfo.user);
248         meta.instanceId = storeInfo.instanceId;
249         meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
250         if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
251             ZLOGE("failed, no store meta bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
252                 meta.GetStoreAlias().c_str());
253             return;
254         }
255         auto store = GetStore(meta, storeInfo.user);
256         if (store == nullptr) {
257             ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str());
258             return;
259         }
260 
261         ZLOGD("database:<%{public}d:%{public}s:%{public}s> sync start", storeInfo.user, storeInfo.bundleName.c_str(),
262             meta.GetStoreAlias().c_str());
263         auto status = store->Sync({ SyncInfo::DEFAULT_ID }, evt.GetMode(), *(evt.GetQuery()), evt.AutoRetry()
264             ? [retryer](const GenDetails &details) {
265                 if (details.empty()) {
266                     ZLOGE("retry, details empty");
267                     return;
268                 }
269                 int32_t code = details.begin()->second.code;
270                 retryer(code == E_LOCKED_BY_OTHERS ? LOCKED_INTERVAL : RETRY_INTERVAL, code);
271             }
272             : evt.GetAsyncDetail(), evt.GetWait());
273         GenAsync async = evt.GetAsyncDetail();
274         if (status != E_OK && async) {
275             GenDetails details;
276             auto &detail = details[SyncInfo::DEFAULT_ID];
277             detail.progress = SYNC_FINISH;
278             detail.code = status;
279             async(std::move(details));
280         }
281     };
282 }
283 
GetClientChangeHandler()284 std::function<void(const Event &)> SyncManager::GetClientChangeHandler()
285 {
286     return [this](const Event &event) {
287         auto &evt = static_cast<const SyncEvent &>(event);
288         auto store = evt.GetStoreInfo();
289         SyncInfo syncInfo(store.user, store.bundleName, store.storeName);
290         syncInfo.SetMode(evt.GetMode());
291         syncInfo.SetWait(evt.GetWait());
292         syncInfo.SetAsyncDetail(evt.GetAsyncDetail());
293         syncInfo.SetQuery(evt.GetQuery());
294         auto times = evt.AutoRetry() ? RETRY_TIMES - CLIENT_RETRY_TIMES : RETRY_TIMES;
295         auto task = GetSyncTask(times, evt.AutoRetry(), RefCount(), std::move(syncInfo));
296         task();
297     };
298 }
299 
GetRetryer(int32_t times,const SyncInfo & syncInfo)300 SyncManager::Retryer SyncManager::GetRetryer(int32_t times, const SyncInfo &syncInfo)
301 {
302     if (times >= RETRY_TIMES) {
303         return  [info = SyncInfo(syncInfo)](Duration, int32_t code) mutable {
304             if (code == E_OK) {
305                 return true;
306             }
307             info.SetError(code);
308             return true;
309         };
310     }
311     return [this, times, info = SyncInfo(syncInfo)](Duration interval, int32_t code) mutable {
312         if (code == E_OK) {
313             return true;
314         }
315 
316         activeInfos_.ComputeIfAbsent(info.syncId_, [this, times, interval, &info](uint64_t key) mutable {
317             auto syncId = GenerateId(info.user_);
318             auto ref = GenSyncRef(syncId);
319             actives_.Compute(syncId, [this, times, interval, &ref, &info](const uint64_t &key, TaskId &value) mutable {
320                 value = executor_->Schedule(interval, GetSyncTask(times, true, ref, std::move(info)));
321                 return true;
322             });
323             return syncId;
324         });
325         return true;
326     };
327 }
328 
GenerateId(int32_t user)329 uint64_t SyncManager::GenerateId(int32_t user)
330 {
331     uint64_t syncId = static_cast<uint64_t>(user) & 0xFFFFFFFF;
332     return (syncId << MV_BIT) | (++genId_);
333 }
334 
GenSyncRef(uint64_t syncId)335 RefCount SyncManager::GenSyncRef(uint64_t syncId)
336 {
337     return RefCount([syncId, this]() {
338         actives_.Erase(syncId);
339     });
340 }
341 
Compare(uint64_t syncId,int32_t user)342 int32_t SyncManager::Compare(uint64_t syncId, int32_t user)
343 {
344     uint64_t inner = static_cast<uint64_t>(user) & 0xFFFFFFFF;
345     return (syncId & USER_MARK) == (inner << MV_BIT);
346 }
347 
UpdateSchema(const SyncManager::SyncInfo & syncInfo)348 void SyncManager::UpdateSchema(const SyncManager::SyncInfo &syncInfo)
349 {
350     CloudEvent::StoreInfo storeInfo;
351     storeInfo.user = syncInfo.user_;
352     storeInfo.bundleName = syncInfo.bundleName_;
353     EventCenter::GetInstance().PostEvent(std::make_unique<CloudEvent>(CloudEvent::GET_SCHEMA, storeInfo));
354 }
355 
GetStore(const StoreMetaData & meta,int32_t user,bool mustBind)356 AutoCache::Store SyncManager::GetStore(const StoreMetaData &meta, int32_t user, bool mustBind)
357 {
358     if (!Account::GetInstance()->IsVerified(user)) {
359         ZLOGW("user:%{public}d is locked!", user);
360         return nullptr;
361     }
362     auto instance = CloudServer::GetInstance();
363     if (instance == nullptr) {
364         ZLOGD("not support cloud sync");
365         return nullptr;
366     }
367 
368     auto store = AutoCache::GetInstance().GetStore(meta, {});
369     if (store == nullptr) {
370         ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str());
371         return nullptr;
372     }
373 
374     if (!store->IsBound()) {
375         CloudInfo info;
376         info.user = user;
377         SchemaMeta schemaMeta;
378         std::string schemaKey = info.GetSchemaKey(meta.bundleName, meta.instanceId);
379         if (!MetaDataManager::GetInstance().LoadMeta(std::move(schemaKey), schemaMeta, true)) {
380             ZLOGE("failed, no schema bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
381                 meta.GetStoreAlias().c_str());
382             return nullptr;
383         }
384         auto dbMeta = schemaMeta.GetDataBase(meta.storeId);
385         auto cloudDB = instance->ConnectCloudDB(meta.tokenId, dbMeta);
386         auto assetLoader = instance->ConnectAssetLoader(meta.tokenId, dbMeta);
387         if (mustBind && (cloudDB == nullptr || assetLoader == nullptr)) {
388             ZLOGE("failed, no cloud DB <0x%{public}x %{public}s<->%{public}s>", meta.tokenId,
389                 Anonymous::Change(dbMeta.name).c_str(), Anonymous::Change(dbMeta.alias).c_str());
390             return nullptr;
391         }
392 
393         if (cloudDB != nullptr || assetLoader != nullptr) {
394             store->Bind(dbMeta, { std::move(cloudDB), std::move(assetLoader) });
395         }
396     }
397     return store;
398 }
399 } // namespace OHOS::CloudData