1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::sync::mpsc;
5 use std::sync::Arc;
6 use std::thread::{Builder as ThreadBuilder, JoinHandle};
7
8 use crate::grpc_sys;
9
10 use crate::cq::{CompletionQueue, CompletionQueueHandle, EventType, WorkQueue};
11 use crate::task::CallTag;
12
13 // event loop
poll_queue(tx: mpsc::Sender<CompletionQueue>)14 fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
15 let cq = Arc::new(CompletionQueueHandle::new());
16 let worker_info = Arc::new(WorkQueue::new());
17 let cq = CompletionQueue::new(cq, worker_info);
18 tx.send(cq.clone()).expect("send back completion queue");
19 loop {
20 let e = cq.next();
21 match e.type_ {
22 EventType::GRPC_QUEUE_SHUTDOWN => break,
23 // timeout should not happen in theory.
24 EventType::GRPC_QUEUE_TIMEOUT => continue,
25 EventType::GRPC_OP_COMPLETE => {}
26 }
27
28 let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) };
29
30 tag.resolve(&cq, e.success != 0);
31 while let Some(work) = unsafe { cq.worker.pop_work() } {
32 work.finish();
33 }
34 }
35 }
36
/// Callback invoked on a polling thread; shared between all of the threads.
type ThreadHook = Arc<dyn Fn() + Send + Sync>;

/// Factory used to configure and create an [`Environment`].
pub struct EnvBuilder {
    /// Number of completion queues, and therefore of polling threads.
    cq_count: usize,
    /// Optional prefix for the names of the polling threads.
    name_prefix: Option<String>,
    /// Optional hook run on each polling thread before it starts polling.
    after_start: Option<ThreadHook>,
    /// Optional hook run on each polling thread once it has stopped polling.
    before_stop: Option<ThreadHook>,
}
44
45 impl EnvBuilder {
46 /// Initialize a new [`EnvBuilder`].
new() -> EnvBuilder47 pub fn new() -> EnvBuilder {
48 EnvBuilder {
49 cq_count: unsafe { grpc_sys::gpr_cpu_num_cores() as usize },
50 name_prefix: None,
51 after_start: None,
52 before_stop: None,
53 }
54 }
55
56 /// Set the number of completion queues and polling threads. Each thread polls
57 /// one completion queue.
58 ///
59 /// # Panics
60 ///
61 /// This method will panic if `count` is 0.
cq_count(mut self, count: usize) -> EnvBuilder62 pub fn cq_count(mut self, count: usize) -> EnvBuilder {
63 assert!(count > 0);
64 self.cq_count = count;
65 self
66 }
67
68 /// Set the thread name prefix of each polling thread.
name_prefix<S: Into<String>>(mut self, prefix: S) -> EnvBuilder69 pub fn name_prefix<S: Into<String>>(mut self, prefix: S) -> EnvBuilder {
70 self.name_prefix = Some(prefix.into());
71 self
72 }
73
74 /// Execute function `f` after each thread is started but before it starts doing work.
after_start<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder75 pub fn after_start<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder {
76 self.after_start = Some(Arc::new(f));
77 self
78 }
79
80 /// Execute function `f` before each thread stops.
before_stop<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder81 pub fn before_stop<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder {
82 self.before_stop = Some(Arc::new(f));
83 self
84 }
85
86 /// Finalize the [`EnvBuilder`], build the [`Environment`] and initialize the gRPC library.
build(self) -> Environment87 pub fn build(self) -> Environment {
88 unsafe {
89 grpc_sys::grpc_init();
90 }
91 let mut cqs = Vec::with_capacity(self.cq_count);
92 let mut handles = Vec::with_capacity(self.cq_count);
93 let (tx, rx) = mpsc::channel();
94 for i in 0..self.cq_count {
95 let tx_i = tx.clone();
96 let mut builder = ThreadBuilder::new();
97 if let Some(ref prefix) = self.name_prefix {
98 builder = builder.name(format!("{}-{}", prefix, i));
99 }
100 let after_start = self.after_start.clone();
101 let before_stop = self.before_stop.clone();
102 let handle = builder
103 .spawn(move || {
104 if let Some(f) = after_start {
105 f();
106 }
107 poll_queue(tx_i);
108 if let Some(f) = before_stop {
109 f();
110 }
111 })
112 .unwrap();
113 handles.push(handle);
114 }
115 for _ in 0..self.cq_count {
116 cqs.push(rx.recv().unwrap());
117 }
118
119 Environment {
120 cqs,
121 idx: AtomicUsize::new(0),
122 _handles: handles,
123 }
124 }
125 }
126
127 /// An object that used to control concurrency and start gRPC event loop.
128 pub struct Environment {
129 cqs: Vec<CompletionQueue>,
130 idx: AtomicUsize,
131 _handles: Vec<JoinHandle<()>>,
132 }
133
134 impl Environment {
135 /// Initialize gRPC and create a thread pool to poll completion queue. The thread pool size
136 /// and the number of completion queue is specified by `cq_count`. Each thread polls one
137 /// completion queue.
138 ///
139 /// # Panics
140 ///
141 /// This method will panic if `cq_count` is 0.
new(cq_count: usize) -> Environment142 pub fn new(cq_count: usize) -> Environment {
143 assert!(cq_count > 0);
144 EnvBuilder::new()
145 .name_prefix("grpc-poll")
146 .cq_count(cq_count)
147 .build()
148 }
149
150 /// Get all the created completion queues.
completion_queues(&self) -> &[CompletionQueue]151 pub fn completion_queues(&self) -> &[CompletionQueue] {
152 self.cqs.as_slice()
153 }
154
155 /// Pick an arbitrary completion queue.
pick_cq(&self) -> CompletionQueue156 pub fn pick_cq(&self) -> CompletionQueue {
157 let idx = self.idx.fetch_add(1, Ordering::Relaxed);
158 self.cqs[idx % self.cqs.len()].clone()
159 }
160 }
161
162 impl Drop for Environment {
drop(&mut self)163 fn drop(&mut self) {
164 for cq in self.completion_queues() {
165 // it's safe to shutdown more than once.
166 cq.shutdown()
167 }
168 }
169 }
170
#[cfg(test)]
mod tests {
    use super::*;

    /// With two queues, the first and third picks must land on the same queue
    /// and the first and second on different ones; after shutting the queues
    /// down, every polling thread must exit cleanly.
    #[test]
    fn test_basic_loop() {
        let mut env = Environment::new(2);

        let first = env.pick_cq();
        let second = env.pick_cq();
        let third = env.pick_cq();

        // Both borrows are released when the closure returns, i.e. before the
        // queues are shut down below.
        let same_queue = |lhs: &CompletionQueue, rhs: &CompletionQueue| {
            let lhs_ref = lhs.borrow().unwrap();
            let rhs_ref = rhs.borrow().unwrap();
            lhs_ref.as_ptr() == rhs_ref.as_ptr()
        };
        assert!(same_queue(&first, &third));
        assert!(!same_queue(&first, &second));

        assert_eq!(env.completion_queues().len(), 2);
        env.completion_queues().iter().for_each(|cq| cq.shutdown());

        env._handles
            .drain(..)
            .for_each(|handle| handle.join().unwrap());
    }
}
203