1 // Copyright 2023, The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //! NFCC and RF emulator.
16
17 use anyhow::Result;
18 use argh::FromArgs;
19 use log::{error, info, warn};
20 use rustutils::inherited_fd;
21 use std::future::Future;
22 use std::net::{Ipv4Addr, SocketAddrV4};
23 use std::pin::{pin, Pin};
24 use std::task::Context;
25 use std::task::Poll;
26 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
27 use tokio::net::{TcpListener, UnixListener};
28 use tokio::select;
29 use tokio::sync::mpsc;
30
31 pub mod controller;
32 pub mod crc;
33 pub mod packets;
34
35 use controller::Controller;
36 use packets::{nci, rf};
37
/// Number of device slots available in a `Scene`; connections are refused
/// once every slot is occupied.
const MAX_DEVICES: usize = 128;
/// Identifier type used for devices on the RF transport. It is also the
/// return type of `Scene::add_device`, where it carries a slot index.
type Id = u16;
40
/// Reader for RF Control and Data packets received on the RF transport.
///
/// Each call to `read` returns the payload of one length-prefixed frame.
// NOTE(review): the code visible in this file only removes the length
// framing; no recombination of segmented packets happens here — confirm
// whether that is expected to be handled elsewhere.
pub struct RfReader {
    /// Byte stream the frames are read from.
    socket: Pin<Box<dyn AsyncRead>>,
}
46
/// Writer for RF Control and Data packets sent to the RF transport.
///
/// Each call to `write` emits one length-prefixed frame.
// NOTE(review): the code visible in this file only adds the length framing;
// no segmentation of packets happens here — confirm whether that is expected
// to be handled elsewhere.
pub struct RfWriter {
    /// Byte stream the frames are written to.
    socket: Pin<Box<dyn AsyncWrite>>,
}
52
53 impl RfReader {
54 /// Create a new RF reader from an `AsyncRead` implementation.
new(socket: impl AsyncRead + 'static) -> Self55 pub fn new(socket: impl AsyncRead + 'static) -> Self {
56 RfReader { socket: Box::pin(socket) }
57 }
58
59 /// Read a single RF packet from the reader.
60 /// RF packets are framed with the byte size encoded as little-endian u16.
read(&mut self) -> Result<Vec<u8>>61 pub async fn read(&mut self) -> Result<Vec<u8>> {
62 const HEADER_SIZE: usize = 2;
63 let mut header_bytes = [0; HEADER_SIZE];
64
65 // Read the header bytes.
66 self.socket.read_exact(&mut header_bytes[0..HEADER_SIZE]).await?;
67 let packet_length = u16::from_le_bytes(header_bytes) as usize;
68
69 // Read the packet data.
70 let mut packet_bytes = vec![0; packet_length];
71 self.socket.read_exact(&mut packet_bytes).await?;
72
73 Ok(packet_bytes)
74 }
75 }
76
77 impl RfWriter {
78 /// Create a new RF writer from an `AsyncWrite` implementation.
new(socket: impl AsyncWrite + 'static) -> Self79 pub fn new(socket: impl AsyncWrite + 'static) -> Self {
80 RfWriter { socket: Box::pin(socket) }
81 }
82
83 /// Write a single RF packet to the writer.
84 /// RF packets are framed with the byte size encoded as little-endian u16.
write(&mut self, packet: &[u8]) -> Result<()>85 async fn write(&mut self, packet: &[u8]) -> Result<()> {
86 let packet_length: u16 = packet.len().try_into()?;
87 let header_bytes = packet_length.to_le_bytes();
88
89 // Write the header bytes.
90 self.socket.write_all(&header_bytes).await?;
91
92 // Write the packet data.
93 self.socket.write_all(packet).await?;
94
95 Ok(())
96 }
97 }
98
/// Represent a generic NFC device interacting on the RF transport.
///
/// Devices communicate together through the RF mpsc channel.
/// NFCCs are an instance of Device (see `Device::nci`); raw RF connections
/// are created with `Device::rf`.
pub struct Device {
    // Unique identifier associated with the device.
    // The identifier is assured never to be reused in the lifetime of
    // the emulator. It is the value `Scene::add_device` passes to the
    // device builder, not the index of the slot holding the device.
    id: u16,
    // Async task running the device main loop. It is polled by the `Scene`
    // future; the device is dropped when the task resolves with an error.
    task: Pin<Box<dyn Future<Output = Result<()>>>>,
    // Channel for injecting RF packets into the device: the `Scene` pushes
    // here the packets addressed to this device.
    rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
}
112
113 impl Device {
nci( id: Id, nci_rx: impl AsyncRead + 'static, nci_tx: impl AsyncWrite + 'static, controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>, ) -> Device114 fn nci(
115 id: Id,
116 nci_rx: impl AsyncRead + 'static,
117 nci_tx: impl AsyncWrite + 'static,
118 controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
119 ) -> Device {
120 let (rf_tx, rf_rx) = mpsc::unbounded_channel();
121 Device {
122 id,
123 rf_tx,
124 task: Box::pin(async move {
125 Controller::run(
126 id,
127 pin!(nci::Reader::new(nci_rx).into_stream()),
128 nci::Writer::new(nci_tx),
129 rf_rx,
130 controller_rf_tx,
131 )
132 .await
133 }),
134 }
135 }
136
rf( id: Id, socket_rx: impl AsyncRead + 'static, socket_tx: impl AsyncWrite + 'static, controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>, ) -> Device137 fn rf(
138 id: Id,
139 socket_rx: impl AsyncRead + 'static,
140 socket_tx: impl AsyncWrite + 'static,
141 controller_rf_tx: mpsc::UnboundedSender<rf::RfPacket>,
142 ) -> Device {
143 let (rf_tx, mut rf_rx) = mpsc::unbounded_channel();
144 Device {
145 id,
146 rf_tx,
147 task: Box::pin(async move {
148 let mut rf_reader = RfReader::new(socket_rx);
149 let mut rf_writer = RfWriter::new(socket_tx);
150
151 let result: Result<((), ())> = futures::future::try_join(
152 async {
153 loop {
154 // Replace the sender identifier in the packet
155 // with the assigned number for the RF connection.
156 // TODO: currently the generated API does not allow
157 // modifying the parsed fields so the change needs to be
158 // applied to the unparsed packet.
159 let mut packet_bytes = rf_reader.read().await?;
160 packet_bytes[0..2].copy_from_slice(&id.to_le_bytes());
161
162 // Parse the input packet.
163 let packet = rf::RfPacket::parse(&packet_bytes)?;
164
165 // Forward the packet to other devices.
166 controller_rf_tx.send(packet)?;
167 }
168 },
169 async {
170 loop {
171 // Forward the packet to the socket connection.
172 use pdl_runtime::Packet;
173 let packet = rf_rx
174 .recv()
175 .await
176 .ok_or(anyhow::anyhow!("rf_rx channel closed"))?;
177 rf_writer.write(&packet.encode_to_vec()?).await?;
178 }
179 },
180 )
181 .await;
182
183 result?;
184 Ok(())
185 }),
186 }
187 }
188 }
189
/// Set of devices currently connected to the emulator.
///
/// The scene owns the device tasks and drives them through its `Future`
/// implementation; it also routes RF packets between devices.
struct Scene {
    // Identifier handed to the next device added with `add_device`;
    // incremented after each successful insertion.
    next_id: u16,
    // Waker captured by the last call to `poll`; consumed by `wake` so that
    // the scene gets polled again after a device is added.
    waker: Option<std::task::Waker>,
    // Device slots; `None` marks a free slot.
    devices: [Option<Device>; MAX_DEVICES],
}
195
196 impl Default for Scene {
default() -> Self197 fn default() -> Self {
198 const NONE: Option<Device> = None;
199 Scene { next_id: 0, waker: None, devices: [NONE; MAX_DEVICES] }
200 }
201 }
202
203 impl Scene {
new() -> Scene204 fn new() -> Scene {
205 Default::default()
206 }
207
wake(&mut self)208 fn wake(&mut self) {
209 if let Some(waker) = self.waker.take() {
210 waker.wake()
211 }
212 }
213
add_device(&mut self, builder: impl FnOnce(Id) -> Device) -> Result<Id>214 fn add_device(&mut self, builder: impl FnOnce(Id) -> Device) -> Result<Id> {
215 for n in 0..MAX_DEVICES {
216 if self.devices[n].is_none() {
217 self.devices[n] = Some(builder(self.next_id));
218 self.next_id += 1;
219 self.wake();
220 return Ok(n as Id);
221 }
222 }
223 Err(anyhow::anyhow!("max number of connections reached"))
224 }
225
disconnect(&mut self, n: usize)226 fn disconnect(&mut self, n: usize) {
227 let id = self.devices[n].as_ref().unwrap().id;
228 self.devices[n] = None;
229 for other_n in 0..MAX_DEVICES {
230 let Some(ref device) = self.devices[other_n] else { continue };
231 assert!(n != other_n);
232 device
233 .rf_tx
234 .send(
235 rf::DeactivateNotificationBuilder {
236 type_: rf::DeactivateType::Discovery,
237 reason: rf::DeactivateReason::RfLinkLoss,
238 sender: id,
239 receiver: device.id,
240 bitrate: rf::BitRate::BitRate106KbitS,
241 power_level: 255,
242 technology: rf::Technology::NfcA,
243 protocol: rf::Protocol::Undetermined,
244 }
245 .into(),
246 )
247 .expect("failed to send deactive notification")
248 }
249 }
250
send(&self, packet: &rf::RfPacket) -> Result<()>251 fn send(&self, packet: &rf::RfPacket) -> Result<()> {
252 for n in 0..MAX_DEVICES {
253 let Some(ref device) = self.devices[n] else { continue };
254 if packet.get_sender() != device.id
255 && (packet.get_receiver() == u16::MAX || packet.get_receiver() == device.id)
256 {
257 device.rf_tx.send(packet.to_owned())?;
258 }
259 }
260
261 Ok(())
262 }
263 }
264
265 impl Future for Scene {
266 type Output = ();
267
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>268 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
269 for n in 0..MAX_DEVICES {
270 let dropped = match self.devices[n] {
271 Some(ref mut device) => match device.task.as_mut().poll(cx) {
272 Poll::Ready(Ok(_)) => unreachable!(),
273 Poll::Ready(Err(err)) => {
274 warn!("dropping device {}: {}", n, err);
275 true
276 }
277 Poll::Pending => false,
278 },
279 None => false,
280 };
281 if dropped {
282 self.disconnect(n)
283 }
284 }
285 self.waker = Some(cx.waker().clone());
286 Poll::Pending
287 }
288 }
289
// Command line options of the emulator.
//
// NOTE: the `///` comments on this type and its fields are consumed by the
// `argh` derive to produce the `--help` output, so they are user-facing text
// and are intentionally kept terse; maintainer notes use `//` instead.
#[derive(FromArgs, Debug)]
/// Nfc emulator.
struct Opt {
    // Mutually exclusive with `nci_unix_fd`. When neither is given, `run`
    // listens on TCP port 7000.
    #[argh(option)]
    /// configure the TCP port for the NCI server.
    nci_port: Option<u16>,
    // File descriptor of an already listening unix socket, inherited from
    // the parent process. Mutually exclusive with `nci_port`.
    #[argh(option)]
    /// configure a preexisting unix server fd for the NCI server.
    nci_unix_fd: Option<i32>,
    // Mutually exclusive with `rf_unix_fd`. When neither is given, `run`
    // listens on TCP port 7001.
    #[argh(option)]
    /// configure the TCP port for the RF server.
    rf_port: Option<u16>,
    // File descriptor of an already listening unix socket, inherited from
    // the parent process. Mutually exclusive with `rf_port`.
    #[argh(option)]
    /// configure a preexisting unix server fd for the RF server.
    rf_unix_fd: Option<i32>,
}
306
/// Abstraction between different server sources.
///
/// Lets `run` accept NCI and RF connections through the same
/// `accept_split` method whether the server is a TCP or a unix socket.
enum Listener {
    /// Server listening on a TCP address.
    Tcp(TcpListener),
    // NOTE(review): this variant is constructed in `run` when a `--*-unix-fd`
    // option is given, so the `allow(unused)` below looks unnecessary in this
    // file — confirm before removing.
    #[allow(unused)]
    /// Server listening on a unix socket.
    Unix(UnixListener),
}
313
314 impl Listener {
accept_split( &self, ) -> Result<(Pin<Box<dyn AsyncRead>>, Pin<Box<dyn AsyncWrite>>, String)>315 async fn accept_split(
316 &self,
317 ) -> Result<(Pin<Box<dyn AsyncRead>>, Pin<Box<dyn AsyncWrite>>, String)> {
318 match self {
319 Listener::Tcp(tcp) => {
320 let (socket, addr) = tcp.accept().await?;
321 let (rx, tx) = socket.into_split();
322 Ok((Box::pin(rx), Box::pin(tx), format!("{}", addr)))
323 }
324 Listener::Unix(unix) => {
325 let (socket, addr) = unix.accept().await?;
326 let (rx, tx) = socket.into_split();
327 Ok((Box::pin(rx), Box::pin(tx), format!("{:?}", addr)))
328 }
329 }
330 }
331 }
332
333 #[tokio::main]
run() -> Result<()>334 async fn run() -> Result<()> {
335 env_logger::init_from_env(
336 env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "debug"),
337 );
338
339 let opt: Opt = argh::from_env();
340
341 let nci_listener = match (opt.nci_port, opt.nci_unix_fd) {
342 (None, Some(unix_fd)) => {
343 let owned_fd = inherited_fd::take_fd_ownership(unix_fd)?;
344 let nci_listener = std::os::unix::net::UnixListener::from(owned_fd);
345 nci_listener.set_nonblocking(true)?;
346 let nci_listener = UnixListener::from_std(nci_listener)?;
347 info!("Listening for NCI connections on fd {}", unix_fd);
348 Listener::Unix(nci_listener)
349 }
350 (port, None) => {
351 let port = port.unwrap_or(7000);
352 let nci_addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port);
353 let nci_listener = TcpListener::bind(nci_addr).await?;
354 info!("Listening for NCI connections at address {}", nci_addr);
355 Listener::Tcp(nci_listener)
356 }
357 _ => anyhow::bail!("Specify at most one of `--nci-port` and `--nci-unix-fd`."),
358 };
359
360 let rf_listener = match (opt.rf_port, opt.rf_unix_fd) {
361 (None, Some(unix_fd)) => {
362 let owned_fd = inherited_fd::take_fd_ownership(unix_fd)?;
363 let nci_listener = std::os::unix::net::UnixListener::from(owned_fd);
364 nci_listener.set_nonblocking(true)?;
365 let nci_listener = UnixListener::from_std(nci_listener)?;
366 info!("Listening for RF connections on fd {}", unix_fd);
367 Listener::Unix(nci_listener)
368 }
369 (port, None) => {
370 let port = port.unwrap_or(7001);
371 let rf_addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port);
372 let rf_listener = TcpListener::bind(rf_addr).await?;
373 info!("Listening for RF connections at address {}", rf_addr);
374 Listener::Tcp(rf_listener)
375 }
376 _ => anyhow::bail!("Specify at most one of `--rf-port` and `--rf-unix-fd`"),
377 };
378
379 let (rf_tx, mut rf_rx) = mpsc::unbounded_channel();
380 let mut scene = Scene::new();
381 loop {
382 select! {
383 result = nci_listener.accept_split() => {
384 let (socket_rx, socket_tx, addr) = result?;
385 info!("Incoming NCI connection from {}", addr);
386 match scene.add_device(|id| Device::nci(id, socket_rx, socket_tx, rf_tx.clone())) {
387 Ok(id) => info!("Accepted NCI connection from {} in slot {}", addr, id),
388 Err(err) => error!("Failed to accept NCI connection from {}: {}", addr, err)
389 }
390 },
391 result = rf_listener.accept_split() => {
392 let (socket_rx, socket_tx, addr) = result?;
393 info!("Incoming RF connection from {}", addr);
394 match scene.add_device(|id| Device::rf(id, socket_rx, socket_tx, rf_tx.clone())) {
395 Ok(id) => info!("Accepted RF connection from {} in slot {}", addr, id),
396 Err(err) => error!("Failed to accept RF connection from {}: {}", addr, err)
397 }
398 },
399 _ = &mut scene => (),
400 result = rf_rx.recv() => {
401 let packet = result.ok_or(anyhow::anyhow!("rf_rx channel closed"))?;
402 scene.send(&packet)?
403 }
404 }
405 }
406 }
407
main() -> Result<()>408 fn main() -> Result<()> {
409 // Safety: First function call in the `main` function, before any other library calls
410 unsafe { inherited_fd::init_once()? };
411 run()
412 }
413