• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "dsched_continue_manager.h"
17 
18 #include <chrono>
19 #include <sys/prctl.h>
20 
21 #include "cJSON.h"
22 
23 #include "continue_scene_session_handler.h"
24 #include "dfx/distributed_radar.h"
25 #include "distributed_sched_utils.h"
26 #include "dsched_transport_softbus_adapter.h"
27 #include "dtbschedmgr_device_info_storage.h"
28 #include "dtbschedmgr_log.h"
29 #include "mission/distributed_bm_storage.h"
30 #include "mission/dms_continue_condition_manager.h"
31 #include "mission/dsched_sync_e2e.h"
32 #include "multi_user_manager.h"
33 
34 namespace OHOS {
35 namespace DistributedSchedule {
namespace {
// Log tag for this translation unit (presumably picked up by the HILOG* macros — confirm in dtbschedmgr_log.h).
const std::string TAG{"DSchedContinueManager"};
// Name given to the event-loop thread via prctl(PR_SET_NAME) in StartEvent().
const std::string DSCHED_CONTINUE_MANAGER{"dsched_continue_manager"};
// Task name for the continuation timeout task.
const std::string CONTINUE_TIMEOUT_TASK{"continue_timeout_task"};
// Message handed to CHECK_BOOL_VALUE_RETURN when the MDM rule check reports control.
const std::string MDM_CONTROL{"current user is MDM_CONTROL"};
// Interface token written at the head of the quick-start state callback parcel.
const std::u16string CONNECTION_CALLBACK_INTERFACE_TOKEN{u"ohos.abilityshell.DistributedConnection"};
// Delay applied before the terminate-continuation check is run on the event loop.
constexpr int32_t TERMINATE_DELAY_TIME{200}; // ms
}
44 
45 IMPLEMENT_SINGLE_INSTANCE(DSchedContinueManager);
46 
DSchedContinueManager()47 DSchedContinueManager::DSchedContinueManager()
48 {
49 }
50 
~DSchedContinueManager()51 DSchedContinueManager::~DSchedContinueManager()
52 {
53     HILOGI("DSchedContinueManager delete");
54     UnInit();
55 }
56 
Init()57 void DSchedContinueManager::Init()
58 {
59     HILOGI("Init DSchedContinueManager start");
60     {
61         std::unique_lock<std::mutex> lock(hasInitMutex_);
62         if (hasInit_) {
63             HILOGW("Init DSchedContinueManager has init");
64             return;
65         }
66         hasInit_ = true;
67     }
68 
69     if (eventHandler_ != nullptr) {
70         HILOGI("DSchedContinueManager already inited, end.");
71         return;
72     }
73     DSchedTransportSoftbusAdapter::GetInstance().InitChannel();
74     softbusListener_ = std::make_shared<DSchedContinueManager::SoftbusListener>();
75     DSchedTransportSoftbusAdapter::GetInstance().RegisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
76     eventThread_ = std::thread(&DSchedContinueManager::StartEvent, this);
77     std::unique_lock<std::mutex> lock(eventMutex_);
78     eventCon_.wait(lock, [this] {
79         return eventHandler_ != nullptr;
80     });
81     HILOGI("Init DSchedContinueManager end");
82 }
83 
StartEvent()84 void DSchedContinueManager::StartEvent()
85 {
86     HILOGI("StartEvent start");
87     prctl(PR_SET_NAME, DSCHED_CONTINUE_MANAGER.c_str());
88     auto runner = AppExecFwk::EventRunner::Create(false);
89     {
90         std::lock_guard<std::mutex> lock(eventMutex_);
91         eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
92     }
93     eventCon_.notify_one();
94     runner->Run();
95     HILOGI("StartEvent end");
96 }
97 
UnInit()98 void DSchedContinueManager::UnInit()
99 {
100     HILOGI("UnInit start");
101     {
102         std::unique_lock<std::mutex> lock(hasInitMutex_);
103         if (!hasInit_) {
104             HILOGW("Init DSchedContinueManager has uninit");
105             return;
106         }
107         hasInit_ = false;
108     }
109 
110     DSchedTransportSoftbusAdapter::GetInstance().UnregisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
111     continues_.clear();
112     cntSink_ = 0;
113     cntSource_ = 0;
114 
115     if (eventHandler_ != nullptr) {
116         eventHandler_->GetEventRunner()->Stop();
117         if (eventThread_.joinable()) {
118             eventThread_.join();
119         }
120         eventHandler_ = nullptr;
121     } else {
122         HILOGE("eventHandler_ is nullptr");
123     }
124     HILOGI("UnInit end");
125 }
126 
NotifyAllConnectDecision(std::string peerDeviceId,bool isSupport)127 void DSchedContinueManager::NotifyAllConnectDecision(std::string peerDeviceId, bool isSupport)
128 {
129     HILOGI("Notify all connect decision, peerDeviceId %{public}s, isSupport %{public}d.",
130         GetAnonymStr(peerDeviceId).c_str(), isSupport);
131 #ifdef DMSFWK_ALL_CONNECT_MGR
132     std::lock_guard<std::mutex> decisionLock(connectDecisionMutex_);
133     peerConnectDecision_[peerDeviceId] = isSupport;
134     connectDecisionCond_.notify_all();
135 #endif
136 }
137 
ContinueMission(const std::string & srcDeviceId,const std::string & dstDeviceId,int32_t missionId,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)138 int32_t DSchedContinueManager::ContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
139     int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
140 {
141     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
142         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
143         return INVALID_PARAMETERS_ERR;
144     }
145 
146     std::string localDevId;
147     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
148         HILOGE("get local deviceId failed!");
149         return INVALID_PARAMETERS_ERR;
150     }
151     if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
152         HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
153             GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
154         return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
155     }
156     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
157         localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
158         HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
159             GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
160         return INVALID_REMOTE_PARAMETERS_ERR;
161     }
162 
163     auto func = [this, srcDeviceId, dstDeviceId, missionId, callback, wantParams]() {
164         HandleContinueMission(srcDeviceId, dstDeviceId, missionId, callback, wantParams);
165     };
166     if (eventHandler_ == nullptr) {
167         HILOGE("eventHandler_ is nullptr");
168         return INVALID_PARAMETERS_ERR;
169     }
170     eventHandler_->PostTask(func);
171     return ERR_OK;
172 }
173 
HandleContinueMission(const std::string & srcDeviceId,const std::string & dstDeviceId,int32_t missionId,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)174 void DSchedContinueManager::HandleContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
175     int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
176 {
177     HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. missionId: %{public}d.",
178         GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), missionId);
179 
180     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
181         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
182         return;
183     }
184 
185     std::string localDevId;
186     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
187         HILOGE("get local deviceId failed!");
188         return;
189     }
190     DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, dstDeviceId, missionId);
191 
192     AAFwk::MissionInfo missionInfo;
193     if (AAFwk::AbilityManagerClient::GetInstance()->GetMissionInfo("", missionId, missionInfo) == ERR_OK
194         && srcDeviceId == localDevId) {
195         info.sourceBundleName_ = missionInfo.want.GetBundle();
196         info.sinkBundleName_ = missionInfo.want.GetBundle();
197     }
198     HandleContinueMissionWithBundleName(info, callback, wantParams);
199     return;
200 }
201 
ContinueMission(const DSchedContinueInfo & continueInfo,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)202 int32_t DSchedContinueManager::ContinueMission(const DSchedContinueInfo& continueInfo,
203     const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
204 {
205     std::string srcDeviceId = continueInfo.sourceDeviceId_;
206     std::string dstDeviceId = continueInfo.sinkDeviceId_;
207 
208     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
209         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
210         return INVALID_PARAMETERS_ERR;
211     }
212 
213     std::string localDevId;
214     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
215         HILOGE("get local deviceId failed!");
216         return INVALID_PARAMETERS_ERR;
217     }
218     if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
219         HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
220             GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
221         return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
222     }
223     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
224         localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
225         HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
226             GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
227         return INVALID_REMOTE_PARAMETERS_ERR;
228     }
229 
230 #ifdef SUPPORT_DISTRIBUTED_MISSION_MANAGER
231     if (localDevId == srcDeviceId) {
232         int32_t missionId = -1;
233         int32_t currentAccountId = MultiUserManager::GetInstance().GetForegroundUser();
234         int32_t ret = DmsContinueConditionMgr::GetInstance().GetMissionIdByBundleName(
235             currentAccountId, continueInfo.sourceBundleName_, missionId);
236         if (ret != ERR_OK) {
237             HILOGE("get missionId fail, ret %{public}d.", ret);
238             return INVALID_PARAMETERS_ERR;
239         }
240     }
241 #endif
242 
243     auto func = [this, continueInfo, callback, wantParams]() {
244         HandleContinueMission(continueInfo, callback, wantParams);
245     };
246     if (eventHandler_ == nullptr) {
247         HILOGE("eventHandler_ is nullptr");
248         return INVALID_PARAMETERS_ERR;
249     }
250     eventHandler_->PostTask(func);
251     return ERR_OK;
252 }
253 
HandleContinueMission(const DSchedContinueInfo & continueInfo,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)254 void DSchedContinueManager::HandleContinueMission(const DSchedContinueInfo& continueInfo,
255     const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
256 {
257     std::string srcDeviceId = continueInfo.sourceDeviceId_;
258     std::string dstDeviceId = continueInfo.sinkDeviceId_;
259     std::string srcBundleName = continueInfo.sourceBundleName_;
260     if (srcBundleName.empty()) {
261         srcBundleName = continueInfo.sinkBundleName_;
262     }
263     std::string bundleName = continueInfo.sinkBundleName_;
264     std::string continueType = continueInfo.continueType_;
265     HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. bundleName: %{public}s."
266         " continueType: %{public}s.", GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(),
267         bundleName.c_str(), continueType.c_str());
268 
269     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
270         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
271         return;
272     }
273 
274     DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, srcBundleName, dstDeviceId, bundleName, continueType);
275     HandleContinueMissionWithBundleName(info, callback, wantParams);
276     return;
277 }
278 
GetFirstBundleName(DSchedContinueInfo & info,std::string & firstBundleName,std::string bundleName,std::string deviceId)279 bool DSchedContinueManager::GetFirstBundleName(DSchedContinueInfo &info, std::string &firstBundleName,
280     std::string bundleName, std::string deviceId)
281 {
282     uint16_t bundleNameId;
283     DmsBundleInfo distributedBundleInfo;
284     DmsBmStorage::GetInstance()->GetBundleNameId(bundleName, bundleNameId);
285     bool result = DmsBmStorage::GetInstance()->GetDistributedBundleInfo(deviceId,
286         bundleNameId, distributedBundleInfo);
287     if (!result) {
288         HILOGE("GetDistributedBundleInfo faild");
289         return false;
290     }
291     std::vector<DmsAbilityInfo> dmsAbilityInfos = distributedBundleInfo.dmsAbilityInfos;
292     for (DmsAbilityInfo &ability: dmsAbilityInfos) {
293         std::vector<std::string> abilityContinueTypes = ability.continueType;
294         for (std::string &ability_continue_type: abilityContinueTypes) {
295             if (ability_continue_type == info.continueType_ && !ability.continueBundleName.empty()) {
296                 firstBundleName = *ability.continueBundleName.begin();
297                 return true;
298             }
299         }
300     }
301     HILOGE("can not get abilicy info or continue bundle names is empty for continue type:%{public}s",
302            info.continueType_.c_str());
303     return false;
304 }
305 
HandleContinueMissionWithBundleName(DSchedContinueInfo & info,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)306 void DSchedContinueManager::HandleContinueMissionWithBundleName(DSchedContinueInfo &info,
307     const sptr<IRemoteObject> &callback, const OHOS::AAFwk::WantParams &wantParams)
308 {
309     bool control = DmsKvSyncE2E::GetInstance()->CheckMDMCtrlRule(info.sourceBundleName_);
310     int32_t direction = CONTINUE_SINK;
311     int32_t ret = CheckContinuationLimit(info.sourceDeviceId_, info.sinkDeviceId_, direction);
312     if (ret != ERR_OK) {
313         HILOGE("CheckContinuationLimit failed, ret: %{public}d.", ret);
314         return;
315     }
316     int32_t subType = CONTINUE_PUSH;
317     if (direction == CONTINUE_SOURCE) {
318         CHECK_BOOL_VALUE_RETURN(control, MDM_CONTROL.c_str());
319         cntSource_++;
320     } else {
321         cntSink_++;
322         subType = CONTINUE_PULL;
323         if (info.sourceBundleName_.empty()) {
324             HILOGW("current sub type is continue pull; but can not get source bundle name from recv cache.");
325             std::string firstBundleNamme;
326             std::string bundleName = info.sinkBundleName_;
327             std::string deviceId = info.sinkDeviceId_;
328             if (GetFirstBundleName(info, firstBundleNamme, bundleName, deviceId)) {
329                 info.sourceBundleName_ = firstBundleNamme;
330             }
331         }
332     }
333     {
334         std::lock_guard<std::mutex> continueLock(continueMutex_);
335         if (!continues_.empty() && continues_.find(info) != continues_.end()) {
336             HILOGE("a same continue task is already in progress.");
337             return;
338         }
339         int32_t currentAccountId = MultiUserManager::GetInstance().GetForegroundUser();
340         auto newContinue = std::make_shared<DSchedContinue>(subType, direction, callback, info, currentAccountId);
341         newContinue->Init();
342         continues_.insert(std::make_pair(info, newContinue));
343 #ifdef DMSFWK_ALL_CONNECT_MGR
344         {
345             std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
346             std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
347             if (peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end()) {
348                 peerConnectDecision_.erase(peerDeviceId);
349             }
350         }
351 #endif
352         newContinue->OnContinueMission(wantParams);
353     }
354     WaitAllConnectDecision(direction, info, CONTINUE_TIMEOUT);
355     HILOGI("end, subType: %{public}d dirction: %{public}d, continue info: %{public}s",
356         subType, direction, info.ToString().c_str());
357 }
358 
WaitAllConnectDecision(int32_t direction,const DSchedContinueInfo & info,int32_t timeout)359 void DSchedContinueManager::WaitAllConnectDecision(int32_t direction, const DSchedContinueInfo &info, int32_t timeout)
360 {
361 #ifdef DMSFWK_ALL_CONNECT_MGR
362     std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
363     {
364         std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
365         connectDecisionCond_.wait_for(decisionLock, std::chrono::seconds(CONNECT_DECISION_WAIT_S),
366             [this, peerDeviceId]() {
367                 return peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end() &&
368                     peerConnectDecision_.at(peerDeviceId).load();
369             });
370 
371         if (peerConnectDecision_.find(peerDeviceId) == peerConnectDecision_.end()) {
372             HILOGE("Not find peerDeviceId %{public}s in peerConnectDecision.", GetAnonymStr(peerDeviceId).c_str());
373             SetTimeOut(info, 0);
374             return;
375         }
376         if (!peerConnectDecision_.at(peerDeviceId).load()) {
377             HILOGE("All connect manager refuse bind to PeerDeviceId %{public}s.", GetAnonymStr(peerDeviceId).c_str());
378             peerConnectDecision_.erase(peerDeviceId);
379             SetTimeOut(info, 0);
380             return;
381         }
382         peerConnectDecision_.erase(peerDeviceId);
383     }
384 #endif
385     SetTimeOut(info, timeout);
386 }
387 
SetTimeOut(const DSchedContinueInfo & info,int32_t timeout)388 void DSchedContinueManager::SetTimeOut(const DSchedContinueInfo &info, int32_t timeout)
389 {
390     auto func = [this, info]() {
391         if (continues_.empty() || continues_.count(info) == 0) {
392             HILOGE("continue not exist.");
393             return;
394         }
395         HILOGE("continue timeout! info: %{public}s", info.ToString().c_str());
396         auto dsContinue = continues_[info];
397         if (dsContinue != nullptr) {
398             dsContinue->OnContinueEnd(CONTINUE_ABILITY_TIMEOUT_ERR);
399         }
400     };
401     if (eventHandler_ == nullptr) {
402         HILOGE("eventHandler_ is nullptr");
403         return;
404     }
405     timeout > 0 ? eventHandler_->PostTask(func, info.ToStringIgnoreMissionId(), timeout) :
406         eventHandler_->PostTask(func);
407 }
408 
StartContinuation(const OHOS::AAFwk::Want & want,int32_t missionId,int32_t callerUid,int32_t status,uint32_t accessToken)409 int32_t DSchedContinueManager::StartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
410     int32_t callerUid, int32_t status, uint32_t accessToken)
411 {
412     std::string dstDeviceId = want.GetElement().GetDeviceID();
413     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
414         HILOGE("GetDeviceInfoById fail, dstDevId: %{public}s.", GetAnonymStr(dstDeviceId).c_str());
415         return INVALID_REMOTE_PARAMETERS_ERR;
416     }
417     if (GetDSchedContinueByWant(want, missionId) == nullptr) {
418         HILOGE("GetDSchedContinueByWant fail, dstDevId: %{public}s, missionId: %{public}d.",
419             GetAnonymStr(dstDeviceId).c_str(), missionId);
420         return INVALID_REMOTE_PARAMETERS_ERR;
421     }
422 
423     auto func = [this, want, missionId, callerUid, status, accessToken]() {
424         HandleStartContinuation(want, missionId, callerUid, status, accessToken);
425     };
426     if (eventHandler_ == nullptr) {
427         HILOGE("eventHandler_ is nullptr");
428         return INVALID_PARAMETERS_ERR;
429     }
430     eventHandler_->PostTask(func);
431     return ERR_OK;
432 }
433 
HandleStartContinuation(const OHOS::AAFwk::Want & want,int32_t missionId,int32_t callerUid,int32_t status,uint32_t accessToken)434 void DSchedContinueManager::HandleStartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
435     int32_t callerUid, int32_t status, uint32_t accessToken)
436 {
437     HILOGI("begin");
438     auto dContinue = GetDSchedContinueByWant(want, missionId);
439     if (dContinue != nullptr) {
440         dContinue->OnStartContinuation(want, callerUid, status, accessToken);
441     } else {
442         DmsRadar::GetInstance().SaveDataDmsRemoteWant("HandleStartContinuation", INVALID_PARAMETERS_ERR);
443     }
444     HILOGI("end");
445     return;
446 }
447 
GetDSchedContinueByWant(const OHOS::AAFwk::Want & want,int32_t missionId)448 std::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByWant(
449     const OHOS::AAFwk::Want& want, int32_t missionId)
450 {
451     std::string srcDeviceId;
452     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(srcDeviceId)) {
453         DmsRadar::GetInstance().SaveDataDmsRemoteWant("GetDSchedContinueByWant", GET_LOCAL_DEVICE_ERR);
454         HILOGE("get local deviceId failed!");
455         return nullptr;
456     }
457     std::string dstDeviceId = want.GetElement().GetDeviceID();
458     std::string bundleName = want.GetElement().GetBundleName();
459     auto info = DSchedContinueInfo(srcDeviceId, bundleName, dstDeviceId, bundleName, "");
460 
461     HILOGI("continue info: %{public}s.", info.ToString().c_str());
462     {
463         std::lock_guard<std::mutex> continueLock(continueMutex_);
464         if (continues_.empty()) {
465             HILOGE("continue info doesn't match an existing continuation.");
466             return nullptr;
467         }
468         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
469             if (iter->second == nullptr) {
470                 continue;
471             }
472             DSchedContinueInfo continueInfo = iter->second->GetContinueInfo();
473             if (srcDeviceId == continueInfo.sourceDeviceId_
474                 && bundleName == continueInfo.sourceBundleName_
475                 && dstDeviceId == continueInfo.sinkDeviceId_) {
476                 return iter->second;
477             }
478         }
479     }
480     HILOGE("missionId doesn't match the existing continuation, continueInfo: %{public}s.",
481         info.ToString().c_str());
482     return nullptr;
483 }
484 
NotifyCompleteContinuation(const std::u16string & devId,int32_t sessionId,bool isSuccess,const std::string & callerBundleName)485 int32_t DSchedContinueManager::NotifyCompleteContinuation(const std::u16string& devId, int32_t sessionId,
486     bool isSuccess, const std::string &callerBundleName)
487 {
488     auto func = [this, devId, sessionId, isSuccess, callerBundleName]() {
489         HandleNotifyCompleteContinuation(devId, sessionId, isSuccess, callerBundleName);
490     };
491     if (eventHandler_ == nullptr) {
492         HILOGE("eventHandler_ is nullptr");
493         return INVALID_PARAMETERS_ERR;
494     }
495     eventHandler_->PostTask(func);
496     return ERR_OK;
497 }
498 
HandleNotifyCompleteContinuation(const std::u16string & devId,int32_t missionId,bool isSuccess,const std::string & callerBundleName)499 void DSchedContinueManager::HandleNotifyCompleteContinuation(const std::u16string& devId, int32_t missionId,
500     bool isSuccess, const std::string &callerBundleName)
501 {
502     HILOGI("begin, isSuccess %{public}d", isSuccess);
503     auto dContinue = GetDSchedContinueByDevId(devId, missionId);
504     if (dContinue != nullptr) {
505         if (dContinue->GetContinueInfo().sinkBundleName_ != callerBundleName) {
506             HILOGE("callerBundleName doesn't match the existing continuation");
507             return;
508         }
509         dContinue->hasResult_ = true;
510         dContinue->OnNotifyComplete(missionId, isSuccess);
511         HILOGI("end, continue info: %{public}s.", dContinue->GetContinueInfo().ToString().c_str());
512     }
513     return;
514 }
515 
GetDSchedContinueByDevId(const std::u16string & devId,int32_t missionId)516 std::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByDevId(
517     const std::u16string& devId, int32_t missionId)
518 {
519     std::string deviceId = Str16ToStr8(devId);
520     HILOGI("begin, deviceId %{public}s, missionId %{public}d", GetAnonymStr(deviceId).c_str(), missionId);
521     {
522         std::lock_guard<std::mutex> continueLock(continueMutex_);
523         if (continues_.empty()) {
524             HILOGE("No continuation in progress.");
525             return nullptr;
526         }
527         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
528             if (iter->second != nullptr && deviceId == iter->second->GetContinueInfo().sourceDeviceId_) {
529                 return iter->second;
530             }
531         }
532     }
533     HILOGE("source deviceId doesn't match an existing continuation.");
534     return nullptr;
535 }
536 
NotifyTerminateContinuation(const int32_t missionId)537 void DSchedContinueManager::NotifyTerminateContinuation(const int32_t missionId)
538 {
539     HILOGI("begin, missionId %{public}d", missionId);
540     auto func = [this, missionId]() {
541         HandleNotifyTerminateContinuation(missionId);
542     };
543     if (eventHandler_ == nullptr) {
544         HILOGE("eventHandler_ is nullptr.");
545         return;
546     }
547     eventHandler_->PostTask(func, TERMINATE_DELAY_TIME);
548     HILOGW("doesn't match an existing continuation.");
549 }
550 
HandleNotifyTerminateContinuation(const int32_t missionId)551 void DSchedContinueManager::HandleNotifyTerminateContinuation(const int32_t missionId)
552 {
553     HILOGI("HandleNotifyTerminateContinuation begin, missionId %{public}d", missionId);
554     {
555         std::lock_guard<std::mutex> continueLock(continueMutex_);
556         if (continues_.empty()) {
557             HILOGW("No continuation in progress.");
558             return;
559         }
560 
561         MissionStatus status;
562         int32_t currentAccountId = MultiUserManager::GetInstance().GetForegroundUser();
563         int32_t ret = DmsContinueConditionMgr::GetInstance().GetMissionStatus(currentAccountId, missionId, status);
564         if (ret != ERR_OK) {
565             HILOGE("get continueLaunchMissionInfo failed, missionId %{public}d", missionId);
566             return;
567         }
568         auto flag = status.launchFlag;
569         if ((flag & AAFwk::Want::FLAG_ABILITY_CONTINUATION) != AAFwk::Want::FLAG_ABILITY_CONTINUATION &&
570             (flag & AAFwk::Want::FLAG_ABILITY_PREPARE_CONTINUATION) != AAFwk::Want::FLAG_ABILITY_PREPARE_CONTINUATION) {
571             HILOGI("missionId %{public}d not start by continue, ignore", missionId);
572             return;
573         }
574         HILOGI("alive missionInfo bundleName is %{public}s, abilityName is %{public}s",
575             status.bundleName.c_str(), status.abilityName.c_str());
576         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
577             if (iter->second == nullptr) {
578                 break;
579             }
580 
581             auto continueInfo = iter->second->GetContinueInfo();
582             HILOGI("continueInfo bundleName is %{public}s, abilityName is %{public}s",
583                 continueInfo.sinkBundleName_.c_str(), continueInfo.sinkAbilityName_.c_str());
584             if (status.bundleName == continueInfo.sinkBundleName_
585                 && status.abilityName == continueInfo.sinkAbilityName_) {
586                 if (iter->second->hasResult_) {
587                     HILOGI("hasResult_ %{public}d", iter->second->hasResult_);
588                     return;
589                 }
590                 iter->second->OnContinueEnd(CONTINUE_SINK_ABILITY_TERMINATED);
591                 return;
592             }
593         }
594     }
595     HILOGI("HandleNotifyTerminateContinuation end.");
596 }
597 
ContinueStateCallbackRegister(StateCallbackInfo & stateCallbackInfo,sptr<IRemoteObject> callback)598 int32_t DSchedContinueManager::ContinueStateCallbackRegister(
599     StateCallbackInfo &stateCallbackInfo, sptr<IRemoteObject> callback)
600 {
601     std::shared_ptr<StateCallbackData> stateCallbackDataExist = FindStateCallbackData(stateCallbackInfo);
602     if (stateCallbackDataExist == nullptr) {
603         if (callback == nullptr) {
604             return NO_CONNECT_CALLBACK_ERR;
605         }
606         StateCallbackData stateCallbackData;
607         sptr<StateCallbackIpcDiedListener> diedListener = new StateCallbackIpcDiedListener();
608         diedListener->stateCallbackInfo_ = stateCallbackInfo;
609         stateCallbackData.diedListener = diedListener;
610         callback->AddDeathRecipient(diedListener);
611         stateCallbackData.remoteObject = callback;
612         AddStateCallbackData(stateCallbackInfo, stateCallbackData);
613         return ERR_OK;
614     }
615     stateCallbackDataExist->remoteObject = callback;
616     if (stateCallbackDataExist->state != -1) {
617         return NotifyQuickStartState(stateCallbackInfo, stateCallbackDataExist->state, stateCallbackDataExist->message);
618     }
619     return ERR_OK;
620 }
621 
ContinueStateCallbackUnRegister(StateCallbackInfo & stateCallbackInfo)622 int32_t DSchedContinueManager::ContinueStateCallbackUnRegister(StateCallbackInfo &stateCallbackInfo)
623 {
624     RemoveStateCallbackData(stateCallbackInfo);
625     return ERR_OK;
626 }
627 
NotifyQuickStartState(StateCallbackInfo & stateCallbackInfo,int32_t state,std::string message)628 int32_t DSchedContinueManager::NotifyQuickStartState(StateCallbackInfo &stateCallbackInfo,
629     int32_t state, std::string message)
630 {
631     HILOGI("NotifyQuickStartState called, state: %{public}d, message: %{public}s", state, message.c_str());
632     std::shared_ptr<StateCallbackData> stateCallbackDataExist = FindStateCallbackData(stateCallbackInfo);
633     if (stateCallbackDataExist == nullptr) {
634         StateCallbackData nweStateCallbackData;
635         nweStateCallbackData.state = state;
636         nweStateCallbackData.message = message;
637         AddStateCallbackData(stateCallbackInfo, nweStateCallbackData);
638         return ERR_OK;
639     }
640 
641     if (stateCallbackDataExist->remoteObject == nullptr) {
642         stateCallbackDataExist->state = state;
643         stateCallbackDataExist->message = message;
644         StateCallbackData *stateCallbackData = stateCallbackDataExist.get();
645         AddStateCallbackData(stateCallbackInfo, *stateCallbackData);
646         HILOGW("IPC remote object for quick start callback is null, state saved.");
647         return ERR_OK;
648     }
649     MessageParcel data;
650     if (!data.WriteInterfaceToken(CONNECTION_CALLBACK_INTERFACE_TOKEN)) {
651         HILOGE("Write interface token failed");
652         return IPC_CALL_NORESPONSE_ERR;
653     }
654 
655     if (!data.WriteInt32(state)) {
656         HILOGE("Write state failed");
657         return IPC_CALL_NORESPONSE_ERR;
658     }
659 
660     if (!data.WriteString(message)) {
661         HILOGE("Write message failed");
662         return IPC_CALL_NORESPONSE_ERR;
663     }
664 
665     MessageParcel reply;
666     MessageOption option;
667     stateCallbackDataExist->remoteObject->SendRequest(
668         static_cast<uint32_t>(IDSchedInterfaceCode::CONTINUE_STATE_CALLBACK), data, reply, option);
669     stateCallbackDataExist->state = -1;
670     stateCallbackDataExist->message = "";
671     return ERR_OK;
672 }
673 
OnContinueEnd(const DSchedContinueInfo & info)674 int32_t DSchedContinueManager::OnContinueEnd(const DSchedContinueInfo& info)
675 {
676     auto func = [this, info]() {
677         HandleContinueEnd(info);
678     };
679     if (eventHandler_ == nullptr) {
680         HILOGE("eventHandler_ is nullptr");
681         return INVALID_PARAMETERS_ERR;
682     }
683     eventHandler_->PostTask(func);
684     return ERR_OK;
685 }
686 
HandleContinueEnd(const DSchedContinueInfo & info)687 void DSchedContinueManager::HandleContinueEnd(const DSchedContinueInfo& info)
688 {
689     HILOGI("begin, continue info: %{public}s.", info.ToString().c_str());
690     std::lock_guard<std::mutex> continueLock(continueMutex_);
691     if (continues_.empty() || continues_.find(info) == continues_.end()) {
692         HILOGE("continue info doesn't match any existing continuation.");
693         return;
694     }
695     RemoveTimeout(info);
696     continues_.erase(info);
697     ContinueSceneSessionHandler::GetInstance().ClearContinueSessionId();
698 
699     std::string localDevId;
700     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
701         HILOGE("get local deviceId failed!");
702         return;
703     }
704 
705     if (info.sinkDeviceId_ == localDevId) {
706         cntSink_--;
707     } else if (info.sourceDeviceId_ == localDevId) {
708         cntSource_--;
709     }
710     HILOGI("end.");
711 }
712 
RemoveTimeout(const DSchedContinueInfo & info)713 void DSchedContinueManager::RemoveTimeout(const DSchedContinueInfo& info)
714 {
715     if (eventHandler_ == nullptr) {
716         HILOGE("eventHandler_ is nullptr");
717         return;
718     }
719     eventHandler_->RemoveTask(info.ToStringIgnoreMissionId());
720 }
721 
FindStateCallbackData(StateCallbackInfo & stateCallbackInfo)722 std::shared_ptr<StateCallbackData> DSchedContinueManager::FindStateCallbackData(StateCallbackInfo &stateCallbackInfo)
723 {
724     std::lock_guard<std::mutex> autolock(callbackCacheMutex_);
725     auto lastResult = stateCallbackCache_.find(stateCallbackInfo);
726     if (lastResult == stateCallbackCache_.end()) {
727         return nullptr;
728     }
729     return std::make_shared<StateCallbackData>(lastResult->second);
730 }
731 
AddStateCallbackData(StateCallbackInfo & stateCallbackInfo,StateCallbackData & stateCallbackData)732 void DSchedContinueManager::AddStateCallbackData(
733     StateCallbackInfo &stateCallbackInfo, StateCallbackData &stateCallbackData)
734 {
735     std::lock_guard<std::mutex> autolock(callbackCacheMutex_);
736     stateCallbackCache_.emplace(stateCallbackInfo, stateCallbackData);
737 }
738 
RemoveStateCallbackData(StateCallbackInfo & stateCallbackInfo)739 void DSchedContinueManager::RemoveStateCallbackData(StateCallbackInfo &stateCallbackInfo)
740 {
741     std::lock_guard<std::mutex> autolock(callbackCacheMutex_);
742     stateCallbackCache_.erase(stateCallbackInfo);
743 }
744 
OnDataRecv(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer)745 void DSchedContinueManager::OnDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
746 {
747     auto func = [this, sessionId, dataBuffer]() {
748         HandleDataRecv(sessionId, dataBuffer);
749     };
750     if (eventHandler_ == nullptr) {
751         HILOGE("eventHandler_ is nullptr");
752         return;
753     }
754     eventHandler_->PostTask(func);
755 }
756 
HandleDataRecv(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer)757 void DSchedContinueManager::HandleDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
758 {
759     HILOGI("start, sessionId: %{public}d.", sessionId);
760     if (dataBuffer == nullptr) {
761         HILOGE("dataBuffer is null.");
762         return;
763     }
764     uint8_t *data = dataBuffer->Data();
765     std::string jsonStr(reinterpret_cast<const char *>(data), dataBuffer->Capacity());
766     cJSON *rootValue = cJSON_Parse(jsonStr.c_str());
767     if (rootValue == nullptr) {
768         HILOGE("Parse jsonStr error.");
769         return;
770     }
771     cJSON *baseCmd = cJSON_GetObjectItemCaseSensitive(rootValue, "BaseCmd");
772     if (baseCmd == nullptr || !cJSON_IsString(baseCmd) || (baseCmd->valuestring == nullptr)) {
773         cJSON_Delete(rootValue);
774         HILOGE("Parse base cmd error.");
775         return;
776     }
777 
778     cJSON *cmdValue = cJSON_Parse(baseCmd->valuestring);
779     cJSON_Delete(rootValue);
780     if (cmdValue == nullptr) {
781         HILOGE("Parse cmd value error.");
782         return;
783     }
784 
785     cJSON *comvalue = cJSON_GetObjectItemCaseSensitive(cmdValue, "Command");
786     if (comvalue == nullptr || !cJSON_IsNumber(comvalue)) {
787         cJSON_Delete(cmdValue);
788         HILOGE("parse command failed");
789         return;
790     }
791     int32_t command = comvalue->valueint;
792     cJSON_Delete(cmdValue);
793     NotifyContinueDataRecv(sessionId, command, jsonStr, dataBuffer);
794     HILOGI("end, sessionId: %{public}d.", sessionId);
795 }
796 
NotifyContinueDataRecv(int32_t sessionId,int32_t command,const std::string & jsonStr,std::shared_ptr<DSchedDataBuffer> dataBuffer)797 void DSchedContinueManager::NotifyContinueDataRecv(int32_t sessionId, int32_t command, const std::string& jsonStr,
798     std::shared_ptr<DSchedDataBuffer> dataBuffer)
799 {
800     HILOGI("start, parsed cmd %{public}d, sessionId: %{public}d.", command, sessionId);
801     std::lock_guard<std::mutex> continueLock(continueMutex_);
802     if (!continues_.empty()) {
803         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
804             if (iter->second != nullptr && sessionId == iter->second->GetSessionId()) {
805                 HILOGI("sessionId %{public}d exist.", sessionId);
806                 iter->second->OnDataRecv(command, dataBuffer);
807                 return;
808             }
809         }
810     }
811 
812     if (command == DSCHED_CONTINUE_CMD_START) {
813         HILOGI("recv start cmd, sessionId: %{public}d.", sessionId);
814         auto startCmd = std::make_shared<DSchedContinueStartCmd>();
815         int32_t ret = startCmd->Unmarshal(jsonStr);
816         if (ret != ERR_OK) {
817             HILOGE("Unmarshal start cmd failed, ret: %{public}d", ret);
818             return;
819         }
820         int32_t direction = CONTINUE_SINK;
821         ret = CheckContinuationLimit(startCmd->srcDeviceId_, startCmd->dstDeviceId_, direction);
822         if (ret != ERR_OK) {
823             DmsRadar::GetInstance().SaveDataDmsRemoteWant("NotifyContinueDataRecv", ret);
824             HILOGE("CheckContinuationSubType failed, ret: %{public}d", ret);
825             return;
826         }
827 
828         int32_t currentAccountId = MultiUserManager::GetInstance().GetForegroundUser();
829         auto newContinue = std::make_shared<DSchedContinue>(startCmd, sessionId, currentAccountId);
830         newContinue->Init();
831         continues_.insert(std::make_pair(newContinue->GetContinueInfo(), newContinue));
832 
833         if (DmsKvSyncE2E::GetInstance()->CheckMDMCtrlRule(newContinue->GetContinueInfo().sourceBundleName_)) {
834             HILOGI("Current user is under MDM control.");
835             return;
836         }
837         newContinue->OnStartCmd(startCmd->appVersion_);
838         HILOGI("end, continue info: %{public}s.", newContinue->GetContinueInfo().ToString().c_str());
839         return;
840     }
841     HILOGE("No matching session to handle cmd! sessionId: %{public}d, recv cmd %{public}d.", sessionId, command);
842     return;
843 }
844 
CheckContinuationLimit(const std::string & srcDeviceId,const std::string & dstDeviceId,int32_t & direction)845 int32_t DSchedContinueManager::CheckContinuationLimit(const std::string& srcDeviceId,
846     const std::string& dstDeviceId, int32_t &direction)
847 {
848     std::string localDevId;
849     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
850         HILOGE("get local deviceId failed!");
851         return GET_LOCAL_DEVICE_ERR;
852     }
853 
854     direction = CONTINUE_SINK;
855     if (dstDeviceId == localDevId) {
856         if (cntSink_.load() >= MAX_CONCURRENT_SINK) {
857             HILOGE("can't deal more than %{public}d pull requests at the same time.", cntSink_.load());
858             return CONTINUE_ALREADY_IN_PROGRESS;
859         }
860         if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(srcDeviceId) == nullptr) {
861             HILOGE("Irrecognized source device!");
862             return INVALID_PARAMETERS_ERR;
863         }
864     } else if (srcDeviceId == localDevId) {
865         if (cntSource_.load() >= MAX_CONCURRENT_SOURCE) {
866             HILOGE("can't deal more than %{public}d push requests at the same time.", cntSource_.load());
867             return CONTINUE_ALREADY_IN_PROGRESS;
868         }
869         if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
870             HILOGE("Irrecognized destination device!");
871             return INVALID_PARAMETERS_ERR;
872         }
873         direction = CONTINUE_SOURCE;
874     } else {
875         HILOGE("source or target device must be local!");
876         return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
877     }
878     return ERR_OK;
879 }
880 
GetContinueInfo(std::string & srcDeviceId,std::string & dstDeviceId)881 int32_t DSchedContinueManager::GetContinueInfo(std::string &srcDeviceId, std::string &dstDeviceId)
882 {
883     HILOGI("called");
884     std::lock_guard<std::mutex> continueLock(continueMutex_);
885     if (continues_.empty()) {
886         HILOGW("No continuation in progress.");
887         return ERR_OK;
888     }
889     auto dsContinue = continues_.begin()->second;
890     if (dsContinue == nullptr) {
891         HILOGE("dContinue is null");
892         return INVALID_PARAMETERS_ERR;
893     }
894     dstDeviceId = dsContinue->GetContinueInfo().sinkDeviceId_;
895     srcDeviceId = dsContinue->GetContinueInfo().sourceDeviceId_;
896     return ERR_OK;
897 }
898 
OnShutdown(int32_t socket,bool isSelfCalled)899 void DSchedContinueManager::OnShutdown(int32_t socket, bool isSelfCalled)
900 {
901     if (isSelfCalled) {
902         HILOGW("called, shutdown by local, sessionId: %{public}d", socket);
903         return;
904     }
905     HILOGW("called, sessionId: %{public}d, isSelfCalled %{public}d", socket, isSelfCalled);
906     auto func = [this, socket]() {
907         std::lock_guard<std::mutex> continueLock(continueMutex_);
908         if (continues_.empty()) {
909             return;
910         }
911         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
912             if (iter->second != nullptr && socket == iter->second->GetSessionId()) {
913                 iter->second->OnContinueEnd(CONTINUE_SESSION_SHUTDOWN);
914             }
915         }
916     };
917     if (eventHandler_ == nullptr) {
918         HILOGE("eventHandler_ is nullptr");
919         return;
920     }
921     eventHandler_->PostTask(func);
922     return;
923 }
924 
OnBind(int32_t socket,PeerSocketInfo info)925 void DSchedContinueManager::SoftbusListener::OnBind(int32_t socket, PeerSocketInfo info)
926 {
927 }
928 
OnShutdown(int32_t socket,bool isSelfCalled)929 void DSchedContinueManager::SoftbusListener::OnShutdown(int32_t socket, bool isSelfCalled)
930 {
931     DSchedContinueManager::GetInstance().OnShutdown(socket, isSelfCalled);
932 }
933 
OnDataRecv(int32_t socket,std::shared_ptr<DSchedDataBuffer> dataBuffer)934 void DSchedContinueManager::SoftbusListener::OnDataRecv(int32_t socket, std::shared_ptr<DSchedDataBuffer> dataBuffer)
935 {
936     DSchedContinueManager::GetInstance().OnDataRecv(socket, dataBuffer);
937 }
938 }  // namespace DistributedSchedule
939 }  // namespace OHOS
940