1 /*
2 * Copyright (c) 2024-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "dsched_continue_manager.h"
17
18 #include <chrono>
19 #include <sys/prctl.h>
20
21 #include "cJSON.h"
22
23 #include "continue_scene_session_handler.h"
24 #include "dfx/distributed_radar.h"
25 #include "distributed_sched_utils.h"
26 #include "dsched_transport_softbus_adapter.h"
27 #include "dtbschedmgr_device_info_storage.h"
28 #include "dtbschedmgr_log.h"
29 #include "mission/distributed_bm_storage.h"
30 #include "mission/dms_continue_condition_manager.h"
31 #include "multi_user_manager.h"
32
33 namespace OHOS {
34 namespace DistributedSchedule {
35 namespace {
36 const std::string TAG = "DSchedContinueManager";
37 const std::string DSCHED_CONTINUE_MANAGER = "dsched_continue_manager";
38 const std::string CONTINUE_TIMEOUT_TASK = "continue_timeout_task";
39 const std::u16string CONNECTION_CALLBACK_INTERFACE_TOKEN = u"ohos.abilityshell.DistributedConnection";
40 constexpr int32_t TERMINATE_DELAY_TIME = 200; //ms
41 }
42
43 IMPLEMENT_SINGLE_INSTANCE(DSchedContinueManager);
44
DSchedContinueManager()45 DSchedContinueManager::DSchedContinueManager()
46 {
47 }
48
~DSchedContinueManager()49 DSchedContinueManager::~DSchedContinueManager()
50 {
51 HILOGI("DSchedContinueManager delete");
52 UnInit();
53 }
54
Init()55 void DSchedContinueManager::Init()
56 {
57 HILOGI("Init DSchedContinueManager start");
58 {
59 std::unique_lock<std::mutex> lock(hasInitMutex_);
60 if (hasInit_) {
61 HILOGW("Init DSchedContinueManager has init");
62 return;
63 }
64 hasInit_ = true;
65 }
66
67 if (eventHandler_ != nullptr) {
68 HILOGI("DSchedContinueManager already inited, end.");
69 return;
70 }
71 DSchedTransportSoftbusAdapter::GetInstance().InitChannel();
72 softbusListener_ = std::make_shared<DSchedContinueManager::SoftbusListener>();
73 DSchedTransportSoftbusAdapter::GetInstance().RegisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
74 eventThread_ = std::thread(&DSchedContinueManager::StartEvent, this);
75 std::unique_lock<std::mutex> lock(eventMutex_);
76 eventCon_.wait(lock, [this] {
77 return eventHandler_ != nullptr;
78 });
79 HILOGI("Init DSchedContinueManager end");
80 }
81
StartEvent()82 void DSchedContinueManager::StartEvent()
83 {
84 HILOGI("StartEvent start");
85 prctl(PR_SET_NAME, DSCHED_CONTINUE_MANAGER.c_str());
86 auto runner = AppExecFwk::EventRunner::Create(false);
87 {
88 std::lock_guard<std::mutex> lock(eventMutex_);
89 eventHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(runner);
90 }
91 eventCon_.notify_one();
92 runner->Run();
93 HILOGI("StartEvent end");
94 }
95
UnInit()96 void DSchedContinueManager::UnInit()
97 {
98 HILOGI("UnInit start");
99 {
100 std::unique_lock<std::mutex> lock(hasInitMutex_);
101 if (!hasInit_) {
102 HILOGW("Init DSchedContinueManager has uninit");
103 return;
104 }
105 hasInit_ = false;
106 }
107
108 DSchedTransportSoftbusAdapter::GetInstance().UnregisterListener(SERVICE_TYPE_CONTINUE, softbusListener_);
109 continues_.clear();
110 cntSink_ = 0;
111 cntSource_ = 0;
112
113 if (eventHandler_ != nullptr) {
114 eventHandler_->GetEventRunner()->Stop();
115 if (eventThread_.joinable()) {
116 eventThread_.join();
117 }
118 eventHandler_ = nullptr;
119 } else {
120 HILOGE("eventHandler_ is nullptr");
121 }
122 HILOGI("UnInit end");
123 }
124
NotifyAllConnectDecision(std::string peerDeviceId,bool isSupport)125 void DSchedContinueManager::NotifyAllConnectDecision(std::string peerDeviceId, bool isSupport)
126 {
127 HILOGI("Notify all connect decision, peerDeviceId %{public}s, isSupport %{public}d.",
128 GetAnonymStr(peerDeviceId).c_str(), isSupport);
129 #ifdef DMSFWK_ALL_CONNECT_MGR
130 std::lock_guard<std::mutex> decisionLock(connectDecisionMutex_);
131 peerConnectDecision_[peerDeviceId] = isSupport;
132 connectDecisionCond_.notify_all();
133 #endif
134 }
135
ContinueMission(const std::string & srcDeviceId,const std::string & dstDeviceId,int32_t missionId,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)136 int32_t DSchedContinueManager::ContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
137 int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
138 {
139 if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
140 HILOGE("srcDeviceId or dstDeviceId or callback is null!");
141 return INVALID_PARAMETERS_ERR;
142 }
143
144 std::string localDevId;
145 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
146 HILOGE("get local deviceId failed!");
147 return INVALID_PARAMETERS_ERR;
148 }
149 if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
150 HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
151 GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
152 return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
153 }
154 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
155 localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
156 HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
157 GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
158 return INVALID_REMOTE_PARAMETERS_ERR;
159 }
160
161 auto func = [this, srcDeviceId, dstDeviceId, missionId, callback, wantParams]() {
162 HandleContinueMission(srcDeviceId, dstDeviceId, missionId, callback, wantParams);
163 };
164 if (eventHandler_ == nullptr) {
165 HILOGE("eventHandler_ is nullptr");
166 return INVALID_PARAMETERS_ERR;
167 }
168 eventHandler_->PostTask(func);
169 return ERR_OK;
170 }
171
HandleContinueMission(const std::string & srcDeviceId,const std::string & dstDeviceId,int32_t missionId,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)172 void DSchedContinueManager::HandleContinueMission(const std::string& srcDeviceId, const std::string& dstDeviceId,
173 int32_t missionId, const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
174 {
175 HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. missionId: %{public}d.",
176 GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), missionId);
177
178 if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
179 HILOGE("srcDeviceId or dstDeviceId or callback is null!");
180 return;
181 }
182
183 std::string localDevId;
184 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
185 HILOGE("get local deviceId failed!");
186 return;
187 }
188 DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, dstDeviceId, missionId);
189
190 AAFwk::MissionInfo missionInfo;
191 if (AAFwk::AbilityManagerClient::GetInstance()->GetMissionInfo("", missionId, missionInfo) == ERR_OK
192 && srcDeviceId == localDevId) {
193 info.sourceBundleName_ = missionInfo.want.GetBundle();
194 info.sinkBundleName_ = missionInfo.want.GetBundle();
195 }
196
197 HandleContinueMissionWithBundleName(info, callback, wantParams);
198 return;
199 }
200
ContinueMission(const DSchedContinueInfo & continueInfo,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)201 int32_t DSchedContinueManager::ContinueMission(const DSchedContinueInfo& continueInfo,
202 const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
203 {
204 std::string srcDeviceId = continueInfo.sourceDeviceId_;
205 std::string dstDeviceId = continueInfo.sinkDeviceId_;
206
207 if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
208 HILOGE("srcDeviceId or dstDeviceId or callback is null!");
209 return INVALID_PARAMETERS_ERR;
210 }
211
212 std::string localDevId;
213 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
214 HILOGE("get local deviceId failed!");
215 return INVALID_PARAMETERS_ERR;
216 }
217 if (localDevId != srcDeviceId && localDevId != dstDeviceId) {
218 HILOGE("Input srcDevId: %{public}s or dstDevId: %{public}s must be locDevId: %{public}s.",
219 GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(), GetAnonymStr(localDevId).c_str());
220 return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
221 }
222 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(
223 localDevId == srcDeviceId ? dstDeviceId : srcDeviceId) == nullptr) {
224 HILOGE("GetDeviceInfoById fail, locDevId: %{public}s, srcDevId: %{public}s, dstDevId: %{public}s.",
225 GetAnonymStr(localDevId).c_str(), GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str());
226 return INVALID_REMOTE_PARAMETERS_ERR;
227 }
228
229 #ifdef SUPPORT_DISTRIBUTED_MISSION_MANAGER
230 if (localDevId == srcDeviceId) {
231 int32_t missionId = -1;
232 int32_t currentAccountId = MultiUserManager::GetInstance().GetForegroundUser();
233 int32_t ret = DmsContinueConditionMgr::GetInstance().GetMissionIdByBundleName(
234 currentAccountId, continueInfo.sourceBundleName_, missionId);
235 if (ret != ERR_OK) {
236 HILOGE("get missionId fail, ret %{public}d.", ret);
237 return INVALID_PARAMETERS_ERR;
238 }
239 }
240 #endif
241
242 auto func = [this, continueInfo, callback, wantParams]() {
243 HandleContinueMission(continueInfo, callback, wantParams);
244 };
245 if (eventHandler_ == nullptr) {
246 HILOGE("eventHandler_ is nullptr");
247 return INVALID_PARAMETERS_ERR;
248 }
249 eventHandler_->PostTask(func);
250 return ERR_OK;
251 }
252
HandleContinueMission(const DSchedContinueInfo & continueInfo,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)253 void DSchedContinueManager::HandleContinueMission(const DSchedContinueInfo& continueInfo,
254 const sptr<IRemoteObject>& callback, const OHOS::AAFwk::WantParams& wantParams)
255 {
256 std::string srcDeviceId = continueInfo.sourceDeviceId_;
257 std::string dstDeviceId = continueInfo.sinkDeviceId_;
258 std::string srcBundleName = continueInfo.sourceBundleName_;
259 if (srcBundleName.empty()) {
260 srcBundleName = continueInfo.sinkBundleName_;
261 }
262 std::string bundleName = continueInfo.sinkBundleName_;
263 std::string continueType = continueInfo.continueType_;
264 HILOGI("start, srcDeviceId: %{public}s. dstDeviceId: %{public}s. bundleName: %{public}s."
265 " continueType: %{public}s.", GetAnonymStr(srcDeviceId).c_str(), GetAnonymStr(dstDeviceId).c_str(),
266 bundleName.c_str(), continueType.c_str());
267
268 if (srcDeviceId.empty() || dstDeviceId.empty() || callback == nullptr) {
269 HILOGE("srcDeviceId or dstDeviceId or callback is null!");
270 return;
271 }
272
273 DSchedContinueInfo info = DSchedContinueInfo(srcDeviceId, srcBundleName, dstDeviceId, bundleName, continueType);
274 HandleContinueMissionWithBundleName(info, callback, wantParams);
275 return;
276 }
277
GetFirstBundleName(DSchedContinueInfo & info,std::string & firstBundleName,std::string bundleName,std::string deviceId)278 bool DSchedContinueManager::GetFirstBundleName(DSchedContinueInfo &info, std::string &firstBundleName,
279 std::string bundleName, std::string deviceId)
280 {
281 uint16_t bundleNameId;
282 DmsBundleInfo distributedBundleInfo;
283 DmsBmStorage::GetInstance()->GetBundleNameId(bundleName, bundleNameId);
284 bool result = DmsBmStorage::GetInstance()->GetDistributedBundleInfo(deviceId,
285 bundleNameId, distributedBundleInfo);
286 if (!result) {
287 HILOGE("GetDistributedBundleInfo faild");
288 return false;
289 }
290 std::vector<DmsAbilityInfo> dmsAbilityInfos = distributedBundleInfo.dmsAbilityInfos;
291 for (DmsAbilityInfo &ability: dmsAbilityInfos) {
292 std::vector<std::string> abilityContinueTypes = ability.continueType;
293 for (std::string &ability_continue_type: abilityContinueTypes) {
294 if (ability_continue_type == info.continueType_ && !ability.continueBundleName.empty()) {
295 firstBundleName = *ability.continueBundleName.begin();
296 return true;
297 }
298 }
299 }
300 HILOGE("can not get abilicy info or continue bundle names is empty for continue type:%{public}s",
301 info.continueType_.c_str());
302 return false;
303 }
304
HandleContinueMissionWithBundleName(DSchedContinueInfo & info,const sptr<IRemoteObject> & callback,const OHOS::AAFwk::WantParams & wantParams)305 void DSchedContinueManager::HandleContinueMissionWithBundleName(DSchedContinueInfo &info,
306 const sptr<IRemoteObject> &callback, const OHOS::AAFwk::WantParams &wantParams)
307 {
308 int32_t direction = CONTINUE_SINK;
309 int32_t ret = CheckContinuationLimit(info.sourceDeviceId_, info.sinkDeviceId_, direction);
310 if (ret != ERR_OK) {
311 HILOGE("CheckContinuationLimit failed, ret: %{public}d", ret);
312 return;
313 }
314 int32_t subType = CONTINUE_PUSH;
315 if (direction == CONTINUE_SOURCE) {
316 cntSource_++;
317 } else {
318 cntSink_++;
319 subType = CONTINUE_PULL;
320 if (info.sourceBundleName_.empty()) {
321 HILOGW("current sub type is continue pull; but can not get source bundle name from recv cache.");
322 std::string firstBundleNamme;
323 std::string bundleName = info.sinkBundleName_;
324 std::string deviceId = info.sinkDeviceId_;
325 if (GetFirstBundleName(info, firstBundleNamme, bundleName, deviceId)) {
326 info.sourceBundleName_ = firstBundleNamme;
327 }
328 }
329 }
330 {
331 std::lock_guard<std::mutex> continueLock(continueMutex_);
332 if (!continues_.empty() && continues_.find(info) != continues_.end()) {
333 HILOGE("a same continue task is already in progress.");
334 return;
335 }
336 int32_t currentAccountId = MultiUserManager::GetInstance().GetForegroundUser();
337 auto newContinue = std::make_shared<DSchedContinue>(subType, direction, callback, info, currentAccountId);
338 newContinue->Init();
339 continues_.insert(std::make_pair(info, newContinue));
340 #ifdef DMSFWK_ALL_CONNECT_MGR
341 {
342 std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
343 std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
344 if (peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end()) {
345 peerConnectDecision_.erase(peerDeviceId);
346 }
347 }
348 #endif
349 newContinue->OnContinueMission(wantParams);
350 }
351 WaitAllConnectDecision(direction, info, CONTINUE_TIMEOUT);
352 HILOGI("end, subType: %{public}d dirction: %{public}d, continue info: %{public}s",
353 subType, direction, info.ToString().c_str());
354 }
355
WaitAllConnectDecision(int32_t direction,const DSchedContinueInfo & info,int32_t timeout)356 void DSchedContinueManager::WaitAllConnectDecision(int32_t direction, const DSchedContinueInfo &info, int32_t timeout)
357 {
358 #ifdef DMSFWK_ALL_CONNECT_MGR
359 std::string peerDeviceId = direction == CONTINUE_SOURCE ? info.sinkDeviceId_ : info.sourceDeviceId_;
360 {
361 std::unique_lock<std::mutex> decisionLock(connectDecisionMutex_);
362 connectDecisionCond_.wait_for(decisionLock, std::chrono::seconds(CONNECT_DECISION_WAIT_S),
363 [this, peerDeviceId]() {
364 return peerConnectDecision_.find(peerDeviceId) != peerConnectDecision_.end() &&
365 peerConnectDecision_.at(peerDeviceId).load();
366 });
367
368 if (peerConnectDecision_.find(peerDeviceId) == peerConnectDecision_.end()) {
369 HILOGE("Not find peerDeviceId %{public}s in peerConnectDecision.", GetAnonymStr(peerDeviceId).c_str());
370 SetTimeOut(info, 0);
371 return;
372 }
373 if (!peerConnectDecision_.at(peerDeviceId).load()) {
374 HILOGE("All connect manager refuse bind to PeerDeviceId %{public}s.", GetAnonymStr(peerDeviceId).c_str());
375 peerConnectDecision_.erase(peerDeviceId);
376 SetTimeOut(info, 0);
377 return;
378 }
379 peerConnectDecision_.erase(peerDeviceId);
380 }
381 #endif
382 SetTimeOut(info, timeout);
383 }
384
SetTimeOut(const DSchedContinueInfo & info,int32_t timeout)385 void DSchedContinueManager::SetTimeOut(const DSchedContinueInfo &info, int32_t timeout)
386 {
387 auto func = [this, info]() {
388 if (continues_.empty() || continues_.count(info) == 0) {
389 HILOGE("continue not exist.");
390 return;
391 }
392 HILOGE("continue timeout! info: %{public}s", info.ToString().c_str());
393 auto dsContinue = continues_[info];
394 if (dsContinue != nullptr) {
395 dsContinue->OnContinueEnd(CONTINUE_ABILITY_TIMEOUT_ERR);
396 }
397 };
398 if (eventHandler_ == nullptr) {
399 HILOGE("eventHandler_ is nullptr");
400 return;
401 }
402 timeout > 0 ? eventHandler_->PostTask(func, info.ToStringIgnoreMissionId(), timeout) :
403 eventHandler_->PostTask(func);
404 }
405
StartContinuation(const OHOS::AAFwk::Want & want,int32_t missionId,int32_t callerUid,int32_t status,uint32_t accessToken)406 int32_t DSchedContinueManager::StartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
407 int32_t callerUid, int32_t status, uint32_t accessToken)
408 {
409 std::string dstDeviceId = want.GetElement().GetDeviceID();
410 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
411 HILOGE("GetDeviceInfoById fail, dstDevId: %{public}s.", GetAnonymStr(dstDeviceId).c_str());
412 return INVALID_REMOTE_PARAMETERS_ERR;
413 }
414 if (GetDSchedContinueByWant(want, missionId) == nullptr) {
415 HILOGE("GetDSchedContinueByWant fail, dstDevId: %{public}s, missionId: %{public}d.",
416 GetAnonymStr(dstDeviceId).c_str(), missionId);
417 return INVALID_REMOTE_PARAMETERS_ERR;
418 }
419
420 auto func = [this, want, missionId, callerUid, status, accessToken]() {
421 HandleStartContinuation(want, missionId, callerUid, status, accessToken);
422 };
423 if (eventHandler_ == nullptr) {
424 HILOGE("eventHandler_ is nullptr");
425 return INVALID_PARAMETERS_ERR;
426 }
427 eventHandler_->PostTask(func);
428 return ERR_OK;
429 }
430
HandleStartContinuation(const OHOS::AAFwk::Want & want,int32_t missionId,int32_t callerUid,int32_t status,uint32_t accessToken)431 void DSchedContinueManager::HandleStartContinuation(const OHOS::AAFwk::Want& want, int32_t missionId,
432 int32_t callerUid, int32_t status, uint32_t accessToken)
433 {
434 HILOGI("begin");
435 auto dContinue = GetDSchedContinueByWant(want, missionId);
436 if (dContinue != nullptr) {
437 dContinue->OnStartContinuation(want, callerUid, status, accessToken);
438 } else {
439 DmsRadar::GetInstance().SaveDataDmsRemoteWant("HandleStartContinuation", INVALID_PARAMETERS_ERR);
440 }
441 HILOGI("end");
442 return;
443 }
444
GetDSchedContinueByWant(const OHOS::AAFwk::Want & want,int32_t missionId)445 std::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByWant(
446 const OHOS::AAFwk::Want& want, int32_t missionId)
447 {
448 std::string srcDeviceId;
449 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(srcDeviceId)) {
450 DmsRadar::GetInstance().SaveDataDmsRemoteWant("GetDSchedContinueByWant", GET_LOCAL_DEVICE_ERR);
451 HILOGE("get local deviceId failed!");
452 return nullptr;
453 }
454 std::string dstDeviceId = want.GetElement().GetDeviceID();
455 std::string bundleName = want.GetElement().GetBundleName();
456 auto info = DSchedContinueInfo(srcDeviceId, bundleName, dstDeviceId, bundleName, "");
457
458 HILOGI("continue info: %{public}s.", info.ToString().c_str());
459 {
460 std::lock_guard<std::mutex> continueLock(continueMutex_);
461 if (continues_.empty()) {
462 HILOGE("continue info doesn't match an existing continuation.");
463 return nullptr;
464 }
465 for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
466 if (iter->second == nullptr) {
467 continue;
468 }
469 DSchedContinueInfo continueInfo = iter->second->GetContinueInfo();
470 if (srcDeviceId == continueInfo.sourceDeviceId_
471 && bundleName == continueInfo.sourceBundleName_
472 && dstDeviceId == continueInfo.sinkDeviceId_) {
473 return iter->second;
474 }
475 }
476 }
477 HILOGE("missionId doesn't match the existing continuation, continueInfo: %{public}s.",
478 info.ToString().c_str());
479 return nullptr;
480 }
481
NotifyCompleteContinuation(const std::u16string & devId,int32_t sessionId,bool isSuccess,const std::string & callerBundleName)482 int32_t DSchedContinueManager::NotifyCompleteContinuation(const std::u16string& devId, int32_t sessionId,
483 bool isSuccess, const std::string &callerBundleName)
484 {
485 auto func = [this, devId, sessionId, isSuccess, callerBundleName]() {
486 HandleNotifyCompleteContinuation(devId, sessionId, isSuccess, callerBundleName);
487 };
488 if (eventHandler_ == nullptr) {
489 HILOGE("eventHandler_ is nullptr");
490 return INVALID_PARAMETERS_ERR;
491 }
492 eventHandler_->PostTask(func);
493 return ERR_OK;
494 }
495
HandleNotifyCompleteContinuation(const std::u16string & devId,int32_t missionId,bool isSuccess,const std::string & callerBundleName)496 void DSchedContinueManager::HandleNotifyCompleteContinuation(const std::u16string& devId, int32_t missionId,
497 bool isSuccess, const std::string &callerBundleName)
498 {
499 HILOGI("begin, isSuccess %{public}d", isSuccess);
500 auto dContinue = GetDSchedContinueByDevId(devId, missionId);
501 if (dContinue != nullptr) {
502 if (dContinue->GetContinueInfo().sinkBundleName_ != callerBundleName) {
503 HILOGE("callerBundleName doesn't match the existing continuation");
504 return;
505 }
506 dContinue->hasResult_ = true;
507 dContinue->OnNotifyComplete(missionId, isSuccess);
508 HILOGI("end, continue info: %{public}s.", dContinue->GetContinueInfo().ToString().c_str());
509 }
510 return;
511 }
512
GetDSchedContinueByDevId(const std::u16string & devId,int32_t missionId)513 std::shared_ptr<DSchedContinue> DSchedContinueManager::GetDSchedContinueByDevId(
514 const std::u16string& devId, int32_t missionId)
515 {
516 std::string deviceId = Str16ToStr8(devId);
517 HILOGI("begin, deviceId %{public}s, missionId %{public}d", GetAnonymStr(deviceId).c_str(), missionId);
518 {
519 std::lock_guard<std::mutex> continueLock(continueMutex_);
520 if (continues_.empty()) {
521 HILOGE("No continuation in progress.");
522 return nullptr;
523 }
524 for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
525 if (iter->second != nullptr && deviceId == iter->second->GetContinueInfo().sourceDeviceId_) {
526 return iter->second;
527 }
528 }
529 }
530 HILOGE("source deviceId doesn't match an existing continuation.");
531 return nullptr;
532 }
533
NotifyTerminateContinuation(const int32_t missionId)534 void DSchedContinueManager::NotifyTerminateContinuation(const int32_t missionId)
535 {
536 HILOGI("begin, missionId %{public}d", missionId);
537 auto func = [this, missionId]() {
538 HandleNotifyTerminateContinuation(missionId);
539 };
540 if (eventHandler_ == nullptr) {
541 HILOGE("eventHandler_ is nullptr.");
542 return;
543 }
544 eventHandler_->PostTask(func, TERMINATE_DELAY_TIME);
545 HILOGW("doesn't match an existing continuation.");
546 }
547
HandleNotifyTerminateContinuation(const int32_t missionId)548 void DSchedContinueManager::HandleNotifyTerminateContinuation(const int32_t missionId)
549 {
550 HILOGI("HandleNotifyTerminateContinuation begin, missionId %{public}d", missionId);
551 {
552 std::lock_guard<std::mutex> continueLock(continueMutex_);
553 if (continues_.empty()) {
554 HILOGW("No continuation in progress.");
555 return;
556 }
557
558 MissionStatus status;
559 int32_t currentAccountId = MultiUserManager::GetInstance().GetForegroundUser();
560 int32_t ret = DmsContinueConditionMgr::GetInstance().GetMissionStatus(currentAccountId, missionId, status);
561 if (ret != ERR_OK) {
562 HILOGE("get continueLaunchMissionInfo failed, missionId %{public}d", missionId);
563 return;
564 }
565 auto flag = status.launchFlag;
566 if ((flag & AAFwk::Want::FLAG_ABILITY_CONTINUATION) != AAFwk::Want::FLAG_ABILITY_CONTINUATION &&
567 (flag & AAFwk::Want::FLAG_ABILITY_PREPARE_CONTINUATION) != AAFwk::Want::FLAG_ABILITY_PREPARE_CONTINUATION) {
568 HILOGI("missionId %{public}d not start by continue, ignore", missionId);
569 return;
570 }
571 HILOGI("alive missionInfo bundleName is %{public}s, abilityName is %{public}s",
572 status.bundleName.c_str(), status.abilityName.c_str());
573 for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
574 if (iter->second == nullptr) {
575 break;
576 }
577
578 auto continueInfo = iter->second->GetContinueInfo();
579 HILOGI("continueInfo bundleName is %{public}s, abilityName is %{public}s",
580 continueInfo.sinkBundleName_.c_str(), continueInfo.sinkAbilityName_.c_str());
581 if (status.bundleName == continueInfo.sinkBundleName_
582 && status.abilityName == continueInfo.sinkAbilityName_) {
583 if (iter->second->hasResult_) {
584 HILOGI("hasResult_ %{public}d", iter->second->hasResult_);
585 return;
586 }
587 iter->second->OnContinueEnd(CONTINUE_SINK_ABILITY_TERMINATED);
588 return;
589 }
590 }
591 }
592 HILOGI("HandleNotifyTerminateContinuation end.");
593 }
594
ContinueStateCallbackRegister(StateCallbackInfo & stateCallbackInfo,sptr<IRemoteObject> callback)595 int32_t DSchedContinueManager::ContinueStateCallbackRegister(
596 StateCallbackInfo &stateCallbackInfo, sptr<IRemoteObject> callback)
597 {
598 std::shared_ptr<StateCallbackData> stateCallbackDataExist = FindStateCallbackData(stateCallbackInfo);
599 if (stateCallbackDataExist == nullptr) {
600 if (callback == nullptr) {
601 return NO_CONNECT_CALLBACK_ERR;
602 }
603 StateCallbackData stateCallbackData;
604 sptr<StateCallbackIpcDiedListener> diedListener = new StateCallbackIpcDiedListener();
605 diedListener->stateCallbackInfo_ = stateCallbackInfo;
606 stateCallbackData.diedListener = diedListener;
607 callback->AddDeathRecipient(diedListener);
608 stateCallbackData.remoteObject = callback;
609 AddStateCallbackData(stateCallbackInfo, stateCallbackData);
610 return ERR_OK;
611 }
612 stateCallbackDataExist->remoteObject = callback;
613 if (stateCallbackDataExist->state != -1) {
614 return NotifyQuickStartState(stateCallbackInfo, stateCallbackDataExist->state, stateCallbackDataExist->message);
615 }
616 return ERR_OK;
617 }
618
ContinueStateCallbackUnRegister(StateCallbackInfo & stateCallbackInfo)619 int32_t DSchedContinueManager::ContinueStateCallbackUnRegister(StateCallbackInfo &stateCallbackInfo)
620 {
621 RemoveStateCallbackData(stateCallbackInfo);
622 return ERR_OK;
623 }
624
NotifyQuickStartState(StateCallbackInfo & stateCallbackInfo,int32_t state,std::string message)625 int32_t DSchedContinueManager::NotifyQuickStartState(StateCallbackInfo &stateCallbackInfo,
626 int32_t state, std::string message)
627 {
628 HILOGI("NotifyQuickStartState called, state: %{public}d, message: %{public}s", state, message.c_str());
629 std::shared_ptr<StateCallbackData> stateCallbackDataExist = FindStateCallbackData(stateCallbackInfo);
630 if (stateCallbackDataExist == nullptr) {
631 StateCallbackData nweStateCallbackData;
632 nweStateCallbackData.state = state;
633 nweStateCallbackData.message = message;
634 AddStateCallbackData(stateCallbackInfo, nweStateCallbackData);
635 return ERR_OK;
636 }
637
638 if (stateCallbackDataExist->remoteObject == nullptr) {
639 stateCallbackDataExist->state = state;
640 stateCallbackDataExist->message = message;
641 StateCallbackData *stateCallbackData = stateCallbackDataExist.get();
642 AddStateCallbackData(stateCallbackInfo, *stateCallbackData);
643 HILOGW("IPC remote object for quick start callback is null, state saved.");
644 return ERR_OK;
645 }
646 MessageParcel data;
647 if (!data.WriteInterfaceToken(CONNECTION_CALLBACK_INTERFACE_TOKEN)) {
648 HILOGE("Write interface token failed");
649 return IPC_CALL_NORESPONSE_ERR;
650 }
651
652 if (!data.WriteInt32(state)) {
653 HILOGE("Write state failed");
654 return IPC_CALL_NORESPONSE_ERR;
655 }
656
657 if (!data.WriteString(message)) {
658 HILOGE("Write message failed");
659 return IPC_CALL_NORESPONSE_ERR;
660 }
661
662 MessageParcel reply;
663 MessageOption option;
664 stateCallbackDataExist->remoteObject->SendRequest(
665 static_cast<uint32_t>(IDSchedInterfaceCode::CONTINUE_STATE_CALLBACK), data, reply, option);
666 stateCallbackDataExist->state = -1;
667 stateCallbackDataExist->message = "";
668 return ERR_OK;
669 }
670
OnContinueEnd(const DSchedContinueInfo & info)671 int32_t DSchedContinueManager::OnContinueEnd(const DSchedContinueInfo& info)
672 {
673 auto func = [this, info]() {
674 HandleContinueEnd(info);
675 };
676 if (eventHandler_ == nullptr) {
677 HILOGE("eventHandler_ is nullptr");
678 return INVALID_PARAMETERS_ERR;
679 }
680 eventHandler_->PostTask(func);
681 return ERR_OK;
682 }
683
HandleContinueEnd(const DSchedContinueInfo & info)684 void DSchedContinueManager::HandleContinueEnd(const DSchedContinueInfo& info)
685 {
686 HILOGI("begin, continue info: %{public}s.", info.ToString().c_str());
687 std::lock_guard<std::mutex> continueLock(continueMutex_);
688 if (continues_.empty() || continues_.find(info) == continues_.end()) {
689 HILOGE("continue info doesn't match any existing continuation.");
690 return;
691 }
692 RemoveTimeout(info);
693 continues_.erase(info);
694 ContinueSceneSessionHandler::GetInstance().ClearContinueSessionId();
695
696 std::string localDevId;
697 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
698 HILOGE("get local deviceId failed!");
699 return;
700 }
701
702 if (info.sinkDeviceId_ == localDevId) {
703 cntSink_--;
704 } else if (info.sourceDeviceId_ == localDevId) {
705 cntSource_--;
706 }
707 HILOGI("end.");
708 }
709
RemoveTimeout(const DSchedContinueInfo & info)710 void DSchedContinueManager::RemoveTimeout(const DSchedContinueInfo& info)
711 {
712 if (eventHandler_ == nullptr) {
713 HILOGE("eventHandler_ is nullptr");
714 return;
715 }
716 eventHandler_->RemoveTask(info.ToStringIgnoreMissionId());
717 }
718
FindStateCallbackData(StateCallbackInfo & stateCallbackInfo)719 std::shared_ptr<StateCallbackData> DSchedContinueManager::FindStateCallbackData(StateCallbackInfo &stateCallbackInfo)
720 {
721 std::lock_guard<std::mutex> autolock(callbackCacheMutex_);
722 auto lastResult = stateCallbackCache_.find(stateCallbackInfo);
723 if (lastResult == stateCallbackCache_.end()) {
724 return nullptr;
725 }
726 return std::make_shared<StateCallbackData>(lastResult->second);
727 }
728
AddStateCallbackData(StateCallbackInfo & stateCallbackInfo,StateCallbackData & stateCallbackData)729 void DSchedContinueManager::AddStateCallbackData(
730 StateCallbackInfo &stateCallbackInfo, StateCallbackData &stateCallbackData)
731 {
732 std::lock_guard<std::mutex> autolock(callbackCacheMutex_);
733 stateCallbackCache_.emplace(stateCallbackInfo, stateCallbackData);
734 }
735
RemoveStateCallbackData(StateCallbackInfo & stateCallbackInfo)736 void DSchedContinueManager::RemoveStateCallbackData(StateCallbackInfo &stateCallbackInfo)
737 {
738 std::lock_guard<std::mutex> autolock(callbackCacheMutex_);
739 stateCallbackCache_.erase(stateCallbackInfo);
740 }
741
OnDataRecv(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer)742 void DSchedContinueManager::OnDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
743 {
744 auto func = [this, sessionId, dataBuffer]() {
745 HandleDataRecv(sessionId, dataBuffer);
746 };
747 if (eventHandler_ == nullptr) {
748 HILOGE("eventHandler_ is nullptr");
749 return;
750 }
751 eventHandler_->PostTask(func);
752 }
753
HandleDataRecv(int32_t sessionId,std::shared_ptr<DSchedDataBuffer> dataBuffer)754 void DSchedContinueManager::HandleDataRecv(int32_t sessionId, std::shared_ptr<DSchedDataBuffer> dataBuffer)
755 {
756 HILOGI("start, sessionId: %{public}d.", sessionId);
757 if (dataBuffer == nullptr) {
758 HILOGE("dataBuffer is null.");
759 return;
760 }
761 uint8_t *data = dataBuffer->Data();
762 std::string jsonStr(reinterpret_cast<const char *>(data), dataBuffer->Capacity());
763 cJSON *rootValue = cJSON_Parse(jsonStr.c_str());
764 if (rootValue == nullptr) {
765 HILOGE("Parse jsonStr error.");
766 return;
767 }
768 cJSON *baseCmd = cJSON_GetObjectItemCaseSensitive(rootValue, "BaseCmd");
769 if (baseCmd == nullptr || !cJSON_IsString(baseCmd) || (baseCmd->valuestring == nullptr)) {
770 cJSON_Delete(rootValue);
771 HILOGE("Parse base cmd error.");
772 return;
773 }
774
775 cJSON *cmdValue = cJSON_Parse(baseCmd->valuestring);
776 cJSON_Delete(rootValue);
777 if (cmdValue == nullptr) {
778 HILOGE("Parse cmd value error.");
779 return;
780 }
781
782 cJSON *comvalue = cJSON_GetObjectItemCaseSensitive(cmdValue, "Command");
783 if (comvalue == nullptr || !cJSON_IsNumber(comvalue)) {
784 cJSON_Delete(cmdValue);
785 HILOGE("parse command failed");
786 return;
787 }
788 int32_t command = comvalue->valueint;
789 cJSON_Delete(cmdValue);
790 NotifyContinueDataRecv(sessionId, command, jsonStr, dataBuffer);
791 HILOGI("end, sessionId: %{public}d.", sessionId);
792 }
793
NotifyContinueDataRecv(int32_t sessionId,int32_t command,const std::string & jsonStr,std::shared_ptr<DSchedDataBuffer> dataBuffer)794 void DSchedContinueManager::NotifyContinueDataRecv(int32_t sessionId, int32_t command, const std::string& jsonStr,
795 std::shared_ptr<DSchedDataBuffer> dataBuffer)
796 {
797 HILOGI("start, parsed cmd %{public}d, sessionId: %{public}d.", command, sessionId);
798 std::lock_guard<std::mutex> continueLock(continueMutex_);
799 if (!continues_.empty()) {
800 for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
801 if (iter->second != nullptr && sessionId == iter->second->GetSessionId()) {
802 HILOGI("sessionId %{public}d exist.", sessionId);
803 iter->second->OnDataRecv(command, dataBuffer);
804 return;
805 }
806 }
807 }
808
809 if (command == DSCHED_CONTINUE_CMD_START) {
810 HILOGI("recv start cmd, sessionId: %{public}d.", sessionId);
811 auto startCmd = std::make_shared<DSchedContinueStartCmd>();
812 int32_t ret = startCmd->Unmarshal(jsonStr);
813 if (ret != ERR_OK) {
814 HILOGE("Unmarshal start cmd failed, ret: %{public}d", ret);
815 return;
816 }
817 int32_t direction = CONTINUE_SINK;
818 ret = CheckContinuationLimit(startCmd->srcDeviceId_, startCmd->dstDeviceId_, direction);
819 if (ret != ERR_OK) {
820 DmsRadar::GetInstance().SaveDataDmsRemoteWant("NotifyContinueDataRecv", ret);
821 HILOGE("CheckContinuationSubType failed, ret: %{public}d", ret);
822 return;
823 }
824
825 int32_t currentAccountId = MultiUserManager::GetInstance().GetForegroundUser();
826 auto newContinue = std::make_shared<DSchedContinue>(startCmd, sessionId, currentAccountId);
827 newContinue->Init();
828 continues_.insert(std::make_pair(newContinue->GetContinueInfo(), newContinue));
829
830 newContinue->OnStartCmd(startCmd->appVersion_);
831 HILOGI("end, continue info: %{public}s.", newContinue->GetContinueInfo().ToString().c_str());
832 return;
833 }
834 HILOGE("No matching session to handle cmd! sessionId: %{public}d, recv cmd %{public}d.", sessionId, command);
835 return;
836 }
837
CheckContinuationLimit(const std::string & srcDeviceId,const std::string & dstDeviceId,int32_t & direction)838 int32_t DSchedContinueManager::CheckContinuationLimit(const std::string& srcDeviceId,
839 const std::string& dstDeviceId, int32_t &direction)
840 {
841 std::string localDevId;
842 if (!DtbschedmgrDeviceInfoStorage::GetInstance().GetLocalDeviceId(localDevId)) {
843 HILOGE("get local deviceId failed!");
844 return GET_LOCAL_DEVICE_ERR;
845 }
846
847 direction = CONTINUE_SINK;
848 if (dstDeviceId == localDevId) {
849 if (cntSink_.load() >= MAX_CONCURRENT_SINK) {
850 HILOGE("can't deal more than %{public}d pull requests at the same time.", cntSink_.load());
851 return CONTINUE_ALREADY_IN_PROGRESS;
852 }
853 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(srcDeviceId) == nullptr) {
854 HILOGE("Irrecognized source device!");
855 return INVALID_PARAMETERS_ERR;
856 }
857 } else if (srcDeviceId == localDevId) {
858 if (cntSource_.load() >= MAX_CONCURRENT_SOURCE) {
859 HILOGE("can't deal more than %{public}d push requests at the same time.", cntSource_.load());
860 return CONTINUE_ALREADY_IN_PROGRESS;
861 }
862 if (DtbschedmgrDeviceInfoStorage::GetInstance().GetDeviceInfoById(dstDeviceId) == nullptr) {
863 HILOGE("Irrecognized destination device!");
864 return INVALID_PARAMETERS_ERR;
865 }
866 direction = CONTINUE_SOURCE;
867 } else {
868 HILOGE("source or target device must be local!");
869 return OPERATION_DEVICE_NOT_INITIATOR_OR_TARGET;
870 }
871 return ERR_OK;
872 }
873
GetContinueInfo(std::string & srcDeviceId,std::string & dstDeviceId)874 int32_t DSchedContinueManager::GetContinueInfo(std::string &srcDeviceId, std::string &dstDeviceId)
875 {
876 HILOGI("called");
877 std::lock_guard<std::mutex> continueLock(continueMutex_);
878 if (continues_.empty()) {
879 HILOGW("No continuation in progress.");
880 return ERR_OK;
881 }
882 auto dsContinue = continues_.begin()->second;
883 if (dsContinue == nullptr) {
884 HILOGE("dContinue is null");
885 return INVALID_PARAMETERS_ERR;
886 }
887 dstDeviceId = dsContinue->GetContinueInfo().sinkDeviceId_;
888 srcDeviceId = dsContinue->GetContinueInfo().sourceDeviceId_;
889 return ERR_OK;
890 }
891
OnShutdown(int32_t socket,bool isSelfCalled)892 void DSchedContinueManager::OnShutdown(int32_t socket, bool isSelfCalled)
893 {
894 if (isSelfCalled) {
895 HILOGW("called, shutdown by local, sessionId: %{public}d", socket);
896 return;
897 }
898 HILOGW("called, sessionId: %{public}d, isSelfCalled %{public}d", socket, isSelfCalled);
899 auto func = [this, socket]() {
900 std::lock_guard<std::mutex> continueLock(continueMutex_);
901 if (continues_.empty()) {
902 return;
903 }
904 for (auto iter = continues_.begin(); iter != continues_.end(); iter++) {
905 if (iter->second != nullptr && socket == iter->second->GetSessionId()) {
906 iter->second->OnContinueEnd(CONTINUE_SESSION_SHUTDOWN);
907 }
908 }
909 };
910 if (eventHandler_ == nullptr) {
911 HILOGE("eventHandler_ is nullptr");
912 return;
913 }
914 eventHandler_->PostTask(func);
915 return;
916 }
917
OnBind(int32_t socket,PeerSocketInfo info)918 void DSchedContinueManager::SoftbusListener::OnBind(int32_t socket, PeerSocketInfo info)
919 {
920 }
921
OnShutdown(int32_t socket,bool isSelfCalled)922 void DSchedContinueManager::SoftbusListener::OnShutdown(int32_t socket, bool isSelfCalled)
923 {
924 DSchedContinueManager::GetInstance().OnShutdown(socket, isSelfCalled);
925 }
926
OnDataRecv(int32_t socket,std::shared_ptr<DSchedDataBuffer> dataBuffer)927 void DSchedContinueManager::SoftbusListener::OnDataRecv(int32_t socket, std::shared_ptr<DSchedDataBuffer> dataBuffer)
928 {
929 DSchedContinueManager::GetInstance().OnDataRecv(socket, dataBuffer);
930 }
931 } // namespace DistributedSchedule
932 } // namespace OHOS
933