• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/util/task.h"
17 #include "utils/os.h"
18 #include "minddata/dataset/util/log_adapter.h"
19 #include "minddata/dataset/util/task_manager.h"
20 #if defined(__ANDROID__) || defined(ANDROID)
21 #include "minddata/dataset/util/services.h"
22 #endif
23 #ifdef WITH_BACKEND
24 #include "utils/ms_context.h"
25 #include "mindspore/ccsrc/include/backend/data_queue/data_queue_mgr.h"
26 #endif
27 namespace mindspore {
28 namespace dataset {
29 thread_local Task *gMyTask = nullptr;
30 
operator ()()31 void Task::operator()() {
32 #if !defined(_WIN32) && !defined(_WIN64)
33   gMyTask = this;
34 #endif
35   id_ = this_thread::get_id();
36   std::stringstream ss;
37   ss << id_;
38 #if defined(__ANDROID__) || defined(ANDROID) || defined(__APPLE__)
39   // The thread id in Linux may be duplicate
40   ss << Services::GetUniqueID();
41 #endif
42   MS_LOG(DEBUG) << "Task: " << my_name_ << " Thread ID " << ss.str() << " Started.";
43 
44 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
45   native_handle_ = pthread_self();
46   thread_id_ = syscall(SYS_gettid);
47 #endif
48 
49   try {
50     // Previously there is a timing hole where the thread is spawn but hit error immediately before we can set
51     // the TaskGroup pointer and register. We move the registration logic to here (after we spawn) so we can
52     // get the thread id.
53     TaskGroup *vg = MyTaskGroup();
54     if (vg == nullptr) {
55       MS_LOG(ERROR) << "Task Group is nullptr.";
56       ShutdownGroup();
57       return;
58     }
59     std::string uuid = ss.str();
60     auto intrp_service = vg->GetIntrpService();
61     rc_ = intrp_service->Register(&uuid, this);
62     if (rc_.IsOk()) {
63       // Now we can run the given task.
64       rc_ = fnc_obj_();
65     }
66     // Some error codes are ignored, e.g. interrupt. Others we just shutdown the group.
67     if (rc_.IsError() && rc_ != StatusCode::kMDInterrupted) {
68       if (rc_.StatusCode() == StatusCode::kMDNetWorkError) {
69         MS_LOG(WARNING) << rc_;
70       } else {
71         MS_LOG(INFO) << "Task: " << my_name_ << " - thread(" << uuid << ") is terminated with err msg: " << rc_;
72       }
73       ShutdownGroup();
74     }
75     // The given function has finished running. We must change the running status immediately.
76     // Because std::async may create a new thread with the same thread ID as this thread since it has finished.
77     // Then there will be two tasks with the same thread ID in our task group, which may cause a mismatch
78     // in TaskManager::FindMe(). We can identify the exact task based on the running status there.
79     running_ = false;
80     MS_LOG(DEBUG) << "Task: " << my_name_ << " Thread ID " << ss.str() << " Finished.";
81   } catch (const std::bad_alloc &e) {
82     rc_ = STATUS_ERROR(StatusCode::kMDOutOfMemory, e.what());
83     MS_LOG(ERROR) << rc_;
84     ShutdownGroup();
85   } catch (const std::exception &e) {
86     rc_ = STATUS_ERROR(StatusCode::kMDUnexpectedError, e.what());
87     MS_LOG(INFO) << rc_;
88     ShutdownGroup();
89   }
90 }
91 
ShutdownGroup()92 void Task::ShutdownGroup() {  // Wake up watch dog and shutdown the engine.
93   {
94     std::lock_guard<std::mutex> lk(mux_);
95     caught_severe_exception_ = true;
96   }
97   TaskGroup *vg = MyTaskGroup();
98   // If multiple threads hit severe errors in the same group. Keep the first one and
99   // discard the rest.
100   std::unique_lock<std::mutex> rcLock(vg->rc_mux_);
101   {
102     if (vg->rc_.IsOk()) {
103       // Check again after we get the lock
104       if (vg->rc_.IsOk()) {
105         vg->rc_ = rc_;
106         rcLock.unlock();
107         TaskManager::InterruptMaster(rc_);
108         TaskManager::InterruptGroup(*this);
109         if (vg->rc_.IsError()) {
110           // InterruptMaster miss sink pyfunc scenario, thus add print here.
111           if (vg->has_dataqueue_ && vg->rc_.StatusCode() == mindspore::StatusCode::kMDPyFuncException) {
112             MS_LOG(ERROR) << "MindSpore dataset is terminated with err msg: " << vg->rc_;
113           }
114         }
115       }
116     }
117   }
118 }
119 
GetTaskErrorIfAny() const120 Status Task::GetTaskErrorIfAny() const {
121   std::lock_guard<std::mutex> lk(mux_);
122   if (caught_severe_exception_) {
123     return rc_;
124   } else {
125     return Status::OK();
126   }
127 }
128 
Task(const std::string & myName,const std::function<Status ()> & f,int32_t operator_id)129 Task::Task(const std::string &myName, const std::function<Status()> &f, int32_t operator_id)
130     : my_name_(myName),
131       operator_id_(operator_id),
132       thread_id_(-1),
133       rc_(),
134       fnc_obj_(f),
135       task_group_(nullptr),
136       is_master_(false),
137       running_(false),
138       caught_severe_exception_(false),
139       native_handle_(0) {
140 #if defined(_WIN32) || defined(_WIN64)
141   process_id_ = GetCurrentProcessId();
142 #else
143   process_id_ = getpid();
144 #endif
145   IntrpResource::ResetIntrpState();
146   wp_.ResetIntrpState();
147   wp_.Clear();
148 }
149 
Run()150 Status Task::Run() {
151   Status rc;
152   std::lock_guard<std::mutex> lk(mux_);
153   if (running_ == false) {
154     try {
155       running_ = true;
156       thrd_ = std::async(std::launch::async, std::ref(*this));
157       caught_severe_exception_ = false;
158     } catch (const std::exception &e) {
159       rc = STATUS_ERROR(StatusCode::kMDUnexpectedError, e.what());
160     }
161   }
162   return rc;
163 }
164 
Join(WaitFlag blocking)165 Status Task::Join(WaitFlag blocking) {
166 #ifdef WITH_BACKEND
167   RETURN_UNEXPECTED_IF_NULL(MsContext::GetInstance());
168   std::string device_target = MsContext::GetInstance()->get_param<std::string>(MS_CTX_DEVICE_TARGET);
169 #endif
170   // If the current process is subprocess of map or batch, the getpid() != process_id_.
171   // And no need to join WatchDog.
172   if (running_ && getpid() == process_id_ && my_name_.find("WatchDog") == std::string::npos) {
173     RETURN_UNEXPECTED_IF_NULL(MyTaskGroup());
174     auto interrupt_svc = MyTaskGroup()->GetIntrpService();
175     try {
176       if (blocking == WaitFlag::kBlocking) {
177         // If we are asked to wait, then wait
178         thrd_.get();
179       } else if (blocking == WaitFlag::kNonBlocking) {
180         // There is a race condition in the global resource tracking such that a thread can miss the
181         // interrupt and becomes blocked on a conditional variable forever. As a result, calling
182         // join() will not come back. We need some timeout version of join such that if the thread
183         // doesn't come back in a reasonable of time, we will send the interrupt again.
184         uint32_t wait_times = 0;
185         const uint32_t kLogInterval = 5;
186         while (thrd_.wait_for(std::chrono::seconds(1)) != std::future_status::ready) {
187           // We can't tell which conditional_variable this thread is waiting on. So we may need
188           // to interrupt everything one more time.
189           std::stringstream ss;
190           ss << get_id();
191           wait_times++;
192           if (wait_times % kLogInterval == 0) {
193             MS_LOG(WARNING) << "Task: " << my_name_ << " Thread ID " << ss.str()
194                             << " is not finished and cannot be joined. Try to interrupt again.";
195           }
196           interrupt_svc->InterruptAll();
197 #ifdef WITH_BACKEND
198           if (device_target == kAscendDevice) {
199             // Because hostPush hung in DataQueueOp, wait 5 seconds and destroy the tdt
200             if (wait_times > 5 && my_name_.find("DataQueueOp") != std::string::npos) {
201               MS_LOG(WARNING) << "Wait " << wait_times << " seconds, "
202                               << "the task: " << my_name_ << " will be destroyed by TdtHostDestory.";
203               if (device::DataQueueMgr::DestoryTdtHandle()) {
204                 MS_LOG(INFO) << "Destroy tdt channel success.";
205               } else {
206                 MS_LOG(WARNING) << "Destroy tdt channel failed.";
207               }
208 
209               // just wait 30 seconds
210               // case1: cpu usage 100%, DataQueueOp thread may destroy without thread_future
211               if (wait_times > kWaitInterruptTaskTime) {
212                 MS_LOG(WARNING) << "Task: " << my_name_ << " Thread ID " << ss.str()
213                                 << " is not responding. Maybe it has been destroyed. Stop the task.";
214                 break;
215               }
216             }
217           }
218 #endif
219         }
220       } else {
221         RETURN_STATUS_UNEXPECTED("Unknown WaitFlag");
222       }
223       std::stringstream ss;
224       ss << get_id();
225       MS_LOG(DEBUG) << "Task: " << my_name_ << " Thread ID " << ss.str() << " Stopped.";
226       running_ = false;
227       RETURN_IF_NOT_OK(wp_.Deregister());
228       RETURN_IF_NOT_OK(interrupt_svc->Deregister(ss.str()));
229     } catch (const std::exception &e) {
230       RETURN_STATUS_UNEXPECTED(e.what());
231     }
232   }
233   return Status::OK();
234 }
235 
MyTaskGroup()236 TaskGroup *Task::MyTaskGroup() { return task_group_; }
237 
set_task_group(TaskGroup * vg)238 void Task::set_task_group(TaskGroup *vg) { task_group_ = vg; }
239 
~Task()240 Task::~Task() { task_group_ = nullptr; }
241 
// Replaces an interrupt status seen on the master thread with the master thread's own status.
//
// rc - status to inspect.
// Returns TaskManager::GetMasterThreadRc() when rc is kMDInterrupted and the caller is the master
// thread; otherwise rc is returned unchanged.
Status Task::OverrideInterruptRc(const Status &rc) {
  if (rc != StatusCode::kMDInterrupted || !this_thread::is_master_thread()) {
    return rc;
  }
  // If we are interrupted, override the return value if this is the master thread.
  // Master thread is being interrupted mostly because of some thread is reporting error.
  return TaskManager::GetMasterThreadRc();
}
250 
251 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
GetNativeHandle() const252 pthread_t Task::GetNativeHandle() const { return native_handle_; }
253 #endif
254 
255 }  // namespace dataset
256 }  // namespace mindspore
257