# HG changeset patch # User unc0rr # Date 1623169906 -7200 # Node ID d97ea528ce958e6339bdb6091f7a86efa49fffdf # Parent acf70c44065bdf3e45547caea4a82a0049eba239 Add main module for irc bot which serves as an irc<>rabbitmq relay diff -r acf70c44065b -r d97ea528ce95 .hgignore --- a/.hgignore Fri May 28 15:06:39 2021 +0200 +++ b/.hgignore Tue Jun 08 18:31:46 2021 +0200 @@ -95,3 +95,4 @@ *.so *_autogen build/ +target/ \ No newline at end of file diff -r acf70c44065b -r d97ea528ce95 tools/ubot/Cargo.toml --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/ubot/Cargo.toml Tue Jun 08 18:31:46 2021 +0200 @@ -0,0 +1,15 @@ +[package] +name = "ubot" +version = "0.1.0" +authors = ["Andrey Korotaev "] +edition = "2018" + +[dependencies] +tokio-amqp = "1.0" +lapin = "1.7" +tokio = {version="1.6", features = ["full"]} +irc = "0.15" +anyhow = "1.0" +futures = "0.3" +url = "2.2" +rand = "0.8" diff -r acf70c44065b -r d97ea528ce95 tools/ubot/src/main.rs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/ubot/src/main.rs Tue Jun 08 18:31:46 2021 +0200 @@ -0,0 +1,160 @@ +use lapin::{ + message::Delivery, options::*, types::FieldTable, BasicProperties, Connection, + ConnectionProperties, +}; +use tokio_amqp::*; + +use futures::prelude::*; +use irc::client::prelude::*; + +use anyhow::{bail, Result as AHResult}; + +use url::Url; + +use rand::distributions::Alphanumeric; +use rand::{thread_rng, Rng}; + +fn url2irc_config(url: &str) -> AHResult { + let url = Url::parse(url)?; + + if url.scheme() != "irc" { + bail!("Expected 'irc' scheme") + } + + Ok(Config { + nickname: Some(url.username().to_owned()), + nick_password: url.password().map(|s| s.to_owned()), + server: url.host_str().map(|s| s.to_owned()), + port: url.port(), + channels: vec![format!("#{}", &url.path()[1..])], + //use_mock_connection: true, + ..Config::default() + }) +} + +fn random_string(size: usize) -> String { + thread_rng() + .sample_iter(&Alphanumeric) + .take(size) + .map(char::from) + .collect() +} + +async fn handle_irc(pub_channel: &lapin::Channel, irc_message: &Message) -> AHResult<()> { + if let Command::PRIVMSG(msgtarget, message) = &irc_message.command { + let target = irc_message + .response_target() + .expect("Really expected PRIVMSG would have a source"); + let target = if target.starts_with('#') { + &target[1..] + } else { + &target + }; + + if message.starts_with("!") { + if let Some((cmd, param)) = message.split_once(' ') { + pub_channel + .basic_publish( + "irc", + &format!("cmd.{}.{}", &cmd[1..], target), + BasicPublishOptions::default(), + format!("{}\n{}", msgtarget, param).as_bytes().to_vec(), + BasicProperties::default(), + ) + .await?; + } else { + pub_channel + .basic_publish( + "irc", + &format!("cmd.{}.{}", &message[1..], target), + BasicPublishOptions::default(), + msgtarget.as_bytes().to_vec(), + BasicProperties::default(), + ) + .await?; + } + } else { + pub_channel + .basic_publish( + "irc", + &format!("msg.{}", target), + BasicPublishOptions::default(), + format!("{}\n{}", msgtarget, message).as_bytes().to_vec(), + BasicProperties::default(), + ) + .await?; + } + } + + Ok(()) +} + +async fn handle_amqp( + irc_client: &mut Client, + irc_channel: &str, + delivery: Delivery, +) -> AHResult<()> { + let message = String::from_utf8(delivery.data)?; + Ok(irc_client.send_privmsg(irc_channel, message)?) +} + +#[tokio::main] +async fn main() -> AHResult<()> { + let amqp_url = std::env::var("AMQP_URL").expect("expected AMQP_URL env variabe"); + let irc_url = std::env::var("IRC_URL").expect("expected IRC_URL env variabe"); + let conn = Connection::connect(&amqp_url, ConnectionProperties::default().with_tokio()).await?; + + let pub_channel = conn.create_channel().await?; + let sub_channel = conn.create_channel().await?; + + let irc_config = url2irc_config(&irc_url)?; + let irc_channel = irc_config.channels[0].to_owned(); + let mut irc_client = Client::from_config(irc_config).await?; + let mut irc_stream = irc_client.stream()?; + irc_client.identify()?; + + let queue = sub_channel + .queue_declare( + &random_string(32), + QueueDeclareOptions { + exclusive: true, + auto_delete: true, + ..QueueDeclareOptions::default() + }, + FieldTable::default(), + ) + .await?; + + sub_channel + .queue_bind( + queue.name().as_str(), + "irc", + &format!("say.{}", &irc_channel[1..]), + QueueBindOptions::default(), + FieldTable::default(), + ) + .await?; + + let mut subscriber = sub_channel + .basic_consume( + queue.name().as_str(), + &random_string(32), + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + loop { + tokio::select! { + Some(irc_message) = irc_stream.next() => handle_irc(&pub_channel, &irc_message?).await?, + Some(amqp_message) = subscriber.next() => { + let (_, delivery) = amqp_message.expect("error in consumer"); + delivery + .ack(BasicAckOptions::default()) + .await?; + + handle_amqp(&mut irc_client, &irc_channel, delivery).await? + } + } + } +}