• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_TASK_H_
17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_TASK_H_
18 
19 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
20 #include <pthread.h>
21 #include <sys/syscall.h>
22 #endif
#include <atomic>
#include <chrono>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
34 #include "minddata/dataset/util/intrp_resource.h"
35 #include "minddata/dataset/util/list.h"
36 #include "minddata/dataset/util/log_adapter.h"
37 #include "minddata/dataset/util/memory_pool.h"
38 #include "minddata/dataset/util/services.h"
39 #include "minddata/dataset/util/wait_post.h"
40 
41 namespace mindspore {
42 namespace dataset {
// The wait time of interrupt task.
// NOTE(review): the unit is not stated in this header - confirm at the use site.
constexpr uint32_t kWaitInterruptTaskTime = 30;

// Forward declarations. Task names both classes as friends and also uses
// TaskGroup as a type (TaskGroup *); a friend declaration alone does not make
// the name visible to ordinary lookup, so declare both explicitly instead of
// relying on an included header to do it.
class TaskManager;
class TaskGroup;
46 
47 class Task : public IntrpResource {
48  public:
49   friend class TaskManager;
50   friend class TaskGroup;
51 
52   enum class WaitFlag : int { kBlocking, kNonBlocking };
53 
54   Task(const std::string &myName, const std::function<Status()> &f, int32_t operator_id = -1);
55 
56   // Future objects are not copyable.
57   Task(const Task &) = delete;
58 
59   ~Task() override;
60 
61   Task &operator=(const Task &) = delete;
62 
63   // Move constructor and Assignment are not supported.
64   // Too many things in this class.
65   Task(Task &&) = delete;
66 
67   Task &operator=(Task &&) = delete;
68 
69   Status GetTaskErrorIfAny() const;
70 
ChangeName(const std::string & newName)71   void ChangeName(const std::string &newName) { my_name_ = newName; }
72 
73   // To execute the _fncObj
74   void operator()();
75 
76   Node<Task> node;
77   Node<Task> group;
78   Node<Task> free;
79 
80   // Run the task
81   Status Run();
82 
83   Status Join(WaitFlag wf = WaitFlag::kBlocking);
84 
Running()85   bool Running() const { return running_; }
86 
CaughtSevereException()87   bool CaughtSevereException() const { return caught_severe_exception_; }
88 
IsMasterThread()89   bool IsMasterThread() const { return is_master_; }
90 
get_id()91   std::thread::id get_id() { return id_; }
92 
get_linux_id()93   pid_t get_linux_id() { return thread_id_; }
94 
MyName()95   std::string MyName() const { return my_name_; }
96 
get_operator_id()97   int32_t get_operator_id() { return operator_id_; }
98 
99   // An operator used by std::find
100   bool operator==(const Task &other) const { return (this == &other); }
101 
102   bool operator!=(const Task &other) const { return !(*this == other); }
103 
Post()104   void Post() { wp_.Set(); }
105 
Wait()106   Status Wait() { return (wp_.Wait()); }
107 
108   static Status OverrideInterruptRc(const Status &rc);
109 
110 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
111   pthread_t GetNativeHandle() const;
112 #endif
113 
114  private:
115   mutable std::mutex mux_;
116   std::string my_name_;
117   int32_t operator_id_;
118   pid_t thread_id_;
119   Status rc_;
120   WaitPost wp_;
121   // Task need to provide definition for this function.  It
122   // will be called by thread function.
123   std::function<Status()> fnc_obj_;
124   // Misc fields used by TaskManager.
125   TaskGroup *task_group_;
126   std::future<void> thrd_;
127   std::thread::id id_;
128   bool is_master_;
129   volatile bool running_;
130   volatile bool caught_severe_exception_;
131 
132 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
133   pthread_t native_handle_;
134 #else
135   uint64_t native_handle_;
136 #endif
137 
138   void ShutdownGroup();
139   TaskGroup *MyTaskGroup();
140   void set_task_group(TaskGroup *vg);
141 };
142 
// Thread-local pointer to a Task; declared here, defined in another translation unit.
// NOTE(review): presumably refers to the Task running on the calling thread - confirm
// against the definition and the code that assigns it.
extern thread_local Task *gMyTask;
144 }  // namespace dataset
145 }  // namespace mindspore
146 
147 #endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_TASK_H_
148