• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 use std::cell::UnsafeCell;
4 use std::collections::VecDeque;
5 use std::ptr;
6 use std::sync::atomic::{AtomicIsize, Ordering};
7 use std::sync::Arc;
8 use std::thread::{self, ThreadId};
9 
10 use crate::error::{Error, Result};
11 use crate::grpc_sys::{self, gpr_clock_type, grpc_completion_queue};
12 use crate::task::UnfinishedWork;
13 
14 pub use crate::grpc_sys::grpc_completion_type as EventType;
15 pub use crate::grpc_sys::grpc_event as Event;
16 
17 /// `CompletionQueueHandle` enable notification of the completion of asynchronous actions.
18 pub struct CompletionQueueHandle {
19     cq: *mut grpc_completion_queue,
20     // When `ref_cnt` < 0, a shutdown is pending, completion queue should not
21     // accept requests anymore; when `ref_cnt` == 0, completion queue should
22     // be shutdown; When `ref_cnt` > 0, completion queue can accept requests
23     // and should not be shutdown.
24     ref_cnt: AtomicIsize,
25 }
26 
27 unsafe impl Sync for CompletionQueueHandle {}
28 unsafe impl Send for CompletionQueueHandle {}
29 
30 impl CompletionQueueHandle {
new() -> CompletionQueueHandle31     pub fn new() -> CompletionQueueHandle {
32         CompletionQueueHandle {
33             cq: unsafe { grpc_sys::grpc_completion_queue_create_for_next(ptr::null_mut()) },
34             ref_cnt: AtomicIsize::new(1),
35         }
36     }
37 
add_ref(&self) -> Result<()>38     fn add_ref(&self) -> Result<()> {
39         let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
40         loop {
41             if cnt <= 0 {
42                 // `shutdown` has been called, reject any requests.
43                 return Err(Error::QueueShutdown);
44             }
45             let new_cnt = cnt + 1;
46             match self.ref_cnt.compare_exchange_weak(
47                 cnt,
48                 new_cnt,
49                 Ordering::SeqCst,
50                 Ordering::SeqCst,
51             ) {
52                 Ok(_) => return Ok(()),
53                 Err(c) => cnt = c,
54             }
55         }
56     }
57 
unref(&self)58     fn unref(&self) {
59         let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
60         let shutdown = loop {
61             // If `shutdown` is not called, `cnt` > 0, so minus 1 to unref.
62             // If `shutdown` is called, `cnt` < 0, so plus 1 to unref.
63             let new_cnt = cnt - cnt.signum();
64             match self.ref_cnt.compare_exchange_weak(
65                 cnt,
66                 new_cnt,
67                 Ordering::SeqCst,
68                 Ordering::SeqCst,
69             ) {
70                 Ok(_) => break new_cnt == 0,
71                 Err(c) => cnt = c,
72             }
73         };
74         if shutdown {
75             unsafe {
76                 grpc_sys::grpc_completion_queue_shutdown(self.cq);
77             }
78         }
79     }
80 
shutdown(&self)81     fn shutdown(&self) {
82         let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
83         let shutdown = loop {
84             if cnt <= 0 {
85                 // `shutdown` is called, skipped.
86                 return;
87             }
88             // Make cnt negative to indicate that `shutdown` has been called.
89             // Because `cnt` is initialized to 1, so minus 1 to make it reach
90             // toward 0. That is `new_cnt = -(cnt - 1) = -cnt + 1`.
91             let new_cnt = -cnt + 1;
92             match self.ref_cnt.compare_exchange_weak(
93                 cnt,
94                 new_cnt,
95                 Ordering::SeqCst,
96                 Ordering::SeqCst,
97             ) {
98                 Ok(_) => break new_cnt == 0,
99                 Err(c) => cnt = c,
100             }
101         };
102         if shutdown {
103             unsafe {
104                 grpc_sys::grpc_completion_queue_shutdown(self.cq);
105             }
106         }
107     }
108 }
109 
110 impl Drop for CompletionQueueHandle {
drop(&mut self)111     fn drop(&mut self) {
112         unsafe { grpc_sys::grpc_completion_queue_destroy(self.cq) }
113     }
114 }
115 
116 pub struct CompletionQueueRef<'a> {
117     queue: &'a CompletionQueue,
118 }
119 
120 impl<'a> CompletionQueueRef<'a> {
as_ptr(&self) -> *mut grpc_completion_queue121     pub fn as_ptr(&self) -> *mut grpc_completion_queue {
122         self.queue.handle.cq
123     }
124 }
125 
126 impl<'a> Drop for CompletionQueueRef<'a> {
drop(&mut self)127     fn drop(&mut self) {
128         self.queue.handle.unref();
129     }
130 }
131 
132 /// `WorkQueue` stores the unfinished work of a completion queue.
133 ///
134 /// Every completion queue has a work queue, and every work queue belongs
135 /// to exact one completion queue. `WorkQueue` is a short path for future
136 /// notifications. When a future is ready to be polled, there are two way
137 /// to notify it.
138 /// 1. If it's in the same thread where the future is spawned, the future
139 ///    will be pushed into `WorkQueue` and be polled when current call tag
140 ///    is handled;
141 /// 2. If not, the future will be wrapped as a call tag and pushed into
142 ///    completion queue and finally popped at the call to `grpc_completion_queue_next`.
143 pub struct WorkQueue {
144     id: ThreadId,
145     pending_work: UnsafeCell<VecDeque<UnfinishedWork>>,
146 }
147 
148 unsafe impl Sync for WorkQueue {}
149 unsafe impl Send for WorkQueue {}
150 
151 const QUEUE_CAPACITY: usize = 4096;
152 
153 impl WorkQueue {
new() -> WorkQueue154     pub fn new() -> WorkQueue {
155         WorkQueue {
156             id: std::thread::current().id(),
157             pending_work: UnsafeCell::new(VecDeque::with_capacity(QUEUE_CAPACITY)),
158         }
159     }
160 
161     /// Pushes an unfinished work into the inner queue.
162     ///
163     /// If the method is not called from the same thread where it's created,
164     /// the work will returned and no work is pushed.
push_work(&self, work: UnfinishedWork) -> Option<UnfinishedWork>165     pub fn push_work(&self, work: UnfinishedWork) -> Option<UnfinishedWork> {
166         if self.id == thread::current().id() {
167             unsafe { &mut *self.pending_work.get() }.push_back(work);
168             None
169         } else {
170             Some(work)
171         }
172     }
173 
174     /// Pops one unfinished work.
175     ///
176     /// It should only be called from the same thread where the queue is created.
177     /// Otherwise it leads to undefined behavior.
pop_work(&self) -> Option<UnfinishedWork>178     pub unsafe fn pop_work(&self) -> Option<UnfinishedWork> {
179         let queue = &mut *self.pending_work.get();
180         if queue.capacity() > QUEUE_CAPACITY && queue.len() < queue.capacity() / 2 {
181             queue.shrink_to_fit();
182         }
183         { &mut *self.pending_work.get() }.pop_back()
184     }
185 }
186 
187 #[derive(Clone)]
188 pub struct CompletionQueue {
189     handle: Arc<CompletionQueueHandle>,
190     pub(crate) worker: Arc<WorkQueue>,
191 }
192 
193 impl CompletionQueue {
new(handle: Arc<CompletionQueueHandle>, worker: Arc<WorkQueue>) -> CompletionQueue194     pub fn new(handle: Arc<CompletionQueueHandle>, worker: Arc<WorkQueue>) -> CompletionQueue {
195         CompletionQueue { handle, worker }
196     }
197 
198     /// Blocks until an event is available, the completion queue is being shut down.
next(&self) -> Event199     pub fn next(&self) -> Event {
200         unsafe {
201             let inf = grpc_sys::gpr_inf_future(gpr_clock_type::GPR_CLOCK_REALTIME);
202             grpc_sys::grpc_completion_queue_next(self.handle.cq, inf, ptr::null_mut())
203         }
204     }
205 
borrow(&self) -> Result<CompletionQueueRef<'_>>206     pub fn borrow(&self) -> Result<CompletionQueueRef<'_>> {
207         self.handle.add_ref()?;
208         Ok(CompletionQueueRef { queue: self })
209     }
210 
211     /// Begin destruction of a completion queue.
212     ///
213     /// Once all possible events are drained then `next()` will start to produce
214     /// `Event::QueueShutdown` events only.
shutdown(&self)215     pub fn shutdown(&self) {
216         self.handle.shutdown()
217     }
218 
worker_id(&self) -> ThreadId219     pub fn worker_id(&self) -> ThreadId {
220         self.worker.id
221     }
222 }
223