1 // 2 // 3 // Copyright 2016 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_SRC_CORE_LIB_GPRPP_MPSCQ_H 20 #define GRPC_SRC_CORE_LIB_GPRPP_MPSCQ_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <atomic> 25 26 #include <grpc/support/log.h> 27 28 #include "src/core/lib/gprpp/sync.h" 29 30 namespace grpc_core { 31 32 // Multiple-producer single-consumer lock free queue, based upon the 33 // implementation from Dmitry Vyukov here: 34 // http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue 35 class MultiProducerSingleConsumerQueue { 36 public: 37 // List node. Application node types can inherit from this. 38 struct Node { 39 std::atomic<Node*> next{nullptr}; 40 }; 41 MultiProducerSingleConsumerQueue()42 MultiProducerSingleConsumerQueue() : head_{&stub_}, tail_(&stub_) {} ~MultiProducerSingleConsumerQueue()43 ~MultiProducerSingleConsumerQueue() { 44 GPR_ASSERT(head_.load(std::memory_order_relaxed) == &stub_); 45 GPR_ASSERT(tail_ == &stub_); 46 } 47 48 // Push a node 49 // Thread safe - can be called from multiple threads concurrently 50 // Returns true if this was possibly the first node (may return true 51 // sporadically, will not return false sporadically) 52 bool Push(Node* node); 53 // Pop a node (returns NULL if no node is ready - which doesn't indicate that 54 // the queue is empty!!) 55 // Thread compatible - can only be called from one thread at a time 56 Node* Pop(); 57 // Pop a node; sets *empty to true if the queue is empty, or false if it is 58 // not. 59 Node* PopAndCheckEnd(bool* empty); 60 61 private: 62 // make sure head & tail don't share a cacheline 63 union { 64 char padding_[GPR_CACHELINE_SIZE]; 65 std::atomic<Node*> head_{nullptr}; 66 }; 67 Node* tail_; 68 Node stub_; 69 }; 70 71 // An mpscq with a lock: it's safe to pop from multiple threads, but doing 72 // only one thread will succeed concurrently. 73 class LockedMultiProducerSingleConsumerQueue { 74 public: 75 typedef MultiProducerSingleConsumerQueue::Node Node; 76 77 // Push a node 78 // Thread safe - can be called from multiple threads concurrently 79 // Returns true if this was possibly the first node (may return true 80 // sporadically, will not return false sporadically) 81 bool Push(Node* node); 82 83 // Pop a node (returns NULL if no node is ready - which doesn't indicate that 84 // the queue is empty!!) 85 // Thread safe - can be called from multiple threads concurrently 86 Node* TryPop(); 87 88 // Pop a node. Returns NULL only if the queue was empty at some point after 89 // calling this function 90 Node* Pop(); 91 92 private: 93 MultiProducerSingleConsumerQueue queue_; 94 Mutex mu_; 95 }; 96 97 } // namespace grpc_core 98 99 #endif // GRPC_SRC_CORE_LIB_GPRPP_MPSCQ_H 100