1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
//! gRPC C Core binds a call to a completion queue, and all the related readiness
//! events are forwarded to that completion queue. This module utilizes the
//! mechanism and uses `Kicker` to wake up the completion queue.
//!
//! To minimize context switches, it's better to bind the future to the same
//! completion queue as its inner call. Hence the method `Executor::spawn` is provided.
9
10 use std::cell::UnsafeCell;
11 use std::pin::Pin;
12 use std::sync::atomic::{AtomicU8, Ordering};
13 use std::sync::Arc;
14
15 use futures::future::Future;
16 use futures::task::{waker_ref, ArcWake, Context, Poll};
17
18 use super::CallTag;
19 use crate::call::Call;
20 use crate::cq::{CompletionQueue, WorkQueue};
21 use crate::error::{Error, Result};
22 use crate::grpc_sys::{self, grpc_call_error};
23
/// The pinned, boxed future that a `SpawnTask` owns and drives.
///
/// Inner future is expected to be polled in the same thread as cq.
type SpawnHandle = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
27
/// `Kicker` wakes up the completion queue that the inner call binds to.
pub(crate) struct Kicker {
    /// The call whose completion queue is kicked by `Kicker::kick`.
    call: Call,
}
32
33 impl Kicker {
from_call(call: Call) -> Kicker34 pub fn from_call(call: Call) -> Kicker {
35 Kicker { call }
36 }
37
38 /// Wakes up its completion queue.
39 ///
40 /// `tag` will be popped by `grpc_completion_queue_next` in the future.
kick(&self, tag: Box<CallTag>) -> Result<()>41 pub fn kick(&self, tag: Box<CallTag>) -> Result<()> {
42 let _ref = self.call.cq.borrow()?;
43 unsafe {
44 let ptr = Box::into_raw(tag);
45 let status = grpc_sys::grpcwrap_call_kick_completion_queue(self.call.call, ptr as _);
46 if status == grpc_call_error::GRPC_CALL_OK {
47 Ok(())
48 } else {
49 Err(Error::CallFailure(status))
50 }
51 }
52 }
53 }
54
// NOTE(review): this asserts that `&Kicker` (and thus the inner `Call`) may be
// shared across threads; presumably the underlying gRPC call tolerates
// concurrent kicks — confirm against the gRPC core threading guarantees.
unsafe impl Sync for Kicker {}
56
57 impl Clone for Kicker {
clone(&self) -> Kicker58 fn clone(&self) -> Kicker {
59 // Bump call's reference count.
60 let call = unsafe {
61 grpc_sys::grpc_call_ref(self.call.call);
62 self.call.call
63 };
64 let cq = self.call.cq.clone();
65 Kicker {
66 call: Call { call, cq },
67 }
68 }
69 }
70
/// When a future is scheduled, it becomes IDLE. When it's ready to be polled,
/// it will be notified via task.wake(), and marked as NOTIFIED. When the executor
/// begins to poll the future, it's marked as POLLING. When the executor finishes
/// polling, the future can either be ready or not ready. In the former case, it's
/// marked as COMPLETED. In the latter case, it's marked as IDLE again.
///
/// Note it's possible that the future is notified during polling, in which case
/// the executor should poll it again once the current poll has finished, unless
/// that poll returns ready.
const NOTIFIED: u8 = 1;
const IDLE: u8 = 2;
const POLLING: u8 = 3;
const COMPLETED: u8 = 4;
83
/// Maintains the spawned future with state, so that it can be notified and polled efficiently.
pub struct SpawnTask {
    /// The spawned future; taken out (left as `None`) once it has completed.
    handle: UnsafeCell<Option<SpawnHandle>>,
    /// Current scheduling state: `NOTIFIED`, `IDLE`, `POLLING` or `COMPLETED`.
    state: AtomicU8,
    /// Kicks the completion queue when the task is woken and has to be deferred.
    kicker: Kicker,
    /// Work queue that a woken task is pushed to.
    queue: Arc<WorkQueue>,
}
91
/// `SpawnTask` access is guarded by the `state` field, which guarantees Sync:
/// the `UnsafeCell` in `handle` is only dereferenced by the `poll` invocation
/// that managed to switch `state` to `POLLING`.
///
/// Sync is required by `ArcWake`.
unsafe impl Sync for SpawnTask {}
96
impl SpawnTask {
    /// Creates a task that owns the future `s` and starts in the `IDLE` state.
    fn new(s: SpawnHandle, kicker: Kicker, queue: Arc<WorkQueue>) -> SpawnTask {
        SpawnTask {
            handle: UnsafeCell::new(Some(s)),
            state: AtomicU8::new(IDLE),
            kicker,
            queue,
        }
    }

    /// Marks the state of this task to NOTIFIED.
    ///
    /// Returns true if the task was IDLE, which means it needs to be scheduled.
    /// Returns false if the task was POLLING, already NOTIFIED, or COMPLETED.
    fn mark_notified(&self) -> bool {
        loop {
            // Common case first: an idle task becomes notified and the caller
            // is responsible for scheduling it.
            match self.state.compare_exchange_weak(
                IDLE,
                NOTIFIED,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return true,
                // The task is being polled right now; flag it so that the
                // poller runs the future once more.
                Err(POLLING) => match self.state.compare_exchange_weak(
                    POLLING,
                    NOTIFIED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    // Either the poller went back to IDLE in the meantime, or
                    // the weak exchange failed spuriously; start over.
                    Err(IDLE) | Err(POLLING) => continue,
                    // If it succeeds, then executor will poll the future again;
                    // if it fails, then the future should be resolved. In both
                    // cases, no need to notify the future, hence return false.
                    _ => return false,
                },
                // A weak exchange may fail spuriously even though the state
                // was IDLE; retry.
                Err(IDLE) => continue,
                // Already NOTIFIED or COMPLETED: nothing to schedule.
                _ => return false,
            }
        }
    }
}
137
resolve(task: Arc<SpawnTask>, success: bool)138 pub fn resolve(task: Arc<SpawnTask>, success: bool) {
139 // it should always be canceled for now.
140 assert!(success);
141 poll(task, true);
142 }
143
144 /// A custom Waker.
145 ///
146 /// It will push the inner future to work_queue if it's notified on the
147 /// same thread as inner cq.
148 impl ArcWake for SpawnTask {
wake_by_ref(task: &Arc<Self>)149 fn wake_by_ref(task: &Arc<Self>) {
150 if !task.mark_notified() {
151 return;
152 }
153
154 // It can lead to deadlock if poll the future immediately. So we need to
155 // defer the work instead.
156 if let Some(UnfinishedWork(w)) = task.queue.push_work(UnfinishedWork(task.clone())) {
157 match task.kicker.kick(Box::new(CallTag::Spawn(w))) {
158 // If the queue is shutdown, then the tag will be notified
159 // eventually. So just skip here.
160 Err(Error::QueueShutdown) => (),
161 Err(e) => panic!("unexpected error when canceling call: {:?}", e),
162 _ => (),
163 }
164 }
165 }
166 }
167
168 /// Work that should be deferred to be handled.
169 ///
170 /// Sometimes a work can't be done immediately as it might lead
171 /// to resource conflict, deadlock for example. So they will be
172 /// pushed into a queue and handled when current work is done.
173 pub struct UnfinishedWork(Arc<SpawnTask>);
174
175 impl UnfinishedWork {
finish(self)176 pub fn finish(self) {
177 resolve(self.0, true);
178 }
179 }
180
/// Poll the future.
///
/// `woken` tells which state the task is expected to be in on entry:
/// `NOTIFIED` when it is true, `IDLE` when it is false.
///
/// The future is polled repeatedly for as long as it gets notified again
/// while being polled; the function returns once the future is completed
/// or the task went back to `IDLE`.
///
/// # Panics
///
/// Panics if the task is found in a state that the state machine does not
/// allow at that point.
fn poll(task: Arc<SpawnTask>, woken: bool) {
    let mut init_state = if woken { NOTIFIED } else { IDLE };
    // TODO: maybe we need to break the loop to avoid hunger.
    loop {
        // Claim exclusive access to the future by moving to POLLING.
        match task
            .state
            .compare_exchange(init_state, POLLING, Ordering::AcqRel, Ordering::Acquire)
        {
            Ok(_) => {}
            // Also reached on the iteration right after the future completed.
            Err(COMPLETED) => return,
            Err(s) => panic!("unexpected state {}", s),
        }

        let waker = waker_ref(&task);
        let mut cx = Context::from_waker(&waker);

        // The compare_exchange above "lock"s state, hence it's safe to get a
        // mutable reference.
        match unsafe { &mut *task.handle.get() }
            .as_mut()
            .unwrap()
            .as_mut()
            .poll(&mut cx)
        {
            Poll::Ready(()) => {
                task.state.store(COMPLETED, Ordering::Release);
                // Drop the finished future. No other poller touches `handle`
                // once the state is COMPLETED.
                unsafe { &mut *task.handle.get() }.take();
            }
            _ => {
                match task.state.compare_exchange(
                    POLLING,
                    IDLE,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => return,
                    // Notified while polling: go around and poll once more.
                    Err(NOTIFIED) => {
                        init_state = NOTIFIED;
                    }
                    Err(s) => panic!("unexpected state {}", s),
                }
            }
        }
    }
}
228
/// An executor that drives a future in the gRPC poll thread, which
/// can reduce thread context switching.
pub(crate) struct Executor<'a> {
    /// The completion queue whose work queue spawned tasks are attached to.
    cq: &'a CompletionQueue,
}
234
235 impl<'a> Executor<'a> {
new(cq: &CompletionQueue) -> Executor<'_>236 pub fn new(cq: &CompletionQueue) -> Executor<'_> {
237 Executor { cq }
238 }
239
cq(&self) -> &CompletionQueue240 pub fn cq(&self) -> &CompletionQueue {
241 self.cq
242 }
243
244 /// Spawn the future into inner poll loop.
245 ///
246 /// If you want to trace the future, you may need to create a sender/receiver
247 /// pair by yourself.
spawn<F>(&self, f: F, kicker: Kicker) where F: Future<Output = ()> + Send + 'static,248 pub fn spawn<F>(&self, f: F, kicker: Kicker)
249 where
250 F: Future<Output = ()> + Send + 'static,
251 {
252 let s = Box::pin(f);
253 let notify = Arc::new(SpawnTask::new(s, kicker, self.cq.worker.clone()));
254 poll(notify, false)
255 }
256 }
257