• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! common facade & shim helpers
2 
3 use bt_facade_proto::common::Data;
4 use bytes::Bytes;
5 use futures::sink::SinkExt;
6 use grpcio::*;
7 use std::sync::Arc;
8 use tokio::runtime::Runtime;
9 use tokio::sync::mpsc::Receiver;
10 use tokio::sync::Mutex;
11 
12 /// Wrapper so we can invoke callbacks
13 pub trait U8SliceRunnable {
14     /// Do the thing
run(&self, data: &[u8])15     fn run(&self, data: &[u8]);
16 }
17 
18 /// Helper for interfacing channels with shim or gRPC boundaries
19 #[derive(Clone)]
20 pub struct RxAdapter<T> {
21     rx: Arc<Mutex<Receiver<T>>>,
22     running: bool,
23 }
24 
25 impl<T: 'static + Into<Vec<u8>> + Into<Bytes> + Send> RxAdapter<T> {
26     /// New, from an unwrapped receiver
new(rx: Receiver<T>) -> Self27     pub fn new(rx: Receiver<T>) -> Self {
28         Self::from_arc(Arc::new(Mutex::new(rx)))
29     }
30 
31     /// New, from an already arc mutexed receiver
from_arc(rx: Arc<Mutex<Receiver<T>>>) -> Self32     pub fn from_arc(rx: Arc<Mutex<Receiver<T>>>) -> Self {
33         Self { rx, running: false }
34     }
35 
36     /// Stream out the channel over the provided sink
stream_grpc(&mut self, ctx: RpcContext<'_>, mut sink: ServerStreamingSink<Data>)37     pub fn stream_grpc(&mut self, ctx: RpcContext<'_>, mut sink: ServerStreamingSink<Data>) {
38         assert!(!self.running);
39         self.running = true;
40 
41         let clone_rx = self.rx.clone();
42         ctx.spawn(async move {
43             while let Some(payload) = clone_rx.lock().await.recv().await {
44                 let mut data = Data::default();
45                 data.set_payload(payload.into());
46                 if let Err(e) = sink.send((data, WriteFlags::default())).await {
47                     log::error!("failure sending data: {:?}", e);
48                 }
49             }
50         });
51     }
52 
53     /// Stream out the channel over the provided shim runnable
stream_runnable<R: 'static + U8SliceRunnable + Send>( &mut self, rt: &Arc<Runtime>, runnable: R, )54     pub fn stream_runnable<R: 'static + U8SliceRunnable + Send>(
55         &mut self,
56         rt: &Arc<Runtime>,
57         runnable: R,
58     ) {
59         assert!(!self.running);
60         self.running = true;
61 
62         let clone_rx = self.rx.clone();
63         rt.spawn(async move {
64             while let Some(payload) = clone_rx.lock().await.recv().await {
65                 let bytes: Bytes = payload.into();
66                 runnable.run(&bytes);
67             }
68         });
69     }
70 }
71