• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "SyncManager"
16 #include "sync_manager.h"
17 
18 #include "cloud/cloud_info.h"
19 #include "cloud/cloud_server.h"
20 #include "cloud/schema_meta.h"
21 #include "cloud/sync_event.h"
22 #include "device_manager_adapter.h"
23 #include "eventcenter/event_center.h"
24 #include "log_print.h"
25 #include "metadata/meta_data_manager.h"
26 #include "store/auto_cache.h"
27 #include "utils/anonymous.h"
28 namespace OHOS::CloudData {
29 using namespace DistributedData;
30 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
31 using Defer = EventCenter::Defer;
32 std::atomic<uint32_t> SyncManager::genId_ = 0;
SyncInfo(int32_t user,const std::string & bundleName,const Store & store,const Tables & tables)33 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const Store &store, const Tables &tables)
34     : user_(user), bundleName_(bundleName)
35 {
36     if (!store.empty()) {
37         tables_[store] = tables;
38     }
39     syncId_ = SyncManager::GenerateId(user);
40 }
41 
SyncInfo(int32_t user,const std::string & bundleName,const Stores & stores)42 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const Stores &stores)
43     : user_(user), bundleName_(bundleName)
44 {
45     for (auto &store : stores) {
46         tables_[store] = {};
47     }
48     syncId_ = SyncManager::GenerateId(user);
49 }
50 
SyncInfo(int32_t user,const std::string & bundleName,const MutliStoreTables & tables)51 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const MutliStoreTables &tables)
52     : user_(user), bundleName_(bundleName), tables_(tables)
53 {
54     tables_ = tables;
55     syncId_ = SyncManager::GenerateId(user);
56 }
57 
SetMode(int32_t mode)58 void SyncManager::SyncInfo::SetMode(int32_t mode)
59 {
60     mode_ = mode;
61 }
62 
SetWait(int32_t wait)63 void SyncManager::SyncInfo::SetWait(int32_t wait)
64 {
65     wait_ = wait;
66 }
67 
SetAsyncDetail(GenAsync asyncDetail)68 void SyncManager::SyncInfo::SetAsyncDetail(GenAsync asyncDetail)
69 {
70     async_ = std::move(asyncDetail);
71 }
72 
SetQuery(std::shared_ptr<GenQuery> query)73 void SyncManager::SyncInfo::SetQuery(std::shared_ptr<GenQuery> query)
74 {
75     query_ = query;
76 }
77 
SetError(int32_t code) const78 void SyncManager::SyncInfo::SetError(int32_t code) const
79 {
80     if (async_) {
81         GenDetails details;
82         auto &detail = details[id_];
83         detail.progress = SYNC_FINISH;
84         detail.code = code;
85         async_(std::move(details));
86     }
87 }
88 
GenerateQuery(const std::string & store,const Tables & tables)89 std::shared_ptr<GenQuery> SyncManager::SyncInfo::GenerateQuery(const std::string &store, const Tables &tables)
90 {
91     if (query_ != nullptr) {
92         return query_;
93     }
94     class SyncQuery final : public GenQuery {
95     public:
96         explicit SyncQuery(const std::vector<std::string> &tables) : tables_(tables)
97         {
98         }
99 
100         bool IsEqual(uint64_t tid) override
101         {
102             return false;
103         }
104 
105         std::vector<std::string> GetTables() override
106         {
107             return tables_;
108         }
109 
110     private:
111         std::vector<std::string> tables_;
112     };
113     auto &syncTables = tables_[store];
114     return std::make_shared<SyncQuery>(syncTables.empty() ? tables : syncTables);
115 }
116 
SyncManager()117 SyncManager::SyncManager()
118 {
119     EventCenter::GetInstance().Subscribe(CloudEvent::LOCAL_CHANGE, GetClientChangeHandler());
120 }
121 
~SyncManager()122 SyncManager::~SyncManager()
123 {
124     if (executor_ != nullptr) {
125         actives_.ForEachCopies([this](auto &syncId, auto &taskId) {
126             executor_->Remove(taskId);
127             return false;
128         });
129         executor_ = nullptr;
130     }
131 }
132 
Bind(std::shared_ptr<ExecutorPool> executor)133 int32_t SyncManager::Bind(std::shared_ptr<ExecutorPool> executor)
134 {
135     executor_ = executor;
136     return E_OK;
137 }
138 
DoCloudSync(SyncInfo syncInfo)139 int32_t SyncManager::DoCloudSync(SyncInfo syncInfo)
140 {
141     if (executor_ == nullptr) {
142         return E_NOT_INIT;
143     }
144 
145     actives_.Compute(GenerateId(syncInfo.user_), [this, &syncInfo](const uint64_t &key, TaskId &taskId) mutable {
146         taskId = executor_->Execute(GetSyncTask(0, true, GenSyncRef(key), std::move(syncInfo)));
147         return true;
148     });
149 
150     return E_OK;
151 }
152 
StopCloudSync(int32_t user)153 int32_t SyncManager::StopCloudSync(int32_t user)
154 {
155     if (executor_ == nullptr) {
156         return E_NOT_INIT;
157     }
158     actives_.ForEachCopies([this, user](auto &syncId, auto &taskId) {
159         if (Compare(syncId, user) == 0) {
160             executor_->Remove(taskId);
161         }
162         return false;
163     });
164     return E_OK;
165 }
166 
GetSyncTask(int32_t times,bool retry,RefCount ref,SyncInfo && syncInfo)167 ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount ref, SyncInfo &&syncInfo)
168 {
169     times++;
170     return [this, times, retry, ref = std::move(ref), info = std::move(syncInfo)]() mutable {
171         activeInfos_.Erase(info.syncId_);
172         CloudInfo cloud;
173         cloud.user = info.user_;
174         if (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true)) {
175             info.SetError(E_CLOUD_DISABLED);
176             ZLOGE("no cloud info for user:%{public}d", info.user_);
177             return;
178         }
179 
180         if (!cloud.enableCloud || (info.id_ != SyncInfo::DEFAULT_ID && cloud.id != info.id_) ||
181             (!info.bundleName_.empty() && !cloud.IsOn(info.bundleName_))) {
182             info.SetError(E_CLOUD_DISABLED);
183             return;
184         }
185 
186         if (!DmAdapter::GetInstance().IsNetworkAvailable()) {
187             info.SetError(E_NETWORK_ERROR);
188             return;
189         }
190 
191         std::vector<SchemaMeta> schemas;
192         auto key = cloud.GetSchemaPrefix(info.bundleName_);
193         auto retryer = GetRetryer(times, info);
194         if (!MetaDataManager::GetInstance().LoadMeta(key, schemas, true) || schemas.empty()) {
195             UpdateSchema(info);
196             retryer(RETRY_INTERVAL, E_RETRY_TIMEOUT);
197             return;
198         }
199 
200         Defer defer(GetSyncHandler(std::move(retryer)), CloudEvent::CLOUD_SYNC);
201         for (auto &schema : schemas) {
202             if (!cloud.IsOn(schema.bundleName)) {
203                 continue;
204             }
205 
206             for (const auto &database : schema.databases) {
207                 CloudEvent::StoreInfo storeInfo;
208                 storeInfo.bundleName = schema.bundleName;
209                 storeInfo.user = cloud.user;
210                 storeInfo.storeName = database.name;
211                 storeInfo.instanceId = cloud.apps[schema.bundleName].instanceId;
212                 auto query = info.GenerateQuery(database.name, database.GetTableNames());
213                 auto evt = std::make_unique<SyncEvent>(std::move(storeInfo),
214                     SyncEvent::EventInfo { info.mode_, info.wait_, retry, std::move(query), info.async_ });
215                 EventCenter::GetInstance().PostEvent(std::move(evt));
216             }
217         }
218     };
219 }
220 
GetSyncHandler(Retryer retryer)221 std::function<void(const Event &)> SyncManager::GetSyncHandler(Retryer retryer)
222 {
223     return [retryer](const Event &event) {
224         auto &evt = static_cast<const SyncEvent &>(event);
225         auto &storeInfo = evt.GetStoreInfo();
226         StoreMetaData meta;
227         meta.storeId = storeInfo.storeName;
228         meta.bundleName = storeInfo.bundleName;
229         meta.user = std::to_string(storeInfo.user);
230         meta.instanceId = storeInfo.instanceId;
231         meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
232         if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta)) {
233             ZLOGE("failed, no store meta bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
234                 meta.GetStoreAlias().c_str());
235             return;
236         }
237         auto store = GetStore(meta, storeInfo.user);
238         if (store == nullptr) {
239             ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str());
240             return;
241         }
242 
243         ZLOGD("database:<%{public}d:%{public}s:%{public}s> sync start", storeInfo.user, storeInfo.bundleName.c_str(),
244             meta.GetStoreAlias().c_str());
245         auto status = store->Sync({ SyncInfo::DEFAULT_ID }, evt.GetMode(), *(evt.GetQuery()), evt.AutoRetry()
246             ? [retryer](const GenDetails &details) {
247                 if (details.empty()) {
248                     ZLOGE("retry, details empty");
249                     return;
250                 }
251                 int32_t code = details.begin()->second.code;
252                 retryer(code == E_LOCKED_BY_OTHERS ? LOCKED_INTERVAL : RETRY_INTERVAL, code);
253             }
254             : evt.GetAsyncDetail(), evt.GetWait());
255         GenAsync async = evt.GetAsyncDetail();
256         if (status != E_OK && async) {
257             GenDetails details;
258             auto &detail = details[SyncInfo::DEFAULT_ID];
259             detail.progress = SYNC_FINISH;
260             detail.code = status;
261             async(std::move(details));
262         }
263     };
264 }
265 
GetClientChangeHandler()266 std::function<void(const Event &)> SyncManager::GetClientChangeHandler()
267 {
268     return [this](const Event &event) {
269         auto &evt = static_cast<const SyncEvent &>(event);
270         auto store = evt.GetStoreInfo();
271         SyncInfo syncInfo(store.user, store.bundleName, store.storeName);
272         syncInfo.SetMode(evt.GetMode());
273         syncInfo.SetWait(evt.GetWait());
274         syncInfo.SetAsyncDetail(evt.GetAsyncDetail());
275         syncInfo.SetQuery(evt.GetQuery());
276         auto times = evt.AutoRetry() ? RETRY_TIMES - CLIENT_RETRY_TIMES : RETRY_TIMES;
277         auto task = GetSyncTask(times, evt.AutoRetry(), RefCount(), std::move(syncInfo));
278         task();
279     };
280 }
281 
GetRetryer(int32_t times,const SyncInfo & syncInfo)282 SyncManager::Retryer SyncManager::GetRetryer(int32_t times, const SyncInfo &syncInfo)
283 {
284     if (times >= RETRY_TIMES) {
285         return  [info = SyncInfo(syncInfo)](Duration, int32_t code) mutable {
286             if (code == E_OK) {
287                 return true;
288             }
289             info.SetError(code);
290             return true;
291         };
292     }
293     return [this, times, info = SyncInfo(syncInfo)](Duration interval, int32_t code) mutable {
294         if (code == E_OK) {
295             return true;
296         }
297 
298         activeInfos_.ComputeIfAbsent(info.syncId_, [this, times, interval, &info](uint64_t key) mutable {
299             auto syncId = GenerateId(info.user_);
300             actives_.Compute(syncId, [this, times, interval, &info](const uint64_t &key, TaskId &value) mutable {
301                 value = executor_->Schedule(interval, GetSyncTask(times, true, GenSyncRef(key), std::move(info)));
302                 return true;
303             });
304             return syncId;
305         });
306         return true;
307     };
308 }
309 
GenerateId(int32_t user)310 uint64_t SyncManager::GenerateId(int32_t user)
311 {
312     uint64_t syncId = static_cast<uint64_t>(user) & 0xFFFFFFFF;
313     return (syncId << MV_BIT) | (++genId_);
314 }
315 
GenSyncRef(uint64_t syncId)316 RefCount SyncManager::GenSyncRef(uint64_t syncId)
317 {
318     return RefCount([syncId, this]() {
319         actives_.Erase(syncId);
320     });
321 }
322 
Compare(uint64_t syncId,int32_t user)323 int32_t SyncManager::Compare(uint64_t syncId, int32_t user)
324 {
325     uint64_t inner = static_cast<uint64_t>(user) & 0xFFFFFFFF;
326     return (syncId & USER_MARK) == (inner << MV_BIT);
327 }
328 
UpdateSchema(const SyncManager::SyncInfo & syncInfo)329 void SyncManager::UpdateSchema(const SyncManager::SyncInfo &syncInfo)
330 {
331     CloudEvent::StoreInfo storeInfo;
332     storeInfo.user = syncInfo.user_;
333     storeInfo.bundleName = syncInfo.bundleName_;
334     EventCenter::GetInstance().PostEvent(std::make_unique<CloudEvent>(CloudEvent::GET_SCHEMA, storeInfo));
335 }
336 
GetStore(const StoreMetaData & meta,int32_t user,bool mustBind)337 AutoCache::Store SyncManager::GetStore(const StoreMetaData &meta, int32_t user, bool mustBind)
338 {
339     auto instance = CloudServer::GetInstance();
340     if (instance == nullptr) {
341         ZLOGD("not support cloud sync");
342         return nullptr;
343     }
344 
345     auto store = AutoCache::GetInstance().GetStore(meta, {});
346     if (store == nullptr) {
347         ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str());
348         return nullptr;
349     }
350 
351     if (!store->IsBound()) {
352         CloudInfo info;
353         info.user = user;
354         SchemaMeta schemaMeta;
355         std::string schemaKey = info.GetSchemaKey(meta.bundleName, meta.instanceId);
356         if (!MetaDataManager::GetInstance().LoadMeta(std::move(schemaKey), schemaMeta, true)) {
357             ZLOGE("failed, no schema bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
358                 meta.GetStoreAlias().c_str());
359             return nullptr;
360         }
361         auto dbMeta = schemaMeta.GetDataBase(meta.storeId);
362         auto cloudDB = instance->ConnectCloudDB(meta.tokenId, dbMeta);
363         auto assetLoader = instance->ConnectAssetLoader(meta.tokenId, dbMeta);
364         if (mustBind && (cloudDB == nullptr || assetLoader == nullptr)) {
365             ZLOGE("failed, no cloud DB <0x%{public}x %{public}s<->%{public}s>", meta.tokenId, dbMeta.name.c_str(),
366                 dbMeta.alias.c_str());
367             return nullptr;
368         }
369 
370         if (cloudDB != nullptr || assetLoader != nullptr) {
371             store->Bind(dbMeta, { std::move(cloudDB), std::move(assetLoader) });
372         }
373     }
374     return store;
375 }
376 } // namespace OHOS::CloudData