1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "mission/distributed_sched_mission_manager.h"
17
18 #include <chrono>
19 #include <sys/time.h>
20 #include <unistd.h>
21
22 #include "datetime_ex.h"
23 #include "ipc_skeleton.h"
24 #include "iservice_registry.h"
25 #include "nlohmann/json.hpp"
26 #include "string_ex.h"
27 #include "system_ability_definition.h"
28
29 #include "distributed_sched_adapter.h"
30 #include "distributed_sched_utils.h"
31 #include "dtbschedmgr_device_info_storage.h"
32 #include "dtbschedmgr_log.h"
33 #include "mission/mission_changed_notify.h"
34 #include "mission/mission_constant.h"
35 #include "mission/mission_info_converter.h"
36 #include "mission/snapshot_converter.h"
37
38 namespace OHOS {
39 namespace DistributedSchedule {
40 namespace {
41 const std::string TAG = "DistributedSchedMissionManager";
42 constexpr size_t MAX_CACHED_ITEM = 10;
43 constexpr int32_t MAX_RETRY_TIMES = 15;
44 constexpr int32_t RETRY_DELAYED = 2000;
45 constexpr int32_t GET_FOREGROUND_SNAPSHOT_DELAY_TIME = 800; // ms
46 const std::string DELETE_DATA_STORAGE = "DeleteDataStorage";
47 constexpr int32_t DELETE_DATA_STORAGE_DELAYED = 60000; // ms
48 const std::string INVAILD_LOCAL_DEVICE_ID = "-1";
49 constexpr int32_t NET_STATE = 0;
50 }
51 namespace Mission {
52 constexpr int32_t GET_MAX_MISSIONS = 20;
53 } // Mission
54 using namespace std::chrono;
55 using namespace Constants::Mission;
56 using namespace OHOS::DistributedKv;
57
58 IMPLEMENT_SINGLE_INSTANCE(DistributedSchedMissionManager);
59
Init()60 void DistributedSchedMissionManager::Init()
61 {
62 listenerDeath_ = new ListenerDeathRecipient();
63 remoteDmsRecipient_ = new RemoteDmsDeathRecipient();
64 auto runner = AppExecFwk::EventRunner::Create("MissionManagerHandler");
65 missionHandler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
66 auto updateRunner = AppExecFwk::EventRunner::Create("UpdateHandler");
67 updateHandler_ = std::make_shared<AppExecFwk::EventHandler>(updateRunner);
68 missonChangeListener_ = new DistributedMissionChangeListener();
69 auto missionChangeRunner = AppExecFwk::EventRunner::Create("DistributedMissionChange");
70 missionChangeHandler_ = std::make_shared<AppExecFwk::EventHandler>(missionChangeRunner);
71 }
72
GetMissionInfos(const std::string & deviceId,int32_t numMissions,std::vector<AAFwk::MissionInfo> & missionInfoSet)73 int32_t DistributedSchedMissionManager::GetMissionInfos(const std::string& deviceId,
74 int32_t numMissions, std::vector<AAFwk::MissionInfo>& missionInfoSet)
75 {
76 HILOGI("start!");
77 if (!IsDeviceIdValidated(deviceId)) {
78 return INVALID_PARAMETERS_ERR;
79 }
80 if (numMissions <= 0) {
81 HILOGE("numMissions is illegal! numMissions:%{public}d", numMissions);
82 return INVALID_PARAMETERS_ERR;
83 }
84 std::vector<DstbMissionInfo> dstbMissionInfoSet;
85 int32_t ret = FetchCachedRemoteMissions(deviceId, numMissions, dstbMissionInfoSet);
86 if (ret != ERR_OK) {
87 HILOGE("FetchCachedRemoteMissions failed, ret = %{public}d", ret);
88 return ret;
89 }
90 return MissionInfoConverter::ConvertToMissionInfos(dstbMissionInfoSet, missionInfoSet);
91 }
92
GetRemoteDms(const std::string & deviceId)93 sptr<IDistributedSched> DistributedSchedMissionManager::GetRemoteDms(const std::string& deviceId)
94 {
95 if (deviceId.empty()) {
96 HILOGE("GetRemoteDms remoteDeviceId is empty");
97 return nullptr;
98 }
99 int64_t begin = GetTickCount();
100 {
101 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
102 auto iter = remoteDmsMap_.find(deviceId);
103 if (iter != remoteDmsMap_.end()) {
104 auto object = iter->second;
105 if (object != nullptr) {
106 HILOGI("[PerformanceTest] GetRemoteDms from cache spend %{public}" PRId64 " ms",
107 GetTickCount() - begin);
108 return object;
109 }
110 }
111 }
112 HILOGD("GetRemoteDms connect deviceid is %s", GetAnonymStr(deviceId).c_str());
113 auto samgr = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
114 if (samgr == nullptr) {
115 HILOGE("GetRemoteDms failed to connect to systemAbilityMgr!");
116 return nullptr;
117 }
118 auto object = samgr->CheckSystemAbility(DISTRIBUTED_SCHED_SA_ID, deviceId);
119 if (object == nullptr) {
120 HILOGE("GetRemoteDms failed to get dms for remote device: %{public}s!", GetAnonymStr(deviceId).c_str());
121 return nullptr;
122 }
123 auto ret = object->AddDeathRecipient(remoteDmsRecipient_);
124 HILOGD("GetRemoteDms AddDeathRecipient ret : %{public}d", ret);
125 sptr<IDistributedSched> remoteDmsObj = iface_cast<IDistributedSched>(object);
126 {
127 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
128 auto iter = remoteDmsMap_.find(deviceId);
129 if (iter != remoteDmsMap_.end()) {
130 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
131 }
132 remoteDmsMap_[deviceId] = remoteDmsObj;
133 }
134 HILOGI("[PerformanceTest] GetRemoteDms spend %{public}" PRId64 " ms", GetTickCount() - begin);
135 return remoteDmsObj;
136 }
137
IsDeviceIdValidated(const std::string & deviceId)138 bool DistributedSchedMissionManager::IsDeviceIdValidated(const std::string& deviceId)
139 {
140 if (deviceId.empty()) {
141 HILOGE("IsDeviceIdValidated deviceId is empty!");
142 return false;
143 }
144 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(deviceId) == nullptr) {
145 HILOGW("IsDeviceIdValidated device offline.");
146 return false;
147 }
148
149 return true;
150 }
151
NotifyRemoteDied(const wptr<IRemoteObject> & remote)152 void DistributedSchedMissionManager::NotifyRemoteDied(const wptr<IRemoteObject>& remote)
153 {
154 if (distributedDataStorage_ == nullptr) {
155 HILOGE("DistributedDataStorage null!");
156 return;
157 }
158 distributedDataStorage_->NotifyRemoteDied(remote);
159 }
160
NotifyNetDisconnectOffline()161 void DistributedSchedMissionManager::NotifyNetDisconnectOffline()
162 {
163 HILOGD("NotifyNetDisconnectOffline start.");
164 {
165 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
166 if (listenDeviceMap_.empty()) {
167 HILOGI("The connect is null!");
168 return;
169 }
170 for (auto& listenDevice : listenDeviceMap_) {
171 std::u16string devId = listenDevice.first;
172 std::set<sptr<IRemoteObject>> listenerSet = listenDevice.second.listenerSet;
173 for (auto connect : listenerSet) {
174 MissionChangedNotify::NotifyNetDisconnect(connect, devId, NET_STATE);
175 }
176 }
177 }
178 HILOGD("NotifyNetDisconnectOffline end.");
179 }
180
InitDataStorage()181 int32_t DistributedSchedMissionManager::InitDataStorage()
182 {
183 if (distributedDataStorage_ == nullptr) {
184 distributedDataStorage_ = std::make_shared<DistributedDataStorage>();
185 }
186 if (!distributedDataStorage_->Init()) {
187 HILOGE("InitDataStorage DistributedDataStorage init failed!");
188 return ERR_NULL_OBJECT;
189 }
190 return ERR_NONE;
191 }
192
StopDataStorage()193 int32_t DistributedSchedMissionManager::StopDataStorage()
194 {
195 if (distributedDataStorage_ == nullptr) {
196 HILOGE("StopDataStorage DistributedDataStorage null!");
197 return ERR_NULL_OBJECT;
198 }
199 if (!distributedDataStorage_->Stop()) {
200 HILOGE("StopDataStorage DistributedDataStorage stop failed!");
201 return ERR_NULL_OBJECT;
202 }
203 return ERR_NONE;
204 }
205
StoreSnapshotInfo(const std::string & deviceId,int32_t missionId,const uint8_t * byteStream,size_t len)206 int32_t DistributedSchedMissionManager::StoreSnapshotInfo(const std::string& deviceId, int32_t missionId,
207 const uint8_t* byteStream, size_t len)
208 {
209 if (distributedDataStorage_ == nullptr) {
210 HILOGE("StoreSnapshotInfo DistributedDataStorage null!");
211 return ERR_NULL_OBJECT;
212 }
213 if (!distributedDataStorage_->Insert(deviceId, missionId, byteStream, len)) {
214 HILOGE("StoreSnapshotInfo DistributedDataStorage insert failed!");
215 return INVALID_PARAMETERS_ERR;
216 }
217 return ERR_NONE;
218 }
219
RemoveSnapshotInfo(const std::string & deviceId,int32_t missionId)220 int32_t DistributedSchedMissionManager::RemoveSnapshotInfo(const std::string& deviceId, int32_t missionId)
221 {
222 if (distributedDataStorage_ == nullptr) {
223 HILOGE("RemoveSnapshotInfo DistributedDataStorage null!");
224 return ERR_NULL_OBJECT;
225 }
226 if (!distributedDataStorage_->Delete(deviceId, missionId)) {
227 HILOGE("RemoveSnapshotInfo DistributedDataStorage delete failed!");
228 return INVALID_PARAMETERS_ERR;
229 }
230 return ERR_NONE;
231 }
232
GetRemoteMissionSnapshotInfo(const std::string & networkId,int32_t missionId,std::unique_ptr<AAFwk::MissionSnapshot> & missionSnapshot)233 int32_t DistributedSchedMissionManager::GetRemoteMissionSnapshotInfo(const std::string& networkId, int32_t missionId,
234 std::unique_ptr<AAFwk::MissionSnapshot>& missionSnapshot)
235 {
236 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
237 if (uuid.empty()) {
238 HILOGE("uuid is empty!");
239 return INVALID_PARAMETERS_ERR;
240 }
241 std::unique_ptr<Snapshot> snapshotPtr = DequeueCachedSnapshotInfo(uuid, missionId);
242 if (snapshotPtr != nullptr) {
243 HILOGI("Get snapshot from cache success, uuid: %{public}s, missionId: %{public}d.",
244 GetAnonymStr(uuid).c_str(), missionId);
245 SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
246 return ERR_NONE;
247 }
248 if (distributedDataStorage_ == nullptr) {
249 HILOGE("DistributedDataStorage is null!");
250 return ERR_NULL_OBJECT;
251 }
252 DistributedKv::Value value;
253 bool ret = distributedDataStorage_->Query(networkId, missionId, value);
254 if (!ret) {
255 HILOGE("DistributedDataStorage query failed!");
256 return INVALID_PARAMETERS_ERR;
257 }
258 snapshotPtr = Snapshot::Create(value.Data());
259 if (snapshotPtr == nullptr) {
260 HILOGE("snapshot create failed!");
261 return ERR_NULL_OBJECT;
262 }
263 HILOGI("Get snapshot from DistributedDB success, uuid: %{public}s, missionId: %{public}d.",
264 GetAnonymStr(uuid).c_str(), missionId);
265 SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
266 return ERR_NONE;
267 }
268
DeviceOnlineNotify(const std::string & networkId)269 void DistributedSchedMissionManager::DeviceOnlineNotify(const std::string& networkId)
270 {
271 if (networkId.empty()) {
272 HILOGW("DeviceOnlineNotify networkId empty!");
273 return;
274 }
275
276 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
277 if (missionHandler_ != nullptr) {
278 HILOGI("DeviceOnlineNotify RemoveTask");
279 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
280 }
281 }
282
DeviceOfflineNotify(const std::string & networkId)283 void DistributedSchedMissionManager::DeviceOfflineNotify(const std::string& networkId)
284 {
285 if (networkId.empty()) {
286 HILOGW("DeviceOfflineNotify networkId empty!");
287 return;
288 }
289 StopSyncMissionsFromRemote(networkId);
290 CleanMissionResources(networkId);
291 {
292 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
293 auto iter = remoteDmsMap_.find(networkId);
294 if (iter != remoteDmsMap_.end()) {
295 if (iter->second == nullptr) {
296 return;
297 }
298 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
299 remoteDmsMap_.erase(iter);
300 }
301 }
302 HILOGI("DeviceOfflineNotify erase value for networkId: %{public}s.", GetAnonymStr(networkId).c_str());
303 }
304
DeleteDataStorage(const std::string & deviceId,bool isDelayed)305 void DistributedSchedMissionManager::DeleteDataStorage(const std::string& deviceId, bool isDelayed)
306 {
307 if (distributedDataStorage_ == nullptr) {
308 HILOGE("DeleteDataStorage DistributedDataStorage null!");
309 return;
310 }
311 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
312 auto callback = [this, uuid, deviceId]() {
313 if (!distributedDataStorage_->FuzzyDelete(deviceId)) {
314 HILOGE("DeleteDataStorage storage delete failed!");
315 } else {
316 HILOGI("DeleteDataStorage storage delete successfully!");
317 }
318 };
319 if (isDelayed) {
320 if (missionHandler_ != nullptr) {
321 HILOGI("DeleteDataStorage PostTask");
322 missionHandler_->PostTask(callback, DELETE_DATA_STORAGE + uuid, DELETE_DATA_STORAGE_DELAYED);
323 }
324 } else {
325 if (missionHandler_ != nullptr) {
326 HILOGI("DeleteDataStorage RemoveTask");
327 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
328 }
329 callback();
330 }
331 }
332
RegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)333 int32_t DistributedSchedMissionManager::RegisterMissionListener(const std::u16string& devId,
334 const sptr<IRemoteObject>& listener)
335 {
336 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(Str16ToStr8(devId));
337 if (uuid.empty()) {
338 HILOGE("uuid is empty!");
339 return INVALID_PARAMETERS_ERR;
340 }
341 if (missionHandler_ != nullptr) {
342 HILOGI("RemoveTask");
343 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
344 }
345 if (listener == nullptr) {
346 return INVALID_PARAMETERS_ERR;
347 }
348 std::string localDeviceId;
349 std::string remoteDeviceId = Str16ToStr8(devId);
350 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
351 || localDeviceId == remoteDeviceId) {
352 HILOGE("check deviceId failed!");
353 return INVALID_PARAMETERS_ERR;
354 }
355 {
356 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
357 auto& listenerInfo = listenDeviceMap_[devId];
358 if (!listenerInfo.Emplace(listener)) {
359 HILOGW("RegisterSyncListener listener has already inserted!");
360 return ERR_NONE;
361 }
362 bool ret = listener->AddDeathRecipient(listenerDeath_);
363 if (!ret) {
364 HILOGW("RegisterSyncListener AddDeathRecipient failed!");
365 }
366 if (listenerInfo.Size() > 1) {
367 HILOGI("RegisterMissionListener not notify remote DMS!");
368 return ERR_NONE;
369 }
370 }
371 return ERR_NONE;
372 }
373
StartSyncRemoteMissions(const std::string & dstDevId,const std::string & localDevId)374 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
375 const std::string& localDevId)
376 {
377 std::u16string devId = Str8ToStr16(dstDevId);
378 {
379 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
380 auto iterItem = listenDeviceMap_.find(devId);
381 if (iterItem == listenDeviceMap_.end()) {
382 return ERR_NONE;
383 }
384 bool callFlag = iterItem->second.called;
385 if (callFlag) {
386 HILOGI("StartSyncRemoteMissions already called!");
387 return ERR_NONE;
388 }
389 }
390 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
391 if (remoteDms == nullptr) {
392 HILOGE("get remoteDms failed!");
393 RetryStartSyncRemoteMissions(dstDevId, localDevId, 0);
394 return GET_REMOTE_DMS_FAIL;
395 }
396 int32_t ret = StartSyncRemoteMissions(dstDevId, remoteDms);
397 if (ret == ERR_NONE) {
398 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
399 auto iterItem = listenDeviceMap_.find(devId);
400 if (iterItem != listenDeviceMap_.end()) {
401 iterItem->second.called = true;
402 }
403 }
404 return ret;
405 }
406
StartSyncRemoteMissions(const std::string & dstDevId,const sptr<IDistributedSched> & remoteDms)407 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
408 const sptr<IDistributedSched>& remoteDms)
409 {
410 if (remoteDms == nullptr) {
411 HILOGE("remoteDms is null");
412 return INVALID_PARAMETERS_ERR;
413 }
414 std::vector<DstbMissionInfo> missionInfos;
415 CallerInfo callerInfo;
416 if (!GenerateCallerInfo(callerInfo)) {
417 return GET_LOCAL_DEVICE_ERR;
418 }
419 int64_t begin = GetTickCount();
420 int32_t ret = remoteDms->StartSyncMissionsFromRemote(callerInfo, missionInfos);
421 HILOGI("[PerformanceTest] StartSyncMissionsFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
422 ret, GetTickCount() - begin);
423 if (ret == ERR_NONE) {
424 RebornMissionCache(dstDevId, missionInfos);
425 }
426 return ret;
427 }
428
UnRegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)429 int32_t DistributedSchedMissionManager::UnRegisterMissionListener(const std::u16string& devId,
430 const sptr<IRemoteObject>& listener)
431 {
432 if (listener == nullptr) {
433 return INVALID_PARAMETERS_ERR;
434 }
435 if (!IsDeviceIdValidated(Str16ToStr8(devId))) {
436 return INVALID_PARAMETERS_ERR;
437 }
438 std::string localDeviceId;
439 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
440 || localDeviceId == Str16ToStr8(devId)) {
441 HILOGE("check deviceId fail");
442 return INVALID_PARAMETERS_ERR;
443 }
444 {
445 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
446 auto iterItem = listenDeviceMap_.find(devId);
447 if (iterItem == listenDeviceMap_.end()) {
448 return ERR_NONE;
449 }
450 auto& listenerInfo = iterItem->second;
451 auto ret = listenerInfo.Find(listener);
452 if (!ret) {
453 HILOGI("listener not registered!");
454 return ERR_NONE;
455 }
456 listener->RemoveDeathRecipient(listenerDeath_);
457 listenerInfo.Erase(listener);
458 if (!listenerInfo.Empty()) {
459 return ERR_NONE;
460 }
461 listenDeviceMap_.erase(iterItem);
462 }
463 return ERR_NONE;
464 }
465
CleanMissionResources(const std::string & networkId)466 void DistributedSchedMissionManager::CleanMissionResources(const std::string& networkId)
467 {
468 {
469 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
470 auto iterDevice = listenDeviceMap_.find(Str8ToStr16(networkId));
471 if (iterDevice == listenDeviceMap_.end()) {
472 return;
473 }
474 auto& listenerInfo = iterDevice->second;
475 for (sptr<IRemoteObject> listener : listenerInfo.listenerSet) {
476 if (listener != nullptr) {
477 listener->RemoveDeathRecipient(listenerDeath_);
478 }
479 }
480 listenDeviceMap_.erase(iterDevice);
481 }
482 StopSyncRemoteMissions(networkId, true);
483 }
484
StopSyncRemoteMissions(const std::string & dstDevId,bool offline,bool exit)485 int32_t DistributedSchedMissionManager::StopSyncRemoteMissions(const std::string& dstDevId,
486 bool offline, bool exit)
487 {
488 CleanMissionCache(dstDevId);
489 DeleteCachedSnapshotInfo(dstDevId);
490 DeleteDataStorage(dstDevId, true);
491
492 if (offline) {
493 return ERR_NONE;
494 }
495 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
496 if (remoteDms == nullptr) {
497 HILOGE("DMS get remoteDms failed");
498 return GET_REMOTE_DMS_FAIL;
499 }
500
501 CallerInfo callerInfo;
502 if (!GenerateCallerInfo(callerInfo)) {
503 return GET_LOCAL_DEVICE_ERR;
504 }
505 int64_t begin = GetTickCount();
506 int32_t ret = remoteDms->StopSyncMissionsFromRemote(callerInfo);
507 HILOGI("[PerformanceTest] ret: %{public}d, spend %{public}" PRId64 " ms", ret, GetTickCount() - begin);
508 return ret;
509 }
510
StartSyncRemoteMissions(const std::string & dstDevId,bool fixConflict,int64_t tag)511 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId, bool fixConflict,
512 int64_t tag)
513 {
514 std::string localDeviceId;
515 if (!IsDeviceIdValidated(dstDevId)) {
516 return INVALID_PARAMETERS_ERR;
517 }
518 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
519 || (dstDevId == localDeviceId)) {
520 HILOGE("check deviceId fail");
521 return INVALID_PARAMETERS_ERR;
522 }
523 HILOGI("begin, dstDevId is %{public}s, local deviceId is %{public}s",
524 GetAnonymStr(dstDevId).c_str(), GetAnonymStr(localDeviceId).c_str());
525 auto ret = StartSyncRemoteMissions(dstDevId, localDeviceId);
526 if (ret != ERR_NONE) {
527 HILOGE("StartSyncRemoteMissions failed, %{public}d", ret);
528 return ret;
529 }
530 return ERR_NONE;
531 }
532
StartSyncMissionsFromRemote(const CallerInfo & callerInfo,std::vector<DstbMissionInfo> & missionInfoSet)533 int32_t DistributedSchedMissionManager::StartSyncMissionsFromRemote(const CallerInfo& callerInfo,
534 std::vector<DstbMissionInfo>& missionInfoSet)
535 {
536 auto deviceId = callerInfo.sourceDeviceId;
537 HILOGD("remote version is %{public}d!", callerInfo.dmsVersion);
538 {
539 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
540 remoteSyncDeviceSet_.emplace(deviceId);
541 }
542 int32_t result = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
543 missionInfoSet);
544 auto func = [this, missionInfoSet]() {
545 HILOGD("RegisterMissionListener called.");
546 if (!isRegMissionChange_) {
547 int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
548 if (ret == ERR_OK) {
549 isRegMissionChange_ = true;
550 }
551 InitAllSnapshots(missionInfoSet);
552 }
553 };
554 if (missionHandler_ != nullptr && !missionHandler_->PostTask(func)) {
555 HILOGE("post RegisterMissionListener and InitAllSnapshots Task failed");
556 }
557 return result;
558 }
559
StopSyncMissionsFromRemote(const std::string & networkId)560 void DistributedSchedMissionManager::StopSyncMissionsFromRemote(const std::string& networkId)
561 {
562 HILOGD(" %{private}s!", GetAnonymStr(networkId).c_str());
563 {
564 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
565 remoteSyncDeviceSet_.erase(networkId);
566 if (remoteSyncDeviceSet_.empty()) {
567 auto func = [this]() {
568 int32_t ret = DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
569 if (ret == ERR_OK) {
570 isRegMissionChange_ = false;
571 }
572 };
573 if (missionHandler_ != nullptr && !missionHandler_->PostTask(func)) {
574 HILOGE("post UnRegisterMissionListener Task failed");
575 }
576 }
577 }
578 }
579
NeedSyncDevice(const std::string & deviceId)580 bool DistributedSchedMissionManager::NeedSyncDevice(const std::string& deviceId)
581 {
582 if (deviceId.empty()) {
583 HILOGD("deviceId empty!");
584 return false;
585 }
586 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
587 if (remoteSyncDeviceSet_.count(deviceId) == 0) {
588 return false;
589 }
590 return true;
591 }
592
HasSyncListener(const std::string & networkId)593 bool DistributedSchedMissionManager::HasSyncListener(const std::string& networkId)
594 {
595 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
596 auto iter = listenDeviceMap_.find(Str8ToStr16(networkId));
597 if (iter != listenDeviceMap_.end()) {
598 return iter->second.called;
599 }
600 return false;
601 }
602
NotifySnapshotChanged(const std::string & networkId,int32_t missionId)603 void DistributedSchedMissionManager::NotifySnapshotChanged(const std::string& networkId, int32_t missionId)
604 {
605 std::u16string u16DevId = Str8ToStr16(networkId);
606 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
607 auto iter = listenDeviceMap_.find(u16DevId);
608 if (iter == listenDeviceMap_.end()) {
609 return;
610 }
611 auto& listenerInfo = iter->second;
612 for (auto& listener : listenerInfo.listenerSet) {
613 MissionChangedNotify::NotifySnapshot(listener, u16DevId, missionId);
614 }
615 }
616
OnRemoteDied(const wptr<IRemoteObject> & remote)617 void DistributedSchedMissionManager::OnRemoteDied(const wptr<IRemoteObject>& remote)
618 {
619 HILOGD("OnRemoteDied!");
620 sptr<IRemoteObject> listener = remote.promote();
621 if (listener == nullptr) {
622 HILOGW("listener is null");
623 return;
624 }
625 auto remoteDiedFunc = [this, listener]() {
626 OnMissionListenerDied(listener);
627 };
628 if (missionHandler_ != nullptr) {
629 missionHandler_->PostTask(remoteDiedFunc);
630 }
631 }
632
OnRemoteDied(const wptr<IRemoteObject> & remote)633 void DistributedSchedMissionManager::ListenerDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
634 {
635 DistributedSchedMissionManager::GetInstance().OnRemoteDied(remote);
636 }
637
EnqueueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId,std::unique_ptr<Snapshot> snapshot)638 void DistributedSchedMissionManager::EnqueueCachedSnapshotInfo(const std::string& deviceId, int32_t missionId,
639 std::unique_ptr<Snapshot> snapshot)
640 {
641 if (deviceId.empty() || snapshot == nullptr) {
642 HILOGW("EnqueueCachedSnapshotInfo invalid input param!");
643 return;
644 }
645 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
646 std::string keyInfo = GenerateKeyInfo(deviceId, missionId);
647 auto iter = cachedSnapshotInfos_.find(keyInfo);
648 if (iter != cachedSnapshotInfos_.end()) {
649 if (iter->second == nullptr) {
650 HILOGE("snapshotInfo is null");
651 return;
652 }
653 if (snapshot->GetCreatedTime() < iter->second->GetCreatedTime()) {
654 return;
655 }
656 }
657
658 if (cachedSnapshotInfos_.size() == MAX_CACHED_ITEM) {
659 int64_t oldest = -1;
660 auto iterOldest = cachedSnapshotInfos_.end();
661 for (auto iterItem = cachedSnapshotInfos_.begin(); iterItem != cachedSnapshotInfos_.end(); ++iterItem) {
662 if (iterItem->second == nullptr) {
663 HILOGE("snapshotInfo is null");
664 continue;
665 }
666 if (oldest == -1 || iterItem->second->GetLastAccessTime() < oldest) {
667 oldest = iterItem->second->GetLastAccessTime();
668 iterOldest = iterItem;
669 }
670 }
671 if (iterOldest != cachedSnapshotInfos_.end()) {
672 cachedSnapshotInfos_.erase(iterOldest);
673 }
674 }
675 cachedSnapshotInfos_[keyInfo] = std::move(snapshot);
676 }
677
DequeueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId)678 std::unique_ptr<Snapshot> DistributedSchedMissionManager::DequeueCachedSnapshotInfo(const std::string& deviceId,
679 int32_t missionId)
680 {
681 if (deviceId.empty()) {
682 HILOGW("DequeueCachedSnapshotInfo invalid input param!");
683 return nullptr;
684 }
685 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
686 auto iter = cachedSnapshotInfos_.find(GenerateKeyInfo(deviceId, missionId));
687 if (iter != cachedSnapshotInfos_.end()) {
688 std::unique_ptr<Snapshot> snapshot = std::move(iter->second);
689 if (snapshot == nullptr) {
690 HILOGE("snapshot is null");
691 return nullptr;
692 }
693 snapshot->UpdateLastAccessTime(GetTickCount());
694 iter->second = nullptr;
695 cachedSnapshotInfos_.erase(iter);
696 return snapshot;
697 }
698 return nullptr;
699 }
700
DeleteCachedSnapshotInfo(const std::string & networkId)701 void DistributedSchedMissionManager::DeleteCachedSnapshotInfo(const std::string& networkId)
702 {
703 if (networkId.empty()) {
704 HILOGW("networkId empty!");
705 return;
706 }
707 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
708 if (uuid.empty()) {
709 HILOGW("uuid empty!");
710 return;
711 }
712 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
713 auto iter = cachedSnapshotInfos_.begin();
714 while (iter != cachedSnapshotInfos_.end()) {
715 if (iter->first.find(uuid) != std::string::npos) {
716 iter = cachedSnapshotInfos_.erase(iter);
717 } else {
718 ++iter;
719 }
720 }
721 }
722
FetchCachedRemoteMissions(const std::string & srcId,int32_t numMissions,std::vector<DstbMissionInfo> & missionInfoSet)723 int32_t DistributedSchedMissionManager::FetchCachedRemoteMissions(const std::string& srcId, int32_t numMissions,
724 std::vector<DstbMissionInfo>& missionInfoSet)
725 {
726 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(srcId);
727 if (uuid.empty()) {
728 HILOGE("uuid empty!");
729 return INVALID_PARAMETERS_ERR;
730 }
731 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
732 auto iter = deviceMissionInfos_.find(uuid);
733 if (iter == deviceMissionInfos_.end()) {
734 HILOGE("can not find uuid, deviceId: %{public}s!", GetAnonymStr(srcId).c_str());
735 return ERR_NULL_OBJECT;
736 }
737
738 // get at most numMissions missions
739 int32_t actualNums = static_cast<int32_t>((iter->second).size());
740 if (actualNums < 0) {
741 HILOGE("invalid size!");
742 return ERR_NULL_OBJECT;
743 }
744 missionInfoSet.assign((iter->second).begin(),
745 (actualNums > numMissions) ? (iter->second).begin() + numMissions : (iter->second).end());
746 return ERR_NONE;
747 }
748
RebornMissionCache(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet)749 void DistributedSchedMissionManager::RebornMissionCache(const std::string& deviceId,
750 const std::vector<DstbMissionInfo>& missionInfoSet)
751 {
752 HILOGI("start! deviceId is %{public}s.", GetAnonymStr(deviceId).c_str());
753 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
754 if (uuid.empty()) {
755 HILOGE("uuid empty!");
756 return;
757 }
758 {
759 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
760 deviceMissionInfos_[uuid] = missionInfoSet;
761 }
762 HILOGI("RebornMissionCache end!");
763 }
764
CleanMissionCache(const std::string & deviceId)765 void DistributedSchedMissionManager::CleanMissionCache(const std::string& deviceId)
766 {
767 HILOGI("CleanMissionCache start! deviceId is %{public}s.", GetAnonymStr(deviceId).c_str());
768 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
769 if (uuid.empty()) {
770 HILOGE("CleanMissionCache uuid empty!");
771 return;
772 }
773 {
774 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
775 deviceMissionInfos_.erase(uuid);
776 }
777 HILOGI("CleanMissionCache end!");
778 }
779
NotifyMissionsChangedFromRemote(const CallerInfo & callerInfo,const std::vector<DstbMissionInfo> & missionInfoSet)780 int32_t DistributedSchedMissionManager::NotifyMissionsChangedFromRemote(const CallerInfo& callerInfo,
781 const std::vector<DstbMissionInfo>& missionInfoSet)
782 {
783 HILOGI("NotifyMissionsChangedFromRemote version is %{public}d!", callerInfo.dmsVersion);
784 std::u16string u16DevId = Str8ToStr16(callerInfo.sourceDeviceId);
785 RebornMissionCache(callerInfo.sourceDeviceId, missionInfoSet);
786 {
787 HILOGI("NotifyMissionsChangedFromRemote notify mission start!");
788 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
789 auto iter = listenDeviceMap_.find(u16DevId);
790 if (iter == listenDeviceMap_.end()) {
791 HILOGE("NotifyMissionsChangedFromRemote notify mission no listener!");
792 return INVALID_PARAMETERS_ERR;
793 }
794 auto& listenerSet = iter->second.listenerSet;
795 auto notifyChanged = [listenerSet, u16DevId] () {
796 for (const auto& listener : listenerSet) {
797 MissionChangedNotify::NotifyMissionsChanged(listener, u16DevId);
798 }
799 };
800 if (missionHandler_ != nullptr) {
801 missionHandler_->PostTask(notifyChanged);
802 HILOGI("NotifyMissionsChangedFromRemote end!");
803 return ERR_NONE;
804 }
805 }
806 return INVALID_PARAMETERS_ERR;
807 }
808
NotifyLocalMissionsChanged()809 void DistributedSchedMissionManager::NotifyLocalMissionsChanged()
810 {
811 auto func = [this]() {
812 HILOGI("NotifyLocalMissionsChanged");
813 std::vector<DstbMissionInfo> missionInfos;
814 int32_t ret = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
815 missionInfos);
816 if (ret == ERR_OK) {
817 int32_t result = NotifyMissionsChangedToRemote(missionInfos);
818 HILOGI("NotifyMissionsChangedToRemote result = %{public}d", result);
819 }
820 };
821 if (missionChangeHandler_ == nullptr) {
822 HILOGE("missionChangeHandler_ is null");
823 return;
824 }
825 if (!missionChangeHandler_->PostTask(func)) {
826 HILOGE("postTask failed");
827 }
828 }
829
NotifyMissionSnapshotCreated(int32_t missionId)830 void DistributedSchedMissionManager::NotifyMissionSnapshotCreated(int32_t missionId)
831 {
832 auto func = [this, missionId]() {
833 HILOGD("called.");
834 ErrCode errCode = MissionSnapshotChanged(missionId);
835 if (errCode != ERR_OK) {
836 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
837 }
838 };
839 if (missionChangeHandler_ == nullptr) {
840 HILOGE("missionChangeHandler_ is null");
841 return;
842 }
843 if (!missionChangeHandler_->PostTask(func, GET_FOREGROUND_SNAPSHOT_DELAY_TIME)) {
844 HILOGE("post MissionSnapshotChanged delay Task failed");
845 }
846 }
847
NotifyMissionSnapshotChanged(int32_t missionId)848 void DistributedSchedMissionManager::NotifyMissionSnapshotChanged(int32_t missionId)
849 {
850 auto func = [this, missionId]() {
851 HILOGD("called.");
852 ErrCode errCode = MissionSnapshotChanged(missionId);
853 if (errCode != ERR_OK) {
854 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
855 }
856 };
857 if (missionChangeHandler_ == nullptr) {
858 HILOGE("missionChangeHandler_ is null");
859 return;
860 }
861 if (!missionChangeHandler_->PostTask(func)) {
862 HILOGE("post MissionSnapshotChanged Task failed");
863 }
864 }
865
NotifyMissionSnapshotDestroyed(int32_t missionId)866 void DistributedSchedMissionManager::NotifyMissionSnapshotDestroyed(int32_t missionId)
867 {
868 auto func = [this, missionId]() {
869 HILOGD("called.");
870 ErrCode errCode = MissionSnapshotDestroyed(missionId);
871 if (errCode != ERR_OK) {
872 HILOGE("mission snapshot removed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
873 }
874 };
875 if (missionChangeHandler_ == nullptr) {
876 HILOGE("missionChangeHandler_ is null");
877 return;
878 }
879 if (!missionChangeHandler_->PostTask(func)) {
880 HILOGE("post MissionSnapshotDestroyed Task failed");
881 }
882 }
883
NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo> & missionInfoSet)884 int32_t DistributedSchedMissionManager::NotifyMissionsChangedToRemote(
885 const std::vector<DstbMissionInfo> &missionInfoSet)
886 {
887 CallerInfo callerInfo;
888 if (!GenerateCallerInfo(callerInfo)) {
889 return GET_LOCAL_DEVICE_ERR;
890 }
891 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
892 for (const auto& destDeviceId : remoteSyncDeviceSet_) {
893 auto handler = FetchDeviceHandler(destDeviceId);
894 if (handler == nullptr) {
895 HILOGE("NotifyMissionsChangedToRemote fetch handler failed!");
896 continue;
897 }
898 auto callback = [destDeviceId, missionInfoSet, callerInfo, this] () {
899 NotifyMissionsChangedToRemoteInner(destDeviceId, missionInfoSet, callerInfo);
900 };
901 if (!handler->PostTask(callback)) {
902 HILOGE("NotifyMissionsChangedToRemote PostTask failed!");
903 return ERR_NULL_OBJECT;
904 }
905 }
906
907 return ERR_NONE;
908 }
909
NotifyMissionsChangedToRemoteInner(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet,const CallerInfo & callerInfo)910 void DistributedSchedMissionManager::NotifyMissionsChangedToRemoteInner(const std::string& deviceId,
911 const std::vector<DstbMissionInfo>& missionInfoSet, const CallerInfo& callerInfo)
912 {
913 sptr<IDistributedSched> remoteDms = GetRemoteDms(deviceId);
914 if (remoteDms == nullptr) {
915 HILOGE("NotifyMissionsChangedToRemote DMS get remoteDms failed");
916 return;
917 }
918 int64_t begin = GetTickCount();
919 int32_t result = remoteDms->NotifyMissionsChangedFromRemote(missionInfoSet, callerInfo);
920 HILOGI("[PerformanceTest] NotifyMissionsChangedFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
921 result, GetTickCount() - begin);
922 }
923
GenerateCallerInfo(CallerInfo & callerInfo)924 bool DistributedSchedMissionManager::GenerateCallerInfo(CallerInfo& callerInfo)
925 {
926 std::string localUuid;
927 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localUuid)) {
928 HILOGE("get local uuid failed!");
929 return false;
930 }
931 callerInfo.uid = IPCSkeleton::GetCallingUid();
932 callerInfo.pid = IPCSkeleton::GetCallingRealPid();
933 callerInfo.callerType = CALLER_TYPE_HARMONY;
934 callerInfo.sourceDeviceId = localUuid;
935 callerInfo.dmsVersion = VERSION;
936 return true;
937 }
938
FetchDeviceHandler(const std::string & deviceId)939 std::shared_ptr<AppExecFwk::EventHandler> DistributedSchedMissionManager::FetchDeviceHandler(
940 const std::string& deviceId)
941 {
942 if (!IsDeviceIdValidated(deviceId)) {
943 HILOGW("FetchDeviceHandler device:%{public}s offline.", GetAnonymStr(deviceId).c_str());
944 return nullptr;
945 }
946
947 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
948 if (uuid.empty()) {
949 HILOGE("FetchDeviceHandler uuid empty!");
950 return nullptr;
951 }
952
953 auto iter = deviceHandle_.find(uuid);
954 if (iter != deviceHandle_.end()) {
955 return iter->second;
956 }
957
958 auto anonyUuid = GetAnonymStr(uuid);
959 auto runner = AppExecFwk::EventRunner::Create(anonyUuid + "_MissionN");
960 auto handler = std::make_shared<AppExecFwk::EventHandler>(runner);
961 deviceHandle_.emplace(uuid, handler);
962 return handler;
963 }
964
OnRemoteDied(const wptr<IRemoteObject> & remote)965 void DistributedSchedMissionManager::RemoteDmsDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
966 {
967 HILOGI("OnRemoteDied received died notify!");
968 DistributedSchedMissionManager::GetInstance().OnRemoteDmsDied(remote);
969 }
970
OnRemoteDmsDied(const wptr<IRemoteObject> & remote)971 void DistributedSchedMissionManager::OnRemoteDmsDied(const wptr<IRemoteObject>& remote)
972 {
973 sptr<IRemoteObject> diedRemoted = remote.promote();
974 if (diedRemoted == nullptr) {
975 HILOGW("OnRemoteDmsDied promote failed!");
976 return;
977 }
978 HILOGD("delete diedRemoted");
979 auto remoteDmsDiedFunc = [this, diedRemoted]() {
980 OnRemoteDmsDied(diedRemoted);
981 };
982 if (missionHandler_ != nullptr) {
983 missionHandler_->PostTask(remoteDmsDiedFunc);
984 }
985 }
986
RetryStartSyncRemoteMissions(const std::string & dstDeviceId,const std::string & localDevId,int32_t retryTimes)987 void DistributedSchedMissionManager::RetryStartSyncRemoteMissions(const std::string& dstDeviceId,
988 const std::string& localDevId, int32_t retryTimes)
989 {
990 auto retryFunc = [this, dstDeviceId, localDevId, retryTimes]() {
991 bool ret = HasSyncListener(dstDeviceId);
992 if (!ret) {
993 return;
994 }
995 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDeviceId);
996 if (remoteDms == nullptr) {
997 HILOGI("RetryStartSyncRemoteMissions DMS get remoteDms failed");
998 RetryStartSyncRemoteMissions(dstDeviceId, localDevId, retryTimes + 1);
999 return;
1000 }
1001 int32_t errNo = StartSyncRemoteMissions(dstDeviceId, remoteDms);
1002 HILOGI("RetryStartSyncRemoteMissions result:%{public}d", errNo);
1003 };
1004 if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1005 missionHandler_->PostTask(retryFunc, RETRY_DELAYED);
1006 }
1007 }
1008
OnMissionListenerDied(const sptr<IRemoteObject> & remote)1009 void DistributedSchedMissionManager::OnMissionListenerDied(const sptr<IRemoteObject>& remote)
1010 {
1011 HILOGI("OnMissionListenerDied");
1012 std::set<std::string> deviceSet;
1013 {
1014 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
1015 auto iterItem = listenDeviceMap_.begin();
1016 while (iterItem != listenDeviceMap_.end()) {
1017 auto& listenerInfo = iterItem->second;
1018 auto ret = listenerInfo.Find(remote);
1019 if (!ret) {
1020 ++iterItem;
1021 continue;
1022 }
1023 if (remote != nullptr) {
1024 remote->RemoveDeathRecipient(listenerDeath_);
1025 }
1026 listenerInfo.Erase(remote);
1027 if (listenerInfo.Empty()) {
1028 if (listenerInfo.called) {
1029 deviceSet.emplace(Str16ToStr8(iterItem->first));
1030 }
1031 iterItem = listenDeviceMap_.erase(iterItem);
1032 } else {
1033 ++iterItem;
1034 }
1035 }
1036 }
1037 for (auto& devId : deviceSet) {
1038 StopSyncRemoteMissions(devId, false);
1039 }
1040 }
1041
OnRemoteDmsDied(const sptr<IRemoteObject> & remote)1042 void DistributedSchedMissionManager::OnRemoteDmsDied(const sptr<IRemoteObject>& remote)
1043 {
1044 HILOGI("OnRemoteDmsDied");
1045 std::string devId;
1046 {
1047 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
1048 for (auto iter = remoteDmsMap_.begin(); iter != remoteDmsMap_.end(); ++iter) {
1049 if (iter->second->AsObject() == remote && iter->second->AsObject() != nullptr) {
1050 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
1051 devId = iter->first;
1052 remoteDmsMap_.erase(iter);
1053 break;
1054 }
1055 }
1056 }
1057 if (devId.empty()) {
1058 return;
1059 }
1060 bool ret = HasSyncListener(devId);
1061 if (ret) {
1062 std::string localDeviceId;
1063 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
1064 return;
1065 }
1066 RetryStartSyncRemoteMissions(devId, localDeviceId, 0);
1067 }
1068 }
1069
NotifyDmsProxyProcessDied()1070 void DistributedSchedMissionManager::NotifyDmsProxyProcessDied()
1071 {
1072 HILOGI("NotifyDmsProxyProcessDied!");
1073 if (!isRegMissionChange_) {
1074 return;
1075 }
1076 RetryRegisterMissionChange(0);
1077 }
1078
RetryRegisterMissionChange(int32_t retryTimes)1079 void DistributedSchedMissionManager::RetryRegisterMissionChange(int32_t retryTimes)
1080 {
1081 auto remoteDiedFunc = [this, retryTimes]() {
1082 HILOGI("RetryRegisterMissionChange retryTimes:%{public}d begin", retryTimes);
1083 if (!isRegMissionChange_) {
1084 return;
1085 }
1086 int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
1087 if (ret == ERR_NULL_OBJECT) {
1088 RetryRegisterMissionChange(retryTimes + 1);
1089 HILOGI("RetryRegisterMissionChange dmsproxy null, retry!");
1090 return;
1091 }
1092 HILOGI("RetryRegisterMissionChange result:%{public}d", ret);
1093 };
1094 if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1095 missionHandler_->PostTask(remoteDiedFunc, RETRY_DELAYED);
1096 }
1097 }
1098
InitAllSnapshots(const std::vector<DstbMissionInfo> & missionInfoSet)1099 void DistributedSchedMissionManager::InitAllSnapshots(const std::vector<DstbMissionInfo>& missionInfoSet)
1100 {
1101 for (auto iter = missionInfoSet.begin(); iter != missionInfoSet.end(); iter++) {
1102 ErrCode errCode = MissionSnapshotChanged(iter->id);
1103 if (errCode != ERR_OK) {
1104 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", iter->id, errCode);
1105 }
1106 }
1107 }
1108
MissionSnapshotChanged(int32_t missionId)1109 int32_t DistributedSchedMissionManager::MissionSnapshotChanged(int32_t missionId)
1110 {
1111 std::string networkId;
1112 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1113 HILOGE("get local networkId failed!");
1114 return INVALID_PARAMETERS_ERR;
1115 }
1116 AAFwk::MissionSnapshot missionSnapshot;
1117 ErrCode errCode = DistributedSchedAdapter::GetInstance()
1118 .GetLocalMissionSnapshotInfo(networkId, missionId, missionSnapshot);
1119 if (errCode != ERR_OK) {
1120 HILOGE("get local mission snapshot failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
1121 return errCode;
1122 }
1123 Snapshot snapshot;
1124 SnapshotConverter::ConvertToSnapshot(missionSnapshot, snapshot);
1125 MessageParcel data;
1126 errCode = MissionSnapshotSequence(snapshot, data);
1127 if (errCode != ERR_OK) {
1128 HILOGE("mission snapshot sequence failed, errCode=%{public}d", errCode);
1129 return errCode;
1130 }
1131 size_t len = data.GetReadableBytes();
1132 const uint8_t* byteStream = data.ReadBuffer(len);
1133 if (byteStream == nullptr) {
1134 HILOGE("Failed to read length.");
1135 return INVALID_PARAMETERS_ERR;
1136 }
1137 errCode = StoreSnapshotInfo(networkId, missionId, byteStream, len);
1138 return errCode;
1139 }
1140
MissionSnapshotDestroyed(int32_t missionId)1141 int32_t DistributedSchedMissionManager::MissionSnapshotDestroyed(int32_t missionId)
1142 {
1143 std::string networkId;
1144 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1145 HILOGE("get local networkId failed!");
1146 return INVALID_PARAMETERS_ERR;
1147 }
1148 ErrCode errCode = RemoveSnapshotInfo(networkId, missionId);
1149 return errCode;
1150 }
1151
MissionSnapshotSequence(const Snapshot & snapshot,MessageParcel & data)1152 int32_t DistributedSchedMissionManager::MissionSnapshotSequence(const Snapshot& snapshot, MessageParcel& data)
1153 {
1154 bool ret = snapshot.WriteSnapshotInfo(data);
1155 if (!ret) {
1156 HILOGE("WriteSnapshotInfo failed!");
1157 return ERR_FLATTEN_OBJECT;
1158 }
1159 ret = snapshot.WritePixelMap(data);
1160 if (!ret) {
1161 HILOGE("WritePixelMap failed!");
1162 return ERR_FLATTEN_OBJECT;
1163 }
1164 return ERR_OK;
1165 }
1166
OnDnetDied()1167 void DistributedSchedMissionManager::OnDnetDied()
1168 {
1169 auto dnetDiedFunc = [this]() {
1170 HILOGI("OnDnetDied");
1171 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
1172 if (!isRegMissionChange_) {
1173 return;
1174 }
1175 remoteSyncDeviceSet_.clear();
1176 DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
1177 isRegMissionChange_ = false;
1178 };
1179 if (missionHandler_ != nullptr) {
1180 missionHandler_->PostTask(dnetDiedFunc);
1181 }
1182 }
1183 } // namespace DistributedSchedule
1184 } // namespace OHOS
1185