mirror of
https://gitlab.com/comunic/comunicapiv3
synced 2025-06-20 16:35:17 +00:00
Upgrade user tokens system
This commit is contained in:
@ -86,10 +86,9 @@ pub fn login_user(request: &mut HttpRequestHandler) -> RequestResult {
|
||||
|
||||
/// Sign out user
|
||||
pub fn logout_user(request: &mut HttpRequestHandler) -> RequestResult {
|
||||
account_helper::destroy_login_tokens(
|
||||
&request.user_id()?,
|
||||
request.api_client(),
|
||||
)?;
|
||||
if let Some(token) = request.user_access_token() {
|
||||
account_helper::destroy_login_tokens(token)?;
|
||||
}
|
||||
|
||||
request.success("User disconnected.")
|
||||
}
|
||||
|
@ -50,7 +50,7 @@ impl UserWsRequestHandler {
|
||||
|
||||
let mut found = false;
|
||||
user_ws_controller::foreach_connection(|p| {
|
||||
if !found && p.user_id == peer_id && p.is_having_call_with_conversation(call_id) {
|
||||
if !found && p.user_id() == peer_id && p.is_having_call_with_conversation(call_id) {
|
||||
found = true;
|
||||
}
|
||||
|
||||
@ -148,7 +148,7 @@ pub fn join_call(r: &mut UserWsRequestHandler) -> RequestResult {
|
||||
|
||||
// Remove any other active connection to current call of current user
|
||||
user_ws_controller::foreach_connection(|conn| {
|
||||
if &conn.user_id != r.user_id_ref()? || conn.session.eq(&r.get_conn().session) {
|
||||
if conn.user_id() != r.user_id_ref()? || conn.session.eq(&r.get_conn().session) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@ -197,7 +197,7 @@ pub fn get_members_list(r: &mut UserWsRequestHandler) -> RequestResult {
|
||||
|
||||
user_ws_controller::foreach_connection(|conn| {
|
||||
if conn.is_having_call_with_conversation(&conv_id) {
|
||||
list.push(CallMemberInfo::new(&conn.user_id, &conn.active_call.as_ref().unwrap()));
|
||||
list.push(CallMemberInfo::new(conn.user_id(), &conn.active_call.as_ref().unwrap()));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -297,7 +297,7 @@ pub fn mark_user_ready(r: &mut UserWsRequestHandler) -> Res {
|
||||
r.update_call(|call| call.ready = true)?;
|
||||
|
||||
user_ws_controller::send_message_to_specific_connections(
|
||||
|c| c.user_id != &user_id && c.is_having_call_with_conversation(&call_id),
|
||||
|c| c.user_id() != &user_id && c.is_having_call_with_conversation(&call_id),
|
||||
|_| UserWsMessage::no_id_message("call_peer_ready", CallPeerReadyAPI::new(&call_id, r.user_id_ref()?)),
|
||||
None::<fn(&_) -> _>,
|
||||
)?;
|
||||
@ -335,7 +335,7 @@ pub fn stop_streaming(r: &mut UserWsRequestHandler) -> Res {
|
||||
|
||||
// Notify all other users
|
||||
user_ws_controller::send_message_to_specific_connections(
|
||||
|c| c.is_having_call_with_conversation(&call_id) && c.user_id != &user_id,
|
||||
|c| c.is_having_call_with_conversation(&call_id) && c.user_id() != &user_id,
|
||||
|_| UserWsMessage::no_id_message("call_peer_interrupted_streaming", CallPeerInterruptedStreamingAPI::new(&call_id, &user_id)),
|
||||
None::<fn(&_) -> _>,
|
||||
)?;
|
||||
@ -362,17 +362,17 @@ pub fn make_user_leave_call(conv_id: &ConvID, connection: &UserWsConnection) ->
|
||||
|
||||
// Close main stream (sender)
|
||||
events_helper::propagate_event(&Event::CloseCallStream(&CloseCallStream {
|
||||
call_hash: gen_call_hash(&conv_id, &connection.user_id),
|
||||
call_hash: gen_call_hash(&conv_id, connection.user_id()),
|
||||
peer_id: None,
|
||||
}))?;
|
||||
|
||||
// Close receiver streams (other users streams)
|
||||
user_ws_controller::foreach_connection(
|
||||
|peer_conn| {
|
||||
if peer_conn.is_having_call_with_conversation(conv_id) && &peer_conn.user_id != &connection.user_id {
|
||||
if peer_conn.is_having_call_with_conversation(conv_id) && peer_conn.user_id() != connection.user_id() {
|
||||
events_helper::propagate_event(&Event::CloseCallStream(&CloseCallStream {
|
||||
call_hash: gen_call_hash(&conv_id, &peer_conn.user_id),
|
||||
peer_id: Some(connection.user_id.clone()),
|
||||
call_hash: gen_call_hash(&conv_id, peer_conn.user_id()),
|
||||
peer_id: Some(connection.user_id().clone()),
|
||||
}))?;
|
||||
}
|
||||
|
||||
@ -381,7 +381,7 @@ pub fn make_user_leave_call(conv_id: &ConvID, connection: &UserWsConnection) ->
|
||||
)?;
|
||||
|
||||
// Create a notification
|
||||
events_helper::propagate_event(&Event::UserLeftCall(conv_id, &connection.user_id))?;
|
||||
events_helper::propagate_event(&Event::UserLeftCall(conv_id, connection.user_id()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -391,7 +391,7 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
|
||||
match e {
|
||||
Event::UserJoinedCall(conv_id, user_id) => {
|
||||
user_ws_controller::send_message_to_specific_connections(
|
||||
|c| c.is_having_call_with_conversation(conv_id) && &c.user_id != user_id,
|
||||
|c| c.is_having_call_with_conversation(conv_id) && c.user_id() != user_id,
|
||||
|_| UserWsMessage::no_id_message("user_joined_call", JoinedCallMessage::new(conv_id, user_id)),
|
||||
None::<fn(&_) -> _>,
|
||||
)?;
|
||||
@ -429,7 +429,7 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
|
||||
|
||||
|
||||
user_ws_controller::send_message_to_specific_connections(
|
||||
|c| c.user_id == target_user && c.is_having_call_with_conversation(&call_id),
|
||||
|c| c.user_id() == target_user && c.is_having_call_with_conversation(&call_id),
|
||||
|_| UserWsMessage::no_id_message("new_call_signal", NewCallSignalAPI::new(&call_id, &peer_id, &msg.data)?),
|
||||
None::<fn(&_) -> _>,
|
||||
)?;
|
||||
|
@ -94,7 +94,7 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
|
||||
|c| c.posts.contains(&comment.post_id),
|
||||
|c| UserWsMessage::no_id_message(
|
||||
"new_comment",
|
||||
CommentAPI::new(comment, &Some(c.user_id.clone()))?,
|
||||
CommentAPI::new(comment, &Some(c.user_id().clone()))?,
|
||||
),
|
||||
None::<fn(&_) -> _>,
|
||||
)?;
|
||||
@ -105,7 +105,7 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
|
||||
|c| c.posts.contains(&comment.post_id),
|
||||
|c| UserWsMessage::no_id_message(
|
||||
"comment_updated",
|
||||
CommentAPI::new(comment, &Some(c.user_id.clone()))?,
|
||||
CommentAPI::new(comment, &Some(c.user_id().clone()))?,
|
||||
),
|
||||
None::<fn(&_) -> _>,
|
||||
)?;
|
||||
|
@ -362,7 +362,7 @@ pub fn handle_event(e: &events_helper::Event) -> Res {
|
||||
user_ws_controller::send_message_to_specific_connections(
|
||||
|f| f.conversations.contains(&msg.conv_id),
|
||||
|_| UserWsMessage::no_id_message("new_conv_message", ConversationMessageAPI::new(msg)),
|
||||
Some(|conn: &UserWsConnection| conversations_helper::mark_user_seen(msg.conv_id, &conn.user_id)),
|
||||
Some(|conn: &UserWsConnection| conversations_helper::mark_user_seen(msg.conv_id, conn.user_id())),
|
||||
)?;
|
||||
}
|
||||
|
||||
|
@ -15,12 +15,12 @@ use crate::api_data::res_get_ws_token::ResGetWsToken;
|
||||
use crate::constants::{USER_LAST_ACTIVITY_REFRESH, WS_ACCESS_TOKEN_LENGTH};
|
||||
use crate::controllers::user_ws_controller::ws_connections_list::{add_connection, find_connection, get_ws_connections_list, remove_connection};
|
||||
use crate::controllers::user_ws_routes::find_user_ws_route;
|
||||
use crate::data::api_client::APIClient;
|
||||
use crate::data::base_request_handler::BaseRequestHandler;
|
||||
use crate::data::config::conf;
|
||||
use crate::data::error::{ExecError, Res, ResultBoxError};
|
||||
use crate::data::http_request_handler::HttpRequestHandler;
|
||||
use crate::data::user::UserID;
|
||||
use crate::data::user_token::UserAccessToken;
|
||||
use crate::data::user_ws_connection::UserWsConnection;
|
||||
use crate::data::user_ws_message::UserWsMessage;
|
||||
use crate::data::user_ws_request_handler::{UserWsRequestHandler, UserWsResponseType};
|
||||
@ -41,15 +41,14 @@ mod ws_tokens_list {
|
||||
use std::sync::Mutex;
|
||||
|
||||
use crate::constants::WS_ACCESS_TOKEN_LIFETIME;
|
||||
use crate::data::user::UserID;
|
||||
use crate::data::user_token::UserAccessToken;
|
||||
use crate::utils::date_utils::time;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WsToken {
|
||||
pub time: u64,
|
||||
pub client_id: u32,
|
||||
pub user_id: UserID,
|
||||
pub token: String,
|
||||
pub user_token: UserAccessToken,
|
||||
pub ws_token: String,
|
||||
pub incognito: bool,
|
||||
pub remote_ip: String,
|
||||
}
|
||||
@ -89,7 +88,7 @@ mod ws_tokens_list {
|
||||
let list = get_list();
|
||||
let mut list = list.lock().unwrap();
|
||||
for i in 0..list.len() {
|
||||
if list[i].token == t {
|
||||
if list[i].ws_token == t {
|
||||
return Some(list.remove(i));
|
||||
}
|
||||
}
|
||||
@ -178,10 +177,9 @@ pub fn get_token(r: &mut HttpRequestHandler) -> ResultBoxError {
|
||||
let access_token = rand_str(WS_ACCESS_TOKEN_LENGTH);
|
||||
|
||||
let token = ws_tokens_list::WsToken {
|
||||
user_id: r.user_id()?,
|
||||
client_id: r.api_client().id,
|
||||
user_token: r.some_or_internal_error(r.user_access_token().cloned(), "No access token!")?,
|
||||
time: time(),
|
||||
token: access_token.to_string(),
|
||||
ws_token: access_token.to_string(),
|
||||
incognito: r.post_bool_opt("incognito", false),
|
||||
remote_ip: r.remote_ip(),
|
||||
};
|
||||
@ -195,10 +193,8 @@ pub fn get_token(r: &mut HttpRequestHandler) -> ResultBoxError {
|
||||
pub struct WsSession {
|
||||
// NOTE : apart from hb, the values here won't change !
|
||||
|
||||
user_id: UserID,
|
||||
|
||||
// Client used for the connection
|
||||
client_id: u32,
|
||||
// Information about user connection
|
||||
user_token: UserAccessToken,
|
||||
|
||||
// Remote IP address
|
||||
remote_ip: String,
|
||||
@ -232,16 +228,16 @@ impl WsSession {
|
||||
});
|
||||
}
|
||||
|
||||
/// helper method that update user last activity at every specified amount of time
|
||||
/// Helper method that update user last activity at every specified amount of time
|
||||
fn user_activity(&self, ctx: &mut actix_web_actors::ws::WebsocketContext<Self>) {
|
||||
if !self.incognito && account_helper::update_last_activity(&self.user_id).is_err() {
|
||||
eprintln!("Failed to do initial refresh of last activity for user {} !", self.user_id.id());
|
||||
if !self.incognito && account_helper::update_last_activity(&self.user_token.user_id).is_err() {
|
||||
eprintln!("Failed to do initial refresh of last activity for user {} !", self.user_token.user_id.id());
|
||||
}
|
||||
|
||||
ctx.run_interval(USER_LAST_ACTIVITY_REFRESH, |_, ctx| {
|
||||
if let Some(conn) = find_connection(ctx.address()) {
|
||||
if !conn.incognito && account_helper::update_last_activity(&conn.user_id).is_err() {
|
||||
eprintln!("Failed to refresh last activity for user {} !", conn.user_id.id());
|
||||
if !conn.incognito && account_helper::update_last_activity(conn.user_id()).is_err() {
|
||||
eprintln!("Failed to refresh last activity for user {} !", conn.user_id().id());
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -312,8 +308,7 @@ impl Actor for WsSession {
|
||||
self.user_activity(ctx);
|
||||
|
||||
add_connection(UserWsConnection {
|
||||
user_id: self.user_id.clone(),
|
||||
client_id: self.client_id,
|
||||
user_token: self.user_token.clone(),
|
||||
remote_ip: self.remote_ip.clone(),
|
||||
session: ctx.address(),
|
||||
incognito: self.incognito,
|
||||
@ -445,10 +440,9 @@ pub async fn ws_route(
|
||||
actix_web_actors::ws::start(
|
||||
WsSession {
|
||||
remote_ip: token.remote_ip,
|
||||
user_id: token.user_id,
|
||||
user_token: token.user_token,
|
||||
hb: std::time::Instant::now(),
|
||||
incognito: token.incognito,
|
||||
client_id: token.client_id,
|
||||
},
|
||||
&req,
|
||||
stream,
|
||||
@ -467,7 +461,7 @@ pub fn send_message_to_users(msg: &UserWsMessage, users: &Vec<UserID>) -> Res {
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.filter(|f| users.contains(&f.user_id))
|
||||
.filter(|f| users.contains(f.user_id()))
|
||||
.map(|f| f.session.clone())
|
||||
.collect::<Vec<Addr<WsSession>>>();
|
||||
|
||||
@ -484,7 +478,7 @@ pub fn send_message_to_user(msg: &UserWsMessage, user: &UserID) -> Res {
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.filter(|f| user == &f.user_id)
|
||||
.filter(|f| user == f.user_id())
|
||||
.map(|f| f.session.clone())
|
||||
.collect::<Vec<Addr<WsSession>>>();
|
||||
|
||||
@ -527,22 +521,22 @@ pub fn send_message_to_specific_connections<F, M, A>(filter: F, msg_generator: M
|
||||
|
||||
/// Check out whether user is connected or not
|
||||
pub fn is_user_connected(user_id: &UserID) -> bool {
|
||||
get_ws_connections_list().lock().unwrap().iter().any(|c| &c.user_id == user_id)
|
||||
get_ws_connections_list().lock().unwrap().iter().any(|c| c.user_id() == user_id)
|
||||
}
|
||||
|
||||
/// Check out whether user is connected or not and has at list one not incognito connection
|
||||
pub fn is_user_connected_not_incognito(user_id: &UserID) -> bool {
|
||||
get_ws_connections_list().lock().unwrap().iter().any(|c| &c.user_id == user_id && !c.incognito)
|
||||
get_ws_connections_list().lock().unwrap().iter().any(|c| c.user_id() == user_id && !c.incognito)
|
||||
}
|
||||
|
||||
|
||||
/// Disconnect a user from all the WebSockets of a given client
|
||||
pub fn disconnect_user_from_client(user_id: &UserID, client: &APIClient) -> Res {
|
||||
/// Disconnect a user from all the WebSockets of a given access token
|
||||
pub fn disconnect_from_user_token(token: &UserAccessToken) -> Res {
|
||||
let connections = get_ws_connections_list()
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.filter(|f| &f.user_id == user_id && f.client_id == client.id)
|
||||
.filter(|f| &f.user_token == token)
|
||||
.map(|f| f.session.clone())
|
||||
.collect::<Vec<Addr<WsSession>>>();
|
||||
|
||||
@ -559,7 +553,7 @@ pub fn disconnect_user_from_all_sockets(user_id: &UserID) -> Res {
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.filter(|f| &f.user_id == user_id)
|
||||
.filter(|f| f.user_id() == user_id)
|
||||
.map(|f| f.session.clone())
|
||||
.collect::<Vec<Addr<WsSession>>>();
|
||||
|
||||
@ -585,8 +579,8 @@ pub fn foreach_connection<F>(mut f: F) -> Res
|
||||
/// Events handler
|
||||
pub fn handle_event(e: &events_helper::Event) -> Res {
|
||||
match e {
|
||||
Event::DestroyedLoginToken(user_id, client) => {
|
||||
disconnect_user_from_client(user_id, client)?;
|
||||
Event::DestroyedLoginToken(token) => {
|
||||
disconnect_from_user_token(token)?;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
Reference in New Issue
Block a user