author | alfadur |
Tue, 01 Feb 2022 02:23:35 +0300 | |
changeset 15853 | 7d0f747afcb8 |
parent 15826 | 747278149393 |
child 15854 | a4d505a32879 |
permissions | -rw-r--r-- |
15853 | 1 |
use bytes::{Buf, Bytes}; |
2 |
use log::*; |
|
3 |
use slab::Slab; |
|
13414 | 4 |
use std::{ |
13415 | 5 |
collections::HashSet, |
14478 | 6 |
io, |
7 |
io::{Error, ErrorKind, Read, Write}, |
|
15853 | 8 |
iter::Iterator, |
14478 | 9 |
mem::{replace, swap}, |
10 |
net::{IpAddr, Ipv4Addr, SocketAddr}, |
|
15822 | 11 |
num::NonZeroU32, |
12 |
time::Duration, |
|
13 |
time::Instant, |
|
13414 | 14 |
}; |
15853 | 15 |
use tokio::{ |
16 |
io::AsyncReadExt, |
|
14478 | 17 |
net::{TcpListener, TcpStream}, |
15853 | 18 |
sync::mpsc::{channel, Receiver, Sender}, |
13414 | 19 |
}; |
13119 | 20 |
|
13666 | 21 |
use crate::{ |
15822 | 22 |
core::{ |
23 |
events::{TimedEvents, Timeout}, |
|
24 |
types::ClientId, |
|
25 |
}, |
|
15095 | 26 |
handlers, |
15542 | 27 |
handlers::{IoResult, IoTask, ServerState}, |
15826 | 28 |
protocol::ProtocolDecoder, |
13666 | 29 |
utils, |
13414 | 30 |
}; |
15853 | 31 |
use hedgewars_network_protocol::{ |
32 |
messages::HwServerMessage::Redirect, messages::*, parser::server_message, |
|
33 |
}; |
|
34 |
use tokio::io::AsyncWriteExt; |
|
14800
f43ab2bd76ae
add a thread for internal server IO and implement account checking with it
alfadur
parents:
14718
diff
changeset
|
35 |
|
15853 | 36 |
enum ClientUpdateData { |
37 |
Message(HwProtocolMessage), |
|
38 |
Error(String), |
|
39 |
} |
|
40 |
||
41 |
struct ClientUpdate { |
|
42 |
client_id: ClientId, |
|
43 |
data: ClientUpdateData, |
|
44 |
} |
|
13414 | 45 |
|
15853 | 46 |
struct ClientUpdateSender { |
47 |
client_id: ClientId, |
|
48 |
sender: Sender<ClientUpdate>, |
|
49 |
} |
|
13119 | 50 |
|
15853 | 51 |
impl ClientUpdateSender { |
52 |
async fn send(&mut self, data: ClientUpdateData) -> bool { |
|
53 |
self.sender |
|
54 |
.send(ClientUpdate { |
|
55 |
client_id: self.client_id, |
|
56 |
data, |
|
57 |
}) |
|
58 |
.await |
|
59 |
.is_ok() |
|
60 |
} |
|
61 |
} |
|
62 |
||
63 |
struct NetworkClient { |
|
64 |
id: ClientId, |
|
65 |
socket: TcpStream, |
|
66 |
receiver: Receiver<Bytes>, |
|
67 |
peer_addr: SocketAddr, |
|
68 |
decoder: ProtocolDecoder, |
|
13414 | 69 |
} |
70 |
||
15853 | 71 |
impl NetworkClient { |
72 |
fn new( |
|
73 |
id: ClientId, |
|
74 |
socket: TcpStream, |
|
75 |
peer_addr: SocketAddr, |
|
76 |
receiver: Receiver<Bytes>, |
|
77 |
) -> Self { |
|
78 |
Self { |
|
79 |
id, |
|
80 |
socket, |
|
81 |
peer_addr, |
|
82 |
receiver, |
|
83 |
decoder: ProtocolDecoder::new(), |
|
84 |
} |
|
85 |
} |
|
13119 | 86 |
|
15853 | 87 |
async fn read(&mut self) -> Option<HwProtocolMessage> { |
88 |
self.decoder.read_from(&mut self.socket).await |
|
89 |
} |
|
90 |
||
91 |
async fn write(&mut self, mut data: Bytes) -> bool { |
|
92 |
!data.has_remaining() || matches!(self.socket.write_buf(&mut data).await, Ok(n) if n > 0) |
|
93 |
} |
|
13773 | 94 |
|
15853 | 95 |
async fn run(mut self, sender: Sender<ClientUpdate>) { |
96 |
use ClientUpdateData::*; |
|
97 |
let mut sender = ClientUpdateSender { |
|
98 |
client_id: self.id, |
|
99 |
sender, |
|
100 |
}; |
|
101 |
||
102 |
loop { |
|
103 |
tokio::select! { |
|
104 |
server_message = self.receiver.recv() => { |
|
105 |
match server_message { |
|
106 |
Some(message) => if !self.write(message).await { |
|
107 |
sender.send(Error("Connection reset by peer".to_string())).await; |
|
108 |
break; |
|
109 |
} |
|
110 |
None => { |
|
111 |
break; |
|
112 |
} |
|
113 |
} |
|
114 |
} |
|
115 |
client_message = self.decoder.read_from(&mut self.socket) => { |
|
116 |
match client_message { |
|
117 |
Some(message) => { |
|
118 |
if !sender.send(Message(message)).await { |
|
119 |
break; |
|
120 |
} |
|
121 |
} |
|
122 |
None => { |
|
123 |
sender.send(Error("Connection reset by peer".to_string())).await; |
|
124 |
break; |
|
125 |
} |
|
126 |
} |
|
127 |
} |
|
128 |
} |
|
13773 | 129 |
} |
130 |
} |
|
131 |
} |
|
132 |
||
15853 | 133 |
pub struct NetworkLayer { |
134 |
listener: TcpListener, |
|
135 |
server_state: ServerState, |
|
136 |
clients: Slab<Sender<Bytes>>, |
|
13119 | 137 |
} |
138 |
||
15853 | 139 |
impl NetworkLayer { |
140 |
pub async fn run(&mut self) { |
|
141 |
let (update_tx, mut update_rx) = channel(128); |
|
13119 | 142 |
|
15853 | 143 |
loop { |
144 |
tokio::select! { |
|
145 |
Ok((stream, addr)) = self.listener.accept() => { |
|
146 |
if let Some(client) = self.create_client(stream, addr).await { |
|
147 |
tokio::spawn(client.run(update_tx.clone())); |
|
148 |
} |
|
149 |
} |
|
150 |
client_message = update_rx.recv(), if !self.clients.is_empty() => { |
|
151 |
use ClientUpdateData::*; |
|
152 |
match client_message { |
|
153 |
Some(ClientUpdate{ client_id, data: Message(message) } ) => { |
|
154 |
self.handle_message(client_id, message).await; |
|
155 |
} |
|
156 |
Some(ClientUpdate{ client_id, .. } ) => { |
|
157 |
let mut response = handlers::Response::new(client_id); |
|
158 |
handlers::handle_client_loss(&mut self.server_state, client_id, &mut response); |
|
159 |
self.handle_response(response).await; |
|
160 |
} |
|
161 |
None => unreachable!() |
|
162 |
} |
|
163 |
} |
|
13776 | 164 |
} |
165 |
} |
|
166 |
} |
|
167 |
||
15853 | 168 |
async fn create_client( |
169 |
&mut self, |
|
170 |
stream: TcpStream, |
|
171 |
addr: SocketAddr, |
|
172 |
) -> Option<NetworkClient> { |
|
173 |
let entry = self.clients.vacant_entry(); |
|
174 |
let client_id = entry.key(); |
|
175 |
let (tx, rx) = channel(16); |
|
176 |
entry.insert(tx); |
|
177 |
||
178 |
let client = NetworkClient::new(client_id, stream, addr, rx); |
|
13414 | 179 |
|
15853 | 180 |
info!("client {} ({}) added", client.id, client.peer_addr); |
181 |
||
182 |
let mut response = handlers::Response::new(client_id); |
|
183 |
||
184 |
let added = if let IpAddr::V4(addr) = client.peer_addr.ip() { |
|
185 |
handlers::handle_client_accept( |
|
186 |
&mut self.server_state, |
|
187 |
client_id, |
|
188 |
&mut response, |
|
189 |
addr.octets(), |
|
190 |
addr.is_loopback(), |
|
191 |
) |
|
192 |
} else { |
|
193 |
todo!("implement something") |
|
15822 | 194 |
}; |
195 |
||
15853 | 196 |
self.handle_response(response).await; |
13414 | 197 |
|
15853 | 198 |
if added { |
199 |
Some(client) |
|
14800
f43ab2bd76ae
add a thread for internal server IO and implement account checking with it
alfadur
parents:
14718
diff
changeset
|
200 |
} else { |
f43ab2bd76ae
add a thread for internal server IO and implement account checking with it
alfadur
parents:
14718
diff
changeset
|
201 |
None |
f43ab2bd76ae
add a thread for internal server IO and implement account checking with it
alfadur
parents:
14718
diff
changeset
|
202 |
} |
f43ab2bd76ae
add a thread for internal server IO and implement account checking with it
alfadur
parents:
14718
diff
changeset
|
203 |
} |
f43ab2bd76ae
add a thread for internal server IO and implement account checking with it
alfadur
parents:
14718
diff
changeset
|
204 |
|
15853 | 205 |
async fn handle_message(&mut self, client_id: ClientId, message: HwProtocolMessage) { |
206 |
debug!("Handling message {:?} for client {}", message, client_id); |
|
207 |
let mut response = handlers::Response::new(client_id); |
|
208 |
handlers::handle(&mut self.server_state, client_id, &mut response, message); |
|
209 |
self.handle_response(response).await; |
|
13119 | 210 |
} |
211 |
||
15853 | 212 |
async fn handle_response(&mut self, mut response: handlers::Response) { |
14851
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
213 |
if response.is_empty() { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
214 |
return; |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
215 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
216 |
|
14693
6e6632068a33
Server action refactoring part 3 of N
alfadur <mail@none>
parents:
14692
diff
changeset
|
217 |
debug!("{} pending server messages", response.len()); |
15542 | 218 |
let output = response.extract_messages(&mut self.server_state.server); |
14693
6e6632068a33
Server action refactoring part 3 of N
alfadur <mail@none>
parents:
14692
diff
changeset
|
219 |
for (clients, message) in output { |
13419 | 220 |
debug!("Message {:?} to {:?}", message, clients); |
15853 | 221 |
Self::send_message(&mut self.clients, message, clients.iter().cloned()).await; |
13414 | 222 |
} |
14717 | 223 |
|
224 |
for client_id in response.extract_removed_clients() { |
|
15853 | 225 |
if self.clients.contains(client_id) { |
226 |
self.clients.remove(client_id); |
|
14824 | 227 |
} |
15853 | 228 |
info!("Client {} removed", client_id); |
15539 | 229 |
} |
14856 | 230 |
} |
231 |
||
15853 | 232 |
async fn send_message<I>( |
233 |
clients: &mut Slab<Sender<Bytes>>, |
|
234 |
message: HwServerMessage, |
|
235 |
to_clients: I, |
|
236 |
) where |
|
237 |
I: Iterator<Item = ClientId>, |
|
238 |
{ |
|
239 |
let msg_string = message.to_raw_protocol(); |
|
240 |
let bytes = Bytes::copy_from_slice(msg_string.as_bytes()); |
|
241 |
for client_id in to_clients { |
|
242 |
if let Some(client) = clients.get_mut(client_id) { |
|
243 |
if !client.send(bytes.clone()).await.is_ok() { |
|
244 |
clients.remove(client_id); |
|
13414 | 245 |
} |
14478 | 246 |
} |
13119 | 247 |
} |
13414 | 248 |
} |
13119 | 249 |
} |
14851
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
250 |
|
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
251 |
pub struct NetworkLayerBuilder { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
252 |
listener: Option<TcpListener>, |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
253 |
clients_capacity: usize, |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
254 |
rooms_capacity: usize, |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
255 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
256 |
|
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
257 |
impl Default for NetworkLayerBuilder { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
258 |
fn default() -> Self { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
259 |
Self { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
260 |
clients_capacity: 1024, |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
261 |
rooms_capacity: 512, |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
262 |
listener: None, |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
263 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
264 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
265 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
266 |
|
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
267 |
impl NetworkLayerBuilder { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
268 |
pub fn with_listener(self, listener: TcpListener) -> Self { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
269 |
Self { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
270 |
listener: Some(listener), |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
271 |
..self |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
272 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
273 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
274 |
|
15853 | 275 |
pub fn build(self) -> NetworkLayer { |
15542 | 276 |
let server_state = ServerState::new(self.clients_capacity, self.rooms_capacity); |
277 |
||
14851
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
278 |
let clients = Slab::with_capacity(self.clients_capacity); |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
279 |
|
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
280 |
NetworkLayer { |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
281 |
listener: self.listener.expect("No listener provided"), |
15542 | 282 |
server_state, |
14851
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
283 |
clients, |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
284 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
285 |
} |
8ddb5842fe0b
allow running plaintext and tls servers in parallel
alfadur
parents:
14828
diff
changeset
|
286 |
} |