diff -r bbec6b28d072 -r f43ab2bd76ae rust/hedgewars-server/src/server/network.rs --- a/rust/hedgewars-server/src/server/network.rs Tue Apr 09 00:45:14 2019 +0200 +++ b/rust/hedgewars-server/src/server/network.rs Tue Apr 09 21:08:35 2019 +0300 @@ -16,11 +16,16 @@ use netbuf; use slab::Slab; -use super::{core::HWServer, coretypes::ClientId, handlers, io::FileServerIO}; +use super::{core::FileServerIO, core::HWServer, coretypes::ClientId, handlers}; use crate::{ protocol::{messages::*, ProtocolDecoder}, utils, }; + +#[cfg(feature = "official-server")] +use super::io::{IOThread, RequestId}; + +use crate::server::handlers::{IoResult, IoTask}; #[cfg(feature = "tls-connections")] use openssl::{ error::ErrorStack, @@ -234,6 +239,56 @@ context: SslContext, } +#[cfg(feature = "official-server")] +pub struct IoLayer { + next_request_id: RequestId, + request_queue: Vec<(RequestId, ClientId)>, + io_thread: IOThread, +} + +#[cfg(feature = "official-server")] +impl IoLayer { + fn new() -> Self { + Self { + next_request_id: 0, + request_queue: vec![], + io_thread: IOThread::new(), + } + } + + fn send(&mut self, client_id: ClientId, task: IoTask) { + let request_id = self.next_request_id; + self.next_request_id += 1; + self.request_queue.push((request_id, client_id)); + self.io_thread.send(request_id, task); + } + + fn try_recv(&mut self) -> Option<(ClientId, IoResult)> { + let (request_id, result) = self.io_thread.try_recv()?; + if let Some(index) = self + .request_queue + .iter() + .position(|(id, _)| *id == request_id) + { + let (_, client_id) = self.request_queue.swap_remove(index); + Some((client_id, result)) + } else { + None + } + } + + fn cancel(&mut self, client_id: ClientId) { + let mut index = 0; + while index < self.request_queue.len() { + if self.request_queue[index].1 == client_id { + self.request_queue.swap_remove(index); + } else { + index += 1; + } + } + } +} + pub struct NetworkLayer { listener: TcpListener, server: HWServer, @@ -242,6 +297,8 @@ pending_cache: Vec<(ClientId, NetworkClientState)>, #[cfg(feature = "tls-connections")] ssl: ServerSsl, + #[cfg(feature = "official-server")] + io: IoLayer, } impl NetworkLayer { @@ -259,6 +316,8 @@ pending_cache, #[cfg(feature = "tls-connections")] ssl: NetworkLayer::create_ssl_context(), + #[cfg(feature = "official-server")] + io: IoLayer::new(), } } @@ -283,10 +342,15 @@ pub fn register_server(&self, poll: &Poll) -> io::Result<()> { poll.register( &self.listener, - utils::SERVER, + utils::SERVER_TOKEN, Ready::readable(), PollOpt::edge(), - ) + )?; + + #[cfg(feature = "official-server")] + self.io.io_thread.register_rx(poll, utils::IO_TOKEN)?; + + Ok(()) } fn deregister_client(&mut self, poll: &Poll, id: ClientId) { @@ -299,6 +363,8 @@ } if client_exists { self.clients.remove(id); + #[cfg(feature = "official-server")] + self.io.cancel(id); } } @@ -326,7 +392,7 @@ client_id } - fn flush_server_messages(&mut self, mut response: handlers::Response, poll: &Poll) { + fn handle_response(&mut self, mut response: handlers::Response, poll: &Poll) { debug!("{} pending server messages", response.len()); let output = response.extract_messages(&mut self.server); for (clients, message) in output { @@ -344,6 +410,22 @@ for client_id in response.extract_removed_clients() { self.deregister_client(poll, client_id); } + + #[cfg(feature = "official-server")] + { + let client_id = response.client_id(); + for task in response.extract_io_tasks() { + self.io.send(client_id, task); + } + } + } + + #[cfg(feature = "official-server")] + pub fn handle_io_result(&mut self) { + if let Some((client_id, result)) = self.io.try_recv() { + let mut response = handlers::Response::new(client_id); + handlers::handle_io_result(&mut self.server, client_id, &mut response, result); + } } fn create_client_socket(&self, socket: TcpStream) -> io::Result { @@ -381,7 +463,7 @@ handlers::handle_client_accept(&mut self.server, client_id, &mut response); if !response.is_empty() { - self.flush_server_messages(response, poll); + self.handle_response(response, poll); } Ok(()) @@ -438,7 +520,7 @@ } if !response.is_empty() { - self.flush_server_messages(response, poll); + self.handle_response(response, poll); } Ok(()) @@ -469,7 +551,7 @@ self.deregister_client(poll, client_id); let mut response = handlers::Response::new(client_id); handlers::handle_client_loss(&mut self.server, client_id, &mut response); - self.flush_server_messages(response, poll); + self.handle_response(response, poll); Ok(()) }