Implement ipc queue which takes care of message ordering and timestamps
authorunC0Rr
Thu, 25 Jul 2019 15:18:00 +0200
changeset 15265 07e909ba4203
parent 15264 7515ae6010bb
child 15266 b58f98bbc120
Implement ipc queue which takes care of message ordering and timestamps
rust/lib-hedgewars-engine/Cargo.toml
rust/lib-hedgewars-engine/src/instance.rs
rust/lib-hedgewars-engine/src/ipc.rs
rust/lib-hedgewars-engine/src/ipc/channel.rs
rust/lib-hedgewars-engine/src/ipc/queue.rs
rust/lib-hedgewars-engine/src/lib.rs
--- a/rust/lib-hedgewars-engine/Cargo.toml	Thu Jul 25 14:23:25 2019 +0200
+++ b/rust/lib-hedgewars-engine/Cargo.toml	Thu Jul 25 15:18:00 2019 +0200
@@ -9,6 +9,7 @@
 netbuf = "0.4"
 itertools = "0.8"
 png = "0.13"
+queues = "1.1"
 
 fpnum = { path = "../fpnum" }
 land2d = { path = "../land2d" }
--- a/rust/lib-hedgewars-engine/src/instance.rs	Thu Jul 25 14:23:25 2019 +0200
+++ b/rust/lib-hedgewars-engine/src/instance.rs	Thu Jul 25 15:18:00 2019 +0200
@@ -6,11 +6,12 @@
 use integral_geometry::{Point, Rect, Size};
 use landgen::outline_template::OutlineTemplate;
 
-use super::{ipc::IPC, world::World};
+use super::{ipc::*, world::World};
 
 pub struct EngineInstance {
     pub world: World,
-    pub ipc: IPC,
+    pub ipc_channel: Channel,
+    ipc_queue: MessagesQueue,
 }
 
 impl EngineInstance {
@@ -34,7 +35,8 @@
 
         Self {
             world,
-            ipc: IPC::new(),
+            ipc_channel: Channel::new(),
+            ipc_queue: MessagesQueue::new(QueueChatStrategy::LocalGame),
         }
     }
 
@@ -57,7 +59,11 @@
     }
 
     pub fn process_ipc_queue(&mut self) {
-        let messages: Vec<EngineMessage> = self.ipc.iter().collect();
+        for message in self.ipc_channel.iter() {
+            self.ipc_queue.push(message);
+        }
+
+        let messages: Vec<EngineMessage> = self.ipc_queue.iter(0).collect();
 
         for message in messages {
             println!("Processing message: {:?}", message);
--- a/rust/lib-hedgewars-engine/src/ipc.rs	Thu Jul 25 14:23:25 2019 +0200
+++ b/rust/lib-hedgewars-engine/src/ipc.rs	Thu Jul 25 15:18:00 2019 +0200
@@ -1,67 +1,5 @@
-use hedgewars_engine_messages::{messages::*, parser::extract_message};
-use netbuf::*;
-use std::io::*;
-
-pub struct IPC {
-    in_buffer: Buf,
-    out_buffer: Buf,
-}
-
-impl IPC {
-    pub fn new() -> Self {
-        Self {
-            in_buffer: Buf::new(),
-            out_buffer: Buf::new(),
-        }
-    }
-
-    pub fn send_message(&mut self, message: &EngineMessage) {
-        self.out_buffer.write(&message.to_bytes()).unwrap();
-    }
-
-    pub fn iter(&mut self) -> IPCMessagesIterator {
-        IPCMessagesIterator::new(self)
-    }
-}
-
-impl Write for IPC {
-    fn write(&mut self, buf: &[u8]) -> Result<usize> {
-        self.in_buffer.write(buf)
-    }
+mod channel;
+mod queue;
 
-    fn flush(&mut self) -> Result<()> {
-        self.in_buffer.flush()
-    }
-}
-
-impl Read for IPC {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        let read_bytes = self.out_buffer.as_ref().read(buf)?;
-
-        self.out_buffer.consume(read_bytes);
-
-        Ok(read_bytes)
-    }
-}
-
-pub struct IPCMessagesIterator<'a> {
-    ipc: &'a mut IPC,
-}
-
-impl<'a> IPCMessagesIterator<'a> {
-    pub fn new(ipc: &'a mut IPC) -> Self {
-        Self { ipc }
-    }
-}
-
-impl<'a> Iterator for IPCMessagesIterator<'a> {
-    type Item = EngineMessage;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        let (consumed, message) = extract_message(&self.ipc.in_buffer[..])?;
-
-        self.ipc.in_buffer.consume(consumed);
-
-        Some(message)
-    }
-}
+pub use self::channel::*;
+pub use self::queue::*;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rust/lib-hedgewars-engine/src/ipc/channel.rs	Thu Jul 25 15:18:00 2019 +0200
@@ -0,0 +1,67 @@
+use hedgewars_engine_messages::{messages::*, parser::extract_message};
+use netbuf::*;
+use std::io::*;
+
+pub struct Channel {
+    in_buffer: Buf,
+    out_buffer: Buf,
+}
+
+impl Channel {
+    pub fn new() -> Self {
+        Self {
+            in_buffer: Buf::new(),
+            out_buffer: Buf::new(),
+        }
+    }
+
+    pub fn send_message(&mut self, message: &EngineMessage) {
+        self.out_buffer.write(&message.to_bytes()).unwrap();
+    }
+
+    pub fn iter(&mut self) -> IPCMessagesIterator {
+        IPCMessagesIterator::new(self)
+    }
+}
+
+impl Write for Channel {
+    fn write(&mut self, buf: &[u8]) -> Result<usize> {
+        self.in_buffer.write(buf)
+    }
+
+    fn flush(&mut self) -> Result<()> {
+        self.in_buffer.flush()
+    }
+}
+
+impl Read for Channel {
+    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+        let read_bytes = self.out_buffer.as_ref().read(buf)?;
+
+        self.out_buffer.consume(read_bytes);
+
+        Ok(read_bytes)
+    }
+}
+
+pub struct IPCMessagesIterator<'a> {
+    ipc: &'a mut Channel,
+}
+
+impl<'a> IPCMessagesIterator<'a> {
+    pub fn new(ipc: &'a mut Channel) -> Self {
+        Self { ipc }
+    }
+}
+
+impl<'a> Iterator for IPCMessagesIterator<'a> {
+    type Item = EngineMessage;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let (consumed, message) = extract_message(&self.ipc.in_buffer[..])?;
+
+        self.ipc.in_buffer.consume(consumed);
+
+        Some(message)
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rust/lib-hedgewars-engine/src/ipc/queue.rs	Thu Jul 25 15:18:00 2019 +0200
@@ -0,0 +1,115 @@
+use hedgewars_engine_messages::{
+    messages::EngineMessage::*, messages::SyncedEngineMessage::*,
+    messages::UnsyncedEngineMessage::*, messages::*,
+};
+use queues::*;
+
+#[derive(PartialEq)]
+pub enum QueueChatStrategy {
+    NetworkGame,
+    LocalGame,
+}
+
+pub struct MessagesQueue {
+    strategy: QueueChatStrategy,
+    hi_ticks: u32,
+    unordered: Queue<EngineMessage>,
+    ordered: Queue<EngineMessage>,
+}
+
+impl MessagesQueue {
+    pub fn new(strategy: QueueChatStrategy) -> Self {
+        MessagesQueue {
+            strategy,
+            hi_ticks: 0,
+            unordered: queue![],
+            ordered: queue![],
+        }
+    }
+
+    fn is_unordered(&self, message: &EngineMessage) -> bool {
+        match message {
+            Unordered(_) => true,
+            Unsynced(HogSay(_)) | Unsynced(ChatMessage(_)) | Unsynced(TeamMessage(_)) => {
+                self.strategy == QueueChatStrategy::NetworkGame
+            }
+            _ => false,
+        }
+    }
+
+    pub fn push(&mut self, engine_message: EngineMessage) {
+        if self.is_unordered(&engine_message) {
+            self.unordered.add(engine_message).unwrap();
+        } else if let Synced(TimeWrap, timestamp) = engine_message {
+            self.ordered
+                .add(Synced(TimeWrap, timestamp + self.hi_ticks))
+                .unwrap();
+            self.hi_ticks += 65536;
+        } else if let Synced(message, timestamp) = engine_message {
+            self.ordered
+                .add(Synced(message, timestamp + self.hi_ticks))
+                .unwrap();
+        } else {
+            self.ordered.add(engine_message).unwrap();
+        }
+    }
+
+    pub fn pop(&mut self, timestamp: u32) -> Option<EngineMessage> {
+        if let Ok(message) = self.unordered.remove() {
+            Some(message)
+        } else if let Ok(Synced(_, message_timestamp)) = self.ordered.peek() {
+            if message_timestamp == timestamp {
+                self.ordered.remove().ok()
+            } else {
+                None
+            }
+        } else {
+            self.ordered.remove().ok()
+        }
+    }
+
+    pub fn iter(&mut self, timestamp: u32) -> MessagesQueueIterator {
+        MessagesQueueIterator {
+            timestamp,
+            queue: self,
+        }
+    }
+}
+
+pub struct MessagesQueueIterator<'a> {
+    timestamp: u32,
+    queue: &'a mut MessagesQueue,
+}
+
+impl<'a> Iterator for MessagesQueueIterator<'a> {
+    type Item = EngineMessage;
+
+    fn next(&mut self) -> Option<EngineMessage> {
+        self.queue.pop(self.timestamp)
+    }
+}
+
+#[test]
+fn queue_order() {
+    use hedgewars_engine_messages::messages::UnorderedEngineMessage::*;
+
+    let mut queue = MessagesQueue::new(QueueChatStrategy::LocalGame);
+
+    queue.push(Synced(Skip, 1));
+    queue.push(Unsynced(ChatMessage("hi".to_string())));
+    queue.push(Synced(TimeWrap, 65535));
+    queue.push(Unordered(Ping));
+    queue.push(Synced(Skip, 2));
+
+    let zero_tick: Vec<EngineMessage> = queue.iter(0).collect();
+    assert_eq!(zero_tick, vec![Unordered(Ping)]);
+    assert_eq!(queue.pop(1), Some(Synced(Skip, 1)));
+    assert_eq!(queue.pop(1), Some(Unsynced(ChatMessage("hi".to_string()))));
+    assert_eq!(queue.pop(1), None);
+    assert_eq!(queue.pop(2), None);
+    assert_eq!(queue.pop(65535), Some(Synced(TimeWrap, 65535)));
+    assert_eq!(queue.pop(65535), None);
+    assert_eq!(queue.pop(65538), Some(Synced(Skip, 65538)));
+    assert_eq!(queue.pop(65538), None);
+    assert_eq!(queue.pop(65539), None);
+}
--- a/rust/lib-hedgewars-engine/src/lib.rs	Thu Jul 25 14:23:25 2019 +0200
+++ b/rust/lib-hedgewars-engine/src/lib.rs	Thu Jul 25 15:18:00 2019 +0200
@@ -118,7 +118,7 @@
 pub extern "C" fn send_ipc(engine_state: &mut EngineInstance, buf: *const u8, size: usize) {
     unsafe {
         (*engine_state)
-            .ipc
+            .ipc_channel
             .write(std::slice::from_raw_parts(buf, size))
             .unwrap();
     }
@@ -128,7 +128,7 @@
 pub extern "C" fn read_ipc(engine_state: &mut EngineInstance, buf: *mut u8, size: usize) -> usize {
     unsafe {
         (*engine_state)
-            .ipc
+            .ipc_channel
             .read(std::slice::from_raw_parts_mut(buf, size))
             .unwrap_or(0)
     }