1 // Copyright 2022, The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //! The core event loop for Rust modules. Here Rust modules are started in
16 //! dependency order.
17
18 use gatt::channel::AttTransport;
19 use gatt::GattCallbacks;
20 use log::{info, warn};
21 use tokio::task::LocalSet;
22
23 use std::rc::Rc;
24 use std::sync::Mutex;
25 use std::thread::JoinHandle;
26 use tokio::runtime::Builder;
27
28 use tokio::sync::mpsc;
29
30 pub mod core;
31 pub mod gatt;
32 pub mod packets;
33 pub mod utils;
34
35 /// The Rust Modules runner. Starts and processes messages from Java / C++
36 /// while the Rust thread is running. Starts in an idle state.
37 #[derive(Default, Debug)]
38 enum RustModuleRunner {
39 #[default]
40 NotRunning,
41 /// Main event loop is running and messages can be processed. Use [`RustModuleRunner::send`] to
42 /// queue a callback to be sent.
43 Running {
44 thread: JoinHandle<()>,
45 tx: mpsc::UnboundedSender<BoxedMainThreadCallback>,
46 },
47 DisabledInTest,
48 }
49
50 /// The ModuleViews lets us access all publicly accessible Rust modules from
51 /// Java / C++ while the stack is running. If a module should not be exposed
52 /// outside of Rust GD, there is no need to include it here.
53 pub struct ModuleViews<'a> {
54 /// Lets us call out into C++
55 pub gatt_outgoing_callbacks: Rc<dyn GattCallbacks>,
56 /// Receives synchronous callbacks from JNI
57 pub gatt_incoming_callbacks: Rc<gatt::callbacks::CallbackTransactionManager>,
58 /// Proxies calls into GATT server
59 pub gatt_module: &'a mut gatt::server::GattModule,
60 }
61
62 static GLOBAL_MODULE_RUNNER: Mutex<RustModuleRunner> = Mutex::new(RustModuleRunner::new());
63
64 impl RustModuleRunner {
new() -> Self65 const fn new() -> Self {
66 Self::NotRunning
67 }
68
69 /// Handles bringup of all Rust modules. This occurs after GD C++ modules
70 /// have started, but before the legacy stack has initialized.
71 /// Must be invoked from the Rust thread after JNI initializes it and passes
72 /// in JNI modules.
start( gatt_callbacks: impl GattCallbacks + Send + 'static, att_transport: impl AttTransport + Send + 'static, on_started: impl FnOnce() + Send + 'static, )73 pub fn start(
74 gatt_callbacks: impl GattCallbacks + Send + 'static,
75 att_transport: impl AttTransport + Send + 'static,
76 on_started: impl FnOnce() + Send + 'static,
77 ) {
78 let mut runner = GLOBAL_MODULE_RUNNER.lock().unwrap();
79
80 if let Self::Running { .. } = &*runner {
81 panic!("Already running");
82 }
83
84 let (tx, rx) = mpsc::unbounded_channel();
85 let thread = std::thread::spawn(move || {
86 RustModuleRunner::run(Rc::new(gatt_callbacks), Rc::new(att_transport), on_started, rx)
87 });
88
89 *runner = Self::Running { thread, tx };
90 }
91
92 /// Externally stop the global runner.
stop()93 pub fn stop() {
94 match std::mem::replace(&mut *GLOBAL_MODULE_RUNNER.lock().unwrap(), Self::NotRunning) {
95 Self::NotRunning => warn!("Already not running"),
96 Self::Running { thread, tx } => {
97 // Dropping the send end of the channel should cause the runner to stop.
98 std::mem::drop(tx);
99
100 // Wait for the thread to terminate.
101 let _ = thread.join();
102 }
103 Self::DisabledInTest => {}
104 }
105 }
106
set_disabled_in_test()107 pub fn set_disabled_in_test() {
108 let mut runner = GLOBAL_MODULE_RUNNER.lock().unwrap();
109 match &*runner {
110 RustModuleRunner::NotRunning => *runner = Self::DisabledInTest,
111 _ => warn!("Unexpected state {:?}", &*runner),
112 }
113 }
114
run( gatt_callbacks: Rc<dyn GattCallbacks>, att_transport: Rc<dyn AttTransport>, on_started: impl FnOnce(), mut rx: mpsc::UnboundedReceiver<BoxedMainThreadCallback>, )115 fn run(
116 gatt_callbacks: Rc<dyn GattCallbacks>,
117 att_transport: Rc<dyn AttTransport>,
118 on_started: impl FnOnce(),
119 mut rx: mpsc::UnboundedReceiver<BoxedMainThreadCallback>,
120 ) {
121 info!("starting Rust modules");
122
123 let rt = Builder::new_current_thread()
124 .enable_all()
125 .build()
126 .expect("failed to start tokio runtime");
127 let local = LocalSet::new();
128
129 // Setup FFI and C++ modules
130 let arbiter = gatt::arbiter::initialize_arbiter();
131
132 // Now enter the runtime
133 local.block_on(&rt, async move {
134 // Then follow the pure-Rust modules
135 let gatt_incoming_callbacks =
136 Rc::new(gatt::callbacks::CallbackTransactionManager::new(gatt_callbacks.clone()));
137 let gatt_module = &mut gatt::server::GattModule::new(att_transport.clone(), arbiter);
138
139 // All modules that are visible from incoming JNI / top-level interfaces should
140 // be exposed here
141 let mut modules = ModuleViews {
142 gatt_outgoing_callbacks: gatt_callbacks,
143 gatt_incoming_callbacks,
144 gatt_module,
145 };
146
147 // notify upper layer that we are ready to receive messages
148 on_started();
149
150 // This is the core event loop that serializes incoming requests into the Rust
151 // thread do_in_rust_thread lets us post into here from foreign
152 // threads
153 info!("starting event loop");
154 while let Some(f) = rx.recv().await {
155 f(&mut modules);
156 }
157 });
158
159 info!("RustModuleRunner has stopped, shutting down executor thread");
160
161 gatt::arbiter::clean_arbiter();
162 }
163
164 #[allow(dead_code)]
send(&self, f: BoxedMainThreadCallback) -> Result<(), (String, BoxedMainThreadCallback)>165 fn send(&self, f: BoxedMainThreadCallback) -> Result<(), (String, BoxedMainThreadCallback)> {
166 match self {
167 Self::Running { tx, .. } => tx.send(f).map_err(|e| ("Failed to send".to_string(), e.0)),
168 _ => Err((format!("Bad state {self:?}"), f)),
169 }
170 }
171 }
172
173 type BoxedMainThreadCallback = Box<dyn for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static>;
174
175 /// Posts a callback to the Rust thread and gives it access to public Rust
176 /// modules, used from JNI.
177 ///
178 /// Do not call this from Rust modules / the Rust thread! Instead, Rust modules should receive
179 /// references to their dependent modules at startup. If passing callbacks into C++, don't use this
180 /// method either - instead, acquire a clone of RustModule's `tx` when the callback is created. This
181 /// ensures that there never are "invalid" callbacks that may still work depending on when the
182 /// GLOBAL_MODULE_REGISTRY is initialized.
do_in_rust_thread<F>(f: F) where F: for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static,183 pub fn do_in_rust_thread<F>(f: F)
184 where
185 F: for<'a> FnOnce(&'a mut ModuleViews) + Send + 'static,
186 {
187 if let Err((s, _f)) = GLOBAL_MODULE_RUNNER.lock().expect("lock not poisoned").send(Box::new(f))
188 {
189 panic!("Rust call failed: {s}");
190 }
191 }
192