1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "mission/distributed_sched_mission_manager.h"
17
18 #include <chrono>
19 #include <sys/time.h>
20 #include <unistd.h>
21
22 #include "datetime_ex.h"
23 #include "distributed_sched_adapter.h"
24 #include "dtbschedmgr_device_info_storage.h"
25 #include "dtbschedmgr_log.h"
26 #include "ipc_skeleton.h"
27 #include "iservice_registry.h"
28 #include "mission/mission_changed_notify.h"
29 #include "mission/mission_constant.h"
30 #include "mission/mission_info_converter.h"
31 #include "mission/snapshot_converter.h"
32 #include "nlohmann/json.hpp"
33 #include "string_ex.h"
34 #include "system_ability_definition.h"
35
36 namespace OHOS {
37 namespace DistributedSchedule {
38 namespace {
39 const std::string TAG = "DistributedSchedMissionManager";
40 constexpr size_t MAX_CACHED_ITEM = 10;
41 constexpr int32_t MAX_RETRY_TIMES = 15;
42 constexpr int32_t RETRY_DELAYED = 2000;
43 constexpr int32_t GET_FOREGROUND_SNAPSHOT_DELAY_TIME = 800; // ms
44 const std::string DELETE_DATA_STORAGE = "DeleteDataStorage";
45 constexpr int32_t DELETE_DATA_STORAGE_DELAYED = 60000; // ms
46 const std::string INVAILD_LOCAL_DEVICE_ID = "-1";
47 }
48 namespace Mission {
49 constexpr int32_t GET_MAX_MISSIONS = 20;
50 } // Mission
51 using namespace std::chrono;
52 using namespace Constants::Mission;
53 using namespace OHOS::DistributedKv;
54
55 IMPLEMENT_SINGLE_INSTANCE(DistributedSchedMissionManager);
56
Init()57 void DistributedSchedMissionManager::Init()
58 {
59 listenerDeath_ = new ListenerDeathRecipient();
60 remoteDmsRecipient_ = new RemoteDmsDeathRecipient();
61 auto runner = AppExecFwk::EventRunner::Create("MissionManagerHandler");
62 missionHandler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
63 auto updateRunner = AppExecFwk::EventRunner::Create("UpdateHandler");
64 updateHandler_ = std::make_shared<AppExecFwk::EventHandler>(updateRunner);
65 missonChangeListener_ = new DistributedMissionChangeListener();
66 auto missionChangeRunner = AppExecFwk::EventRunner::Create("DistributedMissionChange");
67 missionChangeHandler_ = std::make_shared<AppExecFwk::EventHandler>(missionChangeRunner);
68 }
69
GetMissionInfos(const std::string & deviceId,int32_t numMissions,std::vector<AAFwk::MissionInfo> & missionInfos)70 int32_t DistributedSchedMissionManager::GetMissionInfos(const std::string& deviceId,
71 int32_t numMissions, std::vector<AAFwk::MissionInfo>& missionInfos)
72 {
73 HILOGI("start!");
74 if (!IsDeviceIdValidated(deviceId)) {
75 return INVALID_PARAMETERS_ERR;
76 }
77 if (numMissions <= 0) {
78 HILOGE("numMissions is illegal! numMissions:%{public}d", numMissions);
79 return INVALID_PARAMETERS_ERR;
80 }
81 std::vector<DstbMissionInfo> dstbMissionInfos;
82 int32_t ret = FetchCachedRemoteMissions(deviceId, numMissions, dstbMissionInfos);
83 if (ret != ERR_OK) {
84 HILOGE("FetchCachedRemoteMissions failed, ret = %{public}d", ret);
85 return ret;
86 }
87 return MissionInfoConverter::ConvertToMissionInfos(dstbMissionInfos, missionInfos);
88 }
89
GetRemoteDms(const std::string & deviceId)90 sptr<IDistributedSched> DistributedSchedMissionManager::GetRemoteDms(const std::string& deviceId)
91 {
92 int64_t begin = GetTickCount();
93 if (deviceId.empty()) {
94 HILOGE("GetRemoteDms remoteDeviceId is empty");
95 return nullptr;
96 }
97 {
98 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
99 auto iter = remoteDmsMap_.find(deviceId);
100 if (iter != remoteDmsMap_.end()) {
101 auto object = iter->second;
102 if (object != nullptr) {
103 HILOGI("[PerformanceTest] GetRemoteDms from cache spend %{public}" PRId64 " ms",
104 GetTickCount() - begin);
105 return object;
106 }
107 }
108 }
109 HILOGD("GetRemoteDms connect deviceid is %s", deviceId.c_str());
110 auto samgr = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
111 if (samgr == nullptr) {
112 HILOGE("GetRemoteDms failed to connect to systemAbilityMgr!");
113 return nullptr;
114 }
115 auto object = samgr->CheckSystemAbility(DISTRIBUTED_SCHED_SA_ID, deviceId);
116 if (object == nullptr) {
117 HILOGE("GetRemoteDms failed to get dms for remote device:%{public}s!",
118 DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
119 return nullptr;
120 }
121 auto ret = object->AddDeathRecipient(remoteDmsRecipient_);
122 HILOGD("GetRemoteDms AddDeathRecipient ret : %{public}d", ret);
123 sptr<IDistributedSched> remoteDmsObj = iface_cast<IDistributedSched>(object);
124 {
125 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
126 auto iter = remoteDmsMap_.find(deviceId);
127 if (iter != remoteDmsMap_.end()) {
128 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
129 }
130 remoteDmsMap_[deviceId] = remoteDmsObj;
131 }
132 HILOGI("[PerformanceTest] GetRemoteDms spend %{public}" PRId64 " ms", GetTickCount() - begin);
133 return remoteDmsObj;
134 }
135
IsDeviceIdValidated(const std::string & deviceId)136 bool DistributedSchedMissionManager::IsDeviceIdValidated(const std::string& deviceId)
137 {
138 if (deviceId.empty()) {
139 HILOGE("IsDeviceIdValidated deviceId is empty!");
140 return false;
141 }
142 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(deviceId) == nullptr) {
143 HILOGW("IsDeviceIdValidated device offline.");
144 return false;
145 }
146
147 return true;
148 }
149
NotifyRemoteDied(const wptr<IRemoteObject> & remote)150 void DistributedSchedMissionManager::NotifyRemoteDied(const wptr<IRemoteObject>& remote)
151 {
152 if (distributedDataStorage_ == nullptr) {
153 HILOGE("DistributedDataStorage null!");
154 return;
155 }
156 distributedDataStorage_->NotifyRemoteDied(remote);
157 }
158
InitDataStorage()159 int32_t DistributedSchedMissionManager::InitDataStorage()
160 {
161 if (distributedDataStorage_ == nullptr) {
162 distributedDataStorage_ = std::make_shared<DistributedDataStorage>();
163 }
164 if (!distributedDataStorage_->Init()) {
165 HILOGE("InitDataStorage DistributedDataStorage init failed!");
166 return ERR_NULL_OBJECT;
167 }
168 return ERR_NONE;
169 }
170
StopDataStorage()171 int32_t DistributedSchedMissionManager::StopDataStorage()
172 {
173 if (distributedDataStorage_ == nullptr) {
174 HILOGE("StopDataStorage DistributedDataStorage null!");
175 return ERR_NULL_OBJECT;
176 }
177 if (!distributedDataStorage_->Stop()) {
178 HILOGE("StopDataStorage DistributedDataStorage stop failed!");
179 return ERR_NULL_OBJECT;
180 }
181 return ERR_NONE;
182 }
183
StoreSnapshotInfo(const std::string & deviceId,int32_t missionId,const uint8_t * byteStream,size_t len)184 int32_t DistributedSchedMissionManager::StoreSnapshotInfo(const std::string& deviceId, int32_t missionId,
185 const uint8_t* byteStream, size_t len)
186 {
187 if (distributedDataStorage_ == nullptr) {
188 HILOGE("StoreSnapshotInfo DistributedDataStorage null!");
189 return ERR_NULL_OBJECT;
190 }
191 if (!distributedDataStorage_->Insert(deviceId, missionId, byteStream, len)) {
192 HILOGE("StoreSnapshotInfo DistributedDataStorage insert failed!");
193 return INVALID_PARAMETERS_ERR;
194 }
195 return ERR_NONE;
196 }
197
RemoveSnapshotInfo(const std::string & deviceId,int32_t missionId)198 int32_t DistributedSchedMissionManager::RemoveSnapshotInfo(const std::string& deviceId, int32_t missionId)
199 {
200 if (distributedDataStorage_ == nullptr) {
201 HILOGE("RemoveSnapshotInfo DistributedDataStorage null!");
202 return ERR_NULL_OBJECT;
203 }
204 if (!distributedDataStorage_->Delete(deviceId, missionId)) {
205 HILOGE("RemoveSnapshotInfo DistributedDataStorage delete failed!");
206 return INVALID_PARAMETERS_ERR;
207 }
208 return ERR_NONE;
209 }
210
GetRemoteMissionSnapshotInfo(const std::string & networkId,int32_t missionId,std::unique_ptr<AAFwk::MissionSnapshot> & missionSnapshot)211 int32_t DistributedSchedMissionManager::GetRemoteMissionSnapshotInfo(const std::string& networkId, int32_t missionId,
212 std::unique_ptr<AAFwk::MissionSnapshot>& missionSnapshot)
213 {
214 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
215 if (uuid.empty()) {
216 HILOGE("uuid is empty!");
217 return INVALID_PARAMETERS_ERR;
218 }
219 std::unique_ptr<Snapshot> snapshotPtr = DequeueCachedSnapshotInfo(uuid, missionId);
220 if (snapshotPtr != nullptr) {
221 HILOGI("get uuid = %{public}s + missionId = %{public}d snapshot from cache successful!",
222 DnetworkAdapter::AnonymizeNetworkId(uuid).c_str(), missionId);
223 SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
224 return ERR_NONE;
225 }
226 if (distributedDataStorage_ == nullptr) {
227 HILOGE("DistributedDataStorage null!");
228 return ERR_NULL_OBJECT;
229 }
230 DistributedKv::Value value;
231 bool ret = distributedDataStorage_->Query(networkId, missionId, value);
232 if (!ret) {
233 HILOGE("DistributedDataStorage query failed!");
234 return INVALID_PARAMETERS_ERR;
235 }
236 snapshotPtr = Snapshot::Create(value.Data());
237 if (snapshotPtr == nullptr) {
238 HILOGE("snapshot create failed!");
239 return ERR_NULL_OBJECT;
240 }
241 HILOGI("get uuid = %{public}s + missionId = %{public}d snapshot from DistributedDB successful!",
242 DnetworkAdapter::AnonymizeNetworkId(uuid).c_str(), missionId);
243 SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
244 return ERR_NONE;
245 }
246
DeviceOnlineNotify(const std::string & networkId)247 void DistributedSchedMissionManager::DeviceOnlineNotify(const std::string& networkId)
248 {
249 if (networkId.empty()) {
250 HILOGW("DeviceOnlineNotify networkId empty!");
251 return;
252 }
253
254 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
255 if (missionHandler_ != nullptr) {
256 HILOGI("DeviceOnlineNotify RemoveTask");
257 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
258 }
259 }
260
DeviceOfflineNotify(const std::string & networkId)261 void DistributedSchedMissionManager::DeviceOfflineNotify(const std::string& networkId)
262 {
263 if (networkId.empty()) {
264 HILOGW("DeviceOfflineNotify networkId empty!");
265 return;
266 }
267 StopSyncMissionsFromRemote(networkId);
268 CleanMissionResources(networkId);
269 {
270 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
271 auto iter = remoteDmsMap_.find(networkId);
272 if (iter != remoteDmsMap_.end()) {
273 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
274 remoteDmsMap_.erase(iter);
275 }
276 }
277 HILOGI("DeviceOfflineNotify erase value for networkId: %{public}s",
278 DnetworkAdapter::AnonymizeNetworkId(networkId).c_str());
279 }
280
DeleteDataStorage(const std::string & deviceId,bool isDelayed)281 void DistributedSchedMissionManager::DeleteDataStorage(const std::string& deviceId, bool isDelayed)
282 {
283 if (distributedDataStorage_ == nullptr) {
284 HILOGE("DeleteDataStorage DistributedDataStorage null!");
285 return;
286 }
287 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
288 auto callback = [this, uuid, deviceId]() {
289 if (!distributedDataStorage_->FuzzyDelete(deviceId)) {
290 HILOGE("DeleteDataStorage storage delete failed!");
291 } else {
292 HILOGI("DeleteDataStorage storage delete successfully!");
293 }
294 };
295 if (isDelayed) {
296 if (missionHandler_ != nullptr) {
297 HILOGI("DeleteDataStorage PostTask");
298 missionHandler_->PostTask(callback, DELETE_DATA_STORAGE + uuid, DELETE_DATA_STORAGE_DELAYED);
299 }
300 } else {
301 if (missionHandler_ != nullptr) {
302 HILOGI("DeleteDataStorage RemoveTask");
303 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
304 }
305 callback();
306 }
307 }
308
RegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)309 int32_t DistributedSchedMissionManager::RegisterMissionListener(const std::u16string& devId,
310 const sptr<IRemoteObject>& listener)
311 {
312 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(Str16ToStr8(devId));
313 if (uuid.empty()) {
314 HILOGE("uuid is empty!");
315 return INVALID_PARAMETERS_ERR;
316 }
317 if (missionHandler_ != nullptr) {
318 HILOGI("RemoveTask");
319 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
320 }
321 if (listener == nullptr) {
322 return INVALID_PARAMETERS_ERR;
323 }
324 std::string localDeviceId;
325 std::string remoteDeviceId = Str16ToStr8(devId);
326 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
327 || localDeviceId == remoteDeviceId) {
328 HILOGE("check deviceId failed!");
329 return INVALID_PARAMETERS_ERR;
330 }
331 {
332 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
333 auto& listenerInfo = listenDeviceMap_[devId];
334 if (!listenerInfo.Emplace(listener)) {
335 HILOGW("RegisterSyncListener listener has already inserted!");
336 return ERR_NONE;
337 }
338 bool ret = listener->AddDeathRecipient(listenerDeath_);
339 if (!ret) {
340 HILOGW("RegisterSyncListener AddDeathRecipient failed!");
341 }
342 if (listenerInfo.Size() > 1) {
343 HILOGI("RegisterMissionListener not notify remote DMS!");
344 return ERR_NONE;
345 }
346 }
347 return ERR_NONE;
348 }
349
StartSyncRemoteMissions(const std::string & dstDevId,const std::string & localDevId)350 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
351 const std::string& localDevId)
352 {
353 std::u16string devId = Str8ToStr16(dstDevId);
354 {
355 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
356 auto iterItem = listenDeviceMap_.find(devId);
357 if (iterItem == listenDeviceMap_.end()) {
358 return ERR_NONE;
359 }
360 bool callFlag = iterItem->second.called;
361 if (callFlag) {
362 HILOGI("StartSyncRemoteMissions already called!");
363 return ERR_NONE;
364 }
365 }
366 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
367 if (remoteDms == nullptr) {
368 HILOGE("get remoteDms failed!");
369 RetryStartSyncRemoteMissions(dstDevId, localDevId, 0);
370 return GET_REMOTE_DMS_FAIL;
371 }
372 int32_t ret = StartSyncRemoteMissions(dstDevId, remoteDms);
373 if (ret == ERR_NONE) {
374 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
375 auto iterItem = listenDeviceMap_.find(devId);
376 if (iterItem != listenDeviceMap_.end()) {
377 iterItem->second.called = true;
378 }
379 }
380 return ret;
381 }
382
StartSyncRemoteMissions(const std::string & dstDevId,const sptr<IDistributedSched> & remoteDms)383 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
384 const sptr<IDistributedSched>& remoteDms)
385 {
386 std::vector<DstbMissionInfo> missionInfos;
387 CallerInfo callerInfo;
388 if (!GenerateCallerInfo(callerInfo)) {
389 return GET_LOCAL_DEVICE_ERR;
390 }
391 int64_t begin = GetTickCount();
392 int32_t ret = remoteDms->StartSyncMissionsFromRemote(callerInfo, missionInfos);
393 HILOGI("[PerformanceTest] StartSyncMissionsFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
394 ret, GetTickCount() - begin);
395 if (ret == ERR_NONE) {
396 RebornMissionCache(dstDevId, missionInfos);
397 }
398 return ret;
399 }
400
UnRegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)401 int32_t DistributedSchedMissionManager::UnRegisterMissionListener(const std::u16string& devId,
402 const sptr<IRemoteObject>& listener)
403 {
404 if (listener == nullptr) {
405 return INVALID_PARAMETERS_ERR;
406 }
407 if (!IsDeviceIdValidated(Str16ToStr8(devId))) {
408 return INVALID_PARAMETERS_ERR;
409 }
410 std::string localDeviceId;
411 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
412 || localDeviceId == Str16ToStr8(devId)) {
413 HILOGE("check deviceId fail");
414 return INVALID_PARAMETERS_ERR;
415 }
416 {
417 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
418 auto iterItem = listenDeviceMap_.find(devId);
419 if (iterItem == listenDeviceMap_.end()) {
420 return ERR_NONE;
421 }
422 auto& listenerInfo = iterItem->second;
423 auto ret = listenerInfo.Find(listener);
424 if (!ret) {
425 HILOGI("listener not registered!");
426 return ERR_NONE;
427 }
428 listener->RemoveDeathRecipient(listenerDeath_);
429 listenerInfo.Erase(listener);
430 if (!listenerInfo.Empty()) {
431 return ERR_NONE;
432 }
433 listenDeviceMap_.erase(iterItem);
434 }
435 return ERR_NONE;
436 }
437
CleanMissionResources(const std::string & networkId)438 void DistributedSchedMissionManager::CleanMissionResources(const std::string& networkId)
439 {
440 {
441 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
442 auto iterDevice = listenDeviceMap_.find(Str8ToStr16(networkId));
443 if (iterDevice == listenDeviceMap_.end()) {
444 return;
445 }
446 auto& listenerInfo = iterDevice->second;
447 for (sptr<IRemoteObject> listener : listenerInfo.listenerSet) {
448 if (listener != nullptr) {
449 listener->RemoveDeathRecipient(listenerDeath_);
450 }
451 }
452 listenDeviceMap_.erase(iterDevice);
453 }
454 StopSyncRemoteMissions(networkId, true);
455 }
456
StopSyncRemoteMissions(const std::string & dstDevId,bool offline,bool exit)457 int32_t DistributedSchedMissionManager::StopSyncRemoteMissions(const std::string& dstDevId,
458 bool offline, bool exit)
459 {
460 CleanMissionCache(dstDevId);
461 DeleteCachedSnapshotInfo(dstDevId);
462 DeleteDataStorage(dstDevId, true);
463
464 if (offline) {
465 return ERR_NONE;
466 }
467 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
468 if (remoteDms == nullptr) {
469 HILOGE("DMS get remoteDms failed");
470 return GET_REMOTE_DMS_FAIL;
471 }
472
473 CallerInfo callerInfo;
474 if (!GenerateCallerInfo(callerInfo)) {
475 return GET_LOCAL_DEVICE_ERR;
476 }
477 int64_t begin = GetTickCount();
478 int32_t ret = remoteDms->StopSyncMissionsFromRemote(callerInfo);
479 HILOGI("[PerformanceTest] ret:%d, spend %{public}" PRId64 " ms", ret, GetTickCount() - begin);
480 return ret;
481 }
482
StartSyncRemoteMissions(const std::string & dstDevId,bool fixConflict,int64_t tag)483 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId, bool fixConflict,
484 int64_t tag)
485 {
486 std::string localDeviceId;
487 if (!IsDeviceIdValidated(dstDevId)) {
488 return INVALID_PARAMETERS_ERR;
489 }
490 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
491 || (dstDevId == localDeviceId)) {
492 HILOGE("check deviceId fail");
493 return INVALID_PARAMETERS_ERR;
494 }
495 HILOGI("begin, dstDevId is %{public}s, local deviceId is %{public}s",
496 DnetworkAdapter::AnonymizeNetworkId(dstDevId).c_str(),
497 DnetworkAdapter::AnonymizeNetworkId(localDeviceId).c_str());
498 auto ret = StartSyncRemoteMissions(dstDevId, localDeviceId);
499 if (ret != ERR_NONE) {
500 HILOGE("StartSyncRemoteMissions failed, %{public}d", ret);
501 return ret;
502 }
503 return ERR_NONE;
504 }
505
StartSyncMissionsFromRemote(const CallerInfo & callerInfo,std::vector<DstbMissionInfo> & missionInfos)506 int32_t DistributedSchedMissionManager::StartSyncMissionsFromRemote(const CallerInfo& callerInfo,
507 std::vector<DstbMissionInfo>& missionInfos)
508 {
509 auto deviceId = callerInfo.sourceDeviceId;
510 HILOGD("remote version is %{public}d!", callerInfo.dmsVersion);
511 {
512 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
513 remoteSyncDeviceSet_.emplace(deviceId);
514 }
515 int32_t result = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
516 missionInfos);
517 auto func = [this, missionInfos]() {
518 HILOGD("RegisterMissionListener called.");
519 if (!isRegMissionChange_) {
520 int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
521 if (ret == ERR_OK) {
522 isRegMissionChange_ = true;
523 }
524 InitAllSnapshots(missionInfos);
525 }
526 };
527 if (!missionHandler_->PostTask(func)) {
528 HILOGE("post RegisterMissionListener and InitAllSnapshots Task failed");
529 }
530 return result;
531 }
532
StopSyncMissionsFromRemote(const std::string & networkId)533 void DistributedSchedMissionManager::StopSyncMissionsFromRemote(const std::string& networkId)
534 {
535 HILOGD(" %{private}s!", networkId.c_str());
536 {
537 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
538 remoteSyncDeviceSet_.erase(networkId);
539 if (remoteSyncDeviceSet_.empty()) {
540 auto func = [this]() {
541 int32_t ret = DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
542 if (ret == ERR_OK) {
543 isRegMissionChange_ = false;
544 }
545 };
546 if (!missionHandler_->PostTask(func)) {
547 HILOGE("post UnRegisterMissionListener Task failed");
548 }
549 }
550 }
551 }
552
NeedSyncDevice(const std::string & deviceId)553 bool DistributedSchedMissionManager::NeedSyncDevice(const std::string& deviceId)
554 {
555 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
556 if (remoteSyncDeviceSet_.count(deviceId) == 0) {
557 return false;
558 }
559 return true;
560 }
561
HasSyncListener(const std::string & networkId)562 bool DistributedSchedMissionManager::HasSyncListener(const std::string& networkId)
563 {
564 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
565 auto iter = listenDeviceMap_.find(Str8ToStr16(networkId));
566 if (iter != listenDeviceMap_.end()) {
567 return iter->second.called;
568 }
569 return false;
570 }
571
NotifySnapshotChanged(const std::string & networkId,int32_t missionId)572 void DistributedSchedMissionManager::NotifySnapshotChanged(const std::string& networkId, int32_t missionId)
573 {
574 std::u16string u16DevId = Str8ToStr16(networkId);
575 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
576 auto iter = listenDeviceMap_.find(u16DevId);
577 if (iter == listenDeviceMap_.end()) {
578 return;
579 }
580 auto& listenerInfo = iter->second;
581 for (auto& listener : listenerInfo.listenerSet) {
582 MissionChangedNotify::NotifySnapshot(listener, u16DevId, missionId);
583 }
584 }
585
OnRemoteDied(const wptr<IRemoteObject> & remote)586 void DistributedSchedMissionManager::OnRemoteDied(const wptr<IRemoteObject>& remote)
587 {
588 HILOGD("OnRemoteDied!");
589 sptr<IRemoteObject> listener = remote.promote();
590 if (listener == nullptr) {
591 return;
592 }
593 auto remoteDiedFunc = [this, listener]() {
594 OnMissionListenerDied(listener);
595 };
596 if (missionHandler_ != nullptr) {
597 missionHandler_->PostTask(remoteDiedFunc);
598 }
599 }
600
OnRemoteDied(const wptr<IRemoteObject> & remote)601 void DistributedSchedMissionManager::ListenerDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
602 {
603 DistributedSchedMissionManager::GetInstance().OnRemoteDied(remote);
604 }
605
EnqueueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId,std::unique_ptr<Snapshot> snapshot)606 void DistributedSchedMissionManager::EnqueueCachedSnapshotInfo(const std::string& deviceId, int32_t missionId,
607 std::unique_ptr<Snapshot> snapshot)
608 {
609 if (deviceId.empty() || snapshot == nullptr) {
610 HILOGW("EnqueueCachedSnapshotInfo invalid input param!");
611 return;
612 }
613 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
614 std::string keyInfo = GenerateKeyInfo(deviceId, missionId);
615 auto iter = cachedSnapshotInfos_.find(keyInfo);
616 if (iter != cachedSnapshotInfos_.end()) {
617 if (snapshot->GetCreatedTime() < iter->second->GetCreatedTime()) {
618 return;
619 }
620 }
621
622 if (cachedSnapshotInfos_.size() == MAX_CACHED_ITEM) {
623 int64_t oldest = -1;
624 auto iterOldest = cachedSnapshotInfos_.end();
625 for (auto iterItem = cachedSnapshotInfos_.begin(); iterItem != cachedSnapshotInfos_.end(); ++iterItem) {
626 if (oldest == -1 || iterItem->second->GetLastAccessTime() < oldest) {
627 oldest = iterItem->second->GetLastAccessTime();
628 iterOldest = iterItem;
629 }
630 }
631 if (iterOldest != cachedSnapshotInfos_.end()) {
632 cachedSnapshotInfos_.erase(iterOldest);
633 }
634 }
635 cachedSnapshotInfos_[keyInfo] = std::move(snapshot);
636 }
637
DequeueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId)638 std::unique_ptr<Snapshot> DistributedSchedMissionManager::DequeueCachedSnapshotInfo(const std::string& deviceId,
639 int32_t missionId)
640 {
641 if (deviceId.empty()) {
642 HILOGW("DequeueCachedSnapshotInfo invalid input param!");
643 return nullptr;
644 }
645 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
646 auto iter = cachedSnapshotInfos_.find(GenerateKeyInfo(deviceId, missionId));
647 if (iter != cachedSnapshotInfos_.end()) {
648 std::unique_ptr<Snapshot> snapshot = std::move(iter->second);
649 snapshot->UpdateLastAccessTime(GetTickCount());
650 iter->second = nullptr;
651 cachedSnapshotInfos_.erase(iter);
652 return snapshot;
653 }
654 return nullptr;
655 }
656
DeleteCachedSnapshotInfo(const std::string & networkId)657 void DistributedSchedMissionManager::DeleteCachedSnapshotInfo(const std::string& networkId)
658 {
659 if (networkId.empty()) {
660 return;
661 }
662 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
663 if (uuid.empty()) {
664 return;
665 }
666 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
667 auto iter = cachedSnapshotInfos_.begin();
668 while (iter != cachedSnapshotInfos_.end()) {
669 if (iter->first.find(uuid) != std::string::npos) {
670 iter = cachedSnapshotInfos_.erase(iter);
671 } else {
672 ++iter;
673 }
674 }
675 }
676
FetchCachedRemoteMissions(const std::string & srcId,int32_t numMissions,std::vector<DstbMissionInfo> & missionInfos)677 int32_t DistributedSchedMissionManager::FetchCachedRemoteMissions(const std::string& srcId, int32_t numMissions,
678 std::vector<DstbMissionInfo>& missionInfos)
679 {
680 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(srcId);
681 if (uuid.empty()) {
682 HILOGE("uuid empty!");
683 return INVALID_PARAMETERS_ERR;
684 }
685 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
686 auto iter = deviceMissionInfos_.find(uuid);
687 if (iter == deviceMissionInfos_.end()) {
688 HILOGE("can not find uuid, deviceId: %{public}s!",
689 DnetworkAdapter::AnonymizeNetworkId(srcId).c_str());
690 return ERR_NULL_OBJECT;
691 }
692
693 // get at most numMissions missions
694 int32_t actualNums = static_cast<int32_t>((iter->second).size());
695 if (actualNums < 0) {
696 HILOGE("invalid size!");
697 return ERR_NULL_OBJECT;
698 }
699 missionInfos.assign((iter->second).begin(),
700 (actualNums > numMissions) ? (iter->second).begin() + numMissions : (iter->second).end());
701 return ERR_NONE;
702 }
703
RebornMissionCache(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfos)704 void DistributedSchedMissionManager::RebornMissionCache(const std::string& deviceId,
705 const std::vector<DstbMissionInfo>& missionInfos)
706 {
707 HILOGI("start! deviceId is %{public}s",
708 DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
709 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
710 if (uuid.empty()) {
711 HILOGE("uuid empty!");
712 return;
713 }
714 {
715 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
716 deviceMissionInfos_[uuid] = missionInfos;
717 }
718 HILOGI("RebornMissionCache end!");
719 }
720
CleanMissionCache(const std::string & deviceId)721 void DistributedSchedMissionManager::CleanMissionCache(const std::string& deviceId)
722 {
723 HILOGI("CleanMissionCache start! deviceId is %{public}s",
724 DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
725 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
726 if (uuid.empty()) {
727 HILOGE("CleanMissionCache uuid empty!");
728 return;
729 }
730 {
731 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
732 deviceMissionInfos_.erase(uuid);
733 }
734 HILOGI("CleanMissionCache end!");
735 }
736
NotifyMissionsChangedFromRemote(const CallerInfo & callerInfo,const std::vector<DstbMissionInfo> & missionInfos)737 int32_t DistributedSchedMissionManager::NotifyMissionsChangedFromRemote(const CallerInfo& callerInfo,
738 const std::vector<DstbMissionInfo>& missionInfos)
739 {
740 HILOGI("NotifyMissionsChangedFromRemote version is %{public}d!", callerInfo.dmsVersion);
741 std::u16string u16DevId = Str8ToStr16(callerInfo.sourceDeviceId);
742 RebornMissionCache(callerInfo.sourceDeviceId, missionInfos);
743 {
744 HILOGI("NotifyMissionsChangedFromRemote notify mission start!");
745 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
746 auto iter = listenDeviceMap_.find(u16DevId);
747 if (iter == listenDeviceMap_.end()) {
748 HILOGE("NotifyMissionsChangedFromRemote notify mission no listener!");
749 return INVALID_PARAMETERS_ERR;
750 }
751 auto& listenerSet = iter->second.listenerSet;
752 auto notifyChanged = [listenerSet, u16DevId] () {
753 for (const auto& listener : listenerSet) {
754 MissionChangedNotify::NotifyMissionsChanged(listener, u16DevId);
755 }
756 };
757 if (missionHandler_ != nullptr) {
758 missionHandler_->PostTask(notifyChanged);
759 HILOGI("NotifyMissionsChangedFromRemote end!");
760 return ERR_NONE;
761 }
762 }
763 return INVALID_PARAMETERS_ERR;
764 }
765
NotifyLocalMissionsChanged()766 void DistributedSchedMissionManager::NotifyLocalMissionsChanged()
767 {
768 auto func = [this]() {
769 HILOGI("NotifyLocalMissionsChanged");
770 std::vector<DstbMissionInfo> missionInfos;
771 int32_t ret = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
772 missionInfos);
773 if (ret == ERR_OK) {
774 int32_t result = NotifyMissionsChangedToRemote(missionInfos);
775 HILOGI("NotifyMissionsChangedToRemote result = %{public}d", result);
776 }
777 };
778 if (!missionChangeHandler_->PostTask(func)) {
779 HILOGE("postTask failed");
780 }
781 }
782
NotifyMissionSnapshotCreated(int32_t missionId)783 void DistributedSchedMissionManager::NotifyMissionSnapshotCreated(int32_t missionId)
784 {
785 auto func = [this, missionId]() {
786 HILOGD("called.");
787 ErrCode errCode = MissionSnapshotChanged(missionId);
788 if (errCode != ERR_OK) {
789 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
790 }
791 };
792 if (!missionChangeHandler_->PostTask(func, GET_FOREGROUND_SNAPSHOT_DELAY_TIME)) {
793 HILOGE("post MissionSnapshotChanged delay Task failed");
794 }
795 }
796
NotifyMissionSnapshotChanged(int32_t missionId)797 void DistributedSchedMissionManager::NotifyMissionSnapshotChanged(int32_t missionId)
798 {
799 auto func = [this, missionId]() {
800 HILOGD("called.");
801 ErrCode errCode = MissionSnapshotChanged(missionId);
802 if (errCode != ERR_OK) {
803 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
804 }
805 };
806 if (!missionChangeHandler_->PostTask(func)) {
807 HILOGE("post MissionSnapshotChanged Task failed");
808 }
809 }
810
NotifyMissionSnapshotDestroyed(int32_t missionId)811 void DistributedSchedMissionManager::NotifyMissionSnapshotDestroyed(int32_t missionId)
812 {
813 auto func = [this, missionId]() {
814 HILOGD("called.");
815 ErrCode errCode = MissionSnapshotDestroyed(missionId);
816 if (errCode != ERR_OK) {
817 HILOGE("mission snapshot removed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
818 }
819 };
820 if (!missionChangeHandler_->PostTask(func)) {
821 HILOGE("post MissionSnapshotDestroyed Task failed");
822 }
823 }
824
NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo> & missionInfos)825 int32_t DistributedSchedMissionManager::NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo>& missionInfos)
826 {
827 CallerInfo callerInfo;
828 if (!GenerateCallerInfo(callerInfo)) {
829 return GET_LOCAL_DEVICE_ERR;
830 }
831 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
832 for (const auto& destDeviceId : remoteSyncDeviceSet_) {
833 auto handler = FetchDeviceHandler(destDeviceId);
834 if (handler == nullptr) {
835 HILOGE("NotifyMissionsChangedToRemote fetch handler failed!");
836 continue;
837 }
838 auto callback = [destDeviceId, missionInfos, callerInfo, this] () {
839 NotifyMissionsChangedToRemoteInner(destDeviceId, missionInfos, callerInfo);
840 };
841 if (!handler->PostTask(callback)) {
842 HILOGE("NotifyMissionsChangedToRemote PostTask failed!");
843 return ERR_NULL_OBJECT;
844 }
845 }
846
847 return ERR_NONE;
848 }
849
NotifyMissionsChangedToRemoteInner(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfos,const CallerInfo & callerInfo)850 void DistributedSchedMissionManager::NotifyMissionsChangedToRemoteInner(const std::string& deviceId,
851 const std::vector<DstbMissionInfo>& missionInfos, const CallerInfo& callerInfo)
852 {
853 sptr<IDistributedSched> remoteDms = GetRemoteDms(deviceId);
854 if (remoteDms == nullptr) {
855 HILOGE("NotifyMissionsChangedToRemote DMS get remoteDms failed");
856 return;
857 }
858 int64_t begin = GetTickCount();
859 int32_t result = remoteDms->NotifyMissionsChangedFromRemote(missionInfos, callerInfo);
860 HILOGI("[PerformanceTest] NotifyMissionsChangedFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
861 result, GetTickCount() - begin);
862 }
863
GenerateCallerInfo(CallerInfo & callerInfo)864 bool DistributedSchedMissionManager::GenerateCallerInfo(CallerInfo& callerInfo)
865 {
866 std::string localUuid;
867 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localUuid)) {
868 HILOGE("get local uuid failed!");
869 return false;
870 }
871 callerInfo.uid = IPCSkeleton::GetCallingUid();
872 callerInfo.pid = IPCSkeleton::GetCallingPid();
873 callerInfo.callerType = CALLER_TYPE_HARMONY;
874 callerInfo.sourceDeviceId = localUuid;
875 callerInfo.dmsVersion = VERSION;
876 return true;
877 }
878
FetchDeviceHandler(const std::string & deviceId)879 std::shared_ptr<AppExecFwk::EventHandler> DistributedSchedMissionManager::FetchDeviceHandler(
880 const std::string& deviceId)
881 {
882 if (!IsDeviceIdValidated(deviceId)) {
883 HILOGW("FetchDeviceHandler device:%{public}s offline.",
884 DnetworkAdapter::AnonymizeNetworkId(deviceId).c_str());
885 return nullptr;
886 }
887
888 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
889 if (uuid.empty()) {
890 HILOGE("FetchDeviceHandler uuid empty!");
891 return nullptr;
892 }
893
894 auto iter = deviceHandle_.find(uuid);
895 if (iter != deviceHandle_.end()) {
896 return iter->second;
897 }
898
899 auto anonyUuid = DnetworkAdapter::AnonymizeNetworkId(uuid);
900 auto runner = AppExecFwk::EventRunner::Create(anonyUuid + "_MissionN");
901 auto handler = std::make_shared<AppExecFwk::EventHandler>(runner);
902 deviceHandle_.emplace(uuid, handler);
903 return handler;
904 }
905
OnRemoteDied(const wptr<IRemoteObject> & remote)906 void DistributedSchedMissionManager::RemoteDmsDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
907 {
908 HILOGI("OnRemoteDied received died notify!");
909 DistributedSchedMissionManager::GetInstance().OnRemoteDmsDied(remote);
910 }
911
OnRemoteDmsDied(const wptr<IRemoteObject> & remote)912 void DistributedSchedMissionManager::OnRemoteDmsDied(const wptr<IRemoteObject>& remote)
913 {
914 sptr<IRemoteObject> diedRemoted = remote.promote();
915 if (diedRemoted == nullptr) {
916 HILOGW("OnRemoteDmsDied promote failed!");
917 return;
918 }
919 HILOGD("delete diedRemoted");
920 auto remoteDmsDiedFunc = [this, diedRemoted]() {
921 OnRemoteDmsDied(diedRemoted);
922 };
923 if (missionHandler_ != nullptr) {
924 missionHandler_->PostTask(remoteDmsDiedFunc);
925 }
926 }
927
RetryStartSyncRemoteMissions(const std::string & dstDeviceId,const std::string & localDevId,int32_t retryTimes)928 void DistributedSchedMissionManager::RetryStartSyncRemoteMissions(const std::string& dstDeviceId,
929 const std::string& localDevId, int32_t retryTimes)
930 {
931 auto retryFunc = [this, dstDeviceId, localDevId, retryTimes]() {
932 bool ret = HasSyncListener(dstDeviceId);
933 if (!ret) {
934 return;
935 }
936 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDeviceId);
937 if (remoteDms == nullptr) {
938 HILOGI("RetryStartSyncRemoteMissions DMS get remoteDms failed");
939 RetryStartSyncRemoteMissions(dstDeviceId, localDevId, retryTimes + 1);
940 return;
941 }
942 int32_t errNo = StartSyncRemoteMissions(dstDeviceId, remoteDms);
943 HILOGI("RetryStartSyncRemoteMissions result:%{public}d", errNo);
944 };
945 if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
946 missionHandler_->PostTask(retryFunc, RETRY_DELAYED);
947 }
948 }
949
OnMissionListenerDied(const sptr<IRemoteObject> & remote)950 void DistributedSchedMissionManager::OnMissionListenerDied(const sptr<IRemoteObject>& remote)
951 {
952 HILOGI("OnMissionListenerDied");
953 std::set<std::string> deviceSet;
954 {
955 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
956 auto iterItem = listenDeviceMap_.begin();
957 while (iterItem != listenDeviceMap_.end()) {
958 auto& listenerInfo = iterItem->second;
959 auto ret = listenerInfo.Find(remote);
960 if (!ret) {
961 ++iterItem;
962 continue;
963 }
964 remote->RemoveDeathRecipient(listenerDeath_);
965 listenerInfo.Erase(remote);
966 if (listenerInfo.Empty()) {
967 if (listenerInfo.called) {
968 deviceSet.emplace(Str16ToStr8(iterItem->first));
969 }
970 iterItem = listenDeviceMap_.erase(iterItem);
971 } else {
972 ++iterItem;
973 }
974 }
975 }
976 for (auto& devId : deviceSet) {
977 StopSyncRemoteMissions(devId, false);
978 }
979 }
980
OnRemoteDmsDied(const sptr<IRemoteObject> & remote)981 void DistributedSchedMissionManager::OnRemoteDmsDied(const sptr<IRemoteObject>& remote)
982 {
983 HILOGI("OnRemoteDmsDied");
984 std::string devId;
985 {
986 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
987 for (auto iter = remoteDmsMap_.begin(); iter != remoteDmsMap_.end(); ++iter) {
988 if (iter->second->AsObject() == remote) {
989 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
990 devId = iter->first;
991 remoteDmsMap_.erase(iter);
992 break;
993 }
994 }
995 }
996 if (devId.empty()) {
997 return;
998 }
999 bool ret = HasSyncListener(devId);
1000 if (ret) {
1001 std::string localDeviceId;
1002 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
1003 return;
1004 }
1005 RetryStartSyncRemoteMissions(devId, localDeviceId, 0);
1006 }
1007 }
1008
NotifyDmsProxyProcessDied()1009 void DistributedSchedMissionManager::NotifyDmsProxyProcessDied()
1010 {
1011 HILOGI("NotifyDmsProxyProcessDied!");
1012 if (!isRegMissionChange_) {
1013 return;
1014 }
1015 RetryRegisterMissionChange(0);
1016 }
1017
RetryRegisterMissionChange(int32_t retryTimes)1018 void DistributedSchedMissionManager::RetryRegisterMissionChange(int32_t retryTimes)
1019 {
1020 auto remoteDiedFunc = [this, retryTimes]() {
1021 HILOGI("RetryRegisterMissionChange retryTimes:%{public}d begin", retryTimes);
1022 if (!isRegMissionChange_) {
1023 return;
1024 }
1025 int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
1026 if (ret == ERR_NULL_OBJECT) {
1027 RetryRegisterMissionChange(retryTimes + 1);
1028 HILOGI("RetryRegisterMissionChange dmsproxy null, retry!");
1029 return;
1030 }
1031 HILOGI("RetryRegisterMissionChange result:%{public}d", ret);
1032 };
1033 if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1034 missionHandler_->PostTask(remoteDiedFunc, RETRY_DELAYED);
1035 }
1036 }
1037
InitAllSnapshots(const std::vector<DstbMissionInfo> & missionInfos)1038 void DistributedSchedMissionManager::InitAllSnapshots(const std::vector<DstbMissionInfo>& missionInfos)
1039 {
1040 for (auto iter = missionInfos.begin(); iter != missionInfos.end(); iter++) {
1041 ErrCode errCode = MissionSnapshotChanged(iter->id);
1042 if (errCode != ERR_OK) {
1043 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", iter->id, errCode);
1044 }
1045 }
1046 }
1047
MissionSnapshotChanged(int32_t missionId)1048 int32_t DistributedSchedMissionManager::MissionSnapshotChanged(int32_t missionId)
1049 {
1050 std::string networkId;
1051 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1052 HILOGE("get local networkId failed!");
1053 return INVALID_PARAMETERS_ERR;
1054 }
1055 AAFwk::MissionSnapshot missionSnapshot;
1056 ErrCode errCode = DistributedSchedAdapter::GetInstance()
1057 .GetLocalMissionSnapshotInfo(networkId, missionId, missionSnapshot);
1058 if (errCode != ERR_OK) {
1059 HILOGE("get local mission snapshot failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
1060 return errCode;
1061 }
1062 Snapshot snapshot;
1063 SnapshotConverter::ConvertToSnapshot(missionSnapshot, snapshot);
1064 MessageParcel data;
1065 errCode = MissionSnapshotSequence(snapshot, data);
1066 if (errCode != ERR_OK) {
1067 HILOGE("mission snapshot sequence failed, errCode=%{public}d", errCode);
1068 return errCode;
1069 }
1070 size_t len = data.GetReadableBytes();
1071 const uint8_t* byteStream = data.ReadBuffer(len);
1072 errCode = StoreSnapshotInfo(networkId, missionId, byteStream, len);
1073 return errCode;
1074 }
1075
MissionSnapshotDestroyed(int32_t missionId)1076 int32_t DistributedSchedMissionManager::MissionSnapshotDestroyed(int32_t missionId)
1077 {
1078 std::string networkId;
1079 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1080 HILOGE("get local networkId failed!");
1081 return INVALID_PARAMETERS_ERR;
1082 }
1083 ErrCode errCode = RemoveSnapshotInfo(networkId, missionId);
1084 return errCode;
1085 }
1086
MissionSnapshotSequence(const Snapshot & snapshot,MessageParcel & data)1087 int32_t DistributedSchedMissionManager::MissionSnapshotSequence(const Snapshot& snapshot, MessageParcel& data)
1088 {
1089 bool ret = snapshot.WriteSnapshotInfo(data);
1090 if (!ret) {
1091 HILOGE("WriteSnapshotInfo failed!");
1092 return ERR_FLATTEN_OBJECT;
1093 }
1094 ret = snapshot.WritePixelMap(data);
1095 if (!ret) {
1096 HILOGE("WritePixelMap failed!");
1097 return ERR_FLATTEN_OBJECT;
1098 }
1099 return ERR_OK;
1100 }
1101
OnDnetDied()1102 void DistributedSchedMissionManager::OnDnetDied()
1103 {
1104 auto dnetDiedFunc = [this]() {
1105 HILOGI("OnDnetDied");
1106 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
1107 if (!isRegMissionChange_) {
1108 return;
1109 }
1110 remoteSyncDeviceSet_.clear();
1111 DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
1112 isRegMissionChange_ = false;
1113 };
1114 if (missionHandler_ != nullptr) {
1115 missionHandler_->PostTask(dnetDiedFunc);
1116 }
1117 }
1118 } // namespace DistributedSchedule
1119 } // namespace OHOS
1120