11 use log::*; |
11 use log::*; |
12 use mio::{ |
12 use mio::{ |
13 net::{TcpListener, TcpStream}, |
13 net::{TcpListener, TcpStream}, |
14 Poll, PollOpt, Ready, Token, |
14 Poll, PollOpt, Ready, Token, |
15 }; |
15 }; |
|
16 use mio_extras::timer; |
16 use netbuf; |
17 use netbuf; |
17 use slab::Slab; |
18 use slab::Slab; |
18 |
19 |
19 use super::{core::HWServer, coretypes::ClientId, handlers}; |
20 use super::{core::HWServer, coretypes::ClientId, handlers}; |
20 use crate::{ |
21 use crate::{ |
32 ssl::{ |
33 ssl::{ |
33 HandshakeError, MidHandshakeSslStream, Ssl, SslContext, SslContextBuilder, SslFiletype, |
34 HandshakeError, MidHandshakeSslStream, Ssl, SslContext, SslContextBuilder, SslFiletype, |
34 SslMethod, SslOptions, SslStream, SslStreamBuilder, SslVerifyMode, |
35 SslMethod, SslOptions, SslStream, SslStreamBuilder, SslVerifyMode, |
35 }, |
36 }, |
36 }; |
37 }; |
|
38 use std::time::Duration; |
37 |
39 |
/// Read-size limit in bytes.
// NOTE(review): the read path is not part of this excerpt — presumably this caps a
// single socket read; confirm against `NetworkClient::read`.
const MAX_BYTES_PER_READ: usize = 2048;
/// Delay used when arming a `TimeoutEvent::SendPing` timer entry.
const SEND_PING_TIMEOUT: Duration = Duration::from_secs(30);
/// Delay used when arming a `TimeoutEvent::DropClient` timer entry.
const DROP_CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
39 |
43 |
40 #[derive(Hash, Eq, PartialEq, Copy, Clone)] |
44 #[derive(Hash, Eq, PartialEq, Copy, Clone)] |
41 pub enum NetworkClientState { |
45 pub enum NetworkClientState { |
42 Idle, |
46 Idle, |
43 NeedsWrite, |
47 NeedsWrite, |
78 id: ClientId, |
82 id: ClientId, |
79 socket: ClientSocket, |
83 socket: ClientSocket, |
80 peer_addr: SocketAddr, |
84 peer_addr: SocketAddr, |
81 decoder: ProtocolDecoder, |
85 decoder: ProtocolDecoder, |
82 buf_out: netbuf::Buf, |
86 buf_out: netbuf::Buf, |
|
87 timeout: timer::Timeout, |
83 } |
88 } |
84 |
89 |
85 impl NetworkClient { |
90 impl NetworkClient { |
86 pub fn new(id: ClientId, socket: ClientSocket, peer_addr: SocketAddr) -> NetworkClient { |
91 pub fn new( |
|
92 id: ClientId, |
|
93 socket: ClientSocket, |
|
94 peer_addr: SocketAddr, |
|
95 timeout: timer::Timeout, |
|
96 ) -> NetworkClient { |
87 NetworkClient { |
97 NetworkClient { |
88 id, |
98 id, |
89 socket, |
99 socket, |
90 peer_addr, |
100 peer_addr, |
91 decoder: ProtocolDecoder::new(), |
101 decoder: ProtocolDecoder::new(), |
92 buf_out: netbuf::Buf::new(), |
102 buf_out: netbuf::Buf::new(), |
|
103 timeout, |
93 } |
104 } |
94 } |
105 } |
95 |
106 |
96 #[cfg(feature = "tls-connections")] |
107 #[cfg(feature = "tls-connections")] |
97 fn handshake_impl( |
108 fn handshake_impl( |
229 } |
240 } |
230 |
241 |
231 pub fn send_string(&mut self, msg: &str) { |
242 pub fn send_string(&mut self, msg: &str) { |
232 self.send_raw_msg(&msg.as_bytes()); |
243 self.send_raw_msg(&msg.as_bytes()); |
233 } |
244 } |
|
245 |
|
246 pub fn replace_timeout(&mut self, timeout: timer::Timeout) -> timer::Timeout { |
|
247 replace(&mut self.timeout, timeout) |
|
248 } |
234 } |
249 } |
235 |
250 |
236 #[cfg(feature = "tls-connections")] |
251 #[cfg(feature = "tls-connections")] |
237 struct ServerSsl { |
252 struct ServerSsl { |
238 context: SslContext, |
253 context: SslContext, |
286 } |
301 } |
287 } |
302 } |
288 } |
303 } |
289 } |
304 } |
290 |
305 |
|
/// What to do when a client's timer entry expires.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimeoutEvent {
    /// Send a ping message to the client.
    SendPing,
    /// Fail the client's connection as timed out ("No ping response").
    DropClient,
}
|
310 |
|
311 struct TimerData(TimeoutEvent, ClientId); |
|
312 |
291 pub struct NetworkLayer { |
313 pub struct NetworkLayer { |
292 listener: TcpListener, |
314 listener: TcpListener, |
293 server: HWServer, |
315 server: HWServer, |
294 clients: Slab<NetworkClient>, |
316 clients: Slab<NetworkClient>, |
295 pending: HashSet<(ClientId, NetworkClientState)>, |
317 pending: HashSet<(ClientId, NetworkClientState)>, |
296 pending_cache: Vec<(ClientId, NetworkClientState)>, |
318 pending_cache: Vec<(ClientId, NetworkClientState)>, |
297 #[cfg(feature = "tls-connections")] |
319 #[cfg(feature = "tls-connections")] |
298 ssl: ServerSsl, |
320 ssl: ServerSsl, |
299 #[cfg(feature = "official-server")] |
321 #[cfg(feature = "official-server")] |
300 io: IoLayer, |
322 io: IoLayer, |
|
323 timer: timer::Timer<TimerData>, |
|
324 } |
|
325 |
|
326 fn create_ping_timeout(timer: &mut timer::Timer<TimerData>, client_id: ClientId) -> timer::Timeout { |
|
327 timer.set_timeout( |
|
328 SEND_PING_TIMEOUT, |
|
329 TimerData(TimeoutEvent::SendPing, client_id), |
|
330 ) |
|
331 } |
|
332 |
|
333 fn create_drop_timeout(timer: &mut timer::Timer<TimerData>, client_id: ClientId) -> timer::Timeout { |
|
334 timer.set_timeout( |
|
335 DROP_CLIENT_TIMEOUT, |
|
336 TimerData(TimeoutEvent::DropClient, client_id), |
|
337 ) |
301 } |
338 } |
302 |
339 |
303 impl NetworkLayer { |
340 impl NetworkLayer { |
304 pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer { |
341 pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer { |
305 let server = HWServer::new(clients_limit, rooms_limit); |
342 let server = HWServer::new(clients_limit, rooms_limit); |
306 let clients = Slab::with_capacity(clients_limit); |
343 let clients = Slab::with_capacity(clients_limit); |
307 let pending = HashSet::with_capacity(2 * clients_limit); |
344 let pending = HashSet::with_capacity(2 * clients_limit); |
308 let pending_cache = Vec::with_capacity(2 * clients_limit); |
345 let pending_cache = Vec::with_capacity(2 * clients_limit); |
|
346 let timer = timer::Builder::default().build(); |
309 |
347 |
310 NetworkLayer { |
348 NetworkLayer { |
311 listener, |
349 listener, |
312 server, |
350 server, |
313 clients, |
351 clients, |
315 pending_cache, |
353 pending_cache, |
316 #[cfg(feature = "tls-connections")] |
354 #[cfg(feature = "tls-connections")] |
317 ssl: NetworkLayer::create_ssl_context(), |
355 ssl: NetworkLayer::create_ssl_context(), |
318 #[cfg(feature = "official-server")] |
356 #[cfg(feature = "official-server")] |
319 io: IoLayer::new(), |
357 io: IoLayer::new(), |
|
358 timer, |
320 } |
359 } |
321 } |
360 } |
322 |
361 |
323 #[cfg(feature = "tls-connections")] |
362 #[cfg(feature = "tls-connections")] |
324 fn create_ssl_context() -> ServerSsl { |
363 fn create_ssl_context() -> ServerSsl { |
382 Ready::readable() | Ready::writable(), |
428 Ready::readable() | Ready::writable(), |
383 PollOpt::edge(), |
429 PollOpt::edge(), |
384 ) |
430 ) |
385 .expect("could not register socket with event loop"); |
431 .expect("could not register socket with event loop"); |
386 |
432 |
387 let client = NetworkClient::new(client_id, client_socket, addr); |
433 let client = NetworkClient::new( |
|
434 client_id, |
|
435 client_socket, |
|
436 addr, |
|
437 create_ping_timeout(&mut self.timer, client_id), |
|
438 ); |
388 info!("client {} ({}) added", client.id, client.peer_addr); |
439 info!("client {} ({}) added", client.id, client.peer_addr); |
389 entry.insert(client); |
440 entry.insert(client); |
390 |
441 |
391 client_id |
442 client_id |
392 } |
443 } |
415 let client_id = response.client_id(); |
466 let client_id = response.client_id(); |
416 for task in response.extract_io_tasks() { |
467 for task in response.extract_io_tasks() { |
417 self.io.send(client_id, task); |
468 self.io.send(client_id, task); |
418 } |
469 } |
419 } |
470 } |
|
471 } |
|
472 |
|
473 pub fn handle_timeout(&mut self, poll: &Poll) -> io::Result<()> { |
|
474 while let Some(TimerData(event, client_id)) = self.timer.poll() { |
|
475 match event { |
|
476 TimeoutEvent::SendPing => { |
|
477 if let Some(ref mut client) = self.clients.get_mut(client_id) { |
|
478 client.send_string(&HWServerMessage::Ping.to_raw_protocol()); |
|
479 client.write()?; |
|
480 client.replace_timeout(create_drop_timeout(&mut self.timer, client_id)); |
|
481 } |
|
482 } |
|
483 TimeoutEvent::DropClient => { |
|
484 self.operation_failed( |
|
485 poll, |
|
486 client_id, |
|
487 &ErrorKind::TimedOut.into(), |
|
488 "No ping response", |
|
489 )?; |
|
490 } |
|
491 } |
|
492 } |
|
493 Ok(()) |
420 } |
494 } |
421 |
495 |
422 #[cfg(feature = "official-server")] |
496 #[cfg(feature = "official-server")] |
423 pub fn handle_io_result(&mut self) { |
497 pub fn handle_io_result(&mut self) { |
424 if let Some((client_id, result)) = self.io.try_recv() { |
498 if let Some((client_id, result)) = self.io.try_recv() { |
484 self.client_error(poll, client_id) |
558 self.client_error(poll, client_id) |
485 } |
559 } |
486 |
560 |
487 pub fn client_readable(&mut self, poll: &Poll, client_id: ClientId) -> io::Result<()> { |
561 pub fn client_readable(&mut self, poll: &Poll, client_id: ClientId) -> io::Result<()> { |
488 let messages = if let Some(ref mut client) = self.clients.get_mut(client_id) { |
562 let messages = if let Some(ref mut client) = self.clients.get_mut(client_id) { |
|
563 let timeout = client.replace_timeout(create_ping_timeout(&mut self.timer, client_id)); |
|
564 self.timer.cancel_timeout(&timeout); |
489 client.read() |
565 client.read() |
490 } else { |
566 } else { |
491 warn!("invalid readable client: {}", client_id); |
567 warn!("invalid readable client: {}", client_id); |
492 Ok((Vec::new(), NetworkClientState::Idle)) |
568 Ok((Vec::new(), NetworkClientState::Idle)) |
493 }; |
569 }; |