diff -r d97ea528ce95 -r efe4e3290870 tools/ubot-plugins/url-bot-rs/src/bin/ubot-url-plugin.rs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/ubot-plugins/url-bot-rs/src/bin/ubot-url-plugin.rs Sat Jun 12 20:15:42 2021 +0200 @@ -0,0 +1,331 @@ +use url_bot_rs::config::Rtd; +use url_bot_rs::VERSION; +use url_bot_rs::{feat, http::resolve_url, param, plugins::TITLE_PLUGINS, tld::TLD}; + +use anyhow::Result as AHResult; +use atty::{is, Stream}; +use directories::ProjectDirs; +use docopt::Docopt; +use failure::Error; +use lazy_static::lazy_static; +use log::{error, info}; +use regex::Regex; +use reqwest::Url; +use serde_derive::Deserialize; +use std::collections::HashSet; +use std::path::PathBuf; +use stderrlog::{ColorChoice, Timestamp}; + +use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties}; +use tokio_amqp::*; + +use futures::prelude::*; + +use rand::distributions::Alphanumeric; +use rand::{thread_rng, Rng}; + +use std::sync::mpsc; +use std::thread; + +// docopt usage string +const USAGE: &str = " +URL munching IRC bot. + +Usage: + ubot-url-plugin [options] [-v...] [--conf=PATH...] [--conf-dir=DIR...] + +Options: + -h --help Show this help message. + --version Print version. + -v --verbose Show extra information. + -t --timestamp Force timestamps. +"; + +#[derive(Debug, Deserialize, Default)] +pub struct Args { + flag_verbose: usize, + flag_conf: Vec, + flag_conf_dir: Vec, + flag_timestamp: bool, +} + +const MIN_VERBOSITY: usize = 2; + +#[derive(Debug, PartialEq)] +enum TitleResp { + Title(String), + Error(String), +} + +/// Run available plugins on a single URL, return the first successful title. +fn process_plugins(rtd: &Rtd, url: &Url) -> Option { + let result: String = TITLE_PLUGINS + .iter() + .filter(|p| p.check(&rtd.conf.plugins, url)) + .filter_map(|p| p.evaluate(&rtd, url).ok()) + .take(1) + .collect(); + + if result.is_empty() { + None + } else { + Some(result) + } +} + +/// find titles in a message and generate responses +fn process_titles(rtd: &Rtd, msg: &str) -> impl Iterator { + let mut responses: Vec = vec![]; + + let mut num_processed = 0; + let mut dedup_urls = HashSet::new(); + + // look at each space-separated message token + for token in msg.split_whitespace() { + // the token must not contain unsafe characters + if contains_unsafe_chars(token) { + continue; + } + + // get a full URL for tokens without a scheme + let maybe_token = if feat!(rtd, partial_urls) { + add_scheme_for_tld(token) + } else { + None + }; + + let token = maybe_token.as_ref().map_or(token, String::as_str); + + // the token must be a valid url + let url = match token.parse::() { + Ok(url) => url, + _ => continue, + }; + + // the scheme must be http or https + if !["http", "https"].contains(&url.scheme()) { + continue; + } + + // skip duplicate urls within the message + if dedup_urls.contains(&url) { + continue; + } + + info!("[{}] RESOLVE <{}>", rtd.conf.network.name, token); + + // try to get the title from the url + let title = if let Some(title) = process_plugins(rtd, &url) { + title + } else { + match resolve_url(token, rtd) { + Ok(title) => title, + Err(err) => { + error!("{:?}", err); + responses.push(TitleResp::Error(err.to_string())); + continue; + } + } + }; + + // limit response length, see RFC1459 + + let msg = utf8_truncate(&format!("⤷ {}", title), 510); + + info!("[{}] {}", rtd.conf.network.name, msg); + + responses.push(TitleResp::Title(msg.to_string())); + + dedup_urls.insert(url); + + // limit the number of processed URLs + num_processed += 1; + if num_processed == param!(rtd, url_limit) { + break; + } + } + + responses.into_iter() +} + +// regex for unsafe characters, as defined in RFC 1738 +const RE_UNSAFE_CHARS: &str = r#"[{}|\\^~\[\]`<>"]"#; + +/// does the token contain characters not permitted by RFC 1738 +fn contains_unsafe_chars(token: &str) -> bool { + lazy_static! { + static ref UNSAFE: Regex = Regex::new(RE_UNSAFE_CHARS).unwrap(); + } + UNSAFE.is_match(token) +} + +/// truncate to a maximum number of bytes, taking UTF-8 into account +fn utf8_truncate(s: &str, n: usize) -> String { + s.char_indices() + .take_while(|(len, c)| len + c.len_utf8() <= n) + .map(|(_, c)| c) + .collect() +} + +lazy_static! { + static ref REPEATED_DOTS: Regex = Regex::new(r"\.\.+").unwrap(); +} + +/// if a token has a recognised TLD, but no scheme, add one +pub fn add_scheme_for_tld(token: &str) -> Option { + if token.parse::().is_err() { + if token.starts_with(|s: char| !s.is_alphabetic()) { + return None; + } + + if REPEATED_DOTS.is_match(&token) { + return None; + } + + let new_token = format!("http://{}", token); + + if let Ok(url) = new_token.parse::() { + if !url.domain()?.contains('.') { + return None; + } + + // reject email addresses + if url.username() != "" { + return None; + } + + let tld = url.domain()?.split('.').last()?; + + if TLD.contains(tld) { + return Some(new_token); + } + } + } + + None +} + +fn init_rtd() -> AHResult { + // parse command line arguments with docopt + let args: Args = Docopt::new(USAGE) + .and_then(|d| d.version(Some(VERSION.to_string())).deserialize()) + .unwrap_or_else(|e| e.exit()); + + // avoid timestamping when piped, e.g. systemd + let timestamp = if is(Stream::Stderr) || args.flag_timestamp { + Timestamp::Second + } else { + Timestamp::Off + }; + + stderrlog::new() + .module(module_path!()) + .modules(vec![ + "url_bot_rs::message", + "url_bot_rs::config", + "url_bot_rs::http", + ]) + .verbosity(args.flag_verbose + MIN_VERBOSITY) + .timestamp(timestamp) + .color(ColorChoice::Never) + .init() + .unwrap(); + + let dirs = ProjectDirs::from("org", "", "url-bot-rs").unwrap(); + let default_conf_dir = dirs.config_dir(); + + let default_conf = default_conf_dir.join("config.toml"); + + let rtd: Rtd = Rtd::new().conf(&default_conf).load()?.init_http_client()?; + + Ok(rtd) +} + +fn random_string(size: usize) -> String { + thread_rng() + .sample_iter(&Alphanumeric) + .take(size) + .map(char::from) + .collect() +} + +#[tokio::main] +async fn main() -> AHResult<()> { + let (tx1, rx1) = mpsc::channel::(); + let (tx2, rx2) = mpsc::channel(); + + thread::spawn(move || { + let rtd = init_rtd().expect("RTD not initialized"); + + loop { + let message = &rx1.recv().expect("rx1 recv error"); + let titles: Vec<_> = process_titles(&rtd, message).collect(); + tx2.send(titles).expect("tx2 send error"); + } + }); + let amqp_url = std::env::var("AMQP_URL").expect("expected AMQP_URL env variabe"); + let conn = Connection::connect(&amqp_url, ConnectionProperties::default().with_tokio()).await?; + + let pub_channel = conn.create_channel().await?; + let sub_channel = conn.create_channel().await?; + + let queue = sub_channel + .queue_declare( + &random_string(32), + QueueDeclareOptions { + exclusive: true, + auto_delete: true, + ..QueueDeclareOptions::default() + }, + FieldTable::default(), + ) + .await?; + + sub_channel + .queue_bind( + queue.name().as_str(), + "irc", + "msg.hedgewars", + QueueBindOptions::default(), + FieldTable::default(), + ) + .await?; + + let mut subscriber = sub_channel + .basic_consume( + queue.name().as_str(), + &random_string(32), + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + while let Some(amqp_message) = subscriber.next().await { + let (_, delivery) = amqp_message.expect("error in consumer"); + delivery.ack(BasicAckOptions::default()).await?; + + let chat_message = String::from_utf8(delivery.data)?; + if let Some((_who, message)) = chat_message.split_once('\n') { + tx1.send(message.to_owned())?; + let titles = rx2.recv()?; + + for title in titles { + let title_message = match title { + TitleResp::Title(t) => t, + TitleResp::Error(e) => e, + }; + pub_channel + .basic_publish( + "irc", + "say.hedgewars", + BasicPublishOptions::default(), + title_message.as_bytes().to_vec(), + BasicProperties::default(), + ) + .await?; + } + } + } + + Ok(()) +}