From 11d7ccf7c93cfec44330759da8a58a3f776600e4 Mon Sep 17 00:00:00 2001 From: Pierre Hubert Date: Tue, 30 Aug 2022 14:17:21 +0200 Subject: [PATCH] Start to forward messages --- Cargo.lock | 82 ++++++++++++++++++++++++++++ tcp_relay_client/Cargo.toml | 4 +- tcp_relay_client/src/main.rs | 6 +- tcp_relay_client/src/relay_client.rs | 77 ++++++++++++++++++++++---- 4 files changed, 155 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b30e3f6..6884b95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -346,6 +346,12 @@ version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d" +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + [[package]] name = "bytes" version = "1.2.1" @@ -1378,6 +1384,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha-1" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha1" version = "0.10.1" @@ -1451,6 +1468,8 @@ dependencies = [ "log", "reqwest", "tokio", + "tokio-tungstenite", + "urlencoding", ] [[package]] @@ -1498,6 +1517,26 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" +[[package]] +name = "thiserror" +version = "1.0.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5f6586b7f764adc0231f4c79be7b920e766bb2f3e51b3661cdb263828f19994" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12bafc5b54507e0149cdf1b145a5d80ab80a90bcd9275df43d4fff68460f6c21" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "time" version = "0.3.14" @@ -1573,6 +1612,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.3" @@ -1620,6 +1671,25 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "tungstenite" +version = "0.17.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0" +dependencies = [ + "base64", + "byteorder", + "bytes", + "http", + "httparse", + "log", + "rand", + "sha-1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.15.0" @@ -1659,6 +1729,18 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68b90931029ab9b034b300b797048cf23723400aa757e8a2bfb9d748102f9821" + +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/tcp_relay_client/Cargo.toml b/tcp_relay_client/Cargo.toml index 76d4d01..a35978d 100644 --- a/tcp_relay_client/Cargo.toml +++ b/tcp_relay_client/Cargo.toml @@ -10,4 +10,6 @@ log = "0.4.17" env_logger = "0.9.0" reqwest = { version = "0.11", features = ["json"] } tokio = { version = "1", features = ["full"] } -futures = "0.3.24" \ No newline at end of file +futures = "0.3.24" +tokio-tungstenite = "0.17.2" +urlencoding = "2.1.0" \ No newline at end of file diff --git a/tcp_relay_client/src/main.rs b/tcp_relay_client/src/main.rs index 9045fa0..430c92b 100644 --- a/tcp_relay_client/src/main.rs +++ b/tcp_relay_client/src/main.rs @@ -50,9 +50,9 @@ async fn main() -> Result<(), Box> { let listen_address = format!("{}:{}", args.listen_address, port.port); let h = tokio::spawn(relay_client( - args.token.clone(), - port.id, - args.listen_address.clone(), + format!("{}/ws?id={}&token={}", + args.relay_url, port.id, urlencoding::encode(&args.token)) + .replace("http", "ws"), listen_address, )); handles.push(h); diff --git a/tcp_relay_client/src/relay_client.rs b/tcp_relay_client/src/relay_client.rs index c7c958f..d8a7e74 100644 --- a/tcp_relay_client/src/relay_client.rs +++ b/tcp_relay_client/src/relay_client.rs @@ -1,8 +1,10 @@ +use futures::{SinkExt, StreamExt}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio_tungstenite::tungstenite::Message; -use tokio::net::TcpListener; - -pub async fn relay_client(token: String, port_id: usize, server: String, listen_address: String) { - log::info!("({}) Start to listen on {}", port_id, listen_address); +pub async fn relay_client(ws_url: String, listen_address: String) { + log::info!("Start to listen on {}", listen_address); let listener = match TcpListener::bind(&listen_address).await { Ok(l) => l, Err(e) => { @@ -12,13 +14,68 @@ pub async fn relay_client(token: String, port_id: usize, server: String, listen_ }; loop { - let (mut socket, _) = listener.accept().await + let (socket, _) = listener.accept().await .expect("Failed to accept new connection!"); - let token = token.clone(); - let listen_address = listen_address.clone(); - tokio::spawn(async move { - log::info!("New connection to {}", &listen_address); - }); + tokio::spawn(relay_connection(ws_url.clone(), socket)); + } +} + +/// Relay connection +/// +/// WS read => TCP write +/// TCP read => WS write +async fn relay_connection(ws_url: String, socket: TcpStream) { + log::debug!("Connecting to {}...", ws_url); + let (ws_stream, _) = tokio_tungstenite::connect_async(ws_url) + .await.expect("Failed to connect to server relay!"); + + + let (mut tcp_read, mut tcp_write) = socket.into_split(); + let (mut ws_write, mut ws_read) = + ws_stream.split(); + + // TCP read -> WS write + let future = async move { + let mut buff: [u8; 5000] = [0; 5000]; + loop { + match tcp_read.read(&mut buff).await { + Ok(s) => { + if let Err(e) = ws_write.send(Message::Binary(Vec::from(&buff[0..s]))).await { + log::error!( + "Failed to write to WS connection! {:?} Exiting TCP read -> WS write loop...",e); + } + } + Err(e) => { + log::error!( + "Failed to read from TCP connection! {:?} Exitin TCP read -> WS write loop...", + e + ); + break; + } + } + } + }; + tokio::spawn(future); + + // WS read -> TCP write + while let Some(m) = ws_read.next().await { + match m { + Err(e) => { + log::error!("Failed to read from WebSocket. Breaking read loop... {:?}", e); + break; + } + Ok(Message::Binary(b)) => { + if let Err(e) = tcp_write.write_all(&b).await { + log::error!("Failed to forward message to websocket. Closing reading end... {:?}", e); + break; + }; + } + Ok(Message::Close(_r)) => { + log::info!("Server asked to close this WebSocket connection"); + break; + } + Ok(m) => log::info!("{:?}", m) + } } } \ No newline at end of file