1 /* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H 20 #define GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include "src/core/lib/debug/stats.h" 25 #include "src/core/lib/gprpp/atomic.h" 26 #include "src/core/lib/gprpp/sync.h" 27 28 namespace grpc_core { 29 30 extern DebugOnlyTraceFlag grpc_thread_pool_trace; 31 32 // Abstract base class of a Multiple-Producer-Multiple-Consumer(MPMC) queue 33 // interface 34 class MPMCQueueInterface { 35 public: ~MPMCQueueInterface()36 virtual ~MPMCQueueInterface() {} 37 38 // Puts elem into queue immediately at the end of queue. 39 // This might cause to block on full queue depending on implementation. 40 virtual void Put(void* elem) = 0; 41 42 // Removes the oldest element from the queue and return it. 43 // This might cause to block on empty queue depending on implementation. 44 // Optional argument for collecting stats purpose. 45 virtual void* Get(gpr_timespec* wait_time) = 0; 46 47 // Returns number of elements in the queue currently 48 virtual int count() const = 0; 49 }; 50 51 class InfLenFIFOQueue : public MPMCQueueInterface { 52 public: 53 // Creates a new MPMC Queue. The queue created will have infinite length. 54 InfLenFIFOQueue(); 55 56 // Releases all resources held by the queue. The queue must be empty, and no 57 // one waits on conditional variables. 58 ~InfLenFIFOQueue() override; 59 60 // Puts elem into queue immediately at the end of queue. Since the queue has 61 // infinite length, this routine will never block and should never fail. 62 void Put(void* elem) override; 63 64 // Removes the oldest element from the queue and returns it. 65 // This routine will cause the thread to block if queue is currently empty. 66 // Argument wait_time should be passed in when trace flag turning on (for 67 // collecting stats info purpose.) 68 void* Get(gpr_timespec* wait_time) override; 69 70 // Returns number of elements in queue currently. 71 // There might be concurrently add/remove on queue, so count might change 72 // quickly. count()73 int count() const override { return count_.Load(MemoryOrder::RELAXED); } 74 75 struct Node { 76 Node* next; // Linking 77 Node* prev; 78 void* content; // Points to actual element 79 gpr_timespec insert_time; // Time for stats 80 NodeNode81 Node() { 82 next = prev = nullptr; 83 content = nullptr; 84 } 85 }; 86 87 // For test purpose only. Returns number of nodes allocated in queue. 88 // Any allocated node will be alive until the destruction of the queue. num_nodes()89 int num_nodes() const { return num_nodes_; } 90 91 // For test purpose only. Returns the initial number of nodes in queue. init_num_nodes()92 int init_num_nodes() const { return kQueueInitNumNodes; } 93 94 private: 95 // For Internal Use Only. 96 // Removes the oldest element from the queue and returns it. This routine 97 // will NOT check whether queue is empty, and it will NOT acquire mutex. 98 // Caller MUST check that queue is not empty and must acquire mutex before 99 // callling. 100 void* PopFront(); 101 102 // Stats of queue. This will only be collect when debug trace mode is on. 103 // All printed stats info will have time measurement in microsecond. 104 struct Stats { 105 uint64_t num_started; // Number of elements have been added to queue 106 uint64_t num_completed; // Number of elements have been removed from 107 // the queue 108 gpr_timespec total_queue_time; // Total waiting time that all the 109 // removed elements have spent in queue 110 gpr_timespec max_queue_time; // Max waiting time among all removed 111 // elements 112 gpr_timespec busy_queue_time; // Accumulated amount of time that queue 113 // was not empty 114 StatsStats115 Stats() { 116 num_started = 0; 117 num_completed = 0; 118 total_queue_time = gpr_time_0(GPR_TIMESPAN); 119 max_queue_time = gpr_time_0(GPR_TIMESPAN); 120 busy_queue_time = gpr_time_0(GPR_TIMESPAN); 121 } 122 }; 123 124 // Node for waiting thread queue. Stands for one waiting thread, should have 125 // exact one thread waiting on its CondVar. 126 // Using a doubly linked list for waiting thread queue to wake up waiting 127 // threads in LIFO order to reduce cache misses. 128 struct Waiter { 129 CondVar cv; 130 Waiter* next; 131 Waiter* prev; 132 }; 133 134 // Pushs waiter to the front of queue, require caller held mutex 135 void PushWaiter(Waiter* waiter); 136 137 // Removes waiter from queue, require caller held mutex 138 void RemoveWaiter(Waiter* waiter); 139 140 // Returns pointer to the waiter that should be waken up next, should be the 141 // last added waiter. 142 Waiter* TopWaiter(); 143 144 Mutex mu_; // Protecting lock 145 Waiter waiters_; // Head of waiting thread queue 146 147 // Initial size for delete list 148 static const int kDeleteListInitSize = 1024; 149 // Initial number of nodes allocated 150 static const int kQueueInitNumNodes = 1024; 151 152 Node** delete_list_ = nullptr; // Keeps track of all allocated array entries 153 // for deleting on destruction 154 size_t delete_list_count_ = 0; // Number of entries in list 155 size_t delete_list_size_ = 0; // Size of the list. List will be expanded to 156 // double size on full 157 158 Node* queue_head_ = nullptr; // Head of the queue, remove position 159 Node* queue_tail_ = nullptr; // End of queue, insert position 160 Atomic<int> count_{0}; // Number of elements in queue 161 int num_nodes_ = 0; // Number of nodes allocated 162 163 Stats stats_; // Stats info 164 gpr_timespec busy_time; // Start time of busy queue 165 166 // Internal Helper. 167 // Allocates an array of nodes of size "num", links all nodes together except 168 // the first node's prev and last node's next. They should be set by caller 169 // manually afterward. 170 Node* AllocateNodes(int num); 171 }; 172 173 } // namespace grpc_core 174 175 #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_MPMCQUEUE_H */ 176