1 extern crate slab; |
1 extern crate slab; |
2 |
2 |
3 use std::{ |
3 use std::{ |
4 io, io::{Error, ErrorKind, Write}, |
4 io, io::{Error, ErrorKind, Write}, |
5 net::{SocketAddr, IpAddr, Ipv4Addr}, |
5 net::{SocketAddr, IpAddr, Ipv4Addr}, |
6 collections::VecDeque |
6 collections::HashSet, |
|
7 mem::swap |
7 }; |
8 }; |
8 |
9 |
9 use mio::{ |
10 use mio::{ |
10 net::{TcpStream, TcpListener}, |
11 net::{TcpStream, TcpListener}, |
11 Poll, PollOpt, Ready, Token |
12 Poll, PollOpt, Ready, Token |
88 pub fn flush(&mut self) -> NetworkResult<()> { |
89 pub fn flush(&mut self) -> NetworkResult<()> { |
89 let result = loop { |
90 let result = loop { |
90 match self.buf_out.write_to(&mut self.socket) { |
91 match self.buf_out.write_to(&mut self.socket) { |
91 Ok(bytes) if self.buf_out.is_empty() || bytes == 0 => |
92 Ok(bytes) if self.buf_out.is_empty() || bytes == 0 => |
92 break Ok(((), NetworkClientState::Idle)), |
93 break Ok(((), NetworkClientState::Idle)), |
93 Ok(bytes) => |
94 Ok(_) => (), |
94 (), |
|
95 Err(ref error) if error.kind() == ErrorKind::Interrupted |
95 Err(ref error) if error.kind() == ErrorKind::Interrupted |
96 || error.kind() == ErrorKind::WouldBlock => { |
96 || error.kind() == ErrorKind::WouldBlock => { |
97 break Ok(((), NetworkClientState::NeedsWrite)); |
97 break Ok(((), NetworkClientState::NeedsWrite)); |
98 }, |
98 }, |
99 Err(error) => |
99 Err(error) => |
118 } |
118 } |
119 |
119 |
120 pub struct NetworkLayer { |
120 pub struct NetworkLayer { |
121 listener: TcpListener, |
121 listener: TcpListener, |
122 server: HWServer, |
122 server: HWServer, |
123 |
|
124 clients: Slab<NetworkClient>, |
123 clients: Slab<NetworkClient>, |
125 pending: VecDeque<(ClientId, NetworkClientState)> |
124 pending: HashSet<(ClientId, NetworkClientState)>, |
|
125 pending_cache: Vec<(ClientId, NetworkClientState)> |
126 } |
126 } |
127 |
127 |
128 impl NetworkLayer { |
128 impl NetworkLayer { |
129 pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer { |
129 pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer { |
130 let server = HWServer::new(clients_limit, rooms_limit); |
130 let server = HWServer::new(clients_limit, rooms_limit); |
131 let clients = Slab::with_capacity(clients_limit); |
131 let clients = Slab::with_capacity(clients_limit); |
132 let pending = VecDeque::with_capacity(clients_limit); |
132 let pending = HashSet::with_capacity(2 * clients_limit); |
133 NetworkLayer {listener, server, clients, pending} |
133 let pending_cache = Vec::with_capacity(2 * clients_limit); |
|
134 NetworkLayer {listener, server, clients, pending, pending_cache} |
134 } |
135 } |
135 |
136 |
136 pub fn register_server(&self, poll: &Poll) -> io::Result<()> { |
137 pub fn register_server(&self, poll: &Poll) -> io::Result<()> { |
137 poll.register(&self.listener, utils::SERVER, Ready::readable(), |
138 poll.register(&self.listener, utils::SERVER, Ready::readable(), |
138 PollOpt::edge()) |
139 PollOpt::edge()) |
168 for PendingMessage(destination, msg) in self.server.output.drain(..) { |
169 for PendingMessage(destination, msg) in self.server.output.drain(..) { |
169 match destination { |
170 match destination { |
170 Destination::ToSelf(id) => { |
171 Destination::ToSelf(id) => { |
171 if let Some(ref mut client) = self.clients.get_mut(id) { |
172 if let Some(ref mut client) = self.clients.get_mut(id) { |
172 client.send_msg(msg); |
173 client.send_msg(msg); |
173 self.pending.push_back((id, NetworkClientState::NeedsWrite)); |
174 self.pending.insert((id, NetworkClientState::NeedsWrite)); |
174 } |
175 } |
175 } |
176 } |
176 Destination::ToOthers(id) => { |
177 Destination::ToOthers(id) => { |
177 let msg_string = msg.to_raw_protocol(); |
178 let msg_string = msg.to_raw_protocol(); |
178 for (client_id, client) in self.clients.iter_mut() { |
179 for (client_id, client) in self.clients.iter_mut() { |
179 if client_id != id { |
180 if client_id != id { |
180 client.send_string(&msg_string); |
181 client.send_string(&msg_string); |
181 self.pending.push_back((client_id, NetworkClientState::NeedsWrite)); |
182 self.pending.insert((client_id, NetworkClientState::NeedsWrite)); |
182 } |
183 } |
183 } |
184 } |
184 } |
185 } |
185 } |
186 } |
186 } |
187 } |
221 Ok((messages, state)) => { |
222 Ok((messages, state)) => { |
222 for message in messages { |
223 for message in messages { |
223 self.server.handle_msg(client_id, message); |
224 self.server.handle_msg(client_id, message); |
224 } |
225 } |
225 match state { |
226 match state { |
226 NetworkClientState::NeedsRead => |
227 NetworkClientState::NeedsRead => { |
227 self.pending.push_back((client_id, state)), |
228 self.pending.insert((client_id, state)); |
|
229 }, |
228 NetworkClientState::Closed => |
230 NetworkClientState::Closed => |
229 self.client_error(&poll, client_id)?, |
231 self.client_error(&poll, client_id)?, |
230 _ => {} |
232 _ => {} |
231 }; |
233 }; |
232 } |
234 } |
256 warn!("invalid writable client: {}", client_id); |
258 warn!("invalid writable client: {}", client_id); |
257 Ok(((), NetworkClientState::Idle)) |
259 Ok(((), NetworkClientState::Idle)) |
258 }; |
260 }; |
259 |
261 |
260 match result { |
262 match result { |
261 Ok(((), state)) if state == NetworkClientState::NeedsWrite => |
263 Ok(((), state)) if state == NetworkClientState::NeedsWrite => { |
262 self.pending.push_back((client_id, state)), |
264 self.pending.insert((client_id, state)); |
263 Ok(_) => |
265 }, |
264 {} |
266 Ok(_) => {} |
265 Err(e) => self.operation_failed( |
267 Err(e) => self.operation_failed( |
266 poll, client_id, e, |
268 poll, client_id, e, |
267 "Error while writing to client socket")? |
269 "Error while writing to client socket")? |
268 } |
270 } |
269 |
271 |
281 pub fn has_pending_operations(&self) -> bool { |
283 pub fn has_pending_operations(&self) -> bool { |
282 !self.pending.is_empty() |
284 !self.pending.is_empty() |
283 } |
285 } |
284 |
286 |
285 pub fn on_idle(&mut self, poll: &Poll) -> io::Result<()> { |
287 pub fn on_idle(&mut self, poll: &Poll) -> io::Result<()> { |
286 while let Some((id, state)) = self.pending.pop_front() { |
288 if self.has_pending_operations() { |
287 match state { |
289 let mut cache = Vec::new(); |
288 NetworkClientState::NeedsRead => |
290 swap(&mut cache, &mut self.pending_cache); |
289 self.client_readable(poll, id)?, |
291 cache.extend(self.pending.drain()); |
290 NetworkClientState::NeedsWrite => |
292 for (id, state) in cache.drain(..) { |
291 self.client_writable(poll, id)?, |
293 match state { |
292 _ => {} |
294 NetworkClientState::NeedsRead => |
293 } |
295 self.client_readable(poll, id)?, |
294 } |
296 NetworkClientState::NeedsWrite => |
295 Ok(()) |
297 self.client_writable(poll, id)?, |
296 } |
298 _ => {} |
297 } |
299 } |
|
300 } |
|
301 swap(&mut cache, &mut self.pending_cache); |
|
302 } |
|
303 Ok(()) |
|
304 } |
|
305 } |