1 // You can run this example from the root of the mio repo:
2 // cargo run --example tcp_server --features="os-poll net"
3 use mio::event::Event;
4 use mio::net::{TcpListener, TcpStream};
5 use mio::{Events, Interest, Poll, Registry, Token};
6 use std::collections::HashMap;
7 use std::io::{self, Read, Write};
8 use std::str::from_utf8;
9
// Set up some tokens to allow us to identify which event is for which socket.
11 const SERVER: Token = Token(0);
12
13 // Some data we'll send over the connection.
14 const DATA: &[u8] = b"Hello world!\n";
15
16 #[cfg(not(target_os = "wasi"))]
main() -> io::Result<()>17 fn main() -> io::Result<()> {
18 env_logger::init();
19
20 // Create a poll instance.
21 let mut poll = Poll::new()?;
22 // Create storage for events.
23 let mut events = Events::with_capacity(128);
24
25 // Setup the TCP server socket.
26 let addr = "127.0.0.1:9000".parse().unwrap();
27 let mut server = TcpListener::bind(addr)?;
28
29 // Register the server with poll we can receive events for it.
30 poll.registry()
31 .register(&mut server, SERVER, Interest::READABLE)?;
32
33 // Map of `Token` -> `TcpStream`.
34 let mut connections = HashMap::new();
35 // Unique token for each incoming connection.
36 let mut unique_token = Token(SERVER.0 + 1);
37
38 println!("You can connect to the server using `nc`:");
39 println!(" $ nc 127.0.0.1 9000");
40 println!("You'll see our welcome message and anything you type will be printed here.");
41
42 loop {
43 poll.poll(&mut events, None)?;
44
45 for event in events.iter() {
46 match event.token() {
47 SERVER => loop {
48 // Received an event for the TCP server socket, which
49 // indicates we can accept an connection.
50 let (mut connection, address) = match server.accept() {
51 Ok((connection, address)) => (connection, address),
52 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
53 // If we get a `WouldBlock` error we know our
54 // listener has no more incoming connections queued,
55 // so we can return to polling and wait for some
56 // more.
57 break;
58 }
59 Err(e) => {
60 // If it was any other kind of error, something went
61 // wrong and we terminate with an error.
62 return Err(e);
63 }
64 };
65
66 println!("Accepted connection from: {}", address);
67
68 let token = next(&mut unique_token);
69 poll.registry().register(
70 &mut connection,
71 token,
72 Interest::READABLE.add(Interest::WRITABLE),
73 )?;
74
75 connections.insert(token, connection);
76 },
77 token => {
78 // Maybe received an event for a TCP connection.
79 let done = if let Some(connection) = connections.get_mut(&token) {
80 handle_connection_event(poll.registry(), connection, event)?
81 } else {
82 // Sporadic events happen, we can safely ignore them.
83 false
84 };
85 if done {
86 if let Some(mut connection) = connections.remove(&token) {
87 poll.registry().deregister(&mut connection)?;
88 }
89 }
90 }
91 }
92 }
93 }
94 }
95
next(current: &mut Token) -> Token96 fn next(current: &mut Token) -> Token {
97 let next = current.0;
98 current.0 += 1;
99 Token(next)
100 }
101
102 /// Returns `true` if the connection is done.
handle_connection_event( registry: &Registry, connection: &mut TcpStream, event: &Event, ) -> io::Result<bool>103 fn handle_connection_event(
104 registry: &Registry,
105 connection: &mut TcpStream,
106 event: &Event,
107 ) -> io::Result<bool> {
108 if event.is_writable() {
109 // We can (maybe) write to the connection.
110 match connection.write(DATA) {
111 // We want to write the entire `DATA` buffer in a single go. If we
112 // write less we'll return a short write error (same as
113 // `io::Write::write_all` does).
114 Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()),
115 Ok(_) => {
116 // After we've written something we'll reregister the connection
117 // to only respond to readable events.
118 registry.reregister(connection, event.token(), Interest::READABLE)?
119 }
120 // Would block "errors" are the OS's way of saying that the
121 // connection is not actually ready to perform this I/O operation.
122 Err(ref err) if would_block(err) => {}
123 // Got interrupted (how rude!), we'll try again.
124 Err(ref err) if interrupted(err) => {
125 return handle_connection_event(registry, connection, event)
126 }
127 // Other errors we'll consider fatal.
128 Err(err) => return Err(err),
129 }
130 }
131
132 if event.is_readable() {
133 let mut connection_closed = false;
134 let mut received_data = vec![0; 4096];
135 let mut bytes_read = 0;
136 // We can (maybe) read from the connection.
137 loop {
138 match connection.read(&mut received_data[bytes_read..]) {
139 Ok(0) => {
140 // Reading 0 bytes means the other side has closed the
141 // connection or is done writing, then so are we.
142 connection_closed = true;
143 break;
144 }
145 Ok(n) => {
146 bytes_read += n;
147 if bytes_read == received_data.len() {
148 received_data.resize(received_data.len() + 1024, 0);
149 }
150 }
151 // Would block "errors" are the OS's way of saying that the
152 // connection is not actually ready to perform this I/O operation.
153 Err(ref err) if would_block(err) => break,
154 Err(ref err) if interrupted(err) => continue,
155 // Other errors we'll consider fatal.
156 Err(err) => return Err(err),
157 }
158 }
159
160 if bytes_read != 0 {
161 let received_data = &received_data[..bytes_read];
162 if let Ok(str_buf) = from_utf8(received_data) {
163 println!("Received data: {}", str_buf.trim_end());
164 } else {
165 println!("Received (none UTF-8) data: {:?}", received_data);
166 }
167 }
168
169 if connection_closed {
170 println!("Connection closed");
171 return Ok(true);
172 }
173 }
174
175 Ok(false)
176 }
177
/// Reports whether `err` has kind `WouldBlock`, i.e. the operation was not
/// actually ready to be performed.
fn would_block(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::WouldBlock)
}
181
/// Reports whether `err` has kind `Interrupted`, in which case the callers in
/// this file simply retry the operation.
fn interrupted(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::Interrupted)
}
185
/// Entry point used when compiling for WASI: the example cannot bind to an
/// address there, so it panics immediately.
#[cfg(target_os = "wasi")]
fn main() {
    panic!("can't bind to an address with wasi");
}
190