• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_TASK_H_
17 #define MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_TASK_H_
18 
19 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
20 #include <pthread.h>
21 #include <sys/syscall.h>
22 #endif
23 #if defined(_WIN32) || defined(_WIN64)
24 #include <windows.h>  // for GetCurrentProcessId()
25 #endif
26 #include <chrono>
27 #include <exception>
28 #include <functional>
29 #include <future>
30 #include <iostream>
31 #include <memory>
32 #include <mutex>
33 #include <sstream>
34 #include <stdexcept>
35 #include <string>
36 #include <thread>
37 #include "minddata/dataset/util/intrp_resource.h"
38 #include "minddata/dataset/util/list.h"
39 #include "minddata/dataset/util/log_adapter.h"
40 #include "minddata/dataset/util/memory_pool.h"
41 #include "minddata/dataset/util/services.h"
42 #include "minddata/dataset/util/wait_post.h"
43 
44 namespace mindspore {
45 namespace dataset {
46 const uint32_t kWaitInterruptTaskTime = 30;  // the wait time of interrupt task
47 
48 class TaskManager;
49 
50 class Task : public IntrpResource {
51  public:
52   friend class TaskManager;
53   friend class TaskGroup;
54 
55   enum class WaitFlag : int { kBlocking, kNonBlocking };
56 
57   Task(const std::string &myName, const std::function<Status()> &f, int32_t operator_id = -1);
58 
59   // Future objects are not copyable.
60   Task(const Task &) = delete;
61 
62   ~Task() override;
63 
64   Task &operator=(const Task &) = delete;
65 
66   // Move constructor and Assignment are not supported.
67   // Too many things in this class.
68   Task(Task &&) = delete;
69 
70   Task &operator=(Task &&) = delete;
71 
72   Status GetTaskErrorIfAny() const;
73 
ChangeName(const std::string & newName)74   void ChangeName(const std::string &newName) { my_name_ = newName; }
75 
76   // To execute the _fncObj
77   void operator()();
78 
79   Node<Task> node;
80   Node<Task> group;
81   Node<Task> free;
82 
83   // Run the task
84   Status Run();
85 
86   Status Join(WaitFlag wf = WaitFlag::kBlocking);
87 
Running()88   bool Running() const { return running_; }
89 
CaughtSevereException()90   bool CaughtSevereException() const { return caught_severe_exception_; }
91 
IsMasterThread()92   bool IsMasterThread() const { return is_master_; }
93 
get_id()94   std::thread::id get_id() { return id_; }
95 
get_linux_id()96   pid_t get_linux_id() { return thread_id_; }
97 
MyName()98   std::string MyName() const { return my_name_; }
99 
get_operator_id()100   int32_t get_operator_id() { return operator_id_; }
101 
102   // An operator used by std::find
103   bool operator==(const Task &other) const { return (this == &other); }
104 
105   bool operator!=(const Task &other) const { return !(*this == other); }
106 
Post()107   void Post() { wp_.Set(); }
108 
Wait()109   Status Wait() { return (wp_.Wait()); }
110 
Clear()111   void Clear() { wp_.Clear(); }
112 
113   static Status OverrideInterruptRc(const Status &rc);
114 
115 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
116   pthread_t GetNativeHandle() const;
117 #endif
118 
119  private:
120   mutable std::mutex mux_;
121   std::string my_name_;
122   int32_t operator_id_;
123   pid_t process_id_;  // process id (like: 11418), which can be queried by command "ps -ef | grep pid"
124   pid_t thread_id_;   // linux thread id (like: 11534), which can be queried by command "top -H"
125   Status rc_;
126   WaitPost wp_;
127   // Task need to provide definition for this function.  It
128   // will be called by thread function.
129   std::function<Status()> fnc_obj_;
130   // Misc fields used by TaskManager.
131   TaskGroup *task_group_;
132   std::future<void> thrd_;
133   std::thread::id id_;  // thread id (like: 140299045656320) of type std::thread::id uniquely identifying the thread
134   bool is_master_;
135   volatile bool running_;
136   volatile bool caught_severe_exception_;
137 
138 #if !defined(_WIN32) && !defined(_WIN64) && !defined(__ANDROID__) && !defined(ANDROID) && !defined(__APPLE__)
139   pthread_t native_handle_;
140 #else
141   uint64_t native_handle_;
142 #endif
143 
144   void ShutdownGroup();
145   TaskGroup *MyTaskGroup();
146   void set_task_group(TaskGroup *vg);
147 };
148 
149 extern thread_local Task *gMyTask;
150 }  // namespace dataset
151 }  // namespace mindspore
152 
153 #endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_UTIL_TASK_H_
154