/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/executor/mpmcqueue.h"

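// InfLenFIFOQueue is a growable multi-producer multi-consumer FIFO queue:
// elements live in a circular doubly-linked ring of nodes guarded by a single
// mutex, the ring doubles in size whenever it fills up, and consumers block on
// per-waiter condition variables until work arrives.
//
// Illustrative usage sketch (not taken from production code; the real
// consumers live elsewhere in the executor):
//
//   InfLenFIFOQueue queue;
//   queue.Put(closure_ptr);           // producer side: never blocks
//   void* elem = queue.Get(nullptr);  // consumer side: blocks while empty
//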
namespace grpc_core {

DebugOnlyTraceFlag grpc_thread_pool_trace(false, "thread_pool");

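// Removes and returns the element at the head of the queue. Trace statistics
// are updated when the "thread_pool" trace flag is enabled, and a waiter is
// signaled if more elements remain.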
inline void* InfLenFIFOQueue::PopFront() {
  // The caller must have checked that the queue is not empty and must already
  // hold the mutex. This function assumes that there is at least one element
  // in the queue (i.e. queue_head_->content is valid).
  void* result = queue_head_->content;
  count_.Store(count_.Load(MemoryOrder::RELAXED) - 1, MemoryOrder::RELAXED);

  // Updates stats when the trace flag is turned on.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
    gpr_timespec wait_time =
        gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), queue_head_->insert_time);
    stats_.num_completed++;
    stats_.total_queue_time = gpr_time_add(stats_.total_queue_time, wait_time);
    stats_.max_queue_time = gpr_time_max(
        gpr_convert_clock_type(stats_.max_queue_time, GPR_TIMESPAN), wait_time);

    if (count_.Load(MemoryOrder::RELAXED) == 0) {
      stats_.busy_queue_time =
          gpr_time_add(stats_.busy_queue_time,
                       gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), busy_time));
    }

    gpr_log(GPR_INFO,
            "[InfLenFIFOQueue PopFront] num_completed:        %" PRIu64
            " total_queue_time: %f max_queue_time:   %f busy_queue_time:   %f",
            stats_.num_completed,
            gpr_timespec_to_micros(stats_.total_queue_time),
            gpr_timespec_to_micros(stats_.max_queue_time),
            gpr_timespec_to_micros(stats_.busy_queue_time));
  }

  queue_head_ = queue_head_->next;
  // Signals a waiting thread if the queue still has elements.
  if (count_.Load(MemoryOrder::RELAXED) > 0) {
    TopWaiter()->cv.Signal();
  }

  return result;
}

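// Allocates 'num' zero-initialized nodes as one contiguous chunk and links
// them into a doubly-linked segment. The first node's prev and the last
// node's next are left for the caller to splice into the circular ring.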
InfLenFIFOQueue::Node* InfLenFIFOQueue::AllocateNodes(int num) {
  num_nodes_ = num_nodes_ + num;
  Node* new_chunk = static_cast<Node*>(gpr_zalloc(sizeof(Node) * num));
  new_chunk[0].next = &new_chunk[1];
  new_chunk[num - 1].prev = &new_chunk[num - 2];
  for (int i = 1; i < num - 1; ++i) {
    new_chunk[i].prev = &new_chunk[i - 1];
    new_chunk[i].next = &new_chunk[i + 1];
  }
  return new_chunk;
}

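// Sets up the delete list (used to free node chunks on destruction),
// allocates the initial ring of kQueueInitNumNodes nodes, closes it into a
// circle, and initializes the sentinel of the intrusive waiter list.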
InfLenFIFOQueue::InfLenFIFOQueue() {
  delete_list_size_ = kDeleteListInitSize;
  delete_list_ =
      static_cast<Node**>(gpr_zalloc(sizeof(Node*) * delete_list_size_));

  Node* new_chunk = AllocateNodes(kQueueInitNumNodes);
  delete_list_[delete_list_count_++] = new_chunk;
  queue_head_ = queue_tail_ = new_chunk;
  new_chunk[0].prev = &new_chunk[kQueueInitNumNodes - 1];
  new_chunk[kQueueInitNumNodes - 1].next = &new_chunk[0];

  waiters_.next = &waiters_;
  waiters_.prev = &waiters_;
}

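// The queue must be drained before destruction; every chunk recorded in
// delete_list_ is freed here.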
InfLenFIFOQueue::~InfLenFIFOQueue() {
  GPR_ASSERT(count_.Load(MemoryOrder::RELAXED) == 0);
  for (size_t i = 0; i < delete_list_count_; ++i) {
    gpr_free(delete_list_[i]);
  }
  gpr_free(delete_list_);
}

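// Inserts elem at the tail of the queue. If the ring is full (tail has caught
// up with head while the queue is non-empty), capacity is doubled by splicing
// a freshly allocated chunk in just before the tail. One waiter is signaled
// after the insert.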
void InfLenFIFOQueue::Put(void* elem) {
  MutexLock l(&mu_);

  int curr_count = count_.Load(MemoryOrder::RELAXED);

  if (queue_tail_ == queue_head_ && curr_count != 0) {
    // The ring is full. Doubles its size by inserting a new chunk of nodes.
    Node* new_chunk = AllocateNodes(curr_count);
    delete_list_[delete_list_count_++] = new_chunk;
    // Expands the delete list when it becomes full.
    if (delete_list_count_ == delete_list_size_) {
      delete_list_size_ = delete_list_size_ * 2;
      delete_list_ = static_cast<Node**>(
          gpr_realloc(delete_list_, sizeof(Node*) * delete_list_size_));
    }
    new_chunk[0].prev = queue_tail_->prev;
    new_chunk[curr_count - 1].next = queue_head_;
    queue_tail_->prev->next = new_chunk;
    queue_head_->prev = &new_chunk[curr_count - 1];
    queue_tail_ = new_chunk;
  }
  queue_tail_->content = static_cast<void*>(elem);

  // Updates stats info when the trace flag is enabled.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
    stats_.num_started++;
    gpr_log(GPR_INFO, "[InfLenFIFOQueue Put] num_started:        %" PRIu64,
            stats_.num_started);
    auto current_time = gpr_now(GPR_CLOCK_MONOTONIC);
    if (curr_count == 0) {
      busy_time = current_time;
    }
    queue_tail_->insert_time = current_time;
  }

  count_.Store(curr_count + 1, MemoryOrder::RELAXED);
  queue_tail_ = queue_tail_->next;

  TopWaiter()->cv.Signal();
}

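// Removes and returns the oldest element, blocking on a per-caller condition
// variable while the queue is empty. When tracing is enabled and wait_time is
// non-null, the time spent waiting is reported through wait_time.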
void* InfLenFIFOQueue::Get(gpr_timespec* wait_time) {
  MutexLock l(&mu_);

  if (count_.Load(MemoryOrder::RELAXED) == 0) {
    gpr_timespec start_time;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) &&
        wait_time != nullptr) {
      start_time = gpr_now(GPR_CLOCK_MONOTONIC);
    }

    // Waits on this caller's own condition variable; the loop guards against
    // spurious wakeups by re-checking the element count.
    Waiter self;
    PushWaiter(&self);
    do {
      self.cv.Wait(&mu_);
    } while (count_.Load(MemoryOrder::RELAXED) == 0);
    RemoveWaiter(&self);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace) &&
        wait_time != nullptr) {
      *wait_time = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time);
    }
  }
  GPR_DEBUG_ASSERT(count_.Load(MemoryOrder::RELAXED) > 0);
  return PopFront();
}

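// Pushes waiter onto the front of the intrusive waiter list, so waiters are
// woken in LIFO order.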
void InfLenFIFOQueue::PushWaiter(Waiter* waiter) {
  waiter->next = waiters_.next;
  waiter->prev = &waiters_;
  waiter->next->prev = waiter;
  waiter->prev->next = waiter;
}

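// Unlinks waiter from the waiter list; the sentinel itself must never be
// removed.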
void InfLenFIFOQueue::RemoveWaiter(Waiter* waiter) {
  GPR_DEBUG_ASSERT(waiter != &waiters_);
  waiter->next->prev = waiter->prev;
  waiter->prev->next = waiter->next;
}

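// Returns the most recently pushed waiter; when no waiter is queued this is
// the sentinel itself, whose condition variable has no thread waiting on it.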
InfLenFIFOQueue::Waiter* InfLenFIFOQueue::TopWaiter() { return waiters_.next; }

}  // namespace grpc_core