gameServer2/src/server/network.rs
changeset 13415 0eedc17055a0
parent 13414 28b314ad566d
child 13416 cdf69667593b
equal deleted inserted replaced
13414:28b314ad566d 13415:0eedc17055a0
     1 extern crate slab;
     1 extern crate slab;
     2 
     2 
     3 use std::{
     3 use std::{
     4     io, io::{Error, ErrorKind, Write},
     4     io, io::{Error, ErrorKind, Write},
     5     net::{SocketAddr, IpAddr, Ipv4Addr},
     5     net::{SocketAddr, IpAddr, Ipv4Addr},
     6     collections::VecDeque
     6     collections::HashSet,
       
     7     mem::swap
     7 };
     8 };
     8 
     9 
     9 use mio::{
    10 use mio::{
    10     net::{TcpStream, TcpListener},
    11     net::{TcpStream, TcpListener},
    11     Poll, PollOpt, Ready, Token
    12     Poll, PollOpt, Ready, Token
    20     client::ClientId
    21     client::ClientId
    21 };
    22 };
    22 
    23 
    23 const MAX_BYTES_PER_READ: usize = 2048;
    24 const MAX_BYTES_PER_READ: usize = 2048;
    24 
    25 
    25 #[derive(PartialEq, Copy, Clone)]
    26 #[derive(Hash, Eq, PartialEq, Copy, Clone)]
    26 pub enum NetworkClientState {
    27 pub enum NetworkClientState {
    27     Idle,
    28     Idle,
    28     NeedsWrite,
    29     NeedsWrite,
    29     NeedsRead,
    30     NeedsRead,
    30     Closed,
    31     Closed,
    88     pub fn flush(&mut self) -> NetworkResult<()> {
    89     pub fn flush(&mut self) -> NetworkResult<()> {
    89         let result = loop {
    90         let result = loop {
    90             match self.buf_out.write_to(&mut self.socket) {
    91             match self.buf_out.write_to(&mut self.socket) {
    91                 Ok(bytes) if self.buf_out.is_empty() || bytes == 0 =>
    92                 Ok(bytes) if self.buf_out.is_empty() || bytes == 0 =>
    92                     break Ok(((), NetworkClientState::Idle)),
    93                     break Ok(((), NetworkClientState::Idle)),
    93                 Ok(bytes) =>
    94                 Ok(_) => (),
    94                     (),
       
    95                 Err(ref error) if error.kind() == ErrorKind::Interrupted
    95                 Err(ref error) if error.kind() == ErrorKind::Interrupted
    96                     || error.kind() == ErrorKind::WouldBlock => {
    96                     || error.kind() == ErrorKind::WouldBlock => {
    97                     break Ok(((), NetworkClientState::NeedsWrite));
    97                     break Ok(((), NetworkClientState::NeedsWrite));
    98                 },
    98                 },
    99                 Err(error) =>
    99                 Err(error) =>
   118 }
   118 }
   119 
   119 
   120 pub struct NetworkLayer {
   120 pub struct NetworkLayer {
   121     listener: TcpListener,
   121     listener: TcpListener,
   122     server: HWServer,
   122     server: HWServer,
   123 
       
   124     clients: Slab<NetworkClient>,
   123     clients: Slab<NetworkClient>,
   125     pending: VecDeque<(ClientId, NetworkClientState)>
   124     pending: HashSet<(ClientId, NetworkClientState)>,
       
   125     pending_cache: Vec<(ClientId, NetworkClientState)>
   126 }
   126 }
   127 
   127 
   128 impl NetworkLayer {
   128 impl NetworkLayer {
   129     pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer {
   129     pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer {
   130         let server = HWServer::new(clients_limit, rooms_limit);
   130         let server = HWServer::new(clients_limit, rooms_limit);
   131         let clients = Slab::with_capacity(clients_limit);
   131         let clients = Slab::with_capacity(clients_limit);
   132         let pending = VecDeque::with_capacity(clients_limit);
   132         let pending = HashSet::with_capacity(2 * clients_limit);
   133         NetworkLayer {listener, server, clients, pending}
   133         let pending_cache = Vec::with_capacity(2 * clients_limit);
       
   134         NetworkLayer {listener, server, clients, pending, pending_cache}
   134     }
   135     }
   135 
   136 
   136     pub fn register_server(&self, poll: &Poll) -> io::Result<()> {
   137     pub fn register_server(&self, poll: &Poll) -> io::Result<()> {
   137         poll.register(&self.listener, utils::SERVER, Ready::readable(),
   138         poll.register(&self.listener, utils::SERVER, Ready::readable(),
   138                       PollOpt::edge())
   139                       PollOpt::edge())
   168         for PendingMessage(destination, msg) in self.server.output.drain(..) {
   169         for PendingMessage(destination, msg) in self.server.output.drain(..) {
   169             match destination {
   170             match destination {
   170                 Destination::ToSelf(id)  => {
   171                 Destination::ToSelf(id)  => {
   171                     if let Some(ref mut client) = self.clients.get_mut(id) {
   172                     if let Some(ref mut client) = self.clients.get_mut(id) {
   172                         client.send_msg(msg);
   173                         client.send_msg(msg);
   173                         self.pending.push_back((id, NetworkClientState::NeedsWrite));
   174                         self.pending.insert((id, NetworkClientState::NeedsWrite));
   174                     }
   175                     }
   175                 }
   176                 }
   176                 Destination::ToOthers(id) => {
   177                 Destination::ToOthers(id) => {
   177                     let msg_string = msg.to_raw_protocol();
   178                     let msg_string = msg.to_raw_protocol();
   178                     for (client_id, client) in self.clients.iter_mut() {
   179                     for (client_id, client) in self.clients.iter_mut() {
   179                         if client_id != id {
   180                         if client_id != id {
   180                             client.send_string(&msg_string);
   181                             client.send_string(&msg_string);
   181                             self.pending.push_back((client_id, NetworkClientState::NeedsWrite));
   182                             self.pending.insert((client_id, NetworkClientState::NeedsWrite));
   182                         }
   183                         }
   183                     }
   184                     }
   184                 }
   185                 }
   185             }
   186             }
   186         }
   187         }
   221             Ok((messages, state)) => {
   222             Ok((messages, state)) => {
   222                 for message in messages {
   223                 for message in messages {
   223                     self.server.handle_msg(client_id, message);
   224                     self.server.handle_msg(client_id, message);
   224                 }
   225                 }
   225                 match state {
   226                 match state {
   226                     NetworkClientState::NeedsRead =>
   227                     NetworkClientState::NeedsRead => {
   227                         self.pending.push_back((client_id, state)),
   228                         self.pending.insert((client_id, state));
       
   229                     },
   228                     NetworkClientState::Closed =>
   230                     NetworkClientState::Closed =>
   229                         self.client_error(&poll, client_id)?,
   231                         self.client_error(&poll, client_id)?,
   230                     _ => {}
   232                     _ => {}
   231                 };
   233                 };
   232             }
   234             }
   256                 warn!("invalid writable client: {}", client_id);
   258                 warn!("invalid writable client: {}", client_id);
   257                 Ok(((), NetworkClientState::Idle))
   259                 Ok(((), NetworkClientState::Idle))
   258             };
   260             };
   259 
   261 
   260         match result {
   262         match result {
   261             Ok(((), state)) if state == NetworkClientState::NeedsWrite =>
   263             Ok(((), state)) if state == NetworkClientState::NeedsWrite => {
   262                 self.pending.push_back((client_id, state)),
   264                 self.pending.insert((client_id, state));
   263             Ok(_) =>
   265             },
   264                 {}
   266             Ok(_) => {}
   265             Err(e) => self.operation_failed(
   267             Err(e) => self.operation_failed(
   266                 poll, client_id, e,
   268                 poll, client_id, e,
   267                 "Error while writing to client socket")?
   269                 "Error while writing to client socket")?
   268         }
   270         }
   269 
   271 
   281     pub fn has_pending_operations(&self) -> bool {
   283     pub fn has_pending_operations(&self) -> bool {
   282         !self.pending.is_empty()
   284         !self.pending.is_empty()
   283     }
   285     }
   284 
   286 
   285     pub fn on_idle(&mut self, poll: &Poll) -> io::Result<()> {
   287     pub fn on_idle(&mut self, poll: &Poll) -> io::Result<()> {
   286         while let Some((id, state)) = self.pending.pop_front() {
   288         if self.has_pending_operations() {
   287             match state {
   289             let mut cache = Vec::new();
   288                 NetworkClientState::NeedsRead =>
   290             swap(&mut cache, &mut self.pending_cache);
   289                     self.client_readable(poll, id)?,
   291             cache.extend(self.pending.drain());
   290                 NetworkClientState::NeedsWrite =>
   292             for (id, state) in cache.drain(..) {
   291                     self.client_writable(poll, id)?,
   293                 match state {
   292                 _ => {}
   294                     NetworkClientState::NeedsRead =>
   293             }
   295                         self.client_readable(poll, id)?,
   294         }
   296                     NetworkClientState::NeedsWrite =>
   295         Ok(())
   297                         self.client_writable(poll, id)?,
   296     }
   298                     _ => {}
   297 }
   299                 }
       
   300             }
       
   301             swap(&mut cache, &mut self.pending_cache);
       
   302         }
       
   303         Ok(())
       
   304     }
       
   305 }