Start to forward messages

This commit is contained in:
Pierre HUBERT 2022-08-30 14:17:21 +02:00
parent 235f03d3e9
commit 11d7ccf7c9
4 changed files with 155 additions and 14 deletions

82
Cargo.lock generated
View File

@ -346,6 +346,12 @@ version = "3.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d"
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.2.1"
@ -1378,6 +1384,17 @@ dependencies = [
"serde",
]
[[package]]
name = "sha-1"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "sha1"
version = "0.10.1"
@ -1451,6 +1468,8 @@ dependencies = [
"log",
"reqwest",
"tokio",
"tokio-tungstenite",
"urlencoding",
]
[[package]]
@ -1498,6 +1517,26 @@ version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
[[package]]
name = "thiserror"
version = "1.0.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5f6586b7f764adc0231f4c79be7b920e766bb2f3e51b3661cdb263828f19994"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12bafc5b54507e0149cdf1b145a5d80ab80a90bcd9275df43d4fff68460f6c21"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "time"
version = "0.3.14"
@ -1573,6 +1612,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.17.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
]
[[package]]
name = "tokio-util"
version = "0.7.3"
@ -1620,6 +1671,25 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "tungstenite"
version = "0.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0"
dependencies = [
"base64",
"byteorder",
"bytes",
"http",
"httparse",
"log",
"rand",
"sha-1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.15.0"
@ -1659,6 +1729,18 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "urlencoding"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68b90931029ab9b034b300b797048cf23723400aa757e8a2bfb9d748102f9821"
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "vcpkg"
version = "0.2.15"

View File

@ -10,4 +10,6 @@ log = "0.4.17"
env_logger = "0.9.0"
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3.24"
futures = "0.3.24"
tokio-tungstenite = "0.17.2"
urlencoding = "2.1.0"

View File

@ -50,9 +50,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listen_address = format!("{}:{}", args.listen_address, port.port);
let h = tokio::spawn(relay_client(
args.token.clone(),
port.id,
args.listen_address.clone(),
format!("{}/ws?id={}&token={}",
args.relay_url, port.id, urlencoding::encode(&args.token))
.replace("http", "ws"),
listen_address,
));
handles.push(h);

View File

@ -1,8 +1,10 @@
use futures::{SinkExt, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::tungstenite::Message;
use tokio::net::TcpListener;
pub async fn relay_client(token: String, port_id: usize, server: String, listen_address: String) {
log::info!("({}) Start to listen on {}", port_id, listen_address);
pub async fn relay_client(ws_url: String, listen_address: String) {
log::info!("Start to listen on {}", listen_address);
let listener = match TcpListener::bind(&listen_address).await {
Ok(l) => l,
Err(e) => {
@ -12,13 +14,68 @@ pub async fn relay_client(token: String, port_id: usize, server: String, listen_
};
loop {
let (mut socket, _) = listener.accept().await
let (socket, _) = listener.accept().await
.expect("Failed to accept new connection!");
let token = token.clone();
let listen_address = listen_address.clone();
tokio::spawn(async move {
log::info!("New connection to {}", &listen_address);
});
tokio::spawn(relay_connection(ws_url.clone(), socket));
}
}
/// Relay connection
///
/// WS read => TCP write
/// TCP read => WS write
async fn relay_connection(ws_url: String, socket: TcpStream) {
log::debug!("Connecting to {}...", ws_url);
let (ws_stream, _) = tokio_tungstenite::connect_async(ws_url)
.await.expect("Failed to connect to server relay!");
let (mut tcp_read, mut tcp_write) = socket.into_split();
let (mut ws_write, mut ws_read) =
ws_stream.split();
// TCP read -> WS write
let future = async move {
let mut buff: [u8; 5000] = [0; 5000];
loop {
match tcp_read.read(&mut buff).await {
Ok(s) => {
if let Err(e) = ws_write.send(Message::Binary(Vec::from(&buff[0..s]))).await {
log::error!(
"Failed to write to WS connection! {:?} Exiting TCP read -> WS write loop...",e);
}
}
Err(e) => {
log::error!(
"Failed to read from TCP connection! {:?} Exitin TCP read -> WS write loop...",
e
);
break;
}
}
}
};
tokio::spawn(future);
// WS read -> TCP write
while let Some(m) = ws_read.next().await {
match m {
Err(e) => {
log::error!("Failed to read from WebSocket. Breaking read loop... {:?}", e);
break;
}
Ok(Message::Binary(b)) => {
if let Err(e) = tcp_write.write_all(&b).await {
log::error!("Failed to forward message to websocket. Closing reading end... {:?}", e);
break;
};
}
Ok(Message::Close(_r)) => {
log::info!("Server asked to close this WebSocket connection");
break;
}
Ok(m) => log::info!("{:?}", m)
}
}
}