use actix_http::ws::Message;
use futures_util::StreamExt as _;
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;
use tokio::select;
use tokio::time::interval;

/// How often heartbeat pings are sent
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);

/// How long before lack of client response causes a timeout
const CLIENT_TIMEOUT: Duration = Duration::from_secs(20);

/// Broadcast text & binary messages received from a client, respond to ping messages, and monitor
/// connection health to detect network issues and free up resources.
pub async fn handle(
    mut session: actix_ws::Session,
    mut msg_stream: actix_ws::MessageStream,
    mut socket: UnixStream,
) {
    log::info!("Connected to websocket");

    let mut last_heartbeat = Instant::now();
    let mut interval = interval(HEARTBEAT_INTERVAL);

    let mut buf_socket = [0u8; 1024];

    let reason = loop {
        // waits for either `msg_stream` to receive a message from the client, the broadcast channel
        // to send a message, or the heartbeat interval timer to tick, yielding the value of
        // whichever one is ready first
        select! {

            // heartbeat interval ticked
            _tick = interval.tick() => {
                // if no heartbeat ping/pong received recently, close the connection
                if Instant::now().duration_since(last_heartbeat) > CLIENT_TIMEOUT {
                    log::info!(
                        "client has not sent heartbeat in over {CLIENT_TIMEOUT:?}; disconnecting"
                    );

                    break None;
                }

                // send heartbeat ping
                let _ = session.ping(b"").await;
            }

            msg = msg_stream.next() => {
                let msg = match msg {
                    // received message from WebSocket client
                    Some(Ok(msg)) => msg,

                    // client WebSocket stream error
                    Some(Err(err)) => {
                        log::error!("{err}");
                        break None;
                    }

                    // client WebSocket stream ended
                    None => break None
                };

                log::debug!("msg: {msg:?}");

                match msg {
                    Message::Text(_) => {
                        log::error!("Received unexpected text on VNC WebSocket!");
                    }

                    Message::Binary(bin) => {
                        log::info!("Forward {} bytes to VNC server", bin.len());
                        if let Err(e) = socket.write(&bin).await {
                            log::error!("Failed to relay bytes to VNC socket {e}");
                            break None;
                        }
                    }

                    Message::Close(reason) => {
                        break reason;
                    }

                    Message::Ping(bytes) => {
                        last_heartbeat = Instant::now();
                        let _ = session.pong(&bytes).await;
                    }

                    Message::Pong(_) => {
                        last_heartbeat = Instant::now();
                    }

                    Message::Continuation(_) => {
                        log::warn!("no support for continuation frames");
                    }

                    // no-op; ignore
                    Message::Nop => {}
                };
            }

            // Forward socket packet to WS client
            count = socket.read(&mut buf_socket) => {
                let count = match count {
                    Ok(count) => count,
                    Err(e) => {
                        log::error!("[VNC] Failed to read from upstream! {e}");
                        break None;
                    }
                };

                if count == 0 {
                    log::warn!("[VNC] infinite loop (upstream), closing connection");
                    break None;
                }

                if let Err(e)=session.binary(buf_socket[0..count].to_vec()).await{
                    log::error!("[VNC] Failed to forward messages to upstream, will close connection! {e}");
                    break None
                }
            }
        }
    };

    // attempt to close connection gracefully
    let _ = session.close(reason).await;

    log::info!("Disconnected from websocket");
}