Compare commits
8 Commits
a6f7966243
...
220830
| Author | SHA1 | Date | |
|---|---|---|---|
| 884018a90d | |||
| 53ad29727e | |||
| dde219a717 | |||
| a866deb3e4 | |||
| dff4384bd8 | |||
| 1eedfba81c | |||
| 11d7ccf7c9 | |||
| 235f03d3e9 |
82
Cargo.lock
generated
82
Cargo.lock
generated
@@ -346,6 +346,12 @@ version = "3.11.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d"
|
checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "byteorder"
|
||||||
|
version = "1.4.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bytes"
|
name = "bytes"
|
||||||
version = "1.2.1"
|
version = "1.2.1"
|
||||||
@@ -1378,6 +1384,17 @@ dependencies = [
|
|||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "sha-1"
|
||||||
|
version = "0.10.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"cpufeatures",
|
||||||
|
"digest",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sha1"
|
name = "sha1"
|
||||||
version = "0.10.1"
|
version = "0.10.1"
|
||||||
@@ -1451,6 +1468,8 @@ dependencies = [
|
|||||||
"log",
|
"log",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"tokio-tungstenite",
|
||||||
|
"urlencoding",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -1498,6 +1517,26 @@ version = "0.15.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
|
checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "thiserror"
|
||||||
|
version = "1.0.32"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f5f6586b7f764adc0231f4c79be7b920e766bb2f3e51b3661cdb263828f19994"
|
||||||
|
dependencies = [
|
||||||
|
"thiserror-impl",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "thiserror-impl"
|
||||||
|
version = "1.0.32"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "12bafc5b54507e0149cdf1b145a5d80ab80a90bcd9275df43d4fff68460f6c21"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "time"
|
name = "time"
|
||||||
version = "0.3.14"
|
version = "0.3.14"
|
||||||
@@ -1573,6 +1612,18 @@ dependencies = [
|
|||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tokio-tungstenite"
|
||||||
|
version = "0.17.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181"
|
||||||
|
dependencies = [
|
||||||
|
"futures-util",
|
||||||
|
"log",
|
||||||
|
"tokio",
|
||||||
|
"tungstenite",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-util"
|
name = "tokio-util"
|
||||||
version = "0.7.3"
|
version = "0.7.3"
|
||||||
@@ -1620,6 +1671,25 @@ version = "0.2.3"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
|
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tungstenite"
|
||||||
|
version = "0.17.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0"
|
||||||
|
dependencies = [
|
||||||
|
"base64",
|
||||||
|
"byteorder",
|
||||||
|
"bytes",
|
||||||
|
"http",
|
||||||
|
"httparse",
|
||||||
|
"log",
|
||||||
|
"rand",
|
||||||
|
"sha-1",
|
||||||
|
"thiserror",
|
||||||
|
"url",
|
||||||
|
"utf-8",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "typenum"
|
name = "typenum"
|
||||||
version = "1.15.0"
|
version = "1.15.0"
|
||||||
@@ -1659,6 +1729,18 @@ dependencies = [
|
|||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "urlencoding"
|
||||||
|
version = "2.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "68b90931029ab9b034b300b797048cf23723400aa757e8a2bfb9d748102f9821"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "utf-8"
|
||||||
|
version = "0.7.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "vcpkg"
|
name = "vcpkg"
|
||||||
version = "0.2.15"
|
version = "0.2.15"
|
||||||
|
|||||||
22
README.MD
Normal file
22
README.MD
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
# TCP over HTTP
|
||||||
|
This project aims to provide an easy-to-setup TCP forwarding solution:
|
||||||
|
|
||||||
|
```
|
||||||
|
|--------| |--------| |--------| | -------|
|
||||||
|
| | | Client | | Server | | |
|
||||||
|
| Client | -- TCP xx -- | | -- HTTP 80 / 443 -- | | -- TCP xx -- | Server |
|
||||||
|
| | | Relay | | Relay | | |
|
||||||
|
|--------| |--------| |--------| |--------|
|
||||||
|
```
|
||||||
|
|
||||||
|
This project can be used especially to bypass firewalls that blocks traffics
|
||||||
|
from ports others than the 80 / 443 duo.
|
||||||
|
|
||||||
|
This repository contains two binaries:
|
||||||
|
|
||||||
|
* `tpc_relay_server`: The server relay
|
||||||
|
* `tcp_relay_client`: The client relay
|
||||||
|
|
||||||
|
The clients relay authenticates itself to the server using a token.
|
||||||
|
|
||||||
|
A single server - client relay pair can relay multiple ports simultaneously from the same machine.
|
||||||
@@ -11,3 +11,5 @@ env_logger = "0.9.0"
|
|||||||
reqwest = { version = "0.11", features = ["json"] }
|
reqwest = { version = "0.11", features = ["json"] }
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
futures = "0.3.24"
|
futures = "0.3.24"
|
||||||
|
tokio-tungstenite = "0.17.2"
|
||||||
|
urlencoding = "2.1.0"
|
||||||
@@ -50,9 +50,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
let listen_address = format!("{}:{}", args.listen_address, port.port);
|
let listen_address = format!("{}:{}", args.listen_address, port.port);
|
||||||
|
|
||||||
let h = tokio::spawn(relay_client(
|
let h = tokio::spawn(relay_client(
|
||||||
args.token.clone(),
|
format!("{}/ws?id={}&token={}",
|
||||||
port.id,
|
args.relay_url, port.id, urlencoding::encode(&args.token))
|
||||||
args.listen_address.clone(),
|
.replace("http", "ws"),
|
||||||
listen_address,
|
listen_address,
|
||||||
));
|
));
|
||||||
handles.push(h);
|
handles.push(h);
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
|
use futures::{SinkExt, StreamExt};
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
|
use tokio_tungstenite::tungstenite::Message;
|
||||||
|
|
||||||
use tokio::net::TcpListener;
|
pub async fn relay_client(ws_url: String, listen_address: String) {
|
||||||
|
log::info!("Start to listen on {}", listen_address);
|
||||||
pub async fn relay_client(token: String, port_id: usize, server: String, listen_address: String) {
|
|
||||||
log::info!("({}) Start to listen on {}", port_id, listen_address);
|
|
||||||
let listener = match TcpListener::bind(&listen_address).await {
|
let listener = match TcpListener::bind(&listen_address).await {
|
||||||
Ok(l) => l,
|
Ok(l) => l,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -12,13 +14,74 @@ pub async fn relay_client(token: String, port_id: usize, server: String, listen_
|
|||||||
};
|
};
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (mut socket, _) = listener.accept().await
|
let (socket, _) = listener.accept().await
|
||||||
.expect("Failed to accept new connection!");
|
.expect("Failed to accept new connection!");
|
||||||
|
|
||||||
let token = token.clone();
|
tokio::spawn(relay_connection(ws_url.clone(), socket));
|
||||||
let listen_address = listen_address.clone();
|
}
|
||||||
tokio::spawn(async move {
|
}
|
||||||
log::info!("New connection to {}", &listen_address);
|
|
||||||
});
|
/// Relay connection
|
||||||
|
///
|
||||||
|
/// WS read => TCP write
|
||||||
|
/// TCP read => WS write
|
||||||
|
async fn relay_connection(ws_url: String, socket: TcpStream) {
|
||||||
|
log::debug!("Connecting to {}...", ws_url);
|
||||||
|
let (ws_stream, _) = tokio_tungstenite::connect_async(ws_url)
|
||||||
|
.await.expect("Failed to connect to server relay!");
|
||||||
|
|
||||||
|
|
||||||
|
let (mut tcp_read, mut tcp_write) = socket.into_split();
|
||||||
|
let (mut ws_write, mut ws_read) =
|
||||||
|
ws_stream.split();
|
||||||
|
|
||||||
|
// TCP read -> WS write
|
||||||
|
let future = async move {
|
||||||
|
let mut buff: [u8; 5000] = [0; 5000];
|
||||||
|
loop {
|
||||||
|
match tcp_read.read(&mut buff).await {
|
||||||
|
Ok(s) => {
|
||||||
|
if let Err(e) = ws_write.send(Message::Binary(Vec::from(&buff[0..s]))).await {
|
||||||
|
log::error!(
|
||||||
|
"Failed to write to WS connection! {:?} Exiting TCP read -> WS write loop...",e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if s == 0 {
|
||||||
|
log::info!("Got empty read TCP buffer. Stopping...");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::error!(
|
||||||
|
"Failed to read from TCP connection! {:?} Exitin TCP read -> WS write loop...",
|
||||||
|
e
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
tokio::spawn(future);
|
||||||
|
|
||||||
|
// WS read -> TCP write
|
||||||
|
while let Some(m) = ws_read.next().await {
|
||||||
|
match m {
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("Failed to read from WebSocket. Breaking read loop... {:?}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(Message::Binary(b)) => {
|
||||||
|
if let Err(e) = tcp_write.write_all(&b).await {
|
||||||
|
log::error!("Failed to forward message to websocket. Closing reading end... {:?}", e);
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Ok(Message::Close(_r)) => {
|
||||||
|
log::info!("Server asked to close this WebSocket connection");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(m) => log::info!("{:?}", m)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -8,6 +8,10 @@ pub struct Args {
|
|||||||
#[clap(short, long)]
|
#[clap(short, long)]
|
||||||
pub tokens: Vec<String>,
|
pub tokens: Vec<String>,
|
||||||
|
|
||||||
|
/// Access tokens stored in a file, one token per line
|
||||||
|
#[clap(long)]
|
||||||
|
pub tokens_file: Option<String>,
|
||||||
|
|
||||||
/// Forwarded ports
|
/// Forwarded ports
|
||||||
#[clap(short, long)]
|
#[clap(short, long)]
|
||||||
pub ports: Vec<u16>,
|
pub ports: Vec<u16>,
|
||||||
|
|||||||
@@ -35,14 +35,22 @@ pub async fn config_route(req: HttpRequest, data: Data<Arc<Args>>) -> impl Respo
|
|||||||
async fn main() -> std::io::Result<()> {
|
async fn main() -> std::io::Result<()> {
|
||||||
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
|
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
|
||||||
|
|
||||||
let args: Args = Args::parse();
|
let mut args: Args = Args::parse();
|
||||||
let args = Arc::new(args);
|
|
||||||
|
|
||||||
if args.ports.is_empty() {
|
if args.ports.is_empty() {
|
||||||
log::error!("No port to forward!");
|
log::error!("No port to forward!");
|
||||||
std::process::exit(2);
|
std::process::exit(2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Read tokens from file, if any
|
||||||
|
if let Some(file) = &args.tokens_file {
|
||||||
|
std::fs::read_to_string(file)
|
||||||
|
.expect("Failed to read tokens file!")
|
||||||
|
.split('\n')
|
||||||
|
.filter(|l| !l.is_empty())
|
||||||
|
.for_each(|t| args.tokens.push(t.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
if args.tokens.is_empty() {
|
if args.tokens.is_empty() {
|
||||||
log::error!("No tokens specified!");
|
log::error!("No tokens specified!");
|
||||||
std::process::exit(3);
|
std::process::exit(3);
|
||||||
@@ -50,6 +58,7 @@ async fn main() -> std::io::Result<()> {
|
|||||||
|
|
||||||
log::info!("Starting relay on http://{}", args.listen_address);
|
log::info!("Starting relay on http://{}", args.listen_address);
|
||||||
|
|
||||||
|
let args = Arc::new(args);
|
||||||
let args_clone = args.clone();
|
let args_clone = args.clone();
|
||||||
HttpServer::new(move || {
|
HttpServer::new(move || {
|
||||||
App::new()
|
App::new()
|
||||||
|
|||||||
@@ -1,35 +1,71 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use actix::{Actor, ActorContext, Addr, ArbiterHandle, AsyncContext, Context, Handler, Message, Running, StreamHandler};
|
use actix::{Actor, ActorContext, AsyncContext, Handler, Message, StreamHandler};
|
||||||
use actix_web::{Error, HttpRequest, HttpResponse, web};
|
use actix_web::{Error, HttpRequest, HttpResponse, web};
|
||||||
use actix_web::web::Data;
|
|
||||||
use actix_web_actors::ws;
|
use actix_web_actors::ws;
|
||||||
|
use actix_web_actors::ws::{CloseCode, CloseReason};
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
|
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tokio::sync::mpsc;
|
|
||||||
|
|
||||||
use crate::args::Args;
|
use crate::args::Args;
|
||||||
|
|
||||||
|
/// How often heartbeat pings are sent
|
||||||
|
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
|
||||||
|
|
||||||
|
/// How long before lack of client response causes a timeout
|
||||||
|
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
|
||||||
|
|
||||||
#[derive(Message)]
|
#[derive(Message)]
|
||||||
#[rtype(result = "bool")]
|
#[rtype(result = "bool")]
|
||||||
pub struct DataForWebSocket(Vec<u8>);
|
pub struct DataForWebSocket(Vec<u8>);
|
||||||
|
|
||||||
|
#[derive(Message)]
|
||||||
|
#[rtype(result = "()")]
|
||||||
|
pub struct TCPReadEndClosed;
|
||||||
|
|
||||||
/// Define HTTP actor
|
/// Define HTTP actor
|
||||||
struct RelayWS {
|
struct RelayWS {
|
||||||
tcp_read: Option<OwnedReadHalf>,
|
tcp_read: Option<OwnedReadHalf>,
|
||||||
tcp_write: OwnedWriteHalf,
|
tcp_write: OwnedWriteHalf,
|
||||||
|
|
||||||
// TODO : add disconnect after ping timeout
|
// Client must respond to ping at a specific interval, otherwise we drop connection
|
||||||
|
hb: Instant,
|
||||||
|
|
||||||
// TODO : handle socket close
|
// TODO : handle socket close
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl RelayWS {
|
||||||
|
/// helper method that sends ping to client every second.
|
||||||
|
///
|
||||||
|
/// also this method checks heartbeats from client
|
||||||
|
fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
|
||||||
|
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
|
||||||
|
// check client heartbeats
|
||||||
|
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
|
||||||
|
// heartbeat timed out
|
||||||
|
log::warn!("WebSocket Client heartbeat failed, disconnecting!");
|
||||||
|
|
||||||
|
// stop actor
|
||||||
|
ctx.stop();
|
||||||
|
|
||||||
|
// don't try to send a ping
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
log::debug!("Send ping message...");
|
||||||
|
ctx.ping(b"");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Actor for RelayWS {
|
impl Actor for RelayWS {
|
||||||
type Context = ws::WebsocketContext<Self>;
|
type Context = ws::WebsocketContext<Self>;
|
||||||
|
|
||||||
fn started(&mut self, ctx: &mut Self::Context) {
|
fn started(&mut self, ctx: &mut Self::Context) {
|
||||||
|
self.hb(ctx);
|
||||||
|
|
||||||
// Start to read on remote socket
|
// Start to read on remote socket
|
||||||
let mut read_half = self.tcp_read.take().unwrap();
|
let mut read_half = self.tcp_read.take().unwrap();
|
||||||
let addr = ctx.address();
|
let addr = ctx.address();
|
||||||
@@ -38,6 +74,12 @@ impl Actor for RelayWS {
|
|||||||
loop {
|
loop {
|
||||||
match read_half.read(&mut buff).await {
|
match read_half.read(&mut buff).await {
|
||||||
Ok(l) => {
|
Ok(l) => {
|
||||||
|
if l == 0 {
|
||||||
|
log::info!("Got empty read. Closing read end...");
|
||||||
|
addr.do_send(TCPReadEndClosed);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
let to_send = DataForWebSocket(Vec::from(&buff[0..l]));
|
let to_send = DataForWebSocket(Vec::from(&buff[0..l]));
|
||||||
if let Err(e) = addr.send(to_send).await {
|
if let Err(e) = addr.send(to_send).await {
|
||||||
log::error!("Failed to send to websocket. Stopping now... {:?}", e);
|
log::error!("Failed to send to websocket. Stopping now... {:?}", e);
|
||||||
@@ -64,11 +106,17 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for RelayWS {
|
|||||||
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
||||||
match msg {
|
match msg {
|
||||||
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
|
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
|
||||||
|
Ok(ws::Message::Pong(_)) => self.hb = Instant::now(),
|
||||||
Ok(ws::Message::Text(text)) => ctx.text(text),
|
Ok(ws::Message::Text(text)) => ctx.text(text),
|
||||||
Ok(ws::Message::Close(_reason)) => ctx.stop(),
|
Ok(ws::Message::Close(_reason)) => ctx.stop(),
|
||||||
Ok(ws::Message::Binary(data)) => {
|
Ok(ws::Message::Binary(data)) => {
|
||||||
if let Err(e) = futures::executor::block_on(self.tcp_write.write_all(&data.to_vec())) {
|
if let Err(e) = futures::executor::block_on(self.tcp_write.write_all(&data.to_vec())) {
|
||||||
log::error!("Failed to forward some data, closing connection!");
|
log::error!("Failed to forward some data, closing connection! {:?}", e);
|
||||||
|
ctx.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
if data.is_empty() {
|
||||||
|
log::info!("Got empty binary message. Closing websocket...");
|
||||||
ctx.stop();
|
ctx.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -86,6 +134,16 @@ impl Handler<DataForWebSocket> for RelayWS {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Handler<TCPReadEndClosed> for RelayWS {
|
||||||
|
type Result = ();
|
||||||
|
|
||||||
|
fn handle(&mut self, _msg: TCPReadEndClosed, ctx: &mut Self::Context) -> Self::Result {
|
||||||
|
ctx.close(Some(CloseReason {
|
||||||
|
code: CloseCode::Away,
|
||||||
|
description: Some("TCP read end closed.".to_string()),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(serde::Deserialize)]
|
#[derive(serde::Deserialize)]
|
||||||
pub struct WebSocketQuery {
|
pub struct WebSocketQuery {
|
||||||
@@ -117,7 +175,7 @@ pub async fn relay_ws(req: HttpRequest, stream: web::Payload,
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let relay = RelayWS { tcp_read: Some(tcp_read), tcp_write };
|
let relay = RelayWS { tcp_read: Some(tcp_read), tcp_write, hb: Instant::now() };
|
||||||
let resp = ws::start(relay, &req, stream);
|
let resp = ws::start(relay, &req, stream);
|
||||||
log::info!("Opening new WS connection for {:?} to {}", req.peer_addr(), upstream_addr);
|
log::info!("Opening new WS connection for {:?} to {}", req.peer_addr(), upstream_addr);
|
||||||
resp
|
resp
|
||||||
|
|||||||
Reference in New Issue
Block a user