use futures::{SinkExt, StreamExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio_tungstenite::tungstenite::Message; pub async fn relay_client(ws_url: String, listen_address: String) { log::info!("Start to listen on {}", listen_address); let listener = match TcpListener::bind(&listen_address).await { Ok(l) => l, Err(e) => { log::error!("Failed to start to listen on {}! {}", listen_address, e); std::process::exit(3); } }; loop { let (socket, _) = listener.accept().await .expect("Failed to accept new connection!"); tokio::spawn(relay_connection(ws_url.clone(), socket)); } } /// Relay connection /// /// WS read => TCP write /// TCP read => WS write async fn relay_connection(ws_url: String, socket: TcpStream) { log::debug!("Connecting to {}...", ws_url); let (ws_stream, _) = tokio_tungstenite::connect_async(ws_url) .await.expect("Failed to connect to server relay!"); let (mut tcp_read, mut tcp_write) = socket.into_split(); let (mut ws_write, mut ws_read) = ws_stream.split(); // TCP read -> WS write let future = async move { let mut buff: [u8; 5000] = [0; 5000]; loop { match tcp_read.read(&mut buff).await { Ok(s) => { if s == 0 { log::info!("Got empty read TCP buffer. Stopping..."); break; } if let Err(e) = ws_write.send(Message::Binary(Vec::from(&buff[0..s]))).await { log::error!( "Failed to write to WS connection! {:?} Exiting TCP read -> WS write loop...",e); break; } } Err(e) => { log::error!( "Failed to read from TCP connection! {:?} Exitin TCP read -> WS write loop...", e ); break; } } } }; tokio::spawn(future); // WS read -> TCP write while let Some(m) = ws_read.next().await { match m { Err(e) => { log::error!("Failed to read from WebSocket. Breaking read loop... {:?}", e); break; } Ok(Message::Binary(b)) => { if let Err(e) = tcp_write.write_all(&b).await { log::error!("Failed to forward message to websocket. Closing reading end... {:?}", e); break; }; } Ok(Message::Close(_r)) => { log::info!("Server asked to close this WebSocket connection"); break; } Ok(m) => log::info!("{:?}", m) } } }