1 /* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_LIB_GPRPP_MPSCQ_H 20 #define GRPC_CORE_LIB_GPRPP_MPSCQ_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include "src/core/lib/gprpp/atomic.h" 25 #include "src/core/lib/gprpp/sync.h" 26 27 #include <grpc/support/log.h> 28 29 namespace grpc_core { 30 31 // Multiple-producer single-consumer lock free queue, based upon the 32 // implementation from Dmitry Vyukov here: 33 // http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue 34 class MultiProducerSingleConsumerQueue { 35 public: 36 // List node. Application node types can inherit from this. 37 struct Node { 38 Atomic<Node*> next; 39 }; 40 MultiProducerSingleConsumerQueue()41 MultiProducerSingleConsumerQueue() : head_{&stub_}, tail_(&stub_) {} ~MultiProducerSingleConsumerQueue()42 ~MultiProducerSingleConsumerQueue() { 43 GPR_ASSERT(head_.Load(MemoryOrder::RELAXED) == &stub_); 44 GPR_ASSERT(tail_ == &stub_); 45 } 46 47 // Push a node 48 // Thread safe - can be called from multiple threads concurrently 49 // Returns true if this was possibly the first node (may return true 50 // sporadically, will not return false sporadically) 51 bool Push(Node* node); 52 // Pop a node (returns NULL if no node is ready - which doesn't indicate that 53 // the queue is empty!!) 54 // Thread compatible - can only be called from one thread at a time 55 Node* Pop(); 56 // Pop a node; sets *empty to true if the queue is empty, or false if it is 57 // not. 58 Node* PopAndCheckEnd(bool* empty); 59 60 private: 61 // make sure head & tail don't share a cacheline 62 union { 63 char padding_[GPR_CACHELINE_SIZE]; 64 Atomic<Node*> head_; 65 }; 66 Node* tail_; 67 Node stub_; 68 }; 69 70 // An mpscq with a lock: it's safe to pop from multiple threads, but doing 71 // only one thread will succeed concurrently. 72 class LockedMultiProducerSingleConsumerQueue { 73 public: 74 typedef MultiProducerSingleConsumerQueue::Node Node; 75 76 // Push a node 77 // Thread safe - can be called from multiple threads concurrently 78 // Returns true if this was possibly the first node (may return true 79 // sporadically, will not return false sporadically) 80 bool Push(Node* node); 81 82 // Pop a node (returns NULL if no node is ready - which doesn't indicate that 83 // the queue is empty!!) 84 // Thread safe - can be called from multiple threads concurrently 85 Node* TryPop(); 86 87 // Pop a node. Returns NULL only if the queue was empty at some point after 88 // calling this function 89 Node* Pop(); 90 91 private: 92 MultiProducerSingleConsumerQueue queue_; 93 Mutex mu_; 94 }; 95 96 } // namespace grpc_core 97 98 #endif /* GRPC_CORE_LIB_GPRPP_MPSCQ_H */ 99