1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 2 3 use std::fmt::{self, Debug, Formatter}; 4 use std::sync::Arc; 5 6 use super::Inner; 7 use crate::call::{BatchContext, MessageReader, RpcStatusCode}; 8 use crate::error::Error; 9 10 /// Batch job type. 11 #[derive(PartialEq, Debug)] 12 pub enum BatchType { 13 /// Finish without reading any message. 14 Finish, 15 /// Extract one message when finish. 16 Read, 17 /// Check the rpc code and then extract one message. 18 CheckRead, 19 } 20 21 /// A promise used to resolve batch jobs. 22 pub struct Batch { 23 ty: BatchType, 24 ctx: BatchContext, 25 inner: Arc<Inner<Option<MessageReader>>>, 26 } 27 28 impl Batch { new(ty: BatchType, inner: Arc<Inner<Option<MessageReader>>>) -> Batch29 pub fn new(ty: BatchType, inner: Arc<Inner<Option<MessageReader>>>) -> Batch { 30 Batch { 31 ty, 32 ctx: BatchContext::new(), 33 inner, 34 } 35 } 36 context(&self) -> &BatchContext37 pub fn context(&self) -> &BatchContext { 38 &self.ctx 39 } 40 read_one_msg(&mut self, success: bool)41 fn read_one_msg(&mut self, success: bool) { 42 let task = { 43 let mut guard = self.inner.lock(); 44 if success { 45 guard.set_result(Ok(self.ctx.recv_message())) 46 } else { 47 // rely on C core to handle the failed read (e.g. deliver approriate 48 // statusCode on the clientside). 49 guard.set_result(Ok(None)) 50 } 51 }; 52 task.map(|t| t.wake()); 53 } 54 finish_response(&mut self, succeed: bool)55 fn finish_response(&mut self, succeed: bool) { 56 let task = { 57 let mut guard = self.inner.lock(); 58 if succeed { 59 let status = self.ctx.rpc_status(); 60 if status.status == RpcStatusCode::OK { 61 guard.set_result(Ok(None)) 62 } else { 63 guard.set_result(Err(Error::RpcFailure(status))) 64 } 65 } else { 66 guard.set_result(Err(Error::RemoteStopped)) 67 } 68 }; 69 task.map(|t| t.wake()); 70 } 71 handle_unary_response(&mut self)72 fn handle_unary_response(&mut self) { 73 let task = { 74 let mut guard = self.inner.lock(); 75 let status = self.ctx.rpc_status(); 76 if status.status == RpcStatusCode::OK { 77 guard.set_result(Ok(self.ctx.recv_message())) 78 } else { 79 guard.set_result(Err(Error::RpcFailure(status))) 80 } 81 }; 82 task.map(|t| t.wake()); 83 } 84 resolve(mut self, success: bool)85 pub fn resolve(mut self, success: bool) { 86 match self.ty { 87 BatchType::CheckRead => { 88 assert!(success); 89 self.handle_unary_response(); 90 } 91 BatchType::Finish => { 92 self.finish_response(success); 93 } 94 BatchType::Read => { 95 self.read_one_msg(success); 96 } 97 } 98 } 99 } 100 101 impl Debug for Batch { fmt(&self, f: &mut Formatter<'_>) -> fmt::Result102 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { 103 write!(f, "Batch [{:?}]", self.ty) 104 } 105 } 106 107 /// A promise used to resolve async action status. 108 /// 109 /// The action can only succeed or fail without extra error hint. 110 pub struct Action { 111 inner: Arc<Inner<bool>>, 112 } 113 114 impl Action { new(inner: Arc<Inner<bool>>) -> Action115 pub fn new(inner: Arc<Inner<bool>>) -> Action { 116 Action { inner } 117 } 118 resolve(self, success: bool)119 pub fn resolve(self, success: bool) { 120 let task = { 121 let mut guard = self.inner.lock(); 122 guard.set_result(Ok(success)) 123 }; 124 task.map(|t| t.wake()); 125 } 126 } 127