use crate::cli_args::cli_args; use crate::server; use futures::stream::{SplitSink, SplitStream}; use futures::{SinkExt, StreamExt}; use hyper_rustls::ConfigBuilderExt; use sea_battle_backend::data::*; use sea_battle_backend::human_player_ws::{ClientMessage, ServerMessage}; use sea_battle_backend::server::{ AcceptInviteQuery, BotPlayQuery, CreateInviteQuery, PlayRandomQuery, }; use sea_battle_backend::utils::res_utils::{boxed_error, Res}; use std::error::Error; use std::fmt::Display; use std::sync::mpsc::TryRecvError; use std::sync::{mpsc, Arc}; use tokio::net::TcpStream; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; type WsStream = WebSocketStream>; pub enum GetRemoteVersionError { ConnectionFailed, Other(Box), } /// Connection client /// /// This structure acts as a wrapper around websocket connection that handles automatically parsing /// of incoming messages and encoding of outgoing messages pub struct Client { sink: SplitSink, receiver: mpsc::Receiver, } impl Client { /// Get remote server version pub async fn get_server_version() -> Result { let url = format!("{}/version", cli_args().remote_server); log::debug!("Getting remote information from {} ...", url); let res = match reqwest::get(url).await { Ok(r) => r, Err(e) if e.is_timeout() || e.is_connect() => { return Err(GetRemoteVersionError::ConnectionFailed) } Err(e) => return Err(GetRemoteVersionError::Other(Box::new(e))), }; res.json() .await .map_err(|e| GetRemoteVersionError::Other(Box::new(e))) } /// Start to play against a bot /// /// When playing against a bot, local server is always used pub async fn start_bot_play(rules: &GameRules) -> Res { server::start_server_if_missing().await; Self::connect_url( &cli_args().local_server_address(), &format!( "/play/bot?{}", serde_urlencoded::to_string(&BotPlayQuery { rules: rules.clone(), player_name: "Human".to_string() }) .unwrap() ), ) .await } /// Start to play against a random player pub async fn start_random_play(player_name: D) -> Res { Self::connect_url( &cli_args().remote_server, &format!( "/play/random?{}", serde_urlencoded::to_string(&PlayRandomQuery { player_name: player_name.to_string() }) .unwrap() ), ) .await } /// Start a play by creating an invite pub async fn start_create_invite(rules: &GameRules, player_name: D) -> Res { Self::connect_url( &cli_args().remote_server, &format!( "/play/create_invite?{}", serde_urlencoded::to_string(&CreateInviteQuery { rules: rules.clone(), player_name: player_name.to_string() }) .unwrap() ), ) .await } /// Start a play by accepting an invite pub async fn start_accept_invite(code: String, player_name: D) -> Res { Self::connect_url( &cli_args().remote_server, &format!( "/play/accept_invite?{}", serde_urlencoded::to_string(&AcceptInviteQuery { code, player_name: player_name.to_string() }) .unwrap() ), ) .await } /// Do connect to a server, returning async fn connect_url(server: &str, uri: &str) -> Res { let mut ws_url = server.replace("http", "ws"); ws_url.push_str(uri); log::debug!("Connecting to {}", ws_url); let (socket, _) = if ws_url.starts_with("wss") { // Perform a connection over TLS let config = rustls::ClientConfig::builder() .with_safe_defaults() .with_native_roots() .with_no_client_auth(); let connector = tokio_tungstenite::Connector::Rustls(Arc::new(config)); tokio_tungstenite::connect_async_tls_with_config(ws_url, None, Some(connector)).await? } else { // Perform an unsecure connection tokio_tungstenite::connect_async(ws_url).await? }; let (sink, mut stream) = socket.split(); // Receive server message on a separate task let (sender, receiver) = mpsc::channel(); tokio::task::spawn(async move { loop { match Self::recv_next_msg(&mut stream).await { Ok(msg) => { if let Err(e) = sender.send(msg.clone()) { log::debug!("Failed to forward ws message! {} (msg={:?})", e, msg); break; } } Err(e) => { log::debug!("Failed receive next message from websocket! {}", e); break; } } } }); Ok(Self { sink, receiver }) } /// Receive next message from stream async fn recv_next_msg(stream: &mut SplitStream) -> Res { loop { let chunk = match stream.next().await { None => return Err(boxed_error("No more message in queue!")), Some(d) => d, }; match chunk? { Message::Text(t) => { log::debug!("TEXT Got a text message from server!"); let msg: ServerMessage = serde_json::from_str(&t)?; return Ok(msg); } Message::Binary(_) => { log::debug!("BINARY Got an unexpected binary message"); return Err(boxed_error("Received an unexpected binary message!")); } Message::Ping(_) => { log::debug!("PING Got a ping message from server"); } Message::Pong(_) => { log::debug!("PONG Got a pong message"); } Message::Close(_) => { log::debug!("CLOSE Got a close websocket message"); return Err(boxed_error("Server requested to close connection!")); } Message::Frame(_) => { log::debug!("FRAME Got an unexpected frame from server!"); return Err(boxed_error("Got an unexpected frame!")); } } } } /// Send a message through the stream pub async fn send_message(&mut self, msg: &ClientMessage) -> Res { self.sink .send(Message::Text(serde_json::to_string(&msg)?)) .await?; Ok(()) } /// Try to receive next message from websocket, in a non-blocking way pub async fn try_recv_next_message(&self) -> Res> { match self.receiver.try_recv() { Ok(msg) => Ok(Some(msg)), Err(TryRecvError::Empty) => Ok(None), Err(TryRecvError::Disconnected) => Err(boxed_error("Receiver channel disconnected!")), } } /// Block until the next message from websocket is availabl pub async fn recv_next_message(&self) -> Res { Ok(self.receiver.recv()?) } /// Close connection pub async fn close_connection(&mut self) { if let Err(e) = self.sink.send(Message::Close(None)).await { log::debug!("Failed to close WS connection! {:?}", e); } } }