• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "mission/distributed_sched_mission_manager.h"
17 
18 #include <chrono>
19 #include <sys/time.h>
20 #include <unistd.h>
21 
22 #include "datetime_ex.h"
23 #include "distributed_sched_adapter.h"
24 #include "dtbschedmgr_device_info_storage.h"
25 #include "dtbschedmgr_log.h"
26 #include "ipc_skeleton.h"
27 #include "iservice_registry.h"
28 #include "mission/mission_changed_notify.h"
29 #include "mission/mission_constant.h"
30 #include "mission/mission_info_converter.h"
31 #include "mission/snapshot_converter.h"
32 #include "nlohmann/json.hpp"
33 #include "string_ex.h"
34 #include "system_ability_definition.h"
35 
36 namespace OHOS {
37 namespace DistributedSchedule {
namespace {
// Tag string for this file; presumably consumed by the HILOG* logging macros - confirm in dtbschedmgr_log.h.
const std::string TAG = "DistributedSchedMissionManager";
// Size at which EnqueueCachedSnapshotInfo evicts the least recently accessed cached snapshot.
constexpr size_t MAX_CACHED_ITEM = 10;
// NOTE(review): not referenced in the visible part of this file; presumably the retry limit for
// RetryStartSyncRemoteMissions - confirm.
constexpr int32_t MAX_RETRY_TIMES = 15;
// NOTE(review): not referenced in the visible part of this file; presumably a retry delay in ms - confirm.
constexpr int32_t RETRY_DELAYED = 2000;
constexpr int32_t GET_FOREGROUND_SNAPSHOT_DELAY_TIME = 800; // ms
// Task-name prefix: the device uuid is appended to name the delayed storage-deletion task.
const std::string DELETE_DATA_STORAGE = "DeleteDataStorage";
// Delay passed to PostTask when DeleteDataStorage is asked to delete lazily.
constexpr int32_t DELETE_DATA_STORAGE_DELAYED = 60000; // ms
const std::string INVAILD_LOCAL_DEVICE_ID = "-1";
}
namespace Mission {
// Number of missions requested from DistributedSchedAdapter::GetLocalMissionInfos.
constexpr int32_t GET_MAX_MISSIONS = 20;
} // Mission
using namespace std::chrono;
using namespace Constants::Mission;
using namespace OHOS::DistributedKv;

IMPLEMENT_SINGLE_INSTANCE(DistributedSchedMissionManager);
56 
Init()57 void DistributedSchedMissionManager::Init()
58 {
59     listenerDeath_ = new ListenerDeathRecipient();
60     remoteDmsRecipient_ = new RemoteDmsDeathRecipient();
61     auto runner = AppExecFwk::EventRunner::Create("MissionManagerHandler");
62     missionHandler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
63     auto updateRunner = AppExecFwk::EventRunner::Create("UpdateHandler");
64     updateHandler_ = std::make_shared<AppExecFwk::EventHandler>(updateRunner);
65     missonChangeListener_ = new DistributedMissionChangeListener();
66     auto missionChangeRunner = AppExecFwk::EventRunner::Create("DistributedMissionChange");
67     missionChangeHandler_ = std::make_shared<AppExecFwk::EventHandler>(missionChangeRunner);
68 }
69 
GetMissionInfos(const std::string & deviceId,int32_t numMissions,std::vector<AAFwk::MissionInfo> & missionInfos)70 int32_t DistributedSchedMissionManager::GetMissionInfos(const std::string& deviceId,
71     int32_t numMissions, std::vector<AAFwk::MissionInfo>& missionInfos)
72 {
73     HILOGI("start!");
74     if (!IsDeviceIdValidated(deviceId)) {
75         return INVALID_PARAMETERS_ERR;
76     }
77     if (numMissions <= 0) {
78         HILOGE("numMissions is illegal! numMissions:%{public}d", numMissions);
79         return INVALID_PARAMETERS_ERR;
80     }
81     std::vector<DstbMissionInfo> dstbMissionInfos;
82     int32_t ret = FetchCachedRemoteMissions(deviceId, numMissions, dstbMissionInfos);
83     if (ret != ERR_OK) {
84         HILOGE("FetchCachedRemoteMissions failed, ret = %{public}d", ret);
85         return ret;
86     }
87     return MissionInfoConverter::ConvertToMissionInfos(dstbMissionInfos, missionInfos);
88 }
89 
GetRemoteDms(const std::string & deviceId)90 sptr<IDistributedSched> DistributedSchedMissionManager::GetRemoteDms(const std::string& deviceId)
91 {
92     int64_t begin = GetTickCount();
93     if (deviceId.empty()) {
94         HILOGE("GetRemoteDms remoteDeviceId is empty");
95         return nullptr;
96     }
97     {
98         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
99         auto iter = remoteDmsMap_.find(deviceId);
100         if (iter != remoteDmsMap_.end()) {
101             auto object = iter->second;
102             if (object != nullptr) {
103                 HILOGI("[PerformanceTest] GetRemoteDms from cache spend %{public}" PRId64 " ms",
104                     GetTickCount() - begin);
105                 return object;
106             }
107         }
108     }
109     HILOGD("GetRemoteDms connect deviceid is %s", deviceId.c_str());
110     auto samgr = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
111     if (samgr == nullptr) {
112         HILOGE("GetRemoteDms failed to connect to systemAbilityMgr!");
113         return nullptr;
114     }
115     auto object = samgr->CheckSystemAbility(DISTRIBUTED_SCHED_SA_ID, deviceId);
116     if (object == nullptr) {
117         HILOGE("GetRemoteDms failed to get dms for remote device:%{public}s!",
118             DnetworkAdapter::AnonymizeDeviceId(deviceId).c_str());
119         return nullptr;
120     }
121     auto ret = object->AddDeathRecipient(remoteDmsRecipient_);
122     HILOGD("GetRemoteDms AddDeathRecipient ret : %{public}d", ret);
123     sptr<IDistributedSched> remoteDmsObj = iface_cast<IDistributedSched>(object);
124     {
125         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
126         auto iter = remoteDmsMap_.find(deviceId);
127         if (iter != remoteDmsMap_.end()) {
128             iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
129         }
130         remoteDmsMap_[deviceId] = remoteDmsObj;
131     }
132     HILOGI("[PerformanceTest] GetRemoteDms spend %{public}" PRId64 " ms", GetTickCount() - begin);
133     return remoteDmsObj;
134 }
135 
IsDeviceIdValidated(const std::string & deviceId)136 bool DistributedSchedMissionManager::IsDeviceIdValidated(const std::string& deviceId)
137 {
138     if (deviceId.empty()) {
139         HILOGE("IsDeviceIdValidated deviceId is empty!");
140         return false;
141     }
142     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(deviceId) == nullptr) {
143         HILOGW("IsDeviceIdValidated device offline.");
144         return false;
145     }
146 
147     return true;
148 }
149 
NotifyRemoteDied(const wptr<IRemoteObject> & remote)150 void DistributedSchedMissionManager::NotifyRemoteDied(const wptr<IRemoteObject>& remote)
151 {
152     if (distributedDataStorage_ == nullptr) {
153         HILOGE("DistributedDataStorage null!");
154         return;
155     }
156     distributedDataStorage_->NotifyRemoteDied(remote);
157 }
158 
InitDataStorage()159 int32_t DistributedSchedMissionManager::InitDataStorage()
160 {
161     if (distributedDataStorage_ == nullptr) {
162         distributedDataStorage_ = std::make_shared<DistributedDataStorage>();
163     }
164     if (!distributedDataStorage_->Init()) {
165         HILOGE("InitDataStorage DistributedDataStorage init failed!");
166         return ERR_NULL_OBJECT;
167     }
168     return ERR_NONE;
169 }
170 
StopDataStorage()171 int32_t DistributedSchedMissionManager::StopDataStorage()
172 {
173     if (distributedDataStorage_ == nullptr) {
174         HILOGE("StopDataStorage DistributedDataStorage null!");
175         return ERR_NULL_OBJECT;
176     }
177     if (!distributedDataStorage_->Stop()) {
178         HILOGE("StopDataStorage DistributedDataStorage stop failed!");
179         return ERR_NULL_OBJECT;
180     }
181     return ERR_NONE;
182 }
183 
StoreSnapshotInfo(const std::string & deviceId,int32_t missionId,const uint8_t * byteStream,size_t len)184 int32_t DistributedSchedMissionManager::StoreSnapshotInfo(const std::string& deviceId, int32_t missionId,
185     const uint8_t* byteStream, size_t len)
186 {
187     if (distributedDataStorage_ == nullptr) {
188         HILOGE("StoreSnapshotInfo DistributedDataStorage null!");
189         return ERR_NULL_OBJECT;
190     }
191     if (!distributedDataStorage_->Insert(deviceId, missionId, byteStream, len)) {
192         HILOGE("StoreSnapshotInfo DistributedDataStorage insert failed!");
193         return INVALID_PARAMETERS_ERR;
194     }
195     return ERR_NONE;
196 }
197 
RemoveSnapshotInfo(const std::string & deviceId,int32_t missionId)198 int32_t DistributedSchedMissionManager::RemoveSnapshotInfo(const std::string& deviceId, int32_t missionId)
199 {
200     if (distributedDataStorage_ == nullptr) {
201         HILOGE("RemoveSnapshotInfo DistributedDataStorage null!");
202         return ERR_NULL_OBJECT;
203     }
204     if (!distributedDataStorage_->Delete(deviceId, missionId)) {
205         HILOGE("RemoveSnapshotInfo DistributedDataStorage delete failed!");
206         return INVALID_PARAMETERS_ERR;
207     }
208     return ERR_NONE;
209 }
210 
GetRemoteMissionSnapshotInfo(const std::string & networkId,int32_t missionId,std::unique_ptr<AAFwk::MissionSnapshot> & missionSnapshot)211 int32_t DistributedSchedMissionManager::GetRemoteMissionSnapshotInfo(const std::string& networkId, int32_t missionId,
212     std::unique_ptr<AAFwk::MissionSnapshot>& missionSnapshot)
213 {
214     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
215     if (uuid.empty()) {
216         HILOGE("uuid is empty!");
217         return INVALID_PARAMETERS_ERR;
218     }
219     std::unique_ptr<Snapshot> snapshotPtr = DequeueCachedSnapshotInfo(uuid, missionId);
220     if (snapshotPtr != nullptr) {
221         HILOGI("get uuid = %{public}s + missionId = %{public}d snapshot from cache successful!",
222             DnetworkAdapter::AnonymizeDeviceId(uuid).c_str(), missionId);
223         SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
224         return ERR_NONE;
225     }
226     if (distributedDataStorage_ == nullptr) {
227         HILOGE("DistributedDataStorage null!");
228         return ERR_NULL_OBJECT;
229     }
230     DistributedKv::Value value;
231     bool ret = distributedDataStorage_->Query(networkId, missionId, value);
232     if (!ret) {
233         HILOGE("DistributedDataStorage query failed!");
234         return INVALID_PARAMETERS_ERR;
235     }
236     snapshotPtr = Snapshot::Create(value.Data());
237     if (snapshotPtr == nullptr) {
238         HILOGE("snapshot create failed!");
239         return ERR_NULL_OBJECT;
240     }
241     HILOGI("get uuid = %{public}s + missionId = %{public}d snapshot from DistributedDB successful!",
242         DnetworkAdapter::AnonymizeDeviceId(uuid).c_str(), missionId);
243     SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
244     return ERR_NONE;
245 }
246 
DeviceOnlineNotify(const std::string & deviceId)247 void DistributedSchedMissionManager::DeviceOnlineNotify(const std::string& deviceId)
248 {
249     if (deviceId.empty()) {
250         HILOGW("DeviceOnlineNotify deviceId empty!");
251         return;
252     }
253 
254     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
255     if (missionHandler_ != nullptr) {
256         HILOGI("DeviceOnlineNotify RemoveTask");
257         missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
258     }
259 }
260 
DeviceOfflineNotify(const std::string & deviceId)261 void DistributedSchedMissionManager::DeviceOfflineNotify(const std::string& deviceId)
262 {
263     if (deviceId.empty()) {
264         HILOGW("DeviceOfflineNotify deviceId empty!");
265         return;
266     }
267     StopSyncMissionsFromRemote(deviceId);
268     CleanMissionResources(deviceId);
269     {
270         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
271         auto iter = remoteDmsMap_.find(deviceId);
272         if (iter != remoteDmsMap_.end()) {
273             iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
274             remoteDmsMap_.erase(iter);
275         }
276     }
277     HILOGI("DeviceOfflineNotify erase value for deviceId: %{public}s",
278         DnetworkAdapter::AnonymizeDeviceId(deviceId).c_str());
279 }
280 
DeleteDataStorage(const std::string & deviceId,bool isDelayed)281 void DistributedSchedMissionManager::DeleteDataStorage(const std::string& deviceId, bool isDelayed)
282 {
283     if (distributedDataStorage_ == nullptr) {
284         HILOGE("DeleteDataStorage DistributedDataStorage null!");
285         return;
286     }
287     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
288     auto callback = [this, uuid, deviceId]() {
289         if (!distributedDataStorage_->FuzzyDelete(deviceId)) {
290             HILOGE("DeleteDataStorage storage delete failed!");
291         } else {
292             HILOGI("DeleteDataStorage storage delete successfully!");
293         }
294     };
295     if (isDelayed) {
296         if (missionHandler_ != nullptr) {
297             HILOGI("DeleteDataStorage PostTask");
298             missionHandler_->PostTask(callback, DELETE_DATA_STORAGE + uuid, DELETE_DATA_STORAGE_DELAYED);
299         }
300     } else {
301         if (missionHandler_ != nullptr) {
302             HILOGI("DeleteDataStorage RemoveTask");
303             missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
304         }
305         callback();
306     }
307 }
308 
RegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)309 int32_t DistributedSchedMissionManager::RegisterMissionListener(const std::u16string& devId,
310     const sptr<IRemoteObject>& listener)
311 {
312     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(Str16ToStr8(devId));
313     if (missionHandler_ != nullptr) {
314         HILOGI("RemoveTask");
315         missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
316     }
317     if (listener == nullptr) {
318         return INVALID_PARAMETERS_ERR;
319     }
320     std::string localDeviceId;
321     std::string remoteDeviceId = Str16ToStr8(devId);
322     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
323         || localDeviceId == remoteDeviceId) {
324         HILOGE("check deviceId failed!");
325         return INVALID_PARAMETERS_ERR;
326     }
327     {
328         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
329         auto& listenerInfo = listenDeviceMap_[devId];
330         if (!listenerInfo.Emplace(listener)) {
331             HILOGW("RegisterSyncListener listener has already inserted!");
332             return ERR_NONE;
333         }
334         bool ret = listener->AddDeathRecipient(listenerDeath_);
335         if (!ret) {
336             HILOGW("RegisterSyncListener AddDeathRecipient failed!");
337         }
338         if (listenerInfo.Size() > 1) {
339             HILOGI("RegisterMissionListener not notify remote DMS!");
340             return ERR_NONE;
341         }
342     }
343     return ERR_NONE;
344 }
345 
StartSyncRemoteMissions(const std::string & dstDevId,const std::string & localDevId)346 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
347     const std::string& localDevId)
348 {
349     std::u16string devId = Str8ToStr16(dstDevId);
350     {
351         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
352         auto iterItem = listenDeviceMap_.find(devId);
353         if (iterItem == listenDeviceMap_.end()) {
354             return ERR_NONE;
355         }
356         bool callFlag = iterItem->second.called;
357         if (callFlag) {
358             HILOGI("StartSyncRemoteMissions already called!");
359             return ERR_NONE;
360         }
361     }
362     sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
363     if (remoteDms == nullptr) {
364         HILOGE("get remoteDms failed!");
365         RetryStartSyncRemoteMissions(dstDevId, localDevId, 0);
366         return GET_REMOTE_DMS_FAIL;
367     }
368     int32_t ret = StartSyncRemoteMissions(dstDevId, remoteDms);
369     if (ret == ERR_NONE) {
370         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
371         auto iterItem = listenDeviceMap_.find(devId);
372         if (iterItem != listenDeviceMap_.end()) {
373             iterItem->second.called = true;
374         }
375     }
376     return ret;
377 }
378 
StartSyncRemoteMissions(const std::string & dstDevId,const sptr<IDistributedSched> & remoteDms)379 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
380     const sptr<IDistributedSched>& remoteDms)
381 {
382     std::vector<DstbMissionInfo> missionInfos;
383     CallerInfo callerInfo;
384     if (!GenerateCallerInfo(callerInfo)) {
385         return GET_LOCAL_DEVICE_ERR;
386     }
387     int64_t begin = GetTickCount();
388     int32_t ret = remoteDms->StartSyncMissionsFromRemote(callerInfo, missionInfos);
389     HILOGI("[PerformanceTest] StartSyncMissionsFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
390         ret, GetTickCount() - begin);
391     if (ret == ERR_NONE) {
392         RebornMissionCache(dstDevId, missionInfos);
393     }
394     return ret;
395 }
396 
UnRegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)397 int32_t DistributedSchedMissionManager::UnRegisterMissionListener(const std::u16string& devId,
398     const sptr<IRemoteObject>& listener)
399 {
400     if (listener == nullptr) {
401         return INVALID_PARAMETERS_ERR;
402     }
403     std::string localDeviceId;
404     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
405         || localDeviceId == Str16ToStr8(devId)) {
406         HILOGE("check deviceId fail");
407         return INVALID_PARAMETERS_ERR;
408     }
409     {
410         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
411         auto iterItem = listenDeviceMap_.find(devId);
412         if (iterItem == listenDeviceMap_.end()) {
413             return ERR_NONE;
414         }
415         auto& listenerInfo = iterItem->second;
416         auto ret = listenerInfo.Find(listener);
417         if (!ret) {
418             HILOGI("listener not registered!");
419             return ERR_NONE;
420         }
421         listener->RemoveDeathRecipient(listenerDeath_);
422         listenerInfo.Erase(listener);
423         if (!listenerInfo.Empty()) {
424             return ERR_NONE;
425         }
426         listenDeviceMap_.erase(iterItem);
427     }
428     return ERR_NONE;
429 }
430 
CleanMissionResources(const std::string & dstDevId)431 void DistributedSchedMissionManager::CleanMissionResources(const std::string& dstDevId)
432 {
433     {
434         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
435         auto iterDevice = listenDeviceMap_.find(Str8ToStr16(dstDevId));
436         if (iterDevice == listenDeviceMap_.end()) {
437             return;
438         }
439         auto& listenerInfo = iterDevice->second;
440         for (sptr<IRemoteObject> listener : listenerInfo.listenerSet) {
441             if (listener != nullptr) {
442                 listener->RemoveDeathRecipient(listenerDeath_);
443             }
444         }
445         listenDeviceMap_.erase(iterDevice);
446     }
447     StopSyncRemoteMissions(dstDevId, true);
448 }
449 
StopSyncRemoteMissions(const std::string & dstDevId,bool offline,bool exit)450 int32_t DistributedSchedMissionManager::StopSyncRemoteMissions(const std::string& dstDevId,
451     bool offline, bool exit)
452 {
453     CleanMissionCache(dstDevId);
454     DeleteCachedSnapshotInfo(dstDevId);
455     DeleteDataStorage(dstDevId, true);
456 
457     if (offline) {
458         return ERR_NONE;
459     }
460     sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
461     if (remoteDms == nullptr) {
462         HILOGE("DMS get remoteDms failed");
463         return GET_REMOTE_DMS_FAIL;
464     }
465 
466     CallerInfo callerInfo;
467     if (!GenerateCallerInfo(callerInfo)) {
468         return GET_LOCAL_DEVICE_ERR;
469     }
470     int64_t begin = GetTickCount();
471     int32_t ret = remoteDms->StopSyncMissionsFromRemote(callerInfo);
472     HILOGI("[PerformanceTest] ret:%d, spend %{public}" PRId64 " ms", ret, GetTickCount() - begin);
473     return ret;
474 }
475 
StartSyncRemoteMissions(const std::string & dstDevId,bool fixConflict,int64_t tag)476 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId, bool fixConflict,
477     int64_t tag)
478 {
479     std::string localDeviceId;
480     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
481         || (dstDevId == localDeviceId)) {
482         HILOGE("check deviceId fail");
483         return INVALID_PARAMETERS_ERR;
484     }
485     HILOGI("begin, dstDevId is %{public}s, local deviceId is %{public}s",
486         DnetworkAdapter::AnonymizeDeviceId(dstDevId).c_str(),
487         DnetworkAdapter::AnonymizeDeviceId(localDeviceId).c_str());
488     auto ret = StartSyncRemoteMissions(dstDevId, localDeviceId);
489     if (ret != ERR_NONE) {
490         HILOGE("StartSyncRemoteMissions failed, %{public}d", ret);
491         return ret;
492     }
493     return ERR_NONE;
494 }
495 
StartSyncMissionsFromRemote(const CallerInfo & callerInfo,std::vector<DstbMissionInfo> & missionInfos)496 int32_t DistributedSchedMissionManager::StartSyncMissionsFromRemote(const CallerInfo& callerInfo,
497     std::vector<DstbMissionInfo>& missionInfos)
498 {
499     auto deviceId = callerInfo.sourceDeviceId;
500     HILOGD("remote version is %{public}d!", callerInfo.dmsVersion);
501     {
502         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
503         remoteSyncDeviceSet_.emplace(deviceId);
504     }
505     int32_t result = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
506         missionInfos);
507     auto func = [this, missionInfos]() {
508         HILOGD("RegisterMissionListener called.");
509         if (!isRegMissionChange_) {
510             int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
511             if (ret == ERR_OK) {
512                 isRegMissionChange_ = true;
513             }
514             InitAllSnapshots(missionInfos);
515         }
516     };
517     if (!missionHandler_->PostTask(func)) {
518         HILOGE("post RegisterMissionListener and InitAllSnapshots Task failed");
519     }
520     return result;
521 }
522 
StopSyncMissionsFromRemote(const std::string & deviceId)523 void DistributedSchedMissionManager::StopSyncMissionsFromRemote(const std::string& deviceId)
524 {
525     HILOGD(" %{private}s!", deviceId.c_str());
526     {
527         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
528         remoteSyncDeviceSet_.erase(deviceId);
529         if (remoteSyncDeviceSet_.empty()) {
530             auto func = [this]() {
531                 int32_t ret = DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
532                 if (ret == ERR_OK) {
533                     isRegMissionChange_ = false;
534                 }
535             };
536             if (!missionHandler_->PostTask(func)) {
537                 HILOGE("post UnRegisterMissionListener Task failed");
538             }
539         }
540     }
541 }
542 
NeedSyncDevice(const std::string & deviceId)543 bool DistributedSchedMissionManager::NeedSyncDevice(const std::string& deviceId)
544 {
545     std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
546     if (remoteSyncDeviceSet_.count(deviceId) == 0) {
547         return false;
548     }
549     return true;
550 }
551 
HasSyncListener(const std::string & networkId)552 bool DistributedSchedMissionManager::HasSyncListener(const std::string& networkId)
553 {
554     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
555     auto iter = listenDeviceMap_.find(Str8ToStr16(networkId));
556     if (iter != listenDeviceMap_.end()) {
557         return iter->second.called;
558     }
559     return false;
560 }
561 
NotifySnapshotChanged(const std::string & networkId,int32_t missionId)562 void DistributedSchedMissionManager::NotifySnapshotChanged(const std::string& networkId, int32_t missionId)
563 {
564     std::u16string u16DevId = Str8ToStr16(networkId);
565     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
566     auto iter = listenDeviceMap_.find(u16DevId);
567     if (iter == listenDeviceMap_.end()) {
568         return;
569     }
570     auto& listenerInfo = iter->second;
571     for (auto& listener : listenerInfo.listenerSet) {
572         MissionChangedNotify::NotifySnapshot(listener, u16DevId, missionId);
573     }
574 }
575 
OnRemoteDied(const wptr<IRemoteObject> & remote)576 void DistributedSchedMissionManager::OnRemoteDied(const wptr<IRemoteObject>& remote)
577 {
578     HILOGD("OnRemoteDied!");
579     sptr<IRemoteObject> listener = remote.promote();
580     if (listener == nullptr) {
581         return;
582     }
583     auto remoteDiedFunc = [this, listener]() {
584         OnMissionListenerDied(listener);
585     };
586     if (missionHandler_ != nullptr) {
587         missionHandler_->PostTask(remoteDiedFunc);
588     }
589 }
590 
OnRemoteDied(const wptr<IRemoteObject> & remote)591 void DistributedSchedMissionManager::ListenerDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
592 {
593     DistributedSchedMissionManager::GetInstance().OnRemoteDied(remote);
594 }
595 
EnqueueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId,std::unique_ptr<Snapshot> snapshot)596 void DistributedSchedMissionManager::EnqueueCachedSnapshotInfo(const std::string& deviceId, int32_t missionId,
597     std::unique_ptr<Snapshot> snapshot)
598 {
599     if (deviceId.empty() || snapshot == nullptr) {
600         HILOGW("EnqueueCachedSnapshotInfo invalid input param!");
601         return;
602     }
603     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
604     std::string keyInfo = GenerateKeyInfo(deviceId, missionId);
605     auto iter = cachedSnapshotInfos_.find(keyInfo);
606     if (iter != cachedSnapshotInfos_.end()) {
607         if (snapshot->GetCreatedTime() < iter->second->GetCreatedTime()) {
608             return;
609         }
610     }
611 
612     if (cachedSnapshotInfos_.size() == MAX_CACHED_ITEM) {
613         int64_t oldest = -1;
614         auto iterOldest = cachedSnapshotInfos_.end();
615         for (auto iterItem = cachedSnapshotInfos_.begin(); iterItem != cachedSnapshotInfos_.end(); ++iterItem) {
616             if (oldest == -1 || iterItem->second->GetLastAccessTime() < oldest) {
617                 oldest = iterItem->second->GetLastAccessTime();
618                 iterOldest = iterItem;
619             }
620         }
621         if (iterOldest != cachedSnapshotInfos_.end()) {
622             cachedSnapshotInfos_.erase(iterOldest);
623         }
624     }
625     cachedSnapshotInfos_[keyInfo] = std::move(snapshot);
626 }
627 
DequeueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId)628 std::unique_ptr<Snapshot> DistributedSchedMissionManager::DequeueCachedSnapshotInfo(const std::string& deviceId,
629     int32_t missionId)
630 {
631     if (deviceId.empty()) {
632         HILOGW("DequeueCachedSnapshotInfo invalid input param!");
633         return nullptr;
634     }
635     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
636     auto iter = cachedSnapshotInfos_.find(GenerateKeyInfo(deviceId, missionId));
637     if (iter != cachedSnapshotInfos_.end()) {
638         std::unique_ptr<Snapshot> snapshot = std::move(iter->second);
639         snapshot->UpdateLastAccessTime(GetTickCount());
640         iter->second = nullptr;
641         cachedSnapshotInfos_.erase(iter);
642         return snapshot;
643     }
644     return nullptr;
645 }
646 
DeleteCachedSnapshotInfo(const std::string & networkId)647 void DistributedSchedMissionManager::DeleteCachedSnapshotInfo(const std::string& networkId)
648 {
649     if (networkId.empty()) {
650         return;
651     }
652     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
653     if (uuid.empty()) {
654         return;
655     }
656     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
657     auto iter = cachedSnapshotInfos_.begin();
658     while (iter != cachedSnapshotInfos_.end()) {
659         if (iter->first.find(uuid) != std::string::npos) {
660             iter = cachedSnapshotInfos_.erase(iter);
661         } else {
662             ++iter;
663         }
664     }
665 }
666 
FetchCachedRemoteMissions(const std::string & srcId,int32_t numMissions,std::vector<DstbMissionInfo> & missionInfos)667 int32_t DistributedSchedMissionManager::FetchCachedRemoteMissions(const std::string& srcId, int32_t numMissions,
668     std::vector<DstbMissionInfo>& missionInfos)
669 {
670     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(srcId);
671     if (uuid.empty()) {
672         HILOGE("uuid empty!");
673         return INVALID_PARAMETERS_ERR;
674     }
675     std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
676     auto iter = deviceMissionInfos_.find(uuid);
677     if (iter == deviceMissionInfos_.end()) {
678         HILOGE("can not find uuid, deviceId: %{public}s!",
679             DnetworkAdapter::AnonymizeDeviceId(srcId).c_str());
680         return ERR_NULL_OBJECT;
681     }
682 
683     // get at most numMissions missions
684     int32_t actualNums = static_cast<int32_t>((iter->second).size());
685     if (actualNums < 0) {
686         HILOGE("invalid size!");
687         return ERR_NULL_OBJECT;
688     }
689     missionInfos.assign((iter->second).begin(),
690         (actualNums > numMissions) ? (iter->second).begin() + numMissions : (iter->second).end());
691     return ERR_NONE;
692 }
693 
RebornMissionCache(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfos)694 void DistributedSchedMissionManager::RebornMissionCache(const std::string& deviceId,
695     const std::vector<DstbMissionInfo>& missionInfos)
696 {
697     HILOGI("start! deviceId is %{public}s",
698         DnetworkAdapter::AnonymizeDeviceId(deviceId).c_str());
699     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
700     if (uuid.empty()) {
701         HILOGE("uuid empty!");
702         return;
703     }
704     {
705         std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
706         deviceMissionInfos_[uuid] = missionInfos;
707     }
708     HILOGI("RebornMissionCache end!");
709 }
710 
CleanMissionCache(const std::string & deviceId)711 void DistributedSchedMissionManager::CleanMissionCache(const std::string& deviceId)
712 {
713     HILOGI("CleanMissionCache start! deviceId is %{public}s",
714         DnetworkAdapter::AnonymizeDeviceId(deviceId).c_str());
715     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
716     if (uuid.empty()) {
717         HILOGE("CleanMissionCache uuid empty!");
718         return;
719     }
720     {
721         std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
722         deviceMissionInfos_.erase(uuid);
723     }
724     HILOGI("CleanMissionCache end!");
725 }
726 
NotifyMissionsChangedFromRemote(const CallerInfo & callerInfo,const std::vector<DstbMissionInfo> & missionInfos)727 int32_t DistributedSchedMissionManager::NotifyMissionsChangedFromRemote(const CallerInfo& callerInfo,
728     const std::vector<DstbMissionInfo>& missionInfos)
729 {
730     HILOGI("NotifyMissionsChangedFromRemote version is %{public}d!", callerInfo.dmsVersion);
731     std::u16string u16DevId = Str8ToStr16(callerInfo.sourceDeviceId);
732     RebornMissionCache(callerInfo.sourceDeviceId, missionInfos);
733     {
734         HILOGI("NotifyMissionsChangedFromRemote notify mission start!");
735         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
736         auto iter = listenDeviceMap_.find(u16DevId);
737         if (iter == listenDeviceMap_.end()) {
738             HILOGE("NotifyMissionsChangedFromRemote notify mission no listener!");
739             return INVALID_PARAMETERS_ERR;
740         }
741         auto& listenerSet = iter->second.listenerSet;
742         auto notifyChanged = [listenerSet, u16DevId] () {
743             for (const auto& listener : listenerSet) {
744                 MissionChangedNotify::NotifyMissionsChanged(listener, u16DevId);
745             }
746         };
747         if (missionHandler_ != nullptr) {
748             missionHandler_->PostTask(notifyChanged);
749             HILOGI("NotifyMissionsChangedFromRemote end!");
750             return ERR_NONE;
751         }
752     }
753     return INVALID_PARAMETERS_ERR;
754 }
755 
NotifyLocalMissionsChanged()756 void DistributedSchedMissionManager::NotifyLocalMissionsChanged()
757 {
758     auto func = [this]() {
759         HILOGI("NotifyLocalMissionsChanged");
760         std::vector<DstbMissionInfo> missionInfos;
761         int32_t ret = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
762             missionInfos);
763         if (ret == ERR_OK) {
764             int32_t result = NotifyMissionsChangedToRemote(missionInfos);
765             HILOGI("NotifyMissionsChangedToRemote result = %{public}d", result);
766         }
767     };
768     if (!missionChangeHandler_->PostTask(func)) {
769         HILOGE("postTask failed");
770     }
771 }
772 
NotifyMissionSnapshotCreated(int32_t missionId)773 void DistributedSchedMissionManager::NotifyMissionSnapshotCreated(int32_t missionId)
774 {
775     auto func = [this, missionId]() {
776         HILOGD("called.");
777         ErrCode errCode = MissionSnapshotChanged(missionId);
778         if (errCode != ERR_OK) {
779             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
780         }
781     };
782     if (!missionChangeHandler_->PostTask(func, GET_FOREGROUND_SNAPSHOT_DELAY_TIME)) {
783         HILOGE("post MissionSnapshotChanged delay Task failed");
784     }
785 }
786 
NotifyMissionSnapshotChanged(int32_t missionId)787 void DistributedSchedMissionManager::NotifyMissionSnapshotChanged(int32_t missionId)
788 {
789     auto func = [this, missionId]() {
790         HILOGD("called.");
791         ErrCode errCode = MissionSnapshotChanged(missionId);
792         if (errCode != ERR_OK) {
793             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
794         }
795     };
796     if (!missionChangeHandler_->PostTask(func)) {
797         HILOGE("post MissionSnapshotChanged Task failed");
798     }
799 }
800 
NotifyMissionSnapshotDestroyed(int32_t missionId)801 void DistributedSchedMissionManager::NotifyMissionSnapshotDestroyed(int32_t missionId)
802 {
803     auto func = [this, missionId]() {
804         HILOGD("called.");
805         ErrCode errCode = MissionSnapshotDestroyed(missionId);
806         if (errCode != ERR_OK) {
807             HILOGE("mission snapshot removed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
808         }
809     };
810     if (!missionChangeHandler_->PostTask(func)) {
811         HILOGE("post MissionSnapshotDestroyed Task failed");
812     }
813 }
814 
NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo> & missionInfos)815 int32_t DistributedSchedMissionManager::NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo>& missionInfos)
816 {
817     CallerInfo callerInfo;
818     if (!GenerateCallerInfo(callerInfo)) {
819         return GET_LOCAL_DEVICE_ERR;
820     }
821     std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
822     for (const auto& destDeviceId : remoteSyncDeviceSet_) {
823         auto handler = FetchDeviceHandler(destDeviceId);
824         if (handler == nullptr) {
825             HILOGE("NotifyMissionsChangedToRemote fetch handler failed!");
826             continue;
827         }
828         auto callback = [destDeviceId, missionInfos, callerInfo, this] () {
829             NotifyMissionsChangedToRemoteInner(destDeviceId, missionInfos, callerInfo);
830         };
831         if (!handler->PostTask(callback)) {
832             HILOGE("NotifyMissionsChangedToRemote PostTask failed!");
833             return ERR_NULL_OBJECT;
834         }
835     }
836 
837     return ERR_NONE;
838 }
839 
NotifyMissionsChangedToRemoteInner(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfos,const CallerInfo & callerInfo)840 void DistributedSchedMissionManager::NotifyMissionsChangedToRemoteInner(const std::string& deviceId,
841     const std::vector<DstbMissionInfo>& missionInfos, const CallerInfo& callerInfo)
842 {
843     sptr<IDistributedSched> remoteDms = GetRemoteDms(deviceId);
844     if (remoteDms == nullptr) {
845         HILOGE("NotifyMissionsChangedToRemote DMS get remoteDms failed");
846         return;
847     }
848     int64_t begin = GetTickCount();
849     int32_t result = remoteDms->NotifyMissionsChangedFromRemote(missionInfos, callerInfo);
850     HILOGI("[PerformanceTest] NotifyMissionsChangedFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
851         result, GetTickCount() - begin);
852 }
853 
GenerateCallerInfo(CallerInfo & callerInfo)854 bool DistributedSchedMissionManager::GenerateCallerInfo(CallerInfo& callerInfo)
855 {
856     std::string localUuid;
857     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localUuid)) {
858         HILOGE("get local uuid failed!");
859         return false;
860     }
861     callerInfo.uid = IPCSkeleton::GetCallingUid();
862     callerInfo.pid = IPCSkeleton::GetCallingPid();
863     callerInfo.callerType = CALLER_TYPE_HARMONY;
864     callerInfo.sourceDeviceId = localUuid;
865     callerInfo.dmsVersion = VERSION;
866     return true;
867 }
868 
FetchDeviceHandler(const std::string & deviceId)869 std::shared_ptr<AppExecFwk::EventHandler> DistributedSchedMissionManager::FetchDeviceHandler(
870     const std::string& deviceId)
871 {
872     if (!IsDeviceIdValidated(deviceId)) {
873         HILOGW("FetchDeviceHandler device:%{public}s offline.",
874             DnetworkAdapter::AnonymizeDeviceId(deviceId).c_str());
875         return nullptr;
876     }
877 
878     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
879     if (uuid.empty()) {
880         HILOGE("FetchDeviceHandler uuid empty!");
881         return nullptr;
882     }
883 
884     auto iter = deviceHandle_.find(uuid);
885     if (iter != deviceHandle_.end()) {
886         return iter->second;
887     }
888 
889     auto anonyUuid = DnetworkAdapter::AnonymizeDeviceId(uuid);
890     auto runner = AppExecFwk::EventRunner::Create(anonyUuid + "_MissionN");
891     auto handler = std::make_shared<AppExecFwk::EventHandler>(runner);
892     deviceHandle_.emplace(uuid, handler);
893     return handler;
894 }
895 
OnRemoteDied(const wptr<IRemoteObject> & remote)896 void DistributedSchedMissionManager::RemoteDmsDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
897 {
898     HILOGI("OnRemoteDied received died notify!");
899     DistributedSchedMissionManager::GetInstance().OnRemoteDmsDied(remote);
900 }
901 
OnRemoteDmsDied(const wptr<IRemoteObject> & remote)902 void DistributedSchedMissionManager::OnRemoteDmsDied(const wptr<IRemoteObject>& remote)
903 {
904     sptr<IRemoteObject> diedRemoted = remote.promote();
905     if (diedRemoted == nullptr) {
906         HILOGW("OnRemoteDmsDied promote failed!");
907         return;
908     }
909     HILOGD("delete diedRemoted");
910     auto remoteDmsDiedFunc = [this, diedRemoted]() {
911         OnRemoteDmsDied(diedRemoted);
912     };
913     if (missionHandler_ != nullptr) {
914         missionHandler_->PostTask(remoteDmsDiedFunc);
915     }
916 }
917 
RetryStartSyncRemoteMissions(const std::string & dstDeviceId,const std::string & localDevId,int32_t retryTimes)918 void DistributedSchedMissionManager::RetryStartSyncRemoteMissions(const std::string& dstDeviceId,
919     const std::string& localDevId, int32_t retryTimes)
920 {
921     auto retryFunc = [this, dstDeviceId, localDevId, retryTimes]() {
922         bool ret = HasSyncListener(dstDeviceId);
923         if (!ret) {
924             return;
925         }
926         sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDeviceId);
927         if (remoteDms == nullptr) {
928             HILOGI("RetryStartSyncRemoteMissions DMS get remoteDms failed");
929             RetryStartSyncRemoteMissions(dstDeviceId, localDevId, retryTimes + 1);
930             return;
931         }
932         int32_t errNo = StartSyncRemoteMissions(dstDeviceId, remoteDms);
933         HILOGI("RetryStartSyncRemoteMissions result:%{public}d", errNo);
934     };
935     if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
936         missionHandler_->PostTask(retryFunc, RETRY_DELAYED);
937     }
938 }
939 
OnMissionListenerDied(const sptr<IRemoteObject> & remote)940 void DistributedSchedMissionManager::OnMissionListenerDied(const sptr<IRemoteObject>& remote)
941 {
942     HILOGI("OnMissionListenerDied");
943     std::set<std::string> deviceSet;
944     {
945         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
946         auto iterItem = listenDeviceMap_.begin();
947         while (iterItem != listenDeviceMap_.end()) {
948             auto& listenerInfo = iterItem->second;
949             auto ret = listenerInfo.Find(remote);
950             if (!ret) {
951                 ++iterItem;
952                 continue;
953             }
954             remote->RemoveDeathRecipient(listenerDeath_);
955             listenerInfo.Erase(remote);
956             if (listenerInfo.Empty()) {
957                 if (listenerInfo.called) {
958                     deviceSet.emplace(Str16ToStr8(iterItem->first));
959                 }
960                 iterItem = listenDeviceMap_.erase(iterItem);
961             } else {
962                 ++iterItem;
963             }
964         }
965     }
966     for (auto& devId : deviceSet) {
967         StopSyncRemoteMissions(devId, false);
968     }
969 }
970 
OnRemoteDmsDied(const sptr<IRemoteObject> & remote)971 void DistributedSchedMissionManager::OnRemoteDmsDied(const sptr<IRemoteObject>& remote)
972 {
973     HILOGI("OnRemoteDmsDied");
974     std::string devId;
975     {
976         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
977         for (auto iter = remoteDmsMap_.begin(); iter != remoteDmsMap_.end(); ++iter) {
978             if (iter->second->AsObject() == remote) {
979                 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
980                 devId = iter->first;
981                 remoteDmsMap_.erase(iter);
982                 break;
983             }
984         }
985     }
986     if (devId.empty()) {
987         return;
988     }
989     bool ret = HasSyncListener(devId);
990     if (ret) {
991         std::string localDeviceId;
992         if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
993             return;
994         }
995         RetryStartSyncRemoteMissions(devId, localDeviceId, 0);
996     }
997 }
998 
NotifyDmsProxyProcessDied()999 void DistributedSchedMissionManager::NotifyDmsProxyProcessDied()
1000 {
1001     HILOGI("NotifyDmsProxyProcessDied!");
1002     if (!isRegMissionChange_) {
1003         return;
1004     }
1005     RetryRegisterMissionChange(0);
1006 }
1007 
RetryRegisterMissionChange(int32_t retryTimes)1008 void DistributedSchedMissionManager::RetryRegisterMissionChange(int32_t retryTimes)
1009 {
1010     auto remoteDiedFunc = [this, retryTimes]() {
1011         HILOGI("RetryRegisterMissionChange retryTimes:%{public}d begin", retryTimes);
1012         if (!isRegMissionChange_) {
1013             return;
1014         }
1015         int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
1016         if (ret == ERR_NULL_OBJECT) {
1017             RetryRegisterMissionChange(retryTimes + 1);
1018             HILOGI("RetryRegisterMissionChange dmsproxy null, retry!");
1019             return;
1020         }
1021         HILOGI("RetryRegisterMissionChange result:%{public}d", ret);
1022     };
1023     if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1024         missionHandler_->PostTask(remoteDiedFunc, RETRY_DELAYED);
1025     }
1026 }
1027 
InitAllSnapshots(const std::vector<DstbMissionInfo> & missionInfos)1028 void DistributedSchedMissionManager::InitAllSnapshots(const std::vector<DstbMissionInfo>& missionInfos)
1029 {
1030     for (auto iter = missionInfos.begin(); iter != missionInfos.end(); iter++) {
1031         ErrCode errCode = MissionSnapshotChanged(iter->id);
1032         if (errCode != ERR_OK) {
1033             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", iter->id, errCode);
1034         }
1035     }
1036 }
1037 
MissionSnapshotChanged(int32_t missionId)1038 int32_t DistributedSchedMissionManager::MissionSnapshotChanged(int32_t missionId)
1039 {
1040     std::string networkId;
1041     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1042         HILOGE("get local networkId failed!");
1043         return INVALID_PARAMETERS_ERR;
1044     }
1045     AAFwk::MissionSnapshot missionSnapshot;
1046     ErrCode errCode = DistributedSchedAdapter::GetInstance()
1047         .GetLocalMissionSnapshotInfo(networkId, missionId, missionSnapshot);
1048     if (errCode != ERR_OK) {
1049         HILOGE("get local mission snapshot failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
1050         return errCode;
1051     }
1052     Snapshot snapshot;
1053     SnapshotConverter::ConvertToSnapshot(missionSnapshot, snapshot);
1054     MessageParcel data;
1055     errCode = MissionSnapshotSequence(snapshot, data);
1056     if (errCode != ERR_OK) {
1057         HILOGE("mission snapshot sequence failed, errCode=%{public}d", errCode);
1058         return errCode;
1059     }
1060     size_t len = data.GetReadableBytes();
1061     const uint8_t* byteStream = data.ReadBuffer(len);
1062     errCode = StoreSnapshotInfo(networkId, missionId, byteStream, len);
1063     return errCode;
1064 }
1065 
MissionSnapshotDestroyed(int32_t missionId)1066 int32_t DistributedSchedMissionManager::MissionSnapshotDestroyed(int32_t missionId)
1067 {
1068     std::string networkId;
1069     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1070         HILOGE("get local networkId failed!");
1071         return INVALID_PARAMETERS_ERR;
1072     }
1073     ErrCode errCode = RemoveSnapshotInfo(networkId, missionId);
1074     return errCode;
1075 }
1076 
MissionSnapshotSequence(const Snapshot & snapshot,MessageParcel & data)1077 int32_t DistributedSchedMissionManager::MissionSnapshotSequence(const Snapshot& snapshot, MessageParcel& data)
1078 {
1079     bool ret = snapshot.WriteSnapshotInfo(data);
1080     if (!ret) {
1081         HILOGE("WriteSnapshotInfo failed!");
1082         return ERR_FLATTEN_OBJECT;
1083     }
1084     ret = snapshot.WritePixelMap(data);
1085     if (!ret) {
1086         HILOGE("WritePixelMap failed!");
1087         return ERR_FLATTEN_OBJECT;
1088     }
1089     return ERR_OK;
1090 }
1091 
OnDnetDied()1092 void DistributedSchedMissionManager::OnDnetDied()
1093 {
1094     auto dnetDiedFunc = [this]() {
1095         HILOGI("OnDnetDied");
1096         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
1097         if (!isRegMissionChange_) {
1098             return;
1099         }
1100         remoteSyncDeviceSet_.clear();
1101         DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
1102         isRegMissionChange_ = false;
1103     };
1104     if (missionHandler_ != nullptr) {
1105         missionHandler_->PostTask(dnetDiedFunc);
1106     }
1107 }
1108 } // namespace DistributedSchedule
1109 } // namespace OHOS
1110