1 /**
2 * Copyright 2019 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/util/task.h"
17 #include "utils/os.h"
18 #include "minddata/dataset/util/log_adapter.h"
19 #include "minddata/dataset/util/task_manager.h"
20 #if defined(__ANDROID__) || defined(ANDROID)
21 #include "minddata/dataset/util/services.h"
22 #endif
23 #ifdef WITH_BACKEND
24 #include "utils/ms_context.h"
25 #include "mindspore/ccsrc/include/backend/data_queue/data_queue_mgr.h"
26 #endif
27 namespace mindspore {
28 namespace dataset {
29 thread_local Task *gMyTask = nullptr;
30
operator ()()31 void Task::operator()() {
32 #if !defined(_WIN32) && !defined(_WIN64)
33 gMyTask = this;
34 #endif
35 id_ = this_thread::get_id();
36 std::stringstream ss;
37 ss << id_;
38 #if defined(__ANDROID__) || defined(ANDROID) || defined(__APPLE__)
39 // The thread id in Linux may be duplicate
40 ss << Services::GetUniqueID();
41 #endif
42 MS_LOG(DEBUG) << "Task: " << my_name_ << " Thread ID " << ss.str() << " Started.";
43
44 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
45 native_handle_ = pthread_self();
46 thread_id_ = syscall(SYS_gettid);
47 #endif
48
49 try {
50 // Previously there is a timing hole where the thread is spawn but hit error immediately before we can set
51 // the TaskGroup pointer and register. We move the registration logic to here (after we spawn) so we can
52 // get the thread id.
53 TaskGroup *vg = MyTaskGroup();
54 if (vg == nullptr) {
55 MS_LOG(ERROR) << "Task Group is nullptr.";
56 ShutdownGroup();
57 return;
58 }
59 std::string uuid = ss.str();
60 auto intrp_service = vg->GetIntrpService();
61 rc_ = intrp_service->Register(&uuid, this);
62 if (rc_.IsOk()) {
63 // Now we can run the given task.
64 rc_ = fnc_obj_();
65 }
66 // Some error codes are ignored, e.g. interrupt. Others we just shutdown the group.
67 if (rc_.IsError() && rc_ != StatusCode::kMDInterrupted) {
68 if (rc_.StatusCode() == StatusCode::kMDNetWorkError) {
69 MS_LOG(WARNING) << rc_;
70 } else {
71 MS_LOG(INFO) << "Task: " << my_name_ << " - thread(" << uuid << ") is terminated with err msg: " << rc_;
72 }
73 ShutdownGroup();
74 }
75 // The given function has finished running. We must change the running status immediately.
76 // Because std::async may create a new thread with the same thread ID as this thread since it has finished.
77 // Then there will be two tasks with the same thread ID in our task group, which may cause a mismatch
78 // in TaskManager::FindMe(). We can identify the exact task based on the running status there.
79 running_ = false;
80 MS_LOG(DEBUG) << "Task: " << my_name_ << " Thread ID " << ss.str() << " Finished.";
81 } catch (const std::bad_alloc &e) {
82 rc_ = STATUS_ERROR(StatusCode::kMDOutOfMemory, e.what());
83 MS_LOG(ERROR) << rc_;
84 ShutdownGroup();
85 } catch (const std::exception &e) {
86 rc_ = STATUS_ERROR(StatusCode::kMDUnexpectedError, e.what());
87 MS_LOG(INFO) << rc_;
88 ShutdownGroup();
89 }
90 }
91
ShutdownGroup()92 void Task::ShutdownGroup() { // Wake up watch dog and shutdown the engine.
93 {
94 std::lock_guard<std::mutex> lk(mux_);
95 caught_severe_exception_ = true;
96 }
97 TaskGroup *vg = MyTaskGroup();
98 // If multiple threads hit severe errors in the same group. Keep the first one and
99 // discard the rest.
100 std::unique_lock<std::mutex> rcLock(vg->rc_mux_);
101 {
102 if (vg->rc_.IsOk()) {
103 // Check again after we get the lock
104 if (vg->rc_.IsOk()) {
105 vg->rc_ = rc_;
106 rcLock.unlock();
107 TaskManager::InterruptMaster(rc_);
108 TaskManager::InterruptGroup(*this);
109 if (vg->rc_.IsError()) {
110 // InterruptMaster miss sink pyfunc scenario, thus add print here.
111 if (vg->has_dataqueue_ && vg->rc_.StatusCode() == mindspore::StatusCode::kMDPyFuncException) {
112 MS_LOG(ERROR) << "MindSpore dataset is terminated with err msg: " << vg->rc_;
113 }
114 }
115 }
116 }
117 }
118 }
119
GetTaskErrorIfAny() const120 Status Task::GetTaskErrorIfAny() const {
121 std::lock_guard<std::mutex> lk(mux_);
122 if (caught_severe_exception_) {
123 return rc_;
124 } else {
125 return Status::OK();
126 }
127 }
128
Task(const std::string & myName,const std::function<Status ()> & f,int32_t operator_id)129 Task::Task(const std::string &myName, const std::function<Status()> &f, int32_t operator_id)
130 : my_name_(myName),
131 operator_id_(operator_id),
132 thread_id_(-1),
133 rc_(),
134 fnc_obj_(f),
135 task_group_(nullptr),
136 is_master_(false),
137 running_(false),
138 caught_severe_exception_(false),
139 native_handle_(0) {
140 #if defined(_WIN32) || defined(_WIN64)
141 process_id_ = GetCurrentProcessId();
142 #else
143 process_id_ = getpid();
144 #endif
145 IntrpResource::ResetIntrpState();
146 wp_.ResetIntrpState();
147 wp_.Clear();
148 }
149
Run()150 Status Task::Run() {
151 Status rc;
152 std::lock_guard<std::mutex> lk(mux_);
153 if (running_ == false) {
154 try {
155 running_ = true;
156 thrd_ = std::async(std::launch::async, std::ref(*this));
157 caught_severe_exception_ = false;
158 } catch (const std::exception &e) {
159 rc = STATUS_ERROR(StatusCode::kMDUnexpectedError, e.what());
160 }
161 }
162 return rc;
163 }
164
Join(WaitFlag blocking)165 Status Task::Join(WaitFlag blocking) {
166 #ifdef WITH_BACKEND
167 RETURN_UNEXPECTED_IF_NULL(MsContext::GetInstance());
168 std::string device_target = MsContext::GetInstance()->get_param<std::string>(MS_CTX_DEVICE_TARGET);
169 #endif
170 // If the current process is subprocess of map or batch, the getpid() != process_id_.
171 // And no need to join WatchDog.
172 if (running_ && getpid() == process_id_ && my_name_.find("WatchDog") == std::string::npos) {
173 RETURN_UNEXPECTED_IF_NULL(MyTaskGroup());
174 auto interrupt_svc = MyTaskGroup()->GetIntrpService();
175 try {
176 if (blocking == WaitFlag::kBlocking) {
177 // If we are asked to wait, then wait
178 thrd_.get();
179 } else if (blocking == WaitFlag::kNonBlocking) {
180 // There is a race condition in the global resource tracking such that a thread can miss the
181 // interrupt and becomes blocked on a conditional variable forever. As a result, calling
182 // join() will not come back. We need some timeout version of join such that if the thread
183 // doesn't come back in a reasonable of time, we will send the interrupt again.
184 uint32_t wait_times = 0;
185 const uint32_t kLogInterval = 5;
186 while (thrd_.wait_for(std::chrono::seconds(1)) != std::future_status::ready) {
187 // We can't tell which conditional_variable this thread is waiting on. So we may need
188 // to interrupt everything one more time.
189 std::stringstream ss;
190 ss << get_id();
191 wait_times++;
192 if (wait_times % kLogInterval == 0) {
193 MS_LOG(WARNING) << "Task: " << my_name_ << " Thread ID " << ss.str()
194 << " is not finished and cannot be joined. Try to interrupt again.";
195 }
196 interrupt_svc->InterruptAll();
197 #ifdef WITH_BACKEND
198 if (device_target == kAscendDevice) {
199 // Because hostPush hung in DataQueueOp, wait 5 seconds and destroy the tdt
200 if (wait_times > 5 && my_name_.find("DataQueueOp") != std::string::npos) {
201 MS_LOG(WARNING) << "Wait " << wait_times << " seconds, "
202 << "the task: " << my_name_ << " will be destroyed by TdtHostDestory.";
203 if (device::DataQueueMgr::DestoryTdtHandle()) {
204 MS_LOG(INFO) << "Destroy tdt channel success.";
205 } else {
206 MS_LOG(WARNING) << "Destroy tdt channel failed.";
207 }
208
209 // just wait 30 seconds
210 // case1: cpu usage 100%, DataQueueOp thread may destroy without thread_future
211 if (wait_times > kWaitInterruptTaskTime) {
212 MS_LOG(WARNING) << "Task: " << my_name_ << " Thread ID " << ss.str()
213 << " is not responding. Maybe it has been destroyed. Stop the task.";
214 break;
215 }
216 }
217 }
218 #endif
219 }
220 } else {
221 RETURN_STATUS_UNEXPECTED("Unknown WaitFlag");
222 }
223 std::stringstream ss;
224 ss << get_id();
225 MS_LOG(DEBUG) << "Task: " << my_name_ << " Thread ID " << ss.str() << " Stopped.";
226 running_ = false;
227 RETURN_IF_NOT_OK(wp_.Deregister());
228 RETURN_IF_NOT_OK(interrupt_svc->Deregister(ss.str()));
229 } catch (const std::exception &e) {
230 RETURN_STATUS_UNEXPECTED(e.what());
231 }
232 }
233 return Status::OK();
234 }
235
MyTaskGroup()236 TaskGroup *Task::MyTaskGroup() { return task_group_; }
237
set_task_group(TaskGroup * vg)238 void Task::set_task_group(TaskGroup *vg) { task_group_ = vg; }
239
~Task()240 Task::~Task() { task_group_ = nullptr; }
241
/// @brief Replaces an interrupt status with the master thread's status when called on the master.
///
/// The master is usually interrupted because some other thread reported an error, so that error is
/// more informative than kMDInterrupted. Any other status, or any non-master thread, gets @p rc back.
Status Task::OverrideInterruptRc(const Status &rc) {
  const bool master_was_interrupted = rc == StatusCode::kMDInterrupted && this_thread::is_master_thread();
  if (!master_was_interrupted) {
    return rc;
  }
  return TaskManager::GetMasterThreadRc();
}
250
251 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
GetNativeHandle() const252 pthread_t Task::GetNativeHandle() const { return native_handle_; }
253 #endif
254
255 } // namespace dataset
256 } // namespace mindspore
257