1 /** 2 * Copyright 2019 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_TASK_H_ 17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_TASK_H_ 18 19 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__) 20 #include <pthread.h> 21 #include <sys/syscall.h> 22 #endif 23 #include <chrono> 24 #include <exception> 25 #include <functional> 26 #include <future> 27 #include <iostream> 28 #include <memory> 29 #include <mutex> 30 #include <sstream> 31 #include <stdexcept> 32 #include <string> 33 #include <thread> 34 #include "minddata/dataset/util/intrp_resource.h" 35 #include "minddata/dataset/util/list.h" 36 #include "minddata/dataset/util/log_adapter.h" 37 #include "minddata/dataset/util/memory_pool.h" 38 #include "minddata/dataset/util/services.h" 39 #include "minddata/dataset/util/wait_post.h" 40 41 namespace mindspore { 42 namespace dataset { 43 const uint32_t kWaitInterruptTaskTime = 30; // the wait time of interrupt task 44 45 class TaskManager; 46 47 class Task : public IntrpResource { 48 public: 49 friend class TaskManager; 50 friend class TaskGroup; 51 52 enum class WaitFlag : int { kBlocking, kNonBlocking }; 53 54 Task(const std::string &myName, const std::function<Status()> &f, int32_t operator_id = -1); 55 56 // Future objects are not copyable. 57 Task(const Task &) = delete; 58 59 ~Task() override; 60 61 Task &operator=(const Task &) = delete; 62 63 // Move constructor and Assignment are not supported. 64 // Too many things in this class. 65 Task(Task &&) = delete; 66 67 Task &operator=(Task &&) = delete; 68 69 Status GetTaskErrorIfAny() const; 70 ChangeName(const std::string & newName)71 void ChangeName(const std::string &newName) { my_name_ = newName; } 72 73 // To execute the _fncObj 74 void operator()(); 75 76 Node<Task> node; 77 Node<Task> group; 78 Node<Task> free; 79 80 // Run the task 81 Status Run(); 82 83 Status Join(WaitFlag wf = WaitFlag::kBlocking); 84 Running()85 bool Running() const { return running_; } 86 CaughtSevereException()87 bool CaughtSevereException() const { return caught_severe_exception_; } 88 IsMasterThread()89 bool IsMasterThread() const { return is_master_; } 90 get_id()91 std::thread::id get_id() { return id_; } 92 get_linux_id()93 pid_t get_linux_id() { return thread_id_; } 94 MyName()95 std::string MyName() const { return my_name_; } 96 get_operator_id()97 int32_t get_operator_id() { return operator_id_; } 98 99 // An operator used by std::find 100 bool operator==(const Task &other) const { return (this == &other); } 101 102 bool operator!=(const Task &other) const { return !(*this == other); } 103 Post()104 void Post() { wp_.Set(); } 105 Wait()106 Status Wait() { return (wp_.Wait()); } 107 108 static Status OverrideInterruptRc(const Status &rc); 109 110 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__) 111 pthread_t GetNativeHandle() const; 112 #endif 113 114 private: 115 mutable std::mutex mux_; 116 std::string my_name_; 117 int32_t operator_id_; 118 pid_t thread_id_; 119 Status rc_; 120 WaitPost wp_; 121 // Task need to provide definition for this function. It 122 // will be called by thread function. 123 std::function<Status()> fnc_obj_; 124 // Misc fields used by TaskManager. 125 TaskGroup *task_group_; 126 std::future<void> thrd_; 127 std::thread::id id_; 128 bool is_master_; 129 volatile bool running_; 130 volatile bool caught_severe_exception_; 131 132 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__) 133 pthread_t native_handle_; 134 #else 135 uint64_t native_handle_; 136 #endif 137 138 void ShutdownGroup(); 139 TaskGroup *MyTaskGroup(); 140 void set_task_group(TaskGroup *vg); 141 }; 142 143 extern thread_local Task *gMyTask; 144 } // namespace dataset 145 } // namespace mindspore 146 147 #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_TASK_H_ 148