• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/util/task.h"
17 
18 #include <unistd.h>
19 #include "utils/ms_utils.h"
20 #include "minddata/dataset/util/log_adapter.h"
21 #include "minddata/dataset/util/task_manager.h"
22 #if defined(__ANDROID__) || defined(ANDROID)
23 #include "minddata/dataset/util/services.h"
24 #endif
25 #ifdef ENABLE_TDTQUE
26 #include "acl/acl_tdt.h"
27 #include "tdt/status.h"
28 #include "minddata/dataset/engine/tdt/tdt_handle.h"
29 #endif
30 
31 namespace mindspore {
32 namespace dataset {
33 thread_local Task *gMyTask = nullptr;
34 
operator ()()35 void Task::operator()() {
36 #if !defined(_WIN32) && !defined(_WIN64)
37   gMyTask = this;
38 #endif
39   id_ = this_thread::get_id();
40   std::stringstream ss;
41   ss << id_;
42 #if defined(__ANDROID__) || defined(ANDROID)
43   // The thread id in Linux may be duplicate
44   ss << Services::GetUniqueID();
45 #endif
46   MS_LOG(DEBUG) << "Task: " << my_name_ << " Thread ID " << ss.str() << " Started.";
47 
48 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
49   native_handle_ = pthread_self();
50   thread_id_ = syscall(SYS_gettid);
51 #endif
52   try {
53     // Previously there is a timing hole where the thread is spawn but hit error immediately before we can set
54     // the TaskGroup pointer and register. We move the registration logic to here (after we spawn) so we can
55     // get the thread id.
56     TaskGroup *vg = MyTaskGroup();
57     rc_ = vg->GetIntrpService()->Register(ss.str(), this);
58     if (rc_.IsOk()) {
59       // Now we can run the given task.
60       rc_ = fnc_obj_();
61     }
62     // Some error codes are ignored, e.g. interrupt. Others we just shutdown the group.
63     if (rc_.IsError() && rc_ != StatusCode::kMDInterrupted) {
64       if (rc_.StatusCode() == StatusCode::kMDNetWorkError) {
65         MS_LOG(WARNING) << rc_;
66       } else {
67         MS_LOG(ERROR) << "Task: " << my_name_ << " - thread(" << ss.str() << ") is terminated with err msg: " << rc_;
68       }
69       ShutdownGroup();
70     }
71   } catch (const std::bad_alloc &e) {
72     rc_ = Status(StatusCode::kMDOutOfMemory, __LINE__, __FILE__, e.what());
73     MS_LOG(ERROR) << rc_;
74     ShutdownGroup();
75   } catch (const std::exception &e) {
76     rc_ = Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, e.what());
77     MS_LOG(ERROR) << rc_;
78     ShutdownGroup();
79   }
80 }
81 
ShutdownGroup()82 void Task::ShutdownGroup() {  // Wake up watch dog and shutdown the engine.
83   {
84     std::lock_guard<std::mutex> lk(mux_);
85     caught_severe_exception_ = true;
86   }
87   TaskGroup *vg = MyTaskGroup();
88   // If multiple threads hit severe errors in the same group. Keep the first one and
89   // discard the rest.
90   if (vg->rc_.IsOk()) {
91     std::unique_lock<std::mutex> rcLock(vg->rc_mux_);
92     // Check again after we get the lock
93     if (vg->rc_.IsOk()) {
94       vg->rc_ = rc_;
95       rcLock.unlock();
96       TaskManager::InterruptMaster(rc_);
97       TaskManager::InterruptGroup(*this);
98     }
99   }
100 }
101 
GetTaskErrorIfAny() const102 Status Task::GetTaskErrorIfAny() const {
103   std::lock_guard<std::mutex> lk(mux_);
104   if (caught_severe_exception_) {
105     return rc_;
106   } else {
107     return Status::OK();
108   }
109 }
110 
Task(const std::string & myName,const std::function<Status ()> & f,int32_t operator_id)111 Task::Task(const std::string &myName, const std::function<Status()> &f, int32_t operator_id)
112     : my_name_(myName),
113       operator_id_(operator_id),
114       rc_(),
115       fnc_obj_(f),
116       task_group_(nullptr),
117       is_master_(false),
118       running_(false),
119       caught_severe_exception_(false),
120       native_handle_(0) {
121   IntrpResource::ResetIntrpState();
122   wp_.ResetIntrpState();
123   wp_.Clear();
124 }
125 
Run()126 Status Task::Run() {
127   Status rc;
128   if (running_ == false) {
129     try {
130       thrd_ = std::async(std::launch::async, std::ref(*this));
131       running_ = true;
132       caught_severe_exception_ = false;
133     } catch (const std::exception &e) {
134       rc = Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, e.what());
135     }
136   }
137   return rc;
138 }
139 
Join(WaitFlag blocking)140 Status Task::Join(WaitFlag blocking) {
141   if (running_) {
142     RETURN_UNEXPECTED_IF_NULL(MyTaskGroup());
143     auto interrupt_svc = MyTaskGroup()->GetIntrpService();
144     try {
145       if (blocking == WaitFlag::kBlocking) {
146         // If we are asked to wait, then wait
147         thrd_.get();
148       } else if (blocking == WaitFlag::kNonBlocking) {
149         // There is a race condition in the global resource tracking such that a thread can miss the
150         // interrupt and becomes blocked on a conditional variable forever. As a result, calling
151         // join() will not come back. We need some timeout version of join such that if the thread
152         // doesn't come back in a reasonable of time, we will send the interrupt again.
153         uint32_t wait_times = 0;
154         while (thrd_.wait_for(std::chrono::seconds(1)) != std::future_status::ready) {
155           // We can't tell which conditional_variable this thread is waiting on. So we may need
156           // to interrupt everything one more time.
157           std::stringstream ss;
158           ss << get_id();
159           MS_LOG(WARNING) << MyName() << " Thread ID " << ss.str() << " is not responding. Interrupt again";
160           interrupt_svc->InterruptAll();
161           wait_times++;
162 #ifdef ENABLE_TDTQUE
163           // Because hostPush hung in DeviceQueueOp, wait 5 seconds and destroy the tdt
164           if (wait_times > 5 && my_name_.find("DeviceQueueOp") != std::string::npos) {
165             MS_LOG(WARNING) << "Wait " << wait_times << " seconds, "
166                             << "the task: " << my_name_ << " will be destroyed by TdtHostDestory.";
167             if (!TdtHandle::DestroyHandle()) {
168               MS_LOG(WARNING) << "Destroy tdt channel failed.";
169             } else {
170               MS_LOG(INFO) << "Destroy tdt channel success.";
171             }
172 
173             // just wait 30 seconds
174             // case1: cpu usage 100%, DeviceQueueOp thread may destroy without thrd_ future
175             if (wait_times > kWaitInterruptTaskTime) {
176               MS_LOG(WARNING) << MyName() << " Thread ID " << ss.str()
177                               << " is not responding. Maybe it's destroyed, task stop.";
178               break;
179             }
180           }
181 #endif
182         }
183       } else {
184         RETURN_STATUS_UNEXPECTED("Unknown WaitFlag");
185       }
186       std::stringstream ss;
187       ss << get_id();
188       MS_LOG(DEBUG) << MyName() << " Thread ID " << ss.str() << " Stopped.";
189       running_ = false;
190       RETURN_IF_NOT_OK(wp_.Deregister());
191       RETURN_IF_NOT_OK(interrupt_svc->Deregister(ss.str()));
192     } catch (const std::exception &e) {
193       RETURN_STATUS_UNEXPECTED(e.what());
194     }
195   }
196   return Status::OK();
197 }
198 
MyTaskGroup()199 TaskGroup *Task::MyTaskGroup() { return task_group_; }
200 
set_task_group(TaskGroup * vg)201 void Task::set_task_group(TaskGroup *vg) { task_group_ = vg; }
202 
~Task()203 Task::~Task() { task_group_ = nullptr; }
/// Maps an interrupt status seen by the master thread to the master thread's recorded status.
/// The master thread is usually interrupted because some other thread reported an error, so the
/// recorded status is more informative than kMDInterrupted. Any other status, or any status on a
/// non-master thread, is returned unchanged.
Status Task::OverrideInterruptRc(const Status &rc) {
  const bool master_was_interrupted = (rc == StatusCode::kMDInterrupted) && this_thread::is_master_thread();
  return master_was_interrupted ? TaskManager::GetMasterThreadRc() : rc;
}
212 
213 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
GetNativeHandle() const214 pthread_t Task::GetNativeHandle() const { return native_handle_; }
215 #endif
216 
217 }  // namespace dataset
218 }  // namespace mindspore
219