/** * User websocket controller * * @author Pierre Hubert */ import * as ws from 'ws'; import { Request } from 'express'; import { RequestHandler } from '../entities/RequestHandler'; import { time } from '../utils/DateUtils'; import { randomStr } from '../utils/CryptUtils'; import { EventsHelper } from '../helpers/EventsHelper'; import { NotificationsHelper } from '../helpers/NotificationsHelper'; import { ConversationsHelper } from '../helpers/ConversationsHelper'; interface PendingRequests { time: number, clientID: number, userID: number, token: string } export interface ActiveClient { socketID: string, clientID: number, userID: number, ws: ws } export interface WsMessage { id: string, title: string, data: any } // Tokens are valid only 10 seconds after they are generated const TOKENS_DURATION = 10 const TOKEN_LENGTH = 20 export class UserWebSocketController { /** * The list of pending connections */ static pending_list: PendingRequests[] = [] /** * The list of active clients */ static active_clients: ActiveClient[] = [] /** * Clean the list of tokens */ private static CleanList() { // Clean the list this.pending_list = this.pending_list .filter((l) => l.time + TOKENS_DURATION + 1 > time()) } /** * Get a websocket access token * * @param h Request handler */ public static async GetToken(h: RequestHandler) { this.CleanList(); // Generate a temporary token const token = randomStr(TOKEN_LENGTH); // Add the token to the list this.pending_list.push({ time: time(), clientID: h.getClientInfo().id, userID: h.getUserId(), token: token }); h.send({ token: token }); } /** * Handler user websocket request * * @param req Associated request * @param ws The socket */ public static async UserWS(req: Request, ws: ws) { this.CleanList(); // First, check for token if(!req.query.hasOwnProperty("token") || String(req.query.token).length != TOKEN_LENGTH) { ws.send("Missing token!"); ws.close(); return; } // Search appropriate connection const token = req.query.token; const entryIndex = this.pending_list.findIndex((el) => el.token == token); if(entryIndex == -1) { ws.send("Invalid token!"); ws.close(); return; } // Remove the entry from the array const entry = this.pending_list[entryIndex]; this.pending_list.splice(entryIndex, 1); // Add the client to the list of active clients const client: ActiveClient = { socketID: randomStr(30), clientID: entry.clientID, userID: entry.userID, ws: ws } this.active_clients.push(client); // Remove the client for the list as soon as the // socket is closed ws.addEventListener("close", () => { this.active_clients.splice(this.active_clients.indexOf(client), 1); }) // Handles error ws.addEventListener("error", (e) => { if(ws.readyState == ws.OPEN) ws.close(); console.log("WebSocket error", e) }) // Handles incoming messages ws.addEventListener("message", (msg) => { // Only accept text messages if(msg.type != "message") { console.error("Received a non-text messsage through a WebSocket !") ws.close(); return; } }) } /** * Close a specific user websocket * * @param clientID Target client ID * @param userID Target user ID */ public static async CloseClientSockets(clientID: number, userID: number) { for(const entry of this.active_clients.filter((f) => f.clientID == clientID && f.userID == userID)) entry.ws.close(); } /** * Send a message to a socket * * @param userID Target user ID * @param socketID Target socket ID (if null the message is sent to * all the active sockets of the user) * @param message The message to send */ public static Send(userID: number, socketID: string, message: WsMessage) { for(const entry of this.active_clients.filter( (e) => e.userID == userID && (socketID.length == 0 || e.socketID == socketID))) { if(entry.ws.readyState == ws.OPEN) entry.ws.send(JSON.stringify(message)); } } /** * Check out whether a user has an active websocket or not * * @param userID Target user ID */ public static IsConnected(userID: number) : boolean { return this.active_clients.find((e) => e.userID == userID) != undefined; } /** * Send updated notifications number to some users * * @param usersID Target users ID */ public static async SendNewNotificationsNumber(usersID: number[]) { // Process each user for(const userID of usersID) { if(!this.IsConnected(userID)) continue; // Notify user this.Send(userID, "", { title: "number_notifs", id: "", data: await NotificationsHelper.CountUnread(userID) }); } } /** * Send upated number of unread conversations count * * @param usersID Target users ID */ public static async SendNewUnreadConversationsCount(usersID: number[]) { for(const userID of usersID) { if(!this.IsConnected(userID)) continue; // Notify user this.Send(userID, "", { title: "number_unread_conversations", id: "", data: await ConversationsHelper.CountUnreadForUser(userID) }); } } } // When user sign out EventsHelper.Listen("destroyed_login_tokens", (e) => UserWebSocketController.CloseClientSockets(e.client.id, e.userID)); // When we get a new number of notifications EventsHelper.Listen("updated_number_notifications", async (e) => await UserWebSocketController.SendNewNotificationsNumber(e.usersID)); // When we get a new number of unread conversations EventsHelper.Listen("updated_number_unread_conversations", async (e) => await UserWebSocketController.SendNewUnreadConversationsCount(e.usersID));