1 extern crate slab; |
1 extern crate slab; |
2 |
2 |
3 use std::io::ErrorKind; |
3 use std::{ |
4 use mio::net::*; |
4 io, io::{Error, ErrorKind, Write}, |
5 use super::server::{HWServer, PendingMessage, Destination}; |
5 net::{SocketAddr, IpAddr, Ipv4Addr}, |
6 use super::client::ClientId; |
6 collections::VecDeque |
|
7 }; |
|
8 |
|
9 use mio::{ |
|
10 net::{TcpStream, TcpListener}, |
|
11 Poll, PollOpt, Ready, Token |
|
12 }; |
|
13 use netbuf; |
7 use slab::Slab; |
14 use slab::Slab; |
8 |
15 |
9 use mio::net::TcpStream; |
|
10 use mio::*; |
|
11 use std::io::Write; |
|
12 use std::io; |
|
13 use netbuf; |
|
14 |
|
15 use utils; |
16 use utils; |
16 use protocol::ProtocolDecoder; |
17 use protocol::{ProtocolDecoder, messages::*}; |
17 use protocol::messages::*; |
18 use super::{ |
18 use std::net::SocketAddr; |
19 server::{HWServer, PendingMessage, Destination}, |
|
20 client::ClientId |
|
21 }; |
|
22 |
|
23 const MAX_BYTES_PER_READ: usize = 2048; |
|
24 |
|
25 #[derive(PartialEq, Copy, Clone)] |
|
26 pub enum NetworkClientState { |
|
27 Idle, |
|
28 NeedsWrite, |
|
29 NeedsRead, |
|
30 Closed, |
|
31 } |
|
32 |
|
33 type NetworkResult<T> = io::Result<(T, NetworkClientState)>; |
19 |
34 |
20 pub struct NetworkClient { |
35 pub struct NetworkClient { |
21 id: ClientId, |
36 id: ClientId, |
22 socket: TcpStream, |
37 socket: TcpStream, |
23 peer_addr: SocketAddr, |
38 peer_addr: SocketAddr, |
24 decoder: ProtocolDecoder, |
39 decoder: ProtocolDecoder, |
25 buf_out: netbuf::Buf, |
40 buf_out: netbuf::Buf |
26 closed: bool |
|
27 } |
41 } |
28 |
42 |
29 impl NetworkClient { |
43 impl NetworkClient { |
30 pub fn new(id: ClientId, socket: TcpStream, peer_addr: SocketAddr) -> NetworkClient { |
44 pub fn new(id: ClientId, socket: TcpStream, peer_addr: SocketAddr) -> NetworkClient { |
31 NetworkClient { |
45 NetworkClient { |
32 id, socket, peer_addr, |
46 id, socket, peer_addr, |
33 decoder: ProtocolDecoder::new(), |
47 decoder: ProtocolDecoder::new(), |
34 buf_out: netbuf::Buf::new(), |
48 buf_out: netbuf::Buf::new() |
35 closed: false |
49 } |
36 } |
50 } |
|
51 |
|
52 pub fn read_messages(&mut self) -> NetworkResult<Vec<HWProtocolMessage>> { |
|
53 let mut bytes_read = 0; |
|
54 let result = loop { |
|
55 match self.decoder.read_from(&mut self.socket) { |
|
56 Ok(bytes) => { |
|
57 debug!("Read {} bytes", bytes); |
|
58 bytes_read += bytes; |
|
59 if bytes == 0 { |
|
60 let result = if bytes_read == 0 { |
|
61 info!("EOF for client {} ({})", self.id, self.peer_addr); |
|
62 (Vec::new(), NetworkClientState::Closed) |
|
63 } else { |
|
64 (self.decoder.extract_messages(), NetworkClientState::NeedsRead) |
|
65 }; |
|
66 break Ok(result); |
|
67 } |
|
68 else if bytes_read >= MAX_BYTES_PER_READ { |
|
69 break Ok((self.decoder.extract_messages(), NetworkClientState::NeedsRead)) |
|
70 } |
|
71 } |
|
72 Err(ref error) if error.kind() == ErrorKind::WouldBlock => { |
|
73 let messages = if bytes_read == 0 { |
|
74 Vec::new() |
|
75 } else { |
|
76 self.decoder.extract_messages() |
|
77 }; |
|
78 break Ok((messages, NetworkClientState::Idle)); |
|
79 } |
|
80 Err(error) => |
|
81 break Err(error) |
|
82 } |
|
83 }; |
|
84 self.decoder.sweep(); |
|
85 result |
|
86 } |
|
87 |
|
88 pub fn flush(&mut self) -> NetworkResult<()> { |
|
89 let result = loop { |
|
90 match self.buf_out.write_to(&mut self.socket) { |
|
91 Ok(bytes) if self.buf_out.is_empty() || bytes == 0 => |
|
92 break Ok(((), NetworkClientState::Idle)), |
|
93 Ok(bytes) => |
|
94 (), |
|
95 Err(ref error) if error.kind() == ErrorKind::Interrupted |
|
96 || error.kind() == ErrorKind::WouldBlock => { |
|
97 break Ok(((), NetworkClientState::NeedsWrite)); |
|
98 }, |
|
99 Err(error) => |
|
100 break Err(error) |
|
101 } |
|
102 }; |
|
103 self.socket.flush()?; |
|
104 result |
37 } |
105 } |
38 |
106 |
39 pub fn send_raw_msg(&mut self, msg: &[u8]) { |
107 pub fn send_raw_msg(&mut self, msg: &[u8]) { |
40 self.buf_out.write(msg).unwrap(); |
108 self.buf_out.write(msg).unwrap(); |
41 self.flush(); |
|
42 } |
109 } |
43 |
110 |
44 pub fn send_string(&mut self, msg: &String) { |
111 pub fn send_string(&mut self, msg: &String) { |
45 self.send_raw_msg(&msg.as_bytes()); |
112 self.send_raw_msg(&msg.as_bytes()); |
46 } |
113 } |
47 |
114 |
48 pub fn send_msg(&mut self, msg: HWServerMessage) { |
115 pub fn send_msg(&mut self, msg: HWServerMessage) { |
49 self.send_string(&msg.to_raw_protocol()); |
116 self.send_string(&msg.to_raw_protocol()); |
50 } |
|
51 |
|
52 fn flush(&mut self) { |
|
53 self.buf_out.write_to(&mut self.socket).unwrap(); |
|
54 self.socket.flush().unwrap(); |
|
55 } |
|
56 |
|
57 pub fn read_messages(&mut self) -> io::Result<Vec<HWProtocolMessage>> { |
|
58 let bytes_read = self.decoder.read_from(&mut self.socket)?; |
|
59 debug!("Read {} bytes", bytes_read); |
|
60 |
|
61 if bytes_read == 0 { |
|
62 self.closed = true; |
|
63 info!("EOF for client {} ({})", self.id, self.peer_addr); |
|
64 } |
|
65 |
|
66 Ok(self.decoder.extract_messages()) |
|
67 } |
|
68 |
|
69 pub fn write_messages(&mut self) -> io::Result<()> { |
|
70 self.buf_out.write_to(&mut self.socket)?; |
|
71 Ok(()) |
|
72 } |
117 } |
73 } |
118 } |
74 |
119 |
75 pub struct NetworkLayer { |
120 pub struct NetworkLayer { |
76 listener: TcpListener, |
121 listener: TcpListener, |
77 server: HWServer, |
122 server: HWServer, |
78 |
123 |
79 clients: Slab<NetworkClient> |
124 clients: Slab<NetworkClient>, |
|
125 pending: VecDeque<(ClientId, NetworkClientState)> |
80 } |
126 } |
81 |
127 |
82 impl NetworkLayer { |
128 impl NetworkLayer { |
83 pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer { |
129 pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer { |
84 let server = HWServer::new(clients_limit, rooms_limit); |
130 let server = HWServer::new(clients_limit, rooms_limit); |
85 let clients = Slab::with_capacity(clients_limit); |
131 let clients = Slab::with_capacity(clients_limit); |
86 NetworkLayer {listener, server, clients} |
132 let pending = VecDeque::with_capacity(clients_limit); |
|
133 NetworkLayer {listener, server, clients, pending} |
87 } |
134 } |
88 |
135 |
89 pub fn register_server(&self, poll: &Poll) -> io::Result<()> { |
136 pub fn register_server(&self, poll: &Poll) -> io::Result<()> { |
90 poll.register(&self.listener, utils::SERVER, Ready::readable(), |
137 poll.register(&self.listener, utils::SERVER, Ready::readable(), |
91 PollOpt::edge()) |
138 PollOpt::edge()) |
92 } |
139 } |
93 |
140 |
94 fn deregister_client(&mut self, poll: &Poll, id: ClientId) { |
141 fn deregister_client(&mut self, poll: &Poll, id: ClientId) { |
95 let mut client_exists = false; |
142 let mut client_exists = false; |
96 if let Some(ref client) = self.clients.get_mut(id) { |
143 if let Some(ref client) = self.clients.get(id) { |
97 poll.deregister(&client.socket) |
144 poll.deregister(&client.socket) |
98 .ok().expect("could not deregister socket"); |
145 .ok().expect("could not deregister socket"); |
99 info!("client {} ({}) removed", client.id, client.peer_addr); |
146 info!("client {} ({}) removed", client.id, client.peer_addr); |
100 client_exists = true; |
147 client_exists = true; |
101 } |
148 } |
114 let client = NetworkClient::new(id, client_socket, addr); |
161 let client = NetworkClient::new(id, client_socket, addr); |
115 info!("client {} ({}) added", client.id, client.peer_addr); |
162 info!("client {} ({}) added", client.id, client.peer_addr); |
116 entry.insert(client); |
163 entry.insert(client); |
117 } |
164 } |
118 |
165 |
119 pub fn accept_client(&mut self, poll: &Poll) -> io::Result<()> { |
|
120 let (client_socket, addr) = self.listener.accept()?; |
|
121 info!("Connected: {}", addr); |
|
122 |
|
123 let client_id = self.server.add_client(); |
|
124 self.register_client(poll, client_id, client_socket, addr); |
|
125 self.flush_server_messages(); |
|
126 |
|
127 Ok(()) |
|
128 } |
|
129 |
|
130 fn flush_server_messages(&mut self) { |
166 fn flush_server_messages(&mut self) { |
|
167 debug!("{} pending server messages", self.server.output.len()); |
131 for PendingMessage(destination, msg) in self.server.output.drain(..) { |
168 for PendingMessage(destination, msg) in self.server.output.drain(..) { |
132 match destination { |
169 match destination { |
133 Destination::ToSelf(id) => { |
170 Destination::ToSelf(id) => { |
134 if let Some(ref mut client) = self.clients.get_mut(id) { |
171 if let Some(ref mut client) = self.clients.get_mut(id) { |
135 client.send_msg(msg) |
172 client.send_msg(msg); |
|
173 self.pending.push_back((id, NetworkClientState::NeedsWrite)); |
136 } |
174 } |
137 } |
175 } |
138 Destination::ToOthers(id) => { |
176 Destination::ToOthers(id) => { |
139 let msg_string = msg.to_raw_protocol(); |
177 let msg_string = msg.to_raw_protocol(); |
140 for item in self.clients.iter_mut() { |
178 for (client_id, client) in self.clients.iter_mut() { |
141 if item.0 != id { |
179 if client_id != id { |
142 item.1.send_string(&msg_string) |
180 client.send_string(&msg_string); |
|
181 self.pending.push_back((client_id, NetworkClientState::NeedsWrite)); |
143 } |
182 } |
144 } |
183 } |
145 } |
184 } |
146 } |
185 } |
147 } |
186 } |
|
187 } |
|
188 |
|
189 pub fn accept_client(&mut self, poll: &Poll) -> io::Result<()> { |
|
190 let (client_socket, addr) = self.listener.accept()?; |
|
191 info!("Connected: {}", addr); |
|
192 |
|
193 let client_id = self.server.add_client(); |
|
194 self.register_client(poll, client_id, client_socket, addr); |
|
195 self.flush_server_messages(); |
|
196 |
|
197 Ok(()) |
|
198 } |
|
199 |
|
200 fn operation_failed(&mut self, poll: &Poll, client_id: ClientId, error: Error, msg: &str) -> io::Result<()> { |
|
201 let addr = if let Some(ref mut client) = self.clients.get_mut(client_id) { |
|
202 client.peer_addr |
|
203 } else { |
|
204 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0) |
|
205 }; |
|
206 debug!("{}({}): {}", msg, addr, error); |
|
207 self.client_error(poll, client_id) |
148 } |
208 } |
149 |
209 |
150 pub fn client_readable(&mut self, poll: &Poll, |
210 pub fn client_readable(&mut self, poll: &Poll, |
151 client_id: ClientId) -> io::Result<()> { |
211 client_id: ClientId) -> io::Result<()> { |
152 let mut client_lost = false; |
212 let messages = |
153 let messages; |
213 if let Some(ref mut client) = self.clients.get_mut(client_id) { |
154 if let Some(ref mut client) = self.clients.get_mut(client_id) { |
214 client.read_messages() |
155 messages = match client.read_messages() { |
215 } else { |
156 Ok(messages) => Some(messages), |
216 warn!("invalid readable client: {}", client_id); |
157 Err(ref error) if error.kind() == ErrorKind::WouldBlock => None, |
217 Ok((Vec::new(), NetworkClientState::Idle)) |
158 Err(error) => return Err(error) |
|
159 }; |
218 }; |
160 if client.closed { |
219 |
161 client_lost = true; |
220 match messages { |
162 } |
221 Ok((messages, state)) => { |
163 } else { |
222 for message in messages { |
164 warn!("invalid readable client: {}", client_id); |
223 self.server.handle_msg(client_id, message); |
165 messages = None; |
224 } |
166 }; |
225 match state { |
167 |
226 NetworkClientState::NeedsRead => |
168 if client_lost { |
227 self.pending.push_back((client_id, state)), |
169 self.client_error(&poll, client_id)?; |
228 NetworkClientState::Closed => |
170 } else if let Some(msg) = messages { |
229 self.client_error(&poll, client_id)?, |
171 for message in msg { |
230 _ => {} |
172 self.server.handle_msg(client_id, message); |
231 }; |
173 } |
232 } |
174 self.flush_server_messages(); |
233 Err(e) => self.operation_failed( |
175 } |
234 poll, client_id, e, |
|
235 "Error while reading from client socket")? |
|
236 } |
|
237 |
|
238 self.flush_server_messages(); |
176 |
239 |
177 if !self.server.removed_clients.is_empty() { |
240 if !self.server.removed_clients.is_empty() { |
178 let ids = self.server.removed_clients.to_vec(); |
241 let ids: Vec<_> = self.server.removed_clients.drain(..).collect(); |
179 self.server.removed_clients.clear(); |
|
180 for client_id in ids { |
242 for client_id in ids { |
181 self.deregister_client(poll, client_id); |
243 self.deregister_client(poll, client_id); |
182 } |
244 } |
183 } |
245 } |
184 |
246 |
185 Ok(()) |
247 Ok(()) |
186 } |
248 } |
187 |
249 |
188 pub fn client_writable(&mut self, poll: &Poll, |
250 pub fn client_writable(&mut self, poll: &Poll, |
189 client_id: ClientId) -> io::Result<()> { |
251 client_id: ClientId) -> io::Result<()> { |
190 if let Some(ref mut client) = self.clients.get_mut(client_id) { |
252 let result = |
191 match client.write_messages() { |
253 if let Some(ref mut client) = self.clients.get_mut(client_id) { |
192 Ok(_) => (), |
254 client.flush() |
193 Err(ref error) if error.kind() == ErrorKind::WouldBlock => (), |
255 } else { |
194 Err(error) => return Err(error) |
256 warn!("invalid writable client: {}", client_id); |
195 } |
257 Ok(((), NetworkClientState::Idle)) |
196 } else { |
258 }; |
197 warn!("invalid writable client: {}", client_id); |
259 |
|
260 match result { |
|
261 Ok(((), state)) if state == NetworkClientState::NeedsWrite => |
|
262 self.pending.push_back((client_id, state)), |
|
263 Ok(_) => |
|
264 {} |
|
265 Err(e) => self.operation_failed( |
|
266 poll, client_id, e, |
|
267 "Error while writing to client socket")? |
198 } |
268 } |
199 |
269 |
200 Ok(()) |
270 Ok(()) |
201 } |
271 } |
202 |
272 |