--- a/rust/lib-hedgewars-engine/Cargo.toml Thu Jul 25 14:23:25 2019 +0200
+++ b/rust/lib-hedgewars-engine/Cargo.toml Thu Jul 25 15:18:00 2019 +0200
@@ -9,6 +9,7 @@
netbuf = "0.4"
itertools = "0.8"
png = "0.13"
+queues = "1.1"
fpnum = { path = "../fpnum" }
land2d = { path = "../land2d" }
--- a/rust/lib-hedgewars-engine/src/instance.rs Thu Jul 25 14:23:25 2019 +0200
+++ b/rust/lib-hedgewars-engine/src/instance.rs Thu Jul 25 15:18:00 2019 +0200
@@ -6,11 +6,12 @@
use integral_geometry::{Point, Rect, Size};
use landgen::outline_template::OutlineTemplate;
-use super::{ipc::IPC, world::World};
+use super::{ipc::*, world::World};
pub struct EngineInstance {
pub world: World,
- pub ipc: IPC,
+ pub ipc_channel: Channel,
+ ipc_queue: MessagesQueue,
}
impl EngineInstance {
@@ -34,7 +35,8 @@
Self {
world,
- ipc: IPC::new(),
+ ipc_channel: Channel::new(),
+ ipc_queue: MessagesQueue::new(QueueChatStrategy::LocalGame),
}
}
@@ -57,7 +59,11 @@
}
pub fn process_ipc_queue(&mut self) {
- let messages: Vec<EngineMessage> = self.ipc.iter().collect();
+ for message in self.ipc_channel.iter() {
+ self.ipc_queue.push(message);
+ }
+
+ let messages: Vec<EngineMessage> = self.ipc_queue.iter(0).collect();
for message in messages {
println!("Processing message: {:?}", message);
--- a/rust/lib-hedgewars-engine/src/ipc.rs Thu Jul 25 14:23:25 2019 +0200
+++ b/rust/lib-hedgewars-engine/src/ipc.rs Thu Jul 25 15:18:00 2019 +0200
@@ -1,67 +1,5 @@
-use hedgewars_engine_messages::{messages::*, parser::extract_message};
-use netbuf::*;
-use std::io::*;
-
-pub struct IPC {
- in_buffer: Buf,
- out_buffer: Buf,
-}
-
-impl IPC {
- pub fn new() -> Self {
- Self {
- in_buffer: Buf::new(),
- out_buffer: Buf::new(),
- }
- }
-
- pub fn send_message(&mut self, message: &EngineMessage) {
- self.out_buffer.write(&message.to_bytes()).unwrap();
- }
-
- pub fn iter(&mut self) -> IPCMessagesIterator {
- IPCMessagesIterator::new(self)
- }
-}
-
-impl Write for IPC {
- fn write(&mut self, buf: &[u8]) -> Result<usize> {
- self.in_buffer.write(buf)
- }
+mod channel;
+mod queue;
- fn flush(&mut self) -> Result<()> {
- self.in_buffer.flush()
- }
-}
-
-impl Read for IPC {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- let read_bytes = self.out_buffer.as_ref().read(buf)?;
-
- self.out_buffer.consume(read_bytes);
-
- Ok(read_bytes)
- }
-}
-
-pub struct IPCMessagesIterator<'a> {
- ipc: &'a mut IPC,
-}
-
-impl<'a> IPCMessagesIterator<'a> {
- pub fn new(ipc: &'a mut IPC) -> Self {
- Self { ipc }
- }
-}
-
-impl<'a> Iterator for IPCMessagesIterator<'a> {
- type Item = EngineMessage;
-
- fn next(&mut self) -> Option<Self::Item> {
- let (consumed, message) = extract_message(&self.ipc.in_buffer[..])?;
-
- self.ipc.in_buffer.consume(consumed);
-
- Some(message)
- }
-}
+pub use self::channel::*;
+pub use self::queue::*;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/rust/lib-hedgewars-engine/src/ipc/channel.rs Thu Jul 25 15:18:00 2019 +0200
@@ -0,0 +1,67 @@
+use hedgewars_engine_messages::{messages::*, parser::extract_message};
+use netbuf::*;
+use std::io::*;
+
+pub struct Channel {
+ in_buffer: Buf,
+ out_buffer: Buf,
+}
+
+impl Channel {
+ pub fn new() -> Self {
+ Self {
+ in_buffer: Buf::new(),
+ out_buffer: Buf::new(),
+ }
+ }
+
+ pub fn send_message(&mut self, message: &EngineMessage) {
+ self.out_buffer.write(&message.to_bytes()).unwrap();
+ }
+
+ pub fn iter(&mut self) -> IPCMessagesIterator {
+ IPCMessagesIterator::new(self)
+ }
+}
+
+impl Write for Channel {
+ fn write(&mut self, buf: &[u8]) -> Result<usize> {
+ self.in_buffer.write(buf)
+ }
+
+ fn flush(&mut self) -> Result<()> {
+ self.in_buffer.flush()
+ }
+}
+
+impl Read for Channel {
+ fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+ let read_bytes = self.out_buffer.as_ref().read(buf)?;
+
+ self.out_buffer.consume(read_bytes);
+
+ Ok(read_bytes)
+ }
+}
+
+pub struct IPCMessagesIterator<'a> {
+ ipc: &'a mut Channel,
+}
+
+impl<'a> IPCMessagesIterator<'a> {
+ pub fn new(ipc: &'a mut Channel) -> Self {
+ Self { ipc }
+ }
+}
+
+impl<'a> Iterator for IPCMessagesIterator<'a> {
+ type Item = EngineMessage;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ let (consumed, message) = extract_message(&self.ipc.in_buffer[..])?;
+
+ self.ipc.in_buffer.consume(consumed);
+
+ Some(message)
+ }
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/rust/lib-hedgewars-engine/src/ipc/queue.rs Thu Jul 25 15:18:00 2019 +0200
@@ -0,0 +1,115 @@
+use hedgewars_engine_messages::{
+ messages::EngineMessage::*, messages::SyncedEngineMessage::*,
+ messages::UnsyncedEngineMessage::*, messages::*,
+};
+use queues::*;
+
+#[derive(PartialEq)]
+pub enum QueueChatStrategy {
+ NetworkGame,
+ LocalGame,
+}
+
+pub struct MessagesQueue {
+ strategy: QueueChatStrategy,
+ hi_ticks: u32,
+ unordered: Queue<EngineMessage>,
+ ordered: Queue<EngineMessage>,
+}
+
+impl MessagesQueue {
+ pub fn new(strategy: QueueChatStrategy) -> Self {
+ MessagesQueue {
+ strategy,
+ hi_ticks: 0,
+ unordered: queue![],
+ ordered: queue![],
+ }
+ }
+
+ fn is_unordered(&self, message: &EngineMessage) -> bool {
+ match message {
+ Unordered(_) => true,
+ Unsynced(HogSay(_)) | Unsynced(ChatMessage(_)) | Unsynced(TeamMessage(_)) => {
+ self.strategy == QueueChatStrategy::NetworkGame
+ }
+ _ => false,
+ }
+ }
+
+ pub fn push(&mut self, engine_message: EngineMessage) {
+ if self.is_unordered(&engine_message) {
+ self.unordered.add(engine_message).unwrap();
+ } else if let Synced(TimeWrap, timestamp) = engine_message {
+ self.ordered
+ .add(Synced(TimeWrap, timestamp + self.hi_ticks))
+ .unwrap();
+ self.hi_ticks += 65536;
+ } else if let Synced(message, timestamp) = engine_message {
+ self.ordered
+ .add(Synced(message, timestamp + self.hi_ticks))
+ .unwrap();
+ } else {
+ self.ordered.add(engine_message).unwrap();
+ }
+ }
+
+ pub fn pop(&mut self, timestamp: u32) -> Option<EngineMessage> {
+ if let Ok(message) = self.unordered.remove() {
+ Some(message)
+ } else if let Ok(Synced(_, message_timestamp)) = self.ordered.peek() {
+ if message_timestamp == timestamp {
+ self.ordered.remove().ok()
+ } else {
+ None
+ }
+ } else {
+ self.ordered.remove().ok()
+ }
+ }
+
+ pub fn iter(&mut self, timestamp: u32) -> MessagesQueueIterator {
+ MessagesQueueIterator {
+ timestamp,
+ queue: self,
+ }
+ }
+}
+
+pub struct MessagesQueueIterator<'a> {
+ timestamp: u32,
+ queue: &'a mut MessagesQueue,
+}
+
+impl<'a> Iterator for MessagesQueueIterator<'a> {
+ type Item = EngineMessage;
+
+ fn next(&mut self) -> Option<EngineMessage> {
+ self.queue.pop(self.timestamp)
+ }
+}
+
+#[test]
+fn queue_order() {
+ use hedgewars_engine_messages::messages::UnorderedEngineMessage::*;
+
+ let mut queue = MessagesQueue::new(QueueChatStrategy::LocalGame);
+
+ queue.push(Synced(Skip, 1));
+ queue.push(Unsynced(ChatMessage("hi".to_string())));
+ queue.push(Synced(TimeWrap, 65535));
+ queue.push(Unordered(Ping));
+ queue.push(Synced(Skip, 2));
+
+ let zero_tick: Vec<EngineMessage> = queue.iter(0).collect();
+ assert_eq!(zero_tick, vec![Unordered(Ping)]);
+ assert_eq!(queue.pop(1), Some(Synced(Skip, 1)));
+ assert_eq!(queue.pop(1), Some(Unsynced(ChatMessage("hi".to_string()))));
+ assert_eq!(queue.pop(1), None);
+ assert_eq!(queue.pop(2), None);
+ assert_eq!(queue.pop(65535), Some(Synced(TimeWrap, 65535)));
+ assert_eq!(queue.pop(65535), None);
+ assert_eq!(queue.pop(65538), Some(Synced(Skip, 65538)));
+ assert_eq!(queue.pop(65538), None);
+ assert_eq!(queue.pop(65539), None);
+}
--- a/rust/lib-hedgewars-engine/src/lib.rs Thu Jul 25 14:23:25 2019 +0200
+++ b/rust/lib-hedgewars-engine/src/lib.rs Thu Jul 25 15:18:00 2019 +0200
@@ -118,7 +118,7 @@
pub extern "C" fn send_ipc(engine_state: &mut EngineInstance, buf: *const u8, size: usize) {
unsafe {
(*engine_state)
- .ipc
+ .ipc_channel
.write(std::slice::from_raw_parts(buf, size))
.unwrap();
}
@@ -128,7 +128,7 @@
pub extern "C" fn read_ipc(engine_state: &mut EngineInstance, buf: *mut u8, size: usize) -> usize {
unsafe {
(*engine_state)
- .ipc
+ .ipc_channel
.read(std::slice::from_raw_parts_mut(buf, size))
.unwrap_or(0)
}