use std::sync::Arc; use futures::{SinkExt, StreamExt}; use hyper_rustls::ConfigBuilderExt; use rustls::RootCertStore; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio_tungstenite::tungstenite::Message; use crate::base::cert_utils; use crate::tcp_relay_client::client_config::ClientConfig; pub async fn relay_client(ws_url: String, listen_address: String, config: Arc) { log::info!("Start to listen on {}", listen_address); let listener = match TcpListener::bind(&listen_address).await { Ok(l) => l, Err(e) => { log::error!("Failed to start to listen on {}! {}", listen_address, e); std::process::exit(3); } }; loop { let (socket, _) = listener .accept() .await .expect("Failed to accept new connection!"); tokio::spawn(relay_connection(ws_url.clone(), socket, config.clone())); } } /// Relay connection /// /// WS read => TCP write /// TCP read => WS write async fn relay_connection(ws_url: String, socket: TcpStream, conf: Arc) { log::debug!("Connecting to {}...", ws_url); let ws_stream = if ws_url.starts_with("wss") { let config = rustls::ClientConfig::builder().with_safe_defaults(); let config = match conf.get_root_certificate() { None => config.with_native_roots(), Some(cert) => { log::debug!("Using custom root certificates"); let mut store = RootCertStore::empty(); cert_utils::parse_pem_certificates(&cert) .unwrap() .iter() .for_each(|c| store.add(c).expect("Failed to add certificate to chain!")); config.with_root_certificates(store) } }; let config = match conf.get_client_keypair() { None => config.with_no_client_auth(), Some((certs, key)) => { let certs = cert_utils::parse_pem_certificates(certs) .expect("Failed to parse client certificate!"); let key = cert_utils::parse_pem_private_key(key) .expect("Failed to parse client auth private key!"); config .with_single_cert(certs, key) .expect("Failed to set client certificate!") } }; let connector = tokio_tungstenite::Connector::Rustls(Arc::new(config)); let (ws_stream, _) = tokio_tungstenite::connect_async_tls_with_config(ws_url, None, Some(connector)) .await .expect("Failed to connect to server relay!"); ws_stream } else { let (ws_stream, _) = tokio_tungstenite::connect_async(ws_url) .await .expect("Failed to connect to server relay!"); ws_stream }; let (mut tcp_read, mut tcp_write) = socket.into_split(); let (mut ws_write, mut ws_read) = ws_stream.split(); // TCP read -> WS write let future = async move { let mut buff: [u8; 5000] = [0; 5000]; loop { match tcp_read.read(&mut buff).await { Ok(s) => { if let Err(e) = ws_write.send(Message::Binary(Vec::from(&buff[0..s]))).await { log::error!( "Failed to write to WS connection! {:?} Exiting TCP read -> WS write loop...",e); break; } if s == 0 { log::info!("Got empty read TCP buffer. Stopping..."); break; } } Err(e) => { log::error!( "Failed to read from TCP connection! {:?} Exitin TCP read -> WS write loop...", e ); break; } } } }; tokio::spawn(future); // WS read -> TCP write while let Some(m) = ws_read.next().await { match m { Err(e) => { log::error!( "Failed to read from WebSocket. Breaking read loop... {:?}", e ); break; } Ok(Message::Binary(b)) => { if let Err(e) = tcp_write.write_all(&b).await { log::error!( "Failed to forward message to websocket. Closing reading end... {:?}", e ); break; }; } Ok(Message::Close(_r)) => { log::info!("Server asked to close this WebSocket connection"); break; } Ok(m) => log::info!("{:?}", m), } } }