• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2023-2024 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_MINDSPORE_CCSRC_RUNTIME_PYNATIVE_ASYNC_ASYNC_HQUEUE_H_
18 #define MINDSPORE_MINDSPORE_CCSRC_RUNTIME_PYNATIVE_ASYNC_ASYNC_HQUEUE_H_
19 
20 #include <queue>
21 #include <memory>
22 #include <thread>
23 #include <mutex>
24 #include <string>
25 #include <utility>
26 #include <unordered_map>
27 #include <condition_variable>
28 
29 #include "include/backend/visible.h"
30 #include "runtime/pipeline/task/task.h"
31 #include "thread/hqueue.h"
32 #ifndef USE_HQUEUE
33 #define USE_HQUEUE
34 #endif
35 
36 namespace mindspore {
37 namespace runtime {
38 /* Thread status */
39 constexpr int kThreadBusy = 0;  // busy, the thread is running task
40 constexpr int kThreadIdle = 1;  // idle, the thread is waiting
41 // Create a new thread to execute the tasks in the queue sequentially.
42 class BACKEND_EXPORT AsyncHqueue {
43  public:
AsyncHqueue(std::string name)44   explicit AsyncHqueue(std::string name) : name_(std::move(name)) {}
45   virtual ~AsyncHqueue();
46 
47   // Init resource
48   void Init();
49 
50   // Add task to the end of the queue.
51   bool Push(AsyncTask *task);
52 
53   // Wait for all async task finish executing.
54   void Wait();
55 
56   // Check if the queue is empty.
57   bool Empty();
58 
59   // clear tasks of queue, and wait last task.
60   void Clear();
61 
62   // When an exception occurs, the state needs to be reset.
63   void Reset();
64 
65   // Thread join before the process exit.
66   void WorkerJoin();
67 
68   // Reinit resources after fork occurs.
69   void ChildAfterFork();
70 
71   // Check grad queue exception.
72   void CheckException();
73 
74  protected:
75   void WorkerLoop();
76   void SetThreadName() const;
77   std::unique_ptr<std::thread> worker_{nullptr};
78   std::mutex task_mutex_;
79   std::unique_ptr<std::condition_variable> task_cond_var_{nullptr};
80   std::string name_;
81 
82  private:
83   void ClearTaskWithException();
84   HQueue<AsyncTask> tasks_hqueque_;
85   bool init_{false};
86   bool alive_{true};
87   bool stop_{false};
88   std::atomic_int status_{kThreadBusy};
89   size_t spin_count_{0};
90   std::exception_ptr e_ptr_{nullptr};
91 };
92 using AsyncHqueuePtr = std::shared_ptr<AsyncHqueue>;
93 }  // namespace runtime
94 }  // namespace mindspore
95 
96 #endif  // MINDSPORE_MINDSPORE_CCSRC_RUNTIME_PYNATIVE_ASYNC_ASYNC_QUEUE_H_
97