rust/hedgewars-server/src/protocol.rs
changeset 15832 a4d505a32879
parent 15831 7d0f747afcb8
child 15989 fb389df02e3e
equal deleted inserted replaced
15831:7d0f747afcb8 15832:a4d505a32879
     1 use bytes::{Buf, BufMut, BytesMut};
     1 use bytes::{Buf, BufMut, BytesMut};
     2 use log::*;
     2 use log::*;
     3 use std::{io, io::ErrorKind, marker::Unpin};
     3 use std::{
     4 use tokio::io::AsyncReadExt;
     4     error::Error,
       
     5     fmt::{Debug, Display, Formatter},
       
     6     io,
       
     7     io::ErrorKind,
       
     8     marker::Unpin,
       
     9     time::Duration,
       
    10 };
       
    11 use tokio::{io::AsyncReadExt, time::timeout};
     5 
    12 
       
    13 use crate::protocol::ProtocolError::Timeout;
     6 use hedgewars_network_protocol::{
    14 use hedgewars_network_protocol::{
     7     messages::HwProtocolMessage,
    15     messages::HwProtocolMessage,
       
    16     parser::HwProtocolError,
     8     parser::{malformed_message, message},
    17     parser::{malformed_message, message},
     9 };
    18 };
    10 
    19 
       
    20 #[derive(Debug)]
       
    21 pub enum ProtocolError {
       
    22     Eof,
       
    23     Timeout,
       
    24     Network(Box<dyn Error + Send>),
       
    25 }
       
    26 
       
    27 impl Display for ProtocolError {
       
    28     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
       
    29         match self {
       
    30             ProtocolError::Eof => write!(f, "Connection reset by peer"),
       
    31             ProtocolError::Timeout => write!(f, "Read operation timed out"),
       
    32             ProtocolError::Network(source) => write!(f, "{:?}", source),
       
    33         }
       
    34     }
       
    35 }
       
    36 
       
    37 impl Error for ProtocolError {
       
    38     fn source(&self) -> Option<&(dyn Error + 'static)> {
       
    39         if let Self::Network(source) = self {
       
    40             Some(source.as_ref())
       
    41         } else {
       
    42             None
       
    43         }
       
    44     }
       
    45 }
       
    46 
       
    47 pub type Result<T> = std::result::Result<T, ProtocolError>;
       
    48 
    11 pub struct ProtocolDecoder {
    49 pub struct ProtocolDecoder {
    12     buffer: BytesMut,
    50     buffer: BytesMut,
       
    51     read_timeout: Duration,
    13     is_recovering: bool,
    52     is_recovering: bool,
    14 }
    53 }
    15 
    54 
    16 impl ProtocolDecoder {
    55 impl ProtocolDecoder {
    17     pub fn new() -> ProtocolDecoder {
    56     pub fn new(read_timeout: Duration) -> ProtocolDecoder {
    18         ProtocolDecoder {
    57         ProtocolDecoder {
    19             buffer: BytesMut::with_capacity(1024),
    58             buffer: BytesMut::with_capacity(1024),
       
    59             read_timeout,
    20             is_recovering: false,
    60             is_recovering: false,
    21         }
    61         }
    22     }
    62     }
    23 
    63 
    24     fn recover(&mut self) -> bool {
    64     fn recover(&mut self) -> bool {
    55     }
    95     }
    56 
    96 
    57     pub async fn read_from<R: AsyncReadExt + Unpin>(
    97     pub async fn read_from<R: AsyncReadExt + Unpin>(
    58         &mut self,
    98         &mut self,
    59         stream: &mut R,
    99         stream: &mut R,
    60     ) -> Option<HwProtocolMessage> {
   100     ) -> Result<HwProtocolMessage> {
       
   101         use ProtocolError::*;
       
   102 
    61         loop {
   103         loop {
    62             if !self.buffer.has_remaining() {
   104             if !self.buffer.has_remaining() {
    63                 let count = stream.read_buf(&mut self.buffer).await.ok()?;
   105                 match timeout(self.read_timeout, stream.read_buf(&mut self.buffer)).await {
    64                 if count == 0 {
   106                     Err(_) => return Err(Timeout),
    65                     return None;
   107                     Ok(Err(e)) => return Err(Network(Box::new(e))),
    66                 }
   108                     Ok(Ok(0)) => return Err(Eof),
       
   109                     Ok(Ok(_)) => (),
       
   110                 };
    67             }
   111             }
    68             while !self.buffer.is_empty() {
   112             while !self.buffer.is_empty() {
    69                 if let Some(result) = self.extract_message() {
   113                 if let Some(result) = self.extract_message() {
    70                     return Some(result);
   114                     return Ok(result);
    71                 }
   115                 }
    72             }
   116             }
    73         }
   117         }
    74     }
   118     }
    75 }
   119 }