1 /**
2 * Copyright 2019-2022 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include <algorithm>
17 #include <functional>
18 #include <set>
19 #include "securec.h"
20 #include "minddata/dataset/util/task_manager.h"
21
22 namespace mindspore {
23 namespace dataset {
24 TaskManager *TaskManager::instance_ = nullptr;
25 std::once_flag TaskManager::init_instance_flag_;
26 // This takes the same parameter as Task constructor.
CreateAsyncTask(const std::string & my_name,const std::function<Status ()> & f,TaskGroup * vg,Task ** task,int32_t operator_id)27 Status TaskManager::CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, TaskGroup *vg,
28 Task **task, int32_t operator_id) {
29 // We need to block destructor coming otherwise we will deadlock. We will grab the
30 // stateLock in shared allowing CreateAsyncTask to run concurrently.
31 SharedLock stateLck(&state_lock_);
32 // Now double check the state
33 if (ServiceState() == STATE::kStopInProg || ServiceState() == STATE::kStopped) {
34 RETURN_STATUS_ERROR(StatusCode::kMDInterrupted, "TaskManager is shutting down");
35 }
36 RETURN_IF_NOT_OK(GetFreeTask(my_name, f, task, operator_id));
37 if (vg == nullptr) {
38 RETURN_STATUS_UNEXPECTED("TaskGroup is null");
39 }
40 // Previously there is a timing hole where the thread is spawn but hit error immediately before we can set
41 // the TaskGroup pointer. We will do the set here before we call run(). The run() will do the registration.
42 (*task)->set_task_group(vg);
43 // Link to the master lru list.
44 {
45 UniqueLock lck(&lru_lock_);
46 lru_.Append(*task);
47 }
48 // Link to the group list as well before we spawn.
49 {
50 UniqueLock lck(&vg->rw_lock_);
51 vg->grp_list_.Append(*task);
52 }
53 // Track all the TaskGroup. Used for control-c
54 {
55 LockGuard lck(&tg_lock_);
56 (void)this->grp_list_.insert(vg);
57 }
58 RETURN_IF_NOT_OK((*task)->wp_.Register(vg));
59 RETURN_IF_NOT_OK((*task)->Run());
60 // Wait for the thread to initialize successfully.
61 RETURN_IF_NOT_OK((*task)->Wait());
62 (*task)->Clear();
63 return Status::OK();
64 }
65
join_all()66 Status TaskManager::join_all() {
67 Status rc;
68 Status rc2;
69 SharedLock lck(&lru_lock_);
70 for (Task &tk : lru_) {
71 rc = tk.Join();
72 if (rc.IsError()) {
73 rc2 = rc;
74 }
75 }
76 return rc2;
77 }
78
interrupt_all()79 void TaskManager::interrupt_all() noexcept {
80 global_interrupt_ = 1;
81 LockGuard lck(&tg_lock_);
82 for (TaskGroup *vg : grp_list_) {
83 auto svc = vg->GetIntrpService();
84 if (svc) {
85 // Stop the interrupt service. No new request is accepted.
86 Status rc = svc->ServiceStop();
87 if (rc.IsError()) {
88 MS_LOG(ERROR) << "Error while stopping the service. Message: " << rc;
89 }
90 svc->InterruptAll();
91 }
92 }
93 master_->Interrupt();
94 }
95
FindMe()96 Task *TaskManager::FindMe() {
97 #if !defined(_WIN32) && !defined(_WIN64)
98 return gMyTask;
99 #else
100 TaskManager &tm = TaskManager::GetInstance();
101 SharedLock lock(&tm.lru_lock_);
102 auto id = this_thread::get_id();
103 for (auto iter = tm.lru_.begin(); iter != tm.lru_.end(); ++iter) {
104 if (iter->id_ == id && iter->running_) {
105 return &(*iter);
106 }
107 }
108 // If we get here, either I am the watchdog or the master thread.
109 if (tm.master_->id_ == id) {
110 return tm.master_.get();
111 } else if (tm.watchdog_ != nullptr && tm.watchdog_->id_ == id) {
112 return tm.watchdog_;
113 }
114 MS_LOG(ERROR) << "Task not found.";
115 return nullptr;
116 #endif
117 }
118
TaskManager()119 TaskManager::TaskManager() try : global_interrupt_(0),
120 lru_(&Task::node),
121 free_lst_(&Task::free),
122 watchdog_grp_(nullptr),
123 watchdog_(nullptr) {
124 auto alloc = Services::GetAllocator<Task>();
125 // Create a dummy Task for the master thread (this thread)
126 master_ = std::allocate_shared<Task>(alloc, "master", []() -> Status { return Status::OK(); });
127 master_->id_ = this_thread::get_id();
128 master_->running_ = true;
129 master_->is_master_ = true;
130 #if !defined(_WIN32) && !defined(_WIN64)
131 gMyTask = master_.get();
132 #if !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
133 // Initialize the semaphore for the watchdog
134 errno_t rc = sem_init(&sem_, 0, 0);
135 if (rc == -1) {
136 MS_LOG(ERROR) << "Unable to initialize a semaphore. Errno = " << rc << ".";
137 std::terminate();
138 }
139 #endif
140 #endif
141 } catch (const std::exception &e) {
142 MS_LOG(ERROR) << "MindData initialization failed: " << e.what() << ".";
143 std::terminate();
144 }
145
~TaskManager()146 TaskManager::~TaskManager() {
147 if (watchdog_) {
148 WakeUpWatchDog();
149 auto s = watchdog_->Join();
150 if (s.IsError()) {
151 MS_LOG(ERROR) << s.ToString();
152 }
153 // watchdog_grp_ and watchdog_ pointers come from Services::GetInstance().GetServiceMemPool() which we will free it
154 // on shutdown. So no need to free these pointers one by one.
155 watchdog_grp_ = nullptr;
156 watchdog_ = nullptr;
157 }
158 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
159 (void)sem_destroy(&sem_);
160 #endif
161 }
162
DoServiceStart()163 Status TaskManager::DoServiceStart() {
164 MS_LOG(INFO) << "Starting Task Manager.";
165 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
166 // Create a watchdog for control-c
167 std::shared_ptr<MemoryPool> mp = Services::GetInstance().GetServiceMemPool();
168 // A dummy group just for the watchdog. We aren't really using it. But most code assumes a thread must
169 // belong to a group.
170 auto f = std::bind(&TaskManager::WatchDog, this);
171 Status rc;
172 watchdog_grp_ = new (&rc, mp) TaskGroup();
173 RETURN_IF_NOT_OK(rc);
174 rc = watchdog_grp_->CreateAsyncTask("Watchdog", f, &watchdog_);
175 if (rc.IsError()) {
176 ::operator delete(watchdog_grp_, mp);
177 watchdog_grp_ = nullptr;
178 return rc;
179 }
180 (void)grp_list_.erase(watchdog_grp_);
181 lru_.Remove(watchdog_);
182 #endif
183 return Status::OK();
184 }
185
DoServiceStop()186 Status TaskManager::DoServiceStop() {
187 WakeUpWatchDog();
188 interrupt_all();
189 return Status::OK();
190 }
191
WatchDog()192 Status TaskManager::WatchDog() {
193 TaskManager::FindMe()->Post();
194 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
195 errno_t err = sem_wait(&sem_);
196 if (err == -1) {
197 RETURN_STATUS_UNEXPECTED("Errno = " + std::to_string(errno));
198 }
199 // We are woken up by control-c and we are going to stop all threads that are running.
200 // In addition, we also want to prevent new thread from creating. This can be done
201 // easily by calling the parent function.
202 RETURN_IF_NOT_OK(ServiceStop());
203 #endif
204 return Status::OK();
205 }
206
207 // Follow the group link and interrupt other
208 // Task in the same group. It is used by
209 // Watchdog only.
InterruptGroup(Task & curTk)210 void TaskManager::InterruptGroup(Task &curTk) {
211 TaskGroup *vg = curTk.MyTaskGroup();
212 vg->interrupt_all();
213 }
214
InterruptMaster(const Status & rc)215 void TaskManager::InterruptMaster(const Status &rc) {
216 TaskManager &tm = TaskManager::GetInstance();
217 std::shared_ptr<Task> master = tm.master_;
218 std::lock_guard<std::mutex> lck(master->mux_);
219 master->Interrupt();
220 if (rc.IsError() && master->rc_.IsOk()) {
221 master->rc_ = rc;
222 master->caught_severe_exception_ = true;
223 // Move log error here for some scenarios didn't call GetMasterThreadRc
224 if (master->rc_.StatusCode() != mindspore::StatusCode::kMDPyFuncException) {
225 // use python operation, the error had been raised in python layer. So disable log prompt here.
226 // non-sink + non-pyfunc -> print error
227 // non-sink + pyfunc -> don't print error (the error has been raised in python layer)
228 // sink + non-pyfunc -> print error
229 // sink + pyfunc -> print error in ShutdownGroup
230 MS_LOG(ERROR) << "MindSpore dataset is terminated with err msg: " << master->rc_;
231 }
232 }
233 }
234
GetMasterThreadRc()235 Status TaskManager::GetMasterThreadRc() {
236 TaskManager &tm = TaskManager::GetInstance();
237 std::shared_ptr<Task> master = tm.master_;
238 Status rc = tm.master_->GetTaskErrorIfAny();
239 if (rc.IsError()) {
240 // Reset the state once we retrieve the value.
241 std::lock_guard<std::mutex> lck(master->mux_);
242 master->rc_ = Status::OK();
243 master->caught_severe_exception_ = false;
244 master->ResetIntrpState();
245 }
246 return rc;
247 }
248
ReturnFreeTask(Task * p)249 void TaskManager::ReturnFreeTask(Task *p) noexcept {
250 // Take it out from lru_ if any
251 {
252 UniqueLock lck(&lru_lock_);
253 auto iter = lru_.begin();
254 for (; iter != lru_.end(); ++iter) {
255 if (*iter == *p) {
256 break;
257 }
258 }
259 if (iter != lru_.end()) {
260 lru_.Remove(p);
261 }
262 }
263 // We need to deallocate the string resources associated with the Task class
264 // before we cache its memory for future use.
265 p->~Task();
266 // Put it back into free list
267 {
268 LockGuard lck(&free_lock_);
269 free_lst_.Append(p);
270 }
271 }
272
GetFreeTask(const std::string & my_name,const std::function<Status ()> & f,Task ** p,int32_t operator_id)273 Status TaskManager::GetFreeTask(const std::string &my_name, const std::function<Status()> &f, Task **p,
274 int32_t operator_id) {
275 if (p == nullptr) {
276 RETURN_STATUS_UNEXPECTED("p is null");
277 }
278 Task *q = nullptr;
279 // First try the free list
280 {
281 LockGuard lck(&free_lock_);
282 if (free_lst_.count > 0) {
283 q = free_lst_.head;
284 free_lst_.Remove(q);
285 }
286 }
287 if (q) {
288 new (q) Task(my_name, f, operator_id);
289 } else {
290 std::shared_ptr<MemoryPool> mp = Services::GetInstance().GetServiceMemPool();
291 Status rc;
292 q = new (&rc, mp) Task(my_name, f, operator_id);
293 RETURN_IF_NOT_OK(rc);
294 }
295 *p = q;
296 return Status::OK();
297 }
298
CreateAsyncTask(const std::string & my_name,const std::function<Status ()> & f,Task ** ppTask,int32_t operator_id)299 Status TaskGroup::CreateAsyncTask(const std::string &my_name, const std::function<Status()> &f, Task **ppTask,
300 int32_t operator_id) {
301 auto pMytask = TaskManager::FindMe();
302 // We need to block ~TaskGroup coming otherwise we will deadlock. We will grab the
303 // stateLock in shared allowing CreateAsyncTask to run concurrently.
304 SharedLock state_lck(&state_lock_);
305 // Now double check the state
306 if (ServiceState() != STATE::kRunning) {
307 RETURN_STATUS_ERROR(StatusCode::kMDInterrupted, "Taskgroup is shutting down");
308 }
309 TaskManager &dm = TaskManager::GetInstance();
310 Task *pTask = nullptr;
311 // If the group is already in error, early exit too.
312 // We can't hold the rc_mux_ throughout because the thread spawned by CreateAsyncTask may hit error which
313 // will try to shutdown the group and grab the rc_mux_ and we will deadlock.
314 {
315 std::unique_lock<std::mutex> rcLock(rc_mux_);
316 if (rc_.IsError()) {
317 return pMytask->IsMasterThread() ? rc_ : Status(StatusCode::kMDInterrupted);
318 }
319 }
320 RETURN_IF_NOT_OK(dm.CreateAsyncTask(my_name, f, this, &pTask, operator_id));
321 if (ppTask) {
322 *ppTask = pTask;
323 }
324 return Status::OK();
325 }
326
interrupt_all()327 void TaskGroup::interrupt_all() noexcept {
328 // There is a racing condition if we don't stop the interrupt service at this point. New resource
329 // may come in and not being picked up after we call InterruptAll(). So stop new comers and then
330 // interrupt any existing resources.
331 (void)intrp_svc_->ServiceStop();
332 intrp_svc_->InterruptAll();
333 }
334
join_all(Task::WaitFlag wf)335 Status TaskGroup::join_all(Task::WaitFlag wf) {
336 Status rc;
337 Status rc2;
338 SharedLock lck(&rw_lock_);
339 for (Task &tk : grp_list_) {
340 rc = tk.Join(wf);
341 if (rc.IsError()) {
342 rc2 = rc;
343 }
344 }
345 return rc2;
346 }
347
DoServiceStop()348 Status TaskGroup::DoServiceStop() {
349 interrupt_all();
350 return (join_all(Task::WaitFlag::kNonBlocking));
351 }
352
TaskGroup()353 TaskGroup::TaskGroup() : has_dataqueue_(false), grp_list_(&Task::group), intrp_svc_(nullptr) {
354 auto alloc = Services::GetAllocator<IntrpService>();
355 intrp_svc_ = std::allocate_shared<IntrpService>(alloc);
356 (void)Service::ServiceStart();
357 }
358
~TaskGroup()359 TaskGroup::~TaskGroup() {
360 (void)Service::ServiceStop();
361 // The TaskGroup is going out of scope, and we can return the Task list to the free list.
362 Task *cur = grp_list_.head;
363 TaskManager &tm = TaskManager::GetInstance();
364 while (cur) {
365 Task *next = cur->group.next;
366 grp_list_.Remove(cur);
367 tm.ReturnFreeTask(cur);
368 cur = next;
369 }
370 {
371 LockGuard lck(&tm.tg_lock_);
372 (void)tm.grp_list_.erase(this);
373 }
374 }
375
GetTaskErrorIfAny()376 Status TaskGroup::GetTaskErrorIfAny() {
377 SharedLock lck(&rw_lock_);
378 for (Task &tk : grp_list_) {
379 RETURN_IF_NOT_OK(tk.GetTaskErrorIfAny());
380 }
381 return Status::OK();
382 }
383
GetIntrpService()384 std::shared_ptr<IntrpService> TaskGroup::GetIntrpService() { return intrp_svc_; }
385 } // namespace dataset
386 } // namespace mindspore
387