rust/hedgewars-server/src/server/network.rs
changeset 15195 e705ac360785
parent 15194 21e87882df1c
child 15196 f1c2289d40bd
equal deleted inserted replaced
15194:21e87882df1c 15195:e705ac360785
    82     socket: ClientSocket,
    82     socket: ClientSocket,
    83     peer_addr: SocketAddr,
    83     peer_addr: SocketAddr,
    84     decoder: ProtocolDecoder,
    84     decoder: ProtocolDecoder,
    85     buf_out: netbuf::Buf,
    85     buf_out: netbuf::Buf,
    86     timeout: timer::Timeout,
    86     timeout: timer::Timeout,
       
    87     pending_close: bool,
    87 }
    88 }
    88 
    89 
    89 impl NetworkClient {
    90 impl NetworkClient {
    90     pub fn new(
    91     pub fn new(
    91         id: ClientId,
    92         id: ClientId,
    98             socket,
    99             socket,
    99             peer_addr,
   100             peer_addr,
   100             decoder: ProtocolDecoder::new(),
   101             decoder: ProtocolDecoder::new(),
   101             buf_out: netbuf::Buf::new(),
   102             buf_out: netbuf::Buf::new(),
   102             timeout,
   103             timeout,
       
   104             pending_close: false,
   103         }
   105         }
   104     }
   106     }
   105 
   107 
   106     #[cfg(feature = "tls-connections")]
   108     #[cfg(feature = "tls-connections")]
   107     fn handshake_impl(
   109     fn handshake_impl(
   183                 NetworkClient::read_impl(&mut self.decoder, stream, self.id, &self.peer_addr)
   185                 NetworkClient::read_impl(&mut self.decoder, stream, self.id, &self.peer_addr)
   184             }
   186             }
   185         }
   187         }
   186     }
   188     }
   187 
   189 
   188     fn write_impl<W: Write>(buf_out: &mut netbuf::Buf, destination: &mut W) -> NetworkResult<()> {
   190     fn write_impl<W: Write>(
       
   191         buf_out: &mut netbuf::Buf,
       
   192         destination: &mut W,
       
   193         close_on_empty: bool,
       
   194     ) -> NetworkResult<()> {
   189         let result = loop {
   195         let result = loop {
   190             match buf_out.write_to(destination) {
   196             match buf_out.write_to(destination) {
   191                 Ok(bytes) if buf_out.is_empty() || bytes == 0 => {
   197                 Ok(bytes) if buf_out.is_empty() || bytes == 0 => {
   192                     break Ok(((), NetworkClientState::Idle));
   198                     let status = if buf_out.is_empty() && close_on_empty {
       
   199                         NetworkClientState::Closed
       
   200                     } else {
       
   201                         NetworkClientState::Idle
       
   202                     };
       
   203                     break Ok(((), status));
   193                 }
   204                 }
   194                 Ok(_) => (),
   205                 Ok(_) => (),
   195                 Err(ref error)
   206                 Err(ref error)
   196                     if error.kind() == ErrorKind::Interrupted
   207                     if error.kind() == ErrorKind::Interrupted
   197                         || error.kind() == ErrorKind::WouldBlock =>
   208                         || error.kind() == ErrorKind::WouldBlock =>
   205     }
   216     }
   206 
   217 
   207     pub fn write(&mut self) -> NetworkResult<()> {
   218     pub fn write(&mut self) -> NetworkResult<()> {
   208         let result = match self.socket {
   219         let result = match self.socket {
   209             ClientSocket::Plain(ref mut stream) => {
   220             ClientSocket::Plain(ref mut stream) => {
   210                 NetworkClient::write_impl(&mut self.buf_out, stream)
   221                 NetworkClient::write_impl(&mut self.buf_out, stream, self.pending_close)
   211             }
   222             }
   212             #[cfg(feature = "tls-connections")]
   223             #[cfg(feature = "tls-connections")]
   213             ClientSocket::SslHandshake(ref mut handshake_opt) => {
   224             ClientSocket::SslHandshake(ref mut handshake_opt) => {
   214                 let handshake = std::mem::replace(handshake_opt, None).unwrap();
   225                 let handshake = std::mem::replace(handshake_opt, None).unwrap();
   215                 Ok(((), self.handshake_impl(handshake)?))
   226                 Ok(((), self.handshake_impl(handshake)?))
   216             }
   227             }
   217             #[cfg(feature = "tls-connections")]
   228             #[cfg(feature = "tls-connections")]
   218             ClientSocket::SslStream(ref mut stream) => {
   229             ClientSocket::SslStream(ref mut stream) => {
   219                 NetworkClient::write_impl(&mut self.buf_out, stream)
   230                 NetworkClient::write_impl(&mut self.buf_out, stream, self.pending_close)
   220             }
   231             }
   221         };
   232         };
   222 
   233 
   223         self.socket.inner().flush()?;
   234         self.socket.inner().flush()?;
   224         result
   235         result
   232         self.send_raw_msg(&msg.as_bytes());
   243         self.send_raw_msg(&msg.as_bytes());
   233     }
   244     }
   234 
   245 
   235     pub fn replace_timeout(&mut self, timeout: timer::Timeout) -> timer::Timeout {
   246     pub fn replace_timeout(&mut self, timeout: timer::Timeout) -> timer::Timeout {
   236         replace(&mut self.timeout, timeout)
   247         replace(&mut self.timeout, timeout)
       
   248     }
       
   249 
       
   250     pub fn has_pending_sends(&self) -> bool {
       
   251         !self.buf_out.is_empty()
   237     }
   252     }
   238 }
   253 }
   239 
   254 
   240 #[cfg(feature = "tls-connections")]
   255 #[cfg(feature = "tls-connections")]
   241 struct ServerSsl {
   256 struct ServerSsl {
   347 
   362 
   348         Ok(())
   363         Ok(())
   349     }
   364     }
   350 
   365 
   351     fn deregister_client(&mut self, poll: &Poll, id: ClientId) {
   366     fn deregister_client(&mut self, poll: &Poll, id: ClientId) {
   352         if let Some(ref client) = self.clients.get(id) {
   367         if let Some(ref mut client) = self.clients.get_mut(id) {
   353             poll.deregister(client.socket.inner())
   368             poll.deregister(client.socket.inner())
   354                 .expect("could not deregister socket");
   369                 .expect("could not deregister socket");
   355             info!("client {} ({}) removed", client.id, client.peer_addr);
   370             if client.has_pending_sends() {
   356             self.clients.remove(id);
   371                 info!(
       
   372                     "client {} ({}) pending removal",
       
   373                     client.id, client.peer_addr
       
   374                 );
       
   375                 client.pending_close = true;
       
   376                 poll.register(
       
   377                     client.socket.inner(),
       
   378                     Token(id),
       
   379                     Ready::writable(),
       
   380                     PollOpt::edge(),
       
   381                 )
       
   382                 .unwrap_or_else(|_| {
       
   383                     self.clients.remove(id);
       
   384                 });
       
   385             } else {
       
   386                 info!("client {} ({}) removed", client.id, client.peer_addr);
       
   387                 self.clients.remove(id);
       
   388             }
   357             #[cfg(feature = "official-server")]
   389             #[cfg(feature = "official-server")]
   358             self.io.cancel(id);
   390             self.io.cancel(id);
   359         }
   391         }
   360     }
   392     }
   361 
   393 
   581 
   613 
   582         match result {
   614         match result {
   583             Ok(((), state)) if state == NetworkClientState::NeedsWrite => {
   615             Ok(((), state)) if state == NetworkClientState::NeedsWrite => {
   584                 self.pending.insert((client_id, state));
   616                 self.pending.insert((client_id, state));
   585             }
   617             }
   586             Ok(_) => {}
   618             Ok(((), state)) if state == NetworkClientState::Closed => {
       
   619                 self.deregister_client(poll, client_id);
       
   620             }
       
   621             Ok(_) => (),
   587             Err(e) => {
   622             Err(e) => {
   588                 self.operation_failed(poll, client_id, &e, "Error while writing to client socket")?
   623                 self.operation_failed(poll, client_id, &e, "Error while writing to client socket")?
   589             }
   624             }
   590         }
   625         }
   591 
   626