• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #![allow(clippy::empty_line_after_doc_comments)]
16 
17 use std::collections::HashMap;
18 use std::sync::mpsc::{channel, Sender};
19 use std::sync::{OnceLock, RwLock};
20 use std::thread;
21 
22 use crate::captures;
23 use crate::devices::{chip, chip::ChipIdentifier};
24 
25 use bytes::Bytes;
26 use log::{error, info, warn};
27 use netsim_proto::hci_packet::hcipacket::PacketType;
28 use protobuf::Enum;
29 
30 /// The Packet module routes packets from a chip controller instance to
31 /// different transport managers. Currently transport managers include
32 ///
33 /// - GRPC is a PacketStreamer
34 /// - FD is a file descriptor to a pair of Unix Fifos used by "-s" startup
35 /// - SOCKET is a TCP stream
36 
37 // When a connection arrives, the transport registers a responder
38 // implementing Response trait for the packet stream.
39 pub trait Response {
response(&mut self, packet: Bytes, packet_type: u8)40     fn response(&mut self, packet: Bytes, packet_type: u8);
41 }
42 
43 // When a responder is registered a responder thread is created to
44 // decouple the chip controller from the network. The thread reads
45 // ResponsePacket from a queue and sends to responder.
46 struct ResponsePacket {
47     packet: Bytes,
48     packet_type: u8,
49 }
50 
51 // A hash map from chip_id to response channel.
52 
53 struct PacketManager {
54     transports: RwLock<HashMap<ChipIdentifier, Sender<ResponsePacket>>>,
55 }
56 
57 static MANAGER: OnceLock<PacketManager> = OnceLock::new();
58 
get_manager() -> &'static PacketManager59 fn get_manager() -> &'static PacketManager {
60     MANAGER.get_or_init(PacketManager::new)
61 }
62 
63 /// Register a chip controller instance to a transport manager.
register_transport(chip_id: ChipIdentifier, responder: Box<dyn Response + Send>)64 pub fn register_transport(chip_id: ChipIdentifier, responder: Box<dyn Response + Send>) {
65     get_manager().register_transport(chip_id, responder);
66 }
67 
68 /// Unregister a chip controller instance.
unregister_transport(chip_id: ChipIdentifier)69 pub fn unregister_transport(chip_id: ChipIdentifier) {
70     get_manager().unregister_transport(chip_id);
71 }
72 
73 impl PacketManager {
new() -> Self74     fn new() -> Self {
75         PacketManager { transports: RwLock::new(HashMap::new()) }
76     }
77     /// Register a transport stream for handle_response calls.
register_transport( &self, chip_id: ChipIdentifier, mut transport: Box<dyn Response + Send>, )78     pub fn register_transport(
79         &self,
80         chip_id: ChipIdentifier,
81         mut transport: Box<dyn Response + Send>,
82     ) {
83         let (tx, rx) = channel::<ResponsePacket>();
84         if self.transports.write().unwrap().insert(chip_id, tx).is_some() {
85             error!("register_transport: key already present for chip_id: {chip_id}");
86         }
87         let _ = thread::Builder::new().name(format!("transport_responder_{chip_id}")).spawn(
88             move || {
89                 info!("register_transport: started thread chip_id: {chip_id}");
90                 while let Ok(ResponsePacket { packet, packet_type }) = rx.recv() {
91                     transport.response(packet, packet_type);
92                 }
93                 info!("register_transport: finished thread chip_id: {chip_id}");
94             },
95         );
96     }
97 
98     /// Unregister a chip controller instance.
unregister_transport(&self, chip_id: ChipIdentifier)99     pub fn unregister_transport(&self, chip_id: ChipIdentifier) {
100         // Shuts down the responder thread, because sender is dropped.
101         self.transports.write().unwrap().remove(&chip_id);
102     }
103 }
104 
105 /// Handle requests from gRPC transport in C++.
handle_response_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8)106 pub fn handle_response_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8) {
107     // TODO(b/314840701):
108     // 1. Per EChip Struct should contain private field of channel & facade_id
109     // 2. Lookup from ECHIPS with given chip_id
110     // 3. Call adaptor.handle_response
111     let packet = Bytes::from(packet.as_slice().to_vec());
112     let chip_id = ChipIdentifier(chip_id);
113     captures::controller_to_host(chip_id, &packet, packet_type.into());
114 
115     let result = if let Some(transport) = get_manager().transports.read().unwrap().get(&chip_id) {
116         transport.send(ResponsePacket { packet, packet_type })
117     } else {
118         warn!("handle_response: chip {chip_id} not found");
119         Ok(())
120     };
121     // transports lock is now released
122     if let Err(e) = result {
123         warn!("handle_response: error {:?}", e);
124         unregister_transport(chip_id);
125     }
126 }
127 
128 // Handle response from rust libraries
handle_response(chip_id: ChipIdentifier, packet: &Bytes)129 pub fn handle_response(chip_id: ChipIdentifier, packet: &Bytes) {
130     let packet_type = PacketType::HCI_PACKET_UNSPECIFIED.value() as u8;
131     captures::controller_to_host(chip_id, packet, packet_type.into());
132 
133     let result = if let Some(transport) = get_manager().transports.read().unwrap().get(&chip_id) {
134         transport.send(ResponsePacket { packet: packet.clone(), packet_type })
135     } else {
136         warn!("handle_response: chip {chip_id} not found");
137         Ok(())
138     };
139     // transports lock is now released
140     if let Err(e) = result {
141         warn!("handle_response: error {:?}", e);
142         unregister_transport(chip_id);
143     }
144 }
145 
146 /// Handle requests from transports.
handle_request(chip_id: ChipIdentifier, packet: &Bytes, packet_type: u8)147 pub fn handle_request(chip_id: ChipIdentifier, packet: &Bytes, packet_type: u8) {
148     captures::host_to_controller(chip_id, packet, packet_type.into());
149 
150     let mut packet_vec = packet.to_vec();
151     // Prepend packet_type to packet if specified
152     if PacketType::HCI_PACKET_UNSPECIFIED.value()
153         != <u8 as std::convert::Into<i32>>::into(packet_type)
154     {
155         packet_vec.insert(0, packet_type);
156     }
157 
158     // Perform handle_request
159     match chip::get_chip(&chip_id) {
160         Some(c) => c.wireless_chip.handle_request(&Bytes::from(packet_vec)),
161         None => warn!("SharedWirelessChip doesn't exist for chip_id: {chip_id}"),
162     }
163 }
164 
165 /// Handle requests from gRPC transport in C++.
handle_request_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8)166 pub fn handle_request_cxx(chip_id: u32, packet: &cxx::CxxVector<u8>, packet_type: u8) {
167     let packet_bytes = Bytes::from(packet.as_slice().to_vec());
168     handle_request(ChipIdentifier(chip_id), &packet_bytes, packet_type);
169 }
170 
171 #[cfg(test)]
172 mod tests {
173     use super::*;
174 
175     struct TestTransport {}
176     impl Response for TestTransport {
response(&mut self, _packet: Bytes, _packet_type: u8)177         fn response(&mut self, _packet: Bytes, _packet_type: u8) {}
178     }
179 
180     #[test]
test_register_transport()181     fn test_register_transport() {
182         let val: Box<dyn Response + Send> = Box::new(TestTransport {});
183         let manager = PacketManager::new();
184         let chip_id = ChipIdentifier(0);
185         manager.register_transport(chip_id, val);
186         {
187             assert!(manager.transports.read().unwrap().contains_key(&chip_id));
188         }
189     }
190 
191     #[test]
test_unregister_transport()192     fn test_unregister_transport() {
193         let manager = PacketManager::new();
194         let chip_id = ChipIdentifier(1);
195         manager.register_transport(chip_id, Box::new(TestTransport {}));
196         manager.unregister_transport(chip_id);
197         assert!(manager.transports.read().unwrap().get(&chip_id).is_none());
198     }
199 }
200