• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022, The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! The core event loop for Rust modules. Here Rust modules are started in
16 //! dependency order.
17 
18 use gatt::channel::AttTransport;
19 use gatt::GattCallbacks;
20 use log::{info, warn};
21 use tokio::task::LocalSet;
22 
23 use std::rc::Rc;
24 use std::sync::Mutex;
25 use std::thread::JoinHandle;
26 use tokio::runtime::Builder;
27 
28 use tokio::sync::mpsc;
29 
30 pub mod core;
31 pub mod gatt;
32 pub mod packets;
33 pub mod utils;
34 
35 /// The Rust Modules runner. Starts and processes messages from Java / C++
36 /// while the Rust thread is running.  Starts in an idle state.
37 #[derive(Default, Debug)]
38 enum RustModuleRunner {
39     #[default]
40     NotRunning,
41     /// Main event loop is running and messages can be processed.  Use [`RustModuleRunner::send`] to
42     /// queue a callback to be sent.
43     Running {
44         thread: JoinHandle<()>,
45         tx: mpsc::UnboundedSender<BoxedMainThreadCallback>,
46     },
47     DisabledInTest,
48 }
49 
/// The ModuleViews lets us access all publicly accessible Rust modules from
/// Java / C++ while the stack is running. If a module should not be exposed
/// outside of Rust GD, there is no need to include it here.
///
/// An instance is built on the Rust thread right before the event loop starts
/// and is handed by mutable reference to every callback posted to that loop.
/// The `Rc` fields and the `'a` borrow tie it to the thread and scope that
/// created it.
pub struct ModuleViews<'a> {
    /// Lets us call out into C++
    pub gatt_outgoing_callbacks: Rc<dyn GattCallbacks>,
    /// Receives synchronous callbacks from JNI
    pub gatt_incoming_callbacks: Rc<gatt::callbacks::CallbackTransactionManager>,
    /// Proxies calls into GATT server. Borrowed mutably from the event loop's
    /// scope, which owns the module.
    pub gatt_module: &'a mut gatt::server::GattModule,
}
61 
/// Process-wide runner state. The mutex lets any thread start, stop, or post
/// to the runner; the initial value is the idle `NotRunning` state.
static GLOBAL_MODULE_RUNNER: Mutex<RustModuleRunner> = Mutex::new(RustModuleRunner::new());
63 
64 impl RustModuleRunner {
new() -> Self65     const fn new() -> Self {
66         Self::NotRunning
67     }
68 
69     /// Handles bringup of all Rust modules. This occurs after GD C++ modules
70     /// have started, but before the legacy stack has initialized.
71     /// Must be invoked from the Rust thread after JNI initializes it and passes
72     /// in JNI modules.
start( gatt_callbacks: impl GattCallbacks + Send + 'static, att_transport: impl AttTransport + Send + 'static, on_started: impl FnOnce() + Send + 'static, )73     pub fn start(
74         gatt_callbacks: impl GattCallbacks + Send + 'static,
75         att_transport: impl AttTransport + Send + 'static,
76         on_started: impl FnOnce() + Send + 'static,
77     ) {
78         let mut runner = GLOBAL_MODULE_RUNNER.lock().unwrap();
79 
80         if let Self::Running { .. } = &*runner {
81             panic!("Already running");
82         }
83 
84         let (tx, rx) = mpsc::unbounded_channel();
85         let thread = std::thread::spawn(move || {
86             RustModuleRunner::run(Rc::new(gatt_callbacks), Rc::new(att_transport), on_started, rx)
87         });
88 
89         *runner = Self::Running { thread, tx };
90     }
91 
92     /// Externally stop the global runner.
stop()93     pub fn stop() {
94         match std::mem::replace(&mut *GLOBAL_MODULE_RUNNER.lock().unwrap(), Self::NotRunning) {
95             Self::NotRunning => warn!("Already not running"),
96             Self::Running { thread, tx } => {
97                 // Dropping the send end of the channel should cause the runner to stop.
98                 std::mem::drop(tx);
99 
100                 // Wait for the thread to terminate.
101                 let _ = thread.join();
102             }
103             Self::DisabledInTest => {}
104         }
105     }
106 
set_disabled_in_test()107     pub fn set_disabled_in_test() {
108         let mut runner = GLOBAL_MODULE_RUNNER.lock().unwrap();
109         match &*runner {
110             RustModuleRunner::NotRunning => *runner = Self::DisabledInTest,
111             _ => warn!("Unexpected state {:?}", &*runner),
112         }
113     }
114 
run( gatt_callbacks: Rc<dyn GattCallbacks>, att_transport: Rc<dyn AttTransport>, on_started: impl FnOnce(), mut rx: mpsc::UnboundedReceiver<BoxedMainThreadCallback>, )115     fn run(
116         gatt_callbacks: Rc<dyn GattCallbacks>,
117         att_transport: Rc<dyn AttTransport>,
118         on_started: impl FnOnce(),
119         mut rx: mpsc::UnboundedReceiver<BoxedMainThreadCallback>,
120     ) {
121         info!("starting Rust modules");
122 
123         let rt = Builder::new_current_thread()
124             .enable_all()
125             .build()
126             .expect("failed to start tokio runtime");
127         let local = LocalSet::new();
128 
129         // Setup FFI and C++ modules
130         let arbiter = gatt::arbiter::initialize_arbiter();
131 
132         // Now enter the runtime
133         local.block_on(&rt, async move {
134             // Then follow the pure-Rust modules
135             let gatt_incoming_callbacks =
136                 Rc::new(gatt::callbacks::CallbackTransactionManager::new(gatt_callbacks.clone()));
137             let gatt_module = &mut gatt::server::GattModule::new(att_transport.clone(), arbiter);
138 
139             // All modules that are visible from incoming JNI / top-level interfaces should
140             // be exposed here
141             let mut modules = ModuleViews {
142                 gatt_outgoing_callbacks: gatt_callbacks,
143                 gatt_incoming_callbacks,
144                 gatt_module,
145             };
146 
147             // notify upper layer that we are ready to receive messages
148             on_started();
149 
150             // This is the core event loop that serializes incoming requests into the Rust
151             // thread do_in_rust_thread lets us post into here from foreign
152             // threads
153             info!("starting event loop");
154             while let Some(f) = rx.recv().await {
155                 f(&mut modules);
156             }
157         });
158 
159         info!("RustModuleRunner has stopped, shutting down executor thread");
160 
161         gatt::arbiter::clean_arbiter();
162     }
163 
164     #[allow(dead_code)]
send(&self, f: BoxedMainThreadCallback) -> Result<(), (String, BoxedMainThreadCallback)>165     fn send(&self, f: BoxedMainThreadCallback) -> Result<(), (String, BoxedMainThreadCallback)> {
166         match self {
167             Self::Running { tx, .. } => tx.send(f).map_err(|e| ("Failed to send".to_string(), e.0)),
168             _ => Err((format!("Bad state {self:?}"), f)),
169         }
170     }
171 }
172 
/// A boxed, one-shot callback that can be sent across threads to the event
/// loop, where it is invoked with mutable access to the [`ModuleViews`].
type BoxedMainThreadCallback = Box<dyn for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static>;
174 
175 /// Posts a callback to the Rust thread and gives it access to public Rust
176 /// modules, used from JNI.
177 ///
178 /// Do not call this from Rust modules / the Rust thread! Instead, Rust modules should receive
179 /// references to their dependent modules at startup. If passing callbacks into C++, don't use this
180 /// method either - instead, acquire a clone of RustModule's `tx` when the callback is created. This
181 /// ensures that there never are "invalid" callbacks that may still work depending on when the
182 /// GLOBAL_MODULE_REGISTRY is initialized.
do_in_rust_thread<F>(f: F) where F: for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static,183 pub fn do_in_rust_thread<F>(f: F)
184 where
185     F: for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static,
186 {
187     if let Err((s, _f)) = GLOBAL_MODULE_RUNNER.lock().expect("lock not poisoned").send(Box::new(f))
188     {
189         panic!("Rust call failed: {s}");
190     }
191 }
192