1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "mission/distributed_sched_mission_manager.h"
17
18 #include <chrono>
19 #include <sys/time.h>
20 #include <unistd.h>
21
22 #include "datetime_ex.h"
23 #include "distributed_sched_adapter.h"
24 #include "dtbschedmgr_device_info_storage.h"
25 #include "dtbschedmgr_log.h"
26 #include "ipc_skeleton.h"
27 #include "iservice_registry.h"
28 #include "mission/mission_changed_notify.h"
29 #include "mission/mission_constant.h"
30 #include "mission/mission_info_converter.h"
31 #include "mission/snapshot_converter.h"
32 #include "nlohmann/json.hpp"
33 #include "string_ex.h"
34 #include "system_ability_definition.h"
35
36 namespace OHOS {
37 namespace DistributedSchedule {
38 namespace {
39 const std::string TAG = "DistributedSchedMissionManager";
40 constexpr size_t MAX_CACHED_ITEM = 10;
41 constexpr int32_t MAX_RETRY_TIMES = 15;
42 constexpr int32_t RETRY_DELAYED = 2000;
43 constexpr int32_t GET_FOREGROUND_SNAPSHOT_DELAY_TIME = 800; // ms
44 const std::string DELETE_DATA_STORAGE = "DeleteDataStorage";
45 constexpr int32_t DELETE_DATA_STORAGE_DELAYED = 60000; // ms
46 const std::string INVAILD_LOCAL_DEVICE_ID = "-1";
47 }
48 namespace Mission {
49 constexpr int32_t GET_MAX_MISSIONS = 20;
50 } // Mission
51 using namespace std::chrono;
52 using namespace Constants::Mission;
53 using namespace OHOS::DistributedKv;
54
55 IMPLEMENT_SINGLE_INSTANCE(DistributedSchedMissionManager);
56
Init()57 void DistributedSchedMissionManager::Init()
58 {
59 listenerDeath_ = new ListenerDeathRecipient();
60 remoteDmsRecipient_ = new RemoteDmsDeathRecipient();
61 auto runner = AppExecFwk::EventRunner::Create("MissionManagerHandler");
62 missionHandler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
63 auto updateRunner = AppExecFwk::EventRunner::Create("UpdateHandler");
64 updateHandler_ = std::make_shared<AppExecFwk::EventHandler>(updateRunner);
65 missonChangeListener_ = new DistributedMissionChangeListener();
66 auto missionChangeRunner = AppExecFwk::EventRunner::Create("DistributedMissionChange");
67 missionChangeHandler_ = std::make_shared<AppExecFwk::EventHandler>(missionChangeRunner);
68 }
69
GetMissionInfos(const std::string & deviceId,int32_t numMissions,std::vector<AAFwk::MissionInfo> & missionInfos)70 int32_t DistributedSchedMissionManager::GetMissionInfos(const std::string& deviceId,
71 int32_t numMissions, std::vector<AAFwk::MissionInfo>& missionInfos)
72 {
73 HILOGI("start!");
74 if (!IsDeviceIdValidated(deviceId)) {
75 return INVALID_PARAMETERS_ERR;
76 }
77 if (numMissions <= 0) {
78 HILOGE("numMissions is illegal! numMissions:%{public}d", numMissions);
79 return INVALID_PARAMETERS_ERR;
80 }
81 std::vector<DstbMissionInfo> dstbMissionInfos;
82 int32_t ret = FetchCachedRemoteMissions(deviceId, numMissions, dstbMissionInfos);
83 if (ret != ERR_OK) {
84 HILOGE("FetchCachedRemoteMissions failed, ret = %{public}d", ret);
85 return ret;
86 }
87 return MissionInfoConverter::ConvertToMissionInfos(dstbMissionInfos, missionInfos);
88 }
89
GetRemoteDms(const std::string & deviceId)90 sptr<IDistributedSched> DistributedSchedMissionManager::GetRemoteDms(const std::string& deviceId)
91 {
92 int64_t begin = GetTickCount();
93 if (deviceId.empty()) {
94 HILOGE("GetRemoteDms remoteDeviceId is empty");
95 return nullptr;
96 }
97 {
98 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
99 auto iter = remoteDmsMap_.find(deviceId);
100 if (iter != remoteDmsMap_.end()) {
101 auto object = iter->second;
102 if (object != nullptr) {
103 HILOGI("[PerformanceTest] GetRemoteDms from cache spend %{public}" PRId64 " ms",
104 GetTickCount() - begin);
105 return object;
106 }
107 }
108 }
109 HILOGD("GetRemoteDms connect deviceid is %s", deviceId.c_str());
110 auto samgr = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
111 if (samgr == nullptr) {
112 HILOGE("GetRemoteDms failed to connect to systemAbilityMgr!");
113 return nullptr;
114 }
115 auto object = samgr->CheckSystemAbility(DISTRIBUTED_SCHED_SA_ID, deviceId);
116 if (object == nullptr) {
117 HILOGE("GetRemoteDms failed to get dms for remote device:%{public}s!",
118 DnetworkAdapter::AnonymizeDeviceId(deviceId).c_str());
119 return nullptr;
120 }
121 auto ret = object->AddDeathRecipient(remoteDmsRecipient_);
122 HILOGD("GetRemoteDms AddDeathRecipient ret : %{public}d", ret);
123 sptr<IDistributedSched> remoteDmsObj = iface_cast<IDistributedSched>(object);
124 {
125 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
126 auto iter = remoteDmsMap_.find(deviceId);
127 if (iter != remoteDmsMap_.end()) {
128 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
129 }
130 remoteDmsMap_[deviceId] = remoteDmsObj;
131 }
132 HILOGI("[PerformanceTest] GetRemoteDms spend %{public}" PRId64 " ms", GetTickCount() - begin);
133 return remoteDmsObj;
134 }
135
IsDeviceIdValidated(const std::string & deviceId)136 bool DistributedSchedMissionManager::IsDeviceIdValidated(const std::string& deviceId)
137 {
138 if (deviceId.empty()) {
139 HILOGE("IsDeviceIdValidated deviceId is empty!");
140 return false;
141 }
142 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(deviceId) == nullptr) {
143 HILOGW("IsDeviceIdValidated device offline.");
144 return false;
145 }
146
147 return true;
148 }
149
NotifyRemoteDied(const wptr<IRemoteObject> & remote)150 void DistributedSchedMissionManager::NotifyRemoteDied(const wptr<IRemoteObject>& remote)
151 {
152 if (distributedDataStorage_ == nullptr) {
153 HILOGE("DistributedDataStorage null!");
154 return;
155 }
156 distributedDataStorage_->NotifyRemoteDied(remote);
157 }
158
InitDataStorage()159 int32_t DistributedSchedMissionManager::InitDataStorage()
160 {
161 if (distributedDataStorage_ == nullptr) {
162 distributedDataStorage_ = std::make_shared<DistributedDataStorage>();
163 }
164 if (!distributedDataStorage_->Init()) {
165 HILOGE("InitDataStorage DistributedDataStorage init failed!");
166 return ERR_NULL_OBJECT;
167 }
168 return ERR_NONE;
169 }
170
StopDataStorage()171 int32_t DistributedSchedMissionManager::StopDataStorage()
172 {
173 if (distributedDataStorage_ == nullptr) {
174 HILOGE("StopDataStorage DistributedDataStorage null!");
175 return ERR_NULL_OBJECT;
176 }
177 if (!distributedDataStorage_->Stop()) {
178 HILOGE("StopDataStorage DistributedDataStorage stop failed!");
179 return ERR_NULL_OBJECT;
180 }
181 return ERR_NONE;
182 }
183
StoreSnapshotInfo(const std::string & deviceId,int32_t missionId,const uint8_t * byteStream,size_t len)184 int32_t DistributedSchedMissionManager::StoreSnapshotInfo(const std::string& deviceId, int32_t missionId,
185 const uint8_t* byteStream, size_t len)
186 {
187 if (distributedDataStorage_ == nullptr) {
188 HILOGE("StoreSnapshotInfo DistributedDataStorage null!");
189 return ERR_NULL_OBJECT;
190 }
191 if (!distributedDataStorage_->Insert(deviceId, missionId, byteStream, len)) {
192 HILOGE("StoreSnapshotInfo DistributedDataStorage insert failed!");
193 return INVALID_PARAMETERS_ERR;
194 }
195 return ERR_NONE;
196 }
197
RemoveSnapshotInfo(const std::string & deviceId,int32_t missionId)198 int32_t DistributedSchedMissionManager::RemoveSnapshotInfo(const std::string& deviceId, int32_t missionId)
199 {
200 if (distributedDataStorage_ == nullptr) {
201 HILOGE("RemoveSnapshotInfo DistributedDataStorage null!");
202 return ERR_NULL_OBJECT;
203 }
204 if (!distributedDataStorage_->Delete(deviceId, missionId)) {
205 HILOGE("RemoveSnapshotInfo DistributedDataStorage delete failed!");
206 return INVALID_PARAMETERS_ERR;
207 }
208 return ERR_NONE;
209 }
210
GetRemoteMissionSnapshotInfo(const std::string & networkId,int32_t missionId,std::unique_ptr<AAFwk::MissionSnapshot> & missionSnapshot)211 int32_t DistributedSchedMissionManager::GetRemoteMissionSnapshotInfo(const std::string& networkId, int32_t missionId,
212 std::unique_ptr<AAFwk::MissionSnapshot>& missionSnapshot)
213 {
214 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
215 if (uuid.empty()) {
216 HILOGE("uuid is empty!");
217 return INVALID_PARAMETERS_ERR;
218 }
219 std::unique_ptr<Snapshot> snapshotPtr = DequeueCachedSnapshotInfo(uuid, missionId);
220 if (snapshotPtr != nullptr) {
221 HILOGI("get uuid = %{public}s + missionId = %{public}d snapshot from cache successful!",
222 DnetworkAdapter::AnonymizeDeviceId(uuid).c_str(), missionId);
223 SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
224 return ERR_NONE;
225 }
226 if (distributedDataStorage_ == nullptr) {
227 HILOGE("DistributedDataStorage null!");
228 return ERR_NULL_OBJECT;
229 }
230 DistributedKv::Value value;
231 bool ret = distributedDataStorage_->Query(networkId, missionId, value);
232 if (!ret) {
233 HILOGE("DistributedDataStorage query failed!");
234 return INVALID_PARAMETERS_ERR;
235 }
236 snapshotPtr = Snapshot::Create(value.Data());
237 if (snapshotPtr == nullptr) {
238 HILOGE("snapshot create failed!");
239 return ERR_NULL_OBJECT;
240 }
241 HILOGI("get uuid = %{public}s + missionId = %{public}d snapshot from DistributedDB successful!",
242 DnetworkAdapter::AnonymizeDeviceId(uuid).c_str(), missionId);
243 SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
244 return ERR_NONE;
245 }
246
DeviceOnlineNotify(const std::string & deviceId)247 void DistributedSchedMissionManager::DeviceOnlineNotify(const std::string& deviceId)
248 {
249 if (deviceId.empty()) {
250 HILOGW("DeviceOnlineNotify deviceId empty!");
251 return;
252 }
253
254 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
255 if (missionHandler_ != nullptr) {
256 HILOGI("DeviceOnlineNotify RemoveTask");
257 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
258 }
259 }
260
DeviceOfflineNotify(const std::string & deviceId)261 void DistributedSchedMissionManager::DeviceOfflineNotify(const std::string& deviceId)
262 {
263 if (deviceId.empty()) {
264 HILOGW("DeviceOfflineNotify deviceId empty!");
265 return;
266 }
267 StopSyncMissionsFromRemote(deviceId);
268 CleanMissionResources(deviceId);
269 {
270 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
271 auto iter = remoteDmsMap_.find(deviceId);
272 if (iter != remoteDmsMap_.end()) {
273 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
274 remoteDmsMap_.erase(iter);
275 }
276 }
277 HILOGI("DeviceOfflineNotify erase value for deviceId: %{public}s",
278 DnetworkAdapter::AnonymizeDeviceId(deviceId).c_str());
279 }
280
DeleteDataStorage(const std::string & deviceId,bool isDelayed)281 void DistributedSchedMissionManager::DeleteDataStorage(const std::string& deviceId, bool isDelayed)
282 {
283 if (distributedDataStorage_ == nullptr) {
284 HILOGE("DeleteDataStorage DistributedDataStorage null!");
285 return;
286 }
287 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
288 auto callback = [this, uuid, deviceId]() {
289 if (!distributedDataStorage_->FuzzyDelete(deviceId)) {
290 HILOGE("DeleteDataStorage storage delete failed!");
291 } else {
292 HILOGI("DeleteDataStorage storage delete successfully!");
293 }
294 };
295 if (isDelayed) {
296 if (missionHandler_ != nullptr) {
297 HILOGI("DeleteDataStorage PostTask");
298 missionHandler_->PostTask(callback, DELETE_DATA_STORAGE + uuid, DELETE_DATA_STORAGE_DELAYED);
299 }
300 } else {
301 if (missionHandler_ != nullptr) {
302 HILOGI("DeleteDataStorage RemoveTask");
303 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
304 }
305 callback();
306 }
307 }
308
RegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)309 int32_t DistributedSchedMissionManager::RegisterMissionListener(const std::u16string& devId,
310 const sptr<IRemoteObject>& listener)
311 {
312 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(Str16ToStr8(devId));
313 if (missionHandler_ != nullptr) {
314 HILOGI("RemoveTask");
315 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
316 }
317 if (listener == nullptr) {
318 return INVALID_PARAMETERS_ERR;
319 }
320 std::string localDeviceId;
321 std::string remoteDeviceId = Str16ToStr8(devId);
322 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
323 || localDeviceId == remoteDeviceId) {
324 HILOGE("check deviceId failed!");
325 return INVALID_PARAMETERS_ERR;
326 }
327 {
328 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
329 auto& listenerInfo = listenDeviceMap_[devId];
330 if (!listenerInfo.Emplace(listener)) {
331 HILOGW("RegisterSyncListener listener has already inserted!");
332 return ERR_NONE;
333 }
334 bool ret = listener->AddDeathRecipient(listenerDeath_);
335 if (!ret) {
336 HILOGW("RegisterSyncListener AddDeathRecipient failed!");
337 }
338 if (listenerInfo.Size() > 1) {
339 HILOGI("RegisterMissionListener not notify remote DMS!");
340 return ERR_NONE;
341 }
342 }
343 return ERR_NONE;
344 }
345
StartSyncRemoteMissions(const std::string & dstDevId,const std::string & localDevId)346 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
347 const std::string& localDevId)
348 {
349 std::u16string devId = Str8ToStr16(dstDevId);
350 {
351 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
352 auto iterItem = listenDeviceMap_.find(devId);
353 if (iterItem == listenDeviceMap_.end()) {
354 return ERR_NONE;
355 }
356 bool callFlag = iterItem->second.called;
357 if (callFlag) {
358 HILOGI("StartSyncRemoteMissions already called!");
359 return ERR_NONE;
360 }
361 }
362 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
363 if (remoteDms == nullptr) {
364 HILOGE("get remoteDms failed!");
365 RetryStartSyncRemoteMissions(dstDevId, localDevId, 0);
366 return GET_REMOTE_DMS_FAIL;
367 }
368 int32_t ret = StartSyncRemoteMissions(dstDevId, remoteDms);
369 if (ret == ERR_NONE) {
370 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
371 auto iterItem = listenDeviceMap_.find(devId);
372 if (iterItem != listenDeviceMap_.end()) {
373 iterItem->second.called = true;
374 }
375 }
376 return ret;
377 }
378
StartSyncRemoteMissions(const std::string & dstDevId,const sptr<IDistributedSched> & remoteDms)379 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
380 const sptr<IDistributedSched>& remoteDms)
381 {
382 std::vector<DstbMissionInfo> missionInfos;
383 CallerInfo callerInfo;
384 if (!GenerateCallerInfo(callerInfo)) {
385 return GET_LOCAL_DEVICE_ERR;
386 }
387 int64_t begin = GetTickCount();
388 int32_t ret = remoteDms->StartSyncMissionsFromRemote(callerInfo, missionInfos);
389 HILOGI("[PerformanceTest] StartSyncMissionsFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
390 ret, GetTickCount() - begin);
391 if (ret == ERR_NONE) {
392 RebornMissionCache(dstDevId, missionInfos);
393 }
394 return ret;
395 }
396
UnRegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)397 int32_t DistributedSchedMissionManager::UnRegisterMissionListener(const std::u16string& devId,
398 const sptr<IRemoteObject>& listener)
399 {
400 if (listener == nullptr) {
401 return INVALID_PARAMETERS_ERR;
402 }
403 std::string localDeviceId;
404 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
405 || localDeviceId == Str16ToStr8(devId)) {
406 HILOGE("check deviceId fail");
407 return INVALID_PARAMETERS_ERR;
408 }
409 {
410 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
411 auto iterItem = listenDeviceMap_.find(devId);
412 if (iterItem == listenDeviceMap_.end()) {
413 return ERR_NONE;
414 }
415 auto& listenerInfo = iterItem->second;
416 auto ret = listenerInfo.Find(listener);
417 if (!ret) {
418 HILOGI("listener not registered!");
419 return ERR_NONE;
420 }
421 listener->RemoveDeathRecipient(listenerDeath_);
422 listenerInfo.Erase(listener);
423 if (!listenerInfo.Empty()) {
424 return ERR_NONE;
425 }
426 listenDeviceMap_.erase(iterItem);
427 }
428 return ERR_NONE;
429 }
430
CleanMissionResources(const std::string & dstDevId)431 void DistributedSchedMissionManager::CleanMissionResources(const std::string& dstDevId)
432 {
433 {
434 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
435 auto iterDevice = listenDeviceMap_.find(Str8ToStr16(dstDevId));
436 if (iterDevice == listenDeviceMap_.end()) {
437 return;
438 }
439 auto& listenerInfo = iterDevice->second;
440 for (sptr<IRemoteObject> listener : listenerInfo.listenerSet) {
441 if (listener != nullptr) {
442 listener->RemoveDeathRecipient(listenerDeath_);
443 }
444 }
445 listenDeviceMap_.erase(iterDevice);
446 }
447 StopSyncRemoteMissions(dstDevId, true);
448 }
449
StopSyncRemoteMissions(const std::string & dstDevId,bool offline,bool exit)450 int32_t DistributedSchedMissionManager::StopSyncRemoteMissions(const std::string& dstDevId,
451 bool offline, bool exit)
452 {
453 CleanMissionCache(dstDevId);
454 DeleteCachedSnapshotInfo(dstDevId);
455 DeleteDataStorage(dstDevId, true);
456
457 if (offline) {
458 return ERR_NONE;
459 }
460 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
461 if (remoteDms == nullptr) {
462 HILOGE("DMS get remoteDms failed");
463 return GET_REMOTE_DMS_FAIL;
464 }
465
466 CallerInfo callerInfo;
467 if (!GenerateCallerInfo(callerInfo)) {
468 return GET_LOCAL_DEVICE_ERR;
469 }
470 int64_t begin = GetTickCount();
471 int32_t ret = remoteDms->StopSyncMissionsFromRemote(callerInfo);
472 HILOGI("[PerformanceTest] ret:%d, spend %{public}" PRId64 " ms", ret, GetTickCount() - begin);
473 return ret;
474 }
475
StartSyncRemoteMissions(const std::string & dstDevId,bool fixConflict,int64_t tag)476 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId, bool fixConflict,
477 int64_t tag)
478 {
479 std::string localDeviceId;
480 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
481 || (dstDevId == localDeviceId)) {
482 HILOGE("check deviceId fail");
483 return INVALID_PARAMETERS_ERR;
484 }
485 HILOGI("begin, dstDevId is %{public}s, local deviceId is %{public}s",
486 DnetworkAdapter::AnonymizeDeviceId(dstDevId).c_str(),
487 DnetworkAdapter::AnonymizeDeviceId(localDeviceId).c_str());
488 auto ret = StartSyncRemoteMissions(dstDevId, localDeviceId);
489 if (ret != ERR_NONE) {
490 HILOGE("StartSyncRemoteMissions failed, %{public}d", ret);
491 return ret;
492 }
493 return ERR_NONE;
494 }
495
StartSyncMissionsFromRemote(const CallerInfo & callerInfo,std::vector<DstbMissionInfo> & missionInfos)496 int32_t DistributedSchedMissionManager::StartSyncMissionsFromRemote(const CallerInfo& callerInfo,
497 std::vector<DstbMissionInfo>& missionInfos)
498 {
499 auto deviceId = callerInfo.sourceDeviceId;
500 HILOGD("remote version is %{public}d!", callerInfo.dmsVersion);
501 {
502 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
503 remoteSyncDeviceSet_.emplace(deviceId);
504 }
505 int32_t result = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
506 missionInfos);
507 auto func = [this, missionInfos]() {
508 HILOGD("RegisterMissionListener called.");
509 if (!isRegMissionChange_) {
510 int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
511 if (ret == ERR_OK) {
512 isRegMissionChange_ = true;
513 }
514 InitAllSnapshots(missionInfos);
515 }
516 };
517 if (!missionHandler_->PostTask(func)) {
518 HILOGE("post RegisterMissionListener and InitAllSnapshots Task failed");
519 }
520 return result;
521 }
522
StopSyncMissionsFromRemote(const std::string & deviceId)523 void DistributedSchedMissionManager::StopSyncMissionsFromRemote(const std::string& deviceId)
524 {
525 HILOGD(" %{private}s!", deviceId.c_str());
526 {
527 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
528 remoteSyncDeviceSet_.erase(deviceId);
529 if (remoteSyncDeviceSet_.empty()) {
530 auto func = [this]() {
531 int32_t ret = DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
532 if (ret == ERR_OK) {
533 isRegMissionChange_ = false;
534 }
535 };
536 if (!missionHandler_->PostTask(func)) {
537 HILOGE("post UnRegisterMissionListener Task failed");
538 }
539 }
540 }
541 }
542
NeedSyncDevice(const std::string & deviceId)543 bool DistributedSchedMissionManager::NeedSyncDevice(const std::string& deviceId)
544 {
545 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
546 if (remoteSyncDeviceSet_.count(deviceId) == 0) {
547 return false;
548 }
549 return true;
550 }
551
HasSyncListener(const std::string & networkId)552 bool DistributedSchedMissionManager::HasSyncListener(const std::string& networkId)
553 {
554 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
555 auto iter = listenDeviceMap_.find(Str8ToStr16(networkId));
556 if (iter != listenDeviceMap_.end()) {
557 return iter->second.called;
558 }
559 return false;
560 }
561
NotifySnapshotChanged(const std::string & networkId,int32_t missionId)562 void DistributedSchedMissionManager::NotifySnapshotChanged(const std::string& networkId, int32_t missionId)
563 {
564 std::u16string u16DevId = Str8ToStr16(networkId);
565 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
566 auto iter = listenDeviceMap_.find(u16DevId);
567 if (iter == listenDeviceMap_.end()) {
568 return;
569 }
570 auto& listenerInfo = iter->second;
571 for (auto& listener : listenerInfo.listenerSet) {
572 MissionChangedNotify::NotifySnapshot(listener, u16DevId, missionId);
573 }
574 }
575
OnRemoteDied(const wptr<IRemoteObject> & remote)576 void DistributedSchedMissionManager::OnRemoteDied(const wptr<IRemoteObject>& remote)
577 {
578 HILOGD("OnRemoteDied!");
579 sptr<IRemoteObject> listener = remote.promote();
580 if (listener == nullptr) {
581 return;
582 }
583 auto remoteDiedFunc = [this, listener]() {
584 OnMissionListenerDied(listener);
585 };
586 if (missionHandler_ != nullptr) {
587 missionHandler_->PostTask(remoteDiedFunc);
588 }
589 }
590
OnRemoteDied(const wptr<IRemoteObject> & remote)591 void DistributedSchedMissionManager::ListenerDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
592 {
593 DistributedSchedMissionManager::GetInstance().OnRemoteDied(remote);
594 }
595
EnqueueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId,std::unique_ptr<Snapshot> snapshot)596 void DistributedSchedMissionManager::EnqueueCachedSnapshotInfo(const std::string& deviceId, int32_t missionId,
597 std::unique_ptr<Snapshot> snapshot)
598 {
599 if (deviceId.empty() || snapshot == nullptr) {
600 HILOGW("EnqueueCachedSnapshotInfo invalid input param!");
601 return;
602 }
603 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
604 std::string keyInfo = GenerateKeyInfo(deviceId, missionId);
605 auto iter = cachedSnapshotInfos_.find(keyInfo);
606 if (iter != cachedSnapshotInfos_.end()) {
607 if (snapshot->GetCreatedTime() < iter->second->GetCreatedTime()) {
608 return;
609 }
610 }
611
612 if (cachedSnapshotInfos_.size() == MAX_CACHED_ITEM) {
613 int64_t oldest = -1;
614 auto iterOldest = cachedSnapshotInfos_.end();
615 for (auto iterItem = cachedSnapshotInfos_.begin(); iterItem != cachedSnapshotInfos_.end(); ++iterItem) {
616 if (oldest == -1 || iterItem->second->GetLastAccessTime() < oldest) {
617 oldest = iterItem->second->GetLastAccessTime();
618 iterOldest = iterItem;
619 }
620 }
621 if (iterOldest != cachedSnapshotInfos_.end()) {
622 cachedSnapshotInfos_.erase(iterOldest);
623 }
624 }
625 cachedSnapshotInfos_[keyInfo] = std::move(snapshot);
626 }
627
DequeueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId)628 std::unique_ptr<Snapshot> DistributedSchedMissionManager::DequeueCachedSnapshotInfo(const std::string& deviceId,
629 int32_t missionId)
630 {
631 if (deviceId.empty()) {
632 HILOGW("DequeueCachedSnapshotInfo invalid input param!");
633 return nullptr;
634 }
635 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
636 auto iter = cachedSnapshotInfos_.find(GenerateKeyInfo(deviceId, missionId));
637 if (iter != cachedSnapshotInfos_.end()) {
638 std::unique_ptr<Snapshot> snapshot = std::move(iter->second);
639 snapshot->UpdateLastAccessTime(GetTickCount());
640 iter->second = nullptr;
641 cachedSnapshotInfos_.erase(iter);
642 return snapshot;
643 }
644 return nullptr;
645 }
646
DeleteCachedSnapshotInfo(const std::string & networkId)647 void DistributedSchedMissionManager::DeleteCachedSnapshotInfo(const std::string& networkId)
648 {
649 if (networkId.empty()) {
650 return;
651 }
652 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
653 if (uuid.empty()) {
654 return;
655 }
656 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
657 auto iter = cachedSnapshotInfos_.begin();
658 while (iter != cachedSnapshotInfos_.end()) {
659 if (iter->first.find(uuid) != std::string::npos) {
660 iter = cachedSnapshotInfos_.erase(iter);
661 } else {
662 ++iter;
663 }
664 }
665 }
666
FetchCachedRemoteMissions(const std::string & srcId,int32_t numMissions,std::vector<DstbMissionInfo> & missionInfos)667 int32_t DistributedSchedMissionManager::FetchCachedRemoteMissions(const std::string& srcId, int32_t numMissions,
668 std::vector<DstbMissionInfo>& missionInfos)
669 {
670 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(srcId);
671 if (uuid.empty()) {
672 HILOGE("uuid empty!");
673 return INVALID_PARAMETERS_ERR;
674 }
675 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
676 auto iter = deviceMissionInfos_.find(uuid);
677 if (iter == deviceMissionInfos_.end()) {
678 HILOGE("can not find uuid, deviceId: %{public}s!",
679 DnetworkAdapter::AnonymizeDeviceId(srcId).c_str());
680 return ERR_NULL_OBJECT;
681 }
682
683 // get at most numMissions missions
684 int32_t actualNums = static_cast<int32_t>((iter->second).size());
685 if (actualNums < 0) {
686 HILOGE("invalid size!");
687 return ERR_NULL_OBJECT;
688 }
689 missionInfos.assign((iter->second).begin(),
690 (actualNums > numMissions) ? (iter->second).begin() + numMissions : (iter->second).end());
691 return ERR_NONE;
692 }
693
RebornMissionCache(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfos)694 void DistributedSchedMissionManager::RebornMissionCache(const std::string& deviceId,
695 const std::vector<DstbMissionInfo>& missionInfos)
696 {
697 HILOGI("start! deviceId is %{public}s",
698 DnetworkAdapter::AnonymizeDeviceId(deviceId).c_str());
699 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
700 if (uuid.empty()) {
701 HILOGE("uuid empty!");
702 return;
703 }
704 {
705 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
706 deviceMissionInfos_[uuid] = missionInfos;
707 }
708 HILOGI("RebornMissionCache end!");
709 }
710
CleanMissionCache(const std::string & deviceId)711 void DistributedSchedMissionManager::CleanMissionCache(const std::string& deviceId)
712 {
713 HILOGI("CleanMissionCache start! deviceId is %{public}s",
714 DnetworkAdapter::AnonymizeDeviceId(deviceId).c_str());
715 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
716 if (uuid.empty()) {
717 HILOGE("CleanMissionCache uuid empty!");
718 return;
719 }
720 {
721 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
722 deviceMissionInfos_.erase(uuid);
723 }
724 HILOGI("CleanMissionCache end!");
725 }
726
NotifyMissionsChangedFromRemote(const CallerInfo & callerInfo,const std::vector<DstbMissionInfo> & missionInfos)727 int32_t DistributedSchedMissionManager::NotifyMissionsChangedFromRemote(const CallerInfo& callerInfo,
728 const std::vector<DstbMissionInfo>& missionInfos)
729 {
730 HILOGI("NotifyMissionsChangedFromRemote version is %{public}d!", callerInfo.dmsVersion);
731 std::u16string u16DevId = Str8ToStr16(callerInfo.sourceDeviceId);
732 RebornMissionCache(callerInfo.sourceDeviceId, missionInfos);
733 {
734 HILOGI("NotifyMissionsChangedFromRemote notify mission start!");
735 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
736 auto iter = listenDeviceMap_.find(u16DevId);
737 if (iter == listenDeviceMap_.end()) {
738 HILOGE("NotifyMissionsChangedFromRemote notify mission no listener!");
739 return INVALID_PARAMETERS_ERR;
740 }
741 auto& listenerSet = iter->second.listenerSet;
742 auto notifyChanged = [listenerSet, u16DevId] () {
743 for (const auto& listener : listenerSet) {
744 MissionChangedNotify::NotifyMissionsChanged(listener, u16DevId);
745 }
746 };
747 if (missionHandler_ != nullptr) {
748 missionHandler_->PostTask(notifyChanged);
749 HILOGI("NotifyMissionsChangedFromRemote end!");
750 return ERR_NONE;
751 }
752 }
753 return INVALID_PARAMETERS_ERR;
754 }
755
NotifyLocalMissionsChanged()756 void DistributedSchedMissionManager::NotifyLocalMissionsChanged()
757 {
758 auto func = [this]() {
759 HILOGI("NotifyLocalMissionsChanged");
760 std::vector<DstbMissionInfo> missionInfos;
761 int32_t ret = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
762 missionInfos);
763 if (ret == ERR_OK) {
764 int32_t result = NotifyMissionsChangedToRemote(missionInfos);
765 HILOGI("NotifyMissionsChangedToRemote result = %{public}d", result);
766 }
767 };
768 if (!missionChangeHandler_->PostTask(func)) {
769 HILOGE("postTask failed");
770 }
771 }
772
NotifyMissionSnapshotCreated(int32_t missionId)773 void DistributedSchedMissionManager::NotifyMissionSnapshotCreated(int32_t missionId)
774 {
775 auto func = [this, missionId]() {
776 HILOGD("called.");
777 ErrCode errCode = MissionSnapshotChanged(missionId);
778 if (errCode != ERR_OK) {
779 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
780 }
781 };
782 if (!missionChangeHandler_->PostTask(func, GET_FOREGROUND_SNAPSHOT_DELAY_TIME)) {
783 HILOGE("post MissionSnapshotChanged delay Task failed");
784 }
785 }
786
NotifyMissionSnapshotChanged(int32_t missionId)787 void DistributedSchedMissionManager::NotifyMissionSnapshotChanged(int32_t missionId)
788 {
789 auto func = [this, missionId]() {
790 HILOGD("called.");
791 ErrCode errCode = MissionSnapshotChanged(missionId);
792 if (errCode != ERR_OK) {
793 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
794 }
795 };
796 if (!missionChangeHandler_->PostTask(func)) {
797 HILOGE("post MissionSnapshotChanged Task failed");
798 }
799 }
800
NotifyMissionSnapshotDestroyed(int32_t missionId)801 void DistributedSchedMissionManager::NotifyMissionSnapshotDestroyed(int32_t missionId)
802 {
803 auto func = [this, missionId]() {
804 HILOGD("called.");
805 ErrCode errCode = MissionSnapshotDestroyed(missionId);
806 if (errCode != ERR_OK) {
807 HILOGE("mission snapshot removed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
808 }
809 };
810 if (!missionChangeHandler_->PostTask(func)) {
811 HILOGE("post MissionSnapshotDestroyed Task failed");
812 }
813 }
814
NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo> & missionInfos)815 int32_t DistributedSchedMissionManager::NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo>& missionInfos)
816 {
817 CallerInfo callerInfo;
818 if (!GenerateCallerInfo(callerInfo)) {
819 return GET_LOCAL_DEVICE_ERR;
820 }
821 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
822 for (const auto& destDeviceId : remoteSyncDeviceSet_) {
823 auto handler = FetchDeviceHandler(destDeviceId);
824 if (handler == nullptr) {
825 HILOGE("NotifyMissionsChangedToRemote fetch handler failed!");
826 continue;
827 }
828 auto callback = [destDeviceId, missionInfos, callerInfo, this] () {
829 NotifyMissionsChangedToRemoteInner(destDeviceId, missionInfos, callerInfo);
830 };
831 if (!handler->PostTask(callback)) {
832 HILOGE("NotifyMissionsChangedToRemote PostTask failed!");
833 return ERR_NULL_OBJECT;
834 }
835 }
836
837 return ERR_NONE;
838 }
839
NotifyMissionsChangedToRemoteInner(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfos,const CallerInfo & callerInfo)840 void DistributedSchedMissionManager::NotifyMissionsChangedToRemoteInner(const std::string& deviceId,
841 const std::vector<DstbMissionInfo>& missionInfos, const CallerInfo& callerInfo)
842 {
843 sptr<IDistributedSched> remoteDms = GetRemoteDms(deviceId);
844 if (remoteDms == nullptr) {
845 HILOGE("NotifyMissionsChangedToRemote DMS get remoteDms failed");
846 return;
847 }
848 int64_t begin = GetTickCount();
849 int32_t result = remoteDms->NotifyMissionsChangedFromRemote(missionInfos, callerInfo);
850 HILOGI("[PerformanceTest] NotifyMissionsChangedFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
851 result, GetTickCount() - begin);
852 }
853
GenerateCallerInfo(CallerInfo & callerInfo)854 bool DistributedSchedMissionManager::GenerateCallerInfo(CallerInfo& callerInfo)
855 {
856 std::string localUuid;
857 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localUuid)) {
858 HILOGE("get local uuid failed!");
859 return false;
860 }
861 callerInfo.uid = IPCSkeleton::GetCallingUid();
862 callerInfo.pid = IPCSkeleton::GetCallingPid();
863 callerInfo.callerType = CALLER_TYPE_HARMONY;
864 callerInfo.sourceDeviceId = localUuid;
865 callerInfo.dmsVersion = VERSION;
866 return true;
867 }
868
FetchDeviceHandler(const std::string & deviceId)869 std::shared_ptr<AppExecFwk::EventHandler> DistributedSchedMissionManager::FetchDeviceHandler(
870 const std::string& deviceId)
871 {
872 if (!IsDeviceIdValidated(deviceId)) {
873 HILOGW("FetchDeviceHandler device:%{public}s offline.",
874 DnetworkAdapter::AnonymizeDeviceId(deviceId).c_str());
875 return nullptr;
876 }
877
878 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
879 if (uuid.empty()) {
880 HILOGE("FetchDeviceHandler uuid empty!");
881 return nullptr;
882 }
883
884 auto iter = deviceHandle_.find(uuid);
885 if (iter != deviceHandle_.end()) {
886 return iter->second;
887 }
888
889 auto anonyUuid = DnetworkAdapter::AnonymizeDeviceId(uuid);
890 auto runner = AppExecFwk::EventRunner::Create(anonyUuid + "_MissionN");
891 auto handler = std::make_shared<AppExecFwk::EventHandler>(runner);
892 deviceHandle_.emplace(uuid, handler);
893 return handler;
894 }
895
OnRemoteDied(const wptr<IRemoteObject> & remote)896 void DistributedSchedMissionManager::RemoteDmsDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
897 {
898 HILOGI("OnRemoteDied received died notify!");
899 DistributedSchedMissionManager::GetInstance().OnRemoteDmsDied(remote);
900 }
901
OnRemoteDmsDied(const wptr<IRemoteObject> & remote)902 void DistributedSchedMissionManager::OnRemoteDmsDied(const wptr<IRemoteObject>& remote)
903 {
904 sptr<IRemoteObject> diedRemoted = remote.promote();
905 if (diedRemoted == nullptr) {
906 HILOGW("OnRemoteDmsDied promote failed!");
907 return;
908 }
909 HILOGD("delete diedRemoted");
910 auto remoteDmsDiedFunc = [this, diedRemoted]() {
911 OnRemoteDmsDied(diedRemoted);
912 };
913 if (missionHandler_ != nullptr) {
914 missionHandler_->PostTask(remoteDmsDiedFunc);
915 }
916 }
917
RetryStartSyncRemoteMissions(const std::string & dstDeviceId,const std::string & localDevId,int32_t retryTimes)918 void DistributedSchedMissionManager::RetryStartSyncRemoteMissions(const std::string& dstDeviceId,
919 const std::string& localDevId, int32_t retryTimes)
920 {
921 auto retryFunc = [this, dstDeviceId, localDevId, retryTimes]() {
922 bool ret = HasSyncListener(dstDeviceId);
923 if (!ret) {
924 return;
925 }
926 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDeviceId);
927 if (remoteDms == nullptr) {
928 HILOGI("RetryStartSyncRemoteMissions DMS get remoteDms failed");
929 RetryStartSyncRemoteMissions(dstDeviceId, localDevId, retryTimes + 1);
930 return;
931 }
932 int32_t errNo = StartSyncRemoteMissions(dstDeviceId, remoteDms);
933 HILOGI("RetryStartSyncRemoteMissions result:%{public}d", errNo);
934 };
935 if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
936 missionHandler_->PostTask(retryFunc, RETRY_DELAYED);
937 }
938 }
939
OnMissionListenerDied(const sptr<IRemoteObject> & remote)940 void DistributedSchedMissionManager::OnMissionListenerDied(const sptr<IRemoteObject>& remote)
941 {
942 HILOGI("OnMissionListenerDied");
943 std::set<std::string> deviceSet;
944 {
945 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
946 auto iterItem = listenDeviceMap_.begin();
947 while (iterItem != listenDeviceMap_.end()) {
948 auto& listenerInfo = iterItem->second;
949 auto ret = listenerInfo.Find(remote);
950 if (!ret) {
951 ++iterItem;
952 continue;
953 }
954 remote->RemoveDeathRecipient(listenerDeath_);
955 listenerInfo.Erase(remote);
956 if (listenerInfo.Empty()) {
957 if (listenerInfo.called) {
958 deviceSet.emplace(Str16ToStr8(iterItem->first));
959 }
960 iterItem = listenDeviceMap_.erase(iterItem);
961 } else {
962 ++iterItem;
963 }
964 }
965 }
966 for (auto& devId : deviceSet) {
967 StopSyncRemoteMissions(devId, false);
968 }
969 }
970
OnRemoteDmsDied(const sptr<IRemoteObject> & remote)971 void DistributedSchedMissionManager::OnRemoteDmsDied(const sptr<IRemoteObject>& remote)
972 {
973 HILOGI("OnRemoteDmsDied");
974 std::string devId;
975 {
976 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
977 for (auto iter = remoteDmsMap_.begin(); iter != remoteDmsMap_.end(); ++iter) {
978 if (iter->second->AsObject() == remote) {
979 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
980 devId = iter->first;
981 remoteDmsMap_.erase(iter);
982 break;
983 }
984 }
985 }
986 if (devId.empty()) {
987 return;
988 }
989 bool ret = HasSyncListener(devId);
990 if (ret) {
991 std::string localDeviceId;
992 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
993 return;
994 }
995 RetryStartSyncRemoteMissions(devId, localDeviceId, 0);
996 }
997 }
998
NotifyDmsProxyProcessDied()999 void DistributedSchedMissionManager::NotifyDmsProxyProcessDied()
1000 {
1001 HILOGI("NotifyDmsProxyProcessDied!");
1002 if (!isRegMissionChange_) {
1003 return;
1004 }
1005 RetryRegisterMissionChange(0);
1006 }
1007
RetryRegisterMissionChange(int32_t retryTimes)1008 void DistributedSchedMissionManager::RetryRegisterMissionChange(int32_t retryTimes)
1009 {
1010 auto remoteDiedFunc = [this, retryTimes]() {
1011 HILOGI("RetryRegisterMissionChange retryTimes:%{public}d begin", retryTimes);
1012 if (!isRegMissionChange_) {
1013 return;
1014 }
1015 int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
1016 if (ret == ERR_NULL_OBJECT) {
1017 RetryRegisterMissionChange(retryTimes + 1);
1018 HILOGI("RetryRegisterMissionChange dmsproxy null, retry!");
1019 return;
1020 }
1021 HILOGI("RetryRegisterMissionChange result:%{public}d", ret);
1022 };
1023 if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1024 missionHandler_->PostTask(remoteDiedFunc, RETRY_DELAYED);
1025 }
1026 }
1027
InitAllSnapshots(const std::vector<DstbMissionInfo> & missionInfos)1028 void DistributedSchedMissionManager::InitAllSnapshots(const std::vector<DstbMissionInfo>& missionInfos)
1029 {
1030 for (auto iter = missionInfos.begin(); iter != missionInfos.end(); iter++) {
1031 ErrCode errCode = MissionSnapshotChanged(iter->id);
1032 if (errCode != ERR_OK) {
1033 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", iter->id, errCode);
1034 }
1035 }
1036 }
1037
MissionSnapshotChanged(int32_t missionId)1038 int32_t DistributedSchedMissionManager::MissionSnapshotChanged(int32_t missionId)
1039 {
1040 std::string networkId;
1041 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1042 HILOGE("get local networkId failed!");
1043 return INVALID_PARAMETERS_ERR;
1044 }
1045 AAFwk::MissionSnapshot missionSnapshot;
1046 ErrCode errCode = DistributedSchedAdapter::GetInstance()
1047 .GetLocalMissionSnapshotInfo(networkId, missionId, missionSnapshot);
1048 if (errCode != ERR_OK) {
1049 HILOGE("get local mission snapshot failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
1050 return errCode;
1051 }
1052 Snapshot snapshot;
1053 SnapshotConverter::ConvertToSnapshot(missionSnapshot, snapshot);
1054 MessageParcel data;
1055 errCode = MissionSnapshotSequence(snapshot, data);
1056 if (errCode != ERR_OK) {
1057 HILOGE("mission snapshot sequence failed, errCode=%{public}d", errCode);
1058 return errCode;
1059 }
1060 size_t len = data.GetReadableBytes();
1061 const uint8_t* byteStream = data.ReadBuffer(len);
1062 errCode = StoreSnapshotInfo(networkId, missionId, byteStream, len);
1063 return errCode;
1064 }
1065
MissionSnapshotDestroyed(int32_t missionId)1066 int32_t DistributedSchedMissionManager::MissionSnapshotDestroyed(int32_t missionId)
1067 {
1068 std::string networkId;
1069 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1070 HILOGE("get local networkId failed!");
1071 return INVALID_PARAMETERS_ERR;
1072 }
1073 ErrCode errCode = RemoveSnapshotInfo(networkId, missionId);
1074 return errCode;
1075 }
1076
MissionSnapshotSequence(const Snapshot & snapshot,MessageParcel & data)1077 int32_t DistributedSchedMissionManager::MissionSnapshotSequence(const Snapshot& snapshot, MessageParcel& data)
1078 {
1079 bool ret = snapshot.WriteSnapshotInfo(data);
1080 if (!ret) {
1081 HILOGE("WriteSnapshotInfo failed!");
1082 return ERR_FLATTEN_OBJECT;
1083 }
1084 ret = snapshot.WritePixelMap(data);
1085 if (!ret) {
1086 HILOGE("WritePixelMap failed!");
1087 return ERR_FLATTEN_OBJECT;
1088 }
1089 return ERR_OK;
1090 }
1091
OnDnetDied()1092 void DistributedSchedMissionManager::OnDnetDied()
1093 {
1094 auto dnetDiedFunc = [this]() {
1095 HILOGI("OnDnetDied");
1096 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
1097 if (!isRegMissionChange_) {
1098 return;
1099 }
1100 remoteSyncDeviceSet_.clear();
1101 DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
1102 isRegMissionChange_ = false;
1103 };
1104 if (missionHandler_ != nullptr) {
1105 missionHandler_->PostTask(dnetDiedFunc);
1106 }
1107 }
1108 } // namespace DistributedSchedule
1109 } // namespace OHOS
1110