• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_LIB_GPRPP_MPSCQ_H
20 #define GRPC_CORE_LIB_GPRPP_MPSCQ_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include "src/core/lib/gprpp/atomic.h"
25 #include "src/core/lib/gprpp/sync.h"
26 
27 #include <grpc/support/log.h>
28 
29 namespace grpc_core {
30 
31 // Multiple-producer single-consumer lock free queue, based upon the
32 // implementation from Dmitry Vyukov here:
33 // http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
34 class MultiProducerSingleConsumerQueue {
35  public:
36   // List node.  Application node types can inherit from this.
37   struct Node {
38     Atomic<Node*> next;
39   };
40 
MultiProducerSingleConsumerQueue()41   MultiProducerSingleConsumerQueue() : head_{&stub_}, tail_(&stub_) {}
~MultiProducerSingleConsumerQueue()42   ~MultiProducerSingleConsumerQueue() {
43     GPR_ASSERT(head_.Load(MemoryOrder::RELAXED) == &stub_);
44     GPR_ASSERT(tail_ == &stub_);
45   }
46 
47   // Push a node
48   // Thread safe - can be called from multiple threads concurrently
49   // Returns true if this was possibly the first node (may return true
50   // sporadically, will not return false sporadically)
51   bool Push(Node* node);
52   // Pop a node (returns NULL if no node is ready - which doesn't indicate that
53   // the queue is empty!!)
54   // Thread compatible - can only be called from one thread at a time
55   Node* Pop();
56   // Pop a node; sets *empty to true if the queue is empty, or false if it is
57   // not.
58   Node* PopAndCheckEnd(bool* empty);
59 
60  private:
61   // make sure head & tail don't share a cacheline
62   union {
63     char padding_[GPR_CACHELINE_SIZE];
64     Atomic<Node*> head_;
65   };
66   Node* tail_;
67   Node stub_;
68 };
69 
70 // An mpscq with a lock: it's safe to pop from multiple threads, but doing
71 // only one thread will succeed concurrently.
72 class LockedMultiProducerSingleConsumerQueue {
73  public:
74   typedef MultiProducerSingleConsumerQueue::Node Node;
75 
76   // Push a node
77   // Thread safe - can be called from multiple threads concurrently
78   // Returns true if this was possibly the first node (may return true
79   // sporadically, will not return false sporadically)
80   bool Push(Node* node);
81 
82   // Pop a node (returns NULL if no node is ready - which doesn't indicate that
83   // the queue is empty!!)
84   // Thread safe - can be called from multiple threads concurrently
85   Node* TryPop();
86 
87   // Pop a node.  Returns NULL only if the queue was empty at some point after
88   // calling this function
89   Node* Pop();
90 
91  private:
92   MultiProducerSingleConsumerQueue queue_;
93   Mutex mu_;
94 };
95 
96 }  // namespace grpc_core
97 
98 #endif /* GRPC_CORE_LIB_GPRPP_MPSCQ_H */
99