• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <algorithm>
17 #include <functional>
18 #include <set>
19 #include "./securec.h"
20 #include "minddata/dataset/util/task_manager.h"
21 
22 namespace mindspore {
23 namespace dataset {
24 TaskManager *TaskManager::instance_ = nullptr;
25 std::once_flag TaskManager::init_instance_flag_;
26 // This takes the same parameter as Task constructor.
CreateAsyncTask(const std::string & my_name,const std::function<Status ()> & f,TaskGroup * vg,Task ** task,int32_t operator_id)27 Status TaskManager::CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, TaskGroup *vg,
28                                     Task **task, int32_t operator_id) {
29   // We need to block destructor coming otherwise we will deadlock. We will grab the
30   // stateLock in shared allowing CreateAsyncTask to run concurrently.
31   SharedLock stateLck(&state_lock_);
32   // Now double check the state
33   if (ServiceState() == STATE::kStopInProg || ServiceState() == STATE::kStopped) {
34     return Status(StatusCode::kMDInterrupted, __LINE__, __FILE__, "TaskManager is shutting down");
35   }
36   RETURN_IF_NOT_OK(GetFreeTask(my_name, f, task, operator_id));
37   if (vg == nullptr) {
38     RETURN_STATUS_UNEXPECTED("TaskGroup is null");
39   }
40   // Previously there is a timing hole where the thread is spawn but hit error immediately before we can set
41   // the TaskGroup pointer. We will do the set here before we call run(). The run() will do the registration.
42   (*task)->set_task_group(vg);
43   // Link to the master lru list.
44   {
45     UniqueLock lck(&lru_lock_);
46     lru_.Append(*task);
47   }
48   // Link to the group list as well before we spawn.
49   {
50     UniqueLock lck(&vg->rw_lock_);
51     vg->grp_list_.Append(*task);
52   }
53   // Track all the TaskGroup. Used for control-c
54   {
55     LockGuard lck(&tg_lock_);
56     (void)this->grp_list_.insert(vg);
57   }
58   RETURN_IF_NOT_OK((*task)->wp_.Register(vg));
59   RETURN_IF_NOT_OK((*task)->Run());
60   // Wait for the thread to initialize successfully.
61   RETURN_IF_NOT_OK((*task)->Wait());
62   return Status::OK();
63 }
64 
join_all()65 Status TaskManager::join_all() {
66   Status rc;
67   Status rc2;
68   SharedLock lck(&lru_lock_);
69   for (Task &tk : lru_) {
70     rc = tk.Join();
71     if (rc.IsError()) {
72       rc2 = rc;
73     }
74   }
75   return rc2;
76 }
77 
interrupt_all()78 void TaskManager::interrupt_all() noexcept {
79   global_interrupt_ = 1;
80   LockGuard lck(&tg_lock_);
81   for (TaskGroup *vg : grp_list_) {
82     auto svc = vg->GetIntrpService();
83     if (svc) {
84       // Stop the interrupt service. No new request is accepted.
85       Status rc = svc->ServiceStop();
86       if (rc.IsError()) MS_LOG(ERROR) << "Error while stopping the service. Message: " << rc;
87       svc->InterruptAll();
88     }
89   }
90   master_->Interrupt();
91 }
92 
FindMe()93 Task *TaskManager::FindMe() {
94 #if !defined(_WIN32) && !defined(_WIN64)
95   return gMyTask;
96 #else
97   TaskManager &tm = TaskManager::GetInstance();
98   SharedLock lock(&tm.lru_lock_);
99   auto id = this_thread::get_id();
100   auto tk = std::find_if(tm.lru_.begin(), tm.lru_.end(), [id](const Task &tk) { return tk.id_ == id; });
101   if (tk != tm.lru_.end()) {
102     return &(*tk);
103   }
104   // If we get here, either I am the watchdog or the master thread.
105   if (tm.master_->id_ == id) {
106     return tm.master_.get();
107   } else if (tm.watchdog_ != nullptr && tm.watchdog_->id_ == id) {
108     return tm.watchdog_;
109   }
110   MS_LOG(ERROR) << "Task not found.";
111   return nullptr;
112 #endif
113 }
114 
TaskManager()115 TaskManager::TaskManager() try : global_interrupt_(0),
116                                  lru_(&Task::node),
117                                  free_lst_(&Task::free),
118                                  watchdog_grp_(nullptr),
119                                  watchdog_(nullptr) {
120   auto alloc = Services::GetAllocator<Task>();
121   // Create a dummy Task for the master thread (this thread)
122   master_ = std::allocate_shared<Task>(alloc, "master", []() -> Status { return Status::OK(); });
123   master_->id_ = this_thread::get_id();
124   master_->running_ = true;
125   master_->is_master_ = true;
126 #if !defined(_WIN32) && !defined(_WIN64)
127   gMyTask = master_.get();
128 #if !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
129   // Initialize the semaphore for the watchdog
130   errno_t rc = sem_init(&sem_, 0, 0);
131   if (rc == -1) {
132     MS_LOG(ERROR) << "Unable to initialize a semaphore. Errno = " << rc << ".";
133     std::terminate();
134   }
135 #endif
136 #endif
137 } catch (const std::exception &e) {
138   MS_LOG(ERROR) << "MindData initialization failed: " << e.what() << ".";
139   std::terminate();
140 }
141 
~TaskManager()142 TaskManager::~TaskManager() {
143   if (watchdog_) {
144     WakeUpWatchDog();
145     (void)watchdog_->Join();
146     // watchdog_grp_ and watchdog_ pointers come from Services::GetInstance().GetServiceMemPool() which we will free it
147     // on shutdown. So no need to free these pointers one by one.
148     watchdog_grp_ = nullptr;
149     watchdog_ = nullptr;
150   }
151 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
152   (void)sem_destroy(&sem_);
153 #endif
154 }
155 
DoServiceStart()156 Status TaskManager::DoServiceStart() {
157   MS_LOG(INFO) << "Starting Task Manager.";
158 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
159   // Create a watchdog for control-c
160   std::shared_ptr<MemoryPool> mp = Services::GetInstance().GetServiceMemPool();
161   // A dummy group just for the watchdog. We aren't really using it. But most code assumes a thread must
162   // belong to a group.
163   auto f = std::bind(&TaskManager::WatchDog, this);
164   Status rc;
165   watchdog_grp_ = new (&rc, mp) TaskGroup();
166   RETURN_IF_NOT_OK(rc);
167   rc = watchdog_grp_->CreateAsyncTask("Watchdog", f, &watchdog_);
168   if (rc.IsError()) {
169     ::operator delete(watchdog_grp_, mp);
170     watchdog_grp_ = nullptr;
171     return rc;
172   }
173   (void)grp_list_.erase(watchdog_grp_);
174   lru_.Remove(watchdog_);
175 #endif
176   return Status::OK();
177 }
178 
DoServiceStop()179 Status TaskManager::DoServiceStop() {
180   WakeUpWatchDog();
181   interrupt_all();
182   return Status::OK();
183 }
184 
WatchDog()185 Status TaskManager::WatchDog() {
186   TaskManager::FindMe()->Post();
187 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
188   errno_t err = sem_wait(&sem_);
189   if (err == -1) {
190     RETURN_STATUS_UNEXPECTED("Errno = " + std::to_string(errno));
191   }
192   // We are woken up by control-c and we are going to stop all threads that are running.
193   // In addition, we also want to prevent new thread from creating. This can be done
194   // easily by calling the parent function.
195   RETURN_IF_NOT_OK(ServiceStop());
196 #endif
197   return Status::OK();
198 }
199 
200 // Follow the group link and interrupt other
201 // Task in the same group. It is used by
202 // Watchdog only.
InterruptGroup(Task & curTk)203 void TaskManager::InterruptGroup(Task &curTk) {
204   TaskGroup *vg = curTk.MyTaskGroup();
205   vg->interrupt_all();
206 }
207 
InterruptMaster(const Status & rc)208 void TaskManager::InterruptMaster(const Status &rc) {
209   TaskManager &tm = TaskManager::GetInstance();
210   std::shared_ptr<Task> master = tm.master_;
211   std::lock_guard<std::mutex> lck(master->mux_);
212   master->Interrupt();
213   if (rc.IsError() && master->rc_.IsOk()) {
214     master->rc_ = rc;
215     master->caught_severe_exception_ = true;
216     // Move log error here for some scenarios didn't call GetMasterThreadRc
217     MS_LOG(ERROR) << "Task is terminated with err msg(more detail in info level log):" << master->rc_;
218   }
219 }
220 
GetMasterThreadRc()221 Status TaskManager::GetMasterThreadRc() {
222   TaskManager &tm = TaskManager::GetInstance();
223   std::shared_ptr<Task> master = tm.master_;
224   Status rc = tm.master_->GetTaskErrorIfAny();
225   if (rc.IsError()) {
226     // Reset the state once we retrieve the value.
227     std::lock_guard<std::mutex> lck(master->mux_);
228     master->rc_ = Status::OK();
229     master->caught_severe_exception_ = false;
230     master->ResetIntrpState();
231   }
232   return rc;
233 }
234 
ReturnFreeTask(Task * p)235 void TaskManager::ReturnFreeTask(Task *p) noexcept {
236   // Take it out from lru_ if any
237   {
238     UniqueLock lck(&lru_lock_);
239     auto it = std::find(lru_.begin(), lru_.end(), *p);
240     if (it != lru_.end()) {
241       lru_.Remove(p);
242     }
243   }
244   // We need to deallocate the string resources associated with the Task class
245   // before we cache its memory for future use.
246   p->~Task();
247   // Put it back into free list
248   {
249     LockGuard lck(&free_lock_);
250     free_lst_.Append(p);
251   }
252 }
253 
GetFreeTask(const std::string & my_name,const std::function<Status ()> & f,Task ** p,int32_t operator_id)254 Status TaskManager::GetFreeTask(const std::string &my_name, const std::function<Status()> &f, Task **p,
255                                 int32_t operator_id) {
256   if (p == nullptr) {
257     RETURN_STATUS_UNEXPECTED("p is null");
258   }
259   Task *q = nullptr;
260   // First try the free list
261   {
262     LockGuard lck(&free_lock_);
263     if (free_lst_.count > 0) {
264       q = free_lst_.head;
265       free_lst_.Remove(q);
266     }
267   }
268   if (q) {
269     new (q) Task(my_name, f, operator_id);
270   } else {
271     std::shared_ptr<MemoryPool> mp = Services::GetInstance().GetServiceMemPool();
272     Status rc;
273     q = new (&rc, mp) Task(my_name, f, operator_id);
274     RETURN_IF_NOT_OK(rc);
275   }
276   *p = q;
277   return Status::OK();
278 }
279 
CreateAsyncTask(const std::string & my_name,const std::function<Status ()> & f,Task ** ppTask,int32_t operator_id)280 Status TaskGroup::CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, Task **ppTask,
281                                   int32_t operator_id) {
282   auto pMytask = TaskManager::FindMe();
283   // We need to block ~TaskGroup coming otherwise we will deadlock. We will grab the
284   // stateLock in shared allowing CreateAsyncTask to run concurrently.
285   SharedLock state_lck(&state_lock_);
286   // Now double check the state
287   if (ServiceState() != STATE::kRunning) {
288     return Status(StatusCode::kMDInterrupted, __LINE__, __FILE__, "Taskgroup is shutting down");
289   }
290   TaskManager &dm = TaskManager::GetInstance();
291   Task *pTask = nullptr;
292   // If the group is already in error, early exit too.
293   // We can't hold the rc_mux_ throughout because the thread spawned by CreateAsyncTask may hit error which
294   // will try to shutdown the group and grab the rc_mux_ and we will deadlock.
295   {
296     std::unique_lock<std::mutex> rcLock(rc_mux_);
297     if (rc_.IsError()) {
298       return pMytask->IsMasterThread() ? rc_ : Status(StatusCode::kMDInterrupted);
299     }
300   }
301   RETURN_IF_NOT_OK(dm.CreateAsyncTask(my_name, f, this, &pTask, operator_id));
302   if (ppTask) {
303     *ppTask = pTask;
304   }
305   return Status::OK();
306 }
307 
interrupt_all()308 void TaskGroup::interrupt_all() noexcept {
309   // There is a racing condition if we don't stop the interrupt service at this point. New resource
310   // may come in and not being picked up after we call InterruptAll(). So stop new comers and then
311   // interrupt any existing resources.
312   (void)intrp_svc_->ServiceStop();
313   intrp_svc_->InterruptAll();
314 }
315 
join_all(Task::WaitFlag wf)316 Status TaskGroup::join_all(Task::WaitFlag wf) {
317   Status rc;
318   Status rc2;
319   SharedLock lck(&rw_lock_);
320   for (Task &tk : grp_list_) {
321     rc = tk.Join(wf);
322     if (rc.IsError()) {
323       rc2 = rc;
324     }
325   }
326   return rc2;
327 }
328 
DoServiceStop()329 Status TaskGroup::DoServiceStop() {
330   interrupt_all();
331   return (join_all(Task::WaitFlag::kNonBlocking));
332 }
333 
TaskGroup()334 TaskGroup::TaskGroup() : grp_list_(&Task::group), intrp_svc_(nullptr) {
335   auto alloc = Services::GetAllocator<IntrpService>();
336   intrp_svc_ = std::allocate_shared<IntrpService>(alloc);
337   (void)Service::ServiceStart();
338 }
339 
~TaskGroup()340 TaskGroup::~TaskGroup() {
341   (void)Service::ServiceStop();
342   // The TaskGroup is going out of scope, and we can return the Task list to the free list.
343   Task *cur = grp_list_.head;
344   TaskManager &tm = TaskManager::GetInstance();
345   while (cur) {
346     Task *next = cur->group.next;
347     grp_list_.Remove(cur);
348     tm.ReturnFreeTask(cur);
349     cur = next;
350   }
351   {
352     LockGuard lck(&tm.tg_lock_);
353     (void)tm.grp_list_.erase(this);
354   }
355 }
356 
GetTaskErrorIfAny()357 Status TaskGroup::GetTaskErrorIfAny() {
358   SharedLock lck(&rw_lock_);
359   for (Task &tk : grp_list_) {
360     RETURN_IF_NOT_OK(tk.GetTaskErrorIfAny());
361   }
362   return Status::OK();
363 }
364 
GetIntrpService()365 std::shared_ptr<IntrpService> TaskGroup::GetIntrpService() { return intrp_svc_; }
366 }  // namespace dataset
367 }  // namespace mindspore
368