1 /**
2 * Copyright 2019 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include <algorithm>
17 #include <functional>
18 #include <set>
19 #include "./securec.h"
20 #include "minddata/dataset/util/task_manager.h"
21
22 namespace mindspore {
23 namespace dataset {
24 TaskManager *TaskManager::instance_ = nullptr;
25 std::once_flag TaskManager::init_instance_flag_;
26 // This takes the same parameter as Task constructor.
CreateAsyncTask(const std::string & my_name,const std::function<Status ()> & f,TaskGroup * vg,Task ** task,int32_t operator_id)27 Status TaskManager::CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, TaskGroup *vg,
28 Task **task, int32_t operator_id) {
29 // We need to block destructor coming otherwise we will deadlock. We will grab the
30 // stateLock in shared allowing CreateAsyncTask to run concurrently.
31 SharedLock stateLck(&state_lock_);
32 // Now double check the state
33 if (ServiceState() == STATE::kStopInProg || ServiceState() == STATE::kStopped) {
34 return Status(StatusCode::kMDInterrupted, __LINE__, __FILE__, "TaskManager is shutting down");
35 }
36 RETURN_IF_NOT_OK(GetFreeTask(my_name, f, task, operator_id));
37 if (vg == nullptr) {
38 RETURN_STATUS_UNEXPECTED("TaskGroup is null");
39 }
40 // Previously there is a timing hole where the thread is spawn but hit error immediately before we can set
41 // the TaskGroup pointer. We will do the set here before we call run(). The run() will do the registration.
42 (*task)->set_task_group(vg);
43 // Link to the master lru list.
44 {
45 UniqueLock lck(&lru_lock_);
46 lru_.Append(*task);
47 }
48 // Link to the group list as well before we spawn.
49 {
50 UniqueLock lck(&vg->rw_lock_);
51 vg->grp_list_.Append(*task);
52 }
53 // Track all the TaskGroup. Used for control-c
54 {
55 LockGuard lck(&tg_lock_);
56 (void)this->grp_list_.insert(vg);
57 }
58 RETURN_IF_NOT_OK((*task)->wp_.Register(vg));
59 RETURN_IF_NOT_OK((*task)->Run());
60 // Wait for the thread to initialize successfully.
61 RETURN_IF_NOT_OK((*task)->Wait());
62 return Status::OK();
63 }
64
join_all()65 Status TaskManager::join_all() {
66 Status rc;
67 Status rc2;
68 SharedLock lck(&lru_lock_);
69 for (Task &tk : lru_) {
70 rc = tk.Join();
71 if (rc.IsError()) {
72 rc2 = rc;
73 }
74 }
75 return rc2;
76 }
77
interrupt_all()78 void TaskManager::interrupt_all() noexcept {
79 global_interrupt_ = 1;
80 LockGuard lck(&tg_lock_);
81 for (TaskGroup *vg : grp_list_) {
82 auto svc = vg->GetIntrpService();
83 if (svc) {
84 // Stop the interrupt service. No new request is accepted.
85 Status rc = svc->ServiceStop();
86 if (rc.IsError()) MS_LOG(ERROR) << "Error while stopping the service. Message: " << rc;
87 svc->InterruptAll();
88 }
89 }
90 master_->Interrupt();
91 }
92
FindMe()93 Task *TaskManager::FindMe() {
94 #if !defined(_WIN32) && !defined(_WIN64)
95 return gMyTask;
96 #else
97 TaskManager &tm = TaskManager::GetInstance();
98 SharedLock lock(&tm.lru_lock_);
99 auto id = this_thread::get_id();
100 auto tk = std::find_if(tm.lru_.begin(), tm.lru_.end(), [id](const Task &tk) { return tk.id_ == id; });
101 if (tk != tm.lru_.end()) {
102 return &(*tk);
103 }
104 // If we get here, either I am the watchdog or the master thread.
105 if (tm.master_->id_ == id) {
106 return tm.master_.get();
107 } else if (tm.watchdog_ != nullptr && tm.watchdog_->id_ == id) {
108 return tm.watchdog_;
109 }
110 MS_LOG(ERROR) << "Task not found.";
111 return nullptr;
112 #endif
113 }
114
TaskManager()115 TaskManager::TaskManager() try : global_interrupt_(0),
116 lru_(&Task::node),
117 free_lst_(&Task::free),
118 watchdog_grp_(nullptr),
119 watchdog_(nullptr) {
120 auto alloc = Services::GetAllocator<Task>();
121 // Create a dummy Task for the master thread (this thread)
122 master_ = std::allocate_shared<Task>(alloc, "master", []() -> Status { return Status::OK(); });
123 master_->id_ = this_thread::get_id();
124 master_->running_ = true;
125 master_->is_master_ = true;
126 #if !defined(_WIN32) && !defined(_WIN64)
127 gMyTask = master_.get();
128 #if !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
129 // Initialize the semaphore for the watchdog
130 errno_t rc = sem_init(&sem_, 0, 0);
131 if (rc == -1) {
132 MS_LOG(ERROR) << "Unable to initialize a semaphore. Errno = " << rc << ".";
133 std::terminate();
134 }
135 #endif
136 #endif
137 } catch (const std::exception &e) {
138 MS_LOG(ERROR) << "MindData initialization failed: " << e.what() << ".";
139 std::terminate();
140 }
141
~TaskManager()142 TaskManager::~TaskManager() {
143 if (watchdog_) {
144 WakeUpWatchDog();
145 (void)watchdog_->Join();
146 // watchdog_grp_ and watchdog_ pointers come from Services::GetInstance().GetServiceMemPool() which we will free it
147 // on shutdown. So no need to free these pointers one by one.
148 watchdog_grp_ = nullptr;
149 watchdog_ = nullptr;
150 }
151 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
152 (void)sem_destroy(&sem_);
153 #endif
154 }
155
DoServiceStart()156 Status TaskManager::DoServiceStart() {
157 MS_LOG(INFO) << "Starting Task Manager.";
158 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
159 // Create a watchdog for control-c
160 std::shared_ptr<MemoryPool> mp = Services::GetInstance().GetServiceMemPool();
161 // A dummy group just for the watchdog. We aren't really using it. But most code assumes a thread must
162 // belong to a group.
163 auto f = std::bind(&TaskManager::WatchDog, this);
164 Status rc;
165 watchdog_grp_ = new (&rc, mp) TaskGroup();
166 RETURN_IF_NOT_OK(rc);
167 rc = watchdog_grp_->CreateAsyncTask("Watchdog", f, &watchdog_);
168 if (rc.IsError()) {
169 ::operator delete(watchdog_grp_, mp);
170 watchdog_grp_ = nullptr;
171 return rc;
172 }
173 (void)grp_list_.erase(watchdog_grp_);
174 lru_.Remove(watchdog_);
175 #endif
176 return Status::OK();
177 }
178
DoServiceStop()179 Status TaskManager::DoServiceStop() {
180 WakeUpWatchDog();
181 interrupt_all();
182 return Status::OK();
183 }
184
WatchDog()185 Status TaskManager::WatchDog() {
186 TaskManager::FindMe()->Post();
187 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
188 errno_t err = sem_wait(&sem_);
189 if (err == -1) {
190 RETURN_STATUS_UNEXPECTED("Errno = " + std::to_string(errno));
191 }
192 // We are woken up by control-c and we are going to stop all threads that are running.
193 // In addition, we also want to prevent new thread from creating. This can be done
194 // easily by calling the parent function.
195 RETURN_IF_NOT_OK(ServiceStop());
196 #endif
197 return Status::OK();
198 }
199
200 // Follow the group link and interrupt other
201 // Task in the same group. It is used by
202 // Watchdog only.
InterruptGroup(Task & curTk)203 void TaskManager::InterruptGroup(Task &curTk) {
204 TaskGroup *vg = curTk.MyTaskGroup();
205 vg->interrupt_all();
206 }
207
InterruptMaster(const Status & rc)208 void TaskManager::InterruptMaster(const Status &rc) {
209 TaskManager &tm = TaskManager::GetInstance();
210 std::shared_ptr<Task> master = tm.master_;
211 std::lock_guard<std::mutex> lck(master->mux_);
212 master->Interrupt();
213 if (rc.IsError() && master->rc_.IsOk()) {
214 master->rc_ = rc;
215 master->caught_severe_exception_ = true;
216 // Move log error here for some scenarios didn't call GetMasterThreadRc
217 MS_LOG(ERROR) << "Task is terminated with err msg(more detail in info level log):" << master->rc_;
218 }
219 }
220
GetMasterThreadRc()221 Status TaskManager::GetMasterThreadRc() {
222 TaskManager &tm = TaskManager::GetInstance();
223 std::shared_ptr<Task> master = tm.master_;
224 Status rc = tm.master_->GetTaskErrorIfAny();
225 if (rc.IsError()) {
226 // Reset the state once we retrieve the value.
227 std::lock_guard<std::mutex> lck(master->mux_);
228 master->rc_ = Status::OK();
229 master->caught_severe_exception_ = false;
230 master->ResetIntrpState();
231 }
232 return rc;
233 }
234
ReturnFreeTask(Task * p)235 void TaskManager::ReturnFreeTask(Task *p) noexcept {
236 // Take it out from lru_ if any
237 {
238 UniqueLock lck(&lru_lock_);
239 auto it = std::find(lru_.begin(), lru_.end(), *p);
240 if (it != lru_.end()) {
241 lru_.Remove(p);
242 }
243 }
244 // We need to deallocate the string resources associated with the Task class
245 // before we cache its memory for future use.
246 p->~Task();
247 // Put it back into free list
248 {
249 LockGuard lck(&free_lock_);
250 free_lst_.Append(p);
251 }
252 }
253
GetFreeTask(const std::string & my_name,const std::function<Status ()> & f,Task ** p,int32_t operator_id)254 Status TaskManager::GetFreeTask(const std::string &my_name, const std::function<Status()> &f, Task **p,
255 int32_t operator_id) {
256 if (p == nullptr) {
257 RETURN_STATUS_UNEXPECTED("p is null");
258 }
259 Task *q = nullptr;
260 // First try the free list
261 {
262 LockGuard lck(&free_lock_);
263 if (free_lst_.count > 0) {
264 q = free_lst_.head;
265 free_lst_.Remove(q);
266 }
267 }
268 if (q) {
269 new (q) Task(my_name, f, operator_id);
270 } else {
271 std::shared_ptr<MemoryPool> mp = Services::GetInstance().GetServiceMemPool();
272 Status rc;
273 q = new (&rc, mp) Task(my_name, f, operator_id);
274 RETURN_IF_NOT_OK(rc);
275 }
276 *p = q;
277 return Status::OK();
278 }
279
CreateAsyncTask(const std::string & my_name,const std::function<Status ()> & f,Task ** ppTask,int32_t operator_id)280 Status TaskGroup::CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, Task **ppTask,
281 int32_t operator_id) {
282 auto pMytask = TaskManager::FindMe();
283 // We need to block ~TaskGroup coming otherwise we will deadlock. We will grab the
284 // stateLock in shared allowing CreateAsyncTask to run concurrently.
285 SharedLock state_lck(&state_lock_);
286 // Now double check the state
287 if (ServiceState() != STATE::kRunning) {
288 return Status(StatusCode::kMDInterrupted, __LINE__, __FILE__, "Taskgroup is shutting down");
289 }
290 TaskManager &dm = TaskManager::GetInstance();
291 Task *pTask = nullptr;
292 // If the group is already in error, early exit too.
293 // We can't hold the rc_mux_ throughout because the thread spawned by CreateAsyncTask may hit error which
294 // will try to shutdown the group and grab the rc_mux_ and we will deadlock.
295 {
296 std::unique_lock<std::mutex> rcLock(rc_mux_);
297 if (rc_.IsError()) {
298 return pMytask->IsMasterThread() ? rc_ : Status(StatusCode::kMDInterrupted);
299 }
300 }
301 RETURN_IF_NOT_OK(dm.CreateAsyncTask(my_name, f, this, &pTask, operator_id));
302 if (ppTask) {
303 *ppTask = pTask;
304 }
305 return Status::OK();
306 }
307
interrupt_all()308 void TaskGroup::interrupt_all() noexcept {
309 // There is a racing condition if we don't stop the interrupt service at this point. New resource
310 // may come in and not being picked up after we call InterruptAll(). So stop new comers and then
311 // interrupt any existing resources.
312 (void)intrp_svc_->ServiceStop();
313 intrp_svc_->InterruptAll();
314 }
315
join_all(Task::WaitFlag wf)316 Status TaskGroup::join_all(Task::WaitFlag wf) {
317 Status rc;
318 Status rc2;
319 SharedLock lck(&rw_lock_);
320 for (Task &tk : grp_list_) {
321 rc = tk.Join(wf);
322 if (rc.IsError()) {
323 rc2 = rc;
324 }
325 }
326 return rc2;
327 }
328
DoServiceStop()329 Status TaskGroup::DoServiceStop() {
330 interrupt_all();
331 return (join_all(Task::WaitFlag::kNonBlocking));
332 }
333
TaskGroup()334 TaskGroup::TaskGroup() : grp_list_(&Task::group), intrp_svc_(nullptr) {
335 auto alloc = Services::GetAllocator<IntrpService>();
336 intrp_svc_ = std::allocate_shared<IntrpService>(alloc);
337 (void)Service::ServiceStart();
338 }
339
~TaskGroup()340 TaskGroup::~TaskGroup() {
341 (void)Service::ServiceStop();
342 // The TaskGroup is going out of scope, and we can return the Task list to the free list.
343 Task *cur = grp_list_.head;
344 TaskManager &tm = TaskManager::GetInstance();
345 while (cur) {
346 Task *next = cur->group.next;
347 grp_list_.Remove(cur);
348 tm.ReturnFreeTask(cur);
349 cur = next;
350 }
351 {
352 LockGuard lck(&tm.tg_lock_);
353 (void)tm.grp_list_.erase(this);
354 }
355 }
356
GetTaskErrorIfAny()357 Status TaskGroup::GetTaskErrorIfAny() {
358 SharedLock lck(&rw_lock_);
359 for (Task &tk : grp_list_) {
360 RETURN_IF_NOT_OK(tk.GetTaskErrorIfAny());
361 }
362 return Status::OK();
363 }
364
GetIntrpService()365 std::shared_ptr<IntrpService> TaskGroup::GetIntrpService() { return intrp_svc_; }
366 } // namespace dataset
367 } // namespace mindspore
368