1 /** 2 * Copyright 2023-2024 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_MINDSPORE_CCSRC_RUNTIME_PYNATIVE_ASYNC_ASYNC_HQUEUE_H_ 18 #define MINDSPORE_MINDSPORE_CCSRC_RUNTIME_PYNATIVE_ASYNC_ASYNC_HQUEUE_H_ 19 20 #include <queue> 21 #include <memory> 22 #include <thread> 23 #include <mutex> 24 #include <string> 25 #include <utility> 26 #include <unordered_map> 27 #include <condition_variable> 28 29 #include "include/backend/visible.h" 30 #include "runtime/pipeline/task/task.h" 31 #include "thread/hqueue.h" 32 #ifndef USE_HQUEUE 33 #define USE_HQUEUE 34 #endif 35 36 namespace mindspore { 37 namespace runtime { 38 /* Thread status */ 39 constexpr int kThreadBusy = 0; // busy, the thread is running task 40 constexpr int kThreadIdle = 1; // idle, the thread is waiting 41 // Create a new thread to execute the tasks in the queue sequentially. 42 class BACKEND_EXPORT AsyncHqueue { 43 public: AsyncHqueue(std::string name)44 explicit AsyncHqueue(std::string name) : name_(std::move(name)) {} 45 virtual ~AsyncHqueue(); 46 47 // Init resource 48 void Init(); 49 50 // Add task to the end of the queue. 51 bool Push(AsyncTask *task); 52 53 // Wait for all async task finish executing. 54 void Wait(); 55 56 // Check if the queue is empty. 57 bool Empty(); 58 59 // clear tasks of queue, and wait last task. 60 void Clear(); 61 62 // When an exception occurs, the state needs to be reset. 63 void Reset(); 64 65 // Thread join before the process exit. 66 void WorkerJoin(); 67 68 // Reinit resources after fork occurs. 69 void ChildAfterFork(); 70 71 // Check grad queue exception. 72 void CheckException(); 73 74 protected: 75 void WorkerLoop(); 76 void SetThreadName() const; 77 std::unique_ptr<std::thread> worker_{nullptr}; 78 std::mutex task_mutex_; 79 std::unique_ptr<std::condition_variable> task_cond_var_{nullptr}; 80 std::string name_; 81 82 private: 83 void ClearTaskWithException(); 84 HQueue<AsyncTask> tasks_hqueque_; 85 bool init_{false}; 86 bool alive_{true}; 87 bool stop_{false}; 88 std::atomic_int status_{kThreadBusy}; 89 size_t spin_count_{0}; 90 std::exception_ptr e_ptr_{nullptr}; 91 }; 92 using AsyncHqueuePtr = std::shared_ptr<AsyncHqueue>; 93 } // namespace runtime 94 } // namespace mindspore 95 96 #endif // MINDSPORE_MINDSPORE_CCSRC_RUNTIME_PYNATIVE_ASYNC_ASYNC_QUEUE_H_ 97