• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "mission/distributed_sched_mission_manager.h"
17 
18 #include <chrono>
19 #include <sys/time.h>
20 #include <unistd.h>
21 
22 #include "datetime_ex.h"
23 #include "distributed_sched_adapter.h"
24 #include "dtbschedmgr_device_info_storage.h"
25 #include "dtbschedmgr_log.h"
26 #include "ipc_skeleton.h"
27 #include "iservice_registry.h"
28 #include "mission/mission_changed_notify.h"
29 #include "mission/mission_constant.h"
30 #include "mission/mission_info_converter.h"
31 #include "mission/snapshot_converter.h"
32 #include "nlohmann/json.hpp"
33 #include "string_ex.h"
34 #include "system_ability_definition.h"
35 
36 namespace OHOS {
37 namespace DistributedSchedule {
38 namespace {
39 const std::string TAG = "DistributedSchedMissionManager";
40 constexpr size_t MAX_CACHED_ITEM = 10;
41 constexpr int32_t MAX_RETRY_TIMES = 15;
42 constexpr int32_t RETRY_DELAYED = 2000;
43 constexpr int32_t GET_FOREGROUND_SNAPSHOT_DELAY_TIME = 800; // ms
44 const std::string DELETE_DATA_STORAGE = "DeleteDataStorage";
45 constexpr int32_t DELETE_DATA_STORAGE_DELAYED = 60000; // ms
46 const std::string INVAILD_LOCAL_DEVICE_ID = "-1";
47 }
48 namespace Mission {
49 constexpr int32_t GET_MAX_MISSIONS = 20;
50 } // Mission
51 using namespace std::chrono;
52 using namespace Constants::Mission;
53 using namespace OHOS::DistributedKv;
54 
55 IMPLEMENT_SINGLE_INSTANCE(DistributedSchedMissionManager);
56 
Init()57 void DistributedSchedMissionManager::Init()
58 {
59     listenerDeath_ = new ListenerDeathRecipient();
60     remoteDmsRecipient_ = new RemoteDmsDeathRecipient();
61     auto runner = AppExecFwk::EventRunner::Create("MissionManagerHandler");
62     missionHandler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
63     auto updateRunner = AppExecFwk::EventRunner::Create("UpdateHandler");
64     updateHandler_ = std::make_shared<AppExecFwk::EventHandler>(updateRunner);
65     missonChangeListener_ = new DistributedMissionChangeListener();
66     auto missionChangeRunner = AppExecFwk::EventRunner::Create("DistributedMissionChange");
67     missionChangeHandler_ = std::make_shared<AppExecFwk::EventHandler>(missionChangeRunner);
68 }
69 
GetMissionInfos(const std::string & deviceId,int32_t numMissions,std::vector<AAFwk::MissionInfo> & missionInfoSet)70 int32_t DistributedSchedMissionManager::GetMissionInfos(const std::string& deviceId,
71     int32_t numMissions, std::vector<AAFwk::MissionInfo>& missionInfoSet)
72 {
73     HILOGI("start!");
74     if (!IsDeviceIdValidated(deviceId)) {
75         return INVALID_PARAMETERS_ERR;
76     }
77     if (numMissions <= 0) {
78         HILOGE("numMissions is illegal! numMissions:%{public}d", numMissions);
79         return INVALID_PARAMETERS_ERR;
80     }
81     std::vector<DstbMissionInfo> dstbMissionInfoSet;
82     int32_t ret = FetchCachedRemoteMissions(deviceId, numMissions, dstbMissionInfoSet);
83     if (ret != ERR_OK) {
84         HILOGE("FetchCachedRemoteMissions failed, ret = %{public}d", ret);
85         return ret;
86     }
87     return MissionInfoConverter::ConvertToMissionInfos(dstbMissionInfoSet, missionInfoSet);
88 }
89 
GetRemoteDms(const std::string & deviceId)90 sptr<IDistributedSched> DistributedSchedMissionManager::GetRemoteDms(const std::string& deviceId)
91 {
92     if (deviceId.empty()) {
93         HILOGE("GetRemoteDms remoteDeviceId is empty");
94         return nullptr;
95     }
96     int64_t begin = GetTickCount();
97     {
98         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
99         auto iter = remoteDmsMap_.find(deviceId);
100         if (iter != remoteDmsMap_.end()) {
101             auto object = iter->second;
102             if (object != nullptr) {
103                 HILOGI("[PerformanceTest] GetRemoteDms from cache spend %{public}" PRId64 " ms",
104                     GetTickCount() - begin);
105                 return object;
106             }
107         }
108     }
109     HILOGD("GetRemoteDms connect deviceid is %s", deviceId.c_str());
110     auto samgr = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
111     if (samgr == nullptr) {
112         HILOGE("GetRemoteDms failed to connect to systemAbilityMgr!");
113         return nullptr;
114     }
115     auto object = samgr->CheckSystemAbility(DISTRIBUTED_SCHED_SA_ID, deviceId);
116     if (object == nullptr) {
117         HILOGE("GetRemoteDms failed to get dms for remote device:%{public}s!",
118             DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
119         return nullptr;
120     }
121     auto ret = object->AddDeathRecipient(remoteDmsRecipient_);
122     HILOGD("GetRemoteDms AddDeathRecipient ret : %{public}d", ret);
123     sptr<IDistributedSched> remoteDmsObj = iface_cast<IDistributedSched>(object);
124     {
125         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
126         auto iter = remoteDmsMap_.find(deviceId);
127         if (iter != remoteDmsMap_.end()) {
128             iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
129         }
130         remoteDmsMap_[deviceId] = remoteDmsObj;
131     }
132     HILOGI("[PerformanceTest] GetRemoteDms spend %{public}" PRId64 " ms", GetTickCount() - begin);
133     return remoteDmsObj;
134 }
135 
IsDeviceIdValidated(const std::string & deviceId)136 bool DistributedSchedMissionManager::IsDeviceIdValidated(const std::string& deviceId)
137 {
138     if (deviceId.empty()) {
139         HILOGE("IsDeviceIdValidated deviceId is empty!");
140         return false;
141     }
142     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(deviceId) == nullptr) {
143         HILOGW("IsDeviceIdValidated device offline.");
144         return false;
145     }
146 
147     return true;
148 }
149 
NotifyRemoteDied(const wptr<IRemoteObject> & remote)150 void DistributedSchedMissionManager::NotifyRemoteDied(const wptr<IRemoteObject>& remote)
151 {
152     if (distributedDataStorage_ == nullptr) {
153         HILOGE("DistributedDataStorage null!");
154         return;
155     }
156     distributedDataStorage_->NotifyRemoteDied(remote);
157 }
158 
InitDataStorage()159 int32_t DistributedSchedMissionManager::InitDataStorage()
160 {
161     if (distributedDataStorage_ == nullptr) {
162         distributedDataStorage_ = std::make_shared<DistributedDataStorage>();
163     }
164     if (!distributedDataStorage_->Init()) {
165         HILOGE("InitDataStorage DistributedDataStorage init failed!");
166         return ERR_NULL_OBJECT;
167     }
168     return ERR_NONE;
169 }
170 
StopDataStorage()171 int32_t DistributedSchedMissionManager::StopDataStorage()
172 {
173     if (distributedDataStorage_ == nullptr) {
174         HILOGE("StopDataStorage DistributedDataStorage null!");
175         return ERR_NULL_OBJECT;
176     }
177     if (!distributedDataStorage_->Stop()) {
178         HILOGE("StopDataStorage DistributedDataStorage stop failed!");
179         return ERR_NULL_OBJECT;
180     }
181     return ERR_NONE;
182 }
183 
StoreSnapshotInfo(const std::string & deviceId,int32_t missionId,const uint8_t * byteStream,size_t len)184 int32_t DistributedSchedMissionManager::StoreSnapshotInfo(const std::string& deviceId, int32_t missionId,
185     const uint8_t* byteStream, size_t len)
186 {
187     if (distributedDataStorage_ == nullptr) {
188         HILOGE("StoreSnapshotInfo DistributedDataStorage null!");
189         return ERR_NULL_OBJECT;
190     }
191     if (!distributedDataStorage_->Insert(deviceId, missionId, byteStream, len)) {
192         HILOGE("StoreSnapshotInfo DistributedDataStorage insert failed!");
193         return INVALID_PARAMETERS_ERR;
194     }
195     return ERR_NONE;
196 }
197 
RemoveSnapshotInfo(const std::string & deviceId,int32_t missionId)198 int32_t DistributedSchedMissionManager::RemoveSnapshotInfo(const std::string& deviceId, int32_t missionId)
199 {
200     if (distributedDataStorage_ == nullptr) {
201         HILOGE("RemoveSnapshotInfo DistributedDataStorage null!");
202         return ERR_NULL_OBJECT;
203     }
204     if (!distributedDataStorage_->Delete(deviceId, missionId)) {
205         HILOGE("RemoveSnapshotInfo DistributedDataStorage delete failed!");
206         return INVALID_PARAMETERS_ERR;
207     }
208     return ERR_NONE;
209 }
210 
GetRemoteMissionSnapshotInfo(const std::string & networkId,int32_t missionId,std::unique_ptr<AAFwk::MissionSnapshot> & missionSnapshot)211 int32_t DistributedSchedMissionManager::GetRemoteMissionSnapshotInfo(const std::string& networkId, int32_t missionId,
212     std::unique_ptr<AAFwk::MissionSnapshot>& missionSnapshot)
213 {
214     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
215     if (uuid.empty()) {
216         HILOGE("uuid is empty!");
217         return INVALID_PARAMETERS_ERR;
218     }
219     std::unique_ptr<Snapshot> snapshotPtr = DequeueCachedSnapshotInfo(uuid, missionId);
220     if (snapshotPtr != nullptr) {
221         HILOGI("get uuid = %{public}s + missionId = %{public}d snapshot from cache successful!",
222             DnetworkAdapter::AnonymizeNetworkId(uuid).c_str(), missionId);
223         SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
224         return ERR_NONE;
225     }
226     if (distributedDataStorage_ == nullptr) {
227         HILOGE("DistributedDataStorage null!");
228         return ERR_NULL_OBJECT;
229     }
230     DistributedKv::Value value;
231     bool ret = distributedDataStorage_->Query(networkId, missionId, value);
232     if (!ret) {
233         HILOGE("DistributedDataStorage query failed!");
234         return INVALID_PARAMETERS_ERR;
235     }
236     snapshotPtr = Snapshot::Create(value.Data());
237     if (snapshotPtr == nullptr) {
238         HILOGE("snapshot create failed!");
239         return ERR_NULL_OBJECT;
240     }
241     HILOGI("get uuid = %{public}s + missionId = %{public}d snapshot from DistributedDB successful!",
242         DnetworkAdapter::AnonymizeNetworkId(uuid).c_str(), missionId);
243     SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
244     return ERR_NONE;
245 }
246 
DeviceOnlineNotify(const std::string & networkId)247 void DistributedSchedMissionManager::DeviceOnlineNotify(const std::string& networkId)
248 {
249     if (networkId.empty()) {
250         HILOGW("DeviceOnlineNotify networkId empty!");
251         return;
252     }
253 
254     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
255     if (missionHandler_ != nullptr) {
256         HILOGI("DeviceOnlineNotify RemoveTask");
257         missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
258     }
259 }
260 
DeviceOfflineNotify(const std::string & networkId)261 void DistributedSchedMissionManager::DeviceOfflineNotify(const std::string& networkId)
262 {
263     if (networkId.empty()) {
264         HILOGW("DeviceOfflineNotify networkId empty!");
265         return;
266     }
267     StopSyncMissionsFromRemote(networkId);
268     CleanMissionResources(networkId);
269     {
270         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
271         auto iter = remoteDmsMap_.find(networkId);
272         if (iter != remoteDmsMap_.end()) {
273             iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
274             remoteDmsMap_.erase(iter);
275         }
276     }
277     HILOGI("DeviceOfflineNotify erase value for networkId: %{public}s",
278         DnetworkAdapter::AnonymizeNetworkId(networkId).c_str());
279 }
280 
DeleteDataStorage(const std::string & deviceId,bool isDelayed)281 void DistributedSchedMissionManager::DeleteDataStorage(const std::string& deviceId, bool isDelayed)
282 {
283     if (distributedDataStorage_ == nullptr) {
284         HILOGE("DeleteDataStorage DistributedDataStorage null!");
285         return;
286     }
287     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
288     auto callback = [this, uuid, deviceId]() {
289         if (!distributedDataStorage_->FuzzyDelete(deviceId)) {
290             HILOGE("DeleteDataStorage storage delete failed!");
291         } else {
292             HILOGI("DeleteDataStorage storage delete successfully!");
293         }
294     };
295     if (isDelayed) {
296         if (missionHandler_ != nullptr) {
297             HILOGI("DeleteDataStorage PostTask");
298             missionHandler_->PostTask(callback, DELETE_DATA_STORAGE + uuid, DELETE_DATA_STORAGE_DELAYED);
299         }
300     } else {
301         if (missionHandler_ != nullptr) {
302             HILOGI("DeleteDataStorage RemoveTask");
303             missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
304         }
305         callback();
306     }
307 }
308 
RegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)309 int32_t DistributedSchedMissionManager::RegisterMissionListener(const std::u16string& devId,
310     const sptr<IRemoteObject>& listener)
311 {
312     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(Str16ToStr8(devId));
313     if (uuid.empty()) {
314         HILOGE("uuid is empty!");
315         return INVALID_PARAMETERS_ERR;
316     }
317     if (missionHandler_ != nullptr) {
318         HILOGI("RemoveTask");
319         missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
320     }
321     if (listener == nullptr) {
322         return INVALID_PARAMETERS_ERR;
323     }
324     std::string localDeviceId;
325     std::string remoteDeviceId = Str16ToStr8(devId);
326     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
327         || localDeviceId == remoteDeviceId) {
328         HILOGE("check deviceId failed!");
329         return INVALID_PARAMETERS_ERR;
330     }
331     {
332         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
333         auto& listenerInfo = listenDeviceMap_[devId];
334         if (!listenerInfo.Emplace(listener)) {
335             HILOGW("RegisterSyncListener listener has already inserted!");
336             return ERR_NONE;
337         }
338         bool ret = listener->AddDeathRecipient(listenerDeath_);
339         if (!ret) {
340             HILOGW("RegisterSyncListener AddDeathRecipient failed!");
341         }
342         if (listenerInfo.Size() > 1) {
343             HILOGI("RegisterMissionListener not notify remote DMS!");
344             return ERR_NONE;
345         }
346     }
347     return ERR_NONE;
348 }
349 
StartSyncRemoteMissions(const std::string & dstDevId,const std::string & localDevId)350 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
351     const std::string& localDevId)
352 {
353     std::u16string devId = Str8ToStr16(dstDevId);
354     {
355         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
356         auto iterItem = listenDeviceMap_.find(devId);
357         if (iterItem == listenDeviceMap_.end()) {
358             return ERR_NONE;
359         }
360         bool callFlag = iterItem->second.called;
361         if (callFlag) {
362             HILOGI("StartSyncRemoteMissions already called!");
363             return ERR_NONE;
364         }
365     }
366     sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
367     if (remoteDms == nullptr) {
368         HILOGE("get remoteDms failed!");
369         RetryStartSyncRemoteMissions(dstDevId, localDevId, 0);
370         return GET_REMOTE_DMS_FAIL;
371     }
372     int32_t ret = StartSyncRemoteMissions(dstDevId, remoteDms);
373     if (ret == ERR_NONE) {
374         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
375         auto iterItem = listenDeviceMap_.find(devId);
376         if (iterItem != listenDeviceMap_.end()) {
377             iterItem->second.called = true;
378         }
379     }
380     return ret;
381 }
382 
StartSyncRemoteMissions(const std::string & dstDevId,const sptr<IDistributedSched> & remoteDms)383 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
384     const sptr<IDistributedSched>& remoteDms)
385 {
386     std::vector<DstbMissionInfo> missionInfos;
387     CallerInfo callerInfo;
388     if (!GenerateCallerInfo(callerInfo)) {
389         return GET_LOCAL_DEVICE_ERR;
390     }
391     int64_t begin = GetTickCount();
392     int32_t ret = remoteDms->StartSyncMissionsFromRemote(callerInfo, missionInfos);
393     HILOGI("[PerformanceTest] StartSyncMissionsFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
394         ret, GetTickCount() - begin);
395     if (ret == ERR_NONE) {
396         RebornMissionCache(dstDevId, missionInfos);
397     }
398     return ret;
399 }
400 
UnRegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)401 int32_t DistributedSchedMissionManager::UnRegisterMissionListener(const std::u16string& devId,
402     const sptr<IRemoteObject>& listener)
403 {
404     if (listener == nullptr) {
405         return INVALID_PARAMETERS_ERR;
406     }
407     if (!IsDeviceIdValidated(Str16ToStr8(devId))) {
408         return INVALID_PARAMETERS_ERR;
409     }
410     std::string localDeviceId;
411     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
412         || localDeviceId == Str16ToStr8(devId)) {
413         HILOGE("check deviceId fail");
414         return INVALID_PARAMETERS_ERR;
415     }
416     {
417         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
418         auto iterItem = listenDeviceMap_.find(devId);
419         if (iterItem == listenDeviceMap_.end()) {
420             return ERR_NONE;
421         }
422         auto& listenerInfo = iterItem->second;
423         auto ret = listenerInfo.Find(listener);
424         if (!ret) {
425             HILOGI("listener not registered!");
426             return ERR_NONE;
427         }
428         listener->RemoveDeathRecipient(listenerDeath_);
429         listenerInfo.Erase(listener);
430         if (!listenerInfo.Empty()) {
431             return ERR_NONE;
432         }
433         listenDeviceMap_.erase(iterItem);
434     }
435     return ERR_NONE;
436 }
437 
CleanMissionResources(const std::string & networkId)438 void DistributedSchedMissionManager::CleanMissionResources(const std::string& networkId)
439 {
440     {
441         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
442         auto iterDevice = listenDeviceMap_.find(Str8ToStr16(networkId));
443         if (iterDevice == listenDeviceMap_.end()) {
444             return;
445         }
446         auto& listenerInfo = iterDevice->second;
447         for (sptr<IRemoteObject> listener : listenerInfo.listenerSet) {
448             if (listener != nullptr) {
449                 listener->RemoveDeathRecipient(listenerDeath_);
450             }
451         }
452         listenDeviceMap_.erase(iterDevice);
453     }
454     StopSyncRemoteMissions(networkId, true);
455 }
456 
StopSyncRemoteMissions(const std::string & dstDevId,bool offline,bool exit)457 int32_t DistributedSchedMissionManager::StopSyncRemoteMissions(const std::string& dstDevId,
458     bool offline, bool exit)
459 {
460     CleanMissionCache(dstDevId);
461     DeleteCachedSnapshotInfo(dstDevId);
462     DeleteDataStorage(dstDevId, true);
463 
464     if (offline) {
465         return ERR_NONE;
466     }
467     sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
468     if (remoteDms == nullptr) {
469         HILOGE("DMS get remoteDms failed");
470         return GET_REMOTE_DMS_FAIL;
471     }
472 
473     CallerInfo callerInfo;
474     if (!GenerateCallerInfo(callerInfo)) {
475         return GET_LOCAL_DEVICE_ERR;
476     }
477     int64_t begin = GetTickCount();
478     int32_t ret = remoteDms->StopSyncMissionsFromRemote(callerInfo);
479     HILOGI("[PerformanceTest] ret:%d, spend %{public}" PRId64 " ms", ret, GetTickCount() - begin);
480     return ret;
481 }
482 
StartSyncRemoteMissions(const std::string & dstDevId,bool fixConflict,int64_t tag)483 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId, bool fixConflict,
484     int64_t tag)
485 {
486     std::string localDeviceId;
487     if (!IsDeviceIdValidated(dstDevId)) {
488         return INVALID_PARAMETERS_ERR;
489     }
490     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
491         || (dstDevId == localDeviceId)) {
492         HILOGE("check deviceId fail");
493         return INVALID_PARAMETERS_ERR;
494     }
495     HILOGI("begin, dstDevId is %{public}s, local deviceId is %{public}s",
496         DnetworkAdapter::AnonymizeNetworkId(dstDevId).c_str(),
497         DnetworkAdapter::AnonymizeNetworkId(localDeviceId).c_str());
498     auto ret = StartSyncRemoteMissions(dstDevId, localDeviceId);
499     if (ret != ERR_NONE) {
500         HILOGE("StartSyncRemoteMissions failed, %{public}d", ret);
501         return ret;
502     }
503     return ERR_NONE;
504 }
505 
StartSyncMissionsFromRemote(const CallerInfo & callerInfo,std::vector<DstbMissionInfo> & missionInfoSet)506 int32_t DistributedSchedMissionManager::StartSyncMissionsFromRemote(const CallerInfo& callerInfo,
507     std::vector<DstbMissionInfo>& missionInfoSet)
508 {
509     auto deviceId = callerInfo.sourceDeviceId;
510     HILOGD("remote version is %{public}d!", callerInfo.dmsVersion);
511     {
512         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
513         remoteSyncDeviceSet_.emplace(deviceId);
514     }
515     int32_t result = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
516         missionInfoSet);
517     auto func = [this, missionInfoSet]() {
518         HILOGD("RegisterMissionListener called.");
519         if (!isRegMissionChange_) {
520             int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
521             if (ret == ERR_OK) {
522                 isRegMissionChange_ = true;
523             }
524             InitAllSnapshots(missionInfoSet);
525         }
526     };
527     if (!missionHandler_->PostTask(func)) {
528         HILOGE("post RegisterMissionListener and InitAllSnapshots Task failed");
529     }
530     return result;
531 }
532 
StopSyncMissionsFromRemote(const std::string & networkId)533 void DistributedSchedMissionManager::StopSyncMissionsFromRemote(const std::string& networkId)
534 {
535     HILOGD(" %{private}s!", networkId.c_str());
536     {
537         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
538         remoteSyncDeviceSet_.erase(networkId);
539         if (remoteSyncDeviceSet_.empty()) {
540             auto func = [this]() {
541                 int32_t ret = DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
542                 if (ret == ERR_OK) {
543                     isRegMissionChange_ = false;
544                 }
545             };
546             if (!missionHandler_->PostTask(func)) {
547                 HILOGE("post UnRegisterMissionListener Task failed");
548             }
549         }
550     }
551 }
552 
NeedSyncDevice(const std::string & deviceId)553 bool DistributedSchedMissionManager::NeedSyncDevice(const std::string& deviceId)
554 {
555     if (deviceId.empty()) {
556         HILOGD("deviceId empty!");
557         return false;
558     }
559     std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
560     if (remoteSyncDeviceSet_.count(deviceId) == 0) {
561         return false;
562     }
563     return true;
564 }
565 
HasSyncListener(const std::string & networkId)566 bool DistributedSchedMissionManager::HasSyncListener(const std::string& networkId)
567 {
568     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
569     auto iter = listenDeviceMap_.find(Str8ToStr16(networkId));
570     if (iter != listenDeviceMap_.end()) {
571         return iter->second.called;
572     }
573     return false;
574 }
575 
NotifySnapshotChanged(const std::string & networkId,int32_t missionId)576 void DistributedSchedMissionManager::NotifySnapshotChanged(const std::string& networkId, int32_t missionId)
577 {
578     std::u16string u16DevId = Str8ToStr16(networkId);
579     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
580     auto iter = listenDeviceMap_.find(u16DevId);
581     if (iter == listenDeviceMap_.end()) {
582         return;
583     }
584     auto& listenerInfo = iter->second;
585     for (auto& listener : listenerInfo.listenerSet) {
586         MissionChangedNotify::NotifySnapshot(listener, u16DevId, missionId);
587     }
588 }
589 
OnRemoteDied(const wptr<IRemoteObject> & remote)590 void DistributedSchedMissionManager::OnRemoteDied(const wptr<IRemoteObject>& remote)
591 {
592     HILOGD("OnRemoteDied!");
593     sptr<IRemoteObject> listener = remote.promote();
594     if (listener == nullptr) {
595         HILOGW("listener is null");
596         return;
597     }
598     auto remoteDiedFunc = [this, listener]() {
599         OnMissionListenerDied(listener);
600     };
601     if (missionHandler_ != nullptr) {
602         missionHandler_->PostTask(remoteDiedFunc);
603     }
604 }
605 
OnRemoteDied(const wptr<IRemoteObject> & remote)606 void DistributedSchedMissionManager::ListenerDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
607 {
608     DistributedSchedMissionManager::GetInstance().OnRemoteDied(remote);
609 }
610 
EnqueueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId,std::unique_ptr<Snapshot> snapshot)611 void DistributedSchedMissionManager::EnqueueCachedSnapshotInfo(const std::string& deviceId, int32_t missionId,
612     std::unique_ptr<Snapshot> snapshot)
613 {
614     if (deviceId.empty() || snapshot == nullptr) {
615         HILOGW("EnqueueCachedSnapshotInfo invalid input param!");
616         return;
617     }
618     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
619     std::string keyInfo = GenerateKeyInfo(deviceId, missionId);
620     auto iter = cachedSnapshotInfos_.find(keyInfo);
621     if (iter != cachedSnapshotInfos_.end()) {
622         if (snapshot->GetCreatedTime() < iter->second->GetCreatedTime()) {
623             return;
624         }
625     }
626 
627     if (cachedSnapshotInfos_.size() == MAX_CACHED_ITEM) {
628         int64_t oldest = -1;
629         auto iterOldest = cachedSnapshotInfos_.end();
630         for (auto iterItem = cachedSnapshotInfos_.begin(); iterItem != cachedSnapshotInfos_.end(); ++iterItem) {
631             if (oldest == -1 || iterItem->second->GetLastAccessTime() < oldest) {
632                 oldest = iterItem->second->GetLastAccessTime();
633                 iterOldest = iterItem;
634             }
635         }
636         if (iterOldest != cachedSnapshotInfos_.end()) {
637             cachedSnapshotInfos_.erase(iterOldest);
638         }
639     }
640     cachedSnapshotInfos_[keyInfo] = std::move(snapshot);
641 }
642 
DequeueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId)643 std::unique_ptr<Snapshot> DistributedSchedMissionManager::DequeueCachedSnapshotInfo(const std::string& deviceId,
644     int32_t missionId)
645 {
646     if (deviceId.empty()) {
647         HILOGW("DequeueCachedSnapshotInfo invalid input param!");
648         return nullptr;
649     }
650     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
651     auto iter = cachedSnapshotInfos_.find(GenerateKeyInfo(deviceId, missionId));
652     if (iter != cachedSnapshotInfos_.end()) {
653         std::unique_ptr<Snapshot> snapshot = std::move(iter->second);
654         snapshot->UpdateLastAccessTime(GetTickCount());
655         iter->second = nullptr;
656         cachedSnapshotInfos_.erase(iter);
657         return snapshot;
658     }
659     return nullptr;
660 }
661 
DeleteCachedSnapshotInfo(const std::string & networkId)662 void DistributedSchedMissionManager::DeleteCachedSnapshotInfo(const std::string& networkId)
663 {
664     if (networkId.empty()) {
665         HILOGW("networkId empty!");
666         return;
667     }
668     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
669     if (uuid.empty()) {
670         HILOGW("uuid empty!");
671         return;
672     }
673     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
674     auto iter = cachedSnapshotInfos_.begin();
675     while (iter != cachedSnapshotInfos_.end()) {
676         if (iter->first.find(uuid) != std::string::npos) {
677             iter = cachedSnapshotInfos_.erase(iter);
678         } else {
679             ++iter;
680         }
681     }
682 }
683 
FetchCachedRemoteMissions(const std::string & srcId,int32_t numMissions,std::vector<DstbMissionInfo> & missionInfoSet)684 int32_t DistributedSchedMissionManager::FetchCachedRemoteMissions(const std::string& srcId, int32_t numMissions,
685     std::vector<DstbMissionInfo>& missionInfoSet)
686 {
687     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(srcId);
688     if (uuid.empty()) {
689         HILOGE("uuid empty!");
690         return INVALID_PARAMETERS_ERR;
691     }
692     std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
693     auto iter = deviceMissionInfos_.find(uuid);
694     if (iter == deviceMissionInfos_.end()) {
695         HILOGE("can not find uuid, deviceId: %{public}s!",
696             DnetworkAdapter::AnonymizeNetworkId(srcId).c_str());
697         return ERR_NULL_OBJECT;
698     }
699 
700     // get at most numMissions missions
701     int32_t actualNums = static_cast<int32_t>((iter->second).size());
702     if (actualNums < 0) {
703         HILOGE("invalid size!");
704         return ERR_NULL_OBJECT;
705     }
706     missionInfoSet.assign((iter->second).begin(),
707         (actualNums > numMissions) ? (iter->second).begin() + numMissions : (iter->second).end());
708     return ERR_NONE;
709 }
710 
RebornMissionCache(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet)711 void DistributedSchedMissionManager::RebornMissionCache(const std::string& deviceId,
712     const std::vector<DstbMissionInfo>& missionInfoSet)
713 {
714     HILOGI("start! deviceId is %{public}s",
715         DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
716     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
717     if (uuid.empty()) {
718         HILOGE("uuid empty!");
719         return;
720     }
721     {
722         std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
723         deviceMissionInfos_[uuid] = missionInfoSet;
724     }
725     HILOGI("RebornMissionCache end!");
726 }
727 
CleanMissionCache(const std::string & deviceId)728 void DistributedSchedMissionManager::CleanMissionCache(const std::string& deviceId)
729 {
730     HILOGI("CleanMissionCache start! deviceId is %{public}s",
731         DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
732     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
733     if (uuid.empty()) {
734         HILOGE("CleanMissionCache uuid empty!");
735         return;
736     }
737     {
738         std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
739         deviceMissionInfos_.erase(uuid);
740     }
741     HILOGI("CleanMissionCache end!");
742 }
743 
NotifyMissionsChangedFromRemote(const CallerInfo & callerInfo,const std::vector<DstbMissionInfo> & missionInfoSet)744 int32_t DistributedSchedMissionManager::NotifyMissionsChangedFromRemote(const CallerInfo& callerInfo,
745     const std::vector<DstbMissionInfo>& missionInfoSet)
746 {
747     HILOGI("NotifyMissionsChangedFromRemote version is %{public}d!", callerInfo.dmsVersion);
748     std::u16string u16DevId = Str8ToStr16(callerInfo.sourceDeviceId);
749     RebornMissionCache(callerInfo.sourceDeviceId, missionInfoSet);
750     {
751         HILOGI("NotifyMissionsChangedFromRemote notify mission start!");
752         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
753         auto iter = listenDeviceMap_.find(u16DevId);
754         if (iter == listenDeviceMap_.end()) {
755             HILOGE("NotifyMissionsChangedFromRemote notify mission no listener!");
756             return INVALID_PARAMETERS_ERR;
757         }
758         auto& listenerSet = iter->second.listenerSet;
759         auto notifyChanged = [listenerSet, u16DevId] () {
760             for (const auto& listener : listenerSet) {
761                 MissionChangedNotify::NotifyMissionsChanged(listener, u16DevId);
762             }
763         };
764         if (missionHandler_ != nullptr) {
765             missionHandler_->PostTask(notifyChanged);
766             HILOGI("NotifyMissionsChangedFromRemote end!");
767             return ERR_NONE;
768         }
769     }
770     return INVALID_PARAMETERS_ERR;
771 }
772 
NotifyLocalMissionsChanged()773 void DistributedSchedMissionManager::NotifyLocalMissionsChanged()
774 {
775     auto func = [this]() {
776         HILOGI("NotifyLocalMissionsChanged");
777         std::vector<DstbMissionInfo> missionInfos;
778         int32_t ret = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
779             missionInfos);
780         if (ret == ERR_OK) {
781             int32_t result = NotifyMissionsChangedToRemote(missionInfos);
782             HILOGI("NotifyMissionsChangedToRemote result = %{public}d", result);
783         }
784     };
785     if (!missionChangeHandler_->PostTask(func)) {
786         HILOGE("postTask failed");
787     }
788 }
789 
NotifyMissionSnapshotCreated(int32_t missionId)790 void DistributedSchedMissionManager::NotifyMissionSnapshotCreated(int32_t missionId)
791 {
792     auto func = [this, missionId]() {
793         HILOGD("called.");
794         ErrCode errCode = MissionSnapshotChanged(missionId);
795         if (errCode != ERR_OK) {
796             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
797         }
798     };
799     if (!missionChangeHandler_->PostTask(func, GET_FOREGROUND_SNAPSHOT_DELAY_TIME)) {
800         HILOGE("post MissionSnapshotChanged delay Task failed");
801     }
802 }
803 
NotifyMissionSnapshotChanged(int32_t missionId)804 void DistributedSchedMissionManager::NotifyMissionSnapshotChanged(int32_t missionId)
805 {
806     auto func = [this, missionId]() {
807         HILOGD("called.");
808         ErrCode errCode = MissionSnapshotChanged(missionId);
809         if (errCode != ERR_OK) {
810             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
811         }
812     };
813     if (!missionChangeHandler_->PostTask(func)) {
814         HILOGE("post MissionSnapshotChanged Task failed");
815     }
816 }
817 
NotifyMissionSnapshotDestroyed(int32_t missionId)818 void DistributedSchedMissionManager::NotifyMissionSnapshotDestroyed(int32_t missionId)
819 {
820     auto func = [this, missionId]() {
821         HILOGD("called.");
822         ErrCode errCode = MissionSnapshotDestroyed(missionId);
823         if (errCode != ERR_OK) {
824             HILOGE("mission snapshot removed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
825         }
826     };
827     if (!missionChangeHandler_->PostTask(func)) {
828         HILOGE("post MissionSnapshotDestroyed Task failed");
829     }
830 }
831 
NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo> & missionInfoSet)832 int32_t DistributedSchedMissionManager::NotifyMissionsChangedToRemote(
833     const std::vector<DstbMissionInfo> &missionInfoSet)
834 {
835     CallerInfo callerInfo;
836     if (!GenerateCallerInfo(callerInfo)) {
837         return GET_LOCAL_DEVICE_ERR;
838     }
839     std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
840     for (const auto& destDeviceId : remoteSyncDeviceSet_) {
841         auto handler = FetchDeviceHandler(destDeviceId);
842         if (handler == nullptr) {
843             HILOGE("NotifyMissionsChangedToRemote fetch handler failed!");
844             continue;
845         }
846         auto callback = [destDeviceId, missionInfoSet, callerInfo, this] () {
847             NotifyMissionsChangedToRemoteInner(destDeviceId, missionInfoSet, callerInfo);
848         };
849         if (!handler->PostTask(callback)) {
850             HILOGE("NotifyMissionsChangedToRemote PostTask failed!");
851             return ERR_NULL_OBJECT;
852         }
853     }
854 
855     return ERR_NONE;
856 }
857 
NotifyMissionsChangedToRemoteInner(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet,const CallerInfo & callerInfo)858 void DistributedSchedMissionManager::NotifyMissionsChangedToRemoteInner(const std::string& deviceId,
859     const std::vector<DstbMissionInfo>& missionInfoSet, const CallerInfo& callerInfo)
860 {
861     sptr<IDistributedSched> remoteDms = GetRemoteDms(deviceId);
862     if (remoteDms == nullptr) {
863         HILOGE("NotifyMissionsChangedToRemote DMS get remoteDms failed");
864         return;
865     }
866     int64_t begin = GetTickCount();
867     int32_t result = remoteDms->NotifyMissionsChangedFromRemote(missionInfoSet, callerInfo);
868     HILOGI("[PerformanceTest] NotifyMissionsChangedFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
869         result, GetTickCount() - begin);
870 }
871 
GenerateCallerInfo(CallerInfo & callerInfo)872 bool DistributedSchedMissionManager::GenerateCallerInfo(CallerInfo& callerInfo)
873 {
874     std::string localUuid;
875     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localUuid)) {
876         HILOGE("get local uuid failed!");
877         return false;
878     }
879     callerInfo.uid = IPCSkeleton::GetCallingUid();
880     callerInfo.pid = IPCSkeleton::GetCallingPid();
881     callerInfo.callerType = CALLER_TYPE_HARMONY;
882     callerInfo.sourceDeviceId = localUuid;
883     callerInfo.dmsVersion = VERSION;
884     return true;
885 }
886 
FetchDeviceHandler(const std::string & deviceId)887 std::shared_ptr<AppExecFwk::EventHandler> DistributedSchedMissionManager::FetchDeviceHandler(
888     const std::string& deviceId)
889 {
890     if (!IsDeviceIdValidated(deviceId)) {
891         HILOGW("FetchDeviceHandler device:%{public}s offline.",
892             DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
893         return nullptr;
894     }
895 
896     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
897     if (uuid.empty()) {
898         HILOGE("FetchDeviceHandler uuid empty!");
899         return nullptr;
900     }
901 
902     auto iter = deviceHandle_.find(uuid);
903     if (iter != deviceHandle_.end()) {
904         return iter->second;
905     }
906 
907     auto anonyUuid = DnetworkAdapter::AnonymizeNetworkId(uuid);
908     auto runner = AppExecFwk::EventRunner::Create(anonyUuid + "_MissionN");
909     auto handler = std::make_shared<AppExecFwk::EventHandler>(runner);
910     deviceHandle_.emplace(uuid, handler);
911     return handler;
912 }
913 
OnRemoteDied(const wptr<IRemoteObject> & remote)914 void DistributedSchedMissionManager::RemoteDmsDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
915 {
916     HILOGI("OnRemoteDied received died notify!");
917     DistributedSchedMissionManager::GetInstance().OnRemoteDmsDied(remote);
918 }
919 
OnRemoteDmsDied(const wptr<IRemoteObject> & remote)920 void DistributedSchedMissionManager::OnRemoteDmsDied(const wptr<IRemoteObject>& remote)
921 {
922     sptr<IRemoteObject> diedRemoted = remote.promote();
923     if (diedRemoted == nullptr) {
924         HILOGW("OnRemoteDmsDied promote failed!");
925         return;
926     }
927     HILOGD("delete diedRemoted");
928     auto remoteDmsDiedFunc = [this, diedRemoted]() {
929         OnRemoteDmsDied(diedRemoted);
930     };
931     if (missionHandler_ != nullptr) {
932         missionHandler_->PostTask(remoteDmsDiedFunc);
933     }
934 }
935 
RetryStartSyncRemoteMissions(const std::string & dstDeviceId,const std::string & localDevId,int32_t retryTimes)936 void DistributedSchedMissionManager::RetryStartSyncRemoteMissions(const std::string& dstDeviceId,
937     const std::string& localDevId, int32_t retryTimes)
938 {
939     auto retryFunc = [this, dstDeviceId, localDevId, retryTimes]() {
940         bool ret = HasSyncListener(dstDeviceId);
941         if (!ret) {
942             return;
943         }
944         sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDeviceId);
945         if (remoteDms == nullptr) {
946             HILOGI("RetryStartSyncRemoteMissions DMS get remoteDms failed");
947             RetryStartSyncRemoteMissions(dstDeviceId, localDevId, retryTimes + 1);
948             return;
949         }
950         int32_t errNo = StartSyncRemoteMissions(dstDeviceId, remoteDms);
951         HILOGI("RetryStartSyncRemoteMissions result:%{public}d", errNo);
952     };
953     if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
954         missionHandler_->PostTask(retryFunc, RETRY_DELAYED);
955     }
956 }
957 
OnMissionListenerDied(const sptr<IRemoteObject> & remote)958 void DistributedSchedMissionManager::OnMissionListenerDied(const sptr<IRemoteObject>& remote)
959 {
960     HILOGI("OnMissionListenerDied");
961     std::set<std::string> deviceSet;
962     {
963         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
964         auto iterItem = listenDeviceMap_.begin();
965         while (iterItem != listenDeviceMap_.end()) {
966             auto& listenerInfo = iterItem->second;
967             auto ret = listenerInfo.Find(remote);
968             if (!ret) {
969                 ++iterItem;
970                 continue;
971             }
972             remote->RemoveDeathRecipient(listenerDeath_);
973             listenerInfo.Erase(remote);
974             if (listenerInfo.Empty()) {
975                 if (listenerInfo.called) {
976                     deviceSet.emplace(Str16ToStr8(iterItem->first));
977                 }
978                 iterItem = listenDeviceMap_.erase(iterItem);
979             } else {
980                 ++iterItem;
981             }
982         }
983     }
984     for (auto& devId : deviceSet) {
985         StopSyncRemoteMissions(devId, false);
986     }
987 }
988 
OnRemoteDmsDied(const sptr<IRemoteObject> & remote)989 void DistributedSchedMissionManager::OnRemoteDmsDied(const sptr<IRemoteObject>& remote)
990 {
991     HILOGI("OnRemoteDmsDied");
992     std::string devId;
993     {
994         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
995         for (auto iter = remoteDmsMap_.begin(); iter != remoteDmsMap_.end(); ++iter) {
996             if (iter->second->AsObject() == remote) {
997                 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
998                 devId = iter->first;
999                 remoteDmsMap_.erase(iter);
1000                 break;
1001             }
1002         }
1003     }
1004     if (devId.empty()) {
1005         return;
1006     }
1007     bool ret = HasSyncListener(devId);
1008     if (ret) {
1009         std::string localDeviceId;
1010         if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
1011             return;
1012         }
1013         RetryStartSyncRemoteMissions(devId, localDeviceId, 0);
1014     }
1015 }
1016 
NotifyDmsProxyProcessDied()1017 void DistributedSchedMissionManager::NotifyDmsProxyProcessDied()
1018 {
1019     HILOGI("NotifyDmsProxyProcessDied!");
1020     if (!isRegMissionChange_) {
1021         return;
1022     }
1023     RetryRegisterMissionChange(0);
1024 }
1025 
RetryRegisterMissionChange(int32_t retryTimes)1026 void DistributedSchedMissionManager::RetryRegisterMissionChange(int32_t retryTimes)
1027 {
1028     auto remoteDiedFunc = [this, retryTimes]() {
1029         HILOGI("RetryRegisterMissionChange retryTimes:%{public}d begin", retryTimes);
1030         if (!isRegMissionChange_) {
1031             return;
1032         }
1033         int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
1034         if (ret == ERR_NULL_OBJECT) {
1035             RetryRegisterMissionChange(retryTimes + 1);
1036             HILOGI("RetryRegisterMissionChange dmsproxy null, retry!");
1037             return;
1038         }
1039         HILOGI("RetryRegisterMissionChange result:%{public}d", ret);
1040     };
1041     if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1042         missionHandler_->PostTask(remoteDiedFunc, RETRY_DELAYED);
1043     }
1044 }
1045 
InitAllSnapshots(const std::vector<DstbMissionInfo> & missionInfoSet)1046 void DistributedSchedMissionManager::InitAllSnapshots(const std::vector<DstbMissionInfo>& missionInfoSet)
1047 {
1048     for (auto iter = missionInfoSet.begin(); iter != missionInfoSet.end(); iter++) {
1049         ErrCode errCode = MissionSnapshotChanged(iter->id);
1050         if (errCode != ERR_OK) {
1051             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", iter->id, errCode);
1052         }
1053     }
1054 }
1055 
MissionSnapshotChanged(int32_t missionId)1056 int32_t DistributedSchedMissionManager::MissionSnapshotChanged(int32_t missionId)
1057 {
1058     std::string networkId;
1059     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1060         HILOGE("get local networkId failed!");
1061         return INVALID_PARAMETERS_ERR;
1062     }
1063     AAFwk::MissionSnapshot missionSnapshot;
1064     ErrCode errCode = DistributedSchedAdapter::GetInstance()
1065         .GetLocalMissionSnapshotInfo(networkId, missionId, missionSnapshot);
1066     if (errCode != ERR_OK) {
1067         HILOGE("get local mission snapshot failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
1068         return errCode;
1069     }
1070     Snapshot snapshot;
1071     SnapshotConverter::ConvertToSnapshot(missionSnapshot, snapshot);
1072     MessageParcel data;
1073     errCode = MissionSnapshotSequence(snapshot, data);
1074     if (errCode != ERR_OK) {
1075         HILOGE("mission snapshot sequence failed, errCode=%{public}d", errCode);
1076         return errCode;
1077     }
1078     size_t len = data.GetReadableBytes();
1079     const uint8_t* byteStream = data.ReadBuffer(len);
1080     errCode = StoreSnapshotInfo(networkId, missionId, byteStream, len);
1081     return errCode;
1082 }
1083 
MissionSnapshotDestroyed(int32_t missionId)1084 int32_t DistributedSchedMissionManager::MissionSnapshotDestroyed(int32_t missionId)
1085 {
1086     std::string networkId;
1087     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1088         HILOGE("get local networkId failed!");
1089         return INVALID_PARAMETERS_ERR;
1090     }
1091     ErrCode errCode = RemoveSnapshotInfo(networkId, missionId);
1092     return errCode;
1093 }
1094 
MissionSnapshotSequence(const Snapshot & snapshot,MessageParcel & data)1095 int32_t DistributedSchedMissionManager::MissionSnapshotSequence(const Snapshot& snapshot, MessageParcel& data)
1096 {
1097     bool ret = snapshot.WriteSnapshotInfo(data);
1098     if (!ret) {
1099         HILOGE("WriteSnapshotInfo failed!");
1100         return ERR_FLATTEN_OBJECT;
1101     }
1102     ret = snapshot.WritePixelMap(data);
1103     if (!ret) {
1104         HILOGE("WritePixelMap failed!");
1105         return ERR_FLATTEN_OBJECT;
1106     }
1107     return ERR_OK;
1108 }
1109 
OnDnetDied()1110 void DistributedSchedMissionManager::OnDnetDied()
1111 {
1112     auto dnetDiedFunc = [this]() {
1113         HILOGI("OnDnetDied");
1114         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
1115         if (!isRegMissionChange_) {
1116             return;
1117         }
1118         remoteSyncDeviceSet_.clear();
1119         DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
1120         isRegMissionChange_ = false;
1121     };
1122     if (missionHandler_ != nullptr) {
1123         missionHandler_->PostTask(dnetDiedFunc);
1124     }
1125 }
1126 } // namespace DistributedSchedule
1127 } // namespace OHOS
1128