• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "mission/distributed_sched_mission_manager.h"
17 
18 #include <chrono>
19 #include <sys/time.h>
20 #include <unistd.h>
21 
22 #include "datetime_ex.h"
23 #include "ipc_skeleton.h"
24 #include "iservice_registry.h"
25 #include "nlohmann/json.hpp"
26 #include "string_ex.h"
27 #include "system_ability_definition.h"
28 
29 #include "distributed_sched_adapter.h"
30 #include "distributed_sched_utils.h"
31 #include "dtbschedmgr_device_info_storage.h"
32 #include "dtbschedmgr_log.h"
33 #include "mission/mission_changed_notify.h"
34 #include "mission/mission_constant.h"
35 #include "mission/mission_info_converter.h"
36 #include "mission/snapshot_converter.h"
37 #include "caller_info.h"
38 #include "os_account_manager.h"
39 #include "ohos_account_kits.h"
40 
41 namespace OHOS {
42 namespace DistributedSchedule {
namespace {
// String constants.
const std::string TAG = "DistributedSchedMissionManager";
// Prefix of the handler task name; the device uuid is appended by the callers in this file.
const std::string DELETE_DATA_STORAGE = "DeleteDataStorage";
const std::string INVAILD_LOCAL_DEVICE_ID = "-1";

// Upper bound checked before a new snapshot is added to the snapshot cache.
constexpr size_t MAX_CACHED_ITEM = 10;

// Retry settings.
constexpr int32_t MAX_RETRY_TIMES = 15;
constexpr int32_t RETRY_DELAYED = 2000;

// Delays, in milliseconds.
constexpr int32_t GET_FOREGROUND_SNAPSHOT_DELAY_TIME = 800; // ms
constexpr int32_t DELETE_DATA_STORAGE_DELAYED = 60000; // ms

// State value handed to MissionChangedNotify::NotifyNetDisconnect().
constexpr int32_t NET_STATE = 0;
} // namespace

namespace Mission {
// Number of local missions requested from DistributedSchedAdapter::GetLocalMissionInfos().
constexpr int32_t GET_MAX_MISSIONS = 20;
} // Mission
57 using namespace std::chrono;
58 using namespace Constants::Mission;
59 using namespace OHOS::DistributedKv;
60 using namespace DistributedHardware;
61 using AccountInfo = IDistributedSched::AccountInfo;
62 
63 IMPLEMENT_SINGLE_INSTANCE(DistributedSchedMissionManager);
64 
Init()65 void DistributedSchedMissionManager::Init()
66 {
67     listenerDeath_ = new ListenerDeathRecipient();
68     remoteDmsRecipient_ = new RemoteDmsDeathRecipient();
69     auto runner = AppExecFwk::EventRunner::Create("MissionManagerHandler");
70     missionHandler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
71     auto updateRunner = AppExecFwk::EventRunner::Create("UpdateHandler");
72     updateHandler_ = std::make_shared<AppExecFwk::EventHandler>(updateRunner);
73     missonChangeListener_ = new DistributedMissionChangeListener();
74     auto missionChangeRunner = AppExecFwk::EventRunner::Create("DistributedMissionChange");
75     missionChangeHandler_ = std::make_shared<AppExecFwk::EventHandler>(missionChangeRunner);
76 }
77 
GetMissionInfos(const std::string & deviceId,int32_t numMissions,std::vector<AAFwk::MissionInfo> & missionInfoSet)78 int32_t DistributedSchedMissionManager::GetMissionInfos(const std::string& deviceId,
79     int32_t numMissions, std::vector<AAFwk::MissionInfo>& missionInfoSet)
80 {
81     HILOGI("start!");
82     if (!IsDeviceIdValidated(deviceId)) {
83         return INVALID_PARAMETERS_ERR;
84     }
85     if (numMissions <= 0) {
86         HILOGE("numMissions is illegal! numMissions:%{public}d", numMissions);
87         return INVALID_PARAMETERS_ERR;
88     }
89     std::vector<DstbMissionInfo> dstbMissionInfoSet;
90     int32_t ret = FetchCachedRemoteMissions(deviceId, numMissions, dstbMissionInfoSet);
91     if (ret != ERR_OK) {
92         HILOGE("FetchCachedRemoteMissions failed, ret = %{public}d", ret);
93         return ret;
94     }
95     return MissionInfoConverter::ConvertToMissionInfos(dstbMissionInfoSet, missionInfoSet);
96 }
97 
GetRemoteDms(const std::string & deviceId)98 sptr<IDistributedSched> DistributedSchedMissionManager::GetRemoteDms(const std::string& deviceId)
99 {
100     if (deviceId.empty()) {
101         HILOGE("GetRemoteDms remoteDeviceId is empty");
102         return nullptr;
103     }
104     int64_t begin = GetTickCount();
105     {
106         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
107         auto iter = remoteDmsMap_.find(deviceId);
108         if (iter != remoteDmsMap_.end()) {
109             auto object = iter->second;
110             if (object != nullptr) {
111                 HILOGI("[PerformanceTest] GetRemoteDms from cache spend %{public}" PRId64 " ms",
112                     GetTickCount() - begin);
113                 return object;
114             }
115         }
116     }
117     HILOGD("GetRemoteDms connect deviceid is %s", GetAnonymStr(deviceId).c_str());
118     auto samgr = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
119     if (samgr == nullptr) {
120         HILOGE("GetRemoteDms failed to connect to systemAbilityMgr!");
121         return nullptr;
122     }
123     auto object = samgr->CheckSystemAbility(DISTRIBUTED_SCHED_SA_ID, deviceId);
124     if (object == nullptr) {
125         HILOGE("GetRemoteDms failed to get dms for remote device: %{public}s!", GetAnonymStr(deviceId).c_str());
126         return nullptr;
127     }
128     auto ret = object->AddDeathRecipient(remoteDmsRecipient_);
129     HILOGD("GetRemoteDms AddDeathRecipient ret : %{public}d", ret);
130     sptr<IDistributedSched> remoteDmsObj = iface_cast<IDistributedSched>(object);
131     {
132         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
133         auto iter = remoteDmsMap_.find(deviceId);
134         if (iter != remoteDmsMap_.end()) {
135             iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
136         }
137         remoteDmsMap_[deviceId] = remoteDmsObj;
138     }
139     HILOGI("[PerformanceTest] GetRemoteDms spend %{public}" PRId64 " ms", GetTickCount() - begin);
140     return remoteDmsObj;
141 }
142 
IsDeviceIdValidated(const std::string & deviceId)143 bool DistributedSchedMissionManager::IsDeviceIdValidated(const std::string& deviceId)
144 {
145     if (deviceId.empty()) {
146         HILOGE("IsDeviceIdValidated deviceId is empty!");
147         return false;
148     }
149     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(deviceId) == nullptr) {
150         HILOGW("IsDeviceIdValidated device offline.");
151         return false;
152     }
153 
154     return true;
155 }
156 
NotifyRemoteDied(const wptr<IRemoteObject> & remote)157 void DistributedSchedMissionManager::NotifyRemoteDied(const wptr<IRemoteObject>& remote)
158 {
159     if (distributedDataStorage_ == nullptr) {
160         HILOGE("DistributedDataStorage null!");
161         return;
162     }
163     distributedDataStorage_->NotifyRemoteDied(remote);
164 }
165 
NotifyNetDisconnectOffline()166 void DistributedSchedMissionManager::NotifyNetDisconnectOffline()
167 {
168     HILOGD("NotifyNetDisconnectOffline start.");
169     {
170         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
171         if (listenDeviceMap_.empty()) {
172             HILOGI("The connect is null!");
173             return;
174         }
175         for (auto& listenDevice : listenDeviceMap_) {
176             std::u16string devId = listenDevice.first;
177             std::set<sptr<IRemoteObject>> listenerSet = listenDevice.second.listenerSet;
178             for (auto connect : listenerSet) {
179                 MissionChangedNotify::NotifyNetDisconnect(connect, devId, NET_STATE);
180             }
181         }
182     }
183     HILOGD("NotifyNetDisconnectOffline end.");
184 }
185 
GetOsAccountData(AccountInfo & dmsAccountInfo)186 bool DistributedSchedMissionManager::GetOsAccountData(AccountInfo& dmsAccountInfo)
187 {
188 #ifdef OS_ACCOUNT_PART
189     std::vector<int32_t> ids;
190     ErrCode ret = AccountSA::OsAccountManager::QueryActiveOsAccountIds(ids);
191     if (ret != ERR_OK || ids.empty()) {
192         HILOGE("Get userId from active Os AccountIds fail, ret : %{public}d", ret);
193         return false;
194     }
195     dmsAccountInfo.userId = ids[0];
196 
197     AccountSA::OhosAccountInfo osAccountInfo;
198     ret = AccountSA::OhosAccountKits::GetInstance().GetOhosAccountInfo(osAccountInfo);
199     if (ret != 0 || osAccountInfo.uid_ == "") {
200         HILOGE("Get accountId from Ohos account info fail, ret: %{public}d.", ret);
201         return false;
202     }
203     dmsAccountInfo.activeAccountId = osAccountInfo.uid_;
204 #endif
205     return true;
206 }
207 
InitDataStorage()208 int32_t DistributedSchedMissionManager::InitDataStorage()
209 {
210     if (distributedDataStorage_ == nullptr) {
211         distributedDataStorage_ = std::make_shared<DistributedDataStorage>();
212     }
213     if (!distributedDataStorage_->Init()) {
214         HILOGE("InitDataStorage DistributedDataStorage init failed!");
215         return ERR_NULL_OBJECT;
216     }
217     return ERR_NONE;
218 }
219 
StopDataStorage()220 int32_t DistributedSchedMissionManager::StopDataStorage()
221 {
222     if (distributedDataStorage_ == nullptr) {
223         HILOGE("StopDataStorage DistributedDataStorage null!");
224         return ERR_NULL_OBJECT;
225     }
226     if (!distributedDataStorage_->Stop()) {
227         HILOGE("StopDataStorage DistributedDataStorage stop failed!");
228         return ERR_NULL_OBJECT;
229     }
230     return ERR_NONE;
231 }
232 
StoreSnapshotInfo(const std::string & deviceId,int32_t missionId,const uint8_t * byteStream,size_t len)233 int32_t DistributedSchedMissionManager::StoreSnapshotInfo(const std::string& deviceId, int32_t missionId,
234     const uint8_t* byteStream, size_t len)
235 {
236     if (distributedDataStorage_ == nullptr) {
237         HILOGE("StoreSnapshotInfo DistributedDataStorage null!");
238         return ERR_NULL_OBJECT;
239     }
240     if (!distributedDataStorage_->Insert(deviceId, missionId, byteStream, len)) {
241         HILOGE("StoreSnapshotInfo DistributedDataStorage insert failed!");
242         return INVALID_PARAMETERS_ERR;
243     }
244     return ERR_NONE;
245 }
246 
RemoveSnapshotInfo(const std::string & deviceId,int32_t missionId)247 int32_t DistributedSchedMissionManager::RemoveSnapshotInfo(const std::string& deviceId, int32_t missionId)
248 {
249     if (distributedDataStorage_ == nullptr) {
250         HILOGE("RemoveSnapshotInfo DistributedDataStorage null!");
251         return ERR_NULL_OBJECT;
252     }
253     if (!distributedDataStorage_->Delete(deviceId, missionId)) {
254         HILOGE("RemoveSnapshotInfo DistributedDataStorage delete failed!");
255         return INVALID_PARAMETERS_ERR;
256     }
257     return ERR_NONE;
258 }
259 
GetRemoteMissionSnapshotInfo(const std::string & networkId,int32_t missionId,std::unique_ptr<AAFwk::MissionSnapshot> & missionSnapshot)260 int32_t DistributedSchedMissionManager::GetRemoteMissionSnapshotInfo(const std::string& networkId, int32_t missionId,
261     std::unique_ptr<AAFwk::MissionSnapshot>& missionSnapshot)
262 {
263     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
264     if (uuid.empty()) {
265         HILOGE("uuid is empty!");
266         return INVALID_PARAMETERS_ERR;
267     }
268     std::unique_ptr<Snapshot> snapshotPtr = DequeueCachedSnapshotInfo(uuid, missionId);
269     if (snapshotPtr != nullptr) {
270         HILOGI("Get snapshot from cache success, uuid: %{public}s, missionId: %{public}d.",
271             GetAnonymStr(uuid).c_str(), missionId);
272         SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
273         return ERR_NONE;
274     }
275     if (distributedDataStorage_ == nullptr) {
276         HILOGE("DistributedDataStorage is null!");
277         return ERR_NULL_OBJECT;
278     }
279     DistributedKv::Value value;
280     bool ret = distributedDataStorage_->Query(networkId, missionId, value);
281     if (!ret) {
282         HILOGE("DistributedDataStorage query failed!");
283         return INVALID_PARAMETERS_ERR;
284     }
285     snapshotPtr = Snapshot::Create(value.Data());
286     if (snapshotPtr == nullptr) {
287         HILOGE("snapshot create failed!");
288         return ERR_NULL_OBJECT;
289     }
290     HILOGI("Get snapshot from DistributedDB success, uuid: %{public}s, missionId: %{public}d.",
291         GetAnonymStr(uuid).c_str(), missionId);
292     SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
293     return ERR_NONE;
294 }
295 
DeviceOnlineNotify(const std::string & networkId)296 void DistributedSchedMissionManager::DeviceOnlineNotify(const std::string& networkId)
297 {
298     if (networkId.empty()) {
299         HILOGW("DeviceOnlineNotify networkId empty!");
300         return;
301     }
302 
303     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
304     if (missionHandler_ != nullptr) {
305         HILOGI("DeviceOnlineNotify RemoveTask");
306         missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
307     }
308 }
309 
DeviceOfflineNotify(const std::string & networkId)310 void DistributedSchedMissionManager::DeviceOfflineNotify(const std::string& networkId)
311 {
312     if (networkId.empty()) {
313         HILOGW("DeviceOfflineNotify networkId empty!");
314         return;
315     }
316     StopSyncMissionsFromRemote(networkId);
317     CleanMissionResources(networkId);
318     {
319         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
320         auto iter = remoteDmsMap_.find(networkId);
321         if (iter != remoteDmsMap_.end()) {
322             if (iter->second == nullptr) {
323                 return;
324             }
325             iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
326             remoteDmsMap_.erase(iter);
327         }
328     }
329     HILOGI("DeviceOfflineNotify erase value for networkId: %{public}s.", GetAnonymStr(networkId).c_str());
330 }
331 
DeleteDataStorage(const std::string & deviceId,bool isDelayed)332 void DistributedSchedMissionManager::DeleteDataStorage(const std::string& deviceId, bool isDelayed)
333 {
334     if (distributedDataStorage_ == nullptr) {
335         HILOGE("DeleteDataStorage DistributedDataStorage null!");
336         return;
337     }
338     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
339     auto callback = [this, uuid, deviceId]() {
340         if (!distributedDataStorage_->FuzzyDelete(deviceId)) {
341             HILOGE("DeleteDataStorage storage delete failed!");
342         } else {
343             HILOGI("DeleteDataStorage storage delete successfully!");
344         }
345     };
346     if (isDelayed) {
347         if (missionHandler_ != nullptr) {
348             HILOGI("DeleteDataStorage PostTask");
349             missionHandler_->PostTask(callback, DELETE_DATA_STORAGE + uuid, DELETE_DATA_STORAGE_DELAYED);
350         }
351     } else {
352         if (missionHandler_ != nullptr) {
353             HILOGI("DeleteDataStorage RemoveTask");
354             missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
355         }
356         callback();
357     }
358 }
359 
RegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)360 int32_t DistributedSchedMissionManager::RegisterMissionListener(const std::u16string& devId,
361     const sptr<IRemoteObject>& listener)
362 {
363     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(Str16ToStr8(devId));
364     if (uuid.empty()) {
365         HILOGE("uuid is empty!");
366         return INVALID_PARAMETERS_ERR;
367     }
368     if (missionHandler_ != nullptr) {
369         HILOGI("RemoveTask");
370         missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
371     }
372     if (listener == nullptr) {
373         return INVALID_PARAMETERS_ERR;
374     }
375     std::string localDeviceId;
376     std::string remoteDeviceId = Str16ToStr8(devId);
377     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
378         || localDeviceId == remoteDeviceId) {
379         HILOGE("check deviceId failed!");
380         return INVALID_PARAMETERS_ERR;
381     }
382     {
383         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
384         auto& listenerInfo = listenDeviceMap_[devId];
385         if (!listenerInfo.Emplace(listener)) {
386             HILOGW("RegisterSyncListener listener has already inserted!");
387             return ERR_NONE;
388         }
389         bool ret = listener->AddDeathRecipient(listenerDeath_);
390         if (!ret) {
391             HILOGW("RegisterSyncListener AddDeathRecipient failed!");
392         }
393         if (listenerInfo.Size() > 1) {
394             HILOGI("RegisterMissionListener not notify remote DMS!");
395             return ERR_NONE;
396         }
397     }
398     return ERR_NONE;
399 }
400 
StartSyncRemoteMissions(const std::string & dstDevId,const std::string & localDevId,uint32_t callingTokenId)401 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
402     const std::string& localDevId, uint32_t callingTokenId)
403 {
404     std::u16string devId = Str8ToStr16(dstDevId);
405     {
406         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
407         auto iterItem = listenDeviceMap_.find(devId);
408         if (iterItem == listenDeviceMap_.end()) {
409             return ERR_NONE;
410         }
411         bool callFlag = iterItem->second.called;
412         if (callFlag) {
413             HILOGI("StartSyncRemoteMissions already called!");
414             return ERR_NONE;
415         }
416     }
417     sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
418     if (remoteDms == nullptr) {
419         HILOGE("get remoteDms failed!");
420         RetryStartSyncRemoteMissions(dstDevId, localDevId, 0);
421         return GET_REMOTE_DMS_FAIL;
422     }
423     int32_t ret = StartSyncRemoteMissions(dstDevId, remoteDms, callingTokenId);
424     if (ret == ERR_NONE) {
425         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
426         auto iterItem = listenDeviceMap_.find(devId);
427         if (iterItem != listenDeviceMap_.end()) {
428             iterItem->second.called = true;
429         }
430     }
431     return ret;
432 }
433 
StartSyncRemoteMissions(const std::string & dstDevId,const sptr<IDistributedSched> & remoteDms,uint32_t callingTokenId)434 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
435     const sptr<IDistributedSched>& remoteDms, uint32_t callingTokenId)
436 {
437     if (remoteDms == nullptr) {
438         HILOGE("remoteDms is null");
439         return INVALID_PARAMETERS_ERR;
440     }
441     std::vector<DstbMissionInfo> missionInfos;
442     CallerInfo callerInfo;
443     if (!GenerateCallerInfo(callerInfo)) {
444         return GET_LOCAL_DEVICE_ERR;
445     }
446 
447     AccountInfo callerAccountInfo;
448     if (!GetOsAccountData(callerAccountInfo)) {
449         HILOGE("Get Os accountId and userId fail.");
450         return GET_LOCAL_DEVICE_ERR;
451     }
452     callerInfo.accessToken = callingTokenId;
453     nlohmann::json extraInfoJson;
454     extraInfoJson[Constants::EXTRO_INFO_JSON_KEY_ACCOUNT_ID] = callerAccountInfo.activeAccountId;
455     extraInfoJson[Constants::EXTRO_INFO_JSON_KEY_USERID_ID] = callerAccountInfo.userId;
456     extraInfoJson["accessToken"] = callerInfo.accessToken;
457     callerInfo.extraInfoJson = extraInfoJson;
458     int64_t begin = GetTickCount();
459     int32_t ret = remoteDms->StartSyncMissionsFromRemote(callerInfo, missionInfos);
460     HILOGI("[PerformanceTest] StartSyncMissionsFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
461         ret, GetTickCount() - begin);
462     if (ret == ERR_NONE) {
463         RebornMissionCache(dstDevId, missionInfos);
464     }
465     return ret;
466 }
467 
UnRegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)468 int32_t DistributedSchedMissionManager::UnRegisterMissionListener(const std::u16string& devId,
469     const sptr<IRemoteObject>& listener)
470 {
471     if (listener == nullptr) {
472         return INVALID_PARAMETERS_ERR;
473     }
474     if (!IsDeviceIdValidated(Str16ToStr8(devId))) {
475         return INVALID_PARAMETERS_ERR;
476     }
477     std::string localDeviceId;
478     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
479         || localDeviceId == Str16ToStr8(devId)) {
480         HILOGE("check deviceId fail");
481         return INVALID_PARAMETERS_ERR;
482     }
483     {
484         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
485         auto iterItem = listenDeviceMap_.find(devId);
486         if (iterItem == listenDeviceMap_.end()) {
487             return ERR_NONE;
488         }
489         auto& listenerInfo = iterItem->second;
490         auto ret = listenerInfo.Find(listener);
491         if (!ret) {
492             HILOGI("listener not registered!");
493             return ERR_NONE;
494         }
495         listener->RemoveDeathRecipient(listenerDeath_);
496         listenerInfo.Erase(listener);
497         if (!listenerInfo.Empty()) {
498             return ERR_NONE;
499         }
500         listenDeviceMap_.erase(iterItem);
501     }
502     return ERR_NONE;
503 }
504 
CleanMissionResources(const std::string & networkId)505 void DistributedSchedMissionManager::CleanMissionResources(const std::string& networkId)
506 {
507     {
508         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
509         auto iterDevice = listenDeviceMap_.find(Str8ToStr16(networkId));
510         if (iterDevice == listenDeviceMap_.end()) {
511             return;
512         }
513         auto& listenerInfo = iterDevice->second;
514         for (sptr<IRemoteObject> listener : listenerInfo.listenerSet) {
515             if (listener != nullptr) {
516                 listener->RemoveDeathRecipient(listenerDeath_);
517             }
518         }
519         listenDeviceMap_.erase(iterDevice);
520     }
521     StopSyncRemoteMissions(networkId, true);
522 }
523 
StopSyncRemoteMissions(const std::string & dstDevId,bool offline,bool exit)524 int32_t DistributedSchedMissionManager::StopSyncRemoteMissions(const std::string& dstDevId,
525     bool offline, bool exit)
526 {
527     CleanMissionCache(dstDevId);
528     DeleteCachedSnapshotInfo(dstDevId);
529     DeleteDataStorage(dstDevId, true);
530 
531     if (offline) {
532         return ERR_NONE;
533     }
534     sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
535     if (remoteDms == nullptr) {
536         HILOGE("DMS get remoteDms failed");
537         return GET_REMOTE_DMS_FAIL;
538     }
539 
540     CallerInfo callerInfo;
541     if (!GenerateCallerInfo(callerInfo)) {
542         return GET_LOCAL_DEVICE_ERR;
543     }
544     int64_t begin = GetTickCount();
545     int32_t ret = remoteDms->StopSyncMissionsFromRemote(callerInfo);
546     HILOGI("[PerformanceTest] ret: %{public}d, spend %{public}" PRId64 " ms", ret, GetTickCount() - begin);
547     return ret;
548 }
549 
StartSyncRemoteMissions(const std::string & dstDevId,bool fixConflict,int64_t tag,uint32_t callingTokenId)550 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId, bool fixConflict,
551     int64_t tag, uint32_t callingTokenId)
552 {
553     std::string localDeviceId;
554     if (!IsDeviceIdValidated(dstDevId)) {
555         return INVALID_PARAMETERS_ERR;
556     }
557     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
558         || (dstDevId == localDeviceId)) {
559         HILOGE("check deviceId fail");
560         return INVALID_PARAMETERS_ERR;
561     }
562     HILOGI("begin, dstDevId is %{public}s, local deviceId is %{public}s",
563         GetAnonymStr(dstDevId).c_str(), GetAnonymStr(localDeviceId).c_str());
564     auto ret = StartSyncRemoteMissions(dstDevId, localDeviceId, callingTokenId);
565     if (ret != ERR_NONE) {
566         HILOGE("StartSyncRemoteMissions failed, %{public}d", ret);
567         return ret;
568     }
569     return ERR_NONE;
570 }
571 
StartSyncMissionsFromRemote(const CallerInfo & callerInfo,std::vector<DstbMissionInfo> & missionInfoSet)572 int32_t DistributedSchedMissionManager::StartSyncMissionsFromRemote(const CallerInfo& callerInfo,
573     std::vector<DstbMissionInfo>& missionInfoSet)
574 {
575     auto deviceId = callerInfo.sourceDeviceId;
576     HILOGD("remote version is %{public}d!", callerInfo.dmsVersion);
577     {
578         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
579         remoteSyncDeviceSet_.emplace(deviceId);
580     }
581 
582     nlohmann::json extraInfoJson = callerInfo.extraInfoJson;
583     if (extraInfoJson.find(Constants::EXTRO_INFO_JSON_KEY_ACCOUNT_ID) == extraInfoJson.end() &&
584         extraInfoJson.find(Constants::EXTRO_INFO_JSON_KEY_USERID_ID) == extraInfoJson.end()) {
585         HILOGW("older version does not need check access control.");
586     } else {
587         bool res = CheckAccessControlForMissions(callerInfo);
588         if (!res) {
589             HILOGE("check access control failed");
590             return INVALID_PARAMETERS_ERR;
591         }
592     }
593 
594     int32_t result = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
595         missionInfoSet);
596     auto func = [this, missionInfoSet]() {
597         HILOGD("RegisterMissionListener called.");
598         if (!isRegMissionChange_) {
599             int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
600             if (ret == ERR_OK) {
601                 isRegMissionChange_ = true;
602             }
603             InitAllSnapshots(missionInfoSet);
604         }
605     };
606     if (missionHandler_ != nullptr && !missionHandler_->PostTask(func)) {
607         HILOGE("post RegisterMissionListener and InitAllSnapshots Task failed");
608     }
609     return result;
610 }
611 
CheckAccessControlForMissions(const CallerInfo & callerInfo)612 bool DistributedSchedMissionManager::CheckAccessControlForMissions(const CallerInfo& callerInfo)
613 {
614     AccountInfo accountInfo;
615     nlohmann::json extraInfoJson = callerInfo.extraInfoJson;
616     if (!extraInfoJson.is_discarded()) {
617         if (extraInfoJson.find(Constants::EXTRO_INFO_JSON_KEY_ACCOUNT_ID) != extraInfoJson.end() &&
618             extraInfoJson[Constants::EXTRO_INFO_JSON_KEY_ACCOUNT_ID].is_string()) {
619             accountInfo.activeAccountId = extraInfoJson[Constants::EXTRO_INFO_JSON_KEY_ACCOUNT_ID];
620         }
621         if (extraInfoJson.find(Constants::EXTRO_INFO_JSON_KEY_USERID_ID) != extraInfoJson.end() &&
622             extraInfoJson[Constants::EXTRO_INFO_JSON_KEY_USERID_ID].is_number()) {
623             accountInfo.userId = extraInfoJson[Constants::EXTRO_INFO_JSON_KEY_USERID_ID];
624         }
625     }
626     DmAccessCaller dmSrcCaller = {
627         .accountId = accountInfo.activeAccountId,
628         .networkId = callerInfo.sourceDeviceId,
629         .userId = accountInfo.userId,
630     };
631     std::string localDeviceId;
632     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
633         HILOGE("GetLocalDeviceId failed");
634         return false;
635     }
636     AccountInfo dstAccountInfo;
637     if (!GetOsAccountData(dstAccountInfo)) {
638         HILOGE("Get Os accountId and userId fail.");
639         return false;
640     }
641     DmAccessCallee dmDstCallee = {
642         .accountId = dstAccountInfo.activeAccountId,
643         .networkId = localDeviceId,
644         .userId = dstAccountInfo.userId,
645     };
646     return DeviceManager::GetInstance().CheckSinkAccessControl(dmSrcCaller, dmDstCallee);
647 }
648 
StopSyncMissionsFromRemote(const std::string & networkId)649 void DistributedSchedMissionManager::StopSyncMissionsFromRemote(const std::string& networkId)
650 {
651     HILOGD(" %{private}s!", GetAnonymStr(networkId).c_str());
652     {
653         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
654         remoteSyncDeviceSet_.erase(networkId);
655         if (remoteSyncDeviceSet_.empty()) {
656             auto func = [this]() {
657                 int32_t ret = DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
658                 if (ret == ERR_OK) {
659                     isRegMissionChange_ = false;
660                 }
661             };
662             if (missionHandler_ != nullptr && !missionHandler_->PostTask(func)) {
663                 HILOGE("post UnRegisterMissionListener Task failed");
664             }
665         }
666     }
667 }
668 
NeedSyncDevice(const std::string & deviceId)669 bool DistributedSchedMissionManager::NeedSyncDevice(const std::string& deviceId)
670 {
671     if (deviceId.empty()) {
672         HILOGD("deviceId empty!");
673         return false;
674     }
675     std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
676     if (remoteSyncDeviceSet_.count(deviceId) == 0) {
677         return false;
678     }
679     return true;
680 }
681 
HasSyncListener(const std::string & networkId)682 bool DistributedSchedMissionManager::HasSyncListener(const std::string& networkId)
683 {
684     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
685     auto iter = listenDeviceMap_.find(Str8ToStr16(networkId));
686     if (iter != listenDeviceMap_.end()) {
687         return iter->second.called;
688     }
689     return false;
690 }
691 
NotifySnapshotChanged(const std::string & networkId,int32_t missionId)692 void DistributedSchedMissionManager::NotifySnapshotChanged(const std::string& networkId, int32_t missionId)
693 {
694     std::u16string u16DevId = Str8ToStr16(networkId);
695     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
696     auto iter = listenDeviceMap_.find(u16DevId);
697     if (iter == listenDeviceMap_.end()) {
698         return;
699     }
700     auto& listenerInfo = iter->second;
701     for (auto& listener : listenerInfo.listenerSet) {
702         MissionChangedNotify::NotifySnapshot(listener, u16DevId, missionId);
703     }
704 }
705 
OnRemoteDied(const wptr<IRemoteObject> & remote)706 void DistributedSchedMissionManager::OnRemoteDied(const wptr<IRemoteObject>& remote)
707 {
708     HILOGD("OnRemoteDied!");
709     sptr<IRemoteObject> listener = remote.promote();
710     if (listener == nullptr) {
711         HILOGW("listener is null");
712         return;
713     }
714     auto remoteDiedFunc = [this, listener]() {
715         OnMissionListenerDied(listener);
716     };
717     if (missionHandler_ != nullptr) {
718         missionHandler_->PostTask(remoteDiedFunc);
719     }
720 }
721 
OnRemoteDied(const wptr<IRemoteObject> & remote)722 void DistributedSchedMissionManager::ListenerDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
723 {
724     DistributedSchedMissionManager::GetInstance().OnRemoteDied(remote);
725 }
726 
EnqueueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId,std::unique_ptr<Snapshot> snapshot)727 void DistributedSchedMissionManager::EnqueueCachedSnapshotInfo(const std::string& deviceId, int32_t missionId,
728     std::unique_ptr<Snapshot> snapshot)
729 {
730     if (deviceId.empty() || snapshot == nullptr) {
731         HILOGW("EnqueueCachedSnapshotInfo invalid input param!");
732         return;
733     }
734     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
735     std::string keyInfo = GenerateKeyInfo(deviceId, missionId);
736     auto iter = cachedSnapshotInfos_.find(keyInfo);
737     if (iter != cachedSnapshotInfos_.end()) {
738         if (iter->second == nullptr) {
739             HILOGE("snapshotInfo is null");
740             return;
741         }
742         if (snapshot->GetCreatedTime() < iter->second->GetCreatedTime()) {
743             return;
744         }
745     }
746 
747     if (cachedSnapshotInfos_.size() == MAX_CACHED_ITEM) {
748         int64_t oldest = -1;
749         auto iterOldest = cachedSnapshotInfos_.end();
750         for (auto iterItem = cachedSnapshotInfos_.begin(); iterItem != cachedSnapshotInfos_.end(); ++iterItem) {
751             if (iterItem->second == nullptr) {
752                 HILOGE("snapshotInfo is null");
753                 continue;
754             }
755             if (oldest == -1 || iterItem->second->GetLastAccessTime() < oldest) {
756                 oldest = iterItem->second->GetLastAccessTime();
757                 iterOldest = iterItem;
758             }
759         }
760         if (iterOldest != cachedSnapshotInfos_.end()) {
761             cachedSnapshotInfos_.erase(iterOldest);
762         }
763     }
764     cachedSnapshotInfos_[keyInfo] = std::move(snapshot);
765 }
766 
DequeueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId)767 std::unique_ptr<Snapshot> DistributedSchedMissionManager::DequeueCachedSnapshotInfo(const std::string& deviceId,
768     int32_t missionId)
769 {
770     if (deviceId.empty()) {
771         HILOGW("DequeueCachedSnapshotInfo invalid input param!");
772         return nullptr;
773     }
774     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
775     auto iter = cachedSnapshotInfos_.find(GenerateKeyInfo(deviceId, missionId));
776     if (iter != cachedSnapshotInfos_.end()) {
777         std::unique_ptr<Snapshot> snapshot = std::move(iter->second);
778         if (snapshot == nullptr) {
779             HILOGE("snapshot is null");
780             return nullptr;
781         }
782         snapshot->UpdateLastAccessTime(GetTickCount());
783         iter->second = nullptr;
784         cachedSnapshotInfos_.erase(iter);
785         return snapshot;
786     }
787     return nullptr;
788 }
789 
DeleteCachedSnapshotInfo(const std::string & networkId)790 void DistributedSchedMissionManager::DeleteCachedSnapshotInfo(const std::string& networkId)
791 {
792     if (networkId.empty()) {
793         HILOGW("networkId empty!");
794         return;
795     }
796     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
797     if (uuid.empty()) {
798         HILOGW("uuid empty!");
799         return;
800     }
801     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
802     auto iter = cachedSnapshotInfos_.begin();
803     while (iter != cachedSnapshotInfos_.end()) {
804         if (iter->first.find(uuid) != std::string::npos) {
805             iter = cachedSnapshotInfos_.erase(iter);
806         } else {
807             ++iter;
808         }
809     }
810 }
811 
FetchCachedRemoteMissions(const std::string & srcId,int32_t numMissions,std::vector<DstbMissionInfo> & missionInfoSet)812 int32_t DistributedSchedMissionManager::FetchCachedRemoteMissions(const std::string& srcId, int32_t numMissions,
813     std::vector<DstbMissionInfo>& missionInfoSet)
814 {
815     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(srcId);
816     if (uuid.empty()) {
817         HILOGE("uuid empty!");
818         return INVALID_PARAMETERS_ERR;
819     }
820     std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
821     auto iter = deviceMissionInfos_.find(uuid);
822     if (iter == deviceMissionInfos_.end()) {
823         HILOGE("can not find uuid, deviceId: %{public}s!", GetAnonymStr(srcId).c_str());
824         return ERR_NULL_OBJECT;
825     }
826 
827     // get at most numMissions missions
828     int32_t actualNums = static_cast<int32_t>((iter->second).size());
829     if (actualNums < 0) {
830         HILOGE("invalid size!");
831         return ERR_NULL_OBJECT;
832     }
833     missionInfoSet.assign((iter->second).begin(),
834         (actualNums > numMissions) ? (iter->second).begin() + numMissions : (iter->second).end());
835     return ERR_NONE;
836 }
837 
RebornMissionCache(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet)838 void DistributedSchedMissionManager::RebornMissionCache(const std::string& deviceId,
839     const std::vector<DstbMissionInfo>& missionInfoSet)
840 {
841     HILOGI("start! deviceId is %{public}s.", GetAnonymStr(deviceId).c_str());
842     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
843     if (uuid.empty()) {
844         HILOGE("uuid empty!");
845         return;
846     }
847     {
848         std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
849         deviceMissionInfos_[uuid] = missionInfoSet;
850     }
851     HILOGI("RebornMissionCache end!");
852 }
853 
CleanMissionCache(const std::string & deviceId)854 void DistributedSchedMissionManager::CleanMissionCache(const std::string& deviceId)
855 {
856     HILOGI("CleanMissionCache start! deviceId is %{public}s.", GetAnonymStr(deviceId).c_str());
857     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
858     if (uuid.empty()) {
859         HILOGE("CleanMissionCache uuid empty!");
860         return;
861     }
862     {
863         std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
864         deviceMissionInfos_.erase(uuid);
865     }
866     HILOGI("CleanMissionCache end!");
867 }
868 
NotifyMissionsChangedFromRemote(const CallerInfo & callerInfo,const std::vector<DstbMissionInfo> & missionInfoSet)869 int32_t DistributedSchedMissionManager::NotifyMissionsChangedFromRemote(const CallerInfo& callerInfo,
870     const std::vector<DstbMissionInfo>& missionInfoSet)
871 {
872     HILOGI("NotifyMissionsChangedFromRemote version is %{public}d!", callerInfo.dmsVersion);
873     std::u16string u16DevId = Str8ToStr16(callerInfo.sourceDeviceId);
874     RebornMissionCache(callerInfo.sourceDeviceId, missionInfoSet);
875     {
876         HILOGI("NotifyMissionsChangedFromRemote notify mission start!");
877         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
878         auto iter = listenDeviceMap_.find(u16DevId);
879         if (iter == listenDeviceMap_.end()) {
880             HILOGE("NotifyMissionsChangedFromRemote notify mission no listener!");
881             return INVALID_PARAMETERS_ERR;
882         }
883         auto& listenerSet = iter->second.listenerSet;
884         auto notifyChanged = [listenerSet, u16DevId] () {
885             for (const auto& listener : listenerSet) {
886                 MissionChangedNotify::NotifyMissionsChanged(listener, u16DevId);
887             }
888         };
889         if (missionHandler_ != nullptr) {
890             missionHandler_->PostTask(notifyChanged);
891             HILOGI("NotifyMissionsChangedFromRemote end!");
892             return ERR_NONE;
893         }
894     }
895     return INVALID_PARAMETERS_ERR;
896 }
897 
NotifyLocalMissionsChanged()898 void DistributedSchedMissionManager::NotifyLocalMissionsChanged()
899 {
900     auto func = [this]() {
901         HILOGI("NotifyLocalMissionsChanged");
902         std::vector<DstbMissionInfo> missionInfos;
903         int32_t ret = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
904             missionInfos);
905         if (ret == ERR_OK) {
906             int32_t result = NotifyMissionsChangedToRemote(missionInfos);
907             HILOGI("NotifyMissionsChangedToRemote result = %{public}d", result);
908         }
909     };
910     if (missionChangeHandler_ == nullptr) {
911         HILOGE("missionChangeHandler_ is null");
912         return;
913     }
914     if (!missionChangeHandler_->PostTask(func)) {
915         HILOGE("postTask failed");
916     }
917 }
918 
NotifyMissionSnapshotCreated(int32_t missionId)919 void DistributedSchedMissionManager::NotifyMissionSnapshotCreated(int32_t missionId)
920 {
921     auto func = [this, missionId]() {
922         HILOGD("called.");
923         ErrCode errCode = MissionSnapshotChanged(missionId);
924         if (errCode != ERR_OK) {
925             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
926         }
927     };
928     if (missionChangeHandler_ == nullptr) {
929         HILOGE("missionChangeHandler_ is null");
930         return;
931     }
932     if (!missionChangeHandler_->PostTask(func, GET_FOREGROUND_SNAPSHOT_DELAY_TIME)) {
933         HILOGE("post MissionSnapshotChanged delay Task failed");
934     }
935 }
936 
NotifyMissionSnapshotChanged(int32_t missionId)937 void DistributedSchedMissionManager::NotifyMissionSnapshotChanged(int32_t missionId)
938 {
939     auto func = [this, missionId]() {
940         HILOGD("called.");
941         ErrCode errCode = MissionSnapshotChanged(missionId);
942         if (errCode != ERR_OK) {
943             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
944         }
945     };
946     if (missionChangeHandler_ == nullptr) {
947         HILOGE("missionChangeHandler_ is null");
948         return;
949     }
950     if (!missionChangeHandler_->PostTask(func)) {
951         HILOGE("post MissionSnapshotChanged Task failed");
952     }
953 }
954 
NotifyMissionSnapshotDestroyed(int32_t missionId)955 void DistributedSchedMissionManager::NotifyMissionSnapshotDestroyed(int32_t missionId)
956 {
957     auto func = [this, missionId]() {
958         HILOGD("called.");
959         ErrCode errCode = MissionSnapshotDestroyed(missionId);
960         if (errCode != ERR_OK) {
961             HILOGE("mission snapshot removed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
962         }
963     };
964     if (missionChangeHandler_ == nullptr) {
965         HILOGE("missionChangeHandler_ is null");
966         return;
967     }
968     if (!missionChangeHandler_->PostTask(func)) {
969         HILOGE("post MissionSnapshotDestroyed Task failed");
970     }
971 }
972 
NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo> & missionInfoSet)973 int32_t DistributedSchedMissionManager::NotifyMissionsChangedToRemote(
974     const std::vector<DstbMissionInfo> &missionInfoSet)
975 {
976     CallerInfo callerInfo;
977     if (!GenerateCallerInfo(callerInfo)) {
978         return GET_LOCAL_DEVICE_ERR;
979     }
980     std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
981     for (const auto& destDeviceId : remoteSyncDeviceSet_) {
982         auto handler = FetchDeviceHandler(destDeviceId);
983         if (handler == nullptr) {
984             HILOGE("NotifyMissionsChangedToRemote fetch handler failed!");
985             continue;
986         }
987         auto callback = [destDeviceId, missionInfoSet, callerInfo, this] () {
988             NotifyMissionsChangedToRemoteInner(destDeviceId, missionInfoSet, callerInfo);
989         };
990         if (!handler->PostTask(callback)) {
991             HILOGE("NotifyMissionsChangedToRemote PostTask failed!");
992             return ERR_NULL_OBJECT;
993         }
994     }
995 
996     return ERR_NONE;
997 }
998 
NotifyMissionsChangedToRemoteInner(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet,const CallerInfo & callerInfo)999 void DistributedSchedMissionManager::NotifyMissionsChangedToRemoteInner(const std::string& deviceId,
1000     const std::vector<DstbMissionInfo>& missionInfoSet, const CallerInfo& callerInfo)
1001 {
1002     sptr<IDistributedSched> remoteDms = GetRemoteDms(deviceId);
1003     if (remoteDms == nullptr) {
1004         HILOGE("NotifyMissionsChangedToRemote DMS get remoteDms failed");
1005         return;
1006     }
1007     int64_t begin = GetTickCount();
1008     int32_t result = remoteDms->NotifyMissionsChangedFromRemote(missionInfoSet, callerInfo);
1009     HILOGI("[PerformanceTest] NotifyMissionsChangedFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
1010         result, GetTickCount() - begin);
1011 }
1012 
GenerateCallerInfo(CallerInfo & callerInfo)1013 bool DistributedSchedMissionManager::GenerateCallerInfo(CallerInfo& callerInfo)
1014 {
1015     std::string localUuid;
1016     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localUuid)) {
1017         HILOGE("get local uuid failed!");
1018         return false;
1019     }
1020     callerInfo.uid = IPCSkeleton::GetCallingUid();
1021     callerInfo.pid = IPCSkeleton::GetCallingRealPid();
1022     callerInfo.callerType = CALLER_TYPE_HARMONY;
1023     callerInfo.sourceDeviceId = localUuid;
1024     callerInfo.dmsVersion = VERSION;
1025     return true;
1026 }
1027 
FetchDeviceHandler(const std::string & deviceId)1028 std::shared_ptr<AppExecFwk::EventHandler> DistributedSchedMissionManager::FetchDeviceHandler(
1029     const std::string& deviceId)
1030 {
1031     if (!IsDeviceIdValidated(deviceId)) {
1032         HILOGW("FetchDeviceHandler device:%{public}s offline.", GetAnonymStr(deviceId).c_str());
1033         return nullptr;
1034     }
1035 
1036     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
1037     if (uuid.empty()) {
1038         HILOGE("FetchDeviceHandler uuid empty!");
1039         return nullptr;
1040     }
1041 
1042     auto iter = deviceHandle_.find(uuid);
1043     if (iter != deviceHandle_.end()) {
1044         return iter->second;
1045     }
1046 
1047     auto anonyUuid = GetAnonymStr(uuid);
1048     auto runner = AppExecFwk::EventRunner::Create(anonyUuid + "_MissionN");
1049     auto handler = std::make_shared<AppExecFwk::EventHandler>(runner);
1050     deviceHandle_.emplace(uuid, handler);
1051     return handler;
1052 }
1053 
OnRemoteDied(const wptr<IRemoteObject> & remote)1054 void DistributedSchedMissionManager::RemoteDmsDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
1055 {
1056     HILOGI("OnRemoteDied received died notify!");
1057     DistributedSchedMissionManager::GetInstance().OnRemoteDmsDied(remote);
1058 }
1059 
OnRemoteDmsDied(const wptr<IRemoteObject> & remote)1060 void DistributedSchedMissionManager::OnRemoteDmsDied(const wptr<IRemoteObject>& remote)
1061 {
1062     sptr<IRemoteObject> diedRemoted = remote.promote();
1063     if (diedRemoted == nullptr) {
1064         HILOGW("OnRemoteDmsDied promote failed!");
1065         return;
1066     }
1067     HILOGD("delete diedRemoted");
1068     auto remoteDmsDiedFunc = [this, diedRemoted]() {
1069         OnRemoteDmsDied(diedRemoted);
1070     };
1071     if (missionHandler_ != nullptr) {
1072         missionHandler_->PostTask(remoteDmsDiedFunc);
1073     }
1074 }
1075 
RetryStartSyncRemoteMissions(const std::string & dstDeviceId,const std::string & localDevId,int32_t retryTimes)1076 void DistributedSchedMissionManager::RetryStartSyncRemoteMissions(const std::string& dstDeviceId,
1077     const std::string& localDevId, int32_t retryTimes)
1078 {
1079     auto retryFunc = [this, dstDeviceId, localDevId, retryTimes]() {
1080         bool ret = HasSyncListener(dstDeviceId);
1081         if (!ret) {
1082             return;
1083         }
1084         sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDeviceId);
1085         if (remoteDms == nullptr) {
1086             HILOGI("RetryStartSyncRemoteMissions DMS get remoteDms failed");
1087             RetryStartSyncRemoteMissions(dstDeviceId, localDevId, retryTimes + 1);
1088             return;
1089         }
1090         int32_t errNo = StartSyncRemoteMissions(dstDeviceId, remoteDms);
1091         HILOGI("RetryStartSyncRemoteMissions result:%{public}d", errNo);
1092     };
1093     if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1094         missionHandler_->PostTask(retryFunc, RETRY_DELAYED);
1095     }
1096 }
1097 
OnMissionListenerDied(const sptr<IRemoteObject> & remote)1098 void DistributedSchedMissionManager::OnMissionListenerDied(const sptr<IRemoteObject>& remote)
1099 {
1100     HILOGI("OnMissionListenerDied");
1101     std::set<std::string> deviceSet;
1102     {
1103         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
1104         auto iterItem = listenDeviceMap_.begin();
1105         while (iterItem != listenDeviceMap_.end()) {
1106             auto& listenerInfo = iterItem->second;
1107             auto ret = listenerInfo.Find(remote);
1108             if (!ret) {
1109                 ++iterItem;
1110                 continue;
1111             }
1112             if (remote != nullptr) {
1113                 remote->RemoveDeathRecipient(listenerDeath_);
1114             }
1115             listenerInfo.Erase(remote);
1116             if (listenerInfo.Empty()) {
1117                 if (listenerInfo.called) {
1118                     deviceSet.emplace(Str16ToStr8(iterItem->first));
1119                 }
1120                 iterItem = listenDeviceMap_.erase(iterItem);
1121             } else {
1122                 ++iterItem;
1123             }
1124         }
1125     }
1126     for (auto& devId : deviceSet) {
1127         StopSyncRemoteMissions(devId, false);
1128     }
1129 }
1130 
OnRemoteDmsDied(const sptr<IRemoteObject> & remote)1131 void DistributedSchedMissionManager::OnRemoteDmsDied(const sptr<IRemoteObject>& remote)
1132 {
1133     HILOGI("OnRemoteDmsDied");
1134     std::string devId;
1135     {
1136         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
1137         for (auto iter = remoteDmsMap_.begin(); iter != remoteDmsMap_.end(); ++iter) {
1138             if (iter->second->AsObject() == remote && iter->second->AsObject() != nullptr) {
1139                 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
1140                 devId = iter->first;
1141                 remoteDmsMap_.erase(iter);
1142                 break;
1143             }
1144         }
1145     }
1146     if (devId.empty()) {
1147         return;
1148     }
1149     bool ret = HasSyncListener(devId);
1150     if (ret) {
1151         std::string localDeviceId;
1152         if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
1153             return;
1154         }
1155         RetryStartSyncRemoteMissions(devId, localDeviceId, 0);
1156     }
1157 }
1158 
NotifyDmsProxyProcessDied()1159 void DistributedSchedMissionManager::NotifyDmsProxyProcessDied()
1160 {
1161     HILOGI("NotifyDmsProxyProcessDied!");
1162     if (!isRegMissionChange_) {
1163         return;
1164     }
1165     RetryRegisterMissionChange(0);
1166 }
1167 
RetryRegisterMissionChange(int32_t retryTimes)1168 void DistributedSchedMissionManager::RetryRegisterMissionChange(int32_t retryTimes)
1169 {
1170     auto remoteDiedFunc = [this, retryTimes]() {
1171         HILOGI("RetryRegisterMissionChange retryTimes:%{public}d begin", retryTimes);
1172         if (!isRegMissionChange_) {
1173             return;
1174         }
1175         int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
1176         if (ret == ERR_NULL_OBJECT) {
1177             RetryRegisterMissionChange(retryTimes + 1);
1178             HILOGI("RetryRegisterMissionChange dmsproxy null, retry!");
1179             return;
1180         }
1181         HILOGI("RetryRegisterMissionChange result:%{public}d", ret);
1182     };
1183     if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1184         missionHandler_->PostTask(remoteDiedFunc, RETRY_DELAYED);
1185     }
1186 }
1187 
InitAllSnapshots(const std::vector<DstbMissionInfo> & missionInfoSet)1188 void DistributedSchedMissionManager::InitAllSnapshots(const std::vector<DstbMissionInfo>& missionInfoSet)
1189 {
1190     for (auto iter = missionInfoSet.begin(); iter != missionInfoSet.end(); iter++) {
1191         ErrCode errCode = MissionSnapshotChanged(iter->id);
1192         if (errCode != ERR_OK) {
1193             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", iter->id, errCode);
1194         }
1195     }
1196 }
1197 
MissionSnapshotChanged(int32_t missionId)1198 int32_t DistributedSchedMissionManager::MissionSnapshotChanged(int32_t missionId)
1199 {
1200     std::string networkId;
1201     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1202         HILOGE("get local networkId failed!");
1203         return INVALID_PARAMETERS_ERR;
1204     }
1205     AAFwk::MissionSnapshot missionSnapshot;
1206     ErrCode errCode = DistributedSchedAdapter::GetInstance()
1207         .GetLocalMissionSnapshotInfo(networkId, missionId, missionSnapshot);
1208     if (errCode != ERR_OK) {
1209         HILOGE("get local mission snapshot failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
1210         return errCode;
1211     }
1212     Snapshot snapshot;
1213     SnapshotConverter::ConvertToSnapshot(missionSnapshot, snapshot);
1214     MessageParcel data;
1215     errCode = MissionSnapshotSequence(snapshot, data);
1216     if (errCode != ERR_OK) {
1217         HILOGE("mission snapshot sequence failed, errCode=%{public}d", errCode);
1218         return errCode;
1219     }
1220     size_t len = data.GetReadableBytes();
1221     const uint8_t* byteStream = data.ReadBuffer(len);
1222     if (byteStream == nullptr) {
1223         HILOGE("Failed to read length.");
1224         return INVALID_PARAMETERS_ERR;
1225     }
1226     errCode = StoreSnapshotInfo(networkId, missionId, byteStream, len);
1227     return errCode;
1228 }
1229 
MissionSnapshotDestroyed(int32_t missionId)1230 int32_t DistributedSchedMissionManager::MissionSnapshotDestroyed(int32_t missionId)
1231 {
1232     std::string networkId;
1233     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1234         HILOGE("get local networkId failed!");
1235         return INVALID_PARAMETERS_ERR;
1236     }
1237     ErrCode errCode = RemoveSnapshotInfo(networkId, missionId);
1238     return errCode;
1239 }
1240 
MissionSnapshotSequence(const Snapshot & snapshot,MessageParcel & data)1241 int32_t DistributedSchedMissionManager::MissionSnapshotSequence(const Snapshot& snapshot, MessageParcel& data)
1242 {
1243     bool ret = snapshot.WriteSnapshotInfo(data);
1244     if (!ret) {
1245         HILOGE("WriteSnapshotInfo failed!");
1246         return ERR_FLATTEN_OBJECT;
1247     }
1248     ret = snapshot.WritePixelMap(data);
1249     if (!ret) {
1250         HILOGE("WritePixelMap failed!");
1251         return ERR_FLATTEN_OBJECT;
1252     }
1253     return ERR_OK;
1254 }
1255 
OnDnetDied()1256 void DistributedSchedMissionManager::OnDnetDied()
1257 {
1258     auto dnetDiedFunc = [this]() {
1259         HILOGI("OnDnetDied");
1260         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
1261         if (!isRegMissionChange_) {
1262             return;
1263         }
1264         remoteSyncDeviceSet_.clear();
1265         DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
1266         isRegMissionChange_ = false;
1267     };
1268     if (missionHandler_ != nullptr) {
1269         missionHandler_->PostTask(dnetDiedFunc);
1270     }
1271 }
1272 } // namespace DistributedSchedule
1273 } // namespace OHOS
1274