• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define LOG_TAG "CloudServiceImpl"
17 
18 #include "cloud_service_impl.h"
19 
20 #include "accesstoken_kit.h"
21 #include "account/account_delegate.h"
22 #include "checker/checker_manager.h"
23 #include "cloud/cloud_server.h"
24 #include "cloud/cloud_share_event.h"
25 #include "cloud/make_query_event.h"
26 #include "cloud_value_util.h"
27 #include "communicator/device_manager_adapter.h"
28 #include "eventcenter/event_center.h"
29 #include "hap_token_info.h"
30 #include "ipc_skeleton.h"
31 #include "log_print.h"
32 #include "metadata/meta_data_manager.h"
33 #include "rdb_cloud_data_translate.h"
34 #include "rdb_types.h"
35 #include "runtime_config.h"
36 #include "store/auto_cache.h"
37 #include "store/general_store.h"
38 #include "sync_manager.h"
39 #include "utils/anonymous.h"
40 #include "values_bucket.h"
41 namespace OHOS::CloudData {
42 using namespace DistributedData;
43 using namespace DistributedKv;
44 using namespace std::chrono;
45 using namespace SharingUtil;
46 using namespace Security::AccessToken;
47 using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter;
48 using Account = OHOS::DistributedKv::AccountDelegate;
49 using AccessTokenKit = Security::AccessToken::AccessTokenKit;
50 __attribute__((used)) CloudServiceImpl::Factory CloudServiceImpl::factory_;
51 
Factory()52 CloudServiceImpl::Factory::Factory() noexcept
53 {
54     FeatureSystem::GetInstance().RegisterCreator(
55         CloudServiceImpl::SERVICE_NAME,
56         [this]() {
57             if (product_ == nullptr) {
58                 product_ = std::make_shared<CloudServiceImpl>();
59             }
60             return product_;
61         },
62         FeatureSystem::BIND_NOW);
63     staticActs_ = std::make_shared<CloudStatic>();
64     FeatureSystem::GetInstance().RegisterStaticActs(CloudServiceImpl::SERVICE_NAME, staticActs_);
65 }
66 
~Factory()67 CloudServiceImpl::Factory::~Factory() {}
68 
CloudServiceImpl()69 CloudServiceImpl::CloudServiceImpl()
70 {
71     EventCenter::GetInstance().Subscribe(CloudEvent::GET_SCHEMA, [this](const Event &event) {
72         GetSchema(event);
73     });
74     EventCenter::GetInstance().Subscribe(CloudEvent::CLOUD_SHARE, [this](const Event &event) {
75         CloudShare(event);
76     });
77     MetaDataManager::GetInstance().Subscribe(
78         Subscription::GetPrefix({ "" }), [this](const std::string &key,
79             const std::string &value, int32_t flag) -> auto {
80             if (flag != MetaDataManager::INSERT && flag != MetaDataManager::UPDATE) {
81                 return true;
82             }
83             Subscription sub;
84             Subscription::Unmarshall(value, sub);
85             InitSubTask(sub);
86             return true;
87         }, true);
88 }
89 
EnableCloud(const std::string & id,const std::map<std::string,int32_t> & switches)90 int32_t CloudServiceImpl::EnableCloud(const std::string &id, const std::map<std::string, int32_t> &switches)
91 {
92     auto tokenId = IPCSkeleton::GetCallingTokenID();
93     auto user = Account::GetInstance()->GetUserByToken(tokenId);
94     auto [status, cloudInfo] = GetCloudInfo(user);
95     if (status != SUCCESS) {
96         return status;
97     }
98     cloudInfo.enableCloud = true;
99     for (const auto &[bundle, value] : switches) {
100         if (!cloudInfo.Exist(bundle)) {
101             continue;
102         }
103         cloudInfo.apps[bundle].cloudSwitch = (value == SWITCH_ON);
104     }
105     if (!MetaDataManager::GetInstance().SaveMeta(cloudInfo.GetKey(), cloudInfo, true)) {
106         return ERROR;
107     }
108     Execute(GenTask(0, cloudInfo.user, { WORK_CLOUD_INFO_UPDATE, WORK_SCHEMA_UPDATE, WORK_SUB, WORK_DO_CLOUD_SYNC }));
109     return SUCCESS;
110 }
111 
DisableCloud(const std::string & id)112 int32_t CloudServiceImpl::DisableCloud(const std::string &id)
113 {
114     auto tokenId = IPCSkeleton::GetCallingTokenID();
115     auto user = Account::GetInstance()->GetUserByToken(tokenId);
116     auto [status, cloudInfo] = GetCloudInfo(user);
117     if (status != SUCCESS) {
118         return status;
119     }
120     if (cloudInfo.id != id) {
121         ZLOGE("invalid args, [input] id:%{public}s, [exist] id:%{public}s", Anonymous::Change(id).c_str(),
122             Anonymous::Change(cloudInfo.id).c_str());
123         return ERROR;
124     }
125     cloudInfo.enableCloud = false;
126     if (!MetaDataManager::GetInstance().SaveMeta(cloudInfo.GetKey(), cloudInfo, true)) {
127         return ERROR;
128     }
129     Execute(GenTask(0, cloudInfo.user, { WORK_STOP_CLOUD_SYNC, WORK_RELEASE, WORK_SUB }));
130     return SUCCESS;
131 }
132 
ChangeAppSwitch(const std::string & id,const std::string & bundleName,int32_t appSwitch)133 int32_t CloudServiceImpl::ChangeAppSwitch(const std::string &id, const std::string &bundleName, int32_t appSwitch)
134 {
135     auto tokenId = IPCSkeleton::GetCallingTokenID();
136     auto user = Account::GetInstance()->GetUserByToken(tokenId);
137     auto [status, cloudInfo] = GetCloudInfo(user);
138     if (status != SUCCESS) {
139         return status;
140     }
141     if (cloudInfo.id != id || !cloudInfo.Exist(bundleName)) {
142         ZLOGE("invalid args, [input] id:%{public}s, [exist] id:%{public}s, bundleName:%{public}s",
143             Anonymous::Change(id).c_str(), Anonymous::Change(cloudInfo.id).c_str(), bundleName.c_str());
144         return ERROR;
145     }
146     cloudInfo.apps[bundleName].cloudSwitch = (appSwitch == SWITCH_ON);
147     if (!MetaDataManager::GetInstance().SaveMeta(cloudInfo.GetKey(), cloudInfo, true)) {
148         return ERROR;
149     }
150     Execute(GenTask(0, cloudInfo.user, { WORK_CLOUD_INFO_UPDATE, WORK_SCHEMA_UPDATE, WORK_SUB }));
151     if (cloudInfo.enableCloud && appSwitch == SWITCH_ON) {
152         syncManager_.DoCloudSync({ cloudInfo.user, bundleName });
153     }
154     return SUCCESS;
155 }
156 
DoClean(CloudInfo & cloudInfo,const std::map<std::string,int32_t> & actions)157 int32_t CloudServiceImpl::DoClean(CloudInfo &cloudInfo, const std::map<std::string, int32_t> &actions)
158 {
159     syncManager_.StopCloudSync(cloudInfo.user);
160     auto keys = cloudInfo.GetSchemaKey();
161     for (const auto &[bundle, action] : actions) {
162         if (!cloudInfo.Exist(bundle)) {
163             continue;
164         }
165         SchemaMeta schemaMeta;
166         if (!MetaDataManager::GetInstance().LoadMeta(keys[bundle], schemaMeta, true)) {
167             ZLOGE("failed, no schema meta:bundleName:%{public}s", bundle.c_str());
168             return ERROR;
169         }
170         for (const auto &database : schemaMeta.databases) {
171             // action
172             StoreMetaData meta;
173             meta.bundleName = schemaMeta.bundleName;
174             meta.storeId = database.name;
175             meta.user = std::to_string(cloudInfo.user);
176             meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
177             meta.instanceId = cloudInfo.apps[bundle].instanceId;
178             if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
179                 ZLOGE("failed, no store meta bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
180                     meta.GetStoreAlias().c_str());
181                 continue;
182             }
183             AutoCache::Store store = SyncManager::GetStore(meta, cloudInfo.user, false);
184             if (store == nullptr) {
185                 ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str());
186                 return ERROR;
187             }
188             auto status = store->Clean({}, action, "");
189             if (status != E_OK) {
190                 ZLOGW("remove device data status:%{public}d, user:%{public}d, bundleName:%{public}s, "
191                       "storeId:%{public}s",
192                     status, static_cast<int>(cloudInfo.user), meta.bundleName.c_str(), meta.GetStoreAlias().c_str());
193                 continue;
194             }
195         }
196     }
197     return SUCCESS;
198 }
199 
Clean(const std::string & id,const std::map<std::string,int32_t> & actions)200 int32_t CloudServiceImpl::Clean(const std::string &id, const std::map<std::string, int32_t> &actions)
201 {
202     auto tokenId = IPCSkeleton::GetCallingTokenID();
203     auto user = Account::GetInstance()->GetUserByToken(tokenId);
204     auto [status, cloudInfo] = GetCloudInfoFromMeta(user);
205     if (status != SUCCESS) {
206         ZLOGE("get cloud meta failed user:%{public}d", static_cast<int>(cloudInfo.user));
207         return ERROR;
208     }
209     if (id != cloudInfo.id) {
210         ZLOGE("different id, [server] id:%{public}s, [meta] id:%{public}s", Anonymous::Change(cloudInfo.id).c_str(),
211             Anonymous::Change(id).c_str());
212         return ERROR;
213     }
214     auto dbActions = ConvertAction(actions);
215     if (dbActions.empty()) {
216         ZLOGE("invalid actions. id:%{public}s", Anonymous::Change(cloudInfo.id).c_str());
217         return ERROR;
218     }
219     return DoClean(cloudInfo, dbActions);
220 }
221 
CheckNotifyConditions(const std::string & id,const std::string & bundleName,CloudInfo & cloudInfo)222 int32_t CloudServiceImpl::CheckNotifyConditions(const std::string &id, const std::string &bundleName,
223     CloudInfo &cloudInfo)
224 {
225     if (cloudInfo.id != id) {
226         ZLOGE("invalid args, [input] id:%{public}s, [meta] id:%{public}s", Anonymous::Change(id).c_str(),
227             Anonymous::Change(cloudInfo.id).c_str());
228         return INVALID_ARGUMENT;
229     }
230     if (!cloudInfo.enableCloud) {
231         return CLOUD_DISABLE;
232     }
233     if (!cloudInfo.Exist(bundleName)) {
234         ZLOGE("bundleName:%{public}s is not exist", bundleName.c_str());
235         return INVALID_ARGUMENT;
236     }
237     if (!cloudInfo.apps[bundleName].cloudSwitch) {
238         return CLOUD_DISABLE_SWITCH;
239     }
240     return SUCCESS;
241 }
242 
GetDbInfoFromExtraData(const ExtraData & extraData,const SchemaMeta & schemaMeta)243 std::pair<std::string, std::vector<std::string>> CloudServiceImpl::GetDbInfoFromExtraData(const ExtraData &extraData,
244     const SchemaMeta &schemaMeta)
245 {
246     for (auto &db : schemaMeta.databases) {
247         if (db.alias != extraData.info.containerName) {
248             continue;
249         }
250         std::vector<std::string> tables;
251         for (auto &table : db.tables) {
252             const auto &tbs = extraData.info.tables;
253             if (std::find(tbs.begin(), tbs.end(), table.alias) == tbs.end()) {
254                 continue;
255             }
256             if (extraData.isPrivate()) {
257                 ZLOGD("private table change, name:%{public}s", Anonymous::Change(table.name).c_str());
258                 tables.emplace_back(table.name);
259             }
260             if (extraData.isShared() && !table.sharedTableName.empty()) {
261                 ZLOGD("sharing table change, name:%{public}s", Anonymous::Change(table.sharedTableName).c_str());
262                 tables.emplace_back(table.sharedTableName);
263             }
264         }
265         return { db.name, std::move(tables) };
266     }
267     return { "", {} };
268 }
269 
NotifyDataChange(const std::string & id,const std::string & bundleName)270 int32_t CloudServiceImpl::NotifyDataChange(const std::string &id, const std::string &bundleName)
271 {
272     auto tokenId = IPCSkeleton::GetCallingTokenID();
273     auto user = Account::GetInstance()->GetUserByToken(tokenId);
274     auto [status, cloudInfo] = GetCloudInfoFromMeta(user);
275     if (CheckNotifyConditions(id, bundleName, cloudInfo) != E_OK) {
276         return INVALID_ARGUMENT;
277     }
278     syncManager_.DoCloudSync(SyncManager::SyncInfo(cloudInfo.user, bundleName));
279     return SUCCESS;
280 }
281 
NotifyDataChange(const std::string & eventId,const std::string & extraData,int32_t userId)282 int32_t CloudServiceImpl::NotifyDataChange(const std::string &eventId, const std::string &extraData, int32_t userId)
283 {
284     ExtraData exData;
285     if (eventId != DATA_CHANGE_EVENT_ID || extraData.empty() || !exData.Unmarshall(extraData)) {
286         ZLOGE("invalid args, eventId:%{public}s, user:%{public}d, extraData is %{public}s", eventId.c_str(),
287             userId, extraData.empty() ? "empty" : "not empty");
288         return INVALID_ARGUMENT;
289     }
290     std::vector<int32_t> users;
291     if (userId != INVALID_USER_ID) {
292         users.emplace_back(userId);
293     } else {
294         Account::GetInstance()->QueryUsers(users);
295     }
296     for (auto user : users) {
297         if (user == DEFAULT_USER) {
298             continue;
299         }
300         auto [status, cloudInfo] = GetCloudInfoFromMeta(user);
301         if (CheckNotifyConditions(exData.info.accountId, exData.info.bundleName, cloudInfo) != E_OK) {
302             ZLOGD("invalid user:%{public}d", user);
303             return INVALID_ARGUMENT;
304         }
305         auto schemaKey = CloudInfo::GetSchemaKey(user, exData.info.bundleName);
306         SchemaMeta schemaMeta;
307         if (!MetaDataManager::GetInstance().LoadMeta(schemaKey, schemaMeta, true)) {
308             ZLOGE("no exist meta, user:%{public}d", user);
309             return INVALID_ARGUMENT;
310         }
311         auto [storeId, tables] = GetDbInfoFromExtraData(exData, schemaMeta);
312         if (storeId.empty()) {
313             ZLOGE("invalid data, storeId:%{public}s, tables size:%{public}zu", Anonymous::Change(storeId).c_str(),
314                 tables.size());
315             return INVALID_ARGUMENT;
316         }
317         syncManager_.DoCloudSync(SyncManager::SyncInfo(cloudInfo.user, exData.info.bundleName, storeId, tables));
318     }
319     return SUCCESS;
320 }
321 
OnInitialize()322 int32_t CloudServiceImpl::OnInitialize()
323 {
324     DistributedDB::RuntimeConfig::SetCloudTranslate(std::make_shared<DistributedRdb::RdbCloudDataTranslate>());
325     Execute(GenTask(0, 0, { WORK_CLOUD_INFO_UPDATE, WORK_SCHEMA_UPDATE, WORK_SUB }));
326     std::vector<int> users;
327     Account::GetInstance()->QueryUsers(users);
328     for (auto user : users) {
329         if (user == DEFAULT_USER) {
330             continue;
331         }
332         Subscription sub;
333         sub.userId = user;
334         if (!MetaDataManager::GetInstance().LoadMeta(sub.GetKey(), sub, true)) {
335             continue;
336         }
337         InitSubTask(sub);
338     }
339     return E_OK;
340 }
341 
OnBind(const BindInfo & info)342 int32_t CloudServiceImpl::OnBind(const BindInfo &info)
343 {
344     if (executor_ != nullptr || info.executors == nullptr) {
345         return E_INVALID_ARGS;
346     }
347 
348     executor_ = std::move(info.executors);
349     syncManager_.Bind(executor_);
350     auto instance = CloudServer::GetInstance();
351     if (instance != nullptr) {
352         instance->Bind(executor_);
353     }
354     return E_OK;
355 }
356 
OnUserChange(uint32_t code,const std::string & user,const std::string & account)357 int32_t CloudServiceImpl::OnUserChange(uint32_t code, const std::string &user, const std::string &account)
358 {
359     int32_t userId = atoi(user.c_str());
360     switch (code) {
361         case static_cast<uint32_t>(AccountStatus::DEVICE_ACCOUNT_SWITCHED):
362             Execute(GenTask(0, userId, { WORK_CLOUD_INFO_UPDATE, WORK_SCHEMA_UPDATE, WORK_SUB, WORK_DO_CLOUD_SYNC }));
363             break;
364         case static_cast<uint32_t>(AccountStatus::DEVICE_ACCOUNT_DELETE):
365             Execute(GenTask(0, userId, { WORK_STOP_CLOUD_SYNC, WORK_RELEASE }));
366             break;
367         case static_cast<uint32_t>(AccountStatus::DEVICE_ACCOUNT_UNLOCKED):
368             Execute(GenTask(0, userId, { WORK_CLOUD_INFO_UPDATE, WORK_SCHEMA_UPDATE, WORK_SUB, WORK_DO_CLOUD_SYNC }));
369             break;
370         default:
371             break;
372     }
373     return E_OK;
374 }
375 
Online(const std::string & device)376 int32_t CloudServiceImpl::Online(const std::string &device)
377 {
378     if (device != DeviceManagerAdapter::CLOUD_DEVICE_UUID) {
379         ZLOGI("Not network online");
380         return SUCCESS;
381     }
382     std::vector<int32_t> users;
383     Account::GetInstance()->QueryUsers(users);
384     if (users.empty()) {
385         return SUCCESS;
386     }
387     auto it = users.begin();
388     Execute(GenTask(0, *it, { WORK_CLOUD_INFO_UPDATE, WORK_SCHEMA_UPDATE, WORK_SUB, WORK_DO_CLOUD_SYNC }));
389     return SUCCESS;
390 }
391 
Offline(const std::string & device)392 int32_t CloudServiceImpl::Offline(const std::string &device)
393 {
394     if (device != DeviceManagerAdapter::CLOUD_DEVICE_UUID) {
395         ZLOGI("Not network offline");
396         return SUCCESS;
397     }
398     std::vector<int32_t> users;
399     Account::GetInstance()->QueryUsers(users);
400     if (users.empty()) {
401         return SUCCESS;
402     }
403     auto it = users.begin();
404     syncManager_.StopCloudSync(*it);
405     return SUCCESS;
406 }
407 
GetCloudInfoFromMeta(int32_t userId)408 std::pair<int32_t, CloudInfo> CloudServiceImpl::GetCloudInfoFromMeta(int32_t userId)
409 {
410     CloudInfo cloudInfo;
411     cloudInfo.user = userId;
412     if (!MetaDataManager::GetInstance().LoadMeta(cloudInfo.GetKey(), cloudInfo, true)) {
413         ZLOGE("no exist meta, user:%{public}d", cloudInfo.user);
414         return { ERROR, cloudInfo };
415     }
416     return { SUCCESS, cloudInfo };
417 }
418 
GetCloudInfoFromServer(int32_t userId)419 std::pair<int32_t, CloudInfo> CloudServiceImpl::GetCloudInfoFromServer(int32_t userId)
420 {
421     CloudInfo cloudInfo;
422     cloudInfo.user = userId;
423     if (!DmAdapter::GetInstance().IsNetworkAvailable()) {
424         return { ERROR, cloudInfo };
425     }
426     auto instance = CloudServer::GetInstance();
427     if (instance == nullptr) {
428         return { SERVER_UNAVAILABLE, cloudInfo };
429     }
430     cloudInfo = instance->GetServerInfo(cloudInfo.user);
431     if (!cloudInfo.IsValid()) {
432         ZLOGE("cloud is empty, user%{public}d", cloudInfo.user);
433         return { ERROR, cloudInfo };
434     }
435     return { SUCCESS, cloudInfo };
436 }
437 
UpdateCloudInfo(int32_t user)438 bool CloudServiceImpl::UpdateCloudInfo(int32_t user)
439 {
440     auto [status, cloudInfo] = GetCloudInfoFromServer(user);
441     if (status != SUCCESS) {
442         return false;
443     }
444     CloudInfo oldInfo;
445     if (!MetaDataManager::GetInstance().LoadMeta(cloudInfo.GetKey(), oldInfo, true)) {
446         MetaDataManager::GetInstance().SaveMeta(cloudInfo.GetKey(), cloudInfo, true);
447         return true;
448     }
449     MetaDataManager::GetInstance().SaveMeta(cloudInfo.GetKey(), cloudInfo, true);
450     if (oldInfo.id != cloudInfo.id) {
451         ReleaseUserInfo(user);
452         ZLOGE("different id, [server] id:%{public}s, [meta] id:%{public}s", Anonymous::Change(cloudInfo.id).c_str(),
453             Anonymous::Change(oldInfo.id).c_str());
454         std::map<std::string, int32_t> actions;
455         for (auto &[bundle, app] : cloudInfo.apps) {
456             actions[bundle] = GeneralStore::CleanMode::CLOUD_INFO;
457         }
458         DoClean(oldInfo, actions);
459     }
460     return true;
461 }
462 
UpdateSchema(int32_t user)463 bool CloudServiceImpl::UpdateSchema(int32_t user)
464 {
465     auto [status, cloudInfo] = GetCloudInfoFromServer(user);
466     if (status != SUCCESS) {
467         return false;
468     }
469     auto keys = cloudInfo.GetSchemaKey();
470     for (const auto &[bundle, key] : keys) {
471         SchemaMeta schemaMeta;
472         std::tie(status, schemaMeta) = GetAppSchemaFromServer(user, bundle);
473         if (status != SUCCESS) {
474             continue;
475         }
476         MetaDataManager::GetInstance().SaveMeta(key, schemaMeta, true);
477     }
478     return true;
479 }
480 
GetAppSchemaFromServer(int32_t user,const std::string & bundleName)481 std::pair<int32_t, SchemaMeta> CloudServiceImpl::GetAppSchemaFromServer(int32_t user, const std::string &bundleName)
482 {
483     SchemaMeta schemaMeta;
484     if (!DmAdapter::GetInstance().IsNetworkAvailable()) {
485         return { ERROR, schemaMeta };
486     }
487     auto instance = CloudServer::GetInstance();
488     if (instance == nullptr) {
489         return { SERVER_UNAVAILABLE, schemaMeta };
490     }
491     schemaMeta = instance->GetAppSchema(user, bundleName);
492     if (!schemaMeta.IsValid()) {
493         ZLOGE("schema is InValid, user:%{public}d, bundleName:%{public}s", user, bundleName.c_str());
494         return { ERROR, schemaMeta };
495     }
496     return { SUCCESS, schemaMeta };
497 }
498 
GenTask(int32_t retry,int32_t user,Handles handles)499 ExecutorPool::Task CloudServiceImpl::GenTask(int32_t retry, int32_t user, Handles handles)
500 {
501     return [this, retry, user, works = std::move(handles)]() mutable {
502         auto executor = executor_;
503         if (retry >= RETRY_TIMES || executor == nullptr || works.empty()) {
504             return;
505         }
506         if (!DmAdapter::GetInstance().IsNetworkAvailable()) {
507             return;
508         }
509         bool finished = true;
510         std::vector<int32_t> users;
511         if (user == 0) {
512             auto account = Account::GetInstance();
513             finished = !(account == nullptr) && account->QueryUsers(users);
514         } else {
515             users.push_back(user);
516         }
517 
518         auto handle = works.front();
519         for (auto user : users) {
520             if (user == 0 || !Account::GetInstance()->IsVerified(user)) {
521                 continue;
522             }
523             finished = (this->*handle)(user) && finished;
524         }
525         if (!finished || users.empty()) {
526             executor->Schedule(std::chrono::seconds(RETRY_INTERVAL), GenTask(retry + 1, user, std::move(works)));
527             return;
528         }
529         works.pop_front();
530         if (!works.empty()) {
531             executor->Execute(GenTask(retry, user, std::move(works)));
532         }
533     };
534 }
535 
GetSchemaMeta(int32_t userId,const std::string & bundleName,int32_t instanceId)536 std::pair<int32_t, SchemaMeta> CloudServiceImpl::GetSchemaMeta(int32_t userId, const std::string &bundleName,
537     int32_t instanceId)
538 {
539     SchemaMeta schemaMeta;
540     auto [status, cloudInfo] = GetCloudInfoFromMeta(userId);
541     if (status != SUCCESS) {
542         // GetCloudInfo has print the log info. so we don`t need print again.
543         return { status, schemaMeta };
544     }
545     if (!bundleName.empty() && !cloudInfo.Exist(bundleName, instanceId)) {
546         ZLOGD("bundleName:%{public}s instanceId:%{public}d is not exist", bundleName.c_str(), instanceId);
547         return { ERROR, schemaMeta };
548     }
549     std::string schemaKey = cloudInfo.GetSchemaKey(bundleName, instanceId);
550     if (MetaDataManager::GetInstance().LoadMeta(schemaKey, schemaMeta, true)) {
551         return { SUCCESS, schemaMeta };
552     }
553     if (!Account::GetInstance()->IsVerified(userId)) {
554         return { ERROR, schemaMeta };
555     }
556     std::tie(status, schemaMeta) = GetAppSchemaFromServer(userId, bundleName);
557     if (status != SUCCESS) {
558         return { status, schemaMeta };
559     }
560     MetaDataManager::GetInstance().SaveMeta(schemaKey, schemaMeta, true);
561     return { SUCCESS, schemaMeta };
562 }
563 
GetCloudInfo(int32_t userId)564 std::pair<int32_t, CloudInfo> CloudServiceImpl::GetCloudInfo(int32_t userId)
565 {
566     auto [status, cloudInfo] = GetCloudInfoFromMeta(userId);
567     if (status == SUCCESS) {
568         return { status, cloudInfo };
569     }
570     if (!Account::GetInstance()->IsVerified(userId)) {
571         ZLOGW("user:%{public}d is locked!", userId);
572         return { ERROR, cloudInfo };
573     }
574     std::tie(status, cloudInfo) = GetCloudInfoFromServer(userId);
575     if (status == SUCCESS) {
576         return { status, cloudInfo };
577     }
578     MetaDataManager::GetInstance().SaveMeta(cloudInfo.GetKey(), cloudInfo, true);
579     return { SUCCESS, cloudInfo };
580 }
581 
OnAppUninstall(const std::string & bundleName,int32_t user,int32_t index)582 int32_t CloudServiceImpl::CloudStatic::OnAppUninstall(const std::string &bundleName, int32_t user, int32_t index)
583 {
584     MetaDataManager::GetInstance().DelMeta(Subscription::GetRelationKey(user, bundleName), true);
585     MetaDataManager::GetInstance().DelMeta(CloudInfo::GetSchemaKey(user, bundleName, index), true);
586     return E_OK;
587 }
588 
GetSchema(const Event & event)589 void CloudServiceImpl::GetSchema(const Event &event)
590 {
591     auto &rdbEvent = static_cast<const CloudEvent &>(event);
592     auto &storeInfo = rdbEvent.GetStoreInfo();
593     ZLOGD("Start GetSchema, bundleName:%{public}s, storeName:%{public}s, instanceId:%{public}d",
594         storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str(), storeInfo.instanceId);
595     GetSchemaMeta(storeInfo.user, storeInfo.bundleName, storeInfo.instanceId);
596 }
597 
CloudShare(const Event & event)598 void CloudServiceImpl::CloudShare(const Event &event)
599 {
600     auto &cloudShareEvent = static_cast<const CloudShareEvent &>(event);
601     auto &storeInfo = cloudShareEvent.GetStoreInfo();
602     auto query = cloudShareEvent.GetQuery();
603     auto callback = cloudShareEvent.GetCallback();
604     if (query == nullptr) {
605         ZLOGE("query is null, bundleName:%{public}s, storeName:%{public}s, instanceId:%{public}d",
606             storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str(), storeInfo.instanceId);
607         if (callback) {
608             callback(GeneralError::E_ERROR, nullptr);
609         }
610         return;
611     }
612     ZLOGD("Start PreShare, bundleName:%{public}s, storeName:%{public}s, instanceId:%{public}d",
613         storeInfo.bundleName.c_str(), Anonymous::Change(storeInfo.storeName).c_str(), storeInfo.instanceId);
614     auto [status, cursor] = PreShare(storeInfo, *query);
615 
616     if (callback) {
617         callback(status, cursor);
618     }
619 }
620 
PreShare(const CloudEvent::StoreInfo & storeInfo,GenQuery & query)621 std::pair<int32_t, std::shared_ptr<DistributedData::Cursor>> CloudServiceImpl::PreShare(
622     const CloudEvent::StoreInfo& storeInfo, GenQuery& query)
623 {
624     StoreMetaData meta;
625     meta.bundleName = storeInfo.bundleName;
626     meta.storeId = storeInfo.storeName;
627     meta.user = std::to_string(storeInfo.user);
628     meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid;
629     meta.instanceId = storeInfo.instanceId;
630     if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKey(), meta, true)) {
631         ZLOGE("failed, no store meta bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(),
632             meta.GetStoreAlias().c_str());
633         return { GeneralError::E_ERROR, nullptr };
634     }
635     AutoCache::Store store = SyncManager::GetStore(meta, storeInfo.user, true);
636     if (store == nullptr) {
637         ZLOGE("store null, storeId:%{public}s", meta.GetStoreAlias().c_str());
638         return { GeneralError::E_ERROR, nullptr };
639     }
640     return { GeneralError::E_OK, store->PreSharing(query) };
641 }
642 
ReleaseUserInfo(int32_t user)643 bool CloudServiceImpl::ReleaseUserInfo(int32_t user)
644 {
645     auto instance = CloudServer::GetInstance();
646     if (instance == nullptr) {
647         return true;
648     }
649     instance->ReleaseUserInfo(user);
650     return true;
651 }
652 
DoCloudSync(int32_t user)653 bool CloudServiceImpl::DoCloudSync(int32_t user)
654 {
655     syncManager_.DoCloudSync(user);
656     return true;
657 }
658 
StopCloudSync(int32_t user)659 bool CloudServiceImpl::StopCloudSync(int32_t user)
660 {
661     syncManager_.StopCloudSync(user);
662     return true;
663 }
664 
DoSubscribe(int32_t user)665 bool CloudServiceImpl::DoSubscribe(int32_t user)
666 {
667     Subscription sub;
668     sub.userId = user;
669     MetaDataManager::GetInstance().LoadMeta(sub.GetKey(), sub, true);
670     if (CloudServer::GetInstance() == nullptr) {
671         ZLOGI("not support cloud server");
672         return true;
673     }
674 
675     CloudInfo cloudInfo;
676     cloudInfo.user = sub.userId;
677     auto exits = MetaDataManager::GetInstance().LoadMeta(cloudInfo.GetKey(), cloudInfo, true);
678     if (!exits) {
679         ZLOGW("error, there is no cloud info for user(%{public}d)", sub.userId);
680         return false;
681     }
682     if (!sub.id.empty() && sub.id != cloudInfo.id) {
683         CleanSubscription(sub);
684         sub.id.clear();
685         sub.expiresTime.clear();
686     }
687 
688     ZLOGD("begin cloud:%{public}d user:%{public}d apps:%{public}zu", cloudInfo.enableCloud, sub.userId,
689         cloudInfo.apps.size());
690     auto onThreshold = duration_cast<milliseconds>((system_clock::now() + hours(EXPIRE_INTERVAL)).time_since_epoch());
691     auto offThreshold = duration_cast<milliseconds>(system_clock::now().time_since_epoch());
692     std::map<std::string, std::vector<SchemaMeta::Database>> subDbs;
693     std::map<std::string, std::vector<SchemaMeta::Database>> unsubDbs;
694     for (auto &[bundle, app] : cloudInfo.apps) {
695         auto enabled = cloudInfo.enableCloud && app.cloudSwitch;
696         auto &dbs = enabled ? subDbs : unsubDbs;
697         auto it = sub.expiresTime.find(bundle);
698         // cloud is enabled, but the subscription won't expire
699         if (enabled && (it != sub.expiresTime.end() && it->second >= static_cast<uint64_t>(onThreshold.count()))) {
700             continue;
701         }
702         // cloud is disabled, we don't care the subscription which was expired or didn't subscribe.
703         if (!enabled && (it == sub.expiresTime.end() || it->second <= static_cast<uint64_t>(offThreshold.count()))) {
704             continue;
705         }
706 
707         SchemaMeta schemaMeta;
708         exits = MetaDataManager::GetInstance().LoadMeta(cloudInfo.GetSchemaKey(bundle), schemaMeta, true);
709         if (exits) {
710             dbs.insert_or_assign(bundle, std::move(schemaMeta.databases));
711         }
712     }
713 
714     ZLOGI("cloud switch:%{public}d user%{public}d, sub:%{public}zu, unsub:%{public}zu", cloudInfo.enableCloud,
715         sub.userId, subDbs.size(), unsubDbs.size());
716     ZLOGD("Subscribe user%{public}d details:%{public}s", sub.userId, Serializable::Marshall(subDbs).c_str());
717     ZLOGD("Unsubscribe user%{public}d details:%{public}s", sub.userId, Serializable::Marshall(unsubDbs).c_str());
718     CloudServer::GetInstance()->Subscribe(sub.userId, subDbs);
719     CloudServer::GetInstance()->Unsubscribe(sub.userId, unsubDbs);
720     return subDbs.empty() && unsubDbs.empty();
721 }
722 
CleanSubscription(Subscription & sub)723 void CloudServiceImpl::CleanSubscription(Subscription &sub)
724 {
725     ZLOGD("id:%{public}s, size:%{public}zu", Anonymous::Change(sub.id).c_str(), sub.expiresTime.size());
726     MetaDataManager::GetInstance().DelMeta(sub.GetKey(), true);
727     for (const auto &[bundle, expireTime] : sub.expiresTime) {
728         MetaDataManager::GetInstance().DelMeta(sub.GetRelationKey(bundle), true);
729     }
730 }
731 
Execute(Task task)732 void CloudServiceImpl::Execute(Task task)
733 {
734     auto executor = executor_;
735     if (executor == nullptr) {
736         return;
737     }
738     executor->Execute(std::move(task));
739 }
740 
ConvertAction(const std::map<std::string,int32_t> & actions)741 std::map<std::string, int32_t> CloudServiceImpl::ConvertAction(const std::map<std::string, int32_t> &actions)
742 {
743     std::map<std::string, int32_t> genActions;
744     for (const auto &[bundleName, action] : actions) {
745         switch (action) {
746             case CloudService::Action::CLEAR_CLOUD_INFO:
747                 genActions.emplace(bundleName, GeneralStore::CleanMode::CLOUD_INFO);
748                 break;
749             case CloudService::Action::CLEAR_CLOUD_DATA_AND_INFO:
750                 genActions.emplace(bundleName, GeneralStore::CleanMode::CLOUD_DATA);
751                 break;
752             default:
753                 ZLOGE("invalid action. action:%{public}d, bundleName:%{public}s", action, bundleName.c_str());
754                 return {};
755         }
756     }
757     return genActions;
758 }
759 
AllocResourceAndShare(const std::string & storeId,const DistributedRdb::PredicatesMemo & predicates,const std::vector<std::string> & columns,const Participants & participants)760 std::pair<int32_t, std::vector<NativeRdb::ValuesBucket>> CloudServiceImpl::AllocResourceAndShare(
761     const std::string& storeId, const DistributedRdb::PredicatesMemo& predicates,
762     const std::vector<std::string>& columns, const Participants& participants)
763 {
764     auto tokenId = IPCSkeleton::GetCallingTokenID();
765     auto hapInfo = GetHapInfo(tokenId);
766     if (hapInfo.bundleName.empty() || hapInfo.user == INVALID_USER_ID) {
767         ZLOGE("bundleName is empty or invalid user, user:%{public}d, storeId:%{public}s", hapInfo.user,
768             Anonymous::Change(storeId).c_str());
769         return { E_ERROR, {} };
770     }
771     if (predicates.tables_.empty()) {
772         ZLOGE("invalid args, tables size:%{public}zu, storeId:%{public}s", predicates.tables_.size(),
773             Anonymous::Change(storeId).c_str());
774         return { E_INVALID_ARGS, {} };
775     }
776     auto memo = std::make_shared<DistributedRdb::PredicatesMemo>(predicates);
777     CloudEvent::StoreInfo storeInfo;
778     storeInfo.bundleName = hapInfo.bundleName;
779     storeInfo.tokenId = tokenId;
780     storeInfo.user = hapInfo.user;
781     storeInfo.storeName = storeId;
782     std::shared_ptr<GenQuery> query;
783     MakeQueryEvent::Callback asyncCallback = [&query](std::shared_ptr<GenQuery> genQuery) {
784         query = genQuery;
785     };
786     auto evt = std::make_unique<MakeQueryEvent>(storeInfo, memo, columns, asyncCallback);
787     EventCenter::GetInstance().PostEvent(std::move(evt));
788     if (query == nullptr) {
789         ZLOGE("query is null, storeId:%{public}s,", Anonymous::Change(storeId).c_str());
790         return { E_ERROR, {} };
791     }
792     auto [status, cursor] = PreShare(storeInfo, *query);
793     if (status != GeneralError::E_OK || cursor == nullptr) {
794         ZLOGE("PreShare fail, storeId:%{public}s, status:%{public}d", Anonymous::Change(storeId).c_str(), status);
795         return { E_ERROR, {} };
796     }
797     auto valueBuckets = ConvertCursor(cursor);
798     Results results;
799     for (auto &valueBucket : valueBuckets) {
800         NativeRdb::ValueObject object;
801         if (!valueBucket.GetObject(DistributedRdb::Field::SHARING_RESOURCE_FIELD, object)) {
802             continue;
803         }
804         std::string shareRes;
805         if (object.GetString(shareRes) != E_OK) {
806             continue;
807         }
808         Share(shareRes, participants, results);
809     }
810     return { SUCCESS, std::move(valueBuckets) };
811 }
812 
ConvertCursor(std::shared_ptr<Cursor> cursor) const813 std::vector<NativeRdb::ValuesBucket> CloudServiceImpl::ConvertCursor(std::shared_ptr<Cursor> cursor) const
814 {
815     std::vector<NativeRdb::ValuesBucket> valueBuckets;
816     int32_t count = cursor->GetCount();
817     valueBuckets.reserve(count);
818     auto err = cursor->MoveToFirst();
819     while (err == E_OK && count > 0) {
820         VBucket entry;
821         err = cursor->GetEntry(entry);
822         if (err != E_OK) {
823             break;
824         }
825         NativeRdb::ValuesBucket bucket;
826         for (auto &[key, value] : entry) {
827             NativeRdb::ValueObject object;
828             DistributedData::Convert(std::move(value), object.value);
829             bucket.values_.insert_or_assign(key, std::move(object));
830         }
831         valueBuckets.emplace_back(std::move(bucket));
832         err = cursor->MoveToNext();
833         count--;
834     }
835     return valueBuckets;
836 }
837 
GetHapInfo(uint32_t tokenId)838 CloudServiceImpl::HapInfo CloudServiceImpl::GetHapInfo(uint32_t tokenId)
839 {
840     HapTokenInfo tokenInfo;
841     int errCode = AccessTokenKit::GetHapTokenInfo(tokenId, tokenInfo);
842     if (errCode != RET_SUCCESS) {
843         ZLOGE("GetHapTokenInfo error:%{public}d, tokenId:0x%{public}x", errCode, tokenId);
844         return { INVALID_USER_ID, -1, "" };
845     }
846     return { tokenInfo.userID, tokenInfo.instIndex, tokenInfo.bundleName };
847 }
848 
Share(const std::string & sharingRes,const Participants & participants,Results & results)849 int32_t CloudServiceImpl::Share(const std::string &sharingRes, const Participants &participants, Results &results)
850 {
851     auto hapInfo = GetHapInfo(IPCSkeleton::GetCallingTokenID());
852     if (hapInfo.bundleName.empty()) {
853         ZLOGE("bundleName is empty, sharingRes:%{public}s", Anonymous::Change(sharingRes).c_str());
854         return E_ERROR;
855     }
856     auto instance = GetSharingHandle(hapInfo);
857     if (instance == nullptr) {
858         return NOT_SUPPORT;
859     }
860     results = instance->Share(hapInfo.user, hapInfo.bundleName, sharingRes, Convert(participants));
861     int32_t status = std::get<0>(results);
862     ZLOGD("status:%{public}d", status);
863     return Convert(static_cast<CenterCode>(status));
864 }
865 
Unshare(const std::string & sharingRes,const Participants & participants,Results & results)866 int32_t CloudServiceImpl::Unshare(const std::string &sharingRes, const Participants &participants, Results &results)
867 {
868     auto hapInfo = GetHapInfo(IPCSkeleton::GetCallingTokenID());
869     if (hapInfo.bundleName.empty()) {
870         ZLOGE("bundleName is empty, sharingRes:%{public}s", Anonymous::Change(sharingRes).c_str());
871         return E_ERROR;
872     }
873     auto instance = GetSharingHandle(hapInfo);
874     if (instance == nullptr) {
875         return NOT_SUPPORT;
876     }
877     results = instance->Unshare(hapInfo.user, hapInfo.bundleName, sharingRes, Convert(participants));
878     int32_t status = std::get<0>(results);
879     ZLOGD("status:%{public}d", status);
880     return Convert(static_cast<CenterCode>(status));
881 }
882 
Exit(const std::string & sharingRes,std::pair<int32_t,std::string> & result)883 int32_t CloudServiceImpl::Exit(const std::string &sharingRes, std::pair<int32_t, std::string> &result)
884 {
885     auto hapInfo = GetHapInfo(IPCSkeleton::GetCallingTokenID());
886     if (hapInfo.bundleName.empty()) {
887         ZLOGE("bundleName is empty, sharingRes:%{public}s", Anonymous::Change(sharingRes).c_str());
888         return E_ERROR;
889     }
890     auto instance = GetSharingHandle(hapInfo);
891     if (instance == nullptr) {
892         return NOT_SUPPORT;
893     }
894     result = instance->Exit(hapInfo.user, hapInfo.bundleName, sharingRes);
895     int32_t status = result.first;
896     ZLOGD("status:%{public}d", status);
897     return Convert(static_cast<CenterCode>(status));
898 }
899 
ChangePrivilege(const std::string & sharingRes,const Participants & participants,Results & results)900 int32_t CloudServiceImpl::ChangePrivilege(const std::string &sharingRes, const Participants &participants,
901     Results &results)
902 {
903     auto hapInfo = GetHapInfo(IPCSkeleton::GetCallingTokenID());
904     if (hapInfo.bundleName.empty()) {
905         ZLOGE("bundleName is empty, sharingRes:%{public}s", Anonymous::Change(sharingRes).c_str());
906         return E_ERROR;
907     }
908     auto instance = GetSharingHandle(hapInfo);
909     if (instance == nullptr) {
910         return NOT_SUPPORT;
911     }
912     results = instance->ChangePrivilege(hapInfo.user, hapInfo.bundleName, sharingRes, Convert(participants));
913     int32_t status = std::get<0>(results);
914     ZLOGD("status:%{public}d", status);
915     return Convert(static_cast<CenterCode>(status));
916 }
917 
Query(const std::string & sharingRes,QueryResults & results)918 int32_t CloudServiceImpl::Query(const std::string &sharingRes, QueryResults &results)
919 {
920     auto hapInfo = GetHapInfo(IPCSkeleton::GetCallingTokenID());
921     if (hapInfo.bundleName.empty()) {
922         ZLOGE("bundleName is empty, sharingRes:%{public}s", Anonymous::Change(sharingRes).c_str());
923         return E_ERROR;
924     }
925     auto instance = GetSharingHandle(hapInfo);
926     if (instance == nullptr) {
927         return NOT_SUPPORT;
928     }
929     auto queryResults = instance->Query(hapInfo.user, hapInfo.bundleName, sharingRes);
930     results = Convert(queryResults);
931     int32_t status = std::get<0>(queryResults);
932     ZLOGD("status:%{public}d", status);
933     return Convert(static_cast<CenterCode>(status));
934 }
935 
QueryByInvitation(const std::string & invitation,QueryResults & results)936 int32_t CloudServiceImpl::QueryByInvitation(const std::string &invitation, QueryResults &results)
937 {
938     auto hapInfo = GetHapInfo(IPCSkeleton::GetCallingTokenID());
939     if (hapInfo.bundleName.empty()) {
940         ZLOGE("bundleName is empty, invitation:%{public}s", Anonymous::Change(invitation).c_str());
941         return E_ERROR;
942     }
943     auto instance = GetSharingHandle(hapInfo);
944     if (instance == nullptr) {
945         return NOT_SUPPORT;
946     }
947     auto queryResults = instance->QueryByInvitation(hapInfo.user, hapInfo.bundleName, invitation);
948     results = Convert(queryResults);
949     int32_t status = std::get<0>(queryResults);
950     ZLOGD("status:%{public}d", status);
951     return Convert(static_cast<CenterCode>(status));
952 }
953 
ConfirmInvitation(const std::string & invitation,int32_t confirmation,std::tuple<int32_t,std::string,std::string> & result)954 int32_t CloudServiceImpl::ConfirmInvitation(const std::string &invitation, int32_t confirmation,
955     std::tuple<int32_t, std::string, std::string> &result)
956 {
957     auto hapInfo = GetHapInfo(IPCSkeleton::GetCallingTokenID());
958     if (hapInfo.bundleName.empty()) {
959         ZLOGE("bundleName is empty, invitation:%{public}s, confirmation:%{public}d",
960             Anonymous::Change(invitation).c_str(), confirmation);
961         return E_ERROR;
962     }
963     auto instance = GetSharingHandle(hapInfo);
964     if (instance == nullptr) {
965         return NOT_SUPPORT;
966     }
967     result = instance->ConfirmInvitation(hapInfo.user, hapInfo.bundleName, invitation, confirmation);
968     int32_t status = std::get<0>(result);
969     ZLOGD("status:%{public}d", status);
970     return Convert(static_cast<CenterCode>(status));
971 }
972 
ChangeConfirmation(const std::string & sharingRes,int32_t confirmation,std::pair<int32_t,std::string> & result)973 int32_t CloudServiceImpl::ChangeConfirmation(const std::string &sharingRes, int32_t confirmation,
974     std::pair<int32_t, std::string> &result)
975 {
976     auto hapInfo = GetHapInfo(IPCSkeleton::GetCallingTokenID());
977     if (hapInfo.bundleName.empty()) {
978         ZLOGE("bundleName is empty, sharingRes:%{public}s", Anonymous::Change(sharingRes).c_str());
979         return E_ERROR;
980     }
981     auto instance = GetSharingHandle(hapInfo);
982     if (instance == nullptr) {
983         return NOT_SUPPORT;
984     }
985     result = instance->ChangeConfirmation(hapInfo.user, hapInfo.bundleName, sharingRes, confirmation);
986     int32_t status = result.first;
987     ZLOGD("status:%{public}d", status);
988     return Convert(static_cast<CenterCode>(status));
989 }
990 
GetSharingHandle(const HapInfo & hapInfo)991 std::shared_ptr<SharingCenter> CloudServiceImpl::GetSharingHandle(const HapInfo &hapInfo)
992 {
993     auto instance = CloudServer::GetInstance();
994     if (instance == nullptr) {
995         return nullptr;
996     }
997     auto handle = instance->ConnectSharingCenter(hapInfo.user, hapInfo.bundleName);
998     return handle;
999 }
1000 
GenSubTask(Task task,int32_t user)1001 ExecutorPool::Task CloudServiceImpl::GenSubTask(Task task, int32_t user)
1002 {
1003     return [this, user, work = std::move(task)] () {
1004         {
1005             std::lock_guard<decltype(mutex_)> lock(mutex_);
1006             subTask_ = ExecutorPool::INVALID_TASK_ID;
1007         }
1008         auto [status, cloudInfo] = GetCloudInfoFromMeta(user);
1009         if (status != SUCCESS || !cloudInfo.enableCloud || cloudInfo.IsAllSwitchOff()) {
1010             ZLOGW("[sub task] all switch off, status:%{public}d user:%{public}d enableCloud:%{public}d",
1011                 status, user, cloudInfo.enableCloud);
1012             return;
1013         }
1014         work();
1015     };
1016 }
1017 
InitSubTask(const Subscription & sub)1018 void CloudServiceImpl::InitSubTask(const Subscription &sub)
1019 {
1020     auto expire = sub.GetMinExpireTime();
1021     if (expire == INVALID_SUB_TIME) {
1022         return;
1023     }
1024     auto executor = executor_;
1025     if (executor == nullptr) {
1026         return;
1027     }
1028     ZLOGI("Subscription Info, subTask:%{public}" PRIu64", user:%{public}d", subTask_, sub.userId);
1029     expire = expire - TIME_BEFORE_SUB; // before 12 hours
1030     auto now = static_cast<uint64_t>(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count());
1031     Duration delay = expire > now ? milliseconds(expire - now) : milliseconds(0);
1032     std::lock_guard<decltype(mutex_)> lock(mutex_);
1033     if (subTask_ != ExecutorPool::INVALID_TASK_ID) {
1034         if (expire < expireTime_) {
1035             subTask_ = executor->Reset(subTask_, delay);
1036             expireTime_ = expire > now ? expire : now;
1037         }
1038         return;
1039     }
1040     subTask_ = executor->Schedule(delay, GenSubTask(GenTask(0, sub.userId), sub.userId));
1041     expireTime_ = expire > now ? expire : now;
1042 }
1043 
1044 } // namespace OHOS::CloudData