• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023, The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 //! NFCC and RF emulator.
16 
17 use anyhow::Result;
18 use argh::FromArgs;
19 use log::{error, info, warn};
20 use rustutils::inherited_fd;
21 use std::future::Future;
22 use std::net::{Ipv4Addr, SocketAddrV4};
23 use std::pin::{pin, Pin};
24 use std::task::Context;
25 use std::task::Poll;
26 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
27 use tokio::net::{TcpListener, UnixListener};
28 use tokio::select;
29 use tokio::sync::mpsc;
30 
31 pub mod controller;
32 pub mod crc;
33 pub mod packets;
34 
35 use controller::Controller;
36 use packets::{nci, rf};
37 
/// Number of device slots in the `Scene`; this bounds how many connections
/// (NCI and RF combined) can be active at the same time.
const MAX_DEVICES: usize = 128;
/// Type of the identifiers assigned to devices and compared against the
/// sender and receiver fields of the RF packets.
type Id = u16;
40 
/// Read RF Control and Data packets received on the RF transport.
///
/// Packets are read from a byte stream where each packet is preceded by its
/// size, encoded as a little-endian `u16`.
///
/// NOTE(review): earlier documentation stated that the reader performs
/// recombination of segmented packets; the reader implementation in this
/// file only removes the length framing — confirm where recombination
/// is expected to happen.
pub struct RfReader {
    // Read half of the RF transport, type-erased so that TCP and Unix
    // sockets can be handled by the same reader.
    socket: Pin<Box<dyn AsyncRead>>,
}
46 
/// Write RF Control and Data packets to the RF transport.
///
/// Each packet is written to the byte stream preceded by its size, encoded
/// as a little-endian `u16`.
///
/// NOTE(review): earlier documentation stated that the writer performs
/// segmentation of the packets; the writer implementation in this file
/// only adds the length framing — confirm where segmentation is expected
/// to happen.
pub struct RfWriter {
    // Write half of the RF transport, type-erased so that TCP and Unix
    // sockets can be handled by the same writer.
    socket: Pin<Box<dyn AsyncWrite>>,
}
52 
53 impl RfReader {
54     /// Create a new RF reader from an `AsyncRead` implementation.
new(socket: impl AsyncRead + 'static) -> Self55     pub fn new(socket: impl AsyncRead + 'static) -> Self {
56         RfReader { socket: Box::pin(socket) }
57     }
58 
59     /// Read a single RF packet from the reader.
60     /// RF packets are framed with the byte size encoded as little-endian u16.
read(&mut self) -> Result<Vec<u8>>61     pub async fn read(&mut self) -> Result<Vec<u8>> {
62         const HEADER_SIZE: usize = 2;
63         let mut header_bytes = [0; HEADER_SIZE];
64 
65         // Read the header bytes.
66         self.socket.read_exact(&mut header_bytes[0..HEADER_SIZE]).await?;
67         let packet_length = u16::from_le_bytes(header_bytes) as usize;
68 
69         // Read the packet data.
70         let mut packet_bytes = vec![0; packet_length];
71         self.socket.read_exact(&mut packet_bytes).await?;
72 
73         Ok(packet_bytes)
74     }
75 }
76 
77 impl RfWriter {
78     /// Create a new RF writer from an `AsyncWrite` implementation.
new(socket: impl AsyncWrite + 'static) -> Self79     pub fn new(socket: impl AsyncWrite + 'static) -> Self {
80         RfWriter { socket: Box::pin(socket) }
81     }
82 
83     /// Write a single RF packet to the writer.
84     /// RF packets are framed with the byte size encoded as little-endian u16.
write(&mut self, packet: &[u8]) -> Result<()>85     async fn write(&mut self, packet: &[u8]) -> Result<()> {
86         let packet_length: u16 = packet.len().try_into()?;
87         let header_bytes = packet_length.to_le_bytes();
88 
89         // Write the header bytes.
90         self.socket.write_all(&header_bytes).await?;
91 
92         // Write the packet data.
93         self.socket.write_all(packet).await?;
94 
95         Ok(())
96     }
97 }
98 
99 /// Represent a generic NFC device interacting on the RF transport.
100 /// Devices communicate together through the RF mpsc channel.
101 /// NFCCs are an instance of Device.
102 pub struct Device {
103     // Unique identifier associated with the device.
104     // The identifier is assured never to be reused in the lifetime of
105     // the emulator.
106     id: u16,
107     // Async task running the controller main loop.
108     task: Pin<Box<dyn Future<Output = Result<()>>>>,
109     // Channel for injecting RF data packets into the controller instance.
110     rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
111 }
112 
113 impl Device {
nci( id: Id, nci_rx: impl AsyncRead + 'static, nci_tx: impl AsyncWrite + 'static, controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>, ) -> Device114     fn nci(
115         id: Id,
116         nci_rx: impl AsyncRead + 'static,
117         nci_tx: impl AsyncWrite + 'static,
118         controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
119     ) -> Device {
120         let (rf_tx, rf_rx) = mpsc::unbounded_channel();
121         Device {
122             id,
123             rf_tx,
124             task: Box::pin(async move {
125                 Controller::run(
126                     id,
127                     pin!(nci::Reader::new(nci_rx).into_stream()),
128                     nci::Writer::new(nci_tx),
129                     rf_rx,
130                     controller_rf_tx,
131                 )
132                 .await
133             }),
134         }
135     }
136 
rf( id: Id, socket_rx: impl AsyncRead + 'static, socket_tx: impl AsyncWrite + 'static, controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>, ) -> Device137     fn rf(
138         id: Id,
139         socket_rx: impl AsyncRead + 'static,
140         socket_tx: impl AsyncWrite + 'static,
141         controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
142     ) -> Device {
143         let (rf_tx, mut rf_rx) = mpsc::unbounded_channel();
144         Device {
145             id,
146             rf_tx,
147             task: Box::pin(async move {
148                 let mut rf_reader = RfReader::new(socket_rx);
149                 let mut rf_writer = RfWriter::new(socket_tx);
150 
151                 let result: Result<((), ())> = futures::future::try_join(
152                     async {
153                         loop {
154                             // Replace the sender identifier in the packet
155                             // with the assigned number for the RF connection.
156                             // TODO: currently the generated API does not allow
157                             // modifying the parsed fields so the change needs to be
158                             // applied to the unparsed packet.
159                             let mut packet_bytes = rf_reader.read().await?;
160                             packet_bytes[0..2].copy_from_slice(&id.to_le_bytes());
161 
162                             // Parse the input packet.
163                             let packet = rf::RfPacket::parse(&packet_bytes)?;
164 
165                             // Forward the packet to other devices.
166                             controller_rf_tx.send(packet)?;
167                         }
168                     },
169                     async {
170                         loop {
171                             // Forward the packet to the socket connection.
172                             use pdl_runtime::Packet;
173                             let packet = rf_rx
174                                 .recv()
175                                 .await
176                                 .ok_or(anyhow::anyhow!("rf_rx channel closed"))?;
177                             rf_writer.write(&packet.encode_to_vec()?).await?;
178                         }
179                     },
180                 )
181                 .await;
182 
183                 result?;
184                 Ok(())
185             }),
186         }
187     }
188 }
189 
190 struct Scene {
191     next_id: u16,
192     waker: Option<std::task::Waker>,
193     devices: [Option<Device>; MAX_DEVICES],
194 }
195 
196 impl Default for Scene {
default() -> Self197     fn default() -> Self {
198         const NONE: Option<Device> = None;
199         Scene { next_id: 0, waker: None, devices: [NONE; MAX_DEVICES] }
200     }
201 }
202 
203 impl Scene {
new() -> Scene204     fn new() -> Scene {
205         Default::default()
206     }
207 
wake(&mut self)208     fn wake(&mut self) {
209         if let Some(waker) = self.waker.take() {
210             waker.wake()
211         }
212     }
213 
add_device(&mut self, builder: impl FnOnce(Id) -> Device) -> Result<Id>214     fn add_device(&mut self, builder: impl FnOnce(Id) -> Device) -> Result<Id> {
215         for n in 0..MAX_DEVICES {
216             if self.devices[n].is_none() {
217                 self.devices[n] = Some(builder(self.next_id));
218                 self.next_id += 1;
219                 self.wake();
220                 return Ok(n as Id);
221             }
222         }
223         Err(anyhow::anyhow!("max number of connections reached"))
224     }
225 
disconnect(&mut self, n: usize)226     fn disconnect(&mut self, n: usize) {
227         let id = self.devices[n].as_ref().unwrap().id;
228         self.devices[n] = None;
229         for other_n in 0..MAX_DEVICES {
230             let Some(ref device) = self.devices[other_n] else { continue };
231             assert!(n != other_n);
232             device
233                 .rf_tx
234                 .send(
235                     rf::DeactivateNotificationBuilder {
236                         type_: rf::DeactivateType::Discovery,
237                         reason: rf::DeactivateReason::RfLinkLoss,
238                         sender: id,
239                         receiver: device.id,
240                         bitrate: rf::BitRate::BitRate106KbitS,
241                         power_level: 255,
242                         technology: rf::Technology::NfcA,
243                         protocol: rf::Protocol::Undetermined,
244                     }
245                     .into(),
246                 )
247                 .expect("failed to send deactive notification")
248         }
249     }
250 
send(&self, packet: &rf::RfPacket) -> Result<()>251     fn send(&self, packet: &rf::RfPacket) -> Result<()> {
252         for n in 0..MAX_DEVICES {
253             let Some(ref device) = self.devices[n] else { continue };
254             if packet.get_sender() != device.id
255                 && (packet.get_receiver() == u16::MAX || packet.get_receiver() == device.id)
256             {
257                 device.rf_tx.send(packet.to_owned())?;
258             }
259         }
260 
261         Ok(())
262     }
263 }
264 
265 impl Future for Scene {
266     type Output = ();
267 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>268     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
269         for n in 0..MAX_DEVICES {
270             let dropped = match self.devices[n] {
271                 Some(ref mut device) => match device.task.as_mut().poll(cx) {
272                     Poll::Ready(Ok(_)) => unreachable!(),
273                     Poll::Ready(Err(err)) => {
274                         warn!("dropping device {}: {}", n, err);
275                         true
276                     }
277                     Poll::Pending => false,
278                 },
279                 None => false,
280             };
281             if dropped {
282                 self.disconnect(n)
283             }
284         }
285         self.waker = Some(cx.waker().clone());
286         Poll::Pending
287     }
288 }
289 
// Command line options of the emulator, parsed with argh in `run`.
// The `///` comments attached to the struct and its fields are consumed by
// the argh derive as help text, hence they are part of the program output.
// For each server, `run` rejects the combination of a port and a unix fd.
#[derive(FromArgs, Debug)]
/// Nfc emulator.
struct Opt {
    // TCP port of the NCI server; `run` uses port 7000 when neither NCI
    // option is provided.
    #[argh(option)]
    /// configure the TCP port for the NCI server.
    nci_port: Option<u16>,
    // Inherited file descriptor of a Unix listener for the NCI server.
    #[argh(option)]
    /// configure a preexisting unix server fd for the NCI server.
    nci_unix_fd: Option<i32>,
    // TCP port of the RF server; `run` uses port 7001 when neither RF
    // option is provided.
    #[argh(option)]
    /// configure the TCP port for the RF server.
    rf_port: Option<u16>,
    // Inherited file descriptor of a Unix listener for the RF server.
    #[argh(option)]
    /// configure a preexisting unix server fd for the RF server.
    rf_unix_fd: Option<i32>,
}
306 
/// Abstraction between different server sources
///
/// Lets the accept loop handle TCP and Unix domain socket servers through
/// a single `accept_split` method.
enum Listener {
    // Server listening on a TCP socket.
    Tcp(TcpListener),
    // Server listening on a Unix domain socket.
    // NOTE(review): the reason for `allow(unused)` is not visible in this
    // file, where the variant is constructed by `run` — confirm whether the
    // attribute is still needed.
    #[allow(unused)]
    Unix(UnixListener),
}
313 
314 impl Listener {
accept_split( &self, ) -> Result<(Pin<Box<dyn AsyncRead>>, Pin<Box<dyn AsyncWrite>>, String)>315     async fn accept_split(
316         &self,
317     ) -> Result<(Pin<Box<dyn AsyncRead>>, Pin<Box<dyn AsyncWrite>>, String)> {
318         match self {
319             Listener::Tcp(tcp) => {
320                 let (socket, addr) = tcp.accept().await?;
321                 let (rx, tx) = socket.into_split();
322                 Ok((Box::pin(rx), Box::pin(tx), format!("{}", addr)))
323             }
324             Listener::Unix(unix) => {
325                 let (socket, addr) = unix.accept().await?;
326                 let (rx, tx) = socket.into_split();
327                 Ok((Box::pin(rx), Box::pin(tx), format!("{:?}", addr)))
328             }
329         }
330     }
331 }
332 
333 #[tokio::main]
run() -> Result<()>334 async fn run() -> Result<()> {
335     env_logger::init_from_env(
336         env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "debug"),
337     );
338 
339     let opt: Opt = argh::from_env();
340 
341     let nci_listener = match (opt.nci_port, opt.nci_unix_fd) {
342         (None, Some(unix_fd)) => {
343             let owned_fd = inherited_fd::take_fd_ownership(unix_fd)?;
344             let nci_listener = std::os::unix::net::UnixListener::from(owned_fd);
345             nci_listener.set_nonblocking(true)?;
346             let nci_listener = UnixListener::from_std(nci_listener)?;
347             info!("Listening for NCI connections on fd {}", unix_fd);
348             Listener::Unix(nci_listener)
349         }
350         (port, None) => {
351             let port = port.unwrap_or(7000);
352             let nci_addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port);
353             let nci_listener = TcpListener::bind(nci_addr).await?;
354             info!("Listening for NCI connections at address {}", nci_addr);
355             Listener::Tcp(nci_listener)
356         }
357         _ => anyhow::bail!("Specify at most one of `--nci-port` and `--nci-unix-fd`."),
358     };
359 
360     let rf_listener = match (opt.rf_port, opt.rf_unix_fd) {
361         (None, Some(unix_fd)) => {
362             let owned_fd = inherited_fd::take_fd_ownership(unix_fd)?;
363             let nci_listener = std::os::unix::net::UnixListener::from(owned_fd);
364             nci_listener.set_nonblocking(true)?;
365             let nci_listener = UnixListener::from_std(nci_listener)?;
366             info!("Listening for RF connections on fd {}", unix_fd);
367             Listener::Unix(nci_listener)
368         }
369         (port, None) => {
370             let port = port.unwrap_or(7001);
371             let rf_addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port);
372             let rf_listener = TcpListener::bind(rf_addr).await?;
373             info!("Listening for RF connections at address {}", rf_addr);
374             Listener::Tcp(rf_listener)
375         }
376         _ => anyhow::bail!("Specify at most one of `--rf-port` and `--rf-unix-fd`"),
377     };
378 
379     let (rf_tx, mut rf_rx) = mpsc::unbounded_channel();
380     let mut scene = Scene::new();
381     loop {
382         select! {
383             result = nci_listener.accept_split() => {
384                 let (socket_rx, socket_tx, addr) = result?;
385                 info!("Incoming NCI connection from {}", addr);
386                 match scene.add_device(|id| Device::nci(id, socket_rx, socket_tx, rf_tx.clone())) {
387                     Ok(id) => info!("Accepted NCI connection from {} in slot {}", addr, id),
388                     Err(err) => error!("Failed to accept NCI connection from {}: {}", addr, err)
389                 }
390             },
391             result = rf_listener.accept_split() => {
392                 let (socket_rx, socket_tx, addr) = result?;
393                 info!("Incoming RF connection from {}", addr);
394                 match scene.add_device(|id| Device::rf(id, socket_rx, socket_tx, rf_tx.clone())) {
395                     Ok(id) => info!("Accepted RF connection from {} in slot {}", addr, id),
396                     Err(err) => error!("Failed to accept RF connection from {}: {}", addr, err)
397                 }
398             },
399             _ = &mut scene => (),
400             result = rf_rx.recv() => {
401                 let packet = result.ok_or(anyhow::anyhow!("rf_rx channel closed"))?;
402                 scene.send(&packet)?
403             }
404         }
405     }
406 }
407 
main() -> Result<()>408 fn main() -> Result<()> {
409     // Safety: First function call in the `main` function, before any other library calls
410     unsafe { inherited_fd::init_once()? };
411     run()
412 }
413