rust/hedgewars-server/src/server/network.rs
changeset 15832 a4d505a32879
parent 15831 7d0f747afcb8
child 15936 c5c53ebb2d91
equal deleted inserted replaced
15831:7d0f747afcb8 15832:a4d505a32879
     1 use bytes::{Buf, Bytes};
     1 use bytes::{Buf, Bytes};
     2 use log::*;
     2 use log::*;
     3 use slab::Slab;
     3 use slab::Slab;
     4 use std::{
     4 use std::{
     5     collections::HashSet,
       
     6     io,
       
     7     io::{Error, ErrorKind, Read, Write},
       
     8     iter::Iterator,
     5     iter::Iterator,
     9     mem::{replace, swap},
     6     net::{IpAddr, SocketAddr},
    10     net::{IpAddr, Ipv4Addr, SocketAddr},
       
    11     num::NonZeroU32,
       
    12     time::Duration,
     7     time::Duration,
    13     time::Instant,
       
    14 };
     8 };
    15 use tokio::{
     9 use tokio::{
    16     io::AsyncReadExt,
    10     io::AsyncReadExt,
    17     net::{TcpListener, TcpStream},
    11     net::{TcpListener, TcpStream},
    18     sync::mpsc::{channel, Receiver, Sender},
    12     sync::mpsc::{channel, Receiver, Sender},
    23         events::{TimedEvents, Timeout},
    17         events::{TimedEvents, Timeout},
    24         types::ClientId,
    18         types::ClientId,
    25     },
    19     },
    26     handlers,
    20     handlers,
    27     handlers::{IoResult, IoTask, ServerState},
    21     handlers::{IoResult, IoTask, ServerState},
    28     protocol::ProtocolDecoder,
    22     protocol::{self, ProtocolDecoder, ProtocolError},
    29     utils,
    23     utils,
    30 };
    24 };
    31 use hedgewars_network_protocol::{
    25 use hedgewars_network_protocol::{
    32     messages::HwServerMessage::Redirect, messages::*, parser::server_message,
    26     messages::HwServerMessage::Redirect, messages::*, parser::server_message,
    33 };
    27 };
    34 use tokio::io::AsyncWriteExt;
    28 use tokio::io::AsyncWriteExt;
       
    29 
       
    30 const PING_TIMEOUT: Duration = Duration::from_secs(15);
    35 
    31 
    36 enum ClientUpdateData {
    32 enum ClientUpdateData {
    37     Message(HwProtocolMessage),
    33     Message(HwProtocolMessage),
    38     Error(String),
    34     Error(String),
    39 }
    35 }
    78         Self {
    74         Self {
    79             id,
    75             id,
    80             socket,
    76             socket,
    81             peer_addr,
    77             peer_addr,
    82             receiver,
    78             receiver,
    83             decoder: ProtocolDecoder::new(),
    79             decoder: ProtocolDecoder::new(PING_TIMEOUT),
    84         }
    80         }
    85     }
    81     }
    86 
    82 
    87     async fn read(&mut self) -> Option<HwProtocolMessage> {
    83     async fn read(
    88         self.decoder.read_from(&mut self.socket).await
    84         socket: &mut TcpStream,
    89     }
    85         decoder: &mut ProtocolDecoder,
    90 
    86     ) -> protocol::Result<HwProtocolMessage> {
    91     async fn write(&mut self, mut data: Bytes) -> bool {
    87         let result = decoder.read_from(socket).await;
    92         !data.has_remaining() || matches!(self.socket.write_buf(&mut data).await, Ok(n) if n > 0)
    88         if matches!(result, Err(ProtocolError::Timeout)) {
       
    89             if Self::write(socket, Bytes::from(HwServerMessage::Ping.to_raw_protocol())).await {
       
    90                 decoder.read_from(socket).await
       
    91             } else {
       
    92                 Err(ProtocolError::Eof)
       
    93             }
       
    94         } else {
       
    95             result
       
    96         }
       
    97     }
       
    98 
       
    99     async fn write(socket: &mut TcpStream, mut data: Bytes) -> bool {
       
   100         !data.has_remaining() || matches!(socket.write_buf(&mut data).await, Ok(n) if n > 0)
    93     }
   101     }
    94 
   102 
    95     async fn run(mut self, sender: Sender<ClientUpdate>) {
   103     async fn run(mut self, sender: Sender<ClientUpdate>) {
    96         use ClientUpdateData::*;
   104         use ClientUpdateData::*;
    97         let mut sender = ClientUpdateSender {
   105         let mut sender = ClientUpdateSender {
   101 
   109 
   102         loop {
   110         loop {
   103             tokio::select! {
   111             tokio::select! {
   104                 server_message = self.receiver.recv() => {
   112                 server_message = self.receiver.recv() => {
   105                     match server_message {
   113                     match server_message {
   106                         Some(message) => if !self.write(message).await {
   114                         Some(message) => if !Self::write(&mut self.socket, message).await {
   107                             sender.send(Error("Connection reset by peer".to_string())).await;
   115                             sender.send(Error("Connection reset by peer".to_string())).await;
   108                             break;
   116                             break;
   109                         }
   117                         }
   110                         None => {
   118                         None => {
   111                             break;
   119                             break;
   112                         }
   120                         }
   113                     }
   121                     }
   114                 }
   122                 }
   115                 client_message = self.decoder.read_from(&mut self.socket) => {
   123                 client_message = Self::read(&mut self.socket, &mut self.decoder) => {
   116                      match client_message {
   124                      match client_message {
   117                         Some(message) => {
   125                         Ok(message) => {
   118                             if !sender.send(Message(message)).await {
   126                             if !sender.send(Message(message)).await {
   119                                 break;
   127                                 break;
   120                             }
   128                             }
   121                         }
   129                         }
   122                         None => {
   130                         Err(e) => {
   123                             sender.send(Error("Connection reset by peer".to_string())).await;
   131                             sender.send(Error(format!("{}", e))).await;
       
   132                             if matches!(e, ProtocolError::Timeout) {
       
   133                                 Self::write(&mut self.socket, Bytes::from(HwServerMessage::Bye("Ping timeout".to_string()).to_raw_protocol())).await;
       
   134                             }
   124                             break;
   135                             break;
   125                         }
   136                         }
   126                     }
   137                     }
   127                 }
   138                 }
   128             }
   139             }
   151                     use ClientUpdateData::*;
   162                     use ClientUpdateData::*;
   152                     match client_message {
   163                     match client_message {
   153                         Some(ClientUpdate{ client_id, data: Message(message) } ) => {
   164                         Some(ClientUpdate{ client_id, data: Message(message) } ) => {
   154                             self.handle_message(client_id, message).await;
   165                             self.handle_message(client_id, message).await;
   155                         }
   166                         }
   156                         Some(ClientUpdate{ client_id, .. } ) => {
   167                         Some(ClientUpdate{ client_id, data: Error(e) } ) => {
   157                             let mut response = handlers::Response::new(client_id);
   168                             let mut response = handlers::Response::new(client_id);
       
   169                             info!("Client {} error: {:?}", client_id, e);
       
   170                             response.remove_client(client_id);
   158                             handlers::handle_client_loss(&mut self.server_state, client_id, &mut response);
   171                             handlers::handle_client_loss(&mut self.server_state, client_id, &mut response);
   159                             self.handle_response(response).await;
   172                             self.handle_response(response).await;
   160                         }
   173                         }
   161                         None => unreachable!()
   174                         None => unreachable!()
   162                     }
   175                     }