/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef MINDSPORE_CORE_MINDRT_RUNTIME_HQUEUE_H_
#define MINDSPORE_CORE_MINDRT_RUNTIME_HQUEUE_H_
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <new>
#include <vector>

namespace mindspore {
// implement a lock-free queue
// refer to https://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf
template <typename T>
class HQueue;

// A versioned link to a node in the pool. `index` is a position in HQueue::nodes (-1 means "no node");
// `version` is incremented by every successful compare-exchange on qhead/qtail/next, so a link that was
// changed and later points at the same index again still compares unequal to a stale copy.
struct Pointer {
  int32_t index = -1;
  uint32_t version = 0;
  bool operator==(const Pointer &that) const { return (index == that.index && version == that.version); }
  bool operator!=(const Pointer &that) const { return !(*this == that); }
};

// One slot of the preallocated node pool.
template <typename T>
struct HQNode {
  std::atomic<Pointer> next{Pointer{}};  // link to the following node, index -1 when this is the last node
  T *value = nullptr;                    // payload pointer, not owned by the queue
  std::atomic_bool free = {true};        // true while the slot may be claimed by Enqueue
};

// Fixed-capacity FIFO of T* built on a pool of `sz` nodes. One node always acts as the dummy head, so at
// most `sz - 1` elements can be stored at a time. The queue stores raw pointers and never deletes them.
// Init() and Clean() modify the node vector without any synchronization and therefore must not run
// concurrently with any other member function.
template <typename T>
class HQueue {
 public:
  HQueue(const HQueue &) = delete;
  HQueue &operator=(const HQueue &) = delete;
  HQueue() {}
  // Releases the node pool; calling Clean() beforehand is allowed but no longer required.
  virtual ~HQueue() { Clean(); }

  // Returns true once Init() has succeeded and Clean() has not been called since.
  bool IsInit() const { return !nodes.empty(); }

  // Allocates a pool of `sz` nodes and makes node 0 the dummy head.
  // Returns false if the queue is already initialized, if `sz` <= 0, or if a node cannot be allocated
  // (in which case everything allocated so far is released).
  bool Init(int32_t sz) {
    if (IsInit() || sz <= 0) {
      return false;
    }
    nodes.reserve(static_cast<size_t>(sz));
    for (int32_t i = 0; i < sz; i++) {
      // nothrow form: plain `new` never returns nullptr, which would make the check below dead code
      auto node = new (std::nothrow) HQNode<T>();
      if (node == nullptr) {
        Clean();
        return false;
      }
      node->value = nullptr;
      node->free = true;
      node->next = {-1, 0};
      nodes.emplace_back(node);
    }

    // init first node as dummy head
    qhead = {0, 0};
    qtail = {0, 0};
    nodes[0]->free = false;
    queue_size = sz;
    free_index = 1;
    return true;
  }

  // Deletes every node and returns the queue to the uninitialized state. Safe to call repeatedly.
  // Stored T* values are not deleted.
  void Clean() {
    for (auto node : nodes) {
      delete node;
    }
    nodes.clear();
    // reset the bookkeeping so that no stale index can be used against the now-empty pool
    queue_size = 0;
    qhead = Pointer{};
    qtail = Pointer{};
    free_index = 1;
  }

  // Appends `t` to the tail of the queue.
  // Returns false if the queue is not initialized or no free node is available (queue full).
  bool Enqueue(T *t) {
    if (!IsInit()) {
      return false;
    }
    const int32_t nodeIdx = AcquireFreeNode();
    if (nodeIdx < 0) {
      return false;
    }
    HQNode<T> *node = nodes[nodeIdx];

    node->value = t;
    node->next = {-1, 0};

    while (true) {
      Pointer tail = qtail;
      if (tail.index == -1) {
        continue;
      }
      Pointer next = nodes[tail.index]->next;

      // tail moved while we were reading its successor: retry with a fresh snapshot
      if (tail != this->qtail) {
        continue;
      }

      // tail is lagging behind the real last node: help advance it, then retry
      if (next.index != -1) {
        this->qtail.compare_exchange_strong(tail, {next.index, tail.version + 1});
        continue;
      }

      // link the new node after the last node, then try to swing tail to it
      if (nodes[tail.index]->next.compare_exchange_strong(next, {nodeIdx, next.version + 1})) {
        this->qtail.compare_exchange_strong(tail, {nodeIdx, tail.version + 1});
        break;
      }
    }

    return true;
  }

  // Removes and returns the oldest element.
  // Returns nullptr if the queue is empty or not initialized.
  T *Dequeue() {
    if (!IsInit()) {
      return nullptr;
    }
    while (true) {
      T *ret = nullptr;
      Pointer head = qhead;
      Pointer tail = qtail;
      if (head.index == -1) {
        continue;
      }
      Pointer next = nodes[head.index]->next;

      // head moved while we were reading its successor: retry with a fresh snapshot
      if (head != this->qhead) {
        continue;
      }

      if (head.index == tail.index) {
        if (next.index == -1) {
          return nullptr;
        }
        // tail is lagging behind: help advance it, then retry
        this->qtail.compare_exchange_strong(tail, {next.index, tail.version + 1});
      } else {
        if (next.index == -1) {
          continue;
        }
        // read the value before the CAS; after it, `next` is the new dummy head
        ret = nodes[next.index]->value;
        if (this->qhead.compare_exchange_strong(head, {next.index, head.version + 1})) {
          // free head
          nodes[head.index]->free = true;
          return ret;
        }
      }
    }
  }

  // Returns true if, at the moment of the check, head and tail refer to the same node and that node has
  // no successor. Returns false otherwise, including when the queue is not initialized.
  bool Empty() {
    Pointer head = qhead;
    Pointer tail = qtail;
    if (head.index < 0) {
      return false;
    }
    Pointer next = nodes[head.index]->next;

    if (head == this->qhead && head.index == tail.index && next.index == -1) {
      return true;
    }

    return false;
  }

 private:
  // Claims one free node and returns its index, or -1 if every node is in use.
  // The scan starts at the `free_index` hint and wraps around the WHOLE pool, index 0 included: node 0 is
  // only the initial dummy head and is released like any other node by the first successful Dequeue, so
  // skipping it would permanently cost one slot of capacity.
  int32_t AcquireFreeNode() {
    int32_t idx = free_index.load();
    if (idx < 0 || idx >= queue_size) {
      idx = 0;
    }
    for (int32_t scanned = 0; scanned < queue_size; ++scanned) {
      bool expected = true;
      if (nodes[idx]->free.compare_exchange_strong(expected, false)) {
        free_index = idx + 1;
        return idx;
      }
      if (++idx == queue_size) {
        idx = 0;
      }
    }
    return -1;
  }

  std::atomic<Pointer> qhead{Pointer{}};  // current dummy head; its successor holds the oldest element
  std::atomic<Pointer> qtail{Pointer{}};  // last node, or a node shortly before it
  std::vector<HQNode<T> *> nodes;         // owning pool of nodes, indexed by Pointer::index
  int32_t queue_size{};                   // number of nodes in the pool
  std::atomic<int32_t> free_index{1};     // hint: where the next search for a free node starts
};
}  // namespace mindspore

#endif  // MINDSPORE_CORE_MINDRT_RUNTIME_HQUEUE_H_