• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "mission/distributed_sched_mission_manager.h"
17 
18 #include <chrono>
19 #include <sys/time.h>
20 #include <unistd.h>
21 
22 #include "datetime_ex.h"
23 #include "distributed_sched_adapter.h"
24 #include "dtbschedmgr_device_info_storage.h"
25 #include "dtbschedmgr_log.h"
26 #include "ipc_skeleton.h"
27 #include "iservice_registry.h"
28 #include "mission/mission_changed_notify.h"
29 #include "mission/mission_constant.h"
30 #include "mission/mission_info_converter.h"
31 #include "mission/snapshot_converter.h"
32 #include "nlohmann/json.hpp"
33 #include "string_ex.h"
34 #include "system_ability_definition.h"
35 
36 namespace OHOS {
37 namespace DistributedSchedule {
38 namespace {
39 const std::string TAG = "DistributedSchedMissionManager";
40 constexpr size_t MAX_CACHED_ITEM = 10;
41 constexpr int32_t MAX_RETRY_TIMES = 15;
42 constexpr int32_t RETRY_DELAYED = 2000;
43 constexpr int32_t GET_FOREGROUND_SNAPSHOT_DELAY_TIME = 800; // ms
44 const std::string DELETE_DATA_STORAGE = "DeleteDataStorage";
45 constexpr int32_t DELETE_DATA_STORAGE_DELAYED = 60000; // ms
46 const std::string INVAILD_LOCAL_DEVICE_ID = "-1";
47 }
48 namespace Mission {
49 constexpr int32_t GET_MAX_MISSIONS = 20;
50 } // Mission
51 using namespace std::chrono;
52 using namespace Constants::Mission;
53 using namespace OHOS::DistributedKv;
54 
55 IMPLEMENT_SINGLE_INSTANCE(DistributedSchedMissionManager);
56 
Init()57 void DistributedSchedMissionManager::Init()
58 {
59     listenerDeath_ = new ListenerDeathRecipient();
60     remoteDmsRecipient_ = new RemoteDmsDeathRecipient();
61     auto runner = AppExecFwk::EventRunner::Create("MissionManagerHandler");
62     missionHandler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
63     auto updateRunner = AppExecFwk::EventRunner::Create("UpdateHandler");
64     updateHandler_ = std::make_shared<AppExecFwk::EventHandler>(updateRunner);
65     missonChangeListener_ = new DistributedMissionChangeListener();
66     auto missionChangeRunner = AppExecFwk::EventRunner::Create("DistributedMissionChange");
67     missionChangeHandler_ = std::make_shared<AppExecFwk::EventHandler>(missionChangeRunner);
68 }
69 
GetMissionInfos(const std::string & deviceId,int32_t numMissions,std::vector<AAFwk::MissionInfo> & missionInfos)70 int32_t DistributedSchedMissionManager::GetMissionInfos(const std::string& deviceId,
71     int32_t numMissions, std::vector<AAFwk::MissionInfo>& missionInfos)
72 {
73     HILOGI("start!");
74     if (!IsDeviceIdValidated(deviceId)) {
75         return INVALID_PARAMETERS_ERR;
76     }
77     if (numMissions <= 0) {
78         HILOGE("numMissions is illegal! numMissions:%{public}d", numMissions);
79         return INVALID_PARAMETERS_ERR;
80     }
81     std::vector<DstbMissionInfo> dstbMissionInfos;
82     int32_t ret = FetchCachedRemoteMissions(deviceId, numMissions, dstbMissionInfos);
83     if (ret != ERR_OK) {
84         HILOGE("FetchCachedRemoteMissions failed, ret = %{public}d", ret);
85         return ret;
86     }
87     return MissionInfoConverter::ConvertToMissionInfos(dstbMissionInfos, missionInfos);
88 }
89 
GetRemoteDms(const std::string & deviceId)90 sptr<IDistributedSched> DistributedSchedMissionManager::GetRemoteDms(const std::string& deviceId)
91 {
92     int64_t begin = GetTickCount();
93     if (deviceId.empty()) {
94         HILOGE("GetRemoteDms remoteDeviceId is empty");
95         return nullptr;
96     }
97     {
98         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
99         auto iter = remoteDmsMap_.find(deviceId);
100         if (iter != remoteDmsMap_.end()) {
101             auto object = iter->second;
102             if (object != nullptr) {
103                 HILOGI("[PerformanceTest] GetRemoteDms from cache spend %{public}" PRId64 " ms",
104                     GetTickCount() - begin);
105                 return object;
106             }
107         }
108     }
109     HILOGD("GetRemoteDms connect deviceid is %s", deviceId.c_str());
110     auto samgr = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
111     if (samgr == nullptr) {
112         HILOGE("GetRemoteDms failed to connect to systemAbilityMgr!");
113         return nullptr;
114     }
115     auto object = samgr->CheckSystemAbility(DISTRIBUTED_SCHED_SA_ID, deviceId);
116     if (object == nullptr) {
117         HILOGE("GetRemoteDms failed to get dms for remote device:%{public}s!",
118             DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
119         return nullptr;
120     }
121     auto ret = object->AddDeathRecipient(remoteDmsRecipient_);
122     HILOGD("GetRemoteDms AddDeathRecipient ret : %{public}d", ret);
123     sptr<IDistributedSched> remoteDmsObj = iface_cast<IDistributedSched>(object);
124     {
125         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
126         auto iter = remoteDmsMap_.find(deviceId);
127         if (iter != remoteDmsMap_.end()) {
128             iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
129         }
130         remoteDmsMap_[deviceId] = remoteDmsObj;
131     }
132     HILOGI("[PerformanceTest] GetRemoteDms spend %{public}" PRId64 " ms", GetTickCount() - begin);
133     return remoteDmsObj;
134 }
135 
IsDeviceIdValidated(const std::string & deviceId)136 bool DistributedSchedMissionManager::IsDeviceIdValidated(const std::string& deviceId)
137 {
138     if (deviceId.empty()) {
139         HILOGE("IsDeviceIdValidated deviceId is empty!");
140         return false;
141     }
142     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(deviceId) == nullptr) {
143         HILOGW("IsDeviceIdValidated device offline.");
144         return false;
145     }
146 
147     return true;
148 }
149 
NotifyRemoteDied(const wptr<IRemoteObject> & remote)150 void DistributedSchedMissionManager::NotifyRemoteDied(const wptr<IRemoteObject>& remote)
151 {
152     if (distributedDataStorage_ == nullptr) {
153         HILOGE("DistributedDataStorage null!");
154         return;
155     }
156     distributedDataStorage_->NotifyRemoteDied(remote);
157 }
158 
InitDataStorage()159 int32_t DistributedSchedMissionManager::InitDataStorage()
160 {
161     if (distributedDataStorage_ == nullptr) {
162         distributedDataStorage_ = std::make_shared<DistributedDataStorage>();
163     }
164     if (!distributedDataStorage_->Init()) {
165         HILOGE("InitDataStorage DistributedDataStorage init failed!");
166         return ERR_NULL_OBJECT;
167     }
168     return ERR_NONE;
169 }
170 
StopDataStorage()171 int32_t DistributedSchedMissionManager::StopDataStorage()
172 {
173     if (distributedDataStorage_ == nullptr) {
174         HILOGE("StopDataStorage DistributedDataStorage null!");
175         return ERR_NULL_OBJECT;
176     }
177     if (!distributedDataStorage_->Stop()) {
178         HILOGE("StopDataStorage DistributedDataStorage stop failed!");
179         return ERR_NULL_OBJECT;
180     }
181     return ERR_NONE;
182 }
183 
StoreSnapshotInfo(const std::string & deviceId,int32_t missionId,const uint8_t * byteStream,size_t len)184 int32_t DistributedSchedMissionManager::StoreSnapshotInfo(const std::string& deviceId, int32_t missionId,
185     const uint8_t* byteStream, size_t len)
186 {
187     if (distributedDataStorage_ == nullptr) {
188         HILOGE("StoreSnapshotInfo DistributedDataStorage null!");
189         return ERR_NULL_OBJECT;
190     }
191     if (!distributedDataStorage_->Insert(deviceId, missionId, byteStream, len)) {
192         HILOGE("StoreSnapshotInfo DistributedDataStorage insert failed!");
193         return INVALID_PARAMETERS_ERR;
194     }
195     return ERR_NONE;
196 }
197 
RemoveSnapshotInfo(const std::string & deviceId,int32_t missionId)198 int32_t DistributedSchedMissionManager::RemoveSnapshotInfo(const std::string& deviceId, int32_t missionId)
199 {
200     if (distributedDataStorage_ == nullptr) {
201         HILOGE("RemoveSnapshotInfo DistributedDataStorage null!");
202         return ERR_NULL_OBJECT;
203     }
204     if (!distributedDataStorage_->Delete(deviceId, missionId)) {
205         HILOGE("RemoveSnapshotInfo DistributedDataStorage delete failed!");
206         return INVALID_PARAMETERS_ERR;
207     }
208     return ERR_NONE;
209 }
210 
GetRemoteMissionSnapshotInfo(const std::string & networkId,int32_t missionId,std::unique_ptr<AAFwk::MissionSnapshot> & missionSnapshot)211 int32_t DistributedSchedMissionManager::GetRemoteMissionSnapshotInfo(const std::string& networkId, int32_t missionId,
212     std::unique_ptr<AAFwk::MissionSnapshot>& missionSnapshot)
213 {
214     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
215     if (uuid.empty()) {
216         HILOGE("uuid is empty!");
217         return INVALID_PARAMETERS_ERR;
218     }
219     std::unique_ptr<Snapshot> snapshotPtr = DequeueCachedSnapshotInfo(uuid, missionId);
220     if (snapshotPtr != nullptr) {
221         HILOGI("get uuid = %{public}s + missionId = %{public}d snapshot from cache successful!",
222             DnetworkAdapter::AnonymizeNetworkId(uuid).c_str(), missionId);
223         SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
224         return ERR_NONE;
225     }
226     if (distributedDataStorage_ == nullptr) {
227         HILOGE("DistributedDataStorage null!");
228         return ERR_NULL_OBJECT;
229     }
230     DistributedKv::Value value;
231     bool ret = distributedDataStorage_->Query(networkId, missionId, value);
232     if (!ret) {
233         HILOGE("DistributedDataStorage query failed!");
234         return INVALID_PARAMETERS_ERR;
235     }
236     snapshotPtr = Snapshot::Create(value.Data());
237     if (snapshotPtr == nullptr) {
238         HILOGE("snapshot create failed!");
239         return ERR_NULL_OBJECT;
240     }
241     HILOGI("get uuid = %{public}s + missionId = %{public}d snapshot from DistributedDB successful!",
242         DnetworkAdapter::AnonymizeNetworkId(uuid).c_str(), missionId);
243     SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
244     return ERR_NONE;
245 }
246 
DeviceOnlineNotify(const std::string & networkId)247 void DistributedSchedMissionManager::DeviceOnlineNotify(const std::string& networkId)
248 {
249     if (networkId.empty()) {
250         HILOGW("DeviceOnlineNotify networkId empty!");
251         return;
252     }
253 
254     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
255     if (missionHandler_ != nullptr) {
256         HILOGI("DeviceOnlineNotify RemoveTask");
257         missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
258     }
259 }
260 
DeviceOfflineNotify(const std::string & networkId)261 void DistributedSchedMissionManager::DeviceOfflineNotify(const std::string& networkId)
262 {
263     if (networkId.empty()) {
264         HILOGW("DeviceOfflineNotify networkId empty!");
265         return;
266     }
267     StopSyncMissionsFromRemote(networkId);
268     CleanMissionResources(networkId);
269     {
270         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
271         auto iter = remoteDmsMap_.find(networkId);
272         if (iter != remoteDmsMap_.end()) {
273             iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
274             remoteDmsMap_.erase(iter);
275         }
276     }
277     HILOGI("DeviceOfflineNotify erase value for networkId: %{public}s",
278         DnetworkAdapter::AnonymizeNetworkId(networkId).c_str());
279 }
280 
DeleteDataStorage(const std::string & deviceId,bool isDelayed)281 void DistributedSchedMissionManager::DeleteDataStorage(const std::string& deviceId, bool isDelayed)
282 {
283     if (distributedDataStorage_ == nullptr) {
284         HILOGE("DeleteDataStorage DistributedDataStorage null!");
285         return;
286     }
287     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
288     auto callback = [this, uuid, deviceId]() {
289         if (!distributedDataStorage_->FuzzyDelete(deviceId)) {
290             HILOGE("DeleteDataStorage storage delete failed!");
291         } else {
292             HILOGI("DeleteDataStorage storage delete successfully!");
293         }
294     };
295     if (isDelayed) {
296         if (missionHandler_ != nullptr) {
297             HILOGI("DeleteDataStorage PostTask");
298             missionHandler_->PostTask(callback, DELETE_DATA_STORAGE + uuid, DELETE_DATA_STORAGE_DELAYED);
299         }
300     } else {
301         if (missionHandler_ != nullptr) {
302             HILOGI("DeleteDataStorage RemoveTask");
303             missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
304         }
305         callback();
306     }
307 }
308 
RegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)309 int32_t DistributedSchedMissionManager::RegisterMissionListener(const std::u16string& devId,
310     const sptr<IRemoteObject>& listener)
311 {
312     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(Str16ToStr8(devId));
313     if (uuid.empty()) {
314         HILOGE("uuid is empty!");
315         return INVALID_PARAMETERS_ERR;
316     }
317     if (missionHandler_ != nullptr) {
318         HILOGI("RemoveTask");
319         missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
320     }
321     if (listener == nullptr) {
322         return INVALID_PARAMETERS_ERR;
323     }
324     std::string localDeviceId;
325     std::string remoteDeviceId = Str16ToStr8(devId);
326     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
327         || localDeviceId == remoteDeviceId) {
328         HILOGE("check deviceId failed!");
329         return INVALID_PARAMETERS_ERR;
330     }
331     {
332         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
333         auto& listenerInfo = listenDeviceMap_[devId];
334         if (!listenerInfo.Emplace(listener)) {
335             HILOGW("RegisterSyncListener listener has already inserted!");
336             return ERR_NONE;
337         }
338         bool ret = listener->AddDeathRecipient(listenerDeath_);
339         if (!ret) {
340             HILOGW("RegisterSyncListener AddDeathRecipient failed!");
341         }
342         if (listenerInfo.Size() > 1) {
343             HILOGI("RegisterMissionListener not notify remote DMS!");
344             return ERR_NONE;
345         }
346     }
347     return ERR_NONE;
348 }
349 
StartSyncRemoteMissions(const std::string & dstDevId,const std::string & localDevId)350 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
351     const std::string& localDevId)
352 {
353     std::u16string devId = Str8ToStr16(dstDevId);
354     {
355         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
356         auto iterItem = listenDeviceMap_.find(devId);
357         if (iterItem == listenDeviceMap_.end()) {
358             return ERR_NONE;
359         }
360         bool callFlag = iterItem->second.called;
361         if (callFlag) {
362             HILOGI("StartSyncRemoteMissions already called!");
363             return ERR_NONE;
364         }
365     }
366     sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
367     if (remoteDms == nullptr) {
368         HILOGE("get remoteDms failed!");
369         RetryStartSyncRemoteMissions(dstDevId, localDevId, 0);
370         return GET_REMOTE_DMS_FAIL;
371     }
372     int32_t ret = StartSyncRemoteMissions(dstDevId, remoteDms);
373     if (ret == ERR_NONE) {
374         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
375         auto iterItem = listenDeviceMap_.find(devId);
376         if (iterItem != listenDeviceMap_.end()) {
377             iterItem->second.called = true;
378         }
379     }
380     return ret;
381 }
382 
StartSyncRemoteMissions(const std::string & dstDevId,const sptr<IDistributedSched> & remoteDms)383 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
384     const sptr<IDistributedSched>& remoteDms)
385 {
386     std::vector<DstbMissionInfo> missionInfos;
387     CallerInfo callerInfo;
388     if (!GenerateCallerInfo(callerInfo)) {
389         return GET_LOCAL_DEVICE_ERR;
390     }
391     int64_t begin = GetTickCount();
392     int32_t ret = remoteDms->StartSyncMissionsFromRemote(callerInfo, missionInfos);
393     HILOGI("[PerformanceTest] StartSyncMissionsFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
394         ret, GetTickCount() - begin);
395     if (ret == ERR_NONE) {
396         RebornMissionCache(dstDevId, missionInfos);
397     }
398     return ret;
399 }
400 
UnRegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)401 int32_t DistributedSchedMissionManager::UnRegisterMissionListener(const std::u16string& devId,
402     const sptr<IRemoteObject>& listener)
403 {
404     if (listener == nullptr) {
405         return INVALID_PARAMETERS_ERR;
406     }
407     if (!IsDeviceIdValidated(Str16ToStr8(devId))) {
408         return INVALID_PARAMETERS_ERR;
409     }
410     std::string localDeviceId;
411     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
412         || localDeviceId == Str16ToStr8(devId)) {
413         HILOGE("check deviceId fail");
414         return INVALID_PARAMETERS_ERR;
415     }
416     {
417         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
418         auto iterItem = listenDeviceMap_.find(devId);
419         if (iterItem == listenDeviceMap_.end()) {
420             return ERR_NONE;
421         }
422         auto& listenerInfo = iterItem->second;
423         auto ret = listenerInfo.Find(listener);
424         if (!ret) {
425             HILOGI("listener not registered!");
426             return ERR_NONE;
427         }
428         listener->RemoveDeathRecipient(listenerDeath_);
429         listenerInfo.Erase(listener);
430         if (!listenerInfo.Empty()) {
431             return ERR_NONE;
432         }
433         listenDeviceMap_.erase(iterItem);
434     }
435     return ERR_NONE;
436 }
437 
CleanMissionResources(const std::string & networkId)438 void DistributedSchedMissionManager::CleanMissionResources(const std::string& networkId)
439 {
440     {
441         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
442         auto iterDevice = listenDeviceMap_.find(Str8ToStr16(networkId));
443         if (iterDevice == listenDeviceMap_.end()) {
444             return;
445         }
446         auto& listenerInfo = iterDevice->second;
447         for (sptr<IRemoteObject> listener : listenerInfo.listenerSet) {
448             if (listener != nullptr) {
449                 listener->RemoveDeathRecipient(listenerDeath_);
450             }
451         }
452         listenDeviceMap_.erase(iterDevice);
453     }
454     StopSyncRemoteMissions(networkId, true);
455 }
456 
StopSyncRemoteMissions(const std::string & dstDevId,bool offline,bool exit)457 int32_t DistributedSchedMissionManager::StopSyncRemoteMissions(const std::string& dstDevId,
458     bool offline, bool exit)
459 {
460     CleanMissionCache(dstDevId);
461     DeleteCachedSnapshotInfo(dstDevId);
462     DeleteDataStorage(dstDevId, true);
463 
464     if (offline) {
465         return ERR_NONE;
466     }
467     sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
468     if (remoteDms == nullptr) {
469         HILOGE("DMS get remoteDms failed");
470         return GET_REMOTE_DMS_FAIL;
471     }
472 
473     CallerInfo callerInfo;
474     if (!GenerateCallerInfo(callerInfo)) {
475         return GET_LOCAL_DEVICE_ERR;
476     }
477     int64_t begin = GetTickCount();
478     int32_t ret = remoteDms->StopSyncMissionsFromRemote(callerInfo);
479     HILOGI("[PerformanceTest] ret:%d, spend %{public}" PRId64 " ms", ret, GetTickCount() - begin);
480     return ret;
481 }
482 
StartSyncRemoteMissions(const std::string & dstDevId,bool fixConflict,int64_t tag)483 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId, bool fixConflict,
484     int64_t tag)
485 {
486     std::string localDeviceId;
487     if (!IsDeviceIdValidated(dstDevId)) {
488         return INVALID_PARAMETERS_ERR;
489     }
490     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
491         || (dstDevId == localDeviceId)) {
492         HILOGE("check deviceId fail");
493         return INVALID_PARAMETERS_ERR;
494     }
495     HILOGI("begin, dstDevId is %{public}s, local deviceId is %{public}s",
496         DnetworkAdapter::AnonymizeNetworkId(dstDevId).c_str(),
497         DnetworkAdapter::AnonymizeNetworkId(localDeviceId).c_str());
498     auto ret = StartSyncRemoteMissions(dstDevId, localDeviceId);
499     if (ret != ERR_NONE) {
500         HILOGE("StartSyncRemoteMissions failed, %{public}d", ret);
501         return ret;
502     }
503     return ERR_NONE;
504 }
505 
StartSyncMissionsFromRemote(const CallerInfo & callerInfo,std::vector<DstbMissionInfo> & missionInfos)506 int32_t DistributedSchedMissionManager::StartSyncMissionsFromRemote(const CallerInfo& callerInfo,
507     std::vector<DstbMissionInfo>& missionInfos)
508 {
509     auto deviceId = callerInfo.sourceDeviceId;
510     HILOGD("remote version is %{public}d!", callerInfo.dmsVersion);
511     {
512         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
513         remoteSyncDeviceSet_.emplace(deviceId);
514     }
515     int32_t result = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
516         missionInfos);
517     auto func = [this, missionInfos]() {
518         HILOGD("RegisterMissionListener called.");
519         if (!isRegMissionChange_) {
520             int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
521             if (ret == ERR_OK) {
522                 isRegMissionChange_ = true;
523             }
524             InitAllSnapshots(missionInfos);
525         }
526     };
527     if (!missionHandler_->PostTask(func)) {
528         HILOGE("post RegisterMissionListener and InitAllSnapshots Task failed");
529     }
530     return result;
531 }
532 
StopSyncMissionsFromRemote(const std::string & networkId)533 void DistributedSchedMissionManager::StopSyncMissionsFromRemote(const std::string& networkId)
534 {
535     HILOGD(" %{private}s!", networkId.c_str());
536     {
537         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
538         remoteSyncDeviceSet_.erase(networkId);
539         if (remoteSyncDeviceSet_.empty()) {
540             auto func = [this]() {
541                 int32_t ret = DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
542                 if (ret == ERR_OK) {
543                     isRegMissionChange_ = false;
544                 }
545             };
546             if (!missionHandler_->PostTask(func)) {
547                 HILOGE("post UnRegisterMissionListener Task failed");
548             }
549         }
550     }
551 }
552 
NeedSyncDevice(const std::string & deviceId)553 bool DistributedSchedMissionManager::NeedSyncDevice(const std::string& deviceId)
554 {
555     std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
556     if (remoteSyncDeviceSet_.count(deviceId) == 0) {
557         return false;
558     }
559     return true;
560 }
561 
HasSyncListener(const std::string & networkId)562 bool DistributedSchedMissionManager::HasSyncListener(const std::string& networkId)
563 {
564     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
565     auto iter = listenDeviceMap_.find(Str8ToStr16(networkId));
566     if (iter != listenDeviceMap_.end()) {
567         return iter->second.called;
568     }
569     return false;
570 }
571 
NotifySnapshotChanged(const std::string & networkId,int32_t missionId)572 void DistributedSchedMissionManager::NotifySnapshotChanged(const std::string& networkId, int32_t missionId)
573 {
574     std::u16string u16DevId = Str8ToStr16(networkId);
575     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
576     auto iter = listenDeviceMap_.find(u16DevId);
577     if (iter == listenDeviceMap_.end()) {
578         return;
579     }
580     auto& listenerInfo = iter->second;
581     for (auto& listener : listenerInfo.listenerSet) {
582         MissionChangedNotify::NotifySnapshot(listener, u16DevId, missionId);
583     }
584 }
585 
OnRemoteDied(const wptr<IRemoteObject> & remote)586 void DistributedSchedMissionManager::OnRemoteDied(const wptr<IRemoteObject>& remote)
587 {
588     HILOGD("OnRemoteDied!");
589     sptr<IRemoteObject> listener = remote.promote();
590     if (listener == nullptr) {
591         return;
592     }
593     auto remoteDiedFunc = [this, listener]() {
594         OnMissionListenerDied(listener);
595     };
596     if (missionHandler_ != nullptr) {
597         missionHandler_->PostTask(remoteDiedFunc);
598     }
599 }
600 
OnRemoteDied(const wptr<IRemoteObject> & remote)601 void DistributedSchedMissionManager::ListenerDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
602 {
603     DistributedSchedMissionManager::GetInstance().OnRemoteDied(remote);
604 }
605 
EnqueueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId,std::unique_ptr<Snapshot> snapshot)606 void DistributedSchedMissionManager::EnqueueCachedSnapshotInfo(const std::string& deviceId, int32_t missionId,
607     std::unique_ptr<Snapshot> snapshot)
608 {
609     if (deviceId.empty() || snapshot == nullptr) {
610         HILOGW("EnqueueCachedSnapshotInfo invalid input param!");
611         return;
612     }
613     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
614     std::string keyInfo = GenerateKeyInfo(deviceId, missionId);
615     auto iter = cachedSnapshotInfos_.find(keyInfo);
616     if (iter != cachedSnapshotInfos_.end()) {
617         if (snapshot->GetCreatedTime() < iter->second->GetCreatedTime()) {
618             return;
619         }
620     }
621 
622     if (cachedSnapshotInfos_.size() == MAX_CACHED_ITEM) {
623         int64_t oldest = -1;
624         auto iterOldest = cachedSnapshotInfos_.end();
625         for (auto iterItem = cachedSnapshotInfos_.begin(); iterItem != cachedSnapshotInfos_.end(); ++iterItem) {
626             if (oldest == -1 || iterItem->second->GetLastAccessTime() < oldest) {
627                 oldest = iterItem->second->GetLastAccessTime();
628                 iterOldest = iterItem;
629             }
630         }
631         if (iterOldest != cachedSnapshotInfos_.end()) {
632             cachedSnapshotInfos_.erase(iterOldest);
633         }
634     }
635     cachedSnapshotInfos_[keyInfo] = std::move(snapshot);
636 }
637 
DequeueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId)638 std::unique_ptr<Snapshot> DistributedSchedMissionManager::DequeueCachedSnapshotInfo(const std::string& deviceId,
639     int32_t missionId)
640 {
641     if (deviceId.empty()) {
642         HILOGW("DequeueCachedSnapshotInfo invalid input param!");
643         return nullptr;
644     }
645     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
646     auto iter = cachedSnapshotInfos_.find(GenerateKeyInfo(deviceId, missionId));
647     if (iter != cachedSnapshotInfos_.end()) {
648         std::unique_ptr<Snapshot> snapshot = std::move(iter->second);
649         snapshot->UpdateLastAccessTime(GetTickCount());
650         iter->second = nullptr;
651         cachedSnapshotInfos_.erase(iter);
652         return snapshot;
653     }
654     return nullptr;
655 }
656 
DeleteCachedSnapshotInfo(const std::string & networkId)657 void DistributedSchedMissionManager::DeleteCachedSnapshotInfo(const std::string& networkId)
658 {
659     if (networkId.empty()) {
660         return;
661     }
662     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
663     if (uuid.empty()) {
664         return;
665     }
666     std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
667     auto iter = cachedSnapshotInfos_.begin();
668     while (iter != cachedSnapshotInfos_.end()) {
669         if (iter->first.find(uuid) != std::string::npos) {
670             iter = cachedSnapshotInfos_.erase(iter);
671         } else {
672             ++iter;
673         }
674     }
675 }
676 
FetchCachedRemoteMissions(const std::string & srcId,int32_t numMissions,std::vector<DstbMissionInfo> & missionInfos)677 int32_t DistributedSchedMissionManager::FetchCachedRemoteMissions(const std::string& srcId, int32_t numMissions,
678     std::vector<DstbMissionInfo>& missionInfos)
679 {
680     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(srcId);
681     if (uuid.empty()) {
682         HILOGE("uuid empty!");
683         return INVALID_PARAMETERS_ERR;
684     }
685     std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
686     auto iter = deviceMissionInfos_.find(uuid);
687     if (iter == deviceMissionInfos_.end()) {
688         HILOGE("can not find uuid, deviceId: %{public}s!",
689             DnetworkAdapter::AnonymizeNetworkId(srcId).c_str());
690         return ERR_NULL_OBJECT;
691     }
692 
693     // get at most numMissions missions
694     int32_t actualNums = static_cast<int32_t>((iter->second).size());
695     if (actualNums < 0) {
696         HILOGE("invalid size!");
697         return ERR_NULL_OBJECT;
698     }
699     missionInfos.assign((iter->second).begin(),
700         (actualNums > numMissions) ? (iter->second).begin() + numMissions : (iter->second).end());
701     return ERR_NONE;
702 }
703 
RebornMissionCache(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfos)704 void DistributedSchedMissionManager::RebornMissionCache(const std::string& deviceId,
705     const std::vector<DstbMissionInfo>& missionInfos)
706 {
707     HILOGI("start! deviceId is %{public}s",
708         DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
709     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
710     if (uuid.empty()) {
711         HILOGE("uuid empty!");
712         return;
713     }
714     {
715         std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
716         deviceMissionInfos_[uuid] = missionInfos;
717     }
718     HILOGI("RebornMissionCache end!");
719 }
720 
CleanMissionCache(const std::string & deviceId)721 void DistributedSchedMissionManager::CleanMissionCache(const std::string& deviceId)
722 {
723     HILOGI("CleanMissionCache start! deviceId is %{public}s",
724         DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
725     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
726     if (uuid.empty()) {
727         HILOGE("CleanMissionCache uuid empty!");
728         return;
729     }
730     {
731         std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
732         deviceMissionInfos_.erase(uuid);
733     }
734     HILOGI("CleanMissionCache end!");
735 }
736 
NotifyMissionsChangedFromRemote(const CallerInfo & callerInfo,const std::vector<DstbMissionInfo> & missionInfos)737 int32_t DistributedSchedMissionManager::NotifyMissionsChangedFromRemote(const CallerInfo& callerInfo,
738     const std::vector<DstbMissionInfo>& missionInfos)
739 {
740     HILOGI("NotifyMissionsChangedFromRemote version is %{public}d!", callerInfo.dmsVersion);
741     std::u16string u16DevId = Str8ToStr16(callerInfo.sourceDeviceId);
742     RebornMissionCache(callerInfo.sourceDeviceId, missionInfos);
743     {
744         HILOGI("NotifyMissionsChangedFromRemote notify mission start!");
745         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
746         auto iter = listenDeviceMap_.find(u16DevId);
747         if (iter == listenDeviceMap_.end()) {
748             HILOGE("NotifyMissionsChangedFromRemote notify mission no listener!");
749             return INVALID_PARAMETERS_ERR;
750         }
751         auto& listenerSet = iter->second.listenerSet;
752         auto notifyChanged = [listenerSet, u16DevId] () {
753             for (const auto& listener : listenerSet) {
754                 MissionChangedNotify::NotifyMissionsChanged(listener, u16DevId);
755             }
756         };
757         if (missionHandler_ != nullptr) {
758             missionHandler_->PostTask(notifyChanged);
759             HILOGI("NotifyMissionsChangedFromRemote end!");
760             return ERR_NONE;
761         }
762     }
763     return INVALID_PARAMETERS_ERR;
764 }
765 
NotifyLocalMissionsChanged()766 void DistributedSchedMissionManager::NotifyLocalMissionsChanged()
767 {
768     auto func = [this]() {
769         HILOGI("NotifyLocalMissionsChanged");
770         std::vector<DstbMissionInfo> missionInfos;
771         int32_t ret = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
772             missionInfos);
773         if (ret == ERR_OK) {
774             int32_t result = NotifyMissionsChangedToRemote(missionInfos);
775             HILOGI("NotifyMissionsChangedToRemote result = %{public}d", result);
776         }
777     };
778     if (!missionChangeHandler_->PostTask(func)) {
779         HILOGE("postTask failed");
780     }
781 }
782 
NotifyMissionSnapshotCreated(int32_t missionId)783 void DistributedSchedMissionManager::NotifyMissionSnapshotCreated(int32_t missionId)
784 {
785     auto func = [this, missionId]() {
786         HILOGD("called.");
787         ErrCode errCode = MissionSnapshotChanged(missionId);
788         if (errCode != ERR_OK) {
789             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
790         }
791     };
792     if (!missionChangeHandler_->PostTask(func, GET_FOREGROUND_SNAPSHOT_DELAY_TIME)) {
793         HILOGE("post MissionSnapshotChanged delay Task failed");
794     }
795 }
796 
NotifyMissionSnapshotChanged(int32_t missionId)797 void DistributedSchedMissionManager::NotifyMissionSnapshotChanged(int32_t missionId)
798 {
799     auto func = [this, missionId]() {
800         HILOGD("called.");
801         ErrCode errCode = MissionSnapshotChanged(missionId);
802         if (errCode != ERR_OK) {
803             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
804         }
805     };
806     if (!missionChangeHandler_->PostTask(func)) {
807         HILOGE("post MissionSnapshotChanged Task failed");
808     }
809 }
810 
NotifyMissionSnapshotDestroyed(int32_t missionId)811 void DistributedSchedMissionManager::NotifyMissionSnapshotDestroyed(int32_t missionId)
812 {
813     auto func = [this, missionId]() {
814         HILOGD("called.");
815         ErrCode errCode = MissionSnapshotDestroyed(missionId);
816         if (errCode != ERR_OK) {
817             HILOGE("mission snapshot removed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
818         }
819     };
820     if (!missionChangeHandler_->PostTask(func)) {
821         HILOGE("post MissionSnapshotDestroyed Task failed");
822     }
823 }
824 
NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo> & missionInfos)825 int32_t DistributedSchedMissionManager::NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo>& missionInfos)
826 {
827     CallerInfo callerInfo;
828     if (!GenerateCallerInfo(callerInfo)) {
829         return GET_LOCAL_DEVICE_ERR;
830     }
831     std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
832     for (const auto& destDeviceId : remoteSyncDeviceSet_) {
833         auto handler = FetchDeviceHandler(destDeviceId);
834         if (handler == nullptr) {
835             HILOGE("NotifyMissionsChangedToRemote fetch handler failed!");
836             continue;
837         }
838         auto callback = [destDeviceId, missionInfos, callerInfo, this] () {
839             NotifyMissionsChangedToRemoteInner(destDeviceId, missionInfos, callerInfo);
840         };
841         if (!handler->PostTask(callback)) {
842             HILOGE("NotifyMissionsChangedToRemote PostTask failed!");
843             return ERR_NULL_OBJECT;
844         }
845     }
846 
847     return ERR_NONE;
848 }
849 
NotifyMissionsChangedToRemoteInner(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfos,const CallerInfo & callerInfo)850 void DistributedSchedMissionManager::NotifyMissionsChangedToRemoteInner(const std::string& deviceId,
851     const std::vector<DstbMissionInfo>& missionInfos, const CallerInfo& callerInfo)
852 {
853     sptr<IDistributedSched> remoteDms = GetRemoteDms(deviceId);
854     if (remoteDms == nullptr) {
855         HILOGE("NotifyMissionsChangedToRemote DMS get remoteDms failed");
856         return;
857     }
858     int64_t begin = GetTickCount();
859     int32_t result = remoteDms->NotifyMissionsChangedFromRemote(missionInfos, callerInfo);
860     HILOGI("[PerformanceTest] NotifyMissionsChangedFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
861         result, GetTickCount() - begin);
862 }
863 
GenerateCallerInfo(CallerInfo & callerInfo)864 bool DistributedSchedMissionManager::GenerateCallerInfo(CallerInfo& callerInfo)
865 {
866     std::string localUuid;
867     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localUuid)) {
868         HILOGE("get local uuid failed!");
869         return false;
870     }
871     callerInfo.uid = IPCSkeleton::GetCallingUid();
872     callerInfo.pid = IPCSkeleton::GetCallingPid();
873     callerInfo.callerType = CALLER_TYPE_HARMONY;
874     callerInfo.sourceDeviceId = localUuid;
875     callerInfo.dmsVersion = VERSION;
876     return true;
877 }
878 
FetchDeviceHandler(const std::string & deviceId)879 std::shared_ptr<AppExecFwk::EventHandler> DistributedSchedMissionManager::FetchDeviceHandler(
880     const std::string& deviceId)
881 {
882     if (!IsDeviceIdValidated(deviceId)) {
883         HILOGW("FetchDeviceHandler device:%{public}s offline.",
884             DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
885         return nullptr;
886     }
887 
888     std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
889     if (uuid.empty()) {
890         HILOGE("FetchDeviceHandler uuid empty!");
891         return nullptr;
892     }
893 
894     auto iter = deviceHandle_.find(uuid);
895     if (iter != deviceHandle_.end()) {
896         return iter->second;
897     }
898 
899     auto anonyUuid = DnetworkAdapter::AnonymizeNetworkId(uuid);
900     auto runner = AppExecFwk::EventRunner::Create(anonyUuid + "_MissionN");
901     auto handler = std::make_shared<AppExecFwk::EventHandler>(runner);
902     deviceHandle_.emplace(uuid, handler);
903     return handler;
904 }
905 
OnRemoteDied(const wptr<IRemoteObject> & remote)906 void DistributedSchedMissionManager::RemoteDmsDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
907 {
908     HILOGI("OnRemoteDied received died notify!");
909     DistributedSchedMissionManager::GetInstance().OnRemoteDmsDied(remote);
910 }
911 
OnRemoteDmsDied(const wptr<IRemoteObject> & remote)912 void DistributedSchedMissionManager::OnRemoteDmsDied(const wptr<IRemoteObject>& remote)
913 {
914     sptr<IRemoteObject> diedRemoted = remote.promote();
915     if (diedRemoted == nullptr) {
916         HILOGW("OnRemoteDmsDied promote failed!");
917         return;
918     }
919     HILOGD("delete diedRemoted");
920     auto remoteDmsDiedFunc = [this, diedRemoted]() {
921         OnRemoteDmsDied(diedRemoted);
922     };
923     if (missionHandler_ != nullptr) {
924         missionHandler_->PostTask(remoteDmsDiedFunc);
925     }
926 }
927 
RetryStartSyncRemoteMissions(const std::string & dstDeviceId,const std::string & localDevId,int32_t retryTimes)928 void DistributedSchedMissionManager::RetryStartSyncRemoteMissions(const std::string& dstDeviceId,
929     const std::string& localDevId, int32_t retryTimes)
930 {
931     auto retryFunc = [this, dstDeviceId, localDevId, retryTimes]() {
932         bool ret = HasSyncListener(dstDeviceId);
933         if (!ret) {
934             return;
935         }
936         sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDeviceId);
937         if (remoteDms == nullptr) {
938             HILOGI("RetryStartSyncRemoteMissions DMS get remoteDms failed");
939             RetryStartSyncRemoteMissions(dstDeviceId, localDevId, retryTimes + 1);
940             return;
941         }
942         int32_t errNo = StartSyncRemoteMissions(dstDeviceId, remoteDms);
943         HILOGI("RetryStartSyncRemoteMissions result:%{public}d", errNo);
944     };
945     if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
946         missionHandler_->PostTask(retryFunc, RETRY_DELAYED);
947     }
948 }
949 
OnMissionListenerDied(const sptr<IRemoteObject> & remote)950 void DistributedSchedMissionManager::OnMissionListenerDied(const sptr<IRemoteObject>& remote)
951 {
952     HILOGI("OnMissionListenerDied");
953     std::set<std::string> deviceSet;
954     {
955         std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
956         auto iterItem = listenDeviceMap_.begin();
957         while (iterItem != listenDeviceMap_.end()) {
958             auto& listenerInfo = iterItem->second;
959             auto ret = listenerInfo.Find(remote);
960             if (!ret) {
961                 ++iterItem;
962                 continue;
963             }
964             remote->RemoveDeathRecipient(listenerDeath_);
965             listenerInfo.Erase(remote);
966             if (listenerInfo.Empty()) {
967                 if (listenerInfo.called) {
968                     deviceSet.emplace(Str16ToStr8(iterItem->first));
969                 }
970                 iterItem = listenDeviceMap_.erase(iterItem);
971             } else {
972                 ++iterItem;
973             }
974         }
975     }
976     for (auto& devId : deviceSet) {
977         StopSyncRemoteMissions(devId, false);
978     }
979 }
980 
OnRemoteDmsDied(const sptr<IRemoteObject> & remote)981 void DistributedSchedMissionManager::OnRemoteDmsDied(const sptr<IRemoteObject>& remote)
982 {
983     HILOGI("OnRemoteDmsDied");
984     std::string devId;
985     {
986         std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
987         for (auto iter = remoteDmsMap_.begin(); iter != remoteDmsMap_.end(); ++iter) {
988             if (iter->second->AsObject() == remote) {
989                 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
990                 devId = iter->first;
991                 remoteDmsMap_.erase(iter);
992                 break;
993             }
994         }
995     }
996     if (devId.empty()) {
997         return;
998     }
999     bool ret = HasSyncListener(devId);
1000     if (ret) {
1001         std::string localDeviceId;
1002         if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
1003             return;
1004         }
1005         RetryStartSyncRemoteMissions(devId, localDeviceId, 0);
1006     }
1007 }
1008 
NotifyDmsProxyProcessDied()1009 void DistributedSchedMissionManager::NotifyDmsProxyProcessDied()
1010 {
1011     HILOGI("NotifyDmsProxyProcessDied!");
1012     if (!isRegMissionChange_) {
1013         return;
1014     }
1015     RetryRegisterMissionChange(0);
1016 }
1017 
RetryRegisterMissionChange(int32_t retryTimes)1018 void DistributedSchedMissionManager::RetryRegisterMissionChange(int32_t retryTimes)
1019 {
1020     auto remoteDiedFunc = [this, retryTimes]() {
1021         HILOGI("RetryRegisterMissionChange retryTimes:%{public}d begin", retryTimes);
1022         if (!isRegMissionChange_) {
1023             return;
1024         }
1025         int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
1026         if (ret == ERR_NULL_OBJECT) {
1027             RetryRegisterMissionChange(retryTimes + 1);
1028             HILOGI("RetryRegisterMissionChange dmsproxy null, retry!");
1029             return;
1030         }
1031         HILOGI("RetryRegisterMissionChange result:%{public}d", ret);
1032     };
1033     if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1034         missionHandler_->PostTask(remoteDiedFunc, RETRY_DELAYED);
1035     }
1036 }
1037 
InitAllSnapshots(const std::vector<DstbMissionInfo> & missionInfos)1038 void DistributedSchedMissionManager::InitAllSnapshots(const std::vector<DstbMissionInfo>& missionInfos)
1039 {
1040     for (auto iter = missionInfos.begin(); iter != missionInfos.end(); iter++) {
1041         ErrCode errCode = MissionSnapshotChanged(iter->id);
1042         if (errCode != ERR_OK) {
1043             HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", iter->id, errCode);
1044         }
1045     }
1046 }
1047 
MissionSnapshotChanged(int32_t missionId)1048 int32_t DistributedSchedMissionManager::MissionSnapshotChanged(int32_t missionId)
1049 {
1050     std::string networkId;
1051     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1052         HILOGE("get local networkId failed!");
1053         return INVALID_PARAMETERS_ERR;
1054     }
1055     AAFwk::MissionSnapshot missionSnapshot;
1056     ErrCode errCode = DistributedSchedAdapter::GetInstance()
1057         .GetLocalMissionSnapshotInfo(networkId, missionId, missionSnapshot);
1058     if (errCode != ERR_OK) {
1059         HILOGE("get local mission snapshot failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
1060         return errCode;
1061     }
1062     Snapshot snapshot;
1063     SnapshotConverter::ConvertToSnapshot(missionSnapshot, snapshot);
1064     MessageParcel data;
1065     errCode = MissionSnapshotSequence(snapshot, data);
1066     if (errCode != ERR_OK) {
1067         HILOGE("mission snapshot sequence failed, errCode=%{public}d", errCode);
1068         return errCode;
1069     }
1070     size_t len = data.GetReadableBytes();
1071     const uint8_t* byteStream = data.ReadBuffer(len);
1072     errCode = StoreSnapshotInfo(networkId, missionId, byteStream, len);
1073     return errCode;
1074 }
1075 
MissionSnapshotDestroyed(int32_t missionId)1076 int32_t DistributedSchedMissionManager::MissionSnapshotDestroyed(int32_t missionId)
1077 {
1078     std::string networkId;
1079     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1080         HILOGE("get local networkId failed!");
1081         return INVALID_PARAMETERS_ERR;
1082     }
1083     ErrCode errCode = RemoveSnapshotInfo(networkId, missionId);
1084     return errCode;
1085 }
1086 
MissionSnapshotSequence(const Snapshot & snapshot,MessageParcel & data)1087 int32_t DistributedSchedMissionManager::MissionSnapshotSequence(const Snapshot& snapshot, MessageParcel& data)
1088 {
1089     bool ret = snapshot.WriteSnapshotInfo(data);
1090     if (!ret) {
1091         HILOGE("WriteSnapshotInfo failed!");
1092         return ERR_FLATTEN_OBJECT;
1093     }
1094     ret = snapshot.WritePixelMap(data);
1095     if (!ret) {
1096         HILOGE("WritePixelMap failed!");
1097         return ERR_FLATTEN_OBJECT;
1098     }
1099     return ERR_OK;
1100 }
1101 
OnDnetDied()1102 void DistributedSchedMissionManager::OnDnetDied()
1103 {
1104     auto dnetDiedFunc = [this]() {
1105         HILOGI("OnDnetDied");
1106         std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
1107         if (!isRegMissionChange_) {
1108             return;
1109         }
1110         remoteSyncDeviceSet_.clear();
1111         DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
1112         isRegMissionChange_ = false;
1113     };
1114     if (missionHandler_ != nullptr) {
1115         missionHandler_->PostTask(dnetDiedFunc);
1116     }
1117 }
1118 } // namespace DistributedSchedule
1119 } // namespace OHOS
1120