• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2023-2024 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_MINDSPORE_CCSRC_RUNTIME_PYNATIVE_ASYNC_ASYNC_R_QUEUE_H_
18 #define MINDSPORE_MINDSPORE_CCSRC_RUNTIME_PYNATIVE_ASYNC_ASYNC_R_QUEUE_H_
19 
20 #include <queue>
21 #include <memory>
22 #include <thread>
23 #include <mutex>
24 #include <set>
25 #include <string>
26 #include <unordered_map>
27 #include <condition_variable>
28 #include <utility>
29 
30 #include "include/backend/visible.h"
31 #include "runtime/pipeline/task/task.h"
32 
33 #include "runtime/pipeline/ring_queue.h"
34 
35 namespace mindspore {
36 namespace runtime {
37 using AsyncTaskPtr = std::shared_ptr<AsyncTask>;
38 constexpr auto kQueueCapacity = 1024;
39 enum kThreadWaitLevel : int {
40   kLevelUnknown = 0,
41   kLevelPython,
42   kLevelGrad,
43   kLevelFrontend,
44   kLevelBackend,
45   kLevelDevice,
46 };
47 
48 // Create a new thread to execute the tasks in the queue sequentially.
49 class BACKEND_EXPORT AsyncRQueue {
50  public:
AsyncRQueue(std::string name,kThreadWaitLevel wait_level)51   explicit AsyncRQueue(std::string name, kThreadWaitLevel wait_level)
52       : name_(std::move(name)), wait_level_(wait_level) {}
53   virtual ~AsyncRQueue();
54 
55   // Add task to the end of the queue.
56   void Push(const AsyncTaskPtr &task);
57 
58   // Wait for all async task finish executing.
59   void Wait();
60 
61   // Check if the queue is empty.
62   bool Empty();
63 
64   // clear tasks of queue, and wait last task.
65   void Clear();
66 
67   // When an exception occurs, the state needs to be reset.
68   void Reset();
69 
70   // Thread join before the process exit.
71   void WorkerJoin();
72 
73   // Reinit resources after fork occurs.
74   void ChildAfterFork();
75 
76  protected:
77   void WorkerLoop();
78   void SetThreadName() const;
79 
80   std::unique_ptr<std::thread> worker_{nullptr};
81   std::string name_;
82   kThreadWaitLevel wait_level_;
83   inline static std::unordered_map<std::thread::id, kThreadWaitLevel> thread_id_to_wait_level_;
84   inline static std::mutex level_mutex_;
85 
86  private:
87   void ClearTaskWithException();
88 
89   RingQueue<AsyncTaskPtr, kQueueCapacity> tasks_queue_;
90 };
91 }  // namespace runtime
92 using AsyncRQueuePtr = std::shared_ptr<runtime::AsyncRQueue>;
93 }  // namespace mindspore
94 
95 #endif  // MINDSPORE_MINDSPORE_CCSRC_RUNTIME_PYNATIVE_ASYNC_ASYNC_R_QUEUE_H_
96