|
1 use lapin::{ |
|
2 message::Delivery, options::*, types::FieldTable, BasicProperties, Connection, |
|
3 ConnectionProperties, |
|
4 }; |
|
5 use tokio_amqp::*; |
|
6 |
|
7 use futures::prelude::*; |
|
8 use irc::client::prelude::*; |
|
9 |
|
10 use anyhow::{bail, Result as AHResult}; |
|
11 |
|
12 use url::Url; |
|
13 |
|
14 use rand::distributions::Alphanumeric; |
|
15 use rand::{thread_rng, Rng}; |
|
16 |
|
17 fn url2irc_config(url: &str) -> AHResult<Config> { |
|
18 let url = Url::parse(url)?; |
|
19 |
|
20 if url.scheme() != "irc" { |
|
21 bail!("Expected 'irc' scheme") |
|
22 } |
|
23 |
|
24 Ok(Config { |
|
25 nickname: Some(url.username().to_owned()), |
|
26 nick_password: url.password().map(|s| s.to_owned()), |
|
27 server: url.host_str().map(|s| s.to_owned()), |
|
28 port: url.port(), |
|
29 channels: vec![format!("#{}", &url.path()[1..])], |
|
30 //use_mock_connection: true, |
|
31 ..Config::default() |
|
32 }) |
|
33 } |
|
34 |
|
35 fn random_string(size: usize) -> String { |
|
36 thread_rng() |
|
37 .sample_iter(&Alphanumeric) |
|
38 .take(size) |
|
39 .map(char::from) |
|
40 .collect() |
|
41 } |
|
42 |
|
43 async fn handle_irc(pub_channel: &lapin::Channel, irc_message: &Message) -> AHResult<()> { |
|
44 if let Command::PRIVMSG(msgtarget, message) = &irc_message.command { |
|
45 let target = irc_message |
|
46 .response_target() |
|
47 .expect("Really expected PRIVMSG would have a source"); |
|
48 let target = if target.starts_with('#') { |
|
49 &target[1..] |
|
50 } else { |
|
51 &target |
|
52 }; |
|
53 |
|
54 if message.starts_with("!") { |
|
55 if let Some((cmd, param)) = message.split_once(' ') { |
|
56 pub_channel |
|
57 .basic_publish( |
|
58 "irc", |
|
59 &format!("cmd.{}.{}", &cmd[1..], target), |
|
60 BasicPublishOptions::default(), |
|
61 format!("{}\n{}", msgtarget, param).as_bytes().to_vec(), |
|
62 BasicProperties::default(), |
|
63 ) |
|
64 .await?; |
|
65 } else { |
|
66 pub_channel |
|
67 .basic_publish( |
|
68 "irc", |
|
69 &format!("cmd.{}.{}", &message[1..], target), |
|
70 BasicPublishOptions::default(), |
|
71 msgtarget.as_bytes().to_vec(), |
|
72 BasicProperties::default(), |
|
73 ) |
|
74 .await?; |
|
75 } |
|
76 } else { |
|
77 pub_channel |
|
78 .basic_publish( |
|
79 "irc", |
|
80 &format!("msg.{}", target), |
|
81 BasicPublishOptions::default(), |
|
82 format!("{}\n{}", msgtarget, message).as_bytes().to_vec(), |
|
83 BasicProperties::default(), |
|
84 ) |
|
85 .await?; |
|
86 } |
|
87 } |
|
88 |
|
89 Ok(()) |
|
90 } |
|
91 |
|
92 async fn handle_amqp( |
|
93 irc_client: &mut Client, |
|
94 irc_channel: &str, |
|
95 delivery: Delivery, |
|
96 ) -> AHResult<()> { |
|
97 let message = String::from_utf8(delivery.data)?; |
|
98 Ok(irc_client.send_privmsg(irc_channel, message)?) |
|
99 } |
|
100 |
|
101 #[tokio::main] |
|
102 async fn main() -> AHResult<()> { |
|
103 let amqp_url = std::env::var("AMQP_URL").expect("expected AMQP_URL env variabe"); |
|
104 let irc_url = std::env::var("IRC_URL").expect("expected IRC_URL env variabe"); |
|
105 let conn = Connection::connect(&amqp_url, ConnectionProperties::default().with_tokio()).await?; |
|
106 |
|
107 let pub_channel = conn.create_channel().await?; |
|
108 let sub_channel = conn.create_channel().await?; |
|
109 |
|
110 let irc_config = url2irc_config(&irc_url)?; |
|
111 let irc_channel = irc_config.channels[0].to_owned(); |
|
112 let mut irc_client = Client::from_config(irc_config).await?; |
|
113 let mut irc_stream = irc_client.stream()?; |
|
114 irc_client.identify()?; |
|
115 |
|
116 let queue = sub_channel |
|
117 .queue_declare( |
|
118 &random_string(32), |
|
119 QueueDeclareOptions { |
|
120 exclusive: true, |
|
121 auto_delete: true, |
|
122 ..QueueDeclareOptions::default() |
|
123 }, |
|
124 FieldTable::default(), |
|
125 ) |
|
126 .await?; |
|
127 |
|
128 sub_channel |
|
129 .queue_bind( |
|
130 queue.name().as_str(), |
|
131 "irc", |
|
132 &format!("say.{}", &irc_channel[1..]), |
|
133 QueueBindOptions::default(), |
|
134 FieldTable::default(), |
|
135 ) |
|
136 .await?; |
|
137 |
|
138 let mut subscriber = sub_channel |
|
139 .basic_consume( |
|
140 queue.name().as_str(), |
|
141 &random_string(32), |
|
142 BasicConsumeOptions::default(), |
|
143 FieldTable::default(), |
|
144 ) |
|
145 .await?; |
|
146 |
|
147 loop { |
|
148 tokio::select! { |
|
149 Some(irc_message) = irc_stream.next() => handle_irc(&pub_channel, &irc_message?).await?, |
|
150 Some(amqp_message) = subscriber.next() => { |
|
151 let (_, delivery) = amqp_message.expect("error in consumer"); |
|
152 delivery |
|
153 .ack(BasicAckOptions::default()) |
|
154 .await?; |
|
155 |
|
156 handle_amqp(&mut irc_client, &irc_channel, delivery).await? |
|
157 } |
|
158 } |
|
159 } |
|
160 } |