- Run engine and socket listener in parallel to avoid ping timeouts
authorunc0rr
Fri, 28 Oct 2022 21:12:01 +0200
changeset 15913 fe519de9c270
parent 15912 5e8d2a8eb473
child 15916 0cd6996cd4c8
- Run engine and socket listener in parallel to avoid ping timeouts - Update dependencies
rust/hedgewars-checker/Cargo.toml
rust/hedgewars-checker/src/main.rs
--- a/rust/hedgewars-checker/Cargo.toml	Mon Oct 24 10:07:35 2022 -0400
+++ b/rust/hedgewars-checker/Cargo.toml	Fri Oct 28 21:12:01 2022 +0200
@@ -5,9 +5,9 @@
 edition = "2018"
 
 [dependencies]
-rust-ini = "0.17"
-dirs = "3.0"
-argparse = "0.2.2"
+rust-ini = "0.18"
+dirs = "4"
+argparse = "0.2"
 log = "0.4"
 stderrlog = "0.5"
 netbuf = "0.4"
@@ -15,4 +15,4 @@
 base64 = "0.13"
 hedgewars-network-protocol = { path = "../hedgewars-network-protocol" }
 anyhow = "1.0"
-tokio = {version="1.6", features = ["full"]}
+tokio = {version="1", features = ["full"]}
--- a/rust/hedgewars-checker/src/main.rs	Mon Oct 24 10:07:35 2022 -0400
+++ b/rust/hedgewars-checker/src/main.rs	Fri Oct 28 21:12:01 2022 +0200
@@ -7,7 +7,7 @@
 use log::{debug, info, warn};
 use netbuf::Buf;
 use std::{io::Write, str::FromStr};
-use tokio::{io, io::AsyncWriteExt, net::TcpStream, process::Command};
+use tokio::{io, io::AsyncWriteExt, net::TcpStream, process::Command, sync::mpsc};
 
 async fn check(executable: &str, data_prefix: &str, buffer: &[String]) -> Result<Vec<String>> {
     let mut replay = tempfile::NamedTempFile::new()?;
@@ -81,7 +81,7 @@
         }
     }
 
-    println!("Engine lines: {:?}", &result);
+    // println!("Engine lines: {:?}", &result);
 
     if result.len() > 0 {
         Ok(result)
@@ -90,12 +90,27 @@
     }
 }
 
+async fn check_loop(
+    executable: &str,
+    data_prefix: &str,
+    results_sender: mpsc::Sender<Result<Vec<String>>>,
+    mut replay_receiver: mpsc::Receiver<Vec<String>>,
+) -> Result<()> {
+    while let Some(replay) = replay_receiver.recv().await {
+        results_sender
+            .send(check(executable, data_prefix, &replay).await)
+            .await?;
+    }
+
+    Ok(())
+}
+
 async fn connect_and_run(
     username: &str,
     password: &str,
     protocol_number: u16,
-    executable: &str,
-    data_prefix: &str,
+    replay_sender: mpsc::Sender<Vec<String>>,
+    mut results_receiver: mpsc::Receiver<Result<Vec<String>>>,
 ) -> Result<()> {
     info!("Connecting...");
 
@@ -103,26 +118,15 @@
 
     let mut buf = Buf::new();
 
-    let mut replay_lines: Option<Vec<String>> = None;
-
     loop {
-        let r = if let Some(ref lines) = replay_lines {
-            let r = tokio::select! {
-                _ = stream.readable() => None,
-                r = check(executable, data_prefix, &lines) => Some(r)
-            };
-
-            r
-        } else {
-            stream.readable().await?;
-            None
+        let r = tokio::select! {
+            _ = stream.readable() => None,
+            r = results_receiver.recv() => r
         };
 
-        println!("Loop: {:?}", &r);
+        //println!("Loop: {:?}", &r);
 
         if let Some(execution_result) = r {
-            replay_lines = None;
-
             match execution_result {
                 Ok(result) => {
                     info!("Checked");
@@ -173,7 +177,7 @@
             let tail_len = tail.len();
             buf.consume(buf.len() - tail_len);
 
-            println!("Message from server: {:?}", &msg);
+            // println!("Message from server: {:?}", &msg);
 
             match msg {
                 Connected(_, _) => {
@@ -202,7 +206,7 @@
                 }
                 Replay(lines) => {
                     info!("Got a replay");
-                    replay_lines = Some(lines);
+                    replay_sender.send(lines).await?;
                 }
                 Bye(message) => {
                     warn!("Received BYE: {}", message);
@@ -279,5 +283,20 @@
 
     info!("Using protocol number {}", protocol_number);
 
-    connect_and_run(&username, &password, protocol_number, &exe, &prefix).await
+    let (replay_sender, replay_receiver) = mpsc::channel(1);
+    let (results_sender, results_receiver) = mpsc::channel(1);
+
+    let (network_result, checker_result) = tokio::join!(
+        connect_and_run(
+            &username,
+            &password,
+            protocol_number,
+            replay_sender,
+            results_receiver
+        ),
+        check_loop(&exe, &prefix, results_sender, replay_receiver)
+    );
+
+    network_result?;
+    checker_result
 }