• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CORE_MINDRT_RUNTIME_HQUEUE_H_
18 #define MINDSPORE_CORE_MINDRT_RUNTIME_HQUEUE_H_
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <new>
#include <vector>
21 
22 namespace mindspore {
// Lock-free queue. HQueue is only forward-declared here; its definition
// appears further down in this header.
// The algorithm is based on the non-blocking queue described in
// https://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf
template <typename T>
class HQueue;
// Versioned link used by the queue instead of a raw node pointer.
// Kept as a plain aggregate so it can be brace-initialised ({index, version})
// and stored inside std::atomic.
struct Pointer {
  // Position of the referenced node; -1 means "no node".
  int32_t index = -1;
  // Counter stored next to the index; both fields take part in comparisons.
  uint32_t version = 0;

  // Two pointers are equal only when index and version both match.
  bool operator==(const Pointer &that) const {
    if (index != that.index) {
      return false;
    }
    return version == that.version;
  }

  // Negation of operator==: any differing field makes the pointers unequal.
  bool operator!=(const Pointer &that) const {
    return index != that.index || version != that.version;
  }
};
33 
34 template <typename T>
35 struct HQNode {
36   std::atomic<Pointer> next;
37   T *value = nullptr;
38   std::atomic_bool free = {true};
39 };
40 
41 template <typename T>
42 class HQueue {
43  public:
44   HQueue(const HQueue &) = delete;
45   HQueue &operator=(const HQueue &) = delete;
HQueue()46   HQueue() {}
~HQueue()47   virtual ~HQueue() {}
48 
IsInit()49   bool IsInit() const { return nodes.size() != 0; }
50 
Init(int32_t sz)51   bool Init(int32_t sz) {
52     if (IsInit() || sz <= 0) {
53       return false;
54     }
55     for (int32_t i = 0; i < sz; i++) {
56       auto node = new HQNode<T>();
57       if (node == nullptr) {
58         Clean();
59         return false;
60       }
61       node->value = nullptr;
62       node->free = true;
63       node->next = {-1, 0};
64       nodes.emplace_back(node);
65     }
66 
67     // init first node as dummy head
68     qhead = {0, 0};
69     qtail = {0, 0};
70     nodes[0]->free = false;
71     queue_size = sz;
72     free_index = 1;
73     return true;
74   }
75 
Clean()76   void Clean() {
77     for (auto node : nodes) {
78       delete node;
79       node = nullptr;
80     }
81     nodes.clear();
82   }
83 
Enqueue(T * t)84   bool Enqueue(T *t) {
85     HQNode<T> *node = nullptr;
86     int32_t nodeIdx = free_index;
87     for (; nodeIdx < queue_size; ++nodeIdx) {
88       bool expected = true;
89       if (nodes[nodeIdx]->free.compare_exchange_strong(expected, false)) {
90         node = nodes[nodeIdx];
91         free_index = nodeIdx + 1;
92         break;
93       }
94     }
95     if (node == nullptr) {
96       free_index = 1;
97       for (nodeIdx = 1; nodeIdx < queue_size; ++nodeIdx) {
98         bool expected = true;
99         if (nodes[nodeIdx]->free.compare_exchange_strong(expected, false)) {
100           node = nodes[nodeIdx];
101           free_index = nodeIdx + 1;
102           break;
103         }
104       }
105       if (node == nullptr) {
106         return false;
107       }
108     }
109 
110     node->value = t;
111     node->next = {-1, 0};
112 
113     while (true) {
114       Pointer tail = qtail;
115       if (tail.index == -1) {
116         continue;
117       }
118       Pointer next = nodes[tail.index]->next;
119 
120       if (tail != this->qtail) {
121         continue;
122       }
123 
124       if (next.index != -1) {
125         this->qtail.compare_exchange_strong(tail, {next.index, tail.version + 1});
126         continue;
127       }
128 
129       if (nodes[tail.index]->next.compare_exchange_strong(next, {nodeIdx, next.version + 1})) {
130         this->qtail.compare_exchange_strong(tail, {nodeIdx, tail.version + 1});
131         break;
132       }
133     }
134 
135     return true;
136   }
137 
Dequeue()138   T *Dequeue() {
139     while (true) {
140       T *ret = nullptr;
141       Pointer head = qhead;
142       Pointer tail = qtail;
143       if (head.index == -1) {
144         continue;
145       }
146       Pointer next = nodes[head.index]->next;
147 
148       if (head != this->qhead) {
149         continue;
150       }
151 
152       if (head.index == tail.index) {
153         if (next.index == -1) {
154           return nullptr;
155         }
156         this->qtail.compare_exchange_strong(tail, {next.index, tail.version + 1});
157       } else {
158         if (next.index == -1) {
159           continue;
160         }
161         ret = nodes[next.index]->value;
162         if (this->qhead.compare_exchange_strong(head, {next.index, head.version + 1})) {
163           // free head
164           nodes[head.index]->free = true;
165           return ret;
166         }
167       }
168     }
169   }
170 
Empty()171   bool Empty() {
172     Pointer head = qhead;
173     Pointer tail = qtail;
174     if (head.index < 0) {
175       return false;
176     }
177     Pointer next = nodes[head.index]->next;
178 
179     if (head == this->qhead && head.index == tail.index && next.index == -1) {
180       return true;
181     }
182 
183     return false;
184   }
185 
186  private:
187   std::atomic<Pointer> qhead;
188   std::atomic<Pointer> qtail;
189   std::vector<HQNode<T> *> nodes;
190   int32_t queue_size{};
191   std::atomic<int32_t> free_index;
192 };
193 }  // namespace mindspore
194 
195 #endif  // MINDSPORE_CORE_MINDRT_RUNTIME_HQUEUE_H_
196