1 /*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "src/core/lib/iomgr/executor/mpmcqueue.h"
20
21 #include <grpc/grpc.h>
22
23 #include "src/core/lib/gprpp/thd.h"
24 #include "test/core/util/test_config.h"
25
26 #define TEST_NUM_ITEMS 10000
27
28 // Testing items for queue
29 struct WorkItem {
30 int index;
31 bool done;
32
WorkItemWorkItem33 WorkItem(int i) : index(i) { done = false; }
34 };
35
36 // Thread to "produce" items and put items into queue
37 // It will also check that all items has been marked done and clean up all
38 // produced items on destructing.
39 class ProducerThread {
40 public:
ProducerThread(grpc_core::InfLenFIFOQueue * queue,int start_index,int num_items)41 ProducerThread(grpc_core::InfLenFIFOQueue* queue, int start_index,
42 int num_items)
43 : start_index_(start_index), num_items_(num_items), queue_(queue) {
44 items_ = nullptr;
45 thd_ = grpc_core::Thread(
46 "mpmcq_test_producer_thd",
47 [](void* th) { static_cast<ProducerThread*>(th)->Run(); }, this);
48 }
~ProducerThread()49 ~ProducerThread() {
50 for (int i = 0; i < num_items_; ++i) {
51 GPR_ASSERT(items_[i]->done);
52 delete items_[i];
53 }
54 gpr_free(items_);
55 }
56
Start()57 void Start() { thd_.Start(); }
Join()58 void Join() { thd_.Join(); }
59
60 private:
Run()61 void Run() {
62 items_ =
63 static_cast<WorkItem**>(gpr_zalloc(num_items_ * sizeof(WorkItem*)));
64 for (int i = 0; i < num_items_; ++i) {
65 items_[i] = new WorkItem(start_index_ + i);
66 queue_->Put(items_[i]);
67 }
68 }
69
70 int start_index_;
71 int num_items_;
72 grpc_core::InfLenFIFOQueue* queue_;
73 grpc_core::Thread thd_;
74 WorkItem** items_;
75 };
76
77 // Thread to pull out items from queue
78 class ConsumerThread {
79 public:
ConsumerThread(grpc_core::InfLenFIFOQueue * queue)80 ConsumerThread(grpc_core::InfLenFIFOQueue* queue) : queue_(queue) {
81 thd_ = grpc_core::Thread(
82 "mpmcq_test_consumer_thd",
83 [](void* th) { static_cast<ConsumerThread*>(th)->Run(); }, this);
84 }
~ConsumerThread()85 ~ConsumerThread() {}
86
Start()87 void Start() { thd_.Start(); }
Join()88 void Join() { thd_.Join(); }
89
90 private:
Run()91 void Run() {
92 // count number of Get() called in this thread
93 int count = 0;
94
95 WorkItem* item;
96 while ((item = static_cast<WorkItem*>(queue_->Get())) != nullptr) {
97 count++;
98 GPR_ASSERT(!item->done);
99 item->done = true;
100 }
101
102 gpr_log(GPR_DEBUG, "ConsumerThread: %d times of Get() called.", count);
103 }
104 grpc_core::InfLenFIFOQueue* queue_;
105 grpc_core::Thread thd_;
106 };
107
test_FIFO(void)108 static void test_FIFO(void) {
109 gpr_log(GPR_INFO, "test_FIFO");
110 grpc_core::InfLenFIFOQueue large_queue;
111 for (int i = 0; i < TEST_NUM_ITEMS; ++i) {
112 large_queue.Put(static_cast<void*>(new WorkItem(i)));
113 }
114 GPR_ASSERT(large_queue.count() == TEST_NUM_ITEMS);
115 for (int i = 0; i < TEST_NUM_ITEMS; ++i) {
116 WorkItem* item = static_cast<WorkItem*>(large_queue.Get());
117 GPR_ASSERT(i == item->index);
118 delete item;
119 }
120 }
121
122 // Test if queue's behavior of expanding is correct. (Only does expansion when
123 // it gets full, and each time expands to doubled size).
test_space_efficiency(void)124 static void test_space_efficiency(void) {
125 gpr_log(GPR_INFO, "test_space_efficiency");
126 grpc_core::InfLenFIFOQueue queue;
127 for (int i = 0; i < queue.init_num_nodes(); ++i) {
128 queue.Put(static_cast<void*>(new WorkItem(i)));
129 }
130 // Queue should not have been expanded at this time.
131 GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes());
132 for (int i = 0; i < queue.init_num_nodes(); ++i) {
133 WorkItem* item = static_cast<WorkItem*>(queue.Get());
134 queue.Put(item);
135 }
136 GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes());
137 for (int i = 0; i < queue.init_num_nodes(); ++i) {
138 WorkItem* item = static_cast<WorkItem*>(queue.Get());
139 delete item;
140 }
141 // Queue never shrinks even it is empty.
142 GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes());
143 GPR_ASSERT(queue.count() == 0);
144 // queue empty now
145 for (int i = 0; i < queue.init_num_nodes() * 2; ++i) {
146 queue.Put(static_cast<void*>(new WorkItem(i)));
147 }
148 GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2);
149 // Queue should have been expanded once.
150 GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2);
151 for (int i = 0; i < queue.init_num_nodes(); ++i) {
152 WorkItem* item = static_cast<WorkItem*>(queue.Get());
153 delete item;
154 }
155 GPR_ASSERT(queue.count() == queue.init_num_nodes());
156 // Queue will never shrink, should keep same number of node as before.
157 GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 2);
158 for (int i = 0; i < queue.init_num_nodes() + 1; ++i) {
159 queue.Put(static_cast<void*>(new WorkItem(i)));
160 }
161 GPR_ASSERT(queue.count() == queue.init_num_nodes() * 2 + 1);
162 // Queue should have been expanded twice.
163 GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 4);
164 for (int i = 0; i < queue.init_num_nodes() * 2 + 1; ++i) {
165 WorkItem* item = static_cast<WorkItem*>(queue.Get());
166 delete item;
167 }
168 GPR_ASSERT(queue.count() == 0);
169 GPR_ASSERT(queue.num_nodes() == queue.init_num_nodes() * 4);
170 gpr_log(GPR_DEBUG, "Done.");
171 }
172
test_many_thread(void)173 static void test_many_thread(void) {
174 gpr_log(GPR_INFO, "test_many_thread");
175 const int num_producer_threads = 10;
176 const int num_consumer_threads = 20;
177 grpc_core::InfLenFIFOQueue queue;
178 ProducerThread** producer_threads = static_cast<ProducerThread**>(
179 gpr_zalloc(num_producer_threads * sizeof(ProducerThread*)));
180 ConsumerThread** consumer_threads = static_cast<ConsumerThread**>(
181 gpr_zalloc(num_consumer_threads * sizeof(ConsumerThread*)));
182
183 gpr_log(GPR_DEBUG, "Fork ProducerThreads...");
184 for (int i = 0; i < num_producer_threads; ++i) {
185 producer_threads[i] =
186 new ProducerThread(&queue, i * TEST_NUM_ITEMS, TEST_NUM_ITEMS);
187 producer_threads[i]->Start();
188 }
189 gpr_log(GPR_DEBUG, "ProducerThreads Started.");
190 gpr_log(GPR_DEBUG, "Fork ConsumerThreads...");
191 for (int i = 0; i < num_consumer_threads; ++i) {
192 consumer_threads[i] = new ConsumerThread(&queue);
193 consumer_threads[i]->Start();
194 }
195 gpr_log(GPR_DEBUG, "ConsumerThreads Started.");
196 gpr_log(GPR_DEBUG, "Waiting ProducerThreads to finish...");
197 for (int i = 0; i < num_producer_threads; ++i) {
198 producer_threads[i]->Join();
199 }
200 gpr_log(GPR_DEBUG, "All ProducerThreads Terminated.");
201 gpr_log(GPR_DEBUG, "Terminating ConsumerThreads...");
202 for (int i = 0; i < num_consumer_threads; ++i) {
203 queue.Put(nullptr);
204 }
205 for (int i = 0; i < num_consumer_threads; ++i) {
206 consumer_threads[i]->Join();
207 }
208 gpr_log(GPR_DEBUG, "All ConsumerThreads Terminated.");
209 gpr_log(GPR_DEBUG, "Checking WorkItems and Cleaning Up...");
210 for (int i = 0; i < num_producer_threads; ++i) {
211 // Destructor of ProducerThread will do the check of WorkItems
212 delete producer_threads[i];
213 }
214 gpr_free(producer_threads);
215 for (int i = 0; i < num_consumer_threads; ++i) {
216 delete consumer_threads[i];
217 }
218 gpr_free(consumer_threads);
219 gpr_log(GPR_DEBUG, "Done.");
220 }
221
main(int argc,char ** argv)222 int main(int argc, char** argv) {
223 grpc::testing::TestEnvironment env(argc, argv);
224 grpc_init();
225 test_FIFO();
226 test_space_efficiency();
227 test_many_thread();
228 grpc_shutdown();
229 return 0;
230 }
231