• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/**
 * Copyright 2021-2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 #include "runtime/graph_scheduler/actor/loop_count_actor.h"
18 #include <set>
19 #include "runtime/graph_scheduler/actor/data_prepare_actor.h"
20 #include "runtime/graph_scheduler/actor/output_actor.h"
21 #include "runtime/graph_scheduler/actor/memory_manager_actor.h"
22 #include "runtime/graph_scheduler/actor/recorder_actor.h"
23 #include "runtime/graph_scheduler/actor/debug_actor.h"
24 #include "runtime/graph_scheduler/actor/profiler_actor.h"
25 #include "runtime/graph_scheduler/actor/control_flow/entrance_actor.h"
26 #include "mindrt/include/async/async.h"
27 #include "utils/log_adapter.h"
28 #include "runtime/device/stream_synchronizer.h"
29 #include "include/backend/distributed/recovery/recovery_context.h"
30 #include "include/backend/distributed/collective/collective_manager.h"
31 #if defined(__linux__) && defined(WITH_BACKEND)
32 #include "runtime/graph_scheduler/rpc_node_scheduler.h"
33 #endif
34 
35 namespace mindspore {
36 namespace runtime {
37 using distributed::collective::CollectiveManager;
38 using distributed::recovery::RecoveryContext;
39 
Run(OpContext<DeviceTensor> * const context)40 void LoopCountActor::Run(OpContext<DeviceTensor> *const context) {
41   MS_EXCEPTION_IF_NULL(context);
42   // Need wait MemoryManagerActor running finished to avoid the illegal memory timing problem before
43   // LoopCountActor exits, because other processors which are not in actor also will process device tensor.
44   ActorDispatcher::Send(memory_manager_aid_, &MemoryManagerActor::Wait, context, GetAID());
45 }
46 
OnMemoryAllocFinish(OpContext<DeviceTensor> * const context)47 void LoopCountActor::OnMemoryAllocFinish(OpContext<DeviceTensor> *const context) {
48   MS_EXCEPTION_IF_NULL(context);
49   IncreaseLoopCount(context);
50 }
51 
IncreaseLoopCount(OpContext<DeviceTensor> * const context)52 void LoopCountActor::IncreaseLoopCount(OpContext<DeviceTensor> *const context) {
53   MS_EXCEPTION_IF_NULL(context);
54 
55   total_running_count_++;
56   current_count_++;
57   MS_LOG(INFO) << "Loop count actor(" << GetAID().Name() << ") running, loop count: " << loop_count_
58                << ", current count: " << current_count_ << ", total running count: " << total_running_count_;
59   if (!WaitRuntimePipelineFinish(context)) {
60     MS_LOG(INFO) << "Run graph failed and please check error log.";
61     return;
62   }
63 
64   // Debug actor is blocked, must wait debug actor callback message to process continue.
65   if (debug_aid_ != nullptr) {
66     SendDebugReq(context);
67     return;
68   }
69 
70   if (profiler_aid_ != nullptr) {
71     SendProfilerReq(context);
72     return;
73   }
74 
75   // Sync device stream.
76   if ((strategy_ == GraphExecutionStrategy::kPipeline) && is_need_sync_stream_) {
77     ProfilerRecorder profiler(ProfilerModule::kKernel, ProfilerEvent::kStreamSync, GetAID().Name());
78     std::set<const DeviceContext *> sync_stream_device_contexts;
79     for (auto &device_context : device_contexts_) {
80       MS_EXCEPTION_IF_NULL(device_context);
81       if ((sync_stream_device_contexts.count(device_context) == 0) &&
82           (!device::StreamSynchronizer::GetInstance()->SyncStream(device_context->device_context_key().device_name_))) {
83         SET_OPCONTEXT_FAIL_RET_WITH_ERROR((*context),
84                                           ("Sync stream failed:" + device_context->device_context_key().ToString()));
85       }
86       (void)sync_stream_device_contexts.insert(device_context);
87 
88       // Trigger disaster recovery and exit loop early.
89       if (RecoveryContext::GetInstance()->enable_recovery() && CollectiveManager::instance()->need_reinit()) {
90         current_count_ = loop_count_;
91       }
92     }
93   }
94 
95   PostRun(context);
96 }
97 
SendDebugReq(OpContext<DeviceTensor> * const context)98 void LoopCountActor::SendDebugReq(OpContext<DeviceTensor> *const context) {
99   ActorDispatcher::SendSync(*debug_aid_, &DebugActor::DebugOnStepEnd, context, &GetAID(), total_running_count_);
100   OnDebugFinish(context);
101 }
102 
SendProfilerReq(OpContext<DeviceTensor> * const context)103 void LoopCountActor::SendProfilerReq(OpContext<DeviceTensor> *const context) {
104   ActorDispatcher::SendSync(*profiler_aid_, &ProfilerActor::ProfilerOnStepEnd, context, &GetAID(),
105                             total_running_count_);
106   OnDebugFinish(context);
107 }
108 
SendOutput(OpContext<DeviceTensor> * const context)109 void LoopCountActor::SendOutput(OpContext<DeviceTensor> *const context) {
110   // Send recorder info.
111   if (recorder_aid_ != nullptr) {
112     ActorDispatcher::Send(*recorder_aid_, &RecorderActor::RecordOnStepEnd, context);
113   }
114 
115   // Send output control.
116   auto from_aid = const_cast<AID *>(&GetAID());
117   for (auto &output_control : output_control_arrows_) {
118     MS_EXCEPTION_IF_NULL(output_control);
119     ActorDispatcher::Send(output_control->to_op_id_, &OpActor::RunOpControl, from_aid, context);
120   }
121 
122   // Send to EntranceActor to clear the data which are generated in the loop body execution.
123   for (auto &entrance_aid : entrance_aids_) {
124     ActorDispatcher::Send(entrance_aid, &EntranceActor::ClearDataOnStepEnd, from_aid, context);
125   }
126 
127 #if defined(__linux__) && defined(WITH_BACKEND)
128   // Flush sent data after each step is done.
129   RpcActorStatusUpdater::GetInstance().FlushRpcData(graph_name_);
130 #endif
131 
132   // The LoopCountActor exits.
133   if (current_count_ == loop_count_) {
134     current_count_ = 0;
135     return;
136   }
137 
138   // Send to DataPrepareActor to trigger next step running.
139   ActorDispatcher::Send(data_prepare_aid_, &OpActor::RunOpControl, from_aid, context);
140 }
141 }  // namespace runtime
142 }  // namespace mindspore
143