rust/hedgewars-server/src/server/network.rs
changeset 14803 92225a708bda
parent 14796 f5d43f007970
child 14807 b2beb784e4b5
equal deleted inserted replaced
14802:a40139603cde 14803:92225a708bda
    11 use log::*;
    11 use log::*;
    12 use mio::{
    12 use mio::{
    13     net::{TcpListener, TcpStream},
    13     net::{TcpListener, TcpStream},
    14     Poll, PollOpt, Ready, Token,
    14     Poll, PollOpt, Ready, Token,
    15 };
    15 };
       
    16 use mio_extras::timer;
    16 use netbuf;
    17 use netbuf;
    17 use slab::Slab;
    18 use slab::Slab;
    18 
    19 
    19 use super::{core::HWServer, coretypes::ClientId, handlers};
    20 use super::{core::HWServer, coretypes::ClientId, handlers};
    20 use crate::{
    21 use crate::{
    32     ssl::{
    33     ssl::{
    33         HandshakeError, MidHandshakeSslStream, Ssl, SslContext, SslContextBuilder, SslFiletype,
    34         HandshakeError, MidHandshakeSslStream, Ssl, SslContext, SslContextBuilder, SslFiletype,
    34         SslMethod, SslOptions, SslStream, SslStreamBuilder, SslVerifyMode,
    35         SslMethod, SslOptions, SslStream, SslStreamBuilder, SslVerifyMode,
    35     },
    36     },
    36 };
    37 };
       
    38 use std::time::Duration;
    37 
    39 
    38 const MAX_BYTES_PER_READ: usize = 2048;
    40 const MAX_BYTES_PER_READ: usize = 2048;
       
    41 const SEND_PING_TIMEOUT: Duration = Duration::from_secs(30);
       
    42 const DROP_CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
    39 
    43 
    40 #[derive(Hash, Eq, PartialEq, Copy, Clone)]
    44 #[derive(Hash, Eq, PartialEq, Copy, Clone)]
    41 pub enum NetworkClientState {
    45 pub enum NetworkClientState {
    42     Idle,
    46     Idle,
    43     NeedsWrite,
    47     NeedsWrite,
    78     id: ClientId,
    82     id: ClientId,
    79     socket: ClientSocket,
    83     socket: ClientSocket,
    80     peer_addr: SocketAddr,
    84     peer_addr: SocketAddr,
    81     decoder: ProtocolDecoder,
    85     decoder: ProtocolDecoder,
    82     buf_out: netbuf::Buf,
    86     buf_out: netbuf::Buf,
       
    87     timeout: timer::Timeout,
    83 }
    88 }
    84 
    89 
    85 impl NetworkClient {
    90 impl NetworkClient {
    86     pub fn new(id: ClientId, socket: ClientSocket, peer_addr: SocketAddr) -> NetworkClient {
    91     pub fn new(
       
    92         id: ClientId,
       
    93         socket: ClientSocket,
       
    94         peer_addr: SocketAddr,
       
    95         timeout: timer::Timeout,
       
    96     ) -> NetworkClient {
    87         NetworkClient {
    97         NetworkClient {
    88             id,
    98             id,
    89             socket,
    99             socket,
    90             peer_addr,
   100             peer_addr,
    91             decoder: ProtocolDecoder::new(),
   101             decoder: ProtocolDecoder::new(),
    92             buf_out: netbuf::Buf::new(),
   102             buf_out: netbuf::Buf::new(),
       
   103             timeout,
    93         }
   104         }
    94     }
   105     }
    95 
   106 
    96     #[cfg(feature = "tls-connections")]
   107     #[cfg(feature = "tls-connections")]
    97     fn handshake_impl(
   108     fn handshake_impl(
   229     }
   240     }
   230 
   241 
   231     pub fn send_string(&mut self, msg: &str) {
   242     pub fn send_string(&mut self, msg: &str) {
   232         self.send_raw_msg(&msg.as_bytes());
   243         self.send_raw_msg(&msg.as_bytes());
   233     }
   244     }
       
   245 
       
   246     pub fn replace_timeout(&mut self, timeout: timer::Timeout) -> timer::Timeout {
       
   247         replace(&mut self.timeout, timeout)
       
   248     }
   234 }
   249 }
   235 
   250 
   236 #[cfg(feature = "tls-connections")]
   251 #[cfg(feature = "tls-connections")]
   237 struct ServerSsl {
   252 struct ServerSsl {
   238     context: SslContext,
   253     context: SslContext,
   286             }
   301             }
   287         }
   302         }
   288     }
   303     }
   289 }
   304 }
   290 
   305 
       
   306 enum TimeoutEvent {
       
   307     SendPing,
       
   308     DropClient,
       
   309 }
       
   310 
       
   311 struct TimerData(TimeoutEvent, ClientId);
       
   312 
   291 pub struct NetworkLayer {
   313 pub struct NetworkLayer {
   292     listener: TcpListener,
   314     listener: TcpListener,
   293     server: HWServer,
   315     server: HWServer,
   294     clients: Slab<NetworkClient>,
   316     clients: Slab<NetworkClient>,
   295     pending: HashSet<(ClientId, NetworkClientState)>,
   317     pending: HashSet<(ClientId, NetworkClientState)>,
   296     pending_cache: Vec<(ClientId, NetworkClientState)>,
   318     pending_cache: Vec<(ClientId, NetworkClientState)>,
   297     #[cfg(feature = "tls-connections")]
   319     #[cfg(feature = "tls-connections")]
   298     ssl: ServerSsl,
   320     ssl: ServerSsl,
   299     #[cfg(feature = "official-server")]
   321     #[cfg(feature = "official-server")]
   300     io: IoLayer,
   322     io: IoLayer,
       
   323     timer: timer::Timer<TimerData>,
       
   324 }
       
   325 
       
   326 fn create_ping_timeout(timer: &mut timer::Timer<TimerData>, client_id: ClientId) -> timer::Timeout {
       
   327     timer.set_timeout(
       
   328         SEND_PING_TIMEOUT,
       
   329         TimerData(TimeoutEvent::SendPing, client_id),
       
   330     )
       
   331 }
       
   332 
       
   333 fn create_drop_timeout(timer: &mut timer::Timer<TimerData>, client_id: ClientId) -> timer::Timeout {
       
   334     timer.set_timeout(
       
   335         DROP_CLIENT_TIMEOUT,
       
   336         TimerData(TimeoutEvent::DropClient, client_id),
       
   337     )
   301 }
   338 }
   302 
   339 
   303 impl NetworkLayer {
   340 impl NetworkLayer {
   304     pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer {
   341     pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer {
   305         let server = HWServer::new(clients_limit, rooms_limit);
   342         let server = HWServer::new(clients_limit, rooms_limit);
   306         let clients = Slab::with_capacity(clients_limit);
   343         let clients = Slab::with_capacity(clients_limit);
   307         let pending = HashSet::with_capacity(2 * clients_limit);
   344         let pending = HashSet::with_capacity(2 * clients_limit);
   308         let pending_cache = Vec::with_capacity(2 * clients_limit);
   345         let pending_cache = Vec::with_capacity(2 * clients_limit);
       
   346         let timer = timer::Builder::default().build();
   309 
   347 
   310         NetworkLayer {
   348         NetworkLayer {
   311             listener,
   349             listener,
   312             server,
   350             server,
   313             clients,
   351             clients,
   315             pending_cache,
   353             pending_cache,
   316             #[cfg(feature = "tls-connections")]
   354             #[cfg(feature = "tls-connections")]
   317             ssl: NetworkLayer::create_ssl_context(),
   355             ssl: NetworkLayer::create_ssl_context(),
   318             #[cfg(feature = "official-server")]
   356             #[cfg(feature = "official-server")]
   319             io: IoLayer::new(),
   357             io: IoLayer::new(),
       
   358             timer,
   320         }
   359         }
   321     }
   360     }
   322 
   361 
   323     #[cfg(feature = "tls-connections")]
   362     #[cfg(feature = "tls-connections")]
   324     fn create_ssl_context() -> ServerSsl {
   363     fn create_ssl_context() -> ServerSsl {
   344             utils::SERVER_TOKEN,
   383             utils::SERVER_TOKEN,
   345             Ready::readable(),
   384             Ready::readable(),
   346             PollOpt::edge(),
   385             PollOpt::edge(),
   347         )?;
   386         )?;
   348 
   387 
       
   388         poll.register(
       
   389             &self.timer,
       
   390             utils::TIMER_TOKEN,
       
   391             Ready::readable(),
       
   392             PollOpt::edge(),
       
   393         )?;
       
   394 
   349         #[cfg(feature = "official-server")]
   395         #[cfg(feature = "official-server")]
   350         self.io.io_thread.register_rx(poll, utils::IO_TOKEN)?;
   396         self.io.io_thread.register_rx(poll, utils::IO_TOKEN)?;
   351 
   397 
   352         Ok(())
   398         Ok(())
   353     }
   399     }
   382             Ready::readable() | Ready::writable(),
   428             Ready::readable() | Ready::writable(),
   383             PollOpt::edge(),
   429             PollOpt::edge(),
   384         )
   430         )
   385         .expect("could not register socket with event loop");
   431         .expect("could not register socket with event loop");
   386 
   432 
   387         let client = NetworkClient::new(client_id, client_socket, addr);
   433         let client = NetworkClient::new(
       
   434             client_id,
       
   435             client_socket,
       
   436             addr,
       
   437             create_ping_timeout(&mut self.timer, client_id),
       
   438         );
   388         info!("client {} ({}) added", client.id, client.peer_addr);
   439         info!("client {} ({}) added", client.id, client.peer_addr);
   389         entry.insert(client);
   440         entry.insert(client);
   390 
   441 
   391         client_id
   442         client_id
   392     }
   443     }
   415             let client_id = response.client_id();
   466             let client_id = response.client_id();
   416             for task in response.extract_io_tasks() {
   467             for task in response.extract_io_tasks() {
   417                 self.io.send(client_id, task);
   468                 self.io.send(client_id, task);
   418             }
   469             }
   419         }
   470         }
       
   471     }
       
   472 
       
   473     pub fn handle_timeout(&mut self, poll: &Poll) -> io::Result<()> {
       
   474         while let Some(TimerData(event, client_id)) = self.timer.poll() {
       
   475             match event {
       
   476                 TimeoutEvent::SendPing => {
       
   477                     if let Some(ref mut client) = self.clients.get_mut(client_id) {
       
   478                         client.send_string(&HWServerMessage::Ping.to_raw_protocol());
       
   479                         client.write()?;
       
   480                         client.replace_timeout(create_drop_timeout(&mut self.timer, client_id));
       
   481                     }
       
   482                 }
       
   483                 TimeoutEvent::DropClient => {
       
   484                     self.operation_failed(
       
   485                         poll,
       
   486                         client_id,
       
   487                         &ErrorKind::TimedOut.into(),
       
   488                         "No ping response",
       
   489                     )?;
       
   490                 }
       
   491             }
       
   492         }
       
   493         Ok(())
   420     }
   494     }
   421 
   495 
   422     #[cfg(feature = "official-server")]
   496     #[cfg(feature = "official-server")]
   423     pub fn handle_io_result(&mut self) {
   497     pub fn handle_io_result(&mut self) {
   424         if let Some((client_id, result)) = self.io.try_recv() {
   498         if let Some((client_id, result)) = self.io.try_recv() {
   484         self.client_error(poll, client_id)
   558         self.client_error(poll, client_id)
   485     }
   559     }
   486 
   560 
   487     pub fn client_readable(&mut self, poll: &Poll, client_id: ClientId) -> io::Result<()> {
   561     pub fn client_readable(&mut self, poll: &Poll, client_id: ClientId) -> io::Result<()> {
   488         let messages = if let Some(ref mut client) = self.clients.get_mut(client_id) {
   562         let messages = if let Some(ref mut client) = self.clients.get_mut(client_id) {
       
   563             let timeout = client.replace_timeout(create_ping_timeout(&mut self.timer, client_id));
       
   564             self.timer.cancel_timeout(&timeout);
   489             client.read()
   565             client.read()
   490         } else {
   566         } else {
   491             warn!("invalid readable client: {}", client_id);
   567             warn!("invalid readable client: {}", client_id);
   492             Ok((Vec::new(), NetworkClientState::Idle))
   568             Ok((Vec::new(), NetworkClientState::Idle))
   493         };
   569         };