1 //! HFP service facade
2
3 use bt_topshim::btif::{BluetoothInterface, RawAddress, ToggleableProfile};
4 use bt_topshim::profiles::hfp::{Hfp, HfpCallbacks, HfpCallbacksDispatcher};
5 use bt_topshim_facade_protobuf::empty::Empty;
6 use bt_topshim_facade_protobuf::facade::{
7 ConnectAudioRequest, DisconnectAudioRequest, EventType, FetchEventsRequest,
8 FetchEventsResponse, SetVolumeRequest, StartSlcRequest, StopSlcRequest,
9 };
10 use bt_topshim_facade_protobuf::facade_grpc::{create_hfp_service, HfpService};
11 use futures::sink::SinkExt;
12 use grpcio::*;
13
14 use std::str::from_utf8;
15 use std::sync::{Arc, Mutex};
16 use tokio::runtime::Runtime;
17 use tokio::sync::mpsc;
18
get_hfp_dispatcher( _hfp: Arc<Mutex<Hfp>>, tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>, ) -> HfpCallbacksDispatcher19 fn get_hfp_dispatcher(
20 _hfp: Arc<Mutex<Hfp>>,
21 tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>,
22 ) -> HfpCallbacksDispatcher {
23 HfpCallbacksDispatcher {
24 dispatch: Box::new(move |cb: HfpCallbacks| {
25 println!("Hfp Callback found {:?}", cb);
26 if let HfpCallbacks::ConnectionState(state, address) = &cb {
27 println!("Hfp Connection state changed to {:?} for address {:?}", state, address);
28 }
29 let guard_tx = tx.lock().unwrap();
30 if let Some(event_tx) = guard_tx.as_ref() {
31 let txclone = event_tx.clone();
32 if txclone.try_send(cb.clone()).is_err() {
33 println!("Cannot send event {:?}", cb);
34 }
35 /*tokio::spawn(async move {
36 let _ = txclone.send(cb).await;
37 });*/
38 }
39 }),
40 }
41 }
42
43 /// Main object for Hfp facade service
44 #[derive(Clone)]
45 pub struct HfpServiceImpl {
46 #[allow(dead_code)]
47 rt: Arc<Runtime>,
48 pub btif_hfp: Arc<Mutex<Hfp>>,
49 #[allow(dead_code)]
50 event_tx: Arc<Mutex<Option<mpsc::Sender<HfpCallbacks>>>>,
51 }
52
53 impl HfpServiceImpl {
54 /// Create a new instance of the root facade service
create(rt: Arc<Runtime>, btif_intf: Arc<Mutex<BluetoothInterface>>) -> grpcio::Service55 pub fn create(rt: Arc<Runtime>, btif_intf: Arc<Mutex<BluetoothInterface>>) -> grpcio::Service {
56 let btif_hfp = Arc::new(Mutex::new(Hfp::new(&btif_intf.lock().unwrap())));
57 let event_tx = Arc::new(Mutex::new(None));
58 btif_hfp.lock().unwrap().initialize(get_hfp_dispatcher(btif_hfp.clone(), event_tx.clone()));
59 btif_hfp.lock().unwrap().enable();
60 create_hfp_service(Self { rt, btif_hfp, event_tx })
61 }
62 }
63
64 impl HfpService for HfpServiceImpl {
start_slc(&mut self, ctx: RpcContext<'_>, req: StartSlcRequest, sink: UnarySink<Empty>)65 fn start_slc(&mut self, ctx: RpcContext<'_>, req: StartSlcRequest, sink: UnarySink<Empty>) {
66 let hfp = self.btif_hfp.clone();
67 ctx.spawn(async move {
68 let addr_bytes = &req.connection.unwrap().cookie;
69 let bt_addr = from_utf8(addr_bytes).unwrap();
70 if let Some(addr) = RawAddress::from_string(bt_addr) {
71 hfp.lock().unwrap().connect(addr);
72 sink.success(Empty::default()).await.unwrap();
73 } else {
74 sink.fail(RpcStatus::with_message(
75 RpcStatusCode::INVALID_ARGUMENT,
76 format!("Invalid Request Address: {}", bt_addr),
77 ))
78 .await
79 .unwrap();
80 }
81 })
82 }
83
stop_slc(&mut self, ctx: RpcContext<'_>, req: StopSlcRequest, sink: UnarySink<Empty>)84 fn stop_slc(&mut self, ctx: RpcContext<'_>, req: StopSlcRequest, sink: UnarySink<Empty>) {
85 let hfp = self.btif_hfp.clone();
86 ctx.spawn(async move {
87 let addr_bytes = &req.connection.unwrap().cookie;
88 let bt_addr = from_utf8(addr_bytes).unwrap();
89 if let Some(addr) = RawAddress::from_string(bt_addr) {
90 hfp.lock().unwrap().disconnect(addr);
91 sink.success(Empty::default()).await.unwrap();
92 } else {
93 sink.fail(RpcStatus::with_message(
94 RpcStatusCode::INVALID_ARGUMENT,
95 format!("Invalid Request Address: {}", bt_addr),
96 ))
97 .await
98 .unwrap();
99 }
100 })
101 }
102
connect_audio( &mut self, ctx: RpcContext<'_>, req: ConnectAudioRequest, sink: UnarySink<Empty>, )103 fn connect_audio(
104 &mut self,
105 ctx: RpcContext<'_>,
106 req: ConnectAudioRequest,
107 sink: UnarySink<Empty>,
108 ) {
109 let hfp = self.btif_hfp.clone();
110 ctx.spawn(async move {
111 let addr_bytes = &req.connection.unwrap().cookie;
112 let bt_addr = from_utf8(addr_bytes).unwrap();
113 if let Some(addr) = RawAddress::from_string(bt_addr) {
114 hfp.lock().unwrap().connect_audio(addr, req.is_sco_offload_enabled, req.force_cvsd);
115 hfp.lock().unwrap().set_active_device(addr);
116 sink.success(Empty::default()).await.unwrap();
117 } else {
118 sink.fail(RpcStatus::with_message(
119 RpcStatusCode::INVALID_ARGUMENT,
120 format!("Invalid Request Address: {}", bt_addr),
121 ))
122 .await
123 .unwrap();
124 }
125 })
126 }
127
disconnect_audio( &mut self, ctx: RpcContext<'_>, req: DisconnectAudioRequest, sink: UnarySink<Empty>, )128 fn disconnect_audio(
129 &mut self,
130 ctx: RpcContext<'_>,
131 req: DisconnectAudioRequest,
132 sink: UnarySink<Empty>,
133 ) {
134 let hfp = self.btif_hfp.clone();
135 ctx.spawn(async move {
136 let addr_bytes = &req.connection.unwrap().cookie;
137 let bt_addr = from_utf8(addr_bytes).unwrap();
138 if let Some(addr) = RawAddress::from_string(bt_addr) {
139 hfp.lock().unwrap().disconnect_audio(addr);
140 sink.success(Empty::default()).await.unwrap();
141 } else {
142 sink.fail(RpcStatus::with_message(
143 RpcStatusCode::INVALID_ARGUMENT,
144 format!("Invalid Request Address: {}", bt_addr),
145 ))
146 .await
147 .unwrap();
148 }
149 })
150 }
151
set_volume(&mut self, ctx: RpcContext<'_>, req: SetVolumeRequest, sink: UnarySink<Empty>)152 fn set_volume(&mut self, ctx: RpcContext<'_>, req: SetVolumeRequest, sink: UnarySink<Empty>) {
153 let hfp = self.btif_hfp.clone();
154 ctx.spawn(async move {
155 let addr_bytes = &req.connection.unwrap().cookie;
156 let bt_addr = from_utf8(addr_bytes).unwrap();
157 if let Some(addr) = RawAddress::from_string(bt_addr) {
158 // TODO(aritrasen): Consider using TryFrom and cap the maximum volume here
159 // since `as` silently deals with data overflow, which might not be preferred.
160 hfp.lock().unwrap().set_volume(req.volume as i8, addr);
161 sink.success(Empty::default()).await.unwrap();
162 } else {
163 sink.fail(RpcStatus::with_message(
164 RpcStatusCode::INVALID_ARGUMENT,
165 format!("Invalid Request Address: {}", bt_addr),
166 ))
167 .await
168 .unwrap();
169 }
170 })
171 }
172
fetch_events( &mut self, ctx: RpcContext<'_>, _req: FetchEventsRequest, mut sink: ServerStreamingSink<FetchEventsResponse>, )173 fn fetch_events(
174 &mut self,
175 ctx: RpcContext<'_>,
176 _req: FetchEventsRequest,
177 mut sink: ServerStreamingSink<FetchEventsResponse>,
178 ) {
179 let (tx, mut rx) = mpsc::channel(10);
180 {
181 let mut guard = self.event_tx.lock().unwrap();
182 if guard.is_some() {
183 ctx.spawn(async move {
184 sink.fail(RpcStatus::with_message(
185 RpcStatusCode::UNAVAILABLE,
186 String::from("Profile is currently already connected and streaming"),
187 ))
188 .await
189 .unwrap();
190 });
191 return;
192 } else {
193 *guard = Some(tx);
194 }
195 }
196
197 ctx.spawn(async move {
198 while let Some(event) = rx.recv().await {
199 if let HfpCallbacks::ConnectionState(state, address) = event {
200 let mut rsp = FetchEventsResponse::new();
201 rsp.event_type = EventType::HFP_CONNECTION_STATE;
202 rsp.data = format!("{:?}, {}", state, address.to_string());
203 sink.send((rsp, WriteFlags::default())).await.unwrap();
204 }
205 }
206 })
207 }
208 }
209