tools/ubot/src/main.rs
changeset 15789 d97ea528ce95
child 15792 191e51179d1b
equal deleted inserted replaced
15788:acf70c44065b 15789:d97ea528ce95
       
     1 use lapin::{
       
     2     message::Delivery, options::*, types::FieldTable, BasicProperties, Connection,
       
     3     ConnectionProperties,
       
     4 };
       
     5 use tokio_amqp::*;
       
     6 
       
     7 use futures::prelude::*;
       
     8 use irc::client::prelude::*;
       
     9 
       
    10 use anyhow::{bail, Result as AHResult};
       
    11 
       
    12 use url::Url;
       
    13 
       
    14 use rand::distributions::Alphanumeric;
       
    15 use rand::{thread_rng, Rng};
       
    16 
       
    17 fn url2irc_config(url: &str) -> AHResult<Config> {
       
    18     let url = Url::parse(url)?;
       
    19 
       
    20     if url.scheme() != "irc" {
       
    21         bail!("Expected 'irc' scheme")
       
    22     }
       
    23 
       
    24     Ok(Config {
       
    25         nickname: Some(url.username().to_owned()),
       
    26         nick_password: url.password().map(|s| s.to_owned()),
       
    27         server: url.host_str().map(|s| s.to_owned()),
       
    28         port: url.port(),
       
    29         channels: vec![format!("#{}", &url.path()[1..])],
       
    30         //use_mock_connection: true,
       
    31         ..Config::default()
       
    32     })
       
    33 }
       
    34 
       
    35 fn random_string(size: usize) -> String {
       
    36     thread_rng()
       
    37         .sample_iter(&Alphanumeric)
       
    38         .take(size)
       
    39         .map(char::from)
       
    40         .collect()
       
    41 }
       
    42 
       
    43 async fn handle_irc(pub_channel: &lapin::Channel, irc_message: &Message) -> AHResult<()> {
       
    44     if let Command::PRIVMSG(msgtarget, message) = &irc_message.command {
       
    45         let target = irc_message
       
    46             .response_target()
       
    47             .expect("Really expected PRIVMSG would have a source");
       
    48         let target = if target.starts_with('#') {
       
    49             &target[1..]
       
    50         } else {
       
    51             &target
       
    52         };
       
    53 
       
    54         if message.starts_with("!") {
       
    55             if let Some((cmd, param)) = message.split_once(' ') {
       
    56                 pub_channel
       
    57                     .basic_publish(
       
    58                         "irc",
       
    59                         &format!("cmd.{}.{}", &cmd[1..], target),
       
    60                         BasicPublishOptions::default(),
       
    61                         format!("{}\n{}", msgtarget, param).as_bytes().to_vec(),
       
    62                         BasicProperties::default(),
       
    63                     )
       
    64                     .await?;
       
    65             } else {
       
    66                 pub_channel
       
    67                     .basic_publish(
       
    68                         "irc",
       
    69                         &format!("cmd.{}.{}", &message[1..], target),
       
    70                         BasicPublishOptions::default(),
       
    71                         msgtarget.as_bytes().to_vec(),
       
    72                         BasicProperties::default(),
       
    73                     )
       
    74                     .await?;
       
    75             }
       
    76         } else {
       
    77             pub_channel
       
    78                 .basic_publish(
       
    79                     "irc",
       
    80                     &format!("msg.{}", target),
       
    81                     BasicPublishOptions::default(),
       
    82                     format!("{}\n{}", msgtarget, message).as_bytes().to_vec(),
       
    83                     BasicProperties::default(),
       
    84                 )
       
    85                 .await?;
       
    86         }
       
    87     }
       
    88 
       
    89     Ok(())
       
    90 }
       
    91 
       
    92 async fn handle_amqp(
       
    93     irc_client: &mut Client,
       
    94     irc_channel: &str,
       
    95     delivery: Delivery,
       
    96 ) -> AHResult<()> {
       
    97     let message = String::from_utf8(delivery.data)?;
       
    98     Ok(irc_client.send_privmsg(irc_channel, message)?)
       
    99 }
       
   100 
       
   101 #[tokio::main]
       
   102 async fn main() -> AHResult<()> {
       
   103     let amqp_url = std::env::var("AMQP_URL").expect("expected AMQP_URL env variabe");
       
   104     let irc_url = std::env::var("IRC_URL").expect("expected IRC_URL env variabe");
       
   105     let conn = Connection::connect(&amqp_url, ConnectionProperties::default().with_tokio()).await?;
       
   106 
       
   107     let pub_channel = conn.create_channel().await?;
       
   108     let sub_channel = conn.create_channel().await?;
       
   109 
       
   110     let irc_config = url2irc_config(&irc_url)?;
       
   111     let irc_channel = irc_config.channels[0].to_owned();
       
   112     let mut irc_client = Client::from_config(irc_config).await?;
       
   113     let mut irc_stream = irc_client.stream()?;
       
   114     irc_client.identify()?;
       
   115 
       
   116     let queue = sub_channel
       
   117         .queue_declare(
       
   118             &random_string(32),
       
   119             QueueDeclareOptions {
       
   120                 exclusive: true,
       
   121                 auto_delete: true,
       
   122                 ..QueueDeclareOptions::default()
       
   123             },
       
   124             FieldTable::default(),
       
   125         )
       
   126         .await?;
       
   127 
       
   128     sub_channel
       
   129         .queue_bind(
       
   130             queue.name().as_str(),
       
   131             "irc",
       
   132             &format!("say.{}", &irc_channel[1..]),
       
   133             QueueBindOptions::default(),
       
   134             FieldTable::default(),
       
   135         )
       
   136         .await?;
       
   137 
       
   138     let mut subscriber = sub_channel
       
   139         .basic_consume(
       
   140             queue.name().as_str(),
       
   141             &random_string(32),
       
   142             BasicConsumeOptions::default(),
       
   143             FieldTable::default(),
       
   144         )
       
   145         .await?;
       
   146 
       
   147     loop {
       
   148         tokio::select! {
       
   149             Some(irc_message) = irc_stream.next() => handle_irc(&pub_channel, &irc_message?).await?,
       
   150             Some(amqp_message) = subscriber.next() => {
       
   151                 let (_, delivery) = amqp_message.expect("error in consumer");
       
   152                 delivery
       
   153                     .ack(BasicAckOptions::default())
       
   154                     .await?;
       
   155 
       
   156                     handle_amqp(&mut irc_client, &irc_channel, delivery).await?
       
   157             }
       
   158         }
       
   159     }
       
   160 }