• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <algorithm>
17 #include <functional>
18 #include <set>
19 #include "securec.h"
20 #include "minddata/dataset/util/task_manager.h"
21 
22 namespace mindspore {
23 namespace dataset {
24 TaskManager *TaskManager::instance_ = nullptr;
25 std::once_flag TaskManager::init_instance_flag_;
26 // This takes the same parameter as Task constructor.
CreateAsyncTask(const std::string & my_name,const std::function<Status ()> & f,TaskGroup * vg,Task ** task,int32_t operator_id)27 Status TaskManager::CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, TaskGroup *vg,
28                                     Task **task, int32_t operator_id) {
29   // We need to block destructor coming otherwise we will deadlock. We will grab the
30   // stateLock in shared allowing CreateAsyncTask to run concurrently.
31   SharedLock stateLck(&state_lock_);
32   // Now double check the state
33   if (ServiceState() == STATE::kStopInProg || ServiceState() == STATE::kStopped) {
34     RETURN_STATUS_ERROR(StatusCode::kMDInterrupted, "TaskManager is shutting down");
35   }
36   RETURN_IF_NOT_OK(GetFreeTask(my_name, f, task, operator_id));
37   if (vg == nullptr) {
38     RETURN_STATUS_UNEXPECTED("TaskGroup is null");
39   }
40   // Previously there is a timing hole where the thread is spawn but hit error immediately before we can set
41   // the TaskGroup pointer. We will do the set here before we call run(). The run() will do the registration.
42   (*task)->set_task_group(vg);
43   // Link to the master lru list.
44   {
45     UniqueLock lck(&lru_lock_);
46     lru_.Append(*task);
47   }
48   // Link to the group list as well before we spawn.
49   {
50     UniqueLock lck(&vg->rw_lock_);
51     vg->grp_list_.Append(*task);
52   }
53   // Track all the TaskGroup. Used for control-c
54   {
55     LockGuard lck(&tg_lock_);
56     (void)this->grp_list_.insert(vg);
57   }
58   RETURN_IF_NOT_OK((*task)->wp_.Register(vg));
59   RETURN_IF_NOT_OK((*task)->Run());
60   // Wait for the thread to initialize successfully.
61   RETURN_IF_NOT_OK((*task)->Wait());
62   (*task)->Clear();
63   return Status::OK();
64 }
65 
join_all()66 Status TaskManager::join_all() {
67   Status rc;
68   Status rc2;
69   SharedLock lck(&lru_lock_);
70   for (Task &tk : lru_) {
71     rc = tk.Join();
72     if (rc.IsError()) {
73       rc2 = rc;
74     }
75   }
76   return rc2;
77 }
78 
interrupt_all()79 void TaskManager::interrupt_all() noexcept {
80   global_interrupt_ = 1;
81   LockGuard lck(&tg_lock_);
82   for (TaskGroup *vg : grp_list_) {
83     auto svc = vg->GetIntrpService();
84     if (svc) {
85       // Stop the interrupt service. No new request is accepted.
86       Status rc = svc->ServiceStop();
87       if (rc.IsError()) {
88         MS_LOG(ERROR) << "Error while stopping the service. Message: " << rc;
89       }
90       svc->InterruptAll();
91     }
92   }
93   master_->Interrupt();
94 }
95 
FindMe()96 Task *TaskManager::FindMe() {
97 #if !defined(_WIN32) && !defined(_WIN64)
98   return gMyTask;
99 #else
100   TaskManager &tm = TaskManager::GetInstance();
101   SharedLock lock(&tm.lru_lock_);
102   auto id = this_thread::get_id();
103   for (auto iter = tm.lru_.begin(); iter != tm.lru_.end(); ++iter) {
104     if (iter->id_ == id && iter->running_) {
105       return &(*iter);
106     }
107   }
108   // If we get here, either I am the watchdog or the master thread.
109   if (tm.master_->id_ == id) {
110     return tm.master_.get();
111   } else if (tm.watchdog_ != nullptr && tm.watchdog_->id_ == id) {
112     return tm.watchdog_;
113   }
114   MS_LOG(ERROR) << "Task not found.";
115   return nullptr;
116 #endif
117 }
118 
TaskManager()119 TaskManager::TaskManager() try : global_interrupt_(0),
120                                  lru_(&Task::node),
121                                  free_lst_(&Task::free),
122                                  watchdog_grp_(nullptr),
123                                  watchdog_(nullptr) {
124   auto alloc = Services::GetAllocator<Task>();
125   // Create a dummy Task for the master thread (this thread)
126   master_ = std::allocate_shared<Task>(alloc, "master", []() -> Status { return Status::OK(); });
127   master_->id_ = this_thread::get_id();
128   master_->running_ = true;
129   master_->is_master_ = true;
130 #if !defined(_WIN32) && !defined(_WIN64)
131   gMyTask = master_.get();
132 #if !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
133   // Initialize the semaphore for the watchdog
134   errno_t rc = sem_init(&sem_, 0, 0);
135   if (rc == -1) {
136     MS_LOG(ERROR) << "Unable to initialize a semaphore. Errno = " << rc << ".";
137     std::terminate();
138   }
139 #endif
140 #endif
141 } catch (const std::exception &e) {
142   MS_LOG(ERROR) << "MindData initialization failed: " << e.what() << ".";
143   std::terminate();
144 }
145 
~TaskManager()146 TaskManager::~TaskManager() {
147   if (watchdog_) {
148     WakeUpWatchDog();
149     auto s = watchdog_->Join();
150     if (s.IsError()) {
151       MS_LOG(ERROR) << s.ToString();
152     }
153     // watchdog_grp_ and watchdog_ pointers come from Services::GetInstance().GetServiceMemPool() which we will free it
154     // on shutdown. So no need to free these pointers one by one.
155     watchdog_grp_ = nullptr;
156     watchdog_ = nullptr;
157   }
158 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
159   (void)sem_destroy(&sem_);
160 #endif
161 }
162 
DoServiceStart()163 Status TaskManager::DoServiceStart() {
164   MS_LOG(INFO) << "Starting Task Manager.";
165 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
166   // Create a watchdog for control-c
167   std::shared_ptr<MemoryPool> mp = Services::GetInstance().GetServiceMemPool();
168   // A dummy group just for the watchdog. We aren't really using it. But most code assumes a thread must
169   // belong to a group.
170   auto f = std::bind(&TaskManager::WatchDog, this);
171   Status rc;
172   watchdog_grp_ = new (&rc, mp) TaskGroup();
173   RETURN_IF_NOT_OK(rc);
174   rc = watchdog_grp_->CreateAsyncTask("Watchdog", f, &watchdog_);
175   if (rc.IsError()) {
176     ::operator delete(watchdog_grp_, mp);
177     watchdog_grp_ = nullptr;
178     return rc;
179   }
180   (void)grp_list_.erase(watchdog_grp_);
181   lru_.Remove(watchdog_);
182 #endif
183   return Status::OK();
184 }
185 
DoServiceStop()186 Status TaskManager::DoServiceStop() {
187   WakeUpWatchDog();
188   interrupt_all();
189   return Status::OK();
190 }
191 
WatchDog()192 Status TaskManager::WatchDog() {
193   TaskManager::FindMe()->Post();
194 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
195   errno_t err = sem_wait(&sem_);
196   if (err == -1) {
197     RETURN_STATUS_UNEXPECTED("Errno = " + std::to_string(errno));
198   }
199   // We are woken up by control-c and we are going to stop all threads that are running.
200   // In addition, we also want to prevent new thread from creating. This can be done
201   // easily by calling the parent function.
202   RETURN_IF_NOT_OK(ServiceStop());
203 #endif
204   return Status::OK();
205 }
206 
207 // Follow the group link and interrupt other
208 // Task in the same group. It is used by
209 // Watchdog only.
InterruptGroup(Task & curTk)210 void TaskManager::InterruptGroup(Task &curTk) {
211   TaskGroup *vg = curTk.MyTaskGroup();
212   vg->interrupt_all();
213 }
214 
InterruptMaster(const Status & rc)215 void TaskManager::InterruptMaster(const Status &rc) {
216   TaskManager &tm = TaskManager::GetInstance();
217   std::shared_ptr<Task> master = tm.master_;
218   std::lock_guard<std::mutex> lck(master->mux_);
219   master->Interrupt();
220   if (rc.IsError() && master->rc_.IsOk()) {
221     master->rc_ = rc;
222     master->caught_severe_exception_ = true;
223     // Move log error here for some scenarios didn't call GetMasterThreadRc
224     if (master->rc_.StatusCode() != mindspore::StatusCode::kMDPyFuncException) {
225       // use python operation, the error had been raised in python layer. So disable log prompt here.
226       // non-sink + non-pyfunc -> print error
227       // non-sink + pyfunc     -> don't print error (the error has been raised in python layer)
228       // sink     + non-pyfunc -> print error
229       // sink     + pyfunc     -> print error in ShutdownGroup
230       MS_LOG(ERROR) << "MindSpore dataset is terminated with err msg: " << master->rc_;
231     }
232   }
233 }
234 
GetMasterThreadRc()235 Status TaskManager::GetMasterThreadRc() {
236   TaskManager &tm = TaskManager::GetInstance();
237   std::shared_ptr<Task> master = tm.master_;
238   Status rc = tm.master_->GetTaskErrorIfAny();
239   if (rc.IsError()) {
240     // Reset the state once we retrieve the value.
241     std::lock_guard<std::mutex> lck(master->mux_);
242     master->rc_ = Status::OK();
243     master->caught_severe_exception_ = false;
244     master->ResetIntrpState();
245   }
246   return rc;
247 }
248 
ReturnFreeTask(Task * p)249 void TaskManager::ReturnFreeTask(Task *p) noexcept {
250   // Take it out from lru_ if any
251   {
252     UniqueLock lck(&lru_lock_);
253     auto iter = lru_.begin();
254     for (; iter != lru_.end(); ++iter) {
255       if (*iter == *p) {
256         break;
257       }
258     }
259     if (iter != lru_.end()) {
260       lru_.Remove(p);
261     }
262   }
263   // We need to deallocate the string resources associated with the Task class
264   // before we cache its memory for future use.
265   p->~Task();
266   // Put it back into free list
267   {
268     LockGuard lck(&free_lock_);
269     free_lst_.Append(p);
270   }
271 }
272 
GetFreeTask(const std::string & my_name,const std::function<Status ()> & f,Task ** p,int32_t operator_id)273 Status TaskManager::GetFreeTask(const std::string &my_name, const std::function<Status()> &f, Task **p,
274                                 int32_t operator_id) {
275   if (p == nullptr) {
276     RETURN_STATUS_UNEXPECTED("p is null");
277   }
278   Task *q = nullptr;
279   // First try the free list
280   {
281     LockGuard lck(&free_lock_);
282     if (free_lst_.count > 0) {
283       q = free_lst_.head;
284       free_lst_.Remove(q);
285     }
286   }
287   if (q) {
288     new (q) Task(my_name, f, operator_id);
289   } else {
290     std::shared_ptr<MemoryPool> mp = Services::GetInstance().GetServiceMemPool();
291     Status rc;
292     q = new (&rc, mp) Task(my_name, f, operator_id);
293     RETURN_IF_NOT_OK(rc);
294   }
295   *p = q;
296   return Status::OK();
297 }
298 
CreateAsyncTask(const std::string & my_name,const std::function<Status ()> & f,Task ** ppTask,int32_t operator_id)299 Status TaskGroup::CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, Task **ppTask,
300                                   int32_t operator_id) {
301   auto pMytask = TaskManager::FindMe();
302   // We need to block ~TaskGroup coming otherwise we will deadlock. We will grab the
303   // stateLock in shared allowing CreateAsyncTask to run concurrently.
304   SharedLock state_lck(&state_lock_);
305   // Now double check the state
306   if (ServiceState() != STATE::kRunning) {
307     RETURN_STATUS_ERROR(StatusCode::kMDInterrupted, "Taskgroup is shutting down");
308   }
309   TaskManager &dm = TaskManager::GetInstance();
310   Task *pTask = nullptr;
311   // If the group is already in error, early exit too.
312   // We can't hold the rc_mux_ throughout because the thread spawned by CreateAsyncTask may hit error which
313   // will try to shutdown the group and grab the rc_mux_ and we will deadlock.
314   {
315     std::unique_lock<std::mutex> rcLock(rc_mux_);
316     if (rc_.IsError()) {
317       return pMytask->IsMasterThread() ? rc_ : Status(StatusCode::kMDInterrupted);
318     }
319   }
320   RETURN_IF_NOT_OK(dm.CreateAsyncTask(my_name, f, this, &pTask, operator_id));
321   if (ppTask) {
322     *ppTask = pTask;
323   }
324   return Status::OK();
325 }
326 
interrupt_all()327 void TaskGroup::interrupt_all() noexcept {
328   // There is a racing condition if we don't stop the interrupt service at this point. New resource
329   // may come in and not being picked up after we call InterruptAll(). So stop new comers and then
330   // interrupt any existing resources.
331   (void)intrp_svc_->ServiceStop();
332   intrp_svc_->InterruptAll();
333 }
334 
join_all(Task::WaitFlag wf)335 Status TaskGroup::join_all(Task::WaitFlag wf) {
336   Status rc;
337   Status rc2;
338   SharedLock lck(&rw_lock_);
339   for (Task &tk : grp_list_) {
340     rc = tk.Join(wf);
341     if (rc.IsError()) {
342       rc2 = rc;
343     }
344   }
345   return rc2;
346 }
347 
DoServiceStop()348 Status TaskGroup::DoServiceStop() {
349   interrupt_all();
350   return (join_all(Task::WaitFlag::kNonBlocking));
351 }
352 
TaskGroup()353 TaskGroup::TaskGroup() : has_dataqueue_(false), grp_list_(&Task::group), intrp_svc_(nullptr) {
354   auto alloc = Services::GetAllocator<IntrpService>();
355   intrp_svc_ = std::allocate_shared<IntrpService>(alloc);
356   (void)Service::ServiceStart();
357 }
358 
~TaskGroup()359 TaskGroup::~TaskGroup() {
360   (void)Service::ServiceStop();
361   // The TaskGroup is going out of scope, and we can return the Task list to the free list.
362   Task *cur = grp_list_.head;
363   TaskManager &tm = TaskManager::GetInstance();
364   while (cur) {
365     Task *next = cur->group.next;
366     grp_list_.Remove(cur);
367     tm.ReturnFreeTask(cur);
368     cur = next;
369   }
370   {
371     LockGuard lck(&tm.tg_lock_);
372     (void)tm.grp_list_.erase(this);
373   }
374 }
375 
GetTaskErrorIfAny()376 Status TaskGroup::GetTaskErrorIfAny() {
377   SharedLock lck(&rw_lock_);
378   for (Task &tk : grp_list_) {
379     RETURN_IF_NOT_OK(tk.GetTaskErrorIfAny());
380   }
381   return Status::OK();
382 }
383 
GetIntrpService()384 std::shared_ptr<IntrpService> TaskGroup::GetIntrpService() { return intrp_svc_; }
385 }  // namespace dataset
386 }  // namespace mindspore
387