# Refactor I/O driver

Describes changes to the I/O driver for the Tokio 0.3 release.

## Goals

* Support `async fn` on I/O types with `&self`.
* Refine the `Registration` API.

### Non-goals

* Implement `AsyncRead` / `AsyncWrite` for `&TcpStream` or other reference types.

## Overview

Currently, I/O types require `&mut self` for `async` functions. The reason for
this is that the task's waker is stored in the I/O resource's internal state
(`ScheduledIo`) instead of in the future returned by the `async` function.
Because the waker lives in the resource rather than the future, I/O types are
limited to one waker per direction (a direction is either read-related events or
write-related events).

Moving the waker from the internal I/O resource's state to the operation's
future enables multiple wakers to be registered per operation. The "intrusive
wake list" strategy used by `Notify` applies to this case, though there are some
concerns unique to the I/O driver.

## Reworking the `Registration` type

While `Registration` is made private (per #2728), it remains in Tokio as an
implementation detail backing I/O resources such as `TcpStream`. The API of
`Registration` is updated to support waiting for an arbitrary interest set with
`&self`. This supports concurrent waiters, each with a different readiness
interest.

```rust
struct Registration { ... }

// TODO: naming
struct ReadyEvent {
    tick: u32,
    ready: mio::Ready,
}

impl Registration {
    /// `interest` must be a superset of **all** interest sets specified in
    /// the other methods. This is the interest set passed to `mio`.
    pub fn new<T>(io: &T, interest: mio::Ready) -> io::Result<Registration>
        where T: mio::Evented;

    /// Waits for any readiness event included in `interest`. Returns a
    /// `ReadyEvent` representing the received readiness event.
    async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent>;

    /// Clears resource-level readiness represented by the specified `ReadyEvent`.
    async fn clear_readiness(&self, ready_event: ReadyEvent);
}
```

A new registration is created for a `T: mio::Evented` and an `interest`. This
creates a `ScheduledIo` entry with the I/O driver and registers the resource
with `mio`.

Because Tokio uses **edge-triggered** notifications, the I/O driver only
receives readiness from the OS once the ready state **changes**. The I/O driver
must track each resource's known readiness state. This helps avoid syscalls
when the process already knows the syscall would return `EWOULDBLOCK`.

A call to `readiness()` checks if the currently known resource readiness
overlaps with `interest`. If it does, `readiness()` returns immediately. If it
does not, the task waits until the I/O driver receives a readiness event.

The pseudocode to perform a TCP read is as follows.

```rust
async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
    loop {
        // Await readiness
        let event = self.readiness(interest).await?;

        match self.mio_socket.read(buf) {
            Ok(v) => return Ok(v),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.clear_readiness(event);
            }
            Err(e) => return Err(e),
        }
    }
}
```
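To make the `Registration` API more concrete, here is a rough sketch of how an
I/O resource might be built on top of it; the wrapper type, its fields, and the
`write()` method below are illustrative assumptions, not the actual
implementation. The resource registers once with the union of every interest it
will wait on, and each operation then waits on its own subset.

```rust
use std::io::{self, Write};

// Illustrative wrapper around a mio socket and its registration.
struct TcpStream {
    mio_socket: mio::net::TcpStream,
    registration: Registration, // the API sketched above
}

impl TcpStream {
    fn new(mio_socket: mio::net::TcpStream) -> io::Result<TcpStream> {
        // The interest passed to `Registration::new()` must be a superset of
        // every interest later passed to `readiness()`.
        let registration = Registration::new(
            &mio_socket,
            mio::Ready::readable() | mio::Ready::writable(),
        )?;

        Ok(TcpStream { mio_socket, registration })
    }

    async fn write(&self, buf: &[u8]) -> io::Result<usize> {
        loop {
            // Wait for write readiness only; other callers may concurrently
            // wait for read readiness on the same registration.
            let event = self
                .registration
                .readiness(mio::Ready::writable())
                .await?;

            // Relies on `Write` being implemented for `&mio::net::TcpStream`,
            // as with the standard library's `TcpStream`.
            match (&self.mio_socket).write(buf) {
                Ok(n) => return Ok(n),
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    self.registration.clear_readiness(event);
                }
                Err(e) => return Err(e),
            }
        }
    }
}
```

Because `readiness()` takes `&self`, a concurrent `read()` and `write()` on the
same stream can each register their own waiter.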
## Reworking the `ScheduledIo` type

The `ScheduledIo` type is switched to use an intrusive waker linked list. Each
entry in the linked list includes the `interest` set passed to `readiness()`.

```rust
#[derive(Debug)]
pub(crate) struct ScheduledIo {
    /// The resource's known readiness, packed with other state that must be
    /// updated atomically.
    readiness: AtomicUsize,

    /// Tracks tasks waiting on the resource.
    waiters: Mutex<Waiters>,
}

#[derive(Debug)]
struct Waiters {
    /// List of intrusive waiters.
    list: LinkedList<Waiter>,

    /// Waiter used by `AsyncRead` implementations.
    reader: Option<Waker>,

    /// Waiter used by `AsyncWrite` implementations.
    writer: Option<Waker>,
}

// This struct is contained by the **future** returned by `readiness()`.
#[derive(Debug)]
struct Waiter {
    /// Intrusive linked-list pointers.
    pointers: linked_list::Pointers<Waiter>,

    /// Waker for the task waiting on the I/O resource.
    waiter: Option<Waker>,

    /// Readiness events being waited on. This is
    /// the value passed to `readiness()`.
    interest: mio::Ready,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
```

When an I/O event is received from `mio`, the associated resource's readiness is
updated and the waiter list is iterated. All waiters with an `interest` that
overlaps the received readiness event are notified. Any waiter with an
`interest` that does not overlap the readiness event remains in the list.

## Cancel interest on drop

The future returned by `readiness()` uses an intrusive linked list to store the
waker with `ScheduledIo`. Because `readiness()` can be called concurrently, many
wakers may be stored simultaneously in the list. If the `readiness()` future is
dropped early, it is essential that its waiter is removed from the list as part
of dropping the future. Otherwise, the list would be left with an entry pointing
into freed memory.

## Race condition

Many tasks may concurrently attempt I/O operations on the same resource.
Combined with Tokio's use of edge-triggered events, this can result in a race
condition. Let's revisit the TCP read function:

```rust
async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
    loop {
        // Await readiness
        let event = self.readiness(interest).await?;

        match self.mio_socket.read(buf) {
            Ok(v) => return Ok(v),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.clear_readiness(event);
            }
            Err(e) => return Err(e),
        }
    }
}
```

If care is not taken, the `read()` function could deadlock when a readiness
event arrives between `mio_socket.read(buf)` returning `WouldBlock` and the call
to `clear_readiness(event)`. In that interleaving, the new readiness event is
received, `clear_readiness()` then unsets it, and on the next iteration
`readiness().await` blocks forever because, with edge-triggered notifications,
no new readiness event will arrive.

The current I/O driver handles this condition by always registering the task's
waker before performing the operation. This is not ideal as it results in
unnecessary task notifications.

Instead, we will use a strategy to prevent clearing readiness if an "unseen"
readiness event has been received. The I/O driver will maintain a "tick" value.
Every time the `mio` `poll()` function is called, the tick is incremented. Each
readiness event has an associated tick. When the I/O driver sets the resource's
readiness, the driver's tick is packed into the atomic `usize`.
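To make the dispatch path concrete, the sketch below shows how the driver might
update a resource's readiness and wake overlapping waiters for a single `mio`
event. The helper names and the list-iteration API here are hypothetical, not
the actual implementation.

```rust
impl ScheduledIo {
    // Hypothetical sketch of the dispatch step. `set_readiness` is assumed
    // to pack `tick` and `ready` into the `readiness: AtomicUsize` using the
    // layout described below; iteration over the intrusive list is simplified.
    fn dispatch(&self, ready: mio::Ready, tick: u8) {
        // Record the new readiness along with the tick of the `poll()` call
        // that produced it.
        self.set_readiness(ready, tick);

        let mut waiters = self.waiters.lock().unwrap();

        // Notify waiters registered via `readiness()` whose interest overlaps
        // the received event; the rest stay untouched in the list.
        for waiter in waiters.list.iter_mut() {
            if !(waiter.interest & ready).is_empty() {
                if let Some(waker) = waiter.waiter.take() {
                    waker.wake();
                }
            }
        }

        // Notify the dedicated `AsyncRead` / `AsyncWrite` waiters
        // (discussed later in this document).
        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                waker.wake();
            }
        }
        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                waker.wake();
            }
        }
    }
}
```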
The `ScheduledIo` readiness `AtomicUsize` is structured as:

```
| shutdown | generation | driver tick | readiness |
|----------|------------|-------------|-----------|
| 1 bit    | 7 bits     | 8 bits      | 16 bits   |
```

The `shutdown` and `generation` components exist today.

The `readiness()` function returns a `ReadyEvent` value. This value includes the
`tick` component that was read together with the resource's readiness value.
When `clear_readiness()` is called, the `ReadyEvent` is provided. Readiness is
only cleared if the current `tick` matches the `tick` included in the
`ReadyEvent`. If the tick values do not match, the call to `readiness()` on the
next iteration will not block and the new `tick` is included in the new
`ReadyEvent`.

TODO

## Implementing `AsyncRead` / `AsyncWrite`

The `AsyncRead` and `AsyncWrite` traits use a "poll" based API. This means that
it is not possible to use an intrusive linked list to track the waker.
Additionally, there is no future associated with the operation, which means it
is not possible to cancel interest in the readiness events.

To implement `AsyncRead` and `AsyncWrite`, `ScheduledIo` includes dedicated
waker slots for the read direction and the write direction. A specific
`interest` is not tracked for the `AsyncRead` and `AsyncWrite` implementations.
It is assumed that the only events of interest are:

* Read ready
* Read closed
* Write ready
* Write closed

Note that "read closed" and "write closed" are only available with Mio 0.7. With
Mio 0.6, things were a bit messy.

It is only possible to implement `AsyncRead` and `AsyncWrite` for resource types
themselves and not for `&Resource`. Implementing the traits for `&Resource`
would permit concurrent operations on the resource. Because only a single waker
is stored per direction, any concurrent usage would result in deadlocks. An
alternate implementation could store a `Vec<Waker>`, but this would result in
memory leaks, as there is no way to remove a waker from the `Vec` when interest
is dropped.

## Enabling reads and writes for `&TcpStream`

Instead of implementing `AsyncRead` and `AsyncWrite` for `&TcpStream`, a new
function is added to `TcpStream`.

```rust
impl TcpStream {
    /// Naming TBD
    fn by_ref(&self) -> TcpStreamRef<'_>;
}

struct TcpStreamRef<'a> {
    stream: &'a TcpStream,

    /// `Waiter` is the node in the intrusive waiter linked list.
    read_waiter: Waiter,
    write_waiter: Waiter,
}
```

Now, `AsyncRead` and `AsyncWrite` can be implemented on `TcpStreamRef<'a>`. When
the `TcpStreamRef` is dropped, all associated waker resources are cleaned up.

### Removing all the `split()` functions

With `TcpStream::by_ref()`, `TcpStream::split()` is no longer needed. Instead,
it is possible to do something like the following.

```rust
let rd = my_stream.by_ref();
let wr = my_stream.by_ref();

select! {
    // use `rd` and `wr` in separate branches.
}
```

It is also possible to store a `TcpStream` in an `Arc`.

```rust
let arc_stream = Arc::new(my_tcp_stream);
let n = arc_stream.by_ref().read(buf).await?;
```
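As a rough sketch of how this composes (assuming the `by_ref()` API proposed
above and Tokio's `AsyncReadExt` / `AsyncWriteExt` extension methods), a stream
behind an `Arc` could be read from one task while another task writes to it:

```rust
use std::io;
use std::sync::Arc;

use tokio::io::{AsyncReadExt, AsyncWriteExt};

async fn run(stream: TcpStream) -> io::Result<()> {
    let stream = Arc::new(stream);
    let rd = Arc::clone(&stream);

    // Read in a background task using one `TcpStreamRef`...
    let reader = tokio::spawn(async move {
        let mut buf = [0u8; 4096];
        rd.by_ref().read(&mut buf).await
    });

    // ...while writing through another `TcpStreamRef` on the same stream.
    stream.by_ref().write_all(b"hello").await?;

    let _n = reader.await.expect("reader task panicked")?;
    Ok(())
}
```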