• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "dsched_continue_manager.h"
17 
18 #include <chrono>
19 #include <sys/prctl.h>
20 
21 #include "cJSON.h"
22 
23 #include "continue_scene_session_handler.h"
24 #include "dfx/distributed_radar.h"
25 #include "distributed_sched_utils.h"
26 #include "dsched_transport_softbus_adapter.h"
27 #include "dtbschedmgr_device_info_storage.h"
28 #include "dtbschedmgr_log.h"
29 #include "mission/distributed_bm_storage.h"
30 #include "mission/dms_continue_condition_manager.h"
31 #include "multi_user_manager.h"
32 
33 namespace OHOS {
34 namespace DistributedSchedule {
35 namespace {
36 const std::string TAG = "DSchedContinueManager";
37 const std::string DSCHED_CONTINUE_MANAGER = "dsched_continue_manager";
38 const std::string CONTINUE_TIMEOUT_TASK = "continue_timeout_task";
39 const std::u16string CONNECTION_CALLBACK_INTERFACE_TOKEN = u"ohos.abilityshell.DistributedConnection";
40 constexpr int32_t TERMINATE_DELAY_TIME = 200; //ms
41 }
42 
43 IMPLEMENT_SINGLE_INSTANCE(DSchedContinueManager);
44 
DSchedContinueManager()45 DSchedContinueManager::DSchedContinueManager()
46 {
47 }
48 
~DSchedContinueManager()49 DSchedContinueManager::~DSchedContinueManager()
50 {
51     HILOGI("DSchedContinueManager delete");
52     UnInit();
53 }
54 
Init()55 void DSchedContinueManager::Init()
56 {
57     HILOGI("Init DSchedContinueManager start");
58     {
59         std::unique_lock<std::mutex> lock(hasInitMutex_);
60         if (hasInit_) {
61             HILOGW("Init DSchedContinueManager has init");
62             return;
63         }
64         hasInit_ = true;
65     }
66 
67     if (eventHandler_ != nullptr) {
68         HILOGI("DSchedContinueManager already inited, end.");
69         return;
70     }
71     DSchedTransportSoftbusAdapter::GetInstance().InitChannel();
72     softbusListener_ = std::make_shared<DSchedContinueManager::SoftbusListener>();
73     DSchedTransportSoftbusAdapter::GetInstance().RegisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
74     eventThread_ = std::thread(&DSchedContinueManager::StartEvent, this);
75     std::unique_lock<std::mutex> lock(eventMutex_);
76     eventCon_.wait(lock, [this] {
77         return eventHandler_ != nullptr;
78     });
79     HILOGI("Init DSchedContinueManager end");
80 }
81 
StartEvent()82 void DSchedContinueManager::StartEvent()
83 {
84     HILOGI("StartEvent start");
85     prctl(PR_SET_NAME, DSCHED_CONTINUE_MANAGER.c_str());
86     auto runner = AppExecFwk::EventRunner::Create(false);
87     {
88         std::lock_guard<std::mutex> lock(eventMutex_);
89         eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
90     }
91     eventCon_.notify_one();
92     runner->Run();
93     HILOGI("StartEvent end");
94 }
95 
UnInit()96 void DSchedContinueManager::UnInit()
97 {
98     HILOGI("UnInit start");
99     {
100         std::unique_lock<std::mutex> lock(hasInitMutex_);
101         if (!hasInit_) {
102             HILOGW("Init DSchedContinueManager has uninit");
103             return;
104         }
105         hasInit_ = false;
106     }
107 
108     DSchedTransportSoftbusAdapter::GetInstance().UnregisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
109     continues_.clear();
110     cntSink_ = 0;
111     cntSource_ = 0;
112 
113     if (eventHandler_ != nullptr) {
114         eventHandler_->GetEventRunner()->Stop();
115         if (eventThread_.joinable()) {
116             eventThread_.join();
117         }
118         eventHandler_ = nullptr;
119     } else {
120         HILOGE("eventHandler_ is nullptr");
121     }
122     HILOGI("UnInit end");
123 }
124 
NotifyAllConnectDecision(std::string peerDeviceId,bool isSupport)125 void DSchedContinueManager::NotifyAllConnectDecision(std::string peerDeviceId, bool isSupport)
126 {
127     HILOGI("Notify all connect decision, peerDeviceId %{public}s, isSupport %{public}d.",
128         GetAnonymStr(peerDeviceId).c_str(), isSupport);
129 #ifdef DMSFWK_ALL_CONNECT_MGR
130     std::lock_guard<std::mutex> decisionLock(connectDecisionMutex_);
131     peerConnectDecision_[peerDeviceId] = isSupport;
132     connectDecisionCond_.notify_all();
133 #endif
134 }
135 
ContinueMission(const std::string & srcDeviceId,const std::string & dstDeviceId,int32_t missionId,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)136 int32_t DSchedContinueManager::ContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
137     int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
138 {
139     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
140         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
141         return INVALID_PARAMETERS_ERR;
142     }
143 
144     std::string localDevId;
145     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
146         HILOGE("get local deviceId failed!");
147         return INVALID_PARAMETERS_ERR;
148     }
149     if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
150         HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
151             GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
152         return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
153     }
154     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
155         localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
156         HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
157             GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
158         return INVALID_REMOTE_PARAMETERS_ERR;
159     }
160 
161     auto func = [this, srcDeviceId, dstDeviceId, missionId, callback, wantParams]() {
162         HandleContinueMission(srcDeviceId, dstDeviceId, missionId, callback, wantParams);
163     };
164     if (eventHandler_ == nullptr) {
165         HILOGE("eventHandler_ is nullptr");
166         return INVALID_PARAMETERS_ERR;
167     }
168     eventHandler_->PostTask(func);
169     return ERR_OK;
170 }
171 
HandleContinueMission(const std::string & srcDeviceId,const std::string & dstDeviceId,int32_t missionId,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)172 void DSchedContinueManager::HandleContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
173     int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
174 {
175     HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. missionId: %{public}d.",
176         GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), missionId);
177 
178     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
179         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
180         return;
181     }
182 
183     std::string localDevId;
184     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
185         HILOGE("get local deviceId failed!");
186         return;
187     }
188     DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, dstDeviceId, missionId);
189 
190     AAFwk::MissionInfo missionInfo;
191     if (AAFwk::AbilityManagerClient::GetInstance()->GetMissionInfo("", missionId, missionInfo) == ERR_OK
192         && srcDeviceId == localDevId) {
193         info.sourceBundleName_ = missionInfo.want.GetBundle();
194         info.sinkBundleName_ = missionInfo.want.GetBundle();
195     }
196 
197     HandleContinueMissionWithBundleName(info, callback, wantParams);
198     return;
199 }
200 
ContinueMission(const DSchedContinueInfo & continueInfo,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)201 int32_t DSchedContinueManager::ContinueMission(const DSchedContinueInfo& continueInfo,
202     const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
203 {
204     std::string srcDeviceId = continueInfo.sourceDeviceId_;
205     std::string dstDeviceId = continueInfo.sinkDeviceId_;
206 
207     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
208         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
209         return INVALID_PARAMETERS_ERR;
210     }
211 
212     std::string localDevId;
213     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
214         HILOGE("get local deviceId failed!");
215         return INVALID_PARAMETERS_ERR;
216     }
217     if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
218         HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
219             GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
220         return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
221     }
222     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
223         localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
224         HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
225             GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
226         return INVALID_REMOTE_PARAMETERS_ERR;
227     }
228 
229 #ifdef SUPPORT_DISTRIBUTED_MISSION_MANAGER
230     if (localDevId == srcDeviceId) {
231         int32_t missionId = -1;
232         int32_t currentAccountId = MultiUserManager::GetInstance().GetForegroundUser();
233         int32_t ret = DmsContinueConditionMgr::GetInstance().GetMissionIdByBundleName(
234             currentAccountId, continueInfo.sourceBundleName_, missionId);
235         if (ret != ERR_OK) {
236             HILOGE("get missionId fail, ret %{public}d.", ret);
237             return INVALID_PARAMETERS_ERR;
238         }
239     }
240 #endif
241 
242     auto func = [this, continueInfo, callback, wantParams]() {
243         HandleContinueMission(continueInfo, callback, wantParams);
244     };
245     if (eventHandler_ == nullptr) {
246         HILOGE("eventHandler_ is nullptr");
247         return INVALID_PARAMETERS_ERR;
248     }
249     eventHandler_->PostTask(func);
250     return ERR_OK;
251 }
252 
HandleContinueMission(const DSchedContinueInfo & continueInfo,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)253 void DSchedContinueManager::HandleContinueMission(const DSchedContinueInfo& continueInfo,
254     const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
255 {
256     std::string srcDeviceId = continueInfo.sourceDeviceId_;
257     std::string dstDeviceId = continueInfo.sinkDeviceId_;
258     std::string srcBundleName = continueInfo.sourceBundleName_;
259     if (srcBundleName.empty()) {
260         srcBundleName = continueInfo.sinkBundleName_;
261     }
262     std::string bundleName = continueInfo.sinkBundleName_;
263     std::string continueType = continueInfo.continueType_;
264     HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. bundleName: %{public}s."
265         " continueType: %{public}s.", GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(),
266         bundleName.c_str(), continueType.c_str());
267 
268     if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
269         HILOGE("srcDeviceId or dstDeviceId or callback is null!");
270         return;
271     }
272 
273     DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, srcBundleName, dstDeviceId, bundleName, continueType);
274     HandleContinueMissionWithBundleName(info, callback, wantParams);
275     return;
276 }
277 
GetFirstBundleName(DSchedContinueInfo & info,std::string & firstBundleName,std::string bundleName,std::string deviceId)278 bool DSchedContinueManager::GetFirstBundleName(DSchedContinueInfo &info, std::string &firstBundleName,
279     std::string bundleName, std::string deviceId)
280 {
281     uint16_t bundleNameId;
282     DmsBundleInfo distributedBundleInfo;
283     DmsBmStorage::GetInstance()->GetBundleNameId(bundleName, bundleNameId);
284     bool result = DmsBmStorage::GetInstance()->GetDistributedBundleInfo(deviceId,
285         bundleNameId, distributedBundleInfo);
286     if (!result) {
287         HILOGE("GetDistributedBundleInfo faild");
288         return false;
289     }
290     std::vector<DmsAbilityInfo> dmsAbilityInfos = distributedBundleInfo.dmsAbilityInfos;
291     for (DmsAbilityInfo &ability: dmsAbilityInfos) {
292         std::vector<std::string> abilityContinueTypes = ability.continueType;
293         for (std::string &ability_continue_type: abilityContinueTypes) {
294             if (ability_continue_type == info.continueType_ && !ability.continueBundleName.empty()) {
295                 firstBundleName = *ability.continueBundleName.begin();
296                 return true;
297             }
298         }
299     }
300     HILOGE("can not get abilicy info or continue bundle names is empty for continue type:%{public}s",
301            info.continueType_.c_str());
302     return false;
303 }
304 
HandleContinueMissionWithBundleName(DSchedContinueInfo & info,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)305 void DSchedContinueManager::HandleContinueMissionWithBundleName(DSchedContinueInfo &info,
306     const sptr<IRemoteObject> &callback, const OHOS::AAFwk::WantParams &wantParams)
307 {
308     int32_t direction = CONTINUE_SINK;
309     int32_t ret = CheckContinuationLimit(info.sourceDeviceId_, info.sinkDeviceId_, direction);
310     if (ret != ERR_OK) {
311         HILOGE("CheckContinuationLimit failed, ret: %{public}d", ret);
312         return;
313     }
314     int32_t subType = CONTINUE_PUSH;
315     if (direction == CONTINUE_SOURCE) {
316         cntSource_++;
317     } else {
318         cntSink_++;
319         subType = CONTINUE_PULL;
320         if (info.sourceBundleName_.empty()) {
321             HILOGW("current sub type is continue pull; but can not get source bundle name from recv cache.");
322             std::string firstBundleNamme;
323             std::string bundleName = info.sinkBundleName_;
324             std::string deviceId = info.sinkDeviceId_;
325             if (GetFirstBundleName(info, firstBundleNamme, bundleName, deviceId)) {
326                 info.sourceBundleName_ = firstBundleNamme;
327             }
328         }
329     }
330     {
331         std::lock_guard<std::mutex> continueLock(continueMutex_);
332         if (!continues_.empty() && continues_.find(info) != continues_.end()) {
333             HILOGE("a same continue task is already in progress.");
334             return;
335         }
336         int32_t currentAccountId = MultiUserManager::GetInstance().GetForegroundUser();
337         auto newContinue = std::make_shared<DSchedContinue>(subType, direction, callback, info, currentAccountId);
338         newContinue->Init();
339         continues_.insert(std::make_pair(info, newContinue));
340 #ifdef DMSFWK_ALL_CONNECT_MGR
341         {
342             std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
343             std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
344             if (peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end()) {
345                 peerConnectDecision_.erase(peerDeviceId);
346             }
347         }
348 #endif
349         newContinue->OnContinueMission(wantParams);
350     }
351     WaitAllConnectDecision(direction, info, CONTINUE_TIMEOUT);
352     HILOGI("end, subType: %{public}d dirction: %{public}d, continue info: %{public}s",
353         subType, direction, info.ToString().c_str());
354 }
355 
WaitAllConnectDecision(int32_t direction,const DSchedContinueInfo & info,int32_t timeout)356 void DSchedContinueManager::WaitAllConnectDecision(int32_t direction, const DSchedContinueInfo &info, int32_t timeout)
357 {
358 #ifdef DMSFWK_ALL_CONNECT_MGR
359     std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
360     {
361         std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
362         connectDecisionCond_.wait_for(decisionLock, std::chrono::seconds(CONNECT_DECISION_WAIT_S),
363             [this, peerDeviceId]() {
364                 return peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end() &&
365                     peerConnectDecision_.at(peerDeviceId).load();
366             });
367 
368         if (peerConnectDecision_.find(peerDeviceId) == peerConnectDecision_.end()) {
369             HILOGE("Not find peerDeviceId %{public}s in peerConnectDecision.", GetAnonymStr(peerDeviceId).c_str());
370             SetTimeOut(info, 0);
371             return;
372         }
373         if (!peerConnectDecision_.at(peerDeviceId).load()) {
374             HILOGE("All connect manager refuse bind to PeerDeviceId %{public}s.", GetAnonymStr(peerDeviceId).c_str());
375             peerConnectDecision_.erase(peerDeviceId);
376             SetTimeOut(info, 0);
377             return;
378         }
379         peerConnectDecision_.erase(peerDeviceId);
380     }
381 #endif
382     SetTimeOut(info, timeout);
383 }
384 
SetTimeOut(const DSchedContinueInfo & info,int32_t timeout)385 void DSchedContinueManager::SetTimeOut(const DSchedContinueInfo &info, int32_t timeout)
386 {
387     auto func = [this, info]() {
388         if (continues_.empty() || continues_.count(info) == 0) {
389             HILOGE("continue not exist.");
390             return;
391         }
392         HILOGE("continue timeout! info: %{public}s", info.ToString().c_str());
393         auto dsContinue = continues_[info];
394         if (dsContinue != nullptr) {
395             dsContinue->OnContinueEnd(CONTINUE_ABILITY_TIMEOUT_ERR);
396         }
397     };
398     if (eventHandler_ == nullptr) {
399         HILOGE("eventHandler_ is nullptr");
400         return;
401     }
402     timeout > 0 ? eventHandler_->PostTask(func, info.ToStringIgnoreMissionId(), timeout) :
403         eventHandler_->PostTask(func);
404 }
405 
StartContinuation(const OHOS::AAFwk::Want & want,int32_t missionId,int32_t callerUid,int32_t status,uint32_t accessToken)406 int32_t DSchedContinueManager::StartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
407     int32_t callerUid, int32_t status, uint32_t accessToken)
408 {
409     std::string dstDeviceId = want.GetElement().GetDeviceID();
410     if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
411         HILOGE("GetDeviceInfoById fail, dstDevId: %{public}s.", GetAnonymStr(dstDeviceId).c_str());
412         return INVALID_REMOTE_PARAMETERS_ERR;
413     }
414     if (GetDSchedContinueByWant(want, missionId) == nullptr) {
415         HILOGE("GetDSchedContinueByWant fail, dstDevId: %{public}s, missionId: %{public}d.",
416             GetAnonymStr(dstDeviceId).c_str(), missionId);
417         return INVALID_REMOTE_PARAMETERS_ERR;
418     }
419 
420     auto func = [this, want, missionId, callerUid, status, accessToken]() {
421         HandleStartContinuation(want, missionId, callerUid, status, accessToken);
422     };
423     if (eventHandler_ == nullptr) {
424         HILOGE("eventHandler_ is nullptr");
425         return INVALID_PARAMETERS_ERR;
426     }
427     eventHandler_->PostTask(func);
428     return ERR_OK;
429 }
430 
HandleStartContinuation(const OHOS::AAFwk::Want & want,int32_t missionId,int32_t callerUid,int32_t status,uint32_t accessToken)431 void DSchedContinueManager::HandleStartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
432     int32_t callerUid, int32_t status, uint32_t accessToken)
433 {
434     HILOGI("begin");
435     auto dContinue = GetDSchedContinueByWant(want, missionId);
436     if (dContinue != nullptr) {
437         dContinue->OnStartContinuation(want, callerUid, status, accessToken);
438     } else {
439         DmsRadar::GetInstance().SaveDataDmsRemoteWant("HandleStartContinuation", INVALID_PARAMETERS_ERR);
440     }
441     HILOGI("end");
442     return;
443 }
444 
GetDSchedContinueByWant(const OHOS::AAFwk::Want & want,int32_t missionId)445 std::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByWant(
446     const OHOS::AAFwk::Want& want, int32_t missionId)
447 {
448     std::string srcDeviceId;
449     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(srcDeviceId)) {
450         DmsRadar::GetInstance().SaveDataDmsRemoteWant("GetDSchedContinueByWant", GET_LOCAL_DEVICE_ERR);
451         HILOGE("get local deviceId failed!");
452         return nullptr;
453     }
454     std::string dstDeviceId = want.GetElement().GetDeviceID();
455     std::string bundleName = want.GetElement().GetBundleName();
456     auto info = DSchedContinueInfo(srcDeviceId, bundleName, dstDeviceId, bundleName, "");
457 
458     HILOGI("continue info: %{public}s.", info.ToString().c_str());
459     {
460         std::lock_guard<std::mutex> continueLock(continueMutex_);
461         if (continues_.empty()) {
462             HILOGE("continue info doesn't match an existing continuation.");
463             return nullptr;
464         }
465         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
466             if (iter->second == nullptr) {
467                 continue;
468             }
469             DSchedContinueInfo continueInfo = iter->second->GetContinueInfo();
470             if (srcDeviceId == continueInfo.sourceDeviceId_
471                 && bundleName == continueInfo.sourceBundleName_
472                 && dstDeviceId == continueInfo.sinkDeviceId_) {
473                 return iter->second;
474             }
475         }
476     }
477     HILOGE("missionId doesn't match the existing continuation, continueInfo: %{public}s.",
478         info.ToString().c_str());
479     return nullptr;
480 }
481 
NotifyCompleteContinuation(const std::u16string & devId,int32_t sessionId,bool isSuccess,const std::string & callerBundleName)482 int32_t DSchedContinueManager::NotifyCompleteContinuation(const std::u16string& devId, int32_t sessionId,
483     bool isSuccess, const std::string &callerBundleName)
484 {
485     auto func = [this, devId, sessionId, isSuccess, callerBundleName]() {
486         HandleNotifyCompleteContinuation(devId, sessionId, isSuccess, callerBundleName);
487     };
488     if (eventHandler_ == nullptr) {
489         HILOGE("eventHandler_ is nullptr");
490         return INVALID_PARAMETERS_ERR;
491     }
492     eventHandler_->PostTask(func);
493     return ERR_OK;
494 }
495 
HandleNotifyCompleteContinuation(const std::u16string & devId,int32_t missionId,bool isSuccess,const std::string & callerBundleName)496 void DSchedContinueManager::HandleNotifyCompleteContinuation(const std::u16string& devId, int32_t missionId,
497     bool isSuccess, const std::string &callerBundleName)
498 {
499     HILOGI("begin, isSuccess %{public}d", isSuccess);
500     auto dContinue = GetDSchedContinueByDevId(devId, missionId);
501     if (dContinue != nullptr) {
502         if (dContinue->GetContinueInfo().sinkBundleName_ != callerBundleName) {
503             HILOGE("callerBundleName doesn't match the existing continuation");
504             return;
505         }
506         dContinue->hasResult_ = true;
507         dContinue->OnNotifyComplete(missionId, isSuccess);
508         HILOGI("end, continue info: %{public}s.", dContinue->GetContinueInfo().ToString().c_str());
509     }
510     return;
511 }
512 
GetDSchedContinueByDevId(const std::u16string & devId,int32_t missionId)513 std::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByDevId(
514     const std::u16string& devId, int32_t missionId)
515 {
516     std::string deviceId = Str16ToStr8(devId);
517     HILOGI("begin, deviceId %{public}s, missionId %{public}d", GetAnonymStr(deviceId).c_str(), missionId);
518     {
519         std::lock_guard<std::mutex> continueLock(continueMutex_);
520         if (continues_.empty()) {
521             HILOGE("No continuation in progress.");
522             return nullptr;
523         }
524         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
525             if (iter->second != nullptr && deviceId == iter->second->GetContinueInfo().sourceDeviceId_) {
526                 return iter->second;
527             }
528         }
529     }
530     HILOGE("source deviceId doesn't match an existing continuation.");
531     return nullptr;
532 }
533 
NotifyTerminateContinuation(const int32_t missionId)534 void DSchedContinueManager::NotifyTerminateContinuation(const int32_t missionId)
535 {
536     HILOGI("begin, missionId %{public}d", missionId);
537     auto func = [this, missionId]() {
538         HandleNotifyTerminateContinuation(missionId);
539     };
540     if (eventHandler_ == nullptr) {
541         HILOGE("eventHandler_ is nullptr.");
542         return;
543     }
544     eventHandler_->PostTask(func, TERMINATE_DELAY_TIME);
545     HILOGW("doesn't match an existing continuation.");
546 }
547 
HandleNotifyTerminateContinuation(const int32_t missionId)548 void DSchedContinueManager::HandleNotifyTerminateContinuation(const int32_t missionId)
549 {
550     HILOGI("HandleNotifyTerminateContinuation begin, missionId %{public}d", missionId);
551     {
552         std::lock_guard<std::mutex> continueLock(continueMutex_);
553         if (continues_.empty()) {
554             HILOGW("No continuation in progress.");
555             return;
556         }
557 
558         MissionStatus status;
559         int32_t currentAccountId = MultiUserManager::GetInstance().GetForegroundUser();
560         int32_t ret = DmsContinueConditionMgr::GetInstance().GetMissionStatus(currentAccountId, missionId, status);
561         if (ret != ERR_OK) {
562             HILOGE("get continueLaunchMissionInfo failed, missionId %{public}d", missionId);
563             return;
564         }
565         auto flag = status.launchFlag;
566         if ((flag & AAFwk::Want::FLAG_ABILITY_CONTINUATION) != AAFwk::Want::FLAG_ABILITY_CONTINUATION &&
567             (flag & AAFwk::Want::FLAG_ABILITY_PREPARE_CONTINUATION) != AAFwk::Want::FLAG_ABILITY_PREPARE_CONTINUATION) {
568             HILOGI("missionId %{public}d not start by continue, ignore", missionId);
569             return;
570         }
571         HILOGI("alive missionInfo bundleName is %{public}s, abilityName is %{public}s",
572             status.bundleName.c_str(), status.abilityName.c_str());
573         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
574             if (iter->second == nullptr) {
575                 break;
576             }
577 
578             auto continueInfo = iter->second->GetContinueInfo();
579             HILOGI("continueInfo bundleName is %{public}s, abilityName is %{public}s",
580                 continueInfo.sinkBundleName_.c_str(), continueInfo.sinkAbilityName_.c_str());
581             if (status.bundleName == continueInfo.sinkBundleName_
582                 && status.abilityName == continueInfo.sinkAbilityName_) {
583                 if (iter->second->hasResult_) {
584                     HILOGI("hasResult_ %{public}d", iter->second->hasResult_);
585                     return;
586                 }
587                 iter->second->OnContinueEnd(CONTINUE_SINK_ABILITY_TERMINATED);
588                 return;
589             }
590         }
591     }
592     HILOGI("HandleNotifyTerminateContinuation end.");
593 }
594 
ContinueStateCallbackRegister(StateCallbackInfo & stateCallbackInfo,sptr<IRemoteObject> callback)595 int32_t DSchedContinueManager::ContinueStateCallbackRegister(
596     StateCallbackInfo &stateCallbackInfo, sptr<IRemoteObject> callback)
597 {
598     std::shared_ptr<StateCallbackData> stateCallbackDataExist = FindStateCallbackData(stateCallbackInfo);
599     if (stateCallbackDataExist == nullptr) {
600         if (callback == nullptr) {
601             return NO_CONNECT_CALLBACK_ERR;
602         }
603         StateCallbackData stateCallbackData;
604         sptr<StateCallbackIpcDiedListener> diedListener = new StateCallbackIpcDiedListener();
605         diedListener->stateCallbackInfo_ = stateCallbackInfo;
606         stateCallbackData.diedListener = diedListener;
607         callback->AddDeathRecipient(diedListener);
608         stateCallbackData.remoteObject = callback;
609         AddStateCallbackData(stateCallbackInfo, stateCallbackData);
610         return ERR_OK;
611     }
612     stateCallbackDataExist->remoteObject = callback;
613     if (stateCallbackDataExist->state != -1) {
614         return NotifyQuickStartState(stateCallbackInfo, stateCallbackDataExist->state, stateCallbackDataExist->message);
615     }
616     return ERR_OK;
617 }
618 
ContinueStateCallbackUnRegister(StateCallbackInfo & stateCallbackInfo)619 int32_t DSchedContinueManager::ContinueStateCallbackUnRegister(StateCallbackInfo &stateCallbackInfo)
620 {
621     RemoveStateCallbackData(stateCallbackInfo);
622     return ERR_OK;
623 }
624 
NotifyQuickStartState(StateCallbackInfo & stateCallbackInfo,int32_t state,std::string message)625 int32_t DSchedContinueManager::NotifyQuickStartState(StateCallbackInfo &stateCallbackInfo,
626     int32_t state, std::string message)
627 {
628     HILOGI("NotifyQuickStartState called, state: %{public}d, message: %{public}s", state, message.c_str());
629     std::shared_ptr<StateCallbackData> stateCallbackDataExist = FindStateCallbackData(stateCallbackInfo);
630     if (stateCallbackDataExist == nullptr) {
631         StateCallbackData nweStateCallbackData;
632         nweStateCallbackData.state = state;
633         nweStateCallbackData.message = message;
634         AddStateCallbackData(stateCallbackInfo, nweStateCallbackData);
635         return ERR_OK;
636     }
637 
638     if (stateCallbackDataExist->remoteObject == nullptr) {
639         stateCallbackDataExist->state = state;
640         stateCallbackDataExist->message = message;
641         StateCallbackData *stateCallbackData = stateCallbackDataExist.get();
642         AddStateCallbackData(stateCallbackInfo, *stateCallbackData);
643         HILOGW("IPC remote object for quick start callback is null, state saved.");
644         return ERR_OK;
645     }
646     MessageParcel data;
647     if (!data.WriteInterfaceToken(CONNECTION_CALLBACK_INTERFACE_TOKEN)) {
648         HILOGE("Write interface token failed");
649         return IPC_CALL_NORESPONSE_ERR;
650     }
651 
652     if (!data.WriteInt32(state)) {
653         HILOGE("Write state failed");
654         return IPC_CALL_NORESPONSE_ERR;
655     }
656 
657     if (!data.WriteString(message)) {
658         HILOGE("Write message failed");
659         return IPC_CALL_NORESPONSE_ERR;
660     }
661 
662     MessageParcel reply;
663     MessageOption option;
664     stateCallbackDataExist->remoteObject->SendRequest(
665         static_cast<uint32_t>(IDSchedInterfaceCode::CONTINUE_STATE_CALLBACK), data, reply, option);
666     stateCallbackDataExist->state = -1;
667     stateCallbackDataExist->message = "";
668     return ERR_OK;
669 }
670 
OnContinueEnd(const DSchedContinueInfo & info)671 int32_t DSchedContinueManager::OnContinueEnd(const DSchedContinueInfo& info)
672 {
673     auto func = [this, info]() {
674         HandleContinueEnd(info);
675     };
676     if (eventHandler_ == nullptr) {
677         HILOGE("eventHandler_ is nullptr");
678         return INVALID_PARAMETERS_ERR;
679     }
680     eventHandler_->PostTask(func);
681     return ERR_OK;
682 }
683 
HandleContinueEnd(const DSchedContinueInfo & info)684 void DSchedContinueManager::HandleContinueEnd(const DSchedContinueInfo& info)
685 {
686     HILOGI("begin, continue info: %{public}s.", info.ToString().c_str());
687     std::lock_guard<std::mutex> continueLock(continueMutex_);
688     if (continues_.empty() || continues_.find(info) == continues_.end()) {
689         HILOGE("continue info doesn't match any existing continuation.");
690         return;
691     }
692     RemoveTimeout(info);
693     continues_.erase(info);
694     ContinueSceneSessionHandler::GetInstance().ClearContinueSessionId();
695 
696     std::string localDevId;
697     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
698         HILOGE("get local deviceId failed!");
699         return;
700     }
701 
702     if (info.sinkDeviceId_ == localDevId) {
703         cntSink_--;
704     } else if (info.sourceDeviceId_ == localDevId) {
705         cntSource_--;
706     }
707     HILOGI("end.");
708 }
709 
RemoveTimeout(const DSchedContinueInfo & info)710 void DSchedContinueManager::RemoveTimeout(const DSchedContinueInfo& info)
711 {
712     if (eventHandler_ == nullptr) {
713         HILOGE("eventHandler_ is nullptr");
714         return;
715     }
716     eventHandler_->RemoveTask(info.ToStringIgnoreMissionId());
717 }
718 
FindStateCallbackData(StateCallbackInfo & stateCallbackInfo)719 std::shared_ptr<StateCallbackData> DSchedContinueManager::FindStateCallbackData(StateCallbackInfo &stateCallbackInfo)
720 {
721     std::lock_guard<std::mutex> autolock(callbackCacheMutex_);
722     auto lastResult = stateCallbackCache_.find(stateCallbackInfo);
723     if (lastResult == stateCallbackCache_.end()) {
724         return nullptr;
725     }
726     return std::make_shared<StateCallbackData>(lastResult->second);
727 }
728 
AddStateCallbackData(StateCallbackInfo & stateCallbackInfo,StateCallbackData & stateCallbackData)729 void DSchedContinueManager::AddStateCallbackData(
730     StateCallbackInfo &stateCallbackInfo, StateCallbackData &stateCallbackData)
731 {
732     std::lock_guard<std::mutex> autolock(callbackCacheMutex_);
733     stateCallbackCache_.emplace(stateCallbackInfo, stateCallbackData);
734 }
735 
RemoveStateCallbackData(StateCallbackInfo & stateCallbackInfo)736 void DSchedContinueManager::RemoveStateCallbackData(StateCallbackInfo &stateCallbackInfo)
737 {
738     std::lock_guard<std::mutex> autolock(callbackCacheMutex_);
739     stateCallbackCache_.erase(stateCallbackInfo);
740 }
741 
OnDataRecv(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer)742 void DSchedContinueManager::OnDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
743 {
744     auto func = [this, sessionId, dataBuffer]() {
745         HandleDataRecv(sessionId, dataBuffer);
746     };
747     if (eventHandler_ == nullptr) {
748         HILOGE("eventHandler_ is nullptr");
749         return;
750     }
751     eventHandler_->PostTask(func);
752 }
753 
HandleDataRecv(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer)754 void DSchedContinueManager::HandleDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
755 {
756     HILOGI("start, sessionId: %{public}d.", sessionId);
757     if (dataBuffer == nullptr) {
758         HILOGE("dataBuffer is null.");
759         return;
760     }
761     uint8_t *data = dataBuffer->Data();
762     std::string jsonStr(reinterpret_cast<const char *>(data), dataBuffer->Capacity());
763     cJSON *rootValue = cJSON_Parse(jsonStr.c_str());
764     if (rootValue == nullptr) {
765         HILOGE("Parse jsonStr error.");
766         return;
767     }
768     cJSON *baseCmd = cJSON_GetObjectItemCaseSensitive(rootValue, "BaseCmd");
769     if (baseCmd == nullptr || !cJSON_IsString(baseCmd) || (baseCmd->valuestring == nullptr)) {
770         cJSON_Delete(rootValue);
771         HILOGE("Parse base cmd error.");
772         return;
773     }
774 
775     cJSON *cmdValue = cJSON_Parse(baseCmd->valuestring);
776     cJSON_Delete(rootValue);
777     if (cmdValue == nullptr) {
778         HILOGE("Parse cmd value error.");
779         return;
780     }
781 
782     cJSON *comvalue = cJSON_GetObjectItemCaseSensitive(cmdValue, "Command");
783     if (comvalue == nullptr || !cJSON_IsNumber(comvalue)) {
784         cJSON_Delete(cmdValue);
785         HILOGE("parse command failed");
786         return;
787     }
788     int32_t command = comvalue->valueint;
789     cJSON_Delete(cmdValue);
790     NotifyContinueDataRecv(sessionId, command, jsonStr, dataBuffer);
791     HILOGI("end, sessionId: %{public}d.", sessionId);
792 }
793 
NotifyContinueDataRecv(int32_t sessionId,int32_t command,const std::string & jsonStr,std::shared_ptr<DSchedDataBuffer> dataBuffer)794 void DSchedContinueManager::NotifyContinueDataRecv(int32_t sessionId, int32_t command, const std::string& jsonStr,
795     std::shared_ptr<DSchedDataBuffer> dataBuffer)
796 {
797     HILOGI("start, parsed cmd %{public}d, sessionId: %{public}d.", command, sessionId);
798     std::lock_guard<std::mutex> continueLock(continueMutex_);
799     if (!continues_.empty()) {
800         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
801             if (iter->second != nullptr && sessionId == iter->second->GetSessionId()) {
802                 HILOGI("sessionId %{public}d exist.", sessionId);
803                 iter->second->OnDataRecv(command, dataBuffer);
804                 return;
805             }
806         }
807     }
808 
809     if (command == DSCHED_CONTINUE_CMD_START) {
810         HILOGI("recv start cmd, sessionId: %{public}d.", sessionId);
811         auto startCmd = std::make_shared<DSchedContinueStartCmd>();
812         int32_t ret = startCmd->Unmarshal(jsonStr);
813         if (ret != ERR_OK) {
814             HILOGE("Unmarshal start cmd failed, ret: %{public}d", ret);
815             return;
816         }
817         int32_t direction = CONTINUE_SINK;
818         ret = CheckContinuationLimit(startCmd->srcDeviceId_, startCmd->dstDeviceId_, direction);
819         if (ret != ERR_OK) {
820             DmsRadar::GetInstance().SaveDataDmsRemoteWant("NotifyContinueDataRecv", ret);
821             HILOGE("CheckContinuationSubType failed, ret: %{public}d", ret);
822             return;
823         }
824 
825         int32_t currentAccountId = MultiUserManager::GetInstance().GetForegroundUser();
826         auto newContinue = std::make_shared<DSchedContinue>(startCmd, sessionId, currentAccountId);
827         newContinue->Init();
828         continues_.insert(std::make_pair(newContinue->GetContinueInfo(), newContinue));
829 
830         newContinue->OnStartCmd(startCmd->appVersion_);
831         HILOGI("end, continue info: %{public}s.", newContinue->GetContinueInfo().ToString().c_str());
832         return;
833     }
834     HILOGE("No matching session to handle cmd! sessionId: %{public}d, recv cmd %{public}d.", sessionId, command);
835     return;
836 }
837 
CheckContinuationLimit(const std::string & srcDeviceId,const std::string & dstDeviceId,int32_t & direction)838 int32_t DSchedContinueManager::CheckContinuationLimit(const std::string& srcDeviceId,
839     const std::string& dstDeviceId, int32_t &direction)
840 {
841     std::string localDevId;
842     if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
843         HILOGE("get local deviceId failed!");
844         return GET_LOCAL_DEVICE_ERR;
845     }
846 
847     direction = CONTINUE_SINK;
848     if (dstDeviceId == localDevId) {
849         if (cntSink_.load() >= MAX_CONCURRENT_SINK) {
850             HILOGE("can't deal more than %{public}d pull requests at the same time.", cntSink_.load());
851             return CONTINUE_ALREADY_IN_PROGRESS;
852         }
853         if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(srcDeviceId) == nullptr) {
854             HILOGE("Irrecognized source device!");
855             return INVALID_PARAMETERS_ERR;
856         }
857     } else if (srcDeviceId == localDevId) {
858         if (cntSource_.load() >= MAX_CONCURRENT_SOURCE) {
859             HILOGE("can't deal more than %{public}d push requests at the same time.", cntSource_.load());
860             return CONTINUE_ALREADY_IN_PROGRESS;
861         }
862         if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
863             HILOGE("Irrecognized destination device!");
864             return INVALID_PARAMETERS_ERR;
865         }
866         direction = CONTINUE_SOURCE;
867     } else {
868         HILOGE("source or target device must be local!");
869         return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
870     }
871     return ERR_OK;
872 }
873 
GetContinueInfo(std::string & srcDeviceId,std::string & dstDeviceId)874 int32_t DSchedContinueManager::GetContinueInfo(std::string &srcDeviceId, std::string &dstDeviceId)
875 {
876     HILOGI("called");
877     std::lock_guard<std::mutex> continueLock(continueMutex_);
878     if (continues_.empty()) {
879         HILOGW("No continuation in progress.");
880         return ERR_OK;
881     }
882     auto dsContinue = continues_.begin()->second;
883     if (dsContinue == nullptr) {
884         HILOGE("dContinue is null");
885         return INVALID_PARAMETERS_ERR;
886     }
887     dstDeviceId = dsContinue->GetContinueInfo().sinkDeviceId_;
888     srcDeviceId = dsContinue->GetContinueInfo().sourceDeviceId_;
889     return ERR_OK;
890 }
891 
OnShutdown(int32_t socket,bool isSelfCalled)892 void DSchedContinueManager::OnShutdown(int32_t socket, bool isSelfCalled)
893 {
894     if (isSelfCalled) {
895         HILOGW("called, shutdown by local, sessionId: %{public}d", socket);
896         return;
897     }
898     HILOGW("called, sessionId: %{public}d, isSelfCalled %{public}d", socket, isSelfCalled);
899     auto func = [this, socket]() {
900         std::lock_guard<std::mutex> continueLock(continueMutex_);
901         if (continues_.empty()) {
902             return;
903         }
904         for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
905             if (iter->second != nullptr && socket == iter->second->GetSessionId()) {
906                 iter->second->OnContinueEnd(CONTINUE_SESSION_SHUTDOWN);
907             }
908         }
909     };
910     if (eventHandler_ == nullptr) {
911         HILOGE("eventHandler_ is nullptr");
912         return;
913     }
914     eventHandler_->PostTask(func);
915     return;
916 }
917 
OnBind(int32_t socket,PeerSocketInfo info)918 void DSchedContinueManager::SoftbusListener::OnBind(int32_t socket, PeerSocketInfo info)
919 {
920 }
921 
OnShutdown(int32_t socket,bool isSelfCalled)922 void DSchedContinueManager::SoftbusListener::OnShutdown(int32_t socket, bool isSelfCalled)
923 {
924     DSchedContinueManager::GetInstance().OnShutdown(socket, isSelfCalled);
925 }
926 
OnDataRecv(int32_t socket,std::shared_ptr<DSchedDataBuffer> dataBuffer)927 void DSchedContinueManager::SoftbusListener::OnDataRecv(int32_t socket, std::shared_ptr<DSchedDataBuffer> dataBuffer)
928 {
929     DSchedContinueManager::GetInstance().OnDataRecv(socket, dataBuffer);
930 }
931 }  // namespace DistributedSchedule
932 }  // namespace OHOS
933