1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "SyncManager"
16 #include "sync_manager.h"
17
18 #include "cloud/cloud_info.h"
19 #include "cloud/cloud_server.h"
20 #include "cloud/schema_meta.h"
21 #include "cloud/sync_event.h"
22 #include "device_manager_adapter.h"
23 #include "eventcenter/event_center.h"
24 #include "log_print.h"
25 #include "metadata/meta_data_manager.h"
26 #include "store/auto_cache.h"
27 #include "utils/anonymous.h"
28 namespace OHOS::CloudData {
29 using namespace DistributedData;
30 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
31 using Defer = EventCenter::Defer;
32 std::atomic<uint32_t> SyncManager::genId_ = 0;
SyncInfo(int32_t user,const std::string & bundleName,const Store & store,const Tables & tables)33 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const Store &store, const Tables &tables)
34 : user_(user), bundleName_(bundleName)
35 {
36 if (!store.empty()) {
37 tables_[store] = tables;
38 }
39 syncId_ = SyncManager::GenerateId(user);
40 }
41
SyncInfo(int32_t user,const std::string & bundleName,const Stores & stores)42 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const Stores &stores)
43 : user_(user), bundleName_(bundleName)
44 {
45 for (auto &store : stores) {
46 tables_[store] = {};
47 }
48 syncId_ = SyncManager::GenerateId(user);
49 }
50
SyncInfo(int32_t user,const std::string & bundleName,const MutliStoreTables & tables)51 SyncManager::SyncInfo::SyncInfo(int32_t user, const std::string &bundleName, const MutliStoreTables &tables)
52 : user_(user), bundleName_(bundleName), tables_(tables)
53 {
54 tables_ = tables;
55 syncId_ = SyncManager::GenerateId(user);
56 }
57
SetMode(int32_t mode)58 void SyncManager::SyncInfo::SetMode(int32_t mode)
59 {
60 mode_ = mode;
61 }
62
SetWait(int32_t wait)63 void SyncManager::SyncInfo::SetWait(int32_t wait)
64 {
65 wait_ = wait;
66 }
67
SetAsyncDetail(GenAsync asyncDetail)68 void SyncManager::SyncInfo::SetAsyncDetail(GenAsync asyncDetail)
69 {
70 async_ = std::move(asyncDetail);
71 }
72
SetQuery(std::shared_ptr<GenQuery> query)73 void SyncManager::SyncInfo::SetQuery(std::shared_ptr<GenQuery> query)
74 {
75 query_ = query;
76 }
77
SetError(int32_t code) const78 void SyncManager::SyncInfo::SetError(int32_t code) const
79 {
80 if (async_) {
81 GenDetails details;
82 auto &detail = details[id_];
83 detail.progress = SYNC_FINISH;
84 detail.code = code;
85 async_(std::move(details));
86 }
87 }
88
GenerateQuery(const std::string & store,const Tables & tables)89 std::shared_ptr<GenQuery> SyncManager::SyncInfo::GenerateQuery(const std::string &store, const Tables &tables)
90 {
91 if (query_ != nullptr) {
92 return query_;
93 }
94 class SyncQuery final : public GenQuery {
95 public:
96 explicit SyncQuery(const std::vector<std::string> &tables) : tables_(tables)
97 {
98 }
99
100 bool IsEqual(uint64_t tid) override
101 {
102 return false;
103 }
104
105 std::vector<std::string> GetTables() override
106 {
107 return tables_;
108 }
109
110 private:
111 std::vector<std::string> tables_;
112 };
113 auto &syncTables = tables_[store];
114 return std::make_shared<SyncQuery>(syncTables.empty() ? tables : syncTables);
115 }
116
SyncManager()117 SyncManager::SyncManager()
118 {
119 EventCenter::GetInstance().Subscribe(CloudEvent::LOCAL_CHANGE, GetClientChangeHandler());
120 }
121
~SyncManager()122 SyncManager::~SyncManager()
123 {
124 if (executor_ != nullptr) {
125 actives_.ForEachCopies([this](auto &syncId, auto &taskId) {
126 executor_->Remove(taskId);
127 return false;
128 });
129 executor_ = nullptr;
130 }
131 }
132
Bind(std::shared_ptr<ExecutorPool> executor)133 int32_t SyncManager::Bind(std::shared_ptr<ExecutorPool> executor)
134 {
135 executor_ = executor;
136 return E_OK;
137 }
138
DoCloudSync(SyncInfo syncInfo)139 int32_t SyncManager::DoCloudSync(SyncInfo syncInfo)
140 {
141 if (executor_ == nullptr) {
142 return E_NOT_INIT;
143 }
144
145 actives_.Compute(GenerateId(syncInfo.user_), [this, &syncInfo](const uint64_t &key, TaskId &taskId) mutable {
146 taskId = executor_->Execute(GetSyncTask(0, true, GenSyncRef(key), std::move(syncInfo)));
147 return true;
148 });
149
150 return E_OK;
151 }
152
StopCloudSync(int32_t user)153 int32_t SyncManager::StopCloudSync(int32_t user)
154 {
155 if (executor_ == nullptr) {
156 return E_NOT_INIT;
157 }
158 actives_.ForEachCopies([this, user](auto &syncId, auto &taskId) {
159 if (Compare(syncId, user) == 0) {
160 executor_->Remove(taskId);
161 }
162 return false;
163 });
164 return E_OK;
165 }
166
GetSyncTask(int32_t times,bool retry,RefCount ref,SyncInfo && syncInfo)167 ExecutorPool::Task SyncManager::GetSyncTask(int32_t times, bool retry, RefCount ref, SyncInfo &&syncInfo)
168 {
169 times++;
170 return [this, times, retry, ref = std::move(ref), info = std::move(syncInfo)]() mutable {
171 activeInfos_.Erase(info.syncId_);
172 CloudInfo cloud;
173 cloud.user = info.user_;
174 if (!MetaDataManager::GetInstance().LoadMeta(cloud.GetKey(), cloud, true)) {
175 info.SetError(E_CLOUD_DISABLED);
176 ZLOGE("no cloud info for user:%{public}d", info.user_);
177 return;
178 }
179
180 if (!cloud.enableCloud || (info.id_ != SyncInfo::DEFAULT_ID && cloud.id != info.id_) ||
181 (!info.bundleName_.empty() && !cloud.IsOn(info.bundleName_))) {
182 info.SetError(E_CLOUD_DISABLED);
183 return;
184 }
185
186 if (!DmAdapter::GetInstance().IsNetworkAvailable()) {
187 info.SetError(E_NETWORK_ERROR);
188 return;
189 }
190
191 std::vector<SchemaMeta> schemas;
192 auto key = cloud.GetSchemaPrefix(info.bundleName_);
193 auto retryer = GetRetryer(times, info);
194 if (!MetaDataManager::GetInstance().LoadMeta(key, schemas, true) || schemas.empty()) {
195 UpdateSchema(info);
196 retryer(RETRY_INTERVAL, E_RETRY_TIMEOUT);
197 return;
198 }
199
200 Defer defer(GetSyncHandler(std::move(retryer)), CloudEvent::CLOUD_SYNC);
201 for (auto &schema : schemas) {
202 if (!cloud.IsOn(schema.bundleName)) {
203 continue;
204 }
205
206 for (const auto &database : schema.databases) {
207 CloudEvent::StoreInfo storeInfo;
208 storeInfo.bundleName = schema.bundleName;
209 storeInfo.user = cloud.user;
210 storeInfo.storeName = database.name;
211 storeInfo.instanceId = cloud.apps[schema.bundleName].instanceId;
212 auto query = info.GenerateQuery(database.name, database.GetTableNames());
213 auto evt = std::make_unique<SyncEvent>(std::move(storeInfo),
214 SyncEvent::EventInfo { info.mode_, info.wait_, retry, std::move(query), info.async_ });
215 EventCenter::GetInstance().PostEvent(std::move(evt));
216 }
217 }
218 };
219 }
220
GetSyncHandler(Retryer retryer)221 std::function<void(const Event &)> SyncManager::GetSyncHandler(Retryer retryer)
222 {
223 return [retryer](const Event &event) {
224 auto &evt = static_cast<const SyncEvent &>(event);
225 auto &storeInfo = evt.GetStoreInfo();
226 StoreMetaData meta;
227 meta.storeId = storeInfo.storeName;
228 meta.bundleName = storeInfo.bundleName;
229 meta.user = std::to_string(storeInfo.user);
230 meta.instanceId = storeInfo.instanceId;
231 meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
232 if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta)) {
233 ZLOGE("failed, no store meta bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
234 meta.GetStoreAlias().c_str());
235 return;
236 }
237 auto store = GetStore(meta, storeInfo.user);
238 if (store == nullptr) {
239 ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str());
240 return;
241 }
242
243 ZLOGD("database:<%{public}d:%{public}s:%{public}s> sync start", storeInfo.user, storeInfo.bundleName.c_str(),
244 meta.GetStoreAlias().c_str());
245 auto status = store->Sync({ SyncInfo::DEFAULT_ID }, evt.GetMode(), *(evt.GetQuery()), evt.AutoRetry()
246 ? [retryer](const GenDetails &details) {
247 if (details.empty()) {
248 ZLOGE("retry, details empty");
249 return;
250 }
251 int32_t code = details.begin()->second.code;
252 retryer(code == E_LOCKED_BY_OTHERS ? LOCKED_INTERVAL : RETRY_INTERVAL, code);
253 }
254 : evt.GetAsyncDetail(), evt.GetWait());
255 GenAsync async = evt.GetAsyncDetail();
256 if (status != E_OK && async) {
257 GenDetails details;
258 auto &detail = details[SyncInfo::DEFAULT_ID];
259 detail.progress = SYNC_FINISH;
260 detail.code = status;
261 async(std::move(details));
262 }
263 };
264 }
265
GetClientChangeHandler()266 std::function<void(const Event &)> SyncManager::GetClientChangeHandler()
267 {
268 return [this](const Event &event) {
269 auto &evt = static_cast<const SyncEvent &>(event);
270 auto store = evt.GetStoreInfo();
271 SyncInfo syncInfo(store.user, store.bundleName, store.storeName);
272 syncInfo.SetMode(evt.GetMode());
273 syncInfo.SetWait(evt.GetWait());
274 syncInfo.SetAsyncDetail(evt.GetAsyncDetail());
275 syncInfo.SetQuery(evt.GetQuery());
276 auto times = evt.AutoRetry() ? RETRY_TIMES - CLIENT_RETRY_TIMES : RETRY_TIMES;
277 auto task = GetSyncTask(times, evt.AutoRetry(), RefCount(), std::move(syncInfo));
278 task();
279 };
280 }
281
GetRetryer(int32_t times,const SyncInfo & syncInfo)282 SyncManager::Retryer SyncManager::GetRetryer(int32_t times, const SyncInfo &syncInfo)
283 {
284 if (times >= RETRY_TIMES) {
285 return [info = SyncInfo(syncInfo)](Duration, int32_t code) mutable {
286 if (code == E_OK) {
287 return true;
288 }
289 info.SetError(code);
290 return true;
291 };
292 }
293 return [this, times, info = SyncInfo(syncInfo)](Duration interval, int32_t code) mutable {
294 if (code == E_OK) {
295 return true;
296 }
297
298 activeInfos_.ComputeIfAbsent(info.syncId_, [this, times, interval, &info](uint64_t key) mutable {
299 auto syncId = GenerateId(info.user_);
300 actives_.Compute(syncId, [this, times, interval, &info](const uint64_t &key, TaskId &value) mutable {
301 value = executor_->Schedule(interval, GetSyncTask(times, true, GenSyncRef(key), std::move(info)));
302 return true;
303 });
304 return syncId;
305 });
306 return true;
307 };
308 }
309
GenerateId(int32_t user)310 uint64_t SyncManager::GenerateId(int32_t user)
311 {
312 uint64_t syncId = static_cast<uint64_t>(user) & 0xFFFFFFFF;
313 return (syncId << MV_BIT) | (++genId_);
314 }
315
GenSyncRef(uint64_t syncId)316 RefCount SyncManager::GenSyncRef(uint64_t syncId)
317 {
318 return RefCount([syncId, this]() {
319 actives_.Erase(syncId);
320 });
321 }
322
Compare(uint64_t syncId,int32_t user)323 int32_t SyncManager::Compare(uint64_t syncId, int32_t user)
324 {
325 uint64_t inner = static_cast<uint64_t>(user) & 0xFFFFFFFF;
326 return (syncId & USER_MARK) == (inner << MV_BIT);
327 }
328
UpdateSchema(const SyncManager::SyncInfo & syncInfo)329 void SyncManager::UpdateSchema(const SyncManager::SyncInfo &syncInfo)
330 {
331 CloudEvent::StoreInfo storeInfo;
332 storeInfo.user = syncInfo.user_;
333 storeInfo.bundleName = syncInfo.bundleName_;
334 EventCenter::GetInstance().PostEvent(std::make_unique<CloudEvent>(CloudEvent::GET_SCHEMA, storeInfo));
335 }
336
GetStore(const StoreMetaData & meta,int32_t user,bool mustBind)337 AutoCache::Store SyncManager::GetStore(const StoreMetaData &meta, int32_t user, bool mustBind)
338 {
339 auto instance = CloudServer::GetInstance();
340 if (instance == nullptr) {
341 ZLOGD("not support cloud sync");
342 return nullptr;
343 }
344
345 auto store = AutoCache::GetInstance().GetStore(meta, {});
346 if (store == nullptr) {
347 ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str());
348 return nullptr;
349 }
350
351 if (!store->IsBound()) {
352 CloudInfo info;
353 info.user = user;
354 SchemaMeta schemaMeta;
355 std::string schemaKey = info.GetSchemaKey(meta.bundleName, meta.instanceId);
356 if (!MetaDataManager::GetInstance().LoadMeta(std::move(schemaKey), schemaMeta, true)) {
357 ZLOGE("failed, no schema bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
358 meta.GetStoreAlias().c_str());
359 return nullptr;
360 }
361 auto dbMeta = schemaMeta.GetDataBase(meta.storeId);
362 auto cloudDB = instance->ConnectCloudDB(meta.tokenId, dbMeta);
363 auto assetLoader = instance->ConnectAssetLoader(meta.tokenId, dbMeta);
364 if (mustBind && (cloudDB == nullptr || assetLoader == nullptr)) {
365 ZLOGE("failed, no cloud DB <0x%{public}x %{public}s<->%{public}s>", meta.tokenId, dbMeta.name.c_str(),
366 dbMeta.alias.c_str());
367 return nullptr;
368 }
369
370 if (cloudDB != nullptr || assetLoader != nullptr) {
371 store->Bind(dbMeta, { std::move(cloudDB), std::move(assetLoader) });
372 }
373 }
374 return store;
375 }
376 } // namespace OHOS::CloudData