1 /** 2 * Copyright 2019 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_TASK_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_TASK_H_ 18 19 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__) 20 #include <pthread.h> 21 #include <sys/syscall.h> 22 #endif 23 #if defined(_WIN32) || defined(_WIN64) 24 #include <windows.h> // for GetCurrentProcessId() 25 #endif 26 #include <chrono> 27 #include <exception> 28 #include <functional> 29 #include <future> 30 #include <iostream> 31 #include <memory> 32 #include <mutex> 33 #include <sstream> 34 #include <stdexcept> 35 #include <string> 36 #include <thread> 37 #include "minddata/dataset/util/intrp_resource.h" 38 #include "minddata/dataset/util/list.h" 39 #include "minddata/dataset/util/log_adapter.h" 40 #include "minddata/dataset/util/memory_pool.h" 41 #include "minddata/dataset/util/services.h" 42 #include "minddata/dataset/util/wait_post.h" 43 44 namespace mindspore { 45 namespace dataset { 46 const uint32_t kWaitInterruptTaskTime = 30; // the wait time of interrupt task 47 48 class TaskManager; 49 50 class Task : public IntrpResource { 51 public: 52 friend class TaskManager; 53 friend class TaskGroup; 54 55 enum class WaitFlag : int { kBlocking, kNonBlocking }; 56 57 Task(const std::string &myName, const std::function<Status()> &f, int32_t operator_id = -1); 58 59 // Future objects are not copyable. 60 Task(const Task &) = delete; 61 62 ~Task() override; 63 64 Task &operator=(const Task &) = delete; 65 66 // Move constructor and Assignment are not supported. 67 // Too many things in this class. 68 Task(Task &&) = delete; 69 70 Task &operator=(Task &&) = delete; 71 72 Status GetTaskErrorIfAny() const; 73 ChangeName(const std::string & newName)74 void ChangeName(const std::string &newName) { my_name_ = newName; } 75 76 // To execute the _fncObj 77 void operator()(); 78 79 Node<Task> node; 80 Node<Task> group; 81 Node<Task> free; 82 83 // Run the task 84 Status Run(); 85 86 Status Join(WaitFlag wf = WaitFlag::kBlocking); 87 Running()88 bool Running() const { return running_; } 89 CaughtSevereException()90 bool CaughtSevereException() const { return caught_severe_exception_; } 91 IsMasterThread()92 bool IsMasterThread() const { return is_master_; } 93 get_id()94 std::thread::id get_id() { return id_; } 95 get_linux_id()96 pid_t get_linux_id() { return thread_id_; } 97 MyName()98 std::string MyName() const { return my_name_; } 99 get_operator_id()100 int32_t get_operator_id() { return operator_id_; } 101 102 // An operator used by std::find 103 bool operator==(const Task &other) const { return (this == &other); } 104 105 bool operator!=(const Task &other) const { return !(*this == other); } 106 Post()107 void Post() { wp_.Set(); } 108 Wait()109 Status Wait() { return (wp_.Wait()); } 110 Clear()111 void Clear() { wp_.Clear(); } 112 113 static Status OverrideInterruptRc(const Status &rc); 114 115 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__) 116 pthread_t GetNativeHandle() const; 117 #endif 118 119 private: 120 mutable std::mutex mux_; 121 std::string my_name_; 122 int32_t operator_id_; 123 pid_t process_id_; // process id (like: 11418), which can be queried by command "ps -ef | grep pid" 124 pid_t thread_id_; // linux thread id (like: 11534), which can be queried by command "top -H" 125 Status rc_; 126 WaitPost wp_; 127 // Task need to provide definition for this function. It 128 // will be called by thread function. 129 std::function<Status()> fnc_obj_; 130 // Misc fields used by TaskManager. 131 TaskGroup *task_group_; 132 std::future<void> thrd_; 133 std::thread::id id_; // thread id (like: 140299045656320) of type std::thread::id uniquely identifying the thread 134 bool is_master_; 135 volatile bool running_; 136 volatile bool caught_severe_exception_; 137 138 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__) 139 pthread_t native_handle_; 140 #else 141 uint64_t native_handle_; 142 #endif 143 144 void ShutdownGroup(); 145 TaskGroup *MyTaskGroup(); 146 void set_task_group(TaskGroup *vg); 147 }; 148 149 extern thread_local Task *gMyTask; 150 } // namespace dataset 151 } // namespace mindspore 152 153 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_TASK_H_ 154