1 //! common facade & shim helpers 2 3 use bt_facade_proto::common::Data; 4 use bytes::Bytes; 5 use futures::sink::SinkExt; 6 use grpcio::*; 7 use std::sync::Arc; 8 use tokio::runtime::Runtime; 9 use tokio::sync::mpsc::Receiver; 10 use tokio::sync::Mutex; 11 12 /// Wrapper so we can invoke callbacks 13 pub trait U8SliceRunnable { 14 /// Do the thing run(&self, data: &[u8])15 fn run(&self, data: &[u8]); 16 } 17 18 /// Helper for interfacing channels with shim or gRPC boundaries 19 #[derive(Clone)] 20 pub struct RxAdapter<T> { 21 rx: Arc<Mutex<Receiver<T>>>, 22 running: bool, 23 } 24 25 impl<T: 'static + Into<Vec<u8>> + Into<Bytes> + Send> RxAdapter<T> { 26 /// New, from an unwrapped receiver new(rx: Receiver<T>) -> Self27 pub fn new(rx: Receiver<T>) -> Self { 28 Self::from_arc(Arc::new(Mutex::new(rx))) 29 } 30 31 /// New, from an already arc mutexed receiver from_arc(rx: Arc<Mutex<Receiver<T>>>) -> Self32 pub fn from_arc(rx: Arc<Mutex<Receiver<T>>>) -> Self { 33 Self { rx, running: false } 34 } 35 36 /// Stream out the channel over the provided sink stream_grpc(&mut self, ctx: RpcContext<'_>, mut sink: ServerStreamingSink<Data>)37 pub fn stream_grpc(&mut self, ctx: RpcContext<'_>, mut sink: ServerStreamingSink<Data>) { 38 assert!(!self.running); 39 self.running = true; 40 41 let clone_rx = self.rx.clone(); 42 ctx.spawn(async move { 43 while let Some(payload) = clone_rx.lock().await.recv().await { 44 let mut data = Data::default(); 45 data.set_payload(payload.into()); 46 if let Err(e) = sink.send((data, WriteFlags::default())).await { 47 log::error!("failure sending data: {:?}", e); 48 } 49 } 50 }); 51 } 52 53 /// Stream out the channel over the provided shim runnable stream_runnable<R: 'static + U8SliceRunnable + Send>( &mut self, rt: &Arc<Runtime>, runnable: R, )54 pub fn stream_runnable<R: 'static + U8SliceRunnable + Send>( 55 &mut self, 56 rt: &Arc<Runtime>, 57 runnable: R, 58 ) { 59 assert!(!self.running); 60 self.running = true; 61 62 let clone_rx = self.rx.clone(); 63 rt.spawn(async move { 64 while let Some(payload) = clone_rx.lock().await.recv().await { 65 let bytes: Bytes = payload.into(); 66 runnable.run(&bytes); 67 } 68 }); 69 } 70 } 71