gameServer2/src/server/network.rs
changeset 13414 28b314ad566d
parent 13119 1e39b8749072
child 13415 0eedc17055a0
equal deleted inserted replaced
13412:236cc4cf2448 13414:28b314ad566d
     1 extern crate slab;
     1 extern crate slab;
     2 
     2 
     3 use std::io::ErrorKind;
     3 use std::{
     4 use mio::net::*;
     4     io, io::{Error, ErrorKind, Write},
     5 use super::server::{HWServer, PendingMessage, Destination};
     5     net::{SocketAddr, IpAddr, Ipv4Addr},
     6 use super::client::ClientId;
     6     collections::VecDeque
       
     7 };
       
     8 
       
     9 use mio::{
       
    10     net::{TcpStream, TcpListener},
       
    11     Poll, PollOpt, Ready, Token
       
    12 };
       
    13 use netbuf;
     7 use slab::Slab;
    14 use slab::Slab;
     8 
    15 
     9 use mio::net::TcpStream;
       
    10 use mio::*;
       
    11 use std::io::Write;
       
    12 use std::io;
       
    13 use netbuf;
       
    14 
       
    15 use utils;
    16 use utils;
    16 use protocol::ProtocolDecoder;
    17 use protocol::{ProtocolDecoder, messages::*};
    17 use protocol::messages::*;
    18 use super::{
    18 use std::net::SocketAddr;
    19     server::{HWServer, PendingMessage, Destination},
       
    20     client::ClientId
       
    21 };
       
    22 
       
    23 const MAX_BYTES_PER_READ: usize = 2048;
       
    24 
       
    25 #[derive(PartialEq, Copy, Clone)]
       
    26 pub enum NetworkClientState {
       
    27     Idle,
       
    28     NeedsWrite,
       
    29     NeedsRead,
       
    30     Closed,
       
    31 }
       
    32 
       
    33 type NetworkResult<T> = io::Result<(T, NetworkClientState)>;
    19 
    34 
    20 pub struct NetworkClient {
    35 pub struct NetworkClient {
    21     id: ClientId,
    36     id: ClientId,
    22     socket: TcpStream,
    37     socket: TcpStream,
    23     peer_addr: SocketAddr,
    38     peer_addr: SocketAddr,
    24     decoder: ProtocolDecoder,
    39     decoder: ProtocolDecoder,
    25     buf_out: netbuf::Buf,
    40     buf_out: netbuf::Buf
    26     closed: bool
       
    27 }
    41 }
    28 
    42 
    29 impl NetworkClient {
    43 impl NetworkClient {
    30     pub fn new(id: ClientId, socket: TcpStream, peer_addr: SocketAddr) -> NetworkClient {
    44     pub fn new(id: ClientId, socket: TcpStream, peer_addr: SocketAddr) -> NetworkClient {
    31         NetworkClient {
    45         NetworkClient {
    32             id, socket, peer_addr,
    46             id, socket, peer_addr,
    33             decoder: ProtocolDecoder::new(),
    47             decoder: ProtocolDecoder::new(),
    34             buf_out: netbuf::Buf::new(),
    48             buf_out: netbuf::Buf::new()
    35             closed: false
    49         }
    36         }
    50     }
       
    51 
       
    52     pub fn read_messages(&mut self) -> NetworkResult<Vec<HWProtocolMessage>> {
       
    53         let mut bytes_read = 0;
       
    54         let result = loop {
       
    55             match self.decoder.read_from(&mut self.socket) {
       
    56                 Ok(bytes) => {
       
    57                     debug!("Read {} bytes", bytes);
       
    58                     bytes_read += bytes;
       
    59                     if bytes == 0 {
       
    60                         let result = if bytes_read == 0 {
       
    61                             info!("EOF for client {} ({})", self.id, self.peer_addr);
       
    62                             (Vec::new(), NetworkClientState::Closed)
       
    63                         } else {
       
    64                             (self.decoder.extract_messages(), NetworkClientState::NeedsRead)
       
    65                         };
       
    66                         break Ok(result);
       
    67                     }
       
    68                     else if bytes_read >= MAX_BYTES_PER_READ {
       
    69                         break Ok((self.decoder.extract_messages(), NetworkClientState::NeedsRead))
       
    70                     }
       
    71                 }
       
    72                 Err(ref error) if error.kind() == ErrorKind::WouldBlock => {
       
    73                     let messages =  if bytes_read == 0 {
       
    74                         Vec::new()
       
    75                     } else {
       
    76                         self.decoder.extract_messages()
       
    77                     };
       
    78                     break Ok((messages, NetworkClientState::Idle));
       
    79                 }
       
    80                 Err(error) =>
       
    81                     break Err(error)
       
    82             }
       
    83         };
       
    84         self.decoder.sweep();
       
    85         result
       
    86     }
       
    87 
       
    88     pub fn flush(&mut self) -> NetworkResult<()> {
       
    89         let result = loop {
       
    90             match self.buf_out.write_to(&mut self.socket) {
       
    91                 Ok(bytes) if self.buf_out.is_empty() || bytes == 0 =>
       
    92                     break Ok(((), NetworkClientState::Idle)),
       
    93                 Ok(bytes) =>
       
    94                     (),
       
    95                 Err(ref error) if error.kind() == ErrorKind::Interrupted
       
    96                     || error.kind() == ErrorKind::WouldBlock => {
       
    97                     break Ok(((), NetworkClientState::NeedsWrite));
       
    98                 },
       
    99                 Err(error) =>
       
   100                     break Err(error)
       
   101             }
       
   102         };
       
   103         self.socket.flush()?;
       
   104         result
    37     }
   105     }
    38 
   106 
    39     pub fn send_raw_msg(&mut self, msg: &[u8]) {
   107     pub fn send_raw_msg(&mut self, msg: &[u8]) {
    40         self.buf_out.write(msg).unwrap();
   108         self.buf_out.write(msg).unwrap();
    41         self.flush();
       
    42     }
   109     }
    43 
   110 
    44     pub fn send_string(&mut self, msg: &String) {
   111     pub fn send_string(&mut self, msg: &String) {
    45         self.send_raw_msg(&msg.as_bytes());
   112         self.send_raw_msg(&msg.as_bytes());
    46     }
   113     }
    47 
   114 
    48     pub fn send_msg(&mut self, msg: HWServerMessage) {
   115     pub fn send_msg(&mut self, msg: HWServerMessage) {
    49         self.send_string(&msg.to_raw_protocol());
   116         self.send_string(&msg.to_raw_protocol());
    50     }
       
    51 
       
    52     fn flush(&mut self) {
       
    53         self.buf_out.write_to(&mut self.socket).unwrap();
       
    54         self.socket.flush().unwrap();
       
    55     }
       
    56 
       
    57     pub fn read_messages(&mut self) -> io::Result<Vec<HWProtocolMessage>> {
       
    58         let bytes_read = self.decoder.read_from(&mut self.socket)?;
       
    59         debug!("Read {} bytes", bytes_read);
       
    60 
       
    61         if bytes_read == 0 {
       
    62             self.closed = true;
       
    63             info!("EOF for client {} ({})", self.id, self.peer_addr);
       
    64         }
       
    65 
       
    66         Ok(self.decoder.extract_messages())
       
    67     }
       
    68 
       
    69     pub fn write_messages(&mut self) -> io::Result<()> {
       
    70         self.buf_out.write_to(&mut self.socket)?;
       
    71         Ok(())
       
    72     }
   117     }
    73 }
   118 }
    74 
   119 
    75 pub struct NetworkLayer {
   120 pub struct NetworkLayer {
    76     listener: TcpListener,
   121     listener: TcpListener,
    77     server: HWServer,
   122     server: HWServer,
    78 
   123 
    79     clients: Slab<NetworkClient>
   124     clients: Slab<NetworkClient>,
       
   125     pending: VecDeque<(ClientId, NetworkClientState)>
    80 }
   126 }
    81 
   127 
    82 impl NetworkLayer {
   128 impl NetworkLayer {
    83     pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer {
   129     pub fn new(listener: TcpListener, clients_limit: usize, rooms_limit: usize) -> NetworkLayer {
    84         let server = HWServer::new(clients_limit, rooms_limit);
   130         let server = HWServer::new(clients_limit, rooms_limit);
    85         let clients = Slab::with_capacity(clients_limit);
   131         let clients = Slab::with_capacity(clients_limit);
    86         NetworkLayer {listener, server, clients}
   132         let pending = VecDeque::with_capacity(clients_limit);
       
   133         NetworkLayer {listener, server, clients, pending}
    87     }
   134     }
    88 
   135 
    89     pub fn register_server(&self, poll: &Poll) -> io::Result<()> {
   136     pub fn register_server(&self, poll: &Poll) -> io::Result<()> {
    90         poll.register(&self.listener, utils::SERVER, Ready::readable(),
   137         poll.register(&self.listener, utils::SERVER, Ready::readable(),
    91                       PollOpt::edge())
   138                       PollOpt::edge())
    92     }
   139     }
    93 
   140 
    94     fn deregister_client(&mut self, poll: &Poll, id: ClientId) {
   141     fn deregister_client(&mut self, poll: &Poll, id: ClientId) {
    95         let mut client_exists = false;
   142         let mut client_exists = false;
    96         if let Some(ref client) = self.clients.get_mut(id) {
   143         if let Some(ref client) = self.clients.get(id) {
    97             poll.deregister(&client.socket)
   144             poll.deregister(&client.socket)
    98                 .ok().expect("could not deregister socket");
   145                 .ok().expect("could not deregister socket");
    99             info!("client {} ({}) removed", client.id, client.peer_addr);
   146             info!("client {} ({}) removed", client.id, client.peer_addr);
   100             client_exists = true;
   147             client_exists = true;
   101         }
   148         }
   114         let client = NetworkClient::new(id, client_socket, addr);
   161         let client = NetworkClient::new(id, client_socket, addr);
   115         info!("client {} ({}) added", client.id, client.peer_addr);
   162         info!("client {} ({}) added", client.id, client.peer_addr);
   116         entry.insert(client);
   163         entry.insert(client);
   117     }
   164     }
   118 
   165 
   119     pub fn accept_client(&mut self, poll: &Poll) -> io::Result<()> {
       
   120         let (client_socket, addr) = self.listener.accept()?;
       
   121         info!("Connected: {}", addr);
       
   122 
       
   123         let client_id = self.server.add_client();
       
   124         self.register_client(poll, client_id, client_socket, addr);
       
   125         self.flush_server_messages();
       
   126 
       
   127         Ok(())
       
   128     }
       
   129 
       
   130     fn flush_server_messages(&mut self) {
   166     fn flush_server_messages(&mut self) {
       
   167         debug!("{} pending server messages", self.server.output.len());
   131         for PendingMessage(destination, msg) in self.server.output.drain(..) {
   168         for PendingMessage(destination, msg) in self.server.output.drain(..) {
   132             match destination {
   169             match destination {
   133                 Destination::ToSelf(id)  => {
   170                 Destination::ToSelf(id)  => {
   134                     if let Some(ref mut client) = self.clients.get_mut(id) {
   171                     if let Some(ref mut client) = self.clients.get_mut(id) {
   135                         client.send_msg(msg)
   172                         client.send_msg(msg);
       
   173                         self.pending.push_back((id, NetworkClientState::NeedsWrite));
   136                     }
   174                     }
   137                 }
   175                 }
   138                 Destination::ToOthers(id) => {
   176                 Destination::ToOthers(id) => {
   139                     let msg_string = msg.to_raw_protocol();
   177                     let msg_string = msg.to_raw_protocol();
   140                     for item in self.clients.iter_mut() {
   178                     for (client_id, client) in self.clients.iter_mut() {
   141                         if item.0 != id {
   179                         if client_id != id {
   142                             item.1.send_string(&msg_string)
   180                             client.send_string(&msg_string);
       
   181                             self.pending.push_back((client_id, NetworkClientState::NeedsWrite));
   143                         }
   182                         }
   144                     }
   183                     }
   145                 }
   184                 }
   146             }
   185             }
   147         }
   186         }
       
   187     }
       
   188 
       
   189     pub fn accept_client(&mut self, poll: &Poll) -> io::Result<()> {
       
   190         let (client_socket, addr) = self.listener.accept()?;
       
   191         info!("Connected: {}", addr);
       
   192 
       
   193         let client_id = self.server.add_client();
       
   194         self.register_client(poll, client_id, client_socket, addr);
       
   195         self.flush_server_messages();
       
   196 
       
   197         Ok(())
       
   198     }
       
   199 
       
   200     fn operation_failed(&mut self, poll: &Poll, client_id: ClientId, error: Error, msg: &str) -> io::Result<()> {
       
   201         let addr = if let Some(ref mut client) = self.clients.get_mut(client_id) {
       
   202             client.peer_addr
       
   203         } else {
       
   204             SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0)
       
   205         };
       
   206         debug!("{}({}): {}", msg, addr, error);
       
   207         self.client_error(poll, client_id)
   148     }
   208     }
   149 
   209 
   150     pub fn client_readable(&mut self, poll: &Poll,
   210     pub fn client_readable(&mut self, poll: &Poll,
   151                            client_id: ClientId) -> io::Result<()> {
   211                            client_id: ClientId) -> io::Result<()> {
   152         let mut client_lost = false;
   212         let messages =
   153         let messages;
   213             if let Some(ref mut client) = self.clients.get_mut(client_id) {
   154         if let Some(ref mut client) = self.clients.get_mut(client_id) {
   214                 client.read_messages()
   155             messages = match client.read_messages() {
   215             } else {
   156                 Ok(messages) => Some(messages),
   216                 warn!("invalid readable client: {}", client_id);
   157                 Err(ref error) if error.kind() == ErrorKind::WouldBlock => None,
   217                 Ok((Vec::new(), NetworkClientState::Idle))
   158                 Err(error) => return Err(error)
       
   159             };
   218             };
   160             if client.closed {
   219 
   161                 client_lost = true;
   220         match messages {
   162             }
   221             Ok((messages, state)) => {
   163         } else {
   222                 for message in messages {
   164             warn!("invalid readable client: {}", client_id);
   223                     self.server.handle_msg(client_id, message);
   165             messages = None;
   224                 }
   166         };
   225                 match state {
   167 
   226                     NetworkClientState::NeedsRead =>
   168         if client_lost {
   227                         self.pending.push_back((client_id, state)),
   169             self.client_error(&poll, client_id)?;
   228                     NetworkClientState::Closed =>
   170         } else if let Some(msg) = messages {
   229                         self.client_error(&poll, client_id)?,
   171             for message in msg {
   230                     _ => {}
   172                 self.server.handle_msg(client_id, message);
   231                 };
   173             }
   232             }
   174             self.flush_server_messages();
   233             Err(e) => self.operation_failed(
   175         }
   234                 poll, client_id, e,
       
   235                 "Error while reading from client socket")?
       
   236         }
       
   237 
       
   238         self.flush_server_messages();
   176 
   239 
   177         if !self.server.removed_clients.is_empty() {
   240         if !self.server.removed_clients.is_empty() {
   178             let ids = self.server.removed_clients.to_vec();
   241             let ids: Vec<_> = self.server.removed_clients.drain(..).collect();
   179             self.server.removed_clients.clear();
       
   180             for client_id in ids {
   242             for client_id in ids {
   181                 self.deregister_client(poll, client_id);
   243                 self.deregister_client(poll, client_id);
   182             }
   244             }
   183         }
   245         }
   184 
   246 
   185         Ok(())
   247         Ok(())
   186     }
   248     }
   187 
   249 
   188     pub fn client_writable(&mut self, poll: &Poll,
   250     pub fn client_writable(&mut self, poll: &Poll,
   189                            client_id: ClientId) -> io::Result<()> {
   251                            client_id: ClientId) -> io::Result<()> {
   190         if let Some(ref mut client) = self.clients.get_mut(client_id) {
   252         let result =
   191             match client.write_messages() {
   253             if let Some(ref mut client) = self.clients.get_mut(client_id) {
   192                 Ok(_) => (),
   254                 client.flush()
   193                 Err(ref error) if error.kind() == ErrorKind::WouldBlock => (),
   255             } else {
   194                 Err(error) => return Err(error)
   256                 warn!("invalid writable client: {}", client_id);
   195             }
   257                 Ok(((), NetworkClientState::Idle))
   196         } else {
   258             };
   197             warn!("invalid writable client: {}", client_id);
   259 
       
   260         match result {
       
   261             Ok(((), state)) if state == NetworkClientState::NeedsWrite =>
       
   262                 self.pending.push_back((client_id, state)),
       
   263             Ok(_) =>
       
   264                 {}
       
   265             Err(e) => self.operation_failed(
       
   266                 poll, client_id, e,
       
   267                 "Error while writing to client socket")?
   198         }
   268         }
   199 
   269 
   200         Ok(())
   270         Ok(())
   201     }
   271     }
   202 
   272 
   205         self.deregister_client(poll, client_id);
   275         self.deregister_client(poll, client_id);
   206         self.server.client_lost(client_id);
   276         self.server.client_lost(client_id);
   207 
   277 
   208         Ok(())
   278         Ok(())
   209     }
   279     }
   210 }
   280 
   211 
   281     pub fn has_pending_operations(&self) -> bool {
       
   282         !self.pending.is_empty()
       
   283     }
       
   284 
       
   285     pub fn on_idle(&mut self, poll: &Poll) -> io::Result<()> {
       
   286         while let Some((id, state)) = self.pending.pop_front() {
       
   287             match state {
       
   288                 NetworkClientState::NeedsRead =>
       
   289                     self.client_readable(poll, id)?,
       
   290                 NetworkClientState::NeedsWrite =>
       
   291                     self.client_writable(poll, id)?,
       
   292                 _ => {}
       
   293             }
       
   294         }
       
   295         Ok(())
       
   296     }
       
   297 }