• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! HFP service facade
2 
3 use bt_topshim::btif::{BluetoothInterface, RawAddress, ToggleableProfile};
4 use bt_topshim::profiles::hfp::{Hfp, HfpCallbacks, HfpCallbacksDispatcher};
5 use bt_topshim_facade_protobuf::empty::Empty;
6 use bt_topshim_facade_protobuf::facade::{
7     ConnectAudioRequest, DisconnectAudioRequest, EventType, FetchEventsRequest,
8     FetchEventsResponse, SetVolumeRequest, StartSlcRequest, StopSlcRequest,
9 };
10 use bt_topshim_facade_protobuf::facade_grpc::{create_hfp_service, HfpService};
11 use futures::sink::SinkExt;
12 use grpcio::*;
13 
14 use std::str::from_utf8;
15 use std::sync::{Arc, Mutex};
16 use tokio::runtime::Runtime;
17 use tokio::sync::mpsc;
18 
get_hfp_dispatcher( _hfp: Arc<Mutex<Hfp>>, tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>, ) -> HfpCallbacksDispatcher19 fn get_hfp_dispatcher(
20     _hfp: Arc<Mutex<Hfp>>,
21     tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>,
22 ) -> HfpCallbacksDispatcher {
23     HfpCallbacksDispatcher {
24         dispatch: Box::new(move |cb: HfpCallbacks| {
25             println!("Hfp Callback found {:?}", cb);
26             if let HfpCallbacks::ConnectionState(state, address) = &cb {
27                 println!("Hfp Connection state changed to {:?} for address {:?}", state, address);
28             }
29             let guard_tx = tx.lock().unwrap();
30             if let Some(event_tx) = guard_tx.as_ref() {
31                 let txclone = event_tx.clone();
32                 if txclone.try_send(cb.clone()).is_err() {
33                     println!("Cannot send event {:?}", cb);
34                 }
35                 /*tokio::spawn(async move {
36                     let _ = txclone.send(cb).await;
37                 });*/
38             }
39         }),
40     }
41 }
42 
43 /// Main object for Hfp facade service
44 #[derive(Clone)]
45 pub struct HfpServiceImpl {
46     #[allow(dead_code)]
47     rt: Arc<Runtime>,
48     pub btif_hfp: Arc<Mutex<Hfp>>,
49     #[allow(dead_code)]
50     event_tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>,
51 }
52 
53 impl HfpServiceImpl {
54     /// Create a new instance of the root facade service
create(rt: Arc<Runtime>, btif_intf: Arc<Mutex<BluetoothInterface>>) -> grpcio::Service55     pub fn create(rt: Arc<Runtime>, btif_intf: Arc<Mutex<BluetoothInterface>>) -> grpcio::Service {
56         let btif_hfp = Arc::new(Mutex::new(Hfp::new(&btif_intf.lock().unwrap())));
57         let event_tx = Arc::new(Mutex::new(None));
58         btif_hfp.lock().unwrap().initialize(get_hfp_dispatcher(btif_hfp.clone(), event_tx.clone()));
59         btif_hfp.lock().unwrap().enable();
60         create_hfp_service(Self { rt, btif_hfp, event_tx })
61     }
62 }
63 
64 impl HfpService for HfpServiceImpl {
start_slc(&mut self, ctx: RpcContext<'_>, req: StartSlcRequest, sink: UnarySink<Empty>)65     fn start_slc(&mut self, ctx: RpcContext<'_>, req: StartSlcRequest, sink: UnarySink<Empty>) {
66         let hfp = self.btif_hfp.clone();
67         ctx.spawn(async move {
68             let addr_bytes = &req.connection.unwrap().cookie;
69             let bt_addr = from_utf8(addr_bytes).unwrap();
70             if let Some(addr) = RawAddress::from_string(bt_addr) {
71                 hfp.lock().unwrap().connect(addr);
72                 sink.success(Empty::default()).await.unwrap();
73             } else {
74                 sink.fail(RpcStatus::with_message(
75                     RpcStatusCode::INVALID_ARGUMENT,
76                     format!("Invalid Request Address: {}", bt_addr),
77                 ))
78                 .await
79                 .unwrap();
80             }
81         })
82     }
83 
stop_slc(&mut self, ctx: RpcContext<'_>, req: StopSlcRequest, sink: UnarySink<Empty>)84     fn stop_slc(&mut self, ctx: RpcContext<'_>, req: StopSlcRequest, sink: UnarySink<Empty>) {
85         let hfp = self.btif_hfp.clone();
86         ctx.spawn(async move {
87             let addr_bytes = &req.connection.unwrap().cookie;
88             let bt_addr = from_utf8(addr_bytes).unwrap();
89             if let Some(addr) = RawAddress::from_string(bt_addr) {
90                 hfp.lock().unwrap().disconnect(addr);
91                 sink.success(Empty::default()).await.unwrap();
92             } else {
93                 sink.fail(RpcStatus::with_message(
94                     RpcStatusCode::INVALID_ARGUMENT,
95                     format!("Invalid Request Address: {}", bt_addr),
96                 ))
97                 .await
98                 .unwrap();
99             }
100         })
101     }
102 
connect_audio( &mut self, ctx: RpcContext<'_>, req: ConnectAudioRequest, sink: UnarySink<Empty>, )103     fn connect_audio(
104         &mut self,
105         ctx: RpcContext<'_>,
106         req: ConnectAudioRequest,
107         sink: UnarySink<Empty>,
108     ) {
109         let hfp = self.btif_hfp.clone();
110         ctx.spawn(async move {
111             let addr_bytes = &req.connection.unwrap().cookie;
112             let bt_addr = from_utf8(addr_bytes).unwrap();
113             if let Some(addr) = RawAddress::from_string(bt_addr) {
114                 hfp.lock().unwrap().connect_audio(addr, req.is_sco_offload_enabled, req.force_cvsd);
115                 hfp.lock().unwrap().set_active_device(addr);
116                 sink.success(Empty::default()).await.unwrap();
117             } else {
118                 sink.fail(RpcStatus::with_message(
119                     RpcStatusCode::INVALID_ARGUMENT,
120                     format!("Invalid Request Address: {}", bt_addr),
121                 ))
122                 .await
123                 .unwrap();
124             }
125         })
126     }
127 
disconnect_audio( &mut self, ctx: RpcContext<'_>, req: DisconnectAudioRequest, sink: UnarySink<Empty>, )128     fn disconnect_audio(
129         &mut self,
130         ctx: RpcContext<'_>,
131         req: DisconnectAudioRequest,
132         sink: UnarySink<Empty>,
133     ) {
134         let hfp = self.btif_hfp.clone();
135         ctx.spawn(async move {
136             let addr_bytes = &req.connection.unwrap().cookie;
137             let bt_addr = from_utf8(addr_bytes).unwrap();
138             if let Some(addr) = RawAddress::from_string(bt_addr) {
139                 hfp.lock().unwrap().disconnect_audio(addr);
140                 sink.success(Empty::default()).await.unwrap();
141             } else {
142                 sink.fail(RpcStatus::with_message(
143                     RpcStatusCode::INVALID_ARGUMENT,
144                     format!("Invalid Request Address: {}", bt_addr),
145                 ))
146                 .await
147                 .unwrap();
148             }
149         })
150     }
151 
set_volume(&mut self, ctx: RpcContext<'_>, req: SetVolumeRequest, sink: UnarySink<Empty>)152     fn set_volume(&mut self, ctx: RpcContext<'_>, req: SetVolumeRequest, sink: UnarySink<Empty>) {
153         let hfp = self.btif_hfp.clone();
154         ctx.spawn(async move {
155             let addr_bytes = &req.connection.unwrap().cookie;
156             let bt_addr = from_utf8(addr_bytes).unwrap();
157             if let Some(addr) = RawAddress::from_string(bt_addr) {
158                 // TODO(aritrasen): Consider using TryFrom and cap the maximum volume here
159                 // since `as` silently deals with data overflow, which might not be preferred.
160                 hfp.lock().unwrap().set_volume(req.volume as i8, addr);
161                 sink.success(Empty::default()).await.unwrap();
162             } else {
163                 sink.fail(RpcStatus::with_message(
164                     RpcStatusCode::INVALID_ARGUMENT,
165                     format!("Invalid Request Address: {}", bt_addr),
166                 ))
167                 .await
168                 .unwrap();
169             }
170         })
171     }
172 
fetch_events( &mut self, ctx: RpcContext<'_>, _req: FetchEventsRequest, mut sink: ServerStreamingSink<FetchEventsResponse>, )173     fn fetch_events(
174         &mut self,
175         ctx: RpcContext<'_>,
176         _req: FetchEventsRequest,
177         mut sink: ServerStreamingSink<FetchEventsResponse>,
178     ) {
179         let (tx, mut rx) = mpsc::channel(10);
180         {
181             let mut guard = self.event_tx.lock().unwrap();
182             if guard.is_some() {
183                 ctx.spawn(async move {
184                     sink.fail(RpcStatus::with_message(
185                         RpcStatusCode::UNAVAILABLE,
186                         String::from("Profile is currently already connected and streaming"),
187                     ))
188                     .await
189                     .unwrap();
190                 });
191                 return;
192             } else {
193                 *guard = Some(tx);
194             }
195         }
196 
197         ctx.spawn(async move {
198             while let Some(event) = rx.recv().await {
199                 if let HfpCallbacks::ConnectionState(state, address) = event {
200                     let mut rsp = FetchEventsResponse::new();
201                     rsp.event_type = EventType::HFP_CONNECTION_STATE;
202                     rsp.data = format!("{:?}, {}", state, address.to_string());
203                     sink.send((rsp, WriteFlags::default())).await.unwrap();
204                 }
205             }
206         })
207     }
208 }
209