1 use bytes::{Buf, BufMut, BytesMut}; |
1 use bytes::{Buf, BufMut, BytesMut}; |
2 use log::*; |
2 use log::*; |
3 use std::{io, io::ErrorKind, marker::Unpin}; |
3 use std::{ |
4 use tokio::io::AsyncReadExt; |
4 error::Error, |
|
5 fmt::{Debug, Display, Formatter}, |
|
6 io, |
|
7 io::ErrorKind, |
|
8 marker::Unpin, |
|
9 time::Duration, |
|
10 }; |
|
11 use tokio::{io::AsyncReadExt, time::timeout}; |
5 |
12 |
|
13 use crate::protocol::ProtocolError::Timeout; |
6 use hedgewars_network_protocol::{ |
14 use hedgewars_network_protocol::{ |
7 messages::HwProtocolMessage, |
15 messages::HwProtocolMessage, |
|
16 parser::HwProtocolError, |
8 parser::{malformed_message, message}, |
17 parser::{malformed_message, message}, |
9 }; |
18 }; |
10 |
19 |
|
20 #[derive(Debug)] |
|
21 pub enum ProtocolError { |
|
22 Eof, |
|
23 Timeout, |
|
24 Network(Box<dyn Error + Send>), |
|
25 } |
|
26 |
|
27 impl Display for ProtocolError { |
|
28 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { |
|
29 match self { |
|
30 ProtocolError::Eof => write!(f, "Connection reset by peer"), |
|
31 ProtocolError::Timeout => write!(f, "Read operation timed out"), |
|
32 ProtocolError::Network(source) => write!(f, "{:?}", source), |
|
33 } |
|
34 } |
|
35 } |
|
36 |
|
37 impl Error for ProtocolError { |
|
38 fn source(&self) -> Option<&(dyn Error + 'static)> { |
|
39 if let Self::Network(source) = self { |
|
40 Some(source.as_ref()) |
|
41 } else { |
|
42 None |
|
43 } |
|
44 } |
|
45 } |
|
46 |
|
47 pub type Result<T> = std::result::Result<T, ProtocolError>; |
|
48 |
11 pub struct ProtocolDecoder { |
49 pub struct ProtocolDecoder { |
12 buffer: BytesMut, |
50 buffer: BytesMut, |
|
51 read_timeout: Duration, |
13 is_recovering: bool, |
52 is_recovering: bool, |
14 } |
53 } |
15 |
54 |
16 impl ProtocolDecoder { |
55 impl ProtocolDecoder { |
17 pub fn new() -> ProtocolDecoder { |
56 pub fn new(read_timeout: Duration) -> ProtocolDecoder { |
18 ProtocolDecoder { |
57 ProtocolDecoder { |
19 buffer: BytesMut::with_capacity(1024), |
58 buffer: BytesMut::with_capacity(1024), |
|
59 read_timeout, |
20 is_recovering: false, |
60 is_recovering: false, |
21 } |
61 } |
22 } |
62 } |
23 |
63 |
24 fn recover(&mut self) -> bool { |
64 fn recover(&mut self) -> bool { |
55 } |
95 } |
56 |
96 |
57 pub async fn read_from<R: AsyncReadExt + Unpin>( |
97 pub async fn read_from<R: AsyncReadExt + Unpin>( |
58 &mut self, |
98 &mut self, |
59 stream: &mut R, |
99 stream: &mut R, |
60 ) -> Option<HwProtocolMessage> { |
100 ) -> Result<HwProtocolMessage> { |
|
101 use ProtocolError::*; |
|
102 |
61 loop { |
103 loop { |
62 if !self.buffer.has_remaining() { |
104 if !self.buffer.has_remaining() { |
63 let count = stream.read_buf(&mut self.buffer).await.ok()?; |
105 match timeout(self.read_timeout, stream.read_buf(&mut self.buffer)).await { |
64 if count == 0 { |
106 Err(_) => return Err(Timeout), |
65 return None; |
107 Ok(Err(e)) => return Err(Network(Box::new(e))), |
66 } |
108 Ok(Ok(0)) => return Err(Eof), |
|
109 Ok(Ok(_)) => (), |
|
110 }; |
67 } |
111 } |
68 while !self.buffer.is_empty() { |
112 while !self.buffer.is_empty() { |
69 if let Some(result) = self.extract_message() { |
113 if let Some(result) = self.extract_message() { |
70 return Some(result); |
114 return Ok(result); |
71 } |
115 } |
72 } |
116 } |
73 } |
117 } |
74 } |
118 } |
75 } |
119 } |