• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // You can run this example from the root of the mio repo:
2 // cargo run --example tcp_server --features="os-poll tcp"
3 use mio::event::Event;
4 use mio::net::{TcpListener, TcpStream};
5 use mio::{Events, Interest, Poll, Registry, Token};
6 use std::collections::HashMap;
7 use std::io::{self, Read, Write};
8 use std::str::from_utf8;
9 
10 // Setup some tokens to allow us to identify which event is for which socket.
11 const SERVER: Token = Token(0);
12 
13 // Some data we'll send over the connection.
14 const DATA: &[u8] = b"Hello world!\n";
15 
main() -> io::Result<()>16 fn main() -> io::Result<()> {
17     env_logger::init();
18 
19     // Create a poll instance.
20     let mut poll = Poll::new()?;
21     // Create storage for events.
22     let mut events = Events::with_capacity(128);
23 
24     // Setup the TCP server socket.
25     let addr = "127.0.0.1:9000".parse().unwrap();
26     let mut server = TcpListener::bind(addr)?;
27 
28     // Register the server with poll we can receive events for it.
29     poll.registry()
30         .register(&mut server, SERVER, Interest::READABLE)?;
31 
32     // Map of `Token` -> `TcpStream`.
33     let mut connections = HashMap::new();
34     // Unique token for each incoming connection.
35     let mut unique_token = Token(SERVER.0 + 1);
36 
37     println!("You can connect to the server using `nc`:");
38     println!(" $ nc 127.0.0.1 9000");
39     println!("You'll see our welcome message and anything you type we'll be printed here.");
40 
41     loop {
42         poll.poll(&mut events, None)?;
43 
44         for event in events.iter() {
45             match event.token() {
46                 SERVER => loop {
47                     // Received an event for the TCP server socket, which
48                     // indicates we can accept an connection.
49                     let (mut connection, address) = match server.accept() {
50                         Ok((connection, address)) => (connection, address),
51                         Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
52                             // If we get a `WouldBlock` error we know our
53                             // listener has no more incoming connections queued,
54                             // so we can return to polling and wait for some
55                             // more.
56                             break;
57                         }
58                         Err(e) => {
59                             // If it was any other kind of error, something went
60                             // wrong and we terminate with an error.
61                             return Err(e);
62                         }
63                     };
64 
65                     println!("Accepted connection from: {}", address);
66 
67                     let token = next(&mut unique_token);
68                     poll.registry().register(
69                         &mut connection,
70                         token,
71                         Interest::READABLE.add(Interest::WRITABLE),
72                     )?;
73 
74                     connections.insert(token, connection);
75                 },
76                 token => {
77                     // Maybe received an event for a TCP connection.
78                     let done = if let Some(connection) = connections.get_mut(&token) {
79                         handle_connection_event(poll.registry(), connection, event)?
80                     } else {
81                         // Sporadic events happen, we can safely ignore them.
82                         false
83                     };
84                     if done {
85                         connections.remove(&token);
86                     }
87                 }
88             }
89         }
90     }
91 }
92 
next(current: &mut Token) -> Token93 fn next(current: &mut Token) -> Token {
94     let next = current.0;
95     current.0 += 1;
96     Token(next)
97 }
98 
99 /// Returns `true` if the connection is done.
handle_connection_event( registry: &Registry, connection: &mut TcpStream, event: &Event, ) -> io::Result<bool>100 fn handle_connection_event(
101     registry: &Registry,
102     connection: &mut TcpStream,
103     event: &Event,
104 ) -> io::Result<bool> {
105     if event.is_writable() {
106         // We can (maybe) write to the connection.
107         match connection.write(DATA) {
108             // We want to write the entire `DATA` buffer in a single go. If we
109             // write less we'll return a short write error (same as
110             // `io::Write::write_all` does).
111             Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()),
112             Ok(_) => {
113                 // After we've written something we'll reregister the connection
114                 // to only respond to readable events.
115                 registry.reregister(connection, event.token(), Interest::READABLE)?
116             }
117             // Would block "errors" are the OS's way of saying that the
118             // connection is not actually ready to perform this I/O operation.
119             Err(ref err) if would_block(err) => {}
120             // Got interrupted (how rude!), we'll try again.
121             Err(ref err) if interrupted(err) => {
122                 return handle_connection_event(registry, connection, event)
123             }
124             // Other errors we'll consider fatal.
125             Err(err) => return Err(err),
126         }
127     }
128 
129     if event.is_readable() {
130         let mut connection_closed = false;
131         let mut received_data = vec![0; 4096];
132         let mut bytes_read = 0;
133         // We can (maybe) read from the connection.
134         loop {
135             match connection.read(&mut received_data[bytes_read..]) {
136                 Ok(0) => {
137                     // Reading 0 bytes means the other side has closed the
138                     // connection or is done writing, then so are we.
139                     connection_closed = true;
140                     break;
141                 }
142                 Ok(n) => {
143                     bytes_read += n;
144                     if bytes_read == received_data.len() {
145                         received_data.resize(received_data.len() + 1024, 0);
146                     }
147                 }
148                 // Would block "errors" are the OS's way of saying that the
149                 // connection is not actually ready to perform this I/O operation.
150                 Err(ref err) if would_block(err) => break,
151                 Err(ref err) if interrupted(err) => continue,
152                 // Other errors we'll consider fatal.
153                 Err(err) => return Err(err),
154             }
155         }
156 
157         if bytes_read != 0 {
158             let received_data = &received_data[..bytes_read];
159             if let Ok(str_buf) = from_utf8(received_data) {
160                 println!("Received data: {}", str_buf.trim_end());
161             } else {
162                 println!("Received (none UTF-8) data: {:?}", received_data);
163             }
164         }
165 
166         if connection_closed {
167             println!("Connection closed");
168             return Ok(true);
169         }
170     }
171 
172     Ok(false)
173 }
174 
would_block(err: &io::Error) -> bool175 fn would_block(err: &io::Error) -> bool {
176     err.kind() == io::ErrorKind::WouldBlock
177 }
178 
interrupted(err: &io::Error) -> bool179 fn interrupted(err: &io::Error) -> bool {
180     err.kind() == io::ErrorKind::Interrupted
181 }
182