• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "mission/distributed_sched_mission_manager.h"
17 
18 #include <chrono>
19 #include <sys/time.h>
20 #include <unistd.h>
21 
22 #include "datetime_ex.h"
23 #include "ipc_skeleton.h"
24 #include "iservice_registry.h"
25 #include "nlohmann/json.hpp"
26 #include "string_ex.h"
27 #include "system_ability_definition.h"
28 
29 #include "distributed_sched_adapter.h"
30 #include "distributed_sched_utils.h"
31 #include "dtbschedmgr_device_info_storage.h"
32 #include "dtbschedmgr_log.h"
33 #include "mission/mission_changed_notify.h"
34 #include "mission/mission_constant.h"
35 #include "mission/mission_info_converter.h"
36 #include "mission/snapshot_converter.h"
37 
38 namespace OHOS {
39 namespace DistributedSchedule {
40 namespace {
41 const std::string TAG = "DistributedSchedMissionManager";
42 constexpr size_t MAX_CACHED_ITEM = 10;
43 constexpr int32_t MAX_RETRY_TIMES = 15;
44 constexpr int32_t RETRY_DELAYED = 2000;
45 constexpr int32_t GET_FOREGROUND_SNAPSHOT_DELAY_TIME = 800; // ms
46 const std::string DELETE_DATA_STORAGE = "DeleteDataStorage";
47 constexpr int32_t DELETE_DATA_STORAGE_DELAYED = 60000; // ms
48 const std::string INVAILD_LOCAL_DEVICE_ID = "-1";
49 constexpr int32_t NET_STATE = 0;
50 }
51 namespace Mission {
52 constexpr int32_t GET_MAX_MISSIONS = 20;
53 } // Mission
54 using namespace std::chrono;
55 using namespace Constants::Mission;
56 using namespace OHOS::DistributedKv;
57 
58 IMPLEMENT_SINGLE_INSTANCE(DistributedSchedMissionManager);
59 
Init()60 void DistributedSchedMissionManager::Init()
61 {
62     listenerDeath_ = new ListenerDeathRecipient();
63     remoteDmsRecipient_ = new RemoteDmsDeathRecipient();
64     auto runner = AppExecFwk::EventRunner::Create("MissionManagerHandler");
65     missionHandler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
66     auto updateRunner = AppExecFwk::EventRunner::Create("UpdateHandler");
67     updateHandler_ = std::make_shared<AppExecFwk::EventHandler>(updateRunner);
68     missonChangeListener_ = new DistributedMissionChangeListener();
69     auto missionChangeRunner = AppExecFwk::EventRunner::Create("DistributedMissionChange");
70     missionChangeHandler_ = std::make_shared<AppExecFwk::EventHandler>(missionChangeRunner);
71 }
72 
GetMissionInfos(const std::string & deviceId,int32_t numMissions,std::vector<AAFwk::MissionInfo> & missionInfoSet)73 int32_t DistributedSchedMissionManager::GetMissionInfos(const std::string& deviceId,
74     int32_t numMissions, std::vector<AAFwk::MissionInfo>& missionInfoSet)
75 {
76     HILOGI("start!");
77     if (!IsDeviceIdValidated(deviceId)) {
78         return INVALID_PARAMETERS_ERR;
79     }
80     if (numMissions <= 0) {
81         HILOGE("numMissions is illegal! numMissions:%{public}d", numMissions);
82         return INVALID_PARAMETERS_ERR;
83     }
84     std::vector<DstbMissionInfo> dstbMissionInfoSet;
85     int32_t ret = FetchCachedRemoteMissions(deviceId, numMissions, dstbMissionInfoSet);
86     if (ret != ERR_OK) {
87         HILOGE("FetchCachedRemoteMissions failed, ret = %{public}d", ret);
88         return ret;
89     }
90     return MissionInfoConverter::ConvertToMissionInfos(dstbMissionInfoSet, missionInfoSet);
91 }
92 
GetRemoteDms(const std::string & deviceId)93 sptr<IDistributedSched> DistributedSchedMissionManager::GetRemoteDms(const std::string& deviceId)
94 {
95     if (deviceId.empty()) {
96         HILOGE("GetRemoteDms remoteDeviceId is empty");
97         return nullptr;
98     }
99     int64_t begin = GetTickCount();
100     {
101         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
102         auto iter = remoteDmsMap_.find(deviceId);
103         if (iter != remoteDmsMap_.end()) {
104             auto object = iter->second;
105             if (object != nullptr) {
106                 HILOGI("[PerformanceTest] GetRemoteDms from cache spend %{public}" PRId64 " ms",
107                     GetTickCount() - begin);
108                 return object;
109             }
110         }
111     }
112     HILOGD("GetRemoteDms connect deviceid is %s", GetAnonymStr(deviceId).c_str());
113     auto samgr = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
114     if (samgr == nullptr) {
115         HILOGE("GetRemoteDms failed to connect to systemAbilityMgr!");
116         return nullptr;
117     }
118     auto object = samgr->CheckSystemAbility(DISTRIBUTED_SCHED_SA_ID, deviceId);
119     if (object == nullptr) {
120         HILOGE("GetRemoteDms failed to get dms for remote device: %{public}s!", GetAnonymStr(deviceId).c_str());
121         return nullptr;
122     }
123     auto ret = object->AddDeathRecipient(remoteDmsRecipient_);
124     HILOGD("GetRemoteDms AddDeathRecipient ret : %{public}d", ret);
125     sptr<IDistributedSched> remoteDmsObj = iface_cast<IDistributedSched>(object);
126     {
127         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
128         auto iter = remoteDmsMap_.find(deviceId);
129         if (iter != remoteDmsMap_.end()) {
130             iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
131         }
132         remoteDmsMap_[deviceId] = remoteDmsObj;
133     }
134     HILOGI("[PerformanceTest] GetRemoteDms spend %{public}" PRId64 " ms", GetTickCount() - begin);
135     return remoteDmsObj;
136 }
137 
IsDeviceIdValidated(const std::string & deviceId)138 bool DistributedSchedMissionManager::IsDeviceIdValidated(const std::string& deviceId)
139 {
140     if (deviceId.empty()) {
141         HILOGE("IsDeviceIdValidated deviceId is empty!");
142         return false;
143     }
144     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(deviceId) == nullptr) {
145         HILOGW("IsDeviceIdValidated device offline.");
146         return false;
147     }
148 
149     return true;
150 }
151 
NotifyRemoteDied(const wptr<IRemoteObject> & remote)152 void DistributedSchedMissionManager::NotifyRemoteDied(const wptr<IRemoteObject>& remote)
153 {
154     if (distributedDataStorage_ == nullptr) {
155         HILOGE("DistributedDataStorage null!");
156         return;
157     }
158     distributedDataStorage_->NotifyRemoteDied(remote);
159 }
160 
NotifyNetDisconnectOffline()161 void DistributedSchedMissionManager::NotifyNetDisconnectOffline()
162 {
163     HILOGD("NotifyNetDisconnectOffline start.");
164     {
165         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
166         if (listenDeviceMap_.empty()) {
167             HILOGI("The connect is null!");
168             return;
169         }
170         for (auto& listenDevice : listenDeviceMap_) {
171             std::u16string devId = listenDevice.first;
172             std::set<sptr<IRemoteObject>> listenerSet = listenDevice.second.listenerSet;
173             for (auto connect : listenerSet) {
174                 MissionChangedNotify::NotifyNetDisconnect(connect, devId, NET_STATE);
175             }
176         }
177     }
178     HILOGD("NotifyNetDisconnectOffline end.");
179 }
180 
InitDataStorage()181 int32_t DistributedSchedMissionManager::InitDataStorage()
182 {
183     if (distributedDataStorage_ == nullptr) {
184         distributedDataStorage_ = std::make_shared<DistributedDataStorage>();
185     }
186     if (!distributedDataStorage_->Init()) {
187         HILOGE("InitDataStorage DistributedDataStorage init failed!");
188         return ERR_NULL_OBJECT;
189     }
190     return ERR_NONE;
191 }
192 
StopDataStorage()193 int32_t DistributedSchedMissionManager::StopDataStorage()
194 {
195     if (distributedDataStorage_ == nullptr) {
196         HILOGE("StopDataStorage DistributedDataStorage null!");
197         return ERR_NULL_OBJECT;
198     }
199     if (!distributedDataStorage_->Stop()) {
200         HILOGE("StopDataStorage DistributedDataStorage stop failed!");
201         return ERR_NULL_OBJECT;
202     }
203     return ERR_NONE;
204 }
205 
StoreSnapshotInfo(const std::string & deviceId,int32_t missionId,const uint8_t * byteStream,size_t len)206 int32_t DistributedSchedMissionManager::StoreSnapshotInfo(const std::string& deviceId, int32_t missionId,
207     const uint8_t* byteStream, size_t len)
208 {
209     if (distributedDataStorage_ == nullptr) {
210         HILOGE("StoreSnapshotInfo DistributedDataStorage null!");
211         return ERR_NULL_OBJECT;
212     }
213     if (!distributedDataStorage_->Insert(deviceId, missionId, byteStream, len)) {
214         HILOGE("StoreSnapshotInfo DistributedDataStorage insert failed!");
215         return INVALID_PARAMETERS_ERR;
216     }
217     return ERR_NONE;
218 }
219 
RemoveSnapshotInfo(const std::string & deviceId,int32_t missionId)220 int32_t DistributedSchedMissionManager::RemoveSnapshotInfo(const std::string& deviceId, int32_t missionId)
221 {
222     if (distributedDataStorage_ == nullptr) {
223         HILOGE("RemoveSnapshotInfo DistributedDataStorage null!");
224         return ERR_NULL_OBJECT;
225     }
226     if (!distributedDataStorage_->Delete(deviceId, missionId)) {
227         HILOGE("RemoveSnapshotInfo DistributedDataStorage delete failed!");
228         return INVALID_PARAMETERS_ERR;
229     }
230     return ERR_NONE;
231 }
232 
GetRemoteMissionSnapshotInfo(const std::string & networkId,int32_t missionId,std::unique_ptr<AAFwk::MissionSnapshot> & missionSnapshot)233 int32_t DistributedSchedMissionManager::GetRemoteMissionSnapshotInfo(const std::string& networkId, int32_t missionId,
234     std::unique_ptr<AAFwk::MissionSnapshot>& missionSnapshot)
235 {
236     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
237     if (uuid.empty()) {
238         HILOGE("uuid is empty!");
239         return INVALID_PARAMETERS_ERR;
240     }
241     std::unique_ptr<Snapshot> snapshotPtr = DequeueCachedSnapshotInfo(uuid, missionId);
242     if (snapshotPtr != nullptr) {
243         HILOGI("Get snapshot from cache success, uuid: %{public}s, missionId: %{public}d.",
244             GetAnonymStr(uuid).c_str(), missionId);
245         SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
246         return ERR_NONE;
247     }
248     if (distributedDataStorage_ == nullptr) {
249         HILOGE("DistributedDataStorage is null!");
250         return ERR_NULL_OBJECT;
251     }
252     DistributedKv::Value value;
253     bool ret = distributedDataStorage_->Query(networkId, missionId, value);
254     if (!ret) {
255         HILOGE("DistributedDataStorage query failed!");
256         return INVALID_PARAMETERS_ERR;
257     }
258     snapshotPtr = Snapshot::Create(value.Data());
259     if (snapshotPtr == nullptr) {
260         HILOGE("snapshot create failed!");
261         return ERR_NULL_OBJECT;
262     }
263     HILOGI("Get snapshot from DistributedDB success, uuid: %{public}s, missionId: %{public}d.",
264         GetAnonymStr(uuid).c_str(), missionId);
265     SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
266     return ERR_NONE;
267 }
268 
DeviceOnlineNotify(const std::string & networkId)269 void DistributedSchedMissionManager::DeviceOnlineNotify(const std::string& networkId)
270 {
271     if (networkId.empty()) {
272         HILOGW("DeviceOnlineNotify networkId empty!");
273         return;
274     }
275 
276     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
277     if (missionHandler_ != nullptr) {
278         HILOGI("DeviceOnlineNotify RemoveTask");
279         missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
280     }
281 }
282 
DeviceOfflineNotify(const std::string & networkId)283 void DistributedSchedMissionManager::DeviceOfflineNotify(const std::string& networkId)
284 {
285     if (networkId.empty()) {
286         HILOGW("DeviceOfflineNotify networkId empty!");
287         return;
288     }
289     StopSyncMissionsFromRemote(networkId);
290     CleanMissionResources(networkId);
291     {
292         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
293         auto iter = remoteDmsMap_.find(networkId);
294         if (iter != remoteDmsMap_.end()) {
295             if (iter->second == nullptr) {
296                 return;
297             }
298             iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
299             remoteDmsMap_.erase(iter);
300         }
301     }
302     HILOGI("DeviceOfflineNotify erase value for networkId: %{public}s.", GetAnonymStr(networkId).c_str());
303 }
304 
DeleteDataStorage(const std::string & deviceId,bool isDelayed)305 void DistributedSchedMissionManager::DeleteDataStorage(const std::string& deviceId, bool isDelayed)
306 {
307     if (distributedDataStorage_ == nullptr) {
308         HILOGE("DeleteDataStorage DistributedDataStorage null!");
309         return;
310     }
311     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
312     auto callback = [this, uuid, deviceId]() {
313         if (!distributedDataStorage_->FuzzyDelete(deviceId)) {
314             HILOGE("DeleteDataStorage storage delete failed!");
315         } else {
316             HILOGI("DeleteDataStorage storage delete successfully!");
317         }
318     };
319     if (isDelayed) {
320         if (missionHandler_ != nullptr) {
321             HILOGI("DeleteDataStorage PostTask");
322             missionHandler_->PostTask(callback, DELETE_DATA_STORAGE + uuid, DELETE_DATA_STORAGE_DELAYED);
323         }
324     } else {
325         if (missionHandler_ != nullptr) {
326             HILOGI("DeleteDataStorage RemoveTask");
327             missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
328         }
329         callback();
330     }
331 }
332 
RegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)333 int32_t DistributedSchedMissionManager::RegisterMissionListener(const std::u16string& devId,
334     const sptr<IRemoteObject>& listener)
335 {
336     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(Str16ToStr8(devId));
337     if (uuid.empty()) {
338         HILOGE("uuid is empty!");
339         return INVALID_PARAMETERS_ERR;
340     }
341     if (missionHandler_ != nullptr) {
342         HILOGI("RemoveTask");
343         missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
344     }
345     if (listener == nullptr) {
346         return INVALID_PARAMETERS_ERR;
347     }
348     std::string localDeviceId;
349     std::string remoteDeviceId = Str16ToStr8(devId);
350     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
351         || localDeviceId == remoteDeviceId) {
352         HILOGE("check deviceId failed!");
353         return INVALID_PARAMETERS_ERR;
354     }
355     {
356         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
357         auto& listenerInfo = listenDeviceMap_[devId];
358         if (!listenerInfo.Emplace(listener)) {
359             HILOGW("RegisterSyncListener listener has already inserted!");
360             return ERR_NONE;
361         }
362         bool ret = listener->AddDeathRecipient(listenerDeath_);
363         if (!ret) {
364             HILOGW("RegisterSyncListener AddDeathRecipient failed!");
365         }
366         if (listenerInfo.Size() > 1) {
367             HILOGI("RegisterMissionListener not notify remote DMS!");
368             return ERR_NONE;
369         }
370     }
371     return ERR_NONE;
372 }
373 
StartSyncRemoteMissions(const std::string & dstDevId,const std::string & localDevId)374 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
375     const std::string& localDevId)
376 {
377     std::u16string devId = Str8ToStr16(dstDevId);
378     {
379         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
380         auto iterItem = listenDeviceMap_.find(devId);
381         if (iterItem == listenDeviceMap_.end()) {
382             return ERR_NONE;
383         }
384         bool callFlag = iterItem->second.called;
385         if (callFlag) {
386             HILOGI("StartSyncRemoteMissions already called!");
387             return ERR_NONE;
388         }
389     }
390     sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
391     if (remoteDms == nullptr) {
392         HILOGE("get remoteDms failed!");
393         RetryStartSyncRemoteMissions(dstDevId, localDevId, 0);
394         return GET_REMOTE_DMS_FAIL;
395     }
396     int32_t ret = StartSyncRemoteMissions(dstDevId, remoteDms);
397     if (ret == ERR_NONE) {
398         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
399         auto iterItem = listenDeviceMap_.find(devId);
400         if (iterItem != listenDeviceMap_.end()) {
401             iterItem->second.called = true;
402         }
403     }
404     return ret;
405 }
406 
StartSyncRemoteMissions(const std::string & dstDevId,const sptr<IDistributedSched> & remoteDms)407 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
408     const sptr<IDistributedSched>& remoteDms)
409 {
410     if (remoteDms == nullptr) {
411         HILOGE("remoteDms is null");
412         return INVALID_PARAMETERS_ERR;
413     }
414     std::vector<DstbMissionInfo> missionInfos;
415     CallerInfo callerInfo;
416     if (!GenerateCallerInfo(callerInfo)) {
417         return GET_LOCAL_DEVICE_ERR;
418     }
419     int64_t begin = GetTickCount();
420     int32_t ret = remoteDms->StartSyncMissionsFromRemote(callerInfo, missionInfos);
421     HILOGI("[PerformanceTest] StartSyncMissionsFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
422         ret, GetTickCount() - begin);
423     if (ret == ERR_NONE) {
424         RebornMissionCache(dstDevId, missionInfos);
425     }
426     return ret;
427 }
428 
UnRegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)429 int32_t DistributedSchedMissionManager::UnRegisterMissionListener(const std::u16string& devId,
430     const sptr<IRemoteObject>& listener)
431 {
432     if (listener == nullptr) {
433         return INVALID_PARAMETERS_ERR;
434     }
435     if (!IsDeviceIdValidated(Str16ToStr8(devId))) {
436         return INVALID_PARAMETERS_ERR;
437     }
438     std::string localDeviceId;
439     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
440         || localDeviceId == Str16ToStr8(devId)) {
441         HILOGE("check deviceId fail");
442         return INVALID_PARAMETERS_ERR;
443     }
444     {
445         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
446         auto iterItem = listenDeviceMap_.find(devId);
447         if (iterItem == listenDeviceMap_.end()) {
448             return ERR_NONE;
449         }
450         auto& listenerInfo = iterItem->second;
451         auto ret = listenerInfo.Find(listener);
452         if (!ret) {
453             HILOGI("listener not registered!");
454             return ERR_NONE;
455         }
456         listener->RemoveDeathRecipient(listenerDeath_);
457         listenerInfo.Erase(listener);
458         if (!listenerInfo.Empty()) {
459             return ERR_NONE;
460         }
461         listenDeviceMap_.erase(iterItem);
462     }
463     return ERR_NONE;
464 }
465 
CleanMissionResources(const std::string & networkId)466 void DistributedSchedMissionManager::CleanMissionResources(const std::string& networkId)
467 {
468     {
469         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
470         auto iterDevice = listenDeviceMap_.find(Str8ToStr16(networkId));
471         if (iterDevice == listenDeviceMap_.end()) {
472             return;
473         }
474         auto& listenerInfo = iterDevice->second;
475         for (sptr<IRemoteObject> listener : listenerInfo.listenerSet) {
476             if (listener != nullptr) {
477                 listener->RemoveDeathRecipient(listenerDeath_);
478             }
479         }
480         listenDeviceMap_.erase(iterDevice);
481     }
482     StopSyncRemoteMissions(networkId, true);
483 }
484 
StopSyncRemoteMissions(const std::string & dstDevId,bool offline,bool exit)485 int32_t DistributedSchedMissionManager::StopSyncRemoteMissions(const std::string& dstDevId,
486     bool offline, bool exit)
487 {
488     CleanMissionCache(dstDevId);
489     DeleteCachedSnapshotInfo(dstDevId);
490     DeleteDataStorage(dstDevId, true);
491 
492     if (offline) {
493         return ERR_NONE;
494     }
495     sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
496     if (remoteDms == nullptr) {
497         HILOGE("DMS get remoteDms failed");
498         return GET_REMOTE_DMS_FAIL;
499     }
500 
501     CallerInfo callerInfo;
502     if (!GenerateCallerInfo(callerInfo)) {
503         return GET_LOCAL_DEVICE_ERR;
504     }
505     int64_t begin = GetTickCount();
506     int32_t ret = remoteDms->StopSyncMissionsFromRemote(callerInfo);
507     HILOGI("[PerformanceTest] ret: %{public}d, spend %{public}" PRId64 " ms", ret, GetTickCount() - begin);
508     return ret;
509 }
510 
StartSyncRemoteMissions(const std::string & dstDevId,bool fixConflict,int64_t tag)511 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId, bool fixConflict,
512     int64_t tag)
513 {
514     std::string localDeviceId;
515     if (!IsDeviceIdValidated(dstDevId)) {
516         return INVALID_PARAMETERS_ERR;
517     }
518     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
519         || (dstDevId == localDeviceId)) {
520         HILOGE("check deviceId fail");
521         return INVALID_PARAMETERS_ERR;
522     }
523     HILOGI("begin, dstDevId is %{public}s, local deviceId is %{public}s",
524         GetAnonymStr(dstDevId).c_str(), GetAnonymStr(localDeviceId).c_str());
525     auto ret = StartSyncRemoteMissions(dstDevId, localDeviceId);
526     if (ret != ERR_NONE) {
527         HILOGE("StartSyncRemoteMissions failed, %{public}d", ret);
528         return ret;
529     }
530     return ERR_NONE;
531 }
532 
StartSyncMissionsFromRemote(const CallerInfo & callerInfo,std::vector<DstbMissionInfo> & missionInfoSet)533 int32_t DistributedSchedMissionManager::StartSyncMissionsFromRemote(const CallerInfo& callerInfo,
534     std::vector<DstbMissionInfo>& missionInfoSet)
535 {
536     auto deviceId = callerInfo.sourceDeviceId;
537     HILOGD("remote version is %{public}d!", callerInfo.dmsVersion);
538     {
539         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
540         remoteSyncDeviceSet_.emplace(deviceId);
541     }
542     int32_t result = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
543         missionInfoSet);
544     auto func = [this, missionInfoSet]() {
545         HILOGD("RegisterMissionListener called.");
546         if (!isRegMissionChange_) {
547             int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
548             if (ret == ERR_OK) {
549                 isRegMissionChange_ = true;
550             }
551             InitAllSnapshots(missionInfoSet);
552         }
553     };
554     if (missionHandler_ != nullptr && !missionHandler_->PostTask(func)) {
555         HILOGE("post RegisterMissionListener and InitAllSnapshots Task failed");
556     }
557     return result;
558 }
559 
StopSyncMissionsFromRemote(const std::string & networkId)560 void DistributedSchedMissionManager::StopSyncMissionsFromRemote(const std::string& networkId)
561 {
562     HILOGD(" %{private}s!", GetAnonymStr(networkId).c_str());
563     {
564         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
565         remoteSyncDeviceSet_.erase(networkId);
566         if (remoteSyncDeviceSet_.empty()) {
567             auto func = [this]() {
568                 int32_t ret = DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
569                 if (ret == ERR_OK) {
570                     isRegMissionChange_ = false;
571                 }
572             };
573             if (missionHandler_ != nullptr && !missionHandler_->PostTask(func)) {
574                 HILOGE("post UnRegisterMissionListener Task failed");
575             }
576         }
577     }
578 }
579 
NeedSyncDevice(const std::string & deviceId)580 bool DistributedSchedMissionManager::NeedSyncDevice(const std::string& deviceId)
581 {
582     if (deviceId.empty()) {
583         HILOGD("deviceId empty!");
584         return false;
585     }
586     std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
587     if (remoteSyncDeviceSet_.count(deviceId) == 0) {
588         return false;
589     }
590     return true;
591 }
592 
HasSyncListener(const std::string & networkId)593 bool DistributedSchedMissionManager::HasSyncListener(const std::string& networkId)
594 {
595     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
596     auto iter = listenDeviceMap_.find(Str8ToStr16(networkId));
597     if (iter != listenDeviceMap_.end()) {
598         return iter->second.called;
599     }
600     return false;
601 }
602 
NotifySnapshotChanged(const std::string & networkId,int32_t missionId)603 void DistributedSchedMissionManager::NotifySnapshotChanged(const std::string& networkId, int32_t missionId)
604 {
605     std::u16string u16DevId = Str8ToStr16(networkId);
606     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
607     auto iter = listenDeviceMap_.find(u16DevId);
608     if (iter == listenDeviceMap_.end()) {
609         return;
610     }
611     auto& listenerInfo = iter->second;
612     for (auto& listener : listenerInfo.listenerSet) {
613         MissionChangedNotify::NotifySnapshot(listener, u16DevId, missionId);
614     }
615 }
616 
OnRemoteDied(const wptr<IRemoteObject> & remote)617 void DistributedSchedMissionManager::OnRemoteDied(const wptr<IRemoteObject>& remote)
618 {
619     HILOGD("OnRemoteDied!");
620     sptr<IRemoteObject> listener = remote.promote();
621     if (listener == nullptr) {
622         HILOGW("listener is null");
623         return;
624     }
625     auto remoteDiedFunc = [this, listener]() {
626         OnMissionListenerDied(listener);
627     };
628     if (missionHandler_ != nullptr) {
629         missionHandler_->PostTask(remoteDiedFunc);
630     }
631 }
632 
OnRemoteDied(const wptr<IRemoteObject> & remote)633 void DistributedSchedMissionManager::ListenerDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
634 {
635     DistributedSchedMissionManager::GetInstance().OnRemoteDied(remote);
636 }
637 
EnqueueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId,std::unique_ptr<Snapshot> snapshot)638 void DistributedSchedMissionManager::EnqueueCachedSnapshotInfo(const std::string& deviceId, int32_t missionId,
639     std::unique_ptr<Snapshot> snapshot)
640 {
641     if (deviceId.empty() || snapshot == nullptr) {
642         HILOGW("EnqueueCachedSnapshotInfo invalid input param!");
643         return;
644     }
645     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
646     std::string keyInfo = GenerateKeyInfo(deviceId, missionId);
647     auto iter = cachedSnapshotInfos_.find(keyInfo);
648     if (iter != cachedSnapshotInfos_.end()) {
649         if (iter->second == nullptr) {
650             HILOGE("snapshotInfo is null");
651             return;
652         }
653         if (snapshot->GetCreatedTime() < iter->second->GetCreatedTime()) {
654             return;
655         }
656     }
657 
658     if (cachedSnapshotInfos_.size() == MAX_CACHED_ITEM) {
659         int64_t oldest = -1;
660         auto iterOldest = cachedSnapshotInfos_.end();
661         for (auto iterItem = cachedSnapshotInfos_.begin(); iterItem != cachedSnapshotInfos_.end(); ++iterItem) {
662             if (iterItem->second == nullptr) {
663                 HILOGE("snapshotInfo is null");
664                 continue;
665             }
666             if (oldest == -1 || iterItem->second->GetLastAccessTime() < oldest) {
667                 oldest = iterItem->second->GetLastAccessTime();
668                 iterOldest = iterItem;
669             }
670         }
671         if (iterOldest != cachedSnapshotInfos_.end()) {
672             cachedSnapshotInfos_.erase(iterOldest);
673         }
674     }
675     cachedSnapshotInfos_[keyInfo] = std::move(snapshot);
676 }
677 
DequeueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId)678 std::unique_ptr<Snapshot> DistributedSchedMissionManager::DequeueCachedSnapshotInfo(const std::string& deviceId,
679     int32_t missionId)
680 {
681     if (deviceId.empty()) {
682         HILOGW("DequeueCachedSnapshotInfo invalid input param!");
683         return nullptr;
684     }
685     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
686     auto iter = cachedSnapshotInfos_.find(GenerateKeyInfo(deviceId, missionId));
687     if (iter != cachedSnapshotInfos_.end()) {
688         std::unique_ptr<Snapshot> snapshot = std::move(iter->second);
689         if (snapshot == nullptr) {
690             HILOGE("snapshot is null");
691             return nullptr;
692         }
693         snapshot->UpdateLastAccessTime(GetTickCount());
694         iter->second = nullptr;
695         cachedSnapshotInfos_.erase(iter);
696         return snapshot;
697     }
698     return nullptr;
699 }
700 
DeleteCachedSnapshotInfo(const std::string & networkId)701 void DistributedSchedMissionManager::DeleteCachedSnapshotInfo(const std::string& networkId)
702 {
703     if (networkId.empty()) {
704         HILOGW("networkId empty!");
705         return;
706     }
707     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
708     if (uuid.empty()) {
709         HILOGW("uuid empty!");
710         return;
711     }
712     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
713     auto iter = cachedSnapshotInfos_.begin();
714     while (iter != cachedSnapshotInfos_.end()) {
715         if (iter->first.find(uuid) != std::string::npos) {
716             iter = cachedSnapshotInfos_.erase(iter);
717         } else {
718             ++iter;
719         }
720     }
721 }
722 
FetchCachedRemoteMissions(const std::string & srcId,int32_t numMissions,std::vector<DstbMissionInfo> & missionInfoSet)723 int32_t DistributedSchedMissionManager::FetchCachedRemoteMissions(const std::string& srcId, int32_t numMissions,
724     std::vector<DstbMissionInfo>& missionInfoSet)
725 {
726     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(srcId);
727     if (uuid.empty()) {
728         HILOGE("uuid empty!");
729         return INVALID_PARAMETERS_ERR;
730     }
731     std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
732     auto iter = deviceMissionInfos_.find(uuid);
733     if (iter == deviceMissionInfos_.end()) {
734         HILOGE("can not find uuid, deviceId: %{public}s!", GetAnonymStr(srcId).c_str());
735         return ERR_NULL_OBJECT;
736     }
737 
738     // get at most numMissions missions
739     int32_t actualNums = static_cast<int32_t>((iter->second).size());
740     if (actualNums < 0) {
741         HILOGE("invalid size!");
742         return ERR_NULL_OBJECT;
743     }
744     missionInfoSet.assign((iter->second).begin(),
745         (actualNums > numMissions) ? (iter->second).begin() + numMissions : (iter->second).end());
746     return ERR_NONE;
747 }
748 
RebornMissionCache(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet)749 void DistributedSchedMissionManager::RebornMissionCache(const std::string& deviceId,
750     const std::vector<DstbMissionInfo>& missionInfoSet)
751 {
752     HILOGI("start! deviceId is %{public}s.", GetAnonymStr(deviceId).c_str());
753     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
754     if (uuid.empty()) {
755         HILOGE("uuid empty!");
756         return;
757     }
758     {
759         std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
760         deviceMissionInfos_[uuid] = missionInfoSet;
761     }
762     HILOGI("RebornMissionCache end!");
763 }
764 
CleanMissionCache(const std::string & deviceId)765 void DistributedSchedMissionManager::CleanMissionCache(const std::string& deviceId)
766 {
767     HILOGI("CleanMissionCache start! deviceId is %{public}s.", GetAnonymStr(deviceId).c_str());
768     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
769     if (uuid.empty()) {
770         HILOGE("CleanMissionCache uuid empty!");
771         return;
772     }
773     {
774         std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
775         deviceMissionInfos_.erase(uuid);
776     }
777     HILOGI("CleanMissionCache end!");
778 }
779 
NotifyMissionsChangedFromRemote(const CallerInfo & callerInfo,const std::vector<DstbMissionInfo> & missionInfoSet)780 int32_t DistributedSchedMissionManager::NotifyMissionsChangedFromRemote(const CallerInfo& callerInfo,
781     const std::vector<DstbMissionInfo>& missionInfoSet)
782 {
783     HILOGI("NotifyMissionsChangedFromRemote version is %{public}d!", callerInfo.dmsVersion);
784     std::u16string u16DevId = Str8ToStr16(callerInfo.sourceDeviceId);
785     RebornMissionCache(callerInfo.sourceDeviceId, missionInfoSet);
786     {
787         HILOGI("NotifyMissionsChangedFromRemote notify mission start!");
788         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
789         auto iter = listenDeviceMap_.find(u16DevId);
790         if (iter == listenDeviceMap_.end()) {
791             HILOGE("NotifyMissionsChangedFromRemote notify mission no listener!");
792             return INVALID_PARAMETERS_ERR;
793         }
794         auto& listenerSet = iter->second.listenerSet;
795         auto notifyChanged = [listenerSet, u16DevId] () {
796             for (const auto& listener : listenerSet) {
797                 MissionChangedNotify::NotifyMissionsChanged(listener, u16DevId);
798             }
799         };
800         if (missionHandler_ != nullptr) {
801             missionHandler_->PostTask(notifyChanged);
802             HILOGI("NotifyMissionsChangedFromRemote end!");
803             return ERR_NONE;
804         }
805     }
806     return INVALID_PARAMETERS_ERR;
807 }
808 
NotifyLocalMissionsChanged()809 void DistributedSchedMissionManager::NotifyLocalMissionsChanged()
810 {
811     auto func = [this]() {
812         HILOGI("NotifyLocalMissionsChanged");
813         std::vector<DstbMissionInfo> missionInfos;
814         int32_t ret = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
815             missionInfos);
816         if (ret == ERR_OK) {
817             int32_t result = NotifyMissionsChangedToRemote(missionInfos);
818             HILOGI("NotifyMissionsChangedToRemote result = %{public}d", result);
819         }
820     };
821     if (missionChangeHandler_ == nullptr) {
822         HILOGE("missionChangeHandler_ is null");
823         return;
824     }
825     if (!missionChangeHandler_->PostTask(func)) {
826         HILOGE("postTask failed");
827     }
828 }
829 
NotifyMissionSnapshotCreated(int32_t missionId)830 void DistributedSchedMissionManager::NotifyMissionSnapshotCreated(int32_t missionId)
831 {
832     auto func = [this, missionId]() {
833         HILOGD("called.");
834         ErrCode errCode = MissionSnapshotChanged(missionId);
835         if (errCode != ERR_OK) {
836             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
837         }
838     };
839     if (missionChangeHandler_ == nullptr) {
840         HILOGE("missionChangeHandler_ is null");
841         return;
842     }
843     if (!missionChangeHandler_->PostTask(func, GET_FOREGROUND_SNAPSHOT_DELAY_TIME)) {
844         HILOGE("post MissionSnapshotChanged delay Task failed");
845     }
846 }
847 
NotifyMissionSnapshotChanged(int32_t missionId)848 void DistributedSchedMissionManager::NotifyMissionSnapshotChanged(int32_t missionId)
849 {
850     auto func = [this, missionId]() {
851         HILOGD("called.");
852         ErrCode errCode = MissionSnapshotChanged(missionId);
853         if (errCode != ERR_OK) {
854             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
855         }
856     };
857     if (missionChangeHandler_ == nullptr) {
858         HILOGE("missionChangeHandler_ is null");
859         return;
860     }
861     if (!missionChangeHandler_->PostTask(func)) {
862         HILOGE("post MissionSnapshotChanged Task failed");
863     }
864 }
865 
NotifyMissionSnapshotDestroyed(int32_t missionId)866 void DistributedSchedMissionManager::NotifyMissionSnapshotDestroyed(int32_t missionId)
867 {
868     auto func = [this, missionId]() {
869         HILOGD("called.");
870         ErrCode errCode = MissionSnapshotDestroyed(missionId);
871         if (errCode != ERR_OK) {
872             HILOGE("mission snapshot removed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
873         }
874     };
875     if (missionChangeHandler_ == nullptr) {
876         HILOGE("missionChangeHandler_ is null");
877         return;
878     }
879     if (!missionChangeHandler_->PostTask(func)) {
880         HILOGE("post MissionSnapshotDestroyed Task failed");
881     }
882 }
883 
NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo> & missionInfoSet)884 int32_t DistributedSchedMissionManager::NotifyMissionsChangedToRemote(
885     const std::vector<DstbMissionInfo> &missionInfoSet)
886 {
887     CallerInfo callerInfo;
888     if (!GenerateCallerInfo(callerInfo)) {
889         return GET_LOCAL_DEVICE_ERR;
890     }
891     std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
892     for (const auto& destDeviceId : remoteSyncDeviceSet_) {
893         auto handler = FetchDeviceHandler(destDeviceId);
894         if (handler == nullptr) {
895             HILOGE("NotifyMissionsChangedToRemote fetch handler failed!");
896             continue;
897         }
898         auto callback = [destDeviceId, missionInfoSet, callerInfo, this] () {
899             NotifyMissionsChangedToRemoteInner(destDeviceId, missionInfoSet, callerInfo);
900         };
901         if (!handler->PostTask(callback)) {
902             HILOGE("NotifyMissionsChangedToRemote PostTask failed!");
903             return ERR_NULL_OBJECT;
904         }
905     }
906 
907     return ERR_NONE;
908 }
909 
NotifyMissionsChangedToRemoteInner(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet,const CallerInfo & callerInfo)910 void DistributedSchedMissionManager::NotifyMissionsChangedToRemoteInner(const std::string& deviceId,
911     const std::vector<DstbMissionInfo>& missionInfoSet, const CallerInfo& callerInfo)
912 {
913     sptr<IDistributedSched> remoteDms = GetRemoteDms(deviceId);
914     if (remoteDms == nullptr) {
915         HILOGE("NotifyMissionsChangedToRemote DMS get remoteDms failed");
916         return;
917     }
918     int64_t begin = GetTickCount();
919     int32_t result = remoteDms->NotifyMissionsChangedFromRemote(missionInfoSet, callerInfo);
920     HILOGI("[PerformanceTest] NotifyMissionsChangedFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
921         result, GetTickCount() - begin);
922 }
923 
GenerateCallerInfo(CallerInfo & callerInfo)924 bool DistributedSchedMissionManager::GenerateCallerInfo(CallerInfo& callerInfo)
925 {
926     std::string localUuid;
927     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localUuid)) {
928         HILOGE("get local uuid failed!");
929         return false;
930     }
931     callerInfo.uid = IPCSkeleton::GetCallingUid();
932     callerInfo.pid = IPCSkeleton::GetCallingRealPid();
933     callerInfo.callerType = CALLER_TYPE_HARMONY;
934     callerInfo.sourceDeviceId = localUuid;
935     callerInfo.dmsVersion = VERSION;
936     return true;
937 }
938 
FetchDeviceHandler(const std::string & deviceId)939 std::shared_ptr<AppExecFwk::EventHandler> DistributedSchedMissionManager::FetchDeviceHandler(
940     const std::string& deviceId)
941 {
942     if (!IsDeviceIdValidated(deviceId)) {
943         HILOGW("FetchDeviceHandler device:%{public}s offline.", GetAnonymStr(deviceId).c_str());
944         return nullptr;
945     }
946 
947     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
948     if (uuid.empty()) {
949         HILOGE("FetchDeviceHandler uuid empty!");
950         return nullptr;
951     }
952 
953     auto iter = deviceHandle_.find(uuid);
954     if (iter != deviceHandle_.end()) {
955         return iter->second;
956     }
957 
958     auto anonyUuid = GetAnonymStr(uuid);
959     auto runner = AppExecFwk::EventRunner::Create(anonyUuid + "_MissionN");
960     auto handler = std::make_shared<AppExecFwk::EventHandler>(runner);
961     deviceHandle_.emplace(uuid, handler);
962     return handler;
963 }
964 
OnRemoteDied(const wptr<IRemoteObject> & remote)965 void DistributedSchedMissionManager::RemoteDmsDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
966 {
967     HILOGI("OnRemoteDied received died notify!");
968     DistributedSchedMissionManager::GetInstance().OnRemoteDmsDied(remote);
969 }
970 
OnRemoteDmsDied(const wptr<IRemoteObject> & remote)971 void DistributedSchedMissionManager::OnRemoteDmsDied(const wptr<IRemoteObject>& remote)
972 {
973     sptr<IRemoteObject> diedRemoted = remote.promote();
974     if (diedRemoted == nullptr) {
975         HILOGW("OnRemoteDmsDied promote failed!");
976         return;
977     }
978     HILOGD("delete diedRemoted");
979     auto remoteDmsDiedFunc = [this, diedRemoted]() {
980         OnRemoteDmsDied(diedRemoted);
981     };
982     if (missionHandler_ != nullptr) {
983         missionHandler_->PostTask(remoteDmsDiedFunc);
984     }
985 }
986 
RetryStartSyncRemoteMissions(const std::string & dstDeviceId,const std::string & localDevId,int32_t retryTimes)987 void DistributedSchedMissionManager::RetryStartSyncRemoteMissions(const std::string& dstDeviceId,
988     const std::string& localDevId, int32_t retryTimes)
989 {
990     auto retryFunc = [this, dstDeviceId, localDevId, retryTimes]() {
991         bool ret = HasSyncListener(dstDeviceId);
992         if (!ret) {
993             return;
994         }
995         sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDeviceId);
996         if (remoteDms == nullptr) {
997             HILOGI("RetryStartSyncRemoteMissions DMS get remoteDms failed");
998             RetryStartSyncRemoteMissions(dstDeviceId, localDevId, retryTimes + 1);
999             return;
1000         }
1001         int32_t errNo = StartSyncRemoteMissions(dstDeviceId, remoteDms);
1002         HILOGI("RetryStartSyncRemoteMissions result:%{public}d", errNo);
1003     };
1004     if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1005         missionHandler_->PostTask(retryFunc, RETRY_DELAYED);
1006     }
1007 }
1008 
OnMissionListenerDied(const sptr<IRemoteObject> & remote)1009 void DistributedSchedMissionManager::OnMissionListenerDied(const sptr<IRemoteObject>& remote)
1010 {
1011     HILOGI("OnMissionListenerDied");
1012     std::set<std::string> deviceSet;
1013     {
1014         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
1015         auto iterItem = listenDeviceMap_.begin();
1016         while (iterItem != listenDeviceMap_.end()) {
1017             auto& listenerInfo = iterItem->second;
1018             auto ret = listenerInfo.Find(remote);
1019             if (!ret) {
1020                 ++iterItem;
1021                 continue;
1022             }
1023             if (remote != nullptr) {
1024                 remote->RemoveDeathRecipient(listenerDeath_);
1025             }
1026             listenerInfo.Erase(remote);
1027             if (listenerInfo.Empty()) {
1028                 if (listenerInfo.called) {
1029                     deviceSet.emplace(Str16ToStr8(iterItem->first));
1030                 }
1031                 iterItem = listenDeviceMap_.erase(iterItem);
1032             } else {
1033                 ++iterItem;
1034             }
1035         }
1036     }
1037     for (auto& devId : deviceSet) {
1038         StopSyncRemoteMissions(devId, false);
1039     }
1040 }
1041 
OnRemoteDmsDied(const sptr<IRemoteObject> & remote)1042 void DistributedSchedMissionManager::OnRemoteDmsDied(const sptr<IRemoteObject>& remote)
1043 {
1044     HILOGI("OnRemoteDmsDied");
1045     std::string devId;
1046     {
1047         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
1048         for (auto iter = remoteDmsMap_.begin(); iter != remoteDmsMap_.end(); ++iter) {
1049             if (iter->second->AsObject() == remote && iter->second->AsObject() != nullptr) {
1050                 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
1051                 devId = iter->first;
1052                 remoteDmsMap_.erase(iter);
1053                 break;
1054             }
1055         }
1056     }
1057     if (devId.empty()) {
1058         return;
1059     }
1060     bool ret = HasSyncListener(devId);
1061     if (ret) {
1062         std::string localDeviceId;
1063         if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
1064             return;
1065         }
1066         RetryStartSyncRemoteMissions(devId, localDeviceId, 0);
1067     }
1068 }
1069 
NotifyDmsProxyProcessDied()1070 void DistributedSchedMissionManager::NotifyDmsProxyProcessDied()
1071 {
1072     HILOGI("NotifyDmsProxyProcessDied!");
1073     if (!isRegMissionChange_) {
1074         return;
1075     }
1076     RetryRegisterMissionChange(0);
1077 }
1078 
RetryRegisterMissionChange(int32_t retryTimes)1079 void DistributedSchedMissionManager::RetryRegisterMissionChange(int32_t retryTimes)
1080 {
1081     auto remoteDiedFunc = [this, retryTimes]() {
1082         HILOGI("RetryRegisterMissionChange retryTimes:%{public}d begin", retryTimes);
1083         if (!isRegMissionChange_) {
1084             return;
1085         }
1086         int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
1087         if (ret == ERR_NULL_OBJECT) {
1088             RetryRegisterMissionChange(retryTimes + 1);
1089             HILOGI("RetryRegisterMissionChange dmsproxy null, retry!");
1090             return;
1091         }
1092         HILOGI("RetryRegisterMissionChange result:%{public}d", ret);
1093     };
1094     if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1095         missionHandler_->PostTask(remoteDiedFunc, RETRY_DELAYED);
1096     }
1097 }
1098 
InitAllSnapshots(const std::vector<DstbMissionInfo> & missionInfoSet)1099 void DistributedSchedMissionManager::InitAllSnapshots(const std::vector<DstbMissionInfo>& missionInfoSet)
1100 {
1101     for (auto iter = missionInfoSet.begin(); iter != missionInfoSet.end(); iter++) {
1102         ErrCode errCode = MissionSnapshotChanged(iter->id);
1103         if (errCode != ERR_OK) {
1104             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", iter->id, errCode);
1105         }
1106     }
1107 }
1108 
MissionSnapshotChanged(int32_t missionId)1109 int32_t DistributedSchedMissionManager::MissionSnapshotChanged(int32_t missionId)
1110 {
1111     std::string networkId;
1112     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1113         HILOGE("get local networkId failed!");
1114         return INVALID_PARAMETERS_ERR;
1115     }
1116     AAFwk::MissionSnapshot missionSnapshot;
1117     ErrCode errCode = DistributedSchedAdapter::GetInstance()
1118         .GetLocalMissionSnapshotInfo(networkId, missionId, missionSnapshot);
1119     if (errCode != ERR_OK) {
1120         HILOGE("get local mission snapshot failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
1121         return errCode;
1122     }
1123     Snapshot snapshot;
1124     SnapshotConverter::ConvertToSnapshot(missionSnapshot, snapshot);
1125     MessageParcel data;
1126     errCode = MissionSnapshotSequence(snapshot, data);
1127     if (errCode != ERR_OK) {
1128         HILOGE("mission snapshot sequence failed, errCode=%{public}d", errCode);
1129         return errCode;
1130     }
1131     size_t len = data.GetReadableBytes();
1132     const uint8_t* byteStream = data.ReadBuffer(len);
1133     if (byteStream == nullptr) {
1134         HILOGE("Failed to read length.");
1135         return INVALID_PARAMETERS_ERR;
1136     }
1137     errCode = StoreSnapshotInfo(networkId, missionId, byteStream, len);
1138     return errCode;
1139 }
1140 
MissionSnapshotDestroyed(int32_t missionId)1141 int32_t DistributedSchedMissionManager::MissionSnapshotDestroyed(int32_t missionId)
1142 {
1143     std::string networkId;
1144     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1145         HILOGE("get local networkId failed!");
1146         return INVALID_PARAMETERS_ERR;
1147     }
1148     ErrCode errCode = RemoveSnapshotInfo(networkId, missionId);
1149     return errCode;
1150 }
1151 
MissionSnapshotSequence(const Snapshot & snapshot,MessageParcel & data)1152 int32_t DistributedSchedMissionManager::MissionSnapshotSequence(const Snapshot& snapshot, MessageParcel& data)
1153 {
1154     bool ret = snapshot.WriteSnapshotInfo(data);
1155     if (!ret) {
1156         HILOGE("WriteSnapshotInfo failed!");
1157         return ERR_FLATTEN_OBJECT;
1158     }
1159     ret = snapshot.WritePixelMap(data);
1160     if (!ret) {
1161         HILOGE("WritePixelMap failed!");
1162         return ERR_FLATTEN_OBJECT;
1163     }
1164     return ERR_OK;
1165 }
1166 
OnDnetDied()1167 void DistributedSchedMissionManager::OnDnetDied()
1168 {
1169     auto dnetDiedFunc = [this]() {
1170         HILOGI("OnDnetDied");
1171         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
1172         if (!isRegMissionChange_) {
1173             return;
1174         }
1175         remoteSyncDeviceSet_.clear();
1176         DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
1177         isRegMissionChange_ = false;
1178     };
1179     if (missionHandler_ != nullptr) {
1180         missionHandler_->PostTask(dnetDiedFunc);
1181     }
1182 }
1183 } // namespace DistributedSchedule
1184 } // namespace OHOS
1185