• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::sync::mpsc;
5 use std::sync::Arc;
6 use std::thread::{Builder as ThreadBuilder, JoinHandle};
7 
8 use crate::grpc_sys;
9 
10 use crate::cq::{CompletionQueue, CompletionQueueHandle, EventType, WorkQueue};
11 use crate::task::CallTag;
12 
13 // event loop
poll_queue(tx: mpsc::Sender<CompletionQueue>)14 fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
15     let cq = Arc::new(CompletionQueueHandle::new());
16     let worker_info = Arc::new(WorkQueue::new());
17     let cq = CompletionQueue::new(cq, worker_info);
18     tx.send(cq.clone()).expect("send back completion queue");
19     loop {
20         let e = cq.next();
21         match e.type_ {
22             EventType::GRPC_QUEUE_SHUTDOWN => break,
23             // timeout should not happen in theory.
24             EventType::GRPC_QUEUE_TIMEOUT => continue,
25             EventType::GRPC_OP_COMPLETE => {}
26         }
27 
28         let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) };
29 
30         tag.resolve(&cq, e.success != 0);
31         while let Some(work) = unsafe { cq.worker.pop_work() } {
32             work.finish();
33         }
34     }
35 }
36 
/// [`Environment`] factory in order to configure the properties.
pub struct EnvBuilder {
    // Number of completion queues; one polling thread is spawned per queue.
    cq_count: usize,
    // Optional thread name prefix; polling threads are named `{prefix}-{index}`.
    name_prefix: Option<String>,
    // Optional hook run on each polling thread before it enters the event loop.
    after_start: Option<Arc<dyn Fn() + Send + Sync>>,
    // Optional hook run on each polling thread after its event loop has exited.
    before_stop: Option<Arc<dyn Fn() + Send + Sync>>,
}
44 
45 impl EnvBuilder {
46     /// Initialize a new [`EnvBuilder`].
new() -> EnvBuilder47     pub fn new() -> EnvBuilder {
48         EnvBuilder {
49             cq_count: unsafe { grpc_sys::gpr_cpu_num_cores() as usize },
50             name_prefix: None,
51             after_start: None,
52             before_stop: None,
53         }
54     }
55 
56     /// Set the number of completion queues and polling threads. Each thread polls
57     /// one completion queue.
58     ///
59     /// # Panics
60     ///
61     /// This method will panic if `count` is 0.
cq_count(mut self, count: usize) -> EnvBuilder62     pub fn cq_count(mut self, count: usize) -> EnvBuilder {
63         assert!(count > 0);
64         self.cq_count = count;
65         self
66     }
67 
68     /// Set the thread name prefix of each polling thread.
name_prefix<S: Into<String>>(mut self, prefix: S) -> EnvBuilder69     pub fn name_prefix<S: Into<String>>(mut self, prefix: S) -> EnvBuilder {
70         self.name_prefix = Some(prefix.into());
71         self
72     }
73 
74     /// Execute function `f` after each thread is started but before it starts doing work.
after_start<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder75     pub fn after_start<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder {
76         self.after_start = Some(Arc::new(f));
77         self
78     }
79 
80     /// Execute function `f` before each thread stops.
before_stop<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder81     pub fn before_stop<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder {
82         self.before_stop = Some(Arc::new(f));
83         self
84     }
85 
86     /// Finalize the [`EnvBuilder`], build the [`Environment`] and initialize the gRPC library.
build(self) -> Environment87     pub fn build(self) -> Environment {
88         unsafe {
89             grpc_sys::grpc_init();
90         }
91         let mut cqs = Vec::with_capacity(self.cq_count);
92         let mut handles = Vec::with_capacity(self.cq_count);
93         let (tx, rx) = mpsc::channel();
94         for i in 0..self.cq_count {
95             let tx_i = tx.clone();
96             let mut builder = ThreadBuilder::new();
97             if let Some(ref prefix) = self.name_prefix {
98                 builder = builder.name(format!("{}-{}", prefix, i));
99             }
100             let after_start = self.after_start.clone();
101             let before_stop = self.before_stop.clone();
102             let handle = builder
103                 .spawn(move || {
104                     if let Some(f) = after_start {
105                         f();
106                     }
107                     poll_queue(tx_i);
108                     if let Some(f) = before_stop {
109                         f();
110                     }
111                 })
112                 .unwrap();
113             handles.push(handle);
114         }
115         for _ in 0..self.cq_count {
116             cqs.push(rx.recv().unwrap());
117         }
118 
119         Environment {
120             cqs,
121             idx: AtomicUsize::new(0),
122             _handles: handles,
123         }
124     }
125 }
126 
/// An object that is used to control concurrency and start the gRPC event loop.
pub struct Environment {
    // Completion queues handed back by the polling threads, one per thread.
    cqs: Vec<CompletionQueue>,
    // Monotonic counter used by `pick_cq` to rotate through `cqs`.
    idx: AtomicUsize,
    // Join handles of the polling threads; they are kept but not joined on drop.
    _handles: Vec<JoinHandle<()>>,
}
133 
134 impl Environment {
135     /// Initialize gRPC and create a thread pool to poll completion queue. The thread pool size
136     /// and the number of completion queue is specified by `cq_count`. Each thread polls one
137     /// completion queue.
138     ///
139     /// # Panics
140     ///
141     /// This method will panic if `cq_count` is 0.
new(cq_count: usize) -> Environment142     pub fn new(cq_count: usize) -> Environment {
143         assert!(cq_count > 0);
144         EnvBuilder::new()
145             .name_prefix("grpc-poll")
146             .cq_count(cq_count)
147             .build()
148     }
149 
150     /// Get all the created completion queues.
completion_queues(&self) -> &[CompletionQueue]151     pub fn completion_queues(&self) -> &[CompletionQueue] {
152         self.cqs.as_slice()
153     }
154 
155     /// Pick an arbitrary completion queue.
pick_cq(&self) -> CompletionQueue156     pub fn pick_cq(&self) -> CompletionQueue {
157         let idx = self.idx.fetch_add(1, Ordering::Relaxed);
158         self.cqs[idx % self.cqs.len()].clone()
159     }
160 }
161 
162 impl Drop for Environment {
drop(&mut self)163     fn drop(&mut self) {
164         for cq in self.completion_queues() {
165             // it's safe to shutdown more than once.
166             cq.shutdown()
167         }
168     }
169 }
170 
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that `pick_cq` rotates over the queues and that the polling
    /// threads exit once every queue has been shut down.
    #[test]
    fn test_basic_loop() {
        let mut env = Environment::new(2);

        let first = env.pick_cq();
        let second = env.pick_cq();
        let third = env.pick_cq();

        // With two queues the third pick wraps around to the first queue,
        // while two consecutive picks must differ.
        for &(left, right, expect_same) in &[(&first, &third, true), (&first, &second, false)] {
            let left_ref = left.borrow().unwrap();
            let right_ref = right.borrow().unwrap();
            match expect_same {
                true => assert_eq!(left_ref.as_ptr(), right_ref.as_ptr()),
                false => assert_ne!(left_ref.as_ptr(), right_ref.as_ptr()),
            }
        }

        assert_eq!(env.completion_queues().len(), 2);
        env.completion_queues().iter().for_each(|cq| cq.shutdown());

        // Every polling thread must terminate after its queue is shut down.
        for worker in env._handles.drain(..) {
            worker.join().unwrap();
        }
    }
}
203