1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "mission/distributed_sched_mission_manager.h"
17
18 #include <chrono>
19 #include <sys/time.h>
20 #include <unistd.h>
21
22 #include "datetime_ex.h"
23 #include "ipc_skeleton.h"
24 #include "iservice_registry.h"
25 #include "nlohmann/json.hpp"
26 #include "string_ex.h"
27 #include "system_ability_definition.h"
28
29 #include "distributed_sched_adapter.h"
30 #include "distributed_sched_utils.h"
31 #include "dtbschedmgr_device_info_storage.h"
32 #include "dtbschedmgr_log.h"
33 #include "mission/mission_changed_notify.h"
34 #include "mission/mission_constant.h"
35 #include "mission/mission_info_converter.h"
36 #include "mission/snapshot_converter.h"
37 #include "caller_info.h"
38 #include "os_account_manager.h"
39 #include "ohos_account_kits.h"
40
41 namespace OHOS {
42 namespace DistributedSchedule {
43 namespace {
44 const std::string TAG = "DistributedSchedMissionManager";
45 constexpr size_t MAX_CACHED_ITEM = 10;
46 constexpr int32_t MAX_RETRY_TIMES = 15;
47 constexpr int32_t RETRY_DELAYED = 2000;
48 constexpr int32_t GET_FOREGROUND_SNAPSHOT_DELAY_TIME = 800; // ms
49 const std::string DELETE_DATA_STORAGE = "DeleteDataStorage";
50 constexpr int32_t DELETE_DATA_STORAGE_DELAYED = 60000; // ms
51 const std::string INVAILD_LOCAL_DEVICE_ID = "-1";
52 constexpr int32_t NET_STATE = 0;
53 }
54 namespace Mission {
55 constexpr int32_t GET_MAX_MISSIONS = 20;
56 } // Mission
57 using namespace std::chrono;
58 using namespace Constants::Mission;
59 using namespace OHOS::DistributedKv;
60 using namespace DistributedHardware;
61 using AccountInfo = IDistributedSched::AccountInfo;
62
63 IMPLEMENT_SINGLE_INSTANCE(DistributedSchedMissionManager);
64
Init()65 void DistributedSchedMissionManager::Init()
66 {
67 listenerDeath_ = new ListenerDeathRecipient();
68 remoteDmsRecipient_ = new RemoteDmsDeathRecipient();
69 auto runner = AppExecFwk::EventRunner::Create("MissionManagerHandler");
70 missionHandler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
71 auto updateRunner = AppExecFwk::EventRunner::Create("UpdateHandler");
72 updateHandler_ = std::make_shared<AppExecFwk::EventHandler>(updateRunner);
73 missonChangeListener_ = new DistributedMissionChangeListener();
74 auto missionChangeRunner = AppExecFwk::EventRunner::Create("DistributedMissionChange");
75 missionChangeHandler_ = std::make_shared<AppExecFwk::EventHandler>(missionChangeRunner);
76 }
77
GetMissionInfos(const std::string & deviceId,int32_t numMissions,std::vector<AAFwk::MissionInfo> & missionInfoSet)78 int32_t DistributedSchedMissionManager::GetMissionInfos(const std::string& deviceId,
79 int32_t numMissions, std::vector<AAFwk::MissionInfo>& missionInfoSet)
80 {
81 HILOGI("start!");
82 if (!IsDeviceIdValidated(deviceId)) {
83 return INVALID_PARAMETERS_ERR;
84 }
85 if (numMissions <= 0) {
86 HILOGE("numMissions is illegal! numMissions:%{public}d", numMissions);
87 return INVALID_PARAMETERS_ERR;
88 }
89 std::vector<DstbMissionInfo> dstbMissionInfoSet;
90 int32_t ret = FetchCachedRemoteMissions(deviceId, numMissions, dstbMissionInfoSet);
91 if (ret != ERR_OK) {
92 HILOGE("FetchCachedRemoteMissions failed, ret = %{public}d", ret);
93 return ret;
94 }
95 return MissionInfoConverter::ConvertToMissionInfos(dstbMissionInfoSet, missionInfoSet);
96 }
97
GetRemoteDms(const std::string & deviceId)98 sptr<IDistributedSched> DistributedSchedMissionManager::GetRemoteDms(const std::string& deviceId)
99 {
100 if (deviceId.empty()) {
101 HILOGE("GetRemoteDms remoteDeviceId is empty");
102 return nullptr;
103 }
104 int64_t begin = GetTickCount();
105 {
106 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
107 auto iter = remoteDmsMap_.find(deviceId);
108 if (iter != remoteDmsMap_.end()) {
109 auto object = iter->second;
110 if (object != nullptr) {
111 HILOGI("[PerformanceTest] GetRemoteDms from cache spend %{public}" PRId64 " ms",
112 GetTickCount() - begin);
113 return object;
114 }
115 }
116 }
117 HILOGD("GetRemoteDms connect deviceid is %s", GetAnonymStr(deviceId).c_str());
118 auto samgr = SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
119 if (samgr == nullptr) {
120 HILOGE("GetRemoteDms failed to connect to systemAbilityMgr!");
121 return nullptr;
122 }
123 auto object = samgr->CheckSystemAbility(DISTRIBUTED_SCHED_SA_ID, deviceId);
124 if (object == nullptr) {
125 HILOGE("GetRemoteDms failed to get dms for remote device: %{public}s!", GetAnonymStr(deviceId).c_str());
126 return nullptr;
127 }
128 auto ret = object->AddDeathRecipient(remoteDmsRecipient_);
129 HILOGD("GetRemoteDms AddDeathRecipient ret : %{public}d", ret);
130 sptr<IDistributedSched> remoteDmsObj = iface_cast<IDistributedSched>(object);
131 {
132 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
133 auto iter = remoteDmsMap_.find(deviceId);
134 if (iter != remoteDmsMap_.end()) {
135 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
136 }
137 remoteDmsMap_[deviceId] = remoteDmsObj;
138 }
139 HILOGI("[PerformanceTest] GetRemoteDms spend %{public}" PRId64 " ms", GetTickCount() - begin);
140 return remoteDmsObj;
141 }
142
IsDeviceIdValidated(const std::string & deviceId)143 bool DistributedSchedMissionManager::IsDeviceIdValidated(const std::string& deviceId)
144 {
145 if (deviceId.empty()) {
146 HILOGE("IsDeviceIdValidated deviceId is empty!");
147 return false;
148 }
149 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(deviceId) == nullptr) {
150 HILOGW("IsDeviceIdValidated device offline.");
151 return false;
152 }
153
154 return true;
155 }
156
NotifyRemoteDied(const wptr<IRemoteObject> & remote)157 void DistributedSchedMissionManager::NotifyRemoteDied(const wptr<IRemoteObject>& remote)
158 {
159 if (distributedDataStorage_ == nullptr) {
160 HILOGE("DistributedDataStorage null!");
161 return;
162 }
163 distributedDataStorage_->NotifyRemoteDied(remote);
164 }
165
NotifyNetDisconnectOffline()166 void DistributedSchedMissionManager::NotifyNetDisconnectOffline()
167 {
168 HILOGD("NotifyNetDisconnectOffline start.");
169 {
170 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
171 if (listenDeviceMap_.empty()) {
172 HILOGI("The connect is null!");
173 return;
174 }
175 for (auto& listenDevice : listenDeviceMap_) {
176 std::u16string devId = listenDevice.first;
177 std::set<sptr<IRemoteObject>> listenerSet = listenDevice.second.listenerSet;
178 for (auto connect : listenerSet) {
179 MissionChangedNotify::NotifyNetDisconnect(connect, devId, NET_STATE);
180 }
181 }
182 }
183 HILOGD("NotifyNetDisconnectOffline end.");
184 }
185
GetOsAccountData(AccountInfo & dmsAccountInfo)186 bool DistributedSchedMissionManager::GetOsAccountData(AccountInfo& dmsAccountInfo)
187 {
188 #ifdef OS_ACCOUNT_PART
189 std::vector<int32_t> ids;
190 ErrCode ret = AccountSA::OsAccountManager::QueryActiveOsAccountIds(ids);
191 if (ret != ERR_OK || ids.empty()) {
192 HILOGE("Get userId from active Os AccountIds fail, ret : %{public}d", ret);
193 return false;
194 }
195 dmsAccountInfo.userId = ids[0];
196
197 AccountSA::OhosAccountInfo osAccountInfo;
198 ret = AccountSA::OhosAccountKits::GetInstance().GetOhosAccountInfo(osAccountInfo);
199 if (ret != 0 || osAccountInfo.uid_ == "") {
200 HILOGE("Get accountId from Ohos account info fail, ret: %{public}d.", ret);
201 return false;
202 }
203 dmsAccountInfo.activeAccountId = osAccountInfo.uid_;
204 #endif
205 return true;
206 }
207
InitDataStorage()208 int32_t DistributedSchedMissionManager::InitDataStorage()
209 {
210 if (distributedDataStorage_ == nullptr) {
211 distributedDataStorage_ = std::make_shared<DistributedDataStorage>();
212 }
213 if (!distributedDataStorage_->Init()) {
214 HILOGE("InitDataStorage DistributedDataStorage init failed!");
215 return ERR_NULL_OBJECT;
216 }
217 return ERR_NONE;
218 }
219
StopDataStorage()220 int32_t DistributedSchedMissionManager::StopDataStorage()
221 {
222 if (distributedDataStorage_ == nullptr) {
223 HILOGE("StopDataStorage DistributedDataStorage null!");
224 return ERR_NULL_OBJECT;
225 }
226 if (!distributedDataStorage_->Stop()) {
227 HILOGE("StopDataStorage DistributedDataStorage stop failed!");
228 return ERR_NULL_OBJECT;
229 }
230 return ERR_NONE;
231 }
232
StoreSnapshotInfo(const std::string & deviceId,int32_t missionId,const uint8_t * byteStream,size_t len)233 int32_t DistributedSchedMissionManager::StoreSnapshotInfo(const std::string& deviceId, int32_t missionId,
234 const uint8_t* byteStream, size_t len)
235 {
236 if (distributedDataStorage_ == nullptr) {
237 HILOGE("StoreSnapshotInfo DistributedDataStorage null!");
238 return ERR_NULL_OBJECT;
239 }
240 if (!distributedDataStorage_->Insert(deviceId, missionId, byteStream, len)) {
241 HILOGE("StoreSnapshotInfo DistributedDataStorage insert failed!");
242 return INVALID_PARAMETERS_ERR;
243 }
244 return ERR_NONE;
245 }
246
RemoveSnapshotInfo(const std::string & deviceId,int32_t missionId)247 int32_t DistributedSchedMissionManager::RemoveSnapshotInfo(const std::string& deviceId, int32_t missionId)
248 {
249 if (distributedDataStorage_ == nullptr) {
250 HILOGE("RemoveSnapshotInfo DistributedDataStorage null!");
251 return ERR_NULL_OBJECT;
252 }
253 if (!distributedDataStorage_->Delete(deviceId, missionId)) {
254 HILOGE("RemoveSnapshotInfo DistributedDataStorage delete failed!");
255 return INVALID_PARAMETERS_ERR;
256 }
257 return ERR_NONE;
258 }
259
GetRemoteMissionSnapshotInfo(const std::string & networkId,int32_t missionId,std::unique_ptr<AAFwk::MissionSnapshot> & missionSnapshot)260 int32_t DistributedSchedMissionManager::GetRemoteMissionSnapshotInfo(const std::string& networkId, int32_t missionId,
261 std::unique_ptr<AAFwk::MissionSnapshot>& missionSnapshot)
262 {
263 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
264 if (uuid.empty()) {
265 HILOGE("uuid is empty!");
266 return INVALID_PARAMETERS_ERR;
267 }
268 std::unique_ptr<Snapshot> snapshotPtr = DequeueCachedSnapshotInfo(uuid, missionId);
269 if (snapshotPtr != nullptr) {
270 HILOGI("Get snapshot from cache success, uuid: %{public}s, missionId: %{public}d.",
271 GetAnonymStr(uuid).c_str(), missionId);
272 SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
273 return ERR_NONE;
274 }
275 if (distributedDataStorage_ == nullptr) {
276 HILOGE("DistributedDataStorage is null!");
277 return ERR_NULL_OBJECT;
278 }
279 DistributedKv::Value value;
280 bool ret = distributedDataStorage_->Query(networkId, missionId, value);
281 if (!ret) {
282 HILOGE("DistributedDataStorage query failed!");
283 return INVALID_PARAMETERS_ERR;
284 }
285 snapshotPtr = Snapshot::Create(value.Data());
286 if (snapshotPtr == nullptr) {
287 HILOGE("snapshot create failed!");
288 return ERR_NULL_OBJECT;
289 }
290 HILOGI("Get snapshot from DistributedDB success, uuid: %{public}s, missionId: %{public}d.",
291 GetAnonymStr(uuid).c_str(), missionId);
292 SnapshotConverter::ConvertToMissionSnapshot(*snapshotPtr, missionSnapshot);
293 return ERR_NONE;
294 }
295
DeviceOnlineNotify(const std::string & networkId)296 void DistributedSchedMissionManager::DeviceOnlineNotify(const std::string& networkId)
297 {
298 if (networkId.empty()) {
299 HILOGW("DeviceOnlineNotify networkId empty!");
300 return;
301 }
302
303 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
304 if (missionHandler_ != nullptr) {
305 HILOGI("DeviceOnlineNotify RemoveTask");
306 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
307 }
308 }
309
DeviceOfflineNotify(const std::string & networkId)310 void DistributedSchedMissionManager::DeviceOfflineNotify(const std::string& networkId)
311 {
312 if (networkId.empty()) {
313 HILOGW("DeviceOfflineNotify networkId empty!");
314 return;
315 }
316 StopSyncMissionsFromRemote(networkId);
317 CleanMissionResources(networkId);
318 {
319 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
320 auto iter = remoteDmsMap_.find(networkId);
321 if (iter != remoteDmsMap_.end()) {
322 if (iter->second == nullptr) {
323 return;
324 }
325 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
326 remoteDmsMap_.erase(iter);
327 }
328 }
329 HILOGI("DeviceOfflineNotify erase value for networkId: %{public}s.", GetAnonymStr(networkId).c_str());
330 }
331
DeleteDataStorage(const std::string & deviceId,bool isDelayed)332 void DistributedSchedMissionManager::DeleteDataStorage(const std::string& deviceId, bool isDelayed)
333 {
334 if (distributedDataStorage_ == nullptr) {
335 HILOGE("DeleteDataStorage DistributedDataStorage null!");
336 return;
337 }
338 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
339 auto callback = [this, uuid, deviceId]() {
340 if (!distributedDataStorage_->FuzzyDelete(deviceId)) {
341 HILOGE("DeleteDataStorage storage delete failed!");
342 } else {
343 HILOGI("DeleteDataStorage storage delete successfully!");
344 }
345 };
346 if (isDelayed) {
347 if (missionHandler_ != nullptr) {
348 HILOGI("DeleteDataStorage PostTask");
349 missionHandler_->PostTask(callback, DELETE_DATA_STORAGE + uuid, DELETE_DATA_STORAGE_DELAYED);
350 }
351 } else {
352 if (missionHandler_ != nullptr) {
353 HILOGI("DeleteDataStorage RemoveTask");
354 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
355 }
356 callback();
357 }
358 }
359
RegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)360 int32_t DistributedSchedMissionManager::RegisterMissionListener(const std::u16string& devId,
361 const sptr<IRemoteObject>& listener)
362 {
363 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(Str16ToStr8(devId));
364 if (uuid.empty()) {
365 HILOGE("uuid is empty!");
366 return INVALID_PARAMETERS_ERR;
367 }
368 if (missionHandler_ != nullptr) {
369 HILOGI("RemoveTask");
370 missionHandler_->RemoveTask(DELETE_DATA_STORAGE + uuid);
371 }
372 if (listener == nullptr) {
373 return INVALID_PARAMETERS_ERR;
374 }
375 std::string localDeviceId;
376 std::string remoteDeviceId = Str16ToStr8(devId);
377 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
378 || localDeviceId == remoteDeviceId) {
379 HILOGE("check deviceId failed!");
380 return INVALID_PARAMETERS_ERR;
381 }
382 {
383 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
384 auto& listenerInfo = listenDeviceMap_[devId];
385 if (!listenerInfo.Emplace(listener)) {
386 HILOGW("RegisterSyncListener listener has already inserted!");
387 return ERR_NONE;
388 }
389 bool ret = listener->AddDeathRecipient(listenerDeath_);
390 if (!ret) {
391 HILOGW("RegisterSyncListener AddDeathRecipient failed!");
392 }
393 if (listenerInfo.Size() > 1) {
394 HILOGI("RegisterMissionListener not notify remote DMS!");
395 return ERR_NONE;
396 }
397 }
398 return ERR_NONE;
399 }
400
StartSyncRemoteMissions(const std::string & dstDevId,const std::string & localDevId,uint32_t callingTokenId)401 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
402 const std::string& localDevId, uint32_t callingTokenId)
403 {
404 std::u16string devId = Str8ToStr16(dstDevId);
405 {
406 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
407 auto iterItem = listenDeviceMap_.find(devId);
408 if (iterItem == listenDeviceMap_.end()) {
409 return ERR_NONE;
410 }
411 bool callFlag = iterItem->second.called;
412 if (callFlag) {
413 HILOGI("StartSyncRemoteMissions already called!");
414 return ERR_NONE;
415 }
416 }
417 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
418 if (remoteDms == nullptr) {
419 HILOGE("get remoteDms failed!");
420 RetryStartSyncRemoteMissions(dstDevId, localDevId, 0);
421 return GET_REMOTE_DMS_FAIL;
422 }
423 int32_t ret = StartSyncRemoteMissions(dstDevId, remoteDms, callingTokenId);
424 if (ret == ERR_NONE) {
425 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
426 auto iterItem = listenDeviceMap_.find(devId);
427 if (iterItem != listenDeviceMap_.end()) {
428 iterItem->second.called = true;
429 }
430 }
431 return ret;
432 }
433
StartSyncRemoteMissions(const std::string & dstDevId,const sptr<IDistributedSched> & remoteDms,uint32_t callingTokenId)434 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId,
435 const sptr<IDistributedSched>& remoteDms, uint32_t callingTokenId)
436 {
437 if (remoteDms == nullptr) {
438 HILOGE("remoteDms is null");
439 return INVALID_PARAMETERS_ERR;
440 }
441 std::vector<DstbMissionInfo> missionInfos;
442 CallerInfo callerInfo;
443 if (!GenerateCallerInfo(callerInfo)) {
444 return GET_LOCAL_DEVICE_ERR;
445 }
446
447 AccountInfo callerAccountInfo;
448 if (!GetOsAccountData(callerAccountInfo)) {
449 HILOGE("Get Os accountId and userId fail.");
450 return GET_LOCAL_DEVICE_ERR;
451 }
452 callerInfo.accessToken = callingTokenId;
453 nlohmann::json extraInfoJson;
454 extraInfoJson[Constants::EXTRO_INFO_JSON_KEY_ACCOUNT_ID] = callerAccountInfo.activeAccountId;
455 extraInfoJson[Constants::EXTRO_INFO_JSON_KEY_USERID_ID] = callerAccountInfo.userId;
456 extraInfoJson["accessToken"] = callerInfo.accessToken;
457 callerInfo.extraInfoJson = extraInfoJson;
458 int64_t begin = GetTickCount();
459 int32_t ret = remoteDms->StartSyncMissionsFromRemote(callerInfo, missionInfos);
460 HILOGI("[PerformanceTest] StartSyncMissionsFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
461 ret, GetTickCount() - begin);
462 if (ret == ERR_NONE) {
463 RebornMissionCache(dstDevId, missionInfos);
464 }
465 return ret;
466 }
467
UnRegisterMissionListener(const std::u16string & devId,const sptr<IRemoteObject> & listener)468 int32_t DistributedSchedMissionManager::UnRegisterMissionListener(const std::u16string& devId,
469 const sptr<IRemoteObject>& listener)
470 {
471 if (listener == nullptr) {
472 return INVALID_PARAMETERS_ERR;
473 }
474 if (!IsDeviceIdValidated(Str16ToStr8(devId))) {
475 return INVALID_PARAMETERS_ERR;
476 }
477 std::string localDeviceId;
478 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
479 || localDeviceId == Str16ToStr8(devId)) {
480 HILOGE("check deviceId fail");
481 return INVALID_PARAMETERS_ERR;
482 }
483 {
484 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
485 auto iterItem = listenDeviceMap_.find(devId);
486 if (iterItem == listenDeviceMap_.end()) {
487 return ERR_NONE;
488 }
489 auto& listenerInfo = iterItem->second;
490 auto ret = listenerInfo.Find(listener);
491 if (!ret) {
492 HILOGI("listener not registered!");
493 return ERR_NONE;
494 }
495 listener->RemoveDeathRecipient(listenerDeath_);
496 listenerInfo.Erase(listener);
497 if (!listenerInfo.Empty()) {
498 return ERR_NONE;
499 }
500 listenDeviceMap_.erase(iterItem);
501 }
502 return ERR_NONE;
503 }
504
CleanMissionResources(const std::string & networkId)505 void DistributedSchedMissionManager::CleanMissionResources(const std::string& networkId)
506 {
507 {
508 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
509 auto iterDevice = listenDeviceMap_.find(Str8ToStr16(networkId));
510 if (iterDevice == listenDeviceMap_.end()) {
511 return;
512 }
513 auto& listenerInfo = iterDevice->second;
514 for (sptr<IRemoteObject> listener : listenerInfo.listenerSet) {
515 if (listener != nullptr) {
516 listener->RemoveDeathRecipient(listenerDeath_);
517 }
518 }
519 listenDeviceMap_.erase(iterDevice);
520 }
521 StopSyncRemoteMissions(networkId, true);
522 }
523
StopSyncRemoteMissions(const std::string & dstDevId,bool offline,bool exit)524 int32_t DistributedSchedMissionManager::StopSyncRemoteMissions(const std::string& dstDevId,
525 bool offline, bool exit)
526 {
527 CleanMissionCache(dstDevId);
528 DeleteCachedSnapshotInfo(dstDevId);
529 DeleteDataStorage(dstDevId, true);
530
531 if (offline) {
532 return ERR_NONE;
533 }
534 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDevId);
535 if (remoteDms == nullptr) {
536 HILOGE("DMS get remoteDms failed");
537 return GET_REMOTE_DMS_FAIL;
538 }
539
540 CallerInfo callerInfo;
541 if (!GenerateCallerInfo(callerInfo)) {
542 return GET_LOCAL_DEVICE_ERR;
543 }
544 int64_t begin = GetTickCount();
545 int32_t ret = remoteDms->StopSyncMissionsFromRemote(callerInfo);
546 HILOGI("[PerformanceTest] ret: %{public}d, spend %{public}" PRId64 " ms", ret, GetTickCount() - begin);
547 return ret;
548 }
549
StartSyncRemoteMissions(const std::string & dstDevId,bool fixConflict,int64_t tag,uint32_t callingTokenId)550 int32_t DistributedSchedMissionManager::StartSyncRemoteMissions(const std::string& dstDevId, bool fixConflict,
551 int64_t tag, uint32_t callingTokenId)
552 {
553 std::string localDeviceId;
554 if (!IsDeviceIdValidated(dstDevId)) {
555 return INVALID_PARAMETERS_ERR;
556 }
557 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)
558 || (dstDevId == localDeviceId)) {
559 HILOGE("check deviceId fail");
560 return INVALID_PARAMETERS_ERR;
561 }
562 HILOGI("begin, dstDevId is %{public}s, local deviceId is %{public}s",
563 GetAnonymStr(dstDevId).c_str(), GetAnonymStr(localDeviceId).c_str());
564 auto ret = StartSyncRemoteMissions(dstDevId, localDeviceId, callingTokenId);
565 if (ret != ERR_NONE) {
566 HILOGE("StartSyncRemoteMissions failed, %{public}d", ret);
567 return ret;
568 }
569 return ERR_NONE;
570 }
571
StartSyncMissionsFromRemote(const CallerInfo & callerInfo,std::vector<DstbMissionInfo> & missionInfoSet)572 int32_t DistributedSchedMissionManager::StartSyncMissionsFromRemote(const CallerInfo& callerInfo,
573 std::vector<DstbMissionInfo>& missionInfoSet)
574 {
575 auto deviceId = callerInfo.sourceDeviceId;
576 HILOGD("remote version is %{public}d!", callerInfo.dmsVersion);
577 {
578 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
579 remoteSyncDeviceSet_.emplace(deviceId);
580 }
581
582 nlohmann::json extraInfoJson = callerInfo.extraInfoJson;
583 if (extraInfoJson.find(Constants::EXTRO_INFO_JSON_KEY_ACCOUNT_ID) == extraInfoJson.end() &&
584 extraInfoJson.find(Constants::EXTRO_INFO_JSON_KEY_USERID_ID) == extraInfoJson.end()) {
585 HILOGW("older version does not need check access control.");
586 } else {
587 bool res = CheckAccessControlForMissions(callerInfo);
588 if (!res) {
589 HILOGE("check access control failed");
590 return INVALID_PARAMETERS_ERR;
591 }
592 }
593
594 int32_t result = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
595 missionInfoSet);
596 auto func = [this, missionInfoSet]() {
597 HILOGD("RegisterMissionListener called.");
598 if (!isRegMissionChange_) {
599 int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
600 if (ret == ERR_OK) {
601 isRegMissionChange_ = true;
602 }
603 InitAllSnapshots(missionInfoSet);
604 }
605 };
606 if (missionHandler_ != nullptr && !missionHandler_->PostTask(func)) {
607 HILOGE("post RegisterMissionListener and InitAllSnapshots Task failed");
608 }
609 return result;
610 }
611
CheckAccessControlForMissions(const CallerInfo & callerInfo)612 bool DistributedSchedMissionManager::CheckAccessControlForMissions(const CallerInfo& callerInfo)
613 {
614 AccountInfo accountInfo;
615 nlohmann::json extraInfoJson = callerInfo.extraInfoJson;
616 if (!extraInfoJson.is_discarded()) {
617 if (extraInfoJson.find(Constants::EXTRO_INFO_JSON_KEY_ACCOUNT_ID) != extraInfoJson.end() &&
618 extraInfoJson[Constants::EXTRO_INFO_JSON_KEY_ACCOUNT_ID].is_string()) {
619 accountInfo.activeAccountId = extraInfoJson[Constants::EXTRO_INFO_JSON_KEY_ACCOUNT_ID];
620 }
621 if (extraInfoJson.find(Constants::EXTRO_INFO_JSON_KEY_USERID_ID) != extraInfoJson.end() &&
622 extraInfoJson[Constants::EXTRO_INFO_JSON_KEY_USERID_ID].is_number()) {
623 accountInfo.userId = extraInfoJson[Constants::EXTRO_INFO_JSON_KEY_USERID_ID];
624 }
625 }
626 DmAccessCaller dmSrcCaller = {
627 .accountId = accountInfo.activeAccountId,
628 .networkId = callerInfo.sourceDeviceId,
629 .userId = accountInfo.userId,
630 };
631 std::string localDeviceId;
632 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
633 HILOGE("GetLocalDeviceId failed");
634 return false;
635 }
636 AccountInfo dstAccountInfo;
637 if (!GetOsAccountData(dstAccountInfo)) {
638 HILOGE("Get Os accountId and userId fail.");
639 return false;
640 }
641 DmAccessCallee dmDstCallee = {
642 .accountId = dstAccountInfo.activeAccountId,
643 .networkId = localDeviceId,
644 .userId = dstAccountInfo.userId,
645 };
646 return DeviceManager::GetInstance().CheckSinkAccessControl(dmSrcCaller, dmDstCallee);
647 }
648
StopSyncMissionsFromRemote(const std::string & networkId)649 void DistributedSchedMissionManager::StopSyncMissionsFromRemote(const std::string& networkId)
650 {
651 HILOGD(" %{private}s!", GetAnonymStr(networkId).c_str());
652 {
653 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
654 remoteSyncDeviceSet_.erase(networkId);
655 if (remoteSyncDeviceSet_.empty()) {
656 auto func = [this]() {
657 int32_t ret = DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
658 if (ret == ERR_OK) {
659 isRegMissionChange_ = false;
660 }
661 };
662 if (missionHandler_ != nullptr && !missionHandler_->PostTask(func)) {
663 HILOGE("post UnRegisterMissionListener Task failed");
664 }
665 }
666 }
667 }
668
NeedSyncDevice(const std::string & deviceId)669 bool DistributedSchedMissionManager::NeedSyncDevice(const std::string& deviceId)
670 {
671 if (deviceId.empty()) {
672 HILOGD("deviceId empty!");
673 return false;
674 }
675 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
676 if (remoteSyncDeviceSet_.count(deviceId) == 0) {
677 return false;
678 }
679 return true;
680 }
681
HasSyncListener(const std::string & networkId)682 bool DistributedSchedMissionManager::HasSyncListener(const std::string& networkId)
683 {
684 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
685 auto iter = listenDeviceMap_.find(Str8ToStr16(networkId));
686 if (iter != listenDeviceMap_.end()) {
687 return iter->second.called;
688 }
689 return false;
690 }
691
NotifySnapshotChanged(const std::string & networkId,int32_t missionId)692 void DistributedSchedMissionManager::NotifySnapshotChanged(const std::string& networkId, int32_t missionId)
693 {
694 std::u16string u16DevId = Str8ToStr16(networkId);
695 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
696 auto iter = listenDeviceMap_.find(u16DevId);
697 if (iter == listenDeviceMap_.end()) {
698 return;
699 }
700 auto& listenerInfo = iter->second;
701 for (auto& listener : listenerInfo.listenerSet) {
702 MissionChangedNotify::NotifySnapshot(listener, u16DevId, missionId);
703 }
704 }
705
OnRemoteDied(const wptr<IRemoteObject> & remote)706 void DistributedSchedMissionManager::OnRemoteDied(const wptr<IRemoteObject>& remote)
707 {
708 HILOGD("OnRemoteDied!");
709 sptr<IRemoteObject> listener = remote.promote();
710 if (listener == nullptr) {
711 HILOGW("listener is null");
712 return;
713 }
714 auto remoteDiedFunc = [this, listener]() {
715 OnMissionListenerDied(listener);
716 };
717 if (missionHandler_ != nullptr) {
718 missionHandler_->PostTask(remoteDiedFunc);
719 }
720 }
721
OnRemoteDied(const wptr<IRemoteObject> & remote)722 void DistributedSchedMissionManager::ListenerDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
723 {
724 DistributedSchedMissionManager::GetInstance().OnRemoteDied(remote);
725 }
726
EnqueueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId,std::unique_ptr<Snapshot> snapshot)727 void DistributedSchedMissionManager::EnqueueCachedSnapshotInfo(const std::string& deviceId, int32_t missionId,
728 std::unique_ptr<Snapshot> snapshot)
729 {
730 if (deviceId.empty() || snapshot == nullptr) {
731 HILOGW("EnqueueCachedSnapshotInfo invalid input param!");
732 return;
733 }
734 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
735 std::string keyInfo = GenerateKeyInfo(deviceId, missionId);
736 auto iter = cachedSnapshotInfos_.find(keyInfo);
737 if (iter != cachedSnapshotInfos_.end()) {
738 if (iter->second == nullptr) {
739 HILOGE("snapshotInfo is null");
740 return;
741 }
742 if (snapshot->GetCreatedTime() < iter->second->GetCreatedTime()) {
743 return;
744 }
745 }
746
747 if (cachedSnapshotInfos_.size() == MAX_CACHED_ITEM) {
748 int64_t oldest = -1;
749 auto iterOldest = cachedSnapshotInfos_.end();
750 for (auto iterItem = cachedSnapshotInfos_.begin(); iterItem != cachedSnapshotInfos_.end(); ++iterItem) {
751 if (iterItem->second == nullptr) {
752 HILOGE("snapshotInfo is null");
753 continue;
754 }
755 if (oldest == -1 || iterItem->second->GetLastAccessTime() < oldest) {
756 oldest = iterItem->second->GetLastAccessTime();
757 iterOldest = iterItem;
758 }
759 }
760 if (iterOldest != cachedSnapshotInfos_.end()) {
761 cachedSnapshotInfos_.erase(iterOldest);
762 }
763 }
764 cachedSnapshotInfos_[keyInfo] = std::move(snapshot);
765 }
766
DequeueCachedSnapshotInfo(const std::string & deviceId,int32_t missionId)767 std::unique_ptr<Snapshot> DistributedSchedMissionManager::DequeueCachedSnapshotInfo(const std::string& deviceId,
768 int32_t missionId)
769 {
770 if (deviceId.empty()) {
771 HILOGW("DequeueCachedSnapshotInfo invalid input param!");
772 return nullptr;
773 }
774 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
775 auto iter = cachedSnapshotInfos_.find(GenerateKeyInfo(deviceId, missionId));
776 if (iter != cachedSnapshotInfos_.end()) {
777 std::unique_ptr<Snapshot> snapshot = std::move(iter->second);
778 if (snapshot == nullptr) {
779 HILOGE("snapshot is null");
780 return nullptr;
781 }
782 snapshot->UpdateLastAccessTime(GetTickCount());
783 iter->second = nullptr;
784 cachedSnapshotInfos_.erase(iter);
785 return snapshot;
786 }
787 return nullptr;
788 }
789
DeleteCachedSnapshotInfo(const std::string & networkId)790 void DistributedSchedMissionManager::DeleteCachedSnapshotInfo(const std::string& networkId)
791 {
792 if (networkId.empty()) {
793 HILOGW("networkId empty!");
794 return;
795 }
796 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(networkId);
797 if (uuid.empty()) {
798 HILOGW("uuid empty!");
799 return;
800 }
801 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
802 auto iter = cachedSnapshotInfos_.begin();
803 while (iter != cachedSnapshotInfos_.end()) {
804 if (iter->first.find(uuid) != std::string::npos) {
805 iter = cachedSnapshotInfos_.erase(iter);
806 } else {
807 ++iter;
808 }
809 }
810 }
811
FetchCachedRemoteMissions(const std::string & srcId,int32_t numMissions,std::vector<DstbMissionInfo> & missionInfoSet)812 int32_t DistributedSchedMissionManager::FetchCachedRemoteMissions(const std::string& srcId, int32_t numMissions,
813 std::vector<DstbMissionInfo>& missionInfoSet)
814 {
815 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(srcId);
816 if (uuid.empty()) {
817 HILOGE("uuid empty!");
818 return INVALID_PARAMETERS_ERR;
819 }
820 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
821 auto iter = deviceMissionInfos_.find(uuid);
822 if (iter == deviceMissionInfos_.end()) {
823 HILOGE("can not find uuid, deviceId: %{public}s!", GetAnonymStr(srcId).c_str());
824 return ERR_NULL_OBJECT;
825 }
826
827 // get at most numMissions missions
828 int32_t actualNums = static_cast<int32_t>((iter->second).size());
829 if (actualNums < 0) {
830 HILOGE("invalid size!");
831 return ERR_NULL_OBJECT;
832 }
833 missionInfoSet.assign((iter->second).begin(),
834 (actualNums > numMissions) ? (iter->second).begin() + numMissions : (iter->second).end());
835 return ERR_NONE;
836 }
837
RebornMissionCache(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet)838 void DistributedSchedMissionManager::RebornMissionCache(const std::string& deviceId,
839 const std::vector<DstbMissionInfo>& missionInfoSet)
840 {
841 HILOGI("start! deviceId is %{public}s.", GetAnonymStr(deviceId).c_str());
842 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
843 if (uuid.empty()) {
844 HILOGE("uuid empty!");
845 return;
846 }
847 {
848 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
849 deviceMissionInfos_[uuid] = missionInfoSet;
850 }
851 HILOGI("RebornMissionCache end!");
852 }
853
CleanMissionCache(const std::string & deviceId)854 void DistributedSchedMissionManager::CleanMissionCache(const std::string& deviceId)
855 {
856 HILOGI("CleanMissionCache start! deviceId is %{public}s.", GetAnonymStr(deviceId).c_str());
857 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
858 if (uuid.empty()) {
859 HILOGE("CleanMissionCache uuid empty!");
860 return;
861 }
862 {
863 std::lock_guard<std::mutex> autoLock(remoteMissionInfosLock_);
864 deviceMissionInfos_.erase(uuid);
865 }
866 HILOGI("CleanMissionCache end!");
867 }
868
NotifyMissionsChangedFromRemote(const CallerInfo & callerInfo,const std::vector<DstbMissionInfo> & missionInfoSet)869 int32_t DistributedSchedMissionManager::NotifyMissionsChangedFromRemote(const CallerInfo& callerInfo,
870 const std::vector<DstbMissionInfo>& missionInfoSet)
871 {
872 HILOGI("NotifyMissionsChangedFromRemote version is %{public}d!", callerInfo.dmsVersion);
873 std::u16string u16DevId = Str8ToStr16(callerInfo.sourceDeviceId);
874 RebornMissionCache(callerInfo.sourceDeviceId, missionInfoSet);
875 {
876 HILOGI("NotifyMissionsChangedFromRemote notify mission start!");
877 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
878 auto iter = listenDeviceMap_.find(u16DevId);
879 if (iter == listenDeviceMap_.end()) {
880 HILOGE("NotifyMissionsChangedFromRemote notify mission no listener!");
881 return INVALID_PARAMETERS_ERR;
882 }
883 auto& listenerSet = iter->second.listenerSet;
884 auto notifyChanged = [listenerSet, u16DevId] () {
885 for (const auto& listener : listenerSet) {
886 MissionChangedNotify::NotifyMissionsChanged(listener, u16DevId);
887 }
888 };
889 if (missionHandler_ != nullptr) {
890 missionHandler_->PostTask(notifyChanged);
891 HILOGI("NotifyMissionsChangedFromRemote end!");
892 return ERR_NONE;
893 }
894 }
895 return INVALID_PARAMETERS_ERR;
896 }
897
NotifyLocalMissionsChanged()898 void DistributedSchedMissionManager::NotifyLocalMissionsChanged()
899 {
900 auto func = [this]() {
901 HILOGI("NotifyLocalMissionsChanged");
902 std::vector<DstbMissionInfo> missionInfos;
903 int32_t ret = DistributedSchedAdapter::GetInstance().GetLocalMissionInfos(Mission::GET_MAX_MISSIONS,
904 missionInfos);
905 if (ret == ERR_OK) {
906 int32_t result = NotifyMissionsChangedToRemote(missionInfos);
907 HILOGI("NotifyMissionsChangedToRemote result = %{public}d", result);
908 }
909 };
910 if (missionChangeHandler_ == nullptr) {
911 HILOGE("missionChangeHandler_ is null");
912 return;
913 }
914 if (!missionChangeHandler_->PostTask(func)) {
915 HILOGE("postTask failed");
916 }
917 }
918
NotifyMissionSnapshotCreated(int32_t missionId)919 void DistributedSchedMissionManager::NotifyMissionSnapshotCreated(int32_t missionId)
920 {
921 auto func = [this, missionId]() {
922 HILOGD("called.");
923 ErrCode errCode = MissionSnapshotChanged(missionId);
924 if (errCode != ERR_OK) {
925 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
926 }
927 };
928 if (missionChangeHandler_ == nullptr) {
929 HILOGE("missionChangeHandler_ is null");
930 return;
931 }
932 if (!missionChangeHandler_->PostTask(func, GET_FOREGROUND_SNAPSHOT_DELAY_TIME)) {
933 HILOGE("post MissionSnapshotChanged delay Task failed");
934 }
935 }
936
NotifyMissionSnapshotChanged(int32_t missionId)937 void DistributedSchedMissionManager::NotifyMissionSnapshotChanged(int32_t missionId)
938 {
939 auto func = [this, missionId]() {
940 HILOGD("called.");
941 ErrCode errCode = MissionSnapshotChanged(missionId);
942 if (errCode != ERR_OK) {
943 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
944 }
945 };
946 if (missionChangeHandler_ == nullptr) {
947 HILOGE("missionChangeHandler_ is null");
948 return;
949 }
950 if (!missionChangeHandler_->PostTask(func)) {
951 HILOGE("post MissionSnapshotChanged Task failed");
952 }
953 }
954
NotifyMissionSnapshotDestroyed(int32_t missionId)955 void DistributedSchedMissionManager::NotifyMissionSnapshotDestroyed(int32_t missionId)
956 {
957 auto func = [this, missionId]() {
958 HILOGD("called.");
959 ErrCode errCode = MissionSnapshotDestroyed(missionId);
960 if (errCode != ERR_OK) {
961 HILOGE("mission snapshot removed failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
962 }
963 };
964 if (missionChangeHandler_ == nullptr) {
965 HILOGE("missionChangeHandler_ is null");
966 return;
967 }
968 if (!missionChangeHandler_->PostTask(func)) {
969 HILOGE("post MissionSnapshotDestroyed Task failed");
970 }
971 }
972
NotifyMissionsChangedToRemote(const std::vector<DstbMissionInfo> & missionInfoSet)973 int32_t DistributedSchedMissionManager::NotifyMissionsChangedToRemote(
974 const std::vector<DstbMissionInfo> &missionInfoSet)
975 {
976 CallerInfo callerInfo;
977 if (!GenerateCallerInfo(callerInfo)) {
978 return GET_LOCAL_DEVICE_ERR;
979 }
980 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
981 for (const auto& destDeviceId : remoteSyncDeviceSet_) {
982 auto handler = FetchDeviceHandler(destDeviceId);
983 if (handler == nullptr) {
984 HILOGE("NotifyMissionsChangedToRemote fetch handler failed!");
985 continue;
986 }
987 auto callback = [destDeviceId, missionInfoSet, callerInfo, this] () {
988 NotifyMissionsChangedToRemoteInner(destDeviceId, missionInfoSet, callerInfo);
989 };
990 if (!handler->PostTask(callback)) {
991 HILOGE("NotifyMissionsChangedToRemote PostTask failed!");
992 return ERR_NULL_OBJECT;
993 }
994 }
995
996 return ERR_NONE;
997 }
998
NotifyMissionsChangedToRemoteInner(const std::string & deviceId,const std::vector<DstbMissionInfo> & missionInfoSet,const CallerInfo & callerInfo)999 void DistributedSchedMissionManager::NotifyMissionsChangedToRemoteInner(const std::string& deviceId,
1000 const std::vector<DstbMissionInfo>& missionInfoSet, const CallerInfo& callerInfo)
1001 {
1002 sptr<IDistributedSched> remoteDms = GetRemoteDms(deviceId);
1003 if (remoteDms == nullptr) {
1004 HILOGE("NotifyMissionsChangedToRemote DMS get remoteDms failed");
1005 return;
1006 }
1007 int64_t begin = GetTickCount();
1008 int32_t result = remoteDms->NotifyMissionsChangedFromRemote(missionInfoSet, callerInfo);
1009 HILOGI("[PerformanceTest] NotifyMissionsChangedFromRemote ret:%{public}d, spend %{public}" PRId64 " ms",
1010 result, GetTickCount() - begin);
1011 }
1012
GenerateCallerInfo(CallerInfo & callerInfo)1013 bool DistributedSchedMissionManager::GenerateCallerInfo(CallerInfo& callerInfo)
1014 {
1015 std::string localUuid;
1016 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localUuid)) {
1017 HILOGE("get local uuid failed!");
1018 return false;
1019 }
1020 callerInfo.uid = IPCSkeleton::GetCallingUid();
1021 callerInfo.pid = IPCSkeleton::GetCallingRealPid();
1022 callerInfo.callerType = CALLER_TYPE_HARMONY;
1023 callerInfo.sourceDeviceId = localUuid;
1024 callerInfo.dmsVersion = VERSION;
1025 return true;
1026 }
1027
FetchDeviceHandler(const std::string & deviceId)1028 std::shared_ptr<AppExecFwk::EventHandler> DistributedSchedMissionManager::FetchDeviceHandler(
1029 const std::string& deviceId)
1030 {
1031 if (!IsDeviceIdValidated(deviceId)) {
1032 HILOGW("FetchDeviceHandler device:%{public}s offline.", GetAnonymStr(deviceId).c_str());
1033 return nullptr;
1034 }
1035
1036 std::string uuid = DtbschedmgrDeviceInfoStorage::GetInstance().GetUuidByNetworkId(deviceId);
1037 if (uuid.empty()) {
1038 HILOGE("FetchDeviceHandler uuid empty!");
1039 return nullptr;
1040 }
1041
1042 auto iter = deviceHandle_.find(uuid);
1043 if (iter != deviceHandle_.end()) {
1044 return iter->second;
1045 }
1046
1047 auto anonyUuid = GetAnonymStr(uuid);
1048 auto runner = AppExecFwk::EventRunner::Create(anonyUuid + "_MissionN");
1049 auto handler = std::make_shared<AppExecFwk::EventHandler>(runner);
1050 deviceHandle_.emplace(uuid, handler);
1051 return handler;
1052 }
1053
OnRemoteDied(const wptr<IRemoteObject> & remote)1054 void DistributedSchedMissionManager::RemoteDmsDeathRecipient::OnRemoteDied(const wptr<IRemoteObject>& remote)
1055 {
1056 HILOGI("OnRemoteDied received died notify!");
1057 DistributedSchedMissionManager::GetInstance().OnRemoteDmsDied(remote);
1058 }
1059
OnRemoteDmsDied(const wptr<IRemoteObject> & remote)1060 void DistributedSchedMissionManager::OnRemoteDmsDied(const wptr<IRemoteObject>& remote)
1061 {
1062 sptr<IRemoteObject> diedRemoted = remote.promote();
1063 if (diedRemoted == nullptr) {
1064 HILOGW("OnRemoteDmsDied promote failed!");
1065 return;
1066 }
1067 HILOGD("delete diedRemoted");
1068 auto remoteDmsDiedFunc = [this, diedRemoted]() {
1069 OnRemoteDmsDied(diedRemoted);
1070 };
1071 if (missionHandler_ != nullptr) {
1072 missionHandler_->PostTask(remoteDmsDiedFunc);
1073 }
1074 }
1075
RetryStartSyncRemoteMissions(const std::string & dstDeviceId,const std::string & localDevId,int32_t retryTimes)1076 void DistributedSchedMissionManager::RetryStartSyncRemoteMissions(const std::string& dstDeviceId,
1077 const std::string& localDevId, int32_t retryTimes)
1078 {
1079 auto retryFunc = [this, dstDeviceId, localDevId, retryTimes]() {
1080 bool ret = HasSyncListener(dstDeviceId);
1081 if (!ret) {
1082 return;
1083 }
1084 sptr<IDistributedSched> remoteDms = GetRemoteDms(dstDeviceId);
1085 if (remoteDms == nullptr) {
1086 HILOGI("RetryStartSyncRemoteMissions DMS get remoteDms failed");
1087 RetryStartSyncRemoteMissions(dstDeviceId, localDevId, retryTimes + 1);
1088 return;
1089 }
1090 int32_t errNo = StartSyncRemoteMissions(dstDeviceId, remoteDms);
1091 HILOGI("RetryStartSyncRemoteMissions result:%{public}d", errNo);
1092 };
1093 if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1094 missionHandler_->PostTask(retryFunc, RETRY_DELAYED);
1095 }
1096 }
1097
OnMissionListenerDied(const sptr<IRemoteObject> & remote)1098 void DistributedSchedMissionManager::OnMissionListenerDied(const sptr<IRemoteObject>& remote)
1099 {
1100 HILOGI("OnMissionListenerDied");
1101 std::set<std::string> deviceSet;
1102 {
1103 std::lock_guard<std::mutex> autoLock(listenDeviceLock_);
1104 auto iterItem = listenDeviceMap_.begin();
1105 while (iterItem != listenDeviceMap_.end()) {
1106 auto& listenerInfo = iterItem->second;
1107 auto ret = listenerInfo.Find(remote);
1108 if (!ret) {
1109 ++iterItem;
1110 continue;
1111 }
1112 if (remote != nullptr) {
1113 remote->RemoveDeathRecipient(listenerDeath_);
1114 }
1115 listenerInfo.Erase(remote);
1116 if (listenerInfo.Empty()) {
1117 if (listenerInfo.called) {
1118 deviceSet.emplace(Str16ToStr8(iterItem->first));
1119 }
1120 iterItem = listenDeviceMap_.erase(iterItem);
1121 } else {
1122 ++iterItem;
1123 }
1124 }
1125 }
1126 for (auto& devId : deviceSet) {
1127 StopSyncRemoteMissions(devId, false);
1128 }
1129 }
1130
OnRemoteDmsDied(const sptr<IRemoteObject> & remote)1131 void DistributedSchedMissionManager::OnRemoteDmsDied(const sptr<IRemoteObject>& remote)
1132 {
1133 HILOGI("OnRemoteDmsDied");
1134 std::string devId;
1135 {
1136 std::lock_guard<std::mutex> autoLock(remoteDmsLock_);
1137 for (auto iter = remoteDmsMap_.begin(); iter != remoteDmsMap_.end(); ++iter) {
1138 if (iter->second->AsObject() == remote && iter->second->AsObject() != nullptr) {
1139 iter->second->AsObject()->RemoveDeathRecipient(remoteDmsRecipient_);
1140 devId = iter->first;
1141 remoteDmsMap_.erase(iter);
1142 break;
1143 }
1144 }
1145 }
1146 if (devId.empty()) {
1147 return;
1148 }
1149 bool ret = HasSyncListener(devId);
1150 if (ret) {
1151 std::string localDeviceId;
1152 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDeviceId)) {
1153 return;
1154 }
1155 RetryStartSyncRemoteMissions(devId, localDeviceId, 0);
1156 }
1157 }
1158
NotifyDmsProxyProcessDied()1159 void DistributedSchedMissionManager::NotifyDmsProxyProcessDied()
1160 {
1161 HILOGI("NotifyDmsProxyProcessDied!");
1162 if (!isRegMissionChange_) {
1163 return;
1164 }
1165 RetryRegisterMissionChange(0);
1166 }
1167
RetryRegisterMissionChange(int32_t retryTimes)1168 void DistributedSchedMissionManager::RetryRegisterMissionChange(int32_t retryTimes)
1169 {
1170 auto remoteDiedFunc = [this, retryTimes]() {
1171 HILOGI("RetryRegisterMissionChange retryTimes:%{public}d begin", retryTimes);
1172 if (!isRegMissionChange_) {
1173 return;
1174 }
1175 int32_t ret = DistributedSchedAdapter::GetInstance().RegisterMissionListener(missonChangeListener_);
1176 if (ret == ERR_NULL_OBJECT) {
1177 RetryRegisterMissionChange(retryTimes + 1);
1178 HILOGI("RetryRegisterMissionChange dmsproxy null, retry!");
1179 return;
1180 }
1181 HILOGI("RetryRegisterMissionChange result:%{public}d", ret);
1182 };
1183 if (missionHandler_ != nullptr && retryTimes < MAX_RETRY_TIMES) {
1184 missionHandler_->PostTask(remoteDiedFunc, RETRY_DELAYED);
1185 }
1186 }
1187
InitAllSnapshots(const std::vector<DstbMissionInfo> & missionInfoSet)1188 void DistributedSchedMissionManager::InitAllSnapshots(const std::vector<DstbMissionInfo>& missionInfoSet)
1189 {
1190 for (auto iter = missionInfoSet.begin(); iter != missionInfoSet.end(); iter++) {
1191 ErrCode errCode = MissionSnapshotChanged(iter->id);
1192 if (errCode != ERR_OK) {
1193 HILOGE("mission snapshot changed failed, missionId=%{public}d, errCode=%{public}d", iter->id, errCode);
1194 }
1195 }
1196 }
1197
MissionSnapshotChanged(int32_t missionId)1198 int32_t DistributedSchedMissionManager::MissionSnapshotChanged(int32_t missionId)
1199 {
1200 std::string networkId;
1201 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1202 HILOGE("get local networkId failed!");
1203 return INVALID_PARAMETERS_ERR;
1204 }
1205 AAFwk::MissionSnapshot missionSnapshot;
1206 ErrCode errCode = DistributedSchedAdapter::GetInstance()
1207 .GetLocalMissionSnapshotInfo(networkId, missionId, missionSnapshot);
1208 if (errCode != ERR_OK) {
1209 HILOGE("get local mission snapshot failed, missionId=%{public}d, errCode=%{public}d", missionId, errCode);
1210 return errCode;
1211 }
1212 Snapshot snapshot;
1213 SnapshotConverter::ConvertToSnapshot(missionSnapshot, snapshot);
1214 MessageParcel data;
1215 errCode = MissionSnapshotSequence(snapshot, data);
1216 if (errCode != ERR_OK) {
1217 HILOGE("mission snapshot sequence failed, errCode=%{public}d", errCode);
1218 return errCode;
1219 }
1220 size_t len = data.GetReadableBytes();
1221 const uint8_t* byteStream = data.ReadBuffer(len);
1222 if (byteStream == nullptr) {
1223 HILOGE("Failed to read length.");
1224 return INVALID_PARAMETERS_ERR;
1225 }
1226 errCode = StoreSnapshotInfo(networkId, missionId, byteStream, len);
1227 return errCode;
1228 }
1229
MissionSnapshotDestroyed(int32_t missionId)1230 int32_t DistributedSchedMissionManager::MissionSnapshotDestroyed(int32_t missionId)
1231 {
1232 std::string networkId;
1233 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(networkId)) {
1234 HILOGE("get local networkId failed!");
1235 return INVALID_PARAMETERS_ERR;
1236 }
1237 ErrCode errCode = RemoveSnapshotInfo(networkId, missionId);
1238 return errCode;
1239 }
1240
MissionSnapshotSequence(const Snapshot & snapshot,MessageParcel & data)1241 int32_t DistributedSchedMissionManager::MissionSnapshotSequence(const Snapshot& snapshot, MessageParcel& data)
1242 {
1243 bool ret = snapshot.WriteSnapshotInfo(data);
1244 if (!ret) {
1245 HILOGE("WriteSnapshotInfo failed!");
1246 return ERR_FLATTEN_OBJECT;
1247 }
1248 ret = snapshot.WritePixelMap(data);
1249 if (!ret) {
1250 HILOGE("WritePixelMap failed!");
1251 return ERR_FLATTEN_OBJECT;
1252 }
1253 return ERR_OK;
1254 }
1255
OnDnetDied()1256 void DistributedSchedMissionManager::OnDnetDied()
1257 {
1258 auto dnetDiedFunc = [this]() {
1259 HILOGI("OnDnetDied");
1260 std::lock_guard<std::mutex> autoLock(remoteSyncDeviceLock_);
1261 if (!isRegMissionChange_) {
1262 return;
1263 }
1264 remoteSyncDeviceSet_.clear();
1265 DistributedSchedAdapter::GetInstance().UnRegisterMissionListener(missonChangeListener_);
1266 isRegMissionChange_ = false;
1267 };
1268 if (missionHandler_ != nullptr) {
1269 missionHandler_->PostTask(dnetDiedFunc);
1270 }
1271 }
1272 } // namespace DistributedSchedule
1273 } // namespace OHOS
1274