1 // Copyright 2023 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #![allow(clippy::empty_line_after_doc_comments)]
16
17 use std::collections::HashMap;
18 use std::sync::mpsc::{channel, Sender};
19 use std::sync::{OnceLock, RwLock};
20 use std::thread;
21
22 use crate::captures;
23 use crate::devices::{chip, chip::ChipIdentifier};
24
25 use bytes::Bytes;
26 use log::{error, info, warn};
27 use netsim_proto::hci_packet::hcipacket::PacketType;
28 use protobuf::Enum;
29
30 /// The Packet module routes packets from a chip controller instance to
31 /// different transport managers. Currently transport managers include
32 ///
33 /// - GRPC is a PacketStreamer
34 /// - FD is a file descriptor to a pair of Unix Fifos used by "-s" startup
35 /// - SOCKET is a TCP stream
36
37 // When a connection arrives, the transport registers a responder
38 // implementing Response trait for the packet stream.
39 pub trait Response {
response(&mut self, packet: Bytes, packet_type: u8)40 fn response(&mut self, packet: Bytes, packet_type: u8);
41 }
42
43 // When a responder is registered a responder thread is created to
44 // decouple the chip controller from the network. The thread reads
45 // ResponsePacket from a queue and sends to responder.
46 struct ResponsePacket {
47 packet: Bytes,
48 packet_type: u8,
49 }
50
51 // A hash map from chip_id to response channel.
52
53 struct PacketManager {
54 transports: RwLock<HashMap<ChipIdentifier, Sender<ResponsePacket>>>,
55 }
56
57 static MANAGER: OnceLock<PacketManager> = OnceLock::new();
58
get_manager() -> &'static PacketManager59 fn get_manager() -> &'static PacketManager {
60 MANAGER.get_or_init(PacketManager::new)
61 }
62
63 /// Register a chip controller instance to a transport manager.
register_transport(chip_id: ChipIdentifier, responder: Box<dyn Response + Send>)64 pub fn register_transport(chip_id: ChipIdentifier, responder: Box<dyn Response + Send>) {
65 get_manager().register_transport(chip_id, responder);
66 }
67
68 /// Unregister a chip controller instance.
unregister_transport(chip_id: ChipIdentifier)69 pub fn unregister_transport(chip_id: ChipIdentifier) {
70 get_manager().unregister_transport(chip_id);
71 }
72
73 impl PacketManager {
new() -> Self74 fn new() -> Self {
75 PacketManager { transports: RwLock::new(HashMap::new()) }
76 }
77 /// Register a transport stream for handle_response calls.
register_transport( &self, chip_id: ChipIdentifier, mut transport: Box<dyn Response + Send>, )78 pub fn register_transport(
79 &self,
80 chip_id: ChipIdentifier,
81 mut transport: Box<dyn Response + Send>,
82 ) {
83 let (tx, rx) = channel::<ResponsePacket>();
84 if self.transports.write().unwrap().insert(chip_id, tx).is_some() {
85 error!("register_transport: key already present for chip_id: {chip_id}");
86 }
87 let _ = thread::Builder::new().name(format!("transport_responder_{chip_id}")).spawn(
88 move || {
89 info!("register_transport: started thread chip_id: {chip_id}");
90 while let Ok(ResponsePacket { packet, packet_type }) = rx.recv() {
91 transport.response(packet, packet_type);
92 }
93 info!("register_transport: finished thread chip_id: {chip_id}");
94 },
95 );
96 }
97
98 /// Unregister a chip controller instance.
unregister_transport(&self, chip_id: ChipIdentifier)99 pub fn unregister_transport(&self, chip_id: ChipIdentifier) {
100 // Shuts down the responder thread, because sender is dropped.
101 self.transports.write().unwrap().remove(&chip_id);
102 }
103 }
104
105 /// Handle requests from gRPC transport in C++.
handle_response_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8)106 pub fn handle_response_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8) {
107 // TODO(b/314840701):
108 // 1. Per EChip Struct should contain private field of channel & facade_id
109 // 2. Lookup from ECHIPS with given chip_id
110 // 3. Call adaptor.handle_response
111 let packet = Bytes::from(packet.as_slice().to_vec());
112 let chip_id = ChipIdentifier(chip_id);
113 captures::controller_to_host(chip_id, &packet, packet_type.into());
114
115 let result = if let Some(transport) = get_manager().transports.read().unwrap().get(&chip_id) {
116 transport.send(ResponsePacket { packet, packet_type })
117 } else {
118 warn!("handle_response: chip {chip_id} not found");
119 Ok(())
120 };
121 // transports lock is now released
122 if let Err(e) = result {
123 warn!("handle_response: error {:?}", e);
124 unregister_transport(chip_id);
125 }
126 }
127
128 // Handle response from rust libraries
handle_response(chip_id: ChipIdentifier, packet: &Bytes)129 pub fn handle_response(chip_id: ChipIdentifier, packet: &Bytes) {
130 let packet_type = PacketType::HCI_PACKET_UNSPECIFIED.value() as u8;
131 captures::controller_to_host(chip_id, packet, packet_type.into());
132
133 let result = if let Some(transport) = get_manager().transports.read().unwrap().get(&chip_id) {
134 transport.send(ResponsePacket { packet: packet.clone(), packet_type })
135 } else {
136 warn!("handle_response: chip {chip_id} not found");
137 Ok(())
138 };
139 // transports lock is now released
140 if let Err(e) = result {
141 warn!("handle_response: error {:?}", e);
142 unregister_transport(chip_id);
143 }
144 }
145
146 /// Handle requests from transports.
handle_request(chip_id: ChipIdentifier, packet: &Bytes, packet_type: u8)147 pub fn handle_request(chip_id: ChipIdentifier, packet: &Bytes, packet_type: u8) {
148 captures::host_to_controller(chip_id, packet, packet_type.into());
149
150 let mut packet_vec = packet.to_vec();
151 // Prepend packet_type to packet if specified
152 if PacketType::HCI_PACKET_UNSPECIFIED.value()
153 != <u8 as std::convert::Into<i32>>::into(packet_type)
154 {
155 packet_vec.insert(0, packet_type);
156 }
157
158 // Perform handle_request
159 match chip::get_chip(&chip_id) {
160 Some(c) => c.wireless_chip.handle_request(&Bytes::from(packet_vec)),
161 None => warn!("SharedWirelessChip doesn't exist for chip_id: {chip_id}"),
162 }
163 }
164
165 /// Handle requests from gRPC transport in C++.
handle_request_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8)166 pub fn handle_request_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8) {
167 let packet_bytes = Bytes::from(packet.as_slice().to_vec());
168 handle_request(ChipIdentifier(chip_id), &packet_bytes, packet_type);
169 }
170
171 #[cfg(test)]
172 mod tests {
173 use super::*;
174
175 struct TestTransport {}
176 impl Response for TestTransport {
response(&mut self, _packet: Bytes, _packet_type: u8)177 fn response(&mut self, _packet: Bytes, _packet_type: u8) {}
178 }
179
180 #[test]
test_register_transport()181 fn test_register_transport() {
182 let val: Box<dyn Response + Send> = Box::new(TestTransport {});
183 let manager = PacketManager::new();
184 let chip_id = ChipIdentifier(0);
185 manager.register_transport(chip_id, val);
186 {
187 assert!(manager.transports.read().unwrap().contains_key(&chip_id));
188 }
189 }
190
191 #[test]
test_unregister_transport()192 fn test_unregister_transport() {
193 let manager = PacketManager::new();
194 let chip_id = ChipIdentifier(1);
195 manager.register_transport(chip_id, Box::new(TestTransport {}));
196 manager.unregister_transport(chip_id);
197 assert!(manager.transports.read().unwrap().get(&chip_id).is_none());
198 }
199 }
200