1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "mission/distributed_sched_mission_manager.h"
17
18 #include <chrono>
19 #include <sys/time.h>
20 #include <unistd.h>
21
22 #include "datetime_ex.h"
23 #include "distributed_sched_adapter.h"
24 #include "dtbschedmgr_device_info_storage.h"
25 #include "dtbschedmgr_log.h"
26 #include "ipc_skeleton.h"
27 #include "iservice_registry.h"
28 #include "mission/mission_changed_notify.h"
29 #include "mission/mission_constant.h"
30 #include "mission/mission_info_converter.h"
31 #include "mission/snapshot_converter.h"
32 #include "nlohmann/json.hpp"
33 #include "string_ex.h"
34 #include "system_ability_definition.h"
35
36 namespace OHOS {
37 namespace DistributedSchedule {
38 namespace {
39 const std::string TAG = "DistributedSchedMissionManager";
40 constexpr size_t MAX_CACHED_ITEM = 10;
41 constexpr int32_t MAX_RETRY_TIMES = 15;
42 constexpr int32_t RETRY_DELAYED = 2000;
43 constexpr int32_t GET_FOREGROUND_SNAPSHOT_DELAY_TIME = 800; // ms
44 const std::string DELETE_DATA_STORAGE = "DeleteDataStorage";
45 constexpr int32_t DELETE_DATA_STORAGE_DELAYED = 60000; // ms
46 const std::string INVAILD_LOCAL_DEVICE_ID = "-1";
47 }
48 namespace Mission {
49 constexpr int32_t GET_MAX_MISSIONS = 20;
50 } // Mission
51 using namespace std::chrono;
52 using namespace Constants::Mission;
53 using namespace OHOS::DistributedKv;
54
55 IMPLEMENT_SINGLE_INSTANCE(DistributedSchedMissionManager);
56
Init()57 void DistributedSchedMissionManager::Init()
58 {
59 listenerDeath_ = new ListenerDeathRecipient();
60 remoteDmsRecipient_ = new RemoteDmsDeathRecipient();
61 auto runner = AppExecFwk::EventRunner::Create("MissionManagerHandler");
62 missionHandler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
63 auto updateRunner = AppExecFwk::EventRunner::Create("UpdateHandler");
64 updateHandler_ = std::make_shared<AppExecFwk::EventHandler>(updateRunner);
65 missonChangeListener_ = new DistributedMissionChangeListener();
66 auto missionChangeRunner = AppExecFwk::EventRunner::Create("DistributedMissionChange");
67 missionChangeHandler_ = std::make_shared<AppExecFwk::EventHandler>(missionChangeRunner);
68 }
69
GetMissionInfos(const std::string & deviceId,int32_t numMissions,std::vector<AAFwk::MissionInfo> & missionInfoSet)70 int32_t DistributedSchedMissionManager::GetMissionInfos(const std::string& deviceId,
71 int32_t numMissions, std::vector<AAFwk::MissionInfo>& missionInfoSet)
72 {
73 HILOGI("start!");
74 if (!IsDeviceIdValidated(deviceId)) {
75 return INVALID_PARAMETERS_ERR;
76 }
77 if (numMissions <= 0) {
78 HILOGE("numMissions is illegal! numMissions:%{public}d", numMissions);
79 return INVALID_PARAMETERS_ERR;
80 }
81 std::vector<DstbMissionInfo> dstbMissionInfoSet;
82 int32_t ret = FetchCachedRemoteMissions(deviceId, numMissions, dstbMissionInfoSet);
83 if (ret != ERR_OK) {
84 HILOGE("FetchCachedRemoteMissions failed, ret = %{public}d", ret);
85 return ret;
86 }
87 return MissionInfoConverter::ConvertToMissionInfos(dstbMissionInfoSet, missionInfoSet);
88 }
89
GetRemoteDms(const std::string & deviceId)90 sptr<IDistributedSched> DistributedSchedMissionManager::GetRemoteDms(const std::string& deviceId)
91 {
92 if (deviceId.empty()) {
93 HILOGE("GetRemoteDms remoteDeviceId is empty");
94 return nullptr;
95 }
96 int64_t begin = GetTickCount();
97 {
98 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
99 auto iter = remoteDmsMap_.find(deviceId);
100 if (iter != remoteDmsMap_.end()) {
101 auto object = iter->second;
102 if (object != nullptr) {
103 HILOGI("[PerformanceTest] GetRemoteDms from cache spend %{public}" PRId64 " ms",
104 GetTickCount() - begin);
105 return object;
106 }
107 }
108 }
109 HILOGD("GetRemoteDms connect deviceid is %s", deviceId.c_str());
110 auto samgr = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
111 if (samgr == nullptr) {
112 HILOGE("GetRemoteDms failed to connect to systemAbilityMgr!");
113 return nullptr;
114 }
115 auto object = samgr->CheckSystemAbility(DISTRIBUTED_SCHED_SA_ID, deviceId);
116 if (object == nullptr) {
117 HILOGE("GetRemoteDms failed to get dms for remote device:%{public}s!",
118 DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
119 return nullptr;
120 }
121 auto ret = object->AddDeathRecipient(remoteDmsRecipient_);
122 HILOGD("GetRemoteDms AddDeathRecipient ret : %{public}d", ret);
123 sptr<IDistributedSched> remoteDmsObj = iface_cast<IDistributedSched>(object);
124 {
125 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
126 auto iter = remoteDmsMap_.find(deviceId);
127 if (iter != remoteDmsMap_.end()) {
128 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
129 }
130 remoteDmsMap_[deviceId] = remoteDmsObj;
131 }
132 HILOGI("[PerformanceTest] GetRemoteDms spend %{public}" PRId64 " ms", GetTickCount() - begin);
133 return remoteDmsObj;
134 }
135
IsDeviceIdValidated(const std::string & deviceId)136 bool DistributedSchedMissionManager::IsDeviceIdValidated(const std::string& deviceId)
137 {
138 if (deviceId.empty()) {
139 HILOGE("IsDeviceIdValidated deviceId is empty!");
140 return false;
141 }
142 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(deviceId) == nullptr) {
143 HILOGW("IsDeviceIdValidated device offline.");
144 return false;
145 }
146
147 return true;
148 }
149
NotifyRemoteDied(const wptr<IRemoteObject> & remote)150 void DistributedSchedMissionManager::NotifyRemoteDied(const wptr<IRemoteObject>& remote)
151 {
152 if (distributedDataStorage_ == nullptr) {
153 HILOGE("DistributedDataStorage null!");
154 return;
155 }
156 distributedDataStorage_->NotifyRemoteDied(remote);
157 }
158
InitDataStorage()159 int32_t DistributedSchedMissionManager::InitDataStorage()
160 {
161 if (distributedDataStorage_ == nullptr) {
162 distributedDataStorage_ = std::make_shared<DistributedDataStorage>();
163 }
164 if (!distributedDataStorage_->Init()) {
165 HILOGE("InitDataStorage DistributedDataStorage init failed!");
166 return ERR_NULL_OBJECT;
167 }
168 return ERR_NONE;
169 }
170
StopDataStorage()171 int32_t DistributedSchedMissionManager::StopDataStorage()
172 {
173 if (distributedDataStorage_ == nullptr) {
174 HILOGE("StopDataStorage DistributedDataStorage null!");
175 return ERR_NULL_OBJECT;
176 }
177 if (!distributedDataStorage_->Stop()) {
178 HILOGE("StopDataStorage DistributedDataStorage stop failed!");
179 return ERR_NULL_OBJECT;
180 }
181 return ERR_NONE;
182 }
183
StoreSnapshotInfo(const std::string & deviceId,int32_t missionId,const uint8_t * byteStream,size_t len)184 int32_t DistributedSchedMissionManager::StoreSnapshotInfo(const std::string& deviceId, int32_t missionId,
185 const uint8_t* byteStream, size_t len)
186 {
187 if (distributedDataStorage_ == nullptr) {
188 HILOGE("StoreSnapshotInfo DistributedDataStorage null!");
189 return ERR_NULL_OBJECT;
190 }
191 if (!distributedDataStorage_->Insert(deviceId, missionId, byteStream, len)) {
192 HILOGE("StoreSnapshotInfo DistributedDataStorage insert failed!");
193 return INVALID_PARAMETERS_ERR;
194 }
195 return ERR_NONE;
196 }
197
RemoveSnapshotInfo(const std::string & deviceId,int32_t missionId)198 int32_t DistributedSchedMissionManager::RemoveSnapshotInfo(const std::string& deviceId, int32_t missionId)
199 {
200 if (distributedDataStorage_ == nullptr) {
201 HILOGE("RemoveSnapshotInfo DistributedDataStorage null!");
202 return ERR_NULL_OBJECT;
203 }
204 if (!distributedDataStorage_->Delete(deviceId, missionId)) {
205 HILOGE("RemoveSnapshotInfo DistributedDataStorage delete failed!");
206 return INVALID_PARAMETERS_ERR;
207 }
208 return ERR_NONE;
209 }
210
GetRemoteMissionSnapshotInfo(const std::string & networkId,int32_t missionId,std::unique_ptr<AAFwk::MissionSnapshot> & missionSnapshot)211 int32_t DistributedSchedMissionManager::GetRemoteMissionSnapshotInfo(const std::string& networkId, int32_t missionId,
212 std::unique_ptr<AAFwk::MissionSnapshot>& missionSnapshot)
213 {
214 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
215 if (uuid.empty()) {
216 HILOGE("uuid is empty!");
217 return INVALID_PARAMETERS_ERR;
218 }
219 std::unique_ptr<Snapshot> snapshotPtr = DequeueCachedSnapshotInfo(uuid, missionId);
220 if (snapshotPtr != nullptr) {
221 HILOGI("get uuid = %{public}s + missionId = %{public}d snapshot from cache successful!",
222 DnetworkAdapter::AnonymizeNetworkId(uuid).c_str(), missionId);
223 SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
224 return ERR_NONE;
225 }
226 if (distributedDataStorage_ == nullptr) {
227 HILOGE("DistributedDataStorage null!");
228 return ERR_NULL_OBJECT;
229 }
230 DistributedKv::Value value;
231 bool ret = distributedDataStorage_->Query(networkId, missionId, value);
232 if (!ret) {
233 HILOGE("DistributedDataStorage query failed!");
234 return INVALID_PARAMETERS_ERR;
235 }
236 snapshotPtr = Snapshot::Create(value.Data());
237 if (snapshotPtr == nullptr) {
238 HILOGE("snapshot create failed!");
239 return ERR_NULL_OBJECT;
240 }
241 HILOGI("get uuid = %{public}s + missionId = %{public}d snapshot from DistributedDB successful!",
242 DnetworkAdapter::AnonymizeNetworkId(uuid).c_str(), missionId);
243 SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
244 return ERR_NONE;
245 }
246
DeviceOnlineNotify(const std::string & networkId)247 void DistributedSchedMissionManager::DeviceOnlineNotify(const std::string& networkId)
248 {
249 if (networkId.empty()) {
250 HILOGW("DeviceOnlineNotify networkId empty!");
251 return;
252 }
253
254 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
255 if (missionHandler_ != nullptr) {
256 HILOGI("DeviceOnlineNotify RemoveTask");
257 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
258 }
259 }
260
DeviceOfflineNotify(const std::string & networkId)261 void DistributedSchedMissionManager::DeviceOfflineNotify(const std::string& networkId)
262 {
263 if (networkId.empty()) {
264 HILOGW("DeviceOfflineNotify networkId empty!");
265 return;
266 }
267 StopSyncMissionsFromRemote(networkId);
268 CleanMissionResources(networkId);
269 {
270 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
271 auto iter = remoteDmsMap_.find(networkId);
272 if (iter != remoteDmsMap_.end()) {
273 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
274 remoteDmsMap_.erase(iter);
275 }
276 }
277 HILOGI("DeviceOfflineNotify erase value for networkId: %{public}s",
278 DnetworkAdapter::AnonymizeNetworkId(networkId).c_str());
279 }
280
DeleteDataStorage(const std::string & deviceId,bool isDelayed)281 void DistributedSchedMissionManager::DeleteDataStorage(const std::string& deviceId, bool isDelayed)
282 {
283 if (distributedDataStorage_ == nullptr) {
284 HILOGE("DeleteDataStorage DistributedDataStorage null!");
285 return;
286 }
287 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
288 auto callback = [this, uuid, deviceId]() {
289 if (!distributedDataStorage_->FuzzyDelete(deviceId)) {
290 HILOGE("DeleteDataStorage storage delete failed!");
291 } else {
292 HILOGI("DeleteDataStorage storage delete successfully!");
293 }
294 };
295 if (isDelayed) {
296 if (missionHandler_ != nullptr) {
297 HILOGI("DeleteDataStorage PostTask");
298 missionHandler_->PostTask(callback, DELETE_DATA_STORAGE + uuid, DELETE_DATA_STORAGE_DELAYED);
299 }
300 } else {
301 if (missionHandler_ != nullptr) {
302 HILOGI("DeleteDataStorage RemoveTask");
303 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
304 }
305 callback();
306 }
307 }
308
RegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)309 int32_t DistributedSchedMissionManager::RegisterMissionListener(const std::u16string& devId,
310 const sptr<IRemoteObject>& listener)
311 {
312 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(Str16ToStr8(devId));
313 if (uuid.empty()) {
314 HILOGE("uuid is empty!");
315 return INVALID_PARAMETERS_ERR;
316 }
317 if (missionHandler_ != nullptr) {
318 HILOGI("RemoveTask");
319 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
320 }
321 if (listener == nullptr) {
322 return INVALID_PARAMETERS_ERR;
323 }
324 std::string localDeviceId;
325 std::string remoteDeviceId = Str16ToStr8(devId);
326 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
327 || localDeviceId == remoteDeviceId) {
328 HILOGE("check deviceId failed!");
329 return INVALID_PARAMETERS_ERR;
330 }
331 {
332 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
333 auto& listenerInfo = listenDeviceMap_[devId];
334 if (!listenerInfo.Emplace(listener)) {
335 HILOGW("RegisterSyncListener listener has already inserted!");
336 return ERR_NONE;
337 }
338 bool ret = listener->AddDeathRecipient(listenerDeath_);
339 if (!ret) {
340 HILOGW("RegisterSyncListener AddDeathRecipient failed!");
341 }
342 if (listenerInfo.Size() > 1) {
343 HILOGI("RegisterMissionListener not notify remote DMS!");
344 return ERR_NONE;
345 }
346 }
347 return ERR_NONE;
348 }
349
StartSyncRemoteMissions(const std::string & dstDevId,const std::string & localDevId)350 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
351 const std::string& localDevId)
352 {
353 std::u16string devId = Str8ToStr16(dstDevId);
354 {
355 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
356 auto iterItem = listenDeviceMap_.find(devId);
357 if (iterItem == listenDeviceMap_.end()) {
358 return ERR_NONE;
359 }
360 bool callFlag = iterItem->second.called;
361 if (callFlag) {
362 HILOGI("StartSyncRemoteMissions already called!");
363 return ERR_NONE;
364 }
365 }
366 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
367 if (remoteDms == nullptr) {
368 HILOGE("get remoteDms failed!");
369 RetryStartSyncRemoteMissions(dstDevId, localDevId, 0);
370 return GET_REMOTE_DMS_FAIL;
371 }
372 int32_t ret = StartSyncRemoteMissions(dstDevId, remoteDms);
373 if (ret == ERR_NONE) {
374 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
375 auto iterItem = listenDeviceMap_.find(devId);
376 if (iterItem != listenDeviceMap_.end()) {
377 iterItem->second.called = true;
378 }
379 }
380 return ret;
381 }
382
StartSyncRemoteMissions(const std::string & dstDevId,const sptr<IDistributedSched> & remoteDms)383 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
384 const sptr<IDistributedSched>& remoteDms)
385 {
386 std::vector<DstbMissionInfo> missionInfos;
387 CallerInfo callerInfo;
388 if (!GenerateCallerInfo(callerInfo)) {
389 return GET_LOCAL_DEVICE_ERR;
390 }
391 int64_t begin = GetTickCount();
392 int32_t ret = remoteDms->StartSyncMissionsFromRemote(callerInfo, missionInfos);
393 HILOGI("[PerformanceTest] StartSyncMissionsFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
394 ret, GetTickCount() - begin);
395 if (ret == ERR_NONE) {
396 RebornMissionCache(dstDevId, missionInfos);
397 }
398 return ret;
399 }
400
UnRegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)401 int32_t DistributedSchedMissionManager::UnRegisterMissionListener(const std::u16string& devId,
402 const sptr<IRemoteObject>& listener)
403 {
404 if (listener == nullptr) {
405 return INVALID_PARAMETERS_ERR;
406 }
407 if (!IsDeviceIdValidated(Str16ToStr8(devId))) {
408 return INVALID_PARAMETERS_ERR;
409 }
410 std::string localDeviceId;
411 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
412 || localDeviceId == Str16ToStr8(devId)) {
413 HILOGE("check deviceId fail");
414 return INVALID_PARAMETERS_ERR;
415 }
416 {
417 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
418 auto iterItem = listenDeviceMap_.find(devId);
419 if (iterItem == listenDeviceMap_.end()) {
420 return ERR_NONE;
421 }
422 auto& listenerInfo = iterItem->second;
423 auto ret = listenerInfo.Find(listener);
424 if (!ret) {
425 HILOGI("listener not registered!");
426 return ERR_NONE;
427 }
428 listener->RemoveDeathRecipient(listenerDeath_);
429 listenerInfo.Erase(listener);
430 if (!listenerInfo.Empty()) {
431 return ERR_NONE;
432 }
433 listenDeviceMap_.erase(iterItem);
434 }
435 return ERR_NONE;
436 }
437
CleanMissionResources(const std::string & networkId)438 void DistributedSchedMissionManager::CleanMissionResources(const std::string& networkId)
439 {
440 {
441 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
442 auto iterDevice = listenDeviceMap_.find(Str8ToStr16(networkId));
443 if (iterDevice == listenDeviceMap_.end()) {
444 return;
445 }
446 auto& listenerInfo = iterDevice->second;
447 for (sptr<IRemoteObject> listener : listenerInfo.listenerSet) {
448 if (listener != nullptr) {
449 listener->RemoveDeathRecipient(listenerDeath_);
450 }
451 }
452 listenDeviceMap_.erase(iterDevice);
453 }
454 StopSyncRemoteMissions(networkId, true);
455 }
456
StopSyncRemoteMissions(const std::string & dstDevId,bool offline,bool exit)457 int32_t DistributedSchedMissionManager::StopSyncRemoteMissions(const std::string& dstDevId,
458 bool offline, bool exit)
459 {
460 CleanMissionCache(dstDevId);
461 DeleteCachedSnapshotInfo(dstDevId);
462 DeleteDataStorage(dstDevId, true);
463
464 if (offline) {
465 return ERR_NONE;
466 }
467 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
468 if (remoteDms == nullptr) {
469 HILOGE("DMS get remoteDms failed");
470 return GET_REMOTE_DMS_FAIL;
471 }
472
473 CallerInfo callerInfo;
474 if (!GenerateCallerInfo(callerInfo)) {
475 return GET_LOCAL_DEVICE_ERR;
476 }
477 int64_t begin = GetTickCount();
478 int32_t ret = remoteDms->StopSyncMissionsFromRemote(callerInfo);
479 HILOGI("[PerformanceTest] ret:%d, spend %{public}" PRId64 " ms", ret, GetTickCount() - begin);
480 return ret;
481 }
482
StartSyncRemoteMissions(const std::string & dstDevId,bool fixConflict,int64_t tag)483 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId, bool fixConflict,
484 int64_t tag)
485 {
486 std::string localDeviceId;
487 if (!IsDeviceIdValidated(dstDevId)) {
488 return INVALID_PARAMETERS_ERR;
489 }
490 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
491 || (dstDevId == localDeviceId)) {
492 HILOGE("check deviceId fail");
493 return INVALID_PARAMETERS_ERR;
494 }
495 HILOGI("begin, dstDevId is %{public}s, local deviceId is %{public}s",
496 DnetworkAdapter::AnonymizeNetworkId(dstDevId).c_str(),
497 DnetworkAdapter::AnonymizeNetworkId(localDeviceId).c_str());
498 auto ret = StartSyncRemoteMissions(dstDevId, localDeviceId);
499 if (ret != ERR_NONE) {
500 HILOGE("StartSyncRemoteMissions failed, %{public}d", ret);
501 return ret;
502 }
503 return ERR_NONE;
504 }
505
StartSyncMissionsFromRemote(const CallerInfo & callerInfo,std::vector<DstbMissionInfo> & missionInfoSet)506 int32_t DistributedSchedMissionManager::StartSyncMissionsFromRemote(const CallerInfo& callerInfo,
507 std::vector<DstbMissionInfo>& missionInfoSet)
508 {
509 auto deviceId = callerInfo.sourceDeviceId;
510 HILOGD("remote version is %{public}d!", callerInfo.dmsVersion);
511 {
512 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
513 remoteSyncDeviceSet_.emplace(deviceId);
514 }
515 int32_t result = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
516 missionInfoSet);
517 auto func = [this, missionInfoSet]() {
518 HILOGD("RegisterMissionListener called.");
519 if (!isRegMissionChange_) {
520 int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
521 if (ret == ERR_OK) {
522 isRegMissionChange_ = true;
523 }
524 InitAllSnapshots(missionInfoSet);
525 }
526 };
527 if (!missionHandler_->PostTask(func)) {
528 HILOGE("post RegisterMissionListener and InitAllSnapshots Task failed");
529 }
530 return result;
531 }
532
StopSyncMissionsFromRemote(const std::string & networkId)533 void DistributedSchedMissionManager::StopSyncMissionsFromRemote(const std::string& networkId)
534 {
535 HILOGD(" %{private}s!", networkId.c_str());
536 {
537 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
538 remoteSyncDeviceSet_.erase(networkId);
539 if (remoteSyncDeviceSet_.empty()) {
540 auto func = [this]() {
541 int32_t ret = DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
542 if (ret == ERR_OK) {
543 isRegMissionChange_ = false;
544 }
545 };
546 if (!missionHandler_->PostTask(func)) {
547 HILOGE("post UnRegisterMissionListener Task failed");
548 }
549 }
550 }
551 }
552
NeedSyncDevice(const std::string & deviceId)553 bool DistributedSchedMissionManager::NeedSyncDevice(const std::string& deviceId)
554 {
555 if (deviceId.empty()) {
556 HILOGD("deviceId empty!");
557 return false;
558 }
559 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
560 if (remoteSyncDeviceSet_.count(deviceId) == 0) {
561 return false;
562 }
563 return true;
564 }
565
HasSyncListener(const std::string & networkId)566 bool DistributedSchedMissionManager::HasSyncListener(const std::string& networkId)
567 {
568 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
569 auto iter = listenDeviceMap_.find(Str8ToStr16(networkId));
570 if (iter != listenDeviceMap_.end()) {
571 return iter->second.called;
572 }
573 return false;
574 }
575
NotifySnapshotChanged(const std::string & networkId,int32_t missionId)576 void DistributedSchedMissionManager::NotifySnapshotChanged(const std::string& networkId, int32_t missionId)
577 {
578 std::u16string u16DevId = Str8ToStr16(networkId);
579 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
580 auto iter = listenDeviceMap_.find(u16DevId);
581 if (iter == listenDeviceMap_.end()) {
582 return;
583 }
584 auto& listenerInfo = iter->second;
585 for (auto& listener : listenerInfo.listenerSet) {
586 MissionChangedNotify::NotifySnapshot(listener, u16DevId, missionId);
587 }
588 }
589
OnRemoteDied(const wptr<IRemoteObject> & remote)590 void DistributedSchedMissionManager::OnRemoteDied(const wptr<IRemoteObject>& remote)
591 {
592 HILOGD("OnRemoteDied!");
593 sptr<IRemoteObject> listener = remote.promote();
594 if (listener == nullptr) {
595 HILOGW("listener is null");
596 return;
597 }
598 auto remoteDiedFunc = [this, listener]() {
599 OnMissionListenerDied(listener);
600 };
601 if (missionHandler_ != nullptr) {
602 missionHandler_->PostTask(remoteDiedFunc);
603 }
604 }
605
OnRemoteDied(const wptr<IRemoteObject> & remote)606 void DistributedSchedMissionManager::ListenerDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
607 {
608 DistributedSchedMissionManager::GetInstance().OnRemoteDied(remote);
609 }
610
EnqueueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId,std::unique_ptr<Snapshot> snapshot)611 void DistributedSchedMissionManager::EnqueueCachedSnapshotInfo(const std::string& deviceId, int32_t missionId,
612 std::unique_ptr<Snapshot> snapshot)
613 {
614 if (deviceId.empty() || snapshot == nullptr) {
615 HILOGW("EnqueueCachedSnapshotInfo invalid input param!");
616 return;
617 }
618 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
619 std::string keyInfo = GenerateKeyInfo(deviceId, missionId);
620 auto iter = cachedSnapshotInfos_.find(keyInfo);
621 if (iter != cachedSnapshotInfos_.end()) {
622 if (snapshot->GetCreatedTime() < iter->second->GetCreatedTime()) {
623 return;
624 }
625 }
626
627 if (cachedSnapshotInfos_.size() == MAX_CACHED_ITEM) {
628 int64_t oldest = -1;
629 auto iterOldest = cachedSnapshotInfos_.end();
630 for (auto iterItem = cachedSnapshotInfos_.begin(); iterItem != cachedSnapshotInfos_.end(); ++iterItem) {
631 if (oldest == -1 || iterItem->second->GetLastAccessTime() < oldest) {
632 oldest = iterItem->second->GetLastAccessTime();
633 iterOldest = iterItem;
634 }
635 }
636 if (iterOldest != cachedSnapshotInfos_.end()) {
637 cachedSnapshotInfos_.erase(iterOldest);
638 }
639 }
640 cachedSnapshotInfos_[keyInfo] = std::move(snapshot);
641 }
642
DequeueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId)643 std::unique_ptr<Snapshot> DistributedSchedMissionManager::DequeueCachedSnapshotInfo(const std::string& deviceId,
644 int32_t missionId)
645 {
646 if (deviceId.empty()) {
647 HILOGW("DequeueCachedSnapshotInfo invalid input param!");
648 return nullptr;
649 }
650 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
651 auto iter = cachedSnapshotInfos_.find(GenerateKeyInfo(deviceId, missionId));
652 if (iter != cachedSnapshotInfos_.end()) {
653 std::unique_ptr<Snapshot> snapshot = std::move(iter->second);
654 snapshot->UpdateLastAccessTime(GetTickCount());
655 iter->second = nullptr;
656 cachedSnapshotInfos_.erase(iter);
657 return snapshot;
658 }
659 return nullptr;
660 }
661
DeleteCachedSnapshotInfo(const std::string & networkId)662 void DistributedSchedMissionManager::DeleteCachedSnapshotInfo(const std::string& networkId)
663 {
664 if (networkId.empty()) {
665 HILOGW("networkId empty!");
666 return;
667 }
668 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
669 if (uuid.empty()) {
670 HILOGW("uuid empty!");
671 return;
672 }
673 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
674 auto iter = cachedSnapshotInfos_.begin();
675 while (iter != cachedSnapshotInfos_.end()) {
676 if (iter->first.find(uuid) != std::string::npos) {
677 iter = cachedSnapshotInfos_.erase(iter);
678 } else {
679 ++iter;
680 }
681 }
682 }
683
FetchCachedRemoteMissions(const std::string & srcId,int32_t numMissions,std::vector<DstbMissionInfo> & missionInfoSet)684 int32_t DistributedSchedMissionManager::FetchCachedRemoteMissions(const std::string& srcId, int32_t numMissions,
685 std::vector<DstbMissionInfo>& missionInfoSet)
686 {
687 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(srcId);
688 if (uuid.empty()) {
689 HILOGE("uuid empty!");
690 return INVALID_PARAMETERS_ERR;
691 }
692 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
693 auto iter = deviceMissionInfos_.find(uuid);
694 if (iter == deviceMissionInfos_.end()) {
695 HILOGE("can not find uuid, deviceId: %{public}s!",
696 DnetworkAdapter::AnonymizeNetworkId(srcId).c_str());
697 return ERR_NULL_OBJECT;
698 }
699
700 // get at most numMissions missions
701 int32_t actualNums = static_cast<int32_t>((iter->second).size());
702 if (actualNums < 0) {
703 HILOGE("invalid size!");
704 return ERR_NULL_OBJECT;
705 }
706 missionInfoSet.assign((iter->second).begin(),
707 (actualNums > numMissions) ? (iter->second).begin() + numMissions : (iter->second).end());
708 return ERR_NONE;
709 }
710
RebornMissionCache(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet)711 void DistributedSchedMissionManager::RebornMissionCache(const std::string& deviceId,
712 const std::vector<DstbMissionInfo>& missionInfoSet)
713 {
714 HILOGI("start! deviceId is %{public}s",
715 DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
716 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
717 if (uuid.empty()) {
718 HILOGE("uuid empty!");
719 return;
720 }
721 {
722 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
723 deviceMissionInfos_[uuid] = missionInfoSet;
724 }
725 HILOGI("RebornMissionCache end!");
726 }
727
CleanMissionCache(const std::string & deviceId)728 void DistributedSchedMissionManager::CleanMissionCache(const std::string& deviceId)
729 {
730 HILOGI("CleanMissionCache start! deviceId is %{public}s",
731 DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
732 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
733 if (uuid.empty()) {
734 HILOGE("CleanMissionCache uuid empty!");
735 return;
736 }
737 {
738 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
739 deviceMissionInfos_.erase(uuid);
740 }
741 HILOGI("CleanMissionCache end!");
742 }
743
NotifyMissionsChangedFromRemote(const CallerInfo & callerInfo,const std::vector<DstbMissionInfo> & missionInfoSet)744 int32_t DistributedSchedMissionManager::NotifyMissionsChangedFromRemote(const CallerInfo& callerInfo,
745 const std::vector<DstbMissionInfo>& missionInfoSet)
746 {
747 HILOGI("NotifyMissionsChangedFromRemote version is %{public}d!", callerInfo.dmsVersion);
748 std::u16string u16DevId = Str8ToStr16(callerInfo.sourceDeviceId);
749 RebornMissionCache(callerInfo.sourceDeviceId, missionInfoSet);
750 {
751 HILOGI("NotifyMissionsChangedFromRemote notify mission start!");
752 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
753 auto iter = listenDeviceMap_.find(u16DevId);
754 if (iter == listenDeviceMap_.end()) {
755 HILOGE("NotifyMissionsChangedFromRemote notify mission no listener!");
756 return INVALID_PARAMETERS_ERR;
757 }
758 auto& listenerSet = iter->second.listenerSet;
759 auto notifyChanged = [listenerSet, u16DevId] () {
760 for (const auto& listener : listenerSet) {
761 MissionChangedNotify::NotifyMissionsChanged(listener, u16DevId);
762 }
763 };
764 if (missionHandler_ != nullptr) {
765 missionHandler_->PostTask(notifyChanged);
766 HILOGI("NotifyMissionsChangedFromRemote end!");
767 return ERR_NONE;
768 }
769 }
770 return INVALID_PARAMETERS_ERR;
771 }
772
NotifyLocalMissionsChanged()773 void DistributedSchedMissionManager::NotifyLocalMissionsChanged()
774 {
775 auto func = [this]() {
776 HILOGI("NotifyLocalMissionsChanged");
777 std::vector<DstbMissionInfo> missionInfos;
778 int32_t ret = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
779 missionInfos);
780 if (ret == ERR_OK) {
781 int32_t result = NotifyMissionsChangedToRemote(missionInfos);
782 HILOGI("NotifyMissionsChangedToRemote result = %{public}d", result);
783 }
784 };
785 if (!missionChangeHandler_->PostTask(func)) {
786 HILOGE("postTask failed");
787 }
788 }
789
NotifyMissionSnapshotCreated(int32_t missionId)790 void DistributedSchedMissionManager::NotifyMissionSnapshotCreated(int32_t missionId)
791 {
792 auto func = [this, missionId]() {
793 HILOGD("called.");
794 ErrCode errCode = MissionSnapshotChanged(missionId);
795 if (errCode != ERR_OK) {
796 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
797 }
798 };
799 if (!missionChangeHandler_->PostTask(func, GET_FOREGROUND_SNAPSHOT_DELAY_TIME)) {
800 HILOGE("post MissionSnapshotChanged delay Task failed");
801 }
802 }
803
NotifyMissionSnapshotChanged(int32_t missionId)804 void DistributedSchedMissionManager::NotifyMissionSnapshotChanged(int32_t missionId)
805 {
806 auto func = [this, missionId]() {
807 HILOGD("called.");
808 ErrCode errCode = MissionSnapshotChanged(missionId);
809 if (errCode != ERR_OK) {
810 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
811 }
812 };
813 if (!missionChangeHandler_->PostTask(func)) {
814 HILOGE("post MissionSnapshotChanged Task failed");
815 }
816 }
817
NotifyMissionSnapshotDestroyed(int32_t missionId)818 void DistributedSchedMissionManager::NotifyMissionSnapshotDestroyed(int32_t missionId)
819 {
820 auto func = [this, missionId]() {
821 HILOGD("called.");
822 ErrCode errCode = MissionSnapshotDestroyed(missionId);
823 if (errCode != ERR_OK) {
824 HILOGE("mission snapshot removed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
825 }
826 };
827 if (!missionChangeHandler_->PostTask(func)) {
828 HILOGE("post MissionSnapshotDestroyed Task failed");
829 }
830 }
831
NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo> & missionInfoSet)832 int32_t DistributedSchedMissionManager::NotifyMissionsChangedToRemote(
833 const std::vector<DstbMissionInfo> &missionInfoSet)
834 {
835 CallerInfo callerInfo;
836 if (!GenerateCallerInfo(callerInfo)) {
837 return GET_LOCAL_DEVICE_ERR;
838 }
839 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
840 for (const auto& destDeviceId : remoteSyncDeviceSet_) {
841 auto handler = FetchDeviceHandler(destDeviceId);
842 if (handler == nullptr) {
843 HILOGE("NotifyMissionsChangedToRemote fetch handler failed!");
844 continue;
845 }
846 auto callback = [destDeviceId, missionInfoSet, callerInfo, this] () {
847 NotifyMissionsChangedToRemoteInner(destDeviceId, missionInfoSet, callerInfo);
848 };
849 if (!handler->PostTask(callback)) {
850 HILOGE("NotifyMissionsChangedToRemote PostTask failed!");
851 return ERR_NULL_OBJECT;
852 }
853 }
854
855 return ERR_NONE;
856 }
857
NotifyMissionsChangedToRemoteInner(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet,const CallerInfo & callerInfo)858 void DistributedSchedMissionManager::NotifyMissionsChangedToRemoteInner(const std::string& deviceId,
859 const std::vector<DstbMissionInfo>& missionInfoSet, const CallerInfo& callerInfo)
860 {
861 sptr<IDistributedSched> remoteDms = GetRemoteDms(deviceId);
862 if (remoteDms == nullptr) {
863 HILOGE("NotifyMissionsChangedToRemote DMS get remoteDms failed");
864 return;
865 }
866 int64_t begin = GetTickCount();
867 int32_t result = remoteDms->NotifyMissionsChangedFromRemote(missionInfoSet, callerInfo);
868 HILOGI("[PerformanceTest] NotifyMissionsChangedFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
869 result, GetTickCount() - begin);
870 }
871
GenerateCallerInfo(CallerInfo & callerInfo)872 bool DistributedSchedMissionManager::GenerateCallerInfo(CallerInfo& callerInfo)
873 {
874 std::string localUuid;
875 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localUuid)) {
876 HILOGE("get local uuid failed!");
877 return false;
878 }
879 callerInfo.uid = IPCSkeleton::GetCallingUid();
880 callerInfo.pid = IPCSkeleton::GetCallingPid();
881 callerInfo.callerType = CALLER_TYPE_HARMONY;
882 callerInfo.sourceDeviceId = localUuid;
883 callerInfo.dmsVersion = VERSION;
884 return true;
885 }
886
FetchDeviceHandler(const std::string & deviceId)887 std::shared_ptr<AppExecFwk::EventHandler> DistributedSchedMissionManager::FetchDeviceHandler(
888 const std::string& deviceId)
889 {
890 if (!IsDeviceIdValidated(deviceId)) {
891 HILOGW("FetchDeviceHandler device:%{public}s offline.",
892 DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
893 return nullptr;
894 }
895
896 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
897 if (uuid.empty()) {
898 HILOGE("FetchDeviceHandler uuid empty!");
899 return nullptr;
900 }
901
902 auto iter = deviceHandle_.find(uuid);
903 if (iter != deviceHandle_.end()) {
904 return iter->second;
905 }
906
907 auto anonyUuid = DnetworkAdapter::AnonymizeNetworkId(uuid);
908 auto runner = AppExecFwk::EventRunner::Create(anonyUuid + "_MissionN");
909 auto handler = std::make_shared<AppExecFwk::EventHandler>(runner);
910 deviceHandle_.emplace(uuid, handler);
911 return handler;
912 }
913
OnRemoteDied(const wptr<IRemoteObject> & remote)914 void DistributedSchedMissionManager::RemoteDmsDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
915 {
916 HILOGI("OnRemoteDied received died notify!");
917 DistributedSchedMissionManager::GetInstance().OnRemoteDmsDied(remote);
918 }
919
OnRemoteDmsDied(const wptr<IRemoteObject> & remote)920 void DistributedSchedMissionManager::OnRemoteDmsDied(const wptr<IRemoteObject>& remote)
921 {
922 sptr<IRemoteObject> diedRemoted = remote.promote();
923 if (diedRemoted == nullptr) {
924 HILOGW("OnRemoteDmsDied promote failed!");
925 return;
926 }
927 HILOGD("delete diedRemoted");
928 auto remoteDmsDiedFunc = [this, diedRemoted]() {
929 OnRemoteDmsDied(diedRemoted);
930 };
931 if (missionHandler_ != nullptr) {
932 missionHandler_->PostTask(remoteDmsDiedFunc);
933 }
934 }
935
RetryStartSyncRemoteMissions(const std::string & dstDeviceId,const std::string & localDevId,int32_t retryTimes)936 void DistributedSchedMissionManager::RetryStartSyncRemoteMissions(const std::string& dstDeviceId,
937 const std::string& localDevId, int32_t retryTimes)
938 {
939 auto retryFunc = [this, dstDeviceId, localDevId, retryTimes]() {
940 bool ret = HasSyncListener(dstDeviceId);
941 if (!ret) {
942 return;
943 }
944 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDeviceId);
945 if (remoteDms == nullptr) {
946 HILOGI("RetryStartSyncRemoteMissions DMS get remoteDms failed");
947 RetryStartSyncRemoteMissions(dstDeviceId, localDevId, retryTimes + 1);
948 return;
949 }
950 int32_t errNo = StartSyncRemoteMissions(dstDeviceId, remoteDms);
951 HILOGI("RetryStartSyncRemoteMissions result:%{public}d", errNo);
952 };
953 if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
954 missionHandler_->PostTask(retryFunc, RETRY_DELAYED);
955 }
956 }
957
OnMissionListenerDied(const sptr<IRemoteObject> & remote)958 void DistributedSchedMissionManager::OnMissionListenerDied(const sptr<IRemoteObject>& remote)
959 {
960 HILOGI("OnMissionListenerDied");
961 std::set<std::string> deviceSet;
962 {
963 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
964 auto iterItem = listenDeviceMap_.begin();
965 while (iterItem != listenDeviceMap_.end()) {
966 auto& listenerInfo = iterItem->second;
967 auto ret = listenerInfo.Find(remote);
968 if (!ret) {
969 ++iterItem;
970 continue;
971 }
972 remote->RemoveDeathRecipient(listenerDeath_);
973 listenerInfo.Erase(remote);
974 if (listenerInfo.Empty()) {
975 if (listenerInfo.called) {
976 deviceSet.emplace(Str16ToStr8(iterItem->first));
977 }
978 iterItem = listenDeviceMap_.erase(iterItem);
979 } else {
980 ++iterItem;
981 }
982 }
983 }
984 for (auto& devId : deviceSet) {
985 StopSyncRemoteMissions(devId, false);
986 }
987 }
988
OnRemoteDmsDied(const sptr<IRemoteObject> & remote)989 void DistributedSchedMissionManager::OnRemoteDmsDied(const sptr<IRemoteObject>& remote)
990 {
991 HILOGI("OnRemoteDmsDied");
992 std::string devId;
993 {
994 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
995 for (auto iter = remoteDmsMap_.begin(); iter != remoteDmsMap_.end(); ++iter) {
996 if (iter->second->AsObject() == remote) {
997 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
998 devId = iter->first;
999 remoteDmsMap_.erase(iter);
1000 break;
1001 }
1002 }
1003 }
1004 if (devId.empty()) {
1005 return;
1006 }
1007 bool ret = HasSyncListener(devId);
1008 if (ret) {
1009 std::string localDeviceId;
1010 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
1011 return;
1012 }
1013 RetryStartSyncRemoteMissions(devId, localDeviceId, 0);
1014 }
1015 }
1016
NotifyDmsProxyProcessDied()1017 void DistributedSchedMissionManager::NotifyDmsProxyProcessDied()
1018 {
1019 HILOGI("NotifyDmsProxyProcessDied!");
1020 if (!isRegMissionChange_) {
1021 return;
1022 }
1023 RetryRegisterMissionChange(0);
1024 }
1025
RetryRegisterMissionChange(int32_t retryTimes)1026 void DistributedSchedMissionManager::RetryRegisterMissionChange(int32_t retryTimes)
1027 {
1028 auto remoteDiedFunc = [this, retryTimes]() {
1029 HILOGI("RetryRegisterMissionChange retryTimes:%{public}d begin", retryTimes);
1030 if (!isRegMissionChange_) {
1031 return;
1032 }
1033 int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
1034 if (ret == ERR_NULL_OBJECT) {
1035 RetryRegisterMissionChange(retryTimes + 1);
1036 HILOGI("RetryRegisterMissionChange dmsproxy null, retry!");
1037 return;
1038 }
1039 HILOGI("RetryRegisterMissionChange result:%{public}d", ret);
1040 };
1041 if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1042 missionHandler_->PostTask(remoteDiedFunc, RETRY_DELAYED);
1043 }
1044 }
1045
InitAllSnapshots(const std::vector<DstbMissionInfo> & missionInfoSet)1046 void DistributedSchedMissionManager::InitAllSnapshots(const std::vector<DstbMissionInfo>& missionInfoSet)
1047 {
1048 for (auto iter = missionInfoSet.begin(); iter != missionInfoSet.end(); iter++) {
1049 ErrCode errCode = MissionSnapshotChanged(iter->id);
1050 if (errCode != ERR_OK) {
1051 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", iter->id, errCode);
1052 }
1053 }
1054 }
1055
MissionSnapshotChanged(int32_t missionId)1056 int32_t DistributedSchedMissionManager::MissionSnapshotChanged(int32_t missionId)
1057 {
1058 std::string networkId;
1059 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1060 HILOGE("get local networkId failed!");
1061 return INVALID_PARAMETERS_ERR;
1062 }
1063 AAFwk::MissionSnapshot missionSnapshot;
1064 ErrCode errCode = DistributedSchedAdapter::GetInstance()
1065 .GetLocalMissionSnapshotInfo(networkId, missionId, missionSnapshot);
1066 if (errCode != ERR_OK) {
1067 HILOGE("get local mission snapshot failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
1068 return errCode;
1069 }
1070 Snapshot snapshot;
1071 SnapshotConverter::ConvertToSnapshot(missionSnapshot, snapshot);
1072 MessageParcel data;
1073 errCode = MissionSnapshotSequence(snapshot, data);
1074 if (errCode != ERR_OK) {
1075 HILOGE("mission snapshot sequence failed, errCode=%{public}d", errCode);
1076 return errCode;
1077 }
1078 size_t len = data.GetReadableBytes();
1079 const uint8_t* byteStream = data.ReadBuffer(len);
1080 errCode = StoreSnapshotInfo(networkId, missionId, byteStream, len);
1081 return errCode;
1082 }
1083
MissionSnapshotDestroyed(int32_t missionId)1084 int32_t DistributedSchedMissionManager::MissionSnapshotDestroyed(int32_t missionId)
1085 {
1086 std::string networkId;
1087 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1088 HILOGE("get local networkId failed!");
1089 return INVALID_PARAMETERS_ERR;
1090 }
1091 ErrCode errCode = RemoveSnapshotInfo(networkId, missionId);
1092 return errCode;
1093 }
1094
MissionSnapshotSequence(const Snapshot & snapshot,MessageParcel & data)1095 int32_t DistributedSchedMissionManager::MissionSnapshotSequence(const Snapshot& snapshot, MessageParcel& data)
1096 {
1097 bool ret = snapshot.WriteSnapshotInfo(data);
1098 if (!ret) {
1099 HILOGE("WriteSnapshotInfo failed!");
1100 return ERR_FLATTEN_OBJECT;
1101 }
1102 ret = snapshot.WritePixelMap(data);
1103 if (!ret) {
1104 HILOGE("WritePixelMap failed!");
1105 return ERR_FLATTEN_OBJECT;
1106 }
1107 return ERR_OK;
1108 }
1109
OnDnetDied()1110 void DistributedSchedMissionManager::OnDnetDied()
1111 {
1112 auto dnetDiedFunc = [this]() {
1113 HILOGI("OnDnetDied");
1114 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
1115 if (!isRegMissionChange_) {
1116 return;
1117 }
1118 remoteSyncDeviceSet_.clear();
1119 DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
1120 isRegMissionChange_ = false;
1121 };
1122 if (missionHandler_ != nullptr) {
1123 missionHandler_->PostTask(dnetDiedFunc);
1124 }
1125 }
1126 } // namespace DistributedSchedule
1127 } // namespace OHOS
1128