// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::ptr;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::Arc;
use std::thread::{self, ThreadId};

use crate::error::{Error, Result};
use crate::grpc_sys::{self, gpr_clock_type, grpc_completion_queue};
use crate::task::UnfinishedWork;

pub use crate::grpc_sys::grpc_completion_type as EventType;
pub use crate::grpc_sys::grpc_event as Event;

/// `CompletionQueueHandle` enables notification of the completion of asynchronous actions.
pub struct CompletionQueueHandle {
    cq: *mut grpc_completion_queue,
    // When `ref_cnt` < 0, a shutdown is pending and the completion queue
    // should not accept new requests; when `ref_cnt` == 0, the completion
    // queue should be shut down; when `ref_cnt` > 0, the completion queue
    // can accept requests and must not be shut down.
    ref_cnt: AtomicIsize,
}

unsafe impl Sync for CompletionQueueHandle {}
unsafe impl Send for CompletionQueueHandle {}

impl CompletionQueueHandle {
    pub fn new() -> CompletionQueueHandle {
        CompletionQueueHandle {
            cq: unsafe { grpc_sys::grpc_completion_queue_create_for_next(ptr::null_mut()) },
            ref_cnt: AtomicIsize::new(1),
        }
    }

    fn add_ref(&self) -> Result<()> {
        let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
        loop {
            if cnt <= 0 {
                // `shutdown` has been called, reject any new requests.
                return Err(Error::QueueShutdown);
            }
            let new_cnt = cnt + 1;
            match self.ref_cnt.compare_exchange_weak(
                cnt,
                new_cnt,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => return Ok(()),
                Err(c) => cnt = c,
            }
        }
    }

    fn unref(&self) {
        let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
        let shutdown = loop {
            // If `shutdown` has not been called, `cnt` > 0, so subtract 1 to unref.
            // If `shutdown` has been called, `cnt` < 0, so add 1 to unref.
            let new_cnt = cnt - cnt.signum();
            match self.ref_cnt.compare_exchange_weak(
                cnt,
                new_cnt,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => break new_cnt == 0,
                Err(c) => cnt = c,
            }
        };
        if shutdown {
            unsafe {
                grpc_sys::grpc_completion_queue_shutdown(self.cq);
            }
        }
    }

    fn shutdown(&self) {
        let mut cnt = self.ref_cnt.load(Ordering::SeqCst);
        let shutdown = loop {
            if cnt <= 0 {
                // `shutdown` has already been called, nothing to do.
                return;
            }
            // Negate `cnt` to indicate that `shutdown` has been called, and
            // drop the initial reference (`ref_cnt` starts at 1) so the count
            // can reach 0. That is, `new_cnt = -(cnt - 1) = -cnt + 1`.
            let new_cnt = -cnt + 1;
            match self.ref_cnt.compare_exchange_weak(
                cnt,
                new_cnt,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => break new_cnt == 0,
                Err(c) => cnt = c,
            }
        };
        if shutdown {
            unsafe {
                grpc_sys::grpc_completion_queue_shutdown(self.cq);
            }
        }
    }
}
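// A minimal sketch (not part of the original file) of the `ref_cnt` state
// machine above, modeled with a bare `AtomicIsize` and no real gRPC queue.
// It shows why negating the count encodes "shutdown pending" while still
// tracking outstanding references; the real code uses CAS loops instead of
// the plain load/store used here.
#[cfg(test)]
mod ref_cnt_sketch {
    use std::sync::atomic::{AtomicIsize, Ordering};

    #[test]
    fn shutdown_encoding() {
        // Initialized to 1, as in `CompletionQueueHandle::new`.
        let cnt = AtomicIsize::new(1);
        // Two successful `add_ref` calls: 1 -> 2 -> 3.
        cnt.fetch_add(1, Ordering::SeqCst);
        cnt.fetch_add(1, Ordering::SeqCst);
        // `shutdown` negates the count and drops the initial reference:
        // 3 -> -2, i.e. two refs are still outstanding and any further
        // `add_ref` must now fail with `Error::QueueShutdown`.
        let c = cnt.load(Ordering::SeqCst);
        cnt.store(-c + 1, Ordering::SeqCst);
        assert!(cnt.load(Ordering::SeqCst) < 0);
        // Each `unref` moves the count toward 0: -2 -> -1 -> 0. At 0 the
        // real code finally calls `grpc_completion_queue_shutdown`.
        cnt.fetch_add(1, Ordering::SeqCst);
        cnt.fetch_add(1, Ordering::SeqCst);
        assert_eq!(cnt.load(Ordering::SeqCst), 0);
    }
}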
impl Drop for CompletionQueueHandle {
    fn drop(&mut self) {
        unsafe { grpc_sys::grpc_completion_queue_destroy(self.cq) }
    }
}

pub struct CompletionQueueRef<'a> {
    queue: &'a CompletionQueue,
}

impl<'a> CompletionQueueRef<'a> {
    pub fn as_ptr(&self) -> *mut grpc_completion_queue {
        self.queue.handle.cq
    }
}

impl<'a> Drop for CompletionQueueRef<'a> {
    fn drop(&mut self) {
        self.queue.handle.unref();
    }
}

/// `WorkQueue` stores the unfinished work of a completion queue.
///
/// Every completion queue has a work queue, and every work queue belongs
/// to exactly one completion queue. `WorkQueue` is a short path for future
/// notifications. When a future is ready to be polled, there are two ways
/// to notify it.
/// 1. If it's on the same thread where the future is spawned, the future
///    is pushed into `WorkQueue` and polled when the current call tag is
///    handled;
/// 2. If not, the future is wrapped as a call tag and pushed into the
///    completion queue, and is finally popped by a call to
///    `grpc_completion_queue_next`.
pub struct WorkQueue {
    id: ThreadId,
    pending_work: UnsafeCell<VecDeque<UnfinishedWork>>,
}

unsafe impl Sync for WorkQueue {}
unsafe impl Send for WorkQueue {}

const QUEUE_CAPACITY: usize = 4096;

impl WorkQueue {
    pub fn new() -> WorkQueue {
        WorkQueue {
            id: std::thread::current().id(),
            pending_work: UnsafeCell::new(VecDeque::with_capacity(QUEUE_CAPACITY)),
        }
    }

    /// Pushes an unfinished work into the inner queue.
    ///
    /// If the method is not called from the same thread where the queue
    /// was created, the work is returned and nothing is pushed.
    pub fn push_work(&self, work: UnfinishedWork) -> Option<UnfinishedWork> {
        if self.id == thread::current().id() {
            unsafe { &mut *self.pending_work.get() }.push_back(work);
            None
        } else {
            Some(work)
        }
    }

    /// Pops one unfinished work.
    ///
    /// It should only be called from the same thread where the queue was
    /// created. Otherwise it leads to undefined behavior.
    pub unsafe fn pop_work(&self) -> Option<UnfinishedWork> {
        let queue = &mut *self.pending_work.get();
        // Release excess memory once the queue has grown well past its
        // initial capacity and is now mostly empty.
        if queue.capacity() > QUEUE_CAPACITY && queue.len() < queue.capacity() / 2 {
            queue.shrink_to_fit();
        }
        queue.pop_back()
    }
}
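// A minimal sketch (not part of the original file) of the thread-affinity
// check in `push_work`, using a `String` payload in place of
// `UnfinishedWork`, which cannot be constructed here. Pushing from the
// owning thread queues the work; pushing from any other thread hands it
// back so the caller can fall through to the completion-queue path.
#[cfg(test)]
mod work_queue_sketch {
    use std::cell::UnsafeCell;
    use std::collections::VecDeque;
    use std::sync::Arc;
    use std::thread::{self, ThreadId};

    struct ToyQueue {
        id: ThreadId,
        pending: UnsafeCell<VecDeque<String>>,
    }

    unsafe impl Sync for ToyQueue {}
    unsafe impl Send for ToyQueue {}

    impl ToyQueue {
        // Mirrors `WorkQueue::push_work`: queue locally, or return the work.
        fn push(&self, work: String) -> Option<String> {
            if self.id == thread::current().id() {
                unsafe { &mut *self.pending.get() }.push_back(work);
                None
            } else {
                Some(work)
            }
        }
    }

    #[test]
    fn same_thread_only() {
        let q = Arc::new(ToyQueue {
            id: thread::current().id(),
            pending: UnsafeCell::new(VecDeque::new()),
        });
        // Owning thread: the work is queued, nothing is handed back.
        assert!(q.push("local".to_owned()).is_none());
        // Foreign thread: the work is returned to the caller.
        let q2 = q.clone();
        let handed_back = thread::spawn(move || q2.push("remote".to_owned()))
            .join()
            .unwrap();
        assert_eq!(handed_back.as_deref(), Some("remote"));
    }
}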
#[derive(Clone)]
pub struct CompletionQueue {
    handle: Arc<CompletionQueueHandle>,
    pub(crate) worker: Arc<WorkQueue>,
}

impl CompletionQueue {
    pub fn new(handle: Arc<CompletionQueueHandle>, worker: Arc<WorkQueue>) -> CompletionQueue {
        CompletionQueue { handle, worker }
    }

    /// Blocks until an event is available or the completion queue is being
    /// shut down.
    pub fn next(&self) -> Event {
        unsafe {
            let inf = grpc_sys::gpr_inf_future(gpr_clock_type::GPR_CLOCK_REALTIME);
            grpc_sys::grpc_completion_queue_next(self.handle.cq, inf, ptr::null_mut())
        }
    }

    pub fn borrow(&self) -> Result<CompletionQueueRef<'_>> {
        self.handle.add_ref()?;
        Ok(CompletionQueueRef { queue: self })
    }

    /// Begins destruction of the completion queue.
    ///
    /// Once all possible events are drained, `next()` will produce only
    /// `Event::QueueShutdown` events.
    pub fn shutdown(&self) {
        self.handle.shutdown()
    }

    pub fn worker_id(&self) -> ThreadId {
        self.worker.id
    }
}
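// A minimal sketch (not part of the original file) of how a poller thread
// might drive this queue, loosely following the event loop in this crate's
// `Environment`. The `CallTag` type, its `resolve` signature, and the
// bindgen variant names on `EventType` are assumptions here.
#[allow(dead_code)]
fn poll_queue_sketch(cq: CompletionQueue) {
    use crate::task::CallTag;

    loop {
        let e = cq.next();
        match e.type_ {
            // `shutdown()` was called and every pending event is drained.
            EventType::GRPC_QUEUE_SHUTDOWN => break,
            // Should not happen: `next()` waits with an infinite deadline.
            EventType::GRPC_QUEUE_TIMEOUT => continue,
            EventType::GRPC_OP_COMPLETE => {}
        }
        // Reclaim the boxed tag previously handed to gRPC core and resolve
        // the operation it refers to.
        let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) };
        tag.resolve(&cq, e.success != 0);
        // Drain the short path (case 1 in the `WorkQueue` docs): futures
        // notified on this thread while the tag was being resolved.
        while let Some(work) = unsafe { cq.worker.pop_work() } {
            work.finish();
        }
    }
}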